Skip to content

Commit

Permalink
[fix](move-memtable) ignore single replica load when move memtable (#…
Browse files Browse the repository at this point in the history
…32845)


Co-authored-by: Xin Liao <liaoxinbit@126.com>
  • Loading branch information
2 people authored and dataroaring committed Apr 24, 2024
1 parent 677c4e5 commit a229380
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1023,13 +1023,16 @@ private DataSink createDataSink() throws AnalysisException {
}
if (targetTable instanceof OlapTable) {
OlapTableSink sink;
final boolean enableSingleReplicaLoad =
analyzer.getContext().getSessionVariable().isEnableMemtableOnSinkNode()
? false : analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert();
if (isGroupCommitStreamLoadSql) {
sink = new GroupCommitBlockSink((OlapTable) targetTable, olapTuple,
targetPartitionIds, analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert(),
targetPartitionIds, enableSingleReplicaLoad,
ConnectContext.get().getSessionVariable().getGroupCommit(), 0);
} else {
sink = new OlapTableSink((OlapTable) targetTable, olapTuple, targetPartitionIds,
analyzer.getContext().getSessionVariable().isEnableSingleReplicaInsert());
enableSingleReplicaLoad);
}
dataSink = sink;
sink.setPartialUpdateInputColumns(isPartialUpdate, partialUpdateCols);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void init(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusList,
this.loadId = loadId;
planner = new LoadingTaskPlanner(callback.getCallbackId(), txnId, db.getId(), table, brokerDesc, fileGroups,
strictMode, isPartialUpdate, timezone, this.timeoutS, this.loadParallelism, this.sendBatchParallelism,
this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink);
this.useNewLoadScanNode, userInfo, singleTabletLoadPerSink, enableMemTableOnSinkNode);
planner.plan(loadId, fileStatusList, fileNum);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class LoadingTaskPlanner {
private final int sendBatchParallelism;
private final boolean useNewLoadScanNode;
private final boolean singleTabletLoadPerSink;
private final boolean enableMemtableOnSinkNode;
private UserIdentity userInfo;
// Something useful
// ConnectContext here is just a dummy object to avoid some NPE problem, like ctx.getDatabase()
Expand All @@ -88,7 +89,7 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table
BrokerDesc brokerDesc, List<BrokerFileGroup> brokerFileGroups,
boolean strictMode, boolean isPartialUpdate, String timezone, long timeoutS, int loadParallelism,
int sendBatchParallelism, boolean useNewLoadScanNode, UserIdentity userInfo,
boolean singleTabletLoadPerSink) {
boolean singleTabletLoadPerSink, boolean enableMemtableOnSinkNode) {
this.loadJobId = loadJobId;
this.txnId = txnId;
this.dbId = dbId;
Expand All @@ -102,8 +103,9 @@ public LoadingTaskPlanner(Long loadJobId, long txnId, long dbId, OlapTable table
this.loadParallelism = loadParallelism;
this.sendBatchParallelism = sendBatchParallelism;
this.useNewLoadScanNode = useNewLoadScanNode;
this.singleTabletLoadPerSink = singleTabletLoadPerSink;
this.userInfo = userInfo;
this.singleTabletLoadPerSink = singleTabletLoadPerSink;
this.enableMemtableOnSinkNode = enableMemtableOnSinkNode;
if (Env.getCurrentEnv().getAccessManager()
.checkDbPriv(userInfo, Env.getCurrentInternalCatalog().getDbNullable(dbId).getFullName(),
PrivPredicate.SELECT)) {
Expand Down Expand Up @@ -204,8 +206,10 @@ public void plan(TUniqueId loadId, List<List<TBrokerFileStatus>> fileStatusesLis

// 2. Olap table sink
List<Long> partitionIds = getAllPartitionIds();
final boolean enableSingleReplicaLoad = this.enableMemtableOnSinkNode
? false : Config.enable_single_replica_load;
OlapTableSink olapTableSink = new OlapTableSink(table, destTupleDesc, partitionIds,
Config.enable_single_replica_load);
enableSingleReplicaLoad);
long txnTimeout = timeoutS == 0 ? ConnectContext.get().getExecTimeout() : timeoutS;
olapTableSink.init(loadId, txnId, dbId, timeoutS, sendBatchParallelism, singleTabletLoadPerSink, strictMode,
txnTimeout);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,15 +262,20 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
timeout *= 2;
}

final boolean enableMemtableOnSinkNode =
destTable.getTableProperty().getUseSchemaLightChange()
? taskInfo.isMemtableOnSinkNode() : false;
final boolean enableSingleReplicaLoad = enableMemtableOnSinkNode
? false : Config.enable_single_replica_load;
// create dest sink
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink;
if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
enableSingleReplicaLoad, ((StreamLoadTask) taskInfo).getGroupCommit(),
taskInfo.getMaxFilterRatio());
} else {
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, enableSingleReplicaLoad);
}
int txnTimeout = timeout == 0 ? ConnectContext.get().getExecTimeout() : timeout;
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout, taskInfo.getSendBatchParallelism(),
Expand Down Expand Up @@ -326,10 +331,7 @@ public TExecPlanFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
boolean isEnableMemtableOnSinkNode =
destTable.getTableProperty().getUseSchemaLightChange()
? taskInfo.isMemtableOnSinkNode() : false;
queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);
params.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
queryGlobals.setNowString(TimeUtils.DATETIME_FORMAT.format(LocalDateTime.now()));
Expand Down Expand Up @@ -493,15 +495,20 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns
timeout *= 2;
}

