Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[mysql] Fix NullPointerException when database name or table name contains dot #2006

Merged
merged 1 commit into from
Jun 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import static com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.rowToSerializedString;
import static com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.serializedStringToRow;
import static com.ververica.cdc.connectors.mysql.source.utils.SerializerUtils.writeBinlogPosition;
import static com.ververica.cdc.connectors.mysql.source.utils.StatementUtils.quote;

/** A serializer for the {@link MySqlSplit}. */
public final class MySqlSplitSerializer implements SimpleVersionedSerializer<MySqlSplit> {
Expand Down Expand Up @@ -70,7 +71,7 @@ public byte[] serialize(MySqlSplit split) throws IOException {

final DataOutputSerializer out = SERIALIZER_CACHE.get();
out.writeInt(SNAPSHOT_SPLIT_FLAG);
out.writeUTF(snapshotSplit.getTableId().toString());
out.writeUTF(quote(snapshotSplit.getTableId()));
out.writeUTF(snapshotSplit.splitId());
out.writeUTF(snapshotSplit.getSplitKeyType().asSerializableString());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,20 +64,26 @@ public class SnapshotSplitReaderTest extends MySqlSourceTestBase {
private static final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", "mysqluser", "mysqlpw");

private static final UniqueDatabase customer3_0Database =
new UniqueDatabase(MYSQL_CONTAINER, "customer3.0", "mysqluser", "mysqlpw");

private static BinaryLogClient binaryLogClient;
private static MySqlConnection mySqlConnection;

@BeforeClass
public static void init() {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers"}, 10);
customer3_0Database.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(customerDatabase, new String[] {"customers"}, 10);
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
}

@Test
public void testReadSingleSnapshotSplit() throws Exception {
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}, 4);
MySqlSourceConfig sourceConfig =
getConfig(customerDatabase, new String[] {"customers_even_dist"}, 4);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
final DataType dataType =
Expand All @@ -100,9 +106,48 @@ public void testReadSingleSnapshotSplit() throws Exception {
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}

@Test
public void testReadSingleSnapshotSplitWithDotName() throws Exception {
MySqlSourceConfig sourceConfig =
getConfig(customer3_0Database, new String[] {"customers3.0"}, 4);
BinaryLogClient binaryLogClient =
DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
MySqlConnection mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);
final DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));
List<MySqlSplit> mySqlSplits =
getMySqlSplits(
sourceConfig,
Arrays.asList(
String.format(
"`%s`.`customers3.0`",
customer3_0Database.getDatabaseName()))
.stream()
.map(TableId::parse)
.collect(Collectors.toList()));

String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[104, user_4, Shanghai, 123567891234]"
};
List<String> actual =
readTableSnapshotSplits(mySqlSplits, statefulTaskContext, 1, dataType);
assertEqualsInAnyOrder(Arrays.asList(expected), actual);
}

@Test
public void testReadAllSnapshotSplitsForOneTable() throws Exception {
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customers_even_dist"}, 4);
MySqlSourceConfig sourceConfig =
getConfig(customerDatabase, new String[] {"customers_even_dist"}, 4);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);

