Skip to content

Commit

Permalink
[Feat](nereids) when dealing insert into stmt with empty table source…
Browse files Browse the repository at this point in the history
…, fe returns directly (apache#34418)

When a LogicalOlapScan has no partitions, transform it to a LogicalEmptyRelation.
When dealing insert into stmt with empty table source, fe returns directly.
  • Loading branch information
feiniaofeiafei committed May 24, 2024
1 parent bb3a0fd commit 906140f
Show file tree
Hide file tree
Showing 774 changed files with 1,440 additions and 176 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.qe.ConnectContext;

import java.util.List;

/**
* Used to prune empty partition.
Expand All @@ -32,7 +36,12 @@ public Rule build() {
return logicalOlapScan().thenApply(ctx -> {
LogicalOlapScan scan = ctx.root;
OlapTable table = scan.getTable();
return scan.withSelectedPartitionIds(table.selectNonEmptyPartitionIds(scan.getSelectedPartitionIds()));
List<Long> ids = table.selectNonEmptyPartitionIds(scan.getSelectedPartitionIds());
if (ids.isEmpty()) {
return new LogicalEmptyRelation(ConnectContext.get().getStatementContext().getNextRelationId(),
scan.getOutput());
}
return scan.withSelectedPartitionIds(ids);
}).toRule(RuleType.PRUNE_EMPTY_PARTITION);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@
*/
public class CreateMTMVInfo {
public static final Logger LOG = LogManager.getLogger(CreateMTMVInfo.class);
public static final String MTMV_PLANER_DISABLE_RULES = "OLAP_SCAN_PARTITION_PRUNE";
public static final String MTMV_PLANER_DISABLE_RULES = "OLAP_SCAN_PARTITION_PRUNE,PRUNE_EMPTY_PARTITION";
private final boolean ifNotExists;
private final TableNameInfo mvName;
private List<String> keys;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,18 +58,20 @@ public abstract class AbstractInsertExecutor {

protected String errMsg = "";
protected Optional<InsertCommandContext> insertCtx;
protected final boolean emptyInsert;

/**
* Constructor
*/
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
this.ctx = ctx;
this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator());
this.labelName = labelName;
this.table = table;
this.database = table.getDatabase();
this.insertCtx = insertCtx;
this.emptyInsert = emptyInsert;
}

public Coordinator getCoordinator() {
Expand Down Expand Up @@ -189,4 +191,8 @@ public void executeSingleInsert(StmtExecutor executor, long jobId) {
}
afterExec(executor);
}

public boolean isEmptyInsert() {
return emptyInsert;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExec
*/
public BaseExternalTableInsertExecutor(ConnectContext ctx, ExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
Optional<InsertCommandContext> insertCtx,
boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
catalogName = table.getCatalog().getName();
transactionManager = table.getCatalog().getTransactionManager();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
*/
public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor {
*/
public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
Optional<InsertCommandContext> insertCtx,
boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,14 @@
import org.apache.doris.nereids.analyzer.UnboundTableSink;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.glue.LogicalPlanAdapter;
import org.apache.doris.nereids.trees.TreeNode;
import org.apache.doris.nereids.trees.plans.Explainable;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.PlanType;
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
Expand Down Expand Up @@ -165,24 +167,28 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
// return;
throw new AnalysisException("group commit is not supported in Nereids now");
}
boolean emptyInsert = leafIsEmptyRelation(physicalSink);
OlapTable olapTable = (OlapTable) targetTableIf;
// the insertCtx contains some variables to adjust SinkNode
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx);
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);

boolean isEnableMemtableOnSinkNode =
olapTable.getTableProperty().getUseSchemaLightChange()
? insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode()
: false;
insertExecutor.getCoordinator().getQueryOptions()
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
} else if (physicalSink instanceof PhysicalHiveTableSink) {
boolean emptyInsert = leafIsEmptyRelation(physicalSink);
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner,
Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))));
Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))), emptyInsert);
// set hive query options
} else if (physicalSink instanceof PhysicalIcebergTableSink) {
boolean emptyInsert = leafIsEmptyRelation(physicalSink);
IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf;
insertExecutor = new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner,
Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))));
Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))), emptyInsert);
} else {
// TODO: support other table types
throw new AnalysisException("insert into command only support [olap, hive, iceberg] table");
Expand All @@ -203,6 +209,10 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor

private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
// if the insert stmt data source is empty, directly return, no need to be executed.
if (insertExecutor.isEmptyInsert()) {
return;
}
insertExecutor.executeSingleInsert(executor, jobId);
}

Expand All @@ -219,4 +229,16 @@ public Plan getExplainPlan(ConnectContext ctx) {
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitInsertIntoTableCommand(this, context);
}

