Skip to content

Commit

Permalink
[cdc-connector][oracle] Use database name in upper case at the snapsh…
Browse files Browse the repository at this point in the history
…ot phase to fix fetching two different db name for the same table at the snapshot phase and incremental phase(apache#2088)
  • Loading branch information
e-mhui authored and joyCurry30 committed Mar 22, 2024
1 parent 22512ee commit 5836663
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import static com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions.SCHEMA_NAME;
import static com.ververica.cdc.connectors.oracle.source.config.OracleSourceOptions.URL;
import static com.ververica.cdc.debezium.table.DebeziumOptions.getDebeziumProperties;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/** Factory for creating configured instance of {@link OracleTableSource}. */
Expand All @@ -75,6 +76,16 @@ public DynamicTableSource createDynamicTableSource(Context context) {
String username = config.get(USERNAME);
String password = config.get(PASSWORD);
String databaseName = config.get(DATABASE_NAME);
checkNotNull(databaseName);
// During the incremental phase, Debezium uses the uppercase database name.
// However, during the snapshot phase, the database name is user-configurable.
// To avoid inconsistencies between the database names in the snapshot and incremental
// phases,
// it is necessary to convert the database name to uppercase when constructing the Oracle
// Source.
// For more details, please refer to:
// https://github.com/ververica/flink-cdc-connectors/pull/2088.
databaseName = databaseName.toUpperCase();
String tableName = config.get(TABLE_NAME);
String schemaName = config.get(SCHEMA_NAME);
int port = config.get(PORT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ public class OracleTableSourceFactoryTest {
private static final String MY_LOCALHOST = "localhost";
private static final String MY_USERNAME = "flinkuser";
private static final String MY_PASSWORD = "flinkpw";
private static final String MY_DATABASE = "myDB";
private static final String MY_DATABASE = "MYDB";
private static final String MY_TABLE = "myTable";
private static final String MY_SCHEMA = "mySchema";
private static final Properties PROPERTIES = new Properties();
Expand Down

0 comments on commit 5836663

Please sign in to comment.