diff --git a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java
index 7d229d1d0315..f76edcfd7e4b 100644
--- a/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java
+++ b/jetty-core/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java
@@ -1865,7 +1865,7 @@ public void onError(Throwable t)
}
});
// Close the parser to cause the issue.
- org.eclipse.jetty.server.HttpConnection.getCurrentConnection().getParser().close();
+ org.eclipse.jetty.server.internal.HttpConnection.getCurrentConnection().getParser().close();
}
});
server.start();
diff --git a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java
index 595fb5f5df5e..c620216a8298 100644
--- a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java
+++ b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java
@@ -455,12 +455,18 @@ else if (status == HttpStatus.NO_CONTENT_204 || status == HttpStatus.NOT_MODIFIE
}
}
- public void servletUpgrade()
+ public void startTunnel()
{
_noContentResponse = false;
_state = State.COMMITTED;
}
+ @Deprecated(since = "12.1.0", forRemoval = true)
+ public void servletUpgrade()
+ {
+ startTunnel();
+ }
+
private void prepareChunk(ByteBuffer chunk, int remaining)
{
// if we need CRLF add this to header
diff --git a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java
index 93be3478cb01..de5f6edc3d36 100644
--- a/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java
+++ b/jetty-core/jetty-http/src/main/java/org/eclipse/jetty/http/HttpParser.java
@@ -2016,13 +2016,19 @@ public void reset()
_headerComplete = false;
}
- public void servletUpgrade()
+ public void startTunnel()
{
- setState(State.CONTENT);
- _endOfContent = EndOfContent.UNKNOWN_CONTENT;
+ setState(State.EOF_CONTENT);
+ _endOfContent = EndOfContent.EOF_CONTENT;
_contentLength = -1;
}
+ @Deprecated(since = "12.1.0", forRemoval = true)
+ public void servletUpgrade()
+ {
+ startTunnel();
+ }
+
protected void setState(State state)
{
if (debugEnabled)
diff --git a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java
index edbd16f20580..edf6d0b5a3bf 100644
--- a/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java
+++ b/jetty-core/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java
@@ -140,6 +140,16 @@ interface UpgradeTo
void onUpgradeTo(ByteBuffer buffer);
}
+ /**
+ *
Start a tunnel over the current connection without replacing the connection.
+ * This can be used for upgrade within a connection, but it is not really an upgrade for this connection
+ * as the connection remains and just tunnels data to/from its endpoint.
+ */
+ interface Tunnel
+ {
+ void startTunnel();
+ }
+
/**
* A Listener for connection events.
* Listeners can be added to a {@link Connection} to get open and close events.
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
index 6abca7848391..327a3e62c48d 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
@@ -80,7 +80,7 @@
/**
*
A {@link Connection} that handles the HTTP protocol.
*/
-public class HttpConnection extends AbstractMetaDataConnection implements Runnable, Connection.UpgradeFrom, Connection.UpgradeTo, ConnectionMetaData
+public class HttpConnection extends AbstractMetaDataConnection implements Runnable, Connection.UpgradeFrom, Connection.UpgradeTo, Connection.Tunnel, ConnectionMetaData
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
private static final HttpField PREAMBLE_UPGRADE_H2C = new HttpField(HttpHeader.UPGRADE, "h2c");
@@ -335,6 +335,13 @@ public void onUpgradeTo(ByteBuffer buffer)
BufferUtil.append(getRequestBuffer(), buffer);
}
+ @Override
+ public void startTunnel()
+ {
+ getParser().startTunnel();
+ getGenerator().startTunnel();
+ }
+
void releaseRequestBuffer()
{
if (_retainableByteBuffer != null && _retainableByteBuffer.isEmpty())
diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletApiRequest.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletApiRequest.java
index 38e93d25c1d2..cd5504e9c120 100644
--- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletApiRequest.java
+++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletApiRequest.java
@@ -41,15 +41,18 @@
import jakarta.servlet.AsyncContext;
import jakarta.servlet.DispatcherType;
+import jakarta.servlet.ReadListener;
import jakarta.servlet.RequestDispatcher;
import jakarta.servlet.ServletConnection;
import jakarta.servlet.ServletContext;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletRequestAttributeEvent;
import jakarta.servlet.ServletRequestAttributeListener;
import jakarta.servlet.ServletResponse;
+import jakarta.servlet.WriteListener;
import jakarta.servlet.http.Cookie;
import jakarta.servlet.http.HttpServletMapping;
import jakarta.servlet.http.HttpServletRequest;
@@ -58,7 +61,10 @@
import jakarta.servlet.http.HttpUpgradeHandler;
import jakarta.servlet.http.Part;
import jakarta.servlet.http.PushBuilder;
+import jakarta.servlet.http.WebConnection;
import org.eclipse.jetty.ee10.servlet.ServletContextHandler.ServletRequestInfo;
+import org.eclipse.jetty.ee10.servlet.util.ServletInputStreamWrapper;
+import org.eclipse.jetty.ee10.servlet.util.ServletOutputStreamWrapper;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.CookieCompliance;
import org.eclipse.jetty.http.HttpCookie;
@@ -72,6 +78,7 @@
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.http.SetCookieParser;
import org.eclipse.jetty.http.pathmap.MatchedResource;
+import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.security.AuthenticationState;
@@ -737,8 +744,255 @@ public Part getPart(String name) throws IOException, ServletException
@Override
public T upgrade(Class handlerClass) throws IOException, ServletException
{
- // Not implemented. Throw ServletException as per spec.
- throw new ServletException("Not implemented");
+ Response response = _servletContextRequest.getServletContextResponse();
+ if (response.getStatus() != HttpStatus.SWITCHING_PROTOCOLS_101)
+ throw new IllegalStateException("Response status should be 101");
+ if (response.getHeaders().get("Upgrade") == null)
+ throw new IllegalStateException("Missing Upgrade header");
+ if (!"Upgrade".equalsIgnoreCase(response.getHeaders().get("Connection")))
+ throw new IllegalStateException("Invalid Connection header");
+ if (response.isCommitted())
+ throw new IllegalStateException("Cannot upgrade committed response");
+ if (_servletChannel.getConnectionMetaData().getHttpVersion() != HttpVersion.HTTP_1_1)
+ throw new IllegalStateException("Only requests over HTTP/1.1 can be upgraded");
+
+ CompletableFuture outputStreamComplete = new CompletableFuture<>();
+ CompletableFuture inputStreamComplete = new CompletableFuture<>();
+ ServletOutputStream outputStream = new ServletOutputStreamWrapper(_servletContextRequest.getHttpOutput())
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ try
+ {
+ super.write(b);
+ }
+ catch (Throwable t)
+ {
+ outputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException
+ {
+ try
+ {
+ super.write(b);
+ }
+ catch (Throwable t)
+ {
+ outputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ try
+ {
+ super.write(b, off, len);
+ }
+ catch (Throwable t)
+ {
+ outputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ try
+ {
+ super.close();
+ outputStreamComplete.complete(null);
+ }
+ catch (Throwable t)
+ {
+ outputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void setWriteListener(WriteListener writeListener)
+ {
+ super.setWriteListener(new WriteListener()
+ {
+ @Override
+ public void onWritePossible() throws IOException
+ {
+ writeListener.onWritePossible();
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ writeListener.onError(t);
+ outputStreamComplete.completeExceptionally(t);
+ }
+ });
+ }
+ };
+ ServletInputStream inputStream = new ServletInputStreamWrapper(_servletContextRequest.getHttpInput())
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ int read = super.read();
+ if (read == -1)
+ inputStreamComplete.complete(null);
+ return read;
+ }
+ catch (Throwable t)
+ {
+ inputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException
+ {
+ try
+ {
+ int read = super.read(b);
+ if (read == -1)
+ inputStreamComplete.complete(null);
+ return read;
+ }
+ catch (Throwable t)
+ {
+ inputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ try
+ {
+ int read = super.read(b, off, len);
+ if (read == -1)
+ inputStreamComplete.complete(null);
+ return read;
+ }
+ catch (Throwable t)
+ {
+ inputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ try
+ {
+ super.close();
+ inputStreamComplete.complete(null);
+ }
+ catch (Throwable t)
+ {
+ inputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener)
+ {
+ super.setReadListener(new ReadListener()
+ {
+ @Override
+ public void onDataAvailable() throws IOException
+ {
+ readListener.onDataAvailable();
+ }
+
+ @Override
+ public void onAllDataRead() throws IOException
+ {
+ try
+ {
+ readListener.onAllDataRead();
+ inputStreamComplete.complete(null);
+ }
+ catch (Throwable t)
+ {
+ inputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ readListener.onError(t);
+ inputStreamComplete.completeExceptionally(t);
+ }
+ });
+ }
+ };
+
+ T upgradeHandler;
+ try
+ {
+ upgradeHandler = handlerClass.getDeclaredConstructor().newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new ServletException("Unable to instantiate handler class", e);
+ }
+
+ Connection connection = _servletContextRequest.getConnectionMetaData().getConnection();
+ if (connection instanceof Connection.Tunnel upgradeableConnection)
+ {
+ outputStream.flush(); // commit the 101 response
+ upgradeableConnection.startTunnel();
+ }
+ else
+ {
+ LOG.warn("Unexpected connection type {}", connection);
+ throw new IllegalStateException();
+ }
+ AsyncContext asyncContext = forceStartAsync(); // force the servlet in async mode
+ CompletableFuture.allOf(inputStreamComplete, outputStreamComplete).whenComplete((result, failure) ->
+ {
+ upgradeHandler.destroy();
+ asyncContext.complete();
+ });
+
+ WebConnection webConnection = new WebConnection()
+ {
+ @Override
+ public void close() throws Exception
+ {
+ IO.close(inputStream);
+ IO.close(outputStream);
+ }
+
+ @Override
+ public ServletInputStream getInputStream()
+ {
+ return inputStream;
+ }
+
+ @Override
+ public ServletOutputStream getOutputStream()
+ {
+ return outputStream;
+ }
+ };
+
+ upgradeHandler.init(webConnection);
+ return upgradeHandler;
}
@Override
@@ -1374,6 +1628,11 @@ public AsyncContext startAsync() throws IllegalStateException
{
if (!isAsyncSupported())
throw new IllegalStateException("Async Not Supported");
+ return forceStartAsync();
+ }
+
+ private AsyncContext forceStartAsync()
+ {
ServletChannelState state = getServletRequestInfo().getState();
if (_async == null)
_async = new AsyncContextState(state);
diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/util/ServletInputStreamWrapper.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/util/ServletInputStreamWrapper.java
new file mode 100644
index 000000000000..f2c033944ed3
--- /dev/null
+++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/util/ServletInputStreamWrapper.java
@@ -0,0 +1,114 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.ee10.servlet.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
+
+public class ServletInputStreamWrapper extends ServletInputStream
+{
+ private final ServletInputStream _servletInputStream;
+
+ public ServletInputStreamWrapper(ServletInputStream servletInputStream)
+ {
+ _servletInputStream = servletInputStream;
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return _servletInputStream.isFinished();
+ }
+
+ @Override
+ public boolean isReady()
+ {
+ return _servletInputStream.isReady();
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener)
+ {
+ _servletInputStream.setReadListener(readListener);
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return _servletInputStream.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException
+ {
+ return _servletInputStream.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ return _servletInputStream.read(b, off, len);
+ }
+
+ @Override
+ public long skip(long n) throws IOException
+ {
+ return _servletInputStream.skip(n);
+ }
+
+ @Override
+ public void skipNBytes(long n) throws IOException
+ {
+ _servletInputStream.skipNBytes(n);
+ }
+
+ @Override
+ public int available() throws IOException
+ {
+ return _servletInputStream.available();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ _servletInputStream.close();
+ }
+
+ @Override
+ public void mark(int readlimit)
+ {
+ _servletInputStream.mark(readlimit);
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ _servletInputStream.reset();
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return _servletInputStream.markSupported();
+ }
+
+ @Override
+ public long transferTo(OutputStream out) throws IOException
+ {
+ return _servletInputStream.transferTo(out);
+ }
+}
diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/util/ServletOutputStreamWrapper.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/util/ServletOutputStreamWrapper.java
index def8070f40f4..696b1576a0e5 100644
--- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/util/ServletOutputStreamWrapper.java
+++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/util/ServletOutputStreamWrapper.java
@@ -14,7 +14,6 @@
package org.eclipse.jetty.ee10.servlet.util;
import java.io.IOException;
-import java.io.OutputStream;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.WriteListener;
@@ -28,96 +27,6 @@ public ServletOutputStreamWrapper(ServletOutputStream outputStream)
_outputStream = outputStream;
}
- @Override
- public void print(String s) throws IOException
- {
- _outputStream.print(s);
- }
-
- @Override
- public void print(boolean b) throws IOException
- {
- _outputStream.print(b);
- }
-
- @Override
- public void print(char c) throws IOException
- {
- _outputStream.print(c);
- }
-
- @Override
- public void print(int i) throws IOException
- {
- _outputStream.print(i);
- }
-
- @Override
- public void print(long l) throws IOException
- {
- _outputStream.print(l);
- }
-
- @Override
- public void print(float f) throws IOException
- {
- _outputStream.print(f);
- }
-
- @Override
- public void print(double d) throws IOException
- {
- _outputStream.print(d);
- }
-
- @Override
- public void println() throws IOException
- {
- _outputStream.println();
- }
-
- @Override
- public void println(String s) throws IOException
- {
- _outputStream.println(s);
- }
-
- @Override
- public void println(boolean b) throws IOException
- {
- _outputStream.println(b);
- }
-
- @Override
- public void println(char c) throws IOException
- {
- _outputStream.println(c);
- }
-
- @Override
- public void println(int i) throws IOException
- {
- _outputStream.println(i);
- }
-
- @Override
- public void println(long l) throws IOException
- {
- _outputStream.println(l);
- }
-
- @Override
- public void println(float f) throws IOException
- {
- _outputStream.println(f);
- }
-
- @Override
- public void println(double d) throws IOException
- {
- _outputStream.println(d);
- }
-
@Override
public boolean isReady()
{
@@ -130,11 +39,6 @@ public void setWriteListener(WriteListener writeListener)
_outputStream.setWriteListener(writeListener);
}
- public static OutputStream nullOutputStream()
- {
- return OutputStream.nullOutputStream();
- }
-
@Override
public void write(int b) throws IOException
{
diff --git a/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/ServletUpgradeTest.java b/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/ServletUpgradeTest.java
index 8fd6a338b95f..49b7076e36a8 100644
--- a/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/ServletUpgradeTest.java
+++ b/jetty-ee10/jetty-ee10-servlet/src/test/java/org/eclipse/jetty/ee10/servlet/ServletUpgradeTest.java
@@ -17,6 +17,9 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import jakarta.servlet.ReadListener;
import jakarta.servlet.ServletException;
@@ -29,16 +32,18 @@
import jakarta.servlet.http.WebConnection;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.eclipse.jetty.util.StringUtil.CRLF;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ServletUpgradeTest
@@ -47,10 +52,13 @@ public class ServletUpgradeTest
private Server server;
private int port;
+ private static CountDownLatch destroyLatch;
@BeforeEach
public void setUp() throws Exception
{
+ destroyLatch = new CountDownLatch(1);
+
server = new Server();
ServerConnector connector = new ServerConnector(server);
@@ -72,130 +80,64 @@ public void tearDown() throws Exception
server.stop();
}
- @Disabled
@Test
public void upgradeTest() throws Exception
{
- boolean passed1 = false;
- boolean passed2 = false;
- boolean passed3 = false;
- String expectedResponse1 = "TCKHttpUpgradeHandler.init";
- String expectedResponse2 = "onDataAvailable|Hello";
- String expectedResponse3 = "onDataAvailable|World";
-
- InputStream input = null;
- OutputStream output = null;
- Socket s = null;
-
- try
+ Socket socket = new Socket("localhost", port);
+ socket.setSoTimeout(0);
+ InputStream input = socket.getInputStream();
+ OutputStream output = socket.getOutputStream();
+
+ String request = "POST /TestServlet HTTP/1.1" + CRLF +
+ "Host: localhost:" + port + CRLF +
+ "Upgrade: YES" + CRLF +
+ "Connection: Upgrade" + CRLF +
+ CRLF;
+
+ output.write(request.getBytes());
+ writeChunk(output, "Hello");
+ writeChunk(output, "World");
+ output.flush();
+ socket.shutdownOutput();
+
+ CompletableFuture futureContent = new CompletableFuture<>();
+ new Thread(() ->
{
- s = new Socket("localhost", port);
- output = s.getOutputStream();
-
- StringBuilder reqStr = new StringBuilder()
- .append("POST /TestServlet HTTP/1.1").append(CRLF)
- .append("User-Agent: Java/1.6.0_33").append(CRLF)
- .append("Host: localhost:").append(port).append(CRLF)
- .append("Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2").append(CRLF)
- .append("Upgrade: YES").append(CRLF)
- .append("Connection: Upgrade").append(CRLF)
- .append("Content-type: application/x-www-form-urlencoded").append(CRLF)
- .append(CRLF);
-
- LOG.info("REQUEST=========" + reqStr.toString());
- output.write(reqStr.toString().getBytes());
-
- LOG.info("Writing first chunk");
- writeChunk(output, "Hello");
-
- LOG.info("Writing second chunk");
- writeChunk(output, "World");
-
LOG.info("Consuming the response from the server");
-
- // Consume the response from the server
- input = s.getInputStream();
- int len;
- byte[] b = new byte[1024];
- boolean receivedFirstMessage = false;
- boolean receivedSecondMessage = false;
- boolean receivedThirdMessage = false;
- StringBuilder sb = new StringBuilder();
- while ((len = input.read(b)) != -1)
- {
- String line = new String(b, 0, len);
- sb.append(line);
- LOG.info("==============Read from server:" + CRLF + sb + CRLF);
- if (passed1 = compareString(expectedResponse1, sb.toString()))
- {
- LOG.info("==============Received first expected response!" + CRLF);
- receivedFirstMessage = true;
- }
- if (passed2 = compareString(expectedResponse2, sb.toString()))
- {
- LOG.info("==============Received second expected response!" + CRLF);
- receivedSecondMessage = true;
- }
- if (passed3 = compareString(expectedResponse3, sb.toString()))
- {
- LOG.info("==============Received third expected response!" + CRLF);
- receivedThirdMessage = true;
- }
- LOG.info("receivedFirstMessage : " + receivedFirstMessage);
- LOG.info("receivedSecondMessage : " + receivedSecondMessage);
- LOG.info("receivedThirdMessage : " + receivedThirdMessage);
- if (receivedFirstMessage && receivedSecondMessage && receivedThirdMessage)
- {
- break;
- }
- }
- }
- finally
- {
+ Utf8StringBuilder sb = new Utf8StringBuilder();
try
{
- if (input != null)
+ while (true)
{
- LOG.info("Closing input...");
- input.close();
- LOG.info("Input closed.");
+ int read = input.read();
+ if (read == -1)
+ break;
+ sb.append((byte)read);
}
+ futureContent.complete(sb.toCompleteString());
}
- catch (Exception ex)
+ catch (Throwable t)
{
- LOG.error("Failed to close input:" + ex.getMessage(), ex);
+ LOG.warn("failed with content: " + sb, t);
+ futureContent.completeExceptionally(t);
}
- try
- {
- if (output != null)
- {
- LOG.info("Closing output...");
- output.close();
- LOG.info("Output closed .");
- }
- }
- catch (Exception ex)
- {
- LOG.error("Failed to close output:" + ex.getMessage(), ex);
- }
-
- try
- {
- if (s != null)
- {
- LOG.info("Closing socket..." + CRLF);
- s.close();
- LOG.info("Socked closed.");
- }
- }
- catch (Exception ex)
- {
- LOG.error("Failed to close socket:" + ex.getMessage(), ex);
- }
- }
-
- assertTrue(passed1 && passed2 && passed3);
+ }).start();
+
+ String content = futureContent.get(5, TimeUnit.SECONDS);
+ String expectedContent = """
+ TCKHttpUpgradeHandler.init\r
+ =onDataAvailable\r
+ HelloWorld\r
+ =onAllDataRead\r
+ """;
+ assertThat(content, startsWith("HTTP/1.1 101 Switching Protocols"));
+ assertThat(content, endsWith(expectedContent));
+
+ input.close();
+ output.close();
+ socket.close();
+ assertTrue(destroyLatch.await(5, TimeUnit.SECONDS));
}
private static class TestServlet extends HttpServlet
@@ -227,7 +169,7 @@ public TestHttpUpgradeHandler()
@Override
public void destroy()
{
- LOG.debug("===============destroy");
+ destroyLatch.countDown();
}
@Override
@@ -237,9 +179,9 @@ public void init(WebConnection wc)
{
ServletInputStream input = wc.getInputStream();
ServletOutputStream output = wc.getOutputStream();
- TestReadListener readListener = new TestReadListener("/", input, output);
+ TestReadListener readListener = new TestReadListener(input, output);
input.setReadListener(readListener);
- output.println("===============TCKHttpUpgradeHandler.init");
+ output.println("TCKHttpUpgradeHandler.init");
output.flush();
}
catch (Exception ex)
@@ -253,20 +195,20 @@ private static class TestReadListener implements ReadListener
{
private final ServletInputStream input;
private final ServletOutputStream output;
- private final String delimiter;
+ private boolean outputOnDataAvailable = false;
- TestReadListener(String del, ServletInputStream in, ServletOutputStream out)
+ TestReadListener(ServletInputStream in, ServletOutputStream out)
{
input = in;
output = out;
- delimiter = del;
}
+ @Override
public void onAllDataRead()
{
try
{
- output.println("=onAllDataRead");
+ output.println("\r\n=onAllDataRead");
output.close();
}
catch (Exception ex)
@@ -275,11 +217,17 @@ public void onAllDataRead()
}
}
+ @Override
public void onDataAvailable()
{
try
{
- output.println("=onDataAvailable");
+ if (!outputOnDataAvailable)
+ {
+ outputOnDataAvailable = true;
+ output.println("=onDataAvailable");
+ }
+
StringBuilder sb = new StringBuilder();
int len;
byte[] b = new byte[1024];
@@ -288,7 +236,7 @@ public void onDataAvailable()
String data = new String(b, 0, len);
sb.append(data);
}
- output.println(delimiter + sb.toString());
+ output.print(sb.toString());
output.flush();
}
catch (Exception ex)
@@ -297,50 +245,13 @@ public void onDataAvailable()
}
}
+ @Override
public void onError(final Throwable t)
{
LOG.error("TestReadListener error", t);
}
}
- private static boolean compareString(String expected, String actual)
- {
- String[] listExpected = expected.split("[|]");
- boolean found = true;
- for (int i = 0, n = listExpected.length, startIdx = 0, bodyLength = actual.length(); i < n; i++)
- {
- String search = listExpected[i];
- if (startIdx >= bodyLength)
- {
- startIdx = bodyLength;
- }
-
- int searchIdx = actual.toLowerCase().indexOf(search.toLowerCase(), startIdx);
-
- LOG.debug("[ServletTestUtil] Scanning response for " + "search string: '" + search + "' starting at index " + "location: " + startIdx);
- if (searchIdx < 0)
- {
- found = false;
- String s = "[ServletTestUtil] Unable to find the following " +
- "search string in the server's " +
- "response: '" + search + "' at index: " +
- startIdx +
- "\n[ServletTestUtil] Server's response:\n" +
- "-------------------------------------------\n" +
- actual +
- "\n-------------------------------------------\n";
- LOG.debug(s);
- break;
- }
-
- LOG.debug("[ServletTestUtil] Found search string: '" + search + "' at index '" + searchIdx + "' in the server's " + "response");
- // the new searchIdx is the old index plus the lenght of the
- // search string.
- startIdx = searchIdx + search.length();
- }
- return found;
- }
-
private static void writeChunk(OutputStream out, String data) throws IOException
{
if (data != null)
diff --git a/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/ServletApiRequest.java b/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/ServletApiRequest.java
index 1981b89a8741..5a7203b31009 100644
--- a/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/ServletApiRequest.java
+++ b/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/ServletApiRequest.java
@@ -41,15 +41,18 @@
import jakarta.servlet.AsyncContext;
import jakarta.servlet.DispatcherType;
+import jakarta.servlet.ReadListener;
import jakarta.servlet.RequestDispatcher;
import jakarta.servlet.ServletConnection;
import jakarta.servlet.ServletContext;
import jakarta.servlet.ServletException;
import jakarta.servlet.ServletInputStream;
+import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.ServletRequest;
import jakarta.servlet.ServletRequestAttributeEvent;
import jakarta.servlet.ServletRequestAttributeListener;
import jakarta.servlet.ServletResponse;
+import jakarta.servlet.WriteListener;
import jakarta.servlet.http.Cookie;
import jakarta.servlet.http.HttpServletMapping;
import jakarta.servlet.http.HttpServletRequest;
@@ -58,7 +61,10 @@
import jakarta.servlet.http.HttpUpgradeHandler;
import jakarta.servlet.http.Part;
import jakarta.servlet.http.PushBuilder;
+import jakarta.servlet.http.WebConnection;
import org.eclipse.jetty.ee11.servlet.ServletContextHandler.ServletRequestInfo;
+import org.eclipse.jetty.ee11.servlet.util.ServletInputStreamWrapper;
+import org.eclipse.jetty.ee11.servlet.util.ServletOutputStreamWrapper;
import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.CookieCompliance;
import org.eclipse.jetty.http.HttpCookie;
@@ -72,6 +78,7 @@
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.http.SetCookieParser;
import org.eclipse.jetty.http.pathmap.MatchedResource;
+import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.QuietException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.security.AuthenticationState;
@@ -737,8 +744,256 @@ public Part getPart(String name) throws IOException, ServletException
@Override
public T upgrade(Class handlerClass) throws IOException, ServletException
{
- // Not implemented. Throw ServletException as per spec.
- throw new ServletException("Not implemented");
+ Response response = _servletContextRequest.getServletContextResponse();
+ if (response.getStatus() != HttpStatus.SWITCHING_PROTOCOLS_101)
+ throw new IllegalStateException("Response status should be 101");
+ if (response.getHeaders().get("Upgrade") == null)
+ throw new IllegalStateException("Missing Upgrade header");
+ if (!"Upgrade".equalsIgnoreCase(response.getHeaders().get("Connection")))
+ throw new IllegalStateException("Invalid Connection header");
+ if (response.isCommitted())
+ throw new IllegalStateException("Cannot upgrade committed response");
+ if (_servletChannel.getConnectionMetaData().getHttpVersion() != HttpVersion.HTTP_1_1)
+ throw new IllegalStateException("Only requests over HTTP/1.1 can be upgraded");
+
+ CompletableFuture outputStreamComplete = new CompletableFuture<>();
+ CompletableFuture inputStreamComplete = new CompletableFuture<>();
+ ServletOutputStream outputStream = new ServletOutputStreamWrapper(_servletContextRequest.getHttpOutput())
+ {
+ @Override
+ public void write(int b) throws IOException
+ {
+ try
+ {
+ super.write(b);
+ }
+ catch (Throwable t)
+ {
+ outputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException
+ {
+ try
+ {
+ super.write(b);
+ }
+ catch (Throwable t)
+ {
+ outputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ try
+ {
+ super.write(b, off, len);
+ }
+ catch (Throwable t)
+ {
+ outputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ try
+ {
+ super.close();
+ outputStreamComplete.complete(null);
+ }
+ catch (Throwable t)
+ {
+ outputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void setWriteListener(WriteListener writeListener)
+ {
+ super.setWriteListener(new WriteListener()
+ {
+ @Override
+ public void onWritePossible() throws IOException
+ {
+ writeListener.onWritePossible();
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ writeListener.onError(t);
+ outputStreamComplete.completeExceptionally(t);
+ }
+ });
+ }
+ };
+ ServletInputStream inputStream = new ServletInputStreamWrapper(_servletContextRequest.getHttpInput())
+ {
+ @Override
+ public int read() throws IOException
+ {
+ try
+ {
+ int read = super.read();
+ if (read == -1)
+ inputStreamComplete.complete(null);
+ return read;
+ }
+ catch (Throwable t)
+ {
+ inputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException
+ {
+ try
+ {
+ int read = super.read(b);
+ if (read == -1)
+ inputStreamComplete.complete(null);
+ return read;
+ }
+ catch (Throwable t)
+ {
+ inputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ try
+ {
+ int read = super.read(b, off, len);
+ if (read == -1)
+ inputStreamComplete.complete(null);
+ return read;
+ }
+ catch (Throwable t)
+ {
+ inputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ try
+ {
+ super.close();
+ inputStreamComplete.complete(null);
+ }
+ catch (Throwable t)
+ {
+ inputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener)
+ {
+ super.setReadListener(new ReadListener()
+ {
+ @Override
+ public void onDataAvailable() throws IOException
+ {
+ readListener.onDataAvailable();
+ }
+
+ @Override
+ public void onAllDataRead() throws IOException
+ {
+ try
+ {
+ readListener.onAllDataRead();
+ inputStreamComplete.complete(null);
+ }
+ catch (Throwable t)
+ {
+ inputStreamComplete.completeExceptionally(t);
+ throw t;
+ }
+ }
+
+ @Override
+ public void onError(Throwable t)
+ {
+ readListener.onError(t);
+ inputStreamComplete.completeExceptionally(t);
+ }
+ });
+ }
+ };
+
+ T upgradeHandler;
+ try
+ {
+ upgradeHandler = handlerClass.getDeclaredConstructor().newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new ServletException("Unable to instantiate handler class", e);
+ }
+
+ Connection connection = _servletContextRequest.getConnectionMetaData().getConnection();
+ if (connection instanceof Connection.Tunnel upgradeableConnection)
+ {
+ outputStream.flush(); // commit the 101 response
+ upgradeableConnection.startTunnel();
+ }
+ else
+ {
+ LOG.warn("Unexpected connection type {}", connection);
+ throw new IllegalStateException();
+ }
+
+ AsyncContext asyncContext = forceStartAsync(); // force the servlet in async mode
+ CompletableFuture.allOf(inputStreamComplete, outputStreamComplete).whenComplete((result, failure) ->
+ {
+ upgradeHandler.destroy();
+ asyncContext.complete();
+ });
+
+ WebConnection webConnection = new WebConnection()
+ {
+ @Override
+ public void close() throws Exception
+ {
+ IO.close(inputStream);
+ IO.close(outputStream);
+ }
+
+ @Override
+ public ServletInputStream getInputStream()
+ {
+ return inputStream;
+ }
+
+ @Override
+ public ServletOutputStream getOutputStream()
+ {
+ return outputStream;
+ }
+ };
+
+ upgradeHandler.init(webConnection);
+ return upgradeHandler;
}
@Override
@@ -1382,6 +1637,11 @@ public AsyncContext startAsync() throws IllegalStateException
{
if (!isAsyncSupported())
throw new IllegalStateException("Async Not Supported");
+ return forceStartAsync();
+ }
+
+ private AsyncContext forceStartAsync()
+ {
ServletChannelState state = getServletRequestInfo().getState();
if (_async == null)
_async = new AsyncContextState(state);
diff --git a/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/util/ServletInputStreamWrapper.java b/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/util/ServletInputStreamWrapper.java
new file mode 100644
index 000000000000..91de243b87e1
--- /dev/null
+++ b/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/util/ServletInputStreamWrapper.java
@@ -0,0 +1,114 @@
+//
+// ========================================================================
+// Copyright (c) 1995 Mort Bay Consulting Pty Ltd and others.
+//
+// This program and the accompanying materials are made available under the
+// terms of the Eclipse Public License v. 2.0 which is available at
+// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
+// which is available at https://www.apache.org/licenses/LICENSE-2.0.
+//
+// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
+// ========================================================================
+//
+
+package org.eclipse.jetty.ee11.servlet.util;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import jakarta.servlet.ReadListener;
+import jakarta.servlet.ServletInputStream;
+
+public class ServletInputStreamWrapper extends ServletInputStream
+{
+ private final ServletInputStream _servletInputStream;
+
+ public ServletInputStreamWrapper(ServletInputStream servletInputStream)
+ {
+ _servletInputStream = servletInputStream;
+ }
+
+ @Override
+ public boolean isFinished()
+ {
+ return _servletInputStream.isFinished();
+ }
+
+ @Override
+ public boolean isReady()
+ {
+ return _servletInputStream.isReady();
+ }
+
+ @Override
+ public void setReadListener(ReadListener readListener)
+ {
+ _servletInputStream.setReadListener(readListener);
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return _servletInputStream.read();
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException
+ {
+ return _servletInputStream.read(b);
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ return _servletInputStream.read(b, off, len);
+ }
+
+ @Override
+ public long skip(long n) throws IOException
+ {
+ return _servletInputStream.skip(n);
+ }
+
+ @Override
+ public void skipNBytes(long n) throws IOException
+ {
+ _servletInputStream.skipNBytes(n);
+ }
+
+ @Override
+ public int available() throws IOException
+ {
+ return _servletInputStream.available();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ _servletInputStream.close();
+ }
+
+ @Override
+ public void mark(int readlimit)
+ {
+ _servletInputStream.mark(readlimit);
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ _servletInputStream.reset();
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return _servletInputStream.markSupported();
+ }
+
+ @Override
+ public long transferTo(OutputStream out) throws IOException
+ {
+ return _servletInputStream.transferTo(out);
+ }
+}
diff --git a/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/util/ServletOutputStreamWrapper.java b/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/util/ServletOutputStreamWrapper.java
index 773925651676..97c808ab04c7 100644
--- a/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/util/ServletOutputStreamWrapper.java
+++ b/jetty-ee11/jetty-ee11-servlet/src/main/java/org/eclipse/jetty/ee11/servlet/util/ServletOutputStreamWrapper.java
@@ -14,7 +14,6 @@
package org.eclipse.jetty.ee11.servlet.util;
import java.io.IOException;
-import java.io.OutputStream;
import jakarta.servlet.ServletOutputStream;
import jakarta.servlet.WriteListener;
@@ -28,96 +27,6 @@ public ServletOutputStreamWrapper(ServletOutputStream outputStream)
_outputStream = outputStream;
}
- @Override
- public void print(String s) throws IOException
- {
- _outputStream.print(s);
- }
-
- @Override
- public void print(boolean b) throws IOException
- {
- _outputStream.print(b);
- }
-
- @Override
- public void print(char c) throws IOException
- {
- _outputStream.print(c);
- }
-
- @Override
- public void print(int i) throws IOException
- {
- _outputStream.print(i);
- }
-
- @Override
- public void print(long l) throws IOException
- {
- _outputStream.print(l);
- }
-
- @Override
- public void print(float f) throws IOException
- {
- _outputStream.print(f);
- }
-
- @Override
- public void print(double d) throws IOException
- {
- _outputStream.print(d);
- }
-
- @Override
- public void println() throws IOException
- {
- _outputStream.println();
- }
-
- @Override
- public void println(String s) throws IOException
- {
- _outputStream.println(s);
- }
-
- @Override
- public void println(boolean b) throws IOException
- {
- _outputStream.println(b);
- }
-
- @Override
- public void println(char c) throws IOException
- {
- _outputStream.println(c);
- }
-
- @Override
- public void println(int i) throws IOException
- {
- _outputStream.println(i);
- }
-
- @Override
- public void println(long l) throws IOException
- {
- _outputStream.println(l);
- }
-
- @Override
- public void println(float f) throws IOException
- {
- _outputStream.println(f);
- }
-
- @Override
- public void println(double d) throws IOException
- {
- _outputStream.println(d);
- }
-
@Override
public boolean isReady()
{
@@ -130,11 +39,6 @@ public void setWriteListener(WriteListener writeListener)
_outputStream.setWriteListener(writeListener);
}
- public static OutputStream nullOutputStream()
- {
- return OutputStream.nullOutputStream();
- }
-
@Override
public void write(int b) throws IOException
{
diff --git a/jetty-ee11/jetty-ee11-servlet/src/test/java/org/eclipse/jetty/ee11/servlet/ServletUpgradeTest.java b/jetty-ee11/jetty-ee11-servlet/src/test/java/org/eclipse/jetty/ee11/servlet/ServletUpgradeTest.java
index d08785673c45..3fb9eee21c9e 100644
--- a/jetty-ee11/jetty-ee11-servlet/src/test/java/org/eclipse/jetty/ee11/servlet/ServletUpgradeTest.java
+++ b/jetty-ee11/jetty-ee11-servlet/src/test/java/org/eclipse/jetty/ee11/servlet/ServletUpgradeTest.java
@@ -17,6 +17,9 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import jakarta.servlet.ReadListener;
import jakarta.servlet.ServletException;
@@ -29,16 +32,18 @@
import jakarta.servlet.http.WebConnection;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.eclipse.jetty.util.StringUtil.CRLF;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.endsWith;
import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class ServletUpgradeTest
@@ -47,10 +52,13 @@ public class ServletUpgradeTest
private Server server;
private int port;
+ private static CountDownLatch destroyLatch;
@BeforeEach
public void setUp() throws Exception
{
+ destroyLatch = new CountDownLatch(1);
+
server = new Server();
ServerConnector connector = new ServerConnector(server);
@@ -72,130 +80,64 @@ public void tearDown() throws Exception
server.stop();
}
- @Disabled
@Test
public void upgradeTest() throws Exception
{
- boolean passed1 = false;
- boolean passed2 = false;
- boolean passed3 = false;
- String expectedResponse1 = "TCKHttpUpgradeHandler.init";
- String expectedResponse2 = "onDataAvailable|Hello";
- String expectedResponse3 = "onDataAvailable|World";
-
- InputStream input = null;
- OutputStream output = null;
- Socket s = null;
-
- try
+ Socket socket = new Socket("localhost", port);
+ socket.setSoTimeout(0);
+ InputStream input = socket.getInputStream();
+ OutputStream output = socket.getOutputStream();
+
+ String request = "POST /TestServlet HTTP/1.1" + CRLF +
+ "Host: localhost:" + port + CRLF +
+ "Upgrade: YES" + CRLF +
+ "Connection: Upgrade" + CRLF +
+ CRLF;
+
+ output.write(request.getBytes());
+ writeChunk(output, "Hello");
+ writeChunk(output, "World");
+ output.flush();
+ socket.shutdownOutput();
+
+ CompletableFuture futureContent = new CompletableFuture<>();
+ new Thread(() ->
{
- s = new Socket("localhost", port);
- output = s.getOutputStream();
-
- StringBuilder reqStr = new StringBuilder()
- .append("POST /TestServlet HTTP/1.1").append(CRLF)
- .append("User-Agent: Java/1.6.0_33").append(CRLF)
- .append("Host: localhost:").append(port).append(CRLF)
- .append("Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2").append(CRLF)
- .append("Upgrade: YES").append(CRLF)
- .append("Connection: Upgrade").append(CRLF)
- .append("Content-type: application/x-www-form-urlencoded").append(CRLF)
- .append(CRLF);
-
- LOG.info("REQUEST=========" + reqStr.toString());
- output.write(reqStr.toString().getBytes());
-
- LOG.info("Writing first chunk");
- writeChunk(output, "Hello");
-
- LOG.info("Writing second chunk");
- writeChunk(output, "World");
-
LOG.info("Consuming the response from the server");
-
- // Consume the response from the server
- input = s.getInputStream();
- int len;
- byte[] b = new byte[1024];
- boolean receivedFirstMessage = false;
- boolean receivedSecondMessage = false;
- boolean receivedThirdMessage = false;
- StringBuilder sb = new StringBuilder();
- while ((len = input.read(b)) != -1)
- {
- String line = new String(b, 0, len);
- sb.append(line);
- LOG.info("==============Read from server:" + CRLF + sb + CRLF);
- if (passed1 = compareString(expectedResponse1, sb.toString()))
- {
- LOG.info("==============Received first expected response!" + CRLF);
- receivedFirstMessage = true;
- }
- if (passed2 = compareString(expectedResponse2, sb.toString()))
- {
- LOG.info("==============Received second expected response!" + CRLF);
- receivedSecondMessage = true;
- }
- if (passed3 = compareString(expectedResponse3, sb.toString()))
- {
- LOG.info("==============Received third expected response!" + CRLF);
- receivedThirdMessage = true;
- }
- LOG.info("receivedFirstMessage : " + receivedFirstMessage);
- LOG.info("receivedSecondMessage : " + receivedSecondMessage);
- LOG.info("receivedThirdMessage : " + receivedThirdMessage);
- if (receivedFirstMessage && receivedSecondMessage && receivedThirdMessage)
- {
- break;
- }
- }
- }
- finally
- {
+ Utf8StringBuilder sb = new Utf8StringBuilder();
try
{
- if (input != null)
+ while (true)
{
- LOG.info("Closing input...");
- input.close();
- LOG.info("Input closed.");
+ int read = input.read();
+ if (read == -1)
+ break;
+ sb.append((byte)read);
}
+ futureContent.complete(sb.toCompleteString());
}
- catch (Exception ex)
+ catch (Throwable t)
{
- LOG.error("Failed to close input:" + ex.getMessage(), ex);
+ LOG.warn("failed with content: " + sb, t);
+ futureContent.completeExceptionally(t);
}
- try
- {
- if (output != null)
- {
- LOG.info("Closing output...");
- output.close();
- LOG.info("Output closed .");
- }
- }
- catch (Exception ex)
- {
- LOG.error("Failed to close output:" + ex.getMessage(), ex);
- }
-
- try
- {
- if (s != null)
- {
- LOG.info("Closing socket..." + CRLF);
- s.close();
- LOG.info("Socked closed.");
- }
- }
- catch (Exception ex)
- {
- LOG.error("Failed to close socket:" + ex.getMessage(), ex);
- }
- }
-
- assertTrue(passed1 && passed2 && passed3);
+ }).start();
+
+ String content = futureContent.get(5, TimeUnit.SECONDS);
+ String expectedContent = """
+ TCKHttpUpgradeHandler.init\r
+ =onDataAvailable\r
+ HelloWorld\r
+ =onAllDataRead\r
+ """;
+ assertThat(content, startsWith("HTTP/1.1 101 Switching Protocols"));
+ assertThat(content, endsWith(expectedContent));
+
+ input.close();
+ output.close();
+ socket.close();
+ assertTrue(destroyLatch.await(5, TimeUnit.SECONDS));
}
private static class TestServlet extends HttpServlet
@@ -227,7 +169,7 @@ public TestHttpUpgradeHandler()
@Override
public void destroy()
{
- LOG.debug("===============destroy");
+ destroyLatch.countDown();
}
@Override
@@ -237,9 +179,9 @@ public void init(WebConnection wc)
{
ServletInputStream input = wc.getInputStream();
ServletOutputStream output = wc.getOutputStream();
- TestReadListener readListener = new TestReadListener("/", input, output);
+ TestReadListener readListener = new TestReadListener(input, output);
input.setReadListener(readListener);
- output.println("===============TCKHttpUpgradeHandler.init");
+ output.println("TCKHttpUpgradeHandler.init");
output.flush();
}
catch (Exception ex)
@@ -253,20 +195,20 @@ private static class TestReadListener implements ReadListener
{
private final ServletInputStream input;
private final ServletOutputStream output;
- private final String delimiter;
+ private boolean outputOnDataAvailable = false;
- TestReadListener(String del, ServletInputStream in, ServletOutputStream out)
+ TestReadListener(ServletInputStream in, ServletOutputStream out)
{
input = in;
output = out;
- delimiter = del;
}
+ @Override
public void onAllDataRead()
{
try
{
- output.println("=onAllDataRead");
+ output.println("\r\n=onAllDataRead");
output.close();
}
catch (Exception ex)
@@ -275,11 +217,17 @@ public void onAllDataRead()
}
}
+ @Override
public void onDataAvailable()
{
try
{
- output.println("=onDataAvailable");
+ if (!outputOnDataAvailable)
+ {
+ outputOnDataAvailable = true;
+ output.println("=onDataAvailable");
+ }
+
StringBuilder sb = new StringBuilder();
int len;
byte[] b = new byte[1024];
@@ -288,7 +236,7 @@ public void onDataAvailable()
String data = new String(b, 0, len);
sb.append(data);
}
- output.println(delimiter + sb.toString());
+ output.print(sb.toString());
output.flush();
}
catch (Exception ex)
@@ -297,50 +245,13 @@ public void onDataAvailable()
}
}
+ @Override
public void onError(final Throwable t)
{
LOG.error("TestReadListener error", t);
}
}
- private static boolean compareString(String expected, String actual)
- {
- String[] listExpected = expected.split("[|]");
- boolean found = true;
- for (int i = 0, n = listExpected.length, startIdx = 0, bodyLength = actual.length(); i < n; i++)
- {
- String search = listExpected[i];
- if (startIdx >= bodyLength)
- {
- startIdx = bodyLength;
- }
-
- int searchIdx = actual.toLowerCase().indexOf(search.toLowerCase(), startIdx);
-
- LOG.debug("[ServletTestUtil] Scanning response for " + "search string: '" + search + "' starting at index " + "location: " + startIdx);
- if (searchIdx < 0)
- {
- found = false;
- String s = "[ServletTestUtil] Unable to find the following " +
- "search string in the server's " +
- "response: '" + search + "' at index: " +
- startIdx +
- "\n[ServletTestUtil] Server's response:\n" +
- "-------------------------------------------\n" +
- actual +
- "\n-------------------------------------------\n";
- LOG.debug(s);
- break;
- }
-
- LOG.debug("[ServletTestUtil] Found search string: '" + search + "' at index '" + searchIdx + "' in the server's " + "response");
- // the new searchIdx is the old index plus the lenght of the
- // search string.
- startIdx = searchIdx + search.length();
- }
- return found;
- }
-
private static void writeChunk(OutputStream out, String data) throws IOException
{
if (data != null)
diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/Request.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/Request.java
index cd87e9962795..ee810f9fe711 100644
--- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/Request.java
+++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/Request.java
@@ -596,7 +596,7 @@ public ComplianceViolation.Listener getComplianceViolationListener()
*
* - org.eclipse.jetty.server.Server
- The Jetty Server instance
* - org.eclipse.jetty.server.HttpChannel
- The HttpChannel for this request
- * - org.eclipse.jetty.server.HttpConnection
- The HttpConnection or null if another transport is used
+ * - org.eclipse.jetty.io.Connection
- The Connection or null if another transport is used
*
* While these attributes may look like security problems, they are exposing nothing that is not already
* available via reflection from a Request instance.
diff --git a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-integration/src/test/java/org/eclipse/jetty/ee9/test/GzipWithSendErrorTest.java b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-integration/src/test/java/org/eclipse/jetty/ee9/test/GzipWithSendErrorTest.java
index 25542d3dcf98..a640c74429f7 100644
--- a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-integration/src/test/java/org/eclipse/jetty/ee9/test/GzipWithSendErrorTest.java
+++ b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-integration/src/test/java/org/eclipse/jetty/ee9/test/GzipWithSendErrorTest.java
@@ -44,7 +44,6 @@
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
-import org.eclipse.jetty.server.internal.HttpChannelState;
import org.eclipse.jetty.server.internal.HttpConnection;
import org.eclipse.jetty.toolchain.test.MavenTestingUtils;
import org.eclipse.jetty.util.Callback;