WebSockets

If you’ve built an HTTP handler in Luxis, you already know how to build a WebSocket handler. The pipeline is exactly the same (map, flatMap, blockingMap, asyncMap, validate, complete), with the same threading rules, the same compiler-enforced concurrency safety, and the same in-memory testing.

All WebSocket messages use a JSON envelope with two fields:

{"type": "echo", "payload": {"message": "hello"}}
  • type — a string key that routes the message to the correct handler
  • payload — the JSON body, deserialized into your handler’s input class

Responses follow the same format:

{"type": "echoResponse", "payload": {"echo": "echo: hello"}}

This envelope structure means a single WebSocket connection can handle many different message types, each routed to its own handler with its own typed pipeline.
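
The routing idea can be sketched in plain Java. This is a minimal illustration, assuming the envelope has already been parsed into its type and raw payload; EnvelopeRouterSketch, Envelope, register and dispatch are hypothetical names and are not part of the Luxis API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical illustration only; none of these names come from Luxis.
public class EnvelopeRouterSketch {

    // An already-parsed envelope: the routing key plus the raw JSON payload.
    public record Envelope(String type, String payload) {}

    // One handler per message type, keyed by the envelope's "type" field.
    private final Map<String, Function<String, Envelope>> handlers = new HashMap<>();

    // Associate a message type with the handler that should receive its payload.
    public void register(String type, Function<String, Envelope> handler) {
        handlers.put(type, handler);
    }

    // Look up the handler by type; an unknown type yields an empty result.
    public Optional<Envelope> dispatch(Envelope inbound) {
        Function<String, Envelope> handler = handlers.get(inbound.type());
        if (handler == null) {
            return Optional.empty();
        }
        return Optional.of(handler.apply(inbound.payload()));
    }

    public static void main(String[] args) {
        EnvelopeRouterSketch router = new EnvelopeRouterSketch();
        router.register("echo", payload -> new Envelope("echoResponse", payload));
        router.register("ping", payload -> new Envelope("pong", "{}"));

        // Both message types share one router, as they would share one connection.
        System.out.println(router.dispatch(new Envelope("echo", "{\"message\":\"hello\"}")));
        System.out.println(router.dispatch(new Envelope("unknown", "{}")));
    }
}
```

In this sketch an unknown type simply produces an empty result; how Luxis itself reacts is governed by the corrupt input strategy described at the end of this page.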

Extend WebSocketRoutes and register your inbound and outbound message types:

public class ChatWebSocketRoutes extends WebSocketRoutes<AppState, ChatResponse> {

    @Override
    public void registerRoutes(WebSocketRoutesRegister<AppState, ChatResponse> routes) {
        // Register response types: Luxis needs these to build the envelope
        routes.registerOutbound("messageResponse", MessageResponse.class);
        routes.registerOutbound("statusResponse", StatusResponse.class);

        // Register handlers: each one is a typed pipeline
        routes.registerInbound("sendMessage", SendMessageRequest.class, stream ->
            stream
                .validate(v -> {
                    v.field("text", r -> r.text).required().minLength(1);
                })
                .map(this::addToHistory)              // event loop: access app state
                .blockingMap(this::persistToDatabase) // worker thread: blocking I/O
                .map(ctx -> new MessageResponse(ctx.in()))
                .complete());

        routes.registerInbound("getStatus", StatusRequest.class, stream ->
            stream
                .map(ctx -> new StatusResponse(ctx.app().onlineUsers()))
                .complete());
    }
}

Every registerInbound handler is a pipeline — the same one you use for HTTP. The same rules apply:

  • map / flatMap run on the event loop with access to application state and the WebSocket session
  • blockingMap / blockingFlatMap run on a worker thread, no application state or session access (compiler-enforced)
  • asyncMap / asyncFlatMap for async operations
  • validate for declarative field validation
  • flat variants let you explicitly handle error cases with a specific response

Thread | Available context
Event loop (map, flatMap, asyncMap, etc.) | ctx.in(), ctx.session(), ctx.app()
Worker (blockingMap, blockingFlatMap, etc.) | ctx.in()

Unlike HTTP pipelines, where ctx.session() is available on worker threads, WebSocket pipelines do not expose ctx.session() in blocking methods. WebSocket write operations (such as sending messages) are not safe to call from a worker thread; they must run on the event loop that owns the connection. HTTP response metadata such as headers and cookies can be safely buffered from a worker thread because Vert.x’s blocking handler mechanism supports this, but WebSocket writes go directly to the network and must stay on the event loop.

To send a message after a blocking operation, return your result and use ctx.session() in a subsequent map step, which runs back on the event loop:

routes.registerInbound("query", QueryRequest.class, stream ->
    stream
        .blockingMap(ctx -> queryDatabase(ctx.in())) // worker thread: no session
        .map(ctx -> {                                 // event loop: session available
            ctx.session().send(new StatusUpdate("query complete"));
            return new QueryResponse(ctx.in());
        })
        .complete());
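
The hop from a worker thread back to the event loop can be illustrated with nothing but java.util.concurrent. This is a generic sketch under stated assumptions: two single-threaded executors stand in for the event loop and the worker pool, and ThreadHopSketch is a hypothetical name. It does not show how Luxis or Vert.x schedule work internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Generic illustration; the executors only stand in for an event loop and a worker pool.
public class ThreadHopSketch {

    // Runs a "blocking" step on the worker, then a follow-up step on the event loop.
    // Returns the name of the thread that ran each step.
    public static String[] run() {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor(r -> new Thread(r, "event-loop"));
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        String[] seen = new String[2];
        try {
            CompletableFuture
                .supplyAsync(() -> {
                    // Blocking work belongs here; no connection state is touched.
                    seen[0] = Thread.currentThread().getName();
                    return "query result";
                }, worker)
                .thenApplyAsync(result -> {
                    // Back on the thread that owns the connection, so writes are safe.
                    seen[1] = Thread.currentThread().getName();
                    return result;
                }, eventLoop)
                .join();
        } finally {
            eventLoop.shutdown();
            worker.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        String[] seen = run();
        System.out.println("blocking step ran on: " + seen[0]);
        System.out.println("follow-up step ran on: " + seen[1]);
    }
}
```

The shape mirrors the pipeline above: the blocking stage returns a plain value, and only the stage that follows it touches anything tied to the connection.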

Register WebSocket routes alongside your HTTP routes:

public static AppState registerRoutes(RoutesRegister routesRegister) {
    AppState state = new AppState();

    // HTTP routes
    routesRegister.jsonRoute("/api/users", Method.GET, state, Void.class, new ListUsersHandler());

    // WebSocket routes
    routesRegister.webSocketRoute("/ws/chat", state, new ChatWebSocketRoutes());

    return state;
}

Override onOpen and onClose to react to connection events:

public class ChatWebSocketRoutes extends WebSocketRoutes<AppState, ChatResponse> {

    @Override
    public void onOpen(WebSocketSession<ChatResponse> session, AppState appState) {
        appState.addConnection(session);
        session.send(new StatusResponse("connected"));
    }

    @Override
    public void onClose(WebSocketSession<ChatResponse> session, AppState appState) {
        appState.removeConnection(session);
    }

    @Override
    public void registerRoutes(WebSocketRoutesRegister<AppState, ChatResponse> routes) {
        // ...
    }
}

session.send() wraps your response object in the {"type": ..., "payload": ...} envelope automatically — the type key is looked up from your registerOutbound registration.
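
The lookup from response class to type key might work roughly as follows. This is a hedged sketch, not the Luxis implementation: OutboundRegistrySketch and wrap are hypothetical names, the payload is passed in as pre-serialized JSON to keep the example free of a JSON library, and type keys are assumed to be simple identifiers that need no escaping.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration of a class-to-type-key registry; not Luxis code.
public class OutboundRegistrySketch {

    // Example response type, standing in for an application class.
    public record StatusResponse(String status) {}

    // Maps each registered response class to the "type" value used in the envelope.
    private final Map<Class<?>, String> typeKeys = new HashMap<>();

    public void registerOutbound(String type, Class<?> responseClass) {
        typeKeys.put(responseClass, type);
    }

    // Builds the envelope for a response whose payload is already serialized to JSON.
    // Assumes the type key contains no characters that need JSON escaping.
    public String wrap(Object response, String payloadJson) {
        String type = typeKeys.get(response.getClass());
        if (type == null) {
            throw new IllegalStateException(
                "No outbound type registered for " + response.getClass().getName());
        }
        return "{\"type\":\"" + type + "\",\"payload\":" + payloadJson + "}";
    }

    public static void main(String[] args) {
        OutboundRegistrySketch registry = new OutboundRegistrySketch();
        registry.registerOutbound("statusResponse", StatusResponse.class);
        System.out.println(
            registry.wrap(new StatusResponse("connected"), "{\"status\":\"connected\"}"));
    }
}
```

A registry of this shape also suggests why outbound types are registered up front: a response class with no registered key has no type value to put in the envelope.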

Use completeWithNoResponse() when the handler should process the message but not send anything back:

routes.registerInbound("heartbeat", HeartbeatRequest.class, stream ->
    stream
        .map(ctx -> {
            ctx.app().recordHeartbeat(ctx.session());
            return ctx.in();
        })
        .completeWithNoResponse());

As your WebSocket route grows, extract handlers into separate methods:

public class ChatWebSocketRoutes extends WebSocketRoutes<AppState, ChatResponse> {

    private WebSocketPipeline<MessageResponse> handleSendMessage(
            WebSocketStream<SendMessageRequest, AppState, ChatResponse> stream) {
        return stream
            .map(this::addToHistory)
            .blockingMap(this::persistToDatabase)
            .map(ctx -> new MessageResponse(ctx.in()))
            .complete();
    }

    private WebSocketPipeline<StatusResponse> handleGetStatus(
            WebSocketStream<StatusRequest, AppState, ChatResponse> stream) {
        return stream
            .map(ctx -> new StatusResponse(ctx.app().onlineUsers()))
            .complete();
    }

    @Override
    public void registerRoutes(WebSocketRoutesRegister<AppState, ChatResponse> routes) {
        routes.registerOutbound("messageResponse", MessageResponse.class);
        routes.registerOutbound("statusResponse", StatusResponse.class);
        routes.registerInbound("sendMessage", SendMessageRequest.class, this::handleSendMessage);
        routes.registerInbound("getStatus", StatusRequest.class, this::handleGetStatus);
    }
}

WebSocket testing works the same way as HTTP — in-memory with Luxis.test(), or against a real server with Luxis.start():

public class ChatWebSocketTest {

    private Luxis<AppState> luxis;
    private TestClient client;
    private TestWebSocketClient ws;

    @Before
    public void setUp() {
        luxis = Luxis.test(MyApp::registerRoutes);
        client = new StubTestClient("127.0.0.1", 8080, luxis);
    }

    @After
    public void tearDown() throws Exception {
        if (ws != null) ws.close();
        client.close();
        luxis.close();
    }

    @Test
    public void shouldEchoMessage() {
        ws = client.webSocket(StubRequest.request("/ws/chat"));
        ws.send(json()
            .put("type", "sendMessage")
            .set("payload", json().put("text", "hello"))
            .toString());
        ws.onResponses(received -> {
            Assert.assertEquals(1, received.size());
            // Response is a JSON envelope: {"type":"messageResponse","payload":{...}}
        });
    }
}

To switch to the real Vert.x server, change the setup — the test code stays the same:

luxis = Luxis.start(MyApp::registerRoutes,
    new WebServiceConfigBuilder().setPort(8080).build());
client = new VertxTestClient("127.0.0.1", 8080);

Validation errors and flatMap errors are sent as responses with type "error":

{"type": "error", "payload": {"message": "Validation failed", "errors": {"text": ["must not be blank"]}}}

Unhandled exceptions are caught, reported to your exception handler, and the client receives a generic error response — just like HTTP.

Customise WebSocket behaviour with WebSocketRouteConfigBuilder:

routesRegister.webSocketRoute("/ws/chat", state, new ChatWebSocketRoutes(),
    new WebSocketRouteConfigBuilder()
        .setCorruptInputStrategy(new SendErrorResponse("Bad message format"))
        .setFailedValidationStrategy(JustSendValidationError.INSTANCE)
        .setBackpressureStrategy(BackpressureStrategy.DISCONNECT_CLIENT)
        .build());

  • Corrupt input — what happens when the client sends invalid JSON or an unknown type. Default: disconnect.
  • Failed validation — what happens when validation fails. Default: send the validation errors and keep the connection open.
  • Backpressure — what happens when the write queue is full. Default: buffer all the messages.
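
To make the backpressure trade-off concrete, here is a minimal sketch of the two behaviours named above: buffering everything versus disconnecting once the write queue is full. It assumes a simple message-count limit; BackpressureSketch, Policy and highWaterMark are hypothetical names, and the actual thresholds and mechanics in Luxis may differ.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of a per-connection write queue; not the Luxis implementation.
public class BackpressureSketch {

    public enum Policy { BUFFER, DISCONNECT }

    private final Deque<String> pending = new ArrayDeque<>();
    private final int highWaterMark;
    private final Policy policy;
    private boolean open = true;

    public BackpressureSketch(int highWaterMark, Policy policy) {
        this.highWaterMark = highWaterMark;
        this.policy = policy;
    }

    // Returns true if the message was queued, false if the connection is (now) closed.
    public boolean enqueue(String message) {
        if (!open) {
            return false;
        }
        if (pending.size() >= highWaterMark && policy == Policy.DISCONNECT) {
            // The client is not draining its queue: drop it rather than grow memory.
            open = false;
            pending.clear();
            return false;
        }
        // BUFFER keeps accepting messages, so memory grows with a slow client.
        pending.addLast(message);
        return true;
    }

    public boolean isOpen() {
        return open;
    }

    public int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        BackpressureSketch buffering = new BackpressureSketch(2, Policy.BUFFER);
        BackpressureSketch strict = new BackpressureSketch(2, Policy.DISCONNECT);
        for (int i = 0; i < 3; i++) {
            buffering.enqueue("msg" + i);
            strict.enqueue("msg" + i);
        }
        System.out.println("buffering: open=" + buffering.isOpen() + ", pending=" + buffering.pendingCount());
        System.out.println("strict: open=" + strict.isOpen() + ", pending=" + strict.pendingCount());
    }
}
```

Buffering never drops a message but lets memory use grow with a slow client, whereas disconnecting bounds memory at the cost of dropping that client.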