From 97a33a0b6232c2295a1b3b4fc59536e88ee158f9 Mon Sep 17 00:00:00 2001 From: Maksym Ostroverkhov Date: Sun, 30 Jun 2024 10:16:27 +0300 Subject: [PATCH] WebSocketFrameFactory, WebSocketFrameFactory.Encoder: add fragmentation support for outbound binary and text frames. --- .../http/websocketx/WebSocketCodecTest.java | 683 +++++++++++++++++- .../websocketx/MaskingWebSocketEncoder.java | 72 ++ .../NonMaskingWebSocketEncoder.java | 71 ++ .../websocketx/WebSocketFrameFactory.java | 53 +- 4 files changed, 869 insertions(+), 10 deletions(-) diff --git a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java index d725aac..4ee69c0 100644 --- a/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java +++ b/netty-websocket-http1-test/src/test/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketCodecTest.java @@ -155,6 +155,78 @@ void textFramesBulkEncoder(boolean mask) throws Exception { client.close(); } + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void binaryFramesFragmentsEncoder(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundBinaryFragmentationEncoderClientHandler clientHandler = + new OutboundBinaryFragmentationEncoderClientHandler(maxFrameSize / 3 - 1); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void binaryFramesFragmentsFactory(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundBinaryFragmentationClientHandler clientHandler = + new OutboundBinaryFragmentationClientHandler(maxFrameSize / 3 - 1); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesFragmentsEncoder(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundTextFragmentationEncoderClientHandler clientHandler = + new OutboundTextFragmentationEncoderClientHandler(maxFrameSize / 3 - 1, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + + @Timeout(300) + @ValueSource(booleans = {true, false}) + @ParameterizedTest + void textFramesFragmentsFactory(boolean mask) throws Exception { + int maxFrameSize = 3_000; + Channel s = server = nettyServer(new WebSocketFramesTestServerHandler(), mask, false); + OutboundTextFragmentationClientHandler clientHandler = + new OutboundTextFragmentationClientHandler(maxFrameSize / 3 - 1, 'a'); + Channel client = + webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); + + clientHandler.onHandshakeCompleted().join(); + + CompletableFuture onComplete = clientHandler.startFramesExchange(); + onComplete.join(); + client.close(); + } + @Timeout(300) @ValueSource(booleans = {true, false}) @ParameterizedTest @@ -393,8 +465,8 @@ void fragmentDefaultDecoder( int maxFrameSize = DEFAULT_CODEC_MAX_FRAME_SIZE; Channel s = server = nettyServer(new FragmentTestServerHandler(1500), mask, false); - FragmentationFramesTestClientHandler clientHandler = - new FragmentationFramesTestClientHandler(3333); + InboundFragmentationFramesTestClientHandler clientHandler = + new InboundFragmentationFramesTestClientHandler(3333); Channel client = webSocketCallbacksClient(s.localAddress(), mask, true, maxFrameSize, clientHandler); @@ -413,8 +485,8 @@ void fragmentSmallDecoder() throws Exception { int maxFrameSize = SMALL_CODEC_MAX_FRAME_SIZE; Channel s = server = nettyServer(new FragmentTestServerHandler(33), false, false); - FragmentationFramesTestClientHandler clientHandler = - new FragmentationFramesTestClientHandler(70); + InboundFragmentationFramesTestClientHandler clientHandler = + new InboundFragmentationFramesTestClientHandler(70); Channel client = webSocketCallbacksClient(s.localAddress(), false, false, maxFrameSize, clientHandler); @@ -1517,7 +1589,7 @@ public void onOpen(ChannelHandlerContext ctx) { } } - static class FragmentationFramesTestClientHandler + static class InboundFragmentationFramesTestClientHandler implements WebSocketCallbacksHandler, WebSocketFrameListener { final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); @@ -1525,7 +1597,7 @@ static class FragmentationFramesTestClientHandler WebSocketFrameFactory webSocketFrameFactory; volatile ChannelHandlerContext ctx; - FragmentationFramesTestClientHandler(int frameSize) { + InboundFragmentationFramesTestClientHandler(int frameSize) { this.frameSize = frameSize; } @@ -1638,6 +1710,605 @@ private void sendFrames(ChannelHandlerContext c) { } } + static class OutboundBinaryFragmentationEncoderClientHandler + extends OutboundBinaryFragmentationClientHandler { + OutboundBinaryFragmentationEncoderClientHandler(int frameSize) { + super(frameSize); + } + + static ByteBuf withPayload( + ByteBufAllocator allocator, WebSocketFrameFactory.Encoder encoder, byte content, int size) { + int frameSize = encoder.sizeofBinaryFrame(size); + ByteBuf binaryFrame = allocator.buffer(frameSize); + binaryFrame.writerIndex(frameSize - size); + for (int i = 0; i < size; i++) { + binaryFrame.writeByte(content); + } + return binaryFrame; + } + + @Override + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory.Encoder encoder = webSocketFrameFactory.encoder(); + ByteBufAllocator allocator = c.alloc(); + while (framesSent < frameSize) { + int size = ++framesSent; + + ByteBuf shortFragmentStart = + encoder.encodeBinaryFragmentStart(withPayload(allocator, encoder, (byte) 0xFE, size)); + ByteBuf shortFragmentEnd = + encoder.encodeContinuationFragmentEnd( + withPayload(allocator, encoder, (byte) 0xFE, size)); + + ByteBuf longFragmentStart = + encoder.encodeBinaryFragmentStart(withPayload(allocator, encoder, (byte) 0xFE, size)); + ByteBuf longFragmentContinuation = + encoder.encodeContinuationFragment(withPayload(allocator, encoder, (byte) 0xFE, size)); + ByteBuf longFragmentEnd = + encoder.encodeContinuationFragmentEnd( + withPayload(allocator, encoder, (byte) 0xFE, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + } + + static class OutboundTextFragmentationEncoderClientHandler + extends OutboundTextFragmentationClientHandler { + + OutboundTextFragmentationEncoderClientHandler(int frameSize, char expectedContent) { + super(frameSize, expectedContent); + } + + static ByteBuf withPayload( + ByteBufAllocator allocator, WebSocketFrameFactory.Encoder encoder, char content, int size) { + int frameSize = encoder.sizeofTextFrame(size); + ByteBuf binaryFrame = allocator.buffer(frameSize); + binaryFrame.writerIndex(frameSize - size); + for (int i = 0; i < size; i++) { + binaryFrame.writeByte(content); + } + return binaryFrame; + } + + @Override + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory.Encoder encoder = webSocketFrameFactory.encoder(); + ByteBufAllocator allocator = c.alloc(); + while (framesSent < frameSize) { + int size = ++framesSent; + + char expected = expectedContent; + ByteBuf shortFragmentStart = + encoder.encodeTextFragmentStart(withPayload(allocator, encoder, expected, size)); + ByteBuf shortFragmentEnd = + encoder.encodeContinuationFragmentEnd(withPayload(allocator, encoder, expected, size)); + + ByteBuf longFragmentStart = + encoder.encodeTextFragmentStart(withPayload(allocator, encoder, expected, size)); + ByteBuf longFragmentContinuation = + encoder.encodeContinuationFragment(withPayload(allocator, encoder, expected, size)); + ByteBuf longFragmentEnd = + encoder.encodeContinuationFragmentEnd(withPayload(allocator, encoder, expected, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + } + + static class OutboundTextFragmentationClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final int frameSize; + final char expectedContent; + WebSocketFrameFactory webSocketFrameFactory; + volatile ChannelHandlerContext ctx; + int framesReceived; + int framesSent; + int fragmentsReceived; + + OutboundTextFragmentationClientHandler(int frameSize, char expectedContent) { + this.frameSize = frameSize; + this.expectedContent = expectedContent; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode == WebSocketProtocol.OPCODE_CLOSE) { + onFrameExchangeComplete.completeExceptionally(new AssertionError("received close frame")); + payload.release(); + return; + } + switch (fragmentsReceived) { + case /*short start*/ 0: + case /*long start*/ 2: + { + if (opcode != WebSocketProtocol.OPCODE_TEXT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-text opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-fragmented frame")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + for (int i = 0; i < payload.readableBytes(); i++) { + char ch = (char) payload.readByte(); + if (ch != expectedContent) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + ch + + ", expected: " + + expectedContent)); + return; + } + } + fragmentsReceived++; + } + break; + case /*short end*/ 1: + case /*long end*/ 4: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (!finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final fragment, expected final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + for (int i = 0; i < payload.readableBytes(); i++) { + char ch = (char) payload.readByte(); + if (ch != expectedContent) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + ch + + ", expected: " + + expectedContent)); + return; + } + } + payload.release(); + if (fragmentsReceived == /*long end*/ 4) { + fragmentsReceived = 0; + if (++framesReceived == frameSize) { + onFrameExchangeComplete.complete(null); + } + } else { + fragmentsReceived++; + } + } + break; + case /*long continuation*/ 3: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received final fragment, expected non-final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + for (int i = 0; i < payload.readableBytes(); i++) { + char ch = (char) payload.readByte(); + if (ch != expectedContent) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + ch + + ", expected: " + + expectedContent)); + return; + } + } + payload.release(); + fragmentsReceived++; + } + break; + default: + throw new AssertionError("Unexpected fragmentsReceived state: " + fragmentsReceived); + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(webSocketFrameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c)); + return onFrameExchangeComplete; + } + + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory factory = webSocketFrameFactory; + while (framesSent < frameSize) { + int size = ++framesSent; + ByteBuf shortFragmentStart = + factory.mask( + withPayload( + factory.createTextFragmentStart(c.alloc(), size), expectedContent, size)); + ByteBuf shortFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), expectedContent, size)); + + ByteBuf longFragmentStart = + factory.mask( + withPayload( + factory.createTextFragmentStart(c.alloc(), size), expectedContent, size)); + ByteBuf longFragmentContinuation = + factory.mask( + withPayload( + factory.createContinuationFragment(c.alloc(), size), expectedContent, size)); + ByteBuf longFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), expectedContent, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + + static ByteBuf withPayload(ByteBuf fragment, char content, int size) { + for (int i = 0; i < size; i++) { + fragment.writeByte(content); + } + return fragment; + } + } + + static class OutboundBinaryFragmentationClientHandler + implements WebSocketCallbacksHandler, WebSocketFrameListener { + final CompletableFuture onHandshakeComplete = new CompletableFuture<>(); + final CompletableFuture onFrameExchangeComplete = new CompletableFuture<>(); + final int frameSize; + WebSocketFrameFactory webSocketFrameFactory; + volatile ChannelHandlerContext ctx; + int framesReceived; + int framesSent; + int fragmentsReceived; + + OutboundBinaryFragmentationClientHandler(int frameSize) { + this.frameSize = frameSize; + } + + @Override + public WebSocketFrameListener exchange( + ChannelHandlerContext ctx, WebSocketFrameFactory webSocketFrameFactory) { + this.webSocketFrameFactory = webSocketFrameFactory; + return this; + } + + @Override + public void onChannelRead( + ChannelHandlerContext ctx, boolean finalFragment, int rsv, int opcode, ByteBuf payload) { + if (rsv != 0) { + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received frame with non-zero rsv: " + rsv)); + payload.release(); + return; + } + if (opcode == WebSocketProtocol.OPCODE_CLOSE) { + onFrameExchangeComplete.completeExceptionally(new AssertionError("received close frame")); + payload.release(); + return; + } + switch (fragmentsReceived) { + case /*short start*/ 0: + case /*long start*/ 2: + { + if (opcode != WebSocketProtocol.OPCODE_BINARY) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-binary opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-fragmented frame")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + byte expectedPayload = (byte) 0xFE; + for (int i = 0; i < payload.readableBytes(); i++) { + byte b = payload.readByte(); + if (b != expectedPayload) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + Long.toHexString(b) + + ", expected: " + + Long.toHexString(expectedPayload))); + return; + } + } + fragmentsReceived++; + } + break; + case /*short end*/ 1: + case /*long end*/ 4: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (!finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received non-final fragment, expected final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + byte expectedPayload = (byte) 0xFE; + for (int i = 0; i < payload.readableBytes(); i++) { + byte b = payload.readByte(); + if (b != expectedPayload) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + Long.toHexString(b) + + ", expected: " + + Long.toHexString(expectedPayload))); + return; + } + } + payload.release(); + if (fragmentsReceived == /*long end*/ 4) { + fragmentsReceived = 0; + if (++framesReceived == frameSize) { + onFrameExchangeComplete.complete(null); + } + } else { + fragmentsReceived++; + } + } + break; + case /*long continuation*/ 3: + { + if (opcode != WebSocketProtocol.OPCODE_CONT) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("first fragment with non-continuation opcode: " + opcode)); + return; + } + if (finalFragment) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError("received final fragment, expected non-final")); + return; + } + int expectedSize = framesReceived + 1; + int payloadSize = payload.readableBytes(); + if (payloadSize != expectedSize) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload size: " + + payloadSize + + ", expected: " + + expectedSize)); + return; + } + byte expectedPayload = (byte) 0xFE; + for (int i = 0; i < payload.readableBytes(); i++) { + byte b = payload.readByte(); + if (b != expectedPayload) { + payload.release(); + onFrameExchangeComplete.completeExceptionally( + new AssertionError( + "received fragment frame with payload: " + + Long.toHexString(b) + + ", expected: " + + Long.toHexString(expectedPayload))); + return; + } + } + payload.release(); + fragmentsReceived++; + } + break; + default: + throw new AssertionError("Unexpected fragmentsReceived state: " + fragmentsReceived); + } + } + + @Override + public void onOpen(ChannelHandlerContext ctx) { + this.ctx = ctx; + onHandshakeComplete.complete(webSocketFrameFactory); + } + + @Override + public void onClose(ChannelHandlerContext ctx) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(new ClosedChannelException()); + } + } + + @Override + public void onExceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + if (!onFrameExchangeComplete.isDone()) { + onFrameExchangeComplete.completeExceptionally(cause); + } + } + + CompletableFuture onHandshakeCompleted() { + return onHandshakeComplete; + } + + CompletableFuture startFramesExchange() { + ChannelHandlerContext c = ctx; + c.executor().execute(() -> sendFrames(c)); + return onFrameExchangeComplete; + } + + protected void sendFrames(ChannelHandlerContext c) { + WebSocketFrameFactory factory = webSocketFrameFactory; + while (framesSent < frameSize) { + int size = ++framesSent; + ByteBuf shortFragmentStart = + factory.mask( + withPayload(factory.createBinaryFragmentStart(c.alloc(), size), (byte) 0xFE, size)); + ByteBuf shortFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), (byte) 0xFE, size)); + + ByteBuf longFragmentStart = + factory.mask( + withPayload(factory.createBinaryFragmentStart(c.alloc(), size), (byte) 0xFE, size)); + ByteBuf longFragmentContinuation = + factory.mask( + withPayload( + factory.createContinuationFragment(c.alloc(), size), (byte) 0xFE, size)); + ByteBuf longFragmentEnd = + factory.mask( + withPayload( + factory.createContinuationFragmentEnd(c.alloc(), size), (byte) 0xFE, size)); + + ctx.write(shortFragmentStart); + ctx.write(shortFragmentEnd); + ctx.write(longFragmentStart); + ctx.write(longFragmentContinuation); + ctx.writeAndFlush(longFragmentEnd); + } + } + + static ByteBuf withPayload(ByteBuf fragment, byte content, int size) { + for (int i = 0; i < size; i++) { + fragment.writeByte(content); + } + return fragment; + } + } + static class TextFramesTestServerHandler extends ChannelInboundHandlerAdapter { final String content; final int framesCount; diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java index dde1868..3ecd570 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/MaskingWebSocketEncoder.java @@ -59,6 +59,12 @@ static class FrameFactory static final int TEXT_FRAME_SMALL = OPCODE_TEXT << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; + static final int BINARY_FRAGMENT_START_SMALL = OPCODE_BINARY << 8 | /*MASK*/ (byte) 1 << 7; + static final int TEXT_FRAGMENT_START_SMALL = OPCODE_TEXT << 8 | /*MASK*/ (byte) 1 << 7; + static final int DATA_FRAGMENT_CONTINUATION_SMALL = /*MASK*/ (byte) 1 << 7; + static final int DATA_FRAGMENT_CONTINUATION_END_SMALL = /*FIN*/ + (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; + static final int CLOSE_FRAME = OPCODE_CLOSE << 8 | /*FIN*/ (byte) 1 << 15 | /*MASK*/ (byte) 1 << 7; static final int PING_FRAME = @@ -70,6 +76,15 @@ static class FrameFactory static final int BINARY_FRAME_MEDIUM = (BINARY_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; static final int TEXT_FRAME_MEDIUM = (TEXT_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; + static final int BINARY_FRAGMENT_START_MEDIUM = + (BINARY_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int TEXT_FRAGMENT_START_MEDIUM = + (TEXT_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_END_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_END_SMALL | /*LEN*/ (byte) 126) << 16; + static final WebSocketFrameFactory INSTANCE = new FrameFactory(); static ByteBuf createDataFrame( @@ -100,6 +115,33 @@ public ByteBuf createTextFrame(ByteBufAllocator allocator, int payloadSize) { return createDataFrame(allocator, payloadSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); } + @Override + public ByteBuf createBinaryFragmentStart(ByteBufAllocator allocator, int binaryDataSize) { + return createDataFrame( + allocator, binaryDataSize, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createTextFragmentStart(ByteBufAllocator allocator, int textDataSize) { + return createDataFrame( + allocator, textDataSize, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragment(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, dataSize, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragmentEnd(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, + dataSize, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + @Override public ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason) { if (!WebSocketCloseStatus.isValidStatusCode(statusCode)) { @@ -177,6 +219,36 @@ public ByteBuf encodeTextFrame(ByteBuf textFrame) { return encodeDataFrame(textFrame, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); } + @Override + public ByteBuf encodeBinaryFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeTextFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame(fragmentFrame, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragment(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragmentEnd(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + + @Override + public int sizeofFragment(int payloadSize) { + return sizeOfDataFrame(payloadSize); + } + static ByteBuf encodeDataFrame(ByteBuf binaryFrame, int prefixSmall, int prefixMedium) { int frameSize = binaryFrame.readableBytes(); int smallPrefixSize = 6; diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java index 1734904..dfd3fde 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/NonMaskingWebSocketEncoder.java @@ -56,6 +56,11 @@ static class FrameFactory static final int BINARY_FRAME_SMALL = OPCODE_BINARY << 8 | /*FIN*/ (byte) 1 << 15; static final int TEXT_FRAME_SMALL = OPCODE_TEXT << 8 | /*FIN*/ (byte) 1 << 15; + static final int BINARY_FRAGMENT_START_SMALL = OPCODE_BINARY << 8; + static final int TEXT_FRAGMENT_START_SMALL = OPCODE_TEXT << 8; + static final int DATA_FRAGMENT_CONTINUATION_SMALL = 0; + static final int DATA_FRAGMENT_CONTINUATION_END_SMALL = /*FIN*/ (byte) 1 << 15; + static final int CLOSE_FRAME = OPCODE_CLOSE << 8 | /*FIN*/ (byte) 1 << 15; static final int PING_FRAME = OPCODE_PING << 8 | /*FIN*/ (byte) 1 << 15; static final int PONG_FRAME = OPCODE_PONG << 8 | /*FIN*/ (byte) 1 << 15; @@ -64,6 +69,15 @@ static class FrameFactory static final int BINARY_FRAME_MEDIUM = (BINARY_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; static final int TEXT_FRAME_MEDIUM = (TEXT_FRAME_SMALL | /*LEN*/ (byte) 126) << 16; + static final int BINARY_FRAGMENT_START_MEDIUM = + (BINARY_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int TEXT_FRAGMENT_START_MEDIUM = + (TEXT_FRAGMENT_START_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_SMALL | /*LEN*/ (byte) 126) << 16; + static final int DATA_FRAGMENT_CONTINUATION_END_MEDIUM = + (DATA_FRAGMENT_CONTINUATION_END_SMALL | /*LEN*/ (byte) 126) << 16; + static final WebSocketFrameFactory INSTANCE = new FrameFactory(); static ByteBuf createDataFrame( @@ -92,6 +106,33 @@ public ByteBuf createTextFrame(ByteBufAllocator allocator, int textDataSize) { return createDataFrame(allocator, textDataSize, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); } + @Override + public ByteBuf createBinaryFragmentStart(ByteBufAllocator allocator, int binaryDataSize) { + return createDataFrame( + allocator, binaryDataSize, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createTextFragmentStart(ByteBufAllocator allocator, int textDataSize) { + return createDataFrame( + allocator, textDataSize, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragment(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, dataSize, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf createContinuationFragmentEnd(ByteBufAllocator allocator, int dataSize) { + return createDataFrame( + allocator, + dataSize, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + @Override public ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason) { if (!WebSocketCloseStatus.isValidStatusCode(statusCode)) { @@ -157,6 +198,36 @@ public ByteBuf encodeTextFrame(ByteBuf textFrame) { return encodeDataFrame(textFrame, TEXT_FRAME_SMALL, TEXT_FRAME_MEDIUM); } + @Override + public ByteBuf encodeBinaryFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, BINARY_FRAGMENT_START_SMALL, BINARY_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeTextFragmentStart(ByteBuf fragmentFrame) { + return encodeDataFrame(fragmentFrame, TEXT_FRAGMENT_START_SMALL, TEXT_FRAGMENT_START_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragment(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, DATA_FRAGMENT_CONTINUATION_SMALL, DATA_FRAGMENT_CONTINUATION_MEDIUM); + } + + @Override + public ByteBuf encodeContinuationFragmentEnd(ByteBuf fragmentFrame) { + return encodeDataFrame( + fragmentFrame, + DATA_FRAGMENT_CONTINUATION_END_SMALL, + DATA_FRAGMENT_CONTINUATION_END_MEDIUM); + } + + @Override + public int sizeofFragment(int payloadSize) { + return sizeOfDataFrame(payloadSize); + } + static ByteBuf encodeDataFrame(ByteBuf binaryFrame, int prefixSmall, int prefixMedium) { int frameSize = binaryFrame.readableBytes(); int smallPrefixSize = 2; diff --git a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java index 63d19a7..39cab66 100644 --- a/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java +++ b/netty-websocket-http1/src/main/java/com/jauntsdn/netty/handler/codec/http/websocketx/WebSocketFrameFactory.java @@ -20,8 +20,8 @@ import io.netty.buffer.ByteBufAllocator; /** - * Creates frame ByteBufs containing webSocket prefix. It is user's responsibility to call ByteBuf - * mask(ByteBuf) after frame payload is written. + * Creates frame bytebuffers containing webSocket prefix. It is user's responsibility to call + * ByteBuf mask(ByteBuf) after data frame payload is written. */ public interface WebSocketFrameFactory { @@ -32,6 +32,26 @@ default ByteBuf createTextFrame(ByteBufAllocator allocator, int textDataSize) { "WebSocketFrameFactory.createTextFrame() not implemented"); } + default ByteBuf createBinaryFragmentStart(ByteBufAllocator allocator, int binaryDataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createBinaryFragmentStart() not implemented"); + } + + default ByteBuf createTextFragmentStart(ByteBufAllocator allocator, int textDataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createTextFragmentStart() not implemented"); + } + + default ByteBuf createContinuationFragment(ByteBufAllocator allocator, int dataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createContinuationFragment() not implemented"); + } + + default ByteBuf createContinuationFragmentEnd(ByteBufAllocator allocator, int dataSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.createContinuationFragmentEnd() not implemented"); + } + ByteBuf createCloseFrame(ByteBufAllocator allocator, int statusCode, String reason); ByteBuf createPingFrame(ByteBufAllocator allocator, int binaryDataSize); @@ -46,7 +66,7 @@ default BulkEncoder bulkEncoder() { throw new UnsupportedOperationException("WebSocketFrameFactory.bulkEncoder() not implemented"); } - /** Encodes prefix of single binary websocket frame into provided bytebuffer. */ + /** Encodes prefix of single data websocket frame into provided bytebuffer. */ interface Encoder { ByteBuf encodeBinaryFrame(ByteBuf binaryFrame); @@ -62,9 +82,34 @@ default int sizeofTextFrame(int textPayloadSize) { throw new UnsupportedOperationException( "WebSocketFrameFactory.Encoder.sizeofTextFrame() not implemented"); } + + default ByteBuf encodeBinaryFragmentStart(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeBinaryFragmentStart() not implemented"); + } + + default ByteBuf encodeTextFragmentStart(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeTextFragmentStart() not implemented"); + } + + default ByteBuf encodeContinuationFragment(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeContinuationFragment() not implemented"); + } + + default ByteBuf encodeContinuationFragmentEnd(ByteBuf fragmentFrame) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.encodeContinuationFragmentEnd() not implemented"); + } + + default int sizeofFragment(int payloadSize) { + throw new UnsupportedOperationException( + "WebSocketFrameFactory.Encoder.sizeofFragment() not implemented"); + } } - /** Encodes prefixes of multiple binary websocket frames into provided bytebuffer. */ + /** Encodes prefixes of multiple data websocket frames into provided bytebuffer. */ interface BulkEncoder { /** @return frame mask, or -1 if masking not applicable */