[FLINK-35884][mysql-pipeline-connector] MySQL pipeline support snapshot chunk key-column #3490

Merged · 9 commits · merged Aug 12, 2024
Changes from 1 commit
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
[FLINK-35884] MySQL pipeline support snapshot chunk key-column
wangjunbo committed Aug 6, 2024

commit e0b5a0ca8036c2b58fa66abb91d9e5523c919c5d
MySqlDataSourceFactory.java
@@ -36,6 +36,7 @@
import org.apache.flink.cdc.connectors.mysql.utils.MySqlSchemaUtils;
import org.apache.flink.cdc.connectors.mysql.utils.OptionUtils;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -60,6 +61,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_STARTUP_MODE;
@@ -181,6 +183,25 @@ public DataSource createDataSource(Context context) {
        }
        configFactory.tableList(capturedTables.toArray(new String[0]));

        // The option has no default value, so guard against the unset case before splitting.
        String chunkKeyColumns = config.get(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);
        if (chunkKeyColumns != null) {
            for (String chunkKeyColumn : chunkKeyColumns.split(";")) {
                String[] splits = chunkKeyColumn.split(":");
                if (splits.length != 2) {
                    throw new IllegalArgumentException(
                            SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key()
                                    + " is malformed, please refer to the documentation");
                }
                Selectors chunkKeySelector =
                        new Selectors.SelectorsBuilder().includeTables(splits[0]).build();
                List<ObjectPath> tableList =
                        getChunkKeyColumnTableList(configFactory.createConfig(0), chunkKeySelector);
                for (ObjectPath table : tableList) {
                    LOG.info("Set chunk key column {} for table {}.", splits[1], table);
                    configFactory.chunkKeyColumn(table, splits[1]);
                }
            }
        }

        return new MySqlDataSource(configFactory);
    }
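
Aside: the loop above boils down to a small string-parsing rule. Here is a minimal, self-contained sketch of just that rule (the class name and sample value are hypothetical; the Selectors matching and config wiring are omitted):

import java.util.HashMap;
import java.util.Map;

public class ChunkKeyColumnParseSketch {
    public static void main(String[] args) {
        // Hypothetical option value; same "<table-selector>:<column>" format.
        String value = "db0.table1:col1;db0.table2:col2";
        Map<String, String> chunkKeyBySelector = new HashMap<>();
        for (String entry : value.split(";")) {
            String[] parts = entry.split(":");
            // Exactly one ":" per entry, mirroring the factory's validation.
            if (parts.length != 2) {
                throw new IllegalArgumentException(
                        "scan.incremental.snapshot.chunk.key-column is malformed: " + entry);
            }
            chunkKeyBySelector.put(parts[0], parts[1]);
        }
        chunkKeyBySelector.forEach((t, c) -> System.out.println(t + " -> " + c));
    }
}

Running it prints db0.table1 -> col1 and db0.table2 -> col2; an entry that does not split into exactly two parts fails fast, matching the IllegalArgumentException branch above.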

@@ -216,6 +237,7 @@ public Set<ConfigOption<?>> optionalOptions() {
        options.add(CONNECTION_POOL_SIZE);
        options.add(HEARTBEAT_INTERVAL);
        options.add(SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED);
        options.add(SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN);

        options.add(CHUNK_META_GROUP_SIZE);
        options.add(CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND);
@@ -243,6 +265,14 @@ private static List<String> getTableList(MySqlSourceConfig sourceConfig, Selecto
                .collect(Collectors.toList());
    }

    private static List<ObjectPath> getChunkKeyColumnTableList(
            MySqlSourceConfig sourceConfig, Selectors selectors) {
        return MySqlSchemaUtils.listTables(sourceConfig, null).stream()
                .filter(selectors::isMatch)
                .map(tableId -> new ObjectPath(tableId.getSchemaName(), tableId.getTableName()))
                .collect(Collectors.toList());
    }

    private static StartupOptions getStartupOptions(Configuration config) {
        String modeString = config.get(SCAN_STARTUP_MODE);

MySqlDataSourceOptions.java
@@ -214,6 +214,17 @@ public class MySqlDataSourceOptions {
+ " and the query MySQL for splitting would happen when it is uneven."
+ " The distribution factor could be calculated by (MAX(id) - MIN(id) + 1) / rowCount.");

@Experimental
public static final ConfigOption<String> SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN =
ConfigOptions.key("scan.incremental.snapshot.chunk.key-column")
.stringType()
.noDefaultValue()
.withDescription(
"The chunk key of table snapshot, captured tables are split into multiple chunks by a chunk key when read the snapshot of table."
+ "By default, the chunk key is the first column of the primary key."
+ "This column must be a column of the primary key."
+ "eg. db0.table1:col1;db0.table2:col2;");

@Experimental
public static final ConfigOption<Boolean> SCAN_INCREMENTAL_CLOSE_IDLE_READER_ENABLED =
ConfigOptions.key("scan.incremental.close-idle-reader.enabled")
MySqlDataSourceFactoryTest.java
@@ -23,6 +23,7 @@
import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectPath;

import org.junit.Test;

@@ -38,6 +39,7 @@
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.HOSTNAME;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PASSWORD;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.PORT;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE;
import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.USERNAME;
@@ -239,6 +241,35 @@ public void testPrefixRequireOption() {
                .isEqualTo(Arrays.asList(inventoryDatabase.getDatabaseName() + ".products"));
    }

    @Test
    public void testAddChunkKeyColumns() {
        inventoryDatabase.createAndInitialize();
        Map<String, String> options = new HashMap<>();
        options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost());
        options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort()));
        options.put(USERNAME.key(), TEST_USER);
        options.put(PASSWORD.key(), TEST_PASSWORD);
        options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".\\.*");
        options.put(
                SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN.key(),
                inventoryDatabase.getDatabaseName() + ".multi_max_table:order_id");
        Factory.Context context = new MockContext(Configuration.fromMap(options));

        MySqlDataSourceFactory factory = new MySqlDataSourceFactory();
        MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context);
        ObjectPath multiMaxTable =
                new ObjectPath(inventoryDatabase.getDatabaseName(), "multi_max_table");

        assertThat(dataSource.getSourceConfig().getChunkKeyColumns())
                .isNotEmpty()
                .isEqualTo(
                        new HashMap<ObjectPath, String>() {
                            {
                                put(multiMaxTable, "order_id");
                            }
                        });
    }

    class MockContext implements Factory.Context {

        Configuration factoryConfiguration;
inventory.sql (test DDL fixture)
@@ -69,4 +69,24 @@ VALUES (default, '2016-01-16', 1001, 1, 102),
       (default, '2016-02-19', 1002, 2, 106),
       (default, '2016-02-21', 1003, 1, 107);

CREATE TABLE `multi_max_table`
(
    `order_id` varchar(128) NOT NULL,
    `index`    int(11) NOT NULL,
    `desc`     varchar(512) NOT NULL,
    PRIMARY KEY (`order_id`, `index`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
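-- Fixture note: order_id is the first primary-key column but is not unique,
-- and under MySQL's default case-insensitive utf8 collation the 'E' and 'e'
-- rows below all share the maximum order_id value; the "multi_max" name
-- appears to refer to this. testAddChunkKeyColumns points the chunk key at
-- this column.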
INSERT INTO multi_max_table
VALUES ('', 0, 'flink'),
       ('', 1, 'flink'),
       ('', 2, 'flink'),
       ('a', 0, 'flink'),
       ('b', 0, 'flink'),
       ('c', 0, 'flink'),
       ('d', 0, 'flink'),
       ('E', 0, 'flink'),
       ('E', 1, 'flink'),
       ('E', 2, 'flink'),
       ('e', 4, 'flink'),
       ('E', 3, 'flink');