From eb44fc4f98a13386b244e8e39f0230a3982178e7 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Sat, 8 Oct 2022 23:23:29 +0800 Subject: [PATCH 01/22] gbase 8a connector --- .../dialect/gbase8a/Gbase8aDialect.java | 39 ++++++ .../gbase8a/Gbase8aDialectFactory.java | 36 ++++++ .../gbase8a/Gbase8aJdbcRowConverter.java | 38 ++++++ .../dialect/gbase8a/Gbase8aTypeMapper.java | 113 ++++++++++++++++++ 4 files changed, 226 insertions(+) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java new file mode 100644 index 00000000000..b90097445c8 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialect.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +public class Gbase8aDialect implements JdbcDialect { + @Override + public String dialectName() { + return "Gbase8a"; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new Gbase8aJdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new Gbase8aTypeMapper(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java new file mode 100644 index 00000000000..31787f49740 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aDialectFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; + +import com.google.auto.service.AutoService; + +@AutoService(JdbcDialectFactory.class) +public class Gbase8aDialectFactory implements JdbcDialectFactory { + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:gbase:"); + } + + @Override + public JdbcDialect create() { + return new Gbase8aDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java new file mode 100644 index 00000000000..2365d9cea34 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; + +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class Gbase8aJdbcRowConverter extends AbstractJdbcRowConverter { + @Override + public String converterName() { + return "Gbase8a"; + } + + @Override + public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException { + return super.toInternal(rs, metaData, typeInfo); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java new file mode 100644 index 00000000000..459faf3b9af --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import lombok.extern.slf4j.Slf4j; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +@Slf4j +public class Gbase8aTypeMapper implements JdbcDialectTypeMapper { + + // ============================data types===================== + private static final String GBASE8A_UNKNOWN = "UNKNOWN"; + + // -------------------------number---------------------------- + private static final String GBASE8A_INT = "INT"; + private static final String GBASE8A_TINYINT = "TINYINT"; + private static final String GBASE8A_SMALLINT = "SMALLINT"; + private static final String GBASE8A_MEDIUMINT = "MEDIUMINT"; + private static final String GBASE8A_BIGINT = "BIGINT"; + private static final String GBASE8A_DECIMAL = "DECIMAL"; + private static final String GBASE8A_FLOAT = "FLOAT"; + private static final String GBASE8A_DOUBLE = "DOUBLE"; + + // -------------------------string---------------------------- + private static final String GBASE8A_CHAR = "CHAR"; + private static final String GBASE8A_VARCHAR = "VARCHAR"; + + + // ------------------------------time------------------------- + private static final String GBASE8A_DATE = "DATE"; + private static final String GBASE8A_TIME = "TIME"; + private static final String GBASE8A_TIMESTAMP = "TIMESTAMP"; + private static final String GBASE8A_DATETIME = "DATETIME"; + + // ------------------------------blob------------------------- + private static final String GBASE8A_BLOB = "BLOB"; + private static final String GBASE8A_LONGBLOB = "LONGBLOB"; + private static final String GBASE8A_TEXT = "TEXT"; + + @SuppressWarnings("checkstyle:MagicNumber") + @Override + public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) throws SQLException { + String gbase8aType = metadata.getColumnTypeName(colIndex).toUpperCase(); + int precision = metadata.getPrecision(colIndex); + int scale = metadata.getScale(colIndex); + switch (gbase8aType) { + case GBASE8A_TINYINT: + return BasicType.BYTE_TYPE; + case GBASE8A_SMALLINT: + return BasicType.SHORT_TYPE; + case GBASE8A_MEDIUMINT: + case GBASE8A_INT: + return BasicType.INT_TYPE; + case GBASE8A_BIGINT: + return BasicType.LONG_TYPE; + case GBASE8A_DECIMAL: + if (precision < 38) { + return new DecimalType(precision, scale); + } + return new DecimalType(38, 18); + case GBASE8A_DOUBLE: + return BasicType.DOUBLE_TYPE; + case GBASE8A_FLOAT: + return BasicType.FLOAT_TYPE; + case GBASE8A_CHAR: + case GBASE8A_VARCHAR: + return BasicType.STRING_TYPE; + case GBASE8A_DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case GBASE8A_TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + case GBASE8A_TIMESTAMP: + case GBASE8A_DATETIME: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case GBASE8A_BLOB: + case GBASE8A_TEXT: + case GBASE8A_LONGBLOB: + return PrimitiveByteArrayType.INSTANCE; + //Doesn't support yet + case GBASE8A_UNKNOWN: + default: + final String jdbcColumnName = metadata.getColumnName(colIndex); + throw new UnsupportedOperationException( + String.format( + "Doesn't support GBASE8A type '%s' on column '%s' yet.", + gbase8aType, jdbcColumnName)); + } + } +} From 74928f9fee8851548a61287deb729dab93adedb5 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 10 Oct 2022 15:37:50 +0800 Subject: [PATCH 02/22] add gbase8a e2e test --- .../dialect/gbase8a/Gbase8aTypeMapper.java | 5 +- .../seatunnel/jdbc/AbstractJdbcIT.java | 7 +- .../seatunnel/jdbc/JdbcGbse8adbIT.java | 181 ++++++++++++++++++ .../jdbc_gbase8a_source_to_sink.conf | 60 ++++++ 4 files changed, 246 insertions(+), 7 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java create mode 100644 seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java index 459faf3b9af..65677ea229f 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aTypeMapper.java @@ -32,6 +32,7 @@ @Slf4j public class Gbase8aTypeMapper implements JdbcDialectTypeMapper { + //ref http://www.gbase.cn/down/4419.html // ============================data types===================== private static final String GBASE8A_UNKNOWN = "UNKNOWN"; @@ -39,7 +40,6 @@ public class Gbase8aTypeMapper implements JdbcDialectTypeMapper { private static final String GBASE8A_INT = "INT"; private static final String GBASE8A_TINYINT = "TINYINT"; private static final String GBASE8A_SMALLINT = "SMALLINT"; - private static final String GBASE8A_MEDIUMINT = "MEDIUMINT"; private static final String GBASE8A_BIGINT = "BIGINT"; private static final String GBASE8A_DECIMAL = "DECIMAL"; private static final String GBASE8A_FLOAT = "FLOAT"; @@ -58,7 +58,6 @@ public class Gbase8aTypeMapper implements JdbcDialectTypeMapper { // ------------------------------blob------------------------- private static final String GBASE8A_BLOB = "BLOB"; - private static final String GBASE8A_LONGBLOB = "LONGBLOB"; private static final String GBASE8A_TEXT = "TEXT"; @SuppressWarnings("checkstyle:MagicNumber") @@ -72,7 +71,6 @@ public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) th return BasicType.BYTE_TYPE; case GBASE8A_SMALLINT: return BasicType.SHORT_TYPE; - case GBASE8A_MEDIUMINT: case GBASE8A_INT: return BasicType.INT_TYPE; case GBASE8A_BIGINT: @@ -98,7 +96,6 @@ public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) th return LocalTimeType.LOCAL_DATE_TIME_TYPE; case GBASE8A_BLOB: case GBASE8A_TEXT: - case GBASE8A_LONGBLOB: return PrimitiveByteArrayType.INSTANCE; //Doesn't support yet case GBASE8A_UNKNOWN: diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 3082881da1a..723d426a054 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -37,6 +37,7 @@ import org.testcontainers.lifecycle.Startables; import java.io.IOException; +import java.net.MalformedURLException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; @@ -68,7 +69,7 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour Assertions.assertEquals(0, extraCommands.getExitCode()); }; - private void getContainer() throws ClassNotFoundException, SQLException { + private void getContainer() throws SQLException { jdbcCase = this.getJdbcCase(); dbServer = new GenericContainer<>(jdbcCase.getDockerImage()) .withNetwork(NETWORK) @@ -78,7 +79,6 @@ private void getContainer() throws ClassNotFoundException, SQLException { dbServer.setPortBindings(Lists.newArrayList( String.format("%s:%s", jdbcCase.getPort(), jdbcCase.getPort()))); Startables.deepStart(Stream.of(dbServer)).join(); - Class.forName(jdbcCase.getDriverClass()); given().ignoreExceptions() .await() .atMost(180, TimeUnit.SECONDS) @@ -86,7 +86,8 @@ private void getContainer() throws ClassNotFoundException, SQLException { initializeJdbcTable(); } - protected void initializeJdbcConnection() throws SQLException { + protected void initializeJdbcConnection() throws SQLException, ClassNotFoundException, MalformedURLException, InstantiationException, IllegalAccessException { + Class.forName(jdbcCase.getDriverClass()); jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()); jdbcConnection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword()); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java new file mode 100644 index 00000000000..7fff3fa355d --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc; + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import org.junit.jupiter.api.Assertions; +import org.testcontainers.shaded.com.google.common.collect.Lists; +import org.testcontainers.shaded.org.apache.commons.io.IOUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; +import java.nio.charset.StandardCharsets; +import java.sql.Driver; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; + +public class JdbcGbse8adbIT extends AbstractJdbcIT { + + private static final String DOCKER_IMAGE = "shihd/gbase8a"; + private static final String NETWORK_ALIASES = "e2e_gbase8aDb"; + private static final String DRIVER_CLASS = "com.gbase.jdbc.Driver"; + private static final int PORT = 5258; + private static final String URL = "jdbc:gbase://" + HOST + ":%s/%s?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true"; + private static final String USERNAME = "root"; + private static final String PASSWORD = "root"; + private static final String DATABASE = "gbase"; + private static final String SOURCE_TABLE = "e2e_table_source"; + private static final String SINK_TABLE = "e2e_table_sink"; + private static final String DRIVER_JAR = "https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar"; + private static final String CONFIG_FILE = "/jdbc_gbase8a_source_to_sink.conf"; + private static final String DDL_SOURCE = "CREATE TABLE" + SOURCE_TABLE + "(\n" + + " \"varchar_10_col\" varchar(10) DEFAULT NULL,\n" + + " \"char_10_col\" char(10) DEFAULT NULL,\n" + + " \"text_col\" text,\n" + + " \"decimal_col\" decimal(10,0) DEFAULT NULL,\n" + + " \"float_col\" float(12,0) DEFAULT NULL,\n" + + " \"int_col\" int(11) DEFAULT NULL,\n" + + " \"tinyint_col\" tinyint(4) DEFAULT NULL,\n" + + " \"smallint_col\" smallint(6) DEFAULT NULL,\n" + + " \"double_col\" double(22,0) DEFAULT NULL,\n" + + " \"bigint_col\" bigint(20) DEFAULT NULL,\n" + + " \"date_col\" date DEFAULT NULL,\n" + + " \"time_col\" time DEFAULT NULL,\n" + + " \"timestamp_col\" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,\n" + + " \"datetime_col\" datetime DEFAULT NULL,\n" + + " \"blob_col\" blob\n" + + ")"; + private static final String DDL_SINK = "CREATE TABLE" + SINK_TABLE + "(\n" + + " \"varchar_10_col\" varchar(10) DEFAULT NULL,\n" + + " \"char_10_col\" char(10) DEFAULT NULL,\n" + + " \"text_col\" text,\n" + + " \"decimal_col\" decimal(10,0) DEFAULT NULL,\n" + + " \"float_col\" float(12,0) DEFAULT NULL,\n" + + " \"int_col\" int(11) DEFAULT NULL,\n" + + " \"tinyint_col\" tinyint(4) DEFAULT NULL,\n" + + " \"smallint_col\" smallint(6) DEFAULT NULL,\n" + + " \"double_col\" double(22,0) DEFAULT NULL,\n" + + " \"bigint_col\" bigint(20) DEFAULT NULL,\n" + + " \"date_col\" date DEFAULT NULL,\n" + + " \"time_col\" time DEFAULT NULL,\n" + + " \"timestamp_col\" timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,\n" + + " \"datetime_col\" datetime DEFAULT NULL,\n" + + " \"blob_col\" blob\n" + + ")"; + private static final String INIT_DATA_SQL = "insert into " + SOURCE_TABLE + " (\n" + + " varchar_10_col,\n" + + " char_10_col,\n" + + " text_col,\n" + + " decimal_col,\n" + + " float_col,\n" + + " int_col,\n" + + " tinyint_col,\n" + + " smallint_col,\n" + + " double_col,\n" + + " bigint_col,\n" + + " date_col,\n" + + " time_col,\n" + + " timestamp_col,\n" + + " datetime_col,\n" + + " blob_col\n" + + ")values(\n" + + "\t?,?,?,?,?,?,?,?,?,?,?,?,?,?,?\n" + + ")"; + + @Override + JdbcCase getJdbcCase() { + Map containerEnv = new HashMap<>(); + containerEnv.put("ORACLE_PASSWORD", PASSWORD); + containerEnv.put("APP_USER", USERNAME); + containerEnv.put("APP_USER_PASSWORD", PASSWORD); + String jdbcUrl = String.format(URL, PORT, DATABASE); + return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS) + .host(HOST).port(PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE) + .sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(DRIVER_JAR) + .ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build(); + } + + @Override + void compareResult() throws SQLException, IOException { + String sourceSql = "select * from " + SOURCE_TABLE; + String sinkSql = "select * from " + SINK_TABLE; + List columns = Lists.newArrayList("varchar_10_col", "char_10_col", "text_col", "decimal_col", "float_col", "int_col", "tinyint_col", "smallint_col", "double_col", "bigint_col", "date_col", "time_col", "timestamp_col", "datetime_col", "blob_col"); + Statement sourceStatement = jdbcConnection.createStatement(); + Statement sinkStatement = jdbcConnection.createStatement(); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : columns) { + Object source = sourceResultSet.getObject(column); + Object sink = sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { + + InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); + InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); + String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); + String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); + Assertions.assertEquals(sourceValue, sinkValue); + } + Assertions.assertTrue(true); + } + } + } + clearSinkTable(); + } + + @Override + void clearSinkTable() { + try (Statement statement = jdbcConnection.createStatement()) { + statement.execute(String.format("TRUNCATE TABLE %s", SINK_TABLE)); + } catch (SQLException e) { + throw new RuntimeException("test gbase8a server image error", e); + } + } + + @Override + SeaTunnelRow initTestData() { + return new SeaTunnelRow( + new Object[]{"varchar", "char10col1", "text_col".getBytes(StandardCharsets.UTF_8), 122, 122.0, 122, 100, 1212, 122.0, + 3112121, LocalDate.now(), LocalDateTime.now(), LocalDateTime.now(), LocalDateTime.now(), "blob".getBytes(StandardCharsets.UTF_8)}); + } + + @Override + public void initializeJdbcConnection() throws MalformedURLException, ClassNotFoundException, SQLException, InstantiationException, IllegalAccessException { + URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{new URL(DRIVER_JAR)}, JdbcGbse8adbIT.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(urlClassLoader); + Driver gbase8aDriver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance(); + Properties props = new Properties(); + props.put("user", USERNAME); + props.put("password", PASSWORD); + jdbcConnection = + gbase8aDriver.connect(getJdbcCase().getJdbcUrl().replace(HOST, dbServer.getHost()), props); + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf new file mode 100644 index 00000000000..7b6e3fd181f --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf @@ -0,0 +1,60 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +###### +###### This config file is a demonstration of streaming processing in seatunnel config +###### + +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" + #execution.checkpoint.interval = 10000 + #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.gbase.jdbc.Driver + url = "jdbc:gbase:e2e_oracleDb:5258/gbase?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = root + password = root + query = "select varchar_10_col, char_10_col, text_col, decimal_col, float_col, int_col, tinyint_col, smallint_col, double_col, bigint_col, date_col, time_col, timestamp_col, datetime_col, blob_col from e2e_table_source" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Jdbc { + driver = com.gbase.jdbc.Driver + url = "jdbc:gbase:e2e_oracleDb:5258/gbase?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + user = root + password = root + query = "INSERT INTO e2e_table_sink (varchar_10_col, char_10_col, text_col, decimal_col, float_col, int_col, tinyint_col, smallint_col, double_col, bigint_col, date_col, time_col, timestamp_col, datetime_col, blob_col) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +} From 9971e654478b87cd3e3b44a334cef00e7bc06f00 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 10 Oct 2022 21:12:37 +0800 Subject: [PATCH 03/22] add gbase8a e2e test --- .../seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java | 2 +- .../seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java | 2 +- .../src/test/resources/jdbc_gbase8a_source_to_sink.conf | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 723d426a054..0f128a82b84 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -94,7 +94,7 @@ protected void initializeJdbcConnection() throws SQLException, ClassNotFoundExce private void batchInsertData() { try (Connection connection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword())) { - connection.setAutoCommit(false); + jdbcConnection.setAutoCommit(false); try (PreparedStatement preparedStatement = connection.prepareStatement(jdbcCase.getInitDataSql())) { for (int index = 0; index < jdbcCase.getSeaTunnelRow().getFields().length; index++) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java index 7fff3fa355d..e61d87eb3c6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java @@ -43,7 +43,7 @@ public class JdbcGbse8adbIT extends AbstractJdbcIT { - private static final String DOCKER_IMAGE = "shihd/gbase8a"; + private static final String DOCKER_IMAGE = "shihd/gbase8a:1.0"; private static final String NETWORK_ALIASES = "e2e_gbase8aDb"; private static final String DRIVER_CLASS = "com.gbase.jdbc.Driver"; private static final int PORT = 5258; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf index 7b6e3fd181f..ac54bf37fc4 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf @@ -30,7 +30,7 @@ source { # This is a example source plugin **only for test and demonstrate the feature source plugin** Jdbc { driver = com.gbase.jdbc.Driver - url = "jdbc:gbase:e2e_oracleDb:5258/gbase?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + url = "jdbc:gbase://e2e_oracleDb:5258/gbase?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" user = root password = root query = "select varchar_10_col, char_10_col, text_col, decimal_col, float_col, int_col, tinyint_col, smallint_col, double_col, bigint_col, date_col, time_col, timestamp_col, datetime_col, blob_col from e2e_table_source" @@ -49,7 +49,7 @@ transform { sink { Jdbc { driver = com.gbase.jdbc.Driver - url = "jdbc:gbase:e2e_oracleDb:5258/gbase?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + url = "jdbc:gbase://e2e_oracleDb:5258/gbase?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" user = root password = root query = "INSERT INTO e2e_table_sink (varchar_10_col, char_10_col, text_col, decimal_col, float_col, int_col, tinyint_col, smallint_col, double_col, bigint_col, date_col, time_col, timestamp_col, datetime_col, blob_col) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" From c1c86a725973b7f966ab8d0b66d424d663f3ecc7 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Mon, 10 Oct 2022 23:36:47 +0800 Subject: [PATCH 04/22] fix some error --- .../seatunnel/jdbc/AbstractJdbcIT.java | 54 +++++++------ .../seatunnel/jdbc/JdbcGbse8adbIT.java | 77 +++++++++---------- .../seatunnel/jdbc/JdbcOracledbIT.java | 41 +++++----- .../jdbc_gbase8a_source_to_sink.conf | 4 +- 4 files changed, 92 insertions(+), 84 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 0f128a82b84..154ae143c26 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -38,11 +38,14 @@ import java.io.IOException; import java.net.MalformedURLException; +import java.net.URL; +import java.net.URLClassLoader; import java.sql.Connection; -import java.sql.DriverManager; +import java.sql.Driver; import java.sql.PreparedStatement; import java.sql.SQLException; import java.sql.Statement; +import java.util.Properties; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; @@ -50,10 +53,8 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResource { protected static final String HOST = "HOST"; - protected Connection jdbcConnection; protected GenericContainer dbServer; - private JdbcCase jdbcCase; - private String jdbcUrl; + protected JdbcCase jdbcCase; abstract JdbcCase getJdbcCase(); @@ -63,13 +64,18 @@ public abstract class AbstractJdbcIT extends TestSuiteBase implements TestResour abstract SeaTunnelRow initTestData(); + protected Connection createAndChangeDatabase(Connection connection) { + //do nothing + return connection; + } + @TestContainerExtension private final ContainerExtendedFactory extendedFactory = container -> { Container.ExecResult extraCommands = container.execInContainer("bash", "-c", "mkdir -p /tmp/seatunnel/plugins/Jdbc/lib && cd /tmp/seatunnel/plugins/Jdbc/lib && curl -O " + jdbcCase.getDriverJar()); Assertions.assertEquals(0, extraCommands.getExitCode()); }; - private void getContainer() throws SQLException { + private void getContainer() throws SQLException, MalformedURLException, ClassNotFoundException, InstantiationException, IllegalAccessException { jdbcCase = this.getJdbcCase(); dbServer = new GenericContainer<>(jdbcCase.getDockerImage()) .withNetwork(NETWORK) @@ -79,22 +85,26 @@ private void getContainer() throws SQLException { dbServer.setPortBindings(Lists.newArrayList( String.format("%s:%s", jdbcCase.getPort(), jdbcCase.getPort()))); Startables.deepStart(Stream.of(dbServer)).join(); + this.initializeJdbcConnection(jdbcCase.getJdbcUrl()); given().ignoreExceptions() .await() .atMost(180, TimeUnit.SECONDS) - .untilAsserted(this::initializeJdbcConnection); - initializeJdbcTable(); + .untilAsserted(this::initializeJdbcTable); } - protected void initializeJdbcConnection() throws SQLException, ClassNotFoundException, MalformedURLException, InstantiationException, IllegalAccessException { - Class.forName(jdbcCase.getDriverClass()); - jdbcUrl = jdbcCase.getJdbcUrl().replace(HOST, dbServer.getHost()); - jdbcConnection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword()); + protected Connection initializeJdbcConnection(String jdbcUrl) throws SQLException, ClassNotFoundException, MalformedURLException, InstantiationException, IllegalAccessException { + URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{new URL(jdbcCase.getDriverJar())}, AbstractJdbcIT.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(urlClassLoader); + Driver driver = (Driver) urlClassLoader.loadClass(jdbcCase.getDriverClass()).newInstance(); + Properties props = new Properties(); + props.put("user", jdbcCase.getUserName()); + props.put("password", jdbcCase.getPassword()); + return driver.connect(jdbcUrl.replace(HOST, dbServer.getHost()), props); } private void batchInsertData() { - try (Connection connection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword())) { - jdbcConnection.setAutoCommit(false); + try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl())) { + connection.setAutoCommit(false); try (PreparedStatement preparedStatement = connection.prepareStatement(jdbcCase.getInitDataSql())) { for (int index = 0; index < jdbcCase.getSeaTunnelRow().getFields().length; index++) { @@ -103,20 +113,21 @@ private void batchInsertData() { preparedStatement.execute(); } connection.commit(); - } catch (SQLException exception) { - exception.printStackTrace(); + } catch (Exception exception) { + throw new RuntimeException("get gbase8a connection error", exception); } } - private void initializeJdbcTable() throws SQLException { - try (Connection connection = DriverManager.getConnection(jdbcUrl, jdbcCase.getUserName(), jdbcCase.getPassword())) { - Statement statement = connection.createStatement(); + private void initializeJdbcTable() { + try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl())) { + Connection newConnection = createAndChangeDatabase(connection); + Statement statement = newConnection.createStatement(); String createSource = jdbcCase.getDdlSource(); String createSink = jdbcCase.getDdlSink(); statement.execute(createSource); statement.execute(createSink); - } catch (SQLException exception) { - exception.printStackTrace(); + } catch (Exception exception) { + throw new RuntimeException("get gbase8a connection error", exception); } this.batchInsertData(); } @@ -129,9 +140,6 @@ public void startUp() throws Exception { @Override public void tearDown() throws Exception { - if (jdbcConnection != null) { - jdbcConnection.close(); - } if (dbServer != null) { dbServer.close(); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java index e61d87eb3c6..d85e5aaf8d6 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java @@ -23,15 +23,10 @@ import org.testcontainers.shaded.com.google.common.collect.Lists; import org.testcontainers.shaded.org.apache.commons.io.IOUtils; -import java.io.IOException; import java.io.InputStream; -import java.net.MalformedURLException; -import java.net.URL; -import java.net.URLClassLoader; import java.nio.charset.StandardCharsets; -import java.sql.Driver; +import java.sql.Connection; import java.sql.ResultSet; -import java.sql.SQLException; import java.sql.Statement; import java.time.LocalDate; import java.time.LocalDateTime; @@ -39,7 +34,6 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Properties; public class JdbcGbse8adbIT extends AbstractJdbcIT { @@ -55,7 +49,7 @@ public class JdbcGbse8adbIT extends AbstractJdbcIT { private static final String SINK_TABLE = "e2e_table_sink"; private static final String DRIVER_JAR = "https://www.gbase8.cn/wp-content/uploads/2020/10/gbase-connector-java-8.3.81.53-build55.5.7-bin_min_mix.jar"; private static final String CONFIG_FILE = "/jdbc_gbase8a_source_to_sink.conf"; - private static final String DDL_SOURCE = "CREATE TABLE" + SOURCE_TABLE + "(\n" + + private static final String DDL_SOURCE = "CREATE TABLE " + SOURCE_TABLE + "(\n" + " \"varchar_10_col\" varchar(10) DEFAULT NULL,\n" + " \"char_10_col\" char(10) DEFAULT NULL,\n" + " \"text_col\" text,\n" + @@ -72,7 +66,7 @@ public class JdbcGbse8adbIT extends AbstractJdbcIT { " \"datetime_col\" datetime DEFAULT NULL,\n" + " \"blob_col\" blob\n" + ")"; - private static final String DDL_SINK = "CREATE TABLE" + SINK_TABLE + "(\n" + + private static final String DDL_SINK = "CREATE TABLE " + SINK_TABLE + "(\n" + " \"varchar_10_col\" varchar(10) DEFAULT NULL,\n" + " \"char_10_col\" char(10) DEFAULT NULL,\n" + " \"text_col\" text,\n" + @@ -112,9 +106,6 @@ public class JdbcGbse8adbIT extends AbstractJdbcIT { @Override JdbcCase getJdbcCase() { Map containerEnv = new HashMap<>(); - containerEnv.put("ORACLE_PASSWORD", PASSWORD); - containerEnv.put("APP_USER", USERNAME); - containerEnv.put("APP_USER_PASSWORD", PASSWORD); String jdbcUrl = String.format(URL, PORT, DATABASE); return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS) .host(HOST).port(PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE) @@ -123,39 +114,43 @@ JdbcCase getJdbcCase() { } @Override - void compareResult() throws SQLException, IOException { + void compareResult() { String sourceSql = "select * from " + SOURCE_TABLE; String sinkSql = "select * from " + SINK_TABLE; List columns = Lists.newArrayList("varchar_10_col", "char_10_col", "text_col", "decimal_col", "float_col", "int_col", "tinyint_col", "smallint_col", "double_col", "bigint_col", "date_col", "time_col", "timestamp_col", "datetime_col", "blob_col"); - Statement sourceStatement = jdbcConnection.createStatement(); - Statement sinkStatement = jdbcConnection.createStatement(); - ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); - ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); - while (sourceResultSet.next()) { - if (sinkResultSet.next()) { - for (String column : columns) { - Object source = sourceResultSet.getObject(column); - Object sink = sinkResultSet.getObject(column); - if (!Objects.deepEquals(source, sink)) { + try (Connection connection = initializeJdbcConnection(String.format(URL, PORT, "test"))) { + Statement sourceStatement = connection.createStatement(); + Statement sinkStatement = connection.createStatement(); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : columns) { + Object source = sourceResultSet.getObject(column); + Object sink = sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { - InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); - InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); - String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); - String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); - Assertions.assertEquals(sourceValue, sinkValue); + InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); + InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); + String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); + String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); + Assertions.assertEquals(sourceValue, sinkValue); + } + Assertions.assertTrue(true); } - Assertions.assertTrue(true); } } + } catch (Exception e) { + throw new RuntimeException("get gbase8a connection error", e); } clearSinkTable(); } @Override void clearSinkTable() { - try (Statement statement = jdbcConnection.createStatement()) { + try (Statement statement = initializeJdbcConnection(String.format(URL, PORT, "test")).createStatement()) { statement.execute(String.format("TRUNCATE TABLE %s", SINK_TABLE)); - } catch (SQLException e) { + } catch (Exception e) { throw new RuntimeException("test gbase8a server image error", e); } } @@ -167,15 +162,15 @@ SeaTunnelRow initTestData() { 3112121, LocalDate.now(), LocalDateTime.now(), LocalDateTime.now(), LocalDateTime.now(), "blob".getBytes(StandardCharsets.UTF_8)}); } - @Override - public void initializeJdbcConnection() throws MalformedURLException, ClassNotFoundException, SQLException, InstantiationException, IllegalAccessException { - URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{new URL(DRIVER_JAR)}, JdbcGbse8adbIT.class.getClassLoader()); - Thread.currentThread().setContextClassLoader(urlClassLoader); - Driver gbase8aDriver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance(); - Properties props = new Properties(); - props.put("user", USERNAME); - props.put("password", PASSWORD); - jdbcConnection = - gbase8aDriver.connect(getJdbcCase().getJdbcUrl().replace(HOST, dbServer.getHost()), props); + protected Connection createAndChangeDatabase(Connection connection) { + try { + connection.prepareStatement("CREATE DATABASE test").executeUpdate(); + jdbcCase.setDataBase("test"); + connection.close(); + return initializeJdbcConnection(String.format(URL, PORT, jdbcCase.getDataBase())); + } catch (Exception e) { + throw new RuntimeException("create database error", e); + } + } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java index 48e34c0c74a..dd19616db2f 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcOracledbIT.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; +import java.sql.Connection; import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; @@ -119,35 +120,39 @@ void compareResult() throws SQLException, IOException { String sourceSql = "select * from " + SOURCE_TABLE; String sinkSql = "select * from " + SINK_TABLE; List columns = Lists.newArrayList("varchar_10_col", "char_10_col", "clob_col", "number_3_sf_2_dp", "integer_col", "float_col", "real_col", "binary_float_col", "binary_double_col", "date_col", "timestamp_with_3_frac_sec_col", "timestamp_with_local_tz", "raw_col", "blob_col"); - Statement sourceStatement = jdbcConnection.createStatement(); - Statement sinkStatement = jdbcConnection.createStatement(); - ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); - ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); - while (sourceResultSet.next()) { - if (sinkResultSet.next()) { - for (String column : columns) { - Object source = sourceResultSet.getObject(column); - Object sink = sinkResultSet.getObject(column); - if (!Objects.deepEquals(source, sink)) { + try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl())) { + Statement sourceStatement = connection.createStatement(); + Statement sinkStatement = connection.createStatement(); + ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + while (sourceResultSet.next()) { + if (sinkResultSet.next()) { + for (String column : columns) { + Object source = sourceResultSet.getObject(column); + Object sink = sinkResultSet.getObject(column); + if (!Objects.deepEquals(source, sink)) { - InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); - InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); - String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); - String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); - Assertions.assertEquals(sourceValue, sinkValue); + InputStream sourceAsciiStream = sourceResultSet.getBinaryStream(column); + InputStream sinkAsciiStream = sinkResultSet.getBinaryStream(column); + String sourceValue = IOUtils.toString(sourceAsciiStream, StandardCharsets.UTF_8); + String sinkValue = IOUtils.toString(sinkAsciiStream, StandardCharsets.UTF_8); + Assertions.assertEquals(sourceValue, sinkValue); + } + Assertions.assertTrue(true); } - Assertions.assertTrue(true); } } + } catch (Exception e) { + throw new RuntimeException("get oracle connection error", e); } clearSinkTable(); } @Override void clearSinkTable() { - try (Statement statement = jdbcConnection.createStatement()) { + try (Statement statement = initializeJdbcConnection(jdbcCase.getJdbcUrl()).createStatement()) { statement.execute(String.format("TRUNCATE TABLE %s", SINK_TABLE)); - } catch (SQLException e) { + } catch (Exception e) { throw new RuntimeException("test oracle server image error", e); } } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf index ac54bf37fc4..6ce5f23e5ce 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf @@ -30,7 +30,7 @@ source { # This is a example source plugin **only for test and demonstrate the feature source plugin** Jdbc { driver = com.gbase.jdbc.Driver - url = "jdbc:gbase://e2e_oracleDb:5258/gbase?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + url = "jdbc:gbase://e2e_oracleDb:5258/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" user = root password = root query = "select varchar_10_col, char_10_col, text_col, decimal_col, float_col, int_col, tinyint_col, smallint_col, double_col, bigint_col, date_col, time_col, timestamp_col, datetime_col, blob_col from e2e_table_source" @@ -49,7 +49,7 @@ transform { sink { Jdbc { driver = com.gbase.jdbc.Driver - url = "jdbc:gbase://e2e_oracleDb:5258/gbase?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + url = "jdbc:gbase://e2e_oracleDb:5258/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" user = root password = root query = "INSERT INTO e2e_table_sink (varchar_10_col, char_10_col, text_col, decimal_col, float_col, int_col, tinyint_col, smallint_col, double_col, bigint_col, date_col, time_col, timestamp_col, datetime_col, blob_col) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" From 65820585a7f72e0a0f681271de6c686bd4ef42b2 Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Tue, 11 Oct 2022 13:55:33 +0800 Subject: [PATCH 05/22] fix some error --- .../gbase8a/Gbase8aJdbcRowConverter.java | 56 ++++++++++++++++++- .../seatunnel/jdbc/AbstractJdbcIT.java | 5 +- .../connectors/seatunnel/jdbc/JdbcCase.java | 1 + .../seatunnel/jdbc/JdbcGbse8adbIT.java | 9 ++- .../jdbc_gbase8a_source_to_sink.conf | 4 +- 5 files changed, 68 insertions(+), 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java index 2365d9cea34..39293cfbd80 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/gbase8a/Gbase8aJdbcRowConverter.java @@ -17,13 +17,22 @@ package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.gbase8a; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; +import java.math.BigDecimal; +import java.math.BigInteger; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; public class Gbase8aJdbcRowConverter extends AbstractJdbcRowConverter { @Override @@ -33,6 +42,51 @@ public String converterName() { @Override public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException { - return super.toInternal(rs, metaData, typeInfo); + List fields = new ArrayList<>(); + SeaTunnelDataType[] seaTunnelDataTypes = typeInfo.getFieldTypes(); + + for (int i = 1; i <= seaTunnelDataTypes.length; i++) { + Object seatunnelField; + SeaTunnelDataType seaTunnelDataType = seaTunnelDataTypes[i - 1]; + if (null == rs.getObject(i)) { + seatunnelField = null; + } else if (BasicType.BOOLEAN_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getBoolean(i); + } else if (BasicType.BYTE_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getByte(i); + } else if (BasicType.SHORT_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getShort(i); + } else if (BasicType.INT_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getInt(i); + } else if (BasicType.LONG_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getLong(i); + } else if (seaTunnelDataType instanceof DecimalType) { + Object value = rs.getObject(i); + seatunnelField = value instanceof BigInteger ? + new BigDecimal((BigInteger) value, 0) + : value; + } else if (BasicType.FLOAT_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getFloat(i); + } else if (BasicType.DOUBLE_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getDouble(i); + } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getString(i); + } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getTime(i); + } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getDate(i); + } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(seaTunnelDataType)) { + seatunnelField = rs.getTimestamp(i); + } else if (PrimitiveByteArrayType.INSTANCE.equals(seaTunnelDataType)) { + seatunnelField = rs.getBytes(i); + } else { + throw new IllegalStateException("Unexpected value: " + seaTunnelDataType); + } + + fields.add(seatunnelField); + } + + return new SeaTunnelRow(fields.toArray()); } + } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java index 154ae143c26..d00ed103ae2 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/AbstractJdbcIT.java @@ -20,6 +20,7 @@ import static org.awaitility.Awaitility.given; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.ExceptionUtil; import org.apache.seatunnel.e2e.common.TestResource; import org.apache.seatunnel.e2e.common.TestSuiteBase; import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory; @@ -103,7 +104,7 @@ protected Connection initializeJdbcConnection(String jdbcUrl) throws SQLExceptio } private void batchInsertData() { - try (Connection connection = initializeJdbcConnection(jdbcCase.getJdbcUrl())) { + try (Connection connection = initializeJdbcConnection(String.format(jdbcCase.getJdbcTemplate(), jdbcCase.getPort(), jdbcCase.getDataBase()))) { connection.setAutoCommit(false); try (PreparedStatement preparedStatement = connection.prepareStatement(jdbcCase.getInitDataSql())) { @@ -114,6 +115,7 @@ private void batchInsertData() { } connection.commit(); } catch (Exception exception) { + log.error(ExceptionUtil.getMessage(exception)); throw new RuntimeException("get gbase8a connection error", exception); } } @@ -127,6 +129,7 @@ private void initializeJdbcTable() { statement.execute(createSource); statement.execute(createSink); } catch (Exception exception) { + log.error(ExceptionUtil.getMessage(exception)); throw new RuntimeException("get gbase8a connection error", exception); } this.batchInsertData(); diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java index fd182ce3ba1..d4659a9b3fc 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcCase.java @@ -38,6 +38,7 @@ public class JdbcCase { private String sourceTable; private String sinkTable; private String driverJar; + private String jdbcTemplate; private String jdbcUrl; private String ddlSource; private String ddlSink; diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java index d85e5aaf8d6..81ecccbeff5 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcGbse8adbIT.java @@ -28,8 +28,11 @@ import java.sql.Connection; import java.sql.ResultSet; import java.sql.Statement; +import java.sql.Time; +import java.sql.Timestamp; import java.time.LocalDate; import java.time.LocalDateTime; +import java.time.ZoneOffset; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -108,7 +111,7 @@ JdbcCase getJdbcCase() { Map containerEnv = new HashMap<>(); String jdbcUrl = String.format(URL, PORT, DATABASE); return JdbcCase.builder().dockerImage(DOCKER_IMAGE).networkAliases(NETWORK_ALIASES).containerEnv(containerEnv).driverClass(DRIVER_CLASS) - .host(HOST).port(PORT).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE) + .host(HOST).port(PORT).jdbcTemplate(URL).jdbcUrl(jdbcUrl).userName(USERNAME).password(PASSWORD).dataBase(DATABASE) .sourceTable(SOURCE_TABLE).sinkTable(SINK_TABLE).driverJar(DRIVER_JAR) .ddlSource(DDL_SOURCE).ddlSink(DDL_SINK).initDataSql(INIT_DATA_SQL).configFile(CONFIG_FILE).seaTunnelRow(initTestData()).build(); } @@ -118,7 +121,7 @@ void compareResult() { String sourceSql = "select * from " + SOURCE_TABLE; String sinkSql = "select * from " + SINK_TABLE; List columns = Lists.newArrayList("varchar_10_col", "char_10_col", "text_col", "decimal_col", "float_col", "int_col", "tinyint_col", "smallint_col", "double_col", "bigint_col", "date_col", "time_col", "timestamp_col", "datetime_col", "blob_col"); - try (Connection connection = initializeJdbcConnection(String.format(URL, PORT, "test"))) { + try (Connection connection = initializeJdbcConnection(String.format(URL, PORT, jdbcCase.getDataBase()))) { Statement sourceStatement = connection.createStatement(); Statement sinkStatement = connection.createStatement(); ResultSet sourceResultSet = sourceStatement.executeQuery(sourceSql); @@ -159,7 +162,7 @@ void clearSinkTable() { SeaTunnelRow initTestData() { return new SeaTunnelRow( new Object[]{"varchar", "char10col1", "text_col".getBytes(StandardCharsets.UTF_8), 122, 122.0, 122, 100, 1212, 122.0, - 3112121, LocalDate.now(), LocalDateTime.now(), LocalDateTime.now(), LocalDateTime.now(), "blob".getBytes(StandardCharsets.UTF_8)}); + 3112121, new java.sql.Date(LocalDate.now().toEpochDay()), new Time(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)), new Timestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)), new Timestamp(LocalDateTime.now().toEpochSecond(ZoneOffset.UTC)), "blob".getBytes(StandardCharsets.UTF_8)}); } protected Connection createAndChangeDatabase(Connection connection) { diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf index 6ce5f23e5ce..37153c96cde 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/src/test/resources/jdbc_gbase8a_source_to_sink.conf @@ -30,7 +30,7 @@ source { # This is a example source plugin **only for test and demonstrate the feature source plugin** Jdbc { driver = com.gbase.jdbc.Driver - url = "jdbc:gbase://e2e_oracleDb:5258/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + url = "jdbc:gbase://e2e_gbase8aDb:5258/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" user = root password = root query = "select varchar_10_col, char_10_col, text_col, decimal_col, float_col, int_col, tinyint_col, smallint_col, double_col, bigint_col, date_col, time_col, timestamp_col, datetime_col, blob_col from e2e_table_source" @@ -49,7 +49,7 @@ transform { sink { Jdbc { driver = com.gbase.jdbc.Driver - url = "jdbc:gbase://e2e_oracleDb:5258/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" + url = "jdbc:gbase://e2e_gbase8aDb:5258/test?useUnicode=true&characterEncoding=UTF-8&rewriteBatchedStatements=true" user = root password = root query = "INSERT INTO e2e_table_sink (varchar_10_col, char_10_col, text_col, decimal_col, float_col, int_col, tinyint_col, smallint_col, double_col, bigint_col, date_col, time_col, timestamp_col, datetime_col, blob_col) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)" From f524864c6ae92915edd83b1efd6e031c7d76c10d Mon Sep 17 00:00:00 2001 From: liugddx <804167098@qq.com> Date: Tue, 11 Oct 2022 14:30:03 +0800 Subject: [PATCH 06/22] FIX time type error --- .../seatunnel/api/table/type/SqlDateType.java | 71 +++++++++++++++++++ .../dialect/gbase8a/Gbase8aTypeMapper.java | 8 +-- 2 files changed, 75 insertions(+), 4 deletions(-) create mode 100644 seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlDateType.java diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlDateType.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlDateType.java new file mode 100644 index 00000000000..2dff00b6e4b --- /dev/null +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/SqlDateType.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.api.table.type; + +import java.sql.Date; +import java.sql.Time; +import java.sql.Timestamp; +import java.util.Objects; + +public class SqlDateType implements SeaTunnelDataType { + private static final long serialVersionUID = 2L; + + public static final SqlDateType SQL_DATE_TYPE = new SqlDateType<>(Date.class, SqlType.DATE); + public static final SqlDateType