Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix buffer leaks in FCGI and H3 HttpClientIdleTimeoutTest #10432

Merged
Merged
Show file tree
Hide file tree
Changes from 21 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
a424ae3
- fix HttpClientIdleTimeoutTest to wait for server's idle timeout bef…
lorban Dec 15, 2023
27da0b9
handle review comments
lorban Dec 18, 2023
1659401
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/jetty-12-…
lorban Dec 18, 2023
231dcfa
fix missing acquire
lorban Dec 18, 2023
2689317
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/jetty-12-…
lorban Jan 16, 2024
2bc69d2
release the network buffer when the parser commands to stop and it is…
lorban Jan 17, 2024
a617f3d
release the network buffer when the parser commands to stop and it is…
lorban Jan 17, 2024
2b800a9
fix over-releases and add comment
lorban Jan 24, 2024
fa05eda
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/jetty-12-…
lorban Jan 25, 2024
7c394d9
- fix HttpClientIdleTimeoutTest to wait for server's idle timeout bef…
lorban Dec 15, 2023
8cd3f95
handle review comments
lorban Dec 18, 2023
0c4ad22
fix missing acquire
lorban Dec 18, 2023
82cda98
release the network buffer when the parser commands to stop and it is…
lorban Jan 17, 2024
7e9ff18
release the network buffer when the parser commands to stop and it is…
lorban Jan 17, 2024
4dbd35a
fix over-releases and add comment
lorban Jan 24, 2024
5f0891a
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/jetty-12-…
lorban Jan 30, 2024
57fbdee
Merge branch 'fix/jetty-12-10226-fcgi-HttpClientIdleTimeoutTest-leaks…
lorban Jan 30, 2024
b08a292
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/jetty-12-…
lorban Feb 20, 2024
67b5c5d
handle review comments
lorban Feb 22, 2024
81f51a5
Merge remote-tracking branch 'origin/jetty-12.0.x' into fix/jetty-12-…
lorban Feb 22, 2024
c783fcb
Using the input buffer size for the network buffer.
sbordet Feb 22, 2024
2bd8896
handle review comments
lorban Feb 22, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,11 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
private final ByteBufferPool networkByteBufferPool;
private final boolean sendStatus200;
private final Flusher flusher;
private final HttpConfiguration configuration;
private final ServerParser parser;
private final String id;
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;
private RetainableByteBuffer networkBuffer;
private RetainableByteBuffer inputBuffer;
private HttpStreamOverFCGI stream;

public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200)
Expand All @@ -62,7 +61,6 @@ public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfigur
this.connector = connector;
this.networkByteBufferPool = connector.getByteBufferPool();
this.flusher = new Flusher(endPoint);
this.configuration = configuration;
this.sendStatus200 = sendStatus200;
this.parser = new ServerParser(new ServerListener());
this.id = StringUtil.randomAlphaNumeric(16);
Expand Down Expand Up @@ -168,6 +166,8 @@ public void onOpen()
@Override
public void onFillable()
{
if (LOG.isDebugEnabled())
LOG.debug(">>onFillable enter {} {} {}", this, stream, inputBuffer);
acquireInputBuffer();
try
{
Expand All @@ -178,31 +178,40 @@ public void onFillable()
LOG.debug("Read {} bytes from {} {}", read, getEndPoint(), this);
if (read > 0)
{
if (parse(networkBuffer.getByteBuffer()))
// The networkBuffer cannot be released immediately after parse()
// even if the buffer has been fully consumed because releaseInputBuffer()
// must be called as the last release for it to be able to null out the
// networkBuffer field exactly when the latter isn't used anymore.
if (parse(inputBuffer.getByteBuffer()))
return;
}
else if (read == 0)
{
releaseInputBuffer();
fillInterested();
break;
return;
}
else
{
releaseInputBuffer();
shutdown();
break;
return;
}
}
}
catch (Exception x)
{
if (LOG.isDebugEnabled())
LOG.debug("Unable to fill endpoint", x);
networkBuffer.clear();
inputBuffer.clear();
releaseInputBuffer();
// TODO: fail and close ?
}
finally
{
if (LOG.isDebugEnabled())
LOG.debug("<<onFillable exit {} {} {}", this, stream, inputBuffer);
}
}

