Skip to content

Commit

Permalink
[improve][proxy] Reuse authentication instance in pulsar-proxy (apach…
Browse files Browse the repository at this point in the history
  • Loading branch information
equanz authored Aug 14, 2024
1 parent fe21441 commit 3e461c0
Show file tree
Hide file tree
Showing 41 changed files with 536 additions and 94 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ void testAuthentication() throws Exception {
proxyConfig.setForwardAuthorizationCredentials(true);
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig));
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService);
@Cleanup
final Authentication proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
proxyClientAuthentication.start();
ProxyService proxyService = new ProxyService(proxyConfig, authenticationService, proxyClientAuthentication);

proxyService.start();
final String proxyServiceUrl = "pulsar://localhost:" + proxyService.getListenPort().get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLContext;
Expand All @@ -40,7 +39,6 @@
import org.apache.pulsar.broker.web.AuthenticationFilter;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.KeyStoreParams;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.SecurityUtility;
Expand Down Expand Up @@ -87,12 +85,15 @@ class AdminProxyHandler extends ProxyServlet {

private final ProxyConfiguration config;
private final BrokerDiscoveryProvider discoveryProvider;
private final Authentication proxyClientAuthentication;
private final String brokerWebServiceUrl;
private final String functionWorkerWebServiceUrl;

AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider) {
AdminProxyHandler(ProxyConfiguration config, BrokerDiscoveryProvider discoveryProvider,
Authentication proxyClientAuthentication) {
this.config = config;
this.discoveryProvider = discoveryProvider;
this.proxyClientAuthentication = proxyClientAuthentication;
this.brokerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getBrokerWebServiceURLTLS()
: config.getBrokerWebServiceURL();
this.functionWorkerWebServiceUrl = config.isTlsEnabledWithBroker() ? config.getFunctionWorkerWebServiceURLTLS()
Expand Down Expand Up @@ -256,22 +257,13 @@ protected ContentProvider proxyRequestContent(HttpServletRequest request,
@Override
protected HttpClient newHttpClient() {
try {
Authentication auth = AuthenticationFactory.create(
config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters()
);

Objects.requireNonNull(auth, "No supported auth found for proxy");

auth.start();

if (config.isTlsEnabledWithBroker()) {
try {
X509Certificate[] trustCertificates = SecurityUtility
.loadCertificatesFromPemFile(config.getBrokerClientTrustCertsFilePath());

SSLContext sslCtx;
AuthenticationDataProvider authData = auth.getAuthData();
AuthenticationDataProvider authData = proxyClientAuthentication.getAuthData();
if (config.isBrokerClientTlsEnabledWithKeyStore()) {
KeyStoreParams params = authData.hasDataForTls() ? authData.getTlsKeyStoreParams() : null;
sslCtx = KeyStoreSSLContext.createClientSslContext(
Expand Down Expand Up @@ -311,11 +303,6 @@ protected HttpClient newHttpClient() {
return new JettyHttpClient(contextFactory);
} catch (Exception e) {
LOG.error("new jetty http client exception ", e);
try {
auth.close();
} catch (IOException ioe) {
LOG.error("Failed to close the authentication service", ioe);
}
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationDataProvider;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.AuthData;
Expand Down Expand Up @@ -114,8 +113,7 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection)

if (!isEmpty(config.getBrokerClientAuthenticationPlugin())) {
try {
authData = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters()).getAuthData();
authData = authentication.getAuthData();
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,6 @@
import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServlets;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.netty.DnsResolverUtil;
Expand Down Expand Up @@ -158,7 +156,8 @@ public class ProxyService implements Closeable {
private boolean gracefulShutdown = true;

public ProxyService(ProxyConfiguration proxyConfig,
AuthenticationService authenticationService) throws Exception {
AuthenticationService authenticationService,
Authentication proxyClientAuthentication) throws Exception {
requireNonNull(proxyConfig);
this.proxyConfig = proxyConfig;
this.clientCnxs = Sets.newConcurrentHashSet();
Expand Down Expand Up @@ -207,12 +206,7 @@ public ProxyService(ProxyConfiguration proxyConfig,
});
}, 60, TimeUnit.SECONDS);
this.proxyAdditionalServlets = AdditionalServlets.load(proxyConfig);
if (proxyConfig.getBrokerClientAuthenticationPlugin() != null) {
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
} else {
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
}
this.proxyClientAuthentication = proxyClientAuthentication;
this.connectionController = new ConnectionController.DefaultConnectionController(
proxyConfig.getMaxConcurrentInboundConnections(),
proxyConfig.getMaxConcurrentInboundConnectionsPerIp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import io.prometheus.client.Gauge;
import io.prometheus.client.Gauge.Child;
import io.prometheus.client.hotspot.DefaultExports;
import java.io.IOException;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import lombok.Getter;
Expand All @@ -44,6 +46,10 @@
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet;
import org.apache.pulsar.broker.web.plugin.servlet.AdditionalServletWithClassLoader;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.configuration.VipStatus;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand Down Expand Up @@ -104,6 +110,9 @@ public class ProxyServiceStarter {

private ProxyConfiguration config;

@Getter
private Authentication proxyClientAuthentication;

@Getter
private ProxyService proxyService;

Expand Down Expand Up @@ -244,8 +253,27 @@ public static void main(String[] args) throws Exception {
public void start() throws Exception {
AuthenticationService authenticationService = new AuthenticationService(
PulsarConfigurationLoader.convertFrom(config));

if (config.getBrokerClientAuthenticationPlugin() != null) {
proxyClientAuthentication = AuthenticationFactory.create(config.getBrokerClientAuthenticationPlugin(),
config.getBrokerClientAuthenticationParameters());
Objects.requireNonNull(proxyClientAuthentication, "No supported auth found for proxy");
try {
proxyClientAuthentication.start();
} catch (Exception e) {
try {
proxyClientAuthentication.close();
} catch (IOException ioe) {
log.error("Failed to close the authentication service", ioe);
}
throw new PulsarClientException.InvalidConfigurationException(e.getMessage());
}
} else {
proxyClientAuthentication = AuthenticationDisabled.INSTANCE;
}

// create proxy service
proxyService = new ProxyService(config, authenticationService);
proxyService = new ProxyService(config, authenticationService, proxyClientAuthentication);
// create a web-service
server = new WebServer(config, authenticationService);

Expand Down Expand Up @@ -293,7 +321,8 @@ public double get() {
}

AtomicReference<WebSocketService> webSocketServiceRef = new AtomicReference<>();
addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), webSocketServiceRef);
addWebServerHandlers(server, config, proxyService, proxyService.getDiscoveryProvider(), webSocketServiceRef,
proxyClientAuthentication);
webSocketService = webSocketServiceRef.get();

// start web-service
Expand All @@ -311,6 +340,9 @@ public void close() {
if (webSocketService != null) {
webSocketService.close();
}
if (proxyClientAuthentication != null) {
proxyClientAuthentication.close();
}
} catch (Exception e) {
log.warn("server couldn't stop gracefully {}", e.getMessage(), e);
} finally {
Expand All @@ -323,15 +355,17 @@ public void close() {
public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider) throws Exception {
addWebServerHandlers(server, config, service, discoveryProvider, null);
BrokerDiscoveryProvider discoveryProvider,
Authentication proxyClientAuthentication) throws Exception {
addWebServerHandlers(server, config, service, discoveryProvider, null, proxyClientAuthentication);
}

public static void addWebServerHandlers(WebServer server,
ProxyConfiguration config,
ProxyService service,
BrokerDiscoveryProvider discoveryProvider,
AtomicReference<WebSocketService> webSocketServiceRef) throws Exception {
AtomicReference<WebSocketService> webSocketServiceRef,
Authentication proxyClientAuthentication) throws Exception {
// We can make 'status.html' publicly accessible without authentication since
// it does not contain any sensitive data.
server.addRestResource("/", VipStatus.ATTRIBUTE_STATUS_FILE_PATH, config.getStatusFilePath(),
Expand All @@ -348,7 +382,8 @@ public static void addWebServerHandlers(WebServer server,
}
}

AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider);
AdminProxyHandler adminProxyHandler = new AdminProxyHandler(config, discoveryProvider,
proxyClientAuthentication);
ServletHolder servletHolder = new ServletHolder(adminProxyHandler);
server.addServlet("/admin", servletHolder);
server.addServlet("/lookup", servletHolder);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.util.PortManager;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
Expand Down Expand Up @@ -121,6 +123,7 @@ public void close() {
private ProxyService proxyService;
private boolean useSeparateThreadPoolForProxyExtensions;
private ProxyConfiguration proxyConfig = new ProxyConfiguration();
private Authentication proxyClientAuthentication;

public SimpleProxyExtensionTestBase(boolean useSeparateThreadPoolForProxyExtensions) {
this.useSeparateThreadPoolForProxyExtensions = useSeparateThreadPoolForProxyExtensions;
Expand All @@ -142,8 +145,12 @@ protected void setup() throws Exception {
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
proxyConfig.setClusterName(configClusterName);

proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
proxyClientAuthentication.start();

proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig))));
PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication));
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeper))).when(proxyService).createLocalMetadataStore();
doReturn(registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal))).when(proxyService)
.createConfigurationMetadataStore();
Expand Down Expand Up @@ -174,6 +181,9 @@ public void testBootstrapProtocolHandler() throws Exception {
protected void cleanup() throws Exception {
super.internalCleanup();
proxyService.close();
if (proxyClientAuthentication != null) {
proxyClientAuthentication.close();
}

if (tempDirectory != null) {
FileUtils.deleteDirectory(tempDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import org.apache.pulsar.broker.authentication.AuthenticationService;
import org.apache.pulsar.broker.resources.PulsarResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls;
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
import org.apache.pulsar.common.policies.data.ClusterData;
Expand All @@ -47,6 +49,8 @@ public class AdminProxyHandlerKeystoreTLSTest extends MockedPulsarServiceBaseTes

private final ProxyConfiguration proxyConfig = new ProxyConfiguration();

private Authentication proxyClientAuthentication;

private WebServer webServer;

private BrokerDiscoveryProvider discoveryProvider;
Expand Down Expand Up @@ -103,14 +107,18 @@ protected void setup() throws Exception {
KEYSTORE_TYPE, BROKER_KEYSTORE_FILE_PATH, BROKER_KEYSTORE_PW));
proxyConfig.setClusterName(configClusterName);

proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
proxyConfig.getBrokerClientAuthenticationParameters());
proxyClientAuthentication.start();

resource = new PulsarResources(registerCloseable(new ZKMetadataStore(mockZooKeeper)),
registerCloseable(new ZKMetadataStore(mockZooKeeperGlobal)));
webServer = new WebServer(proxyConfig, new AuthenticationService(
PulsarConfigurationLoader.convertFrom(proxyConfig)));
discoveryProvider = spy(registerCloseable(new BrokerDiscoveryProvider(proxyConfig, resource)));
LoadManagerReport report = new LoadReport(brokerUrl.toString(), brokerUrlTls.toString(), null, null);
doReturn(report).when(discoveryProvider).nextBroker();
ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider));
ServletHolder servletHolder = new ServletHolder(new AdminProxyHandler(proxyConfig, discoveryProvider, proxyClientAuthentication));
webServer.addServlet("/admin", servletHolder);
webServer.addServlet("/lookup", servletHolder);
webServer.start();
Expand All @@ -120,6 +128,9 @@ protected void setup() throws Exception {
@Override
protected void cleanup() throws Exception {
webServer.stop();
if (proxyClientAuthentication != null) {
proxyClientAuthentication.close();
}
super.internalCleanup();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.apache.pulsar.client.api.Authentication;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.Request;
import org.testng.Assert;
Expand All @@ -46,7 +47,7 @@ public void setupMocks() throws ServletException {
// given
HttpClient httpClient = mock(HttpClient.class);
adminProxyHandler = new AdminProxyHandler(mock(ProxyConfiguration.class),
mock(BrokerDiscoveryProvider.class)) {
mock(BrokerDiscoveryProvider.class), mock(Authentication.class)) {
@Override
protected HttpClient createHttpClient() throws ServletException {
return httpClient;
Expand Down
Loading

0 comments on commit 3e461c0

Please sign in to comment.