Skip to content

Commit

Permalink
Optimize encoding of HTTP/2 frames to write less on the wire (#1518)
Browse files Browse the repository at this point in the history
Motivation:

We can infer from the `PayloadInfo` when HTTP message does not contain
payload body or trailers. In this case, we can write it in a single frame.
Also, we do not need to write `Http2DataFrame`(s) for empty `Buffer`(s)
or process `HttpHeaders` when it's empty.

Modifications:

- Detect if the `HttpMetaData` can be written in a single frame;
- Emit `CloseHandler#protocolPayloadEndOutbound` on `endStream`;
- Ignore empty `Buffer`(s) when writing `Http2DataFrame`(s);
- Do not process `HttpHeaders` when they are empty;
- Add tests for new behavior;

Result:

Less `Http2StreamFrame`(s) allocated and written on the wire, results in
+10% RPS, -12% latency for some HTTP/2 use-cases.
  • Loading branch information
idelpivnitskiy authored May 5, 2021
1 parent 2bc89b5 commit edc0e6d
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ public static ServiceAdapterHolder toStreamingHttpService(BlockingHttpService se
* @return {@code true} is the request/response payload body is empty, {@code false} otherwise.
*/
public static boolean isPayloadEmpty(HttpMetaData metadata) {
return (metadata instanceof PayloadInfo && ((PayloadInfo) metadata).isEmpty());
return metadata instanceof PayloadInfo && ((PayloadInfo) metadata).isEmpty();
}

/**
Expand All @@ -201,7 +201,7 @@ public static boolean isPayloadEmpty(HttpMetaData metadata) {
* @return {@code true} is the request/response payload is safe to aggregate, {@code false} otherwise.
*/
public static boolean isSafeToAggregate(HttpMetaData metadata) {
return (metadata instanceof PayloadInfo && ((PayloadInfo) metadata).isSafeToAggregate());
return metadata instanceof PayloadInfo && ((PayloadInfo) metadata).isSafeToAggregate();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.servicetalk.buffer.api.BufferAllocator;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpMetaData;
import io.servicetalk.transport.api.ConnectionObserver.StreamObserver;
import io.servicetalk.transport.netty.internal.CloseHandler;

Expand All @@ -32,21 +33,28 @@
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2ResetFrame;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import static io.netty.buffer.Unpooled.EMPTY_BUFFER;
import static io.servicetalk.buffer.netty.BufferUtils.toByteBuf;
import static io.servicetalk.http.api.HttpApiConversions.isPayloadEmpty;
import static io.servicetalk.http.api.HttpApiConversions.mayHaveTrailers;
import static io.servicetalk.http.netty.H2ToStH1Utils.h1HeadersToH2Headers;
import static io.servicetalk.http.netty.Http2Exception.newStreamResetException;
import static io.servicetalk.http.netty.HttpObjectEncoder.encodeAndRetain;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.channelError;

abstract class AbstractH2DuplexHandler extends ChannelDuplexHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(AbstractH2DuplexHandler.class);

final BufferAllocator allocator;
final HttpHeadersFactory headersFactory;
final CloseHandler closeHandler;
private final StreamObserver observer;
private boolean endStream;

AbstractH2DuplexHandler(BufferAllocator allocator, HttpHeadersFactory headersFactory, CloseHandler closeHandler,
StreamObserver observer) {
Expand All @@ -65,18 +73,51 @@ public final void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
}
}

final void writeMetaData(ChannelHandlerContext ctx, HttpMetaData metaData, Http2Headers h2Headers,
ChannelPromise promise) {
endStream = !mayHaveTrailers(metaData) && isPayloadEmpty(metaData);
if (endStream) {
closeHandler.protocolPayloadEndOutbound(ctx, promise);
}
ctx.write(new DefaultHttp2HeadersFrame(h2Headers, endStream), promise);
}

static void writeBuffer(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
ctx.write(new DefaultHttp2DataFrame(encodeAndRetain((Buffer) msg), false), promise);
Buffer buffer = (Buffer) msg;
if (buffer.readableBytes() > 0) {
ctx.write(new DefaultHttp2DataFrame(encodeAndRetain(buffer), false), promise);
} else {
promise.setSuccess();
}
}

final void writeTrailers(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
HttpHeaders trailers = (HttpHeaders) msg;
if (endStream) {
promise.setSuccess();
if (!trailers.isEmpty()) {
LOGGER.warn("{} Received unexpected non-empty trailers while endStream was already sent: {}",
ctx.channel(), trailers);
}
return;
}

closeHandler.protocolPayloadEndOutbound(ctx, promise);
Http2Headers h2Headers = h1HeadersToH2Headers((HttpHeaders) msg);
if (h2Headers.isEmpty()) {
ctx.write(new DefaultHttp2DataFrame(EMPTY_BUFFER, true), promise);
if (trailers.isEmpty()) {
writeEmptyEndStream(ctx, promise);
} else {
ctx.write(new DefaultHttp2HeadersFrame(h2Headers, true), promise);
Http2Headers h2Headers = h1HeadersToH2Headers(trailers);
if (h2Headers.isEmpty()) {
writeEmptyEndStream(ctx, promise);
} else {
ctx.write(new DefaultHttp2HeadersFrame(h2Headers, true), promise);
}
}
endStream = true;
}

private static void writeEmptyEndStream(ChannelHandlerContext ctx, ChannelPromise promise) {
ctx.write(new DefaultHttp2DataFrame(true), promise);
}

final void readDataFrame(ChannelHandlerContext ctx, Object msg) {
Expand Down Expand Up @@ -116,7 +157,7 @@ private static Http2DataFrame release(Http2DataFrame dataFrame) {
}

@Override
public void channelInactive(final ChannelHandlerContext ctx) {
public final void channelInactive(final ChannelHandlerContext ctx) {
final Throwable t = channelError(ctx.channel());
if (t == null) {
observer.streamClosed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.HttpScheme;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
Expand Down Expand Up @@ -86,7 +85,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
h2Headers.scheme(scheme.name());
h2Headers.path(metaData.requestTarget());
}
ctx.write(new DefaultHttp2HeadersFrame(h2Headers, false), promise);
writeMetaData(ctx, metaData, h2Headers, promise);
} else if (msg instanceof Buffer) {
writeBuffer(ctx, msg, promise);
} else if (msg instanceof HttpHeaders) {
Expand All @@ -109,7 +108,7 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
throw new IllegalArgumentException("a response must have " + STATUS + " header");
}
httpStatus = HttpResponseStatus.of(status);
if (httpStatus.statusClass().equals(INFORMATIONAL_1XX)) {
if (httpStatus.statusClass() == INFORMATIONAL_1XX) {
// We don't expose 1xx "interim responses" [2] to the user, and discard them to make way for the
// "real" response.
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
Expand Down Expand Up @@ -64,7 +63,7 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
HttpResponseMetaData metaData = (HttpResponseMetaData) msg;
Http2Headers h2Headers = h1HeadersToH2Headers(metaData.headers());
h2Headers.status(metaData.status().codeAsCharSequence());
ctx.write(new DefaultHttp2HeadersFrame(h2Headers, false), promise);
writeMetaData(ctx, metaData, h2Headers, promise);
} else if (msg instanceof Buffer) {
writeBuffer(ctx, msg, promise);
} else if (msg instanceof HttpHeaders) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,13 @@ static void h1HeadersSplitCookieCrumbs(HttpHeaders h1Headers) {
}

static Http2Headers h1HeadersToH2Headers(HttpHeaders h1Headers) {
if (h1Headers.isEmpty()) {
if (h1Headers instanceof NettyH2HeadersToHttpHeaders) {
return ((NettyH2HeadersToHttpHeaders) h1Headers).nettyHeaders();
}
return new DefaultHttp2Headers(false, 0);
}

// H2 doesn't support connection headers, so remove each one, and the headers corresponding to the
// connection value.
// https://tools.ietf.org/html/rfc7540#section-8.1.2.2
Expand Down Expand Up @@ -166,6 +173,10 @@ static Http2Headers h1HeadersToH2Headers(HttpHeaders h1Headers) {
return ((NettyH2HeadersToHttpHeaders) h1Headers).nettyHeaders();
}

if (h1Headers.isEmpty()) {
return new DefaultHttp2Headers(false, 0);
}

DefaultHttp2Headers http2Headers = new DefaultHttp2Headers(false);
for (Map.Entry<CharSequence, CharSequence> h1Entry : h1Headers) {
// header field names MUST be converted to lowercase prior to their encoding in HTTP/2
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@

import io.servicetalk.buffer.api.Buffer;
import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout;
import io.servicetalk.http.api.DefaultHttpHeadersFactory;
import io.servicetalk.http.api.EmptyHttpHeaders;
import io.servicetalk.http.api.HttpHeaders;
import io.servicetalk.http.api.HttpHeadersFactory;
import io.servicetalk.http.api.HttpMetaData;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopStreamObserver;

import io.netty.buffer.UnpooledByteBufAllocator;
Expand All @@ -31,7 +34,9 @@
import io.netty.handler.codec.http2.DefaultHttp2DataFrame;
import io.netty.handler.codec.http2.DefaultHttp2Headers;
import io.netty.handler.codec.http2.DefaultHttp2HeadersFrame;
import io.netty.handler.codec.http2.Http2DataFrame;
import io.netty.handler.codec.http2.Http2Headers;
import io.netty.handler.codec.http2.Http2HeadersFrame;
import org.junit.After;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -44,15 +49,19 @@
import static io.netty.handler.codec.http.HttpMethod.PUT;
import static io.netty.handler.codec.http.HttpResponseStatus.NO_CONTENT;
import static io.netty.handler.codec.http.HttpResponseStatus.OK;
import static io.servicetalk.buffer.api.EmptyBuffer.EMPTY_BUFFER;
import static io.servicetalk.buffer.api.Matchers.contentEqualTo;
import static io.servicetalk.buffer.netty.BufferAllocators.DEFAULT_ALLOCATOR;
import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.http.api.HeaderUtils.isTransferEncodingChunked;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.api.HttpRequestMethod.GET;
import static io.servicetalk.http.api.HttpRequestMethod.HEAD;
import static io.servicetalk.http.api.StreamingHttpRequests.newRequest;
import static io.servicetalk.http.api.StreamingHttpResponses.newResponse;
import static io.servicetalk.transport.netty.internal.CloseHandler.forNonPipelined;
import static java.lang.String.valueOf;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
Expand All @@ -65,7 +74,7 @@
@RunWith(Parameterized.class)
public class AbstractH2DuplexHandlerTest {

private static final HttpHeadersFactory HEADERS_FACTORY = DefaultHttpHeadersFactory.INSTANCE;
private static final HttpHeadersFactory HEADERS_FACTORY = H2HeadersFactory.INSTANCE;

private enum Variant {

Expand Down Expand Up @@ -310,4 +319,65 @@ public void singleHeadersFrameWithZeroContentLength() {
assertThat(trailers.isEmpty(), is(true));
assertThat(channel.inboundMessages(), is(empty()));
}

@Test
public void emptyMessageWrittenAsSingleFrame() {
HttpMetaData msg;
switch (variant) {
case CLIENT_HANDLER:
msg = newRequest(GET, "/", HTTP_2_0, HEADERS_FACTORY.newHeaders(), DEFAULT_ALLOCATOR,
HEADERS_FACTORY);
break;
case SERVER_HANDLER:
msg = newResponse(HttpResponseStatus.OK, HTTP_2_0, HEADERS_FACTORY.newHeaders(), DEFAULT_ALLOCATOR,
HEADERS_FACTORY);
break;
default:
throw new IllegalStateException("Unexpected variant: " + variant);
}
channel.writeOutbound(msg);
channel.writeOutbound(EMPTY_BUFFER);
channel.writeOutbound(EmptyHttpHeaders.INSTANCE);

Http2HeadersFrame frame = channel.readOutbound();
assertThat("Unexpected endStream flag value", frame.isEndStream(), is(true));
assertThat("Unexpected outbound messages", channel.outboundMessages(), empty());
}

@Test
public void noDataFramesForEmptyBuffers() {
Buffer[] payload = {EMPTY_BUFFER, DEFAULT_ALLOCATOR.fromAscii("data"), EMPTY_BUFFER};

HttpMetaData msg;
switch (variant) {
case CLIENT_HANDLER:
StreamingHttpRequest request = newRequest(GET, "/", HTTP_2_0,
HEADERS_FACTORY.newHeaders(), DEFAULT_ALLOCATOR, HEADERS_FACTORY);
request.payloadBody(from(payload));
msg = request;
break;
case SERVER_HANDLER:
StreamingHttpResponse response = newResponse(HttpResponseStatus.OK, HTTP_2_0,
HEADERS_FACTORY.newHeaders(), DEFAULT_ALLOCATOR, HEADERS_FACTORY);
response.payloadBody(from(payload));
msg = response;
break;
default:
throw new IllegalStateException("Unexpected variant: " + variant);
}
channel.writeOutbound(msg);
for (Buffer buffer : payload) {
channel.writeOutbound(buffer);
}
channel.writeOutbound(EmptyHttpHeaders.INSTANCE);

Http2HeadersFrame headersFrame = channel.readOutbound();
assertThat("Unexpected endStream flag value at headers frame", headersFrame.isEndStream(), is(false));
Http2DataFrame dataFrame = channel.readOutbound();
assertThat("Unexpected data", dataFrame.content().toString(US_ASCII), is("data"));
assertThat("Unexpected endStream flag value at data frame", dataFrame.isEndStream(), is(false));
dataFrame = channel.readOutbound();
assertThat("Unexpected endStream flag value at last frame", dataFrame.isEndStream(), is(true));
assertThat("Unexpected outbound messages", channel.outboundMessages(), empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.api.StreamingHttpResponseFactory;
import io.servicetalk.http.api.StreamingHttpService;
import io.servicetalk.http.api.StreamingHttpServiceFilter;

import org.junit.Test;
Expand All @@ -39,6 +40,7 @@
import static io.servicetalk.buffer.api.Matchers.contentEqualTo;
import static io.servicetalk.concurrent.api.Publisher.from;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_LENGTH;
import static io.servicetalk.http.api.HttpHeaderNames.CONTENT_TYPE;
import static io.servicetalk.http.api.HttpHeaderNames.TRANSFER_ENCODING;
import static io.servicetalk.http.api.HttpHeaderValues.CHUNKED;
import static io.servicetalk.http.api.HttpResponseStatus.OK;
Expand All @@ -47,7 +49,6 @@
import static io.servicetalk.http.netty.AbstractNettyHttpServerTest.ExecutorSupplier.CACHED_SERVER;
import static io.servicetalk.http.netty.HttpProtocol.HTTP_1;
import static io.servicetalk.http.netty.HttpProtocol.HTTP_2;
import static io.servicetalk.http.netty.TestServiceStreaming.SVC_ECHO;
import static java.lang.String.valueOf;
import static java.nio.charset.StandardCharsets.US_ASCII;
import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -102,6 +103,40 @@ public static Collection<Object[]> data() {
return list;
}

@Override
protected void service(final StreamingHttpService __) {
// Replace the original service with custom impl.
// We use a custom "echo" service instead of TestServiceStreaming#SVC_ECHO because we want to modify PayloadInfo
// flags only when payload body or trailers are present in the request:
super.service((ctx, request, factory) -> request.toRequest().map(req -> {
final StreamingHttpResponse response = factory.ok().version(req.version());
if (req.payloadBody().readableBytes() > 0) {
response.payloadBody(from(req.payloadBody()));
}
if (!req.trailers().isEmpty()) {
response.transform(new StatelessTrailersTransformer<Buffer>() {
@Override
protected HttpHeaders payloadComplete(final HttpHeaders trailers) {
return trailers.add(req.trailers());
}
});
}
final CharSequence contentLength = req.headers().get(CONTENT_LENGTH);
if (contentLength != null) {
response.addHeader(CONTENT_LENGTH, contentLength);
}
final CharSequence contentType = req.headers().get(CONTENT_TYPE);
if (contentType != null) {
response.addHeader(CONTENT_TYPE, contentType);
}
final CharSequence transferEncoding = req.headers().get(TRANSFER_ENCODING);
if (transferEncoding != null) {
response.addHeader(TRANSFER_ENCODING, transferEncoding);
}
return response;
}));
}

@Test
public void contentLengthAddedAutomaticallyByAggregatedApiConversion() throws Exception {
test(r -> r.toRequest().toFuture().get().toStreamingRequest(), r -> r, true, false, false);
Expand All @@ -119,7 +154,10 @@ public void transferEncodingAddedAutomatically() throws Exception {

@Test
public void transferEncodingAddedManually() throws Exception {
test(r -> r.setHeader(TRANSFER_ENCODING, CHUNKED), r -> r, false, true, false);
// HTTP/2 can write a request without payload body as a single frame,
// server adds "content-length: 0" when it reads those requests
boolean hasContentLength = protocol == HTTP_2 && content.isEmpty();
test(r -> r.setHeader(TRANSFER_ENCODING, CHUNKED), r -> r, hasContentLength, !hasContentLength, false);
}

@Test
Expand Down Expand Up @@ -244,7 +282,7 @@ private void test(Transformer<StreamingHttpRequest> requestTransformer,
Transformer<StreamingHttpResponse> responseTransformer,
boolean hasContentLength, boolean chunked, boolean hasTrailers) throws Exception {

StreamingHttpRequest preRequest = streamingHttpConnection().post(SVC_ECHO);
StreamingHttpRequest preRequest = streamingHttpConnection().post("/");
if (!content.isEmpty()) {
preRequest.payloadBody(from(content), textSerializer());
}
Expand Down
Loading

0 comments on commit edc0e6d

Please sign in to comment.