Skip to content

Commit

Permalink
#10226 Attempt at fixing FCGI buffer leak
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Mar 29, 2024
1 parent 0451bd4 commit 484882b
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;

import org.eclipse.jetty.fcgi.FCGI;
import org.eclipse.jetty.fcgi.generator.Flusher;
Expand Down Expand Up @@ -50,10 +51,10 @@ public class ServerFCGIConnection extends AbstractMetaDataConnection implements
private final Flusher flusher;
private final ServerParser parser;
private final String id;
private final AtomicReference<HttpStreamOverFCGI> stream = new AtomicReference<>();
private boolean useInputDirectByteBuffers;
private boolean useOutputDirectByteBuffers;
private RetainableByteBuffer inputBuffer;
private HttpStreamOverFCGI stream;

public ServerFCGIConnection(Connector connector, EndPoint endPoint, HttpConfiguration configuration, boolean sendStatus200)
{
Expand Down Expand Up @@ -163,6 +164,23 @@ public void onOpen()
fillInterested();
}

@Override
public void onClose(Throwable cause)
{
super.onClose(cause);
HttpStreamOverFCGI stream = this.stream.getAndSet(null);
if (stream != null)
{
stream.failed(cause);

if (inputBuffer != null)
{
inputBuffer.release();
inputBuffer = null;
}
}
}

