Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[branch-3.0](pick) Pick 6 commits #41715

Merged
merged 7 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 4 additions & 7 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -769,12 +769,13 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
return Status::OK();
}

Status AggSinkOperatorX::prepare(RuntimeState* state) {
Status AggSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::open(state));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc()));
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
Expand All @@ -789,7 +790,7 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, DataSinkOperatorX<AggSinkLocalState>::_child_x->row_desc(),
state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}
Expand Down Expand Up @@ -824,10 +825,6 @@ Status AggSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::AggFnEvaluator::check_agg_fn_output(
_probe_expr_ctxs.size(), _aggregate_evaluators, _agg_fn_output_row_descriptor));
}
return Status::OK();
}

Status AggSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));

for (auto& _aggregate_evaluator : _aggregate_evaluators) {
Expand Down
5 changes: 2 additions & 3 deletions be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -137,19 +137,18 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;

DataDistribution required_data_distribution() const override {
if (_probe_expr_ctxs.empty()) {
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child_x
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child
->ignore_data_distribution()
? DataDistribution(ExchangeType::PASSTHROUGH)
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
}
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
11 changes: 4 additions & 7 deletions be/src/pipeline/exec/analytic_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,14 @@ Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}

Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
Status AnalyticSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<AnalyticSinkLocalState>::open(state));
for (const auto& ctx : _agg_expr_ctxs) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(ctx, state, _child->row_desc()));
}
if (!_partition_by_eq_expr_ctxs.empty() || !_order_by_eq_expr_ctxs.empty()) {
vector<TTupleId> tuple_ids;
tuple_ids.push_back(_child_x->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_child->row_desc().tuple_descriptors()[0]->id());
tuple_ids.push_back(_buffered_tuple_id);
RowDescriptor cmp_row_desc(state->desc_tbl(), tuple_ids, vector<bool>(2, false));
if (!_partition_by_eq_expr_ctxs.empty()) {
Expand All @@ -249,10 +250,6 @@ Status AnalyticSinkOperatorX::prepare(RuntimeState* state) {
vectorized::VExpr::prepare(_order_by_eq_expr_ctxs, state, cmp_row_desc));
}
}
return Status::OK();
}

Status AnalyticSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::open(_partition_by_eq_expr_ctxs, state));
RETURN_IF_ERROR(vectorized::VExpr::open(_order_by_eq_expr_ctxs, state));
for (size_t i = 0; i < _agg_functions_size; ++i) {
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/analytic_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,15 +74,14 @@ class AnalyticSinkOperatorX final : public DataSinkOperatorX<AnalyticSinkLocalSt

Status init(const TPlanNode& tnode, RuntimeState* state) override;

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
DataDistribution required_data_distribution() const override {
if (_partition_by_eq_expr_ctxs.empty()) {
return {ExchangeType::PASSTHROUGH};
} else if (_order_by_eq_expr_ctxs.empty()) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
13 changes: 4 additions & 9 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -562,15 +562,15 @@ Status AnalyticLocalState::close(RuntimeState* state) {
return PipelineXLocalState<AnalyticSharedState>::close(state);
}

Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::prepare(state));
DCHECK(_child_x->row_desc().is_prefix_of(_row_descriptor));
Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
DCHECK(_child->row_desc().is_prefix_of(_row_descriptor));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
for (size_t i = 0; i < _agg_functions.size(); ++i) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[i];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[i];
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child_x->row_desc(),
RETURN_IF_ERROR(_agg_functions[i]->prepare(state, _child->row_desc(),
intermediate_slot_desc, output_slot_desc));
_agg_functions[i]->set_version(state->be_exec_version());
_change_to_nullable_flags.push_back(output_slot_desc->is_nullable() &&
Expand All @@ -597,11 +597,6 @@ Status AnalyticSourceOperatorX::prepare(RuntimeState* state) {
alignment_of_next_state * alignment_of_next_state;
}
}
return Status::OK();
}

Status AnalyticSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<AnalyticLocalState>::open(state));
for (auto* agg_function : _agg_functions) {
RETURN_IF_ERROR(agg_function->open(state));
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/analytic_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,6 @@ class AnalyticSourceOperatorX final : public OperatorX<AnalyticLocalState> {
bool is_source() const override { return true; }

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

private:
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/datagen_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ Status DataGenSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state)
return Status::OK();
}

Status DataGenSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::prepare(state));
Status DataGenSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<DataGenLocalState>::open(state));
// get tuple desc
_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/datagen_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class DataGenSourceOperatorX final : public OperatorX<DataGenLocalState> {
const DescriptorTbl& descs);

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

[[nodiscard]] bool is_source() const override { return true; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -369,12 +369,12 @@ Status DistinctStreamingAggOperatorX::init(const TPlanNode& tnode, RuntimeState*
return Status::OK();
}

Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::prepare(state));
Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
_intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child_x->row_desc()));
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state, _child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
Expand All @@ -389,7 +389,7 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
state, _child_x->row_desc(), intermediate_slot_desc, output_slot_desc));
state, _child->row_desc(), intermediate_slot_desc, output_slot_desc));
_aggregate_evaluators[i]->set_version(state->be_exec_version());
}

Expand All @@ -412,12 +412,6 @@ Status DistinctStreamingAggOperatorX::prepare(RuntimeState* state) {
alignment_of_next_state * alignment_of_next_state;
}
}

return Status::OK();
}

