Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Spanner connector #16724

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Prev Previous commit
Next Next commit
Added more datatypes parsing. Added create table features like primar…
…y keys,not null fields, timestamp field options etc.
  • Loading branch information
taher-koitawala committed Mar 27, 2023

Verified

This commit was created on GitHub.com and signed with GitHub’s verified signature. The key has expired.
commit 5deb3f22f08c5d396d4936e9bf079eb33a1c0395

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -47,11 +47,11 @@ public static ConnectionFactory createConnectionFactory(BaseJdbcConfig config,
Properties connectionProperties = new Properties();
String connectionUrl = config.getConnectionUrl();
JdbcDriver driver = new JdbcDriver();
File credentials = new File(spannerConfig.getCredentialsFile());
//File credentials = new File(spannerConfig.getCredentialsFile());
if (!driver.acceptsURL(connectionUrl)) {
throw new RuntimeException(config.getConnectionUrl() + " is incorrect");
}
connectionProperties.put("credentials", credentials.getAbsolutePath());
//connectionProperties.put("credentials", spannerConfig.getCredentialsFile());
return new ConfiguringConnectionFactory(new DriverConnectionFactory(
driver,
config.getConnectionUrl(),
Original file line number Diff line number Diff line change
@@ -3,20 +3,26 @@
import com.google.common.collect.ImmutableList;
import io.trino.plugin.jdbc.TablePropertiesProvider;
import io.trino.spi.session.PropertyMetadata;
import io.trino.spi.type.ArrayType;

import javax.inject.Inject;

import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Collectors;

import static io.trino.spi.session.PropertyMetadata.booleanProperty;
import static io.trino.spi.session.PropertyMetadata.stringProperty;
import static io.trino.spi.type.VarcharType.VARCHAR;
import static java.util.Objects.requireNonNull;

public class SpannerTableProperties
implements TablePropertiesProvider
{
public static final String PRIMARY_KEY = "primary_key";
public static final String PRIMARY_KEYS = "primary_keys";
public static final String NOT_NULL_FIELDS = "not_null_fields";
public static final String COMMIT_TIMESTAMP_FIELDS = "commit_timestamp_fields";
public static final String INTERLEAVE_IN_PARENT = "interleave_in_parent";
public static final String ON_DELETE_CASCADE = "on_delete_cascade";
private final ImmutableList<PropertyMetadata<?>> sessionProperties;
@@ -26,11 +32,33 @@ public SpannerTableProperties()
{
System.out.println("CALLED TABLE PROPERTIES ");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove.

sessionProperties = ImmutableList.<PropertyMetadata<?>>builder()
.add(stringProperty(
PRIMARY_KEY,
"Primary key for the table being created",
null,
false))
.add(new PropertyMetadata<>(
PRIMARY_KEYS,
"Primary keys for the table being created",
new ArrayType(VARCHAR),
List.class,
ImmutableList.of(),
false,
value -> (List<?>) value,
value -> value))
.add(new PropertyMetadata<>(
NOT_NULL_FIELDS,
"Array of fields that should have NOT NULL constraints set on them in Spanner",
new ArrayType(VARCHAR),
List.class,
ImmutableList.of(),
false,
value -> (List<?>) value,
value -> value))
.add(new PropertyMetadata<>(
COMMIT_TIMESTAMP_FIELDS,
"Array of timestamp fields that should have 'OPTIONS (allow_commit_timestamp=true)' constraints set on them in Spanner",
new ArrayType(VARCHAR),
List.class,
ImmutableList.of(),
false,
value -> (List<?>) value,
value -> value))
.add(stringProperty(INTERLEAVE_IN_PARENT,
"Table name which needs to be interleaved with this table", null, false))
.add(booleanProperty(ON_DELETE_CASCADE,
@@ -39,10 +67,10 @@ public SpannerTableProperties()
.build();
}

public static String getPrimaryKey(Map<String, Object> tableProperties)
public static List<String> getPrimaryKey(Map<String, Object> tableProperties)
{
requireNonNull(tableProperties, "tableProperties is null");
return (String) tableProperties.get(PRIMARY_KEY);
return toUpperCase((List<String>) tableProperties.get(PRIMARY_KEYS));
}

public static String getInterleaveInParent(Map<String, Object> tableProperties)
@@ -51,12 +79,29 @@ public static String getInterleaveInParent(Map<String, Object> tableProperties)
return (String) tableProperties.get(INTERLEAVE_IN_PARENT);
}

public static List<String> getNotNullFields(Map<String, Object> tableProperties)
{
requireNonNull(tableProperties, "tableProperties is null");
return toUpperCase((List<String>) tableProperties.get(NOT_NULL_FIELDS));
}

public static List<String> getCommitTimestampFields(Map<String, Object> tableProperties)
{
requireNonNull(tableProperties, "tableProperties is null");
return toUpperCase((List<String>) tableProperties.get(COMMIT_TIMESTAMP_FIELDS));
}

public static boolean getOnDeleteCascade(Map<String, Object> tableProperties)
{
requireNonNull(tableProperties, "tableProperties is null");
return (boolean) tableProperties.get(ON_DELETE_CASCADE);
}

private static List<String> toUpperCase(List<String> collection)
{
return collection.stream().map(f -> f.toUpperCase(Locale.ENGLISH)).collect(Collectors.toList());
}

@Override
public List<PropertyMetadata<?>> getTableProperties()
{
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package io.trino.plugin.spanner;

import com.google.cloud.spanner.SpannerApiFutures;
import com.google.spanner.v1.SpannerGrpc;
import org.joda.time.DateTime;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.util.Calendar;
import java.util.Date;

public class Test
{
public static void main(String[] args)
throws SQLException
{
Connection connection = DriverManager.getConnection("jdbc:cloudspanner://0.0.0.0:9010/projects/spanner-project/instances/spanner-instance/databases/spanner-database;autoConfigEmulator=true");
Statement statement = connection.createStatement();
statement.execute("drop table t1");
//1679963300421000
//1591142320347
//2020-06-02T23:58:40.347847393Z
Instant parse = Instant.parse("2020-06-02T23:58:40.347847393Z");
//2023-03-28T00:42:31.756+00:00
System.out.println(parse.toEpochMilli());
System.out.println(Instant.ofEpochMilli(1679963300421L));
LocalDateTime localDateTime = LocalDateTime.ofEpochSecond(1679963300421000L, 0, ZoneOffset.UTC);
System.out.println(localDateTime);
statement.execute("create table t1 (id int64,ts TIMESTAMP NOT NULL OPTIONS(allow_commit_timestamp=true))PRIMARY KEY (id)");
statement.execute("select * from ");
/*PreparedStatement preparedStatement = connection.prepareStatement("insert into t1 values(?,?)");
preparedStatement.setInt(1,1);
preparedStatement.setTimestamp(1,1);*/
}
}
30 changes: 30 additions & 0 deletions plugin/trino-spanner/src/main/resources/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
version: '3'
services:
spanner:
image: gcr.io/cloud-spanner-emulator/emulator:latest
ports:
- "9010:9010"
- "9020:9020"

gcloud-spanner-init:
image: gcr.io/google.com/cloudsdktool/cloud-sdk:latest
environment:
PROJECT_ID: "spanner-project"
SPANNER_EMULATOR_URL: "http://localhost:9020/"
INSTANCE_NAME: "spanner-instance"
DATABASE_NAME: "spanner-database"
command: >
bash -c 'gcloud config configurations create emulator &&
gcloud config set auth/disable_credentials true &&
gcloud config set project $${PROJECT_ID} &&
gcloud config set api_endpoint_overrides/spanner $${SPANNER_EMULATOR_URL} &&
gcloud config set auth/disable_credentials true &&
gcloud spanner instances create $${INSTANCE_NAME} --config=emulator-config --description=Emulator --nodes=1
gcloud spanner databases create $${DATABASE_NAME} --instance=$${INSTANCE_NAME}'
spanner-cli:
image: sjdaws/spanner-cli:latest
environment:
SPANNER_EMULATOR_HOST: "spanner:9010"
depends_on:
- "gcloud-spanner-init"
command: ['sh', '-c', 'echo spanner client.... && tail -f /dev/null']
45 changes: 33 additions & 12 deletions plugin/trino-spanner/src/test/java/SpannerSqlQueryRunner.java
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
import io.trino.testing.MaterializedResult;
import io.trino.testing.QueryRunner;
import io.trino.tpch.TpchTable;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.Map;
@@ -37,7 +38,8 @@
public final class SpannerSqlQueryRunner
{
private static final String TPCH_SCHEMA = "tpch";
private static final String JDBC_URL = "";
//url for emulated spanner host
private static final String JDBC_URL = "jdbc:cloudspanner://0.0.0.0:9010/projects/spanner-project/instances/spanner-instance/databases/spanner-database;autoConfigEmulator=true";
private static final String USER = "";
private static final String PASSWORD = "";
private static final String SCHEMA = "";
@@ -127,26 +129,31 @@ public static DistributedQueryRunner createSpannerSqlQueryRunner()
public static void main(String[] args)
throws Exception
{
DistributedQueryRunner queryRunner = createSpannerSqlQueryRunner(
ImmutableMap.of("http-server.http.port", "8080"),
ImmutableMap.of(),
ImmutableMap.of(),
TpchTable.getTables()
, xr -> {});
DistributedQueryRunner queryRunner = getQueryRunner();

queryRunner.installPlugin(new JmxPlugin());
queryRunner.createCatalog("jmx", "jmx");
MaterializedResult schemas = queryRunner.execute("SHOW SCHEMAS FROM spanner");
System.out.println(schemas);
MaterializedResult execute = queryRunner.execute("SHOW TABLES FROM spanner.public");
MaterializedResult execute = queryRunner.execute("SHOW TABLES FROM spanner.default");
System.out.println(execute);
MaterializedResult create = queryRunner.execute("create table spanner.public.dept" +
MaterializedResult emp = queryRunner.execute("create table if not exists spanner.default.emp" +
"( id int,name varchar" +
")WITH (primary_key = 'id'," +
") WITH(primary_keys = ARRAY['id'])");
System.out.println(emp);
MaterializedResult create = queryRunner.execute("create table if not exists spanner.default.dept" +
"( id int,name varchar" +
")WITH (primary_keys = ARRAY['id']," +
"interleave_in_parent='emp'," +
"on_delete_cascade=true)");
"on_delete_cascade=true" +
")");
System.out.println(create);
MaterializedResult drop = queryRunner.execute("DROP TABLE spanner.public.dept");
MaterializedResult insert = queryRunner.execute("insert into spanner.default.emp values(1,'Tom')");
System.out.println(insert);
MaterializedResult count = queryRunner.execute("select count(*) from spanner.default.emp");
System.out.println(count);

MaterializedResult drop = queryRunner.execute("DROP TABLE spanner.default.dept");
System.out.println(drop);

System.out.println("DONE");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove System.out.println.

@@ -155,4 +162,18 @@ public static void main(String[] args)
log.info("======== SERVER STARTED ========");
log.info("\n====\n%s\n====", queryRunner.getCoordinator().getBaseUrl());
}

static DistributedQueryRunner getQueryRunner()
throws Exception
{
DistributedQueryRunner queryRunner = createSpannerSqlQueryRunner(
ImmutableMap.of("http-server.http.port", "8080"),
ImmutableMap.of(),
ImmutableMap.of(),
TpchTable.getTables()
, xr -> {});
return queryRunner;
}


}
42 changes: 42 additions & 0 deletions plugin/trino-spanner/src/test/java/TestSpannerDataTypes.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.MaterializedResult;
import org.testng.annotations.Test;

public class TestSpannerDataTypes
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename to TestSpannerTypeMapping

{
@Test
public void testDataTypes()
throws Exception
{
DistributedQueryRunner queryRunner = SpannerSqlQueryRunner.getQueryRunner();
queryRunner.execute("DROP TABLE IF EXISTS spanner.default.dTest");
queryRunner.execute("create table spanner.default.dTest" +
"( id int,name varchar,is_active boolean)" +
"WITH (primary_keys = ARRAY['id']," +
"not_null_fields=ARRAY['name','is_active'])");
queryRunner.execute("insert into spanner.default.dtest values(1,'Tom',true)");
MaterializedResult execute = queryRunner.execute("select \"id\" from spanner.default.dtest");
System.out.println(execute);
}
@Test
public void testTimestamp()
throws Exception
{
DistributedQueryRunner queryRunner = SpannerSqlQueryRunner.getQueryRunner();
queryRunner.execute("DROP TABLE IF EXISTS spanner.default.dTest");
queryRunner.execute("create table spanner.default.dTest" +
"( id int,name varchar,is_active boolean,ts timestamp," +
"LastContactDate DATE,PopularityScore DOUBLE)" +
"WITH (primary_keys = ARRAY['id']," +
"not_null_fields=ARRAY['name','is_active','ts'])");

queryRunner.execute("insert into spanner.default.dtest values(1,'Tom',true," +
"CURRENT_TIMESTAMP,CURRENT_DATE,1.111)");
queryRunner.execute("insert into spanner.default.dtest values(2,'Tom cat',true," +
"NULL,CURRENT_DATE,1.111)");

MaterializedResult execute = queryRunner.execute("select * from spanner.default.dtest");
System.out.println(execute);

}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please cover all types in this test.

}