Skip to content

Commit

Permalink
Updates from review.
Browse files Browse the repository at this point in the history
Introduced HTTP2Session.maxTotalLocalStreams to limit the max number of streams that might be created in a connection.

Linked this new property with the high-level AbstractConnectionPool.maxUsage.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
  • Loading branch information
sbordet committed Oct 30, 2024
1 parent 4332db1 commit 50b049a
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicMarkableReference;

import org.eclipse.jetty.client.AbstractConnectionPool;
import org.eclipse.jetty.client.Connection;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.Destination;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.http2.HTTP2Connection;
Expand Down Expand Up @@ -73,10 +75,15 @@ public void onSettings(Session session, SettingsFrame frame)

private void onServerPreface(Session session)
{
Destination destination = destination();
HTTP2Connection http2Connection = (HTTP2Connection)context.get(HTTP2Connection.class.getName());
HttpConnectionOverHTTP2 connection = (HttpConnectionOverHTTP2)newConnection(destination(), session, http2Connection);
HttpConnectionOverHTTP2 connection = (HttpConnectionOverHTTP2)newConnection(destination, session, http2Connection);
if (this.connection.compareAndSet(null, connection, false, true))
{
ConnectionPool connectionPool = destination.getConnectionPool();
if (connectionPool instanceof AbstractConnectionPool pool)
connection.setMaxUsage(pool.getMaxUsage());

// The connection promise must be called synchronously
// so that the HTTP/1 to HTTP/2 upgrade can create the
// HTTP/2 stream that represents the HTTP/1 request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,12 @@ public int getMaxMultiplex()
@Override
public int getMaxUsage()
{
// SPEC: stream numbers can go up to 2^31-1;
// clients start at 1 and increment by 2, so a
// connection can only be used (2^31-2)/2 times.
return (Integer.MAX_VALUE - 1) / 2;
return ((HTTP2Session)session).getMaxTotalLocalStreams();
}

void setMaxUsage(int maxUsage)
{
((HTTP2Session)session).setMaxTotalLocalStreams(maxUsage);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@
public abstract class HTTP2Session extends ContainerLifeCycle implements Session, Parser.Listener
{
private static final Logger LOG = LoggerFactory.getLogger(HTTP2Session.class);
// SPEC: stream numbers can go up to 2^31-1, but increment by 2.
private static final int MAX_TOTAL_LOCAL_STREAMS = Integer.MAX_VALUE / 2;

private final Map<Integer, HTTP2Stream> streams = new ConcurrentHashMap<>();
private final Set<Integer> priorityStreams = ConcurrentHashMap.newKeySet();
Expand All @@ -95,6 +97,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final AtomicLong bytesWritten = new AtomicLong();
private final AtomicInteger totalLocalStreams = new AtomicInteger();
private final EndPoint endPoint;
private final Parser parser;
private final Generator generator;
Expand All @@ -104,6 +107,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements Session
private final StreamTimeouts streamTimeouts;
private int maxLocalStreams;
private int maxRemoteStreams;
private int maxTotalLocalStreams;
private long streamIdleTimeout;
private int initialSessionRecvWindow;
private int writeThreshold;
Expand All @@ -122,6 +126,7 @@ public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Parser parser, Gener
this.streamTimeouts = new StreamTimeouts(scheduler);
this.maxLocalStreams = -1;
this.maxRemoteStreams = -1;
this.maxTotalLocalStreams = MAX_TOTAL_LOCAL_STREAMS;
this.localStreamIds.set(initialStreamId);
this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE);
Expand Down Expand Up @@ -167,6 +172,20 @@ public void setMaxLocalStreams(int maxLocalStreams)
this.maxLocalStreams = maxLocalStreams;
}

@ManagedAttribute("The maximum number of local streams that can be opened")
public int getMaxTotalLocalStreams()
{
return maxTotalLocalStreams;
}

public void setMaxTotalLocalStreams(int maxTotalLocalStreams)
{
if (maxTotalLocalStreams > MAX_TOTAL_LOCAL_STREAMS)
throw new IllegalArgumentException("Invalid max total local streams " + maxTotalLocalStreams);
if (maxTotalLocalStreams > 0)
this.maxTotalLocalStreams = maxTotalLocalStreams;
}

@ManagedAttribute("The maximum number of concurrent remote streams")
public int getMaxRemoteStreams()
{
Expand Down Expand Up @@ -2137,19 +2156,19 @@ private void newLocalStream(HTTP2Stream.FrameList frameList, Promise<Stream> pro
Slot slot = new Slot();
int currentStreamId = frameList.getStreamId();
int streamId = reserveSlot(slot, currentStreamId, promise::failed);
if (streamId > 0)
if (streamId <= 0)
return;

List<StreamFrame> frames = frameList.getFrames();
if (currentStreamId <= 0)
{
List<StreamFrame> frames = frameList.getFrames();
if (currentStreamId <= 0)
{
frames = frames.stream()
.map(frame -> frame.withStreamId(streamId))
.collect(Collectors.toList());
}
if (createLocalStream(slot, frames, promise, listener, streamId))
return;
freeSlot(slot, streamId);
frames = frames.stream()
.map(frame -> frame.withStreamId(streamId))
.collect(Collectors.toList());
}
if (createLocalStream(slot, frames, promise, listener, streamId))
return;
freeSlot(slot, streamId);
}

private Stream newUpgradeStream(HeadersFrame frame, Stream.Listener listener, Consumer<Throwable> failFn)
Expand Down Expand Up @@ -2193,13 +2212,13 @@ private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listen
{
Slot slot = new Slot();
int streamId = reserveSlot(slot, frame.getPromisedStreamId(), promise::failed);
if (streamId > 0)
{
frame = frame.withStreamId(streamId);
if (createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId))
return;
freeSlot(slot, streamId);
}
if (streamId <= 0)
return;

frame = frame.withStreamId(streamId);
if (createLocalStream(slot, Collections.singletonList(frame), promise, listener, streamId))
return;
freeSlot(slot, streamId);
}

private boolean createLocalStream(Slot slot, List<StreamFrame> frames, Promise<Stream> promise, Stream.Listener listener, int streamId)
Expand Down Expand Up @@ -2252,6 +2271,8 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
return 0;
}

int maxTotal = getMaxTotalLocalStreams();

boolean created = false;
int reservedStreamId = 0;
Throwable failure = null;
Expand All @@ -2262,17 +2283,31 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
{
if (streamId == 0)
{
// Stream id generated internally.
reservedStreamId = localStreamIds.getAndAdd(2);
// Check for overflow.
if (reservedStreamId > 0)
int total = totalLocalStreams.updateAndGet(v ->
{
slots.offer(slot);
created = true;
if (v <= maxTotal)
return v + 1;
return v;
});
if (total <= maxTotal)
{
// Stream id generated internally.
reservedStreamId = localStreamIds.getAndAdd(2);
if (reservedStreamId > 0)
{
slots.offer(slot);
created = true;
}
else
{
totalLocalStreams.decrementAndGet();
failure = new IllegalStateException("max stream id exceeded");
}
}
else
{
failure = new IllegalStateException("max streams exceeded");
totalLocalStreams.decrementAndGet();
failure = new IllegalStateException("max total streams exceeded");
}
}
else
Expand All @@ -2285,12 +2320,33 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
{
if (streamId >= nextStreamId)
{
int newNextStreamId = streamId + 2;
if (localStreamIds.compareAndSet(nextStreamId, newNextStreamId))
int total = totalLocalStreams.updateAndGet(v ->
{
reservedStreamId = streamId;
slots.offer(slot);
created = true;
if (v <= maxTotal)
return v + 1;
return v;
});
if (total <= maxTotal)
{
// This may overflow, but it's ok as the current streamId
// is valid; it is the next streamId that will be invalid.
int newNextStreamId = streamId + 2;
if (localStreamIds.compareAndSet(nextStreamId, newNextStreamId))
{
reservedStreamId = streamId;
slots.offer(slot);
created = true;
break;
}
else
{
totalLocalStreams.decrementAndGet();
}
}
else
{
totalLocalStreams.decrementAndGet();
failure = new IllegalStateException("max total streams exceeded");
break;
}
}
Expand All @@ -2311,7 +2367,7 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
else
{
reservedStreamId = nextStreamId;
failure = new IllegalStateException("max streams exceeded");
failure = new IllegalStateException("max stream id exceeded");
break;
}
}
Expand All @@ -2332,8 +2388,6 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
else
{
fail.accept(failure);
if (reservedStreamId < 0)
close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP);
}
return reservedStreamId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,50 @@ public void onClose(Session session, GoAwayFrame frame, Callback callback)
assertThat(failure.getMessage(), containsString("invalid_hpack_block"));
}

