diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java
index b315e9353806..8d6a857c41f7 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java
@@ -1321,6 +1321,7 @@ public void succeeded()
httpChannel = _request.lockedGetHttpChannelState();
httpChannel.lockedStreamSendCompleted(true);
}
+
if (callback != null)
httpChannel._writeInvoker.run(callback::succeeded);
}
@@ -1350,6 +1351,7 @@ public void failed(Throwable x)
httpChannel = _request.lockedGetHttpChannelState();
httpChannel.lockedStreamSendCompleted(false);
}
+
if (callback != null)
httpChannel._writeInvoker.run(() -> HttpChannelState.failed(callback, x));
}
diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
index 2c1e6733da72..11384b9beedd 100644
--- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
+++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java
@@ -1587,9 +1587,7 @@ public void failed(Throwable x)
if (LOG.isDebugEnabled())
LOG.debug("aborting", x);
abort(x);
- _httpChannel.recycle();
- _parser.reset();
- _generator.reset();
+ _httpChannel.setHttpStream(null);
if (!_handling.compareAndSet(true, false))
resume();
}
diff --git a/jetty-ee8/jetty-ee8-nested/pom.xml b/jetty-ee8/jetty-ee8-nested/pom.xml
index e88eb54b9717..ecb49d60ebe6 100644
--- a/jetty-ee8/jetty-ee8-nested/pom.xml
+++ b/jetty-ee8/jetty-ee8-nested/pom.xml
@@ -48,6 +48,11 @@
org.slf4j
slf4j-api
+
+ org.awaitility
+ awaitility
+ test
+
org.eclipse.jetty
jetty-http-tools
diff --git a/jetty-ee9/jetty-ee9-nested/pom.xml b/jetty-ee9/jetty-ee9-nested/pom.xml
index 5d84125b3752..20ad455048ce 100644
--- a/jetty-ee9/jetty-ee9-nested/pom.xml
+++ b/jetty-ee9/jetty-ee9-nested/pom.xml
@@ -47,6 +47,11 @@
org.slf4j
slf4j-api
+
+ org.awaitility
+ awaitility
+ test
+
org.eclipse.jetty
jetty-http-tools
diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java
index 8ccad21c9a35..3145e9e67381 100644
--- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java
+++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java
@@ -156,8 +156,12 @@ public ConnectionMetaData getConnectionMetaData()
*/
public boolean needContent()
{
+ ContextHandler.CoreContextRequest coreContextRequest = getCoreRequest();
+ // When coreContextRequest is null, produceContent() always immediately returns an error content.
+ if (coreContextRequest == null)
+ return true;
// TODO: optimize by attempting a read?
- getCoreRequest().demand(_needContentTask);
+ coreContextRequest.demand(_needContentTask);
return false;
}
@@ -172,7 +176,10 @@ public boolean needContent()
*/
public HttpInput.Content produceContent()
{
- Content.Chunk chunk = getCoreRequest().read();
+ ContextHandler.CoreContextRequest coreContextRequest = getCoreRequest();
+ if (coreContextRequest == null)
+ return new HttpInput.ErrorContent(new IOException("Channel has been recycled"));
+ Content.Chunk chunk = coreContextRequest.read();
if (chunk == null)
return null;
diff --git a/jetty-ee9/jetty-ee9-nested/src/test/java/org/eclipse/jetty/ee9/nested/BlockingTest.java b/jetty-ee9/jetty-ee9-nested/src/test/java/org/eclipse/jetty/ee9/nested/BlockingTest.java
index ffb59d701d61..11d731614b2c 100644
--- a/jetty-ee9/jetty-ee9-nested/src/test/java/org/eclipse/jetty/ee9/nested/BlockingTest.java
+++ b/jetty-ee9/jetty-ee9-nested/src/test/java/org/eclipse/jetty/ee9/nested/BlockingTest.java
@@ -39,9 +39,9 @@
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
@@ -51,7 +51,6 @@
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertTrue;
-@Disabled // TODO
public class BlockingTest
{
private Server server;
@@ -86,7 +85,7 @@ public void testBlockingReadThenNormalComplete() throws Exception
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
- new Thread(() ->
+ Thread thread = new Thread(() ->
{
try
{
@@ -103,14 +102,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
- }).start();
+ });
+ thread.start();
try
{
// wait for thread to start and read first byte
- started.await(10, TimeUnit.SECONDS);
+ assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
- Thread.sleep(1000);
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
+ {
+ Thread.State state = thread.getState();
+ return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
+ });
}
catch (Throwable e)
{
@@ -253,13 +257,14 @@ public void testNormalCompleteThenBlockingRead() throws Exception
CountDownLatch completed = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference readException = new AtomicReference<>();
+ AtomicReference threadRef = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
- new Thread(() ->
+ Thread thread = new Thread(() ->
{
try
{
@@ -267,8 +272,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
if (b == '1')
{
started.countDown();
- completed.await(10, TimeUnit.SECONDS);
- Thread.sleep(500);
+ assertTrue(completed.await(10, TimeUnit.SECONDS));
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
@@ -278,14 +282,20 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
- }).start();
+ });
+ threadRef.set(thread);
+ thread.start();
try
{
// wait for thread to start and read first byte
- started.await(10, TimeUnit.SECONDS);
+ assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
- Thread.sleep(1000);
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
+ {
+ Thread.State state = thread.getState();
+ return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
+ });
}
catch (Throwable e)
{
@@ -321,7 +331,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
assertThat(response.getContent(), containsString("OK"));
completed.countDown();
- Thread.sleep(1000);
+ await().atMost(5, TimeUnit.SECONDS).until(() -> threadRef.get().getState() == Thread.State.TERMINATED);
// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
@@ -335,6 +345,7 @@ public void testStartAsyncThenBlockingReadThenTimeout() throws Exception
CountDownLatch started = new CountDownLatch(1);
CountDownLatch completed = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
+ AtomicReference threadRef = new AtomicReference<>();
AtomicReference readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@@ -347,7 +358,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
AsyncContext async = request.startAsync();
async.setTimeout(100);
- new Thread(() ->
+ Thread thread = new Thread(() ->
{
try
{
@@ -355,8 +366,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
if (b == '1')
{
started.countDown();
- completed.await(10, TimeUnit.SECONDS);
- Thread.sleep(500);
+ assertTrue(completed.await(10, TimeUnit.SECONDS));
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
@@ -366,14 +376,20 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
- }).start();
+ });
+ threadRef.set(thread);
+ thread.start();
try
{
// wait for thread to start and read first byte
- started.await(10, TimeUnit.SECONDS);
+ assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
- Thread.sleep(1000);
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
+ {
+ Thread.State state = thread.getState();
+ return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
+ });
}
catch (Throwable e)
{
@@ -406,7 +422,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
assertThat(response.getContent(), containsString("AsyncContext timeout"));
completed.countDown();
- Thread.sleep(1000);
+ await().atMost(5, TimeUnit.SECONDS).until(() -> threadRef.get().getState() == Thread.State.TERMINATED);
// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
@@ -428,7 +444,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
baseRequest.setHandled(true);
if (baseRequest.getDispatcherType() != DispatcherType.ERROR)
{
- new Thread(() ->
+ Thread thread = new Thread(() ->
{
try
{
@@ -445,14 +461,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
- }).start();
+ });
+ thread.start();
try
{
// wait for thread to start and read first byte
- started.await(10, TimeUnit.SECONDS);
+ assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
- Thread.sleep(1000);
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
+ {
+ Thread.State state = thread.getState();
+ return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
+ });
}
catch (Throwable e)
{
@@ -505,7 +526,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentType("text/plain");
- new Thread(() ->
+ Thread thread = new Thread(() ->
{
try
{
@@ -523,14 +544,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
- }).start();
+ });
+ thread.start();
try
{
// wait for thread to start and read first byte
- started.await(10, TimeUnit.SECONDS);
+ assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on write
- Thread.sleep(1000);
+ await().atMost(5, TimeUnit.SECONDS).until(() ->
+ {
+ Thread.State state = thread.getState();
+ return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
+ });
}
catch (Throwable e)
{