From 50b049a41441953553e2981795c682bb1c45a721 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 30 Oct 2024 18:07:33 +0100 Subject: [PATCH] Updates from review. Introduced HTTP2Session.maxTotalLocalStreams to limit the max number of streams that might be created in a connection. Linked this new property with the high-level AbstractConnectionPool.maxUsage. Signed-off-by: Simone Bordet --- .../internal/HTTPSessionListenerPromise.java | 9 +- .../internal/HttpConnectionOverHTTP2.java | 10 +- .../org/eclipse/jetty/http2/HTTP2Session.java | 120 +++++++++++++----- .../eclipse/jetty/http2/tests/HTTP2Test.java | 52 +++++++- 4 files changed, 149 insertions(+), 42 deletions(-) diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java index 54d2d5a7d7a6..2d9e50d6a02f 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HTTPSessionListenerPromise.java @@ -18,7 +18,9 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicMarkableReference; +import org.eclipse.jetty.client.AbstractConnectionPool; import org.eclipse.jetty.client.Connection; +import org.eclipse.jetty.client.ConnectionPool; import org.eclipse.jetty.client.Destination; import org.eclipse.jetty.client.HttpClientTransport; import org.eclipse.jetty.http2.HTTP2Connection; @@ -73,10 +75,15 @@ public void onSettings(Session session, SettingsFrame frame) private void onServerPreface(Session session) { + Destination destination = destination(); HTTP2Connection http2Connection = (HTTP2Connection)context.get(HTTP2Connection.class.getName()); - HttpConnectionOverHTTP2 connection = (HttpConnectionOverHTTP2)newConnection(destination(), session, http2Connection); + HttpConnectionOverHTTP2 connection = (HttpConnectionOverHTTP2)newConnection(destination, session, http2Connection); if (this.connection.compareAndSet(null, connection, false, true)) { + ConnectionPool connectionPool = destination.getConnectionPool(); + if (connectionPool instanceof AbstractConnectionPool pool) + connection.setMaxUsage(pool.getMaxUsage()); + // The connection promise must be called synchronously // so that the HTTP/1 to HTTP/2 upgrade can create the // HTTP/2 stream that represents the HTTP/1 request. diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java index a0c18e52910f..15e77d308b9d 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpConnectionOverHTTP2.java @@ -111,10 +111,12 @@ public int getMaxMultiplex() @Override public int getMaxUsage() { - // SPEC: stream numbers can go up to 2^31-1; - // clients start at 1 and increment by 2, so a - // connection can only be used (2^31-2)/2 times. - return (Integer.MAX_VALUE - 1) / 2; + return ((HTTP2Session)session).getMaxTotalLocalStreams(); + } + + void setMaxUsage(int maxUsage) + { + ((HTTP2Session)session).setMaxTotalLocalStreams(maxUsage); } @Override diff --git a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 7fed70a9a27c..08138a39cb40 100644 --- a/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-core/jetty-http2/jetty-http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -82,6 +82,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session, Parser.Listener { private static final Logger LOG = LoggerFactory.getLogger(HTTP2Session.class); + // SPEC: stream numbers can go up to 2^31-1, but increment by 2. + private static final int MAX_TOTAL_LOCAL_STREAMS = Integer.MAX_VALUE / 2; private final Map streams = new ConcurrentHashMap<>(); private final Set priorityStreams = ConcurrentHashMap.newKeySet(); @@ -95,6 +97,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session private final AtomicInteger sendWindow = new AtomicInteger(); private final AtomicInteger recvWindow = new AtomicInteger(); private final AtomicLong bytesWritten = new AtomicLong(); + private final AtomicInteger totalLocalStreams = new AtomicInteger(); private final EndPoint endPoint; private final Parser parser; private final Generator generator; @@ -104,6 +107,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session private final StreamTimeouts streamTimeouts; private int maxLocalStreams; private int maxRemoteStreams; + private int maxTotalLocalStreams; private long streamIdleTimeout; private int initialSessionRecvWindow; private int writeThreshold; @@ -122,6 +126,7 @@ public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Parser parser, Gener this.streamTimeouts = new StreamTimeouts(scheduler); this.maxLocalStreams = -1; this.maxRemoteStreams = -1; + this.maxTotalLocalStreams = MAX_TOTAL_LOCAL_STREAMS; this.localStreamIds.set(initialStreamId); this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); @@ -167,6 +172,20 @@ public void setMaxLocalStreams(int maxLocalStreams) this.maxLocalStreams = maxLocalStreams; } + @ManagedAttribute("The maximum number of local streams that can be opened") + public int getMaxTotalLocalStreams() + { + return maxTotalLocalStreams; + } + + public void setMaxTotalLocalStreams(int maxTotalLocalStreams) + { + if (maxTotalLocalStreams > MAX_TOTAL_LOCAL_STREAMS) + throw new IllegalArgumentException("Invalid max total local streams " + maxTotalLocalStreams); + if (maxTotalLocalStreams > 0) + this.maxTotalLocalStreams = maxTotalLocalStreams; + } + @ManagedAttribute("The maximum number of concurrent remote streams") public int getMaxRemoteStreams() { @@ -2137,19 +2156,19 @@ private void newLocalStream(HTTP2Stream.FrameList frameList, Promise pro Slot slot = new Slot(); int currentStreamId = frameList.getStreamId(); int streamId = reserveSlot(slot, currentStreamId, promise::failed); - if (streamId > 0) + if (streamId <= 0) + return; + + List frames = frameList.getFrames(); + if (currentStreamId <= 0) { - List frames = frameList.getFrames(); - if (currentStreamId <= 0) - { - frames = frames.stream() - .map(frame -> frame.withStreamId(streamId)) - .collect(Collectors.toList()); - } - if (createLocalStream(slot, frames, promise, listener, streamId)) - return; - freeSlot(slot, streamId); + frames = frames.stream() + .map(frame -> frame.withStreamId(streamId)) + .collect(Collectors.toList()); } + if (createLocalStream(slot, frames, promise, listener, streamId)) + return; + freeSlot(slot, streamId); } private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer failFn) @@ -2193,13 +2212,13 @@ private void push(PushPromiseFrame frame, Promise promise, Stream.Listen { Slot slot = new Slot(); int streamId = reserveSlot(slot, frame.getPromisedStreamId(), promise::failed); - if (streamId > 0) - { - frame = frame.withStreamId(streamId); - if (createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId)) - return; - freeSlot(slot, streamId); - } + if (streamId <= 0) + return; + + frame = frame.withStreamId(streamId); + if (createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId)) + return; + freeSlot(slot, streamId); } private boolean createLocalStream(Slot slot, List frames, Promise promise, Stream.Listener listener, int streamId) @@ -2252,6 +2271,8 @@ private int reserveSlot(Slot slot, int streamId, Consumer fail) return 0; } + int maxTotal = getMaxTotalLocalStreams(); + boolean created = false; int reservedStreamId = 0; Throwable failure = null; @@ -2262,17 +2283,31 @@ private int reserveSlot(Slot slot, int streamId, Consumer fail) { if (streamId == 0) { - // Stream id generated internally. - reservedStreamId = localStreamIds.getAndAdd(2); - // Check for overflow. - if (reservedStreamId > 0) + int total = totalLocalStreams.updateAndGet(v -> { - slots.offer(slot); - created = true; + if (v <= maxTotal) + return v + 1; + return v; + }); + if (total <= maxTotal) + { + // Stream id generated internally. + reservedStreamId = localStreamIds.getAndAdd(2); + if (reservedStreamId > 0) + { + slots.offer(slot); + created = true; + } + else + { + totalLocalStreams.decrementAndGet(); + failure = new IllegalStateException("max stream id exceeded"); + } } else { - failure = new IllegalStateException("max streams exceeded"); + totalLocalStreams.decrementAndGet(); + failure = new IllegalStateException("max total streams exceeded"); } } else @@ -2285,12 +2320,33 @@ private int reserveSlot(Slot slot, int streamId, Consumer fail) { if (streamId >= nextStreamId) { - int newNextStreamId = streamId + 2; - if (localStreamIds.compareAndSet(nextStreamId, newNextStreamId)) + int total = totalLocalStreams.updateAndGet(v -> { - reservedStreamId = streamId; - slots.offer(slot); - created = true; + if (v <= maxTotal) + return v + 1; + return v; + }); + if (total <= maxTotal) + { + // This may overflow, but it's ok as the current streamId + // is valid; it is the next streamId that will be invalid. + int newNextStreamId = streamId + 2; + if (localStreamIds.compareAndSet(nextStreamId, newNextStreamId)) + { + reservedStreamId = streamId; + slots.offer(slot); + created = true; + break; + } + else + { + totalLocalStreams.decrementAndGet(); + } + } + else + { + totalLocalStreams.decrementAndGet(); + failure = new IllegalStateException("max total streams exceeded"); break; } } @@ -2311,7 +2367,7 @@ private int reserveSlot(Slot slot, int streamId, Consumer fail) else { reservedStreamId = nextStreamId; - failure = new IllegalStateException("max streams exceeded"); + failure = new IllegalStateException("max stream id exceeded"); break; } } @@ -2332,8 +2388,6 @@ private int reserveSlot(Slot slot, int streamId, Consumer fail) else { fail.accept(failure); - if (reservedStreamId < 0) - close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP); } return reservedStreamId; } diff --git a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HTTP2Test.java b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HTTP2Test.java index 8360a0295e31..049e517ffd61 100644 --- a/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HTTP2Test.java +++ b/jetty-core/jetty-http2/jetty-http2-tests/src/test/java/org/eclipse/jetty/http2/tests/HTTP2Test.java @@ -1140,6 +1140,50 @@ public void onClose(Session session, GoAwayFrame frame, Callback callback) assertThat(failure.getMessage(), containsString("invalid_hpack_block")); } + @Test + public void testClientExceedsConnectionMaxUsage() throws Exception + { + start(new ServerSessionListener() {}); + + Session session = newClientSession(new Session.Listener() {}); + ((HTTP2Session)session).setMaxTotalLocalStreams(1); + + MetaData.Request request = newRequest("GET", HttpFields.EMPTY); + session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}); + + // Must not be able to create more streams than allowed. + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + // Must not be able to create more streams than allowed with an explicit streamId. + int explicitStreamId = Integer.MAX_VALUE; + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + // Session must still be valid. + assertFalse(session.isClosed()); + } + + @Test + public void testClientExceedsMaxStreamId() throws Exception + { + start(new ServerSessionListener() {}); + + Session session = newClientSession(new Session.Listener() {}); + + // Use the max possible streamId. + int explicitStreamId = Integer.MAX_VALUE; + MetaData.Request request = newRequest("GET", HttpFields.EMPTY); + session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {}); + + // Must not be able to create more streams. + assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) + .get(5, TimeUnit.SECONDS)); + + // Session must still be valid. + assertFalse(session.isClosed()); + } + @Test public void testClientCreatesStreamsWithExplicitStreamId() throws Exception { @@ -1179,8 +1223,8 @@ public void testClientCreatesStreamsWithExplicitStreamId() throws Exception // After the stream with the max id, cannot create more streams on this connection. assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {}) .get(5, TimeUnit.SECONDS)); - // Session should be closed. - assertTrue(session.isClosed()); + // Session must still be valid. + assertFalse(session.isClosed()); } @Test @@ -1225,8 +1269,8 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) // After the stream with the max id, cannot push more streams on this connection. assertThrows(ExecutionException.class, () -> stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Stream.Listener() {}) .get(5, TimeUnit.SECONDS)); - // Session should be closed. - assertTrue(stream.getSession().isClosed()); + // Session must still be valid. + assertFalse(stream.getSession().isClosed()); latch.countDown();