Skip to content

Commit 9d201bb

Browse files
authored
[Fix][transform-v2]SQL transform support max/min function (apache#8625)
1 parent b159cc0 commit 9d201bb

File tree

7 files changed

+228
-0
lines changed

7 files changed

+228
-0
lines changed

docs/en/transform-v2/sql-functions.md

+20
Original file line numberDiff line numberDiff line change
@@ -385,6 +385,26 @@ Example:
385385

386386
ACOS(D)
387387

388+
### ARRAY_MAX
389+
390+
```ARRAY_MAX(ARRAY)```
391+
392+
The ARRAY_MAX function returns the maximum element of the given array.
393+
394+
Example:
395+
396+
ARRAY_MAX(I)
397+
398+
### ARRAY_MIN
399+
400+
```ARRAY_MIN(ARRAY)```
401+
402+
The ARRAY_MIN function returns the minimum element of the given array.
403+
404+
Example:
405+
406+
ARRAY_MIN(I)
407+
388408
### ASIN
389409

390410
```ASIN(numeric)```

docs/zh/transform-v2/sql-functions.md

+21
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,27 @@ ABS(I)
386386

387387
ACOS(D)
388388

389+
### ARRAY_MAX
390+
391+
```ARRAY_MAX(ARRAY)```
392+
393+
ARRAY_MAX 函数返回数组中的最大元素。
394+
395+
示例:
396+
397+
ARRAY_MAX(I)
398+
399+
### ARRAY_MIN
400+
401+
```ARRAY_MIN(ARRAY)```
402+
403+
ARRAY_MIN 函数返回数组中的最小元素。
404+
405+
示例:
406+
407+
ARRAY_MIN(I)
408+
409+
389410
### ASIN
390411

391412
```ASIN(numeric)```

seatunnel-e2e/seatunnel-transforms-v2-e2e/seatunnel-transforms-v2-e2e-part-2/src/test/java/org/apache/seatunnel/e2e/transform/TestSQLIT.java

+4
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ public void testSQLTransform(TestContainer container) throws IOException, Interr
7777

7878
Container.ExecResult splitSql = container.executeJob("/sql_transform/func_split.conf");
7979
Assertions.assertEquals(0, splitSql.getExitCode());
80+
81+
Container.ExecResult maxMinSql =
82+
container.executeJob("/sql_transform/func_array_max_min.conf");
83+
Assertions.assertEquals(0, maxMinSql.getExitCode());
8084
}
8185

8286
@TestTemplate
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
######
18+
###### This config file is a demonstration of batch processing in seatunnel config
19+
######
20+
21+
22+
env {
23+
job.mode = "BATCH"
24+
parallelism = 1
25+
}
26+
27+
source {
28+
FakeSource {
29+
plugin_output = "fake"
30+
schema = {
31+
fields {
32+
c_string = string
33+
c_num_array = "array<int>"
34+
c_string_array = "array<string>"
35+
}
36+
}
37+
rows = [
38+
{
39+
kind = INSERT
40+
fields = ["c_string",[1,2,3], ["a","b","c"]]
41+
}
42+
]
43+
}
44+
}
45+
46+
transform {
47+
Sql {
48+
plugin_input = "fake"
49+
plugin_output = "fake1"
50+
query = """select c_string,
51+
ARRAY_MAX(c_num_array) as c_num_max_array,
52+
ARRAY_MIN(c_num_array) as c_num_min_array,
53+
ARRAY_MAX(c_string_array) as c_string_max_array,
54+
ARRAY_MIN(c_string_array) as c_string_min_array
55+
from fake1"""
56+
}
57+
}
58+
59+
sink {
60+
Assert {
61+
plugin_input = "fake1"
62+
rules =
63+
{
64+
row_rules = [
65+
{
66+
rule_type = MIN_ROW
67+
rule_value = 1
68+
},
69+
{
70+
rule_type = MAX_ROW
71+
rule_value = 1
72+
}
73+
],
74+
field_rules = [
75+
{
76+
field_name = "c_string"
77+
field_type = "string"
78+
field_value = [
79+
{equals_to = "c_string"}
80+
]
81+
},
82+
{
83+
field_name = "c_num_max_array"
84+
field_type = "int"
85+
field_value = [
86+
{equals_to = 3}
87+
]
88+
},
89+
{
90+
field_name = "c_num_min_array"
91+
field_type = "int"
92+
field_value = [
93+
{equals_to = 1}
94+
]
95+
},
96+
{
97+
field_name = "c_string_max_array"
98+
field_type = "string"
99+
field_value = [
100+
{equals_to = "c"}
101+
]
102+
},
103+
{
104+
field_name = "c_string_min_array"
105+
field_type = "string"
106+
field_value = [
107+
{equals_to = "a"}
108+
]
109+
}
110+
]
111+
}
112+
}
113+
}

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLFunction.java

