diff --git a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/StatementUtils.java b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/StatementUtils.java index 6f445fb89a0..13dc4b90f7a 100644 --- a/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/StatementUtils.java +++ b/flink-connector-mysql-cdc/src/main/java/com/ververica/cdc/connectors/mysql/source/utils/StatementUtils.java @@ -244,7 +244,7 @@ private static void addPrimaryKeyColumnsToCondition( RowType pkRowType, StringBuilder sql, String predicate) { for (Iterator fieldNamesIt = pkRowType.getFieldNames().iterator(); fieldNamesIt.hasNext(); ) { - sql.append(fieldNamesIt.next()).append(predicate); + sql.append(quote(fieldNamesIt.next())).append(predicate); if (fieldNamesIt.hasNext()) { sql.append(" AND "); } @@ -255,7 +255,7 @@ private static String getPrimaryKeyColumnsProjection(RowType pkRowType) { StringBuilder sql = new StringBuilder(); for (Iterator fieldNamesIt = pkRowType.getFieldNames().iterator(); fieldNamesIt.hasNext(); ) { - sql.append(fieldNamesIt.next()); + sql.append(quote(fieldNamesIt.next())); if (fieldNamesIt.hasNext()) { sql.append(" , "); } @@ -267,7 +267,7 @@ private static String getMaxPrimaryKeyColumnsProjection(RowType pkRowType) { StringBuilder sql = new StringBuilder(); for (Iterator fieldNamesIt = pkRowType.getFieldNames().iterator(); fieldNamesIt.hasNext(); ) { - sql.append("MAX(" + fieldNamesIt.next() + ")"); + sql.append("MAX(" + quote(fieldNamesIt.next()) + ")"); if (fieldNamesIt.hasNext()) { sql.append(" , "); }