From 6611a7b600ecda20f46887646f7e7deb46be80d1 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 4 Sep 2024 21:30:08 +0200 Subject: [PATCH 1/7] Fixes #12227 - Improve HttpConnection buffer recycling. Alternative to #12228. In this PR, the responsibility to release the buffers is in 2 methods: onFillable() (called when network data is available, and to process the next request) and parseAndFillForContent() (called from Request.read()). Signed-off-by: Simone Bordet --- .../jetty/server/internal/HttpConnection.java | 241 ++++++++---------- .../jetty/server/HttpServerTestBase.java | 88 +++++-- .../jetty/server/RequestListenersTest.java | 2 +- 3 files changed, 177 insertions(+), 154 deletions(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 4306c161a61e..784ed5eee831 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -103,7 +103,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab private final LongAdder bytesOut = new LongAdder(); private final AtomicBoolean _handling = new AtomicBoolean(false); private final HttpFields.Mutable _headerBuilder = HttpFields.build(); - private volatile RetainableByteBuffer _retainableByteBuffer; + private volatile RetainableByteBuffer _requestBuffer; private HttpFields.Mutable _trailers; private Runnable _onRequest; private long _requests; @@ -323,8 +323,8 @@ public ByteBuffer onUpgradeFrom() releaseRequestBuffer(); return null; } - ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining()); - unconsumed.put(_retainableByteBuffer.getByteBuffer()); + ByteBuffer unconsumed = ByteBuffer.allocateDirect(_requestBuffer.remaining()); + unconsumed.put(_requestBuffer.getByteBuffer()); unconsumed.flip(); releaseRequestBuffer(); return unconsumed; @@ -333,56 +333,51 @@ public ByteBuffer onUpgradeFrom() @Override public void onUpgradeTo(ByteBuffer buffer) { - BufferUtil.append(getRequestBuffer(), buffer); + ensureRequestBuffer(); + BufferUtil.append(_requestBuffer.getByteBuffer(), buffer); } - void releaseRequestBuffer() + private void releaseRequestBuffer() { - if (_retainableByteBuffer != null && !_retainableByteBuffer.hasRemaining()) - { - if (LOG.isDebugEnabled()) - LOG.debug("releaseRequestBuffer {}", this); - RetainableByteBuffer buffer = _retainableByteBuffer; - _retainableByteBuffer = null; - if (!buffer.release()) - throw new IllegalStateException("unreleased buffer " + buffer); - } + if (LOG.isDebugEnabled()) + LOG.debug("releasing request buffer {} {}", this, _requestBuffer); + _requestBuffer.release(); + _requestBuffer = null; } - private ByteBuffer getRequestBuffer() + private void ensureRequestBuffer() { - if (_retainableByteBuffer == null) - _retainableByteBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); - return _retainableByteBuffer.getByteBuffer(); + if (_requestBuffer == null) + _requestBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); } public boolean isRequestBufferEmpty() { - return _retainableByteBuffer == null || !_retainableByteBuffer.hasRemaining(); + return _requestBuffer == null || !_requestBuffer.hasRemaining(); } @Override public void onFillable() { if (LOG.isDebugEnabled()) - LOG.debug(">>onFillable enter {} {} {}", this, _httpChannel, _retainableByteBuffer); + LOG.debug("onFillable enter {} {} {}", this, _httpChannel, _requestBuffer); HttpConnection last = setCurrentConnection(this); try { + ensureRequestBuffer(); + // We must loop until we fill -1 or there is an async pause in handling. // Note that the endpoint might already be closed in some special circumstances. while (true) { - // Fill the request buffer (if needed). int filled = fillRequestBuffer(); if (LOG.isDebugEnabled()) - LOG.debug("onFillable filled {} {} {} {}", filled, this, _httpChannel, _retainableByteBuffer); + LOG.debug("onFillable filled {} {} {} {}", filled, this, _httpChannel, _requestBuffer); if (filled < 0 && getEndPoint().isOutputShutdown()) close(); - // Parse the request buffer. boolean handle = parseRequestBuffer(); // There could be a connection upgrade before handling @@ -390,52 +385,64 @@ public void onFillable() // If there was a connection upgrade, the other // connection took over, nothing more to do here. if (getEndPoint().getConnection() != this) + { + releaseRequestBuffer(); break; + } - // Handle channel event. This will only be true when the headers of a request have been received. + // The headers of a request have been received. if (handle) { Request request = _httpChannel.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("HANDLE {} {}", request, this); - // handle the request by running the task obtained from onRequest + // Handle the request by running the task. _handling.set(true); Runnable onRequest = _onRequest; _onRequest = null; onRequest.run(); - // If the _handling boolean has already been CaS'd to false, then stream is completed and we are no longer - // handling, so the caller can continue to fill and parse more connections. If it is still true, then some - // thread is still handling the request and they will need to organize more filling and parsing once complete. + // If the CaS succeeds, then some thread is still handling the request. + // If the CaS fails, then stream is completed, we are no longer handling, + // so the caller can continue to fill and parse more connections. if (_handling.compareAndSet(true, false)) { if (LOG.isDebugEnabled()) LOG.debug("request !complete {} {}", request, this); + // Cannot release the request buffer here, because the + // application may read concurrently from another thread. + // The request buffer will be released by the application + // reading the request content, or by the implementation + // trying to consume the request content. break; } - // If the request is complete, but has been upgraded, then break + // If there was an upgrade, release and return. if (getEndPoint().getConnection() != this) { if (LOG.isDebugEnabled()) LOG.debug("upgraded {} -> {}", this, getEndPoint().getConnection()); + releaseRequestBuffer(); break; } } + else if (filled == 0) + { + releaseRequestBuffer(); + fillInterested(); + break; + } else if (filled < 0) { + releaseRequestBuffer(); getEndPoint().shutdownOutput(); break; } else if (_requestHandler._failure != null) { // There was an error, don't fill more. - break; - } - else if (filled == 0) - { - fillInterested(); + releaseRequestBuffer(); break; } } @@ -446,11 +453,8 @@ else if (filled == 0) { if (LOG.isDebugEnabled()) LOG.debug("caught exception {} {}", this, _httpChannel, x); - if (_retainableByteBuffer != null) - { - _retainableByteBuffer.clear(); + if (_requestBuffer != null) releaseRequestBuffer(); - } } finally { @@ -461,7 +465,7 @@ else if (filled == 0) { setCurrentConnection(last); if (LOG.isDebugEnabled()) - LOG.debug("< 0) - { bytesIn.add(filled); - } - else - { - if (filled < 0) - _parser.atEOF(); - releaseRequestBuffer(); - } + else if (filled < 0) + _parser.atEOF(); return filled; } @@ -543,11 +539,6 @@ private int fillRequestBuffer() if (LOG.isDebugEnabled()) LOG.debug("Unable to fill from endpoint {}", getEndPoint(), x); _parser.atEOF(); - if (_retainableByteBuffer != null) - { - _retainableByteBuffer.clear(); - releaseRequestBuffer(); - } return -1; } } @@ -555,20 +546,16 @@ private int fillRequestBuffer() private boolean parseRequestBuffer() { if (LOG.isDebugEnabled()) - LOG.debug("{} parse {}", this, _retainableByteBuffer); + LOG.debug("{} parse {}", this, _requestBuffer); if (_parser.isTerminated()) throw new RuntimeIOException("Parser is terminated"); - boolean handle = _parser.parseNext(_retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : _retainableByteBuffer.getByteBuffer()); + boolean handle = _parser.parseNext(_requestBuffer.getByteBuffer()); if (LOG.isDebugEnabled()) LOG.debug("{} parsed {} {}", this, handle, _parser); - // recycle buffer ? - if (_retainableByteBuffer != null && !_retainableByteBuffer.isRetained()) - releaseRequestBuffer(); - return handle; } @@ -969,14 +956,14 @@ public boolean headerComplete() public boolean content(ByteBuffer buffer) { HttpStreamOverHTTP1 stream = _stream.get(); - if (stream == null || stream._chunk != null || _retainableByteBuffer == null) + if (stream == null || stream._chunk != null || _requestBuffer == null) throw new IllegalStateException(); if (LOG.isDebugEnabled()) - LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), _retainableByteBuffer, HttpConnection.this); + LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), _requestBuffer, HttpConnection.this); - _retainableByteBuffer.retain(); - stream._chunk = Content.Chunk.asChunk(buffer, false, _retainableByteBuffer); + _requestBuffer.retain(); + stream._chunk = Content.Chunk.asChunk(buffer, false, _requestBuffer); return true; } @@ -1520,6 +1507,11 @@ public void succeeded() if (LOG.isDebugEnabled()) LOG.debug("abort due to pending read {} {} ", this, getEndPoint()); abort(new IOException("Pending read in onCompleted")); + _httpChannel.recycle(); + _parser.reset(); + _generator.reset(); + if (!_handling.compareAndSet(true, false)) + resume(); return; } @@ -1529,6 +1521,8 @@ public void succeeded() _httpChannel.recycle(); _parser.close(); _generator.reset(); + if (!_handling.compareAndSet(true, false)) + releaseRequestBuffer(); return; } @@ -1560,40 +1554,7 @@ public void succeeded() if (LOG.isDebugEnabled()) LOG.debug("non-current completion {}", this); - // If we are looking for the next request - if (_parser.isStart()) - { - // if the buffer is empty - if (isRequestBufferEmpty()) - { - // look for more data - fillInterested(); - } - // else if we are still running - else if (getConnector().isRunning()) - { - // Dispatched to handle a pipelined request - try - { - getExecutor().execute(HttpConnection.this); - } - catch (RejectedExecutionException e) - { - if (getConnector().isRunning()) - LOG.warn("Failed dispatch of {}", this, e); - else - LOG.trace("IGNORED", e); - getEndPoint().close(); - } - } - else - { - getEndPoint().close(); - } - } - // else the parser must be closed, so seek the EOF if we are still open - else if (getEndPoint().isOpen()) - fillInterested(); + resume(); } @Override @@ -1609,6 +1570,28 @@ public void failed(Throwable x) if (LOG.isDebugEnabled()) LOG.debug("aborting", x); abort(x); + _httpChannel.recycle(); + _parser.reset(); + _generator.reset(); + if (!_handling.compareAndSet(true, false)) + resume(); + } + + private void resume() + { + try + { + if (LOG.isDebugEnabled()) + LOG.debug("Resuming onFillable() {}", HttpConnection.this); + // Dispatch to handle pipelined requests. + getExecutor().execute(HttpConnection.this); + } + catch (RejectedExecutionException x) + { + getEndPoint().close(x); + // Resume by running, to release the request buffer. + run(); + } } private void abort(Throwable failure) diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java index f86408c292a6..6cf5573bc50a 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/HttpServerTestBase.java @@ -60,7 +60,7 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; -import org.junit.jupiter.params.provider.ValueSource; +import org.junit.jupiter.params.provider.CsvSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,7 +68,6 @@ import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; @@ -1869,8 +1868,8 @@ public void testChunkedShutdown() throws Exception } @ParameterizedTest - @ValueSource(booleans = {false /* TODO, true */}) - public void testHoldContent(boolean close) throws Exception + @CsvSource({"false,false", "false,true", "true,false", "true,true"}) + public void testHoldContent(boolean close, boolean pipeline) throws Exception { Queue contents = new ConcurrentLinkedQueue<>(); final int bufferSize = 1024; @@ -1881,6 +1880,15 @@ public void testHoldContent(boolean close) throws Exception @Override public boolean handle(Request request, Response response, Callback callback) throws Exception { + if (LOG.isDebugEnabled()) + LOG.debug("Handling request: {}", request); + if ("GET".equals(request.getMethod())) + { + response.setStatus(200); + callback.succeeded(); + return true; + } + request.getConnectionMetaData().getConnection().addEventListener(new Connection.Listener() { @Override @@ -1889,10 +1897,12 @@ public void onClosed(Connection connection) closed.countDown(); } }); + while (true) { Content.Chunk chunk = request.read(); - + if (LOG.isDebugEnabled()) + LOG.debug("read: {}", chunk); if (chunk == null) { try (Blocker.Runnable blocker = Blocker.runnable()) @@ -1902,81 +1912,111 @@ public void onClosed(Connection connection) continue; } } - if (chunk.hasRemaining()) contents.add(chunk); else chunk.release(); - if (chunk.isLast()) break; } - response.setStatus(200); if (close) + { + LOG.info("Closing {}", request.getConnectionMetaData().getConnection().getEndPoint()); request.getConnectionMetaData().getConnection().getEndPoint().close(); + } callback.succeeded(); return true; } }); - byte[] chunk = new byte[bufferSize / 2]; Arrays.fill(chunk, (byte)'X'); - try (Socket client = newSocket(_serverURI.getHost(), _serverURI.getPort())) { OutputStream os = client.getOutputStream(); BufferedOutputStream out = new BufferedOutputStream(os, bufferSize); - out.write((""" + String request = """ POST / HTTP/1.1\r Host: localhost\r - Connection: close\r + Connection: %s\r Transfer-Encoding: chunked\r \r - """).getBytes(StandardCharsets.ISO_8859_1)); + """.formatted(pipeline ? "other" : "close"); + if (LOG.isDebugEnabled()) + LOG.debug("raw request {}", request); + out.write(request.getBytes(StandardCharsets.ISO_8859_1)); // single chunk out.write((Integer.toHexString(chunk.length) + "\r\n").getBytes(StandardCharsets.ISO_8859_1)); out.write(chunk); out.write("\r\n".getBytes(StandardCharsets.ISO_8859_1)); out.flush(); - - // double chunk (will overflow) + // double chunk (will overflow bufferSize) out.write((Integer.toHexString(chunk.length * 2) + "\r\n").getBytes(StandardCharsets.ISO_8859_1)); out.write(chunk); out.write(chunk); out.write("\r\n".getBytes(StandardCharsets.ISO_8859_1)); out.flush(); - // single chunk and end chunk - out.write((Integer.toHexString(chunk.length) + "\r\n").getBytes(StandardCharsets.ISO_8859_1)); - out.write(chunk); - out.write("\r\n0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + // single chunk and end chunk plus optional pipelined request + ByteBuffer last = BufferUtil.allocate(4096); + BufferUtil.append(last, (Integer.toHexString(chunk.length) + "\r\n").getBytes(StandardCharsets.ISO_8859_1)); + BufferUtil.append(last, chunk); + BufferUtil.append(last, "\r\n0\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + if (pipeline) + BufferUtil.append(last, "GET / HTTP/1.1\r\nHost: localhost\r\nConnection: close\r\n\r\n".getBytes(StandardCharsets.ISO_8859_1)); + + if (LOG.isDebugEnabled()) + LOG.debug("last {}", BufferUtil.toString(last)); + out.write(BufferUtil.toArray(last)); out.flush(); // check the response - if (!close) + if (close) + { + assertThat(client.getInputStream().read(), equalTo(-1)); + } + else { HttpTester.Response response = HttpTester.parseResponse(client.getInputStream()); assertNotNull(response); assertThat(response.getStatus(), is(200)); + + if (pipeline) + { + response = HttpTester.parseResponse(client.getInputStream()); + assertNotNull(response); + assertThat(response.getStatus(), is(200)); + } } } assertTrue(closed.await(10, TimeUnit.SECONDS)); - long total = contents.stream().mapToLong(Content.Chunk::remaining).sum(); assertThat(total, equalTo(chunk.length * 4L)); - ByteBufferPool rbbp = _connector.getByteBufferPool(); if (rbbp instanceof ArrayByteBufferPool pool) { long buffersBeforeRelease = pool.getAvailableDirectByteBufferCount() + pool.getAvailableHeapByteBufferCount(); + if (LOG.isDebugEnabled()) + { + LOG.debug("pool {}", pool); + contents.stream().map(Content.Chunk::toString).forEach(LOG::debug); + } contents.forEach(Content.Chunk::release); - long buffersAfterRelease = pool.getAvailableDirectByteBufferCount() + pool.getAvailableHeapByteBufferCount(); - assertThat(buffersAfterRelease, greaterThan(buffersBeforeRelease)); + + Awaitility.waitAtMost(5, TimeUnit.SECONDS).until(() -> + { + if (LOG.isDebugEnabled()) + { + LOG.debug("pool {}", pool); + contents.stream().map(Content.Chunk::toString).forEach(LOG::debug); + } + long buffersAfterRelease = pool.getAvailableDirectByteBufferCount() + pool.getAvailableHeapByteBufferCount(); + return buffersAfterRelease > buffersBeforeRelease; + }); assertThat(pool.getAvailableDirectMemory() + pool.getAvailableHeapMemory(), greaterThanOrEqualTo(chunk.length * 4L)); } else diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java index b3ee4573815a..2e80b4f71377 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java @@ -191,7 +191,7 @@ public boolean handle(Request request, Response response, Callback callback) Content-Length: 1 Connection: close - """, 2000 * idleTimeout, TimeUnit.MILLISECONDS)); + """, 2 * idleTimeout, TimeUnit.MILLISECONDS)); int expectedStatus = succeedCallback ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500; assertEquals(expectedStatus, response.getStatus()); From bad4df0e71fc1ca3a0167e3e376d4a7186ed3099 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 4 Sep 2024 22:15:41 +0200 Subject: [PATCH 2/7] Fixed test expectation. Signed-off-by: Simone Bordet --- .../java/org/eclipse/jetty/server/RequestListenersTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java index 2e80b4f71377..4f57580227eb 100644 --- a/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java +++ b/jetty-core/jetty-server/src/test/java/org/eclipse/jetty/server/RequestListenersTest.java @@ -195,7 +195,9 @@ public boolean handle(Request request, Response response, Callback callback) int expectedStatus = succeedCallback ? HttpStatus.OK_200 : HttpStatus.INTERNAL_SERVER_ERROR_500; assertEquals(expectedStatus, response.getStatus()); - assertThat(failureLatch.await(idleTimeout + 500, TimeUnit.MILLISECONDS), is(failIdleTimeout && !succeedCallback)); + // The failure listener is never invoked because completing the callback + // produces a response that completes the stream so the failure is ignored. + assertThat(failureLatch.await(idleTimeout + 500, TimeUnit.MILLISECONDS), is(false)); } @ParameterizedTest From 9d2aac5228e740a0c119ee8cba57f51a9136c1bc Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 5 Sep 2024 00:07:02 +0200 Subject: [PATCH 3/7] Removed release of request buffer from onUpgradeFrom(). Using the request executor to dispatch onFillable(). Signed-off-by: Simone Bordet --- .../org/eclipse/jetty/server/internal/HttpChannelState.java | 3 ++- .../org/eclipse/jetty/server/internal/HttpConnection.java | 6 +----- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java index bd9a75581c60..92f9bcf43dfc 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpChannelState.java @@ -482,7 +482,8 @@ private Runnable onFailure(Throwable x, boolean remote) } } - // Consume content as soon as possible to open any flow control window. + // Consume content as soon as possible to open any + // flow control window and release any request buffer. Throwable unconsumed = stream.consumeAvailable(); if (unconsumed != null && LOG.isDebugEnabled()) LOG.debug("consuming content during error {}", unconsumed.toString()); diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 784ed5eee831..3cfe9d1c23ae 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -319,14 +319,10 @@ public void setUseOutputDirectByteBuffers(boolean useOutputDirectByteBuffers) public ByteBuffer onUpgradeFrom() { if (isRequestBufferEmpty()) - { - releaseRequestBuffer(); return null; - } ByteBuffer unconsumed = ByteBuffer.allocateDirect(_requestBuffer.remaining()); unconsumed.put(_requestBuffer.getByteBuffer()); unconsumed.flip(); - releaseRequestBuffer(); return unconsumed; } @@ -1584,7 +1580,7 @@ private void resume() if (LOG.isDebugEnabled()) LOG.debug("Resuming onFillable() {}", HttpConnection.this); // Dispatch to handle pipelined requests. - getExecutor().execute(HttpConnection.this); + _httpChannel.getRequest().getComponents().getExecutor().execute(HttpConnection.this); } catch (RejectedExecutionException x) { From e0445791a4f04b19164ed6239c3503d3d16f6e31 Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 5 Sep 2024 08:47:36 +1000 Subject: [PATCH 4/7] Improve HttpConnection buffer recycling Ensure there is a request before accessing components --- .../org/eclipse/jetty/server/internal/HttpConnection.java | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 3cfe9d1c23ae..873262452024 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -19,6 +19,7 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.concurrent.Executor; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -425,12 +426,14 @@ public void onFillable() } else if (filled == 0) { + assert isRequestBufferEmpty(); releaseRequestBuffer(); fillInterested(); break; } else if (filled < 0) { + assert isRequestBufferEmpty(); releaseRequestBuffer(); getEndPoint().shutdownOutput(); break; @@ -1580,7 +1583,9 @@ private void resume() if (LOG.isDebugEnabled()) LOG.debug("Resuming onFillable() {}", HttpConnection.this); // Dispatch to handle pipelined requests. - _httpChannel.getRequest().getComponents().getExecutor().execute(HttpConnection.this); + Request request = _httpChannel.getRequest(); + Executor executor = request == null ? getExecutor() : request.getComponents().getExecutor(); + executor.execute(HttpConnection.this); } catch (RejectedExecutionException x) { From e93d1f3d0e7a4bd94e1ca9944aa12b85939ba2fb Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 5 Sep 2024 10:08:06 +1000 Subject: [PATCH 5/7] Improve HttpConnection buffer recycling Release buffer after consumeAvailable --- .../jetty/server/internal/HttpConnection.java | 21 ++++++++++++------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 873262452024..0f7e617539d7 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -337,7 +337,7 @@ public void onUpgradeTo(ByteBuffer buffer) private void releaseRequestBuffer() { if (LOG.isDebugEnabled()) - LOG.debug("releasing request buffer {} {}", this, _requestBuffer); + LOG.debug("releasing request buffer {} {}", _requestBuffer, this); _requestBuffer.release(); _requestBuffer = null; } @@ -345,7 +345,11 @@ private void releaseRequestBuffer() private void ensureRequestBuffer() { if (_requestBuffer == null) + { _requestBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); + if (LOG.isDebugEnabled()) + LOG.debug("request buffer acquired {} {}", _requestBuffer, this); + } } public boolean isRequestBufferEmpty() @@ -357,7 +361,7 @@ public boolean isRequestBufferEmpty() public void onFillable() { if (LOG.isDebugEnabled()) - LOG.debug("onFillable enter {} {} {}", this, _httpChannel, _requestBuffer); + LOG.debug("onFillable enter {} {} {}", _httpChannel, _requestBuffer, this); HttpConnection last = setCurrentConnection(this); try @@ -370,7 +374,7 @@ public void onFillable() { int filled = fillRequestBuffer(); if (LOG.isDebugEnabled()) - LOG.debug("onFillable filled {} {} {} {}", filled, this, _httpChannel, _requestBuffer); + LOG.debug("onFillable filled {} {} {} {}", filled, _httpChannel, _requestBuffer, this); if (filled < 0 && getEndPoint().isOutputShutdown()) close(); @@ -406,7 +410,7 @@ public void onFillable() if (_handling.compareAndSet(true, false)) { if (LOG.isDebugEnabled()) - LOG.debug("request !complete {} {}", request, this); + LOG.debug("request !complete {} {} {}", request, _requestBuffer, this); // Cannot release the request buffer here, because the // application may read concurrently from another thread. // The request buffer will be released by the application @@ -464,7 +468,7 @@ else if (_requestHandler._failure != null) { setCurrentConnection(last); if (LOG.isDebugEnabled()) - LOG.debug("onFillable exit {} {} {}", this, _httpChannel, _requestBuffer); + LOG.debug("onFillable exit {} {} {}", _httpChannel, _requestBuffer, this); } } @@ -524,7 +528,7 @@ private int fillRequestBuffer() filled = getEndPoint().fill(requestBuffer); if (LOG.isDebugEnabled()) - LOG.debug("{} filled {} {}", this, filled, _requestBuffer); + LOG.debug("filled {} {} {}", filled, _requestBuffer, this); if (filled > 0) bytesIn.add(filled); @@ -545,7 +549,7 @@ else if (filled < 0) private boolean parseRequestBuffer() { if (LOG.isDebugEnabled()) - LOG.debug("{} parse {}", this, _requestBuffer); + LOG.debug("parse {} {}", _requestBuffer, this); if (_parser.isTerminated()) throw new RuntimeIOException("Parser is terminated"); @@ -553,7 +557,7 @@ private boolean parseRequestBuffer() boolean handle = _parser.parseNext(_requestBuffer.getByteBuffer()); if (LOG.isDebugEnabled()) - LOG.debug("{} parsed {} {}", this, handle, _parser); + LOG.debug("parsed {} {} {} {}", handle, _parser, _requestBuffer, this); return handle; } @@ -1114,6 +1118,7 @@ public Throwable consumeAvailable() _chunk.release(); _chunk = Content.Chunk.from(result, true); } + releaseRequestBuffer(); return result; } From 27adb552fa0dcceb0a2edefc501fb790f8db3b46 Mon Sep 17 00:00:00 2001 From: gregw Date: Thu, 5 Sep 2024 16:01:24 +1000 Subject: [PATCH 6/7] Improve HttpConnection buffer recycling Still problems --- .../java/org/eclipse/jetty/server/internal/HttpConnection.java | 1 - 1 file changed, 1 deletion(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 0f7e617539d7..f24434472d0f 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -1118,7 +1118,6 @@ public Throwable consumeAvailable() _chunk.release(); _chunk = Content.Chunk.from(result, true); } - releaseRequestBuffer(); return result; } From 17080e6a4da59b2fb9ee5ebed96fedc09ccf3a4e Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Thu, 5 Sep 2024 12:14:09 +0200 Subject: [PATCH 7/7] Removed HttpConnection.onClose() override, as it should not be necessary. Fixed failing tests that were not completing the Handler Callback. Signed-off-by: Simone Bordet --- .../jetty/server/internal/HttpConnection.java | 20 +++----------- .../transport/HttpClientIdleTimeoutTest.java | 26 ++++++++++++------- 2 files changed, 20 insertions(+), 26 deletions(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index f24434472d0f..bb291501a4ae 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -488,11 +488,12 @@ void parseAndFillForContent() if (!_parser.inContentState()) { // The request is complete, and we are going to re-enter onFillable(), - // because either A) the request/response was completed synchronously - // so the onFillable() thread will loop, or B) the request/response + // because either A: the request/response was completed synchronously + // so the onFillable() thread will loop, or B: the request/response // was completed asynchronously, and the HttpStreamOverHTTP1 dispatches // a call to onFillable() to process the next request. - // Therefore, there is no need to release the request buffer here. + // Therefore, there is no need to release the request buffer here, + // also because the buffer may contain pipelined requests. break; } @@ -614,19 +615,6 @@ public void onOpen() getExecutor().execute(this); } - @Override - public void onClose(Throwable cause) - { - // TODO: do we really need to do this? - // This event is fired really late, sendCallback should already be failed at this point. - // Revisit whether we still need IteratingCallback.close(). - if (cause == null) - _sendCallback.close(); - else - _sendCallback.abort(cause); - super.onClose(cause); - } - @Override public void run() { diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientIdleTimeoutTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientIdleTimeoutTest.java index 9a969015e036..01cdf57c6725 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientIdleTimeoutTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientIdleTimeoutTest.java @@ -15,6 +15,8 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.ContentResponse; import org.eclipse.jetty.client.StringRequestContent; @@ -26,6 +28,8 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; +import static org.awaitility.Awaitility.await; +import static org.hamcrest.Matchers.notNullValue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -38,17 +42,17 @@ public class HttpClientIdleTimeoutTest extends AbstractTest public void testClientIdleTimeout(Transport transport) throws Exception { long serverIdleTimeout = idleTimeout * 2; - CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1); + AtomicReference serverCallbackRef = new AtomicReference<>(); start(transport, new Handler.Abstract() { @Override public boolean handle(Request request, Response response, Callback callback) { // Do not succeed the callback if it's a timeout request. - if (!Request.getPathInContext(request).equals("/timeout")) - callback.succeeded(); + if (Request.getPathInContext(request).equals("/timeout")) + request.addFailureListener(x -> serverCallbackRef.set(callback)); else - request.addFailureListener(x -> serverIdleTimeoutLatch.countDown()); + callback.succeeded(); return true; } }); @@ -75,7 +79,8 @@ public boolean handle(Request request, Response response, Callback callback) assertEquals(HttpStatus.OK_200, response.getStatus()); // Wait for the server's idle timeout to trigger to give it a chance to clean up its resources. - assertTrue(serverIdleTimeoutLatch.await(2 * serverIdleTimeout, TimeUnit.MILLISECONDS)); + Callback callback = await().atMost(2 * serverIdleTimeout, TimeUnit.MILLISECONDS).until(serverCallbackRef::get, notNullValue()); + callback.failed(new TimeoutException()); } @ParameterizedTest @@ -83,17 +88,17 @@ public boolean handle(Request request, Response response, Callback callback) public void testRequestIdleTimeout(Transport transport) throws Exception { long serverIdleTimeout = idleTimeout * 2; - CountDownLatch serverIdleTimeoutLatch = new CountDownLatch(1); + AtomicReference serverCallbackRef = new AtomicReference<>(); start(transport, new Handler.Abstract() { @Override public boolean handle(Request request, Response response, Callback callback) throws Exception { // Do not succeed the callback if it's a timeout request. - if (!Request.getPathInContext(request).equals("/timeout")) - callback.succeeded(); + if (Request.getPathInContext(request).equals("/timeout")) + request.addFailureListener(x -> serverCallbackRef.set(callback)); else - request.addFailureListener(x -> serverIdleTimeoutLatch.countDown()); + callback.succeeded(); return true; } }); @@ -120,7 +125,8 @@ public boolean handle(Request request, Response response, Callback callback) thr assertEquals(HttpStatus.OK_200, response.getStatus()); // Wait for the server's idle timeout to trigger to give it a chance to clean up its resources. - assertTrue(serverIdleTimeoutLatch.await(2 * serverIdleTimeout, TimeUnit.MILLISECONDS)); + Callback callback = await().atMost(2 * serverIdleTimeout, TimeUnit.MILLISECONDS).until(serverCallbackRef::get, notNullValue()); + callback.failed(new TimeoutException()); } @ParameterizedTest