Skip to content

Commit cd11837

Browse files
committed
[local exchange](fix) Fix correctness caused by local exchange (apache#41555)
For plan `local exchange (hash shuffle) -> union -> colocated agg`, we must ensure local exchange use the same hash algorithm as MPP shuffling. This problem is covered by our test cases but only can be reproduced on multiple BEs so no case is added in this PR.
1 parent 09cea73 commit cd11837

13 files changed

+49
-36
lines changed

be/src/pipeline/exec/aggregation_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
164164
? DataDistribution(ExchangeType::PASSTHROUGH)
165165
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
166166
}
167-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
167+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
168168
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
169169
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
170170
}

be/src/pipeline/exec/analytic_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
102102
if (_partition_by_eq_expr_ctxs.empty()) {
103103
return {ExchangeType::PASSTHROUGH};
104104
} else if (_order_by_eq_expr_ctxs.empty()) {
105-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
105+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
106106
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
107107
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
108108
}

be/src/pipeline/exec/distinct_streaming_aggregation_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ class DistinctStreamingAggOperatorX final
107107

108108
DataDistribution required_data_distribution() const override {
109109
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
110-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
110+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
111111
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
112112
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
113113
}

be/src/pipeline/exec/hashjoin_build_sink.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ class HashJoinBuildSinkOperatorX final
167167
bool require_shuffled_data_distribution() const override {
168168
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
169169
}
170-
bool is_shuffled_hash_join() const override {
170+
bool is_shuffled_operator() const override {
171171
return _join_distribution == TJoinDistributionType::PARTITIONED;
172172
}
173173
bool require_data_distribution() const override {

be/src/pipeline/exec/hashjoin_probe_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
178178
bool require_shuffled_data_distribution() const override {
179179
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
180180
}
181-
bool is_shuffled_hash_join() const override {
181+
bool is_shuffled_operator() const override {
182182
return _join_distribution == TJoinDistributionType::PARTITIONED;
183183
}
184184
bool require_data_distribution() const override {

be/src/pipeline/exec/operator.h

+7-4
Original file line numberDiff line numberDiff line change
@@ -254,10 +254,13 @@ class OperatorBase {
254254

255255
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
256256
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
257-
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
258-
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
259-
_followed_by_shuffled_join = followed_by_shuffled_join;
257+
[[nodiscard]] bool followed_by_shuffled_operator() const {
258+
return _followed_by_shuffled_operator;
260259
}
260+
void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
261+
_followed_by_shuffled_operator = followed_by_shuffled_operator;
262+
}
263+
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
261264
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }
262265

263266
protected:
@@ -268,7 +271,7 @@ class OperatorBase {
268271
OperatorXPtr _child_x = nullptr;
269272

270273
bool _is_closed;
271-
bool _followed_by_shuffled_join = false;
274+
bool _followed_by_shuffled_operator = false;
272275
};
273276

274277
/**

be/src/pipeline/exec/partitioned_hash_join_probe_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -172,7 +172,7 @@ class PartitionedHashJoinProbeOperatorX final
172172
bool require_shuffled_data_distribution() const override {
173173
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
174174
}
175-
bool is_shuffled_hash_join() const override {
175+
bool is_shuffled_operator() const override {
176176
return _join_distribution == TJoinDistributionType::PARTITIONED;
177177
}
178178

be/src/pipeline/exec/partitioned_hash_join_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ class PartitionedHashJoinSinkOperatorX
122122
bool require_shuffled_data_distribution() const override {
123123
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
124124
}
125-
bool is_shuffled_hash_join() const override {
125+
bool is_shuffled_operator() const override {
126126
return _join_distribution == TJoinDistributionType::PARTITIONED;
127127
}
128128

be/src/pipeline/exec/sort_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
8787
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
8888
DataDistribution required_data_distribution() const override {
8989
if (_is_analytic_sort) {
90-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
90+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
9191
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
9292
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
9393
} else if (_merge_by_exchange) {

be/src/pipeline/exec/union_sink_operator.h

+6
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
125125
}
126126
}
127127

128+
bool require_shuffled_data_distribution() const override {
129+
return _followed_by_shuffled_operator;
130+
}
131+
132+
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
133+
128134
private:
129135
int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }
130136

be/src/pipeline/exec/union_source_operator.h

+5
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,11 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
135135
return Status::OK();
136136
}
137137
[[nodiscard]] int get_child_count() const { return _child_size; }
138+
bool require_shuffled_data_distribution() const override {
139+
return _followed_by_shuffled_operator;
140+
}
141+
142+
bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }
138143

139144
private:
140145
bool _has_data(RuntimeState* state) const {

be/src/pipeline/pipeline_x/operator.h

-4
Original file line numberDiff line numberDiff line change
@@ -231,8 +231,6 @@ class OperatorXBase : public OperatorBase {
231231

232232
[[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; }
233233

234-
[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
235-
236234
bool can_read() override {
237235
LOG(FATAL) << "should not reach here!";
238236
return false;
@@ -627,8 +625,6 @@ class DataSinkOperatorXBase : public OperatorBase {
627625
: DataDistribution(ExchangeType::NOOP);
628626
}
629627

630-
[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }
631-
632628
Status close(RuntimeState* state) override {
633629
return Status::InternalError("Should not reach here!");
634630
}

be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp

+23-20
Original file line numberDiff line numberDiff line change
@@ -771,7 +771,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
771771
ObjectPool* pool, const std::vector<TPlanNode>& tnodes,
772772
const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs,
773773
OperatorXPtr parent, int* node_idx, OperatorXPtr* root, PipelinePtr& cur_pipe,
774-
int child_idx, const bool followed_by_shuffled_join) {
774+
int child_idx, const bool followed_by_shuffled_operator) {
775775
// propagate error case
776776
if (*node_idx >= tnodes.size()) {
777777
// TODO: print thrift msg
@@ -782,11 +782,11 @@ Status PipelineXFragmentContext::_create_tree_helper(
782782
const TPlanNode& tnode = tnodes[*node_idx];
783783

784784
int num_children = tnodes[*node_idx].num_children;
785-
bool current_followed_by_shuffled_join = followed_by_shuffled_join;
785+
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
786786
OperatorXPtr op = nullptr;
787787
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
788788
parent == nullptr ? -1 : parent->node_id(), child_idx,
789-
followed_by_shuffled_join));
789+
followed_by_shuffled_operator));
790790

791791
// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
792792
if (parent != nullptr) {
@@ -797,7 +797,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
797797
}
798798

799799
/**
800-
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join.
800+
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
801801
*
802802
* For plan:
803803
* LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
@@ -811,8 +811,8 @@ Status PipelineXFragmentContext::_create_tree_helper(
811811
cur_pipe->operator_xs().empty()
812812
? cur_pipe->sink_x()->require_shuffled_data_distribution()
813813
: op->require_shuffled_data_distribution();
814-
current_followed_by_shuffled_join =
815-
(followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
814+
current_followed_by_shuffled_operator =
815+
(followed_by_shuffled_operator || op->is_shuffled_operator()) &&
816816
require_shuffled_data_distribution;
817817

818818
cur_pipe->_name.push_back('-');
@@ -823,7 +823,7 @@ Status PipelineXFragmentContext::_create_tree_helper(
823823
for (int i = 0; i < num_children; i++) {
824824
++*node_idx;
825825
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
826-
cur_pipe, i, current_followed_by_shuffled_join));
826+
cur_pipe, i, current_followed_by_shuffled_operator));
827827

828828
// we are expecting a child, but have used all nodes
829829
// this means we have been given a bad tree and must fail
@@ -865,13 +865,13 @@ Status PipelineXFragmentContext::_add_local_exchange_impl(
865865
* `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
866866
* So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
867867
*/
868-
const bool followed_by_shuffled_join =
869-
operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_join()
870-
: cur_pipe->sink_x()->followed_by_shuffled_join();
868+
const bool followed_by_shuffled_operator =
869+
operator_xs.size() > idx ? operator_xs[idx]->followed_by_shuffled_operator()
870+
: cur_pipe->sink_x()->followed_by_shuffled_operator();
871871
const bool should_disable_bucket_shuffle =
872872
bucket_seq_to_instance_idx.empty() &&
873873
shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() &&
874-
followed_by_shuffled_join;
874+
followed_by_shuffled_operator;
875875
sink.reset(new LocalExchangeSinkOperatorX(
876876
sink_id, local_exchange_id,
877877
should_disable_bucket_shuffle ? _total_instances : _num_instances,
@@ -1047,7 +1047,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
10471047
const DescriptorTbl& descs, OperatorXPtr& op,
10481048
PipelinePtr& cur_pipe, int parent_idx,
10491049
int child_idx,
1050-
const bool followed_by_shuffled_join) {
1050+
const bool followed_by_shuffled_operator) {
10511051
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
10521052
// Therefore, here we need to use a stack-like structure.
10531053
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
@@ -1121,7 +1121,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
11211121
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
11221122
_require_bucket_distribution));
11231123
RETURN_IF_ERROR(cur_pipe->add_operator(op));
1124-
op->set_followed_by_shuffled_join(followed_by_shuffled_join);
1124+
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
11251125
_require_bucket_distribution =
11261126
_require_bucket_distribution || op->require_data_distribution();
11271127
} else if (tnode.agg_node.__isset.use_streaming_preaggregation &&
@@ -1152,7 +1152,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
11521152
sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
11531153
_require_bucket_distribution));
11541154
}
1155-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1155+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
11561156
_require_bucket_distribution =
11571157
_require_bucket_distribution || sink->require_data_distribution();
11581158
sink->set_dests_id({op->operator_id()});
@@ -1203,8 +1203,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12031203

