Skip to content

Commit c566ae2

Browse files
authored
Revert "[Improvement](sort) Free sort blocks if this block is exhausted (#393…"
This reverts commit 3ac8347.
1 parent 44b87cb commit c566ae2

8 files changed

+149
-112
lines changed

be/src/vec/common/sort/partition_sorter.cpp

+22-20
Original file line numberDiff line numberDiff line change
@@ -58,17 +58,20 @@ Status PartitionSorter::append_block(Block* input_block) {
5858
Block sorted_block = VectorizedUtils::create_empty_columnswithtypename(_row_desc);
5959
DCHECK(input_block->columns() == sorted_block.columns());
6060
RETURN_IF_ERROR(partial_sort(*input_block, sorted_block));
61-
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
61+
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
6262
return Status::OK();
6363
}
6464

6565
Status PartitionSorter::prepare_for_read() {
66+
auto& cursors = _state->get_cursors();
6667
auto& blocks = _state->get_sorted_block();
6768
auto& priority_queue = _state->get_priority_queue();
6869
for (auto& block : blocks) {
69-
priority_queue.push(MergeSortCursorImpl::create_shared(block, _sort_description));
70+
cursors.emplace_back(block, _sort_description);
71+
}
72+
for (auto& cursor : cursors) {
73+
priority_queue.push(MergeSortCursor(&cursor));
7074
}
71-
blocks.clear();
7275
return Status::OK();
7376
}
7477

@@ -81,30 +84,29 @@ void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
8184
}
8285

8386
Status PartitionSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
84-
if (_state->get_priority_queue().empty()) {
85-
*eos = true;
86-
} else if (_state->get_priority_queue().size() == 1 && _has_global_limit) {
87-
block->swap(*_state->get_priority_queue().top().impl->block);
88-
block->set_num_rows(_partition_inner_limit);
87+
if (_state->get_sorted_block().empty()) {
8988
*eos = true;
9089
} else {
91-
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
90+
if (_state->get_sorted_block().size() == 1 && _has_global_limit) {
91+
auto& sorted_block = _state->get_sorted_block()[0];
92+
block->swap(sorted_block);
93+
block->set_num_rows(_partition_inner_limit);
94+
*eos = true;
95+
} else {
96+
RETURN_IF_ERROR(partition_sort_read(block, eos, state->batch_size()));
97+
}
9298
}
9399
return Status::OK();
94100
}
95101

96102
Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int batch_size) {
97-
auto& priority_queue = _state->get_priority_queue();
98-
if (priority_queue.empty()) {
99-
*eos = true;
100-
return Status::OK();
101-
}
102-
const auto& sorted_block = priority_queue.top().impl->block;
103-
size_t num_columns = sorted_block->columns();
103+
const auto& sorted_block = _state->get_sorted_block()[0];
104+
size_t num_columns = sorted_block.columns();
104105
MutableBlock m_block =
105-
VectorizedUtils::build_mutable_mem_reuse_block(output_block, *sorted_block);
106+
VectorizedUtils::build_mutable_mem_reuse_block(output_block, sorted_block);
106107
MutableColumns& merged_columns = m_block.mutable_columns();
107108
size_t current_output_rows = 0;
109+
auto& priority_queue = _state->get_priority_queue();
108110

109111
bool get_enough_data = false;
110112
while (!priority_queue.empty()) {
@@ -119,7 +121,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
119121
//1 row_number no need to check distinct, just output partition_inner_limit row
120122
if ((current_output_rows + _output_total_rows) < _partition_inner_limit) {
121123
for (size_t i = 0; i < num_columns; ++i) {
122-
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
124+
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
123125
}
124126
} else {
125127
//rows has get enough
@@ -153,7 +155,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
153155
}
154156
}
155157
for (size_t i = 0; i < num_columns; ++i) {
156-
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
158+
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
157159
}
158160
break;
159161
}
@@ -178,7 +180,7 @@ Status PartitionSorter::partition_sort_read(Block* output_block, bool* eos, int
178180
*_previous_row = current;
179181
}
180182
for (size_t i = 0; i < num_columns; ++i) {
181-
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
183+
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
182184
}
183185
current_output_rows++;
184186
break;

