Skip to content

Commit fcea16a

Browse files
Merge branch 'master' into master-vault
2 parents 7a3acea + bf737b1 commit fcea16a

File tree

56 files changed

+644
-184
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

56 files changed

+644
-184
lines changed

.github/workflows/auto-cherry-pick.yml

+26-35
Original file line numberDiff line numberDiff line change
@@ -15,49 +15,40 @@
1515
# specific language governing permissions and limitations
1616
# under the License.
1717
#
18+
name: Auto Cherry-Pick to Branch
1819

1920
on:
2021
pull_request:
22+
types:
23+
- closed
2124
branches:
2225
- master
23-
types: ["closed"]
24-
26+
permissions:
27+
checks: write
28+
contents: write
29+
pull-requests: write
30+
repository-projects: write
2531
jobs:
26-
cherry_pick_branch_2.1:
27-
runs-on: ubuntu-latest
28-
name: Cherry pick into branch-2.1
29-
if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/2.1.x') && github.event.pull_request.merged == true }}
30-
steps:
31-
- name: Checkout
32-
uses: actions/checkout@v3
33-
with:
34-
fetch-depth: 0
35-
- name: Cherry pick into branch-2.1
36-
uses: carloscastrojumo/github-cherry-pick-action@v1.0.1
37-
with:
38-
branch: branch-2.1
39-
labels: |
40-
cherry-pick
41-
reviewers: |
42-
yiguolei
43-
cherry_pick_branch-3.0:
32+
auto_cherry_pick:
4433
runs-on: ubuntu-latest
45-
name: Cherry pick into branch-3.0
4634
if: ${{ contains(github.event.pull_request.labels.*.name, 'dev/3.0.x') && github.event.pull_request.merged == true }}
4735
steps:
48-
- name: Checkout
36+
- name: Checkout repository
4937
uses: actions/checkout@v3
38+
39+
- name: Set up Python
40+
uses: actions/setup-python@v4
5041
with:
51-
fetch-depth: 0
52-
- name: Cherry pick into branch-3.0
53-
uses: carloscastrojumo/github-cherry-pick-action@v1.0.1
54-
with:
55-
branch: branch-3.0
56-
labels: |
57-
cherry-pick
58-
reviewers: |
59-
dataroaring
60-
title: '[cherry-pick] {old_title}'
61-
body: 'Cherry picking #{old_pull_request_id} onto this branch'
62-
env:
63-
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
42+
python-version: '3.x'
43+
44+
- name: Install dependencies
45+
run: |
46+
pip install PyGithub
47+
48+
- name: Auto cherry-pick
49+
env:
50+
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
51+
REPO_NAME: ${{ github.repository }}
52+
CONFLICT_LABEL: cherry-pick-conflict-in-3.0
53+
run: |
54+
python tools/auto-pick-script.py ${{ github.event.pull_request.number }} branch-3.0

be/src/common/config.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -1072,8 +1072,8 @@ DEFINE_mInt32(schema_cache_sweep_time_sec, "100");
10721072
// max number of segment cache, default -1 for backward compatibility fd_number*2/5
10731073
DEFINE_Int32(segment_cache_capacity, "-1");
10741074
DEFINE_Int32(segment_cache_fd_percentage, "20");
1075-
DEFINE_mInt32(estimated_mem_per_column_reader, "1024");
1076-
DEFINE_Int32(segment_cache_memory_percentage, "2");
1075+
DEFINE_mInt32(estimated_mem_per_column_reader, "512");
1076+
DEFINE_Int32(segment_cache_memory_percentage, "5");
10771077

10781078
// enable feature binlog, default false
10791079
DEFINE_Bool(enable_feature_binlog, "false");

be/src/pipeline/dependency.h

