Skip to content

Commit 54b4fa3

Browse files
authored
[pick](branch-3.0) pick #39982 #40576 #41555 #41592 #41676 #41740 (#41808)
1 parent d80031f commit 54b4fa3

File tree

93 files changed

+442
-642
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

93 files changed

+442
-642
lines changed

be/src/common/config.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -1717,7 +1717,9 @@ bool init(const char* conf_file, bool fill_conf_map, bool must_exist, bool set_t
17171717

17181718
if (config::is_cloud_mode()) {
17191719
auto st = config::set_config("enable_file_cache", "true", true, true);
1720-
LOG(INFO) << "set config enable_file_cache " << "true" << " " << st;
1720+
LOG(INFO) << "set config enable_file_cache "
1721+
<< "true"
1722+
<< " " << st;
17211723
}
17221724

17231725
return true;

be/src/pipeline/exec/aggregation_sink_operator.cpp

+4-7
Original file line numberDiff line numberDiff line change
@@ -769,12 +769,13 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
769769
return Status::OK();
770770
}
771771

772-
Status AggSinkOperatorX::prepare(RuntimeState* state) {
772+
Status AggSinkOperatorX::open(RuntimeState* state) {
773+
RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::open(state));
773774
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
774775
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
775776
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
776777
RETURN_IF_ERROR(vectorized::VExpr::prepare(
777-
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc()));
778+
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc()));
778779

779780
int j = _probe_expr_ctxs.size();
780781
for (int i = 0; i < j; ++i) {
@@ -789,7 +790,7 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
789790
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
790791
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
791792
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
792-
state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc(),
793+
state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc(),
793794
intermediate_slot_desc, output_slot_desc));
794795
_aggregate_evaluators[i]->set_version(state->be_exec_version());
795796
}
@@ -824,10 +825,6 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
824825
RETURN_IF_ERROR(vectorized::AggFnEvaluator::check_agg_fn_output(
825826
_probe_expr_ctxs.size(), _aggregate_evaluators, _agg_fn_output_row_descriptor));
826827
}
827-
return Status::OK();
828-
}
829-
830-
Status AggSinkOperatorX::open(RuntimeState* state) {
831828
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
832829

833830
for (auto& _aggregate_evaluator : _aggregate_evaluators) {

be/src/pipeline/exec/aggregation_sink_operator.h

+2-3
Original file line numberDiff line numberDiff line change
@@ -137,19 +137,18 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
137137

138138
Status init(const TPlanNode& tnode, RuntimeState* state) override;
139139

140-
Status prepare(RuntimeState* state) override;
141140
Status open(RuntimeState* state) override;
142141

143142
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
144143

145144
DataDistribution required_data_distribution() const override {
146145
if (_probe_expr_ctxs.empty()) {
147-
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child_x
146+
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child
148147
->ignore_data_distribution()
149148
? DataDistribution(ExchangeType::PASSTHROUGH)
150149
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
151150
}
152-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
151+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
153152
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
154153
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
155154
}

be/src/pipeline/exec/analytic_sink_operator.cpp

+4-7
Original file line numberDiff line numberDiff line change
@@ -231,13 +231,14 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
231231
return Status::OK();
232232
}
233233

234-
Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
234+
Status AnalyticSinkOperatorX::open(RuntimeState* state) {
235+
RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::open(state));
235236
for (const auto& ctx : _agg_expr_ctxs) {
236-
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc()));
237+
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc()));
237238
}
238239
if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) {
239240
vector<TTupleId> tuple_ids;
240-
tuple_ids.push_back(_child_x->row_desc().tuple_descriptors()[0]->id());
241+
tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id());
241242
tuple_ids.push_back(_buffered_tuple_id);
242243
RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
243244
if (!_partition_by_eq_expr_ctxs.empty()) {
@@ -249,10 +250,6 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
249250
vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, cmp_row_desc));
250251
}
251252
}
252-
return Status::OK();
253-
}
254-
255-
Status AnalyticSinkOperatorX::open(RuntimeState* state) {
256253
RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state));
257254
RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state));
258255
for (size_t i = 0; i < _agg_functions_size; ++i) {

be/src/pipeline/exec/analytic_sink_operator.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -74,15 +74,14 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt
7474

7575
Status init(const TPlanNode& tnode, RuntimeState* state) override;
7676

77-
Status prepare(RuntimeState* state) override;
7877
Status open(RuntimeState* state) override;
7978

8079
Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
8180
DataDistribution required_data_distribution() const override {
8281
if (_partition_by_eq_expr_ctxs.empty()) {
8382
return {ExchangeType::PASSTHROUGH};
8483
} else if (_order_by_eq_expr_ctxs.empty()) {
85-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
84+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
8685
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
8786
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
8887
}

be/src/pipeline/exec/analytic_source_operator.cpp

+4-9
Original file line numberDiff line numberDiff line change
@@ -562,15 +562,15 @@ Status AnalyticLocalState::close(RuntimeState* state) {
562562
return PipelineXLocalState<AnalyticSharedState>::close(state);
563563
}
564564

565-
Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
566-
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::prepare(state));
567-
DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor));
565+
Status AnalyticSourceOperatorX::open(RuntimeState* state) {
566+
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
567+
DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
568568
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
569569
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
570570
for (size_t i = 0; i < _agg_functions.size(); ++i) {
571571
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i];
572572
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
573-
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child_x->row_desc(),
573+
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
574574
intermediate_slot_desc, output_slot_desc));
575575
_agg_functions[i]->set_version(state->be_exec_version());
576576
_change_to_nullable_flags.push_back(output_slot_desc->is_nullable() &&
@@ -597,11 +597,6 @@ Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
597597
alignment_of_next_state * alignment_of_next_state;
598598
}
599599
}
600-
return Status::OK();
601-
}
602-
603-
Status AnalyticSourceOperatorX::open(RuntimeState* state) {
604-
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
605600
for (auto* agg_function : _agg_functions) {
606601
RETURN_IF_ERROR(agg_function->open(state));
607602
}

be/src/pipeline/exec/analytic_source_operator.h

-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,6 @@ class AnalyticSourceOperatorX final : public OperatorX<AnalyticLocalState> {
122122
bool is_source() const override { return true; }
123123

124124
Status init(const TPlanNode& tnode, RuntimeState* state) override;
125-
Status prepare(RuntimeState* state) override;
126125
Status open(RuntimeState* state) override;
127126

128127
private:

be/src/pipeline/exec/datagen_operator.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
5050
return Status::OK();
5151
}
5252

53-
Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
54-
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::prepare(state));
53+
Status DataGenSourceOperatorX::open(RuntimeState* state) {
54+
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::open(state));
5555
// get tuple desc
5656
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
5757