final boolean enableMemtableOnSinkNode =
destTable.getTableProperty().getUseSchemaLightChange()
? taskInfo.isMemtableOnSinkNode() : false;
final boolean enableSingleReplicaLoad = enableMemtableOnSinkNode
? false : Config.enable_single_replica_load;
// create dest sink
List<Long> partitionIds = getAllPartitionIds();
OlapTableSink olapTableSink;
if (taskInfo instanceof StreamLoadTask && ((StreamLoadTask) taskInfo).getGroupCommit() != null) {
olapTableSink = new GroupCommitBlockSink(destTable, tupleDesc, partitionIds,
Config.enable_single_replica_load, ((StreamLoadTask) taskInfo).getGroupCommit(),
enableSingleReplicaLoad, ((StreamLoadTask) taskInfo).getGroupCommit(),
taskInfo.getMaxFilterRatio());
} else {
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, Config.enable_single_replica_load);
olapTableSink = new OlapTableSink(destTable, tupleDesc, partitionIds, enableSingleReplicaLoad);
}
int txnTimeout = timeout == 0 ? ConnectContext.get().getExecTimeout() : timeout;
olapTableSink.init(loadId, taskInfo.getTxnId(), db.getId(), timeout,
Expand Down Expand Up @@ -560,10 +567,7 @@ public TPipelineFragmentParams planForPipeline(TUniqueId loadId, int fragmentIns
queryOptions.setEnablePipelineEngine(Config.enable_pipeline_load);
queryOptions.setBeExecVersion(Config.be_exec_version);
queryOptions.setIsReportSuccess(taskInfo.getEnableProfile());
boolean isEnableMemtableOnSinkNode =
destTable.getTableProperty().getUseSchemaLightChange()
? taskInfo.isMemtableOnSinkNode() : false;
queryOptions.setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
queryOptions.setEnableMemtableOnSinkNode(enableMemtableOnSinkNode);

pipParams.setQueryOptions(queryOptions);
TQueryGlobals queryGlobals = new TQueryGlobals();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2900,6 +2900,14 @@ public void setEnableSingleReplicaInsert(boolean enableSingleReplicaInsert) {
this.enableSingleReplicaInsert = enableSingleReplicaInsert;
}

public boolean isEnableMemtableOnSinkNode() {
return enableMemtableOnSinkNode;
}

public void setEnableMemtableOnSinkNode(boolean enableMemtableOnSinkNode) {
this.enableMemtableOnSinkNode = enableMemtableOnSinkNode;
}

public boolean isEnableRuntimeFilterPrune() {
return enableRuntimeFilterPrune;
}
Expand Down

0 comments on commit a229380

Please sign in to comment.