Skip to content

Commit

Permalink
support custom socket factory
Browse files Browse the repository at this point in the history
  • Loading branch information
zhicwu committed Aug 14, 2023
1 parent be7d6aa commit 2f0aa9f
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
Expand Down Expand Up @@ -62,6 +65,19 @@ boolean isDone() {

private static final Logger log = LoggerFactory.getLogger(AbstractSocketClient.class);

private static final Map<String, ClickHouseSocketFactory> cache = Collections.synchronizedMap(new WeakHashMap<>());

public static final ClickHouseSocketFactory getCustomSocketFactory(String className,
ClickHouseSocketFactory defaultFactory, Class<?> forClass) {
if (ClickHouseChecker.isNullOrEmpty(className) || forClass == null) {
return defaultFactory;
}

ClickHouseSocketFactory factory = cache.computeIfAbsent(className,
k -> ClickHouseUtils.newInstance(k, ClickHouseSocketFactory.class, defaultFactory.getClass()));
return factory.supports(forClass) ? factory : defaultFactory;
}

/**
* Sets socket options. May be called at any time(e.g. before or even after the
* socket is bound or connected).
Expand All @@ -71,7 +87,7 @@ boolean isDone() {
* @return the given socket
* @throws SocketException when there's error setting socket options
*/
public static Socket setSocketOptions(ClickHouseConfig config, Socket socket) throws SocketException {
public static final Socket setSocketOptions(ClickHouseConfig config, Socket socket) throws SocketException {
if (socket == null || socket.isClosed()) {
throw new IllegalArgumentException("Cannot set option(s) on a null or closed socket");
} else if (config == null) {
Expand Down Expand Up @@ -119,7 +135,8 @@ public static Socket setSocketOptions(ClickHouseConfig config, Socket socket) th
* @return the given socket channel
* @throws IOException when there's error setting socket options
*/
public static SocketChannel setSocketOptions(ClickHouseConfig config, SocketChannel socket) throws IOException {
public static final SocketChannel setSocketOptions(ClickHouseConfig config, SocketChannel socket)
throws IOException {
if (socket == null || socket.socket().isClosed()) {
throw new IllegalArgumentException("Cannot set option(s) on a null or closed socket channel");
} else if (config == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,9 @@ public static Map<ClickHouseOption, Serializable> toClientOptions(Map<?, ?> prop
// common options optimized for read
private final boolean async;
private final boolean autoDiscovery;
private final Map<String, String> customSettings; // serializable
private final Map<String, String> customSettings;
private final String customSocketFactory;
private final Map<String, String> customSocketFactoryOptions;
private final String clientName;
private final boolean compressRequest;
private final ClickHouseCompression compressAlgorithm;
Expand Down Expand Up @@ -295,6 +297,9 @@ public ClickHouseConfig(Map<ClickHouseOption, Serializable> options, ClickHouseC
this.async = (boolean) getOption(ClickHouseClientOption.ASYNC, ClickHouseDefaults.ASYNC);
this.autoDiscovery = getBoolOption(ClickHouseClientOption.AUTO_DISCOVERY);
this.customSettings = ClickHouseOption.toKeyValuePairs(getStrOption(ClickHouseClientOption.CUSTOM_SETTINGS));
this.customSocketFactory = getStrOption(ClickHouseClientOption.CUSTOM_SOCKET_FACTORY);
this.customSocketFactoryOptions = ClickHouseOption
.toKeyValuePairs(getStrOption(ClickHouseClientOption.CUSTOM_SOCKET_FACTORY_OPTIONS));
this.clientName = getStrOption(ClickHouseClientOption.CLIENT_NAME);
this.compressRequest = getBoolOption(ClickHouseClientOption.DECOMPRESS);
this.compressAlgorithm = getOption(ClickHouseClientOption.DECOMPRESS_ALGORITHM, ClickHouseCompression.class);
Expand Down Expand Up @@ -386,6 +391,14 @@ public Map<String, String> getCustomSettings() {
return customSettings;
}

public String getCustomSocketFactory() {
return customSocketFactory;
}

public Map<String, String> getCustomSocketFactoryOptions() {
return customSocketFactoryOptions;
}

public String getClientName() {
return clientName;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.clickhouse.client;

import java.io.IOException;

/**
* Generic factory interface for creating sockets used by the TCP and Apache
* Http clients.
*/
public interface ClickHouseSocketFactory {
/**
* Creates a new instance of the provided configuration and class type.
*
* @param <T> type of class to create
* @param config configuration
* @param clazz class instance for the type to instantiate
* @return non-null new instance of the given class
* @throws IOException when failed to create the instance
* @throws UnsupportedOperationException when the given class is not supported
*/
<T> T create(ClickHouseConfig config, Class<T> clazz) throws IOException, UnsupportedOperationException;

/**
* Tests whether this factory supports creating instances of the given class
* type. For example, before calling {@link #create(ClickHouseConfig, Class)},
* you may want to call this method first to verify the factory can produce the
* desired type.
*
* @param clazz non-null class reflecting the type to check support for
* @return true if the factory supports creating instances of the given type;
* false otherwise
*/
boolean supports(Class<?> clazz);
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,18 @@ public enum ClickHouseClientOption implements ClickHouseOption {
/**
* Custom server settings for all queries.
*/
CUSTOM_SETTINGS("custom_settings", "", "Custom server settings for all queries."),
CUSTOM_SETTINGS("custom_settings", "", "Comma separated custom server settings for all queries."),
/**
* Custom socket factory.
*/
CUSTOM_SOCKET_FACTORY("custom_socket_factory", "",
"Full qualified class name of custom socket factory. This is only supported by TCP client and Apache Http Client."),
/**
* Additional socket factory options. Only useful only when
* {@link #CUSTOM_SOCKET_FACTORY} is set.
*/
CUSTOM_SOCKET_FACTORY_OPTIONS("custom_socket_factory_options", "",
"Comma separated options for custom socket factory."),
/**
* Load balancing policy.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.clickhouse.client;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.testng.Assert;
import org.testng.annotations.Test;

public class AbstractSocketClientTest {
public static class CustomListFactory implements ClickHouseSocketFactory {
@Override
public <T> T create(ClickHouseConfig config, Class<T> clazz) throws IOException, UnsupportedOperationException {
return null;
}

@Override
public boolean supports(Class<?> clazz) {
return List.class.isAssignableFrom(clazz);
}
}

@Test(groups = { "unit" })
public void testGetCustomSocketFactory() {
CustomListFactory defaultFactory = new CustomListFactory();

Assert.assertEquals(AbstractSocketClient.getCustomSocketFactory(null, null, null), null);
Assert.assertEquals(AbstractSocketClient.getCustomSocketFactory(null, defaultFactory, null), defaultFactory);
Assert.assertEquals(AbstractSocketClient.getCustomSocketFactory("", defaultFactory, null), defaultFactory);

Assert.assertEquals(AbstractSocketClient.getCustomSocketFactory(CustomListFactory.class.getName(),
defaultFactory, List.class),
AbstractSocketClient.getCustomSocketFactory(CustomListFactory.class.getName(),
defaultFactory, List.class));

ClickHouseSocketFactory factory = AbstractSocketClient.getCustomSocketFactory(CustomListFactory.class.getName(),
defaultFactory, List.class);
Assert.assertEquals(factory.getClass(), defaultFactory.getClass());
Assert.assertNotEquals(factory, defaultFactory);

factory = AbstractSocketClient.getCustomSocketFactory(CustomListFactory.class.getName(),
defaultFactory, Map.class);
Assert.assertEquals(factory, defaultFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.lang.reflect.InvocationTargetException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.net.URLDecoder;
Expand Down Expand Up @@ -296,6 +297,32 @@ public static Map<String, String> extractParameters(String query, Map<String, St
return params;
}

public static <T> T newInstance(String className, Class<T> returnType, Class<?> callerClass) {
if (className == null || className.isEmpty() || returnType == null) {
throw new IllegalArgumentException("Non-empty class name and return type are required");
} else if (callerClass == null) {
callerClass = returnType;
}

try {
Class<?> clazz = Class.forName(className, false, callerClass.getClassLoader());
if (!returnType.isAssignableFrom(clazz)) {
throw new IllegalArgumentException(
format("Invalid %s class type. Input class should be a superclass of %s.", className,
returnType));
}

return returnType.cast(clazz.getConstructor().newInstance());
} catch (ClassNotFoundException e) {
throw new IllegalArgumentException(format("Class %s is not found in the classpath.", className));
} catch (IllegalAccessException | NoSuchMethodException e) {
throw new IllegalArgumentException(
format("Class %s does not have a public constructor without any argument.", className));
} catch (InstantiationException | InvocationTargetException e) {
throw new IllegalArgumentException(format("Error while creating an %s class instance.", className), e);
}
}

/**
* Gets absolute and normalized path to the given file.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -58,6 +59,23 @@ public void testExtractParameters() {
Assert.assertEquals(ClickHouseUtils.extractParameters("*&a=1&!b", expected), expected);
}

@Test(groups = { "unit" })
public void testNewInstance() {
Assert.assertThrows(IllegalArgumentException.class, () -> ClickHouseUtils.newInstance(null, null, null));
Assert.assertThrows(IllegalArgumentException.class, () -> ClickHouseUtils.newInstance("", Object.class, null));
Assert.assertThrows(IllegalArgumentException.class,
() -> ClickHouseUtils.newInstance("java.util.List", Object.class, null));
Assert.assertThrows(IllegalArgumentException.class,
() -> ClickHouseUtils.newInstance("java.util.NoSuchListClass", Object.class, null));
Assert.assertThrows(IllegalArgumentException.class,
() -> ClickHouseUtils.newInstance("java.lang.Object", ArrayList.class, null));
Assert.assertThrows(IllegalArgumentException.class,
() -> ClickHouseUtils.newInstance("java.util.ArrayList", Collections.class, null));

Assert.assertEquals(ClickHouseUtils.newInstance("java.util.ArrayList", List.class, null).getClass(),
ArrayList.class);
}

@Test(groups = { "unit" })
public void testGetFile() throws IOException {
Assert.assertThrows(IllegalArgumentException.class, () -> ClickHouseUtils.getFile(null));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package com.clickhouse.client.http;

import com.clickhouse.client.AbstractSocketClient;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseConfig;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseRequest;
import com.clickhouse.client.ClickHouseSocketFactory;
import com.clickhouse.client.ClickHouseSslContextProvider;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseProxyType;
Expand Down Expand Up @@ -42,6 +42,9 @@
import org.apache.hc.core5.util.Timeout;
import org.apache.hc.core5.util.VersionInfo;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;

import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
Expand All @@ -61,9 +64,6 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLException;

/**
* Created by wujianchao on 2022/12/1.
*/
Expand All @@ -80,17 +80,20 @@ protected ApacheHttpConnectionImpl(ClickHouseNode server, ClickHouseRequest<?> r
}

private CloseableHttpClient newConnection(ClickHouseConfig c) throws IOException {
final ClickHouseSocketFactory socketFactory = AbstractSocketClient.getCustomSocketFactory(
c.getCustomSocketFactory(), ApacheHttpClientSocketFactory.instance, PlainConnectionSocketFactory.class);

RegistryBuilder<ConnectionSocketFactory> r = RegistryBuilder.<ConnectionSocketFactory>create()
.register("http", SocketFactory.create(c));
.register("http", socketFactory.create(c, PlainConnectionSocketFactory.class));
if (c.isSsl()) {
r.register("https", SSLSocketFactory.create(c));
r.register("https", socketFactory.create(c, SSLConnectionSocketFactory.class));
}

HttpConnectionManager connManager = new HttpConnectionManager(r.build(), c);
int max_connection = config.getIntOption(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS);
int maxConnection = config.getIntOption(ClickHouseHttpOption.MAX_OPEN_CONNECTIONS);

connManager.setMaxTotal(max_connection);
connManager.setDefaultMaxPerRoute(max_connection);
connManager.setMaxTotal(maxConnection);
connManager.setDefaultMaxPerRoute(maxConnection);

HttpClientBuilder builder = HttpClientBuilder.create().setConnectionManager(connManager)
.disableContentCompression();
Expand Down Expand Up @@ -267,6 +270,31 @@ public void close() throws IOException {
client.close();
}

static class ApacheHttpClientSocketFactory implements ClickHouseSocketFactory {
static final ApacheHttpClientSocketFactory instance = new ApacheHttpClientSocketFactory();

@Override
public <T> T create(ClickHouseConfig config, Class<T> clazz) throws IOException, UnsupportedOperationException {
if (config == null || clazz == null) {
throw new IllegalArgumentException("Non-null configuration and class are required");
} else if (SSLConnectionSocketFactory.class.equals(clazz)) {
return clazz.cast(new SSLSocketFactory(config));
} else if (PlainConnectionSocketFactory.class.equals(clazz)) {
return clazz.cast(new SocketFactory(config));
}

throw new UnsupportedOperationException(ClickHouseUtils.format("Class %s is not supported", clazz));
}

@Override
public boolean supports(Class<?> clazz) {
return PlainConnectionSocketFactory.class.equals(clazz) || SSLConnectionSocketFactory.class.equals(clazz);
}

private ApacheHttpClientSocketFactory() {
}
}

static class SocketFactory extends PlainConnectionSocketFactory {
private final ClickHouseConfig config;

Expand Down
Loading

0 comments on commit 2f0aa9f

Please sign in to comment.