diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java index e8c39ce3df7..7f7691961a8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/factory/MySqlDataSourceFactory.java @@ -36,12 +36,14 @@ import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils; import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectPath; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.Duration; import java.time.ZoneId; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -60,6 +62,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_NEWLY_ADDED_TABLE_ENABLED; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE; @@ -184,6 +187,35 @@ public DataSource createDataSource(Context context) { } configFactory.tableList(capturedTables.toArray(new String[0])); + String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); + if (chunkKeyColumns != null) { + Map chunkKeyColumnMap = new HashMap<>(); + List tableIds = + MySqlSchemaUtils.listTables(configFactory.createConfig(0), null); + for (String chunkKeyColumn : chunkKeyColumns.split(";")) { + String[] splits = chunkKeyColumn.split(":"); + if (splits.length == 2) { + Selectors chunkKeySelector = + new Selectors.SelectorsBuilder().includeTables(splits[0]).build(); + List tableList = + getChunkKeyColumnTableList(tableIds, chunkKeySelector); + for (ObjectPath table : tableList) { + chunkKeyColumnMap.put(table, splits[1]); + } + } else { + throw new IllegalArgumentException( + SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key() + + " = " + + chunkKeyColumns + + " failed to be parsed in this part '" + + chunkKeyColumn + + "'."); + } + } + LOG.info("Add chunkKeyColumn {}.", chunkKeyColumnMap); + configFactory.chunkKeyColumn(chunkKeyColumnMap); + } + return new MySqlDataSource(configFactory); } @@ -219,6 +251,7 @@ public Set> optionalOptions() { options.add(CONNECTION_POOL_SIZE); options.add(HEARTBEAT_INTERVAL); options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED); + options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN); options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED); options.add(CHUNK_META_GROUP_SIZE); options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND); @@ -246,6 +279,14 @@ private static List getTableList(MySqlSourceConfig sourceConfig, Selecto .collect(Collectors.toList()); } + private static List getChunkKeyColumnTableList( + List tableIds, Selectors selectors) { + return tableIds.stream() + .filter(selectors::isMatch) + .map(tableId -> new ObjectPath(tableId.getSchemaName(), tableId.getTableName())) + .collect(Collectors.toList()); + } + private static StartupOptions getStartupOptions(Configuration config) { String modeString = config.get(SCAN_STARTUP_MODE); diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java index f6f1a671a0a..9a18350b348 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceOptions.java @@ -214,6 +214,16 @@ public class MySqlDataSourceOptions { + " and the query MySQL for splitting would happen when it is uneven." + " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount."); + @Experimental + public static final ConfigOption SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN = + ConfigOptions.key("scan.incremental.snapshot.chunk.key-column") + .stringType() + .noDefaultValue() + .withDescription( + "The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table." + + "By default, the chunk key is the first column of the primary key." + + "eg. db1.user_table_[0-9]+:col1;db[1-2].[app|web]_order_\\.*:col2;"); + @Experimental public static final ConfigOption SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED = ConfigOptions.key("scan.incremental.close-idle-reader.enabled") diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java index b1aab84b098..c7b4424807d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSourceFactoryTest.java @@ -23,6 +23,7 @@ import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory; import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase; import org.apache.flink.table.api.ValidationException; +import org.apache.flink.table.catalog.ObjectPath; import org.junit.Test; @@ -38,6 +39,7 @@ import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT; +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME; @@ -107,6 +109,7 @@ public void testExcludeTable() { .isEqualTo( Arrays.asList( inventoryDatabase.getDatabaseName() + ".customers", + inventoryDatabase.getDatabaseName() + ".multi_max_table", inventoryDatabase.getDatabaseName() + ".products")); } @@ -239,6 +242,40 @@ public void testPrefixRequireOption() { .isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + ".products")); } + @Test + public void testAddChunkKeyColumns() { + inventoryDatabase.createAndInitialize(); + Map options = new HashMap<>(); + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); + options.put(USERNAME.key(), TEST_USER); + options.put(PASSWORD.key(), TEST_PASSWORD); + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*"); + options.put( + SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key(), + inventoryDatabase.getDatabaseName() + + ".multi_max_\\.*:order_id;" + + inventoryDatabase.getDatabaseName() + + ".products:id;"); + Factory.Context context = new MockContext(Configuration.fromMap(options)); + + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); + ObjectPath multiMaxTable = + new ObjectPath(inventoryDatabase.getDatabaseName(), "multi_max_table"); + ObjectPath productsTable = new ObjectPath(inventoryDatabase.getDatabaseName(), "products"); + + assertThat(dataSource.getSourceConfig().getChunkKeyColumns()) + .isNotEmpty() + .isEqualTo( + new HashMap() { + { + put(multiMaxTable, "order_id"); + put(productsTable, "id"); + } + }); + } + class MockContext implements Factory.Context { Configuration factoryConfiguration; diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql index 86d985e992e..626107f5cf0 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/resources/ddl/inventory.sql @@ -69,4 +69,24 @@ VALUES (default, '2016-01-16', 1001, 1, 102), (default, '2016-02-19', 1002, 2, 106), (default, '16-02-21', 1003, 1, 107); +CREATE TABLE `multi_max_table` +( + `order_id` varchar(128) NOT NULL, + `index` int(11) NOT NULL, + `desc` varchar(512) NOT NULL, + PRIMARY KEY (`order_id`, `index`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8; +INSERT INTO multi_max_table +VALUES ('', 0, 'flink'), + ('', 1, 'flink'), + ('', 2, 'flink'), + ('a', 0, 'flink'), + ('b', 0, 'flink'), + ('c', 0, 'flink'), + ('d', 0, 'flink'), + ('E', 0, 'flink'), + ('E', 1, 'flink'), + ('E', 2, 'flink'), + ('e', 4, 'flink'), + ('E', 3, 'flink');