Skip to content

Commit 12c36f1

Browse files
authored
[FLINK-36656][mysql] Fix type conversion failure for newly-added sharding table with mysql boolean type (#3684)
1 parent 2c4aa3c commit 12c36f1

File tree

3 files changed

+378
-72
lines changed

3 files changed

+378
-72
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/table/MySqlDeserializationConverterFactory.java

+19
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public static DeserializationRuntimeConverterFactory instance() {
5353
public Optional<DeserializationRuntimeConverter> createUserDefinedConverter(
5454
LogicalType logicalType, ZoneId serverTimeZone) {
5555
switch (logicalType.getTypeRoot()) {
56+
case TINYINT:
57+
return createTinyIntConverter();
5658
case CHAR:
5759
case VARCHAR:
5860
return createStringConverter();
@@ -148,6 +150,23 @@ public Object convert(Object dbzObj, Schema schema) throws Exception {
148150
}
149151
}
150152

153+
private static Optional<DeserializationRuntimeConverter> createTinyIntConverter() {
154+
155+
return Optional.of(
156+
new DeserializationRuntimeConverter() {
157+
private static final long serialVersionUID = 1L;
158+
159+
@Override
160+
public Object convert(Object dbzObj, Schema schema) throws Exception {
161+
if (dbzObj instanceof Boolean) {
162+
return dbzObj == Boolean.TRUE ? (byte) 1 : (byte) 0;
163+
} else {
164+
return Byte.parseByte(dbzObj.toString());
165+
}
166+
}
167+
});
168+
}
169+
151170
private static boolean hasFamily(LogicalType logicalType, LogicalTypeFamily family) {
152171
return logicalType.getTypeRoot().getFamilies().contains(family);
153172
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlConnectorITCase.java

-72
Original file line numberDiff line numberDiff line change
@@ -1592,78 +1592,6 @@ public void testAlterWithDefaultStringValue() throws Exception {
15921592
jobClient.cancel().get();
15931593
}
15941594

1595-
@Test
1596-
public void testShardingTablesWithInconsistentSchema() throws Exception {
1597-
userDatabase1.createAndInitialize();
1598-
userDatabase2.createAndInitialize();
1599-
String sourceDDL =
1600-
String.format(
1601-
"CREATE TABLE `user` ("
1602-
+ " `id` DECIMAL(20, 0) NOT NULL,"
1603-
+ " name STRING,"
1604-
+ " address STRING,"
1605-
+ " phone_number STRING,"
1606-
+ " email STRING,"
1607-
+ " age INT,"
1608-
+ " primary key (`id`) not enforced"
1609-
+ ") WITH ("
1610-
+ " 'connector' = 'mysql-cdc',"
1611-
+ " 'hostname' = '%s',"
1612-
+ " 'port' = '%s',"
1613-
+ " 'username' = '%s',"
1614-
+ " 'password' = '%s',"
1615-
+ " 'database-name' = '%s',"
1616-
+ " 'table-name' = '%s',"
1617-
+ " 'scan.incremental.snapshot.enabled' = '%s',"
1618-
+ " 'server-time-zone' = 'UTC',"
1619-
+ " 'server-id' = '%s',"
1620-
+ " 'scan.incremental.snapshot.chunk.size' = '%s'"
1621-
+ ")",
1622-
MYSQL_CONTAINER.getHost(),
1623-
MYSQL_CONTAINER.getDatabasePort(),
1624-
userDatabase1.getUsername(),
1625-
userDatabase1.getPassword(),
1626-
String.format(
1627-
"(%s|%s)",
1628-
userDatabase1.getDatabaseName(), userDatabase2.getDatabaseName()),
1629-
"user_table_.*",
1630-
incrementalSnapshot,
1631-
getServerId(),
1632-
getSplitSize());
1633-
tEnv.executeSql(sourceDDL);
1634-
1635-
// async submit job
1636-
TableResult result = tEnv.executeSql("SELECT * FROM `user`");
1637-
1638-
CloseableIterator<Row> iterator = result.collect();
1639-
waitForSnapshotStarted(iterator);
1640-
1641-
try (Connection connection = userDatabase1.getJdbcConnection();
1642-
Statement statement = connection.createStatement()) {
1643-
statement.execute("UPDATE user_table_1_1 SET email = 'user_111@bar.org' WHERE id=111;");
1644-
}
1645-
1646-
try (Connection connection = userDatabase2.getJdbcConnection();
1647-
Statement statement = connection.createStatement()) {
1648-
statement.execute("UPDATE user_table_2_2 SET age = 20 WHERE id=221;");
1649-
}
1650-
1651-
String[] expected =
1652-
new String[] {
1653-
"+I[111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
1654-
"-U[111, user_111, Shanghai, 123567891234, user_111@foo.com, null]",
1655-
"+U[111, user_111, Shanghai, 123567891234, user_111@bar.org, null]",
1656-
"+I[121, user_121, Shanghai, 123567891234, null, null]",
1657-
"+I[211, user_211, Shanghai, 123567891234, null, null]",
1658-
"+I[221, user_221, Shanghai, 123567891234, null, 18]",
1659-
"-U[221, user_221, Shanghai, 123567891234, null, 18]",
1660-
"+U[221, user_221, Shanghai, 123567891234, null, 20]",
1661-
};
1662-
1663-
assertEqualsInAnyOrder(Arrays.asList(expected), fetchRows(iterator, expected.length));
1664-
result.getJobClient().get().cancel().get();
1665-
}
1666-
16671595
@Test
16681596
public void testStartupFromSpecificBinlogFilePos() throws Exception {
16691597
inventoryDatabase.createAndInitialize();

0 commit comments

Comments
 (0)