[cdc-connector][sqlserver] sqlserver support use specific chunk column as a split key
gong committed Jan 6, 2024
1 parent 5579444 commit 878bba0
Showing 6 changed files with 121 additions and 8 deletions.
8 changes: 8 additions & 0 deletions docs/content/connectors/sqlserver-cdc.md
@@ -210,6 +210,14 @@ Connector Options
If the Flink version is 1.15 or newer, the default value of 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' has been changed to true,
so 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true' no longer needs to be configured explicitly
</td>
</tr>
<tr>
<td>scan.incremental.snapshot.chunk.key-column</td>
<td>optional</td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The chunk key column used to split the captured tables into multiple chunks when reading the table snapshot.
By default, the chunk key is the first column of the primary key. The specified column must be a column of the primary key.</td>
</tr>
</tbody>
</table>
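
For illustration, a minimal Flink SQL sketch that uses this option (the orders table, host, credentials, and database names below are hypothetical placeholders, not part of this commit):

-- on Flink versions below 1.15, enable checkpoints after tasks finish explicitly
SET 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';

CREATE TABLE orders (
    order_id INT NOT NULL,
    region VARCHAR(255) NOT NULL,
    seller_id VARCHAR(255) NOT NULL,
    PRIMARY KEY (region, order_id, seller_id) NOT ENFORCED
) WITH (
    'connector' = 'sqlserver-cdc',
    'hostname' = 'localhost',
    'port' = '1433',
    'username' = 'sa',
    'password' = 'Password!',
    'database-name' = 'inventory',
    'table-name' = 'dbo.orders',
    'scan.incremental.snapshot.enabled' = 'true',
    -- split snapshot chunks by order_id instead of the default first primary-key column (region)
    'scan.incremental.snapshot.chunk.key-column' = 'order_id'
);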
@@ -89,7 +89,8 @@ public Collection<SnapshotSplit> generateSplits(TableId tableId) {
long start = System.currentTimeMillis();

Table table = dialect.queryTableSchema(jdbc, tableId).getTable();
Column splitColumn = SqlServerUtils.getSplitColumn(table);
Column splitColumn =
SqlServerUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
final List<ChunkRange> chunks;
try {
chunks = splitTableIntoChunks(jdbc, tableId, splitColumn);
@@ -49,6 +49,7 @@
import io.debezium.pipeline.metrics.StreamingChangeEventSourceMetrics;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.relational.Column;
import io.debezium.relational.Table;
import io.debezium.relational.TableId;
import io.debezium.relational.Tables.TableFilter;
@@ -205,7 +206,8 @@ public SqlServerDatabaseSchema getDatabaseSchema() {

@Override
public RowType getSplitType(Table table) {
return SqlServerUtils.getSplitType(table);
Column splitColumn = SqlServerUtils.getSplitColumn(table, sourceConfig.getChunkKeyColumn());
return SqlServerUtils.getSplitType(splitColumn);
}

@Override
@@ -37,6 +37,8 @@
import io.debezium.util.SchemaNameAdjuster;
import org.apache.kafka.connect.source.SourceRecord;

import javax.annotation.Nullable;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
@@ -158,7 +160,7 @@ public static Object queryNextChunkMax(
});
}

public static Column getSplitColumn(Table table) {
public static Column getSplitColumn(Table table, @Nullable String chunkKeyColumn) {
List<Column> primaryKeys = table.primaryKeyColumns();
if (primaryKeys.isEmpty()) {
throw new ValidationException(
@@ -168,15 +170,27 @@ public static Column getSplitColumn(Table table) {
table.id()));
}

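// a user-specified chunk key column must be one of the primary-key columns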
if (chunkKeyColumn != null) {
Optional<Column> targetPkColumn =
primaryKeys.stream()
.filter(col -> chunkKeyColumn.equals(col.name()))
.findFirst();
if (targetPkColumn.isPresent()) {
return targetPkColumn.get();
}
throw new ValidationException(
String.format(
"Chunk key column '%s' doesn't exist in the primary key [%s] of the table %s.",
chunkKeyColumn,
primaryKeys.stream().map(Column::name).collect(Collectors.joining(",")),
table.id()));
}

// use the first column of the primary key as the split key
return primaryKeys.get(0);
}

public static RowType getSplitType(Table table) {
return getSplitType(getSplitColumn(table));
}

private static RowType getSplitType(Column splitColumn) {
public static RowType getSplitType(Column splitColumn) {
return (RowType)
ROW(FIELD(splitColumn.name(), SqlServerTypeUtils.fromDbzColumn(splitColumn)))
.getLogicalType();
@@ -472,4 +472,67 @@ public void testMetadataColumns() throws Throwable {
assertEquals(expected, actual);
result.getJobClient().get().cancel().get();
}

@Test
public void testCompositePkTableSplitsUnevenlyWithChunkKeyColumn()
throws InterruptedException, ExecutionException {
if (parallelismSnapshot) {
testUseChunkColumn("product_kind");
}
}

@Test
public void testCompositePkTableSplitsEvenlyWithChunkKeyColumn()
throws ExecutionException, InterruptedException {
if (parallelismSnapshot) {
testUseChunkColumn("product_no");
}
}

private void testUseChunkColumn(String chunkColumn)
throws InterruptedException, ExecutionException {
initializeSqlServerTable("customer");
String sourceDDL =
String.format(
"CREATE TABLE evenly_shopping_cart (\n"
+ " product_no INT NOT NULL,\n"
+ " product_kind VARCHAR(255),\n"
+ " user_id VARCHAR(255) NOT NULL,\n"
+ " description VARCHAR(255) NOT NULL\n"
+ ") WITH ("
+ " 'connector' = 'sqlserver-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'scan.incremental.snapshot.chunk.key-column' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s',"
+ " 'database-name' = '%s',"
+ " 'table-name' = '%s'"
+ ")",
MSSQL_SERVER_CONTAINER.getHost(),
MSSQL_SERVER_CONTAINER.getMappedPort(MS_SQL_SERVER_PORT),
MSSQL_SERVER_CONTAINER.getUsername(),
MSSQL_SERVER_CONTAINER.getPassword(),
parallelismSnapshot,
chunkColumn,
4,
"customer",
"dbo.evenly_shopping_cart");
String sinkDDL =
"CREATE TABLE sink "
+ " WITH ("
+ " 'connector' = 'values',"
+ " 'sink-insert-only' = 'false'"
+ ") LIKE evenly_shopping_cart (EXCLUDING OPTIONS)";

tEnv.executeSql(sourceDDL);
tEnv.executeSql(sinkDDL);

// async submit job
TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM evenly_shopping_cart");
waitForSinkSize("sink", 12);
result.getJobClient().get().cancel().get();
}
}
@@ -83,3 +83,28 @@ VALUES (101,'user_1','Shanghai','123567891234'),
(1019,'user_20','Shanghai','123567891234'),
(2000,'user_21','Shanghai','123567891234');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'customers_1', @role_name = NULL, @supports_net_changes = 0;

-- table with a composite primary key, one of whose columns is evenly distributed
CREATE TABLE evenly_shopping_cart (
product_no INT NOT NULL,
product_kind VARCHAR(255),
user_id VARCHAR(255) NOT NULL,
description VARCHAR(255) NOT NULL,
PRIMARY KEY(product_kind, product_no, user_id)
);

INSERT INTO evenly_shopping_cart
VALUES (101, 'KIND_001', 'user_1', 'my shopping cart'),
(102, 'KIND_002', 'user_1', 'my shopping cart'),
(103, 'KIND_007', 'user_1', 'my shopping cart'),
(104, 'KIND_008', 'user_1', 'my shopping cart'),
(105, 'KIND_100', 'user_2', 'my shopping list'),
(105, 'KIND_999', 'user_3', 'my shopping list'),
(107, 'KIND_010', 'user_4', 'my shopping list'),
(108, 'KIND_009', 'user_4', 'my shopping list'),
(109, 'KIND_002', 'user_5', 'leo list'),
(111, 'KIND_007', 'user_5', 'leo list'),
(111, 'KIND_008', 'user_5', 'leo list'),
(112, 'KIND_009', 'user_6', 'my shopping cart');
EXEC sys.sp_cdc_enable_table @source_schema = 'dbo', @source_name = 'evenly_shopping_cart', @role_name = NULL, @supports_net_changes = 0;
