Skip to content

Commit

Permalink
Support hostname verification on proxy to broker connection (#1214) (#14
Browse files Browse the repository at this point in the history
)
  • Loading branch information
rdhabalia authored and Jai Asher committed Feb 12, 2018
1 parent 1336b85 commit 136c241
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 9 deletions.
3 changes: 3 additions & 0 deletions conf/proxy.conf
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,6 @@ tlsCertificateFilePath=

# Path for the TLS private key file
tlsKeyFilePath=

# Validates hostname when proxy creates tls connection with broker
tlsHostnameVerificationEnabled=false
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import java.net.URISyntaxException;
import java.security.cert.X509Certificate;

import javax.net.ssl.SSLSession;

import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.common.api.Commands;
Expand All @@ -38,15 +40,18 @@
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.ssl.SslContext;
import io.netty.handler.ssl.SslContextBuilder;
import io.netty.handler.ssl.SslHandler;
import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import org.apache.http.conn.ssl.DefaultHostnameVerifier;

public class DirectProxyHandler {

Expand Down Expand Up @@ -104,7 +109,7 @@ protected void initChannel(SocketChannel ch) throws Exception {
}
ch.pipeline().addLast("frameDecoder",
new PulsarLengthFieldFrameDecoder(PulsarDecoder.MaxFrameSize, 0, 4, 0, 4));
ch.pipeline().addLast(new ProxyBackendHandler());
ch.pipeline().addLast("proxyOutboundHandler", new ProxyBackendHandler(config));
}
});

Expand All @@ -124,7 +129,10 @@ protected void initChannel(SocketChannel ch) throws Exception {
if (!future.isSuccess()) {
// Close the connection if the connection attempt has failed.
inboundChannel.close();
return;
}
final ProxyBackendHandler cnx = (ProxyBackendHandler) outboundChannel.pipeline().get("proxyOutboundHandler");
cnx.setRemoteHostName(targetBroker.getHost());
});
}