private boolean leafIsEmptyRelation(TreeNode<Plan> node) {
if (node.children() == null || node.children().isEmpty()) {
return node instanceof PhysicalEmptyRelation;
}
for (TreeNode<Plan> child : node.children()) {
if (!leafIsEmptyRelation(child)) {
return false;
}
}
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
* constructor
*/
public OlapInsertExecutor(ConnectContext ctx, Table table,
String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}

public long getTxnId() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ public void testImplicitConvertSupport() throws Exception {

@Test
public void testDeleteSign() throws Exception {
String sql1 = "SELECT /*+ SET_VAR(enable_nereids_planner=true, ENABLE_FALLBACK_TO_ORIGINAL_PLANNER=false) */ * FROM db1.table1 LEFT ANTI JOIN db1.table2 ON db1.table1.siteid = db1.table2.siteid;";
String sql1 = "SELECT /*+ SET_VAR(enable_nereids_planner=true, ENABLE_FALLBACK_TO_ORIGINAL_PLANNER=false, DISABLE_NEREIDS_RULES=PRUNE_EMPTY_PARTITION) */ * FROM db1.table1 LEFT ANTI JOIN db1.table2 ON db1.table1.siteid = db1.table2.siteid;";
String explain = dorisAssert.query(sql1).explainQuery();
Assert.assertTrue(explain
.contains("__DORIS_DELETE_SIGN__ = 0"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ public static void beforeClass() throws Exception {
UtFrameUtils.createDorisCluster(runningDir);
// create connect context
connectContext = UtFrameUtils.createDefaultCtx();
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
// create database
String createDbStmtStr = "create database test;";
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ class DistributeHintTest extends TestWithFeService implements MemoPatternMatchSu
protected void runBeforeAll() throws Exception {
createDatabase("test");
useDatabase("test");
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");

createTable("CREATE TABLE `t1` (\n"
+ " `a` int(11) NULL,\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
class CompareOuterJoinTest extends SqlTestBase {
@Test
void testStarGraphWithInnerJoin() {
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
// t2
// |
//t3-- t1 -- t4
Expand Down Expand Up @@ -72,6 +73,7 @@ void testStarGraphWithInnerJoin() {

@Test
void testRandomQuery() {
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
Plan p1 = new HyperGraphBuilder(Sets.newHashSet(JoinType.INNER_JOIN))
.randomBuildPlanWith(3, 3);
p1 = PlanChecker.from(connectContext, p1)
Expand All @@ -91,7 +93,7 @@ void testRandomQuery() {

@Test
void testInnerJoinWithFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 inner join T2 on T1.id = T2.id where T1.id = 0",
connectContext
Expand All @@ -118,7 +120,7 @@ void testInnerJoinWithFilter() {

@Test
void testInnerJoinWithFilter2() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 inner join T2 on T1.id = T2.id where T1.id = 0",
connectContext
Expand All @@ -144,12 +146,11 @@ void testInnerJoinWithFilter2() {

@Test
void testLeftOuterJoinWithLeftFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from ( select * from T1 where T1.id = 0) T1 left outer join T2 on T1.id = T2.id",
connectContext
);
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
Plan p1 = PlanChecker.from(c1)
.analyze()
.rewrite()
Expand All @@ -172,12 +173,11 @@ void testLeftOuterJoinWithLeftFilter() {

@Test
void testLeftOuterJoinWithRightFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 left outer join ( select * from T2 where T2.id = 0) T2 on T1.id = T2.id",
connectContext
);
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
Plan p1 = PlanChecker.from(c1)
.analyze()
.rewrite()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@
class InferJoinTest extends SqlTestBase {
@Test
void testInnerInferLeft() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 inner join T2 on T1.id = T2.id where T1.id = 0",
connectContext
Expand Down Expand Up @@ -70,7 +70,7 @@ void testInnerInferLeft() {

@Test
void testInnerInferLeftWithFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 inner join T2 on T1.id = T2.id where T1.id = 0",
connectContext
Expand Down Expand Up @@ -103,7 +103,7 @@ void testInnerInferLeftWithFilter() {
@Disabled
@Test
void testInnerInferLeftWithJoinCond() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 inner join "
+ "(select T2.id from T2 inner join T3 on T2.id = T3.id) T2 "
Expand Down Expand Up @@ -137,12 +137,11 @@ void testInnerInferLeftWithJoinCond() {

@Test
void testLeftOuterJoinWithRightFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 left outer join ( select * from T2 where T2.id = 0) T2 on T1.id = T2.id",
connectContext
);
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
Plan p1 = PlanChecker.from(c1)
.analyze()
.rewrite()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
class InferPredicateTest extends SqlTestBase {
@Test
void testPullUpQueryFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 left join T2 on T1.id = T2.id where T1.id = 1",
connectContext
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
class PullupExpressionTest extends SqlTestBase {
@Test
void testPullUpQueryFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 join T2 on T1.id = T2.id where T1.id = 1",
connectContext
Expand All @@ -64,6 +65,7 @@ void testPullUpQueryFilter() {

@Test
void testPullUpQueryJoinCondition() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 join T2 on T1.id = T2.id and T1.score = T2.score",
connectContext
Expand All @@ -90,6 +92,7 @@ void testPullUpQueryJoinCondition() {

@Test
void testPullUpViewFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 join T2 on T1.id = T2.id",
connectContext
Expand Down Expand Up @@ -117,6 +120,7 @@ void testPullUpViewFilter() {

@Test
void testPullUpViewJoinCondition() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 join T2 on T1.id = T2.id ",
connectContext
Expand Down
Loading

0 comments on commit 906140f

Please sign in to comment.