Skip to content

Commit 6daf531

Browse files
MryangeHappenLee
authored andcommitted
[code](pipelineX) refine some pipelineX code (apache#28570)
1 parent f8c90de commit 6daf531

14 files changed

+130
-82
lines changed

be/src/pipeline/exec/exchange_sink_operator.cpp

+6-8
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ bool ExchangeSinkLocalState::transfer_large_data_by_brpc() const {
100100
}
101101

102102
Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
103-
RETURN_IF_ERROR(PipelineXSinkLocalState<>::init(state, info));
103+
RETURN_IF_ERROR(Base::init(state, info));
104104
SCOPED_TIMER(exec_time_counter());
105105
SCOPED_TIMER(_open_timer);
106106
_sender_id = info.sender_id;
@@ -174,9 +174,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
174174
id, p._dest_node_id, _sender_id, _state->be_number(), state->get_query_ctx());
175175

176176
register_channels(_sink_buffer.get());
177-
178-
_exchange_sink_dependency = AndDependency::create_shared(
179-
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
177+
auto* _exchange_sink_dependency = _dependency;
180178
_queue_dependency = ExchangeSinkQueueDependency::create_shared(
181179
_parent->operator_id(), _parent->node_id(), state->get_query_ctx());
182180
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
@@ -237,7 +235,7 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
237235
}
238236

239237
Status ExchangeSinkLocalState::open(RuntimeState* state) {
240-
RETURN_IF_ERROR(PipelineXSinkLocalState<>::open(state));
238+
RETURN_IF_ERROR(Base::open(state));
241239
auto& p = _parent->cast<ExchangeSinkOperatorX>();
242240
if (p._part_type == TPartitionType::HASH_PARTITIONED ||
243241
p._part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
@@ -522,8 +520,7 @@ Status ExchangeSinkOperatorX::try_close(RuntimeState* state, Status exec_status)
522520

523521
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
524522
fmt::memory_buffer debug_string_buffer;
525-
fmt::format_to(debug_string_buffer, "{}",
526-
PipelineXSinkLocalState<>::debug_string(indentation_level));
523+
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
527524
fmt::format_to(debug_string_buffer, ", Sink Buffer: (_should_stop = {}, _busy_channels = {})",
528525
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load());
529526
return fmt::to_string(debug_string_buffer);
@@ -536,6 +533,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
536533
SCOPED_TIMER(exec_time_counter());
537534
SCOPED_TIMER(_close_timer);
538535
COUNTER_UPDATE(_wait_queue_timer, _queue_dependency->watcher_elapse_time());
536+
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
539537
if (_broadcast_dependency) {
540538
COUNTER_UPDATE(_wait_broadcast_buffer_timer, _broadcast_dependency->watcher_elapse_time());
541539
}
@@ -545,7 +543,7 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
545543
}
546544
_sink_buffer->update_profile(profile());
547545
_sink_buffer->close();
548-
return PipelineXSinkLocalState<>::close(state, exec_status);
546+
return Base::close(state, exec_status);
549547
}
550548

551549
} // namespace doris::pipeline

be/src/pipeline/exec/exchange_sink_operator.h

+11-5
Original file line numberDiff line numberDiff line change
@@ -144,20 +144,25 @@ class LocalExchangeChannelDependency final : public Dependency {
144144
// TODO(gabriel): blocked by memory
145145
};
146146

147-
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
147+
class ExchangeSinkLocalState final : public PipelineXSinkLocalState<AndDependency> {
148148
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
149+
using Base = PipelineXSinkLocalState<AndDependency>;
149150

150151
public:
151152
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
152-
: PipelineXSinkLocalState<>(parent, state),
153+
: Base(parent, state),
153154
current_channel_idx(0),
154155
only_local_exchange(false),
155-
_serializer(this) {}
156+
_serializer(this) {
157+
_finish_dependency = std::make_shared<FinishDependency>(
158+
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
159+
state->get_query_ctx());
160+
}
156161

157162
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
158163
Status open(RuntimeState* state) override;
159164
Status close(RuntimeState* state, Status exec_status) override;
160-
Dependency* dependency() override { return _exchange_sink_dependency.get(); }
165+
Dependency* finishdependency() override { return _finish_dependency.get(); }
161166
Status serialize_block(vectorized::Block* src, PBlock* dest, int num_receivers = 1);
162167
void register_channels(pipeline::ExchangeSinkBuffer<ExchangeSinkLocalState>* buffer);
163168
Status get_next_available_buffer(vectorized::BroadcastPBlockHolder** holder);
@@ -231,11 +236,12 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
231236
vectorized::BlockSerializer<ExchangeSinkLocalState> _serializer;
232237

233238
std::shared_ptr<ExchangeSinkQueueDependency> _queue_dependency;
234-
std::shared_ptr<AndDependency> _exchange_sink_dependency;
235239
std::shared_ptr<BroadcastDependency> _broadcast_dependency;
236240
std::vector<std::shared_ptr<LocalExchangeChannelDependency>> _local_channels_dependency;
237241
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
238242
int _partition_count;
243+
244+
std::shared_ptr<Dependency> _finish_dependency;
239245
};
240246