+4-4
Original file line numberDiff line numberDiff line change
@@ -111,19 +111,19 @@ class Dependency : public std::enable_shared_from_this<Dependency> {
111111
// Notify downstream pipeline tasks this dependency is ready.
112112
void set_ready();
113113
void set_ready_to_read() {
114-
DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
114+
DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
115115
_shared_state->source_deps.front()->set_ready();
116116
}
117117
void set_block_to_read() {
118-
DCHECK(_shared_state->source_deps.size() == 1) << debug_string();
118+
DCHECK_EQ(_shared_state->source_deps.size(), 1) << debug_string();
119119
_shared_state->source_deps.front()->block();
120120
}
121121
void set_ready_to_write() {
122-
DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
122+
DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
123123
_shared_state->sink_deps.front()->set_ready();
124124
}
125125
void set_block_to_write() {
126-
DCHECK(_shared_state->sink_deps.size() == 1) << debug_string();
126+
DCHECK_EQ(_shared_state->sink_deps.size(), 1) << debug_string();
127127
_shared_state->sink_deps.front()->block();
128128
}
129129

be/src/pipeline/exec/aggregation_sink_operator.cpp

+4-1
Original file line numberDiff line numberDiff line change
@@ -717,7 +717,10 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPla
717717
: tnode.agg_node.grouping_exprs),
718718
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
719719
_require_bucket_distribution(require_bucket_distribution),
720-
_agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples) {}
720+
_agg_fn_output_row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
721+
_without_key(tnode.agg_node.grouping_exprs.empty()) {
722+
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
723+
}
721724

722725
Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
723726
RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::init(tnode, state));

be/src/pipeline/exec/aggregation_sink_operator.h

+3-4
Original file line numberDiff line numberDiff line change
@@ -143,9 +143,8 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
143143

144144
DataDistribution required_data_distribution() const override {
145145
if (_probe_expr_ctxs.empty()) {
146-
return _needs_finalize || DataSinkOperatorX<AggSinkLocalState>::_child
147-
->ignore_data_distribution()
148-
? DataDistribution(ExchangeType::PASSTHROUGH)
146+
return _needs_finalize
147+
? DataDistribution(ExchangeType::NOOP)
149148
: DataSinkOperatorX<AggSinkLocalState>::required_data_distribution();
150149
}
151150
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
@@ -204,8 +203,8 @@ class AggSinkOperatorX final : public DataSinkOperatorX<AggSinkLocalState> {
204203
const std::vector<TExpr> _partition_exprs;
205204
const bool _is_colocate;
206205
const bool _require_bucket_distribution;
207-
208206
RowDescriptor _agg_fn_output_row_descriptor;
207+
const bool _without_key;
209208
};
210209

211210
} // namespace doris::pipeline

be/src/pipeline/exec/aggregation_source_operator.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,9 @@ AggSourceOperatorX::AggSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode,
441441
const DescriptorTbl& descs)
442442
: Base(pool, tnode, operator_id, descs),
443443
_needs_finalize(tnode.agg_node.need_finalize),
444-
_without_key(tnode.agg_node.grouping_exprs.empty()) {}
444+
_without_key(tnode.agg_node.grouping_exprs.empty()) {
445+
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
446+
}
445447

446448
Status AggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block, bool* eos) {
447449
auto& local_state = get_local_state(state);

be/src/pipeline/exec/analytic_sink_operator.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,9 @@ AnalyticSinkOperatorX::AnalyticSinkOperatorX(ObjectPool* pool, int operator_id,
201201
_require_bucket_distribution(require_bucket_distribution),
202202
_partition_exprs(tnode.__isset.distribute_expr_lists && require_bucket_distribution
203203
? tnode.distribute_expr_lists[0]
204-
: tnode.analytic_node.partition_exprs) {}
204+
: tnode.analytic_node.partition_exprs) {
205+
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
206+
}
205207

206208
Status AnalyticSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
207209
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));

