Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[local exchange](fix) Fix correctness caused by local exchange #41555

Merged
merged 2 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class DistinctStreamingAggOperatorX final

DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class HashJoinBuildSinkOperatorX final
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLoca
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN && !_is_broadcast_join;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}
bool require_data_distribution() const override {
Expand Down
15 changes: 7 additions & 8 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,20 @@ class OperatorBase {
virtual Status revoke_memory(RuntimeState* state) { return Status::OK(); }
[[nodiscard]] virtual bool require_data_distribution() const { return false; }
OperatorPtr child() { return _child; }
[[nodiscard]] bool followed_by_shuffled_join() const { return _followed_by_shuffled_join; }
void set_followed_by_shuffled_join(bool followed_by_shuffled_join) {
_followed_by_shuffled_join = followed_by_shuffled_join;
[[nodiscard]] bool followed_by_shuffled_operator() const {
return _followed_by_shuffled_operator;
}
void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) {
_followed_by_shuffled_operator = followed_by_shuffled_operator;
}
[[nodiscard]] virtual bool is_shuffled_operator() const { return false; }
[[nodiscard]] virtual bool require_shuffled_data_distribution() const { return false; }

protected:
OperatorPtr _child = nullptr;

bool _is_closed;
bool _followed_by_shuffled_join = false;
bool _followed_by_shuffled_operator = false;
};

class PipelineXLocalStateBase {
Expand Down Expand Up @@ -477,8 +480,6 @@ class DataSinkOperatorXBase : public OperatorBase {
[[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0;
[[nodiscard]] virtual DataDistribution required_data_distribution() const;

[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }

Status close(RuntimeState* state) override {
return Status::InternalError("Should not reach here!");
}
Expand Down Expand Up @@ -663,8 +664,6 @@ class OperatorXBase : public OperatorBase {
[[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) = 0;

[[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }

Status close(RuntimeState* state) override;

[[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class PartitionedHashJoinProbeOperatorX final
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ class PartitionedHashJoinSinkOperatorX
bool require_shuffled_data_distribution() const override {
return _join_op != TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
}
bool is_shuffled_hash_join() const override {
bool is_shuffled_operator() const override {
return _join_distribution == TJoinDistributionType::PARTITIONED;
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/sort_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_is_analytic_sort) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
} else if (_merge_by_exchange) {
Expand Down
6 changes: 6 additions & 0 deletions be/src/pipeline/exec/union_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,12 @@ class UnionSinkOperatorX final : public DataSinkOperatorX<UnionSinkLocalState> {
}
}

bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
}

bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

private:
int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; }

Expand Down
5 changes: 5 additions & 0 deletions be/src/pipeline/exec/union_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
return Status::OK();
}
[[nodiscard]] int get_child_count() const { return _child_size; }
bool require_shuffled_data_distribution() const override {
return _followed_by_shuffled_operator;
}

bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; }

private:
bool _has_data(RuntimeState* state) const {
Expand Down
46 changes: 25 additions & 21 deletions be/src/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -667,7 +667,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
const DescriptorTbl& descs, OperatorPtr parent,
int* node_idx, OperatorPtr* root,
PipelinePtr& cur_pipe, int child_idx,
const bool followed_by_shuffled_join) {
const bool followed_by_shuffled_operator) {
// propagate error case
if (*node_idx >= tnodes.size()) {
return Status::InternalError(
Expand All @@ -677,11 +677,11 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
const TPlanNode& tnode = tnodes[*node_idx];

int num_children = tnodes[*node_idx].num_children;
bool current_followed_by_shuffled_join = followed_by_shuffled_join;
bool current_followed_by_shuffled_operator = followed_by_shuffled_operator;
OperatorPtr op = nullptr;
RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], request, descs, op, cur_pipe,
parent == nullptr ? -1 : parent->node_id(), child_idx,
followed_by_shuffled_join));
followed_by_shuffled_operator));

// assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr));
if (parent != nullptr) {
Expand All @@ -691,7 +691,7 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
*root = op;
}
/**
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled hash join.
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (shuffled hash join, union operator followed by co-located operators).
*
* For plan:
* LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
Expand All @@ -704,15 +704,15 @@ Status PipelineFragmentContext::_create_tree_helper(ObjectPool* pool,
auto require_shuffled_data_distribution =
cur_pipe->operators().empty() ? cur_pipe->sink()->require_shuffled_data_distribution()
: op->require_shuffled_data_distribution();
current_followed_by_shuffled_join =
(followed_by_shuffled_join || op->is_shuffled_hash_join()) &&
current_followed_by_shuffled_operator =
(followed_by_shuffled_operator || op->is_shuffled_operator()) &&
require_shuffled_data_distribution;

// rely on that tnodes is preorder of the plan
for (int i = 0; i < num_children; i++) {
++*node_idx;
RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, request, descs, op, node_idx, nullptr,
cur_pipe, i, current_followed_by_shuffled_join));
cur_pipe, i, current_followed_by_shuffled_operator));

// we are expecting a child, but have used all nodes
// this means we have been given a bad tree and must fail
Expand Down Expand Up @@ -753,13 +753,13 @@ Status PipelineFragmentContext::_add_local_exchange_impl(
* `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment.
* So co-located operators(e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
*/
const bool followed_by_shuffled_join = operators.size() > idx
? operators[idx]->followed_by_shuffled_join()
: cur_pipe->sink()->followed_by_shuffled_join();
const bool followed_by_shuffled_operator =
operators.size() > idx ? operators[idx]->followed_by_shuffled_operator()
: cur_pipe->sink()->followed_by_shuffled_operator();
const bool should_disable_bucket_shuffle =
bucket_seq_to_instance_idx.empty() &&
shuffle_idx_to_instance_idx.find(-1) == shuffle_idx_to_instance_idx.end() &&
followed_by_shuffled_join;
followed_by_shuffled_operator;
sink.reset(new LocalExchangeSinkOperatorX(
sink_id, local_exchange_id,
should_disable_bucket_shuffle ? _total_instances : _num_instances,
Expand Down Expand Up @@ -1199,7 +1199,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
const DescriptorTbl& descs, OperatorPtr& op,
PipelinePtr& cur_pipe, int parent_idx,
int child_idx,
const bool followed_by_shuffled_join) {
const bool followed_by_shuffled_operator) {
// We directly construct the operator from Thrift because the given array is in the order of preorder traversal.
// Therefore, here we need to use a stack-like structure.
_pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx);
Expand Down Expand Up @@ -1321,15 +1321,15 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo

op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
_require_bucket_distribution));
op->set_followed_by_shuffled_join(false);
op->set_followed_by_shuffled_operator(false);
_require_bucket_distribution = true;
RETURN_IF_ERROR(new_pipe->add_operator(op));
RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op));
cur_pipe = new_pipe;
} else {
op.reset(new DistinctStreamingAggOperatorX(pool, next_operator_id(), tnode, descs,
_require_bucket_distribution));
op->set_followed_by_shuffled_join(followed_by_shuffled_join);
op->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
RETURN_IF_ERROR(cur_pipe->add_operator(op));
Expand Down Expand Up @@ -1384,7 +1384,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
sink.reset(new AggSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
}
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand Down Expand Up @@ -1434,8 +1434,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo

_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
} else {
op.reset(new HashJoinProbeOperatorX(pool, tnode, next_operator_id(), descs));
RETURN_IF_ERROR(cur_pipe->add_operator(op));
Expand All @@ -1456,8 +1456,8 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo

_pipeline_parent_map.push(op->node_id(), cur_pipe);
_pipeline_parent_map.push(op->node_id(), build_side_pipe);
sink->set_followed_by_shuffled_join(sink->is_shuffled_hash_join());
op->set_followed_by_shuffled_join(op->is_shuffled_hash_join());
sink->set_followed_by_shuffled_operator(sink->is_shuffled_operator());
op->set_followed_by_shuffled_operator(op->is_shuffled_operator());
}
_require_bucket_distribution =
_require_bucket_distribution || op->require_data_distribution();
Expand Down Expand Up @@ -1487,6 +1487,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
case TPlanNodeType::UNION_NODE: {
int child_count = tnode.num_children;
op.reset(new UnionSourceOperatorX(pool, tnode, next_operator_id(), descs));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
RETURN_IF_ERROR(cur_pipe->add_operator(op));

const auto downstream_pipeline_id = cur_pipe->id();
Expand All @@ -1498,6 +1499,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
_dag[downstream_pipeline_id].push_back(build_side_pipe->id());
DataSinkOperatorPtr sink;
sink.reset(new UnionSinkOperatorX(i, next_sink_operator_id(), pool, tnode, descs));
sink->set_followed_by_shuffled_operator(_require_bucket_distribution);
sink->set_dests_id({op->operator_id()});
RETURN_IF_ERROR(build_side_pipe->set_sink(sink));
RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get()));
Expand Down Expand Up @@ -1531,7 +1533,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
sink.reset(new SortSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
}
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand Down Expand Up @@ -1571,7 +1573,7 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
DataSinkOperatorPtr sink;
sink.reset(new AnalyticSinkOperatorX(pool, next_sink_operator_id(), tnode, descs,
_require_bucket_distribution));
sink->set_followed_by_shuffled_join(followed_by_shuffled_join);
sink->set_followed_by_shuffled_operator(followed_by_shuffled_operator);
_require_bucket_distribution =
_require_bucket_distribution || sink->require_data_distribution();
sink->set_dests_id({op->operator_id()});
Expand All @@ -1582,11 +1584,13 @@ Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNo
case TPlanNodeType::INTERSECT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::EXCEPT_NODE: {
RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(
pool, tnode, descs, op, cur_pipe, parent_idx, child_idx));
op->set_followed_by_shuffled_operator(_require_bucket_distribution);
break;
}
case TPlanNodeType::REPEAT_NODE: {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/pipeline_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,10 @@ class PipelineTask {
bool is_finalized() const { return _finalized; }

void clear_blocking_state(bool wake_up_by_downstream = false) {
_wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
_state->get_query_ctx()->get_execution_dependency()->set_always_ready();
// We use a lock to assure all dependencies are not deconstructed here.
std::unique_lock<std::mutex> lc(_dependency_lock);
_wake_up_by_downstream = _wake_up_by_downstream || wake_up_by_downstream;
if (!_finalized) {
_execution_dep->set_always_ready();
for (auto* dep : _filter_dependencies) {
Expand Down
Loading