
Messaging

Luxis supports event publishing with two modes that share the same call site:

  • Inside inTransaction, ctx.publisher().publish(...) is void. Events are buffered, batch-inserted into your outbox table immediately before commit (in the same transaction as your business writes), and dispatched after commit succeeds. If the outbox insert fails, the whole transaction rolls back.
  • Outside a transaction, ctx.publisher().publish(...) returns LuxisAsync<Void, ERR>. Luxis opens its own short-lived transaction, appends the event to your OutboxStore, commits, then kicks the drainer. The returned future resolves once the row is durably appended; broker delivery happens on the drain pass.

The drainer that reads the outbox and pushes to your broker is owned by Luxis — you provide the OutboxStore (how to read/append/mark) and Publisher (how to send).

stream
    .inTransaction(tx -> tx
        .asyncPeek(ctx -> {
            ctx.publisher().publish("orders", "{...}");      // String
            ctx.publisher().publish("orders", payloadBytes); // byte[]
            ctx.publisher().publish("orders", buffer);       // ByteBuffer
            return ctx.db().update("insert into orders ...");
        })
        .commit())
    .complete(...);

publish returns void. Events are held in memory until commit time. Order is preserved.

stream.<Void>asyncMap(ctx ->
        ctx.publisher().publish("topic", "ping"))
    .complete(...);

Same call shape; this version returns LuxisAsync<Void, ERR> so you can chain or await. The returned future completes when the outbox row is committed — not when the broker accepts — so delivery is at-least-once just like the transactional path.

public interface Publisher {
    Future<Void> publish(List<PendingOutboxEvent> events);
}

Each PendingOutboxEvent is (long id, OutboxEvent event). The id is the persisted outbox row id — propagate it to your broker as a stable dedup key (e.g. a Kafka header x-luxis-event-id) so consumers can drop duplicates produced when a drain pass retries the whole batch.

The batch is treated as all-or-nothing: if the returned future fails, Luxis assumes the entire batch failed and retries it in full on the next drain pass. Implementations should fail the future on any partial failure rather than report "some sent". Within a batch, each OutboxEvent carries a sealed payload (Str, Bytes, or Buf), so a single batch can mix payload types; the implementation matches on the variant to dispatch each event.
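A Publisher that honors this contract can fan each event out to the broker and combine the per-event futures so that any single failure fails the whole batch. The sketch below is illustrative only: BrokerClient is a hypothetical send primitive, and the nested event types are local stand-ins assumed to mirror Luxis's PendingOutboxEvent and the sealed payload variants.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AllOrNothingPublisher {
    // Local stand-ins assumed to mirror the Luxis types (illustration only).
    sealed interface OutboxEvent permits Str, Bytes, Buf {}
    record Str(String topic, String payload) implements OutboxEvent {}
    record Bytes(String topic, byte[] payload) implements OutboxEvent {}
    record Buf(String topic, ByteBuffer payload) implements OutboxEvent {}
    record PendingOutboxEvent(long id, OutboxEvent event) {}

    // Hypothetical broker primitive: sends one record, carrying the
    // persisted outbox id as the stable dedup key.
    interface BrokerClient {
        CompletableFuture<Void> send(String topic, long dedupId, byte[] value);
    }

    private final BrokerClient broker;

    public AllOrNothingPublisher(BrokerClient broker) { this.broker = broker; }

    public CompletableFuture<Void> publish(List<PendingOutboxEvent> events) {
        // allOf completes exceptionally if any constituent send fails, so
        // a partial failure fails the batch and Luxis retries everything.
        CompletableFuture<?>[] sends = events.stream()
                .map(p -> broker.send(topicOf(p.event()), p.id(), bytesOf(p.event())))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(sends);
    }

    private static String topicOf(OutboxEvent e) {
        return switch (e) {
            case Str s   -> s.topic();
            case Bytes b -> b.topic();
            case Buf b   -> b.topic();
        };
    }

    // Match on the sealed variant to normalize each payload to bytes.
    private static byte[] bytesOf(OutboxEvent e) {
        return switch (e) {
            case Str s   -> s.payload().getBytes(StandardCharsets.UTF_8);
            case Bytes b -> b.payload();
            case Buf b -> { // copy out without disturbing the buffer's position
                ByteBuffer dup = b.payload().duplicate();
                byte[] out = new byte[dup.remaining()];
                dup.get(out);
                yield out;
            }
        };
    }
}
```

Because CompletableFuture.allOf completes exceptionally as soon as all futures are done and any one of them failed, this gives exactly the all-or-nothing signal described above.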

The non-transactional ctx.publisher().publish("key", value) overloads (String, byte[], ByteBuffer) append a single-row batch through the outbox; the Publisher always sees events with real persisted ids regardless of which call site emitted them.

public interface OutboxStore<TX> {
    Future<Void> append(TX tx, List<OutboxEvent> events);
    Future<List<PendingOutboxEvent>> readPending(int limit);
    Future<Void> markBatchSent(List<Long> ids);
    default int batchSize() { return 100; }
    default long pollIntervalMillis() { return 1000L; }
    default boolean drainerEnabled() { return true; }
}

Override drainerEnabled() to return false if you want to drain the outbox yourself (e.g. via Debezium or a separate worker process). append still runs pre-commit; Luxis just won’t call readPending / markBatchSent or schedule the periodic tick.
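A sketch of that setup, restating the interface above with the event types reduced to Object for brevity (the class names are illustrative, not part of Luxis):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

public class ExternalDrainExample {
    // Interface shape from the docs above; Object stands in for the event types.
    interface OutboxStore<TX> {
        Future<Void> append(TX tx, List<Object> events);
        Future<List<Object>> readPending(int limit);
        Future<Void> markBatchSent(List<Long> ids);
        default int batchSize() { return 100; }
        default long pollIntervalMillis() { return 1000L; }
        default boolean drainerEnabled() { return true; }
    }

    // Append-only store: Luxis still calls append pre-commit, but with
    // drainerEnabled() == false it never calls readPending / markBatchSent
    // and never schedules the periodic tick.
    static class DebeziumBackedStore implements OutboxStore<Object> {
        public Future<Void> append(Object tx, List<Object> events) {
            // write the rows via tx as usual; Debezium tails the table instead
            return CompletableFuture.completedFuture(null);
        }
        public Future<List<Object>> readPending(int limit) {
            throw new UnsupportedOperationException("drained externally");
        }
        public Future<Void> markBatchSent(List<Long> ids) {
            throw new UnsupportedOperationException("drained externally");
        }
        @Override public boolean drainerEnabled() { return false; }
    }
}
```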

append runs inside the transaction (Luxis hands back the TX token from your DatabaseClient), so outbox rows commit atomically with your business data. readPending and markBatchSent are called by the Luxis-owned drainer.
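To make the contract concrete, here is a minimal in-memory sketch of the store's semantics, with stand-in record types and no real database, so the TX token goes unused: append assigns monotonically increasing ids, readPending returns up to limit rows in id order, and markBatchSent removes acknowledged rows. A real implementation would back these with SQL against your outbox table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

public class InMemoryOutboxStore {
    // Stand-in types; the real OutboxEvent is Luxis's sealed wrapper.
    public record OutboxEvent(String topic, String payload) {}
    public record PendingOutboxEvent(long id, OutboxEvent event) {}

    private final AtomicLong seq = new AtomicLong();
    private final NavigableMap<Long, OutboxEvent> pending = new TreeMap<>();

    // In the real store this runs inside the caller's transaction via the
    // TX token; here the map itself plays the role of the database.
    public synchronized CompletableFuture<Void> append(Object tx, List<OutboxEvent> events) {
        for (OutboxEvent e : events) pending.put(seq.incrementAndGet(), e);
        return CompletableFuture.completedFuture(null);
    }

    // Oldest-first, at most `limit` rows: the drainer's read side.
    public synchronized CompletableFuture<List<PendingOutboxEvent>> readPending(int limit) {
        List<PendingOutboxEvent> out = new ArrayList<>();
        for (Map.Entry<Long, OutboxEvent> e : pending.entrySet()) {
            if (out.size() == limit) break;
            out.add(new PendingOutboxEvent(e.getKey(), e.getValue()));
        }
        return CompletableFuture.completedFuture(out);
    }

    // Acknowledged rows leave the outbox and are never re-read.
    public synchronized CompletableFuture<Void> markBatchSent(List<Long> ids) {
        ids.forEach(pending::remove);
        return CompletableFuture.completedFuture(null);
    }
}
```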

OutboxEvent is a sealed wrapper with three payload variants (Str, Bytes, Buf) so a single batch can carry mixed payload types.

Luxis.start(routes, config, databaseClient, publisher, outboxStore);

When all three are registered, Luxis starts a periodic drainer (pollIntervalMillis, batchSize) and kicks it after every successful commit so events ship as soon as possible.

If you don’t register a Publisher + OutboxStore, calls to ctx.publisher().publish(...) fail with a clear IllegalStateException — no silent dropping.

Luxis handles:

  • Buffering in-tx events in memory.
  • Calling OutboxStore.append(tx, events) immediately before DatabaseClient.commit(tx).
  • Rolling back if either append or commit fails (events are lost together with the data: the outcome is atomic).
  • Owning the periodic drainer (one per Luxis instance, started on Luxis.start, stopped on close).
  • Enforcing a re-entrancy guard: only one drain pass at a time per instance.
  • Kicking the drainer after every commit.

You are responsible for:

  • Designing the outbox table (Luxis doesn't prescribe a schema; your OutboxStore knows how to read/write it).
  • Deduplication / idempotency on the consumer side. The drainer retries on the next tick when Publisher.publish fails, so your broker may see duplicates.
  • Backoff and dead-lettering policy if you want them; wrap your Publisher and OutboxStore accordingly.
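The re-entrancy guard can be pictured as a compare-and-set flag. This sketch (DrainGuard is an illustrative name, not a Luxis class) shows how a post-commit kick that arrives while a pass is running is simply rejected, so at most one drain pass runs at a time:

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class DrainGuard {
    private final AtomicBoolean draining = new AtomicBoolean(false);
    private final Runnable drainPass;

    public DrainGuard(Runnable drainPass) { this.drainPass = drainPass; }

    // Called by both the periodic tick and the post-commit kick; only one
    // caller wins the compare-and-set, so passes never overlap.
    public boolean kick() {
        if (!draining.compareAndSet(false, true)) {
            return false; // a pass is already running; this kick is dropped
        }
        try {
            drainPass.run();
            return true;
        } finally {
            draining.set(false); // release the guard for the next tick or kick
        }
    }
}
```

A rejected kick is harmless: the rows it would have drained are still pending, so the running pass or the next tick picks them up.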
| When | What happens |
| --- | --- |
| OutboxStore.append / DatabaseClient.commit fails on the non-tx path | Surfaces as a LuxisAsync failure to the caller; the transaction is rolled back. |
| Publisher.publish fails outside tx | The caller does not see this directly: it occurs on the drainer pass after the LuxisAsync has already completed. The exception is forwarded to the configured handler and the batch is retried. |
| ctx.publisher() inside tx with no OutboxStore registered | Sub-chain throws IllegalStateException → rollback → exceptionHandler invoked. |
| OutboxStore.append fails | Rollback → exceptionHandler invoked. |
| Publisher.publish fails during a drain pass | Whole batch stays pending; the next tick retries the entire batch. Exception forwarded to the configured handler. |
| OutboxStore.markBatchSent fails | Logged via exceptionHandler; the batch may be re-published on the next tick. Plan for at-least-once on the consumer side. |