Skip to content

Commit

Permalink
#10226 tentative: revert buffer's thread safety
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Mar 29, 2024
1 parent 30cd8db commit a882927
Showing 1 changed file with 28 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,7 @@ public class HttpConnection extends AbstractMetaDataConnection implements Runnab
private final LongAdder bytesOut = new LongAdder();
private final AtomicBoolean _handling = new AtomicBoolean(false);
private final HttpFields.Mutable _headerBuilder = HttpFields.build();
// The retainable buffer needs to be atomic otherwise HalfCloseTest.testAsyncClose and RequestTest.testCachedServletCookies becomes flaky.
private final AtomicReference<RetainableByteBuffer> _retainableByteBuffer = new AtomicReference<>();
private RetainableByteBuffer _retainableByteBuffer;
private HttpFields.Mutable _trailers;
private Runnable _onRequest;
private long _requests;
Expand Down Expand Up @@ -322,9 +321,8 @@ public ByteBuffer onUpgradeFrom()
{
if (!isRequestBufferEmpty())
{
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
ByteBuffer unconsumed = ByteBuffer.allocateDirect(retainableByteBuffer.remaining());
unconsumed.put(retainableByteBuffer.getByteBuffer());
ByteBuffer unconsumed = ByteBuffer.allocateDirect(_retainableByteBuffer.remaining());
unconsumed.put(_retainableByteBuffer.getByteBuffer());
unconsumed.flip();
releaseRequestBuffer();
return unconsumed;
Expand All @@ -347,40 +345,27 @@ public void onFlushed(long bytes) throws IOException

void releaseRequestBuffer()
{
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.getAndSet(null);
if (retainableByteBuffer != null && !retainableByteBuffer.hasRemaining())
if (_retainableByteBuffer != null && !_retainableByteBuffer.hasRemaining())
{
if (LOG.isDebugEnabled())
LOG.debug("releaseRequestBuffer {}", this);
if (!retainableByteBuffer.release())
throw new IllegalStateException("unreleased buffer " + retainableByteBuffer);
}
else
{
_retainableByteBuffer.set(retainableByteBuffer);
if (_retainableByteBuffer.release())
_retainableByteBuffer = null;
else
throw new IllegalStateException("unreleased buffer " + _retainableByteBuffer);
}
}

private ByteBuffer getRequestBuffer()
{
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
if (retainableByteBuffer == null)
{
retainableByteBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
RetainableByteBuffer witness = _retainableByteBuffer.compareAndExchange(null, retainableByteBuffer);
if (witness != null)
{
retainableByteBuffer.release();
retainableByteBuffer = witness;
}
}
return retainableByteBuffer.getByteBuffer();
if (_retainableByteBuffer == null)
_retainableByteBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
return _retainableByteBuffer.getByteBuffer();
}

public boolean isRequestBufferEmpty()
{
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
return retainableByteBuffer == null || !retainableByteBuffer.hasRemaining();
return _retainableByteBuffer == null || !_retainableByteBuffer.hasRemaining();
}

@Override
Expand Down Expand Up @@ -466,10 +451,9 @@ else if (filled == 0)
{
if (LOG.isDebugEnabled())
LOG.debug("caught exception {} {}", this, _httpChannel, x);
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
if (retainableByteBuffer != null)
if (_retainableByteBuffer != null)
{
retainableByteBuffer.clear();
_retainableByteBuffer.clear();
releaseRequestBuffer();
}
}
Expand Down Expand Up @@ -518,15 +502,14 @@ void parseAndFillForContent()

private int fillRequestBuffer()
{
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
if (retainableByteBuffer != null && retainableByteBuffer.isRetained())
if (_retainableByteBuffer != null && _retainableByteBuffer.isRetained())
{
// TODO this is almost certainly wrong
RetainableByteBuffer newBuffer = _bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("replace buffer {} <- {} in {}", _retainableByteBuffer, newBuffer, this);
retainableByteBuffer.release();
_retainableByteBuffer.set(newBuffer);
_retainableByteBuffer.release();
_retainableByteBuffer = newBuffer;
}

if (!isRequestBufferEmpty())
Expand Down Expand Up @@ -581,14 +564,13 @@ private boolean parseRequestBuffer()

if (_parser.isTerminated())
throw new RuntimeIOException("Parser is terminated");
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
boolean handle = _parser.parseNext(retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : retainableByteBuffer.getByteBuffer());
boolean handle = _parser.parseNext(_retainableByteBuffer == null ? BufferUtil.EMPTY_BUFFER : _retainableByteBuffer.getByteBuffer());

if (LOG.isDebugEnabled())
LOG.debug("{} parsed {} {}", this, handle, _parser);

// recycle buffer ?
if (retainableByteBuffer != null && !retainableByteBuffer.isRetained())
if (_retainableByteBuffer != null && !_retainableByteBuffer.isRetained())
releaseRequestBuffer();

return handle;
Expand Down Expand Up @@ -640,9 +622,11 @@ public void onOpen()
@Override
public void onClose(Throwable cause)
{
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.getAndSet(null);
if (retainableByteBuffer != null)
retainableByteBuffer.release();
if (_retainableByteBuffer != null)
{
_retainableByteBuffer.release();
_retainableByteBuffer = null;
}
HttpStreamOverHTTP1 stream = _stream.get();
if (stream != null && cause != null)
stream.failed(cause);
Expand Down Expand Up @@ -981,16 +965,15 @@ public boolean headerComplete()
@Override
public boolean content(ByteBuffer buffer)
{
RetainableByteBuffer retainableByteBuffer = _retainableByteBuffer.get();
HttpStreamOverHTTP1 stream = _stream.get();
if (stream == null || stream._chunk != null || retainableByteBuffer == null)
if (stream == null || stream._chunk != null || _retainableByteBuffer == null)
throw new IllegalStateException();

if (LOG.isDebugEnabled())
LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), retainableByteBuffer, HttpConnection.this);
LOG.debug("content {}/{} for {}", BufferUtil.toDetailString(buffer), _retainableByteBuffer, HttpConnection.this);

retainableByteBuffer.retain();
stream._chunk = Content.Chunk.asChunk(buffer, false, retainableByteBuffer);
_retainableByteBuffer.retain();
stream._chunk = Content.Chunk.asChunk(buffer, false, _retainableByteBuffer);
return true;
}

Expand Down

0 comments on commit a882927

Please sign in to comment.