@Test
public void testClientExceedsConnectionMaxUsage() throws Exception
{
start(new ServerSessionListener() {});

Session session = newClientSession(new Session.Listener() {});
((HTTP2Session)session).setMaxTotalLocalStreams(1);

MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {});

// Must not be able to create more streams than allowed.
assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));

// Must not be able to create more streams than allowed with an explicit streamId.
int explicitStreamId = Integer.MAX_VALUE;
assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));

// Session must still be valid.
assertFalse(session.isClosed());
}

@Test
public void testClientExceedsMaxStreamId() throws Exception
{
start(new ServerSessionListener() {});

Session session = newClientSession(new Session.Listener() {});

// Use the max possible streamId.
int explicitStreamId = Integer.MAX_VALUE;
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {});

// Must not be able to create more streams.
assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));

// Session must still be valid.
assertFalse(session.isClosed());
}

@Test
public void testClientCreatesStreamsWithExplicitStreamId() throws Exception
{
Expand Down Expand Up @@ -1179,8 +1223,8 @@ public void testClientCreatesStreamsWithExplicitStreamId() throws Exception
// After the stream with the max id, cannot create more streams on this connection.
assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));
// Session should be closed.
assertTrue(session.isClosed());
// Session must still be valid.
assertFalse(session.isClosed());
}

@Test
Expand Down Expand Up @@ -1225,8 +1269,8 @@ public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
// After the stream with the max id, cannot push more streams on this connection.
assertThrows(ExecutionException.class, () -> stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));
// Session should be closed.
assertTrue(stream.getSession().isClosed());
// Session must still be valid.
assertFalse(stream.getSession().isClosed());

latch.countDown();

Expand Down

0 comments on commit 50b049a

Please sign in to comment.