Skip to content

Commit

Permalink
#10226 for releasing the buffer and failing the stream in Connection.…
Browse files Browse the repository at this point in the history
…onClose()

Signed-off-by: Ludovic Orban <lorban@bitronix.be>
  • Loading branch information
lorban committed Mar 29, 2024
1 parent e7af370 commit dd75e74
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 56 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Content;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.server.AbstractMetaDataConnection;
import org.eclipse.jetty.server.ConnectionMetaData;
Expand Down Expand Up @@ -163,6 +164,20 @@ public void onOpen()
fillInterested();
}

@Override
public void onClose(Throwable cause)
{
if (networkBuffer != null)
{
networkBuffer.release();
networkBuffer = null;
}
if (stream != null)
stream.failed(new EofException());

super.onClose(cause);
}

@Override
public void onFillable()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,6 +623,15 @@ public void onOpen()
@Override
public void onClose(Throwable cause)
{
if (_retainableByteBuffer != null)
{
_retainableByteBuffer.release();
_retainableByteBuffer = null;
}
HttpStreamOverHTTP1 stream = _stream.get();
if (stream != null)
stream.failed(new EofException());

// TODO: do we really need to do this?
// This event is fired really late, sendCallback should already be failed at this point.
// Revisit whether we still need IteratingCallback.close().
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -993,85 +993,72 @@ public void close() throws IOException
@Tag("DisableLeakTracking:client:H3")
public void testUploadWithConcurrentServerCloseClosesStream(Transport transport) throws Exception
{
AtomicReference<Callback> callbackRef = new AtomicReference<>();
CountDownLatch serverLatch = new CountDownLatch(1);
start(transport, new Handler.Abstract()
{
@Override
public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback)
{
serverLatch.countDown();
// Do not complete the callback, but store it aside for
// releasing the buffer later on.
callbackRef.set(callback);
// Do not complete the callback.
return true;
}
});

try
AtomicBoolean commit = new AtomicBoolean();
CountDownLatch closeLatch = new CountDownLatch(1);
InputStream stream = new InputStream()
{
AtomicBoolean commit = new AtomicBoolean();
CountDownLatch closeLatch = new CountDownLatch(1);
InputStream stream = new InputStream()
@Override
public int read() throws IOException
{
@Override
public int read() throws IOException
// This method will be called few times before
// the request is committed.
// We wait for the request to commit, and we
// wait for the request to reach the server,
// to be sure that the server endPoint has
// been created, before stopping the connector.

if (commit.get())
{
// This method will be called few times before
// the request is committed.
// We wait for the request to commit, and we
// wait for the request to reach the server,
// to be sure that the server endPoint has
// been created, before stopping the connector.

if (commit.get())
try
{
try
{
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
connector.stop();
return 0;
}
catch (Throwable x)
{
throw new IOException(x);
}
assertTrue(serverLatch.await(5, TimeUnit.SECONDS));
connector.stop();
return 0;
}
else
catch (Throwable x)
{
return connector.isStopped() ? -1 : 0;
throw new IOException(x);
}
}

@Override
public void close() throws IOException
else
{
super.close();
closeLatch.countDown();
return connector.isStopped() ? -1 : 0;
}
};
InputStreamRequestContent content = new InputStreamRequestContent(stream, 1);
}

CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest(newURI(transport))
.body(content)
.onRequestCommit(request -> commit.set(true))
.send(result ->
{
Assertions.assertTrue(result.isFailed());
completeLatch.countDown();
});
@Override
public void close() throws IOException
{
super.close();
closeLatch.countDown();
}
};
InputStreamRequestContent content = new InputStreamRequestContent(stream, 1);

assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}
finally
{
// Release the buffer.
Callback callback = callbackRef.get();
if (callback != null)
callback.succeeded();
}
CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest(newURI(transport))
.body(content)
.onRequestCommit(request -> commit.set(true))
.send(result ->
{
Assertions.assertTrue(result.isFailed());
completeLatch.countDown();
});

assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
assertTrue(closeLatch.await(5, TimeUnit.SECONDS));
}

@ParameterizedTest
Expand Down

0 comments on commit dd75e74

Please sign in to comment.