be/src/pipeline/exec/datagen_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {
5252
const DescriptorTbl& descs);
5353

5454
Status init(const TPlanNode& tnode, RuntimeState* state) override;
55-
Status prepare(RuntimeState* state) override;
55+
Status open(RuntimeState* state) override;
5656
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
5757

5858
[[nodiscard]] bool is_source() const override { return true; }

be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp

+4-10
Original file line numberDiff line numberDiff line change
@@ -369,12 +369,12 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState*
369369
return Status::OK();
370370
}
371371

372-
Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
373-
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state));
372+
Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
373+
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
374374
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
375375
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
376376
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
377-
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc()));
377+
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc()));
378378

379379
int j = _probe_expr_ctxs.size();
380380
for (int i = 0; i < j; ++i) {
@@ -389,7 +389,7 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
389389
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
390390
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
391391
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
392-
state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc));
392+
state, _child->row_desc(), intermediate_slot_desc, output_slot_desc));
393393
_aggregate_evaluators[i]->set_version(state->be_exec_version());
394394
}
395395

@@ -412,12 +412,6 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
412412
alignment_of_next_state * alignment_of_next_state;
413413
}
414414
}
415-
416-
return Status::OK();
417-
}
418-
419-
Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
420-
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
421415
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));
422416

423417
for (int i = 0; i < _aggregate_evaluators.size(); ++i) {

be/src/pipeline/exec/distinct_streaming_aggregation_operator.h

+1-2
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,14 @@ class DistinctStreamingAggOperatorX final
9898
DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
9999
const DescriptorTbl& descs, bool require_bucket_distribution);
100100
Status init(const TPlanNode& tnode, RuntimeState* state) override;
101-
Status prepare(RuntimeState* state) override;
102101
Status open(RuntimeState* state) override;
103102
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override;
104103
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override;
105104
bool need_more_input_data(RuntimeState* state) const override;
106105

107106
DataDistribution required_data_distribution() const override {
108107
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
109-
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
108+
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
110109
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
111110
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
112111
}

be/src/pipeline/exec/es_scan_operator.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,8 @@ Status EsScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
138138
return Status::OK();
139139
}
140140

141-
Status EsScanOperatorX::prepare(RuntimeState* state) {
142-
RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state));
141+
Status EsScanOperatorX::open(RuntimeState* state) {
142+
RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::open(state));
143143

144144
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
145145
if (_tuple_desc == nullptr) {

be/src/pipeline/exec/es_scan_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ class EsScanOperatorX final : public ScanOperatorX<EsScanLocalState> {
7272
const DescriptorTbl& descs, int parallel_tasks);
7373

7474
Status init(const TPlanNode& tnode, RuntimeState* state) override;
75-
Status prepare(RuntimeState* state) override;
75+
Status open(RuntimeState* state) override;
7676

7777
private:
7878
friend class EsScanLocalState;

0 commit comments

Comments
 (0)