diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index b315e9353806..8d6a857c41f7 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -1321,6 +1321,7 @@ public void succeeded() httpChannel = _request.lockedGetHttpChannelState(); httpChannel.lockedStreamSendCompleted(true); } + if (callback != null) httpChannel._writeInvoker.run(callback::succeeded); } @@ -1350,6 +1351,7 @@ public void failed(Throwable x) httpChannel = _request.lockedGetHttpChannelState(); httpChannel.lockedStreamSendCompleted(false); } + if (callback != null) httpChannel._writeInvoker.run(() -> HttpChannelState.failed(callback, x)); } diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 2c1e6733da72..11384b9beedd 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -1587,9 +1587,7 @@ public void failed(Throwable x) if (LOG.isDebugEnabled()) LOG.debug("aborting", x); abort(x); - _httpChannel.recycle(); - _parser.reset(); - _generator.reset(); + _httpChannel.setHttpStream(null); if (!_handling.compareAndSet(true, false)) resume(); } diff --git a/jetty-ee8/jetty-ee8-nested/pom.xml b/jetty-ee8/jetty-ee8-nested/pom.xml index e88eb54b9717..ecb49d60ebe6 100644 --- a/jetty-ee8/jetty-ee8-nested/pom.xml +++ b/jetty-ee8/jetty-ee8-nested/pom.xml @@ -48,6 +48,11 @@ org.slf4j slf4j-api + + org.awaitility + awaitility + test + org.eclipse.jetty jetty-http-tools diff --git a/jetty-ee9/jetty-ee9-nested/pom.xml b/jetty-ee9/jetty-ee9-nested/pom.xml index 5d84125b3752..20ad455048ce 100644 --- a/jetty-ee9/jetty-ee9-nested/pom.xml +++ b/jetty-ee9/jetty-ee9-nested/pom.xml @@ -47,6 +47,11 @@ org.slf4j slf4j-api + + org.awaitility + awaitility + test + org.eclipse.jetty jetty-http-tools diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java index 8ccad21c9a35..3145e9e67381 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java @@ -156,8 +156,12 @@ public ConnectionMetaData getConnectionMetaData() */ public boolean needContent() { + ContextHandler.CoreContextRequest coreContextRequest = getCoreRequest(); + // When coreContextRequest is null, produceContent() always immediately returns an error content. + if (coreContextRequest == null) + return true; // TODO: optimize by attempting a read? - getCoreRequest().demand(_needContentTask); + coreContextRequest.demand(_needContentTask); return false; } @@ -172,7 +176,10 @@ public boolean needContent() */ public HttpInput.Content produceContent() { - Content.Chunk chunk = getCoreRequest().read(); + ContextHandler.CoreContextRequest coreContextRequest = getCoreRequest(); + if (coreContextRequest == null) + return new HttpInput.ErrorContent(new IOException("Channel has been recycled")); + Content.Chunk chunk = coreContextRequest.read(); if (chunk == null) return null; diff --git a/jetty-ee9/jetty-ee9-nested/src/test/java/org/eclipse/jetty/ee9/nested/BlockingTest.java b/jetty-ee9/jetty-ee9-nested/src/test/java/org/eclipse/jetty/ee9/nested/BlockingTest.java index ffb59d701d61..11d731614b2c 100644 --- a/jetty-ee9/jetty-ee9-nested/src/test/java/org/eclipse/jetty/ee9/nested/BlockingTest.java +++ b/jetty-ee9/jetty-ee9-nested/src/test/java/org/eclipse/jetty/ee9/nested/BlockingTest.java @@ -39,9 +39,9 @@ import org.eclipse.jetty.server.handler.gzip.GzipHandler; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; @@ -51,7 +51,6 @@ import static org.hamcrest.core.Is.is; import static org.junit.jupiter.api.Assertions.assertTrue; -@Disabled // TODO public class BlockingTest { private Server server; @@ -86,7 +85,7 @@ public void testBlockingReadThenNormalComplete() throws Exception public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { baseRequest.setHandled(true); - new Thread(() -> + Thread thread = new Thread(() -> { try { @@ -103,14 +102,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques readException.set(t); stopped.countDown(); } - }).start(); + }); + thread.start(); try { // wait for thread to start and read first byte - started.await(10, TimeUnit.SECONDS); + assertTrue(started.await(10, TimeUnit.SECONDS)); // give it time to block on second byte - Thread.sleep(1000); + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + Thread.State state = thread.getState(); + return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING; + }); } catch (Throwable e) { @@ -253,13 +257,14 @@ public void testNormalCompleteThenBlockingRead() throws Exception CountDownLatch completed = new CountDownLatch(1); CountDownLatch stopped = new CountDownLatch(1); AtomicReference readException = new AtomicReference<>(); + AtomicReference threadRef = new AtomicReference<>(); AbstractHandler handler = new AbstractHandler() { @Override public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { baseRequest.setHandled(true); - new Thread(() -> + Thread thread = new Thread(() -> { try { @@ -267,8 +272,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques if (b == '1') { started.countDown(); - completed.await(10, TimeUnit.SECONDS); - Thread.sleep(500); + assertTrue(completed.await(10, TimeUnit.SECONDS)); if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) throw new IllegalStateException(); } @@ -278,14 +282,20 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques readException.set(t); stopped.countDown(); } - }).start(); + }); + threadRef.set(thread); + thread.start(); try { // wait for thread to start and read first byte - started.await(10, TimeUnit.SECONDS); + assertTrue(started.await(10, TimeUnit.SECONDS)); // give it time to block on second byte - Thread.sleep(1000); + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + Thread.State state = thread.getState(); + return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING; + }); } catch (Throwable e) { @@ -321,7 +331,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques assertThat(response.getContent(), containsString("OK")); completed.countDown(); - Thread.sleep(1000); + await().atMost(5, TimeUnit.SECONDS).until(() -> threadRef.get().getState() == Thread.State.TERMINATED); // Async thread should have stopped assertTrue(stopped.await(10, TimeUnit.SECONDS)); @@ -335,6 +345,7 @@ public void testStartAsyncThenBlockingReadThenTimeout() throws Exception CountDownLatch started = new CountDownLatch(1); CountDownLatch completed = new CountDownLatch(1); CountDownLatch stopped = new CountDownLatch(1); + AtomicReference threadRef = new AtomicReference<>(); AtomicReference readException = new AtomicReference<>(); AbstractHandler handler = new AbstractHandler() { @@ -347,7 +358,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques AsyncContext async = request.startAsync(); async.setTimeout(100); - new Thread(() -> + Thread thread = new Thread(() -> { try { @@ -355,8 +366,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques if (b == '1') { started.countDown(); - completed.await(10, TimeUnit.SECONDS); - Thread.sleep(500); + assertTrue(completed.await(10, TimeUnit.SECONDS)); if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE) throw new IllegalStateException(); } @@ -366,14 +376,20 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques readException.set(t); stopped.countDown(); } - }).start(); + }); + threadRef.set(thread); + thread.start(); try { // wait for thread to start and read first byte - started.await(10, TimeUnit.SECONDS); + assertTrue(started.await(10, TimeUnit.SECONDS)); // give it time to block on second byte - Thread.sleep(1000); + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + Thread.State state = thread.getState(); + return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING; + }); } catch (Throwable e) { @@ -406,7 +422,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques assertThat(response.getContent(), containsString("AsyncContext timeout")); completed.countDown(); - Thread.sleep(1000); + await().atMost(5, TimeUnit.SECONDS).until(() -> threadRef.get().getState() == Thread.State.TERMINATED); // Async thread should have stopped assertTrue(stopped.await(10, TimeUnit.SECONDS)); @@ -428,7 +444,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques baseRequest.setHandled(true); if (baseRequest.getDispatcherType() != DispatcherType.ERROR) { - new Thread(() -> + Thread thread = new Thread(() -> { try { @@ -445,14 +461,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques readException.set(t); stopped.countDown(); } - }).start(); + }); + thread.start(); try { // wait for thread to start and read first byte - started.await(10, TimeUnit.SECONDS); + assertTrue(started.await(10, TimeUnit.SECONDS)); // give it time to block on second byte - Thread.sleep(1000); + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + Thread.State state = thread.getState(); + return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING; + }); } catch (Throwable e) { @@ -505,7 +526,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques baseRequest.setHandled(true); response.setStatus(200); response.setContentType("text/plain"); - new Thread(() -> + Thread thread = new Thread(() -> { try { @@ -523,14 +544,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques readException.set(t); stopped.countDown(); } - }).start(); + }); + thread.start(); try { // wait for thread to start and read first byte - started.await(10, TimeUnit.SECONDS); + assertTrue(started.await(10, TimeUnit.SECONDS)); // give it time to block on write - Thread.sleep(1000); + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + Thread.State state = thread.getState(); + return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING; + }); } catch (Throwable e) {