Skip to content

Commit 614f28e

Browse files
committed
[Improvement](sort) Free sort blocks if this block is exhausted
1 parent fc469dc commit 614f28e

8 files changed

+138
-105
lines changed

be/src/vec/common/sort/partition_sorter.cpp

+23-20
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ Status PartitionSorter::append_block(Block* input_block) {
5858
Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc);
5959
DCHECK(input_block->columns() == sorted_block.columns());
6060
RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
61-
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
61+
RETURN_IF_ERROR(_state->add_sorted_block(std::move(sorted_block)));
6262
return Status::OK();
6363
}
6464

@@ -67,46 +67,49 @@ Status PartitionSorter::prepare_for_read() {
6767
auto& blocks = _state->get_sorted_block();
6868
auto& priority_queue = _state->get_priority_queue();
6969
for (auto& block : blocks) {
70-
cursors.emplace_back(block, _sort_description);
70+
cursors.emplace_back(MergeSortCursorImpl::create_shared(block, _sort_description));
7171
}
7272
for (auto& cursor : cursors) {
73-
priority_queue.push(MergeSortCursor(&cursor));
73+
priority_queue.push(std::move(cursor));
7474
}
75+
cursors.clear();
7576
return Status::OK();
7677
}
7778

7879
// The sort is finished and the top-N records have been fetched, so the state can be reset to its initial values
7980
void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
80-
std::priority_queue<MergeSortBlockCursor> empty_queue;
81+
std::priority_queue<std::shared_ptr<MergeSortBlockCursor>> empty_queue;
8182
std::swap(_block_priority_queue, empty_queue);
8283
_state = MergeSorterState::create_unique(_row_desc, _offset, _limit, runtime_state, nullptr);
8384
_previous_row->reset();
8485
}
8586

8687
Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
87-
if (_state->get_sorted_block().empty()) {
88+
if (_state->get_sorted_block().empty() && _state->get_priority_queue().empty()) {
89+
*eos = true;
90+
} else if (_state->get_cursors().size() == 1 && _has_global_limit) {
91+
auto& cursor = _state->get_cursors()[0];
92+
block->swap(*cursor->block);
93+
block->set_num_rows(_partition_inner_limit);
8894
*eos = true;
8995
} else {
90-
if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
91-
auto& sorted_block = _state->get_sorted_block()[0];
92-
block->swap(sorted_block);
93-
block->set_num_rows(_partition_inner_limit);
94-
*eos = true;
95-
} else {
96-
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
97-
}
96+
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
9897
}
9998
return Status::OK();
10099
}
101100

102101
Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
103-
const auto& sorted_block = _state->get_sorted_block()[0];
104-
size_t num_columns = sorted_block.columns();
102+
auto& priority_queue = _state->get_priority_queue();
103+
if (priority_queue.empty()) {
104+
*eos = true;
105+
return Status::OK();
106+
}
107+
const auto& sorted_block = priority_queue.top().impl->block;
108+
size_t num_columns = sorted_block->columns();
105109
MutableBlock m_block =
106-
VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block);
110+
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block);
107111
MutableColumns& merged_columns = m_block.mutable_columns();
108112
size_t current_output_rows = 0;
109-
auto& priority_queue = _state->get_priority_queue();
110113

111114
bool get_enough_data = false;
112115
while (!priority_queue.empty()) {
@@ -121,7 +124,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
121124
// 1. row_number: no need to check for distinct values, just output partition_inner_limit rows
122125
if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
123126
for (size_t i = 0; i < num_columns; ++i) {
124-
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
127+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
125128
}
126129
} else {
127130
// enough rows have been collected
@@ -155,7 +158,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
155158
}
156159
}
157160
for (size_t i = 0; i < num_columns; ++i) {
158-
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
161+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
159162
}
160163
break;
161164
}
@@ -180,7 +183,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
180183
*_previous_row = current;
181184
}
182185
for (size_t i = 0; i < num_columns; ++i) {
183-
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
186+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
184187
}
185188
current_output_rows++;
186189
break;

