Skip to content

Commit

Permalink
#10226 release retainable buffer atomically
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Sep 11, 2023
1 parent 2f6a204 commit 43730e4
Showing 1 changed file with 45 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,8 @@ public class HttpConnection extends AbstractConnection implements Runnable, Writ
private final LongAdder bytesOut = new LongAdder();
private final AtomicBoolean _handling = new AtomicBoolean(false);
private final HttpFields.Mutable _headerBuilder = HttpFields.build();
private volatile RetainableByteBuffer _retainableByteBuffer;
// The retainable buffer needs to be atomic otherwise HalfCloseTest.testAsyncClose and RequestTest.testCachedServletCookies becomes flaky.
private final AtomicReference<RetainableByteBuffer> _retainableByteBuffer = new AtomicReference<>();
private HttpFields.Mutable _trailers;
private Runnable _onRequest;
private long _requests;
Expand Down Expand Up @@ -381,8 +382,9 @@ public ByteBuffer onUpgradeFrom()
{
if (!isRequestBufferEmpty())
{
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining());
unconsumed.put(_retainableByteBuffer.getByteBuffer());
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
ByteBuffer unconsumed = ByteBuffer.allocateDirect(retainableByteBuffer.remaining());
unconsumed.put(retainableByteBuffer.getByteBuffer());
unconsumed.flip();
releaseRequestBuffer();
return unconsumed;
Expand All @@ -405,27 +407,40 @@ public void onFlushed(long bytes) throws IOException

void releaseRequestBuffer()
{
if (_retainableByteBuffer != null && !_retainableByteBuffer.hasRemaining())
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.getAndSet(null);
if (retainableByteBuffer != null && !retainableByteBuffer.hasRemaining())
{
if (LOG.isDebugEnabled())
LOG.debug("releaseRequestBuffer {}", this);
if (_retainableByteBuffer.release())
_retainableByteBuffer = null;
else
throw new IllegalStateException("unreleased buffer " + _retainableByteBuffer);
if (!retainableByteBuffer.release())
throw new IllegalStateException("unreleased buffer " + retainableByteBuffer);
}
else
{
_retainableByteBuffer.set(retainableByteBuffer);
}
}

private ByteBuffer getRequestBuffer()
{
if (_retainableByteBuffer == null)
_retainableByteBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
return _retainableByteBuffer.getByteBuffer();
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
if (retainableByteBuffer == null)
{
retainableByteBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
RetainableByteBuffer witness = _retainableByteBuffer.compareAndExchange(null, retainableByteBuffer);
if (witness != null)
{
retainableByteBuffer.release();
retainableByteBuffer = witness;
}
}
return retainableByteBuffer.getByteBuffer();
}

public boolean isRequestBufferEmpty()
{
return _retainableByteBuffer == null || !_retainableByteBuffer.hasRemaining();
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
return retainableByteBuffer == null || !retainableByteBuffer.hasRemaining();
}

@Override
Expand Down Expand Up @@ -511,9 +526,10 @@ else if (filled == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("caught exception {} {}", this, _httpChannel, x);
if (_retainableByteBuffer != null)
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
if (retainableByteBuffer != null)
{
_retainableByteBuffer.clear();
retainableByteBuffer.clear();
releaseRequestBuffer();
}
}
Expand Down Expand Up @@ -562,14 +578,15 @@ void parseAndFillForContent()

private int fillRequestBuffer()
{
if (_retainableByteBuffer != null && _retainableByteBuffer.isRetained())
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
if (retainableByteBuffer != null && retainableByteBuffer.isRetained())
{
// TODO this is almost certainly wrong
RetainableByteBuffer newBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("replace buffer {} <- {} in {}", _retainableByteBuffer, newBuffer, this);
_retainableByteBuffer.release();
_retainableByteBuffer = newBuffer;
retainableByteBuffer.release();
_retainableByteBuffer.set(newBuffer);
}

if (isRequestBufferEmpty())
Expand Down Expand Up @@ -614,13 +631,14 @@ private boolean parseRequestBuffer()

if (_parser.isTerminated())
throw new RuntimeIOException("Parser is terminated");
boolean handle = _parser.parseNext(_retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : _retainableByteBuffer.getByteBuffer());
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
boolean handle = _parser.parseNext(retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : retainableByteBuffer.getByteBuffer());

if (LOG.isDebugEnabled())
LOG.debug("{} parsed {} {}", this, handle, _parser);

// recycle buffer ?
if (_retainableByteBuffer != null && !_retainableByteBuffer.isRetained())
if (retainableByteBuffer != null && !retainableByteBuffer.isRetained())
releaseRequestBuffer();

return handle;
Expand Down Expand Up @@ -672,11 +690,9 @@ public void onOpen()
@Override
public void onClose(Throwable cause)
{
if (_retainableByteBuffer != null)
{
_retainableByteBuffer.release();
_retainableByteBuffer = null;
}
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.getAndSet(null);
if (retainableByteBuffer != null)
retainableByteBuffer.release();
HttpStreamOverHTTP1 stream = _stream.get();
if (stream != null && cause != null)
stream.failed(cause);
Expand Down Expand Up @@ -1016,15 +1032,16 @@ public boolean headerComplete()
@Override
public boolean content(ByteBuffer buffer)
{
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
HttpStreamOverHTTP1 stream = _stream.get();
if (stream == null || stream._chunk != null || _retainableByteBuffer == null)
if (stream == null || stream._chunk != null || retainableByteBuffer == null)
throw new IllegalStateException();

if (LOG.isDebugEnabled())
LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), _retainableByteBuffer, HttpConnection.this);
LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), retainableByteBuffer, HttpConnection.this);

_retainableByteBuffer.retain();
stream._chunk = Content.Chunk.asChunk(buffer, false, _retainableByteBuffer);
retainableByteBuffer.retain();
stream._chunk = Content.Chunk.asChunk(buffer, false, retainableByteBuffer);
return true;
}

Expand Down

0 comments on commit 43730e4

Please sign in to comment.