From 26f660f0fddd3a17836c3ca3dc70660a3cddf9af Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 10 Oct 2024 11:36:13 +0200 Subject: [PATCH] Fixes #12313 - Jetty 12 ee9/ee10 doesn't invoke callbacks when h2 client sends RST_STREAM. * Fixed invocation of AsyncListener.onError(), now called even if the response is already committed, in both EE9 and EE10. * Reworked EE9 HttpChannel state machine in case of failures to be like EE10's. In particular, calling abort now is a state change, rather than a failure of the Handler callback. In this way, the handle() loop continues, enters case TERMINATED, and the callback is completed in onCompleted(). * Fixed EE9 handling of idle timeout in HttpChannel.onRequest(), that was missing. Signed-off-by: Simone Bordet --- .../jetty/ee10/servlet/HttpOutput.java | 2 +- .../jetty/ee10/servlet/ServletChannel.java | 17 +-- .../ee10/servlet/ServletChannelState.java | 36 +++-- .../ee10/servlet/ServletContextHandler.java | 2 +- .../transport/Http2AsyncIOServletTest.java | 78 +++++++++- .../transport/Http3AsyncIOServletTest.java | 20 ++- .../jetty/ee9/nested/ContextHandler.java | 2 + .../eclipse/jetty/ee9/nested/HttpChannel.java | 56 +++---- .../jetty/ee9/nested/HttpChannelState.java | 89 +++++++---- .../eclipse/jetty/ee9/nested/HttpOutput.java | 2 +- .../transport/Http2AsyncIOServletTest.java | 140 +++++++++++++++++- .../transport/Http3AsyncIOServletTest.java | 20 ++- 12 files changed, 363 insertions(+), 101 deletions(-) diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java index 0ac43556411e..6d0b8e1b3fa5 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/HttpOutput.java @@ -698,7 +698,7 @@ public void flush() throws IOException private void checkWritable() throws EofException { if (_softClose) - throw new EofException("Closed"); + throw new EofException("Closed"); switch (_state) { diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java index 4a6ebb620d4a..3d3e3062d8f9 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannel.java @@ -661,20 +661,15 @@ else if (noStack != null) LOG.warn(request == null ? "unknown request" : request.getServletApiRequest().getRequestURI(), failure); } - if (isCommitted()) + try { - abort(failure); + boolean abort = _state.onError(failure); + if (abort) + abort(failure); } - else + catch (Throwable x) { - try - { - _state.onError(failure); - } - catch (IllegalStateException e) - { - abort(failure); - } + abort(failure); } } diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java index da225815373f..3c121ba78039 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletChannelState.java @@ -874,14 +874,15 @@ public boolean onIdleTimeout(TimeoutException timeout) } } - protected void onError(Throwable th) + protected boolean onError(Throwable th) { + boolean committed = _servletChannel.isCommitted(); final AsyncContextEvent asyncEvent; final List asyncListeners; try (AutoLock ignored = lock()) { if (LOG.isDebugEnabled()) - LOG.debug("thrownException {}", getStatusStringLocked(), th); + LOG.debug("onError {}", getStatusStringLocked(), th); // This can only be called from within the handle loop if (_state != State.HANDLING) @@ -890,34 +891,42 @@ protected void onError(Throwable th) // If sendError has already been called, we can only handle one failure at a time! if (_sendError) { - LOG.warn("unhandled due to prior sendError", th); - return; + LOG.warn("onError not handled due to prior sendError() {}", getStatusStringLocked(), th); + return false; } // Check async state to determine type of handling switch (_requestState) { case BLOCKING: - // handle the exception with a sendError + { + // Handle the exception with a sendError. + if (committed) + return true; sendError(th); - return; - + return false; + } case DISPATCH: // Dispatch has already been called, but we ignore and handle exception below case COMPLETE: // Complete has already been called, but we ignore and handle exception below case ASYNC: + { if (_asyncListeners == null || _asyncListeners.isEmpty()) { + if (committed) + return true; sendError(th); - return; + return false; } asyncEvent = _event; asyncEvent.addThrowable(th); asyncListeners = _asyncListeners; break; - + } default: - LOG.warn("unhandled in state {}", _requestState, new IllegalStateException(th)); - return; + { + LOG.warn("onError not handled due to invalid requestState {}", getStatusStringLocked(), th); + return false; + } } } @@ -948,7 +957,10 @@ protected void onError(Throwable th) { // The listeners did not invoke API methods and the // container must provide a default error dispatch. + if (committed) + return true; sendError(th); + return false; } else if (_requestState != RequestState.COMPLETE) { @@ -957,12 +969,14 @@ else if (_requestState != RequestState.COMPLETE) else LOG.warn("unhandled in state {}", _requestState, new IllegalStateException(th)); } + return committed; } } private void sendError(Throwable th) { // No sync as this is always called with lock held + assert _lock.isHeldByCurrentThread(); // Determine the actual details of the exception final Request request = _servletChannel.getServletContextRequest(); diff --git a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletContextHandler.java b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletContextHandler.java index 6de8dd52d56b..4b2a34fa8b22 100644 --- a/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletContextHandler.java +++ b/jetty-ee10/jetty-ee10-servlet/src/main/java/org/eclipse/jetty/ee10/servlet/ServletContextHandler.java @@ -1167,7 +1167,7 @@ protected ContextRequest wrapRequest(Request request, Response response) ServletContextRequest servletContextRequest = newServletContextRequest(servletChannel, request, response, decodedPathInContext, matchedResource); servletChannel.associate(servletContextRequest); - Request.addCompletionListener(request, servletChannel::recycle); + Request.addCompletionListener(servletContextRequest, servletChannel::recycle); return servletContextRequest; } diff --git a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http2AsyncIOServletTest.java b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http2AsyncIOServletTest.java index c55aadae64c9..1919fad72b37 100644 --- a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http2AsyncIOServletTest.java +++ b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http2AsyncIOServletTest.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.ee10.test.client.transport; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -21,12 +22,14 @@ import jakarta.servlet.AsyncContext; import jakarta.servlet.AsyncEvent; import jakarta.servlet.AsyncListener; +import jakarta.servlet.ServletException; import jakarta.servlet.http.HttpServlet; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import org.eclipse.jetty.ee10.servlet.ServletContextHandler; import org.eclipse.jetty.ee10.servlet.ServletHolder; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; @@ -54,11 +57,12 @@ public class Http2AsyncIOServletTest { + private final HttpConfiguration httpConfig = new HttpConfiguration(); private Server server; private ServerConnector connector; private HTTP2Client client; - private void start(HttpConfiguration httpConfig, HttpServlet httpServlet) throws Exception + private void start(HttpServlet httpServlet) throws Exception { server = new Server(); connector = new ServerConnector(server, 1, 1, new HTTP2CServerConnectionFactory(httpConfig)); @@ -83,12 +87,10 @@ public void tearDown() @ValueSource(booleans = {true, false}) public void testStartAsyncThenClientResetRemoteErrorNotification(boolean notify) throws Exception { - HttpConfiguration httpConfig = new HttpConfiguration(); httpConfig.setNotifyRemoteAsyncErrors(notify); - AtomicReference errorAsyncEventRef = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - start(httpConfig, new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, HttpServletResponse response) @@ -138,14 +140,82 @@ public void onStartAsync(AsyncEvent event) stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code)); if (notify) + { // Wait for the reset to be notified to the async context listener. await().atMost(5, TimeUnit.SECONDS).until(() -> { AsyncEvent asyncEvent = errorAsyncEventRef.get(); return asyncEvent == null ? null : asyncEvent.getThrowable(); }, instanceOf(EofException.class)); + } else + { // Wait for the reset to NOT be notified to the failure listener. await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).until(errorAsyncEventRef::get, nullValue()); + } + } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testClientResetNotifiesAsyncListener(boolean commitResponse) throws Exception + { + CountDownLatch requestLatch = new CountDownLatch(1); + CountDownLatch errorLatch = new CountDownLatch(1); + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + if (commitResponse) + response.flushBuffer(); + + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + + asyncContext.addListener(new AsyncListener() + { + @Override + public void onComplete(AsyncEvent event) + { + } + + @Override + public void onTimeout(AsyncEvent event) + { + } + + @Override + public void onError(AsyncEvent event) + { + if (!response.isCommitted()) + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500); + asyncContext.complete(); + errorLatch.countDown(); + } + + @Override + public void onStartAsync(AsyncEvent event) + { + } + }); + + requestLatch.countDown(); + } + }); + + Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {}) + .get(5, TimeUnit.SECONDS); + MetaData.Request request = new MetaData.Request("GET", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY); + Stream stream = session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS); + + // Wait for the server to become idle after the request. + assertTrue(requestLatch.await(5, TimeUnit.SECONDS)); + Thread.sleep(500); + + // Reset the stream. + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code)); + + assertTrue(errorLatch.await(5, TimeUnit.SECONDS)); } } diff --git a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http3AsyncIOServletTest.java b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http3AsyncIOServletTest.java index 64d5eaf00bbc..731fa72ad998 100644 --- a/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http3AsyncIOServletTest.java +++ b/jetty-ee10/jetty-ee10-tests/jetty-ee10-test-client-transports/src/test/java/org/eclipse/jetty/ee10/test/client/transport/Http3AsyncIOServletTest.java @@ -47,6 +47,7 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -56,17 +57,18 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; @ExtendWith(WorkDirExtension.class) public class Http3AsyncIOServletTest { public WorkDir workDir; - + private final HttpConfiguration httpConfig = new HttpConfiguration(); private Server server; private QuicServerConnector connector; private HTTP3Client client; - private void start(HttpConfiguration httpConfig, HttpServlet httpServlet) throws Exception + private void start(HttpServlet httpServlet) throws Exception { server = new Server(); SslContextFactory.Server serverSslContextFactory = new SslContextFactory.Server(); @@ -95,12 +97,10 @@ public void tearDown() @ValueSource(booleans = {true, false}) public void testStartAsyncThenClientResetRemoteErrorNotification(boolean notify) throws Exception { - HttpConfiguration httpConfig = new HttpConfiguration(); httpConfig.setNotifyRemoteAsyncErrors(notify); - AtomicReference errorAsyncEventRef = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - start(httpConfig, new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, HttpServletResponse response) @@ -158,4 +158,14 @@ public void onStartAsync(AsyncEvent event) // Wait for the reset to NOT be notified to the failure listener. await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).until(errorAsyncEventRef::get, nullValue()); } + + @Test + public void testClientResetNotifiesAsyncListener() + { + // See the equivalent test in Http2AsyncIOServletTest for HTTP/2. + // For HTTP/3 we do not have a "reset" event that we can relay to applications, + // because HTTP/3 does not have a "reset" frame; QUIC has RESET_STREAM, but we + // do not have an event from Quiche to reliably report it to applications. + assumeTrue(false); + } } diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/ContextHandler.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/ContextHandler.java index 03d6e050c114..1c22040173da 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/ContextHandler.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/ContextHandler.java @@ -2809,6 +2809,8 @@ else if (httpChannel.getContextHandler() == ContextHandler.this && !request.getC CoreContextRequest coreContextRequest = new CoreContextRequest(request, this.getContext(), httpChannel); httpChannel.onRequest(coreContextRequest); + HttpChannel channel = httpChannel; + org.eclipse.jetty.server.Request.addCompletionListener(coreContextRequest, x -> channel.recycle()); return coreContextRequest; } diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java index f705b229362d..130e47d0877c 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannel.java @@ -90,10 +90,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor private final List _transientListeners = new ArrayList<>(); private MetaData.Response _committedMetaData; private long _oldIdleTimeout; - - /** - * Bytes written after interception (eg after compression) - */ + // Bytes written after interception (eg after compression) private long _written; private ContextHandler.CoreContextRequest _coreRequest; private org.eclipse.jetty.server.Response _coreResponse; @@ -462,6 +459,9 @@ public void recycle() _committedMetaData = null; _written = 0; _transientListeners.clear(); + _coreRequest = null; + _coreResponse = null; + _coreCallback = null; } @Override @@ -782,20 +782,15 @@ else if (noStack != null) LOG.warn(_request.getRequestURI(), failure); } - if (isCommitted()) + try { - abort(failure); + boolean abort = _state.onError(failure); + if (abort) + abort(failure); } - else + catch (Throwable x) { - try - { - _state.onError(failure); - } - catch (IllegalStateException e) - { - abort(failure); - } + abort(failure); } } @@ -856,7 +851,14 @@ public void onRequest(ContextHandler.CoreContextRequest coreRequest) _coreRequest.addIdleTimeoutListener(_state::onIdleTimeout); _requests.incrementAndGet(); _request.onRequest(coreRequest); + + long idleTO = _configuration.getIdleTimeout(); + _oldIdleTimeout = getIdleTimeout(); + if (idleTO >= 0 && _oldIdleTimeout != idleTO) + setIdleTimeout(idleTO); + _combinedListener.onRequestBegin(_request); + if (LOG.isDebugEnabled()) { MetaData.Request metaData = _request.getMetaData(); @@ -932,9 +934,11 @@ public void onCompleted() _request.onCompleted(); _combinedListener.onComplete(_request); Callback callback = _coreCallback; - _coreCallback = null; - if (callback != null) + Throwable failure = _state.completeResponse(); + if (failure == null) callback.succeeded(); + else + callback.failed(failure); } public void onBadMessage(BadMessageException failure) @@ -1146,14 +1150,14 @@ public boolean isUseOutputDirectByteBuffers() */ public void abort(Throwable failure) { - if (_state.abortResponse()) - { - _combinedListener.onResponseFailure(_request, failure); - Callback callback = _coreCallback; - _coreCallback = null; - if (callback != null) - callback.failed(failure); - } + Boolean handle = _state.abort(failure); + // Not aborted, just return. + if (handle == null) + return; + + _combinedListener.onResponseFailure(_request, failure); + if (handle) + _state.scheduleDispatch(); } public boolean isTunnellingSupported() @@ -1414,7 +1418,7 @@ public void succeeded() _combinedListener.onResponseCommit(_request); if (_length > 0) _combinedListener.onResponseContent(_request, _content); - if (_complete && _state.completeResponse()) + if (_complete) _combinedListener.onResponseEnd(_request); super.succeeded(); } diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java index b05626a44247..ff2e23e0c982 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpChannelState.java @@ -146,6 +146,7 @@ public enum Action private long _timeoutMs = DEFAULT_TIMEOUT; private AsyncContextEvent _event; private Thread _onTimeoutThread; + private Throwable _failure; private boolean _failureListener; protected HttpChannelState(HttpChannel channel) @@ -310,20 +311,13 @@ public boolean partialResponse() } } - public boolean completeResponse() + public Throwable completeResponse() { try (AutoLock l = lock()) { - switch (_outputState) - { - case OPEN: - case COMMITTED: - _outputState = OutputState.COMPLETED; - return true; - - default: - return false; - } + if (_outputState == OutputState.OPEN || _outputState == OutputState.COMMITTED) + _outputState = OutputState.COMPLETED; + return _failure; } } @@ -349,22 +343,39 @@ public boolean isResponseCompleted() } } - public boolean abortResponse() + Boolean abort(Throwable failure) + { + boolean handle; + try (AutoLock ignored = lock()) + { + boolean aborted = abortResponse(failure); + if (LOG.isDebugEnabled()) + LOG.debug("abort={} {}", aborted, this, failure); + if (aborted) + { + handle = _state == State.WAITING; + if (handle) + _state = State.WOKEN; + _requestState = RequestState.COMPLETED; + return handle; + } + return null; + } + } + + public boolean abortResponse(Throwable failure) { try (AutoLock l = lock()) { switch (_outputState) { + case COMPLETED: case ABORTED: return false; - case OPEN: - _channel.getResponse().setStatus(500); - _outputState = OutputState.ABORTED; - return true; - default: _outputState = OutputState.ABORTED; + _failure = failure; return true; } } @@ -719,7 +730,7 @@ public void complete() { case EXPIRING: if (Thread.currentThread() != _onTimeoutThread) - throw new IllegalStateException(this.getStatusStringLocked()); + throw new IllegalStateException(getStatusStringLocked()); _requestState = _sendError ? RequestState.BLOCKING : RequestState.COMPLETE; break; @@ -730,7 +741,7 @@ public void complete() case COMPLETE: return; default: - throw new IllegalStateException(this.getStatusStringLocked()); + throw new IllegalStateException(getStatusStringLocked()); } if (_state == State.WAITING) { @@ -779,14 +790,15 @@ public void asyncError(Throwable failure) } } - protected void onError(Throwable th) + protected boolean onError(Throwable th) { + boolean committed = _channel.isCommitted(); final AsyncContextEvent asyncEvent; final List asyncListeners; try (AutoLock l = lock()) { if (LOG.isDebugEnabled()) - LOG.debug("thrownException {}", getStatusStringLocked(), th); + LOG.debug("onError {}", getStatusStringLocked(), th); // This can only be called from within the handle loop if (_state != State.HANDLING) @@ -795,34 +807,42 @@ protected void onError(Throwable th) // If sendError has already been called, we can only handle one failure at a time! if (_sendError) { - LOG.warn("unhandled due to prior sendError", th); - return; + LOG.warn("onError not handled due to prior sendError() {}", getStatusStringLocked(), th); + return false; } // Check async state to determine type of handling switch (_requestState) { case BLOCKING: - // handle the exception with a sendError + { + // Handle the exception with a sendError. + if (committed) + return true; sendError(th); - return; - + return false; + } case DISPATCH: // Dispatch has already been called but we ignore and handle exception below case COMPLETE: // Complete has already been called but we ignore and handle exception below case ASYNC: + { if (_asyncListeners == null || _asyncListeners.isEmpty()) { + if (committed) + return true; sendError(th); - return; + return false; } asyncEvent = _event; asyncEvent.addThrowable(th); asyncListeners = _asyncListeners; break; - + } default: - LOG.warn("unhandled in state {}", _requestState, new IllegalStateException(th)); - return; + { + LOG.warn("onError not handled due to invalid requestState {}", getStatusStringLocked(), th); + return false; + } } } @@ -839,9 +859,9 @@ protected void onError(Throwable th) catch (Throwable x) { if (LOG.isDebugEnabled()) - LOG.warn("{} while invoking onError listener {}", x.toString(), listener, x); + LOG.debug("{} while invoking onError listener {}", x, listener, x); else - LOG.warn("{} while invoking onError listener {}", x.toString(), listener); + LOG.warn("{} while invoking onError listener {}", x, listener); } } }); @@ -853,7 +873,10 @@ protected void onError(Throwable th) { // The listeners did not invoke API methods and the // container must provide a default error dispatch. + if (committed) + return true; sendError(th); + return false; } else if (_requestState != RequestState.COMPLETE) { @@ -862,12 +885,14 @@ else if (_requestState != RequestState.COMPLETE) else LOG.warn("unhandled in state {}", _requestState, th); } + return committed; } } private void sendError(Throwable th) { // No sync as this is always called with lock held + assert _lock.isHeldByCurrentThread(); // Determine the actual details of the exception final Request request = _channel.getRequest(); diff --git a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java index c4600889f710..9387a51313d5 100644 --- a/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java +++ b/jetty-ee9/jetty-ee9-nested/src/main/java/org/eclipse/jetty/ee9/nested/HttpOutput.java @@ -763,7 +763,7 @@ public void flush() throws IOException private void checkWritable() throws EofException { if (_softClose) - throw new EofException("Closed"); + throw new EofException("Closed"); switch (_state) { diff --git a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/Http2AsyncIOServletTest.java b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/Http2AsyncIOServletTest.java index 12e6a7cfef6c..1dc1b9eee208 100644 --- a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/Http2AsyncIOServletTest.java +++ b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/Http2AsyncIOServletTest.java @@ -13,6 +13,7 @@ package org.eclipse.jetty.ee9.test.client.transport; +import java.io.IOException; import java.net.InetSocketAddress; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -21,12 +22,15 @@ import jakarta.servlet.AsyncContext; import jakarta.servlet.AsyncEvent; import jakarta.servlet.AsyncListener; +import jakarta.servlet.ServletException; +import jakarta.servlet.ServletOutputStream; import jakarta.servlet.http.HttpServlet; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import org.eclipse.jetty.ee9.servlet.ServletContextHandler; import org.eclipse.jetty.ee9.servlet.ServletHolder; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpURI; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; @@ -44,21 +48,24 @@ import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.component.LifeCycle; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; import static org.awaitility.Awaitility.await; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class Http2AsyncIOServletTest { + private final HttpConfiguration httpConfig = new HttpConfiguration(); private Server server; private ServerConnector connector; private HTTP2Client client; - private void start(HttpConfiguration httpConfig, HttpServlet httpServlet) throws Exception + private void start(HttpServlet httpServlet) throws Exception { server = new Server(); connector = new ServerConnector(server, 1, 1, new HTTP2CServerConnectionFactory(httpConfig)); @@ -82,12 +89,10 @@ public void tearDown() @ValueSource(booleans = {true, false}) public void testStartAsyncThenClientResetRemoteErrorNotification(boolean notify) throws Exception { - HttpConfiguration httpConfig = new HttpConfiguration(); httpConfig.setNotifyRemoteAsyncErrors(notify); - AtomicReference errorAsyncEventRef = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - start(httpConfig, new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, HttpServletResponse response) @@ -147,4 +152,131 @@ public void onStartAsync(AsyncEvent event) // Wait for the reset to NOT be notified to the failure listener. await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).until(errorAsyncEventRef::get, nullValue()); } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testClientResetNotifiesAsyncListener(boolean commitResponse) throws Exception + { + CountDownLatch requestLatch = new CountDownLatch(1); + CountDownLatch errorLatch = new CountDownLatch(1); + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + if (commitResponse) + response.flushBuffer(); + + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + + asyncContext.addListener(new AsyncListener() + { + @Override + public void onComplete(AsyncEvent event) + { + } + + @Override + public void onTimeout(AsyncEvent event) + { + } + + @Override + public void onError(AsyncEvent event) + { + if (!response.isCommitted()) + response.setStatus(HttpStatus.INTERNAL_SERVER_ERROR_500); + asyncContext.complete(); + errorLatch.countDown(); + } + + @Override + public void onStartAsync(AsyncEvent event) + { + } + }); + + requestLatch.countDown(); + } + }); + + Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {}) + .get(5, TimeUnit.SECONDS); + MetaData.Request request = new MetaData.Request("GET", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY); + Stream stream = session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS); + + // Wait for the server to become idle after the request. + assertTrue(requestLatch.await(5, TimeUnit.SECONDS)); + Thread.sleep(500); + + // Reset the stream. + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code)); + + assertTrue(errorLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testClientResetThenWriteAfterComplete() throws Exception + { + AtomicReference servletOutputRef = new AtomicReference<>(); + CountDownLatch requestLatch = new CountDownLatch(1); + CountDownLatch errorLatch = new CountDownLatch(1); + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.flushBuffer(); + + AsyncContext asyncContext = request.startAsync(); + asyncContext.setTimeout(0); + asyncContext.addListener(new AsyncListener() + { + @Override + public void onComplete(AsyncEvent event) + { + } + + @Override + public void onTimeout(AsyncEvent event) + { + } + + @Override + public void onError(AsyncEvent event) + { + asyncContext.complete(); + errorLatch.countDown(); + } + + @Override + public void onStartAsync(AsyncEvent event) + { + } + }); + servletOutputRef.set(response.getOutputStream()); + requestLatch.countDown(); + } + }); + + Session session = client.connect(new InetSocketAddress("localhost", connector.getLocalPort()), new Session.Listener() {}) + .get(5, TimeUnit.SECONDS); + MetaData.Request request = new MetaData.Request("GET", HttpURI.from("/"), HttpVersion.HTTP_2, HttpFields.EMPTY); + Stream stream = session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS); + + // Wait for the server to become idle after the request. + assertTrue(requestLatch.await(5, TimeUnit.SECONDS)); + Thread.sleep(500); + + // Reset the stream and wait to take effect. + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code)); + assertTrue(errorLatch.await(5, TimeUnit.SECONDS)); + Thread.sleep(500); + + // Perform an after completion write. + assertThrows(IOException.class, () -> servletOutputRef.get().println("write after complete")); + } } diff --git a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/Http3AsyncIOServletTest.java b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/Http3AsyncIOServletTest.java index 737e29e4dd03..f74f51aefcca 100644 --- a/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/Http3AsyncIOServletTest.java +++ b/jetty-ee9/jetty-ee9-tests/jetty-ee9-test-client-transports/src/test/java/org/eclipse/jetty/ee9/test/client/transport/Http3AsyncIOServletTest.java @@ -48,6 +48,7 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; @@ -57,17 +58,18 @@ import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.nullValue; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assumptions.assumeTrue; @ExtendWith(WorkDirExtension.class) public class Http3AsyncIOServletTest { public WorkDir workDir; - + private final HttpConfiguration httpConfig = new HttpConfiguration(); private Server server; private QuicServerConnector connector; private HTTP3Client client; - private void start(HttpConfiguration httpConfig, HttpServlet httpServlet) throws Exception + private void start(HttpServlet httpServlet) throws Exception { server = new Server(); SslContextFactory.Server serverSslContextFactory = new SslContextFactory.Server(); @@ -95,12 +97,10 @@ public void tearDown() @ValueSource(booleans = {true, false}) public void testStartAsyncThenClientResetRemoteErrorNotification(boolean notify) throws Exception { - HttpConfiguration httpConfig = new HttpConfiguration(); httpConfig.setNotifyRemoteAsyncErrors(notify); - AtomicReference errorAsyncEventRef = new AtomicReference<>(); CountDownLatch latch = new CountDownLatch(1); - start(httpConfig, new HttpServlet() + start(new HttpServlet() { @Override protected void service(HttpServletRequest request, HttpServletResponse response) @@ -158,4 +158,14 @@ public void onStartAsync(AsyncEvent event) // Wait for the reset to NOT be notified to the failure listener. await().atMost(5, TimeUnit.SECONDS).during(1, TimeUnit.SECONDS).until(errorAsyncEventRef::get, nullValue()); } + + @Test + public void testClientResetNotifiesAsyncListener() + { + // See the equivalent test in Http2AsyncIOServletTest for HTTP/2. + // For HTTP/3 we do not have a "reset" event that we can relay to applications, + // because HTTP/3 does not have a "reset" frame; QUIC has RESET_STREAM, but we + // do not have an event from Quiche to reliably report it to applications. + assumeTrue(false); + } }