diff --git a/docs/en/connector-v2/sink/Jdbc.md b/docs/en/connector-v2/sink/Jdbc.md index f845dea6efd..394fadde801 100644 --- a/docs/en/connector-v2/sink/Jdbc.md +++ b/docs/en/connector-v2/sink/Jdbc.md @@ -176,6 +176,7 @@ there are some reference value for params above. | Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb | com.amazon.redshift.xa.RedshiftXADataSource | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | | Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | / | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | | Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | / | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | +| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | / | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar | | OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | / | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | ## Example diff --git a/docs/en/connector-v2/sink/Kingbase.md b/docs/en/connector-v2/sink/Kingbase.md new file mode 100644 index 00000000000..b92b12fc420 --- /dev/null +++ b/docs/en/connector-v2/sink/Kingbase.md @@ -0,0 +1,168 @@ +# Kingbase + +> JDBC Kingbase Sink Connector + +## Support Connector Version + +- 8.6 + +## Support Those Engines + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## Key Features + +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [ ] [cdc](../../concept/connector-v2-features.md) + +## Description + +> Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once` for the database which is +> support `Xa transactions`. You can set `is_exactly_once=true` to enable it.Kingbase currently does not support + +## Supported DataSource Info + +| Datasource | Supported versions | Driver | Url | Maven | +|------------|--------------------|----------------------|------------------------------------------|------------------------------------------------------------------------------------------------| +| Kingbase | 8.6 | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | [Download](https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar) | + +## Database Dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' +> working directory
+> For example: cp kingbase8-8.6.0.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/ + +## Data Type Mapping + +| Kingbase Data type | SeaTunnel Data type | +|----------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------| +| BOOL | BOOLEAN | +| INT2 | SHORT | +| SMALLSERIAL
SERIAL
INT4 | INT | +| INT8
BIGSERIAL | BIGINT | +| FLOAT4 | FLOAT | +| FLOAT8 | DOUBLE | +| NUMERIC | DECIMAL((Get the designated column's specified column size),
(Gets the designated column's number of digits to right of the decimal point.))) | +| BPCHAR
CHARACTER
VARCHAR
TEXT | STRING | +| TIMESTAMP | LOCALDATETIME | +| TIME | LOCALTIME | +| DATE | LOCALDATE | +| Other data type | Not supported yet | + +## Sink Options + +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:db2://127.0.0.1:50000/dbname | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use DB2 the value is `com.ibm.db2.jdbc.app.DB2Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | +| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. Kingbase currently does not support | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | +| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver,Kingbase currently does not support | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | + +### Tips + +> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed +> in parallel according to the concurrency of tasks. + +## Task Example + +### Simple: + +> This example defines a SeaTunnel synchronization task that automatically generates data through FakeSource and sends +> it to JDBC Sink. FakeSource generates a total of 16 rows of data (row.num=16), with each row having 12 fields. The final target table is test_table will also be 16 rows of data in the table. +> Before +> run this job, you need create database test and table test_table in your Kingbase. And if you have not yet installed and +> deployed SeaTunnel, you need to follow the instructions in [Install SeaTunnel](../../start-v2/locally/deployment.md) +> to +> install and deploy SeaTunnel. And then follow the instructions +> in [Quick Start With SeaTunnel Engine](../../start-v2/locally/quick-start-seatunnel-engine.md) to run this job. + +``` +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + FakeSource { + parallelism = 1 + result_table_name = "fake" + row.num = 16 + schema = { + fields { + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_date = date + c_time = time + c_timestamp = timestamp + } + } + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform-v2 +} + +sink { + jdbc { + url = "jdbc:kingbase8://127.0.0.1:54321/dbname" + driver = "com.kingbase8.Driver" + user = "root" + password = "123456" + query = "insert into test_table(c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_decimal,c_date,c_time,c_timestamp) values(?,?,?,?,?,?,?,?,?,?,?,?)" + } + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/category/sink-v2 +} +``` + +### Generate Sink SQL + +> This example not need to write complex sql statements, you can configure the database name table name to automatically +> generate add statements for you + +``` +sink { + jdbc { + url = "jdbc:kingbase8://127.0.0.1:54321/dbname" + driver = "com.kingbase8.Driver" + user = "root" + password = "123456" + # Automatically generate sql statements based on database table names + generate_sink_sql = true + database = test + table = test_table + } +} +``` + diff --git a/docs/en/connector-v2/source/Jdbc.md b/docs/en/connector-v2/source/Jdbc.md index 585c2bc0024..b86a7b33854 100644 --- a/docs/en/connector-v2/source/Jdbc.md +++ b/docs/en/connector-v2/source/Jdbc.md @@ -125,6 +125,7 @@ there are some reference value for params above. | Snowflake | net.snowflake.client.jdbc.SnowflakeDriver | jdbc:snowflake://.snowflakecomputing.com | https://mvnrepository.com/artifact/net.snowflake/snowflake-jdbc | | Redshift | com.amazon.redshift.jdbc42.Driver | jdbc:redshift://localhost:5439/testdb?defaultRowFetchSize=1000 | https://mvnrepository.com/artifact/com.amazon.redshift/redshift-jdbc42 | | Vertica | com.vertica.jdbc.Driver | jdbc:vertica://localhost:5433 | https://repo1.maven.org/maven2/com/vertica/jdbc/vertica-jdbc/12.0.3-0/vertica-jdbc-12.0.3-0.jar | +| Kingbase | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar | | OceanBase | com.oceanbase.jdbc.Driver | jdbc:oceanbase://localhost:2881 | https://repo1.maven.org/maven2/com/oceanbase/oceanbase-client/2.4.3/oceanbase-client-2.4.3.jar | ## Example diff --git a/docs/en/connector-v2/source/Kingbase.md b/docs/en/connector-v2/source/Kingbase.md new file mode 100644 index 00000000000..62e280675dd --- /dev/null +++ b/docs/en/connector-v2/source/Kingbase.md @@ -0,0 +1,148 @@ +# Kingbase + +> JDBC Kingbase Source Connector + +## Support Connector Version + +- 8.6 + +## Support Those Engines + +> Spark
+> Flink
+> SeaTunnel Zeta
+ +## Key Features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [column projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [x] [support user-defined split](../../concept/connector-v2-features.md) + +## Description + +Read external data source data through JDBC. + +## Supported DataSource Info + +| Datasource | Supported versions | Driver | Url | Maven | +|------------|--------------------|----------------------|------------------------------------------|------------------------------------------------------------------------------------------------| +| Kingbase | 8.6 | com.kingbase8.Driver | jdbc:kingbase8://localhost:54321/db_test | [Download](https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar) | + +## Database Dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory
+> For example: cp kingbase8-8.6.0.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/ + +## Data Type Mapping + +| Kingbase Data type | SeaTunnel Data type | +|-------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------| +| BOOL | BOOLEAN | +| INT2 | SHORT | +| SMALLSERIAL
SERIAL
INT4 | INT | +| INT8
BIGSERIAL | BIGINT | +| FLOAT4 | FLOAT | +| FLOAT8 | DOUBLE | +| NUMERIC | DECIMAL((Get the designated column's specified column size),
(Gets the designated column's number of digits to right of the decimal point.))) | +| BPCHAR
CHARACTER
VARCHAR
TEXT | STRING | +| TIMESTAMP | LOCALDATETIME | +| TIME | LOCALTIME | +| DATE | LOCALDATE | +| Other data type | Not supported yet | + +## Source Options + +| Name | Type | Required | Default | Description | +|------------------------------|------------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:kingbase8://localhost:54321/test | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source, should be `com.kingbase8.Driver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | Yes | - | Query statement | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete | +| partition_column | String | No | - | The column name for parallelism's partition, only support numeric type column and string type column. | +| partition_lower_bound | BigDecimal | No | - | The partition_column min value for scan, if not set SeaTunnel will query database get min value. | +| partition_upper_bound | BigDecimal | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | +| partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. Default value is job parallelism. | +| fetch_size | Int | No | 0 | For queries that return a large number of objects, you can configure
the row fetch size used in the query to improve performance by
reducing the number database hits required to satisfy the selection criteria.
Zero means use jdbc default value. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +### Tips + +> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks. + +## Task Example + +### Simple: + +``` +env { + execution.parallelism = 2 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = "com.kingbase8.Driver" + url = "jdbc:kingbase8://localhost:54321/db_test" + user = "root" + password = "" + query = "select * from source" + } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform/sql +} + +sink { + Console {} +} +``` + +### Parallel: + +> Read your query table in parallel with the shard field you configured and the shard data. You can do this if you want to read the whole table + +``` +source { + Jdbc { + driver = "com.kingbase8.Driver" + url = "jdbc:kingbase8://localhost:54321/db_test" + user = "root" + password = "" + query = "select * from source" + # Parallel sharding reads fields + partition_column = "id" + # Number of fragments + partition_num = 10 + } +} +``` + +### Parallel Boundary: + +> It is more efficient to read your data source according to the upper and lower boundaries you configured + +``` +source { + Jdbc { + driver = "com.kingbase8.Driver" + url = "jdbc:kingbase8://localhost:54321/db_test" + user = "root" + password = "" + query = "select * from source" + partition_column = "id" + partition_num = 10 + # Read start boundary + partition_lower_bound = 1 + # Read end boundary + partition_upper_bound = 500 + } +} +``` + diff --git a/release-note.md b/release-note.md index 1b797ff3154..61664d773f4 100644 --- a/release-note.md +++ b/release-note.md @@ -159,6 +159,7 @@ - [Connector-V2] [Paimon] Introduce paimon connector (#4178) - [Connector V2] [Cassandra] Expose configurable options in Cassandra (#3681) - [Connector V2] [Jdbc] Supports GEOMETRY data type for PostgreSQL (#4673) +- [Connector V2] [Jdbc] Supports Kingbase database (#4803) - [Transform-V2] Add UDF SPI and an example implement for SQL Transform plugin (#4392) - [Transform-V2] Support copy field list (#4404) - [Transform-V2] Add support CatalogTable for FieldMapperTransform (#4423) diff --git a/seatunnel-connectors-v2/connector-jdbc/pom.xml b/seatunnel-connectors-v2/connector-jdbc/pom.xml index e76237e7e07..62d541d19f0 100644 --- a/seatunnel-connectors-v2/connector-jdbc/pom.xml +++ b/seatunnel-connectors-v2/connector-jdbc/pom.xml @@ -46,6 +46,7 @@ 3.13.29 12.0.3-0 2.5.1 + 8.6.0 @@ -143,6 +144,12 @@ ${vertica.version} provided + + cn.com.kingbase + kingbase8 + ${kingbase8.version} + provided + @@ -218,5 +225,11 @@ com.vertica.jdbc vertica-jdbc + + + cn.com.kingbase + kingbase8 + + diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java new file mode 100644 index 00000000000..2f6d5661063 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialect.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.util.Arrays; +import java.util.Optional; +import java.util.stream.Collectors; + +public class KingbaseDialect implements JdbcDialect { + + @Override + public String dialectName() { + return "Kingbase"; + } + + @Override + public JdbcRowConverter getRowConverter() { + return new KingbaseJdbcRowConverter(); + } + + @Override + public JdbcDialectTypeMapper getJdbcDialectTypeMapper() { + return new KingbaseTypeMapper(); + } + + @Override + public Optional getUpsertStatement( + String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) { + String uniqueColumns = + Arrays.stream(uniqueKeyFields) + .map(this::quoteIdentifier) + .collect(Collectors.joining(", ")); + String updateClause = + Arrays.stream(fieldNames) + .map( + fieldName -> + quoteIdentifier(fieldName) + + "=EXCLUDED." + + quoteIdentifier(fieldName)) + .collect(Collectors.joining(", ")); + String upsertSQL = + String.format( + "%s ON CONFLICT (%s) DO UPDATE SET %s", + getInsertIntoStatement(database, tableName, fieldNames), + uniqueColumns, + updateClause); + return Optional.of(upsertSQL); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java new file mode 100644 index 00000000000..f9998610351 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseDialectFactory.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory; + +import com.google.auto.service.AutoService; + +/** Factory for {@link KingbaseDialect}. */ +@AutoService(JdbcDialectFactory.class) +public class KingbaseDialectFactory implements JdbcDialectFactory { + + @Override + public boolean acceptsURL(String url) { + return url.startsWith("jdbc:kingbase8:"); + } + + @Override + public JdbcDialect create() { + return new KingbaseDialect(); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java new file mode 100644 index 00000000000..9577e12f620 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseJdbcRowConverter.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter; + +import java.math.BigDecimal; +import java.sql.Date; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.Optional; + +public class KingbaseJdbcRowConverter extends AbstractJdbcRowConverter { + + @Override + public String converterName() { + return "KingBase"; + } + + @Override + @SuppressWarnings("checkstyle:Indentation") + public SeaTunnelRow toInternal(ResultSet rs, SeaTunnelRowType typeInfo) throws SQLException { + Object[] fields = new Object[typeInfo.getTotalFields()]; + for (int fieldIndex = 0; fieldIndex < typeInfo.getTotalFields(); fieldIndex++) { + SeaTunnelDataType seaTunnelDataType = typeInfo.getFieldType(fieldIndex); + int resultSetIndex = fieldIndex + 1; + switch (seaTunnelDataType.getSqlType()) { + case STRING: + fields[fieldIndex] = rs.getString(resultSetIndex); + break; + case BOOLEAN: + fields[fieldIndex] = rs.getBoolean(resultSetIndex); + break; + case TINYINT: + fields[fieldIndex] = rs.getByte(resultSetIndex); + break; + case SMALLINT: + fields[fieldIndex] = rs.getShort(resultSetIndex); + break; + case INT: + fields[fieldIndex] = rs.getInt(resultSetIndex); + break; + case BIGINT: + fields[fieldIndex] = rs.getLong(resultSetIndex); + break; + case FLOAT: + fields[fieldIndex] = rs.getFloat(resultSetIndex); + break; + case DOUBLE: + fields[fieldIndex] = rs.getDouble(resultSetIndex); + break; + case DECIMAL: + fields[fieldIndex] = rs.getBigDecimal(resultSetIndex); + break; + case DATE: + Date sqlDate = rs.getDate(resultSetIndex); + fields[fieldIndex] = + Optional.ofNullable(sqlDate).map(Date::toLocalDate).orElse(null); + break; + case TIME: + Time sqlTime = rs.getTime(resultSetIndex); + fields[fieldIndex] = + Optional.ofNullable(sqlTime).map(Time::toLocalTime).orElse(null); + break; + case TIMESTAMP: + Timestamp sqlTimestamp = rs.getTimestamp(resultSetIndex); + fields[fieldIndex] = + Optional.ofNullable(sqlTimestamp) + .map(Timestamp::toLocalDateTime) + .orElse(null); + break; + case BYTES: + fields[fieldIndex] = rs.getBytes(resultSetIndex); + break; + case NULL: + fields[fieldIndex] = null; + break; + case ROW: + case MAP: + case ARRAY: + default: + throw new JdbcConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unexpected value: " + seaTunnelDataType); + } + } + return new SeaTunnelRow(fields); + } + + @Override + public PreparedStatement toExternal( + SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement) + throws SQLException { + for (int fieldIndex = 0; fieldIndex < rowType.getTotalFields(); fieldIndex++) { + SeaTunnelDataType seaTunnelDataType = rowType.getFieldType(fieldIndex); + int statementIndex = fieldIndex + 1; + Object fieldValue = row.getField(fieldIndex); + if (fieldValue == null) { + statement.setObject(statementIndex, null); + continue; + } + + switch (seaTunnelDataType.getSqlType()) { + case STRING: + statement.setString(statementIndex, (String) row.getField(fieldIndex)); + break; + case BOOLEAN: + statement.setBoolean(statementIndex, (Boolean) row.getField(fieldIndex)); + break; + case TINYINT: + statement.setByte(statementIndex, (Byte) row.getField(fieldIndex)); + break; + case SMALLINT: + statement.setShort(statementIndex, (Short) row.getField(fieldIndex)); + break; + case INT: + statement.setInt(statementIndex, (Integer) row.getField(fieldIndex)); + break; + case BIGINT: + statement.setLong(statementIndex, (Long) row.getField(fieldIndex)); + break; + case FLOAT: + statement.setFloat(statementIndex, (Float) row.getField(fieldIndex)); + break; + case DOUBLE: + statement.setDouble(statementIndex, (Double) row.getField(fieldIndex)); + break; + case DECIMAL: + statement.setBigDecimal(statementIndex, (BigDecimal) row.getField(fieldIndex)); + break; + case DATE: + LocalDate localDate = (LocalDate) row.getField(fieldIndex); + statement.setDate(statementIndex, java.sql.Date.valueOf(localDate)); + break; + case TIME: + LocalTime localTime = (LocalTime) row.getField(fieldIndex); + statement.setTime(statementIndex, java.sql.Time.valueOf(localTime)); + break; + case TIMESTAMP: + LocalDateTime localDateTime = (LocalDateTime) row.getField(fieldIndex); + statement.setTimestamp( + statementIndex, java.sql.Timestamp.valueOf(localDateTime)); + break; + case BYTES: + statement.setBytes(statementIndex, (byte[]) row.getField(fieldIndex)); + break; + case NULL: + statement.setNull(statementIndex, java.sql.Types.NULL); + break; + case ROW: + case MAP: + case ARRAY: + default: + throw new JdbcConnectorException( + CommonErrorCode.UNSUPPORTED_DATA_TYPE, + "Unexpected value: " + seaTunnelDataType); + } + } + return statement; + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeMapper.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeMapper.java new file mode 100644 index 00000000000..439c8fc4202 --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/dialect/kingbase/KingbaseTypeMapper.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.kingbase; + +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.common.exception.CommonErrorCode; +import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper; + +import java.sql.ResultSetMetaData; +import java.sql.SQLException; + +public class KingbaseTypeMapper implements JdbcDialectTypeMapper { + + private static final String KB_SMALLSERIAL = "SMALLSERIAL"; + private static final String KB_SERIAL = "SERIAL"; + private static final String KB_BIGSERIAL = "BIGSERIAL"; + private static final String KB_BYTEA = "BYTEA"; + private static final String KB_BYTEA_ARRAY = "_BYTEA"; + private static final String KB_SMALLINT = "INT2"; + private static final String KB_SMALLINT_ARRAY = "_INT2"; + private static final String KB_INTEGER = "INT4"; + private static final String KB_INTEGER_ARRAY = "_INT4"; + private static final String KB_BIGINT = "INT8"; + private static final String KB_BIGINT_ARRAY = "_INT8"; + private static final String KB_REAL = "FLOAT4"; + private static final String KB_REAL_ARRAY = "_FLOAT4"; + private static final String KB_DOUBLE_PRECISION = "FLOAT8"; + private static final String KB_DOUBLE_PRECISION_ARRAY = "_FLOAT8"; + private static final String KB_NUMERIC = "NUMERIC"; + private static final String KB_NUMERIC_ARRAY = "_NUMERIC"; + private static final String KB_BOOLEAN = "BOOL"; + private static final String KB_BOOLEAN_ARRAY = "_BOOL"; + private static final String KB_TIMESTAMP = "TIMESTAMP"; + private static final String KB_TIMESTAMP_ARRAY = "_TIMESTAMP"; + private static final String KB_TIMESTAMPTZ = "TIMESTAMPTZ"; + private static final String KB_TIMESTAMPTZ_ARRAY = "_TIMESTAMPTZ"; + private static final String KB_DATE = "DATE"; + private static final String KB_DATE_ARRAY = "_DATE"; + private static final String KB_TIME = "TIME"; + private static final String KB_TIME_ARRAY = "_TIME"; + private static final String KB_TEXT = "TEXT"; + private static final String KB_TEXT_ARRAY = "_TEXT"; + private static final String KB_CHAR = "BPCHAR"; + private static final String KB_CHAR_ARRAY = "_BPCHAR"; + private static final String KB_CHARACTER = "CHARACTER"; + + private static final String KB_CHARACTER_VARYING = "VARCHAR"; + private static final String KB_CHARACTER_VARYING_ARRAY = "_VARCHAR"; + private static final String KB_JSON = "JSON"; + private static final String KB_JSONB = "JSONB"; + + @SuppressWarnings("checkstyle:MagicNumber") + @Override + public SeaTunnelDataType mapping(ResultSetMetaData metadata, int colIndex) + throws SQLException { + + String kbType = metadata.getColumnTypeName(colIndex).toUpperCase(); + + int precision = metadata.getPrecision(colIndex); + + switch (kbType) { + case KB_BOOLEAN: + return BasicType.BOOLEAN_TYPE; + case KB_SMALLINT: + return BasicType.SHORT_TYPE; + case KB_SMALLSERIAL: + case KB_INTEGER: + case KB_SERIAL: + return BasicType.INT_TYPE; + case KB_BIGINT: + case KB_BIGSERIAL: + return BasicType.LONG_TYPE; + case KB_REAL: + return BasicType.FLOAT_TYPE; + case KB_DOUBLE_PRECISION: + return BasicType.DOUBLE_TYPE; + case KB_NUMERIC: + // see SPARK-26538: handle numeric without explicit precision and scale. + if (precision > 0) { + return new DecimalType(precision, metadata.getScale(colIndex)); + } + return new DecimalType(38, 18); + case KB_CHAR: + case KB_CHARACTER: + case KB_CHARACTER_VARYING: + case KB_TEXT: + return BasicType.STRING_TYPE; + case KB_TIMESTAMP: + return LocalTimeType.LOCAL_DATE_TIME_TYPE; + case KB_TIME: + return LocalTimeType.LOCAL_TIME_TYPE; + case KB_DATE: + return LocalTimeType.LOCAL_DATE_TYPE; + case KB_CHAR_ARRAY: + case KB_CHARACTER_VARYING_ARRAY: + case KB_TEXT_ARRAY: + case KB_DOUBLE_PRECISION_ARRAY: + case KB_REAL_ARRAY: + case KB_BIGINT_ARRAY: + case KB_SMALLINT_ARRAY: + case KB_INTEGER_ARRAY: + case KB_BYTEA_ARRAY: + case KB_BOOLEAN_ARRAY: + case KB_TIMESTAMP_ARRAY: + case KB_NUMERIC_ARRAY: + case KB_TIMESTAMPTZ: + case KB_TIMESTAMPTZ_ARRAY: + case KB_TIME_ARRAY: + case KB_DATE_ARRAY: + case KB_JSONB: + case KB_JSON: + case KB_BYTEA: + default: + throw new JdbcConnectorException( + CommonErrorCode.UNSUPPORTED_OPERATION, + String.format("Doesn't support KingBaseES type '%s' yet", kbType)); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java new file mode 100644 index 00000000000..17d53bb87d9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/JdbcKingbaseIT.java @@ -0,0 +1,223 @@ +package org.apache.seatunnel.connectors.seatunnel.jdbc; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException; +import org.apache.seatunnel.common.utils.ExceptionUtils; + +import org.apache.commons.lang3.tuple.Pair; + +import org.junit.jupiter.api.Disabled; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerLoggerFactory; + +import com.google.common.collect.Lists; +import lombok.extern.slf4j.Slf4j; + +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.SQLException; +import java.sql.Statement; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * If you want to run this e2e, you need to download km license from + * https://www.kingbase.com.cn/sqwjxz/index.htm and modify the KM_LICENSE_PATH variable to the + * address where you downloaded the certificate. Also, remove the @Disabled annotation. The spark + * engine does not support the TIME type.Two environment variables need to be added to the spark + * container: "LANG"="C.UTF-8", "JAVA_TOOL_OPTIONS"="-Dfile.encoding=UTF8" + */ +@Slf4j +@Disabled("Due to copyright reasons, you need to download the trial version km license yourself") +public class JdbcKingbaseIT extends AbstractJdbcIT { + private static final String KINGBASE_IMAGE = "huzhihui/kingbase:v8r6"; + private static final String KINGBASE_CONTAINER_HOST = "e2e_KINGBASEDb"; + private static final String KINGBASE_DATABASE = "test"; + private static final String KINGBASE_SCHEMA = "public"; + private static final String KINGBASE_SOURCE = "e2e_table_source"; + private static final String KINGBASE_SINK = "e2e_table_sink"; + + private static final String KINGBASE_USERNAME = "SYSTEM"; + private static final String KINGBASE_PASSWORD = "123456"; + private static final int KINGBASE_PORT = 54321; + private static final String KINGBASE_URL = "jdbc:kingbase8://" + HOST + ":%s/test"; + private static final String DRIVER_CLASS = "com.kingbase8.Driver"; + private static final String KM_LICENSE_PATH = "KM_LICENSE_PATH"; + + private static final List CONFIG_FILE = + Lists.newArrayList("/jdbc_kingbase_source_and_sink.conf"); + private static final String CREATE_SQL = + "create table %s \n" + + "(\n" + + " c1 SMALLSERIAL,\n" + + " c2 SERIAL,\n" + + " c3 BIGSERIAL,\n" + + " c5 INT2,\n" + + " c7 INT4,\n" + + " c9 INT8,\n" + + " c11 FLOAT4,\n" + + " c13 FLOAT8,\n" + + " c15 NUMERIC,\n" + + " c16 BOOL,\n" + + " c18 TIMESTAMP,\n" + + " c19 DATE,\n" + + " c20 TIME,\n" + + " c21 TEXT,\n" + + " c23 BPCHAR,\n" + + " c25 CHARACTER,\n" + + " c26 VARCHAR\n" + + ");\n"; + + @Override + JdbcCase getJdbcCase() { + Map containerEnv = new HashMap<>(); + String jdbcUrl = String.format(KINGBASE_URL, KINGBASE_PORT); + Pair> testDataSet = initTestData(); + String[] fieldNames = testDataSet.getKey(); + + String insertSql = insertTable(KINGBASE_SCHEMA, KINGBASE_SOURCE, fieldNames); + + return JdbcCase.builder() + .dockerImage(KINGBASE_IMAGE) + .networkAliases(KINGBASE_CONTAINER_HOST) + .containerEnv(containerEnv) + .driverClass(DRIVER_CLASS) + .host(HOST) + .port(KINGBASE_PORT) + .localPort(KINGBASE_PORT) + .jdbcTemplate(KINGBASE_URL) + .jdbcUrl(jdbcUrl) + .userName(KINGBASE_USERNAME) + .password(KINGBASE_PASSWORD) + .database(KINGBASE_DATABASE) + .sourceTable(KINGBASE_SOURCE) + .sinkTable(KINGBASE_SINK) + .createSql(CREATE_SQL) + .configFile(CONFIG_FILE) + .insertSql(insertSql) + .testData(testDataSet) + .build(); + } + + @Override + void compareResult() throws SQLException, IOException {} + + @Override + String driverUrl() { + return "https://repo1.maven.org/maven2/cn/com/kingbase/kingbase8/8.6.0/kingbase8-8.6.0.jar"; + } + + @Override + Pair> initTestData() { + String[] fieldNames = + new String[] { + "c1", "c2", "c3", "c5", "c7", "c9", "c11", "c13", "c15", "c16", "c18", "c19", + "c20", "c21", "c23", "c25", "c26" + }; + List rows = new ArrayList<>(); + for (int i = 0; i < 100; i++) { + SeaTunnelRow row = + new SeaTunnelRow( + new Object[] { + i, + Long.parseLong(String.valueOf(i)), + Long.parseLong(String.valueOf(i)), + (short) i, + i, + Long.parseLong(String.valueOf(i)), + Float.parseFloat("1.1"), + Double.parseDouble("1.1"), + BigDecimal.valueOf(i, 10), + true, + LocalDateTime.now(), + LocalDate.now(), + LocalTime.now(), + String.valueOf(i), + String.valueOf(i), + String.valueOf(1), + String.valueOf(i) + }); + rows.add(row); + } + + return Pair.of(fieldNames, rows); + } + + @Override + GenericContainer initContainer() { + GenericContainer container = + new GenericContainer<>(KINGBASE_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(KINGBASE_CONTAINER_HOST) + .withEnv("KINGBASE_SYSTEM_PASSWORD", "123456") + .withFileSystemBind(KM_LICENSE_PATH, "/home/kingbase/license.dat") + .withLogConsumer( + new Slf4jLogConsumer( + DockerLoggerFactory.getLogger(KINGBASE_IMAGE))); + container.setPortBindings( + Lists.newArrayList(String.format("%s:%s", KINGBASE_PORT, KINGBASE_PORT))); + return container; + } + + protected void createNeededTables() { + try (Statement statement = connection.createStatement()) { + String createTemplate = jdbcCase.getCreateSql(); + + String createSource = + String.format( + createTemplate, KINGBASE_SCHEMA + "." + jdbcCase.getSourceTable()); + String createSink = + String.format(createTemplate, KINGBASE_SCHEMA + "." + jdbcCase.getSinkTable()); + + statement.execute(createSource); + statement.execute(createSink); + + connection.commit(); + } catch (Exception exception) { + log.error(ExceptionUtils.getMessage(exception)); + throw new SeaTunnelRuntimeException(JdbcITErrorCode.CREATE_TABLE_FAILED, exception); + } + } + + public String insertTable(String schema, String table, String... fields) { + String columns = String.join(", ", fields); + String placeholders = Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", ")); + + return "INSERT INTO " + + schema + + "." + + table + + " (" + + columns + + " )" + + " VALUES (" + + placeholders + + ")"; + } + + public void clearTable(String schema, String table) {} +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_kingbase_source_and_sink.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_kingbase_source_and_sink.conf new file mode 100644 index 00000000000..326fc727241 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-jdbc-e2e/connector-jdbc-e2e-part-3/src/test/resources/jdbc_kingbase_source_and_sink.conf @@ -0,0 +1,43 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source{ + jdbc{ + driver = "com.kingbase8.Driver" + url = "jdbc:kingbase8://e2e_KINGBASEDb:54321/test" + user = "SYSTEM" + password = "123456" + query ="select * from public.e2e_table_source" + } +} + + +sink { + jdbc{ + driver = "com.kingbase8.Driver" + url = "jdbc:kingbase8://e2e_KINGBASEDb:54321/test" + user = "SYSTEM" + password = "123456" + query ="INSERT INTO public.e2e_table_sink (c1, c2, c3, c5, c7, c9, c11, c13, c15, c16, c18, c19, c20, c21, c23, c25, c26) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)" + } +} +