Skip to content

Commit e997214

Browse files
Gabriel39cjj2010
authored and committed
[local exchange](fix) Fix correctness caused by local exchange (apache#41555)
For the plan `local exchange (hash shuffle) -> union -> colocated agg`, we must ensure that the local exchange uses the same hash algorithm as MPP shuffling. This problem is covered by our existing test cases, but it can only be reproduced on multiple BEs, so no new test case is added in this PR.
1 parent 4b06fe7 commit e997214

13 files changed

+52
-38
lines changed

be/src/pipeline/exec/aggregation_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
148148
? DataDistribution(ExchangeType::PASSTHROUGH)
149149
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
150150
}
151-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
151+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
152152
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
153153
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
154154
}

be/src/pipeline/exec/analytic_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
8181
if (_partition_by_eq_expr_ctxs.empty()) {
8282
return {ExchangeType::PASSTHROUGH};
8383
} else if (_order_by_eq_expr_ctxs.empty()) {
84-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
84+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
8585
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
8686
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
8787
}

be/src/pipeline/exec/distinct_streaming_aggregation_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ class DistinctStreamingAggOperatorX final
105105

106106
DataDistribution required_data_distribution() const override {
107107
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
108-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
108+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
109109
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
110110
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
111111
}

be/src/pipeline/exec/hashjoin_build_sink.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ class HashJoinBuildSinkOperatorX final
144144
bool require_shuffled_data_distribution() const override {
145145
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
146146
}
147-
bool is_shuffled_hash_join() const override {
147+
bool is_shuffled_operator() const override {
148148
return _join_distribution == TJoinDistributionType::PARTITIONED;
149149
}
150150
bool require_data_distribution() const override {

be/src/pipeline/exec/hashjoin_probe_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
155155
bool require_shuffled_data_distribution() const override {
156156
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
157157
}
158-
bool is_shuffled_hash_join() const override {
158+
bool is_shuffled_operator() const override {
159159
return _join_distribution == TJoinDistributionType::PARTITIONED;
160160
}
161161
bool require_data_distribution() const override {

be/src/pipeline/exec/operator.h

+7-8
Original file line numberDiff line numberDiff line change
@@ -108,17 +108,20 @@ class OperatorBase {
108108
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
109109
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
110110
OperatorPtr child() { return _child; }
111-
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
112-
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
113-
_followed_by_shuffled_join = followed_by_shuffled_join;
111+
[[nodiscard]] bool followed_by_shuffled_operator() const {
112+
return _followed_by_shuffled_operator;
114113
}
114+
void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
115+
_followed_by_shuffled_operator = followed_by_shuffled_operator;
116+
}
117+
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
115118
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }
116119

117120
protected:
118121
OperatorPtr _child = nullptr;
119122

120123
bool _is_closed;
121-
bool _followed_by_shuffled_join = false;
124+
bool _followed_by_shuffled_operator = false;
122125
};
123126

124127
class PipelineXLocalStateBase {
@@ -477,8 +480,6 @@ class DataSinkOperatorXBase : public OperatorBase {
477480
[[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0;
478481
[[nodiscard]] virtual DataDistribution required_data_distribution() const;
479482

480-
[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
481-
482483
Status close(RuntimeState* state) override {
483484
return Status::InternalError("Should not reach here!");
484485
}
@@ -663,8 +664,6 @@ class OperatorXBase : public OperatorBase {
663664
[[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block,
664665
bool* eos) = 0;
665666

666-
[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
667-
668667
Status close(RuntimeState* state) override;
669668

670669
[[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const {

be/src/pipeline/exec/partitioned_hash_join_probe_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ class PartitionedHashJoinProbeOperatorX final
168168
bool require_shuffled_data_distribution() const override {
169169
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
170170
}
171-
bool is_shuffled_hash_join() const override {
171+
bool is_shuffled_operator() const override {
172172
return _join_distribution == TJoinDistributionType::PARTITIONED;
173173
}
174174

be/src/pipeline/exec/partitioned_hash_join_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ class PartitionedHashJoinSinkOperatorX
118118
bool require_shuffled_data_distribution() const override {
119119
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
120120
}
121-
bool is_shuffled_hash_join() const override {
121+
bool is_shuffled_operator() const override {
122122
return _join_distribution == TJoinDistributionType::PARTITIONED;
123123
}
124124

be/src/pipeline/exec/sort_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
6363
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
6464
DataDistribution required_data_distribution() const override {
6565
if (_is_analytic_sort) {
66-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
66+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
6767
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
6868
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
6969
} else if (_merge_by_exchange) {

be/src/pipeline/exec/union_sink_operator.h

+6
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
8989
}
9090
}
9191

92+
bool require_shuffled_data_distribution() const override {
93+
return _followed_by_shuffled_operator;
94+
}
95+
96+
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
97+
9298
private:
9399
int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }
94100

be/src/pipeline/exec/union_source_operator.h

+5
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,11 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
9595
return Status::OK();
9696
}
9797
[[nodiscard]] int get_child_count() const { return _child_size; }
98+
bool require_shuffled_data_distribution() const override {
99+
return _followed_by_shuffled_operator;
100+
}
101+
102+
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
98103

99104
private:
100105
bool _has_data(RuntimeState* state) const {

be/src/pipeline/pipeline_fragment_context.cpp

+25-21
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
667667
const DescriptorTbl& descs, OperatorPtr parent,
668668
int* node_idx, OperatorPtr* root,
669669
PipelinePtr& cur_pipe, int child_idx,
670-
const bool followed_by_shuffled_join) {
670+
const bool followed_by_shuffled_operator) {
671671
// propagate error case
672672
if (*node_idx >= tnodes.size()) {
673673
return Status::InternalError(
@@ -677,11 +677,11 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
677677
const TPlanNode& tnode = tnodes[*node_idx];
678678

679679
int num_children = tnodes[*node_idx].num_children;
680-
bool current_followed_by_shuffled_join = followed_by_shuffled_join;
680+
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
681681
OperatorPtr op = nullptr;
682682
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
683683
parent == nullptr ? -1 : parent->node_id(), child_idx,
684-
followed_by_shuffled_join));
684+
followed_by_shuffled_operator));
685685

686686
// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
687687
if (parent != nullptr) {
@@ -691,7 +691,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
691691
*root = op;
692692
}
693693
/**
694-
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join.
694+
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
695695
*
696696
* For plan:
697697
* LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
@@ -704,15 +704,15 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
704704
auto require_shuffled_data_distribution =
705705
cur_pipe->operators().empty() ? cur_pipe->sink()->require_shuffled_data_distribution()
706706
: op->require_shuffled_data_distribution();
707-
current_followed_by_shuffled_join =
708-
(followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
707+
current_followed_by_shuffled_operator =
708+
(followed_by_shuffled_operator || op->is_shuffled_operator()) &&
709709
require_shuffled_data_distribution;
710710

711711
// rely on that tnodes is preorder of the plan
712712
for (int i = 0; i < num_children; i++) {
713713
++*node_idx;
714714
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
715-
cur_pipe, i, current_followed_by_shuffled_join));
715+
cur_pipe, i, current_followed_by_shuffled_operator));
716716

717717
// we are expecting a child, but have used all nodes
718718
// this means we have been given a bad tree and must fail
@@ -753,13 +753,13 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
753753
* `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
754754
* So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
755755
*/
756-
const bool followed_by_shuffled_join = operators.size() > idx
757-
? operators[idx]->followed_by_shuffled_join()
758-
: cur_pipe->sink()->followed_by_shuffled_join();
756+
const bool followed_by_shuffled_operator =
757+
operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
758+
: cur_pipe->sink()->followed_by_shuffled_operator();
759759
const bool should_disable_bucket_shuffle =
760760
bucket_seq_to_instance_idx.empty() &&
761761
shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() &&
762-
followed_by_shuffled_join;
762+
followed_by_shuffled_operator;
763763
sink.reset(new LocalExchangeSinkOperatorX(
764764
sink_id, local_exchange_id,
765765
should_disable_bucket_shuffle ? _total_instances : _num_instances,
@@ -1199,7 +1199,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
11991199
const DescriptorTbl& descs, OperatorPtr& op,
12001200
PipelinePtr& cur_pipe, int parent_idx,
12011201
int child_idx,
1202-
const bool followed_by_shuffled_join) {
1202+
const bool followed_by_shuffled_operator) {
12031203
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
12041204
// Therefore, here we need to use a stack-like structure.
12051205
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
@@ -1321,15 +1321,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
13211321

13221322
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
13231323
_require_bucket_distribution));
1324-
op->set_followed_by_shuffled_join(false);
1324+
op->set_followed_by_shuffled_operator(false);
13251325
_require_bucket_distribution = true;
13261326
RETURN_IF_ERROR(new_pipe->add_operator(op));
13271327
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
13281328
cur_pipe = new_pipe;
13291329
} else {
13301330
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
13311331
_require_bucket_distribution));
1332-
op->set_followed_by_shuffled_join(followed_by_shuffled_join);
1332+
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
13331333
_require_bucket_distribution =
13341334
_require_bucket_distribution || op->require_data_distribution();
13351335
RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -1384,7 +1384,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
13841384
sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
13851385
_require_bucket_distribution));
13861386
}
1387-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1387+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
13881388
_require_bucket_distribution =
13891389
_require_bucket_distribution || sink->require_data_distribution();
13901390
sink->set_dests_id({op->operator_id()});
@@ -1434,8 +1434,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
14341434

14351435
_pipeline_parent_map.push(op->node_id(), cur_pipe);
14361436
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
1437-
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
1438-
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
1437+
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
1438+
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
14391439
} else {
14401440
op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
14411441
RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -1456,8 +1456,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
14561456

14571457
_pipeline_parent_map.push(op->node_id(), cur_pipe);
14581458
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
1459-
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
1460-
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
1459+
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
1460+
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
14611461
}
14621462
_require_bucket_distribution =
14631463
_require_bucket_distribution || op->require_data_distribution();
@@ -1487,6 +1487,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
14871487
case TPlanNodeType::UNION_NODE: {
14881488
int child_count = tnode.num_children;
14891489
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
1490+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
14901491
RETURN_IF_ERROR(cur_pipe->add_operator(op));
14911492

14921493
const auto downstream_pipeline_id = cur_pipe->id();
@@ -1498,6 +1499,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
14981499
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
14991500
DataSinkOperatorPtr sink;
15001501
sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs));
1502+
sink->set_followed_by_shuffled_operator(_require_bucket_distribution);
15011503
sink->set_dests_id({op->operator_id()});
15021504
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
15031505
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
@@ -1531,7 +1533,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
15311533
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
15321534
_require_bucket_distribution));
15331535
}
1534-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1536+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
15351537
_require_bucket_distribution =
15361538
_require_bucket_distribution || sink->require_data_distribution();
15371539
sink->set_dests_id({op->operator_id()});
@@ -1571,7 +1573,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
15711573
DataSinkOperatorPtr sink;
15721574
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
15731575
_require_bucket_distribution));
1574-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1576+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
15751577
_require_bucket_distribution =
15761578
_require_bucket_distribution || sink->require_data_distribution();
15771579
sink->set_dests_id({op->operator_id()});
@@ -1582,11 +1584,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
15821584
case TPlanNodeType::INTERSECT_NODE: {
15831585
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
15841586
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
1587+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
15851588
break;
15861589
}
15871590
case TPlanNodeType::EXCEPT_NODE: {
15881591
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
15891592
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
1593+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
15901594
break;
15911595
}
15921596
case TPlanNodeType::REPEAT_NODE: {

be/src/pipeline/pipeline_task.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,10 @@ class PipelineTask {
136136
bool is_finalized() const { return _finalized; }
137137

138138
void clear_blocking_state(bool wake_up_by_downstream = false) {
139-
_wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
140139
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
141140
// We use a lock to assure all dependencies are not deconstructed here.
142141
std::unique_lock<std::mutex> lc(_dependency_lock);
142+
_wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
143143
if (!_finalized) {
144144
_execution_dep->set_always_ready();
145145
for (auto* dep : _filter_dependencies) {

0 commit comments

Comments
 (0)