Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixes #5888 - Limit usage of HTTP/2 connections. #12401

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ protected Connection activate()

int maxUsage = getMaxUsage();
if (connection instanceof MaxUsable maxUsable)
maxUsage = maxUsable.getMaxUsage();
maxUsage = Math.min(maxUsage, maxUsable.getMaxUsage());
if (maxUsage > 0)
{
EntryHolder holder = (EntryHolder)((Attachable)connection).getAttachment();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.MaxMultiplexable
public class HttpConnectionOverHTTP2 extends HttpConnection implements Sweeper.Sweepable, ConnectionPool.MaxMultiplexable, ConnectionPool.MaxUsable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP2.class);

Expand Down Expand Up @@ -108,6 +108,15 @@ public int getMaxMultiplex()
return ((HTTP2Session)session).getMaxLocalStreams();
}

@Override
public int getMaxUsage()
{
// SPEC: stream numbers can go up to 2^31-1;
// clients start at 1 and increment by 2, so a
// connection can only be used (2^31-2)/2 times.
return (Integer.MAX_VALUE - 1) / 2;
}
gregw marked this conversation as resolved.
Show resolved Hide resolved

@Override
protected Iterator<HttpChannel> getHttpChannels()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -836,13 +836,21 @@ protected HTTP2Stream createLocalStream(int streamId, MetaData.Request request,
}

HTTP2Stream stream = newStream(streamId, request, true);
if (streams.putIfAbsent(streamId, stream) == null)

HTTP2Stream newStream = streams.compute(streamId, (k, v) ->
{
stream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(stream);
if (v == null || v.isPlaceHolder())
gregw marked this conversation as resolved.
Show resolved Hide resolved
return stream;
return null;
});

if (newStream != null)
{
newStream.setIdleTimeout(getStreamIdleTimeout());
flowControl.onStreamCreated(newStream);
if (LOG.isDebugEnabled())
LOG.debug("Created local {} for {}", stream, this);
return stream;
LOG.debug("Created local {} for {}", newStream, this);
return newStream;
}
else
{
Expand Down Expand Up @@ -2111,16 +2119,30 @@ private int priority(PriorityFrame frame, Callback callback)
int streamId = reserveSlot(slot, currentStreamId, callback::failed);
if (streamId > 0)
{
if (currentStreamId <= 0)
HTTP2Stream stream;
if (currentStreamId > 0)
{
stream = streams.get(streamId);
}
else
{
frame = frame.withStreamId(streamId);
slot.entries = List.of(newEntry(frame, null, Callback.from(callback::succeeded, x ->
// Create a placeholder stream, replaced when a follow-up HEADERS frame will be sent.
stream = HTTP2Session.this.createLocalStream(streamId, null, callback::failed);
gregw marked this conversation as resolved.
Show resolved Hide resolved
}

if (stream != null)
{
HTTP2Session.this.onStreamDestroyed(streamId);
callback.failed(x);
})));
flush();
slot.entries = List.of(newEntry(frame, stream, Callback.from(callback::succeeded, x ->
{
HTTP2Session.this.onStreamDestroyed(streamId);
callback.failed(x);
})));
flush();
return streamId;
}
}
return streamId;
return 0;
}