@Override
public void onFillable()
{
Expand Down Expand Up @@ -226,7 +244,7 @@ void parseAndFill()
acquireInputBuffer();
// This loop must run only until the request is completed.
// See also HttpConnection.parseAndFillForContent().
while (stream != null)
while (true)
{
// The inputBuffer cannot be released immediately after parse()
// even if the buffer has been fully consumed because releaseInputBuffer()
Expand All @@ -236,7 +254,7 @@ void parseAndFill()
return;

// Check if the request was completed by the parsing.
if (stream == null || fillInputBuffer() <= 0)
if (stream.get() == null || fillInputBuffer() <= 0)
{
if (LOG.isDebugEnabled())
LOG.debug("parseAndFill completed the request by parsing {}", this);
Expand Down Expand Up @@ -280,6 +298,7 @@ private int fillInputBuffer()
@Override
protected boolean onReadTimeout(TimeoutException timeout)
{
HttpStreamOverFCGI stream = this.stream.get();
if (stream != null)
return stream.onIdleTimeout(timeout);
return true;
Expand Down Expand Up @@ -313,7 +332,7 @@ void onCompleted(Throwable failure)
@Override
public boolean onIdleExpired(TimeoutException timeoutException)
{
HttpStreamOverFCGI stream = this.stream;
HttpStreamOverFCGI stream = this.stream.get();
if (stream == null)
return true;
Runnable task = stream.getHttpChannel().onIdleTimeout(timeoutException);
Expand All @@ -328,12 +347,13 @@ private class ServerListener implements ServerParser.Listener
public void onStart(int request, FCGI.Role role, int flags)
{
// TODO: handle flags
if (stream != null)
if (stream.get() != null)
throw new UnsupportedOperationException("FastCGI Multiplexing");
HttpChannel channel = httpChannelFactory.newHttpChannel(ServerFCGIConnection.this);
ServerGenerator generator = new ServerGenerator(connector.getByteBufferPool(), isUseOutputDirectByteBuffers(), sendStatus200);
stream = new HttpStreamOverFCGI(ServerFCGIConnection.this, generator, channel, request);
HttpStreamOverFCGI stream = new HttpStreamOverFCGI(ServerFCGIConnection.this, generator, channel, request);
channel.setHttpStream(stream);
ServerFCGIConnection.this.stream.set(stream);
if (LOG.isDebugEnabled())
LOG.debug("Request {} start on {}", request, channel);
}
Expand All @@ -343,6 +363,7 @@ public void onHeader(int request, HttpField field)
{
if (LOG.isDebugEnabled())
LOG.debug("Request {} header {} on {}", request, field, stream);
HttpStreamOverFCGI stream = ServerFCGIConnection.this.stream.get();
if (stream != null)
stream.onHeader(field);
}
Expand All @@ -352,6 +373,7 @@ public boolean onHeaders(int request)
{
if (LOG.isDebugEnabled())
LOG.debug("Request {} headers on {}", request, stream);
HttpStreamOverFCGI stream = ServerFCGIConnection.this.stream.get();
if (stream != null)
{
stream.onHeaders();
Expand All @@ -367,6 +389,7 @@ public boolean onContent(int request, FCGI.StreamType streamType, ByteBuffer buf
{
if (LOG.isDebugEnabled())
LOG.debug("Request {} {} content {} on {}", request, streamType, buffer, stream);
HttpStreamOverFCGI stream = ServerFCGIConnection.this.stream.get();
if (stream != null)
{
// No need to call inputBuffer.retain() here.
Expand All @@ -384,27 +407,25 @@ public void onEnd(int request)
{
if (LOG.isDebugEnabled())
LOG.debug("Request {} end on {}", request, stream);
// Nulling out the stream signals that the
// request is complete, see also parseAndFill().
HttpStreamOverFCGI stream = ServerFCGIConnection.this.stream.getAndSet(null);
if (stream != null)
{
stream.onComplete();
// Nulling out the stream signals that the
// request is complete, see also parseAndFill().
stream = null;
}
}

@Override
public void onFailure(int request, Throwable failure)
{
if (LOG.isDebugEnabled())
LOG.debug("Request {} failure on {}: {}", request, stream, failure);
HttpStreamOverFCGI stream = ServerFCGIConnection.this.stream.getAndSet(null);
if (stream != null)
{
Runnable runnable = stream.getHttpChannel().onFailure(new BadMessageException(null, failure));
if (runnable != null)
getExecutor().execute(runnable);
}
stream = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

public abstract class AbstractHttpTest
Expand All @@ -42,12 +43,14 @@ public abstract class AbstractHttpTest
protected static Server server;
protected static ServerConnector connector;
private StacklessLogging stacklessChannelLogging;
private ArrayByteBufferPool.Tracking bufferPool;

@BeforeEach
public void setUp() throws Exception
{
server = new Server();
connector = new ServerConnector(server, null, null, new ArrayByteBufferPool(64, 2048, 64 * 1024), 1, 1, new HttpConnectionFactory());
bufferPool = new ArrayByteBufferPool.Tracking();
connector = new ServerConnector(server, null, null, bufferPool, 1, 1, new HttpConnectionFactory());
connector.setIdleTimeout(100000);

server.addConnector(connector);
Expand All @@ -57,8 +60,10 @@ public void setUp() throws Exception
@AfterEach
public void tearDown() throws Exception
{
Set<ArrayByteBufferPool.Tracking.Buffer> serverLeaks = bufferPool.getLeaks();
server.stop();
stacklessChannelLogging.close();
assertEquals(0, serverLeaks.size(), bufferPool.dumpLeaks());
}

protected HttpTester.Response executeRequest(HttpVersion httpVersion) throws URISyntaxException, IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,6 @@ public void testInputStreamResponseListenerFailedBeforeResponse(Transport transp
@ParameterizedTest
@MethodSource("transports")
@Tag("DisableLeakTracking:client")
@Tag("DisableLeakTracking:server:FCGI")
public void testInputStreamContentProviderThrowingWhileReading(Transport transport) throws Exception
{
start(transport, new Handler.Abstract()
Expand Down Expand Up @@ -990,11 +989,7 @@ public void close() throws IOException

@ParameterizedTest
@MethodSource("transports")
@Tag("DisableLeakTracking:server:HTTP")
@Tag("DisableLeakTracking:server:HTTPS")
@Tag("DisableLeakTracking:H3")
@Tag("DisableLeakTracking:server:FCGI")
@Tag("DisableLeakTracking:server:UNIX_DOMAIN")
@Tag("DisableLeakTracking:client:H3")
public void testUploadWithConcurrentServerCloseClosesStream(Transport transport) throws Exception
{
CountDownLatch serverLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -1206,10 +1201,6 @@ public boolean handle(Request request, org.eclipse.jetty.server.Response respons

@ParameterizedTest
@MethodSource("transports")
@Tag("DisableLeakTracking:server:H2")
@Tag("DisableLeakTracking:server:H2C")
@Tag("DisableLeakTracking:server:H3")
@Tag("DisableLeakTracking:server:FCGI")
public void testHttpStreamConsumeAvailableUponClientAbort(Transport transport) throws Exception
{
AtomicReference<org.eclipse.jetty.client.Request> clientRequestRef = new AtomicReference<>();
Expand Down

0 comments on commit 484882b

Please sign in to comment.