Skip to content

Commit

Permalink
Fix buffer leaks in FCGI and H3 HttpClientIdleTimeoutTest (#10432)
Browse files Browse the repository at this point in the history
#10226
- fix HttpClientIdleTimeoutTest to wait for server's idle timeout before checking for leaks
- improve HttpClientIdleTimeoutTest by making it upload some content
- fix FCGI server leak caused by idle timeout
- fix H3 server leak caused by idle timeout

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban authored Feb 23, 2024
1 parent 1e5aa10 commit efc2785
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,24 +45,22 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
private final HttpChannel.Factory httpChannelFactory = new HttpChannel.DefaultFactory();
private final Attributes attributes = new Lazy();
private final Connector connector;
private final ByteBufferPool networkByteBufferPool;
private final ByteBufferPool bufferPool;
private final boolean sendStatus200;
private final Flusher flusher;
private final HttpConfiguration configuration;
private final ServerParser parser;
private final String id;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;
private RetainableByteBuffer networkBuffer;
private RetainableByteBuffer inputBuffer;
private HttpStreamOverFCGI stream;

public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200)
{
super(connector, configuration, endPoint);
this.connector = connector;
this.networkByteBufferPool = connector.getByteBufferPool();
this.bufferPool = connector.getByteBufferPool();
this.flusher = new Flusher(endPoint);
this.configuration = configuration;
this.sendStatus200 = sendStatus200;
this.parser = new ServerParser(new ServerListener());
this.id = StringUtil.randomAlphaNumeric(16);
Expand Down Expand Up @@ -168,6 +166,8 @@ public void onOpen()
@Override
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug(">>onFillable enter {} {} {}", this, stream, inputBuffer);
acquireInputBuffer();
try
{
Expand All @@ -178,31 +178,40 @@ public void onFillable()
LOG.debug("Read {} bytes from {} {}", read, getEndPoint(), this);
if (read > 0)
{
if (parse(networkBuffer.getByteBuffer()))
// The inputBuffer cannot be released immediately after parse()
// even if the buffer has been fully consumed because releaseInputBuffer()
// must be called as the last release for it to be able to null out the
// inputBuffer field exactly when the latter isn't used anymore.
if (parse(inputBuffer.getByteBuffer()))
return;
}
else if (read == 0)
{
releaseInputBuffer();
fillInterested();
break;
return;
}
else
{
releaseInputBuffer();
shutdown();
break;
return;
}
}
}
catch (Exception x)
{
if (LOG.isDebugEnabled())
LOG.debug("Unable to fill endpoint", x);
networkBuffer.clear();
inputBuffer.clear();
releaseInputBuffer();
// TODO: fail and close ?
}
finally
{
if (LOG.isDebugEnabled())
LOG.debug("<<onFillable exit {} {} {}", this, stream, inputBuffer);
}
}

