Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[feature](pipelineX) add local_shuffle in sort, partition sort, and analytic nodes #28265

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
: DataSinkOperatorX(operator_id, tnode.node_id),
_buffered_tuple_id(tnode.analytic_node.__isset.buffered_tuple_id
? tnode.analytic_node.buffered_tuple_id
: 0) {}
: 0),
_is_colocate(tnode.analytic_node.__isset.is_colocate && tnode.analytic_node.is_colocate) {
}

Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
Expand Down
10 changes: 10 additions & 0 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;

/// Selects the local exchange type for this analytic sink.
///
/// - PASSTHROUGH when `_partition_by_eq_expr_ctxs` is empty.
/// - NOOP when both `_partition_by_eq_expr_ctxs` and `_order_by_eq_expr_ctxs`
///   are non-empty.
/// - Otherwise (partition exprs present, order-by exprs empty) a hash shuffle:
///   BUCKET_HASH_SHUFFLE if `_is_colocate` is set, else HASH_SHUFFLE.
ExchangeType get_local_exchange_type() const override {
    if (_partition_by_eq_expr_ctxs.empty()) {
        return ExchangeType::PASSTHROUGH;
    }
    if (!_order_by_eq_expr_ctxs.empty()) {
        return ExchangeType::NOOP;
    }
    if (_is_colocate) {
        return ExchangeType::BUCKET_HASH_SHUFFLE;
    }
    return ExchangeType::HASH_SHUFFLE;
}

private:
Status _insert_range_column(vectorized::Block* block, const vectorized::VExprContextSPtr& expr,
vectorized::IColumn* dst_column, size_t length);
Expand All @@ -123,6 +132,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
const TTupleId _buffered_tuple_id;

std::vector<size_t> _num_agg_input;
const bool _is_colocate;
};

} // namespace pipeline
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/partition_sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ class PartitionSortSinkOperatorX final : public DataSinkOperatorX<PartitionSortS
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
// Returns ExchangeType::NOOP when `_topn_phase` equals
// TPartTopNPhase::TWO_PHASE_GLOBAL, and ExchangeType::PASSTHROUGH otherwise.
// NOTE(review): the text between the signature and the `if` below is a review
// comment captured with the diff, not part of the method. Its suggestion to
// make the method `static` conflicts with `override` and with the read of
// `_topn_phase` (presumably a non-static member - confirm).
ExchangeType get_local_exchange_type() const override {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: method 'get_local_exchange_type' can be made static [readability-convert-member-functions-to-static]

Suggested change
ExchangeType get_local_exchange_type() const override {
static ExchangeType get_local_exchange_type() override {

if (_topn_phase == TPartTopNPhase::TWO_PHASE_GLOBAL) {
return ExchangeType::NOOP;
}
return ExchangeType::PASSTHROUGH;
}

private:
friend class PartitionSortSinkLocalState;
Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
_limit(tnode.limit),
_use_topn_opt(tnode.sort_node.use_topn_opt),
_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
_use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read) {}
_use_two_phase_read(tnode.sort_node.sort_info.use_two_phase_read),
_merge_by_exchange(tnode.sort_node.merge_by_exchange) {}

Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));
Expand Down
8 changes: 8 additions & 0 deletions be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,13 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Status open(RuntimeState* state) override;
Status sink(RuntimeState* state, vectorized::Block* in_block,
SourceState source_state) override;
/// Selects the local exchange type for this sort sink.
///
/// When `_merge_by_exchange` is set, the current sort node is used for the
/// ORDER BY and PASSTHROUGH is returned; otherwise NOOP is returned.
ExchangeType get_local_exchange_type() const override {
    return _merge_by_exchange ? ExchangeType::PASSTHROUGH : ExchangeType::NOOP;
}