private void newLocalStream(HTTP2Stream.FrameList frameList, Promise<Stream> promise, Stream.Listener listener)
Expand Down Expand Up @@ -2183,7 +2205,7 @@ private boolean newRemoteStream(int streamId)
private void push(PushPromiseFrame frame, Promise<Stream> promise, Stream.Listener listener)
{
Slot slot = new Slot();
int streamId = reserveSlot(slot, 0, promise::failed);
int streamId = reserveSlot(slot, frame.getPromisedStreamId(), promise::failed);
if (streamId > 0)
{
frame = frame.withStreamId(streamId);
Expand Down Expand Up @@ -2237,19 +2259,76 @@ private MetaData.Request extractMetaDataRequest(StreamFrame frame)

private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
{
if (streamId < 0 || (streamId > 0 && !isLocalStream(streamId)))
{
fail.accept(new IllegalArgumentException("invalid stream id " + streamId));
return 0;
}

boolean created = false;
int reservedStreamId = 0;
Throwable failure = null;
boolean reserved = false;
try (AutoLock ignored = lock.lock())
{
// SPEC: cannot create new streams after receiving a GOAWAY.
if (closed == CloseState.NOT_CLOSED)
{
if (streamId <= 0)
if (streamId == 0)
{
streamId = localStreamIds.getAndAdd(2);
reserved = true;
// Stream id generated internally.
reservedStreamId = localStreamIds.getAndAdd(2);
// Check for overflow.
if (reservedStreamId > 0)
gregw marked this conversation as resolved.
Show resolved Hide resolved
{
slots.offer(slot);
created = true;
}
else
{
failure = new IllegalStateException("max streams exceeded");
}
}
else
{
// Stream id is given.
while (true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a long maze of ifs and elses in a while loop that partially duplicates the logic of the enclosing if branch, and is quite involved to follow.

Can't this be simplified and de-duplicated?

{
int nextStreamId = localStreamIds.get();
if (nextStreamId > 0)
{
if (streamId >= nextStreamId)
{
int newNextStreamId = streamId + 2;
if (localStreamIds.compareAndSet(nextStreamId, newNextStreamId))
{
reservedStreamId = streamId;
slots.offer(slot);
created = true;
break;
}
}
else
{
if (streams.containsKey(streamId))
{
reservedStreamId = streamId;
slots.offer(slot);
}
else
{
failure = new IllegalArgumentException("invalid stream id " + streamId);
}
break;
}
}
else
{
reservedStreamId = nextStreamId;
failure = new IllegalStateException("max streams exceeded");
break;
}
}
}
slots.offer(slot);
}
else
{
Expand All @@ -2260,15 +2339,16 @@ private int reserveSlot(Slot slot, int streamId, Consumer<Throwable> fail)
}
if (failure == null)
{
if (reserved)
if (created)
HTTP2Session.this.onStreamCreated(streamId);
return streamId;
}
else
{
fail.accept(failure);
return 0;
if (reservedStreamId < 0)
close(ErrorCode.NO_ERROR.code, "max_streams_exceeded", Callback.NOOP);
}
return reservedStreamId;
}

private void freeSlot(Slot slot, int streamId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,11 @@ public int hashCode()
return streamId;
}

boolean isPlaceHolder()
{
return request == null;
}

@Override
public Object getAttachment()
{
Expand Down Expand Up @@ -959,7 +964,7 @@ public void dump(Appendable out, String indent) throws IOException
@Override
public String toString()
{
return String.format("%s#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%b,reset=%b/%b,%s,age=%d,attachment=%s}",
return String.format("%s#%d@%x{sendWindow=%s,recvWindow=%s,queue=%d,demand=%b,reset=%b/%b,%s,age=%d,request=%s,attachment=%s}",
getClass().getSimpleName(),
getId(),
session.hashCode(),
Expand All @@ -971,6 +976,7 @@ public String toString()
remoteReset,
closeState,
NanoTime.millisSince(creationNanoTime),
request,
attachment);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.eclipse.jetty.http2.frames.DataFrame;
import org.eclipse.jetty.http2.frames.GoAwayFrame;
import org.eclipse.jetty.http2.frames.HeadersFrame;
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
import org.eclipse.jetty.http2.frames.ResetFrame;
import org.eclipse.jetty.http2.frames.SettingsFrame;
import org.eclipse.jetty.http2.hpack.HpackException;
Expand Down Expand Up @@ -1139,6 +1140,113 @@ public void onClose(Session session, GoAwayFrame frame, Callback callback)
assertThat(failure.getMessage(), containsString("invalid_hpack_block"));
}

@Test
public void testClientCreatesStreamsWithExplicitStreamId() throws Exception
{
start(new ServerSessionListener() {});

Session session = newClientSession(new Session.Listener() {});

int evenStreamId = 128;
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(evenStreamId, request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));

// Equivalent to Integer.MAX_VALUE + 2.
int negativeStreamId = Integer.MIN_VALUE + 1;
lorban marked this conversation as resolved.
Show resolved Hide resolved
assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(negativeStreamId, request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));

int explicitStreamId = 127;
Stream stream = session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);
assertThat(stream.getId(), equalTo(explicitStreamId));

stream = session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);
assertThat(stream.getId(), equalTo(explicitStreamId + 2));

// Cannot create streams with smaller id.
int smallerStreamId = explicitStreamId - 2;
assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(smallerStreamId, request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));

// Should be possible to create the stream with the max id.
explicitStreamId = Integer.MAX_VALUE;
session.newStream(new HeadersFrame(explicitStreamId, request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);

// After the stream with the max id, cannot create more streams on this connection.
assertThrows(ExecutionException.class, () -> session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));
// Session should be closed.
assertTrue(session.isClosed());
}

@Test
public void testServerPushesStreamsWithExplicitStreamId() throws Exception
{
CountDownLatch latch = new CountDownLatch(1);
start(new ServerSessionListener()
{
@Override
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
{
try
{
int oddStreamId = 129;
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
assertThrows(ExecutionException.class, () -> stream.push(new PushPromiseFrame(stream.getId(), oddStreamId, request), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));

int negativeStreamId = Integer.MIN_VALUE;
assertThrows(ExecutionException.class, () -> stream.push(new PushPromiseFrame(stream.getId(), negativeStreamId, request), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));

int explicitStreamId = 128;
Stream pushedStream = stream.push(new PushPromiseFrame(stream.getId(), explicitStreamId, request), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);
assertThat(pushedStream.getId(), equalTo(explicitStreamId));

pushedStream = stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);
assertThat(pushedStream.getId(), equalTo(explicitStreamId + 2));

// Cannot push streams with smaller id.
int smallerStreamId = explicitStreamId - 2;
assertThrows(ExecutionException.class, () -> stream.push(new PushPromiseFrame(stream.getId(), smallerStreamId, request), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));

// Should be possible to push the stream with the max id.
explicitStreamId = Integer.MAX_VALUE - 1;
stream.push(new PushPromiseFrame(stream.getId(), explicitStreamId, request), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);

// After the stream with the max id, cannot push more streams on this connection.
assertThrows(ExecutionException.class, () -> stream.push(new PushPromiseFrame(stream.getId(), 0, request), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS));
// Session should be closed.
assertTrue(stream.getSession().isClosed());

latch.countDown();

return null;
}
catch (Throwable x)
{
throw new RuntimeException(x);
}
}
});

Session session = newClientSession(new Session.Listener() {});
MetaData.Request request = newRequest("GET", HttpFields.EMPTY);
session.newStream(new HeadersFrame(request, null, true), new Stream.Listener() {})
.get(5, TimeUnit.SECONDS);

assertTrue(latch.await(5, TimeUnit.SECONDS));
}

private static void sleep(long time)
{
try
Expand Down
Loading