WebSockets
If you’ve built an HTTP handler in Luxis, you already know how to build a WebSocket handler. The pipeline is exactly the same — map, flatMap, blockingMap, asyncMap, validate, complete — with the same threading rules, the same compiler-enforced concurrency safety, and the same in-memory testing.
Message format
Section titled “Message format”All WebSocket messages use a JSON envelope with two fields:
{"type": "echo", "payload": {"message": "hello"}}type— a string key that routes the message to the correct handlerpayload— the JSON body, deserialized into your handler’s input class
Responses follow the same format:
{"type": "echoResponse", "payload": {"echo": "echo: hello"}}This envelope structure means a single WebSocket connection can handle many different message types, each routed to its own handler with its own typed pipeline.
Defining a WebSocket route
Section titled “Defining a WebSocket route”Extend WebSocketRoutes and register your inbound and outbound message types:
public class ChatWebSocketRoutes extends WebSocketRoutes<AppState, ChatResponse> {
@Override public void registerRoutes(WebSocketRoutesRegister<AppState, ChatResponse> routes) { // Register response types — Luxis needs these to build the envelope routes.registerOutbound("messageResponse", MessageResponse.class); routes.registerOutbound("statusResponse", StatusResponse.class);
// Register handlers — each one is a typed pipeline routes.registerInbound("sendMessage", SendMessageRequest.class, stream -> stream .validate(v -> { v.field("text", r -> r.text).required().minLength(1); }) .map(this::addToHistory) // event loop — access app state .blockingMap(this::persistToDatabase) // worker thread — blocking I/O .map(ctx -> new MessageResponse(ctx.in())) .complete());
routes.registerInbound("getStatus", StatusRequest.class, stream -> stream .map(ctx -> new StatusResponse(ctx.app().onlineUsers())) .complete()); }}Every registerInbound handler is a pipeline — the same one you use for HTTP. The same rules apply:
map/flatMaprun on the event loop with access to application state and the WebSocket sessionblockingMap/blockingFlatMaprun on a worker thread, no application state or session access (compiler-enforced)asyncMap/asyncFlatMapfor async operationsvalidatefor declarative field validationflatvariants let you explicitly handle error cases with a specific response
Context access
Section titled “Context access”| Thread | Available context |
|---|---|
Event loop (map, flatMap, asyncMap, etc.) | ctx.in(), ctx.session(), ctx.app() |
Worker (blockingMap, blockingFlatMap, etc.) | ctx.in() |
Unlike HTTP pipelines where ctx.session() is available on worker threads, WebSocket pipelines do not expose ctx.session() in blocking methods. This is because WebSocket write operations (such as sending messages) are not safe to call from a worker thread — they must run on the event loop that owns the connection. HTTP response metadata like headers and cookies can be safely buffered from a worker thread because Vert.x’s blocking handler mechanism supports this, but WebSocket writes go directly to the network and must stay on the event loop.
To send a message after a blocking operation, return your result and use ctx.session() in a subsequent map step, which runs back on the event loop:
routes.registerInbound("query", QueryRequest.class, stream -> stream .blockingMap(ctx -> queryDatabase(ctx.in())) // worker thread — no session .map(ctx -> { // event loop — session available ctx.session().send(new StatusUpdate("query complete")); return new QueryResponse(ctx.in()); }) .complete());Registering the route
Section titled “Registering the route”Register WebSocket routes alongside your HTTP routes:
public static AppState registerRoutes(RoutesRegister routesRegister) { AppState state = new AppState();
// HTTP routes routesRegister.jsonRoute("/api/users", Method.GET, state, Void.class, new ListUsersHandler());
// WebSocket routes routesRegister.webSocketRoute("/ws/chat", state, new ChatWebSocketRoutes());
return state;}Lifecycle hooks
Section titled “Lifecycle hooks”Override onOpen and onClose to react to connection events:
public class ChatWebSocketRoutes extends WebSocketRoutes<AppState, ChatResponse> {
@Override public void onOpen(WebSocketSession<ChatResponse> session, AppState appState) { appState.addConnection(session); session.send(new StatusResponse("connected")); }
@Override public void onClose(WebSocketSession<ChatResponse> session, AppState appState) { appState.removeConnection(session); }
@Override public void registerRoutes(WebSocketRoutesRegister<AppState, ChatResponse> routes) { // ... }}session.send() wraps your response object in the {"type": ..., "payload": ...} envelope automatically — the type key is looked up from your registerOutbound registration.
Sending messages without a response
Section titled “Sending messages without a response”Use completeWithNoResponse() when the handler should process the message but not send anything back:
routes.registerInbound("heartbeat", HeartbeatRequest.class, stream -> stream .map(ctx -> { ctx.app().recordHeartbeat(ctx.session()); return ctx.in(); }) .completeWithNoResponse());Splitting handlers into methods
Section titled “Splitting handlers into methods”As your WebSocket route grows, extract handlers into separate methods:
public class ChatWebSocketRoutes extends WebSocketRoutes<AppState, ChatResponse> {
private WebSocketPipeline<MessageResponse> handleSendMessage( WebSocketStream<SendMessageRequest, AppState, ChatResponse> stream) { return stream .map(this::addToHistory) .blockingMap(this::persistToDatabase) .map(ctx -> new MessageResponse(ctx.in())) .complete(); }
private WebSocketPipeline<StatusResponse> handleGetStatus( WebSocketStream<StatusRequest, AppState, ChatResponse> stream) { return stream .map(ctx -> new StatusResponse(ctx.app().onlineUsers())) .complete(); }
@Override public void registerRoutes(WebSocketRoutesRegister<AppState, ChatResponse> routes) { routes.registerOutbound("messageResponse", MessageResponse.class); routes.registerOutbound("statusResponse", StatusResponse.class);
routes.registerInbound("sendMessage", SendMessageRequest.class, this::handleSendMessage); routes.registerInbound("getStatus", StatusRequest.class, this::handleGetStatus); }}Testing WebSockets
Section titled “Testing WebSockets”WebSocket testing works the same way as HTTP — in-memory with Luxis.test(), or against a real server with Luxis.start():
public class ChatWebSocketTest {
private Luxis<AppState> luxis; private TestClient client; private TestWebSocketClient ws;
@Before public void setUp() { luxis = Luxis.test(MyApp::registerRoutes); client = new StubTestClient("127.0.0.1", 8080, luxis); }
@After public void tearDown() throws Exception { if (ws != null) ws.close(); client.close(); luxis.close(); }
@Test public void shouldEchoMessage() { ws = client.webSocket(StubRequest.request("/ws/chat"));
ws.send(json() .put("type", "sendMessage") .set("payload", json().put("text", "hello")) .toString());
ws.onResponses(received -> { Assert.assertEquals(1, received.size()); // Response is a JSON envelope: {"type":"messageResponse","payload":{...}} }); }}To switch to the real Vert.x server, change the setup — the test code stays the same:
luxis = Luxis.start(MyApp::registerRoutes, new WebServiceConfigBuilder().setPort(8080).build());client = new VertxTestClient("127.0.0.1", 8080);Error handling
Section titled “Error handling”Validation errors and flatMap errors are sent as responses with type "error":
{"type": "error", "payload": {"message": "Validation failed", "errors": {"text": ["must not be blank"]}}}Unhandled exceptions are caught, reported to your exception handler, and the client receives a generic error response — just like HTTP.
Configuration
Section titled “Configuration”Customise WebSocket behaviour with WebSocketRouteConfigBuilder:
routesRegister.webSocketRoute("/ws/chat", state, new ChatWebSocketRoutes(), new WebSocketRouteConfigBuilder() .setCorruptInputStrategy(new SendErrorResponse("Bad message format")) .setFailedValidationStrategy(JustSendValidationError.INSTANCE) .setBackpressureStrategy(BackpressureStrategy.DISCONNECT_CLIENT) .build());- Corrupt input — what happens when the client sends invalid JSON or an unknown
type. Default: disconnect. - Failed validation — what happens when validation fails. Default: send the validation errors and keep the connection open.
- Backpressure — what happens when the write queue is full. Default: buffer all the messages.