Skip to content

Commit

Permalink
Add TCP Fast Open support (#1375)
Browse files Browse the repository at this point in the history
Motivation:
Netty recently added support for
[TCP fast open](https://tools.ietf.org/html/rfc7413) on the client, and
has had server support for a while. We currently don't expose the
necessary configuration knobs to enable this feature, but it may be able
to reduce round trips and reduce connection setup latency.

Modifications:
- Add socket options to configure client and server TCP fast open.
- Add mechanism to configure the listen/acceptor socket options, and use
  this for the backlog option which was an outlier.

Result:
TCP Fast Open can be enabled for client and server. Note that it is only
supported by Netty's native EPOLL transport on linux, and only
initiated on the client when TLS is enabled.
  • Loading branch information
Scottmitch authored Apr 5, 2021
1 parent d4037d3 commit 808b31c
Show file tree
Hide file tree
Showing 12 changed files with 407 additions and 130 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ public abstract class GrpcServerBuilder {
/**
* The maximum queue length for incoming connection indications (a request to connect) is set to the backlog
* parameter. If a connection indication arrives when the queue is full, the connection may time out.
*
* @deprecated Use {@link #listenSocketOption(SocketOption, Object)} with key
* {@link ServiceTalkSocketOptions#SO_BACKLOG}.
* @param backlog the backlog to use when accepting connections.
* @return {@code this}.
*/
public abstract GrpcServerBuilder backlog(int backlog);
@Deprecated
public GrpcServerBuilder backlog(int backlog) {
listenSocketOption(ServiceTalkSocketOptions.SO_BACKLOG, backlog);
return this;
}

/**
* Set the SSL/TLS configuration.
Expand Down Expand Up @@ -102,6 +107,17 @@ public abstract class GrpcServerBuilder {
*/
public abstract <T> GrpcServerBuilder socketOption(SocketOption<T> option, T value);

/**
* Adds a {@link SocketOption} that is applied to the server socket channel which listens/accepts socket channels.
* @param <T> the type of the value.
* @param option the option to apply.
* @param value the value.
* @return this.
* @see StandardSocketOptions
* @see ServiceTalkSocketOptions
*/
public abstract <T> GrpcServerBuilder listenSocketOption(SocketOption<T> option, T value);

/**
* Enable wire-logging for this server.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,6 @@ public GrpcServerBuilder protocols(final HttpProtocolConfig... protocols) {
return this;
}

@Override
public GrpcServerBuilder backlog(final int backlog) {
httpServerBuilder.backlog(backlog);
return this;
}

@Override
public GrpcServerBuilder sslConfig(final ServerSslConfig config) {
httpServerBuilder.sslConfig(config);
Expand All @@ -87,6 +81,13 @@ public <T> GrpcServerBuilder socketOption(final SocketOption<T> option, final T
return this;
}

@Override
public <T> GrpcServerBuilder listenSocketOption(final SocketOption<T> option, final T value) {
httpServerBuilder.listenSocketOption(option, value);
return this;
}

@Deprecated
@Override
public GrpcServerBuilder enableWireLogging(final String loggerName) {
httpServerBuilder.enableWireLogging(loggerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,16 @@ public abstract class HttpServerBuilder {
/**
* Sets the maximum queue length for incoming connection indications (a request to connect) is set to the backlog
* parameter. If a connection indication arrives when the queue is full, the connection may time out.
*
* @deprecated Use {@link #listenSocketOption(SocketOption, Object)} with key
* {@link ServiceTalkSocketOptions#SO_BACKLOG}.
* @param backlog the backlog to use when accepting connections.
* @return {@code this}.
*/
public abstract HttpServerBuilder backlog(int backlog);
@Deprecated
public HttpServerBuilder backlog(int backlog) {
listenSocketOption(ServiceTalkSocketOptions.SO_BACKLOG, backlog);
return this;
}

/**
* Set the SSL/TLS configuration.
Expand All @@ -94,7 +99,7 @@ public abstract class HttpServerBuilder {
public abstract HttpServerBuilder sslConfig(ServerSslConfig defaultConfig, Map<String, ServerSslConfig> sniMap);

/**
* Adds a {@link SocketOption} that is applied.
* Adds a {@link SocketOption} that is applied to connected/accepted socket channels.
*
* @param <T> the type of the value.
* @param option the option to apply.
Expand All @@ -105,6 +110,17 @@ public abstract class HttpServerBuilder {
*/
public abstract <T> HttpServerBuilder socketOption(SocketOption<T> option, T value);

/**
* Adds a {@link SocketOption} that is applied to the server socket channel which listens/accepts socket channels.
* @param <T> the type of the value.
* @param option the option to apply.
* @param value the value.
* @return this.
* @see StandardSocketOptions
* @see ServiceTalkSocketOptions
*/
public abstract <T> HttpServerBuilder listenSocketOption(SocketOption<T> option, T value);

/**
* Enables wire-logging for this server.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.servicetalk.transport.api.IoExecutor;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.api.ServerSslConfig;
import io.servicetalk.transport.api.ServiceTalkSocketOptions;
import io.servicetalk.transport.api.TransportObserver;

import java.net.SocketAddress;
Expand All @@ -51,9 +52,18 @@ public HttpServerBuilder protocols(final HttpProtocolConfig... protocols) {
return this;
}

@Override
public HttpServerBuilder backlog(final int backlog) {
config.tcpConfig().backlog(backlog);
/**
* Sets the maximum queue length for incoming connection indications (a request to connect) is set to the backlog
* parameter. If a connection indication arrives when the queue is full, the connection may time out.
*
* @deprecated Use {@link #listenSocketOption(SocketOption, Object)} with key
* {@link ServiceTalkSocketOptions#SO_BACKLOG}.
* @param backlog the backlog to use when accepting connections.
* @return {@code this}.
*/
@Deprecated
public HttpServerBuilder backlog(int backlog) {
listenSocketOption(ServiceTalkSocketOptions.SO_BACKLOG, backlog);
return this;
}

Expand All @@ -75,6 +85,12 @@ public <T> HttpServerBuilder socketOption(final SocketOption<T> option, final T
return this;
}

@Override
public <T> HttpServerBuilder listenSocketOption(final SocketOption<T> option, final T value) {
config.tcpConfig().listenSocketOption(option, value);
return this;
}

@Override
public HttpServerBuilder enableWireLogging(final String loggerName) {
config.tcpConfig().enableWireLogging(loggerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,65 +18,95 @@
import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout;
import io.servicetalk.http.api.BlockingHttpClient;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.test.resources.DefaultTestCerts;
import io.servicetalk.transport.api.ClientSslConfigBuilder;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.api.ServerContext;
import io.servicetalk.transport.api.ServerSslConfigBuilder;
import io.servicetalk.transport.api.SslProvider;

import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.theories.DataPoints;
import org.junit.experimental.theories.FromDataPoints;
import org.junit.experimental.theories.Theories;
import org.junit.experimental.theories.Theory;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collection;
import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import static io.servicetalk.http.netty.TcpFastOpenTest.clientTcpFastOpenOptions;
import static io.servicetalk.http.netty.TcpFastOpenTest.serverTcpFastOpenOptions;
import static io.servicetalk.test.resources.DefaultTestCerts.serverPemHostname;
import static io.servicetalk.transport.api.SslClientAuthMode.REQUIRE;
import static io.servicetalk.transport.api.SslProvider.JDK;
import static io.servicetalk.transport.api.SslProvider.OPENSSL;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static org.junit.Assert.assertEquals;

@RunWith(Parameterized.class)
@RunWith(Theories.class)
public class MutualSslTest {
@Rule
public final Timeout timeout = new ServiceTalkTestTimeout();
private final SslProvider serverSslProvider;
private final SslProvider clientSslProvider;
@DataPoints("serverSslProvider")
public static final SslProvider[] SERVER_PROVIDERS = {JDK, OPENSSL};
@DataPoints("clientSslProvider")
public static final SslProvider[] CLIENT_PROVIDERS = {JDK, OPENSSL};
@DataPoints("serverListenOptions")
@SuppressWarnings("rawtypes")
public static final List<Map<SocketOption, Object>> SERVER_LISTEN_OPTIONS =
asList(emptyMap(), serverTcpFastOpenOptions());
@DataPoints("clientOptions")
@SuppressWarnings("rawtypes")
public static final List<Map<SocketOption, Object>> CLIENT_OPTIONS = asList(emptyMap(), clientTcpFastOpenOptions());

public MutualSslTest(final SslProvider serverSslProvider, final SslProvider clientSslProvider) {
this.serverSslProvider = serverSslProvider;
this.clientSslProvider = clientSslProvider;
}

@Parameterized.Parameters(name = "server={0} client={1}")
public static Collection<Object[]> sslProviders() {
return asList(
new Object[]{JDK, JDK},
new Object[]{JDK, OPENSSL},
new Object[]{OPENSSL, JDK},
new Object[]{OPENSSL, OPENSSL}
);
}

@Test
public void mutualSsl() throws Exception {
try (ServerContext serverContext = HttpServers.forAddress(localAddress(0))
@Theory
public void mutualSsl(@FromDataPoints("serverSslProvider") SslProvider serverSslProvider,
@FromDataPoints("clientSslProvider") SslProvider clientSslProvider,
@SuppressWarnings("rawtypes")
@FromDataPoints("serverListenOptions") Map<SocketOption, Object> serverListenOptions,
@SuppressWarnings("rawtypes")
@FromDataPoints("clientOptions") Map<SocketOption, Object> clientOptions)
throws Exception {
HttpServerBuilder serverBuilder = HttpServers.forAddress(localAddress(0))
.sslConfig(new ServerSslConfigBuilder(
DefaultTestCerts::loadServerPem, DefaultTestCerts::loadServerKey)
.trustManager(DefaultTestCerts::loadClientCAPem)
.clientAuthMode(REQUIRE).provider(serverSslProvider).build())
.listenBlockingAndAwait((ctx, request, responseFactory) -> responseFactory.ok());
BlockingHttpClient client = HttpClients.forSingleAddress(serverHostAndPort(serverContext))
.trustManager(DefaultTestCerts::loadClientCAPem)
.clientAuthMode(REQUIRE).provider(serverSslProvider).build());
for (@SuppressWarnings("rawtypes") Entry<SocketOption, Object> entry : serverListenOptions.entrySet()) {
@SuppressWarnings("unchecked")
SocketOption<Object> option = entry.getKey();
serverBuilder.listenSocketOption(option, entry.getValue());
}
try (ServerContext serverContext = serverBuilder.listenBlockingAndAwait(
(ctx, request, responseFactory) -> responseFactory.ok());
BlockingHttpClient client = newClientBuilder(serverContext, clientOptions)
.sslConfig(new ClientSslConfigBuilder(DefaultTestCerts::loadServerCAPem)
.provider(clientSslProvider).peerHost(serverPemHostname())
.keyManager(DefaultTestCerts::loadClientPem, DefaultTestCerts::loadClientKey).build())
.provider(clientSslProvider).peerHost(serverPemHostname())
.keyManager(DefaultTestCerts::loadClientPem, DefaultTestCerts::loadClientKey).build())
.buildBlocking()) {
assertEquals(HttpResponseStatus.OK, client.request(client.get("/")).status());
}
}

private SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder(
ServerContext serverContext, @SuppressWarnings("rawtypes") Map<SocketOption, Object> clientOptions) {
SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builder =
HttpClients.forSingleAddress(serverHostAndPort(serverContext));
for (@SuppressWarnings("rawtypes") Entry<SocketOption, Object> entry : clientOptions.entrySet()) {
@SuppressWarnings("unchecked")
SocketOption<Object> option = entry.getKey();
builder.socketOption(option, entry.getValue());
}
return builder;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Copyright © 2020 Apple Inc. and the ServiceTalk project authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.servicetalk.http.netty;

import io.servicetalk.concurrent.internal.ServiceTalkTestTimeout;
import io.servicetalk.http.api.BlockingHttpClient;
import io.servicetalk.http.api.HttpResponseStatus;
import io.servicetalk.http.api.HttpServerBuilder;
import io.servicetalk.http.api.SingleAddressHttpClientBuilder;
import io.servicetalk.transport.api.HostAndPort;
import io.servicetalk.transport.api.ServerContext;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.net.InetSocketAddress;
import java.net.SocketOption;
import java.util.Collection;
import java.util.Map;
import java.util.Map.Entry;

import static io.servicetalk.transport.api.ServiceTalkSocketOptions.TCP_FASTOPEN_BACKLOG;
import static io.servicetalk.transport.api.ServiceTalkSocketOptions.TCP_FASTOPEN_CONNECT;
import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress;
import static io.servicetalk.transport.netty.internal.AddressUtils.serverHostAndPort;
import static java.util.Arrays.asList;
import static java.util.Collections.emptyMap;
import static java.util.Collections.singletonMap;
import static org.junit.Assert.assertEquals;

@RunWith(Parameterized.class)
public class TcpFastOpenTest {
@Rule
public final Timeout timeout = new ServiceTalkTestTimeout();
@SuppressWarnings("rawtypes")
private final Map<SocketOption, Object> serverListenOptions;
@SuppressWarnings("rawtypes")
private final Map<SocketOption, Object> clientOptions;

public TcpFastOpenTest(
@SuppressWarnings("rawtypes") final Map<SocketOption, Object> serverListenOptions,
@SuppressWarnings("rawtypes") final Map<SocketOption, Object> clientOptions) {
this.serverListenOptions = serverListenOptions;
this.clientOptions = clientOptions;
}

@Parameterized.Parameters(name = "{index}: server opts={0} client opts={1}")
public static Collection<Object[]> sslProviders() {
return asList(
new Object[]{emptyMap(), emptyMap()},
new Object[]{emptyMap(), clientTcpFastOpenOptions()},
new Object[]{serverTcpFastOpenOptions(), emptyMap()},
new Object[]{serverTcpFastOpenOptions(), clientTcpFastOpenOptions()}
);
}

@SuppressWarnings("rawtypes")
static Map<SocketOption, Object> clientTcpFastOpenOptions() {
return singletonMap(TCP_FASTOPEN_CONNECT, true);
}

@SuppressWarnings("rawtypes")
static Map<SocketOption, Object> serverTcpFastOpenOptions() {
return singletonMap(TCP_FASTOPEN_BACKLOG, 1);
}

@Test
public void requestSucceedsEvenIfTcpFastOpenNotEnabledOrSupported() throws Exception {
HttpServerBuilder serverBuilder = HttpServers.forAddress(localAddress(0));
for (@SuppressWarnings("rawtypes") Entry<SocketOption, Object> entry : serverListenOptions.entrySet()) {
@SuppressWarnings("unchecked")
SocketOption<Object> option = entry.getKey();
serverBuilder.listenSocketOption(option, entry.getValue());
}
try (ServerContext serverContext = serverBuilder.listenBlockingAndAwait(
(ctx, request, responseFactory) -> responseFactory.ok());
BlockingHttpClient client = newClientBuilder(serverContext).buildBlocking()) {
assertEquals(HttpResponseStatus.OK, client.request(client.get("/")).status());
}
}

private SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> newClientBuilder(
ServerContext serverContext) {
SingleAddressHttpClientBuilder<HostAndPort, InetSocketAddress> builder =
HttpClients.forSingleAddress(serverHostAndPort(serverContext));
for (@SuppressWarnings("rawtypes") Entry<SocketOption, Object> entry : clientOptions.entrySet()) {
@SuppressWarnings("unchecked")
SocketOption<Object> option = entry.getKey();
builder.socketOption(option, entry.getValue());
}
return builder;
}
}
Loading

0 comments on commit 808b31c

Please sign in to comment.