Skip to content

Commit 5bd33fc

Browse files
## Proposed changes

Cherry-pick of #41292 #41350 #41589 #41628 #41743 #41601 #41667 #41751

<!--Describe your changes.-->

---------

Co-authored-by: Pxl <pxl290@qq.com>
1 parent e562162 commit 5bd33fc

27 files changed

+188
-90
lines changed

be/src/exprs/bloom_filter_func.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -151,19 +151,19 @@ class BloomFilterFuncBase : public RuntimeFilterFuncBase {
151151
}
152152

153153
Status merge(BloomFilterFuncBase* bloomfilter_func) {
154+
DCHECK(bloomfilter_func != nullptr);
155+
DCHECK(bloomfilter_func->_bloom_filter != nullptr);
154156
// If `_inited` is false, there is no memory allocated in bloom filter and this is the first
155157
// call for `merge` function. So we just reuse this bloom filter, and we don't need to
156158
// allocate memory again.
157159
if (!_inited) {
158160
auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
159161
DCHECK(_bloom_filter == nullptr);
160-
DCHECK(bloomfilter_func != nullptr);
161162
_bloom_filter = bloomfilter_func->_bloom_filter;
162163
_bloom_filter_alloced = other_func->_bloom_filter_alloced;
163164
_inited = true;
164165
return Status::OK();
165166
}
166-
DCHECK(bloomfilter_func != nullptr);
167167
auto* other_func = static_cast<BloomFilterFuncBase*>(bloomfilter_func);
168168
if (_bloom_filter_alloced != other_func->_bloom_filter_alloced) {
169169
return Status::InternalError(

be/src/exprs/runtime_filter.cpp

+6-3
Original file line numberDiff line numberDiff line change
@@ -468,10 +468,10 @@ class RuntimePredicateWrapper {
468468
const TExpr& probe_expr);
469469

470470
Status merge(const RuntimePredicateWrapper* wrapper) {
471-
if (is_ignored() || wrapper->is_ignored()) {
472-
_context->ignored = true;
471+
if (wrapper->is_ignored()) {
473472
return Status::OK();
474473
}
474+
_context->ignored = false;
475475

476476
bool can_not_merge_in_or_bloom =
477477
_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
@@ -489,7 +489,10 @@ class RuntimePredicateWrapper {
489489

490490
switch (_filter_type) {
491491
case RuntimeFilterType::IN_FILTER: {
492-
// try insert set
492+
if (!_context->hybrid_set) {
493+
_context->ignored = true;
494+
return Status::OK();
495+
}
493496
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
494497
if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
495498
_context->ignored = true;

be/src/exprs/runtime_filter_slots.h

+8-4
Original file line numberDiff line numberDiff line change
@@ -98,12 +98,16 @@ class VRuntimeFilterSlots {
9898
return Status::OK();
9999
}
100100

101+
Status ignore_all_filters() {
102+
for (auto filter : _runtime_filters) {
103+
filter->set_ignored();
104+
}
105+
return Status::OK();
106+
}
107+
101108
Status init_filters(RuntimeState* state, uint64_t local_hash_table_size) {
102109
// process IN_OR_BLOOM_FILTER's real type
103110
for (auto* filter : _runtime_filters) {
104-
if (filter->get_ignored()) {
105-
continue;
106-
}
107111
if (filter->type() == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
108112
get_real_size(filter, local_hash_table_size) > state->runtime_filter_max_in_num()) {
109113
RETURN_IF_ERROR(filter->change_to_bloom_filter());
@@ -140,7 +144,7 @@ class VRuntimeFilterSlots {
140144
}
141145

142146
// publish runtime filter
143-
Status publish(bool publish_local = false) {
147+
Status publish(bool publish_local) {
144148
for (auto& pair : _runtime_filters_map) {
145149
for (auto& filter : pair.second) {
146150
RETURN_IF_ERROR(filter->publish(publish_local));

be/src/pipeline/exec/exchange_sink_operator.cpp

+21-14
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
189189
PUniqueId id;
190190
id.set_hi(_state->query_id().hi);
191191
id.set_lo(_state->query_id().lo);
192-
_sink_buffer = std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
193-
id, p._dest_node_id, _sender_id, _state->be_number(), state, this);
194192

195-
register_channels(_sink_buffer.get());
196-
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
197-
"ExchangeSinkQueueDependency", true);
198-
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
193+
if (!only_local_exchange) {
194+
_sink_buffer = std::make_unique<ExchangeSinkBuffer<ExchangeSinkLocalState>>(
195+
id, p._dest_node_id, _sender_id, _state->be_number(), state, this);
196+
register_channels(_sink_buffer.get());
197+
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
198+
"ExchangeSinkQueueDependency", true);
199+
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
200+
_finish_dependency->block();
201+
}
202+
199203
if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
200204
!only_local_exchange) {
201205
_broadcast_dependency = Dependency::create_shared(
@@ -304,7 +308,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
304308
fmt::format("Crc32HashPartitioner({})", _partition_count));
305309
}
306310

307-
_finish_dependency->block();
308311
if (_part_type == TPartitionType::HASH_PARTITIONED ||
309312
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
310313
_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
@@ -638,8 +641,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
638641
final_st = st;
639642
}
640643
}
641-
local_state._sink_buffer->set_should_stop();
642-
return final_st;
644+
if (local_state._sink_buffer) {
645+
local_state._sink_buffer->set_should_stop();
646+
}
643647
}
644648
return final_st;
645649
}
@@ -725,11 +729,14 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
725729
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
726730
fmt::memory_buffer debug_string_buffer;
727731
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
728-
fmt::format_to(debug_string_buffer,
729-
", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
730-
"_reach_limit: {}",
731-
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
732-
_sink_buffer->_is_finishing.load(), _reach_limit.load());
732+
if (_sink_buffer) {
733+
fmt::format_to(
734+
debug_string_buffer,
735+
", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
736+
"_reach_limit: {}",
737+
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
738+
_sink_buffer->_is_finishing.load(), _reach_limit.load());
739+
}
733740
return fmt::to_string(debug_string_buffer);
734741
}
735742

be/src/pipeline/exec/exchange_sink_operator.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
9696

9797
std::vector<Dependency*> dependencies() const override {
9898
std::vector<Dependency*> dep_vec;
99-
dep_vec.push_back(_queue_dependency.get());
99+
if (_queue_dependency) {
100+
dep_vec.push_back(_queue_dependency.get());
101+
}
100102
if (_broadcast_dependency) {
101103
dep_vec.push_back(_broadcast_dependency.get());
102104
}

be/src/pipeline/exec/hashjoin_build_sink.cpp

+27-15
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include "exprs/bloom_filter_func.h"
2323
#include "pipeline/exec/hashjoin_probe_operator.h"
2424
#include "pipeline/exec/operator.h"
25+
#include "pipeline/pipeline_x/pipeline_x_task.h"
2526
#include "vec/data_types/data_type_nullable.h"
2627
#include "vec/exec/join/vhash_join_node.h"
2728
#include "vec/utils/template_helpers.hpp"
@@ -122,6 +123,9 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState* state) {
122123
}
123124

124125
Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_status) {
126+
if (_closed) {
127+
return Status::OK();
128+
}
125129
auto p = _parent->cast<HashJoinBuildSinkOperatorX>();
126130
Defer defer {[&]() {
127131
if (_should_build_hash_table && p._shared_hashtable_controller) {
@@ -130,25 +134,30 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
130134
}};
131135

132136
if (!_runtime_filter_slots || _runtime_filters.empty() || state->is_cancelled()) {
133-
return Status::OK();
137+
return Base::close(state, exec_status);
134138
}
135-
auto* block = _shared_state->build_block.get();
136-
uint64_t hash_table_size = block ? block->rows() : 0;
137-
{
138-
SCOPED_TIMER(_runtime_filter_init_timer);
139-
if (_should_build_hash_table) {
140-
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
139+
140+
if (state->get_task()->wake_up_by_downstream()) {
141+
RETURN_IF_ERROR(_runtime_filter_slots->send_filter_size(state, 0, _finish_dependency));
142+
RETURN_IF_ERROR(_runtime_filter_slots->ignore_all_filters());
143+
} else {
144+
auto* block = _shared_state->build_block.get();
145+
uint64_t hash_table_size = block ? block->rows() : 0;
146+
{
147+
SCOPED_TIMER(_runtime_filter_init_timer);
148+
if (_should_build_hash_table) {
149+
RETURN_IF_ERROR(_runtime_filter_slots->init_filters(state, hash_table_size));
150+
}
151+
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
152+
}
153+
if (_should_build_hash_table && hash_table_size > 1) {
154+
SCOPED_TIMER(_runtime_filter_compute_timer);
155+
_runtime_filter_slots->insert(block);
141156
}
142-
RETURN_IF_ERROR(_runtime_filter_slots->ignore_filters(state));
143-
}
144-
if (_should_build_hash_table && hash_table_size > 1) {
145-
SCOPED_TIMER(_runtime_filter_compute_timer);
146-
_runtime_filter_slots->insert(block);
147157
}
148-
149158
SCOPED_TIMER(_publish_runtime_filter_timer);
150159
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
151-
return Status::OK();
160+
return Base::close(state, exec_status);
152161
}
153162

154163
bool HashJoinBuildSinkLocalState::build_unique() const {
@@ -519,6 +528,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
519528
SCOPED_TIMER(local_state.exec_time_counter());
520529
COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows());
521530

531+
local_state._eos = eos;
522532
if (local_state._should_build_hash_table) {
523533
// If eos or have already met a null value using short-circuit strategy, we do not need to pull
524534
// data from probe side.
@@ -582,6 +592,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
582592
local_state.process_build_block(state, (*local_state._shared_state->build_block)));
583593
if (_shared_hashtable_controller) {
584594
_shared_hash_table_context->status = Status::OK();
595+
_shared_hash_table_context->complete_build_stage = true;
585596
// arena will be shared with other instances.
586597
_shared_hash_table_context->arena = local_state._shared_state->arena;
587598
_shared_hash_table_context->hash_table_variants =
@@ -594,7 +605,8 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
594605
local_state._runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
595606
_shared_hashtable_controller->signal(node_id());
596607
}
597-
} else if (!local_state._should_build_hash_table) {
608+
} else if (!local_state._should_build_hash_table &&
609+
_shared_hash_table_context->complete_build_stage) {
598610
DCHECK(_shared_hashtable_controller != nullptr);
599611
DCHECK(_shared_hash_table_context != nullptr);
600612
// An instance that does not build the hash table should wait for the signal that the hash table build has finished.

be/src/pipeline/exec/multi_cast_data_stream_sink.h

+10-5
Original file line numberDiff line numberDiff line change
@@ -62,15 +62,15 @@ class MultiCastDataStreamSinkOperatorX final
6262
using Base = DataSinkOperatorX<MultiCastDataStreamSinkLocalState>;
6363

6464
public:
65-
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources,
66-
const int cast_sender_count, ObjectPool* pool,
65+
MultiCastDataStreamSinkOperatorX(int sink_id, std::vector<int>& sources, ObjectPool* pool,
6766
const TMultiCastDataStreamSink& sink,
6867
const RowDescriptor& row_desc)
6968
: Base(sink_id, -1, sources),
7069
_pool(pool),
7170
_row_desc(row_desc),
72-
_cast_sender_count(cast_sender_count),
73-
_sink(sink) {}
71+
_cast_sender_count(sources.size()),
72+
_sink(sink),
73+
_num_dests(sources.size()) {}
7474
~MultiCastDataStreamSinkOperatorX() override = default;
7575

7676
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override {
@@ -103,14 +103,19 @@ class MultiCastDataStreamSinkOperatorX final
103103
}
104104

105105
const TMultiCastDataStreamSink& sink_node() { return _sink; }
106+
bool count_down_destination() override {
107+
DCHECK_GT(_num_dests, 0);
108+
return _num_dests.fetch_sub(1) == 1;
109+
}
106110

107111
private:
108112
friend class MultiCastDataStreamSinkLocalState;
109113
ObjectPool* _pool;
110114
RowDescriptor _row_desc;
111-
const int _cast_sender_count;
115+
const size_t _cast_sender_count;
112116
const TMultiCastDataStreamSink& _sink;
113117
friend class MultiCastDataStreamSinkLocalState;
118+
std::atomic<size_t> _num_dests;
114119
};
115120

116121
} // namespace doris::pipeline

be/src/pipeline/exec/partitioned_aggregation_sink_operator.h

-1
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,6 @@ class PartitionedAggSinkLocalState
258258

259259
std::unique_ptr<RuntimeState> _runtime_state;
260260

261-
bool _eos = false;
262261
std::shared_ptr<Dependency> _finish_dependency;
263262

264263
// temp structures during spilling

be/src/pipeline/exec/spill_sort_sink_operator.h

-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState<SpillSortSha
5555

5656
RuntimeProfile::Counter* _spill_merge_sort_timer = nullptr;
5757

58-
bool _eos = false;
5958
vectorized::SpillStreamSPtr _spilling_stream;
6059
std::shared_ptr<Dependency> _finish_dependency;
6160
};

be/src/pipeline/pipeline.cpp

+12-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
#include <utility>
2323

2424
#include "pipeline/exec/operator.h"
25+
#include "pipeline/pipeline_task.h"
2526

2627
namespace doris::pipeline {
2728

@@ -99,4 +100,14 @@ Status Pipeline::set_sink(DataSinkOperatorXPtr& sink) {
99100
return Status::OK();
100101
}
101102

102-
} // namespace doris::pipeline
103+
void Pipeline::make_all_runnable() {
104+
if (_sink_x->count_down_destination()) {
105+
for (auto* task : _tasks) {
106+
if (task) {
107+
task->clear_blocking_state(true);
108+
}
109+
}
110+
}
111+
}
112+
113+
} // namespace doris::pipeline

be/src/pipeline/pipeline.h

+16-1
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
5050
std::weak_ptr<PipelineFragmentContext> context)
5151
: _pipeline_id(pipeline_id), _context(std::move(context)), _num_tasks(num_tasks) {
5252
_init_profile();
53+
_tasks.resize(_num_tasks, nullptr);
5354
}
5455

5556
void add_dependency(std::shared_ptr<Pipeline>& pipeline) {
@@ -155,14 +156,24 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
155156
void set_children(std::shared_ptr<Pipeline> child) { _children.push_back(child); }
156157
void set_children(std::vector<std::shared_ptr<Pipeline>> children) { _children = children; }
157158

158-
void incr_created_tasks() { _num_tasks_created++; }
159+
void incr_created_tasks(int i, PipelineTask* task) {
160+
_num_tasks_created++;
161+
_num_tasks_running++;
162+
DCHECK_LT(i, _tasks.size());
163+
_tasks[i] = task;
164+
}
165+
166+
void make_all_runnable();
167+
159168
void set_num_tasks(int num_tasks) {
160169
_num_tasks = num_tasks;
170+
_tasks.resize(_num_tasks, nullptr);
161171
for (auto& op : operatorXs) {
162172
op->set_parallel_tasks(_num_tasks);
163173
}
164174
}
165175
int num_tasks() const { return _num_tasks; }
176+
bool close_task() { return _num_tasks_running.fetch_sub(1) == 1; }
166177

167178
std::string debug_string() {
168179
fmt::memory_buffer debug_string_buffer;
@@ -243,6 +254,10 @@ class Pipeline : public std::enable_shared_from_this<Pipeline> {
243254
int _num_tasks = 1;
244255
// How many tasks are already created?
245256
std::atomic<int> _num_tasks_created = 0;
257+
// How many tasks are already created and not finished?
258+
std::atomic<int> _num_tasks_running = 0;
259+
// Tasks in this pipeline.
260+
std::vector<PipelineTask*> _tasks;
246261
};
247262

248263
} // namespace doris::pipeline

be/src/pipeline/pipeline_fragment_context.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ void PipelineFragmentContext::close_if_prepare_failed(Status /*st*/) {
813813
DCHECK(!task->is_pending_finish());
814814
WARN_IF_ERROR(task->close(Status::OK()),
815815
fmt::format("Query {} closed since prepare failed", print_id(_query_id)));
816-
close_a_pipeline();
816+
close_a_pipeline(task->pipeline_id());
817817
}
818818
}
819819

@@ -960,7 +960,7 @@ void PipelineFragmentContext::_close_fragment_instance() {
960960
std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
961961
}
962962

963-
void PipelineFragmentContext::close_a_pipeline() {
963+
void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
964964
std::lock_guard<std::mutex> l(_task_mutex);
965965
g_pipeline_tasks_count << -1;
966966
++_closed_tasks;

be/src/pipeline/pipeline_fragment_context.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ class PipelineFragmentContext : public TaskExecutionContext {
110110

111111
[[nodiscard]] int get_fragment_id() const { return _fragment_id; }
112112

113-
void close_a_pipeline();
113+
virtual void close_a_pipeline(PipelineId pipeline_id);
114114

115115
virtual void clear_finished_tasks() {}
116116

0 commit comments

Comments
 (0)