diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java index b38304c2420c..95a0fbd14d86 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-server/src/main/java/org/eclipse/jetty/fcgi/server/internal/ServerFCGIConnection.java @@ -45,24 +45,22 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory(); private final Attributes attributes = new Lazy(); private final Connector connector; - private final ByteBufferPool networkByteBufferPool; + private final ByteBufferPool bufferPool; private final boolean sendStatus200; private final Flusher flusher; - private final HttpConfiguration configuration; private final ServerParser parser; private final String id; private boolean useInputDirectByteBuffers; private boolean useOutputDirectByteBuffers; - private RetainableByteBuffer networkBuffer; + private RetainableByteBuffer inputBuffer; private HttpStreamOverFCGI stream; public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200) { super(connector, configuration, endPoint); this.connector = connector; - this.networkByteBufferPool = connector.getByteBufferPool(); + this.bufferPool = connector.getByteBufferPool(); this.flusher = new Flusher(endPoint); - this.configuration = configuration; this.sendStatus200 = sendStatus200; this.parser = new ServerParser(new ServerListener()); this.id = StringUtil.randomAlphaNumeric(16); @@ -168,6 +166,8 @@ public void onOpen() @Override public void onFillable() { + if (LOG.isDebugEnabled()) + LOG.debug(">>onFillable enter {} {} {}", this, stream, inputBuffer); acquireInputBuffer(); try { @@ -178,20 +178,24 @@ public void onFillable() LOG.debug("Read {} bytes from {} {}", read, getEndPoint(), this); if (read > 0) { - if (parse(networkBuffer.getByteBuffer())) + // The inputBuffer cannot be released immediately after parse() + // even if the buffer has been fully consumed because releaseInputBuffer() + // must be called as the last release for it to be able to null out the + // inputBuffer field exactly when the latter isn't used anymore. + if (parse(inputBuffer.getByteBuffer())) return; } else if (read == 0) { releaseInputBuffer(); fillInterested(); - break; + return; } else { releaseInputBuffer(); shutdown(); - break; + return; } } } @@ -199,10 +203,15 @@ else if (read == 0) { if (LOG.isDebugEnabled()) LOG.debug("Unable to fill endpoint", x); - networkBuffer.clear(); + inputBuffer.clear(); releaseInputBuffer(); // TODO: fail and close ? } + finally + { + if (LOG.isDebugEnabled()) + LOG.debug("< tryReleaseBuffer(false); + case NO_FRAME -> tryReleaseInputBuffer(false); case SWITCH_MODE -> { parser.setDataMode(false); @@ -132,14 +139,14 @@ private void processDataFrames(boolean setFillInterest) { // The last frame may have caused a write that we need to flush. getEndPoint().getQuicSession().flush(); - tryReleaseBuffer(false); + tryReleaseInputBuffer(false); } } } } catch (Throwable x) { - tryReleaseBuffer(true); + tryReleaseInputBuffer(true); long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(); getEndPoint().close(error, x); // Notify the application that a failure happened. @@ -151,7 +158,7 @@ private void processNonDataFrames() { try { - tryAcquireBuffer(); + tryAcquireInputBuffer(); while (true) { @@ -160,14 +167,14 @@ private void processNonDataFrames() { case NO_FRAME -> { - tryReleaseBuffer(false); + tryReleaseInputBuffer(false); return; } case BLOCKED_FRAME -> { // Return immediately because another thread may // resume the processing as the stream is unblocked. - tryReleaseBuffer(false); + tryReleaseInputBuffer(false); return; } case SWITCH_MODE -> @@ -192,7 +199,7 @@ private void processNonDataFrames() // However, the last frame may have // caused a write that we need to flush. getEndPoint().getQuicSession().flush(); - tryReleaseBuffer(false); + tryReleaseInputBuffer(false); return; } @@ -201,7 +208,7 @@ private void processNonDataFrames() if (stream.hasDemandOrStall()) { - if (networkBuffer != null && networkBuffer.hasRemaining()) + if (inputBuffer != null && inputBuffer.hasRemaining()) { // There are bytes left in the buffer; if there are not // enough bytes to parse a DATA frame and call the @@ -212,7 +219,7 @@ private void processNonDataFrames() { // No bytes left in the buffer, but there is demand. // Set fill interest to call the application when bytes arrive. - tryReleaseBuffer(false); + tryReleaseInputBuffer(false); fillInterested(); } } @@ -227,7 +234,7 @@ private void processNonDataFrames() } catch (Throwable x) { - tryReleaseBuffer(true); + tryReleaseInputBuffer(true); long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(); getEndPoint().close(error, x); // Notify the application that a failure happened. @@ -242,28 +249,28 @@ public void receive() processDataFrames(false); } - private void tryAcquireBuffer() + private void tryAcquireInputBuffer() { - if (networkBuffer == null) + if (inputBuffer == null) { - networkBuffer = bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); + inputBuffer = bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); if (LOG.isDebugEnabled()) - LOG.debug("acquired {}", networkBuffer); + LOG.debug("acquired {}", inputBuffer); } } - private void tryReleaseBuffer(boolean force) + private void tryReleaseInputBuffer(boolean force) { - if (networkBuffer != null) + if (inputBuffer != null) { - if (networkBuffer.hasRemaining() && force) - networkBuffer.clear(); - if (!networkBuffer.hasRemaining()) + if (inputBuffer.hasRemaining() && force) + inputBuffer.clear(); + if (!inputBuffer.hasRemaining()) { - networkBuffer.release(); + inputBuffer.release(); if (LOG.isDebugEnabled()) - LOG.debug("released {}", networkBuffer); - networkBuffer = null; + LOG.debug("released {}", inputBuffer); + inputBuffer = null; } } } @@ -273,30 +280,30 @@ private MessageParser.Result parseAndFill(boolean setFillInterest) try { if (LOG.isDebugEnabled()) - LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, networkBuffer); + LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, inputBuffer); while (true) { - ByteBuffer byteBuffer = networkBuffer.getByteBuffer(); + ByteBuffer byteBuffer = inputBuffer.getByteBuffer(); MessageParser.Result result = parser.parse(byteBuffer); if (LOG.isDebugEnabled()) - LOG.debug("parsed {} on {} with buffer {}", result, this, networkBuffer); + LOG.debug("parsed {} on {} with buffer {}", result, this, inputBuffer); if (result != MessageParser.Result.NO_FRAME) return result; - if (networkBuffer.isRetained()) + if (inputBuffer.isRetained()) { - networkBuffer.release(); + inputBuffer.release(); RetainableByteBuffer newBuffer = bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); if (LOG.isDebugEnabled()) - LOG.debug("reacquired {} for retained {}", newBuffer, networkBuffer); - networkBuffer = newBuffer; - byteBuffer = networkBuffer.getByteBuffer(); + LOG.debug("reacquired {} for retained {}", newBuffer, inputBuffer); + inputBuffer = newBuffer; + byteBuffer = inputBuffer.getByteBuffer(); } int filled = fill(byteBuffer); if (LOG.isDebugEnabled()) - LOG.debug("filled {} on {} with buffer {}", filled, this, networkBuffer); + LOG.debug("filled {} on {} with buffer {}", filled, this, inputBuffer); if (filled > 0) continue; @@ -395,9 +402,9 @@ private void processData(DataFrame frame, Runnable delegate) } else { - // No need to call networkBuffer.retain() here, since we know - // that the action will be run before releasing the networkBuffer. - data = new StreamData(frame, networkBuffer); + // No need to call inputBuffer.retain() here, since we know + // that the action will be run before releasing the inputBuffer. + data = new StreamData(frame, inputBuffer); } delegate.run(); diff --git a/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpStreamOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpStreamOverHTTP3.java index ac09155c693f..ff784c695521 100644 --- a/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpStreamOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-server/src/main/java/org/eclipse/jetty/http3/server/internal/HttpStreamOverHTTP3.java @@ -531,6 +531,7 @@ public Runnable onFailure(Throwable failure) chunk.release(); chunk = Content.Chunk.from(failure, true); } + connection.onFailure(failure); return httpChannel.onFailure(failure); } } diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientIdleTimeoutTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientIdleTimeoutTest.java index 077312b1c511..9a969015e036 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientIdleTimeoutTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientIdleTimeoutTest.java @@ -17,12 +17,12 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.client.ContentResponse; +import org.eclipse.jetty.client.StringRequestContent; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.util.Callback; -import org.junit.jupiter.api.Tag; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -35,25 +35,30 @@ public class HttpClientIdleTimeoutTest extends AbstractTest @ParameterizedTest @MethodSource("transports") - @Tag("DisableLeakTracking:server:FCGI") public void testClientIdleTimeout(Transport transport) throws Exception { + long serverIdleTimeout = idleTimeout * 2; + CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1); start(transport, new Handler.Abstract() { @Override - public boolean handle(Request request, Response response, Callback callback) throws Exception + public boolean handle(Request request, Response response, Callback callback) { // Do not succeed the callback if it's a timeout request. if (!Request.getPathInContext(request).equals("/timeout")) callback.succeeded(); + else + request.addFailureListener(x -> serverIdleTimeoutLatch.countDown()); return true; } }); + connector.setIdleTimeout(serverIdleTimeout); client.setIdleTimeout(idleTimeout); CountDownLatch latch = new CountDownLatch(1); client.newRequest(newURI(transport)) .path("/timeout") + .body(new StringRequestContent("some data")) .send(result -> { if (result.isFailed()) @@ -65,15 +70,20 @@ public boolean handle(Request request, Response response, Callback callback) thr // Verify that after the timeout we can make another request. ContentResponse response = client.newRequest(newURI(transport)) .timeout(5, TimeUnit.SECONDS) + .body(new StringRequestContent("more data")) .send(); assertEquals(HttpStatus.OK_200, response.getStatus()); + + // Wait for the server's idle timeout to trigger to give it a chance to clean up its resources. + assertTrue(serverIdleTimeoutLatch.await(2 * serverIdleTimeout, TimeUnit.MILLISECONDS)); } @ParameterizedTest @MethodSource("transports") - @Tag("DisableLeakTracking:server:FCGI") public void testRequestIdleTimeout(Transport transport) throws Exception { + long serverIdleTimeout = idleTimeout * 2; + CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1); start(transport, new Handler.Abstract() { @Override @@ -82,13 +92,17 @@ public boolean handle(Request request, Response response, Callback callback) thr // Do not succeed the callback if it's a timeout request. if (!Request.getPathInContext(request).equals("/timeout")) callback.succeeded(); + else + request.addFailureListener(x -> serverIdleTimeoutLatch.countDown()); return true; } }); + connector.setIdleTimeout(serverIdleTimeout); CountDownLatch latch = new CountDownLatch(1); client.newRequest(newURI(transport)) .path("/timeout") + .body(new StringRequestContent("some data")) .idleTimeout(idleTimeout, TimeUnit.MILLISECONDS) .send(result -> { @@ -100,9 +114,13 @@ public boolean handle(Request request, Response response, Callback callback) thr // Verify that after the timeout we can make another request. ContentResponse response = client.newRequest(newURI(transport)) + .body(new StringRequestContent("more data")) .timeout(5, TimeUnit.SECONDS) .send(); assertEquals(HttpStatus.OK_200, response.getStatus()); + + // Wait for the server's idle timeout to trigger to give it a chance to clean up its resources. + assertTrue(serverIdleTimeoutLatch.await(2 * serverIdleTimeout, TimeUnit.MILLISECONDS)); } @ParameterizedTest