be/src/vec/common/sort/partition_sorter.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ struct SortCursorCmp {
5050
SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {}
5151

5252
void reset() {
53-
impl = nullptr;
53+
impl->reset();
5454
row = 0;
5555
}
5656
bool compare_two_rows(const MergeSortCursor& rhs) const {
@@ -67,7 +67,7 @@ struct SortCursorCmp {
6767
return true;
6868
}
6969
int row = 0;
70-
MergeSortCursorImpl* impl = nullptr;
70+
std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
7171
};
7272

7373
class PartitionSorter final : public Sorter {

be/src/vec/common/sort/sorter.cpp

+38-26
Original file line numberDiff line numberDiff line change
@@ -59,47 +59,50 @@ namespace doris::vectorized {
5959
void MergeSorterState::reset() {
6060
auto empty_queue = std::priority_queue<MergeSortCursor>();
6161
priority_queue_.swap(empty_queue);
62-
std::vector<MergeSortCursorImpl> empty_cursors(0);
62+
std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
6363
cursors_.swap(empty_cursors);
64-
std::vector<Block> empty_blocks(0);
64+
std::vector<std::shared_ptr<Block>> empty_blocks(0);
6565
sorted_blocks_.swap(empty_blocks);
6666
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
6767
in_mem_sorted_bocks_size_ = 0;
6868
}
69-
Status MergeSorterState::add_sorted_block(Block& block) {
69+
Status MergeSorterState::add_sorted_block(Block&& block) {
7070
auto rows = block.rows();
7171
if (0 == rows) {
7272
return Status::OK();
7373
}
7474
in_mem_sorted_bocks_size_ += block.bytes();
75-
sorted_blocks_.emplace_back(std::move(block));
75+
sorted_blocks_.emplace_back(Block::create_shared(std::move(block)));
7676
num_rows_ += rows;
7777
return Status::OK();
7878
}
7979

8080
Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
8181
for (auto& block : sorted_blocks_) {
82-
cursors_.emplace_back(block, sort_description);
82+
cursors_.emplace_back(
83+
MergeSortCursorImpl::create_shared(std::move(block), sort_description));
8384
}
8485

85-
if (sorted_blocks_.size() > 1) {
86+
sorted_blocks_.clear();
87+
if (cursors_.size() > 1) {
8688
for (auto& cursor : cursors_) {
87-
priority_queue_.emplace(&cursor);
89+
priority_queue_.emplace(std::move(cursor));
8890
}
91+
cursors_.clear();
8992
}
9093

9194
return Status::OK();
9295
}
9396

9497
Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size,
9598
bool* eos) {
96-
if (sorted_blocks_.empty()) {
99+
if (cursors_.empty() && priority_queue_.empty()) {
97100
*eos = true;
98-
} else if (sorted_blocks_.size() == 1) {
101+
} else if (cursors_.size() == 1) {
99102
if (offset_ != 0) {
100-
sorted_blocks_[0].skip_num_rows(offset_);
103+
cursors_[0]->block->skip_num_rows(offset_);
101104
}
102-
block->swap(sorted_blocks_[0]);
105+
block->swap(*cursors_[0]->block);
103106
*eos = true;
104107
} else {
105108
RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
@@ -109,9 +112,14 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba
109112

110113
Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block,
111114
bool* eos) {
112-
size_t num_columns = sorted_blocks_[0].columns();
115+
if (priority_queue_.empty()) {
116+
*eos = true;
117+
return Status::OK();
118+
}
119+
size_t num_columns = priority_queue_.top().impl->block->columns();
113120

114-
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
121+
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
122+
block, *priority_queue_.top().impl->block);
115123
MutableColumns& merged_columns = m_block.mutable_columns();
116124

117125
/// Take rows from queue in right order and push to 'merged'.
@@ -122,7 +130,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
122130

123131
if (offset_ == 0) {
124132
for (size_t i = 0; i < num_columns; ++i)
125-
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
133+
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
126134
++merged_rows;
127135
} else {
128136
offset_--;
@@ -133,7 +141,9 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
133141
priority_queue_.push(current);
134142
}
135143

136-
if (merged_rows == batch_size) break;
144+
if (merged_rows == batch_size) {
145+
break;
146+
}
137147
}
138148
block->set_columns(std::move(merged_columns));
139149

@@ -260,22 +270,24 @@ Status FullSorter::_do_sort() {
260270
// if a block is totally greater than the heap top of _block_priority_queue,
261271
// we can discard the block data directly.
262272
if (_state->num_rows() < _offset + _limit) {
263-
static_cast<void>(_state->add_sorted_block(desc_block));
264-
_block_priority_queue.emplace(_pool->add(
265-
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
273+
static_cast<void>(_state->add_sorted_block(std::move(desc_block)));
274+
_block_priority_queue.emplace(
275+
MergeSortBlockCursor::create_shared(MergeSortCursorImpl::create_shared(
276+
_state->last_sorted_block(), _sort_description)));
266277
} else {
267-
auto tmp_cursor_impl =
268-
std::make_unique<MergeSortCursorImpl>(desc_block, _sort_description);
269-
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
270-
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
271-
static_cast<void>(_state->add_sorted_block(desc_block));
272-
_block_priority_queue.emplace(_pool->add(
273-
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
278+
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
279+
Block::create_shared(std::move(desc_block)), _sort_description);
280+
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
281+
if (!block_cursor.totally_greater(*_block_priority_queue.top())) {
282+
static_cast<void>(_state->add_sorted_block(std::move(*tmp_cursor_impl->block)));
283+
_block_priority_queue.emplace(
284+
MergeSortBlockCursor::create_shared(MergeSortCursorImpl::create_shared(
285+
_state->last_sorted_block(), _sort_description)));
274286
}
275287
}
276288
} else {
277289
// handle the normal sort logic
278-
static_cast<void>(_state->add_sorted_block(desc_block));
290+
static_cast<void>(_state->add_sorted_block(std::move(desc_block)));
279291
}
280292
return Status::OK();
281293
}

be/src/vec/common/sort/sorter.h

+7-9
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class MergeSorterState {
5959

6060
~MergeSorterState() = default;
6161

62-
Status add_sorted_block(Block& block);
62+
Status add_sorted_block(Block&& block);
6363

6464
Status build_merge_tree(const SortDescription& sort_description);
6565

@@ -72,23 +72,21 @@ class MergeSorterState {
7272

7373
uint64_t num_rows() const { return num_rows_; }
7474

75-
Block& last_sorted_block() { return sorted_blocks_.back(); }
75+
std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); }
7676

77-
std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
77+
std::vector<std::shared_ptr<Block>>& get_sorted_block() { return sorted_blocks_; }
7878
std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; }
79-
std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
79+
std::vector<std::shared_ptr<MergeSortCursorImpl>>& get_cursors() { return cursors_; }
8080
void reset();
8181

8282
std::unique_ptr<Block> unsorted_block_;
8383

8484
private:
85-
int _calc_spill_blocks_to_merge() const;
86-
8785
Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos);
8886

8987
std::priority_queue<MergeSortCursor> priority_queue_;
90-
std::vector<MergeSortCursorImpl> cursors_;
91-
std::vector<Block> sorted_blocks_;
88+
std::vector<std::shared_ptr<MergeSortCursorImpl>> cursors_;
89+
std::vector<std::shared_ptr<Block>> sorted_blocks_;
9290
size_t in_mem_sorted_bocks_size_ = 0;
9391
uint64_t num_rows_ = 0;
9492

@@ -153,7 +151,7 @@ class Sorter {
153151
RuntimeProfile::Counter* _partial_sort_timer = nullptr;
154152
RuntimeProfile::Counter* _merge_block_timer = nullptr;
155153

156-
std::priority_queue<MergeSortBlockCursor> _block_priority_queue;
154+
std::priority_queue<std::shared_ptr<MergeSortBlockCursor>> _block_priority_queue;
157155
bool _materialize_sort_exprs;
158156
};
159157

be/src/vec/common/sort/topn_sorter.cpp

+12-10
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,19 @@ Status TopNSorter::_do_sort(Block* block) {
7272
// if a block is totally greater than the heap top of _block_priority_queue,
7373
// we can discard the block data directly.
7474
if (_state->num_rows() < _offset + _limit) {
75-
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
76-
_block_priority_queue.emplace(_pool->add(
77-
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
75+
RETURN_IF_ERROR(_state->add_sorted_block(std::move(sorted_block)));
76+
_block_priority_queue.emplace(
77+
MergeSortBlockCursor::create_shared(MergeSortCursorImpl::create_shared(
78+
_state->last_sorted_block(), _sort_description)));
7879
} else {
79-
auto tmp_cursor_impl =
80-
std::make_unique<MergeSortCursorImpl>(sorted_block, _sort_description);
81-
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
82-
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
83-
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
84-
_block_priority_queue.emplace(_pool->add(
85-
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
80+
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
81+
Block::create_shared(std::move(sorted_block)), _sort_description);
82+
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
83+
if (!block_cursor.totally_greater(*_block_priority_queue.top())) {
84+
RETURN_IF_ERROR(_state->add_sorted_block(std::move(*block_cursor.impl->block)));
85+
_block_priority_queue.emplace(MergeSortBlockCursor::create_shared(
86+
MergeSortCursorImpl::create_shared(MergeSortCursorImpl(
87+
_state->last_sorted_block(), _sort_description))));
8688
}
8789
}
8890
} else {

0 commit comments

Comments
 (0)