Reduce SortExec memory usage by avoiding constructing a single huge batch #2132
@@ -341,15 +341,13 @@ fn get_sorted_iter(
         .iter()
         .enumerate()
         .flat_map(|(i, arrays)| {
-            (0..arrays[0].len())
-                .map(|r| CompositeIndex {
-                    // since we originally use UInt32Array to index the combined mono batch,
-                    // the component record batches won't overflow either;
-                    // use u32 here for space efficiency.
-                    batch_idx: i as u32,
-                    row_idx: r as u32,
-                })
-                .collect::<Vec<CompositeIndex>>()
+            (0..arrays[0].len()).map(move |r| CompositeIndex {
+                // since we originally use UInt32Array to index the combined mono batch,
+                // the component record batches won't overflow either;
+                // use u32 here for space efficiency.
+                batch_idx: i as u32,
+                row_idx: r as u32,
+            })
         })
         .collect::<Vec<CompositeIndex>>();
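The composite-index construction above can be sketched as a self-contained program. This is not the DataFusion source: `build_composite_indexes` is a hypothetical helper that operates on plain per-batch row counts instead of Arrow arrays, but it mirrors the `flat_map` shape of the diff, including the `move` closure capturing the batch index by value.

```rust
// Hypothetical stand-in for the get_sorted_iter snippet: build one flat list
// of (batch, row) composite indexes from per-batch row counts.
#[derive(Debug, Clone, Copy, PartialEq)]
struct CompositeIndex {
    batch_idx: u32,
    row_idx: u32,
}

fn build_composite_indexes(batch_lens: &[usize]) -> Vec<CompositeIndex> {
    batch_lens
        .iter()
        .enumerate()
        .flat_map(|(i, &len)| {
            // `move` lets the inner closure capture `i` by value, as in the diff
            (0..len).map(move |r| CompositeIndex {
                batch_idx: i as u32,
                row_idx: r as u32,
            })
        })
        .collect()
}

fn main() {
    let idx = build_composite_indexes(&[2, 1]);
    println!("{:?}", idx);
}
```

Note that the single `collect` at the end replaces the per-batch `collect` of the old code, avoiding one intermediate `Vec` per batch.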
@@ -418,12 +416,14 @@ impl Iterator for SortedIterator {
         }

         let current_size = min(self.batch_size, self.length - self.pos);
-        let mut result = Vec::with_capacity(current_size);
-        for i in 0..current_size {
-            let p = self.pos + i;
-            let c_index = self.indices.value(p) as usize;
-            result.push(self.composite[c_index])
-        }
+        let result = (0..current_size)
+            .map(|i| {
+                let p = self.pos + i;
+                let c_index = self.indices.value(p) as usize;
+                self.composite[c_index]
+            })
+            .collect::<Vec<_>>();

         self.pos += current_size;
         Some(result)
     }
@@ -470,21 +470,21 @@ impl Stream for SortedSizedRecordBatchStream {
         match self.sorted_iter.next() {
             None => Poll::Ready(None),
             Some(combined) => {
+                let num_rows = combined.len();
+                let slices = combine_adjacent_indexes(combined);
                 let mut output = Vec::with_capacity(self.num_cols);
                 for i in 0..self.num_cols {
                     let arrays = self
                         .batches
                         .iter()
                         .map(|b| b.column(i).data())
                         .collect::<Vec<_>>();
-                    let mut mutable =
-                        MutableArrayData::new(arrays, false, combined.len());
-                    for x in combined.iter() {
-                        // we cannot extend with slice here, since the lexsort is unstable
+                    let mut mutable = MutableArrayData::new(arrays, false, num_rows);
+                    for x in slices.iter() {
                         mutable.extend(
                             x.batch_idx as usize,
-                            x.row_idx as usize,
-                            (x.row_idx + 1) as usize,
+                            x.start_row_idx as usize,
+                            x.start_row_idx as usize + x.len,
                         );
                     }
                     output.push(make_array(mutable.freeze()))
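The gain from the hunk above is that `extend` now copies a contiguous range per call instead of one row per call. A stdlib-only stand-in can illustrate this (hypothetical `gather_by_slices`; a plain `Vec<i64>` plays the role of Arrow's MutableArrayData here, and the `(batch_idx, start_row_idx, len)` tuples play the role of CompositeSlice):

```rust
// Illustrative stand-in, not Arrow: gather output rows by coalesced ranges.
// `extend_from_slice` over a range mirrors what mutable.extend(batch, start,
// start + len) does for one CompositeSlice, i.e. a single bulk memcpy-style
// copy instead of `len` single-row copies.
fn gather_by_slices(batches: &[Vec<i64>], slices: &[(usize, usize, usize)]) -> Vec<i64> {
    let mut out = Vec::new();
    for &(batch_idx, start, len) in slices {
        out.extend_from_slice(&batches[batch_idx][start..start + len]);
    }
    out
}

fn main() {
    let batches = vec![vec![10, 20, 30], vec![40, 50]];
    let out = gather_by_slices(&batches, &[(0, 1, 2), (1, 0, 1)]);
    println!("{:?}", out);
}
```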
@@ -497,6 +497,49 @@ impl Stream for SortedSizedRecordBatchStream {
     }
 }

+struct CompositeSlice {
+    batch_idx: u32,
+    start_row_idx: u32,
+    len: usize,
+}
+
+/// Combine adjacent indexes from the same batch to make a slice, for more efficient `extend` later.
+fn combine_adjacent_indexes(combined: Vec<CompositeIndex>) -> Vec<CompositeSlice> {
+    let mut last_batch_idx = 0;
+    let mut start_row_idx = 0;
+    let mut len = 0;
+
+    let mut slices = vec![];
+    for ci in combined {
+        if len == 0 {
+            last_batch_idx = ci.batch_idx;
+            start_row_idx = ci.row_idx;
+            len = 1;
+        } else if ci.batch_idx == last_batch_idx {
+            len += 1;
+            // since each of the incoming batches is pre-sorted,
+            // a wrong order of indexes from the same batch means the row
+            // shares its sort key with the row pointed to by start_row_idx.
+            start_row_idx = min(start_row_idx, ci.row_idx);

Review comment: If this happened, does it mean the sort wasn't stable (as in, it rearranged inputs that sorted to equal keys)?
Reply: Yes, that's the only case I think.
+        } else {
+            slices.push(CompositeSlice {
+                batch_idx: last_batch_idx,
+                start_row_idx,
+                len,
+            });
+            last_batch_idx = ci.batch_idx;
+            start_row_idx = ci.row_idx;
+            len = 1;
+        }
+    }
+    slices.push(CompositeSlice {
+        batch_idx: last_batch_idx,
+        start_row_idx,
+        len,
+    });
+    slices
+}

Review comment: Is it important to check here that `len` is not zero?
Reply: I think it's not possible for it to be zero here, since empty batches are guarded against being inserted into the in-memory sorter in the first place (https://github.com/apache/arrow-datafusion/blob/2325d1aac2230182c4cb496327daa76ea0d80088/datafusion/core/src/physical_plan/sorts/sort.rs#L115), so for inner batch merges there wouldn't be empty rows. I will add an assert here in case the unexpected happens.
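The coalescing logic can be replicated as a self-contained program (a simplified copy with Arrow types replaced by plain structs, not the DataFusion source; the input assertion reflects the review discussion that empty input is guarded upstream):

```rust
use std::cmp::min;

#[derive(Debug, Clone, Copy)]
struct CompositeIndex {
    batch_idx: u32,
    row_idx: u32,
}

#[derive(Debug, PartialEq)]
struct CompositeSlice {
    batch_idx: u32,
    start_row_idx: u32,
    len: usize,
}

// Collapse consecutive rows from the same batch into (start, len) slices so
// the later copy works on ranges rather than single rows.
fn combine_adjacent_indexes(combined: Vec<CompositeIndex>) -> Vec<CompositeSlice> {
    // empty input would produce a bogus zero-length slice; per the review
    // thread it cannot occur upstream, so guard it explicitly
    assert!(!combined.is_empty(), "input indexes must be non-empty");
    let mut last_batch_idx = 0;
    let mut start_row_idx = 0;
    let mut len = 0;
    let mut slices = vec![];
    for ci in combined {
        if len == 0 {
            last_batch_idx = ci.batch_idx;
            start_row_idx = ci.row_idx;
            len = 1;
        } else if ci.batch_idx == last_batch_idx {
            // same pre-sorted batch: grow the current slice; out-of-order
            // indexes can only come from equal sort keys, so keep the minimum
            len += 1;
            start_row_idx = min(start_row_idx, ci.row_idx);
        } else {
            // batch changed: emit the finished slice and start a new one
            slices.push(CompositeSlice { batch_idx: last_batch_idx, start_row_idx, len });
            last_batch_idx = ci.batch_idx;
            start_row_idx = ci.row_idx;
            len = 1;
        }
    }
    slices.push(CompositeSlice { batch_idx: last_batch_idx, start_row_idx, len });
    slices
}

fn main() {
    let idx = vec![
        CompositeIndex { batch_idx: 0, row_idx: 0 },
        CompositeIndex { batch_idx: 0, row_idx: 1 },
        CompositeIndex { batch_idx: 1, row_idx: 5 },
    ];
    println!("{:?}", combine_adjacent_indexes(idx));
}
```

With this input, three composite indexes coalesce into two slices: rows 0..2 of batch 0, then row 5 of batch 1.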
|
||
impl RecordBatchStream for SortedSizedRecordBatchStream { | ||
fn schema(&self) -> SchemaRef { | ||
self.schema.clone() | ||
|
Review comment: I think MutableArrayData is a bit more optimized for copying larger chunks of data, instead of single rows at a time. I guess that needs some work optimizing this case on the Arrow side (or somehow using another construction?).
Reply: I think #2132 (comment) would result in copying larger chunks.
Reply: I'm thinking of something like `take`, but supporting multiple batches.
Reply: I've updated the `SortedIterator` above to produce a slice each time, to avoid this per-cell copy.
Reply: I filed apache/arrow-rs#1523 to track adding something like this in arrow-rs.
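The "`take` across multiple batches" idea can be sketched roughly as follows. This is a hedged illustration, not the arrow-rs API (`take_multi` is a made-up name, and `Vec<i64>` stands in for Arrow arrays): given several source batches and a list of `(batch, row)` pairs, gather the rows into one output.

```rust
// Hypothetical multi-batch take: like arrow's take kernel, but the index
// identifies both which batch and which row within it to pull from.
fn take_multi(batches: &[Vec<i64>], indices: &[(usize, usize)]) -> Vec<i64> {
    indices.iter().map(|&(b, r)| batches[b][r]).collect()
}

fn main() {
    let batches = vec![vec![1, 2], vec![3]];
    println!("{:?}", take_multi(&batches, &[(1, 0), (0, 1), (0, 0)]));
}
```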