Skip to content

Commit

Permalink
[FLINK-29138][table-planner] fix project can not be pushed into looku…
Browse files Browse the repository at this point in the history
…p source

This closes #20729
  • Loading branch information
lincoln-lil authored Sep 5, 2022
1 parent d9ac4c7 commit 54b6b69
Show file tree
Hide file tree
Showing 9 changed files with 517 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.calcite.plan.RelOptRule;
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.plan.RelRule;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.logical.LogicalProject;
import org.apache.calcite.rel.logical.LogicalSnapshot;

/** Transpose {@link LogicalProject} past into {@link LogicalSnapshot}. */
public class ProjectSnapshotTransposeRule extends RelRule<ProjectSnapshotTransposeRule.Config> {

public static final RelOptRule INSTANCE =
ProjectSnapshotTransposeRule.Config.EMPTY.as(Config.class).withOperator().toRule();

public ProjectSnapshotTransposeRule(Config config) {
super(config);
}

@Override
public boolean matches(RelOptRuleCall call) {
LogicalProject project = call.rel(0);
// Don't push a project which contains over into a snapshot, snapshot on window aggregate is
// unsupported for now.
return !project.containsOver();
}

@Override
public void onMatch(RelOptRuleCall call) {
LogicalProject project = call.rel(0);
LogicalSnapshot snapshot = call.rel(1);
RelNode newProject = project.copy(project.getTraitSet(), snapshot.getInputs());
RelNode newSnapshot =
snapshot.copy(snapshot.getTraitSet(), newProject, snapshot.getPeriod());
call.transformTo(newSnapshot);
}

/** Configuration for {@link ProjectSnapshotTransposeRule}. */
public interface Config extends RelRule.Config {

@Override
default RelOptRule toRule() {
return new ProjectSnapshotTransposeRule(this);
}

default ProjectSnapshotTransposeRule.Config withOperator() {
final RelRule.OperandTransform snapshotTransform =
operandBuilder -> operandBuilder.operand(LogicalSnapshot.class).noInputs();

final RelRule.OperandTransform projectTransform =
operandBuilder ->
operandBuilder
.operand(LogicalProject.class)
.oneInput(snapshotTransform);

return withOperandSupplier(projectTransform).as(Config.class);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ object FlinkBatchRuleSets {
PushProjectIntoLegacyTableSourceScanRule.INSTANCE,
PushFilterIntoTableSourceScanRule.INSTANCE,
PushFilterIntoLegacyTableSourceScanRule.INSTANCE,

// transpose project and snapshot for scan optimization
ProjectSnapshotTransposeRule.INSTANCE,
// reorder sort and projection
CoreRules.SORT_PROJECT_TRANSPOSE,
// remove unnecessary sort rule
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,9 @@ object FlinkStreamRuleSets {
PushFilterIntoLegacyTableSourceScanRule.INSTANCE,
PushLimitIntoTableSourceScanRule.INSTANCE,

// transpose project and snapshot for scan optimization
ProjectSnapshotTransposeRule.INSTANCE,

// reorder the project and watermark assigner
ProjectWatermarkAssignerTransposeRule.INSTANCE,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,7 @@ public DynamicTableSource createDynamicTableSource(Context context) {
}
} else {
return new TestValuesScanLookupTableSource(
context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType(),
producedDataType,
changelogMode,
isBounded,
Expand Down Expand Up @@ -892,7 +893,7 @@ protected Collection<RowData> convertToRowData(DataStructureConverter converter)
return result;
}

private Row projectRow(Row row) {
protected Row projectRow(Row row) {
if (projectedPhysicalFields == null) {
return row;
}
Expand Down Expand Up @@ -1082,7 +1083,10 @@ private static class TestValuesScanLookupTableSource extends TestValuesScanTable
private final @Nullable String lookupFunctionClass;
private final boolean isAsync;

private final DataType originType;

private TestValuesScanLookupTableSource(
DataType originType,
DataType producedDataType,
ChangelogMode changelogMode,
boolean bounded,
Expand Down Expand Up @@ -1116,6 +1120,7 @@ private TestValuesScanLookupTableSource(
allPartitions,
readableMetadata,
projectedMetadataFields);
this.originType = originType;
this.lookupFunctionClass = lookupFunctionClass;
this.isAsync = isAsync;
}
Expand Down Expand Up @@ -1158,20 +1163,24 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
data = data.subList(numElementToSkip, data.size());
}
}

if (nestedProjectionSupported) {
throw new UnsupportedOperationException(
"nestedProjectionSupported is unsupported for lookup source currently.");
}
data.forEach(
record -> {
Row projected = projectRow(record);
Row key =
Row.of(
Arrays.stream(lookupIndices)
.mapToObj(record::getField)
.mapToObj(projected::getField)
.toArray());
List<Row> list = mapping.get(key);
if (list != null) {
list.add(record);
list.add(projected);
} else {
list = new ArrayList<>();
list.add(record);
list.add(projected);
mapping.put(key, list);
}
});
Expand All @@ -1185,6 +1194,7 @@ record -> {
@Override
public DynamicTableSource copy() {
return new TestValuesScanLookupTableSource(
originType,
producedDataType,
changelogMode,
bounded,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.rules.logical;

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.planner.plan.optimize.program.FlinkBatchProgram;
import org.apache.flink.table.planner.plan.optimize.program.FlinkStreamProgram;
import org.apache.flink.table.planner.utils.BatchTableTestUtil;
import org.apache.flink.table.planner.utils.StreamTableTestUtil;
import org.apache.flink.table.planner.utils.TableTestBase;
import org.apache.flink.table.planner.utils.TableTestUtil;

import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

/** Test rule {@link ProjectSnapshotTransposeRule}. */
@RunWith(Parameterized.class)
public class ProjectSnapshotTransposeRuleTest extends TableTestBase {

private static final String STREAM = "stream";
private static final String BATCH = "batch";

@Parameterized.Parameter public String mode;

@Parameterized.Parameters(name = "mode = {0}")
public static Collection<String> parameters() {
return Arrays.asList(STREAM, BATCH);
}

private TableTestUtil util;

@Before
public void setup() {
boolean isStreaming = STREAM.equals(mode);
if (isStreaming) {
util = streamTestUtil(TableConfig.getDefault());
((StreamTableTestUtil) util).buildStreamProgram(FlinkStreamProgram.LOGICAL_REWRITE());
} else {
util = batchTestUtil(TableConfig.getDefault());
((BatchTableTestUtil) util).buildBatchProgram(FlinkBatchProgram.LOGICAL_REWRITE());
}

TableEnvironment tEnv = util.getTableEnv();
String src =
String.format(
"CREATE TABLE MyTable (\n"
+ " a int,\n"
+ " b varchar,\n"
+ " c bigint,\n"
+ " proctime as PROCTIME(),\n"
+ " rowtime as TO_TIMESTAMP(FROM_UNIXTIME(c)),\n"
+ " watermark for rowtime as rowtime - INTERVAL '1' second \n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'bounded' = '%s')",
!isStreaming);
String lookup =
String.format(
"CREATE TABLE LookupTable (\n"
+ " id int,\n"
+ " name varchar,\n"
+ " age int \n"
+ ") with (\n"
+ " 'connector' = 'values',\n"
+ " 'bounded' = '%s')",
!isStreaming);
tEnv.executeSql(src);
tEnv.executeSql(lookup);
}

@Test
public void testJoinTemporalTableWithProjectionPushDown() {
String sql =
"SELECT T.*, D.id\n"
+ "FROM MyTable AS T\n"
+ "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+ "ON T.a = D.id";

util.verifyRelPlan(sql);
}

@Test
public void testJoinTemporalTableNotProjectable() {
String sql =
"SELECT T.*, D.*\n"
+ "FROM MyTable AS T\n"
+ "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+ "ON T.a = D.id";

util.verifyRelPlan(sql);
}

@Test
public void testJoinTemporalTableWithReorderedProject() {
String sql =
"SELECT T.*, D.age, D.name, D.id\n"
+ "FROM MyTable AS T\n"
+ "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+ "ON T.a = D.id";

util.verifyRelPlan(sql);
}

@Test
public void testJoinTemporalTableWithProjectAndFilter() {
String sql =
"SELECT T.*, D.id\n"
+ "FROM MyTable AS T\n"
+ "JOIN LookupTable FOR SYSTEM_TIME AS OF T.proctime AS D\n"
+ "ON T.a = D.id WHERE D.age > 20";

util.verifyRelPlan(sql);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ FlinkLogicalAggregate(group=[{0}], EXPR$1=[COUNT($1)], EXPR$2=[SUM($2)], EXPR$3=
: +- FlinkLogicalDataStreamTableScan(table=[[default_catalog, default_database, T1]], fields=[a, b, c, d])
+- FlinkLogicalSnapshot(period=[$cor0.proctime])
+- FlinkLogicalCalc(select=[id], where=[>(age, 10)])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable]], fields=[id, name, age])
+- FlinkLogicalTableSourceScan(table=[[default_catalog, default_database, LookupTable, project=[id, age], metadata=[]]], fields=[id, age])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,28 @@
"bounded" : "false",
"schema.0.name" : "id",
"schema.1.data-type" : "VARCHAR(2147483647)"
}
},
"sourceAbilitySpecs" : [ {
"type" : "ProjectPushDown",
"projectedFields" : [ [ 0 ] ],
"producedType" : {
"type" : "ROW",
"nullable" : false,
"fields" : [ {
"id" : "INT"
} ]
}
}, {
"type" : "ReadingMetadata",
"metadataKeys" : [ ],
"producedType" : {
"type" : "ROW",
"nullable" : false,
"fields" : [ {
"id" : "INT"
} ]
}
} ]
},
"outputType" : {
"structKind" : "FULLY_QUALIFIED",
Expand All @@ -245,15 +266,6 @@
"typeName" : "INTEGER",
"nullable" : true,
"fieldName" : "id"
}, {
"typeName" : "VARCHAR",
"nullable" : true,
"precision" : 2147483647,
"fieldName" : "name"
}, {
"typeName" : "INTEGER",
"nullable" : true,
"fieldName" : "age"
} ]
}
},
Expand All @@ -263,14 +275,7 @@
"index" : 0
}
},
"projectionOnTemporalTable" : [ {
"kind" : "INPUT_REF",
"inputIndex" : 0,
"type" : {
"typeName" : "INTEGER",
"nullable" : true
}
} ],
"projectionOnTemporalTable" : null,
"filterOnTemporalTable" : null,
"id" : 4,
"inputProperties" : [ {
Expand Down
Loading

0 comments on commit 54b6b69

Please sign in to comment.