Skip to content

Commit

Permalink
Map ClickHouse IPv4 and IPv6 types to Trino IPADDRESS type
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Jan 17, 2022
1 parent 7729e15 commit 304e96d
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 8 deletions.
4 changes: 2 additions & 2 deletions docs/src/main/sphinx/connector/clickhouse.rst
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ ClickHouse Trino Notes
``Date`` ``DATE``
``DateTime`` ``TIMESTAMP``
``IPv4`` ``VARCHAR``
``IPv6`` ``VARCHAR``
``Enum8`` ``VARCHAR``
``IPv6`` ``IPADDRESS``
``Enum8`` ``IPADDRESS``

This comment has been minimized.

Copy link
@wendigo

wendigo Jan 17, 2022

Contributor

Is this correct?

This comment has been minimized.

Copy link
@ebyhr

ebyhr Jan 17, 2022

Author Member

My mistake. I will send another PR.

This comment has been minimized.

Copy link
@wendigo

wendigo Jan 17, 2022

Contributor
``Enum16`` ``VARCHAR``
``UUID`` ``UUID``
================= =============== ===================================================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.net.InetAddresses;
import io.airlift.slice.Slice;
import io.trino.plugin.base.expression.AggregateFunctionRewriter;
import io.trino.plugin.base.expression.AggregateFunctionRule;
import io.trino.plugin.jdbc.BaseJdbcClient;
Expand Down Expand Up @@ -54,8 +56,12 @@
import javax.annotation.Nullable;
import javax.inject.Inject;

import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
Expand All @@ -69,6 +75,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.base.Verify.verify;
import static io.airlift.slice.Slices.wrappedBuffer;
import static io.trino.plugin.clickhouse.ClickHouseSessionProperties.isMapStringAsVarchar;
import static io.trino.plugin.clickhouse.ClickHouseTableProperties.SAMPLE_BY_PROPERTY;
import static io.trino.plugin.jdbc.DecimalConfig.DecimalMapping.ALLOW_OVERFLOW;
Expand Down Expand Up @@ -100,6 +107,7 @@
import static io.trino.plugin.jdbc.StandardColumnMappings.varbinaryWriteFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharReadFunction;
import static io.trino.plugin.jdbc.StandardColumnMappings.varcharWriteFunction;
import static io.trino.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static io.trino.spi.StandardErrorCode.INVALID_TABLE_PROPERTY;
import static io.trino.spi.StandardErrorCode.NOT_SUPPORTED;
import static io.trino.spi.type.BigintType.BIGINT;
Expand All @@ -120,6 +128,7 @@
import static java.lang.Math.max;
import static java.lang.String.format;
import static java.lang.String.join;
import static java.lang.System.arraycopy;

public class ClickHouseClient
extends BaseJdbcClient
Expand All @@ -128,6 +137,7 @@ public class ClickHouseClient

private final AggregateFunctionRewriter<JdbcExpression> aggregateFunctionRewriter;
private final Type uuidType;
private final Type ipAddressType;

