Skip to content

Commit

Permalink
[feature](shuffle) enable strict consistency dml by default (apache#3…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored and ByteYue committed May 15, 2024
1 parent b3f68e3 commit b1384fa
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -636,6 +636,12 @@ public class Config extends ConfigBase {
varType = VariableAnnotation.EXPERIMENTAL)
public static boolean enable_single_replica_load = false;

@ConfField(mutable = true, masterOnly = true, description = {
"对于 tablet 数量小于该数目的 DUPLICATE KEY 表,将不会启用 shuffle",
"Shuffle won't be enabled for DUPLICATE KEY tables if its tablet num is lower than this number"},
varType = VariableAnnotation.EXPERIMENTAL)
public static int min_tablets_for_dup_table_shuffle = 64;

@ConfField(mutable = true, masterOnly = true, description = {
"单个数据库最大并发运行的事务数,包括 prepare 和 commit 事务。",
"Maximum concurrent running txn num including prepare, commit txns under a single db.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
import org.apache.doris.catalog.Database;
import org.apache.doris.catalog.DistributionInfo;
import org.apache.doris.catalog.HashDistributionInfo;
import org.apache.doris.catalog.KeysType;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.RandomDistributionInfo;
import org.apache.doris.common.Config;
import org.apache.doris.nereids.exceptions.AnalysisException;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.LogicalProperties;
Expand Down Expand Up @@ -199,6 +201,11 @@ public PhysicalProperties getRequirePhysicalProperties() {
if (targetTable.isPartitionDistributed()) {
DistributionInfo distributionInfo = targetTable.getDefaultDistributionInfo();
if (distributionInfo instanceof HashDistributionInfo) {
// Do not enable shuffle for duplicate key tables when its tablet num is less than threshold.
if (targetTable.getKeysType() == KeysType.DUP_KEYS
&& distributionInfo.getBucketNum() < Config.min_tablets_for_dup_table_shuffle) {
return PhysicalProperties.ANY;
}
return PhysicalProperties.TABLET_ID_SHUFFLE;
} else if (distributionInfo instanceof RandomDistributionInfo) {
return PhysicalProperties.ANY;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ public class SessionVariable implements Serializable, Writable {
public boolean enableNereidsDmlWithPipeline = true;

@VariableMgr.VarAttr(name = ENABLE_STRICT_CONSISTENCY_DML, needForward = true)
public boolean enableStrictConsistencyDml = false;
public boolean enableStrictConsistencyDml = true;

@VariableMgr.VarAttr(name = ENABLE_VECTORIZED_ENGINE, varType = VariableAnnotation.EXPERIMENTAL_ONLINE)
public boolean enableVectorizedEngine = true;
Expand Down Expand Up @@ -3527,6 +3527,14 @@ public void setDumpNereidsMemo(boolean dumpNereidsMemo) {
this.dumpNereidsMemo = dumpNereidsMemo;
}

public boolean isEnableStrictConsistencyDml() {
return this.enableStrictConsistencyDml;
}

public void setEnableStrictConsistencyDml(boolean value) {
this.enableStrictConsistencyDml = value;
}

public void disableStrictConsistencyDmlOnce() throws DdlException {
if (!enableStrictConsistencyDml) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3401,6 +3401,8 @@ public HttpStreamParams generateHttpStreamPlan(TUniqueId queryId) throws Excepti
try {
if (sessionVariable.isEnableNereidsPlanner()) {
try {
// disable shuffle for http stream (only 1 sink)
sessionVariable.disableStrictConsistencyDmlOnce();
httpStreamParams = generateHttpStreamNereidsPlan(queryId);
} catch (NereidsException | ParseException e) {
if (context.getMinidump() != null && context.getMinidump().toString(4) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,10 +701,12 @@ public void testInsertPlan() throws Exception {
connectContext.getSessionVariable().setEnableNereidsPlanner(v);
}

// 1. should not contains exchange node in new planner
// 2. should not contains exchange node in new planner
v = connectContext.getSessionVariable().isEnableNereidsPlanner();
boolean v2 = connectContext.getSessionVariable().isEnableStrictConsistencyDml();
try {
connectContext.getSessionVariable().setEnableNereidsPlanner(true);
connectContext.getSessionVariable().setEnableStrictConsistencyDml(false);
String sql1 = "explain insert into db1.tbl1 select * from db1.tbl1";
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1);
stmtExecutor1.execute();
Expand All @@ -713,6 +715,24 @@ public void testInsertPlan() throws Exception {
Assertions.assertFalse(plan1.contains("VEXCHANGE"));
} finally {
connectContext.getSessionVariable().setEnableNereidsPlanner(v);
connectContext.getSessionVariable().setEnableStrictConsistencyDml(v2);
}

// 3. should contain exchange node in new planner if enable strict consistency dml
v = connectContext.getSessionVariable().isEnableNereidsPlanner();
v2 = connectContext.getSessionVariable().isEnableStrictConsistencyDml();
try {
connectContext.getSessionVariable().setEnableNereidsPlanner(true);
connectContext.getSessionVariable().setEnableStrictConsistencyDml(true);
String sql1 = "explain insert into db1.tbl1 select * from db1.tbl1";
StmtExecutor stmtExecutor1 = new StmtExecutor(connectContext, sql1);
stmtExecutor1.execute();
Planner planner1 = stmtExecutor1.planner();
String plan1 = planner1.getExplainString(new ExplainOptions(false, false, false));
Assertions.assertTrue(plan1.contains("VEXCHANGE"));
} finally {
connectContext.getSessionVariable().setEnableNereidsPlanner(v);
connectContext.getSessionVariable().setEnableStrictConsistencyDml(v2);
}
}
}

0 comments on commit b1384fa

Please sign in to comment.