/**
Expand All @@ -214,42 +223,51 @@ void parseAndFill()
{
if (LOG.isDebugEnabled())
LOG.debug("parseAndFill {}", this);
acquireInputBuffer();
// This loop must run only until the request is completed.
// See also HttpConnection.parseAndFillForContent().
while (stream != null)
{
if (parse(networkBuffer.getByteBuffer()))
// The inputBuffer cannot be released immediately after parse()
// even if the buffer has been fully consumed because releaseInputBuffer()
// must be called as the last release for it to be able to null out the
// inputBuffer field exactly when the latter isn't used anymore.
if (parse(inputBuffer.getByteBuffer()))
return;

// Check if the request was completed by the parsing.
if (stream == null)
if (stream == null || fillInputBuffer() <= 0)
{
if (LOG.isDebugEnabled())
LOG.debug("parseAndFill completed the request by parsing {}", this);
releaseInputBuffer();
return;
if (fillInputBuffer() <= 0)
break;
}
}
}

private void acquireInputBuffer()
{
if (networkBuffer == null)
networkBuffer = networkByteBufferPool.acquire(configuration.getResponseHeaderSize(), isUseInputDirectByteBuffers());
if (inputBuffer == null)
inputBuffer = bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
}

private void releaseInputBuffer()
{
if (networkBuffer == null)
if (inputBuffer == null)
return;
boolean released = networkBuffer.release();
boolean released = inputBuffer.release();
if (LOG.isDebugEnabled())
LOG.debug("releaseInputBuffer {} {}", released, this);
if (released)
networkBuffer = null;
inputBuffer = null;
}

private int fillInputBuffer()
{
try
{
return getEndPoint().fill(networkBuffer.getByteBuffer());
return getEndPoint().fill(inputBuffer.getByteBuffer());
}
catch (Throwable x)
{
Expand Down Expand Up @@ -351,9 +369,9 @@ public boolean onContent(int request, FCGI.StreamType streamType, ByteBuffer buf
LOG.debug("Request {} {} content {} on {}", request, streamType, buffer, stream);
if (stream != null)
{
// No need to call networkBuffer.retain() here.
// No need to call inputBuffer.retain() here.
// The receiver of the chunk decides whether to consume/retain it.
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, inputBuffer);
stream.onContent(chunk);
// Signal that the content is processed asynchronously, to ensure backpressure.
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private final MessageParser parser;
private boolean useInputDirectByteBuffers = true;
private HTTP3Stream stream;
private RetainableByteBuffer networkBuffer;
private RetainableByteBuffer inputBuffer;
private boolean remotelyClosed;

public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool bufferPool, MessageParser parser)
Expand All @@ -56,6 +56,13 @@ public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, Byt
parser.init(MessageListener::new);
}

public void onFailure(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("onFailure on {}", this, failure);
tryReleaseInputBuffer(true);
}

@Override
public QuicStreamEndPoint getEndPoint()
{
Expand Down Expand Up @@ -112,12 +119,12 @@ private void processDataFrames(boolean setFillInterest)
{
try
{
tryAcquireBuffer();
tryAcquireInputBuffer();

MessageParser.Result result = parseAndFill(setFillInterest);
switch (result)
{
case NO_FRAME -> tryReleaseBuffer(false);
case NO_FRAME -> tryReleaseInputBuffer(false);
case SWITCH_MODE ->
{
parser.setDataMode(false);
Expand All @@ -132,14 +139,14 @@ private void processDataFrames(boolean setFillInterest)
{
// The last frame may have caused a write that we need to flush.
getEndPoint().getQuicSession().flush();
tryReleaseBuffer(false);
tryReleaseInputBuffer(false);
}
}
}
}
catch (Throwable x)
{
tryReleaseBuffer(true);
tryReleaseInputBuffer(true);
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
getEndPoint().close(error, x);
// Notify the application that a failure happened.
Expand All @@ -151,7 +158,7 @@ private void processNonDataFrames()
{
try
{
tryAcquireBuffer();
tryAcquireInputBuffer();

while (true)
{
Expand All @@ -160,14 +167,14 @@ private void processNonDataFrames()
{
case NO_FRAME ->
{
tryReleaseBuffer(false);
tryReleaseInputBuffer(false);
return;
}
case BLOCKED_FRAME ->
{
// Return immediately because another thread may
// resume the processing as the stream is unblocked.
tryReleaseBuffer(false);
tryReleaseInputBuffer(false);
return;
}
case SWITCH_MODE ->
Expand All @@ -192,7 +199,7 @@ private void processNonDataFrames()
// However, the last frame may have
// caused a write that we need to flush.
getEndPoint().getQuicSession().flush();
tryReleaseBuffer(false);
tryReleaseInputBuffer(false);
return;
}

Expand All @@ -201,7 +208,7 @@ private void processNonDataFrames()

if (stream.hasDemandOrStall())
{
if (networkBuffer != null && networkBuffer.hasRemaining())
if (inputBuffer != null && inputBuffer.hasRemaining())
{
// There are bytes left in the buffer; if there are not
// enough bytes to parse a DATA frame and call the
Expand All @@ -212,7 +219,7 @@ private void processNonDataFrames()
{
// No bytes left in the buffer, but there is demand.
// Set fill interest to call the application when bytes arrive.
tryReleaseBuffer(false);
tryReleaseInputBuffer(false);
fillInterested();
}
}
Expand All @@ -227,7 +234,7 @@ private void processNonDataFrames()
}
catch (Throwable x)
{
tryReleaseBuffer(true);
tryReleaseInputBuffer(true);
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
getEndPoint().close(error, x);
// Notify the application that a failure happened.
Expand All @@ -242,28 +249,28 @@ public void receive()
processDataFrames(false);
}

private void tryAcquireBuffer()
private void tryAcquireInputBuffer()
{
if (networkBuffer == null)
if (inputBuffer == null)
{
networkBuffer = bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
inputBuffer = bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("acquired {}", networkBuffer);
LOG.debug("acquired {}", inputBuffer);
}
}

private void tryReleaseBuffer(boolean force)
private void tryReleaseInputBuffer(boolean force)
{
if (networkBuffer != null)
if (inputBuffer != null)
{
if (networkBuffer.hasRemaining() && force)
networkBuffer.clear();
if (!networkBuffer.hasRemaining())
if (inputBuffer.hasRemaining() && force)
inputBuffer.clear();
if (!inputBuffer.hasRemaining())
{
networkBuffer.release();
inputBuffer.release();
if (LOG.isDebugEnabled())
LOG.debug("released {}", networkBuffer);
networkBuffer = null;
LOG.debug("released {}", inputBuffer);
inputBuffer = null;
}
}
}
Expand All @@ -273,30 +280,30 @@ private MessageParser.Result parseAndFill(boolean setFillInterest)
try
{
if (LOG.isDebugEnabled())
LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, networkBuffer);
LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, inputBuffer);

while (true)
{
ByteBuffer byteBuffer = networkBuffer.getByteBuffer();
ByteBuffer byteBuffer = inputBuffer.getByteBuffer();
MessageParser.Result result = parser.parse(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("parsed {} on {} with buffer {}", result, this, networkBuffer);
LOG.debug("parsed {} on {} with buffer {}", result, this, inputBuffer);
if (result != MessageParser.Result.NO_FRAME)
return result;

if (networkBuffer.isRetained())
if (inputBuffer.isRetained())
{
networkBuffer.release();
inputBuffer.release();
RetainableByteBuffer newBuffer = bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("reacquired {} for retained {}", newBuffer, networkBuffer);
networkBuffer = newBuffer;
byteBuffer = networkBuffer.getByteBuffer();
LOG.debug("reacquired {} for retained {}", newBuffer, inputBuffer);
inputBuffer = newBuffer;
byteBuffer = inputBuffer.getByteBuffer();
}

int filled = fill(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {} with buffer {}", filled, this, networkBuffer);
LOG.debug("filled {} on {} with buffer {}", filled, this, inputBuffer);

if (filled > 0)
continue;
Expand Down Expand Up @@ -395,9 +402,9 @@ private void processData(DataFrame frame, Runnable delegate)
}
else
{
// No need to call networkBuffer.retain() here, since we know
// that the action will be run before releasing the networkBuffer.
data = new StreamData(frame, networkBuffer);
// No need to call inputBuffer.retain() here, since we know
// that the action will be run before releasing the inputBuffer.
data = new StreamData(frame, inputBuffer);
}

delegate.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ public Runnable onFailure(Throwable failure)
chunk.release();
chunk = Content.Chunk.from(failure, true);
}
connection.onFailure(failure);
return httpChannel.onFailure(failure);
}
}
Loading

0 comments on commit efc2785

Please sign in to comment.