/**
Expand All @@ -214,42 +223,51 @@ void parseAndFill()
{
if (LOG.isDebugEnabled())
LOG.debug("parseAndFill {}", this);
acquireInputBuffer();
lorban marked this conversation as resolved.
Show resolved Hide resolved
// This loop must run only until the request is completed.
// See also HttpConnection.parseAndFillForContent().
while (stream != null)
{
if (parse(networkBuffer.getByteBuffer()))
// The networkBuffer cannot be released immediately after parse()
// even if the buffer has been fully consumed because releaseInputBuffer()
// must be called as the last release for it to be able to null out the
// networkBuffer field exactly when the latter isn't used anymore.
if (parse(inputBuffer.getByteBuffer()))
return;

// Check if the request was completed by the parsing.
if (stream == null)
if (stream == null || fillInputBuffer() <= 0)
{
if (LOG.isDebugEnabled())
LOG.debug("parseAndFill completed the request by parsing {}", this);
releaseInputBuffer();
return;
if (fillInputBuffer() <= 0)
break;
}
}
}

private void acquireInputBuffer()
{
if (networkBuffer == null)
networkBuffer = networkByteBufferPool.acquire(configuration.getResponseHeaderSize(), isUseInputDirectByteBuffers());
if (inputBuffer == null)
inputBuffer = networkByteBufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
}

private void releaseInputBuffer()
{
if (networkBuffer == null)
if (inputBuffer == null)
return;
boolean released = networkBuffer.release();
boolean released = inputBuffer.release();
if (LOG.isDebugEnabled())
LOG.debug("releaseInputBuffer {} {}", released, this);
if (released)
networkBuffer = null;
inputBuffer = null;
}

private int fillInputBuffer()
{
try
{
return getEndPoint().fill(networkBuffer.getByteBuffer());
return getEndPoint().fill(inputBuffer.getByteBuffer());
}
catch (Throwable x)
{
Expand Down Expand Up @@ -353,7 +371,7 @@ public boolean onContent(int request, FCGI.StreamType streamType, ByteBuffer buf
{
// No need to call networkBuffer.retain() here.
// The receiver of the chunk decides whether to consume/retain it.
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, networkBuffer);
Content.Chunk chunk = Content.Chunk.asChunk(buffer, false, inputBuffer);
stream.onContent(chunk);
// Signal that the content is processed asynchronously, to ensure backpressure.
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public abstract class HTTP3StreamConnection extends AbstractConnection
private final MessageParser parser;
private boolean useInputDirectByteBuffers = true;
private HTTP3Stream stream;
private RetainableByteBuffer networkBuffer;
private RetainableByteBuffer inputBuffer;
private boolean remotelyClosed;

public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, ByteBufferPool bufferPool, MessageParser parser)
Expand All @@ -56,6 +56,13 @@ public HTTP3StreamConnection(QuicStreamEndPoint endPoint, Executor executor, Byt
parser.init(MessageListener::new);
}

public void onFailure(Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("onFailure on {}", this, failure);
tryReleaseInputBuffer(true);
}

@Override
public QuicStreamEndPoint getEndPoint()
{
Expand Down Expand Up @@ -112,12 +119,12 @@ private void processDataFrames(boolean setFillInterest)
{
try
{
tryAcquireBuffer();
tryAcquireInputBuffer();

MessageParser.Result result = parseAndFill(setFillInterest);
switch (result)
{
case NO_FRAME -> tryReleaseBuffer(false);
case NO_FRAME -> tryReleaseInputBuffer(false);
case SWITCH_MODE ->
{
parser.setDataMode(false);
Expand All @@ -132,14 +139,14 @@ private void processDataFrames(boolean setFillInterest)
{
// The last frame may have caused a write that we need to flush.
getEndPoint().getQuicSession().flush();
tryReleaseBuffer(false);
tryReleaseInputBuffer(false);
}
}
}
}
catch (Throwable x)
{
tryReleaseBuffer(true);
tryReleaseInputBuffer(true);
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
getEndPoint().close(error, x);
// Notify the application that a failure happened.
Expand All @@ -151,7 +158,7 @@ private void processNonDataFrames()
{
try
{
tryAcquireBuffer();
tryAcquireInputBuffer();

while (true)
{
Expand All @@ -160,14 +167,14 @@ private void processNonDataFrames()
{
case NO_FRAME ->
{
tryReleaseBuffer(false);
tryReleaseInputBuffer(false);
return;
}
case BLOCKED_FRAME ->
{
// Return immediately because another thread may
// resume the processing as the stream is unblocked.
tryReleaseBuffer(false);
tryReleaseInputBuffer(false);
return;
}
case SWITCH_MODE ->
Expand All @@ -192,7 +199,7 @@ private void processNonDataFrames()
// However, the last frame may have
// caused a write that we need to flush.
getEndPoint().getQuicSession().flush();
tryReleaseBuffer(false);
tryReleaseInputBuffer(false);
return;
}

Expand All @@ -201,7 +208,7 @@ private void processNonDataFrames()

if (stream.hasDemandOrStall())
{
if (networkBuffer != null && networkBuffer.hasRemaining())
if (inputBuffer != null && inputBuffer.hasRemaining())
{
// There are bytes left in the buffer; if there are not
// enough bytes to parse a DATA frame and call the
Expand All @@ -212,7 +219,7 @@ private void processNonDataFrames()
{
// No bytes left in the buffer, but there is demand.
// Set fill interest to call the application when bytes arrive.
tryReleaseBuffer(false);
tryReleaseInputBuffer(false);
fillInterested();
}
}
Expand All @@ -227,7 +234,7 @@ private void processNonDataFrames()
}
catch (Throwable x)
{
tryReleaseBuffer(true);
tryReleaseInputBuffer(true);
long error = HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code();
getEndPoint().close(error, x);
// Notify the application that a failure happened.
Expand All @@ -242,28 +249,28 @@ public void receive()
processDataFrames(false);
}

private void tryAcquireBuffer()
private void tryAcquireInputBuffer()
{
if (networkBuffer == null)
if (inputBuffer == null)
{
networkBuffer = bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
inputBuffer = bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("acquired {}", networkBuffer);
LOG.debug("acquired {}", inputBuffer);
}
}

private void tryReleaseBuffer(boolean force)
private void tryReleaseInputBuffer(boolean force)
{
if (networkBuffer != null)
if (inputBuffer != null)
{
if (networkBuffer.hasRemaining() && force)
networkBuffer.clear();
if (!networkBuffer.hasRemaining())
if (inputBuffer.hasRemaining() && force)
inputBuffer.clear();
if (!inputBuffer.hasRemaining())
{
networkBuffer.release();
inputBuffer.release();
if (LOG.isDebugEnabled())
LOG.debug("released {}", networkBuffer);
networkBuffer = null;
LOG.debug("released {}", inputBuffer);
inputBuffer = null;
}
}
}
Expand All @@ -273,30 +280,30 @@ private MessageParser.Result parseAndFill(boolean setFillInterest)
try
{
if (LOG.isDebugEnabled())
LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, networkBuffer);
LOG.debug("parse+fill setFillInterest={} on {} with buffer {}", setFillInterest, this, inputBuffer);

while (true)
{
ByteBuffer byteBuffer = networkBuffer.getByteBuffer();
ByteBuffer byteBuffer = inputBuffer.getByteBuffer();
MessageParser.Result result = parser.parse(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("parsed {} on {} with buffer {}", result, this, networkBuffer);
LOG.debug("parsed {} on {} with buffer {}", result, this, inputBuffer);
if (result != MessageParser.Result.NO_FRAME)
return result;

if (networkBuffer.isRetained())
if (inputBuffer.isRetained())
{
networkBuffer.release();
inputBuffer.release();
RetainableByteBuffer newBuffer = bufferPool.acquire(getInputBufferSize(), isUseInputDirectByteBuffers());
if (LOG.isDebugEnabled())
LOG.debug("reacquired {} for retained {}", newBuffer, networkBuffer);
networkBuffer = newBuffer;
byteBuffer = networkBuffer.getByteBuffer();
LOG.debug("reacquired {} for retained {}", newBuffer, inputBuffer);
inputBuffer = newBuffer;
byteBuffer = inputBuffer.getByteBuffer();
}

int filled = fill(byteBuffer);
if (LOG.isDebugEnabled())
LOG.debug("filled {} on {} with buffer {}", filled, this, networkBuffer);
LOG.debug("filled {} on {} with buffer {}", filled, this, inputBuffer);

if (filled > 0)
continue;
Expand Down Expand Up @@ -397,7 +404,7 @@ private void processData(DataFrame frame, Runnable delegate)
{
// No need to call networkBuffer.retain() here, since we know
// that the action will be run before releasing the networkBuffer.
data = new StreamData(frame, networkBuffer);
data = new StreamData(frame, inputBuffer);
}

delegate.run();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,7 @@ public Runnable onFailure(Throwable failure)
chunk.release();
chunk = Content.Chunk.from(failure, true);
}
connection.onFailure(failure);
return httpChannel.onFailure(failure);
}
}
Loading
Loading