Messaging
Luxis supports event publishing with two modes that share the same call site:
- Inside inTransaction, ctx.publisher().publish(...) is void. Events are buffered, batch-inserted into your outbox table immediately before commit (in the same transaction as your business writes), and dispatched after commit succeeds. If the outbox insert fails, the whole transaction rolls back.
- Outside a transaction, ctx.publisher().publish(...) returns LuxisAsync<Void, ERR>. Luxis opens its own short-lived transaction, appends the event to your OutboxStore, commits, then kicks the drainer. The returned future resolves once the row is durably appended; broker delivery happens on the drain pass.
The drainer that reads the outbox and pushes to your broker is owned by Luxis — you provide the OutboxStore (how to read/append/mark) and Publisher (how to send).
Inside a transaction
```java
stream
    .inTransaction(tx -> tx
        .asyncPeek(ctx -> {
            ctx.publisher().publish("orders", "{...}");      // String
            ctx.publisher().publish("orders", payloadBytes); // byte[]
            ctx.publisher().publish("orders", buffer);       // ByteBuffer
            return ctx.db().update("insert into orders ...");
        })
        .commit())
    .complete(...);
```

publish returns void. Events are held in memory until commit time. Order is preserved.
Outside a transaction
```java
stream.<Void>asyncMap(ctx -> ctx.publisher().publish("topic", "ping"))
    .complete(...);
```

Same call shape; this version returns LuxisAsync<Void, ERR> so you can chain or await. The returned future completes when the outbox row is committed, not when the broker accepts it, so delivery is at-least-once just like the transactional path.
Publisher
```java
public interface Publisher {
    Future<Void> publish(List<PendingOutboxEvent> events);
}
```

Each PendingOutboxEvent is (long id, OutboxEvent event). The id is the persisted outbox row id; propagate it to your broker as a stable dedup key (e.g. a Kafka header x-luxis-event-id) so consumers can drop duplicates produced when a drain pass retries the whole batch.
Each publish call is treated as all-or-nothing: if the returned future fails, Luxis assumes the entire batch failed and retries the whole batch on the next drain pass. Implementations should fail the future on any partial failure rather than reporting “some sent”. Inside the batch, each OutboxEvent carries a sealed Payload (Str, Bytes, or Buf), so a single batch can mix payload types; the implementation matches on the variant to dispatch each event.
The non-transactional ctx.publisher().publish("key", value) overloads (String, byte[], ByteBuffer) append a single-row batch through the outbox; the Publisher always sees events with real persisted ids regardless of which call site emitted them.
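The all-or-nothing contract and the payload matching described above can be illustrated with a minimal sketch. Everything in it is an assumption made for illustration: the record and interface declarations are stand-ins for the Luxis types (the real ones may differ), OutboxEvent is assumed to carry a key next to its payload, CompletableFuture stands in for whichever Future type Luxis uses, and BrokerClient is a hypothetical blocking client.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-ins for the Luxis types; the real declarations may differ.
sealed interface Payload {
    record Str(String value) implements Payload {}
    record Bytes(byte[] value) implements Payload {}
    record Buf(ByteBuffer value) implements Payload {}
}
record OutboxEvent(String key, Payload payload) {}
record PendingOutboxEvent(long id, OutboxEvent event) {}

interface Publisher {
    CompletableFuture<Void> publish(List<PendingOutboxEvent> events);
}

// Hypothetical broker client: one blocking send per message, throws on failure.
interface BrokerClient {
    void send(String topic, long eventId, byte[] body) throws Exception;
}

final class AllOrNothingPublisher implements Publisher {
    private final BrokerClient broker;

    AllOrNothingPublisher(BrokerClient broker) {
        this.broker = broker;
    }

    // Match on the payload variant and normalise it to bytes for the wire.
    static byte[] toBytes(Payload payload) {
        if (payload instanceof Payload.Str s) {
            return s.value().getBytes(StandardCharsets.UTF_8);
        } else if (payload instanceof Payload.Bytes b) {
            return b.value();
        } else if (payload instanceof Payload.Buf buf) {
            ByteBuffer copy = buf.value().duplicate(); // leave the caller's position untouched
            byte[] out = new byte[copy.remaining()];
            copy.get(out);
            return out;
        }
        throw new IllegalArgumentException("unknown payload variant: " + payload);
    }

    @Override
    public CompletableFuture<Void> publish(List<PendingOutboxEvent> events) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        try {
            for (PendingOutboxEvent pending : events) {
                // The outbox row id travels with the message so consumers can deduplicate.
                broker.send(pending.event().key(), pending.id(), toBytes(pending.event().payload()));
            }
            result.complete(null);
        } catch (Exception e) {
            // Any single failure fails the whole batch, so the next drain pass retries all of it.
            result.completeExceptionally(e);
        }
        return result;
    }
}
```

Events sent before the failing one will be sent again on the retry, which is why the id is passed through to the broker as a dedup key.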
OutboxStore
```java
public interface OutboxStore<TX> {
    Future<Void> append(TX tx, List<OutboxEvent> events);
    Future<List<PendingOutboxEvent>> readPending(int limit);
    Future<Void> markBatchSent(List<Long> ids);
    default int batchSize() { return 100; }
    default long pollIntervalMillis() { return 1000L; }
    default boolean drainerEnabled() { return true; }
}
```

Override drainerEnabled() to return false if you want to drain the outbox yourself (e.g. via Debezium or a separate worker process). append still runs pre-commit; Luxis just won’t call readPending / markBatchSent or schedule the periodic tick.
append runs inside the transaction (Luxis hands back the TX token from your DatabaseClient), so outbox rows commit atomically with your business data. readPending and markBatchSent are called by the Luxis-owned drainer.
OutboxEvent is a sealed wrapper with three payload variants (Str, Bytes, Buf) so a single batch can carry mixed payload types.
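To make the three store operations concrete, here is a minimal in-memory sketch. It is only an illustration under stated assumptions: the types are simplified stand-ins (the payload is a plain String, CompletableFuture replaces the Future type, the default methods are omitted), and the TX token is ignored. A real store would write rows to your outbox table using the TX token so they commit with your business data.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Simplified, hypothetical stand-ins for the Luxis types.
record OutboxEvent(String key, String payload) {}
record PendingOutboxEvent(long id, OutboxEvent event) {}

interface OutboxStore<TX> {
    CompletableFuture<Void> append(TX tx, List<OutboxEvent> events);
    CompletableFuture<List<PendingOutboxEvent>> readPending(int limit);
    CompletableFuture<Void> markBatchSent(List<Long> ids);
}

// In-memory store for illustration only: nothing here is durable or transactional.
final class InMemoryOutboxStore implements OutboxStore<Object> {
    // Insertion-ordered, so readPending returns events in append order.
    private final Map<Long, OutboxEvent> pending = new LinkedHashMap<>();
    private long nextId = 1;

    @Override
    public synchronized CompletableFuture<Void> append(Object tx, List<OutboxEvent> events) {
        for (OutboxEvent event : events) {
            pending.put(nextId++, event); // assign the id that the Publisher will later see
        }
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public synchronized CompletableFuture<List<PendingOutboxEvent>> readPending(int limit) {
        List<PendingOutboxEvent> batch = new ArrayList<>();
        for (Map.Entry<Long, OutboxEvent> entry : pending.entrySet()) {
            if (batch.size() == limit) {
                break;
            }
            batch.add(new PendingOutboxEvent(entry.getKey(), entry.getValue()));
        }
        return CompletableFuture.completedFuture(batch);
    }

    @Override
    public synchronized CompletableFuture<Void> markBatchSent(List<Long> ids) {
        ids.forEach(pending::remove); // sent rows no longer show up in readPending
        return CompletableFuture.completedFuture(null);
    }
}
```

Rows that are read but never marked sent stay pending and are returned again by the next readPending call, which matches the retry behaviour described for the drainer.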
Registration
```java
Luxis.start(routes, config, databaseClient, publisher, outboxStore);
```

When all three are registered, Luxis starts a periodic drainer (pollIntervalMillis, batchSize) and kicks it after every successful commit so events ship as soon as possible.
If you don’t register a Publisher + OutboxStore, calls to ctx.publisher().publish(...) fail with a clear IllegalStateException — no silent dropping.
What Luxis does for you
- Buffers in-tx events in memory.
- Calls OutboxStore.append(tx, events) immediately before DatabaseClient.commit(tx).
- Rolls back if either append or commit fails (events are lost together with the data, so the outcome is atomic).
- Owns a periodic drainer (one per Luxis instance, started on Luxis.start, stopped on close).
- Re-entrancy guard: only one drain pass at a time per instance.
- Kicks the drainer after every commit.
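The drain pass and its re-entrancy guard can be pictured with a small sketch. This is not Luxis's implementation, only one way such a pass could be structured: it is synchronous, events are reduced to their ids, and the three callbacks are hypothetical stand-ins for OutboxStore.readPending, Publisher.publish, and OutboxStore.markBatchSent.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import java.util.function.IntFunction;

// Illustrative only: a synchronous drain pass over event ids.
final class DrainerSketch {
    private final AtomicBoolean draining = new AtomicBoolean(false);
    private final IntFunction<List<Long>> readPending;  // stand-in for OutboxStore.readPending
    private final Consumer<List<Long>> publish;         // stand-in for Publisher.publish; throws on failure
    private final Consumer<List<Long>> markBatchSent;   // stand-in for OutboxStore.markBatchSent
    private final int batchSize;

    DrainerSketch(IntFunction<List<Long>> readPending,
                  Consumer<List<Long>> publish,
                  Consumer<List<Long>> markBatchSent,
                  int batchSize) {
        this.readPending = readPending;
        this.publish = publish;
        this.markBatchSent = markBatchSent;
        this.batchSize = batchSize;
    }

    // Returns the number of events shipped, or -1 if another pass is already running.
    int drainOnce() {
        if (!draining.compareAndSet(false, true)) {
            return -1; // re-entrancy guard: skip instead of running two passes at once
        }
        try {
            List<Long> batch = readPending.apply(batchSize);
            if (batch.isEmpty()) {
                return 0;
            }
            publish.accept(batch);       // if this throws, nothing is marked and the batch stays pending
            markBatchSent.accept(batch); // only reached after a successful publish
            return batch.size();
        } finally {
            draining.set(false); // release the guard even when publish fails
        }
    }
}
```

Both the periodic tick and the post-commit kick could call drainOnce; whichever arrives while a pass is in flight simply returns without doing work.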
What’s yours
- Designing the outbox table (Luxis doesn’t prescribe a schema; your OutboxStore knows how to read/write it).
- Deduplication / idempotency on the consumer side. The drainer retries on the next tick when Publisher.publish fails, so your broker may see duplicates.
- Backoff and dead-lettering policy if you want them: wrap your Publisher and OutboxStore accordingly.
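Consumer-side deduplication, listed above as your responsibility, can be as simple as remembering which event ids have already been handled. The sketch below is a hypothetical consumer, not part of Luxis: it assumes the event id arrives with each message (for example from the x-luxis-event-id header) and keeps the seen ids in an unbounded in-memory set, where a production consumer would more likely use a durable store with expiry.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical consumer-side wrapper that drops redelivered events by id.
final class DedupingConsumer {
    private final Set<Long> seen = ConcurrentHashMap.newKeySet();
    private final Consumer<String> handler;

    DedupingConsumer(Consumer<String> handler) {
        this.handler = handler;
    }

    // Returns true if the message was handled, false if it was a duplicate.
    boolean onMessage(long eventId, String body) {
        if (!seen.add(eventId)) {
            return false; // already processed: a retried batch delivered it again
        }
        try {
            handler.accept(body);
            return true;
        } catch (RuntimeException e) {
            seen.remove(eventId); // forget the id so a redelivery can be processed
            throw e;
        }
    }
}
```

Removing the id when the handler throws keeps a failed message eligible for reprocessing, at the cost of the handler itself needing to tolerate a partial earlier attempt.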
Error semantics
| When | What happens |
|---|---|
| OutboxStore.append / DatabaseClient.commit fails on the non-tx path | Surfaces as LuxisAsync failure to the caller; transaction is rolled back. |
| Publisher.publish fails outside tx | Caller does not see this directly; it occurs on the drainer pass after LuxisAsync has already completed. The exception is forwarded to the configured handler and the batch is retried. |
| ctx.publisher() inside tx with no OutboxStore registered | Sub-chain throws IllegalStateException → rollback → exceptionHandler invoked. |
| OutboxStore.append fails inside tx | Rollback → exceptionHandler invoked. |
| Publisher.publish fails during a drain pass | Whole batch stays pending; next tick retries the entire batch. Exception forwarded to the configured handler. |
| OutboxStore.markBatchSent fails | Logged via exceptionHandler; the batch may be re-published on the next tick. Plan for at-least-once on the consumer side. |