Skip to content

Commit 17fd5a8

Browse files
committed
[local exchange](fix) Fix correctness issue caused by local exchange (apache#41555)
For the plan `local exchange (hash shuffle) -> union -> colocated agg`, we must ensure that the local exchange uses the same hash algorithm as MPP shuffling. This problem is covered by our existing test cases, but it can only be reproduced on multiple BEs, so no new case is added in this PR.
1 parent 3b6edc5 commit 17fd5a8

14 files changed

+50
-37
lines changed

be/src/pipeline/exec/aggregation_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
164164
? DataDistribution(ExchangeType::PASSTHROUGH)
165165
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
166166
}
167-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
167+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
168168
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
169169
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
170170
}

be/src/pipeline/exec/analytic_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
102102
if (_partition_by_eq_expr_ctxs.empty()) {
103103
return {ExchangeType::PASSTHROUGH};
104104
} else if (_order_by_eq_expr_ctxs.empty()) {
105-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
105+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
106106
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
107107
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
108108
}

be/src/pipeline/exec/distinct_streaming_aggregation_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class DistinctStreamingAggOperatorX final
107107

108108
DataDistribution required_data_distribution() const override {
109109
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
110-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
110+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
111111
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
112112
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
113113
}

be/src/pipeline/exec/hashjoin_build_sink.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ class HashJoinBuildSinkOperatorX final
167167
bool require_shuffled_data_distribution() const override {
168168
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
169169
}
170-
bool is_shuffled_hash_join() const override {
170+
bool is_shuffled_operator() const override {
171171
return _join_distribution == TJoinDistributionType::PARTITIONED;
172172
}
173173
bool require_data_distribution() const override {

be/src/pipeline/exec/hashjoin_probe_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
178178
bool require_shuffled_data_distribution() const override {
179179
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
180180
}
181-
bool is_shuffled_hash_join() const override {
181+
bool is_shuffled_operator() const override {
182182
return _join_distribution == TJoinDistributionType::PARTITIONED;
183183
}
184184
bool require_data_distribution() const override {

be/src/pipeline/exec/operator.h

+7-4
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,13 @@ class OperatorBase {
254254

255255
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
256256
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
257-
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
258-
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
259-
_followed_by_shuffled_join = followed_by_shuffled_join;
257+
[[nodiscard]] bool followed_by_shuffled_operator() const {
258+
return _followed_by_shuffled_operator;
260259
}
260+
void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
261+
_followed_by_shuffled_operator = followed_by_shuffled_operator;
262+
}
263+
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
261264
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }
262265

263266
protected:
@@ -268,7 +271,7 @@ class OperatorBase {
268271
OperatorXPtr _child_x = nullptr;
269272

270273
bool _is_closed;
271-
bool _followed_by_shuffled_join = false;
274+
bool _followed_by_shuffled_operator = false;
272275
};
273276

274277
/**

be/src/pipeline/exec/partitioned_hash_join_probe_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class PartitionedHashJoinProbeOperatorX final
172172
bool require_shuffled_data_distribution() const override {
173173
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
174174
}
175-
bool is_shuffled_hash_join() const override {
175+
bool is_shuffled_operator() const override {
176176
return _join_distribution == TJoinDistributionType::PARTITIONED;
177177
}
178178

be/src/pipeline/exec/partitioned_hash_join_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class PartitionedHashJoinSinkOperatorX
122122
bool require_shuffled_data_distribution() const override {
123123
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
124124
}
125-
bool is_shuffled_hash_join() const override {
125+
bool is_shuffled_operator() const override {
126126
return _join_distribution == TJoinDistributionType::PARTITIONED;
127127
}
128128

be/src/pipeline/exec/sort_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
8787
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
8888
DataDistribution required_data_distribution() const override {
8989
if (_is_analytic_sort) {
90-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
90+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
9191
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
9292
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
9393
} else if (_merge_by_exchange) {

be/src/pipeline/exec/union_sink_operator.h

+6
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
125125
}
126126
}
127127

128+
bool require_shuffled_data_distribution() const override {
129+
return _followed_by_shuffled_operator;
130+
}
131+
132+
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
133+
128134
private:
129135
int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }
130136

be/src/pipeline/exec/union_source_operator.h

+5
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
135135
return Status::OK();
136136
}
137137
[[nodiscard]] int get_child_count() const { return _child_size; }
138+
bool require_shuffled_data_distribution() const override {
139+
return _followed_by_shuffled_operator;
140+
}
141+
142+
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
138143

139144
private:
140145
bool _has_data(RuntimeState* state) const {

be/src/pipeline/pipeline_x/operator.h

-4
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,6 @@ class OperatorXBase : public OperatorBase {
231231

232232
[[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; }
233233

234-
[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
235-
236234
bool can_read() override {
237235
LOG(FATAL) << "should not reach here!";
238236
return false;
@@ -628,8 +626,6 @@ class DataSinkOperatorXBase : public OperatorBase {
628626
: DataDistribution(ExchangeType::NOOP);
629627
}
630628

631-
[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
632-
633629
Status close(RuntimeState* state) override {
634630
return Status::InternalError("Should not reach here!");
635631
}

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp

+23-20
Original file line numberDiff line numberDiff line change
@@ -774,7 +774,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
774774
ObjectPool* pool, const std::vector<TPlanNode>& tnodes,
775775
const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs,
776776
OperatorXPtr parent, int* node_idx, OperatorXPtr* root, PipelinePtr& cur_pipe,
777-
int child_idx, const bool followed_by_shuffled_join) {
777+
int child_idx, const bool followed_by_shuffled_operator) {
778778
// propagate error case
779779
if (*node_idx >= tnodes.size()) {
780780
// TODO: print thrift msg
@@ -785,11 +785,11 @@ Status PipelineXFragmentContext::_create_tree_helper(
785785
const TPlanNode& tnode = tnodes[*node_idx];
786786

787787
int num_children = tnodes[*node_idx].num_children;
788-
bool current_followed_by_shuffled_join = followed_by_shuffled_join;
788+
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
789789
OperatorXPtr op = nullptr;
790790
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
791791
parent == nullptr ? -1 : parent->node_id(), child_idx,
792-
followed_by_shuffled_join));
792+
followed_by_shuffled_operator));
793793

794794
// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
795795
if (parent != nullptr) {
@@ -800,7 +800,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
800800
}
801801

802802
/**
803-
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join.
803+
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
804804
*
805805
* For plan:
806806
* LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
@@ -814,8 +814,8 @@ Status PipelineXFragmentContext::_create_tree_helper(
814814
cur_pipe->operator_xs().empty()
815815
? cur_pipe->sink_x()->require_shuffled_data_distribution()
816816
: op->require_shuffled_data_distribution();
817-
current_followed_by_shuffled_join =
818-
(followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
817+
current_followed_by_shuffled_operator =
818+
(followed_by_shuffled_operator || op->is_shuffled_operator()) &&
819819
require_shuffled_data_distribution;
820820

821821
cur_pipe->_name.push_back('-');
@@ -826,7 +826,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
826826
for (int i = 0; i < num_children; i++) {
827827
++*node_idx;
828828
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
829-
cur_pipe, i, current_followed_by_shuffled_join));
829+
cur_pipe, i, current_followed_by_shuffled_operator));
830830

831831
// we are expecting a child, but have used all nodes
832832
// this means we have been given a bad tree and must fail
@@ -868,13 +868,13 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
868868
* `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
869869
* So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
870870
*/
871-
const bool followed_by_shuffled_join =
872-
operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_join()
873-
: cur_pipe->sink_x()->followed_by_shuffled_join();
871+
const bool followed_by_shuffled_operator =
872+
operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_operator()
873+
: cur_pipe->sink_x()->followed_by_shuffled_operator();
874874
const bool should_disable_bucket_shuffle =
875875
bucket_seq_to_instance_idx.empty() &&
876876
shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() &&
877-
followed_by_shuffled_join;
877+
followed_by_shuffled_operator;
878878
sink.reset(new LocalExchangeSinkOperatorX(
879879
sink_id, local_exchange_id,
880880
should_disable_bucket_shuffle ? _total_instances : _num_instances,
@@ -1050,7 +1050,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
10501050
const DescriptorTbl& descs, OperatorXPtr& op,
10511051
PipelinePtr& cur_pipe, int parent_idx,
10521052
int child_idx,
1053-
const bool followed_by_shuffled_join) {
1053+
const bool followed_by_shuffled_operator) {
10541054
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
10551055
// Therefore, here we need to use a stack-like structure.
10561056
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
@@ -1124,7 +1124,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
11241124
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
11251125
_require_bucket_distribution));
11261126
RETURN_IF_ERROR(cur_pipe->add_operator(op));
1127-
op->set_followed_by_shuffled_join(followed_by_shuffled_join);
1127+
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
11281128
_require_bucket_distribution =
11291129
_require_bucket_distribution || op->require_data_distribution();
11301130
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
@@ -1155,7 +1155,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
11551155
sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
11561156
_require_bucket_distribution));
11571157
}
1158-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1158+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
11591159
_require_bucket_distribution =
11601160
_require_bucket_distribution || sink->require_data_distribution();
11611161
sink->set_dests_id({op->operator_id()});
@@ -1206,8 +1206,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12061206

