From 6f133bed9817438d5a4b3f5e484f355ef903152c Mon Sep 17 00:00:00 2001 From: liuliu Date: Mon, 29 May 2023 19:57:21 +0800 Subject: [PATCH] [monir][mysql] Filter databases that do not need to be read --- .../mysql/source/utils/TableDiscoveryUtils.java | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java index 3cb9f27631b..664d360dd1f 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java @@ -56,7 +56,13 @@ public static List listTables(JdbcConnection jdbc, RelationalTableFilte "SHOW DATABASES", rs -> { while (rs.next()) { - databaseNames.add(rs.getString(1)); + String databaseName = rs.getString(1); + if (tableFilters.databaseFilter().test(databaseName)){ + databaseNames.add(databaseName); + LOG.info("\t including database '{}' for further processing", databaseName); + }else { + LOG.info("\t '{}' is filtered out of database capturing", databaseName); + } } }); LOG.info("\t list of available databases is: {}", databaseNames); @@ -78,9 +84,9 @@ public static List listTables(JdbcConnection jdbc, RelationalTableFilte TableId tableId = new TableId(dbName, null, rs.getString(1)); if (tableFilters.dataCollectionFilter().isIncluded(tableId)) { capturedTableIds.add(tableId); - LOG.info("\t including '{}' for further processing", tableId); + LOG.info("\t including table '{}' for further processing", tableId); } else { - LOG.info("\t '{}' is filtered out of capturing", tableId); + LOG.info("\t '{}' is filtered out of table capturing", tableId); } } });