Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feat](nereids)when dealing insert into stmt with empty table source, fe returns directly #35333

Merged
merged 2 commits into from
May 24, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
@@ -20,7 +20,11 @@
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.trees.plans.logical.LogicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.qe.ConnectContext;

import java.util.List;

/**
* Used to prune empty partition.
@@ -32,7 +36,12 @@ public Rule build() {
return logicalOlapScan().thenApply(ctx -> {
LogicalOlapScan scan = ctx.root;
OlapTable table = scan.getTable();
return scan.withSelectedPartitionIds(table.selectNonEmptyPartitionIds(scan.getSelectedPartitionIds()));
List<Long> ids = table.selectNonEmptyPartitionIds(scan.getSelectedPartitionIds());
if (ids.isEmpty()) {
return new LogicalEmptyRelation(ConnectContext.get().getStatementContext().getNextRelationId(),
scan.getOutput());
}
return scan.withSelectedPartitionIds(ids);
}).toRule(RuleType.PRUNE_EMPTY_PARTITION);
}
}
Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@
*/
public class CreateMTMVInfo {
public static final Logger LOG = LogManager.getLogger(CreateMTMVInfo.class);
public static final String MTMV_PLANER_DISABLE_RULES = "OLAP_SCAN_PARTITION_PRUNE";
public static final String MTMV_PLANER_DISABLE_RULES = "OLAP_SCAN_PARTITION_PRUNE,PRUNE_EMPTY_PARTITION";
private final boolean ifNotExists;
private final TableNameInfo mvName;
private List<String> keys;
Original file line number Diff line number Diff line change
@@ -58,18 +58,20 @@ public abstract class AbstractInsertExecutor {

protected String errMsg = "";
protected Optional<InsertCommandContext> insertCtx;
protected final boolean emptyInsert;

/**
* Constructor
*/
public AbstractInsertExecutor(ConnectContext ctx, TableIf table, String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
this.ctx = ctx;
this.coordinator = new Coordinator(ctx, null, planner, ctx.getStatsErrorEstimator());
this.labelName = labelName;
this.table = table;
this.database = table.getDatabase();
this.insertCtx = insertCtx;
this.emptyInsert = emptyInsert;
}

public Coordinator getCoordinator() {
@@ -189,4 +191,8 @@ public void executeSingleInsert(StmtExecutor executor, long jobId) {
}
afterExec(executor);
}

public boolean isEmptyInsert() {
return emptyInsert;
}
}
Original file line number Diff line number Diff line change
@@ -59,8 +59,9 @@ public abstract class BaseExternalTableInsertExecutor extends AbstractInsertExec
*/
public BaseExternalTableInsertExecutor(ConnectContext ctx, ExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
Optional<InsertCommandContext> insertCtx,
boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
catalogName = table.getCatalog().getName();
transactionManager = table.getCatalog().getTransactionManager();

Original file line number Diff line number Diff line change
@@ -43,8 +43,8 @@ public class HiveInsertExecutor extends BaseExternalTableInsertExecutor {
*/
public HiveInsertExecutor(ConnectContext ctx, HMSExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}

@Override
Original file line number Diff line number Diff line change
@@ -40,8 +40,9 @@ public class IcebergInsertExecutor extends BaseExternalTableInsertExecutor {
*/
public IcebergInsertExecutor(ConnectContext ctx, IcebergExternalTable table,
String labelName, NereidsPlanner planner,
Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
Optional<InsertCommandContext> insertCtx,
boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}

@Override
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@
import org.apache.doris.nereids.trees.plans.commands.Command;
import org.apache.doris.nereids.trees.plans.commands.ForwardWithSync;
import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
import org.apache.doris.nereids.trees.plans.physical.PhysicalEmptyRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalHiveTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalIcebergTableSink;
import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapTableSink;
@@ -165,24 +166,28 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor
// return;
throw new AnalysisException("group commit is not supported in Nereids now");
}
boolean emptyInsert = childIsEmptyRelation(physicalSink);
OlapTable olapTable = (OlapTable) targetTableIf;
// the insertCtx contains some variables to adjust SinkNode
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx);
insertExecutor = new OlapInsertExecutor(ctx, olapTable, label, planner, insertCtx, emptyInsert);

boolean isEnableMemtableOnSinkNode =
olapTable.getTableProperty().getUseSchemaLightChange()
? insertExecutor.getCoordinator().getQueryOptions().isEnableMemtableOnSinkNode()
: false;
insertExecutor.getCoordinator().getQueryOptions()
.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
} else if (physicalSink instanceof PhysicalHiveTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
HMSExternalTable hiveExternalTable = (HMSExternalTable) targetTableIf;
insertExecutor = new HiveInsertExecutor(ctx, hiveExternalTable, label, planner,
Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))));
Optional.of(insertCtx.orElse((new HiveInsertCommandContext()))), emptyInsert);
// set hive query options
} else if (physicalSink instanceof PhysicalIcebergTableSink) {
boolean emptyInsert = childIsEmptyRelation(physicalSink);
IcebergExternalTable icebergExternalTable = (IcebergExternalTable) targetTableIf;
insertExecutor = new IcebergInsertExecutor(ctx, icebergExternalTable, label, planner,
Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))));
Optional.of(insertCtx.orElse((new BaseExternalTableInsertCommandContext()))), emptyInsert);
} else {
// TODO: support other table types
throw new AnalysisException("insert into command only support [olap, hive, iceberg] table");
@@ -203,6 +208,10 @@ public AbstractInsertExecutor initPlan(ConnectContext ctx, StmtExecutor executor

private void runInternal(ConnectContext ctx, StmtExecutor executor) throws Exception {
AbstractInsertExecutor insertExecutor = initPlan(ctx, executor);
// if the insert stmt data source is empty, directly return, no need to be executed.
if (insertExecutor.isEmptyInsert()) {
return;
}
insertExecutor.executeSingleInsert(executor, jobId);
}

@@ -219,4 +228,12 @@ public Plan getExplainPlan(ConnectContext ctx) {
public <R, C> R accept(PlanVisitor<R, C> visitor, C context) {
return visitor.visitInsertIntoTableCommand(this, context);
}

private boolean childIsEmptyRelation(PhysicalSink sink) {
if (sink.children() != null && sink.children().size() == 1
&& sink.child(0) instanceof PhysicalEmptyRelation) {
return true;
}
return false;
}
}
Original file line number Diff line number Diff line change
@@ -76,8 +76,8 @@ public class OlapInsertExecutor extends AbstractInsertExecutor {
* constructor
*/
public OlapInsertExecutor(ConnectContext ctx, Table table,
String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx) {
super(ctx, table, labelName, planner, insertCtx);
String labelName, NereidsPlanner planner, Optional<InsertCommandContext> insertCtx, boolean emptyInsert) {
super(ctx, table, labelName, planner, insertCtx, emptyInsert);
}

public long getTxnId() {
Original file line number Diff line number Diff line change
@@ -525,7 +525,7 @@ public void testImplicitConvertSupport() throws Exception {

@Test
public void testDeleteSign() throws Exception {
String sql1 = "SELECT /*+ SET_VAR(enable_nereids_planner=true, ENABLE_FALLBACK_TO_ORIGINAL_PLANNER=false) */ * FROM db1.table1 LEFT ANTI JOIN db1.table2 ON db1.table1.siteid = db1.table2.siteid;";
String sql1 = "SELECT /*+ SET_VAR(enable_nereids_planner=true, ENABLE_FALLBACK_TO_ORIGINAL_PLANNER=false, DISABLE_NEREIDS_RULES=PRUNE_EMPTY_PARTITION) */ * FROM db1.table1 LEFT ANTI JOIN db1.table2 ON db1.table1.siteid = db1.table2.siteid;";
String explain = dorisAssert.query(sql1).explainQuery();
Assert.assertTrue(explain
.contains("__DORIS_DELETE_SIGN__ = 0"));
Original file line number Diff line number Diff line change
@@ -45,6 +45,7 @@ public static void beforeClass() throws Exception {
UtFrameUtils.createDorisCluster(runningDir);
// create connect context
connectContext = UtFrameUtils.createDefaultCtx();
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
// create database
String createDbStmtStr = "create database test;";
CreateDbStmt createDbStmt = (CreateDbStmt) UtFrameUtils.parseAndAnalyzeStmt(createDbStmtStr, connectContext);
Original file line number Diff line number Diff line change
@@ -44,6 +44,7 @@ class DistributeHintTest extends TestWithFeService implements MemoPatternMatchSu
protected void runBeforeAll() throws Exception {
createDatabase("test");
useDatabase("test");
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");

createTable("CREATE TABLE `t1` (\n"
+ " `a` int(11) NULL,\n"
Original file line number Diff line number Diff line change
@@ -43,6 +43,7 @@
class CompareOuterJoinTest extends SqlTestBase {
@Test
void testStarGraphWithInnerJoin() {
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
// t2
// |
//t3-- t1 -- t4
@@ -72,6 +73,7 @@ void testStarGraphWithInnerJoin() {

@Test
void testRandomQuery() {
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
Plan p1 = new HyperGraphBuilder(Sets.newHashSet(JoinType.INNER_JOIN))
.randomBuildPlanWith(3, 3);
p1 = PlanChecker.from(connectContext, p1)
@@ -91,7 +93,7 @@ void testRandomQuery() {

@Test
void testInnerJoinWithFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 inner join T2 on T1.id = T2.id where T1.id = 0",
connectContext
@@ -118,7 +120,7 @@ void testInnerJoinWithFilter() {

@Test
void testInnerJoinWithFilter2() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 inner join T2 on T1.id = T2.id where T1.id = 0",
connectContext
@@ -144,12 +146,11 @@ void testInnerJoinWithFilter2() {

@Test
void testLeftOuterJoinWithLeftFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from ( select * from T1 where T1.id = 0) T1 left outer join T2 on T1.id = T2.id",
connectContext
);
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
Plan p1 = PlanChecker.from(c1)
.analyze()
.rewrite()
@@ -172,12 +173,11 @@ void testLeftOuterJoinWithLeftFilter() {

@Test
void testLeftOuterJoinWithRightFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 left outer join ( select * from T2 where T2.id = 0) T2 on T1.id = T2.id",
connectContext
);
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
Plan p1 = PlanChecker.from(c1)
.analyze()
.rewrite()
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@
class InferJoinTest extends SqlTestBase {
@Test
void testInnerInferLeft() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 inner join T2 on T1.id = T2.id where T1.id = 0",
connectContext
@@ -70,7 +70,7 @@ void testInnerInferLeft() {

@Test
void testInnerInferLeftWithFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 inner join T2 on T1.id = T2.id where T1.id = 0",
connectContext
@@ -103,7 +103,7 @@ void testInnerInferLeftWithFilter() {
@Disabled
@Test
void testInnerInferLeftWithJoinCond() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 inner join "
+ "(select T2.id from T2 inner join T3 on T2.id = T3.id) T2 "
@@ -137,12 +137,11 @@ void testInnerInferLeftWithJoinCond() {

@Test
void testLeftOuterJoinWithRightFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 left outer join ( select * from T2 where T2.id = 0) T2 on T1.id = T2.id",
connectContext
);
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES");
Plan p1 = PlanChecker.from(c1)
.analyze()
.rewrite()
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
class InferPredicateTest extends SqlTestBase {
@Test
void testPullUpQueryFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 left join T2 on T1.id = T2.id where T1.id = 1",
connectContext
Original file line number Diff line number Diff line change
@@ -38,6 +38,7 @@
class PullupExpressionTest extends SqlTestBase {
@Test
void testPullUpQueryFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 join T2 on T1.id = T2.id where T1.id = 1",
connectContext
@@ -64,6 +65,7 @@ void testPullUpQueryFilter() {

@Test
void testPullUpQueryJoinCondition() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 join T2 on T1.id = T2.id and T1.score = T2.score",
connectContext
@@ -90,6 +92,7 @@ void testPullUpQueryJoinCondition() {

@Test
void testPullUpViewFilter() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 join T2 on T1.id = T2.id",
connectContext
@@ -117,6 +120,7 @@ void testPullUpViewFilter() {

@Test
void testPullUpViewJoinCondition() {
connectContext.getSessionVariable().setDisableNereidsRules("INFER_PREDICATES,PRUNE_EMPTY_PARTITION");
CascadesContext c1 = createCascadesContext(
"select * from T1 join T2 on T1.id = T2.id ",
connectContext
Loading
Loading