be/src/vec/common/sort/partition_sorter.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ struct SortCursorCmp {
5050
SortCursorCmp(const MergeSortCursor& cursor) : row(cursor->pos), impl(cursor.impl) {}
5151

5252
void reset() {
53-
impl->reset();
53+
impl = nullptr;
5454
row = 0;
5555
}
5656
bool compare_two_rows(const MergeSortCursor& rhs) const {
@@ -67,7 +67,7 @@ struct SortCursorCmp {
6767
return true;
6868
}
6969
int row = 0;
70-
std::shared_ptr<MergeSortCursorImpl> impl = nullptr;
70+
MergeSortCursorImpl* impl = nullptr;
7171
};
7272

7373
class PartitionSorter final : public Sorter {

be/src/vec/common/sort/sorter.cpp

+34-37
Original file line numberDiff line numberDiff line change
@@ -59,44 +59,48 @@ namespace doris::vectorized {
5959
void MergeSorterState::reset() {
6060
auto empty_queue = std::priority_queue<MergeSortCursor>();
6161
priority_queue_.swap(empty_queue);
62-
std::vector<std::shared_ptr<MergeSortCursorImpl>> empty_cursors(0);
63-
std::vector<std::shared_ptr<Block>> empty_blocks(0);
62+
std::vector<MergeSortCursorImpl> empty_cursors(0);
63+
cursors_.swap(empty_cursors);
64+
std::vector<Block> empty_blocks(0);
6465
sorted_blocks_.swap(empty_blocks);
6566
unsorted_block_ = Block::create_unique(unsorted_block_->clone_empty());
6667
in_mem_sorted_bocks_size_ = 0;
6768
}
6869

69-
void MergeSorterState::add_sorted_block(std::shared_ptr<Block> block) {
70-
auto rows = block->rows();
70+
Status MergeSorterState::add_sorted_block(Block& block) {
71+
auto rows = block.rows();
7172
if (0 == rows) {
72-
return;
73+
return Status::OK();
7374
}
74-
in_mem_sorted_bocks_size_ += block->bytes();
75-
sorted_blocks_.emplace_back(block);
75+
in_mem_sorted_bocks_size_ += block.bytes();
76+
sorted_blocks_.emplace_back(std::move(block));
7677
num_rows_ += rows;
78+
return Status::OK();
7779
}
7880

7981
Status MergeSorterState::build_merge_tree(const SortDescription& sort_description) {
8082
for (auto& block : sorted_blocks_) {
81-
priority_queue_.emplace(
82-
MergeSortCursorImpl::create_shared(std::move(block), sort_description));
83+
cursors_.emplace_back(block, sort_description);
84+
}
85+
86+
if (sorted_blocks_.size() > 1) {
87+
for (auto& cursor : cursors_) {
88+
priority_queue_.emplace(&cursor);
89+
}
8390
}
8491

85-
sorted_blocks_.clear();
8692
return Status::OK();
8793
}
8894

8995
Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int batch_size,
9096
bool* eos) {
91-
DCHECK(sorted_blocks_.empty());
92-
DCHECK(unsorted_block_->empty());
93-
if (priority_queue_.empty()) {
97+
if (sorted_blocks_.empty()) {
9498
*eos = true;
95-
} else if (priority_queue_.size() == 1) {
99+
} else if (sorted_blocks_.size() == 1) {
96100
if (offset_ != 0) {
97-
priority_queue_.top().impl->block->skip_num_rows(offset_);
101+
sorted_blocks_[0].skip_num_rows(offset_);
98102
}
99-
block->swap(*priority_queue_.top().impl->block);
103+
block->swap(sorted_blocks_[0]);
100104
*eos = true;
101105
} else {
102106
RETURN_IF_ERROR(_merge_sort_read_impl(batch_size, block, eos));
@@ -106,14 +110,9 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba
106110

107111
Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block,
108112
bool* eos) {
109-
if (priority_queue_.empty()) {
110-
*eos = true;
111-
return Status::OK();
112-
}
113-
size_t num_columns = priority_queue_.top().impl->block->columns();
113+
size_t num_columns = sorted_blocks_[0].columns();
114114

115-
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(
116-
block, *priority_queue_.top().impl->block);
115+
MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block(block, sorted_blocks_[0]);
117116
MutableColumns& merged_columns = m_block.mutable_columns();
118117

119118
/// Take rows from queue in right order and push to 'merged'.
@@ -124,7 +123,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
124123

125124
if (offset_ == 0) {
126125
for (size_t i = 0; i < num_columns; ++i)
127-
merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos);
126+
merged_columns[i]->insert_from(*current->all_columns[i], current->pos);
128127
++merged_rows;
129128
} else {
130129
offset_--;
@@ -135,9 +134,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
135134
priority_queue_.push(current);
136135
}
137136

138-
if (merged_rows == batch_size) {
139-
break;
140-
}
137+
if (merged_rows == batch_size) break;
141138
}
142139
block->set_columns(std::move(merged_columns));
143140

@@ -264,22 +261,22 @@ Status FullSorter::_do_sort() {
264261
// if one block totally greater the heap top of _block_priority_queue
265262
// we can throw the block data directly.
266263
if (_state->num_rows() < _offset + _limit) {
267-
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
268-
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
269-
_state->last_sorted_block(), _sort_description));
264+
static_cast<void>(_state->add_sorted_block(desc_block));
265+
_block_priority_queue.emplace(_pool->add(
266+
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
270267
} else {
271-
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
272-
Block::create_shared(std::move(desc_block)), _sort_description);
273-
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
268+
auto tmp_cursor_impl =
269+
std::make_unique<MergeSortCursorImpl>(desc_block, _sort_description);
270+
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
274271
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
275-
_state->add_sorted_block(tmp_cursor_impl->block);
276-
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
277-
_state->last_sorted_block(), _sort_description));
272+
static_cast<void>(_state->add_sorted_block(desc_block));
273+
_block_priority_queue.emplace(_pool->add(
274+
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
278275
}
279276
}
280277
} else {
281278
// dispose normal sort logic
282-
_state->add_sorted_block(Block::create_shared(std::move(desc_block)));
279+
static_cast<void>(_state->add_sorted_block(desc_block));
283280
}
284281
return Status::OK();
285282
}

be/src/vec/common/sort/sorter.h

+8-4
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class MergeSorterState {
5959

6060
~MergeSorterState() = default;
6161

62-
void add_sorted_block(std::shared_ptr<Block> block);
62+
Status add_sorted_block(Block& block);
6363

6464
Status build_merge_tree(const SortDescription& sort_description);
6565

@@ -72,19 +72,23 @@ class MergeSorterState {
7272

7373
uint64_t num_rows() const { return num_rows_; }
7474

75-
std::shared_ptr<Block> last_sorted_block() { return sorted_blocks_.back(); }
75+
Block& last_sorted_block() { return sorted_blocks_.back(); }
7676

77-
std::vector<std::shared_ptr<Block>>& get_sorted_block() { return sorted_blocks_; }
77+
std::vector<Block>& get_sorted_block() { return sorted_blocks_; }
7878
std::priority_queue<MergeSortCursor>& get_priority_queue() { return priority_queue_; }
79+
std::vector<MergeSortCursorImpl>& get_cursors() { return cursors_; }
7980
void reset();
8081

8182
std::unique_ptr<Block> unsorted_block_;
8283

8384
private:
85+
int _calc_spill_blocks_to_merge() const;
86+
8487
Status _merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos);
8588

8689
std::priority_queue<MergeSortCursor> priority_queue_;
87-
std::vector<std::shared_ptr<Block>> sorted_blocks_;
90+
std::vector<MergeSortCursorImpl> cursors_;
91+
std::vector<Block> sorted_blocks_;
8892
size_t in_mem_sorted_bocks_size_ = 0;
8993
uint64_t num_rows_ = 0;
9094

be/src/vec/common/sort/topn_sorter.cpp

+9-8
Original file line numberDiff line numberDiff line change
@@ -72,16 +72,17 @@ Status TopNSorter::_do_sort(Block* block) {
7272
// if one block totally greater the heap top of _block_priority_queue
7373
// we can throw the block data directly.
7474
if (_state->num_rows() < _offset + _limit) {
75-
_state->add_sorted_block(Block::create_shared(std::move(sorted_block)));
76-
_block_priority_queue.emplace(MergeSortCursorImpl::create_shared(
77-
_state->last_sorted_block(), _sort_description));
75+
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
76+
_block_priority_queue.emplace(_pool->add(
77+
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
7878
} else {
79-
auto tmp_cursor_impl = MergeSortCursorImpl::create_shared(
80-
Block::create_shared(std::move(sorted_block)), _sort_description);
81-
MergeSortBlockCursor block_cursor(tmp_cursor_impl);
79+
auto tmp_cursor_impl =
80+
std::make_unique<MergeSortCursorImpl>(sorted_block, _sort_description);
81+
MergeSortBlockCursor block_cursor(tmp_cursor_impl.get());
8282
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
83-
_state->add_sorted_block(block_cursor.impl->block);
84-
_block_priority_queue.emplace(tmp_cursor_impl);
83+
RETURN_IF_ERROR(_state->add_sorted_block(sorted_block));
84+
_block_priority_queue.emplace(_pool->add(
85+
new MergeSortCursorImpl(_state->last_sorted_block(), _sort_description)));
8586
}
8687
}
8788
} else {

0 commit comments

Comments
 (0)