@Inject
public ClickHouseClient(
Expand All @@ -138,6 +148,7 @@ public ClickHouseClient(
{
super(config, "\"", connectionFactory, identifierMapping);
this.uuidType = typeManager.getType(new TypeSignature(StandardTypes.UUID));
this.ipAddressType = typeManager.getType(new TypeSignature(StandardTypes.IPADDRESS));
JdbcTypeHandle bigintTypeHandle = new JdbcTypeHandle(Types.BIGINT, Optional.of("bigint"), Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty());
this.aggregateFunctionRewriter = new AggregateFunctionRewriter<>(
this::quoted,
Expand Down Expand Up @@ -368,8 +379,9 @@ public Optional<ColumnMapping> toColumnMapping(ConnectorSession session, Connect

switch (jdbcTypeName.replaceAll("\\(.*\\)$", "")) {
case "IPv4":
return Optional.of(ipAddressColumnMapping("IPv4StringToNum(?)"));
case "IPv6":
// TODO (https://github.com/trinodb/trino/issues/7098) map to Trino IPADDRESS
return Optional.of(ipAddressColumnMapping("IPv6StringToNum(?)"));
case "Enum8":
case "Enum16":
return Optional.of(ColumnMapping.sliceMapping(
Expand Down Expand Up @@ -524,6 +536,56 @@ else if (prop.size() == 1) {
}
}

private ColumnMapping ipAddressColumnMapping(String writeBindExpression)
{
return ColumnMapping.sliceMapping(
ipAddressType,
(resultSet, columnIndex) -> {
// copied from IpAddressOperators.castFromVarcharToIpAddress
byte[] address = InetAddresses.forString(resultSet.getString(columnIndex)).getAddress();

byte[] bytes;
if (address.length == 4) {
bytes = new byte[16];
bytes[10] = (byte) 0xff;
bytes[11] = (byte) 0xff;
arraycopy(address, 0, bytes, 12, 4);
}
else if (address.length == 16) {
bytes = address;
}
else {
throw new TrinoException(GENERIC_INTERNAL_ERROR, "Invalid InetAddress length: " + address.length);
}

return wrappedBuffer(bytes);
},
ipAddressWriteFunction(writeBindExpression));
}

private static SliceWriteFunction ipAddressWriteFunction(String bindExpression)
{
return new SliceWriteFunction() {
@Override
public String getBindExpression()
{
return bindExpression;
}

@Override
public void set(PreparedStatement statement, int index, Slice value)
throws SQLException
{
try {
statement.setObject(index, InetAddresses.toAddrString(InetAddress.getByAddress(value.getBytes())), Types.OTHER);
}
catch (UnknownHostException e) {
throw new UncheckedIOException(e);
}
}
};
}

private ColumnMapping uuidColumnMapping()
{
return ColumnMapping.sliceMapping(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingSession;
import io.trino.testing.datatype.CreateAndInsertDataSetup;
import io.trino.testing.datatype.CreateAndTrinoInsertDataSetup;
import io.trino.testing.datatype.CreateAsSelectDataSetup;
import io.trino.testing.datatype.DataSetup;
import io.trino.testing.datatype.SqlDataTypeTest;
Expand Down Expand Up @@ -52,6 +53,7 @@
import static io.trino.spi.type.VarcharType.VARCHAR;
import static io.trino.spi.type.VarcharType.createUnboundedVarcharType;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static io.trino.type.IpAddressType.IPADDRESS;
import static java.lang.String.format;
import static java.time.ZoneOffset.UTC;

Expand Down Expand Up @@ -483,13 +485,26 @@ public void testUuid()
public void testIp()
{
SqlDataTypeTest.create()
// TODO map to Trino IPADDRESS
.addRoundTrip("IPv4", "'116.253.40.133'", createUnboundedVarcharType(), "VARCHAR '116.253.40.133'")
// TODO map to Trino IPADDRESS
.addRoundTrip("IPv6", "'2001:44c8:129:2632:33:0:252:2'", createUnboundedVarcharType(), "VARCHAR '2001:44c8:129:2632:33:0:252:2'")
.addRoundTrip("IPv4", "'0.0.0.0'", IPADDRESS, "IPADDRESS '0.0.0.0'")
.addRoundTrip("IPv4", "'116.253.40.133'", IPADDRESS, "IPADDRESS '116.253.40.133'")
.addRoundTrip("IPv4", "'255.255.255.255'", IPADDRESS, "IPADDRESS '255.255.255.255'")
.addRoundTrip("IPv6", "'::'", IPADDRESS, "IPADDRESS '::'")
.addRoundTrip("IPv6", "'2001:44c8:129:2632:33:0:252:2'", IPADDRESS, "IPADDRESS '2001:44c8:129:2632:33:0:252:2'")
.addRoundTrip("IPv6", "'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'", IPADDRESS, "IPADDRESS 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'")
.addRoundTrip("Nullable(IPv4)", "NULL", IPADDRESS, "CAST(NULL AS IPADDRESS)")
.addRoundTrip("Nullable(IPv6)", "NULL", IPADDRESS, "CAST(NULL AS IPADDRESS)")
.execute(getQueryRunner(), clickhouseCreateAndInsert("tpch.test_ip"));

// TODO add test with IPADDRESS written from Trino
SqlDataTypeTest.create()
.addRoundTrip("IPv4", "IPADDRESS '0.0.0.0'", IPADDRESS, "IPADDRESS '0.0.0.0'")
.addRoundTrip("IPv4", "IPADDRESS '116.253.40.133'", IPADDRESS, "IPADDRESS '116.253.40.133'")
.addRoundTrip("IPv4", "IPADDRESS '255.255.255.255'", IPADDRESS, "IPADDRESS '255.255.255.255'")
.addRoundTrip("IPv6", "IPADDRESS '::'", IPADDRESS, "IPADDRESS '::'")
.addRoundTrip("IPv6", "IPADDRESS '2001:44c8:129:2632:33:0:252:2'", IPADDRESS, "IPADDRESS '2001:44c8:129:2632:33:0:252:2'")
.addRoundTrip("IPv6", "IPADDRESS 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'", IPADDRESS, "IPADDRESS 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff'")
.addRoundTrip("Nullable(IPv4)", "NULL", IPADDRESS, "CAST(NULL AS IPADDRESS)")
.addRoundTrip("Nullable(IPv6)", "NULL", IPADDRESS, "CAST(NULL AS IPADDRESS)")
.execute(getQueryRunner(), clickhouseCreateTrinoInsert("tpch.test_ip"));
}

private static Session mapStringAsVarcharSession()
Expand Down Expand Up @@ -525,4 +540,9 @@ private DataSetup clickhouseCreateAndInsert(String tableNamePrefix)
{
return new CreateAndInsertDataSetup(new ClickHouseSqlExecutor(clickhouseServer::execute), tableNamePrefix);
}

private DataSetup clickhouseCreateTrinoInsert(String tableNamePrefix)
{
return new CreateAndTrinoInsertDataSetup(new ClickHouseSqlExecutor(clickhouseServer::execute), new TrinoSqlExecutor(getQueryRunner()), tableNamePrefix);
}
}

0 comments on commit 304e96d

Please sign in to comment.