Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix](mtmv) Disable sql_limit variable when query rewrite by materialize view #40106

Merged
merged 11 commits into from
Oct 11, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.properties.PhysicalProperties;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.commands.ExplainCommand.ExplainLevel;
import org.apache.doris.nereids.trees.plans.commands.info.CreateMTMVInfo;
Expand All @@ -39,6 +40,7 @@
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;

import java.util.List;
Expand All @@ -55,6 +57,9 @@ public static ConnectContext createMTMVContext(MTMV mtmv) {
ctx.getState().reset();
ctx.setThreadLocalInfo();
ctx.getSessionVariable().allowModifyMaterializedViewData = true;
// Disable add default limit rule to avoid refresh data wrong
ctx.getSessionVariable().setDisableNereidsRules(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the default values are changed in the future, there may be issues here. Can we use an additional method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the default value of disableNereidsRules session should always be empty ''

String.join(",", ImmutableSet.of(RuleType.ADD_DEFAULT_LIMIT.name())));
Optional<String> workloadGroup = mtmv.getWorkloadGroup();
if (workloadGroup.isPresent()) {
ctx.getSessionVariable().setWorkloadGroup(workloadGroup.get());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.qe.OriginStatement;
import org.apache.doris.qe.SessionVariable;

import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -267,11 +268,22 @@ public static Plan rewriteByRules(
CascadesContext rewrittenPlanContext = CascadesContext.initContext(
cascadesContext.getStatementContext(), rewrittenPlan,
cascadesContext.getCurrentJobContext().getRequiredProperties());
// Tmp old disable rule variable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

set disable rules here, not everywhere

Set<String> oldDisableRuleNames = rewrittenPlanContext.getStatementContext().getConnectContext()
.getSessionVariable()
.getDisableNereidsRuleNames();
rewrittenPlanContext.getStatementContext().getConnectContext().getSessionVariable()
.setDisableNereidsRules(String.join(",", ImmutableSet.of(RuleType.ADD_DEFAULT_LIMIT.name())));
rewrittenPlanContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
try {
rewrittenPlanContext.getConnectContext().setSkipAuth(true);
rewrittenPlan = planRewriter.apply(rewrittenPlanContext);
} finally {
rewrittenPlanContext.getConnectContext().setSkipAuth(false);
// Recover old disable rules variable
rewrittenPlanContext.getStatementContext().getConnectContext().getSessionVariable()
.setDisableNereidsRules(String.join(",", oldDisableRuleNames));
rewrittenPlanContext.getStatementContext().invalidCache(SessionVariable.DISABLE_NEREIDS_RULES);
}
Map<ExprId, Slot> exprIdToNewRewrittenSlot = Maps.newLinkedHashMap();
for (Slot slot : rewrittenPlan.getOutput()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.json.JSONObject;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -502,7 +503,7 @@ AGGREGATE KEY (siteid,citycode,username)
if (getTable() instanceof MTMV) {
MTMV mtmv = (MTMV) getTable();
MTMVCache cache = mtmv.getCache();
if (cache == null) {
if (cache == null || this.getSelectedIndexId() != this.getTable().getBaseIndexId()) {
return;
}
Plan originalPlan = cache.getOriginalPlan();
Expand All @@ -528,7 +529,7 @@ public void computeUniform(DataTrait.Builder builder) {
if (getTable() instanceof MTMV) {
MTMV mtmv = (MTMV) getTable();
MTMVCache cache = mtmv.getCache();
if (cache == null) {
if (cache == null || this.getSelectedIndexId() != this.getTable().getBaseIndexId()) {
return;
}
Plan originalPlan = cache.getOriginalPlan();
Expand All @@ -542,7 +543,7 @@ public void computeEqualSet(DataTrait.Builder builder) {
if (getTable() instanceof MTMV) {
MTMV mtmv = (MTMV) getTable();
MTMVCache cache = mtmv.getCache();
if (cache == null) {
if (cache == null || this.getSelectedIndexId() != this.getTable().getBaseIndexId()) {
return;
}
Plan originalPlan = cache.getOriginalPlan();
Expand All @@ -556,7 +557,7 @@ public void computeFd(DataTrait.Builder builder) {
if (getTable() instanceof MTMV) {
MTMV mtmv = (MTMV) getTable();
MTMVCache cache = mtmv.getCache();
if (cache == null) {
if (cache == null || this.getSelectedIndexId() != this.getTable().getBaseIndexId()) {
return;
}
Plan originalPlan = cache.getOriginalPlan();
Expand All @@ -567,9 +568,23 @@ public void computeFd(DataTrait.Builder builder) {

Map<Slot, Slot> constructReplaceMap(MTMV mtmv) {
Map<Slot, Slot> replaceMap = new HashMap<>();
List<Slot> originOutputs = mtmv.getCache().getOriginalPlan().getOutput();
for (int i = 0; i < getOutput().size(); i++) {
replaceMap.put(originOutputs.get(i), getOutput().get(i));
// Need remove invisible column, and then mapping them
List<Slot> originOutputs = new ArrayList<>();
for (Slot originSlot : mtmv.getCache().getOriginalPlan().getOutput()) {
if (!(originSlot instanceof SlotReference) || (((SlotReference) originSlot).isVisible())) {
originOutputs.add(originSlot);
}
}
List<Slot> targetOutputs = new ArrayList<>();
for (Slot targeSlot : getOutput()) {
if (!(targeSlot instanceof SlotReference) || (((SlotReference) targeSlot).isVisible())) {
targetOutputs.add(targeSlot);
}
}
Preconditions.checkArgument(originOutputs.size() == targetOutputs.size(),
"constructReplaceMap, the size of originOutputs and targetOutputs should be same");
for (int i = 0; i < targetOutputs.size(); i++) {
replaceMap.put(originOutputs.get(i), targetOutputs.get(i));
}
return replaceMap;
}
Expand Down
41 changes: 41 additions & 0 deletions regression-test/data/mtmv_p0/limit/refresh_with_sql_limit.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query_mv_1 --
1 1 o 10.50 2023-12-08 a b 1 yy
1 1 o 10.50 2023-12-08 a b 1 yy
1 1 o 10.50 2023-12-08 a b 1 yy
1 1 o 9.50 2023-12-08 a b 1 yy
2 1 o 11.50 2023-12-09 a b 1 yy
2 1 o 11.50 2023-12-09 a b 1 yy
2 1 o 11.50 2023-12-09 a b 1 yy
3 1 o 12.50 2023-12-10 a b 1 yy
3 1 o 12.50 2023-12-10 a b 1 yy
3 1 o 12.50 2023-12-10 a b 1 yy
3 1 o 33.50 2023-12-10 a b 1 yy
4 2 o 43.20 2023-12-11 c d 2 mm
4 2 o 43.20 2023-12-11 c d 2 mm
4 2 o 43.20 2023-12-11 c d 2 mm
5 2 o 1.20 2023-12-12 c d 2 mi
5 2 o 56.20 2023-12-12 c d 2 mi
5 2 o 56.20 2023-12-12 c d 2 mi
5 2 o 56.20 2023-12-12 c d 2 mi

-- !query_mv_2 --
1 1 o 10.50 2023-12-08 a b 1 yy
1 1 o 10.50 2023-12-08 a b 1 yy
1 1 o 10.50 2023-12-08 a b 1 yy
1 1 o 9.50 2023-12-08 a b 1 yy
2 1 o 11.50 2023-12-09 a b 1 yy
2 1 o 11.50 2023-12-09 a b 1 yy
2 1 o 11.50 2023-12-09 a b 1 yy
3 1 o 12.50 2023-12-10 a b 1 yy
3 1 o 12.50 2023-12-10 a b 1 yy
3 1 o 12.50 2023-12-10 a b 1 yy
3 1 o 33.50 2023-12-10 a b 1 yy
4 2 o 43.20 2023-12-11 c d 2 mm
4 2 o 43.20 2023-12-11 c d 2 mm
4 2 o 43.20 2023-12-11 c d 2 mm
5 2 o 1.20 2023-12-12 c d 2 mi
5 2 o 56.20 2023-12-12 c d 2 mi
5 2 o 56.20 2023-12-12 c d 2 mi
5 2 o 56.20 2023-12-12 c d 2 mi

Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !query1_0_before --
4

-- !query1_0_after --
4

-- !query2_0_before --
4

-- !query2_0_after --
4

-- !query3_0_before --
4

-- !query3_0_after --
4

Original file line number Diff line number Diff line change
Expand Up @@ -1329,6 +1329,8 @@ class Suite implements GroovyInterceptable {
logger.info("status is not success")
}
Assert.assertEquals("SUCCESS", status)
logger.info("waitingMTMVTaskFinished analyze mv name is " + result.last().get(5))
sql "analyze table ${result.last().get(6)}.${mvName} with sync;"
}

void waitingMTMVTaskFinishedByMvNameAllowCancel(String mvName) {
Expand Down Expand Up @@ -1399,6 +1401,9 @@ class Suite implements GroovyInterceptable {
logger.info("status is not success")
}
Assert.assertEquals("SUCCESS", status)
// Need to analyze materialized view for cbo to choose the materialized view accurately
logger.info("waitingMTMVTaskFinished analyze mv name is " + result.last().get(5))
sql "analyze table ${result.last().get(6)}.${result.last().get(5)} with sync;"
}

void waitingMTMVTaskFinishedNotNeedSuccess(String jobName) {
Expand Down
115 changes: 115 additions & 0 deletions regression-test/suites/mtmv_p0/limit/refresh_with_sql_limit.groovy
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package limit
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

suite("refresh_with_sql_limit") {
String db = context.config.getDbNameByFile(context.file)
sql "use ${db}"
sql "set runtime_filter_mode=OFF";
sql "SET ignore_shape_nodes='PhysicalDistribute,PhysicalProject'"

sql """
drop table if exists orders;
"""

sql """
CREATE TABLE IF NOT EXISTS orders (
o_orderkey INTEGER NOT NULL,
o_custkey INTEGER NOT NULL,
o_orderstatus CHAR(1) NOT NULL,
o_totalprice DECIMALV3(15,2) NOT NULL,
o_orderdate DATE NOT NULL,
o_orderpriority CHAR(15) NOT NULL,
o_clerk CHAR(15) NOT NULL,
o_shippriority INTEGER NOT NULL,
O_COMMENT VARCHAR(79) NOT NULL
)
DUPLICATE KEY(o_orderkey, o_custkey)
PARTITION BY RANGE(o_orderdate) (
PARTITION `day_2` VALUES LESS THAN ('2023-12-9'),
PARTITION `day_3` VALUES LESS THAN ("2023-12-11"),
PARTITION `day_4` VALUES LESS THAN ("2023-12-30")
)
DISTRIBUTED BY HASH(o_orderkey) BUCKETS 3
PROPERTIES (
"replication_num" = "1"
);
"""

sql """
insert into orders values
(1, 1, 'o', 9.5, '2023-12-08', 'a', 'b', 1, 'yy'),
(1, 1, 'o', 10.5, '2023-12-08', 'a', 'b', 1, 'yy'),
(1, 1, 'o', 10.5, '2023-12-08', 'a', 'b', 1, 'yy'),
(1, 1, 'o', 10.5, '2023-12-08', 'a', 'b', 1, 'yy'),
(2, 1, 'o', 11.5, '2023-12-09', 'a', 'b', 1, 'yy'),
(2, 1, 'o', 11.5, '2023-12-09', 'a', 'b', 1, 'yy'),
(2, 1, 'o', 11.5, '2023-12-09', 'a', 'b', 1, 'yy'),
(3, 1, 'o', 12.5, '2023-12-10', 'a', 'b', 1, 'yy'),
(3, 1, 'o', 12.5, '2023-12-10', 'a', 'b', 1, 'yy'),
(3, 1, 'o', 12.5, '2023-12-10', 'a', 'b', 1, 'yy'),
(3, 1, 'o', 33.5, '2023-12-10', 'a', 'b', 1, 'yy'),
(4, 2, 'o', 43.2, '2023-12-11', 'c','d',2, 'mm'),
(4, 2, 'o', 43.2, '2023-12-11', 'c','d',2, 'mm'),
(4, 2, 'o', 43.2, '2023-12-11', 'c','d',2, 'mm'),
(5, 2, 'o', 56.2, '2023-12-12', 'c','d',2, 'mi'),
(5, 2, 'o', 56.2, '2023-12-12', 'c','d',2, 'mi'),
(5, 2, 'o', 56.2, '2023-12-12', 'c','d',2, 'mi'),
(5, 2, 'o', 1.2, '2023-12-12', 'c','d',2, 'mi');
"""
sql """analyze table orders with sync"""


sql """DROP MATERIALIZED VIEW IF EXISTS mv_1"""
sql """set default_order_by_limit = 2;"""
sql """set sql_select_limit = 2;"""
sql"""
CREATE MATERIALIZED VIEW mv_1
BUILD DEFERRED REFRESH COMPLETE ON MANUAL
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS select * from orders;
"""
sql """refresh materialized view mv_1 auto;"""
def job_name = getJobName(db, "mv_1");
waitingMTMVTaskFinished(job_name)

// Reset and test mv data is right or not
sql """set default_order_by_limit = -1;"""
sql """set sql_select_limit = -1;"""
order_qt_query_mv_1 "select * from mv_1"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv_1"""


sql """DROP MATERIALIZED VIEW IF EXISTS mv_2"""
sql """set default_order_by_limit = 2"""
sql """set sql_select_limit = 2"""
sql"""
CREATE MATERIALIZED VIEW mv_2
BUILD IMMEDIATE REFRESH COMPLETE ON MANUAL
DISTRIBUTED BY RANDOM BUCKETS 2
PROPERTIES ('replication_num' = '1')
AS select * from orders;
"""
waitingMTMVTaskFinished(getJobName(db, "mv_2"))

// Reset and test mv data is right or not
sql """set default_order_by_limit = -1;"""
sql """set sql_select_limit = -1;"""
order_qt_query_mv_2 "select * from mv_2"
sql """ DROP MATERIALIZED VIEW IF EXISTS mv_2"""
}
Loading
Loading