Expand Down Expand Up @@ -135,7 +180,8 @@ public void testReadAllSnapshotSplitsForOneTable() throws Exception {

@Test
public void testReadAllSplitForTableWithSingleLine() throws Exception {
MySqlSourceConfig sourceConfig = getConfig(new String[] {"customer_card_single_line"}, 10);
MySqlSourceConfig sourceConfig =
getConfig(customerDatabase, new String[] {"customer_card_single_line"}, 10);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);

Expand All @@ -156,7 +202,10 @@ public void testReadAllSplitForTableWithSingleLine() throws Exception {
@Test
public void testReadAllSnapshotSplitsForTables() throws Exception {
MySqlSourceConfig sourceConfig =
getConfig(new String[] {"customer_card", "customer_card_single_line"}, 10);
getConfig(
customerDatabase,
new String[] {"customer_card", "customer_card_single_line"},
10);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);

Expand Down Expand Up @@ -200,7 +249,7 @@ public void testReadAllSnapshotSplitsForTables() throws Exception {
@Test
public void testThrowRuntimeExceptionInSnapshotScan() throws Exception {
MySqlSourceConfig sourceConfig =
getConfig(new String[] {"customer_card", "customers_1"}, 10);
getConfig(customerDatabase, new String[] {"customer_card", "customers_1"}, 10);
StatefulTaskContext statefulTaskContext =
new StatefulTaskContext(sourceConfig, binaryLogClient, mySqlConnection);

Expand Down Expand Up @@ -230,7 +279,7 @@ public void testThrowRuntimeExceptionInSnapshotScan() throws Exception {
@Test
public void testChangingDataInSnapshotScan() throws Exception {
String tableName = "customers_even_dist";
MySqlSourceConfig sourceConfig = getConfig(new String[] {tableName}, 10);
MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new String[] {tableName}, 10);

String tableId = customerDatabase.getDatabaseName() + "." + tableName;
String[] changingDataSql =
Expand Down Expand Up @@ -279,7 +328,7 @@ public void testChangingDataInSnapshotScan() throws Exception {
@Test
public void testInsertDataInSnapshotScan() throws Exception {
String tableName = "customers_even_dist";
MySqlSourceConfig sourceConfig = getConfig(new String[] {tableName}, 10);
MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new String[] {tableName}, 10);

String tableId = customerDatabase.getDatabaseName() + "." + tableName;
String[] insertDataSql =
Expand Down Expand Up @@ -335,7 +384,7 @@ public void testInsertDataInSnapshotScan() throws Exception {
@Test
public void testDeleteDataInSnapshotScan() throws Exception {
String tableName = "customers_even_dist";
MySqlSourceConfig sourceConfig = getConfig(new String[] {tableName}, 10);
MySqlSourceConfig sourceConfig = getConfig(customerDatabase, new String[] {tableName}, 10);

String tableId = customerDatabase.getDatabaseName() + "." + tableName;
String[] deleteDataSql =
Expand Down Expand Up @@ -431,6 +480,11 @@ private List<MySqlSplit> getMySqlSplits(MySqlSourceConfig sourceConfig) {
sourceConfig.getTableList().stream()
.map(TableId::parse)
.collect(Collectors.toList());
return getMySqlSplits(sourceConfig, remainingTables);
}

private List<MySqlSplit> getMySqlSplits(
MySqlSourceConfig sourceConfig, List<TableId> remainingTables) {
final MySqlSnapshotSplitAssigner assigner =
new MySqlSnapshotSplitAssigner(
sourceConfig, DEFAULT_PARALLELISM, remainingTables, false);
Expand All @@ -448,22 +502,23 @@ private List<MySqlSplit> getMySqlSplits(MySqlSourceConfig sourceConfig) {
return mySqlSplitList;
}

public static MySqlSourceConfig getConfig(String[] captureTables, int splitSize) {
public static MySqlSourceConfig getConfig(
UniqueDatabase database, String[] captureTables, int splitSize) {
String[] captureTableIds =
Arrays.stream(captureTables)
.map(tableName -> customerDatabase.getDatabaseName() + "." + tableName)
.map(tableName -> database.getDatabaseName() + "." + tableName)
.toArray(String[]::new);

return new MySqlSourceConfigFactory()
.databaseList(customerDatabase.getDatabaseName())
.databaseList(database.getDatabaseName())
.tableList(captureTableIds)
.serverId("1001-1002")
.hostname(MYSQL_CONTAINER.getHost())
.port(MYSQL_CONTAINER.getDatabasePort())
.username(customerDatabase.getUsername())
.username(database.getUsername())
.splitSize(splitSize)
.fetchSize(2)
.password(customerDatabase.getPassword())
.password(database.getPassword())
.createConfig(0);
}

Expand All @@ -482,7 +537,7 @@ private boolean executeSql(MySqlSourceConfig sourceConfig, String[] sqlStatement

class MakeBinlogEventTaskContext extends StatefulTaskContext {

private Supplier<Boolean> makeBinlogFunction;
private final Supplier<Boolean> makeBinlogFunction;

public MakeBinlogEventTaskContext(
MySqlSourceConfig sourceConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ public class MySqlConnectorITCase extends MySqlSourceTestBase {

private final UniqueDatabase customerDatabase =
new UniqueDatabase(MYSQL_CONTAINER, "customer", TEST_USER, TEST_PASSWORD);
private static final UniqueDatabase customer3_0Database =
new UniqueDatabase(MYSQL_CONTAINER, "customer3.0", TEST_USER, TEST_PASSWORD);

private final UniqueDatabase userDatabase1 =
new UniqueDatabase(MYSQL_CONTAINER, "user_1", TEST_USER, TEST_PASSWORD);
Expand Down Expand Up @@ -1112,6 +1114,74 @@ public void testPrimaryKeyWithSnowflakeAlgorithm() throws Exception {
result.getJobClient().get().cancel().get();
}

@Test
public void testReadingWithDotTableName() throws Exception {
if (!incrementalSnapshot) {
return;
}
customer3_0Database.createAndInitialize();
String sourceDDL =
String.format(
"CREATE TABLE customers ("
+ " `id` INTEGER NOT NULL,"
+ " name STRING,"
+ " address STRING,"
+ " phone_number STRING,"
+ " primary key (`id`) not enforced"
+ ") WITH ("
+ " 'connector' = 'mysql-cdc',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'database-name' = '%s',"
leonardBang marked this conversation as resolved.
Show resolved Hide resolved
+ " 'table-name' = '%s',"
+ " 'debezium.internal.implementation' = '%s',"
+ " 'scan.incremental.snapshot.enabled' = '%s',"
+ " 'server-id' = '%s',"
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
+ ")",
MYSQL_CONTAINER.getHost(),
MYSQL_CONTAINER.getDatabasePort(),
customer3_0Database.getUsername(),
customer3_0Database.getPassword(),
customer3_0Database.getDatabaseName(),
"customers3.0",
getDezImplementation(),
incrementalSnapshot,
getServerId(),
getSplitSize());
tEnv.executeSql(sourceDDL);
// async submit job
TableResult result =
tEnv.executeSql(
"SELECT id,\n" + "name,\n" + "address,\n" + "phone_number FROM customers");

CloseableIterator<Row> iterator = result.collect();
waitForSnapshotStarted(iterator);

try (Connection connection = customer3_0Database.getJdbcConnection();
Statement statement = connection.createStatement()) {
statement.execute("UPDATE `customers3.0` SET address = 'Hangzhou' WHERE id=103;");
statement.execute(
"INSERT INTO `customers3.0` VALUES(110, 'newCustomer', 'Berlin', '12345678')");
}

String[] expected =
new String[] {
"+I[101, user_1, Shanghai, 123567891234]",
"+I[102, user_2, Shanghai, 123567891234]",
"+I[103, user_3, Shanghai, 123567891234]",
"+I[104, user_4, Shanghai, 123567891234]",
"-U[103, user_3, Shanghai, 123567891234]",
"+U[103, user_3, Hangzhou, 123567891234]",
"+I[110, newCustomer, Berlin, 12345678]"
};
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
result.getJobClient().get().cancel().get();
customer3_0Database.dropDatabase();
}

@Test
public void testDdlWithDefaultStringValue() throws Exception {
if (!incrementalSnapshot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@
public class UniqueDatabase {

private static final String[] CREATE_DATABASE_DDL =
new String[] {"CREATE DATABASE $DBNAME$;", "USE $DBNAME$;"};
private static final String DROP_DATABASE_DDL = "DROP DATABASE IF EXISTS $DBNAME$;";
new String[] {"CREATE DATABASE `$DBNAME$`;", "USE `$DBNAME$`;"};
private static final String DROP_DATABASE_DDL = "DROP DATABASE IF EXISTS `$DBNAME$`;";
private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");

private final MySqlContainer container;
Expand Down
30 changes: 30 additions & 0 deletions flink-connector-mysql-cdc/src/test/resources/ddl/customer3.0.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
-- Copyright 2022 Ververica Inc.
--
-- Licensed under the Apache License, Version 2.0 (the "License");
-- you may not use this file except in compliance with the License.
-- You may obtain a copy of the License at
-- http://www.apache.org/licenses/LICENSE-2.0
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.

-- ----------------------------------------------------------------------------------------------------------------
-- DATABASE: customer3.0
-- ----------------------------------------------------------------------------------------------------------------

-- Create and populate our customers using a single insert with many rows
CREATE TABLE `customers3.0` (
id INTEGER NOT NULL PRIMARY KEY,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512)
);

INSERT INTO `customers3.0`
VALUES (101,"user_1","Shanghai","123567891234"),
(102,"user_2","Shanghai","123567891234"),
(103,"user_3","Shanghai","123567891234"),
(104,"user_4","Shanghai","123567891234");