12071207
_pipeline_parent_map.push(op->node_id(), cur_pipe);
12081208
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
1209-
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
1210-
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
1209+
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
1210+
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
12111211
} else {
12121212
op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
12131213
RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -1228,8 +1228,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12281228

12291229
_pipeline_parent_map.push(op->node_id(), cur_pipe);
12301230
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
1231-
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
1232-
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
1231+
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
1232+
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
12331233
}
12341234
_require_bucket_distribution =
12351235
_require_bucket_distribution || op->require_data_distribution();
@@ -1259,6 +1259,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12591259
case TPlanNodeType::UNION_NODE: {
12601260
int child_count = tnode.num_children;
12611261
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
1262+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
12621263
RETURN_IF_ERROR(cur_pipe->add_operator(op));
12631264

12641265
const auto downstream_pipeline_id = cur_pipe->id();
@@ -1301,7 +1302,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
13011302
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
13021303
_require_bucket_distribution));
13031304
}
1304-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1305+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
13051306
_require_bucket_distribution =
13061307
_require_bucket_distribution || sink->require_data_distribution();
13071308
sink->set_dests_id({op->operator_id()});
@@ -1341,7 +1342,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
13411342
DataSinkOperatorXPtr sink;
13421343
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
13431344
_require_bucket_distribution));
1344-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1345+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
13451346
_require_bucket_distribution =
13461347
_require_bucket_distribution || sink->require_data_distribution();
13471348
sink->set_dests_id({op->operator_id()});
@@ -1352,11 +1353,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
13521353
case TPlanNodeType::INTERSECT_NODE: {
13531354
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
13541355
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
1356+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
13551357
break;
13561358
}
13571359
case TPlanNodeType::EXCEPT_NODE: {
13581360
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
13591361
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
1362+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
13601363
break;
13611364
}
13621365
case TPlanNodeType::REPEAT_NODE: {

be/src/pipeline/pipeline_x/pipeline_x_task.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -139,10 +139,10 @@ class PipelineXTask : public PipelineTask {
139139
int task_id() const { return _index; };
140140

141141
void clear_blocking_state(bool wake_up_by_downstream = false) override {
142-
_wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
143142
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
144143
// We use a lock to assure all dependencies are not deconstructed here.
145144
std::unique_lock<std::mutex> lc(_dependency_lock);
145+
_wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
146146
if (!_finished) {
147147
_execution_dep->set_always_ready();
148148
for (auto* dep : _filter_dependencies) {

0 commit comments

Comments
 (0)