Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Undisable ee9 BlockingTest and fix HttpChannel.produceContent #12529

Merged
merged 13 commits into from
Dec 18, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -1321,6 +1321,7 @@ public void succeeded()
httpChannel = _request.lockedGetHttpChannelState();
httpChannel.lockedStreamSendCompleted(true);
}

if (callback != null)
httpChannel._writeInvoker.run(callback::succeeded);
}
Expand Down Expand Up @@ -1350,6 +1351,7 @@ public void failed(Throwable x)
httpChannel = _request.lockedGetHttpChannelState();
httpChannel.lockedStreamSendCompleted(false);
}

if (callback != null)
httpChannel._writeInvoker.run(() -> HttpChannelState.failed(callback, x));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1587,9 +1587,6 @@ public void failed(Throwable x)
if (LOG.isDebugEnabled())
LOG.debug("aborting", x);
abort(x);
_httpChannel.recycle();
_parser.reset();
_generator.reset();
if (!_handling.compareAndSet(true, false))
resume();
}
Expand Down
5 changes: 5 additions & 0 deletions jetty-ee8/jetty-ee8-nested/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http-tools</artifactId>
Expand Down
5 changes: 5 additions & 0 deletions jetty-ee9/jetty-ee9-nested/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-http-tools</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,12 @@ public ConnectionMetaData getConnectionMetaData()
*/
public boolean needContent()
{
ContextHandler.CoreContextRequest coreContextRequest = getCoreRequest();
// When coreContextRequest is null, produceContent() always immediately returns an error content.
if (coreContextRequest == null)
return true;
// TODO: optimize by attempting a read?
getCoreRequest().demand(_needContentTask);
coreContextRequest.demand(_needContentTask);
return false;
}

Expand All @@ -172,7 +176,10 @@ public boolean needContent()
*/
public HttpInput.Content produceContent()
{
Content.Chunk chunk = getCoreRequest().read();
ContextHandler.CoreContextRequest coreContextRequest = getCoreRequest();
if (coreContextRequest == null)
return new HttpInput.ErrorContent(new IOException("Channel has been recycled"));
Content.Chunk chunk = coreContextRequest.read();
if (chunk == null)
return null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@
import org.eclipse.jetty.server.handler.gzip.GzipHandler;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.instanceOf;
Expand All @@ -51,7 +51,6 @@
import static org.hamcrest.core.Is.is;
import static org.junit.jupiter.api.Assertions.assertTrue;

@Disabled // TODO
public class BlockingTest
{
private Server server;
Expand Down Expand Up @@ -86,7 +85,7 @@ public void testBlockingReadThenNormalComplete() throws Exception
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
new Thread(() ->
Thread thread = new Thread(() ->
{
try
{
Expand All @@ -103,14 +102,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
}).start();
});
thread.start();

try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread.State state = thread.getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
});
}
catch (Throwable e)
{
Expand Down Expand Up @@ -253,22 +257,22 @@ public void testNormalCompleteThenBlockingRead() throws Exception
CountDownLatch completed = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Throwable> readException = new AtomicReference<>();
AtomicReference<Thread> threadRef = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
baseRequest.setHandled(true);
new Thread(() ->
Thread thread = new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
completed.await(10, TimeUnit.SECONDS);
Thread.sleep(500);
assertTrue(completed.await(10, TimeUnit.SECONDS));
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
Expand All @@ -278,14 +282,20 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
}).start();
});
threadRef.set(thread);
thread.start();

try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread.State state = thread.getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
});
}
catch (Throwable e)
{
Expand Down Expand Up @@ -321,7 +331,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
assertThat(response.getContent(), containsString("OK"));

completed.countDown();
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() -> threadRef.get().getState() == Thread.State.TERMINATED);

// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
Expand All @@ -335,6 +345,7 @@ public void testStartAsyncThenBlockingReadThenTimeout() throws Exception
CountDownLatch started = new CountDownLatch(1);
CountDownLatch completed = new CountDownLatch(1);
CountDownLatch stopped = new CountDownLatch(1);
AtomicReference<Thread> threadRef = new AtomicReference<>();
AtomicReference<Throwable> readException = new AtomicReference<>();
AbstractHandler handler = new AbstractHandler()
{
Expand All @@ -347,16 +358,15 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
AsyncContext async = request.startAsync();
async.setTimeout(100);

new Thread(() ->
Thread thread = new Thread(() ->
{
try
{
int b = baseRequest.getHttpInput().read();
if (b == '1')
{
started.countDown();
completed.await(10, TimeUnit.SECONDS);
Thread.sleep(500);
assertTrue(completed.await(10, TimeUnit.SECONDS));
if (baseRequest.getHttpInput().read() > Integer.MIN_VALUE)
throw new IllegalStateException();
}
Expand All @@ -366,14 +376,20 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
}).start();
});
threadRef.set(thread);
thread.start();

try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread.State state = thread.getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
});
}
catch (Throwable e)
{
Expand Down Expand Up @@ -406,7 +422,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
assertThat(response.getContent(), containsString("AsyncContext timeout"));

completed.countDown();
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() -> threadRef.get().getState() == Thread.State.TERMINATED);

// Async thread should have stopped
assertTrue(stopped.await(10, TimeUnit.SECONDS));
Expand All @@ -428,7 +444,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
baseRequest.setHandled(true);
if (baseRequest.getDispatcherType() != DispatcherType.ERROR)
{
new Thread(() ->
Thread thread = new Thread(() ->
{
try
{
Expand All @@ -445,14 +461,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
}).start();
});
thread.start();

try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on second byte
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread.State state = thread.getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
});
}
catch (Throwable e)
{
Expand Down Expand Up @@ -505,7 +526,7 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
baseRequest.setHandled(true);
response.setStatus(200);
response.setContentType("text/plain");
new Thread(() ->
Thread thread = new Thread(() ->
{
try
{
Expand All @@ -523,14 +544,19 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques
readException.set(t);
stopped.countDown();
}
}).start();
});
thread.start();

try
{
// wait for thread to start and read first byte
started.await(10, TimeUnit.SECONDS);
assertTrue(started.await(10, TimeUnit.SECONDS));
// give it time to block on write
Thread.sleep(1000);
await().atMost(5, TimeUnit.SECONDS).until(() ->
{
Thread.State state = thread.getState();
return state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING;
});
}
catch (Throwable e)
{
Expand Down
Loading