diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
index 3dc770c1d80..d54a2addaf5 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java
@@ -49,5 +49,8 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr
         Container.ExecResult sqlCriteriaFilter =
                 container.executeJob("/sql_transform/criteria_filter.conf");
         Assertions.assertEquals(0, sqlCriteriaFilter.getExitCode());
+        Container.ExecResult sqlAllColumns =
+                container.executeJob("/sql_transform/sql_all_columns.conf");
+        Assertions.assertEquals(0, sqlAllColumns.getExitCode());
     }
 }
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/criteria_filter.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/criteria_filter.conf
index a8c51a87497..c1e9ecc3b0c 100644
--- a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/criteria_filter.conf
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/criteria_filter.conf
@@ -47,7 +47,7 @@ transform {
   Sql {
     source_table_name = "fake"
     result_table_name = "fake1"
-    query = "select id,name,age from fake where id=1 and id!=0 and name<>'Kin Dom' and (age>=20 or age<22) and regexp_like(name, '[A-Z ]*') and id>0 and id>=1 and id in (1,2,3,4) and id not in (5,6,7) and name is not null and email is null and id<4 and id<=4"
+    query = "select id,name,age from fake where id=1 and id!=0 and name<>'Kin Dom' and (age>=20 or age<22) and regexp_like(name, '[A-Z ]*') and id>0 and id>=1 and id in (1,2,3,4) and id not in (5,6,7) and name is not null and email is null and id<4 and id<=4 and name like '%Din_'"
   }
 }
 
