diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpChannel.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpChannel.java index 5fef0ca8b8a7..b524ede09f9f 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpChannel.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/transport/HttpChannel.java @@ -165,7 +165,7 @@ public void abort(HttpExchange exchange, Throwable requestFailure, Throwable res else responsePromise.succeeded(false); - requestPromise.thenAcceptBoth(responsePromise, (requestAborted, responseAborted) -> promise.succeeded(requestAborted || responseAborted)); + promise.completeWith(requestPromise.thenCombine(responsePromise, (requestAborted, responseAborted) -> requestAborted || responseAborted)); } public void abortResponse(HttpExchange exchange, Throwable failure, Promise promise) diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/HttpClientTransportOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/HttpClientTransportOverHTTP2.java index f2394adb02f4..26e1125ec096 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/HttpClientTransportOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/HttpClientTransportOverHTTP2.java @@ -30,6 +30,7 @@ import org.eclipse.jetty.client.Request; import org.eclipse.jetty.client.transport.HttpDestination; import org.eclipse.jetty.http.HttpScheme; +import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.client.HTTP2Client; import org.eclipse.jetty.http2.client.HTTP2ClientConnectionFactory; @@ -168,9 +169,9 @@ public org.eclipse.jetty.io.Connection newConnection(EndPoint endPoint, Map context) } @Override - protected Connection newConnection(Destination destination, Session session) + protected Connection newConnection(Destination destination, Session session, HTTP2Connection connection) { - return HttpClientTransportOverHTTP2.this.newConnection(destination, session); + return HttpClientTransportOverHTTP2.this.newConnection(destination, session, connection); } @Override diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/ClientHTTP2StreamEndPoint.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/ClientHTTP2StreamEndPoint.java index e5f47edb5d1a..d3dc3150d447 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/ClientHTTP2StreamEndPoint.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/ClientHTTP2StreamEndPoint.java @@ -15,12 +15,16 @@ import java.util.concurrent.TimeoutException; +import org.eclipse.jetty.http2.ErrorCode; import org.eclipse.jetty.http2.HTTP2Channel; import org.eclipse.jetty.http2.HTTP2Stream; import org.eclipse.jetty.http2.HTTP2StreamEndPoint; +import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,26 +38,51 @@ public ClientHTTP2StreamEndPoint(HTTP2Stream stream) } @Override - public void onDataAvailable() + public Runnable onDataAvailable() { - processDataAvailable(); + // The InvocationType may change depending on the read callback. + return new Invocable.ReadyTask(getInvocationType(), this::processDataAvailable); } @Override - public void onTimeout(TimeoutException timeout, Promise promise) + public Runnable onReset(ResetFrame frame, Callback callback) + { + int error = frame.getError(); + EofException failure = new EofException(ErrorCode.toString(error, "error_code_" + error)); + return onFailure(failure, callback); + } + + @Override + public Runnable onTimeout(TimeoutException timeout, Promise promise) { if (LOG.isDebugEnabled()) LOG.debug("idle timeout on {}", this, timeout); Connection connection = getConnection(); - if (connection != null) - promise.succeeded(connection.onIdleExpired(timeout)); - else + if (connection == null) + { promise.succeeded(true); + return null; + } + return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + { + boolean expire = connection.onIdleExpired(timeout); + if (expire) + { + processFailure(timeout); + close(timeout); + } + promise.succeeded(expire); + }); } @Override - public void onFailure(Throwable failure, Callback callback) + public Runnable onFailure(Throwable failure, Callback callback) { - callback.failed(failure); + return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + { + processFailure(failure); + close(failure); + callback.failed(failure); + }); } } diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java index 0f6d98895555..54d2d5a7d7a6 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java @@ -21,6 +21,7 @@ import org.eclipse.jetty.client.Connection; import org.eclipse.jetty.client.Destination; import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.frames.GoAwayFrame; @@ -72,7 +73,8 @@ public void onSettings(Session session, SettingsFrame frame) private void onServerPreface(Session session) { - HttpConnectionOverHTTP2 connection = (HttpConnectionOverHTTP2)newConnection(destination(), session); + HTTP2Connection http2Connection = (HTTP2Connection)context.get(HTTP2Connection.class.getName()); + HttpConnectionOverHTTP2 connection = (HttpConnectionOverHTTP2)newConnection(destination(), session, http2Connection); if (this.connection.compareAndSet(null, connection, false, true)) { // The connection promise must be called synchronously @@ -82,9 +84,9 @@ private void onServerPreface(Session session) } } - protected Connection newConnection(Destination destination, Session session) + protected Connection newConnection(Destination destination, Session session, HTTP2Connection connection) { - return new HttpConnectionOverHTTP2(destination, session); + return new HttpConnectionOverHTTP2(destination, session, connection); } @Override diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java index 8c8bae62a175..7e1991b9b777 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpChannelOverHTTP2.java @@ -197,29 +197,28 @@ public Stream.Listener onPush(Stream stream, PushPromiseFrame frame) public void onDataAvailable(Stream stream) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); - channel.onDataAvailable(); + connection.offerTask(channel.onDataAvailable(), false); } @Override public void onReset(Stream stream, ResetFrame frame, Callback callback) { - // TODO: needs to call HTTP2Channel? - receiver.onReset(frame); - callback.succeeded(); + HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); + connection.offerTask(channel.onReset(frame, callback), false); } @Override public void onIdleTimeout(Stream stream, TimeoutException x, Promise promise) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); - channel.onTimeout(x, promise); + connection.offerTask(channel.onTimeout(x, promise), false); } @Override public void onFailure(Stream stream, int error, String reason, Throwable failure, Callback callback) { HTTP2Channel.Client channel = (HTTP2Channel.Client)((HTTP2Stream)stream).getAttachment(); - channel.onFailure(failure, callback); + connection.offerTask(channel.onFailure(failure, callback), false); } } } diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java index 5cd0c0c54fa8..d7957e1e05b6 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java @@ -39,6 +39,7 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; @@ -57,12 +58,14 @@ public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.S private final AtomicBoolean closed = new AtomicBoolean(); private final AtomicInteger sweeps = new AtomicInteger(); private final Session session; + private final HTTP2Connection connection; private boolean recycleHttpChannels = true; - public HttpConnectionOverHTTP2(Destination destination, Session session) + public HttpConnectionOverHTTP2(Destination destination, Session session, HTTP2Connection connection) { super((HttpDestination)destination); this.session = session; + this.connection = connection; } public Session getSession() @@ -277,6 +280,12 @@ public boolean sweep() return sweeps.incrementAndGet() >= 4; } + void offerTask(Runnable task, boolean dispatch) + { + if (task != null) + connection.offerTask(task, dispatch); + } + @Override public String toString() { diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java index 73ddd6b139b4..d61cb8961fff 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java @@ -41,6 +41,7 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.Invocable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,6 +49,8 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. { private static final Logger LOG = LoggerFactory.getLogger(HttpReceiverOverHTTP2.class); + private final Runnable onDataAvailableTask = new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, this::responseContentAvailable); + public HttpReceiverOverHTTP2(HttpChannel channel) { super(channel); @@ -202,37 +205,49 @@ Stream.Listener onPush(Stream stream, PushPromiseFrame frame) } @Override - public void onDataAvailable() + public Runnable onDataAvailable() { HttpExchange exchange = getHttpExchange(); if (exchange == null) - return; - - responseContentAvailable(); + return null; + return onDataAvailableTask; } - void onReset(ResetFrame frame) + @Override + public Runnable onReset(ResetFrame frame, Callback callback) { HttpExchange exchange = getHttpExchange(); if (exchange == null) - return; - int error = frame.getError(); - exchange.getRequest().abort(new IOException(ErrorCode.toString(error, "reset_code_" + error))); + { + callback.succeeded(); + return null; + } + return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + { + int error = frame.getError(); + IOException failure = new IOException(ErrorCode.toString(error, "reset_code_" + error)); + callback.completeWith(exchange.getRequest().abort(failure)); + }); } @Override - public void onTimeout(TimeoutException failure, Promise promise) + public Runnable onTimeout(TimeoutException failure, Promise promise) { HttpExchange exchange = getHttpExchange(); - if (exchange != null) - exchange.abort(failure, Promise.from(aborted -> promise.succeeded(!aborted), promise::failed)); - else + if (exchange == null) + { promise.succeeded(false); + return null; + } + return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> + promise.completeWith(exchange.getRequest().abort(failure)) + ); } @Override - public void onFailure(Throwable failure, Callback callback) + public Runnable onFailure(Throwable failure, Callback callback) { - responseFailure(failure, Promise.from(failed -> callback.succeeded(), callback::failed)); + Promise promise = Promise.from(failed -> callback.succeeded(), callback::failed); + return new Invocable.ReadyTask(Invocable.InvocationType.NON_BLOCKING, () -> responseFailure(failure, promise)); } } diff --git a/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java b/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java index 9f1a8d1f8e17..048eab4d062e 100644 --- a/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java +++ b/jetty-core/jetty-http2/jetty-http2-client/src/main/java/org/eclipse/jetty/http2/client/HTTP2ClientConnectionFactory.java @@ -68,6 +68,7 @@ public Connection newConnection(EndPoint endPoint, Map context) session.setStreamIdleTimeout(streamIdleTimeout); HTTP2ClientConnection connection = new HTTP2ClientConnection(client, endPoint, session, sessionPromise, listener); + context.put(HTTP2Connection.class.getName(), connection); connection.addEventListener(connectionListener); parser.init(connection); diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Channel.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Channel.java index 82c28fd73afa..ff4ef40c12e2 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Channel.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Channel.java @@ -17,6 +17,7 @@ import java.util.function.BiConsumer; import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; @@ -33,11 +34,13 @@ public interface HTTP2Channel */ public interface Client { - public void onDataAvailable(); + public Runnable onDataAvailable(); + + public Runnable onReset(ResetFrame frame, Callback callback); - public void onTimeout(TimeoutException failure, Promise promise); + public Runnable onTimeout(TimeoutException failure, Promise promise); - public void onFailure(Throwable failure, Callback callback); + public Runnable onFailure(Throwable failure, Callback callback); } /** diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java index 20e6b36350c1..d3425e341dcd 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Connection.java @@ -187,7 +187,7 @@ public boolean onIdleExpired(TimeoutException timeoutException) return false; } - protected void offerTask(Runnable task, boolean dispatch) + public void offerTask(Runnable task, boolean dispatch) { offerTask(task); if (dispatch) diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java index 97d46f57db46..53bea9fab80a 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2StreamEndPoint.java @@ -497,6 +497,12 @@ private void process() callback.succeeded(); } + protected Invocable.InvocationType getInvocationType() + { + Callback callback = readCallback.get(); + return callback == null ? Invocable.InvocationType.NON_BLOCKING : callback.getInvocationType(); + } + @Override public String toString() { diff --git a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java index 8446dcea492b..7a2a9299636b 100644 --- a/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java +++ b/jetty-core/jetty-http2/jetty-http2-server/src/main/java/org/eclipse/jetty/http2/server/internal/HTTP2ServerConnection.java @@ -309,13 +309,6 @@ public boolean upgrade(Request request, HttpFields.Mutable responseFields) return true; } - // Overridden for visibility. - @Override - protected void offerTask(Runnable task, boolean dispatch) - { - super.offerTask(task, dispatch); - } - @Override public String getId() { diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java index 04e0e25b1779..0b22d3dd1e2a 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HttpClientTransportOverHTTP2Test.java @@ -48,6 +48,7 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http2.ErrorCode; +import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.RateControl; import org.eclipse.jetty.http2.api.Session; @@ -326,9 +327,9 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) httpClient = new HttpClient(new HttpClientTransportOverHTTP2(http2Client) { @Override - protected Connection newConnection(Destination destination, Session session) + protected Connection newConnection(Destination destination, Session session, HTTP2Connection connection) { - return new HttpConnectionOverHTTP2(destination, session) + return new HttpConnectionOverHTTP2(destination, session, connection) { @Override protected HttpChannelOverHTTP2 newHttpChannel() @@ -520,10 +521,10 @@ public void testClientStopsServerDoesNotCloseClientCloses() throws Exception HttpClient client = new HttpClient(new HttpClientTransportOverHTTP2(h2Client) { @Override - protected Connection newConnection(Destination destination, Session session) + protected Connection newConnection(Destination destination, Session session, HTTP2Connection connection) { sessions.add(session); - return super.newConnection(destination, session); + return super.newConnection(destination, session, connection); } }); QueuedThreadPool clientExecutor = new QueuedThreadPool(); diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpChannelAssociationTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpChannelAssociationTest.java index 676f68bb0a7f..620c1696e795 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpChannelAssociationTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpChannelAssociationTest.java @@ -29,6 +29,7 @@ import org.eclipse.jetty.fcgi.client.transport.HttpClientTransportOverFCGI; import org.eclipse.jetty.fcgi.client.transport.internal.HttpChannelOverFCGI; import org.eclipse.jetty.fcgi.client.transport.internal.HttpConnectionOverFCGI; +import org.eclipse.jetty.http2.HTTP2Connection; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.client.HTTP2Client; import org.eclipse.jetty.http2.client.transport.HttpClientTransportOverHTTP2; @@ -147,9 +148,9 @@ public boolean associate(HttpExchange exchange) yield new HttpClientTransportOverHTTP2(http2Client) { @Override - protected Connection newConnection(Destination destination, Session session) + protected Connection newConnection(Destination destination, Session session, HTTP2Connection connection) { - return new HttpConnectionOverHTTP2(destination, session) + return new HttpConnectionOverHTTP2(destination, session, connection) { @Override protected HttpChannelOverHTTP2 newHttpChannel()