Skip to content

Commit

Permalink
Issue #11398 - allow frames to be demanded in WebSocket onOpen
Browse files Browse the repository at this point in the history
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
  • Loading branch information
lachlan-roberts committed Feb 14, 2024
1 parent a683690 commit 25fb7be
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public boolean isClosed()
public boolean isInputOpen()
{
State state = getState();
return (state == State.OPEN || state == State.OSHUT);
return (state == State.CONNECTED || state == State.OPEN || state == State.OSHUT);
}

public boolean isOutputOpen()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,31 @@
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.api.Callback;
import org.eclipse.jetty.websocket.api.Frame;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.server.WebSocketUpgradeHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -74,6 +81,38 @@ public void onWebSocketFrame(Frame frame, Callback callback)
}
}

@WebSocket(autoDemand = false)
public static class OnOpenSocket implements Session.Listener
{
CountDownLatch onOpen = new CountDownLatch(1);
BlockingQueue<String> textMessages = new BlockingArrayQueue<>();
Session session;

@Override
public void onWebSocketOpen(Session session)
{
try
{
this.session = session;
session.demand();
onOpen.await();
}
catch (InterruptedException e)
{
throw new RuntimeException(e);
}
}

@Override
public void onWebSocketFrame(Frame frame, Callback callback)
{
if (frame.getOpCode() == OpCode.TEXT)
textMessages.add(BufferUtil.toString(frame.getPayload()));
callback.succeed();

}
}

@WebSocket(autoDemand = false)
public static class PingSocket extends ListenerSocket
{
Expand All @@ -99,6 +138,7 @@ public void onWebSocketFrame(Frame frame, Callback callback)
private final WebSocketClient client = new WebSocketClient();
private final SuspendSocket serverSocket = new SuspendSocket();
private final ListenerSocket listenerSocket = new ListenerSocket();
private final OnOpenSocket onOpenSocket = new OnOpenSocket();
private final PingSocket pingSocket = new PingSocket();
private ServerConnector connector;

Expand All @@ -113,6 +153,7 @@ public void start() throws Exception
container.addMapping("/suspend", (rq, rs, cb) -> serverSocket);
container.addMapping("/listenerSocket", (rq, rs, cb) -> listenerSocket);
container.addMapping("/ping", (rq, rs, cb) -> pingSocket);
container.addMapping("/onOpen", (rq, rs, cb) -> onOpenSocket);
});

server.setHandler(wsHandler);
Expand Down Expand Up @@ -213,4 +254,23 @@ public void testServerPing() throws Exception
frame = pingSocket.frames.get(2);
assertThat(frame.getType(), is(Frame.Type.CLOSE));
}

@Test
public void testDemandInOnOpen() throws Exception
{
URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/onOpen");
EventSocket clientSocket = new EventSocket();

Future<Session> connect = client.connect(clientSocket, uri);
Session session = connect.get(5, TimeUnit.SECONDS);
session.sendText("test-text", Callback.NOOP);

String received = onOpenSocket.textMessages.poll(5, TimeUnit.SECONDS);
assertThat(received, equalTo("test-text"));
onOpenSocket.onOpen.countDown();

session.close();
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertThat(clientSocket.closeCode, equalTo(CloseStatus.NORMAL));
}
}

0 comments on commit 25fb7be

Please sign in to comment.