diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java index 24c04937bbe2b9..c84c5212a5b47f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java @@ -262,6 +262,9 @@ public List buildRules() { filter.getExpressions(), project.getExpressions() )) ); + if (mvPlanWithoutAgg.getSelectedIndexId() == result.indexId) { + mvPlanWithoutAgg = mvPlanWithoutAgg.withPreAggStatus(result.preAggStatus); + } SlotContext slotContextWithoutAgg = generateBaseScanExprToMvExpr(mvPlanWithoutAgg); return agg.withChildren(new LogicalProject( @@ -745,22 +748,22 @@ public PreAggStatus visit(Expression expr, CheckContext context) { @Override public PreAggStatus visitAggregateFunction(AggregateFunction aggregateFunction, CheckContext context) { - return checkAggFunc(aggregateFunction, AggregateType.NONE, context); + return checkAggFunc(aggregateFunction, AggregateType.NONE, context, false); } @Override public PreAggStatus visitMax(Max max, CheckContext context) { - return checkAggFunc(max, AggregateType.MAX, context); + return checkAggFunc(max, AggregateType.MAX, context, true); } @Override public PreAggStatus visitMin(Min min, CheckContext context) { - return checkAggFunc(min, AggregateType.MIN, context); + return checkAggFunc(min, AggregateType.MIN, context, true); } @Override public PreAggStatus visitSum(Sum sum, CheckContext context) { - return checkAggFunc(sum, AggregateType.SUM, context); + return checkAggFunc(sum, AggregateType.SUM, context, false); } @Override @@ -829,7 +832,8 @@ public PreAggStatus visitHllUnion(HllUnion hllUnion, CheckContext context) { private PreAggStatus checkAggFunc( AggregateFunction aggFunc, AggregateType matchingAggType, - CheckContext ctx) { + CheckContext ctx, + boolean canUseKeyColumn) { String childNameWithFuncName = ctx.isBaseIndex() ? normalizeName(aggFunc.child(0).toSql()) : normalizeName(CreateMaterializedViewStmt.mvColumnBuilder( @@ -837,7 +841,7 @@ private PreAggStatus checkAggFunc( boolean contains = containsAllColumn(aggFunc.child(0), ctx.keyNameToColumn.keySet()); if (contains || ctx.keyNameToColumn.containsKey(childNameWithFuncName)) { - if (ctx.isDupKeysOrMergeOnWrite || (!ctx.isBaseIndex() && contains)) { + if (canUseKeyColumn || ctx.isDupKeysOrMergeOnWrite || (!ctx.isBaseIndex() && contains)) { return PreAggStatus.on(); } else { Column column = ctx.keyNameToColumn.get(childNameWithFuncName); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PreAggStatus.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PreAggStatus.java index 70d2a90db9b080..7affac49b2bc09 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PreAggStatus.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PreAggStatus.java @@ -21,6 +21,7 @@ /** * Pre-aggregate status for OLAP scan table. + * @see org.apache.doris.planner.OlapScanNode#isPreAggregation */ public class PreAggStatus { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java index d83a2f59f7fb79..c69a0b4dcf8378 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/logical/LogicalOlapScan.java @@ -42,6 +42,7 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import org.apache.commons.lang3.tuple.Pair; +import org.json.JSONObject; import java.util.Arrays; import java.util.List; @@ -429,4 +430,15 @@ private List createSlotsVectorized(List columns) { } return (List) Arrays.asList(slots); } + + @Override + public JSONObject toJson() { + JSONObject olapScan = super.toJson(); + JSONObject properties = new JSONObject(); + properties.put("OlapTable", table.getName()); + properties.put("SelectedIndexId", Long.toString(selectedIndexId)); + properties.put("PreAggStatus", preAggStatus.toString()); + olapScan.put("Properties", properties); + return olapScan; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index 61ffc770ae3905..bccd8a4962d36d 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -121,7 +121,8 @@ public class OlapScanNode extends ScanNode { * When the field value is ON, the storage engine can return the data directly * without pre-aggregation. * When the field value is OFF, the storage engine needs to aggregate the data - * before returning to scan node. + * before returning to scan node. And if the table is an aggregation table, + * all key columns need to be read an participate in aggregation. * For example: * Aggregate table: k1, k2, v1 sum * Field value is ON @@ -135,7 +136,9 @@ public class OlapScanNode extends ScanNode { * Query2: select k1, min(v1) from table group by k1 * This aggregation function in query is min which different from the schema. * So the data stored in storage engine need to be merged firstly before - * returning to scan node. + * returning to scan node. Although we only queried key column k1, key column + * k2 still needs to be detected and participate in aggregation to ensure the + * results are correct. * * There are currently two places to modify this variable: * 1. The turnOffPreAgg() method of SingleNodePlanner. diff --git a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/mv/SelectRollupIndexTest.java b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/mv/SelectRollupIndexTest.java index 706be618f9817e..0686edba64e01e 100644 --- a/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/mv/SelectRollupIndexTest.java +++ b/fe/fe-core/src/test/java/org/apache/doris/nereids/rules/rewrite/mv/SelectRollupIndexTest.java @@ -263,11 +263,10 @@ public void testKeyColumnInAggFunction() { })); } - @Disabled("reopen it if we fix rollup select bugs") @Test public void testMaxCanUseKeyColumn() { PlanChecker.from(connectContext) - .analyze("select k2, max(k3) from t group by k3") + .analyze("select k2, max(k3) from t group by k2") .applyTopDown(new SelectMaterializedIndexWithAggregate()) .applyTopDown(new SelectMaterializedIndexWithoutAggregate()) .matches(logicalOlapScan().when(scan -> { @@ -278,11 +277,10 @@ public void testMaxCanUseKeyColumn() { })); } - @Disabled("reopen it if we fix rollup select bugs") @Test public void testMinCanUseKeyColumn() { PlanChecker.from(connectContext) - .analyze("select k2, min(k3) from t group by k3") + .analyze("select k2, min(k3) from t group by k2") .applyTopDown(new SelectMaterializedIndexWithAggregate()) .applyTopDown(new SelectMaterializedIndexWithoutAggregate()) .matches(logicalOlapScan().when(scan -> { @@ -293,6 +291,34 @@ public void testMinCanUseKeyColumn() { })); } + @Test + public void testMinMaxCanUseKeyColumnWithBaseTable() { + PlanChecker.from(connectContext) + .analyze("select k1, min(k2), max(k2) from t group by k1") + .applyTopDown(new SelectMaterializedIndexWithAggregate()) + .applyTopDown(new SelectMaterializedIndexWithoutAggregate()) + .matches(logicalOlapScan().when(scan -> { + PreAggStatus preAgg = scan.getPreAggStatus(); + Assertions.assertTrue(preAgg.isOn()); + Assertions.assertEquals("t", scan.getSelectedMaterializedIndexName().get()); + return true; + })); + } + + @Test + public void testFilterAggWithBaseTable() { + PlanChecker.from(connectContext) + .analyze("select k1 from t where k1 = 0 group by k1") + .applyTopDown(new SelectMaterializedIndexWithAggregate()) + .applyTopDown(new SelectMaterializedIndexWithoutAggregate()) + .matches(logicalOlapScan().when(scan -> { + PreAggStatus preAgg = scan.getPreAggStatus(); + Assertions.assertTrue(preAgg.isOn()); + Assertions.assertEquals("t", scan.getSelectedMaterializedIndexName().get()); + return true; + })); + } + @Test public void testDuplicatePreAggOn() { PlanChecker.from(connectContext) diff --git a/regression-test/data/nereids_p0/explain/test_scan_preaggregation_explain.out b/regression-test/data/nereids_p0/explain/test_scan_preaggregation_explain.out new file mode 100644 index 00000000000000..bbabaedc69ee1c --- /dev/null +++ b/regression-test/data/nereids_p0/explain/test_scan_preaggregation_explain.out @@ -0,0 +1,7 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !right_when_preagg_on -- +1 2 6 + +-- !right_when_preagg_off -- +1 7 + diff --git a/regression-test/suites/nereids_p0/explain/test_scan_preaggregation_explain.groovy b/regression-test/suites/nereids_p0/explain/test_scan_preaggregation_explain.groovy new file mode 100644 index 00000000000000..9754d08f55b95a --- /dev/null +++ b/regression-test/suites/nereids_p0/explain/test_scan_preaggregation_explain.groovy @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_scan_preaggregation_explain") { + sql "SET enable_nereids_planner=true" + sql "SET enable_fallback_to_original_planner=false" + sql "use nereids_test_query_db" + + sql "DROP TABLE IF EXISTS test_scan_preaggregation" + sql """ CREATE TABLE `test_scan_preaggregation` ( + `k1` int(11) NULL, + `k2` int(11) NULL, + `k3` int(11) NULL, + `v1` int(11) SUM NULL + ) ENGINE=OLAP + AGGREGATE KEY(`k1`, `k2`, `k3`) + COMMENT 'OLAP' + DISTRIBUTED BY HASH(`k1`) BUCKETS 3 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "in_memory" = "false", + "storage_format" = "V2", + "disable_auto_compaction" = "false" + ); """ + sql """ insert into test_scan_preaggregation values + (1, 2, 3, 4), + (1, 5, 6, 7); """ + explain { + sql("select k1, min(k2), max(k3) from test_scan_preaggregation where k1 = 0 group by k1;") + contains "PREAGGREGATION: ON" + } + + qt_right_when_preagg_on "select k1, min(k2), max(k3) from test_scan_preaggregation where k1 = 1 group by k1;" + qt_right_when_preagg_off "select k1, sum(k2) from test_scan_preaggregation group by k1;" +}