Expand All @@ -135,9 +143,17 @@ enum BackendState {
public class ProxyBackendHandler extends PulsarDecoder implements FutureListener<Void> {

private BackendState state = BackendState.Init;
private String remoteHostName;
protected ChannelHandlerContext ctx;
private ProxyConfiguration config;

public ProxyBackendHandler(ProxyConfiguration config) {
this.config = config;
}

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
this.ctx = ctx;
// Send the Connect command to broker
String authData = "";
if (authentication.getAuthData().hasDataFromCommand()) {
Expand Down Expand Up @@ -195,6 +211,15 @@ protected void handleConnected(CommandConnected connected) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Received Connected from broker", inboundChannel, outboundChannel);
}

if (config.isTlsHostnameVerificationEnabled() && remoteHostName != null
&& !verifyTlsHostName(remoteHostName, ctx)) {
// close the connection if host-verification failed with the broker
log.warn("[{}] Failed to verify hostname of {}", ctx.channel(), remoteHostName);
ctx.close();
return;
}

state = BackendState.HandshakeCompleted;

inboundChannel.writeAndFlush(Commands.newConnected(connected.getProtocolVersion())).addListener(future -> {
Expand All @@ -220,6 +245,21 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
log.warn("[{}] [{}] Caught exception: {}", inboundChannel, outboundChannel, cause.getMessage(), cause);
ctx.close();
}

public void setRemoteHostName(String remoteHostName) {
this.remoteHostName = remoteHostName;
}

private boolean verifyTlsHostName(String hostname, ChannelHandlerContext ctx) {
ChannelHandler sslHandler = ctx.channel().pipeline().get("tls");

SSLSession sslSession = null;
if (sslHandler != null) {
sslSession = ((SslHandler) sslHandler).engine().getSession();
return (new DefaultHostnameVerifier()).verify(hostname, sslSession);
}
return false;
}
}

private static final Logger log = LoggerFactory.getLogger(DirectProxyHandler.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ public class ProxyConfiguration implements PulsarConfiguration {
private String tlsTrustCertsFilePath;
// Accept untrusted TLS certificate from client
private boolean tlsAllowInsecureConnection = false;
// Validates hostname when proxy creates tls connection with broker
private boolean tlsHostnameVerificationEnabled = false;

private Properties properties = new Properties();

Expand Down Expand Up @@ -213,6 +215,14 @@ public void setTlsAllowInsecureConnection(boolean tlsAllowInsecureConnection) {
this.tlsAllowInsecureConnection = tlsAllowInsecureConnection;
}

public boolean isTlsHostnameVerificationEnabled() {
return tlsHostnameVerificationEnabled;
}

public void setTlsHostnameVerificationEnabled(boolean tlsHostnameVerificationEnabled) {
this.tlsHostnameVerificationEnabled = tlsHostnameVerificationEnabled;
}

public String getBrokerClientAuthenticationPlugin() {
return brokerClientAuthenticationPlugin;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.broker.authentication.AuthenticationProviderTls;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.ClientConfiguration;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerConfiguration;
import org.apache.pulsar.client.api.Message;
Expand Down Expand Up @@ -134,7 +135,6 @@ protected void setup() throws Exception {

proxyService = Mockito.spy(new ProxyService(proxyConfig));

proxyService.start();
}

@AfterMethod
Expand All @@ -144,9 +144,13 @@ protected void cleanup() throws Exception {
proxyService.close();
}

void startProxy() throws Exception {
proxyService.start();
}

/**
* <pre>
* It verifies e2e tls + Authentication + Authorization (client -> proxy -> broker>
* It verifies e2e tls + Authentication + Authorization (client -> proxy -> broker)
*
* 1. client connects to proxy over tls and pass auth-data
* 2. proxy authenticate client and retrieve client-role
Expand All @@ -162,10 +166,12 @@ protected void cleanup() throws Exception {
public void textProxyAuthorization() throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
createAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
// create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, false);
ClientConfiguration clientConf = new ClientConfiguration();
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientConf);

String namespaceName = "my-property/proxy-authorization/my-ns";
admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
Expand Down Expand Up @@ -211,16 +217,75 @@ public void textProxyAuthorization() throws Exception {
}

@Test(dataProvider = "hostnameVerification")
public void textProxyAuthorizationTlsHostVerification(boolean hostnameVerificationEnabled) throws Exception {
public void textTlsHostVerificationProxyToClient(boolean hostnameVerificationEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

startProxy();
createAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
// create a client which connects to proxy over tls and pass authData
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setTlsHostnameVerificationEnable(hostnameVerificationEnabled);
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientConf);

String namespaceName = "my-property/proxy-authorization/my-ns";

admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);

admin.namespaces().grantPermissionOnNamespace(namespaceName, "Proxy",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));
admin.namespaces().grantPermissionOnNamespace(namespaceName, "Client",
Sets.newHashSet(AuthAction.consume, AuthAction.produce));

ConsumerConfiguration conf = new ConsumerConfiguration();
conf.setSubscriptionType(SubscriptionType.Exclusive);
try {
Consumer consumer = proxyClient.subscribe("persistent://my-property/proxy-authorization/my-ns/my-topic1",
"my-subscriber-name", conf);
if (hostnameVerificationEnabled) {
Assert.fail("Connection should be failed due to hostnameVerification enabled");
}
} catch (PulsarClientException e) {
if (!hostnameVerificationEnabled) {
Assert.fail("Consumer should be created because hostnameverification is disabled");
}
}

log.info("-- Exiting {} test --", methodName);
}

/**
* It verifies hostname verification at proxy when proxy tries to connect with broker. Proxy performs hostname
* verification when broker sends its certs over tls .
* <pre>
* 1. Broker sends certs back to proxy with CN="Broker" however, proxy tries to connect with hostname=localhost
* 2. so, client fails to create consumer if proxy is enabled with hostname verification
* </pre>
*
* @param hostnameVerificationEnabled
* @throws Exception
*/
@Test(dataProvider = "hostnameVerification")
public void textTlsHostVerificationProxyToBroker(boolean hostnameVerificationEnabled) throws Exception {
log.info("-- Starting {} test --", methodName);

proxyConfig.setTlsHostnameVerificationEnabled(hostnameVerificationEnabled);
startProxy();
createAdminClient();
final String proxyServiceUrl = "pulsar://localhost:" + proxyConfig.getServicePortTls();
// create a client which connects to proxy over tls and pass authData
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, hostnameVerificationEnabled);
ClientConfiguration clientConf = new ClientConfiguration();
clientConf.setOperationTimeout(1, TimeUnit.SECONDS);
PulsarClient proxyClient = createPulsarClient(proxyServiceUrl, clientConf);

String namespaceName = "my-property/proxy-authorization/my-ns";

admin.clusters().createCluster("proxy-authorization", new ClusterData(brokerUrl.toString(), brokerUrlTls.toString(),
"pulsar://localhost:" + BROKER_PORT, "pulsar+ssl://localhost:" + BROKER_PORT_TLS));
admin.properties().createProperty("my-property",
new PropertyAdmin(Lists.newArrayList("appid1", "appid2"), Sets.newHashSet("proxy-authorization")));
admin.namespaces().createNamespace(namespaceName);
Expand Down Expand Up @@ -263,19 +328,17 @@ protected final void createAdminClient() throws Exception {
admin = spy(new PulsarAdmin(brokerUrlTls, clientConf));
}

private PulsarClient createPulsarClient(String proxyServiceUrl, boolean hosnameVerificationEnabled) throws PulsarClientException {
private PulsarClient createPulsarClient(String proxyServiceUrl, ClientConfiguration clientConf) throws PulsarClientException {
Map<String, String> authParams = Maps.newHashMap();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
Authentication authTls = new AuthenticationTls();
authTls.configure(authParams);
org.apache.pulsar.client.api.ClientConfiguration clientConf = new org.apache.pulsar.client.api.ClientConfiguration();
clientConf.setStatsInterval(0, TimeUnit.SECONDS);
clientConf.setTlsTrustCertsFilePath(TLS_CLIENT_TRUST_CERT_FILE_PATH);
clientConf.setTlsAllowInsecureConnection(true);
clientConf.setAuthentication(authTls);
clientConf.setUseTls(true);
clientConf.setTlsHostnameVerificationEnable(hosnameVerificationEnabled);
return PulsarClient.create(proxyServiceUrl, clientConf);
}
}

0 comments on commit 136c241

Please sign in to comment.