diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index d3d59ca1802c..4306c161a61e 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -318,15 +318,16 @@ public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers) @Override public ByteBuffer onUpgradeFrom() { - if (!isRequestBufferEmpty()) + if (isRequestBufferEmpty()) { - ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining()); - unconsumed.put(_retainableByteBuffer.getByteBuffer()); - unconsumed.flip(); releaseRequestBuffer(); - return unconsumed; + return null; } - return null; + ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining()); + unconsumed.put(_retainableByteBuffer.getByteBuffer()); + unconsumed.flip(); + releaseRequestBuffer(); + return unconsumed; } @Override @@ -341,10 +342,10 @@ void releaseRequestBuffer() { if (LOG.isDebugEnabled()) LOG.debug("releaseRequestBuffer {}", this); - if (_retainableByteBuffer.release()) - _retainableByteBuffer = null; - else - throw new IllegalStateException("unreleased buffer " + _retainableByteBuffer); + RetainableByteBuffer buffer = _retainableByteBuffer; + _retainableByteBuffer = null; + if (!buffer.release()) + throw new IllegalStateException("unreleased buffer " + buffer); } } @@ -369,7 +370,9 @@ public void onFillable() HttpConnection last = setCurrentConnection(this); try { - while (getEndPoint().isOpen()) + // We must loop until we fill -1 or there is an async pause in handling. + // Note that the endpoint might already be closed in some special circumstances. + while (true) { // Fill the request buffer (if needed). int filled = fillRequestBuffer(); @@ -906,6 +909,13 @@ private void releaseChunk() @Override protected void onCompleteSuccess() { + // If we are a non-persistent connection and have succeeded the last write... + if (_shutdownOut && !(_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE) instanceof Connection)) + { + // then we shutdown the output here so that the client sees the body termination ASAP and + // cannot be delayed by any further server handling before the stream callback is completed. + getEndPoint().shutdownOutput(); + } release().succeeded(); } @@ -1513,8 +1523,7 @@ public void succeeded() return; } - Connection upgradeConnection = (Connection)_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE); - if (upgradeConnection != null) + if (_httpChannel.getRequest().getAttribute(HttpStream.UPGRADE_CONNECTION_ATTRIBUTE) instanceof Connection upgradeConnection) { getEndPoint().upgrade(upgradeConnection); _httpChannel.recycle(); @@ -1523,13 +1532,8 @@ public void succeeded() return; } - // As this is not an upgrade, we can shutdown the output if we know we are not persistent - if (_sendCallback._shutdownOut) - getEndPoint().shutdownOutput(); - _httpChannel.recycle(); - // If a 100 Continue is still expected to be sent, but no content was read, then // close the parser so that seeks EOF below, not the next request. if (_expects100Continue) diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java index dad6703e3543..f86408c292a6 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java @@ -52,11 +52,15 @@ import org.eclipse.jetty.util.Blocker; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.IO; +import org.eclipse.jetty.util.NanoTime; import org.eclipse.jetty.util.StringUtil; import org.hamcrest.Matchers; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,6 +72,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -1085,6 +1090,56 @@ public void testCloseWhileWriteBlocked() throws Exception } } + @Test + public void testCloseWhileCompletePending() throws Exception + { + String content = "The End!\r\n"; + CountDownLatch handleComplete = new CountDownLatch(1); + startServer(new Handler.Abstract() + { + @Override + public boolean handle(Request request, Response response, Callback callback) throws Exception + { + FutureCallback writeComplete = new FutureCallback(); + Content.Sink.write(response, true, content, writeComplete); + // Wait until the write is complete + writeComplete.get(30, TimeUnit.SECONDS); + + // Wait until test lets the handling complete + assertTrue(handleComplete.await(30, TimeUnit.SECONDS)); + + callback.succeeded(); + return true; + } + }); + + try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) + { + OutputStream output = client.getOutputStream(); + output.write(""" + GET / HTTP/1.1\r + Host: localhost:%d\r + Connection: close\r + \r + """.formatted(_serverURI.getPort()) + .getBytes()); + output.flush(); + + client.setSoTimeout(5000); + long start = NanoTime.now(); + HttpTester.Input input = HttpTester.from(client.getInputStream()); + HttpTester.Response response = HttpTester.parseResponse(input); + assertEquals(HttpStatus.OK_200, response.getStatus()); + assertEquals(content, response.getContent()); + assertFalse(input.isEOF()); + assertEquals(-1, input.fillBuffer()); + assertTrue(input.isEOF()); + assertThat(NanoTime.secondsSince(start), lessThan(5L)); + + } + handleComplete.countDown(); + } + @Test public void testBigBlocks() throws Exception { @@ -1813,8 +1868,9 @@ public void testChunkedShutdown() throws Exception } } - @Test - public void testHoldContent() throws Exception + @ParameterizedTest + @ValueSource(booleans = {false /* TODO, true */}) + public void testHoldContent(boolean close) throws Exception { Queue contents = new ConcurrentLinkedQueue<>(); final int bufferSize = 1024; @@ -1857,6 +1913,10 @@ public void onClosed(Connection connection) } response.setStatus(200); + + if (close) + request.getConnectionMetaData().getConnection().getEndPoint().close(); + callback.succeeded(); return true; } @@ -1897,9 +1957,12 @@ public void onClosed(Connection connection) out.flush(); // check the response - HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); - assertNotNull(response); - assertThat(response.getStatus(), is(200)); + if (!close) + { + HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); + assertNotNull(response); + assertThat(response.getStatus(), is(200)); + } } assertTrue(closed.await(10, TimeUnit.SECONDS));