diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf index 7b92bd986ac..e8d85aecc5c 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/mysqlcdc_to_mysql.conf @@ -33,15 +33,27 @@ source { password = "seatunnel" table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"] base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" + catalog { + factory = MySQL + } } } transform { + sql { + source_table_name = "customers_mysql_cdc" + query = """ select id, f_binary, f_blob, f_long_varbinary, f_longblob, f_tinyblob, f_varbinary, f_smallint, f_smallint_unsigned, f_mediumint, + f_mediumint_unsigned, f_int, f_int_unsigned, f_integer, f_integer_unsigned, f_bigint, f_bigint_unsigned, f_numeric, f_decimal, + f_float, f_double, f_double_precision, f_longtext, f_mediumtext, f_text, f_tinytext, f_varchar, f_date, f_datetime, f_timestamp, + f_bit1, f_bit64, f_char, f_enum, f_mediumblob, f_long_varchar, f_real, f_time, f_tinyint, f_tinyint_unsigned, f_json, f_year + from customers_mysql_cdc """ + result_table_name = "trans_mysql_cdc" + } } sink { jdbc { - source_table_name = "customers_mysql_cdc" + source_table_name = "trans_mysql_cdc" url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc" driver = "com.mysql.cj.jdbc.Driver" user = "st_user" diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java index f52858fef36..b1e734c31ef 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLEngine.java @@ -20,10 +20,12 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import java.util.List; + public interface SQLEngine { void init(String inputTableName, SeaTunnelRowType inputRowType, String sql); - SeaTunnelRowType typeMapping(); + SeaTunnelRowType typeMapping(List inputColumnsMapping); SeaTunnelRow transformBySQL(SeaTunnelRow inputRow); diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java index a60640b7df6..20a07dcee02 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransform.java @@ -19,22 +19,40 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.api.common.CommonOptions; import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.configuration.ReadonlyConfig; +import org.apache.seatunnel.api.configuration.util.ConfigValidator; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.catalog.Column; +import org.apache.seatunnel.api.table.catalog.ConstraintKey; +import org.apache.seatunnel.api.table.catalog.PhysicalColumn; +import org.apache.seatunnel.api.table.catalog.PrimaryKey; +import org.apache.seatunnel.api.table.catalog.TableIdentifier; +import org.apache.seatunnel.api.table.catalog.TableSchema; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; -import org.apache.seatunnel.common.config.CheckConfigUtil; -import org.apache.seatunnel.common.config.CheckResult; -import org.apache.seatunnel.transform.common.AbstractSeaTunnelTransform; +import org.apache.seatunnel.transform.common.AbstractCatalogSupportTransform; import org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType; import com.google.auto.service.AutoService; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.extern.slf4j.Slf4j; + +import java.util.ArrayList; +import java.util.List; import static org.apache.seatunnel.transform.sql.SQLEngineFactory.EngineType.ZETA; +@Slf4j +@NoArgsConstructor @AutoService(SeaTunnelTransform.class) -public class SQLTransform extends AbstractSeaTunnelTransform { +public class SQLTransform extends AbstractCatalogSupportTransform { + public static final String PLUGIN_NAME = "Sql"; public static final Option KEY_QUERY = Options.key("query").stringType().noDefaultValue().withDescription("The query SQL"); @@ -51,22 +69,46 @@ public class SQLTransform extends AbstractSeaTunnelTransform { private transient SQLEngine sqlEngine; + public SQLTransform(@NonNull ReadonlyConfig config, @NonNull CatalogTable catalogTable) { + super(catalogTable); + this.query = config.get(KEY_QUERY); + if (config.getOptional(KEY_ENGINE).isPresent()) { + this.engineType = EngineType.valueOf(config.get(KEY_ENGINE).toUpperCase()); + } else { + this.engineType = ZETA; + } + + List sourceTableNames = config.get(CommonOptions.SOURCE_TABLE_NAME); + if (sourceTableNames != null && !sourceTableNames.isEmpty()) { + this.inputTableName = sourceTableNames.get(0); + } else { + this.inputTableName = catalogTable.getTableId().getTableName(); + } + List columns = catalogTable.getTableSchema().getColumns(); + String[] fieldNames = new String[columns.size()]; + SeaTunnelDataType[] fieldTypes = new SeaTunnelDataType[columns.size()]; + for (int i = 0; i < columns.size(); i++) { + Column column = columns.get(i); + fieldNames[i] = column.getName(); + fieldTypes[i] = column.getDataType(); + } + this.inputRowType = new SeaTunnelRowType(fieldNames, fieldTypes); + } + @Override public String getPluginName() { - return "Sql"; + return PLUGIN_NAME; } @Override protected void setConfig(Config pluginConfig) { - CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY.key()); - if (!checkResult.isSuccess()) { - throw new IllegalArgumentException("Failed to check config! " + checkResult.getMsg()); - } - query = pluginConfig.getString(KEY_QUERY.key()); - if (pluginConfig.hasPath(KEY_ENGINE.key())) { - engineType = EngineType.valueOf(pluginConfig.getString(KEY_ENGINE.key()).toUpperCase()); + ReadonlyConfig readonlyConfig = ReadonlyConfig.fromConfig(pluginConfig); + ConfigValidator.of(readonlyConfig).validate(new SQLTransformFactory().optionRule()); + this.query = readonlyConfig.get(KEY_QUERY); + if (readonlyConfig.getOptional(KEY_ENGINE).isPresent()) { + this.engineType = EngineType.valueOf(readonlyConfig.get(KEY_ENGINE).toUpperCase()); } else { - engineType = ZETA; + this.engineType = ZETA; } } @@ -85,7 +127,7 @@ private void tryOpen() { @Override protected SeaTunnelRowType transformRowType(SeaTunnelRowType inputRowType) { tryOpen(); - return sqlEngine.typeMapping(); + return sqlEngine.typeMapping(null); } @Override @@ -94,6 +136,97 @@ protected SeaTunnelRow transformRow(SeaTunnelRow inputRow) { return sqlEngine.transformBySQL(inputRow); } + @Override + protected TableSchema transformTableSchema() { + tryOpen(); + List inputColumnsMapping = new ArrayList<>(); + SeaTunnelRowType outRowType = sqlEngine.typeMapping(inputColumnsMapping); + + TableSchema.Builder builder = TableSchema.builder(); + if (inputCatalogTable.getTableSchema().getPrimaryKey() != null) { + List outPkColumnNames = new ArrayList<>(); + for (String pkColumnName : + inputCatalogTable.getTableSchema().getPrimaryKey().getColumnNames()) { + for (int i = 0; i < inputColumnsMapping.size(); i++) { + if (pkColumnName.equals(inputColumnsMapping.get(i))) { + outPkColumnNames.add(outRowType.getFieldName(i)); + } + } + } + if (!outPkColumnNames.isEmpty()) { + builder.primaryKey( + PrimaryKey.of( + inputCatalogTable.getTableSchema().getPrimaryKey().getPrimaryKey(), + outPkColumnNames)); + } + } + if (inputCatalogTable.getTableSchema().getConstraintKeys() != null) { + List outConstraintKey = new ArrayList<>(); + for (ConstraintKey constraintKey : + inputCatalogTable.getTableSchema().getConstraintKeys()) { + List outConstraintColumnKeys = new ArrayList<>(); + for (ConstraintKey.ConstraintKeyColumn constraintKeyColumn : + constraintKey.getColumnNames()) { + String constraintColumnName = constraintKeyColumn.getColumnName(); + for (int i = 0; i < inputColumnsMapping.size(); i++) { + if (constraintColumnName.equals(inputColumnsMapping.get(i))) { + outConstraintColumnKeys.add( + ConstraintKey.ConstraintKeyColumn.of( + outRowType.getFieldName(i), + constraintKeyColumn.getSortType())); + } + } + } + if (!outConstraintColumnKeys.isEmpty()) { + outConstraintKey.add( + ConstraintKey.of( + constraintKey.getConstraintType(), + constraintKey.getConstraintName(), + outConstraintColumnKeys)); + } + } + if (!outConstraintKey.isEmpty()) { + builder.constraintKey(outConstraintKey); + } + } + + String[] fieldNames = outRowType.getFieldNames(); + SeaTunnelDataType[] fieldTypes = outRowType.getFieldTypes(); + List columns = new ArrayList<>(fieldNames.length); + for (int i = 0; i < fieldNames.length; i++) { + Column simpleColumn = null; + String inputColumnName = inputColumnsMapping.get(i); + if (inputColumnName != null) { + for (Column inputColumn : inputCatalogTable.getTableSchema().getColumns()) { + if (inputColumnName.equals(inputColumn.getName())) { + simpleColumn = inputColumn; + break; + } + } + } + Column column; + if (simpleColumn != null) { + column = + PhysicalColumn.of( + fieldNames[i], + fieldTypes[i], + simpleColumn.getColumnLength(), + simpleColumn.isNullable(), + simpleColumn.getDefaultValue(), + simpleColumn.getComment()); + } else { + column = PhysicalColumn.of(fieldNames[i], fieldTypes[i], 0, true, null, null); + } + columns.add(column); + } + return builder.columns(columns).build(); + } + + @Override + protected TableIdentifier transformTableIdentifier() { + return inputCatalogTable.getTableId().copy(); + } + @Override public void close() { sqlEngine.close(); diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java index 3b97c96a755..f509af832cb 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/SQLTransformFactory.java @@ -18,7 +18,10 @@ package org.apache.seatunnel.transform.sql; import org.apache.seatunnel.api.configuration.util.OptionRule; +import org.apache.seatunnel.api.table.catalog.CatalogTable; +import org.apache.seatunnel.api.table.connector.TableTransform; import org.apache.seatunnel.api.table.factory.Factory; +import org.apache.seatunnel.api.table.factory.TableFactoryContext; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import com.google.auto.service.AutoService; @@ -29,11 +32,17 @@ public class SQLTransformFactory implements TableTransformFactory { @Override public String factoryIdentifier() { - return "Sql"; + return SQLTransform.PLUGIN_NAME; } @Override public OptionRule optionRule() { return OptionRule.builder().required(KEY_QUERY).build(); } + + @Override + public TableTransform createTransform(TableFactoryContext context) { + CatalogTable catalogTable = context.getCatalogTable(); + return () -> new SQLTransform(context.getOptions(), catalogTable); + } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java index 62fa8d90153..fa491e7e6e5 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java @@ -38,11 +38,14 @@ import net.sf.jsqlparser.statement.select.SelectItem; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.ServiceLoader; +import java.util.stream.Collectors; public class ZetaSQLEngine implements SQLEngine { private String inputTableName; + private SeaTunnelRowType inputRowType; private String sql; private PlainSelect selectBody; @@ -56,6 +59,7 @@ public ZetaSQLEngine() {} @Override public void init(String inputTableName, SeaTunnelRowType inputRowType, String sql) { this.inputTableName = inputTableName; + this.inputRowType = inputRowType; this.sql = sql; List udfList = new ArrayList<>(); @@ -140,11 +144,19 @@ private void validateSQL(Statement statement) { } @Override - public SeaTunnelRowType typeMapping() { + public SeaTunnelRowType typeMapping(List inputColumnsMapping) { List selectItems = selectBody.getSelectItems(); String[] fieldNames = new String[selectItems.size()]; SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[selectItems.size()]; + if (inputColumnsMapping != null) { + for (int i = 0; i < selectItems.size(); i++) { + inputColumnsMapping.add(null); + } + } + + List inputColumnNames = + Arrays.stream(inputRowType.getFieldNames()).collect(Collectors.toList()); for (int i = 0; i < selectItems.size(); i++) { SelectItem selectItem = selectItems.get(i); @@ -162,6 +174,12 @@ public SeaTunnelRowType typeMapping() { } } + if (inputColumnsMapping != null + && expression instanceof Column + && inputColumnNames.contains(((Column) expression).getColumnName())) { + inputColumnsMapping.set(i, ((Column) expression).getColumnName()); + } + seaTunnelDataTypes[i] = zetaSQLType.getExpressionType(expression); } } @@ -183,7 +201,10 @@ public SeaTunnelRow transformBySQL(SeaTunnelRow inputRow) { // Project Object[] outputFields = project(inputFields); - return new SeaTunnelRow(outputFields); + SeaTunnelRow seaTunnelRow = new SeaTunnelRow(outputFields); + seaTunnelRow.setRowKind(inputRow.getRowKind()); + seaTunnelRow.setTableId(inputRow.getTableId()); + return seaTunnelRow; } private Object[] scanTable(SeaTunnelRow inputRow) {