diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/WriteErrorClosedConnectionTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/WriteErrorClosedConnectionTest.java new file mode 100644 index 0000000000000..ec46f533c4564 --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/errors/WriteErrorClosedConnectionTest.java @@ -0,0 +1,70 @@ +package io.quarkus.websockets.next.test.errors; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.time.Duration; +import java.util.concurrent.atomic.AtomicBoolean; + +import jakarta.inject.Inject; + +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnBinaryMessage; +import io.quarkus.websockets.next.OnError; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.WebSocketConnection; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.smallrye.mutiny.Uni; +import io.vertx.core.Vertx; +import io.vertx.core.buffer.Buffer; + +public class WriteErrorClosedConnectionTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Echo.class, WSClient.class); + }); + + @Inject + Vertx vertx; + + @TestHTTPResource("echo") + URI testUri; + + @Test + void testError() { + WSClient client = WSClient.create(vertx).connect(testUri); + client.sendAndAwait(Buffer.buffer("1")); + Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> client.isClosed()); + assertTrue(Echo.ERROR_HANDLER_CALLED.get()); + } + + @WebSocket(path = "/echo") + public static class Echo { + + static final AtomicBoolean ERROR_HANDLER_CALLED = new AtomicBoolean(); + + @OnBinaryMessage + Uni process(Buffer message, WebSocketConnection connection) { + // This should result in a failure because the connection is closed + // but we still try to write a binary message + return connection.close().replaceWith(message); + } + + @OnError + void runtimeProblem(Throwable t, WebSocketConnection connection) { + if (connection.isOpen()) { + throw new IllegalStateException(); + } + ERROR_HANDLER_CALLED.set(true); + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/maxmessagesize/MaxMessageSizeTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/maxmessagesize/MaxMessageSizeTest.java new file mode 100644 index 0000000000000..2ffe0778d69f7 --- /dev/null +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/maxmessagesize/MaxMessageSizeTest.java @@ -0,0 +1,63 @@ +package io.quarkus.websockets.next.test.maxmessagesize; + +import static org.junit.jupiter.api.Assertions.assertNotEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.net.URI; +import java.util.concurrent.atomic.AtomicBoolean; + +import jakarta.inject.Inject; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.http.TestHTTPResource; +import io.quarkus.websockets.next.OnError; +import io.quarkus.websockets.next.OnTextMessage; +import io.quarkus.websockets.next.WebSocket; +import io.quarkus.websockets.next.test.utils.WSClient; +import io.vertx.core.Vertx; + +public class MaxMessageSizeTest { + + @RegisterExtension + public static final QuarkusUnitTest test = new QuarkusUnitTest() + .withApplicationRoot(root -> { + root.addClasses(Echo.class, WSClient.class); + }).overrideConfigKey("quarkus.websockets-next.max-message-size", "10"); + + @Inject + Vertx vertx; + + @TestHTTPResource("/echo") + URI echoUri; + + @Test + void testMaxMessageSize() { + WSClient client = WSClient.create(vertx).connect(echoUri); + String msg = "foo".repeat(10); + String reply = client.sendAndAwaitReply(msg).toString(); + assertNotEquals(msg, reply); + assertTrue(Echo.ISE_THROWN.get()); + } + + @WebSocket(path = "/echo") + public static class Echo { + + static final AtomicBoolean ISE_THROWN = new AtomicBoolean(); + + @OnTextMessage + String process(String message) { + return message; + } + + @OnError + String onError(IllegalStateException ise) { + ISE_THROWN.set(true); + return ise.getMessage(); + } + + } + +} diff --git a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/subprotocol/SubprotocolNotAvailableTest.java b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/subprotocol/SubprotocolNotAvailableTest.java index 9ef02fe878268..9a79b8d12fda0 100644 --- a/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/subprotocol/SubprotocolNotAvailableTest.java +++ b/extensions/websockets-next/server/deployment/src/test/java/io/quarkus/websockets/next/test/subprotocol/SubprotocolNotAvailableTest.java @@ -5,11 +5,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.net.URI; +import java.time.Duration; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; +import jakarta.enterprise.context.Destroyed; +import jakarta.enterprise.context.SessionScoped; +import jakarta.enterprise.event.Observes; import jakarta.inject.Inject; +import org.awaitility.Awaitility; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; @@ -43,18 +48,26 @@ void testConnectionRejected() { Throwable cause = e.getCause(); assertTrue(cause instanceof WebSocketClientHandshakeException); assertFalse(Endpoint.OPEN_CALLED.get()); + // Wait until the CDI singleton context is destroyed + // Otherwise the test app is shut down before the WebSocketSessionContext is ended properly + Awaitility.await().atMost(Duration.ofSeconds(5)).until(() -> Endpoint.SESSION_CONTEXT_DESTROYED.get()); } @WebSocket(path = "/endpoint") public static class Endpoint { static final AtomicBoolean OPEN_CALLED = new AtomicBoolean(); + static final AtomicBoolean SESSION_CONTEXT_DESTROYED = new AtomicBoolean(); @OnOpen void open() { OPEN_CALLED.set(true); } + static void sessionContextDestroyed(@Observes @Destroyed(SessionScoped.class) Object event) { + SESSION_CONTEXT_DESTROYED.set(true); + } + } } diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java index ff38df72391d6..e1c76dc33dde3 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/WebSocketsRuntimeConfig.java @@ -1,12 +1,14 @@ package io.quarkus.websockets.next; -import java.time.Duration; import java.util.List; import java.util.Optional; +import java.util.OptionalInt; import io.quarkus.runtime.annotations.ConfigPhase; import io.quarkus.runtime.annotations.ConfigRoot; import io.smallrye.config.ConfigMapping; +import io.smallrye.config.WithDefault; +import io.vertx.core.http.HttpServerOptions; @ConfigMapping(prefix = "quarkus.websockets-next") @ConfigRoot(phase = ConfigPhase.RUN_TIME) @@ -14,16 +16,27 @@ public interface WebSocketsRuntimeConfig { /** * See The WebSocket Protocol - * - * @return the supported subprotocols */ Optional> supportedSubprotocols(); /** - * TODO Not implemented yet. - * - * The default timeout to complete processing of a message. + * Compression Extensions for WebSocket are supported by default. + *

+ * See also RFC 7692 */ - Optional timeout(); + @WithDefault("true") + boolean perMessageCompressionSupported(); + + /** + * The compression level must be a value between 0 and 9. The default value is + * {@value HttpServerOptions#DEFAULT_WEBSOCKET_COMPRESSION_LEVEL}. + */ + OptionalInt compressionLevel(); + + /** + * The maximum size of a message in bytes. The default values is + * {@value HttpServerOptions#DEFAULT_MAX_WEBSOCKET_MESSAGE_SIZE}. + */ + OptionalInt maxMessageSize(); } diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java index 6edb66693f906..9e4b7a4e81525 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/ContextSupport.java @@ -69,7 +69,7 @@ void endSession() { } ContextState currentRequestContextState() { - return requestContext.getState(); + return requestContext.getStateIfActive(); } static Context createNewDuplicatedContext(Context context, WebSocketConnection connection) { diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketHttpServerOptionsCustomizer.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketHttpServerOptionsCustomizer.java index 5018b1aee2b35..5233fd4a1cc34 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketHttpServerOptionsCustomizer.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketHttpServerOptionsCustomizer.java @@ -17,12 +17,23 @@ public class WebSocketHttpServerOptionsCustomizer implements HttpServerOptionsCu @Override public void customizeHttpServer(HttpServerOptions options) { - config.supportedSubprotocols().orElse(List.of()).forEach(options::addWebSocketSubProtocol); + customize(options); } @Override public void customizeHttpsServer(HttpServerOptions options) { + customize(options); + } + + private void customize(HttpServerOptions options) { config.supportedSubprotocols().orElse(List.of()).forEach(options::addWebSocketSubProtocol); + options.setPerMessageWebSocketCompressionSupported(config.perMessageCompressionSupported()); + if (config.compressionLevel().isPresent()) { + options.setWebSocketCompressionLevel(config.compressionLevel().getAsInt()); + } + if (config.maxMessageSize().isPresent()) { + options.setMaxWebSocketMessageSize(config.maxMessageSize().getAsInt()); + } } } diff --git a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java index a0b98d13b1209..c53d15645b01d 100644 --- a/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java +++ b/extensions/websockets-next/server/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketServerRecorder.java @@ -234,6 +234,21 @@ public void handle(Void event) { }); } }); + + ws.exceptionHandler(new Handler() { + @Override + public void handle(Throwable t) { + ContextSupport.createNewDuplicatedContext(context, connection).runOnContext(new Handler() { + @Override + public void handle(Void event) { + endpoint.doOnError(t).subscribe().with( + v -> LOG.debugf("Error [%s] processed: %s", t.getClass(), connection), + t -> LOG.errorf(t, "Unhandled error occured: %s", t.toString(), + connection)); + } + }); + } + }); }); } };