From 2117bf41bce9b1b39cb834ce2c97c96ad8366870 Mon Sep 17 00:00:00 2001
From: e-mhui
Date: Thu, 13 Apr 2023 10:45:21 +0800
Subject: [PATCH 1/2] [Oracle CDC] Fix Oracle CDC not capturing newly added tables while the task is running

---
 .../relational/JdbcSourceEventDispatcher.java | 18 +++++++++++-------
 .../base/utils/SourceRecordUtils.java         | 19 ++++++++++++++++---
 2 files changed, 27 insertions(+), 10 deletions(-)

diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java
index 1e75af5df62..460cd95785e 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java
@@ -16,6 +16,8 @@
 
 package com.ververica.cdc.connectors.base.relational;
 
+import static com.ververica.cdc.connectors.base.utils.SourceRecordUtils.isMysqlConnector;
+
 import com.ververica.cdc.connectors.base.source.meta.offset.Offset;
 import com.ververica.cdc.connectors.base.source.meta.split.SourceSplitBase;
 import com.ververica.cdc.connectors.base.source.meta.wartermark.WatermarkEvent;
@@ -179,14 +181,16 @@ private Struct schemaChangeRecordKey(SchemaChangeEvent event) {
     }
 
     private Struct schemaChangeRecordValue(SchemaChangeEvent event) throws IOException {
-        Struct sourceInfo = event.getSource();
         Map<String, Object> source = new HashMap<>();
-        String fileName = sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
-        Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
-        Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
-        source.put(SERVER_ID_KEY, serverId);
-        source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
-        source.put(BINLOG_POSITION_OFFSET_KEY, pos);
+        if (isMysqlConnector(event.getSource())) {
+            Struct sourceInfo = event.getSource();
+            String fileName = sourceInfo.getString(BINLOG_FILENAME_OFFSET_KEY);
+            Long pos = sourceInfo.getInt64(BINLOG_POSITION_OFFSET_KEY);
+            Long serverId = sourceInfo.getInt64(SERVER_ID_KEY);
+            source.put(SERVER_ID_KEY, serverId);
+            source.put(BINLOG_FILENAME_OFFSET_KEY, fileName);
+            source.put(BINLOG_POSITION_OFFSET_KEY, pos);
+        }
         HistoryRecord historyRecord =
                 new HistoryRecord(
                         source,
diff --git a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java
index 99aca11c3f7..1d65985d74d 100644
--- a/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java
+++ b/flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/utils/SourceRecordUtils.java
@@ -44,9 +44,11 @@ public class SourceRecordUtils {
 
     private SourceRecordUtils() {}
 
-    public static final String SCHEMA_CHANGE_EVENT_KEY_NAME =
-            "io.debezium.connector.mysql.SchemaChangeKey";
     private static final DocumentReader DOCUMENT_READER = DocumentReader.defaultReader();
+    public static final String MYSQL_SCHEMA_CHANGE_EVENT_KEY_NAME = "io.debezium.connector.mysql.SchemaChangeKey";
+    public static final String ORACLE_SCHEMA_CHANGE_EVENT_KEY_NAME = "io.debezium.connector.oracle.SchemaChangeKey";
+    public static final String CONNECTOR = "connector";
+    public static final String MYSQL_CONNECTOR = "mysql";
 
     /** Converts a {@link ResultSet} row to an array of Objects. */
     public static Object[] rowToArray(ResultSet rs, int size) throws SQLException {
@@ -93,7 +95,8 @@ public static Long getFetchTimestamp(SourceRecord record) {
 
     public static boolean isSchemaChangeEvent(SourceRecord sourceRecord) {
         Schema keySchema = sourceRecord.keySchema();
-        return keySchema != null && SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name());
+        return keySchema != null && (MYSQL_SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name())
+                || ORACLE_SCHEMA_CHANGE_EVENT_KEY_NAME.equalsIgnoreCase(keySchema.name()));
     }
 
     public static boolean isDataChangeRecord(SourceRecord record) {
@@ -189,4 +192,14 @@ public static HistoryRecord getHistoryRecord(SourceRecord schemaRecord) throws I
         String historyRecordStr = value.getString(HISTORY_RECORD_FIELD);
         return new HistoryRecord(DOCUMENT_READER.read(historyRecordStr));
     }
+
+    /**
+     * Checks whether the given source {@link Struct} was produced by the MySQL connector.
+     * @param source the source struct of a change event, should not be {@code null}
+     * @return true if the source was produced by the MySQL connector
+     */
+    public static boolean isMysqlConnector(Struct source) {
+        String connector = source.getString(CONNECTOR);
+        return MYSQL_CONNECTOR.equalsIgnoreCase(connector);
+    }
 }

From 0a46930411d5e7d80f96749f72728d4137f63d63 Mon Sep 17 00:00:00 2001
From: e-mhui
Date: Tue, 25 Apr 2023 09:21:07 +0800
Subject: [PATCH 2/2] [Oracle] Capture changes made by connector user & document that SYS/SYSTEM changes are not captured

---
 .../oracle/logminer/LogMinerQueryBuilder.java | 231 ++++++++++++++++++
 .../LogMinerStreamingChangeEventSource.java   |   3 +-
 2 files changed, 232 insertions(+), 2 deletions(-)
 create mode 100644 flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java

diff --git a/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java
new file mode 100644
index 00000000000..16c565bf71c
--- /dev/null
+++ b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerQueryBuilder.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
+ */
+package io.debezium.connector.oracle.logminer;
+
+import io.debezium.connector.oracle.OracleConnectorConfig;
+import io.debezium.connector.oracle.OracleDatabaseSchema;
+import io.debezium.util.Strings;
+import java.util.Iterator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+/**
+ * A builder that is responsible for producing the query to be executed against the LogMiner view.
+ *
+ * @author Chris Cranford
+ */
+public class LogMinerQueryBuilder {
+
+    private static final String LOGMNR_CONTENTS_VIEW = "V$LOGMNR_CONTENTS";
+
+    /**
+     * Builds the LogMiner contents view query.
+     *
+     * The returned query contains two bind parameters that the caller is responsible for binding before
+     * executing the query. The first bind parameter is the exclusive lower bound of the SCN mining
+     * window, and the second is the inclusive upper bound.
+     *
+     * The built query relies on the following columns from V$LOGMNR_CONTENTS (a rough sketch of the
+     * generated query follows the column list):
+     * <pre>
+     *     SCN - the system change number at which the change was made
+     *     SQL_REDO - the reconstructed SQL statement that initiated the change
+     *     OPERATION - the database operation type name
+     *     OPERATION_CODE - the database operation numeric code
+     *     TIMESTAMP - the time when the change event occurred
+     *     XID - the transaction identifier the change participated in
+     *     CSF - the continuation flag; identifies rows that should be processed together as a single row, 0=no, 1=yes
+     *     TABLE_NAME - the name of the table that the change applies to
+     *     SEG_OWNER - the name of the schema that the change applies to
+     *     USERNAME - the name of the database user that caused the change
+     *     ROW_ID - the unique identifier of the changed row; may not always be set with a valid value
+     *     ROLLBACK - the rollback flag, with a value of 0 or 1; 1 implies the row was rolled back
+     *     RS_ID - the rollback segment identifier where the change record was recorded from
+     * </pre>
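+     *
+     * For illustration only, the sketch below shows the rough shape of the generated query when a PDB
+     * name is configured, LOB support is disabled, and the schema history stores all tables; the exact
+     * text varies with the connector configuration, and the bracketed fragments are placeholders:
+     * <pre>
+     *     SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER,
+     *            OPERATION, USERNAME, ROW_ID, ROLLBACK, RS_ID
+     *     FROM V$LOGMNR_CONTENTS
+     *     WHERE SCN > ? AND SCN <= ?
+     *     AND SRC_CON_NAME = '[pdb name]'
+     *     AND ((OPERATION_CODE IN (6,7,34,36) OR [DDL predicate])
+     *          OR (OPERATION_CODE IN (1,2,3)
+     *              AND TABLE_NAME != '[flush table]'
+     *              AND SEG_OWNER NOT IN ([excluded schemas])
+     *              AND [schema include/exclude predicate]
+     *              AND [table include/exclude predicate]))
+     * </pre>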
+     *
+     * @param connectorConfig connector configuration, should not be {@code null}
+     * @param schema database schema, should not be {@code null}
+     * @return the SQL string to be used to fetch changes from Oracle LogMiner
+     */
+    public static String build(OracleConnectorConfig connectorConfig, OracleDatabaseSchema schema) {
+        final StringBuilder query = new StringBuilder(1024);
+        query.append("SELECT SCN, SQL_REDO, OPERATION_CODE, TIMESTAMP, XID, CSF, TABLE_NAME, SEG_OWNER, OPERATION, ");
+        query.append("USERNAME, ROW_ID, ROLLBACK, RS_ID ");
+        query.append("FROM ").append(LOGMNR_CONTENTS_VIEW).append(" ");
+
+        // These bind parameters will be bound when the query is executed by the caller.
+        query.append("WHERE SCN > ? AND SCN <= ? ");
+
+        // Restrict to configured PDB if one is supplied
+        final String pdbName = connectorConfig.getPdbName();
+        if (!Strings.isNullOrEmpty(pdbName)) {
+            query.append("AND ").append("SRC_CON_NAME = '").append(pdbName.toUpperCase()).append("' ");
+        }
+
+        query.append("AND (");
+
+        // Always include START, COMMIT, MISSING_SCN, and ROLLBACK operations
+        query.append("(OPERATION_CODE IN (6,7,34,36)");
+
+        if (!schema.storeOnlyCapturedTables()) {
+            // In this mode, the connector will always be fed DDL operations for all tables even if they
+            // are not part of the inclusion/exclusion lists.
+            query.append(" OR ").append(buildDdlPredicate()).append(" ");
+            // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
+            if (connectorConfig.isLobEnabled()) {
+                query.append(") OR (OPERATION_CODE IN (1,2,3,9,10,11,29) ");
+            }
+            else {
+                query.append(") OR (OPERATION_CODE IN (1,2,3) ");
+            }
+        }
+        else {
+            // Insert, Update, Delete, SelectLob, LobWrite, LobTrim, and LobErase
+            if (connectorConfig.isLobEnabled()) {
+                query.append(") OR ((OPERATION_CODE IN (1,2,3,9,10,11,29) ");
+            }
+            else {
+                query.append(") OR ((OPERATION_CODE IN (1,2,3) ");
+            }
+            // In this mode, the connector will filter DDL operations based on the table inclusion/exclusion lists
+            query.append("OR ").append(buildDdlPredicate()).append(") ");
+        }
+
+        // Always ignore the flush table
+        query.append("AND TABLE_NAME != '").append(SqlUtils.LOGMNR_FLUSH_TABLE).append("' ");
+
+        // There are some common schemas that we automatically ignore when building the runtime Filter
+        // predicates and we put that same list of schemas here and apply those in the generated SQL.
+        if (!OracleConnectorConfig.EXCLUDED_SCHEMAS.isEmpty()) {
+            query.append("AND SEG_OWNER NOT IN (");
+            for (Iterator<String> i = OracleConnectorConfig.EXCLUDED_SCHEMAS.iterator(); i.hasNext();) {
+                String excludedSchema = i.next();
+                query.append("'").append(excludedSchema.toUpperCase()).append("'");
+                if (i.hasNext()) {
+                    query.append(",");
+                }
+            }
+            query.append(") ");
+        }
+
+        String schemaPredicate = buildSchemaPredicate(connectorConfig);
+        if (!Strings.isNullOrEmpty(schemaPredicate)) {
+            query.append("AND ").append(schemaPredicate).append(" ");
+        }
+
+        String tablePredicate = buildTablePredicate(connectorConfig);
+        if (!Strings.isNullOrEmpty(tablePredicate)) {
+            query.append("AND ").append(tablePredicate).append(" ");
+        }
+
+        query.append("))");
+
+        return query.toString();
+    }
+
+    /**
+     * Builds a common SQL fragment used to obtain DDL operations via LogMiner.
+     *
+     * @return predicate that can be used to obtain DDL operations via LogMiner
+     */
+    private static String buildDdlPredicate() {
+        final StringBuilder predicate = new StringBuilder(256);
+        predicate.append("(OPERATION_CODE = 5 ");
+        predicate.append("AND USERNAME NOT IN ('SYS','SYSTEM') ");
+        predicate.append("AND INFO NOT LIKE 'INTERNAL DDL%' ");
+        predicate.append("AND (TABLE_NAME IS NULL OR TABLE_NAME NOT LIKE 'ORA_TEMP_%'))");
+        return predicate.toString();
+    }
+
+    /**
+     * Builds a SQL predicate of which schemas to include/exclude based on the connector configuration.
+     *
+     * @param connectorConfig connector configuration, should not be {@code null}
+     * @return SQL predicate to filter results based on the schema include/exclude configuration
+     */
+    private static String buildSchemaPredicate(OracleConnectorConfig connectorConfig) {
+        StringBuilder predicate = new StringBuilder();
+        if (Strings.isNullOrEmpty(connectorConfig.schemaIncludeList())) {
+            if (!Strings.isNullOrEmpty(connectorConfig.schemaExcludeList())) {
+                List<Pattern> patterns = Strings.listOfRegex(connectorConfig.schemaExcludeList(), 0);
+                predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER", true)).append(")");
+            }
+        }
+        else {
+            List<Pattern> patterns = Strings.listOfRegex(connectorConfig.schemaIncludeList(), 0);
+            predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER", false)).append(")");
+        }
+        return predicate.toString();
+    }
+
+    /**
+     * Builds a SQL predicate of which tables to include/exclude based on the connector configuration.
+     *
+     * @param connectorConfig connector configuration, should not be {@code null}
+     * @return SQL predicate to filter results based on the table include/exclude configuration
+     */
+    private static String buildTablePredicate(OracleConnectorConfig connectorConfig) {
+        StringBuilder predicate = new StringBuilder();
+        if (Strings.isNullOrEmpty(connectorConfig.tableIncludeList())) {
+            if (!Strings.isNullOrEmpty(connectorConfig.tableExcludeList())) {
+                List<Pattern> patterns = Strings.listOfRegex(connectorConfig.tableExcludeList(), 0);
+                predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER || '.' || TABLE_NAME", true)).append(")");
+            }
+        }
+        else {
+            List<Pattern> patterns = Strings.listOfRegex(connectorConfig.tableIncludeList(), 0);
+            predicate.append("(").append(listOfPatternsToSql(patterns, "SEG_OWNER || '.' || TABLE_NAME", false)).append(")");
+        }
+        return predicate.toString();
+    }
+
+    /**
+     * Takes a list of reg-ex patterns and builds an Oracle-specific predicate using {@code REGEXP_LIKE}
+     * in order to take the connector configuration include/exclude lists and assemble them as SQL
+     * predicates.
+     *
+     * @param patterns the list of individual include/exclude reg-ex patterns from the connector configuration
+     * @param columnName the column against which the reg-ex patterns are applied
+     * @param applyNot {@code true} to negate each predicate with {@code NOT} (exclude lists), {@code false} otherwise (include lists)
+     * @return the SQL predicate fragment that combines the individual REGEXP_LIKE expressions
+     */
+    private static String listOfPatternsToSql(List<Pattern> patterns, String columnName, boolean applyNot) {
+        StringBuilder predicate = new StringBuilder();
+        for (Iterator<Pattern> i = patterns.iterator(); i.hasNext();) {
+            Pattern pattern = i.next();
+            if (applyNot) {
+                predicate.append("NOT ");
+            }
+            // NOTE: The REGEXP_LIKE operator was added in Oracle 10g (10.1.0.0.0)
+            final String text = resolveRegExpLikePattern(pattern);
+            predicate.append("REGEXP_LIKE(").append(columnName).append(",'").append(text).append("','i')");
+            if (i.hasNext()) {
+                // Exclude lists imply combining them via AND, include lists imply combining them via OR.
+                predicate.append(applyNot ? " AND " : " OR ");
+            }
+        }
+        return predicate.toString();
+    }
+
+    /**
+     * The {@code REGEXP_LIKE} Oracle operator matches the pattern anywhere within the column value,
+     * much like a {@code LIKE} expression surrounded by "%" wildcards. The include/exclude lists are
+     * meant to be explicit, with an implied "^" and "$" anchor for start/end, so that the match does
+     * not mistakenly pick up "DEBEZIUM2" when the reg-ex "DEBEZIUM" is configured. For example, the
+     * configured pattern "DEBEZIUM" is rewritten as "^DEBEZIUM$".
+     *
+     * @param pattern the pattern to be analyzed, should not be {@code null}
+     * @return the pattern text, anchored with "^" and "$" if it does not already specify them explicitly
+     */
+    private static String resolveRegExpLikePattern(Pattern pattern) {
+        String text = pattern.pattern();
+        if (!text.startsWith("^")) {
+            text = "^" + text;
+        }
+        if (!text.endsWith("$")) {
+            text += "$";
+        }
+        return text;
+    }
+}
diff --git a/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
index e5519d9313a..22644da1078 100644
--- a/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
+++ b/flink-connector-oracle-cdc/src/main/java/io/debezium/connector/oracle/logminer/LogMinerStreamingChangeEventSource.java
@@ -183,8 +183,7 @@ public void execute(ChangeEventSourceContext context, OracleOffsetContext offset
                         historyRecorder);
 
             final String query =
-                    LogMinerQueryBuilder.build(
-                            connectorConfig, schema, jdbcConnection.username());
+                    LogMinerQueryBuilder.build(connectorConfig, schema);
 
             try (PreparedStatement miningView =
                     jdbcConnection
                            .connection()