be/src/pipeline/exec/analytic_source_operator.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ AnalyticSourceOperatorX::AnalyticSourceOperatorX(ObjectPool* pool, const TPlanNo
475475
_has_range_window(tnode.analytic_node.window.type == TAnalyticWindowType::RANGE),
476476
_has_window_start(tnode.analytic_node.window.__isset.window_start),
477477
_has_window_end(tnode.analytic_node.window.__isset.window_end) {
478+
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
478479
_fn_scope = AnalyticFnScope::PARTITION;
479480
if (tnode.analytic_node.__isset.window &&
480481
tnode.analytic_node.window.type == TAnalyticWindowType::RANGE) {

be/src/pipeline/exec/assert_num_rows_operator.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ AssertNumRowsOperatorX::AssertNumRowsOperatorX(ObjectPool* pool, const TPlanNode
2727
: StreamingOperatorX<AssertNumRowsLocalState>(pool, tnode, operator_id, descs),
2828
_desired_num_rows(tnode.assert_num_rows_node.desired_num_rows),
2929
_subquery_string(tnode.assert_num_rows_node.subquery_string) {
30+
_is_serial_operator = true;
3031
if (tnode.assert_num_rows_node.__isset.assertion) {
3132
_assertion = tnode.assert_num_rows_node.assertion;
3233
} else {

be/src/pipeline/exec/distinct_streaming_aggregation_operator.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,9 @@ DistinctStreamingAggOperatorX::DistinctStreamingAggOperatorX(ObjectPool* pool, i
326326
? tnode.distribute_expr_lists[0]
327327
: tnode.agg_node.grouping_exprs),
328328
_is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate),
329-
_require_bucket_distribution(require_bucket_distribution) {
329+
_require_bucket_distribution(require_bucket_distribution),
330+
_without_key(tnode.agg_node.grouping_exprs.empty()) {
331+
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
330332
if (tnode.agg_node.__isset.use_streaming_preaggregation) {
331333
_is_streaming_preagg = tnode.agg_node.use_streaming_preaggregation;
332334
if (_is_streaming_preagg) {

be/src/pipeline/exec/distinct_streaming_aggregation_operator.h

+4
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,9 @@ class DistinctStreamingAggOperatorX final
104104
bool need_more_input_data(RuntimeState* state) const override;
105105

106106
DataDistribution required_data_distribution() const override {
107+
if (_needs_finalize && _probe_expr_ctxs.empty()) {
108+
return {ExchangeType::NOOP};
109+
}
107110
if (_needs_finalize || (!_probe_expr_ctxs.empty() && !_is_streaming_preagg)) {
108111
return _is_colocate && _require_bucket_distribution && !_followed_by_shuffled_operator
109112
? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs)
@@ -136,6 +139,7 @@ class DistinctStreamingAggOperatorX final
136139
/// The total size of the row from the aggregate functions.
137140
size_t _total_size_of_aggregate_states = 0;
138141
bool _is_streaming_preagg = false;
142+
const bool _without_key;
139143
};
140144

141145
} // namespace pipeline

be/src/pipeline/exec/exchange_sink_operator.h

+1
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
224224
Status serialize_block(ExchangeSinkLocalState& stete, vectorized::Block* src, PBlock* dest,
225225
int num_receivers = 1);
226226
DataDistribution required_data_distribution() const override;
227+
bool is_serial_operator() const override { return true; }
227228

228229
private:
229230
friend class ExchangeSinkLocalState;

be/src/pipeline/exec/join_build_sink_operator.cpp

+2
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,8 @@ JoinBuildSinkOperatorX<LocalStateType>::JoinBuildSinkOperatorX(ObjectPool* pool,
8282
_short_circuit_for_null_in_build_side(_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
8383
!_is_mark_join),
8484
_runtime_filter_descs(tnode.runtime_filters) {
85+
DataSinkOperatorX<LocalStateType>::_is_serial_operator =
86+
tnode.__isset.is_serial_operator && tnode.is_serial_operator;
8587
_init_join_op();
8688
if (_is_mark_join) {
8789
DCHECK(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op == TJoinOp::LEFT_SEMI_JOIN ||

be/src/pipeline/exec/join_probe_operator.cpp

+1
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
220220
: true)
221221

222222
) {
223+
Base::_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
223224
if (tnode.__isset.hash_join_node) {
224225
_intermediate_row_desc.reset(new RowDescriptor(
225226
descs, tnode.hash_join_node.vintermediate_tuple_id_list,

be/src/pipeline/exec/nested_loop_join_probe_operator.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,9 @@ class NestedLoopJoinProbeOperatorX final
203203
}
204204

205205
DataDistribution required_data_distribution() const override {
206-
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
206+
if (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
207+
_join_op == TJoinOp::RIGHT_OUTER_JOIN || _join_op == TJoinOp::RIGHT_ANTI_JOIN ||
208+
_join_op == TJoinOp::RIGHT_SEMI_JOIN || _join_op == TJoinOp::FULL_OUTER_JOIN) {
207209
return {ExchangeType::NOOP};
208210
}
209211
return {ExchangeType::ADAPTIVE_PASSTHROUGH};

be/src/pipeline/exec/operator.cpp

+5-4
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,9 @@ std::string PipelineXSinkLocalState<SharedStateArg>::debug_string(int indentatio
141141

142142
std::string OperatorXBase::debug_string(int indentation_level) const {
143143
fmt::memory_buffer debug_string_buffer;
144-
fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}",
145-
std::string(indentation_level * 2, ' '), _op_name, node_id(), _parallel_tasks);
144+
fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}, _is_serial_operator={}",
145+
std::string(indentation_level * 2, ' '), _op_name, node_id(), _parallel_tasks,
146+
_is_serial_operator);
146147
return fmt::to_string(debug_string_buffer);
147148
}
148149

@@ -363,8 +364,8 @@ void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos)
363364
std::string DataSinkOperatorXBase::debug_string(int indentation_level) const {
364365
fmt::memory_buffer debug_string_buffer;
365366

366-
fmt::format_to(debug_string_buffer, "{}{}: id={}", std::string(indentation_level * 2, ' '),
367-
_name, node_id());
367+
fmt::format_to(debug_string_buffer, "{}{}: id={}, _is_serial_operator={}",
368+
std::string(indentation_level * 2, ' '), _name, node_id(), _is_serial_operator);
368369
return fmt::to_string(debug_string_buffer);
369370
}
370371

be/src/pipeline/exec/operator.h

+5-1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,9 @@ class OperatorBase {
101101
return Status::OK();
102102
}
103103

104+
// Operators need to be executed serially. (e.g. finalized agg without key)
105+
[[nodiscard]] virtual bool is_serial_operator() const { return _is_serial_operator; }
106+
104107
[[nodiscard]] bool is_closed() const { return _is_closed; }
105108

106109
virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; }
@@ -122,6 +125,7 @@ class OperatorBase {
122125

123126
bool _is_closed;
124127
bool _followed_by_shuffled_operator = false;
128+
bool _is_serial_operator = false;
125129
};
126130

127131
class PipelineXLocalStateBase {
@@ -444,7 +448,7 @@ class DataSinkOperatorXBase : public OperatorBase {
444448

445449
Status init(const TDataSink& tsink) override;
446450
[[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
447-
const bool is_shuffled_hash_join,
451+
const bool use_global_hash_shuffle,
448452
const std::map<int, int>& shuffle_idx_to_instance_idx) {
449453
return Status::InternalError("init() is only implemented in local exchange!");
450454
}

be/src/pipeline/exec/partitioned_aggregation_source_operator.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,10 @@ Status PartitionedAggSourceOperatorX::close(RuntimeState* state) {
118118
return _agg_source_operator->close(state);
119119
}
120120

121+
bool PartitionedAggSourceOperatorX::is_serial_operator() const {
122+
return _agg_source_operator->is_serial_operator();
123+
}
124+
121125
Status PartitionedAggSourceOperatorX::get_block(RuntimeState* state, vectorized::Block* block,
122126
bool* eos) {
123127
auto& local_state = get_local_state(state);

be/src/pipeline/exec/partitioned_aggregation_source_operator.h

+2
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ class PartitionedAggSourceOperatorX : public OperatorX<PartitionedAggLocalState>
9191

9292
bool is_source() const override { return true; }
9393

94+
bool is_serial_operator() const override;
95+
9496
private:
9597
friend class PartitionedAggLocalState;
9698

be/src/pipeline/exec/sort_sink_operator.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,9 @@ SortSinkOperatorX::SortSinkOperatorX(ObjectPool* pool, int operator_id, const TP
9090
: std::vector<TExpr> {}),
9191
_algorithm(tnode.sort_node.__isset.algorithm ? tnode.sort_node.algorithm
9292
: TSortAlgorithm::FULL_SORT),
93-
_reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {}
93+
_reuse_mem(_algorithm != TSortAlgorithm::HEAP_SORT) {
94+
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
95+
}
9496

9597
Status SortSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
9698
RETURN_IF_ERROR(DataSinkOperatorX::init(tnode, state));

be/src/pipeline/exec/sort_sink_operator.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ class SortSinkOperatorX final : public DataSinkOperatorX<SortSinkLocalState> {
6969
} else if (_merge_by_exchange) {
7070
// The current sort node is used for the ORDER BY
7171
return {ExchangeType::PASSTHROUGH};
72+
} else {
73+
return {ExchangeType::NOOP};
7274
}
73-
return DataSinkOperatorX<SortSinkLocalState>::required_data_distribution();
7475
}
7576
bool require_shuffled_data_distribution() const override { return _is_analytic_sort; }
7677
bool require_data_distribution() const override { return _is_colocate; }

be/src/pipeline/exec/sort_source_operator.cpp

+3-1
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ SortSourceOperatorX::SortSourceOperatorX(ObjectPool* pool, const TPlanNode& tnod
3030
const DescriptorTbl& descs)
3131
: OperatorX<SortLocalState>(pool, tnode, operator_id, descs),
3232
_merge_by_exchange(tnode.sort_node.merge_by_exchange),
33-
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) {}
33+
_offset(tnode.sort_node.__isset.offset ? tnode.sort_node.offset : 0) {
34+
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
35+
}
3436

3537
Status SortSourceOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
3638
RETURN_IF_ERROR(Base::init(tnode, state));

be/src/pipeline/exec/union_source_operator.h

+3-1
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,9 @@ class UnionSourceOperatorX final : public OperatorX<UnionSourceLocalState> {
6363
using Base = OperatorX<UnionSourceLocalState>;
6464
UnionSourceOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
6565
const DescriptorTbl& descs)
66-
: Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) {};
66+
: Base(pool, tnode, operator_id, descs), _child_size(tnode.num_children) {
67+
_is_serial_operator = tnode.__isset.is_serial_operator && tnode.is_serial_operator;
68+
}
6769
~UnionSourceOperatorX() override = default;
6870
Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;
6971

be/src/pipeline/local_exchange/local_exchange_sink_operator.cpp

+3-3
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,17 @@ std::vector<Dependency*> LocalExchangeSinkLocalState::dependencies() const {
3636
}
3737

3838
Status LocalExchangeSinkOperatorX::init(ExchangeType type, const int num_buckets,
39-
const bool should_disable_bucket_shuffle,
39+
const bool use_global_hash_shuffle,
4040
const std::map<int, int>& shuffle_idx_to_instance_idx) {
4141
_name = "LOCAL_EXCHANGE_SINK_OPERATOR (" + get_exchange_type_name(type) + ")";
4242
_type = type;
4343
if (_type == ExchangeType::HASH_SHUFFLE) {
44-
_use_global_shuffle = should_disable_bucket_shuffle;
44+
_use_global_shuffle = use_global_hash_shuffle;
4545
// For shuffle join, if data distribution has been broken by previous operator, we
4646
// should use a HASH_SHUFFLE local exchanger to shuffle data again. To be mentioned,
4747
// we should use map shuffle idx to instance idx because all instances will be
4848
// distributed to all BEs. Otherwise, we should use shuffle idx directly.
49-
if (should_disable_bucket_shuffle) {
49+
if (use_global_hash_shuffle) {
5050
std::for_each(shuffle_idx_to_instance_idx.begin(), shuffle_idx_to_instance_idx.end(),
5151
[&](const auto& item) {
5252
DCHECK(item.first != -1);

be/src/pipeline/local_exchange/local_exchange_sink_operator.h

+1-1
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class LocalExchangeSinkOperatorX final : public DataSinkOperatorX<LocalExchangeS
100100
return Status::InternalError("{} should not init with TPlanNode", Base::_name);
101101
}
102102

103-
Status init(ExchangeType type, const int num_buckets, const bool should_disable_bucket_shuffle,
103+
Status init(ExchangeType type, const int num_buckets, const bool use_global_hash_shuffle,
104104
const std::map<int, int>& shuffle_idx_to_instance_idx) override;
105105

106106
Status open(RuntimeState* state) override;

0 commit comments

Comments
 (0)