Status DistinctStreamingAggOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(StatefulOperatorX<DistinctStreamingAggLocalState>::open(state));
RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state));

for (int i = 0; i < _aggregate_evaluators.size(); ++i) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,14 @@ class DistinctStreamingAggOperatorX final
DistinctStreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs, bool require_bucket_distribution);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;
Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) const override;
Status push(RuntimeState* state, vectorized::Block* input_block, bool eos) const override;
bool need_more_input_data(RuntimeState* state) const override;

DataDistribution required_data_distribution() const override {
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_join
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
: DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs);
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/es_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,8 @@ Status EsScanOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
return Status::OK();
}

Status EsScanOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::prepare(state));
Status EsScanOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<EsScanLocalState>::open(state));

_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
if (_tuple_desc == nullptr) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/es_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class EsScanOperatorX final : public ScanOperatorX<EsScanLocalState> {
const DescriptorTbl& descs, int parallel_tasks);

Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

private:
friend class EsScanLocalState;
Expand Down
14 changes: 5 additions & 9 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -335,19 +335,15 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
return Status::OK();
}

Status ExchangeSinkOperatorX::prepare(RuntimeState* state) {
Status ExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
_state = state;
_mem_tracker = std::make_unique<MemTracker>("ExchangeSinkOperatorX:");
return Status::OK();
}

Status ExchangeSinkOperatorX::open(RuntimeState* state) {
DCHECK(state != nullptr);
_compression_type = state->fragement_transmission_compression_type();
if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
if (_output_tuple_id == -1) {
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child_x->row_desc()));
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc()));
} else {
auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false));
Expand Down Expand Up @@ -686,10 +682,10 @@ Status ExchangeSinkLocalState::close(RuntimeState* state, Status exec_status) {
}

DataDistribution ExchangeSinkOperatorX::required_data_distribution() const {
if (_child_x && _enable_local_merge_sort) {
if (_child && _enable_local_merge_sort) {
// SORT_OPERATOR -> DATA_STREAM_SINK_OPERATOR
// SORT_OPERATOR -> LOCAL_MERGE_SORT -> DATA_STREAM_SINK_OPERATOR
if (auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(_child_x);
if (auto sort_source = std::dynamic_pointer_cast<SortSourceOperatorX>(_child);
sort_source && sort_source->use_local_merge()) {
// Sort the data local
return ExchangeType::LOCAL_MERGE_SORT;
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt

RuntimeState* state() { return _state; }

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override;
Expand Down
10 changes: 2 additions & 8 deletions be/src/pipeline/exec/exchange_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,13 @@ Status ExchangeSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state
return Status::OK();
}

Status ExchangeSourceOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::prepare(state));
Status ExchangeSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::open(state));
DCHECK_GT(_num_senders, 0);

if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, _row_descriptor, _row_descriptor));
}

return Status::OK();
}

Status ExchangeSourceOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(OperatorX<ExchangeLocalState>::open(state));
if (_is_merging) {
RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
}
Expand Down
1 change: 0 additions & 1 deletion be/src/pipeline/exec/exchange_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@ class ExchangeSourceOperatorX final : public OperatorX<ExchangeLocalState> {
ExchangeSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs, int num_senders);
Status init(const TPlanNode& tnode, RuntimeState* state) override;
Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
Expand Down
4 changes: 2 additions & 2 deletions be/src/pipeline/exec/file_scan_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ Status FileScanLocalState::_process_conjuncts(RuntimeState* state) {
return Status::OK();
}

Status FileScanOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::prepare(state));
Status FileScanOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::open(state));
if (state->get_query_ctx() != nullptr &&
state->get_query_ctx()->file_scan_range_params_map.contains(node_id())) {
TFileScanRangeParams& params =
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/file_scan_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ class FileScanOperatorX final : public ScanOperatorX<FileScanLocalState> {
_output_tuple_id = tnode.file_scan_node.tuple_id;
}

Status prepare(RuntimeState* state) override;
Status open(RuntimeState* state) override;

bool is_file_scan_operator() const override { return true; }

Expand Down
10 changes: 3 additions & 7 deletions be/src/pipeline/exec/group_commit_block_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,19 +259,15 @@ Status GroupCommitBlockSinkOperatorX::init(const TDataSink& t_sink) {
return Status::OK();
}

Status GroupCommitBlockSinkOperatorX::prepare(RuntimeState* state) {
RETURN_IF_ERROR(Base::prepare(state));
Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(Base::open(state));
// get table's tuple descriptor
_output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_desc_id);
if (_output_tuple_desc == nullptr) {
LOG(WARNING) << "unknown destination tuple descriptor, id=" << _tuple_desc_id;
return Status::InternalError("unknown destination tuple descriptor");
}
return vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc);
}

Status GroupCommitBlockSinkOperatorX::open(RuntimeState* state) {
// Prepare the exprs to run.
RETURN_IF_ERROR(vectorized::VExpr::prepare(_output_vexpr_ctxs, state, _row_desc));
return vectorized::VExpr::open(_output_vexpr_ctxs, state);
}

Expand Down
2 changes: 0 additions & 2 deletions be/src/pipeline/exec/group_commit_block_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,6 @@ class GroupCommitBlockSinkOperatorX final

Status init(const TDataSink& sink) override;

Status prepare(RuntimeState* state) override;

Status open(RuntimeState* state) override;

Status sink(RuntimeState* state, vectorized::Block* block, bool eos) override;
Expand Down
Loading
Loading