Skip to content

Commit

Permalink
[feature](pipelineX) add local_shuffle in sort partition sort analyti…
Browse files Browse the repository at this point in the history
…c node (apache#28265)
  • Loading branch information
Mryange authored and HappenLee committed Jan 12, 2024
1 parent a3416af commit 68da09c
Show file tree
Hide file tree
Showing 10 changed files with 53 additions and 4 deletions.
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
: DataSinkOperatorX(operator_id, tnode.node_id),
_buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
? tnode.analytic_node.buffered_tuple_id
: 0) {}
: 0),
_is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate) {
}

Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
Expand Down
10 changes: 10 additions & 0 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

// Selects the local exchange type for this sink from the equality-expression
// lists it holds:
//   - no partition-by equality expressions      -> PASSTHROUGH
//   - partition-by but no order-by expressions  -> BUCKET_HASH_SHUFFLE when
//     _is_colocate is set, HASH_SHUFFLE otherwise
//   - both lists non-empty                      -> NOOP
ExchangeType get_local_exchange_type() const override {
    if (_partition_by_eq_expr_ctxs.empty()) {
        return ExchangeType::PASSTHROUGH;
    }
    if (!_order_by_eq_expr_ctxs.empty()) {
        return ExchangeType::NOOP;
    }
    // Partitioned without ordering: the flavor of hash shuffle depends on
    // the colocate flag.
    if (_is_colocate) {
        return ExchangeType::BUCKET_HASH_SHUFFLE;
    }
    return ExchangeType::HASH_SHUFFLE;
}

private:
Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
vectorized::IColumn* dst_column, size_t length);
Expand All @@ -123,6 +132,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
const TTupleId _buffered_tuple_id;

std::vector<size_t> _num_agg_input;
const bool _is_colocate;
};

} // namespace pipeline
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortS
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
// Returns NOOP when this operator runs the TWO_PHASE_GLOBAL top-n phase and
// PASSTHROUGH for every other phase.
ExchangeType get_local_exchange_type() const override {
    const bool is_global_phase = (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL);
    return is_global_phase ? ExchangeType::NOOP : ExchangeType::PASSTHROUGH;
}

private:
friend class PartitionSortSinkLocalState;
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
_limit(tnode.limit),
_use_topn_opt(tnode.sort_node.use_topn_opt),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read) {}
_use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
_merge_by_exchange(tnode.sort_node.merge_by_exchange) {}

Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
// Returns PASSTHROUGH when _merge_by_exchange is set and NOOP otherwise.
ExchangeType get_local_exchange_type() const override {
    // With _merge_by_exchange set, the current sort node is used for the
    // ORDER BY.
    return _merge_by_exchange ? ExchangeType::PASSTHROUGH : ExchangeType::NOOP;
}

private:
friend class SortSinkLocalState;
Expand All @@ -113,6 +120,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {

const RowDescriptor _row_descriptor;
const bool _use_two_phase_read;
const bool _merge_by_exchange;
};

} // namespace pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,7 @@ public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sor
}
SortNode sortNode = (SortNode) inputFragment.getPlanRoot().getChild(0);
((ExchangeNode) inputFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
sortNode.setMergeByExchange();
}
return inputFragment;
}
Expand Down Expand Up @@ -1854,6 +1855,7 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTra
exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo());
exchangeNode.setLimit(topN.getLimit());
exchangeNode.setOffset(topN.getOffset());
((SortNode) exchangeNode.getChild(0)).setMergeByExchange();
}
updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), topN);
return inputFragment;
Expand Down Expand Up @@ -2020,6 +2022,7 @@ public PlanFragment visitPhysicalWindow(PhysicalWindow<? extends Plan> physicalW
// TODO: nereids temporarily forbids all parallel scans under PhysicalSetOperation
if (findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) {
inputPlanFragment.setHasColocatePlanNode(true);
analyticEvalNode.setColocate(true);
}
return inputPlanFragment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class AnalyticEvalNode extends PlanNode {
private final Expr orderByEq;
private final TupleDescriptor bufferedTupleDesc;

private boolean isColocate = false;

public AnalyticEvalNode(
PlanNodeId id, PlanNode input, List<Expr> analyticFnCalls,
List<Expr> partitionExprs, List<OrderByElement> orderByElements,
Expand Down Expand Up @@ -181,6 +183,10 @@ protected void computeOldCardinality() {
cardinality = getChild(0).cardinality;
}

/**
 * Records the colocate flag for this node; the stored value is what
 * {@code isColocate} holds afterwards.
 *
 * @param colocate the new value of the flag
 */
public void setColocate(boolean colocate) {
    isColocate = colocate;
}

@Override
protected String debugString() {
List<String> orderByElementStrs = Lists.newArrayList();
Expand Down Expand Up @@ -215,7 +221,7 @@ protected void toThrift(TPlanNode msg) {
msg.analytic_node.setPartitionExprs(Expr.treesToThrift(substitutedPartitionExprs));
msg.analytic_node.setOrderByExprs(Expr.treesToThrift(OrderByElement.getOrderByExprs(orderByElements)));
msg.analytic_node.setAnalyticFunctions(Expr.treesToThrift(analyticFnCalls));

msg.analytic_node.setIsColocate(isColocate);
if (analyticWindow == null) {
if (!orderByElements.isEmpty()) {
msg.analytic_node.setWindow(AnalyticWindow.DEFAULT_WINDOW.toThrift());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,7 @@ private PlanFragment createOrderByFragment(
exchNode.setLimit(limit);
}
exchNode.setMergeInfo(node.getSortInfo());
node.setMergeByExchange();
exchNode.setOffset(offset);

// Child nodes should not process the offset. If there is a limit,
Expand Down
11 changes: 10 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public class SortNode extends PlanNode {
private boolean useTopnOpt;
private boolean useTwoPhaseReadOpt;

private boolean isDefaultLimit;
// If mergeByexchange is set to true, the sort information is pushed to the
// exchange node, and the sort node is used for the ORDER BY.
private boolean mergeByexchange = false;

private boolean isDefaultLimit;
// if true, the output of this node feeds an AnalyticNode
private boolean isAnalyticSort;
private DataPartition inputPartition;
Expand Down Expand Up @@ -134,6 +138,10 @@ public SortInfo getSortInfo() {
return info;
}

/**
 * Marks this sort node as merged by an exchange node. The flag can only be
 * switched on through this method; it takes no argument and always stores
 * {@code true}.
 */
public void setMergeByExchange() {
    mergeByexchange = true;
}

public boolean getUseTopnOpt() {
return useTopnOpt;
}
Expand Down Expand Up @@ -309,6 +317,7 @@ protected void toThrift(TPlanNode msg) {
msg.sort_node = sortNode;
msg.sort_node.setOffset(offset);
msg.sort_node.setUseTopnOpt(useTopnOpt);
msg.sort_node.setMergeByExchange(this.mergeByexchange);
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ struct TSortNode {
// Indicates whether the imposed limit comes from DEFAULT_ORDER_BY_LIMIT.
6: optional bool is_default_limit
7: optional bool use_topn_opt
8: optional bool merge_by_exchange
}

enum TopNAlgorithm {
Expand Down Expand Up @@ -999,6 +1000,8 @@ struct TAnalyticNode {
// should be evaluated over a row that is composed of the child tuple and the buffered
// tuple
9: optional Exprs.TExpr order_by_eq

10: optional bool is_colocate
}

struct TMergeNode {
Expand Down

0 comments on commit 68da09c

Please sign in to comment.