diff --git a/docs/content/connectors/oracle-cdc.md b/docs/content/connectors/oracle-cdc.md
index 9842a06bdd9..7f5bfa1dd5b 100644
--- a/docs/content/connectors/oracle-cdc.md
+++ b/docs/content/connectors/oracle-cdc.md
@@ -96,6 +96,7 @@ You have to enable log archiving for Oracle database and define an Oracle user w
GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
GRANT SELECT ANY TRANSACTION TO flinkuser;
GRANT LOGMINING TO flinkuser;
+ GRANT ANALYZE ANY TO flinkuser;
GRANT CREATE TABLE TO flinkuser;
-- need not to execute if set scan.incremental.snapshot.enabled=true(default)
@@ -368,6 +369,14 @@ Connector Options
so it does not need to be explicitly configured 'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true'
+
+ scan.incremental.snapshot.chunk.key-column |
+ optional |
+ (none) |
+ String |
+ The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table.
+ By default, the chunk key is 'ROWID'. This column must be a column of the primary key. |
+
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java
index 0d5e4c2b93a..c26285b3cd3 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/main/java/com/ververica/cdc/connectors/oracle/table/OracleTableSource.java
@@ -186,6 +186,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.distributionFactorLower(distributionFactorLower)
.closeIdleReaders(closeIdleReaders)
.skipSnapshotBackfill(skipSnapshotBackfill)
+ .chunkKeyColumn(chunkKeyColumn)
.build();
return SourceProvider.of(oracleChangeEventSource);
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java
index 29d3c4669a9..88f538905ae 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/java/com/ververica/cdc/connectors/oracle/table/OracleConnectorITCase.java
@@ -356,6 +356,10 @@ public void testSkipNestedTables() throws Exception {
public void testConsumingAllEventsByChunkKeyColumn() throws Exception {
createAndInitialize("product.sql");
+ try (Connection dbaConnection = getJdbcConnectionAsDBA();
+ Statement dbaStatement = dbaConnection.createStatement()) {
+ dbaStatement.execute("GRANT ANALYZE ANY TO " + ORACLE_CONTAINER.getUsername());
+ }
if (!parallelismSnapshot) {
return;
}
@@ -383,8 +387,8 @@ public void testConsumingAllEventsByChunkKeyColumn() throws Exception {
+ ")",
ORACLE_CONTAINER.getHost(),
ORACLE_CONTAINER.getOraclePort(),
- "dbzuser",
- "dbz",
+ ORACLE_CONTAINER.getUsername(),
+ ORACLE_CONTAINER.getPassword(),
parallelismSnapshot,
"debezium",
"products");
@@ -985,4 +989,70 @@ private static int sinkSize(String sinkName) {
}
}
}
+
+ @Test
+ public void testCompositePkTableSplitsUnevenlyWithChunkKeyColumn() throws Exception {
+ if (parallelismSnapshot) {
+ testUseChunkColumn("PRODUCT_KIND");
+ }
+ }
+
+ @Test
+ public void testCompositePkTableSplitsEvenlyWithChunkKeyColumn() throws Exception {
+ if (parallelismSnapshot) {
+ testUseChunkColumn("PRODUCT_NO");
+ }
+ }
+
+ private void testUseChunkColumn(String chunkColumn) throws Exception {
+ createAndInitialize("customer.sql");
+ try (Connection dbaConnection = getJdbcConnectionAsDBA();
+ Statement dbaStatement = dbaConnection.createStatement()) {
+ dbaStatement.execute("GRANT ANALYZE ANY TO " + ORACLE_CONTAINER.getUsername());
+ }
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE evenly_shopping_cart (\n"
+ + " PRODUCT_NO INT NOT NULL,\n"
+ + " PRODUCT_KIND VARCHAR(255),\n"
+ + " USER_ID VARCHAR(255) NOT NULL,\n"
+ + " DESCRIPTION VARCHAR(255) NOT NULL\n"
+ + ") WITH ("
+ + " 'connector' = 'oracle-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'scan.incremental.snapshot.enabled' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.key-column' = '%s',"
+ + " 'scan.incremental.snapshot.chunk.size' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = '%s'"
+ + ")",
+ ORACLE_CONTAINER.getHost(),
+ ORACLE_CONTAINER.getOraclePort(),
+ ORACLE_CONTAINER.getUsername(),
+ ORACLE_CONTAINER.getPassword(),
+ parallelismSnapshot,
+ chunkColumn,
+ 4,
+ "ORCLCDB",
+ "DEBEZIUM",
+ "EVENLY_SHOPPING_CART");
+ String sinkDDL =
+ "CREATE TABLE sink "
+ + " WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false'"
+ + ") LIKE evenly_shopping_cart (EXCLUDING OPTIONS)";
+
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+
+ // async submit job
+ TableResult result = tEnv.executeSql("INSERT INTO sink SELECT * FROM evenly_shopping_cart");
+ waitForSinkSize("sink", 12);
+ result.getJobClient().get().cancel().get();
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/resources/ddl/customer.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/resources/ddl/customer.sql
index d1b0d8b3903..c37cad1101f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/resources/ddl/customer.sql
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/src/test/resources/ddl/customer.sql
@@ -78,3 +78,27 @@ INSERT INTO DEBEZIUM.CUSTOMERS_1 VALUES (1017,'user_18','Shanghai','123567891234
INSERT INTO DEBEZIUM.CUSTOMERS_1 VALUES (1018,'user_19','Shanghai','123567891234');
INSERT INTO DEBEZIUM.CUSTOMERS_1 VALUES (1019,'user_20','Shanghai','123567891234');
INSERT INTO DEBEZIUM.CUSTOMERS_1 VALUES (2000,'user_21','Shanghai','123567891234');
+
+-- table has combined primary key and one of the primary key is evenly
+CREATE TABLE EVENLY_SHOPPING_CART (
+ PRODUCT_NO INT NOT NULL,
+ PRODUCT_KIND VARCHAR(255),
+ USER_ID VARCHAR(255) NOT NULL,
+ DESCRIPTION VARCHAR(255) NOT NULL,
+ PRIMARY KEY(PRODUCT_KIND, PRODUCT_NO, USER_ID)
+);
+
+ALTER TABLE DEBEZIUM.EVENLY_SHOPPING_CART ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
+
+INSERT INTO EVENLY_SHOPPING_CART VALUES (101, 'KIND_001', 'user_1', 'my shopping cart');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (102, 'KIND_002', 'user_1', 'my shopping cart');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (103, 'KIND_007', 'user_1', 'my shopping cart');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (104, 'KIND_008', 'user_1', 'my shopping cart');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (105, 'KIND_100', 'user_2', 'my shopping list');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (105, 'KIND_999', 'user_3', 'my shopping list');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (107, 'KIND_010', 'user_4', 'my shopping list');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (108, 'KIND_009', 'user_4', 'my shopping list');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (109, 'KIND_002', 'user_5', 'leo list');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (111, 'KIND_007', 'user_5', 'leo list');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (111, 'KIND_008', 'user_5', 'leo list');
+INSERT INTO EVENLY_SHOPPING_CART VALUES (112, 'KIND_009', 'user_6', 'my shopping cart');