diff --git a/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/sql_all_columns.conf b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/sql_all_columns.conf
new file mode 100644
index 00000000000..0dfa9db5a7d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/resources/sql_transform/sql_all_columns.conf
@@ -0,0 +1,124 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file demonstrates batch processing with "select *" in the SQL transform
+######
+
+env {
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    result_table_name = "fake"
+    row.num = 100
+    schema = {
+      fields {
+        id = "int"
+        name = "string"
+        age = "int"
+        c_timestamp = "timestamp"
+        c_date = "date"
+        c_map = "map"
+        c_array = "array"
+        c_decimal = "decimal(30, 8)"
+      }
+    }
+  }
+}
+
+transform {
+  Sql {
+    source_table_name = "fake"
+    result_table_name = "fake1"
+    query = "select *, id as id_ from fake"
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "fake1"
+  }
+  Assert {
+    source_table_name = "fake1"
+    rules =
+      {
+        row_rules = [
+          {
+            rule_type = MIN_ROW
+            rule_value = 100
+          }
+        ]
+        field_rules = [
+          {
+            field_name = id
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = name
+            field_type = string
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = age
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_timestamp
+            field_type = timestamp
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = c_date
+            field_type = date
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          },
+          {
+            field_name = id_
+            field_type = int
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
index fa491e7e6e5..55fbe04cf13 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLEngine.java
@@ -54,6 +54,9 @@ public class ZetaSQLEngine implements SQLEngine {
     private ZetaSQLFilter zetaSQLFilter;
     private ZetaSQLType zetaSQLType;
 
+    // Output column count after expanding "*"; computed lazily by countColumnsSize()
+    private Integer allColumnsCount = null;
+
     public ZetaSQLEngine() {}
 
     @Override
@@ -131,11 +134,8 @@ private void validateSQL(Statement statement) {
                 throw new IllegalArgumentException("Unsupported LIMIT,OFFSET syntax");
             }
 
-            for (SelectItem selectItem : selectBody.getSelectItems()) {
-                if (selectItem instanceof AllColumns) {
-                    throw new IllegalArgumentException("Unsupported all columns select syntax");
-                }
-            }
+            // "select *" (AllColumns) is accepted here: typeMapping() and project() expand it
+            // into the columns of the input row type.
         } catch (Exception e) {
             throw new TransformException(
                     CommonErrorCode.UNSUPPORTED_OPERATION,
@@ -147,10 +147,13 @@ private void validateSQL(Statement statement) {
     public SeaTunnelRowType typeMapping(List inputColumnsMapping) {
         List selectItems = selectBody.getSelectItems();
 
-        String[] fieldNames = new String[selectItems.size()];
-        SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[selectItems.size()];
+        // "*" contributes one output column per input field, so size by the expanded count
+        int columnsSize = countColumnsSize(selectItems);
+
+        String[] fieldNames = new String[columnsSize];
+        SeaTunnelDataType[] seaTunnelDataTypes = new SeaTunnelDataType[columnsSize];
         if (inputColumnsMapping != null) {
-            for (int i = 0; i < selectItems.size(); i++) {
+            for (int i = 0; i < columnsSize; i++) {
                 inputColumnsMapping.add(null);
             }
         }
@@ -158,29 +161,46 @@ public SeaTunnelRowType typeMapping(List inputColumnsMapping) {
         List inputColumnNames =
                 Arrays.stream(inputRowType.getFieldNames()).collect(Collectors.toList());
 
-        for (int i = 0; i < selectItems.size(); i++) {
-            SelectItem selectItem = selectItems.get(i);
-            if (selectItem instanceof SelectExpressionItem) {
+        // idx is the next free output position; it runs ahead of the select-item index
+        // whenever "*" expands into several columns
+        int idx = 0;
+        for (SelectItem selectItem : selectItems) {
+            if (selectItem instanceof AllColumns) {
+                for (int i = 0; i < inputRowType.getFieldNames().length; i++) {
+                    fieldNames[idx] = inputRowType.getFieldName(i);
+                    seaTunnelDataTypes[idx] = inputRowType.getFieldType(i);
+                    if (inputColumnsMapping != null) {
+                        inputColumnsMapping.set(idx, inputRowType.getFieldName(i));
+                    }
+                    idx++;
+                }
+            } else if (selectItem instanceof SelectExpressionItem) {
                 SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
                 Expression expression = expressionItem.getExpression();
 
                 if (expressionItem.getAlias() != null) {
-                    fieldNames[i] = expressionItem.getAlias().getName();
+                    fieldNames[idx] = expressionItem.getAlias().getName();
                 } else {
                     if (expression instanceof Column) {
-                        fieldNames[i] = ((Column) expression).getColumnName();
+                        fieldNames[idx] = ((Column) expression).getColumnName();
                     } else {
-                        fieldNames[i] = expression.toString();
+                        fieldNames[idx] = expression.toString();
                     }
                 }
 
                 if (inputColumnsMapping != null
                         && expression instanceof Column
                         && inputColumnNames.contains(((Column) expression).getColumnName())) {
-                    inputColumnsMapping.set(i, ((Column) expression).getColumnName());
+                    inputColumnsMapping.set(idx, ((Column) expression).getColumnName());
                 }
 
-                seaTunnelDataTypes[i] = zetaSQLType.getExpressionType(expression);
+                seaTunnelDataTypes[idx] = zetaSQLType.getExpressionType(expression);
+                idx++;
+            } else {
+                // fail fast instead of emitting a column with a null name and type
+                throw new TransformException(
+                        CommonErrorCode.UNSUPPORTED_OPERATION,
+                        "Unsupported select item: " + selectItem);
             }
         }
         return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
@@ -214,13 +234,51 @@ private Object[] scanTable(SeaTunnelRow inputRow) {
 
     private Object[] project(Object[] inputFields) {
         List selectItems = selectBody.getSelectItems();
-        Object[] fields = new Object[selectItems.size()];
-        for (int i = 0; i < selectItems.size(); i++) {
-            SelectItem selectItem = selectItems.get(i);
-            SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
-            Expression expression = expressionItem.getExpression();
-            fields[i] = zetaSQLFunction.computeForValue(expression, inputFields);
+        // a new Object[] is already null-filled, so no explicit initialisation is needed
+        Object[] fields = new Object[countColumnsSize(selectItems)];
+
+        int idx = 0;
+        for (SelectItem selectItem : selectItems) {
+            if (selectItem instanceof AllColumns) {
+                // "*" copies every input field through unchanged, in input order
+                System.arraycopy(inputFields, 0, fields, idx, inputFields.length);
+                idx += inputFields.length;
+            } else if (selectItem instanceof SelectExpressionItem) {
+                Expression expression = ((SelectExpressionItem) selectItem).getExpression();
+                fields[idx] = zetaSQLFunction.computeForValue(expression, inputFields);
+                idx++;
+            } else {
+                throw new TransformException(
+                        CommonErrorCode.UNSUPPORTED_OPERATION,
+                        "Unsupported select item: " + selectItem);
+            }
         }
         return fields;
     }
+
+    /**
+     * Counts the output columns produced by the select list: every "*" item expands to all
+     * fields of the input row type, every other item yields exactly one column.
+     *
+     * <p>The result is cached in {@link #allColumnsCount} and never reset, so it assumes the
+     * select list and input row type stay the same after the first call.
+     *
+     * @param selectItems select items of the query
+     * @return number of output columns
+     */
+    private int countColumnsSize(List selectItems) {
+        if (allColumnsCount != null) {
+            return allColumnsCount;
+        }
+        int allColumnsCnt = 0;
+        for (SelectItem selectItem : selectItems) {
+            if (selectItem instanceof AllColumns) {
+                allColumnsCnt++;
+            }
+        }
+        // each "*" replaces its own slot with one column per input field
+        allColumnsCount =
+                selectItems.size() + allColumnsCnt * (inputRowType.getFieldNames().length - 1);
+        return allColumnsCount;
+    }
 }
diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
index f444764a071..77b59c51bc6 100644
--- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
+++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFilter.java
@@ -42,6 +42,8 @@
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 public class ZetaSQLFilter {
     private final ZetaSQLFunction zetaSQLFunction;
@@ -153,8 +155,58 @@ private boolean inExpr(InExpression inExpression, Object[] inputFields) {
      * @return filter result
      */
     private boolean likeExpr(LikeExpression likeExpression, Object[] inputFields) {
-        throw new TransformException(
-                CommonErrorCode.UNSUPPORTED_OPERATION, "Unsupported [LIKE] filter expression yet");
+        Expression leftExpr = likeExpression.getLeftExpression();
+        Object leftVal = zetaSQLFunction.computeForValue(leftExpr, inputFields);
+        if (leftVal == null) {
+            // a NULL operand satisfies neither LIKE nor NOT LIKE
+            return false;
+        }
+        Expression rightExpr = likeExpression.getRightExpression();
+        Object rightVal = zetaSQLFunction.computeForValue(rightExpr, inputFields);
+        if (rightVal == null) {
+            return false;
+        }
+
+        String likeStr = rightVal.toString();
+        // drop the surrounding single quotes if the value still carries them
+        if (likeStr.length() >= 2 && likeStr.startsWith("'") && likeStr.endsWith("'")) {
+            likeStr = likeStr.substring(1, likeStr.length() - 1);
+        }
+        // DOTALL so that '%' and '_' also match line breaks inside the value
+        Pattern pattern = Pattern.compile(likeToRegex(likeStr), Pattern.DOTALL);
+        Matcher matcher = pattern.matcher(leftVal.toString());
+
+        // honour "NOT LIKE", which the parser reports through isNot()
+        return matcher.matches() != likeExpression.isNot();
     }
+
+    /**
+     * Translates a SQL LIKE pattern into an equivalent regular expression: '%' becomes ".*",
+     * '_' becomes "." and every other run of characters is quoted, so regex metacharacters
+     * such as '.', '(' or '[' in the pattern are matched literally.
+     *
+     * @param likePattern SQL LIKE pattern without surrounding quotes
+     * @return regular expression accepting the same strings as the LIKE pattern
+     */
+    private static String likeToRegex(String likePattern) {
+        StringBuilder regex = new StringBuilder();
+        StringBuilder literal = new StringBuilder();
+        for (int i = 0; i < likePattern.length(); i++) {
+            char c = likePattern.charAt(i);
+            if (c == '%' || c == '_') {
+                if (literal.length() > 0) {
+                    regex.append(Pattern.quote(literal.toString()));
+                    literal.setLength(0);
+                }
+                regex.append(c == '%' ? ".*" : ".");
+            } else {
+                literal.append(c);
+            }
+        }
+        if (literal.length() > 0) {
+            regex.append(Pattern.quote(literal.toString()));
+        }
+        return regex.toString();
+    }
 
     private Pair executeComparisonOperator(