From d663703df85624977a3c3887e639f4e69194847c Mon Sep 17 00:00:00 2001 From: mcy Date: Wed, 24 May 2023 15:15:22 +0800 Subject: [PATCH 1/6] Add catalog support for SQL Transform plugin --- .../seatunnel/transform/sql/SQLEngine.java | 4 +- .../seatunnel/transform/sql/SQLTransform.java | 147 ++++++++++++++++-- .../transform/sql/SQLTransformFactory.java | 11 +- .../transform/sql/zeta/ZetaSQLEngine.java | 20 ++- 4 files changed, 163 insertions(+), 19 deletions(-) diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java index f52858fef36..b1e734c31ef 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java @@ -20,10 +20,12 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import java.util.List; + public interface SQLEngine { void init(String inputTableName, SeaTunnelRowType inputRowType, String sql); - SeaTunnelRowType typeMapping(); + SeaTunnelRowType typeMapping(List inputColumnsMapping); SeaTunnelRow transformBySQL(SeaTunnelRow inputRow); diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java index a60640b7df6..87db78d14cd 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java @@ -21,20 +21,37 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.ConfigValidator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform; import org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType; import com.google.auto.service.AutoService; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; import static org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA; +@Slf4j +@NoArgsConstructor @AutoService(SeaTunnelTransform.class) -public class SQLTransform extends AbstractSeaTunnelTransform { +public class SQLTransform extends AbstractCatalogSupportTransform { + public static final String PLUGIN_NAME = "Sql"; public static final Option KEY_QUERY = Options.key("query").stringType().noDefaultValue().withDescription("The query SQL"); @@ -51,22 +68,40 @@ public class SQLTransform extends AbstractSeaTunnelTransform { private transient SQLEngine sqlEngine; + public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable catalogTable) { + super(catalogTable); + this.query = config.get(KEY_QUERY); + if (config.getOptional(KEY_ENGINE).isPresent()) { + this.engineType = EngineType.valueOf(config.get(KEY_ENGINE).toUpperCase()); + } else { + this.engineType = ZETA; + } + + this.inputTableName = catalogTable.getTableId().getTableName(); + List columns = catalogTable.getTableSchema().getColumns(); + String[] fieldNames = new String[columns.size()]; + SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + fieldNames[i] = columns.get(i).getName(); + fieldTypes[i] = columns.get(i).getDataType(); + } + this.inputRowType = new SeaTunnelRowType(fieldNames, fieldTypes); + } + @Override public String getPluginName() { - return "Sql"; + return PLUGIN_NAME; } @Override protected void setConfig(Config pluginConfig) { - CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY.key()); - if (!checkResult.isSuccess()) { - throw new IllegalArgumentException("Failed to check config! " + checkResult.getMsg()); - } - query = pluginConfig.getString(KEY_QUERY.key()); - if (pluginConfig.hasPath(KEY_ENGINE.key())) { - engineType = EngineType.valueOf(pluginConfig.getString(KEY_ENGINE.key()).toUpperCase()); + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig); + ConfigValidator.of(readonlyConfig).validate(new SQLTransformFactory().optionRule()); + this.query = readonlyConfig.get(KEY_QUERY); + if (readonlyConfig.getOptional(KEY_ENGINE).isPresent()) { + this.engineType = EngineType.valueOf(readonlyConfig.get(KEY_ENGINE).toUpperCase()); } else { - engineType = ZETA; + this.engineType = ZETA; } } @@ -85,7 +120,7 @@ private void tryOpen() { @Override protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) { tryOpen(); - return sqlEngine.typeMapping(); + return sqlEngine.typeMapping(null); } @Override @@ -95,7 +130,87 @@ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) { } @Override - public void close() { - sqlEngine.close(); + protected TableSchema transformTableSchema() { + tryOpen(); + List inputColumnsMapping = new ArrayList<>(); + SeaTunnelRowType outRowType = sqlEngine.typeMapping(inputColumnsMapping); + + TableSchema.Builder builder = TableSchema.builder(); + if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) { + List outPkColumnNames = new ArrayList<>(); + for (String pkColumnName : + inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()) { + for (int i = 0; i < inputColumnsMapping.size(); i++) { + if (pkColumnName.equals(inputColumnsMapping.get(i))) { + outPkColumnNames.add(outRowType.getFieldName(i)); + } + } + } + builder.primaryKey( + PrimaryKey.of( + inputCatalogTable.getTableSchema().getPrimaryKey().getPrimaryKey(), + outPkColumnNames)); + } + if (inputCatalogTable.getTableSchema().getConstraintKeys() != null) { + List outConstraintKey = new ArrayList<>(); + for (ConstraintKey constraintKey : + inputCatalogTable.getTableSchema().getConstraintKeys()) { + List outConstraintColumnKeys = new ArrayList<>(); + for (ConstraintKey.ConstraintKeyColumn constraintKeyColumn : + constraintKey.getColumnNames()) { + String constraintColumnName = constraintKeyColumn.getColumnName(); + for (int i = 0; i < inputColumnsMapping.size(); i++) { + if (constraintColumnName.equals(inputColumnsMapping.get(i))) { + outConstraintColumnKeys.add( + ConstraintKey.ConstraintKeyColumn.of( + outRowType.getFieldName(i), + constraintKeyColumn.getSortType())); + } + } + } + outConstraintKey.add( + ConstraintKey.of( + constraintKey.getConstraintType(), + constraintKey.getConstraintName(), + outConstraintColumnKeys)); + } + builder.constraintKey(outConstraintKey); + } + + String[] fieldNames = outRowType.getFieldNames(); + SeaTunnelDataType[] fieldTypes = outRowType.getFieldTypes(); + List columns = new ArrayList<>(fieldNames.length); + for (int i = 0; i < fieldNames.length; i++) { + Column simpleColumn = null; + String inputColumnName = inputColumnsMapping.get(i); + if (inputColumnName != null) { + for (Column inputColumn : inputCatalogTable.getTableSchema().getColumns()) { + if (inputColumnName.equals(inputColumn.getName())) { + simpleColumn = inputColumn; + break; + } + } + } + Column column; + if (simpleColumn != null) { + column = + PhysicalColumn.of( + fieldNames[i], + fieldTypes[i], + simpleColumn.getColumnLength(), + simpleColumn.isNullable(), + simpleColumn.getDefaultValue(), + simpleColumn.getComment()); + } else { + column = PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, true, null, null); + } + columns.add(column); + } + return builder.columns(columns).build(); + } + + @Override + protected TableIdentifier transformTableIdentifier() { + return inputCatalogTable.getTableId().copy(); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java index 3b97c96a755..f509af832cb 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java @@ -18,7 +18,10 @@ package org.apache.seatunnel.transform.sql; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import com.google.auto.service.AutoService; @@ -29,11 +32,17 @@ public class SQLTransformFactory implements TableTransformFactory { @Override public String factoryIdentifier() { - return "Sql"; + return SQLTransform.PLUGIN_NAME; } @Override public OptionRule optionRule() { return OptionRule.builder().required(KEY_QUERY).build(); } + + @Override + public TableTransform createTransform(TableFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new SQLTransform(context.getOptions(), catalogTable); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java index 62fa8d90153..b6b368b3b78 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java @@ -38,11 +38,14 @@ import net.sf.jsqlparser.statement.select.SelectItem; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.ServiceLoader; +import java.util.stream.Collectors; public class ZetaSQLEngine implements SQLEngine { private String inputTableName; + private SeaTunnelRowType inputRowType; private String sql; private PlainSelect selectBody; @@ -56,6 +59,7 @@ public ZetaSQLEngine() {} @Override public void init(String inputTableName, SeaTunnelRowType inputRowType, String sql) { this.inputTableName = inputTableName; + this.inputRowType = inputRowType; this.sql = sql; List udfList = new ArrayList<>(); @@ -140,11 +144,19 @@ private void validateSQL(Statement statement) { } @Override - public SeaTunnelRowType typeMapping() { + public SeaTunnelRowType typeMapping(List inputColumnsMapping) { List selectItems = selectBody.getSelectItems(); String[] fieldNames = new String[selectItems.size()]; SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[selectItems.size()]; + if (inputColumnsMapping != null) { + for (int i = 0; i < selectItems.size(); i++) { + inputColumnsMapping.add(null); + } + } + + List inputColumnNames = + Arrays.stream(inputRowType.getFieldNames()).collect(Collectors.toList()); for (int i = 0; i < selectItems.size(); i++) { SelectItem selectItem = selectItems.get(i); @@ -162,6 +174,12 @@ public SeaTunnelRowType typeMapping() { } } + if (inputColumnsMapping != null + && expression instanceof Column + && inputColumnNames.contains(((Column) expression).getColumnName())) { + inputColumnsMapping.set(i, ((Column) expression).getColumnName()); + } + seaTunnelDataTypes[i] = zetaSQLType.getExpressionType(expression); } } From de88063e4faaf474e5b91da8e882d40cf4531306 Mon Sep 17 00:00:00 2001 From: mcy Date: Wed, 24 May 2023 16:28:46 +0800 Subject: [PATCH 2/6] optimize code --- .../apache/seatunnel/transform/sql/SQLTransform.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java index 87db78d14cd..f98ecf6aefd 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java @@ -82,8 +82,9 @@ public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable catalo String[] fieldNames = new String[columns.size()]; SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[columns.size()]; for (int i = 0; i < columns.size(); i++) { - fieldNames[i] = columns.get(i).getName(); - fieldTypes[i] = columns.get(i).getDataType(); + Column column = columns.get(i); + fieldNames[i] = column.getName(); + fieldTypes[i] = column.getDataType(); } this.inputRowType = new SeaTunnelRowType(fieldNames, fieldTypes); } @@ -213,4 +214,9 @@ protected TableSchema transformTableSchema() { protected TableIdentifier transformTableIdentifier() { return inputCatalogTable.getTableId().copy(); } + + @Override + public void close() { + sqlEngine.close(); + } } From ba17b50162d00e08ed6c404b240c495c194513ae Mon Sep 17 00:00:00 2001 From: mcy Date: Fri, 26 May 2023 13:10:46 +0800 Subject: [PATCH 3/6] Add test config into mysqlcdc_to_mysql.conf for SQLTransform catalog support --- .../src/test/resources/mysqlcdc_to_mysql.conf | 29 ++++++++++------ .../seatunnel/transform/sql/SQLTransform.java | 34 +++++++++++++------ .../transform/sql/zeta/ZetaSQLEngine.java | 5 ++- 3 files changed, 46 insertions(+), 22 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf index 7b92bd986ac..28107c36366 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf @@ -37,20 +37,29 @@ source { } transform { + sql { + source_table_name = "customers_mysql_cdc" + query = """ select id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, + f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, + f_float, f_double, f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, + f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned, f_json, f_year + from customers_mysql_cdc """ + result_table_name = "trans_mysql_cdc" + } } sink { jdbc { - source_table_name = "customers_mysql_cdc" - url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" - driver = "com.mysql.cj.jdbc.Driver" - user = "st_user" - password = "seatunnel" + source_table_name = "trans_mysql_cdc" + url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user" + password = "seatunnel" - generate_sink_sql = true - # You need to configure both database and table - database = mysql_cdc - table = mysql_cdc_e2e_sink_table - primary_keys = ["id"] + generate_sink_sql = true + # You need to configure both database and table + database = mysql_cdc + table = mysql_cdc_e2e_sink_table + primary_keys = ["id"] } } \ No newline at end of file diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java index f98ecf6aefd..20a07dcee02 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.common.CommonOptions; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; import org.apache.seatunnel.api.configuration.ReadonlyConfig; @@ -77,7 +78,12 @@ public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable catalo this.engineType = ZETA; } - this.inputTableName = catalogTable.getTableId().getTableName(); + List sourceTableNames = config.get(CommonOptions.SOURCE_TABLE_NAME); + if (sourceTableNames != null && !sourceTableNames.isEmpty()) { + this.inputTableName = sourceTableNames.get(0); + } else { + this.inputTableName = catalogTable.getTableId().getTableName(); + } List columns = catalogTable.getTableSchema().getColumns(); String[] fieldNames = new String[columns.size()]; SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[columns.size()]; @@ -147,10 +153,12 @@ protected TableSchema transformTableSchema() { } } } - builder.primaryKey( - PrimaryKey.of( - inputCatalogTable.getTableSchema().getPrimaryKey().getPrimaryKey(), - outPkColumnNames)); + if (!outPkColumnNames.isEmpty()) { + builder.primaryKey( + PrimaryKey.of( + inputCatalogTable.getTableSchema().getPrimaryKey().getPrimaryKey(), + outPkColumnNames)); + } } if (inputCatalogTable.getTableSchema().getConstraintKeys() != null) { List outConstraintKey = new ArrayList<>(); @@ -169,13 +177,17 @@ protected TableSchema transformTableSchema() { } } } - outConstraintKey.add( - ConstraintKey.of( - constraintKey.getConstraintType(), - constraintKey.getConstraintName(), - outConstraintColumnKeys)); + if (!outConstraintColumnKeys.isEmpty()) { + outConstraintKey.add( + ConstraintKey.of( + constraintKey.getConstraintType(), + constraintKey.getConstraintName(), + outConstraintColumnKeys)); + } + } + if (!outConstraintKey.isEmpty()) { + builder.constraintKey(outConstraintKey); } - builder.constraintKey(outConstraintKey); } String[] fieldNames = outRowType.getFieldNames(); diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java index b6b368b3b78..fa491e7e6e5 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java @@ -201,7 +201,10 @@ public SeaTunnelRow transformBySQL(SeaTunnelRow inputRow) { // Project Object[] outputFields = project(inputFields); - return new SeaTunnelRow(outputFields); + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(outputFields); + seaTunnelRow.setRowKind(inputRow.getRowKind()); + seaTunnelRow.setTableId(inputRow.getTableId()); + return seaTunnelRow; } private Object[] scanTable(SeaTunnelRow inputRow) { From f5cb21ee0de3344419ae9e69073c72d726eea851 Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Fri, 26 May 2023 21:21:52 +0800 Subject: [PATCH 4/6] format conf file --- .../src/test/resources/mysqlcdc_to_mysql.conf | 20 +++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf index 28107c36366..7251917b80b 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf @@ -50,16 +50,16 @@ transform { sink { jdbc { - source_table_name = "trans_mysql_cdc" - url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" - driver = "com.mysql.cj.jdbc.Driver" - user = "st_user" - password = "seatunnel" + source_table_name = "trans_mysql_cdc" + url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + driver = "com.mysql.cj.jdbc.Driver" + user = "st_user" + password = "seatunnel" - generate_sink_sql = true - # You need to configure both database and table - database = mysql_cdc - table = mysql_cdc_e2e_sink_table - primary_keys = ["id"] + generate_sink_sql = true + # You need to configure both database and table + database = mysql_cdc + table = mysql_cdc_e2e_sink_table + primary_keys = ["id"] } } \ No newline at end of file From bfaf5f1242397e097cd7c94c0e14051caa14bc8c Mon Sep 17 00:00:00 2001 From: rewerkt Date: Sun, 28 May 2023 09:32:09 +0800 Subject: [PATCH 5/6] Add catalog factory test for mysqlcdc_to_mysql.conf --- .../src/test/resources/mysqlcdc_to_mysql.conf | 3 +++ 1 file changed, 3 insertions(+) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf index 7251917b80b..9f85853ab7a 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf @@ -33,6 +33,9 @@ source { password = "seatunnel" table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + catalog { + factory=MySQL + } } } From 00ff93b17b048fc8d42db85430764dcd7d171e28 Mon Sep 17 00:00:00 2001 From: rewerkt Date: Sun, 28 May 2023 09:32:56 +0800 Subject: [PATCH 6/6] optimize code style --- .../src/test/resources/mysqlcdc_to_mysql.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf index 9f85853ab7a..e8d85aecc5c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf @@ -34,7 +34,7 @@ source { table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" catalog { - factory=MySQL + factory = MySQL } } }