Skip to content

Commit

Permalink
[fix](nereids) Use correct PREAGGREGATION in agg(filter(scan)) (#33454)
Browse files Browse the repository at this point in the history
1. set `PreAggStatus` to `ON` when agg key column by max or min;
2. #28747 may change `PreAggStatus` of scan, inherit it from the previous one.
  • Loading branch information
liutang123 authored and dataroaring committed Apr 24, 2024
1 parent db4f978 commit f0eb359
Show file tree
Hide file tree
Showing 7 changed files with 114 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ public List<Rule> buildRules() {
filter.getExpressions(), project.getExpressions()
))
);
if (mvPlanWithoutAgg.getSelectedIndexId() == result.indexId) {
mvPlanWithoutAgg = mvPlanWithoutAgg.withPreAggStatus(result.preAggStatus);
}
SlotContext slotContextWithoutAgg = generateBaseScanExprToMvExpr(mvPlanWithoutAgg);

return agg.withChildren(new LogicalProject(
Expand Down Expand Up @@ -745,22 +748,22 @@ public PreAggStatus visit(Expression expr, CheckContext context) {

@Override
public PreAggStatus visitAggregateFunction(AggregateFunction aggregateFunction, CheckContext context) {
return checkAggFunc(aggregateFunction, AggregateType.NONE, context);
return checkAggFunc(aggregateFunction, AggregateType.NONE, context, false);
}

@Override
public PreAggStatus visitMax(Max max, CheckContext context) {
return checkAggFunc(max, AggregateType.MAX, context);
return checkAggFunc(max, AggregateType.MAX, context, true);
}

@Override
public PreAggStatus visitMin(Min min, CheckContext context) {
return checkAggFunc(min, AggregateType.MIN, context);
return checkAggFunc(min, AggregateType.MIN, context, true);
}

@Override
public PreAggStatus visitSum(Sum sum, CheckContext context) {
return checkAggFunc(sum, AggregateType.SUM, context);
return checkAggFunc(sum, AggregateType.SUM, context, false);
}

@Override
Expand Down Expand Up @@ -829,15 +832,16 @@ public PreAggStatus visitHllUnion(HllUnion hllUnion, CheckContext context) {
private PreAggStatus checkAggFunc(
AggregateFunction aggFunc,
AggregateType matchingAggType,
CheckContext ctx) {
CheckContext ctx,
boolean canUseKeyColumn) {
String childNameWithFuncName = ctx.isBaseIndex()
? normalizeName(aggFunc.child(0).toSql())
: normalizeName(CreateMaterializedViewStmt.mvColumnBuilder(
matchingAggType, normalizeName(aggFunc.child(0).toSql())));

boolean contains = containsAllColumn(aggFunc.child(0), ctx.keyNameToColumn.keySet());
if (contains || ctx.keyNameToColumn.containsKey(childNameWithFuncName)) {
if (ctx.isDupKeysOrMergeOnWrite || (!ctx.isBaseIndex() && contains)) {
if (canUseKeyColumn || ctx.isDupKeysOrMergeOnWrite || (!ctx.isBaseIndex() && contains)) {
return PreAggStatus.on();
} else {
Column column = ctx.keyNameToColumn.get(childNameWithFuncName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

/**
* Pre-aggregate status for OLAP scan table.
* @see org.apache.doris.planner.OlapScanNode#isPreAggregation
*/
public class PreAggStatus {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;

import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -429,4 +430,15 @@ private List<SlotReference> createSlotsVectorized(List<Column> columns) {
}
return (List) Arrays.asList(slots);
}

@Override
public JSONObject toJson() {
JSONObject olapScan = super.toJson();
JSONObject properties = new JSONObject();
properties.put("OlapTable", table.getName());
properties.put("SelectedIndexId", Long.toString(selectedIndexId));
properties.put("PreAggStatus", preAggStatus.toString());
olapScan.put("Properties", properties);
return olapScan;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ public class OlapScanNode extends ScanNode {
* When the field value is ON, the storage engine can return the data directly
* without pre-aggregation.
* When the field value is OFF, the storage engine needs to aggregate the data
* before returning to scan node.
* before returning to scan node. And if the table is an aggregation table,
* all key columns need to be read an participate in aggregation.
* For example:
* Aggregate table: k1, k2, v1 sum
* Field value is ON
Expand All @@ -135,7 +136,9 @@ public class OlapScanNode extends ScanNode {
* Query2: select k1, min(v1) from table group by k1
* This aggregation function in query is min which different from the schema.
* So the data stored in storage engine need to be merged firstly before
* returning to scan node.
* returning to scan node. Although we only queried key column k1, key column
* k2 still needs to be detected and participate in aggregation to ensure the
* results are correct.
*
* There are currently two places to modify this variable:
* 1. The turnOffPreAgg() method of SingleNodePlanner.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,11 +263,10 @@ public void testKeyColumnInAggFunction() {
}));
}

@Disabled("reopen it if we fix rollup select bugs")
@Test
public void testMaxCanUseKeyColumn() {
PlanChecker.from(connectContext)
.analyze("select k2, max(k3) from t group by k3")
.analyze("select k2, max(k3) from t group by k2")
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
Expand All @@ -278,11 +277,10 @@ public void testMaxCanUseKeyColumn() {
}));
}

@Disabled("reopen it if we fix rollup select bugs")
@Test
public void testMinCanUseKeyColumn() {
PlanChecker.from(connectContext)
.analyze("select k2, min(k3) from t group by k3")
.analyze("select k2, min(k3) from t group by k2")
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
Expand All @@ -293,6 +291,34 @@ public void testMinCanUseKeyColumn() {
}));
}

@Test
public void testMinMaxCanUseKeyColumnWithBaseTable() {
PlanChecker.from(connectContext)
.analyze("select k1, min(k2), max(k2) from t group by k1")
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
PreAggStatus preAgg = scan.getPreAggStatus();
Assertions.assertTrue(preAgg.isOn());
Assertions.assertEquals("t", scan.getSelectedMaterializedIndexName().get());
return true;
}));
}

@Test
public void testFilterAggWithBaseTable() {
PlanChecker.from(connectContext)
.analyze("select k1 from t where k1 = 0 group by k1")
.applyTopDown(new SelectMaterializedIndexWithAggregate())
.applyTopDown(new SelectMaterializedIndexWithoutAggregate())
.matches(logicalOlapScan().when(scan -> {
PreAggStatus preAgg = scan.getPreAggStatus();
Assertions.assertTrue(preAgg.isOn());
Assertions.assertEquals("t", scan.getSelectedMaterializedIndexName().get());
return true;
}));
}

@Test
public void testDuplicatePreAggOn() {
PlanChecker.from(connectContext)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !right_when_preagg_on --
1 2 6

-- !right_when_preagg_off --
1 7

Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("test_scan_preaggregation_explain") {
sql "SET enable_nereids_planner=true"
sql "SET enable_fallback_to_original_planner=false"
sql "use nereids_test_query_db"

sql "DROP TABLE IF EXISTS test_scan_preaggregation"
sql """ CREATE TABLE `test_scan_preaggregation` (
`k1` int(11) NULL,
`k2` int(11) NULL,
`k3` int(11) NULL,
`v1` int(11) SUM NULL
) ENGINE=OLAP
AGGREGATE KEY(`k1`, `k2`, `k3`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`k1`) BUCKETS 3
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2",
"disable_auto_compaction" = "false"
); """
sql """ insert into test_scan_preaggregation values
(1, 2, 3, 4),
(1, 5, 6, 7); """
explain {
sql("select k1, min(k2), max(k3) from test_scan_preaggregation where k1 = 0 group by k1;")
contains "PREAGGREGATION: ON"
}

qt_right_when_preagg_on "select k1, min(k2), max(k3) from test_scan_preaggregation where k1 = 1 group by k1;"
qt_right_when_preagg_off "select k1, sum(k2) from test_scan_preaggregation group by k1;"
}

0 comments on commit f0eb359

Please sign in to comment.