Skip to content

Commit

Permalink
Implement HTTP proxy CONNECT with ALPN
Browse files Browse the repository at this point in the history
  • Loading branch information
idelpivnitskiy committed Sep 19, 2023
1 parent a50c57b commit 499d011
Show file tree
Hide file tree
Showing 12 changed files with 179 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public interface SingleAddressHttpClientBuilder<U, R> extends HttpClientBuilder<
* If the client talks to a proxy over http (not https, {@link #sslConfig(ClientSslConfig) ClientSslConfig} is NOT
* configured), it will rewrite the request-target to
* <a href="https://tools.ietf.org/html/rfc7230#section-5.3.2">absolute-form</a>, as specified by the RFC.
* <p>
* For secure proxy tunnels (when {@link #sslConfig(ClientSslConfig) ClientSslConfig} is configured) the tunnel is
* always initialized using
* <a href="https://datatracker.ietf.org/doc/html/rfc9110#section-9.3.6">HTTP/1.1 CONNECT</a> request. The actual
* protocol will be negotiated via <a href="https://tools.ietf.org/html/rfc7301">ALPN extension</a> of TLS protocol,
* taking into account HTTP protocols configured via {@link #protocols(HttpProtocolConfig...)} method.
*
* @param proxyAddress Unresolved address of the proxy. When used with a builder created for a resolved address,
* {@code proxyAddress} should also be already resolved – otherwise runtime exceptions may occur.
Expand All @@ -70,6 +76,12 @@ default SingleAddressHttpClientBuilder<U, R> proxyAddress(U proxyAddress) { // F
* If the client talks to a proxy over http (not https, {@link #sslConfig(ClientSslConfig) ClientSslConfig} is NOT
* configured), it will rewrite the request-target to
* <a href="https://tools.ietf.org/html/rfc7230#section-5.3.2">absolute-form</a>, as specified by the RFC.
* <p>
* For secure proxy tunnels (when {@link #sslConfig(ClientSslConfig) ClientSslConfig} is configured) the tunnel is
* always initialized using
* <a href="https://datatracker.ietf.org/doc/html/rfc9110#section-9.3.6">HTTP/1.1 CONNECT</a> request. The actual
* protocol will be negotiated via <a href="https://tools.ietf.org/html/rfc7301">ALPN extension</a> of TLS protocol,
* taking into account HTTP protocols configured via {@link #protocols(HttpProtocolConfig...)} method.
*
* @param proxyAddress Unresolved address of the proxy. When used with a builder created for a resolved address,
* {@code proxyAddress} should also be already resolved – otherwise runtime exceptions may occur.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,27 +28,34 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Consumer;
import javax.annotation.Nullable;

import static io.servicetalk.http.netty.AlpnIds.HTTP_1_1;
import static io.servicetalk.transport.netty.internal.ChannelCloseUtils.assignConnectionError;
import static java.util.Objects.requireNonNull;

/**
* A {@link Single} that initializes ALPN handler and completes after protocol negotiation.
*/
final class AlpnChannelSingle extends ChannelInitSingle<String> {
private final boolean forceChannelRead;
private final Consumer<ChannelHandlerContext> onHandlerAdded;

AlpnChannelSingle(final Channel channel,
final ChannelInitializer channelInitializer) {
this(channel, channelInitializer, __ -> { });
}

AlpnChannelSingle(final Channel channel,
final ChannelInitializer channelInitializer,
final boolean forceChannelRead) {
final Consumer<ChannelHandlerContext> onHandlerAdded) {
super(channel, channelInitializer);
this.forceChannelRead = forceChannelRead;
this.onHandlerAdded = requireNonNull(onHandlerAdded);
}

@Override
protected ChannelHandler newChannelHandler(final Subscriber<? super String> subscriber) {
return new AlpnChannelHandler(subscriber, forceChannelRead);
return new AlpnChannelHandler(subscriber, onHandlerAdded);
}

/**
Expand All @@ -61,24 +68,19 @@ private static final class AlpnChannelHandler extends ApplicationProtocolNegotia

@Nullable
private SingleSource.Subscriber<? super String> subscriber;
private final boolean forceRead;
private final Consumer<ChannelHandlerContext> onHandlerAdded;

AlpnChannelHandler(final SingleSource.Subscriber<? super String> subscriber,
final boolean forceRead) {
final Consumer<ChannelHandlerContext> onHandlerAdded) {
super(HTTP_1_1);
this.subscriber = subscriber;
this.forceRead = forceRead;
this.onHandlerAdded = onHandlerAdded;
}

@Override
public void handlerAdded(final ChannelHandlerContext ctx) throws Exception {
super.handlerAdded(ctx);
if (forceRead) {
// Force a read to get the SSL handshake started. We initialize pipeline before
// SslHandshakeCompletionEvent will complete, therefore, no data will be propagated before we finish
// initialization.
ctx.read();
}
onHandlerAdded.accept(ctx);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ private Single<FilterableStreamingHttpConnection> createConnection(
final Channel channel, final ConnectionObserver connectionObserver,
final ReadOnlyTcpClientConfig tcpConfig) {
return new AlpnChannelSingle(channel,
new TcpClientChannelInitializer(tcpConfig, connectionObserver), false).flatMap(protocol -> {
new TcpClientChannelInitializer(tcpConfig, connectionObserver)).flatMap(protocol -> {
switch (protocol) {
case HTTP_1_1:
final H1ProtocolConfig h1Config = config.h1Config();
Expand All @@ -89,8 +89,12 @@ private Single<FilterableStreamingHttpConnection> createConnection(
new H2ClientParentChannelInitializer(h2Config),
connectionObserver, config.allowDropTrailersReadFromTransport());
default:
return failed(new IllegalStateException("Unknown ALPN protocol negotiated: " + protocol));
return unknownAlpnProtocol(protocol);
}
});
}

static Single<FilterableStreamingHttpConnection> unknownAlpnProtocol(final String protocol) {
return failed(new IllegalStateException("Unknown ALPN protocol negotiated: " + protocol));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -235,8 +235,9 @@ public HttpExecutionStrategy executionStrategy() {
return computedStrategy;
}
};
if (roConfig.h2Config() != null && roConfig.hasProxy()) {
throw new IllegalStateException("Proxying is not yet supported with HTTP/2");
final SslContext sslContext = roConfig.tcpConfig().sslContext();
if (roConfig.hasProxy() && sslContext == null && roConfig.h2Config() != null) {
throw new IllegalStateException("Proxying is not yet supported with plaintext HTTP/2");
}

// Track resources that potentially need to be closed when an exception is thrown during buildStreaming
Expand All @@ -250,7 +251,6 @@ public HttpExecutionStrategy executionStrategy() {
final ExecutionStrategy connectionFactoryStrategy =
ctx.builder.strategyComputation.buildForConnectionFactory();

final SslContext sslContext = roConfig.tcpConfig().sslContext();
if (roConfig.hasProxy() && sslContext != null) {
assert roConfig.connectAddress() != null;
final ConnectionFactoryFilter<R, FilterableStreamingHttpConnection> proxy =
Expand All @@ -269,14 +269,14 @@ public HttpExecutionStrategy executionStrategy() {
ctx.builder.addIdleTimeoutConnectionFilter ?
appendConnectionFilter(ctx.builder.connectionFilterFactory, DEFAULT_IDLE_TIMEOUT_FILTER) :
ctx.builder.connectionFilterFactory;
if (roConfig.isH2PriorKnowledge()) {
if (!roConfig.hasProxy() && roConfig.isH2PriorKnowledge()) {
H2ProtocolConfig h2Config = roConfig.h2Config();
assert h2Config != null;
connectionFactory = new H2LBHttpConnectionFactory<>(roConfig, executionContext,
connectionFilterFactory, reqRespFactory,
connectionFactoryStrategy, connectionFactoryFilter,
ctx.builder.loadBalancerFactory::toLoadBalancedConnection);
} else if (roConfig.tcpConfig().preferredAlpnProtocol() != null) {
} else if (!roConfig.hasProxy() && roConfig.tcpConfig().preferredAlpnProtocol() != null) {
H1ProtocolConfig h1Config = roConfig.h1Config();
H2ProtocolConfig h2Config = roConfig.h2Config();
connectionFactory = new AlpnLBHttpConnectionFactory<>(roConfig, executionContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.servicetalk.transport.netty.internal.NettyConnectionContext;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -90,8 +91,11 @@ private static Single<NettyConnectionContext> alpnInitChannel(final SocketAddres
final StreamingHttpService service,
final boolean drainRequestPayloadBody,
final ConnectionObserver observer) {
return new AlpnChannelSingle(channel,
new TcpServerChannelInitializer(config.tcpConfig(), observer), true).flatMap(protocol -> {
return new AlpnChannelSingle(channel, new TcpServerChannelInitializer(config.tcpConfig(), observer),
// Force a read to get the SSL handshake started. We initialize pipeline before
// SslHandshakeCompletionEvent will complete, therefore, no data will be propagated before we finish
// initialization.
ChannelHandlerContext::read).flatMap(protocol -> {
switch (protocol) {
case HTTP_1_1:
return NettyHttpServer.initChannel(channel, httpExecutionContext, config,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ final class PipelinedLBHttpConnectionFactory<ResolvedAddress> extends AbstractLB
Single<FilterableStreamingHttpConnection> newFilterableConnection(final ResolvedAddress resolvedAddress,
final TransportObserver observer) {
assert config.h1Config() != null;
return buildStreaming(executionContext, resolvedAddress, config, observer)
return buildStreaming(executionContext, resolvedAddress, config.tcpConfig(), config.h1Config(),
config.hasProxy(), observer)
.map(conn -> new PipelinedStreamingHttpConnection(conn, config.h1Config(),
reqRespFactoryFunc.apply(HTTP_1_1), config.allowDropTrailersReadFromTransport()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.servicetalk.http.netty;

import io.servicetalk.client.api.ConnectionFactoryFilter;
import io.servicetalk.concurrent.SingleSource;
import io.servicetalk.concurrent.api.Completable;
import io.servicetalk.concurrent.api.Publisher;
import io.servicetalk.concurrent.api.Single;
Expand All @@ -27,31 +26,33 @@
import io.servicetalk.http.api.StreamingHttpRequest;
import io.servicetalk.http.api.StreamingHttpRequestResponseFactory;
import io.servicetalk.http.api.StreamingHttpResponse;
import io.servicetalk.http.netty.AlpnChannelSingle.NoopChannelInitializer;
import io.servicetalk.tcp.netty.internal.ReadOnlyTcpClientConfig;
import io.servicetalk.transport.api.ExecutionStrategy;
import io.servicetalk.transport.api.TransportObserver;
import io.servicetalk.transport.netty.internal.CopyByteBufHandlerChannelInitializer;
import io.servicetalk.transport.netty.internal.DefaultNettyConnection;
import io.servicetalk.transport.netty.internal.DeferSslHandler;
import io.servicetalk.transport.netty.internal.NoopTransportObserver.NoopConnectionObserver;
import io.servicetalk.transport.netty.internal.StacklessClosedChannelException;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.ssl.SslHandshakeCompletionEvent;

import java.util.function.Consumer;
import javax.annotation.Nullable;

import static io.servicetalk.concurrent.api.Processors.newSingleProcessor;
import static io.servicetalk.concurrent.api.SourceAdapters.fromSource;
import static io.servicetalk.http.api.HttpApiConversions.isPayloadEmpty;
import static io.servicetalk.http.api.HttpContextKeys.HTTP_EXECUTION_STRATEGY_KEY;
import static io.servicetalk.http.api.HttpExecutionStrategies.customStrategyBuilder;
import static io.servicetalk.http.api.HttpExecutionStrategies.offloadNone;
import static io.servicetalk.http.api.HttpHeaderNames.HOST;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_1_1;
import static io.servicetalk.http.api.HttpProtocolVersion.HTTP_2_0;
import static io.servicetalk.http.api.HttpResponseStatus.StatusClass.SUCCESSFUL_2XX;
import static io.servicetalk.http.netty.AlpnLBHttpConnectionFactory.unknownAlpnProtocol;
import static io.servicetalk.http.netty.HttpProtocolConfigs.h1Default;
import static io.servicetalk.http.netty.StreamingConnectionFactory.buildStreaming;
import static io.servicetalk.utils.internal.ThrowableUtils.addSuppressed;
import static java.util.Objects.requireNonNull;

final class ProxyConnectLBHttpConnectionFactory<ResolvedAddress>
extends AbstractLBHttpConnectionFactory<ResolvedAddress> {
Expand All @@ -71,7 +72,6 @@ final class ProxyConnectLBHttpConnectionFactory<ResolvedAddress>
final Consumer<StreamingHttpRequest> connectRequestInitializer) {
super(config, executionContext, version -> reqRespFactory, connectStrategy, connectionFactoryFilter,
connectionFilterFunction, protocolBinding);
requireNonNull(config.h1Config(), "H1ProtocolConfig is required");
assert config.connectAddress() != null;
this.connectAddress = config.connectAddress().toString();
this.connectRequestInitializer = connectRequestInitializer;
Expand All @@ -80,9 +80,13 @@ final class ProxyConnectLBHttpConnectionFactory<ResolvedAddress>
@Override
Single<FilterableStreamingHttpConnection> newFilterableConnection(final ResolvedAddress resolvedAddress,
final TransportObserver observer) {
assert config.h1Config() != null;
return buildStreaming(executionContext, resolvedAddress, config, observer)
.map(c -> new PipelinedStreamingHttpConnection(c, config.h1Config(),
final H1ProtocolConfig h1Config = config.h1Config() != null ? config.h1Config() : h1Default();
return buildStreaming(executionContext, resolvedAddress, config.tcpConfig(), h1Config, config.hasProxy(),
observer)
// Always create PipelinedStreamingHttpConnection because:
// 1. buildStreaming creates a CloseHandler for pipelined request-response
// 2. in case ALPN negotiates HTTP/1.x we won't need to change the connection
.map(c -> new PipelinedStreamingHttpConnection(c, h1Config,
reqRespFactoryFunc.apply(HTTP_1_1), config.allowDropTrailersReadFromTransport()))
.flatMap(this::processConnect);
}
Expand Down Expand Up @@ -137,27 +141,10 @@ private static void configureOffloading(final StreamingHttpRequest request) {
private Single<FilterableStreamingHttpConnection> handshake(
final NettyFilterableStreamingHttpConnection connection) {
return Single.defer(() -> {
final SingleSource.Processor<FilterableStreamingHttpConnection, FilterableStreamingHttpConnection>
processor = newSingleProcessor();
final Channel channel = connection.nettyChannel();
assert channel.eventLoop().inEventLoop();
channel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt) {
if (evt instanceof SslHandshakeCompletionEvent) {
SslHandshakeCompletionEvent event = (SslHandshakeCompletionEvent) evt;
if (event.isSuccess()) {
processor.onSuccess(connection);
} else {
processor.onError(event.cause());
}
channel.pipeline().remove(this);
}
ctx.fireUserEventTriggered(evt);
}
});

final Single<FilterableStreamingHttpConnection> result;
final Single<String> result;
final DeferSslHandler deferSslHandler = channel.pipeline().get(DeferSslHandler.class);
if (deferSslHandler == null) {
if (!channel.isActive()) {
Expand All @@ -171,8 +158,39 @@ public void userEventTriggered(final ChannelHandlerContext ctx, final Object evt
DeferSslHandler.class + " in the channel pipeline."));
}
} else {
deferSslHandler.ready();
result = fromSource(processor);
result = new AlpnChannelSingle(channel, NoopChannelInitializer.INSTANCE, __ -> deferSslHandler.ready());
}
return result.shareContextOnSubscribe();
}).flatMap(protocol -> {
final Single<FilterableStreamingHttpConnection> result;
switch (protocol) {
case AlpnIds.HTTP_1_1:
// Nothing to do, HTTP/1.1 pipeline is already initialized
result = Single.succeeded(connection);
break;
case AlpnIds.HTTP_2:
final Channel channel = connection.nettyChannel();
assert channel.eventLoop().inEventLoop();
// Remove HTTP/1.1 handlers:
channel.pipeline().remove(HttpRequestEncoder.class);
channel.pipeline().remove(HttpResponseDecoder.class);
channel.pipeline().remove(CopyByteBufHandlerChannelInitializer.handlerClass());
channel.pipeline().remove(DefaultNettyConnection.handlerClass());
// Initialize HTTP/2:
final H2ProtocolConfig h2Config = config.h2Config();
assert h2Config != null;
final ReadOnlyTcpClientConfig tcpConfig = config.tcpConfig();
result = H2ClientParentConnectionContext.initChannel(channel, executionContext,
h2Config, reqRespFactoryFunc.apply(HTTP_2_0), tcpConfig.flushStrategy(),
tcpConfig.idleTimeoutMs(), tcpConfig.sslConfig(),
new H2ClientParentChannelInitializer(h2Config),
// FIXME: propagate real observer
NoopConnectionObserver.INSTANCE, config.allowDropTrailersReadFromTransport())
.cast(FilterableStreamingHttpConnection.class);
break;
default:
result = unknownAlpnProtocol(protocol);
break;
}
return result.shareContextOnSubscribe();
});
Expand Down
Loading

0 comments on commit 499d011

Please sign in to comment.