@@ -59,48 +59,51 @@ namespace doris::vectorized {
59
59
void MergeSorterState::reset () {
60
60
auto empty_queue = std::priority_queue<MergeSortCursor>();
61
61
priority_queue_.swap (empty_queue);
62
- std::vector<MergeSortCursorImpl> empty_cursors (0 );
62
+ std::vector<std::shared_ptr< MergeSortCursorImpl> > empty_cursors (0 );
63
63
cursors_.swap (empty_cursors);
64
- std::vector<Block> empty_blocks (0 );
64
+ std::vector<std::shared_ptr< Block> > empty_blocks (0 );
65
65
sorted_blocks_.swap (empty_blocks);
66
66
unsorted_block_ = Block::create_unique (unsorted_block_->clone_empty ());
67
67
in_mem_sorted_bocks_size_ = 0 ;
68
68
}
69
69
70
- Status MergeSorterState::add_sorted_block (Block& block) {
70
+ Status MergeSorterState::add_sorted_block (Block&& block) {
71
71
auto rows = block.rows ();
72
72
if (0 == rows) {
73
73
return Status::OK ();
74
74
}
75
75
in_mem_sorted_bocks_size_ += block.bytes ();
76
- sorted_blocks_.emplace_back (std::move (block));
76
+ sorted_blocks_.emplace_back (Block::create_shared ( std::move (block) ));
77
77
num_rows_ += rows;
78
78
return Status::OK ();
79
79
}
80
80
81
81
Status MergeSorterState::build_merge_tree (const SortDescription& sort_description) {
82
82
for (auto & block : sorted_blocks_) {
83
- cursors_.emplace_back (block, sort_description);
83
+ cursors_.emplace_back (
84
+ MergeSortCursorImpl::create_shared (std::move (block), sort_description));
84
85
}
85
86
86
- if (sorted_blocks_.size () > 1 ) {
87
+ sorted_blocks_.clear ();
88
+ if (cursors_.size () > 1 ) {
87
89
for (auto & cursor : cursors_) {
88
- priority_queue_.emplace (& cursor);
90
+ priority_queue_.emplace (std::move ( cursor) );
89
91
}
92
+ cursors_.clear ();
90
93
}
91
94
92
95
return Status::OK ();
93
96
}
94
97
95
98
Status MergeSorterState::merge_sort_read (doris::vectorized::Block* block, int batch_size,
96
99
bool * eos) {
97
- if (sorted_blocks_ .empty ()) {
100
+ if (cursors_. empty () && priority_queue_ .empty ()) {
98
101
*eos = true ;
99
- } else if (sorted_blocks_ .size () == 1 ) {
102
+ } else if (cursors_ .size () == 1 ) {
100
103
if (offset_ != 0 ) {
101
- sorted_blocks_ [0 ]. skip_num_rows (offset_);
104
+ cursors_ [0 ]-> block -> skip_num_rows (offset_);
102
105
}
103
- block->swap (sorted_blocks_ [0 ]);
106
+ block->swap (*cursors_ [0 ]-> block );
104
107
*eos = true ;
105
108
} else {
106
109
RETURN_IF_ERROR (_merge_sort_read_impl (batch_size, block, eos));
@@ -110,9 +113,14 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba
110
113
111
114
Status MergeSorterState::_merge_sort_read_impl (int batch_size, doris::vectorized::Block* block,
112
115
bool * eos) {
113
- size_t num_columns = sorted_blocks_[0 ].columns ();
116
+ if (priority_queue_.empty ()) {
117
+ *eos = true ;
118
+ return Status::OK ();
119
+ }
120
+ size_t num_columns = priority_queue_.top ().impl ->block ->columns ();
114
121
115
- MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block (block, sorted_blocks_[0 ]);
122
+ MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block (
123
+ block, *priority_queue_.top ().impl ->block );
116
124
MutableColumns& merged_columns = m_block.mutable_columns ();
117
125
118
126
// / Take rows from queue in right order and push to 'merged'.
@@ -123,7 +131,7 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
123
131
124
132
if (offset_ == 0 ) {
125
133
for (size_t i = 0 ; i < num_columns; ++i)
126
- merged_columns[i]->insert_from (*current->all_columns [i], current->pos );
134
+ merged_columns[i]->insert_from (*current->block -> get_columns () [i], current->pos );
127
135
++merged_rows;
128
136
} else {
129
137
offset_--;
@@ -134,7 +142,9 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized
134
142
priority_queue_.push (current);
135
143
}
136
144
137
- if (merged_rows == batch_size) break ;
145
+ if (merged_rows == batch_size) {
146
+ break ;
147
+ }
138
148
}
139
149
block->set_columns (std::move (merged_columns));
140
150
@@ -261,22 +271,24 @@ Status FullSorter::_do_sort() {
261
271
// if one block totally greater the heap top of _block_priority_queue
262
272
// we can throw the block data directly.
263
273
if (_state->num_rows () < _offset + _limit) {
264
- static_cast <void >(_state->add_sorted_block (desc_block));
265
- _block_priority_queue.emplace (_pool->add (
266
- new MergeSortCursorImpl (_state->last_sorted_block (), _sort_description)));
274
+ static_cast <void >(_state->add_sorted_block (std::move (desc_block)));
275
+ _block_priority_queue.emplace (
276
+ MergeSortBlockCursor::create_shared (MergeSortCursorImpl::create_shared (
277
+ _state->last_sorted_block (), _sort_description)));
267
278
} else {
268
- auto tmp_cursor_impl =
269
- std::make_unique<MergeSortCursorImpl>(desc_block, _sort_description);
270
- MergeSortBlockCursor block_cursor (tmp_cursor_impl.get ());
271
- if (!block_cursor.totally_greater (_block_priority_queue.top ())) {
272
- static_cast <void >(_state->add_sorted_block (desc_block));
273
- _block_priority_queue.emplace (_pool->add (
274
- new MergeSortCursorImpl (_state->last_sorted_block (), _sort_description)));
279
+ auto tmp_cursor_impl = MergeSortCursorImpl::create_shared (
280
+ Block::create_shared (std::move (desc_block)), _sort_description);
281
+ MergeSortBlockCursor block_cursor (tmp_cursor_impl);
282
+ if (!block_cursor.totally_greater (*_block_priority_queue.top ())) {
283
+ static_cast <void >(_state->add_sorted_block (std::move (*tmp_cursor_impl->block )));
284
+ _block_priority_queue.emplace (
285
+ MergeSortBlockCursor::create_shared (MergeSortCursorImpl::create_shared (
286
+ _state->last_sorted_block (), _sort_description)));
275
287
}
276
288
}
277
289
} else {
278
290
// dispose normal sort logic
279
- static_cast <void >(_state->add_sorted_block (desc_block));
291
+ static_cast <void >(_state->add_sorted_block (std::move ( desc_block) ));
280
292
}
281
293
return Status::OK ();
282
294
}
0 commit comments