+6
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ public class ZetaSQLFunction {
148148
public static final String SIGN = "SIGN";
149149
public static final String TRUNC = "TRUNC";
150150
public static final String TRUNCATE = "TRUNCATE";
151+
public static final String ARRAY_MAX = "ARRAY_MAX";
152+
public static final String ARRAY_MIN = "ARRAY_MIN";
151153

152154
// -------------------------time and date functions----------------------------
153155
public static final String CURRENT_DATE = "CURRENT_DATE";
@@ -554,6 +556,10 @@ public Object executeFunctionExpr(String functionName, List<Object> args) {
554556
return SystemFunction.nullif(args);
555557
case ARRAY:
556558
return ArrayFunction.array(args);
559+
case ARRAY_MAX:
560+
return ArrayFunction.arrayMax(args);
561+
case ARRAY_MIN:
562+
return ArrayFunction.arrayMin(args);
557563
case UUID:
558564
return randomUUID().toString();
559565
default:

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/ZetaSQLType.java

+3
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,9 @@ private SeaTunnelDataType<?> getFunctionType(Function function) {
450450
return BasicType.DOUBLE_TYPE;
451451
case ZetaSQLFunction.ARRAY:
452452
return ArrayFunction.castArrayTypeMapping(function, inputRowType);
453+
case ZetaSQLFunction.ARRAY_MAX:
454+
case ZetaSQLFunction.ARRAY_MIN:
455+
return ArrayFunction.getElementType(function, inputRowType);
453456
case ZetaSQLFunction.SPLIT:
454457
return ArrayType.STRING_ARRAY_TYPE;
455458
case ZetaSQLFunction.NOW:

seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/sql/zeta/functions/ArrayFunction.java

+61
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,11 @@
1717
package org.apache.seatunnel.transform.sql.zeta.functions;
1818

1919
import org.apache.seatunnel.api.table.type.ArrayType;
20+
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
2021
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
22+
import org.apache.seatunnel.common.exception.CommonErrorCode;
2123
import org.apache.seatunnel.common.utils.SeaTunnelException;
24+
import org.apache.seatunnel.transform.exception.TransformException;
2225

2326
import net.sf.jsqlparser.expression.DoubleValue;
2427
import net.sf.jsqlparser.expression.Expression;
@@ -30,10 +33,60 @@
3033
import net.sf.jsqlparser.schema.Column;
3134

3235
import java.util.ArrayList;
36+
import java.util.Arrays;
37+
import java.util.Comparator;
3338
import java.util.List;
3439

3540
public class ArrayFunction {
3641

42+
public static Object arrayMax(List<Object> args) {
43+
if (args == null || args.isEmpty()) {
44+
return null;
45+
}
46+
Object[] dataList = (Object[]) args.get(0);
47+
if (dataList == null || dataList.length == 0) {
48+
return null;
49+
}
50+
if (dataList[0] instanceof String) {
51+
return Arrays.stream(dataList)
52+
.map(String.class::cast)
53+
.max(String::compareTo)
54+
.orElse(null);
55+
} else if (dataList[0] instanceof Number) {
56+
return Arrays.stream(dataList)
57+
.map(Number.class::cast)
58+
.max(Comparator.comparingDouble(Number::doubleValue))
59+
.orElse(null);
60+
}
61+
throw new TransformException(
62+
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
63+
String.format("Unsupported function max() arguments: %s", args));
64+
}
65+
66+
public static Object arrayMin(List<Object> args) {
67+
if (args == null || args.isEmpty()) {
68+
return null;
69+
}
70+
Object[] dataList = (Object[]) args.get(0);
71+
if (dataList == null || dataList.length == 0) {
72+
return null;
73+
}
74+
if (dataList[0] instanceof String) {
75+
return Arrays.stream(dataList)
76+
.map(String.class::cast)
77+
.min(String::compareTo)
78+
.orElse(null);
79+
} else if (dataList[0] instanceof Number) {
80+
return Arrays.stream(dataList)
81+
.map(Number.class::cast)
82+
.min(Comparator.comparingDouble(Number::doubleValue))
83+
.orElse(null);
84+
}
85+
throw new TransformException(
86+
CommonErrorCode.UNSUPPORTED_DATA_TYPE,
87+
String.format("Unsupported function max() arguments: %s", args));
88+
}
89+
3790
public static Object[] array(List<Object> args) {
3891
if (args == null || args.isEmpty()) {
3992
return new Object[0];
@@ -150,6 +203,14 @@ private static Class<?> getDataClassType(List<Object> args) {
150203
return arrayType == null ? String.class : arrayType;
151204
}
152205

206+
public static SeaTunnelDataType<?> getElementType(
207+
Function function, SeaTunnelRowType inputRowType) {
208+
String columnName = function.getParameters().getExpressions().get(0).toString();
209+
int columnIndex = inputRowType.indexOf(columnName);
210+
ArrayType arrayType = (ArrayType) inputRowType.getFieldType(columnIndex);
211+
return arrayType.getElementType();
212+
}
213+
153214
private static List<Class<?>> getFunctionArgs(
154215
Function function, SeaTunnelRowType inputRowType) {
155216
ExpressionList<Expression> expressionList =

0 commit comments

Comments
 (0)