From f7783a49f67c66d334b24aa0d12587c50e29895e Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 14 Sep 2023 23:53:52 +0800 Subject: [PATCH 1/6] [Feature][doc][Connector-V2][SqlServer] Add SqlServer connector documentation --- docs/en/connector-v2/sink/SqlServer.md | 171 ++++++++++ docs/en/connector-v2/source/MySQL-CDC.md | 1 + docs/en/connector-v2/source/SqlServer-CDC.md | 336 +++++++++---------- docs/en/connector-v2/source/SqlServer.md | 165 +++++++++ 4 files changed, 494 insertions(+), 179 deletions(-) create mode 100644 docs/en/connector-v2/sink/SqlServer.md create mode 100644 docs/en/connector-v2/source/SqlServer.md diff --git a/docs/en/connector-v2/sink/SqlServer.md b/docs/en/connector-v2/sink/SqlServer.md new file mode 100644 index 00000000000..5b10477b552 --- /dev/null +++ b/docs/en/connector-v2/sink/SqlServer.md @@ -0,0 +1,171 @@ +# SQL Server + +> JDBC SQL Server Sink Connector + +## Support Mysql Version + +- server:2008 (Or later version for information only) + +## Support Those engines + +> Spark
+> Flink
+> Seatunnel Zeta
+ +## Key features + +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [x] [cdc](../../concept/connector-v2-features.md) + +> Use `Xa transactions` to ensure `exactly-once`. So only support `exactly-once` for the database which is +> support `Xa transactions`. You can set `is_exactly_once=true` to enable it. + +## Description + +Write data through jdbc. Support Batch mode and Streaming mode, support concurrent writing, support exactly-once +semantics (using XA transaction guarantee). + +## Supported DataSource Info + +| datasource | supported versions | driver | url | maven | +|------------|-------------------------|----------------------------------------------|---------------------------------|-----------------------------------------------------------------------------------| +| SQL Server | support version >= 2008 | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | [Download](https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc) | + +## Database dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory
+> For example SQL Server datasource: cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/ + +## Data Type Mapping + +| SQLserver Data type | Seatunnel Data type | +|-----------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------| +| BIT | BOOLEAN | +| TINYINT
SMALLINT | SHORT | +| INTEGER | INT | +| BIGINT | LONG | +| DECIMAL
NUMERIC
MONEY
SMALLMONEY | DECIMAL((Get the designated column's specified column size)+1,
(Gets the designated column's number of digits to right of the
decimal point.))) | +| REAL | FLOAT | +| FLOAT | DOUBLE | +| CHAR
NCHAR
VARCHAR
NTEXT
NVARCHAR
TEXT | STRING | +| DATE | LOCAL_DATE | +| TIME | LOCAL_TIME | +| DATETIME
DATETIME2
SMALLDATETIME
DATETIMEOFFSET | LOCAL_DATE_TIME | +| TIMESTAMP
BINARY
VARBINARY
IMAGE
UNKNOWN | Not supported yet | + +## Sink Options + +| Name | Type | Required | Default | Description | +|-------------------------------------------|---------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:sqlserver://localhost:1433;databaseName=mydatabase | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
if you use sqlServer the value is `com.microsoft.sqlserver.jdbc.SQLServerDriver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | No | - | Use this sql write upstream input datas to database. e.g `INSERT ...`,`query` have the higher priority | +| database | String | No | - | Use this `database` and `table-name` auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| table | String | No | - | Use database and this table-name auto-generate sql and receive upstream input datas write to database.
This option is mutually exclusive with `query` and has a higher priority. | +| primary_keys | Array | No | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql. | +| support_upsert_by_query_primary_key_exist | Boolean | No | false | Choose to use INSERT sql, UPDATE sql to process update events(INSERT, UPDATE_AFTER) based on query primary key exists. This configuration is only used when database unsupport upsert syntax. **Note**: that this method has low performance | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete. | +| max_retries | Int | No | 0 | The number of retries to submit failed (executeBatch) | +| batch_size | Int | No | 1000 | For batch writing, when the number of buffered records reaches the number of `batch_size` or the time reaches `checkpoint.interval`
, the data will be flushed into the database | +| is_exactly_once | Boolean | No | false | Whether to enable exactly-once semantics, which will use Xa transactions. If on, you need to
set `xa_data_source_class_name`. | +| generate_sink_sql | Boolean | No | false | Generate sql statements based on the database table you want to write to | +| xa_data_source_class_name | String | No | - | The xa data source class name of the database Driver, for example, SqlServer is `com.microsoft.sqlserver.jdbc.SQLServerXADataSource`, and
please refer to appendix for other data sources | +| max_commit_attempts | Int | No | 3 | The number of retries for transaction commit failures | +| transaction_timeout_sec | Int | No | -1 | The timeout after the transaction is opened, the default is -1 (never timeout). Note that setting the timeout may affect
exactly-once semantics | +| auto_commit | Boolean | No | true | Automatic transaction commit is enabled by default | +| common-options | | no | - | Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details | + +## tips + +> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks. + +## Task Example + +### simple: + +> This is one that reads Sqlserver data and inserts it directly into another table + +``` +env { + # You can set engine configuration here + execution.parallelism = 10 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" + user = SA + password = "Y.sa123456" + query = "select * from column_type_test.dbo.full_types_jdbc" + # Parallel sharding reads fields + partition_column = "id" + # Number of fragments + partition_num = 10 + + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform-v2/sql +} + +sink { + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" + user = SA + password = "Y.sa123456" + query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )" + + } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +} +``` + +### CDC(Change data capture) event + +> CDC change data is also supported by us In this case, you need config database, table and primary_keys. + +``` +Jdbc { + source_table_name = "customers" + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" + user = SA + password = "Y.sa123456" + generate_sink_sql = true + database = "column_type_test" + table = "dbo.full_types_sink" + batch_size = 100 + primary_keys = ["id"] +} +``` + +### Exactly Once Sink + +> Transactional writes may be slower but more accurate to the data + +``` + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" + user = SA + password = "Y.sa123456" + query = "insert into full_types_jdbc_sink( id, val_char, val_varchar, val_text, val_nchar, val_nvarchar, val_ntext, val_decimal, val_numeric, val_float, val_real, val_smallmoney, val_money, val_bit, val_tinyint, val_smallint, val_int, val_bigint, val_date, val_time, val_datetime2, val_datetime, val_smalldatetime ) values( ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ? )" + is_exactly_once = "true" + + xa_data_source_class_name = "com.microsoft.sqlserver.jdbc.SQLServerXADataSource" + + } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc + +``` + diff --git a/docs/en/connector-v2/source/MySQL-CDC.md b/docs/en/connector-v2/source/MySQL-CDC.md index 6740fd4b8b2..49e05a6e22e 100644 --- a/docs/en/connector-v2/source/MySQL-CDC.md +++ b/docs/en/connector-v2/source/MySQL-CDC.md @@ -5,6 +5,7 @@ ## Support Those Engines > SeaTunnel Zeta
+> Flink
## Key features diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md index 87456a02c7f..7188a6e223d 100644 --- a/docs/en/connector-v2/source/SqlServer-CDC.md +++ b/docs/en/connector-v2/source/SqlServer-CDC.md @@ -2,12 +2,16 @@ > SqlServer CDC source connector -## Description +## Support Mysql Version -The SqlServer CDC connector allows for reading snapshot data and incremental data from SqlServer database. This document -describes how to setup the SqlServer CDC connector to run SQL queries against SqlServer databases. +- server:2019 (Or later version for information only) -## Key features +## Support Those Engines + +> SeaTunnel Zeta
+> Flink
+ +## Key Features - [ ] [batch](../../concept/connector-v2-features.md) - [x] [stream](../../concept/connector-v2-features.md) @@ -16,201 +20,175 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq - [x] [parallelism](../../concept/connector-v2-features.md) - [x] [support user-defined split](../../concept/connector-v2-features.md) -## Options - -| name | type | required | default value | -|------------------------------------------------|----------|----------|---------------| -| username | String | Yes | - | -| password | String | Yes | - | -| database-names | List | Yes | - | -| table-names | List | Yes | - | -| base-url | String | Yes | - | -| startup.mode | Enum | No | INITIAL | -| startup.timestamp | Long | No | - | -| startup.specific-offset.file | String | No | - | -| startup.specific-offset.pos | Long | No | - | -| stop.mode | Enum | No | NEVER | -| stop.timestamp | Long | No | - | -| stop.specific-offset.file | String | No | - | -| stop.specific-offset.pos | Long | No | - | -| incremental.parallelism | Integer | No | 1 | -| snapshot.split.size | Integer | No | 8096 | -| snapshot.fetch.size | Integer | No | 1024 | -| server-time-zone | String | No | UTC | -| connect.timeout | Duration | No | 30s | -| connect.max-retries | Integer | No | 3 | -| connection.pool.size | Integer | No | 20 | -| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | -| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | -| sample-sharding.threshold | int | No | 1000 | -| inverse-sampling.rate | int | No | 1000 | -| exactly_once | Boolean | No | true | -| debezium.* | config | No | - | -| format | Enum | No | DEFAULT | -| common-options | | no | - | - -### username [String] - -Name of the database to use when connecting to the database server. - -### password [String] - -Password to use when connecting to the database server. - -### database-names [List] - -Database name of the database to monitor. - -### table-names [List] - -Table name is a combination of schema name and table name (databaseName.schemaName.tableName). - -### base-url [String] - -URL has to be with database, like "jdbc:sqlserver://localhost:1433;databaseName=test". - -### startup.mode [Enum] - -Optional startup mode for SqlServer CDC consumer, valid enumerations are "initial", "earliest", "latest" and "specific". - -### startup.timestamp [Long] - -Start from the specified epoch timestamp (in milliseconds). - -**Note, This option is required when the "startup.mode" option used `'timestamp'`.** - -### startup.specific-offset.file [String] - -Start from the specified binlog file name. - -**Note, This option is required when the "startup.mode" option used `'specific'`.** - -### startup.specific-offset.pos [Long] - -Start from the specified binlog file position. - -**Note, This option is required when the "startup.mode" option used `'specific'`.** - -### stop.mode [Enum] - -Optional stop mode for SqlServer CDC consumer, valid enumerations are "never". - -### stop.timestamp [Long] - -Stop from the specified epoch timestamp (in milliseconds). - -**Note, This option is required when the "stop.mode" option used `'timestamp'`.** - -### stop.specific-offset.file [String] - -Stop from the specified binlog file name. - -**Note, This option is required when the "stop.mode" option used `'specific'`.** - -### stop.specific-offset.pos [Long] - -Stop from the specified binlog file position. - -**Note, This option is required when the "stop.mode" option used `'specific'`.** - -### incremental.parallelism [Integer] - -The number of parallel readers in the incremental phase. - -### snapshot.split.size [Integer] - -The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshot -of table. - -### snapshot.fetch.size [Integer] - -The maximum fetch size for per poll when read table snapshot. - -### chunk-key.even-distribution.factor.upper-bound [Double] - -The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0. - -### chunk-key.even-distribution.factor.lower-bound [Double] - -The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05. - -### sample-sharding.threshold [Integer] - -This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. - -### inverse-sampling.rate [Integer] - -The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. - -### server-time-zone [String] - -The session time zone in database server. - -### connect.timeout [Duration] - -The maximum time that the connector should wait after trying to connect to the database server before timing out. - -### connect.max-retries [Integer] - -The max retry times that the connector should retry to build database server connection. - -### connection.pool.size [Integer] - -The connection pool size. - -### exactly_once [Boolean] - -Enable exactly once semantic. - -### debezium [Config] - -Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from SqlServer server. +## Description -See more about -the [Debezium's SqlServer Connector properties](https://debezium.io/documentation/reference/1.6/connectors/sqlserver.html#sqlserver-connector-properties) +The SqlServer CDC connector allows for reading snapshot data and incremental data from SqlServer database. This document +describes how to setup the SqlServer CDC connector to run SQL queries against SqlServer databases. -### format [Enum] +## Supported DataSource Info + +| Datasource | Supported versions | Driver | Url | Maven | +|------------|-------------------------------------------------------------------------------------------------------|----------------------------------------------|---------------------------------|-----------------------------------------------------------------------| +| SqlServer |
  • [SqlServer](https://dev.mysql.com/doc): server:2019 (Or later version for information only)
  • | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | + +### Install Jdbc Driver + +Please download and put SqlServer driver in `${SEATUNNEL_HOME}/lib/` dir. For example: cp mssql-jdbc-xxx.jar `$SEATNUNNEL_HOME/lib/` + +## Data Type Mapping + +| Mysql Data type | SeaTunnel Data type | +|---------------------------------------------------------------------------------------------------|----------------------------------------------------| +| CHAR
    VARCHAR
    NCHAR
    NVARCHAR
    STRUCT
    CLOB
    LONGVARCHAR
    LONGNVARCHAR
    | STRING | +| BLOB | BYTES | +| INTEGER | INT | +| SMALLINT
    TINYINT
    | SMALLINT | +| BIGINT | BIGINT | +| FLOAT
    REAL
    | FLOAT | +| DOUBLE | DOUBLE | +| NUMERIC
    DECIMAL(column.length(), column.scale().orElse(0))
    | DECIMAL(column.length(), column.scale().orElse(0)) | +| TIMESTAMP | TIMESTAMP | +| DATE | DATE | +| TIME | TIME | +| BOOLEAN
    BIT
    | BOOLEAN | + +## Source Options + +| Name | Type | Required | Default | Description | +|------------------------------------------------|----------|----------|---------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| username | String | Yes | - | Name of the database to use when connecting to the database server. | +| password | String | Yes | - | Password to use when connecting to the database server. | +| database-names | List | Yes | - | Database name of the database to monitor. | +| table-names | List | Yes | - | Table name is a combination of schema name and table name (databaseName.schemaName.tableName). | +| base-url | String | Yes | - | URL has to be with database, like "jdbc:sqlserver://localhost:1433;databaseName=test". | +| startup.mode | Enum | No | INITIAL | Optional startup mode for SqlServer CDC consumer, valid enumerations are "initial", "earliest", "latest" and "specific". | +| startup.timestamp | Long | No | - | Start from the specified epoch timestamp (in milliseconds).
    **Note, This option is required when** the **"startup.mode" option used `'timestamp'`.** | +| startup.specific-offset.file | String | No | - | Start from the specified binlog file name.
    **Note, This option is required when the "startup.mode" option used `'specific'`.** | +| startup.specific-offset.pos | Long | No | - | Start from the specified binlog file position.
    **Note, This option is required when the "startup.mode" option used `'specific'`.** | +| stop.mode | Enum | No | NEVER | Optional stop mode for SqlServer CDC consumer, valid enumerations are "never". | +| stop.timestamp | Long | No | - | Stop from the specified epoch timestamp (in milliseconds).
    **Note, This option is required when the "stop.mode" option used `'timestamp'`.** | +| stop.specific-offset.file | String | No | - | Stop from the specified binlog file name.
    **Note, This option is required when the "stop.mode" option used `'specific'`.** | +| stop.specific-offset.pos | Long | No | - | Stop from the specified binlog file position.
    **Note, This option is required when the "stop.mode" option used `'specific'`.** | +| incremental.parallelism | Integer | No | 1 | The number of parallel readers in the incremental phase. | +| snapshot.split.size | Integer | No | 8096 | The split size (number of rows) of table snapshot, captured tables are split into multiple splits when read the snapshotof table. | +| snapshot.fetch.size | Integer | No | 1024 | The maximum fetch size for per poll when read table snapshot. | +| server-time-zone | String | No | UTC | The session time zone in database server. | +| connect.timeout | Duration | No | 30s | The maximum time that the connector should wait after trying to connect to the database server before timing out. | +| connect.max-retries | Integer | No | 3 | The max retry times that the connector should retry to build database server connection. | +| connection.pool.size | Integer | No | 20 | The connection pool size. | +| chunk-key.even-distribution.factor.upper-bound | Double | No | 100 | The upper bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be less than or equal to this upper bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is greater, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 100.0. | +| chunk-key.even-distribution.factor.lower-bound | Double | No | 0.05 | The lower bound of the chunk key distribution factor. This factor is used to determine whether the table data is evenly distributed. If the distribution factor is calculated to be greater than or equal to this lower bound (i.e., (MAX(id) - MIN(id) + 1) / row count), the table chunks would be optimized for even distribution. Otherwise, if the distribution factor is less, the table will be considered as unevenly distributed and the sampling-based sharding strategy will be used if the estimated shard count exceeds the value specified by `sample-sharding.threshold`. The default value is 0.05. | +| sample-sharding.threshold | int | No | 1000 | This configuration specifies the threshold of estimated shard count to trigger the sample sharding strategy. When the distribution factor is outside the bounds specified by `chunk-key.even-distribution.factor.upper-bound` and `chunk-key.even-distribution.factor.lower-bound`, and the estimated shard count (calculated as approximate row count / chunk size) exceeds this threshold, the sample sharding strategy will be used. This can help to handle large datasets more efficiently. The default value is 1000 shards. | +| inverse-sampling.rate | int | No | 1000 | The inverse of the sampling rate used in the sample sharding strategy. For example, if this value is set to 1000, it means a 1/1000 sampling rate is applied during the sampling process. This option provides flexibility in controlling the granularity of the sampling, thus affecting the final number of shards. It's especially useful when dealing with very large datasets where a lower sampling rate is preferred. The default value is 1000. | +| exactly_once | Boolean | No | true | Enable exactly once semantic. | +| debezium.* | config | No | - | Pass-through Debezium's properties to Debezium Embedded Engine which is used to capture data changes from SqlServer server.
    See more about
    the [Debezium's SqlServer Connector properties](https://debezium.io/documentation/reference/1.6/connectors/sqlserver.html#sqlserver-connector-properties) | +| format | Enum | No | DEFAULT | Optional output format for SqlServer CDC, valid enumerations are "DEFAULT"、"COMPATIBLE_DEBEZIUM_JSON". | +| common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | + +## Task Example + +### initiali read Simple + +> This is a stream mode cdc initializes read table data will be read incrementally after successful read The following sql DDL is for reference only -Optional output format for SqlServer CDC, valid enumerations are "DEFAULT"、"COMPATIBLE_DEBEZIUM_JSON". +``` +CREATE DATABASE column_type_test; + +USE column_type_test; +EXEC sys.sp_cdc_enable_db; + +CREATE TABLE full_types ( + id int NOT NULL, + val_char char(3), + val_varchar varchar(1000), + val_text text, + val_nchar nchar(3), + val_nvarchar nvarchar(1000), + val_ntext ntext, + val_decimal decimal(6,3), + val_numeric numeric, + val_float float, + val_real real, + val_smallmoney smallmoney, + val_money money, + val_bit bit, + val_tinyint tinyint, + val_smallint smallint, + val_int int, + val_bigint bigint, + val_date date, + val_time time, + val_datetime2 datetime2, + val_datetime datetime, + val_smalldatetime smalldatetime, + val_xml xml + PRIMARY KEY (id) +); +``` -#### example +``` +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + execution.checkpoint.interval = 5000 +} -```conf source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** SqlServer-CDC { - debezium { - snapshot.mode = "never" - decimal.handling.mode = "double" - } + result_table_name = "customers" + username = "sa" + password = "Y.sa123456" + startup.mode="initial" + database-names = ["column_type_test"] + table-names = ["column_type_test.dbo.full_types"] + base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" } } + +transform { +} + +sink { + console { + source_table_name = "customers" + } ``` -### common options +### increment read Simple -Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. +> This is an incremental read that reads the changed data for printing -## Example +``` +env { + # You can set engine configuration here + execution.parallelism = 1 + job.mode = "STREAMING" + execution.checkpoint.interval = 5000 +} -```Jdbc { source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** SqlServer-CDC { + # Set up accurate one read + exactly_once=true result_table_name = "customers" - username = "sa" - password = "Password!" - database-names = ["exampledb"] - table-names = ["exampledb.dbo.table_x"] - base-url="jdbc:sqlserver://localhost:1433;databaseName=exampledb" + password = "Y.sa123456" + startup.mode="latest" + database-names = ["column_type_test"] + table-names = ["column_type_test.dbo.full_types"] + base-url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" } } -``` - -## Changelog -### next version +transform { +} -- Add SqlServer CDC Source Connector -- [Doc] Add SqlServer CDC Source Connector document ([3993](https://github.com/apache/seatunnel/pull/3993)) -- [Feature] Support multi-table read ([4377](https://github.com/apache/seatunnel/pull/4377)) +sink { + console { + source_table_name = "customers" + } +``` diff --git a/docs/en/connector-v2/source/SqlServer.md b/docs/en/connector-v2/source/SqlServer.md new file mode 100644 index 00000000000..758fed0ec79 --- /dev/null +++ b/docs/en/connector-v2/source/SqlServer.md @@ -0,0 +1,165 @@ +# SQL Server + +> JDBC SQL Server Source Connector + +## Support SQL Server Version + +- server:2008 (Or later version for information only) + +## Support Those Engines + +> Spark
    +> Flink
    +> Seatunnel Zeta
    + +## Key Features + +- [x] [batch](../../concept/connector-v2-features.md) +- [ ] [stream](../../concept/connector-v2-features.md) +- [x] [exactly-once](../../concept/connector-v2-features.md) +- [x] [column projection](../../concept/connector-v2-features.md) +- [x] [parallelism](../../concept/connector-v2-features.md) +- [x] [support user-defined split](../../concept/connector-v2-features.md) + +> supports query SQL and can achieve projection effect. + +## Description + +Read external data source data through JDBC. + +## Supported DataSource Info + +| datasource | supported versions | driver | url | maven | +|------------|-------------------------|----------------------------------------------|---------------------------------|-----------------------------------------------------------------------------------| +| SQL Server | support version >= 2008 | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | [Download](https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc) | + +## Database dependency + +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory
    +> For example SQL Server datasource: cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/ + +## Data Type Mapping + +| SQLserver Data type | Seatunnel Data type | +|-----------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------| +| BIT | BOOLEAN | +| TINYINT
    SMALLINT | SHORT | +| INTEGER | INT | +| BIGINT | LONG | +| DECIMAL
    NUMERIC
    MONEY
    SMALLMONEY | DECIMAL((Get the designated column's specified column size)+1,
    (Gets the designated column's number of digits to right of the
    decimal point.))) | +| REAL | FLOAT | +| FLOAT | DOUBLE | +| CHAR
    NCHAR
    VARCHAR
    NTEXT
    NVARCHAR
    TEXT | STRING | +| DATE | LOCAL_DATE | +| TIME | LOCAL_TIME | +| DATETIME
    DATETIME2
    SMALLDATETIME
    DATETIMEOFFSET | LOCAL_DATE_TIME | +| TIMESTAMP
    BINARY
    VARBINARY
    IMAGE
    UNKNOWN | Not supported yet | + +## Source Options + +| name | type | required | default | Description | +|------------------------------|--------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| url | String | Yes | - | The URL of the JDBC connection. Refer to a case: jdbc:sqlserver://127.0.0.1:1434;database=TestDB | +| driver | String | Yes | - | The jdbc class name used to connect to the remote data source,
    if you use SQLserver the value is `com.microsoft.sqlserver.jdbc.SQLServerDriver`. | +| user | String | No | - | Connection instance user name | +| password | String | No | - | Connection instance password | +| query | String | Yes | - | Query statement | +| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete | +| partition_column | String | No | - | The column name for parallelism's partition, only support numeric type. | +| partition_lower_bound | Long | No | - | The partition_column min value for scan, if not set SeaTunnel will query database get min value. | +| partition_upper_bound | Long | No | - | The partition_column max value for scan, if not set SeaTunnel will query database get max value. | +| partition_num | Int | No | job parallelism | The number of partition count, only support positive integer. default value is job parallelism | +| fetch_size | Int | No | 0 | For queries that return a large number of objects,you can configure
    the row fetch size used in the query toimprove performance by
    reducing the number database hits required to satisfy the selection criteria.
    Zero means use jdbc default value. | +| common-options | | No | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details | + +## tips + +> If partition_column is not set, it will run in single concurrency, and if partition_column is set, it will be executed in parallel according to the concurrency of tasks. + +## Task Example + +### Simple: + +> Read the table directly and print to the console DDL is for reference only + +``` +CREATE DATABASE column_type_test; + +USE column_type_test; +EXEC sys.sp_cdc_enable_db; + +CREATE TABLE full_types_jdbc ( + id int NOT NULL, + val_char char(3), + val_varchar varchar(1000), + val_text text, + val_nchar nchar(3), + val_nvarchar nvarchar(1000), + val_ntext ntext, + val_decimal decimal(6,3), + val_numeric numeric, + val_float float, + val_real real, + val_smallmoney smallmoney, + val_money money, + val_bit bit, + val_tinyint tinyint, + val_smallint smallint, + val_int int, + val_bigint bigint, + val_date date, + val_time time, + val_datetime2 datetime2, + val_datetime datetime, + val_smalldatetime smalldatetime + PRIMARY KEY (id) +); + +``` + +### Parallel: + +> Read your query table in parallel with the shard field you configured and the shard data You can do this if you want to read the whole table + +### Fragmented Parallel Read Simple: + +> It is a shard that reads data in parallel fast + +``` +env { + # You can set engine configuration here + execution.parallelism = 10 +} + +source { + # This is a example source plugin **only for test and demonstrate the feature source plugin** + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" + user = SA + password = "Y.sa123456" + query = "select * from column_type_test.dbo.full_types_jdbc" + # Parallel sharding reads fields + partition_column = "id" + # Number of fragments + partition_num = 10 + + } + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/Jdbc +} + + +transform { + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform-v2/sql +} + +sink { + Console {} + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc +} +``` + From ff1bc0db32fb8299669ae10e1612af9e0cbe25cb Mon Sep 17 00:00:00 2001 From: zhilinli Date: Fri, 15 Sep 2023 15:32:32 +0800 Subject: [PATCH 2/6] [Feature][doc][Connector-V2][SqlServer] Add SqlServer connector documentation --- docs/en/connector-v2/source/SqlServer.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/source/SqlServer.md b/docs/en/connector-v2/source/SqlServer.md index 758fed0ec79..7f3ffca1a8c 100644 --- a/docs/en/connector-v2/source/SqlServer.md +++ b/docs/en/connector-v2/source/SqlServer.md @@ -35,7 +35,7 @@ Read external data source data through JDBC. ## Database dependency -> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory
    +> Please download the support list corresponding to 'Maven' and copy it to the '$SEATNUNNEL_HOME/plugins/jdbc/lib/' working directory
    > For example SQL Server datasource: cp mssql-jdbc-xxx.jar $SEATNUNNEL_HOME/plugins/jdbc/lib/ ## Data Type Mapping From 4f44f1ef5227b1968cf441ad1f436c83e835a865 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Fri, 15 Sep 2023 17:59:28 +0800 Subject: [PATCH 3/6] [Feature][doc][Connector-V2][SqlServer] Add SqlServer connector documentation --- docs/en/connector-v2/sink/SqlServer.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/connector-v2/sink/SqlServer.md b/docs/en/connector-v2/sink/SqlServer.md index 5b10477b552..7ec5d453cf8 100644 --- a/docs/en/connector-v2/sink/SqlServer.md +++ b/docs/en/connector-v2/sink/SqlServer.md @@ -2,7 +2,7 @@ > JDBC SQL Server Sink Connector -## Support Mysql Version +## Support SQL Server Version - server:2008 (Or later version for information only) From 900f07e7d93864667d31d54d8a5a568bab1d02a0 Mon Sep 17 00:00:00 2001 From: zhilinli Date: Mon, 25 Sep 2023 18:00:00 +0800 Subject: [PATCH 4/6] fix doc error --- docs/en/connector-v2/source/SqlServer-CDC.md | 47 ++-------- docs/en/connector-v2/source/SqlServer.md | 92 ++++++++++++-------- 2 files changed, 64 insertions(+), 75 deletions(-) diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md index 7188a6e223d..4727ef05d8a 100644 --- a/docs/en/connector-v2/source/SqlServer-CDC.md +++ b/docs/en/connector-v2/source/SqlServer-CDC.md @@ -1,8 +1,8 @@ -# SqlServer CDC +# SQL Server CDC > SqlServer CDC source connector -## Support Mysql Version +## Support SQL Server Version - server:2019 (Or later version for information only) @@ -27,9 +27,9 @@ describes how to setup the SqlServer CDC connector to run SQL queries against Sq ## Supported DataSource Info -| Datasource | Supported versions | Driver | Url | Maven | -|------------|-------------------------------------------------------------------------------------------------------|----------------------------------------------|---------------------------------|-----------------------------------------------------------------------| -| SqlServer |
  • [SqlServer](https://dev.mysql.com/doc): server:2019 (Or later version for information only)
  • | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433 | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | +| Datasource | Supported versions | Driver | Url | Maven | +|------------|---------------------------------------------------------------|----------------------------------------------|---------------------------------------------------------------|-----------------------------------------------------------------------| +| SqlServer |
  • server:2019 (Or later version for information only)
  • | com.microsoft.sqlserver.jdbc.SQLServerDriver | jdbc:sqlserver://localhost:1433;databaseName=column_type_test | https://mvnrepository.com/artifact/com.microsoft.sqlserver/mssql-jdbc | ### Install Jdbc Driver @@ -37,7 +37,7 @@ Please download and put SqlServer driver in `${SEATUNNEL_HOME}/lib/` dir. For ex ## Data Type Mapping -| Mysql Data type | SeaTunnel Data type | +| SQLserver Data type | SeaTunnel Data type | |---------------------------------------------------------------------------------------------------|----------------------------------------------------| | CHAR
    VARCHAR
    NCHAR
    NVARCHAR
    STRUCT
    CLOB
    LONGVARCHAR
    LONGNVARCHAR
    | STRING | | BLOB | BYTES | @@ -91,41 +91,6 @@ Please download and put SqlServer driver in `${SEATUNNEL_HOME}/lib/` dir. For ex > This is a stream mode cdc initializes read table data will be read incrementally after successful read The following sql DDL is for reference only -``` -CREATE DATABASE column_type_test; - -USE column_type_test; -EXEC sys.sp_cdc_enable_db; - -CREATE TABLE full_types ( - id int NOT NULL, - val_char char(3), - val_varchar varchar(1000), - val_text text, - val_nchar nchar(3), - val_nvarchar nvarchar(1000), - val_ntext ntext, - val_decimal decimal(6,3), - val_numeric numeric, - val_float float, - val_real real, - val_smallmoney smallmoney, - val_money money, - val_bit bit, - val_tinyint tinyint, - val_smallint smallint, - val_int int, - val_bigint bigint, - val_date date, - val_time time, - val_datetime2 datetime2, - val_datetime datetime, - val_smalldatetime smalldatetime, - val_xml xml - PRIMARY KEY (id) -); -``` - ``` env { # You can set engine configuration here diff --git a/docs/en/connector-v2/source/SqlServer.md b/docs/en/connector-v2/source/SqlServer.md index 7f3ffca1a8c..0e5701d7bcd 100644 --- a/docs/en/connector-v2/source/SqlServer.md +++ b/docs/en/connector-v2/source/SqlServer.md @@ -80,46 +80,71 @@ Read external data source data through JDBC. ### Simple: -> Read the table directly and print to the console DDL is for reference only +> Simple single task to read the data table ``` -CREATE DATABASE column_type_test; - -USE column_type_test; -EXEC sys.sp_cdc_enable_db; - -CREATE TABLE full_types_jdbc ( - id int NOT NULL, - val_char char(3), - val_varchar varchar(1000), - val_text text, - val_nchar nchar(3), - val_nvarchar nvarchar(1000), - val_ntext ntext, - val_decimal decimal(6,3), - val_numeric numeric, - val_float float, - val_real real, - val_smallmoney smallmoney, - val_money money, - val_bit bit, - val_tinyint tinyint, - val_smallint smallint, - val_int int, - val_bigint bigint, - val_date date, - val_time time, - val_datetime2 datetime2, - val_datetime datetime, - val_smalldatetime smalldatetime - PRIMARY KEY (id) -); +# Defining the runtime environment +env { + # You can set flink configuration here + execution.parallelism = 1 + job.mode = "BATCH" +} +source{ + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" + user = SA + password = "Y.sa123456" + query = "select * from full_types_jdbc" + } +} +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform-v2/sql +} + +sink { + Console {} +} ``` ### Parallel: -> Read your query table in parallel with the shard field you configured and the shard data You can do this if you want to read the whole table +> Read your query table in parallel with the shard field you configured and the shard data You can do this if you want to read the whole table + +``` +env { + # You can set flink configuration here + execution.parallelism = 10 + job.mode = "BATCH" +} + +source { + Jdbc { + driver = com.microsoft.sqlserver.jdbc.SQLServerDriver + url = "jdbc:sqlserver://localhost:1433;databaseName=column_type_test" + user = SA + password = "Y.sa123456" + # Define query logic as required + query = "select * from full_types_jdbc" + # Parallel sharding reads fields + partition_column = "id" + # Number of fragments + partition_num = 10 + } +} + +transform { + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/transform-v2/sql +} + +sink { + Console {} +} + +``` ### Fragmented Parallel Read Simple: @@ -151,7 +176,6 @@ source { transform { - # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, # please go to https://seatunnel.apache.org/docs/transform-v2/sql } From c564faf3f08fe931ee7550ae34cb346e19b6e66e Mon Sep 17 00:00:00 2001 From: zhilinli Date: Thu, 26 Oct 2023 20:10:45 +0800 Subject: [PATCH 5/6] update --- docs/en/connector-v2/source/SqlServer-CDC.md | 33 ++++++++++++++++++-- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md index 4727ef05d8a..b5f357f4c9d 100644 --- a/docs/en/connector-v2/source/SqlServer-CDC.md +++ b/docs/en/connector-v2/source/SqlServer-CDC.md @@ -1,6 +1,6 @@ # SQL Server CDC -> SqlServer CDC source connector +> Sql Server CDC source connector ## Support SQL Server Version @@ -22,8 +22,8 @@ ## Description -The SqlServer CDC connector allows for reading snapshot data and incremental data from SqlServer database. This document -describes how to setup the SqlServer CDC connector to run SQL queries against SqlServer databases. +The Sql Server CDC connector allows for reading snapshot data and incremental data from SqlServer database. This document +describes how to setup the Sql Server CDC connector to run SQL queries against SqlServer databases. ## Supported DataSource Info @@ -85,6 +85,33 @@ Please download and put SqlServer driver in `${SEATUNNEL_HOME}/lib/` dir. For ex | format | Enum | No | DEFAULT | Optional output format for SqlServer CDC, valid enumerations are "DEFAULT"、"COMPATIBLE_DEBEZIUM_JSON". | | common-options | | no | - | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. | +### Enable Sql Server CDC + +1. Check whether the CDC Agent is enabled + +> EXEC xp_servicecontrol N'querystate', N'SQLServerAGENT';
    +> If the result is running, prove that it is enabled. Otherwise, you need to manually enable it +> 2. Enable the CDC Agent +> /opt/mssql/bin/mssql-conf setup + +3. The result is as follows + +> 1) Evaluation (free, no production use rights, 180-day limit) +> 2) Developer (free, no production use rights) +> 3) Express (free) +> 4) Web (PAID) +> 5) Standard (PAID) +> 6) Enterprise (PAID) +> 7) Enterprise Core (PAID) +> 8) I bought a license through a retail sales channel and have a product key to enter. + +4. Set the CDC at the library level + Set the library level below to enable CDC. At this level, all tables under the libraries of the enabled CDC automatically enable CDC + +> USE TestDB; -- Replace with the actual database name
    +> EXEC sys.sp_cdc_enable_db;
    +> SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'table'; -- table Replace with the name of the table you want to check + ## Task Example ### initiali read Simple From 2dd457c04fc9e87649a3c3fb94d2ff3033cf24eb Mon Sep 17 00:00:00 2001 From: zhilinli Date: Mon, 30 Oct 2023 19:02:34 +0800 Subject: [PATCH 6/6] update --- docs/en/connector-v2/source/SqlServer-CDC.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docs/en/connector-v2/source/SqlServer-CDC.md b/docs/en/connector-v2/source/SqlServer-CDC.md index b5f357f4c9d..fbb3d6c3938 100644 --- a/docs/en/connector-v2/source/SqlServer-CDC.md +++ b/docs/en/connector-v2/source/SqlServer-CDC.md @@ -91,7 +91,9 @@ Please download and put SqlServer driver in `${SEATUNNEL_HOME}/lib/` dir. For ex > EXEC xp_servicecontrol N'querystate', N'SQLServerAGENT';
    > If the result is running, prove that it is enabled. Otherwise, you need to manually enable it -> 2. Enable the CDC Agent + +2.Enable the CDC Agent + > /opt/mssql/bin/mssql-conf setup 3. The result is as follows