From 7b5b28bdbeac6442a1bfef392f4a5ae6c0ad28b0 Mon Sep 17 00:00:00 2001 From: XiaoJiang521 <131635688+XiaoJiang521@users.noreply.github.com> Date: Tue, 12 Sep 2023 16:53:23 +0800 Subject: [PATCH] [Feature][Catalog] Catalog add Case Conversion Definition (#5328) --- .../api/table/catalog/TablePath.java | 48 ++++++++++--- .../mysql/MysqlCreateTableSqlBuilder.java | 18 +++-- .../oracle/OracleCreateTableSqlBuilder.java | 48 +++++++------ .../psql/PostgresCreateTableSqlBuilder.java | 33 +++++---- .../SqlServerCreateTableSqlBuilder.java | 35 ++++----- .../jdbc/catalog/utils/CatalogUtils.java | 71 +++++++++++++++++++ .../seatunnel/jdbc/sink/JdbcSink.java | 11 ++- .../sql/MysqlCreateTableSqlBuilderTest.java | 12 ++-- 8 files changed, 203 insertions(+), 73 deletions(-) create mode 100644 seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java index 7b2dd6d5533..358e873b991 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/catalog/TablePath.java @@ -22,6 +22,8 @@ import lombok.RequiredArgsConstructor; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; @Getter @EqualsAndHashCode @@ -54,14 +56,15 @@ public static TablePath of(String databaseName, String schemaName, String tableN } public String getSchemaAndTableName() { - return String.format("%s.%s", schemaName, tableName); + return getNameCommon(null, schemaName, tableName, null, null); + } + + public String getSchemaAndTableName(String quote) { + return getNameCommon(null, schemaName, tableName, quote, quote); } public String getFullName() { - if (schemaName == null) { - return String.format("%s.%s", databaseName, tableName); - } - return String.format("%s.%s.%s", databaseName, schemaName, tableName); + return getNameCommon(databaseName, schemaName, tableName, null, null); } public String getFullNameWithQuoted() { @@ -69,13 +72,36 @@ public String getFullNameWithQuoted() { } public String getFullNameWithQuoted(String quote) { - if (schemaName == null) { - return String.format( - "%s%s%s.%s%s%s", quote, databaseName, quote, quote, tableName, quote); + return getNameCommon(databaseName, schemaName, tableName, quote, quote); + } + + public String getFullNameWithQuoted(String quoteLeft, String quoteRight) { + return getNameCommon(databaseName, schemaName, tableName, quoteLeft, quoteRight); + } + + private String getNameCommon( + String databaseName, + String schemaName, + String tableName, + String quoteLeft, + String quoteRight) { + List joinList = new ArrayList<>(); + quoteLeft = quoteLeft == null ? "" : quoteLeft; + quoteRight = quoteRight == null ? "" : quoteRight; + + if (databaseName != null) { + joinList.add(quoteLeft + databaseName + quoteRight); + } + + if (schemaName != null) { + joinList.add(quoteLeft + schemaName + quoteRight); } - return String.format( - "%s%s%s.%s%s%s.%s%s%s", - quote, databaseName, quote, quote, schemaName, quote, quote, tableName, quote); + + if (tableName != null) { + joinList.add(quoteLeft + tableName + quoteRight); + } + + return String.join(".", joinList); } @Override diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java index 490ecd30ff8..3430de04b5a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/mysql/MysqlCreateTableSqlBuilder.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -55,6 +56,8 @@ public class MysqlCreateTableSqlBuilder { private MysqlDataTypeConvertor mysqlDataTypeConvertor; + private String fieldIde; + private MysqlCreateTableSqlBuilder(String tableName) { checkNotNull(tableName, "tableName must not be null"); this.tableName = tableName; @@ -76,7 +79,8 @@ public static MysqlCreateTableSqlBuilder builder( .charset(null) .primaryKey(tableSchema.getPrimaryKey()) .constraintKeys(tableSchema.getConstraintKeys()) - .addColumn(tableSchema.getColumns()); + .addColumn(tableSchema.getColumns()) + .fieldIde(catalogTable.getOptions().get("fieldIde")); } public MysqlCreateTableSqlBuilder addColumn(List columns) { @@ -90,6 +94,11 @@ public MysqlCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) { return this; } + public MysqlCreateTableSqlBuilder fieldIde(String fieldIde) { + this.fieldIde = fieldIde; + return this; + } + public MysqlCreateTableSqlBuilder constraintKeys(List constraintKeys) { this.constraintKeys = constraintKeys; return this; @@ -120,7 +129,8 @@ public String build(String catalogName) { sqls.add( String.format( "CREATE TABLE %s (\n%s\n)", - tableName, buildColumnsIdentifySql(catalogName))); + CatalogUtils.quoteIdentifier(tableName, fieldIde, "`"), + buildColumnsIdentifySql(catalogName))); if (engine != null) { sqls.add("ENGINE = " + engine); } @@ -157,7 +167,7 @@ private String buildColumnsIdentifySql(String catalogName) { private String buildColumnIdentifySql(Column column, String catalogName) { final List columnSqls = new ArrayList<>(); - columnSqls.add(column.getName()); + columnSqls.add(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "`")); if (StringUtils.equals(catalogName, "mysql")) { columnSqls.add(column.getSourceType()); } else { @@ -243,7 +253,7 @@ private String buildPrimaryKeySql() { .map(columnName -> "`" + columnName + "`") .collect(Collectors.joining(", ")); // add sort type - return String.format("PRIMARY KEY (%s)", key); + return String.format("PRIMARY KEY (%s)", CatalogUtils.quoteIdentifier(key, fieldIde)); } private String buildConstraintKeySql(ConstraintKey constraintKey) { diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java index 984dd93e6a6..4b780131d54 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/oracle/OracleCreateTableSqlBuilder.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.commons.lang3.StringUtils; @@ -36,23 +37,27 @@ public class OracleCreateTableSqlBuilder { private PrimaryKey primaryKey; private OracleDataTypeConvertor oracleDataTypeConvertor; private String sourceCatalogName; + private String fieldIde; public OracleCreateTableSqlBuilder(CatalogTable catalogTable) { this.columns = catalogTable.getTableSchema().getColumns(); this.primaryKey = catalogTable.getTableSchema().getPrimaryKey(); this.oracleDataTypeConvertor = new OracleDataTypeConvertor(); this.sourceCatalogName = catalogTable.getCatalogName(); + this.fieldIde = catalogTable.getOptions().get("fieldIde"); } public String build(TablePath tablePath) { StringBuilder createTableSql = new StringBuilder(); createTableSql .append("CREATE TABLE ") - .append(tablePath.getSchemaAndTableName()) + .append(tablePath.getSchemaAndTableName("\"")) .append(" (\n"); List columnSqls = - columns.stream().map(this::buildColumnSql).collect(Collectors.toList()); + columns.stream() + .map(column -> CatalogUtils.getFieldIde(buildColumnSql(column), fieldIde)) + .collect(Collectors.toList()); // Add primary key directly in the create table statement if (primaryKey != null @@ -70,7 +75,7 @@ public String build(TablePath tablePath) { .map( column -> buildColumnCommentSql( - column, tablePath.getSchemaAndTableName())) + column, tablePath.getSchemaAndTableName("\""))) .collect(Collectors.toList()); if (!commentSqls.isEmpty()) { @@ -83,7 +88,7 @@ public String build(TablePath tablePath) { private String buildColumnSql(Column column) { StringBuilder columnSql = new StringBuilder(); - columnSql.append(column.getName()).append(" "); + columnSql.append("\"").append(column.getName()).append("\" "); String columnType = sourceCatalogName.equals("oracle") @@ -95,11 +100,6 @@ private String buildColumnSql(Column column) { columnSql.append(" NOT NULL"); } - // if (column.getDefaultValue() != null) { - // columnSql.append(" DEFAULT - // '").append(column.getDefaultValue().toString()).append("'"); - // } - return columnSql.toString(); } @@ -140,7 +140,10 @@ private String buildColumnType(Column column) { private String buildPrimaryKeySql(PrimaryKey primaryKey) { String randomSuffix = UUID.randomUUID().toString().replace("-", "").substring(0, 4); - String columnNamesString = String.join(", ", primaryKey.getColumnNames()); + String columnNamesString = + primaryKey.getColumnNames().stream() + .map(columnName -> "\"" + columnName + "\"") + .collect(Collectors.joining(", ")); // In Oracle database, the maximum length for an identifier is 30 characters. String primaryKeyStr = primaryKey.getPrimaryKey(); @@ -148,21 +151,26 @@ private String buildPrimaryKeySql(PrimaryKey primaryKey) { primaryKeyStr = primaryKeyStr.substring(0, 25); } - return "CONSTRAINT " - + primaryKeyStr - + "_" - + randomSuffix - + " PRIMARY KEY (" - + columnNamesString - + ")"; + return CatalogUtils.getFieldIde( + "CONSTRAINT " + + primaryKeyStr + + "_" + + randomSuffix + + " PRIMARY KEY (" + + columnNamesString + + ")", + fieldIde); } private String buildColumnCommentSql(Column column, String tableName) { StringBuilder columnCommentSql = new StringBuilder(); - columnCommentSql.append("COMMENT ON COLUMN ").append(tableName).append("."); columnCommentSql - .append(column.getName()) - .append(" IS '") + .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde)) + .append(tableName) + .append("."); + columnCommentSql + .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\"")) + .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde)) .append(column.getComment()) .append("'"); return columnCommentSql.toString(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java index d423f183010..74b684c0e39 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/psql/PostgresCreateTableSqlBuilder.java @@ -23,6 +23,7 @@ import org.apache.seatunnel.api.table.catalog.TablePath; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.commons.lang3.StringUtils; @@ -37,23 +38,30 @@ public class PostgresCreateTableSqlBuilder { private PrimaryKey primaryKey; private PostgresDataTypeConvertor postgresDataTypeConvertor; private String sourceCatalogName; + private String fieldIde; public PostgresCreateTableSqlBuilder(CatalogTable catalogTable) { this.columns = catalogTable.getTableSchema().getColumns(); this.primaryKey = catalogTable.getTableSchema().getPrimaryKey(); this.postgresDataTypeConvertor = new PostgresDataTypeConvertor(); this.sourceCatalogName = catalogTable.getCatalogName(); + this.fieldIde = catalogTable.getOptions().get("fieldIde"); } public String build(TablePath tablePath) { StringBuilder createTableSql = new StringBuilder(); createTableSql - .append("CREATE TABLE ") - .append(tablePath.getSchemaAndTableName()) + .append(CatalogUtils.quoteIdentifier("CREATE TABLE ", fieldIde)) + .append(tablePath.getSchemaAndTableName("\"")) .append(" (\n"); List columnSqls = - columns.stream().map(this::buildColumnSql).collect(Collectors.toList()); + columns.stream() + .map( + column -> + CatalogUtils.quoteIdentifier( + buildColumnSql(column), fieldIde)) + .collect(Collectors.toList()); createTableSql.append(String.join(",\n", columnSqls)); createTableSql.append("\n);"); @@ -64,7 +72,7 @@ public String build(TablePath tablePath) { .map( columns -> buildColumnCommentSql( - columns, tablePath.getSchemaAndTableName())) + columns, tablePath.getSchemaAndTableName("\""))) .collect(Collectors.toList()); if (!commentSqls.isEmpty()) { @@ -77,7 +85,7 @@ public String build(TablePath tablePath) { private String buildColumnSql(Column column) { StringBuilder columnSql = new StringBuilder(); - columnSql.append(column.getName()).append(" "); + columnSql.append("\"").append(column.getName()).append("\" "); // For simplicity, assume the column type in SeaTunnelDataType is the same as in PostgreSQL String columnType = @@ -96,12 +104,6 @@ private String buildColumnSql(Column column) { columnSql.append(" PRIMARY KEY"); } - // Add default value if exists - // if (column.getDefaultValue() != null) { - // columnSql.append(" DEFAULT - // '").append(column.getDefaultValue().toString()).append("'"); - // } - return columnSql.toString(); } @@ -133,10 +135,13 @@ private String buildColumnType(Column column) { private String buildColumnCommentSql(Column column, String tableName) { StringBuilder columnCommentSql = new StringBuilder(); - columnCommentSql.append("COMMENT ON COLUMN ").append(tableName).append("."); columnCommentSql - .append(column.getName()) - .append(" IS '") + .append(CatalogUtils.quoteIdentifier("COMMENT ON COLUMN ", fieldIde)) + .append(tableName) + .append("."); + columnCommentSql + .append(CatalogUtils.quoteIdentifier(column.getName(), fieldIde, "\"")) + .append(CatalogUtils.quoteIdentifier(" IS '", fieldIde)) .append(column.getComment()) .append("'"); return columnCommentSql.toString(); diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java index 0bec148b372..86afa6e41e1 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sqlserver/SqlServerCreateTableSqlBuilder.java @@ -25,6 +25,7 @@ import org.apache.seatunnel.api.table.catalog.TableSchema; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.commons.collections4.CollectionUtils; import org.apache.commons.lang3.StringUtils; @@ -55,6 +56,8 @@ public class SqlServerCreateTableSqlBuilder { private SqlServerDataTypeConvertor sqlServerDataTypeConvertor; + private String fieldIde; + private SqlServerCreateTableSqlBuilder(String tableName) { checkNotNull(tableName, "tableName must not be null"); this.tableName = tableName; @@ -76,7 +79,8 @@ public static SqlServerCreateTableSqlBuilder builder( .charset(null) .primaryKey(tableSchema.getPrimaryKey()) .constraintKeys(tableSchema.getConstraintKeys()) - .addColumn(tableSchema.getColumns()); + .addColumn(tableSchema.getColumns()) + .fieldIde(catalogTable.getOptions().get("fieldIde")); } public SqlServerCreateTableSqlBuilder addColumn(List columns) { @@ -90,6 +94,11 @@ public SqlServerCreateTableSqlBuilder primaryKey(PrimaryKey primaryKey) { return this; } + public SqlServerCreateTableSqlBuilder fieldIde(String fieldIde) { + this.fieldIde = fieldIde; + return this; + } + public SqlServerCreateTableSqlBuilder constraintKeys(List constraintKeys) { this.constraintKeys = constraintKeys; return this; @@ -117,7 +126,7 @@ public SqlServerCreateTableSqlBuilder comment(String comment) { public String build(TablePath tablePath, CatalogTable catalogTable) { List sqls = new ArrayList<>(); - String sqlTableName = tablePath.getFullName(); + String sqlTableName = tablePath.getFullNameWithQuoted("[", "]"); Map columnComments = new HashMap<>(); sqls.add( String.format( @@ -137,6 +146,7 @@ public String build(TablePath tablePath, CatalogTable catalogTable) { sqls.add("COLLATE = " + collate); } String sqlTableSql = String.join(" ", sqls) + ";"; + sqlTableSql = CatalogUtils.quoteIdentifier(sqlTableSql, fieldIde); StringBuilder tableAndColumnComment = new StringBuilder(); if (comment != null) { sqls.add("COMMENT = '" + comment + "'"); @@ -185,7 +195,7 @@ private String buildColumnsIdentifySql(String catalogName, Map c private String buildColumnIdentifySql( Column column, String catalogName, Map columnComments) { final List columnSqls = new ArrayList<>(); - columnSqls.add(column.getName()); + columnSqls.add("[" + column.getName() + "]"); String tyNameDef = ""; if (StringUtils.equals(catalogName, "sqlserver")) { columnSqls.add(column.getSourceType()); @@ -244,19 +254,7 @@ private String buildColumnIdentifySql( } else { columnSqls.add("NOT NULL"); } - // default value - // if (column.getDefaultValue() != null) { - // String defaultValue = "'" + column.getDefaultValue().toString() + "'"; - // if (StringUtils.equals(SqlServerType.BINARY.getName(), tyNameDef) - // && defaultValue.contains("b'")) { - // String rep = defaultValue.replace("b", "").replace("'", ""); - // defaultValue = "0x" + Integer.toHexString(Integer.parseInt(rep)); - // } else if (StringUtils.equals(SqlServerType.BIT.getName(), tyNameDef) - // && defaultValue.contains("b'")) { - // defaultValue = defaultValue.replace("b", "").replace("'", ""); - // } - // columnSqls.add("DEFAULT " + defaultValue); - // } + // comment if (column.getComment() != null) { columnComments.put(column.getName(), column.getComment()); @@ -267,7 +265,10 @@ private String buildColumnIdentifySql( private String buildPrimaryKeySql() { // .map(columnName -> "`" + columnName + "`") - String key = String.join(", ", primaryKey.getColumnNames()); + String key = + primaryKey.getColumnNames().stream() + .map(columnName -> "[" + columnName + "]") + .collect(Collectors.joining(", ")); // add sort type return String.format("PRIMARY KEY (%s)", key); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java new file mode 100644 index 00000000000..4b60f92d80a --- /dev/null +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/utils/CatalogUtils.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils; + +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; + +import org.apache.commons.lang3.StringUtils; + +public class CatalogUtils { + public static String getFieldIde(String identifier, String fieldIde) { + if (StringUtils.isBlank(fieldIde)) { + return identifier; + } + switch (FieldIdeEnum.valueOf(fieldIde.toUpperCase())) { + case LOWERCASE: + return identifier.toLowerCase(); + case UPPERCASE: + return identifier.toUpperCase(); + default: + return identifier; + } + } + + public static String quoteIdentifier(String identifier, String fieldIde, String quote) { + if (identifier.contains(".")) { + String[] parts = identifier.split("\\."); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + sb.append(quote).append(parts[i]).append(quote).append("."); + } + return sb.append(quote) + .append(getFieldIde(parts[parts.length - 1], fieldIde)) + .append(quote) + .toString(); + } + + return quote + getFieldIde(identifier, fieldIde) + quote; + } + + public static String quoteIdentifier(String identifier, String fieldIde) { + return getFieldIde(identifier, fieldIde); + } + + public static String quoteTableIdentifier(String identifier, String fieldIde) { + if (identifier.contains(".")) { + String[] parts = identifier.split("\\."); + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + sb.append(parts[i]).append("."); + } + return sb.append(getFieldIde(parts[parts.length - 1], fieldIde)).toString(); + } + + return getFieldIde(identifier, fieldIde); + } +} diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java index 71e0a862496..bbb776e486a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java @@ -38,11 +38,13 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.utils.CatalogUtils; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcOptions; import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSinkConfig; import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect; import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectLoader; +import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dialectenum.FieldIdeEnum; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcAggregatedCommitInfo; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.JdbcSinkState; import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo; @@ -210,14 +212,21 @@ public void handleSaveMode(DataSaveMode saveMode) { catalogFactory.factoryIdentifier(), ReadonlyConfig.fromMap(new HashMap<>(catalogOptions)))) { catalog.open(); + FieldIdeEnum fieldIdeEnumEnum = config.get(JdbcOptions.FIELD_IDE); + String fieldIde = + fieldIdeEnumEnum == null + ? FieldIdeEnum.ORIGINAL.getValue() + : fieldIdeEnumEnum.getValue(); TablePath tablePath = TablePath.of( jdbcSinkConfig.getDatabase() + "." - + jdbcSinkConfig.getTable()); + + CatalogUtils.quoteTableIdentifier( + jdbcSinkConfig.getTable(), fieldIde)); if (!catalog.databaseExists(jdbcSinkConfig.getDatabase())) { catalog.createDatabase(tablePath, true); } + catalogTable.getOptions().put("fieldIde", fieldIde); if (!catalog.tableExists(tablePath)) { catalog.createTable(tablePath, catalogTable, true); } diff --git a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java index b75ac68223b..04e00f1de1a 100644 --- a/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java +++ b/seatunnel-connectors-v2/connector-jdbc/src/test/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/sql/MysqlCreateTableSqlBuilderTest.java @@ -93,12 +93,12 @@ public void testBuild() { MysqlCreateTableSqlBuilder.builder(tablePath, catalogTable).build("mysql"); // create table sql is change; The old unit tests are no longer applicable String expect = - "CREATE TABLE test_table (\n" - + "\tid null NOT NULL COMMENT 'id', \n" - + "\tname null NOT NULL COMMENT 'name', \n" - + "\tage null NULL COMMENT 'age', \n" - + "\tcreateTime null NULL COMMENT 'createTime', \n" - + "\tlastUpdateTime null NULL COMMENT 'lastUpdateTime', \n" + "CREATE TABLE `test_table` (\n" + + "\t`id` null NOT NULL COMMENT 'id', \n" + + "\t`name` null NOT NULL COMMENT 'name', \n" + + "\t`age` null NULL COMMENT 'age', \n" + + "\t`createTime` null NULL COMMENT 'createTime', \n" + + "\t`lastUpdateTime` null NULL COMMENT 'lastUpdateTime', \n" + "\tPRIMARY KEY (`id`)\n" + ") COMMENT = 'User table';"; CONSOLE.println(expect);