diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java index 174643b7189b..93859d695d7c 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpReceiver.java @@ -315,13 +315,35 @@ protected void responseHeaders(HttpExchange exchange) * Method to be invoked when response content is available to be read. *

* This method takes care of ensuring the {@link Content.Source} passed to - * {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)} calls the - * demand callback. + * {@link Response.ContentSourceListener#onContentSource(Response, Content.Source)} + * calls the demand callback. + * The call to the demand callback is serialized with other events. */ - protected void responseContentAvailable() + protected void responseContentAvailable(HttpExchange exchange) { if (LOG.isDebugEnabled()) - LOG.debug("Response content available on {}", this); + LOG.debug("Invoking responseContentAvailable on {}", this); + + invoker.run(() -> + { + if (LOG.isDebugEnabled()) + LOG.debug("Executing responseContentAvailable on {}", this); + + if (exchange.isResponseCompleteOrTerminated()) + return; + + responseContentAvailable(); + }); + } + + /** + * Method to be invoked when response content is available to be read. + *

+ * This method directly invokes the demand callback, assuming the caller + * is already serialized with other events. + */ + protected void responseContentAvailable() + { contentSource.onDataAvailable(); } @@ -344,7 +366,7 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) if (!exchange.responseComplete(null)) return; - invoker.run(() -> + Runnable successTask = () -> { if (LOG.isDebugEnabled()) LOG.debug("Executing responseSuccess on {}", this); @@ -365,7 +387,12 @@ protected void responseSuccess(HttpExchange exchange, Runnable afterSuccessTask) // Mark atomically the response as terminated, with // respect to concurrency between request and response. terminateResponse(exchange); - }, afterSuccessTask); + }; + + if (afterSuccessTask == null) + invoker.run(successTask); + else + invoker.run(successTask, afterSuccessTask); } /** @@ -712,9 +739,9 @@ public void onDataAvailable() { if (LOG.isDebugEnabled()) LOG.debug("onDataAvailable on {}", this); - // The demandCallback will call read() that will itself call - // HttpReceiver.read(boolean) so it must be called by the invoker. - invokeDemandCallback(true); + // The onDataAvailable() method is only ever called + // by the invoker so avoid using the invoker again. + invokeDemandCallback(false); } @Override @@ -760,8 +787,8 @@ private void processDemand() } } - // The processDemand method is only ever called by the - // invoker so there is no need to use the latter here. + // The processDemand() method is only ever called + // by the invoker so avoid using the invoker again. invokeDemandCallback(false); } @@ -769,20 +796,19 @@ private void invokeDemandCallback(boolean invoke) { Runnable demandCallback = demandCallbackRef.getAndSet(null); if (LOG.isDebugEnabled()) - LOG.debug("Invoking demand callback on {}", this); - if (demandCallback != null) + LOG.debug("Invoking demand callback {} on {}", demandCallback, this); + if (demandCallback == null) + return; + try { - try - { - if (invoke) - invoker.run(demandCallback); - else - demandCallback.run(); - } - catch (Throwable x) - { - fail(x); - } + if (invoke) + invoker.run(demandCallback); + else + demandCallback.run(); + } + catch (Throwable x) + { + fail(x); } } diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java index c292b23a79ab..de6ff4973a3d 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/internal/HttpReceiverOverHTTP.java @@ -79,7 +79,9 @@ void receive() } else { - responseContentAvailable(); + HttpExchange exchange = getHttpExchange(); + if (exchange != null) + responseContentAvailable(exchange); } } diff --git a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java index b904a8347c16..8c810e215acd 100644 --- a/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java +++ b/jetty-core/jetty-fcgi/jetty-fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/transport/internal/HttpReceiverOverFCGI.java @@ -40,7 +40,9 @@ void receive() } else { - responseContentAvailable(); + HttpExchange exchange = getHttpExchange(); + if (exchange != null) + responseContentAvailable(exchange); } } @@ -107,6 +109,9 @@ public void failAndClose(Throwable failure) void content(Content.Chunk chunk) { + HttpExchange exchange = getHttpExchange(); + if (exchange == null) + return; if (this.chunk != null) throw new IllegalStateException(); // Retain the chunk because it is stored for later reads. diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java index 45f34c30674f..bddb5d7396f0 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java @@ -50,8 +50,6 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP2.class); - private final Runnable onDataAvailableTask = new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, this::responseContentAvailable); - public HttpReceiverOverHTTP2(HttpChannel channel) { super(channel); @@ -213,7 +211,7 @@ public Runnable onDataAvailable() HttpExchange exchange = getHttpExchange(); if (exchange == null) return null; - return onDataAvailableTask; + return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> responseContentAvailable(exchange)); } @Override diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java index 86ae453cee64..0173430314a3 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; @@ -33,6 +34,8 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.stream.IntStream; +import org.eclipse.jetty.client.AsyncRequestContent; +import org.eclipse.jetty.client.CompletableResponseListener; import org.eclipse.jetty.client.Connection; import org.eclipse.jetty.client.ContentResponse; import org.eclipse.jetty.client.Destination; @@ -793,6 +796,37 @@ public void onComplete(Result result) assertThat(onContentSourceErrorRef.get(), is(nullValue())); } + @Test + public void testRequestContentResponseContent() throws Exception + { + start(new Handler.Abstract() + { + @Override + public boolean handle(Request request, org.eclipse.jetty.server.Response response, Callback callback) + { + Content.copy(request, response, callback); + return true; + } + }); + + AsyncRequestContent content = new AsyncRequestContent(); + var request = httpClient.newRequest("localhost", connector.getLocalPort()) + .method(HttpMethod.POST) + .body(content); + CompletableFuture completable = new CompletableResponseListener(request).send(); + + for (int i = 0; i < 16; ++i) + { + content.write(false, ByteBuffer.allocate(512), Callback.NOOP); + Thread.sleep(10); + } + content.close(); + + ContentResponse response = completable.get(15, TimeUnit.SECONDS); + + assertEquals(HttpStatus.OK_200, response.getStatus()); + } + @Test @Tag("external") public void testExternalServer() throws Exception diff --git a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java index 594de886793c..660fad9b9bee 100644 --- a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java @@ -127,7 +127,7 @@ public void onDataAvailable(Stream.Client stream) if (exchange == null) return; - responseContentAvailable(); + responseContentAvailable(exchange); } @Override