241247
class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalState> {

be/src/pipeline/exec/exchange_source_operator.cpp

+6-8
Original file line numberDiff line numberDiff line change
@@ -41,12 +41,11 @@ bool ExchangeSourceOperator::is_pending_finish() const {
4141
}
4242

4343
ExchangeLocalState::ExchangeLocalState(RuntimeState* state, OperatorXBase* parent)
44-
: PipelineXLocalState<>(state, parent), num_rows_skipped(0), is_ready(false) {}
44+
: Base(state, parent), num_rows_skipped(0), is_ready(false) {}
4545

4646
std::string ExchangeLocalState::debug_string(int indentation_level) const {
4747
fmt::memory_buffer debug_string_buffer;
48-
fmt::format_to(debug_string_buffer, "{}",
49-
PipelineXLocalState<>::debug_string(indentation_level));
48+
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
5049
fmt::format_to(debug_string_buffer, ", Queues: (");
5150
const auto& queues = stream_recvr->sender_queues();
5251
for (size_t i = 0; i < queues.size(); i++) {
@@ -68,15 +67,14 @@ std::string ExchangeSourceOperatorX::debug_string(int indentation_level) const {
6867
}
6968

7069
Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
71-
RETURN_IF_ERROR(PipelineXLocalState<>::init(state, info));
70+
RETURN_IF_ERROR(Base::init(state, info));
7271
SCOPED_TIMER(exec_time_counter());
7372
SCOPED_TIMER(_open_timer);
7473
auto& p = _parent->cast<ExchangeSourceOperatorX>();
7574
stream_recvr = state->exec_env()->vstream_mgr()->create_recvr(
7675
state, p.input_row_desc(), state->fragment_instance_id(), p.node_id(), p.num_senders(),
7776
profile(), p.is_merging(), p.sub_plan_query_statistics_recvr());
78-
source_dependency = AndDependency::create_shared(_parent->operator_id(), _parent->node_id(),
79-
state->get_query_ctx());
77+
auto* source_dependency = _dependency;
8078
const auto& queues = stream_recvr->sender_queues();
8179
deps.resize(queues.size());
8280
metrics.resize(queues.size());
@@ -101,7 +99,7 @@ Status ExchangeLocalState::init(RuntimeState* state, LocalStateInfo& info) {
10199
Status ExchangeLocalState::open(RuntimeState* state) {
102100
SCOPED_TIMER(exec_time_counter());
103101
SCOPED_TIMER(_open_timer);
104-
RETURN_IF_ERROR(PipelineXLocalState<>::open(state));
102+
RETURN_IF_ERROR(Base::open(state));
105103
return Status::OK();
106104
}
107105

@@ -215,7 +213,7 @@ Status ExchangeLocalState::close(RuntimeState* state) {
215213
if (_parent->cast<ExchangeSourceOperatorX>()._is_merging) {
216214
vsort_exec_exprs.close(state);
217215
}
218-
return PipelineXLocalState<>::close(state);
216+
return Base::close(state);
219217
}
220218

221219
Status ExchangeSourceOperatorX::close(RuntimeState* state) {

be/src/pipeline/exec/exchange_source_operator.h

+4-3
Original file line numberDiff line numberDiff line change
@@ -71,21 +71,22 @@ struct ExchangeDataDependency final : public Dependency {
7171
};
7272

7373
class ExchangeSourceOperatorX;
74-
class ExchangeLocalState final : public PipelineXLocalState<> {
74+
class ExchangeLocalState final : public PipelineXLocalState<AndDependency> {
7575
ENABLE_FACTORY_CREATOR(ExchangeLocalState);
76+
77+
public:
78+
using Base = PipelineXLocalState<AndDependency>;
7679
ExchangeLocalState(RuntimeState* state, OperatorXBase* parent);
7780

7881
Status init(RuntimeState* state, LocalStateInfo& info) override;
7982
Status open(RuntimeState* state) override;
8083
Status close(RuntimeState* state) override;
81-
Dependency* dependency() override { return source_dependency.get(); }
8284
std::string debug_string(int indentation_level) const override;
8385
std::shared_ptr<doris::vectorized::VDataStreamRecvr> stream_recvr;
8486
doris::vectorized::VSortExecExprs vsort_exec_exprs;
8587
int64_t num_rows_skipped;
8688
bool is_ready;
8789

88-
std::shared_ptr<AndDependency> source_dependency;
8990
std::vector<std::shared_ptr<ExchangeDataDependency>> deps;
9091

9192
std::vector<RuntimeProfile::Counter*> metrics;

be/src/pipeline/exec/multi_cast_data_stream_source.cpp

+7-3
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,13 @@ RuntimeProfile* MultiCastDataStreamerSourceOperator::get_runtime_profile() const
128128
MultiCastDataStreamSourceLocalState::MultiCastDataStreamSourceLocalState(RuntimeState* state,
129129
OperatorXBase* parent)
130130
: Base(state, parent),
131-
vectorized::RuntimeFilterConsumer(
132-
static_cast<Parent*>(parent)->dest_id_from_sink(), parent->runtime_filter_descs(),
133-
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {};
131+
vectorized::RuntimeFilterConsumer(static_cast<Parent*>(parent)->dest_id_from_sink(),
132+
parent->runtime_filter_descs(),
133+
static_cast<Parent*>(parent)->_row_desc(), _conjuncts) {
134+
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
135+
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
136+
state->get_query_ctx());
137+
};
134138

135139
Status MultiCastDataStreamSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
136140
RETURN_IF_ERROR(Base::init(state, info));

be/src/pipeline/exec/multi_cast_data_stream_source.h

+3
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,11 @@ class MultiCastDataStreamSourceLocalState final
120120

121121
friend class MultiCastDataStreamerSourceOperatorX;
122122

123+
RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); }
124+
123125
private:
124126
vectorized::VExprContextSPtrs _output_expr_contexts;
127+
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
125128
};
126129

127130
class MultiCastDataStreamerSourceOperatorX final

be/src/pipeline/exec/scan_operator.cpp

+12-2
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,14 @@ std::string ScanOperator::debug_string() const {
100100

101101
template <typename Derived>
102102
ScanLocalState<Derived>::ScanLocalState(RuntimeState* state, OperatorXBase* parent)
103-
: ScanLocalStateBase(state, parent) {}
103+
: ScanLocalStateBase(state, parent) {
104+
_finish_dependency = std::make_shared<FinishDependency>(
105+
parent->operator_id(), parent->node_id(), parent->get_name() + "_FINISH_DEPENDENCY",
106+
state->get_query_ctx());
107+
_filter_dependency = std::make_shared<RuntimeFilterDependency>(
108+
parent->operator_id(), parent->node_id(), parent->get_name() + "_FILTER_DEPENDENCY",
109+
state->get_query_ctx());
110+
}
104111

105112
template <typename Derived>
106113
bool ScanLocalState<Derived>::ready_to_read() {
@@ -1311,6 +1318,9 @@ Status ScanLocalState<Derived>::_init_profile() {
13111318

13121319
_max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT);
13131320

1321+
_wait_for_finish_dependency_timer =
1322+
ADD_TIMER(_runtime_profile, "WaitForPendingFinishDependency");
1323+
13141324
return Status::OK();
13151325
}
13161326

@@ -1442,7 +1452,7 @@ Status ScanLocalState<Derived>::close(RuntimeState* state) {
14421452
_scanner_ctx->clear_and_join(reinterpret_cast<ScanLocalStateBase*>(this), state);
14431453
}
14441454
COUNTER_SET(_wait_for_dependency_timer, _scan_dependency->watcher_elapse_time());
1445-
1455+
COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
14461456
return PipelineXLocalState<>::close(state);
14471457
}
14481458

be/src/pipeline/exec/scan_operator.h

+9
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ class ScanLocalStateBase : public PipelineXLocalState<>, public vectorized::Runt
171171
RuntimeProfile::Counter* _wait_for_scanner_done_timer = nullptr;
172172
// time of prefilter input block from scanner
173173
RuntimeProfile::Counter* _wait_for_eos_timer = nullptr;
174+
175+
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
174176
};
175177

176178
template <typename LocalStateType>
@@ -211,6 +213,9 @@ class ScanLocalState : public ScanLocalStateBase {
211213

212214
Dependency* dependency() override { return _scan_dependency.get(); }
213215

216+
RuntimeFilterDependency* filterdependency() override { return _filter_dependency.get(); };
217+
Dependency* finishdependency() override { return _finish_dependency.get(); }
218+
214219
protected:
215220
template <typename LocalStateType>
216221
friend class ScanOperatorX;
@@ -405,6 +410,10 @@ class ScanLocalState : public ScanLocalStateBase {
405410
std::atomic<bool> _eos = false;
406411

407412
std::mutex _block_lock;
413+
414+
std::shared_ptr<RuntimeFilterDependency> _filter_dependency;
415+
416+
std::shared_ptr<Dependency> _finish_dependency;
408417
};
409418

410419
template <typename LocalStateType>

0 commit comments

Comments
 (0)