diff --git a/docs/src/main/sphinx/connector/clickhouse.rst b/docs/src/main/sphinx/connector/clickhouse.rst index 46106f1e1a9d..8536fa35fd7d 100644 --- a/docs/src/main/sphinx/connector/clickhouse.rst +++ b/docs/src/main/sphinx/connector/clickhouse.rst @@ -145,8 +145,8 @@ ClickHouse Trino Notes ``Date`` ``DATE`` ``DateTime`` ``TIMESTAMP`` ``IPv4`` ``VARCHAR`` -``IPv6`` ``VARCHAR`` -``Enum8`` ``VARCHAR`` +``IPv6`` ``IPADDRESS`` +``Enum8`` ``IPADDRESS`` ``Enum16`` ``VARCHAR`` ``UUID`` ``UUID`` ================= =============== =================================================================================================== diff --git a/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java b/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java index 783008905365..978f6494d0f2 100644 --- a/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java +++ b/plugin/trino-clickhouse/src/main/java/io/trino/plugin/clickhouse/ClickHouseClient.java @@ -15,6 +15,8 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.net.InetAddresses; +import io.airlift.slice.Slice; import io.trino.plugin.base.expression.AggregateFunctionRewriter; import io.trino.plugin.base.expression.AggregateFunctionRule; import io.trino.plugin.jdbc.BaseJdbcClient; @@ -54,8 +56,12 @@ import javax.annotation.Nullable; import javax.inject.Inject; +import java.io.UncheckedIOException; +import java.net.InetAddress; +import java.net.UnknownHostException; import java.sql.Connection; import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Types; @@ -69,6 +75,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Strings.isNullOrEmpty; import static com.google.common.base.Verify.verify; +import static io.airlift.slice.Slices.wrappedBuffer; import static io.trino.plugin.clickhouse.ClickHouseSessionProperties.isMapStringAsVarchar; import static io.trino.plugin.clickhouse.ClickHouseTableProperties.SAMPLE_BY_PROPERTY; import static io.trino.plugin.jdbc.DecimalConfig.DecimalMapping.ALLOW_OVERFLOW; @@ -100,6 +107,7 @@ import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction; import static io.trino.plugin.jdbc.StandardColumnMappings.varcharReadFunction; import static io.trino.plugin.jdbc.StandardColumnMappings.varcharWriteFunction; +import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY; import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED; import static io.trino.spi.type.BigintType.BIGINT; @@ -120,6 +128,7 @@ import static java.lang.Math.max; import static java.lang.String.format; import static java.lang.String.join; +import static java.lang.System.arraycopy; public class ClickHouseClient extends BaseJdbcClient @@ -128,6 +137,7 @@ public class ClickHouseClient private final AggregateFunctionRewriter aggregateFunctionRewriter; private final Type uuidType; + private final Type ipAddressType; @Inject public ClickHouseClient( @@ -138,6 +148,7 @@ public ClickHouseClient( { super(config, "\"", connectionFactory, identifierMapping); this.uuidType = typeManager.getType(new TypeSignature(StandardTypes.UUID)); + this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS)); JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty()); this.aggregateFunctionRewriter = new AggregateFunctionRewriter<>( this::quoted, @@ -368,8 +379,9 @@ public Optional toColumnMapping(ConnectorSession session, Connect switch (jdbcTypeName.replaceAll("\\(.*\\)$", "")) { case "IPv4": + return Optional.of(ipAddressColumnMapping("IPv4StringToNum(?)")); case "IPv6": - // TODO (https://github.com/trinodb/trino/issues/7098) map to Trino IPADDRESS + return Optional.of(ipAddressColumnMapping("IPv6StringToNum(?)")); case "Enum8": case "Enum16": return Optional.of(ColumnMapping.sliceMapping( @@ -524,6 +536,56 @@ else if (prop.size() == 1) { } } + private ColumnMapping ipAddressColumnMapping(String writeBindExpression) + { + return ColumnMapping.sliceMapping( + ipAddressType, + (resultSet, columnIndex) -> { + // copied from IpAddressOperators.castFromVarcharToIpAddress + byte[] address = InetAddresses.forString(resultSet.getString(columnIndex)).getAddress(); + + byte[] bytes; + if (address.length == 4) { + bytes = new byte[16]; + bytes[10] = (byte) 0xff; + bytes[11] = (byte) 0xff; + arraycopy(address, 0, bytes, 12, 4); + } + else if (address.length == 16) { + bytes = address; + } + else { + throw new TrinoException(GENERIC_INTERNAL_ERROR, "Invalid InetAddress length: " + address.length); + } + + return wrappedBuffer(bytes); + }, + ipAddressWriteFunction(writeBindExpression)); + } + + private static SliceWriteFunction ipAddressWriteFunction(String bindExpression) + { + return new SliceWriteFunction() { + @Override + public String getBindExpression() + { + return bindExpression; + } + + @Override + public void set(PreparedStatement statement, int index, Slice value) + throws SQLException + { + try { + statement.setObject(index, InetAddresses.toAddrString(InetAddress.getByAddress(value.getBytes())), Types.OTHER); + } + catch (UnknownHostException e) { + throw new UncheckedIOException(e); + } + } + }; + } + private ColumnMapping uuidColumnMapping() { return ColumnMapping.sliceMapping( diff --git a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseTypeMapping.java b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseTypeMapping.java index d1c0dfcc35e9..b459ee6381ea 100644 --- a/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseTypeMapping.java +++ b/plugin/trino-clickhouse/src/test/java/io/trino/plugin/clickhouse/TestClickHouseTypeMapping.java @@ -22,6 +22,7 @@ import io.trino.testing.QueryRunner; import io.trino.testing.TestingSession; import io.trino.testing.datatype.CreateAndInsertDataSetup; +import io.trino.testing.datatype.CreateAndTrinoInsertDataSetup; import io.trino.testing.datatype.CreateAsSelectDataSetup; import io.trino.testing.datatype.DataSetup; import io.trino.testing.datatype.SqlDataTypeTest; @@ -52,6 +53,7 @@ import static io.trino.spi.type.VarcharType.VARCHAR; import static io.trino.spi.type.VarcharType.createUnboundedVarcharType; import static io.trino.testing.TestingSession.testSessionBuilder; +import static io.trino.type.IpAddressType.IPADDRESS; import static java.lang.String.format; import static java.time.ZoneOffset.UTC; @@ -484,13 +486,26 @@ public void testUuid() public void testIp() { SqlDataTypeTest.create() - // TODO map to Trino IPADDRESS - .addRoundTrip("IPv4", "'116.253.40.133'", createUnboundedVarcharType(), "VARCHAR '116.253.40.133'") - // TODO map to Trino IPADDRESS - .addRoundTrip("IPv6", "'2001:44c8:129:2632:33:0:252:2'", createUnboundedVarcharType(), "VARCHAR '2001:44c8:129:2632:33:0:252:2'") + .addRoundTrip("IPv4", "'0.0.0.0'", IPADDRESS, "IPADDRESS '0.0.0.0'") + .addRoundTrip("IPv4", "'116.253.40.133'", IPADDRESS, "IPADDRESS '116.253.40.133'") + .addRoundTrip("IPv4", "'255.255.255.255'", IPADDRESS, "IPADDRESS '255.255.255.255'") + .addRoundTrip("IPv6", "'::'", IPADDRESS, "IPADDRESS '::'") + .addRoundTrip("IPv6", "'2001:44c8:129:2632:33:0:252:2'", IPADDRESS, "IPADDRESS '2001:44c8:129:2632:33:0:252:2'") + .addRoundTrip("IPv6", "'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'", IPADDRESS, "IPADDRESS 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'") + .addRoundTrip("Nullable(IPv4)", "NULL", IPADDRESS, "CAST(NULL AS IPADDRESS)") + .addRoundTrip("Nullable(IPv6)", "NULL", IPADDRESS, "CAST(NULL AS IPADDRESS)") .execute(getQueryRunner(), clickhouseCreateAndInsert("tpch.test_ip")); - // TODO add test with IPADDRESS written from Trino + SqlDataTypeTest.create() + .addRoundTrip("IPv4", "IPADDRESS '0.0.0.0'", IPADDRESS, "IPADDRESS '0.0.0.0'") + .addRoundTrip("IPv4", "IPADDRESS '116.253.40.133'", IPADDRESS, "IPADDRESS '116.253.40.133'") + .addRoundTrip("IPv4", "IPADDRESS '255.255.255.255'", IPADDRESS, "IPADDRESS '255.255.255.255'") + .addRoundTrip("IPv6", "IPADDRESS '::'", IPADDRESS, "IPADDRESS '::'") + .addRoundTrip("IPv6", "IPADDRESS '2001:44c8:129:2632:33:0:252:2'", IPADDRESS, "IPADDRESS '2001:44c8:129:2632:33:0:252:2'") + .addRoundTrip("IPv6", "IPADDRESS 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'", IPADDRESS, "IPADDRESS 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'") + .addRoundTrip("Nullable(IPv4)", "NULL", IPADDRESS, "CAST(NULL AS IPADDRESS)") + .addRoundTrip("Nullable(IPv6)", "NULL", IPADDRESS, "CAST(NULL AS IPADDRESS)") + .execute(getQueryRunner(), clickhouseCreateTrinoInsert("tpch.test_ip")); } private static Session mapStringAsVarcharSession() @@ -526,4 +541,9 @@ private DataSetup clickhouseCreateAndInsert(String tableNamePrefix) { return new CreateAndInsertDataSetup(new ClickHouseSqlExecutor(clickhouseServer::execute), tableNamePrefix); } + + private DataSetup clickhouseCreateTrinoInsert(String tableNamePrefix) + { + return new CreateAndTrinoInsertDataSetup(new ClickHouseSqlExecutor(clickhouseServer::execute), new TrinoSqlExecutor(getQueryRunner()), tableNamePrefix); + } }