Skip to content

Commit

Permalink
#10226 force completion of the handle's callback to fix a buffer leak
Browse files Browse the repository at this point in the history
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Mar 29, 2024
1 parent 0451bd4 commit e7af370
Showing 1 changed file with 57 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -990,79 +990,88 @@ public void close() throws IOException

@ParameterizedTest
@MethodSource("transports")
@Tag("DisableLeakTracking:server:HTTP")
@Tag("DisableLeakTracking:server:HTTPS")
@Tag("DisableLeakTracking:H3")
@Tag("DisableLeakTracking:server:FCGI")
@Tag("DisableLeakTracking:server:UNIX_DOMAIN")
@Tag("DisableLeakTracking:client:H3")
public void testUploadWithConcurrentServerCloseClosesStream(Transport transport) throws Exception
{
AtomicReference<Callback> callbackRef = new AtomicReference<>();
CountDownLatch serverLatch = new CountDownLatch(1);
start(transport, new Handler.Abstract()
{
@Override
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
serverLatch.countDown();
// Do not complete the callback.
// Do not complete the callback, but store it aside for
// releasing the buffer later on.
callbackRef.set(callback);
return true;
}
});

AtomicBoolean commit = new AtomicBoolean();
CountDownLatch closeLatch = new CountDownLatch(1);
InputStream stream = new InputStream()
try
{
@Override
public int read() throws IOException
AtomicBoolean commit = new AtomicBoolean();
CountDownLatch closeLatch = new CountDownLatch(1);
InputStream stream = new InputStream()
{
// This method will be called few times before
// the request is committed.
// We wait for the request to commit, and we
// wait for the request to reach the server,
// to be sure that the server endPoint has
// been created, before stopping the connector.

if (commit.get())
@Override
public int read() throws IOException
{
try
// This method will be called few times before
// the request is committed.
// We wait for the request to commit, and we
// wait for the request to reach the server,
// to be sure that the server endPoint has
// been created, before stopping the connector.

if (commit.get())
{
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
connector.stop();
return 0;
try
{
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
connector.stop();
return 0;
}
catch (Throwable x)
{
throw new IOException(x);
}
}
catch (Throwable x)
else
{
throw new IOException(x);
return connector.isStopped() ? -1 : 0;
}
}
else

@Override
public void close() throws IOException
{
return connector.isStopped() ? -1 : 0;
super.close();
closeLatch.countDown();
}
}
};
InputStreamRequestContent content = new InputStreamRequestContent(stream, 1);

@Override
public void close() throws IOException
{
super.close();
closeLatch.countDown();
}
};
InputStreamRequestContent content = new InputStreamRequestContent(stream, 1);

CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest(newURI(transport))
.body(content)
.onRequestCommit(request -> commit.set(true))
.send(result ->
{
Assertions.assertTrue(result.isFailed());
completeLatch.countDown();
});
CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest(newURI(transport))
.body(content)
.onRequestCommit(request -> commit.set(true))
.send(result ->
{
Assertions.assertTrue(result.isFailed());
completeLatch.countDown();
});

assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
finally
{
// Release the buffer.
Callback callback = callbackRef.get();
if (callback != null)
callback.succeeded();
}
}

@ParameterizedTest
Expand Down

0 comments on commit e7af370

Please sign in to comment.