From dd3d6d74e52350153353fb1c30b5ebbebaa36444 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Mon, 11 Sep 2023 11:03:40 +0200 Subject: [PATCH] #10226 tentative: revert buffer's thread safety Signed-off-by: Ludovic Orban --- .../jetty/server/internal/HttpConnection.java | 73 +++++++------------ 1 file changed, 28 insertions(+), 45 deletions(-) diff --git a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java index 542f0e9f38a0..006bb35249c6 100644 --- a/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java +++ b/jetty-core/jetty-server/src/main/java/org/eclipse/jetty/server/internal/HttpConnection.java @@ -108,8 +108,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ private final LongAdder bytesOut = new LongAdder(); private final AtomicBoolean _handling = new AtomicBoolean(false); private final HttpFields.Mutable _headerBuilder = HttpFields.build(); - // The retainable buffer needs to be atomic otherwise HalfCloseTest.testAsyncClose becomes flaky. - private final AtomicReference _retainableByteBuffer = new AtomicReference<>(); + private RetainableByteBuffer _retainableByteBuffer; private HttpFields.Mutable _trailers; private Runnable _onRequest; private long _requests; @@ -382,9 +381,8 @@ public ByteBuffer onUpgradeFrom() { if (!isRequestBufferEmpty()) { - RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get(); - ByteBuffer unconsumed = ByteBuffer.allocateDirect(retainableByteBuffer.remaining()); - unconsumed.put(retainableByteBuffer.getByteBuffer()); + ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining()); + unconsumed.put(_retainableByteBuffer.getByteBuffer()); unconsumed.flip(); releaseRequestBuffer(); return unconsumed; @@ -407,40 +405,27 @@ public void onFlushed(long bytes) throws IOException void releaseRequestBuffer() { - RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.getAndSet(null); - if (retainableByteBuffer != null && !retainableByteBuffer.hasRemaining()) + if (_retainableByteBuffer != null && !_retainableByteBuffer.hasRemaining()) { if (LOG.isDebugEnabled()) LOG.debug("releaseRequestBuffer {}", this); - if (!retainableByteBuffer.release()) - throw new IllegalStateException("unreleased buffer " + retainableByteBuffer); - } - else - { - _retainableByteBuffer.set(retainableByteBuffer); + if (_retainableByteBuffer.release()) + _retainableByteBuffer = null; + else + throw new IllegalStateException("unreleased buffer " + _retainableByteBuffer); } } private ByteBuffer getRequestBuffer() { - RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get(); - if (retainableByteBuffer == null) - { - retainableByteBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); - RetainableByteBuffer witness = _retainableByteBuffer.compareAndExchange(null, retainableByteBuffer); - if (witness != null) - { - retainableByteBuffer.release(); - retainableByteBuffer = witness; - } - } - return retainableByteBuffer.getByteBuffer(); + if (_retainableByteBuffer == null) + _retainableByteBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); + return _retainableByteBuffer.getByteBuffer(); } public boolean isRequestBufferEmpty() { - RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get(); - return retainableByteBuffer == null || !retainableByteBuffer.hasRemaining(); + return _retainableByteBuffer == null || !_retainableByteBuffer.hasRemaining(); } @Override @@ -526,10 +511,9 @@ else if (filled == 0) { if (LOG.isDebugEnabled()) LOG.debug("caught exception {} {}", this, _httpChannel, x); - RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get(); - if (retainableByteBuffer != null) + if (_retainableByteBuffer != null) { - retainableByteBuffer.clear(); + _retainableByteBuffer.clear(); releaseRequestBuffer(); } } @@ -578,15 +562,14 @@ void parseAndFillForContent() private int fillRequestBuffer() { - RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get(); - if (retainableByteBuffer != null && retainableByteBuffer.isRetained()) + if (_retainableByteBuffer != null && _retainableByteBuffer.isRetained()) { // TODO this is almost certainly wrong RetainableByteBuffer newBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers()); if (LOG.isDebugEnabled()) LOG.debug("replace buffer {} <- {} in {}", _retainableByteBuffer, newBuffer, this); - retainableByteBuffer.release(); - _retainableByteBuffer.set(newBuffer); + _retainableByteBuffer.release(); + _retainableByteBuffer = newBuffer; } if (isRequestBufferEmpty()) @@ -631,14 +614,13 @@ private boolean parseRequestBuffer() if (_parser.isTerminated()) throw new RuntimeIOException("Parser is terminated"); - RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get(); - boolean handle = _parser.parseNext(retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : retainableByteBuffer.getByteBuffer()); + boolean handle = _parser.parseNext(_retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : _retainableByteBuffer.getByteBuffer()); if (LOG.isDebugEnabled()) LOG.debug("{} parsed {} {}", this, handle, _parser); // recycle buffer ? - if (retainableByteBuffer != null && !retainableByteBuffer.isRetained()) + if (_retainableByteBuffer != null && !_retainableByteBuffer.isRetained()) releaseRequestBuffer(); return handle; @@ -690,9 +672,11 @@ public void onOpen() @Override public void onClose(Throwable cause) { - RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.getAndSet(null); - if (retainableByteBuffer != null) - retainableByteBuffer.release(); + if (_retainableByteBuffer != null) + { + _retainableByteBuffer.release(); + _retainableByteBuffer = null; + } HttpStreamOverHTTP1 stream = _stream.get(); if (stream != null && cause != null) stream.failed(cause); @@ -1032,16 +1016,15 @@ public boolean headerComplete() @Override public boolean content(ByteBuffer buffer) { - RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get(); HttpStreamOverHTTP1 stream = _stream.get(); - if (stream == null || stream._chunk != null || retainableByteBuffer == null) + if (stream == null || stream._chunk != null || _retainableByteBuffer == null) throw new IllegalStateException(); if (LOG.isDebugEnabled()) - LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), retainableByteBuffer, HttpConnection.this); + LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), _retainableByteBuffer, HttpConnection.this); - retainableByteBuffer.retain(); - stream._chunk = Content.Chunk.asChunk(buffer, false, retainableByteBuffer); + _retainableByteBuffer.retain(); + stream._chunk = Content.Chunk.asChunk(buffer, false, _retainableByteBuffer); return true; }