Skip to content

Commit

Permalink
[Feature][Transform-V2][SQL] Support 'select *' and 'like' clause for…
Browse files Browse the repository at this point in the history
… SQL Transform plugin (apache#4991)

Co-authored-by: mcy <rewrma@163.com>
  • Loading branch information
2 people authored and liunaijie committed Jul 13, 2023
1 parent fec8cba commit 8fe6bde
Show file tree
Hide file tree
Showing 5 changed files with 222 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,5 +49,8 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr
Container.ExecResult sqlCriteriaFilter =
container.executeJob("/sql_transform/criteria_filter.conf");
Assertions.assertEquals(0, sqlCriteriaFilter.getExitCode());
Container.ExecResult sqlAllColumns =
container.executeJob("/sql_transform/sql_all_columns.conf");
Assertions.assertEquals(0, sqlAllColumns.getExitCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "select id,name,age from fake where id=1 and id!=0 and name<>'Kin Dom' and (age>=20 or age<22) and regexp_like(name, '[A-Z ]*') and id>0 and id>=1 and id in (1,2,3,4) and id not in (5,6,7) and name is not null and email is null and id<4 and id<=4"
query = "select id,name,age from fake where id=1 and id!=0 and name<>'Kin Dom' and (age>=20 or age<22) and regexp_like(name, '[A-Z ]*') and id>0 and id>=1 and id in (1,2,3,4) and id not in (5,6,7) and name is not null and email is null and id<4 and id<=4 and name like '%Din_'"
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
######
###### This config file is a demonstration of batch processing in seatunnel config
######

env {
job.mode = "BATCH"
}

source {
FakeSource {
result_table_name = "fake"
row.num = 100
schema = {
fields {
id = "int"
name = "string"
age = "int"
c_timestamp = "timestamp"
c_date = "date"
c_map = "map<string, string>"
c_array = "array<int>"
c_decimal = "decimal(30, 8)"
}
}
}
}

transform {
Sql {
source_table_name = "fake"
result_table_name = "fake1"
query = "select *, id as id_ from fake"
}
}

sink {
Console {
source_table_name = "fake1"
}
Assert {
source_table_name = "fake1"
rules =
{
row_rules = [
{
rule_type = MIN_ROW
rule_value = 100
}
]
field_rules = [
{
field_name = id
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = name
field_type = string
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = age
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = c_timestamp
field_type = timestamp
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = c_date
field_type = date
field_value = [
{
rule_type = NOT_NULL
}
]
},
{
field_name = id_
field_type = int
field_value = [
{
rule_type = NOT_NULL
}
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ public class ZetaSQLEngine implements SQLEngine {
private ZetaSQLFilter zetaSQLFilter;
private ZetaSQLType zetaSQLType;

private Integer allColumnsCount = null;

public ZetaSQLEngine() {}

@Override
Expand Down Expand Up @@ -131,11 +133,11 @@ private void validateSQL(Statement statement) {
throw new IllegalArgumentException("Unsupported LIMIT,OFFSET syntax");
}

for (SelectItem selectItem : selectBody.getSelectItems()) {
if (selectItem instanceof AllColumns) {
throw new IllegalArgumentException("Unsupported all columns select syntax");
}
}
// for (SelectItem selectItem : selectBody.getSelectItems()) {
// if (selectItem instanceof AllColumns) {
// throw new IllegalArgumentException("Unsupported all columns select syntax");
// }
// }
} catch (Exception e) {
throw new TransformException(
CommonErrorCode.UNSUPPORTED_OPERATION,
Expand All @@ -147,40 +149,55 @@ private void validateSQL(Statement statement) {
public SeaTunnelRowType typeMapping(List<String> inputColumnsMapping) {
List<SelectItem> selectItems = selectBody.getSelectItems();

String[] fieldNames = new String[selectItems.size()];
SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType<?>[selectItems.size()];
// count number of all columns
int columnsSize = countColumnsSize(selectItems);

String[] fieldNames = new String[columnsSize];
SeaTunnelDataType<?>[] seaTunnelDataTypes = new SeaTunnelDataType<?>[columnsSize];
if (inputColumnsMapping != null) {
for (int i = 0; i < selectItems.size(); i++) {
for (int i = 0; i < columnsSize; i++) {
inputColumnsMapping.add(null);
}
}

List<String> inputColumnNames =
Arrays.stream(inputRowType.getFieldNames()).collect(Collectors.toList());

for (int i = 0; i < selectItems.size(); i++) {
SelectItem selectItem = selectItems.get(i);
if (selectItem instanceof SelectExpressionItem) {
int idx = 0;
for (SelectItem selectItem : selectItems) {
if (selectItem instanceof AllColumns) {
for (int i = 0; i < inputRowType.getFieldNames().length; i++) {
fieldNames[idx] = inputRowType.getFieldName(i);
seaTunnelDataTypes[idx] = inputRowType.getFieldType(i);
if (inputColumnsMapping != null) {
inputColumnsMapping.set(idx, inputRowType.getFieldName(i));
}
idx++;
}
} else if (selectItem instanceof SelectExpressionItem) {
SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
Expression expression = expressionItem.getExpression();

if (expressionItem.getAlias() != null) {
fieldNames[i] = expressionItem.getAlias().getName();
fieldNames[idx] = expressionItem.getAlias().getName();
} else {
if (expression instanceof Column) {
fieldNames[i] = ((Column) expression).getColumnName();
fieldNames[idx] = ((Column) expression).getColumnName();
} else {
fieldNames[i] = expression.toString();
fieldNames[idx] = expression.toString();
}
}

if (inputColumnsMapping != null
&& expression instanceof Column
&& inputColumnNames.contains(((Column) expression).getColumnName())) {
inputColumnsMapping.set(i, ((Column) expression).getColumnName());
inputColumnsMapping.set(idx, ((Column) expression).getColumnName());
}

seaTunnelDataTypes[i] = zetaSQLType.getExpressionType(expression);
seaTunnelDataTypes[idx] = zetaSQLType.getExpressionType(expression);
idx++;
} else {
idx++;
}
}
return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
Expand Down Expand Up @@ -214,13 +231,47 @@ private Object[] scanTable(SeaTunnelRow inputRow) {

private Object[] project(Object[] inputFields) {
List<SelectItem> selectItems = selectBody.getSelectItems();
Object[] fields = new Object[selectItems.size()];
for (int i = 0; i < selectItems.size(); i++) {
SelectItem selectItem = selectItems.get(i);
SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
Expression expression = expressionItem.getExpression();
fields[i] = zetaSQLFunction.computeForValue(expression, inputFields);

int columnsSize = countColumnsSize(selectItems);

Object[] fields = new Object[columnsSize];
for (int i = 0; i < columnsSize; i++) {
fields[i] = null;
}

int idx = 0;
for (SelectItem selectItem : selectItems) {
if (selectItem instanceof AllColumns) {
for (Object inputField : inputFields) {
fields[idx] = inputField;
idx++;
}
} else if (selectItem instanceof SelectExpressionItem) {
SelectExpressionItem expressionItem = (SelectExpressionItem) selectItem;
Expression expression = expressionItem.getExpression();
fields[idx] = zetaSQLFunction.computeForValue(expression, inputFields);
idx++;
} else {
idx++;
}
}
return fields;
}

/**
 * Returns the number of output columns produced by the given select list.
 *
 * <p>Every {@code AllColumns} item ({@code *}) stands for all fields of {@code inputRowType};
 * every other item stands for exactly one column. The result is computed on the first call and
 * stored in {@code allColumnsCount}; later calls return the stored value without looking at
 * {@code selectItems} again.
 *
 * @param selectItems the select items of the query
 * @return the total output column count
 */
private int countColumnsSize(List<SelectItem> selectItems) {
    if (allColumnsCount == null) {
        int wildcardItems = 0;
        for (SelectItem item : selectItems) {
            wildcardItems += (item instanceof AllColumns) ? 1 : 0;
        }
        int inputWidth = inputRowType.getFieldNames().length;
        // Each wildcard replaces its own single slot with inputWidth slots.
        allColumnsCount = selectItems.size() + (inputWidth - 1) * wildcardItems;
    }
    return allColumnsCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ZetaSQLFilter {
private final ZetaSQLFunction zetaSQLFunction;
Expand Down Expand Up @@ -153,8 +155,25 @@ private boolean inExpr(InExpression inExpression, Object[] inputFields) {
* @return filter result
*/
private boolean likeExpr(LikeExpression likeExpression, Object[] inputFields) {
    // A NULL operand on either side never matches; the right side is only evaluated
    // when the left side is non-null.
    // NOTE(review): NOT LIKE negation is not applied here - confirm the caller handles it.
    Object leftVal =
            zetaSQLFunction.computeForValue(likeExpression.getLeftExpression(), inputFields);
    if (leftVal == null) {
        return false;
    }
    Object rightVal =
            zetaSQLFunction.computeForValue(likeExpression.getRightExpression(), inputFields);
    if (rightVal == null) {
        return false;
    }

    String likePattern = rightVal.toString();
    // Drop one pair of surrounding single quotes if the literal still carries them.
    // The length guard keeps a lone "'" from being treated as a quoted empty string.
    if (likePattern.length() >= 2 && likePattern.startsWith("'") && likePattern.endsWith("'")) {
        likePattern = likePattern.substring(1, likePattern.length() - 1);
    }

    Matcher matcher = likePatternToRegex(likePattern).matcher(leftVal.toString());
    return matcher.matches();
}

/**
 * Translates a SQL LIKE pattern into an equivalent regular expression.
 *
 * <p>{@code %} becomes {@code .*} and {@code _} becomes {@code .}; every other character is
 * matched literally, so regex metacharacters such as {@code .}, {@code (} or {@code [} in the
 * LIKE pattern have no special meaning. {@link Pattern#DOTALL} lets both wildcards match line
 * terminators as well.
 *
 * @param likePattern the LIKE pattern without surrounding quotes
 * @return the compiled pattern, intended to be used with {@link Matcher#matches()}
 */
private static Pattern likePatternToRegex(String likePattern) {
    StringBuilder regex = new StringBuilder();
    StringBuilder literal = new StringBuilder();
    for (int i = 0; i < likePattern.length(); i++) {
        char c = likePattern.charAt(i);
        if (c == '%' || c == '_') {
            // Flush the pending literal run as a quoted block before emitting the wildcard.
            if (literal.length() > 0) {
                regex.append(Pattern.quote(literal.toString()));
                literal.setLength(0);
            }
            regex.append(c == '%' ? ".*" : ".");
        } else {
            literal.append(c);
        }
    }
    if (literal.length() > 0) {
        regex.append(Pattern.quote(literal.toString()));
    }
    return Pattern.compile(regex.toString(), Pattern.DOTALL);
}

private Pair<Object, Object> executeComparisonOperator(
Expand Down

0 comments on commit 8fe6bde

Please sign in to comment.