12041204
_pipeline_parent_map.push(op->node_id(), cur_pipe);
12051205
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
1206-
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
1207-
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
1206+
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
1207+
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
12081208
} else {
12091209
op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
12101210
RETURN_IF_ERROR(cur_pipe->add_operator(op));
@@ -1225,8 +1225,8 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12251225

12261226
_pipeline_parent_map.push(op->node_id(), cur_pipe);
12271227
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
1228-
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
1229-
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
1228+
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
1229+
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
12301230
}
12311231
_require_bucket_distribution =
12321232
_require_bucket_distribution || op->require_data_distribution();
@@ -1256,6 +1256,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12561256
case TPlanNodeType::UNION_NODE: {
12571257
int child_count = tnode.num_children;
12581258
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
1259+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
12591260
RETURN_IF_ERROR(cur_pipe->add_operator(op));
12601261

12611262
const auto downstream_pipeline_id = cur_pipe->id();
@@ -1298,7 +1299,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
12981299
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
12991300
_require_bucket_distribution));
13001301
}
1301-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1302+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
13021303
_require_bucket_distribution =
13031304
_require_bucket_distribution || sink->require_data_distribution();
13041305
sink->set_dests_id({op->operator_id()});
@@ -1338,7 +1339,7 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
13381339
DataSinkOperatorXPtr sink;
13391340
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
13401341
_require_bucket_distribution));
1341-
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
1342+
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
13421343
_require_bucket_distribution =
13431344
_require_bucket_distribution || sink->require_data_distribution();
13441345
sink->set_dests_id({op->operator_id()});
@@ -1349,11 +1350,13 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN
13491350
case TPlanNodeType::INTERSECT_NODE: {
13501351
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
13511352
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
1353+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
13521354
break;
13531355
}
13541356
case TPlanNodeType::EXCEPT_NODE: {
13551357
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
13561358
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
1359+
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
13571360
break;
13581361
}
13591362
case TPlanNodeType::REPEAT_NODE: {

0 commit comments

Comments
 (0)