private:
friend class SortSinkLocalState;
Expand All @@ -113,6 +120,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {

const RowDescriptor _row_descriptor;
const bool _use_two_phase_read;
const bool _merge_by_exchange;
};

} // namespace pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1800,6 +1800,7 @@ public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sor
}
SortNode sortNode = (SortNode) inputFragment.getPlanRoot().getChild(0);
((ExchangeNode) inputFragment.getPlanRoot()).setMergeInfo(sortNode.getSortInfo());
sortNode.setMergeByExchange();
}
return inputFragment;
}
Expand Down Expand Up @@ -1854,6 +1855,7 @@ public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTra
exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo());
exchangeNode.setLimit(topN.getLimit());
exchangeNode.setOffset(topN.getOffset());
((SortNode) exchangeNode.getChild(0)).setMergeByExchange();
}
updateLegacyPlanIdToPhysicalPlan(inputFragment.getPlanRoot(), topN);
return inputFragment;
Expand Down Expand Up @@ -2020,6 +2022,7 @@ public PlanFragment visitPhysicalWindow(PhysicalWindow<? extends Plan> physicalW
// TODO: nereids forbid all parallel scan under PhysicalSetOperation temporary
if (findOlapScanNodesByPassExchangeAndJoinNode(inputPlanFragment.getPlanRoot())) {
inputPlanFragment.setHasColocatePlanNode(true);
analyticEvalNode.setColocate(true);
}
return inputPlanFragment;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public class AnalyticEvalNode extends PlanNode {
private final Expr orderByEq;
private final TupleDescriptor bufferedTupleDesc;

private boolean isColocate = false;

public AnalyticEvalNode(
PlanNodeId id, PlanNode input, List<Expr> analyticFnCalls,
List<Expr> partitionExprs, List<OrderByElement> orderByElements,
Expand Down Expand Up @@ -181,6 +183,10 @@ protected void computeOldCardinality() {
cardinality = getChild(0).cardinality;
}

/**
 * Stores the colocate flag on this node.
 *
 * @param colocate the value assigned to {@code isColocate}
 */
public void setColocate(boolean colocate) {
    isColocate = colocate;
}

@Override
protected String debugString() {
List<String> orderByElementStrs = Lists.newArrayList();
Expand Down Expand Up @@ -215,7 +221,7 @@ protected void toThrift(TPlanNode msg) {
msg.analytic_node.setPartitionExprs(Expr.treesToThrift(substitutedPartitionExprs));
msg.analytic_node.setOrderByExprs(Expr.treesToThrift(OrderByElement.getOrderByExprs(orderByElements)));
msg.analytic_node.setAnalyticFunctions(Expr.treesToThrift(analyticFnCalls));

msg.analytic_node.setIsColocate(isColocate);
if (analyticWindow == null) {
if (!orderByElements.isEmpty()) {
msg.analytic_node.setWindow(AnalyticWindow.DEFAULT_WINDOW.toThrift());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1308,6 +1308,7 @@ private PlanFragment createOrderByFragment(
exchNode.setLimit(limit);
}
exchNode.setMergeInfo(node.getSortInfo());
node.setMergeByExchange();
exchNode.setOffset(offset);

// Child nodes should not process the offset. If there is a limit,
Expand Down
11 changes: 10 additions & 1 deletion fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public class SortNode extends PlanNode {
private boolean useTopnOpt;
private boolean useTwoPhaseReadOpt;

private boolean isDefaultLimit;
// If mergeByexchange is true, the sort information is pushed down to the
// exchange node, and this sort node is used for the ORDER BY.
private boolean mergeByexchange = false;

private boolean isDefaultLimit;
// if true, the output of this node feeds an AnalyticNode
private boolean isAnalyticSort;
private DataPartition inputPartition;
Expand Down Expand Up @@ -134,6 +138,10 @@ public SortInfo getSortInfo() {
return info;
}

/**
 * Sets {@code mergeByexchange} to true. This method only ever sets the flag;
 * it does not clear it.
 */
public void setMergeByExchange() {
    mergeByexchange = true;
}

public boolean getUseTopnOpt() {
return useTopnOpt;
}
Expand Down Expand Up @@ -309,6 +317,7 @@ protected void toThrift(TPlanNode msg) {
msg.sort_node = sortNode;
msg.sort_node.setOffset(offset);
msg.sort_node.setUseTopnOpt(useTopnOpt);
msg.sort_node.setMergeByExchange(this.mergeByexchange);
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions gensrc/thrift/PlanNodes.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,7 @@ struct TSortNode {
// Indicates whether the imposed limit comes DEFAULT_ORDER_BY_LIMIT.
6: optional bool is_default_limit
7: optional bool use_topn_opt
8: optional bool merge_by_exchange
}

enum TopNAlgorithm {
Expand Down Expand Up @@ -999,6 +1000,8 @@ struct TAnalyticNode {
// should be evaluated over a row that is composed of the child tuple and the buffered
// tuple
9: optional Exprs.TExpr order_by_eq

10: optional bool is_colocate
}

struct TMergeNode {
Expand Down