Reduce SortExec memory usage by avoiding the construction of a single huge batch #2132

Merged: 15 commits, Apr 5, 2022
85 changes: 64 additions & 21 deletions datafusion/core/src/physical_plan/sorts/sort.rs
@@ -341,15 +341,13 @@ fn get_sorted_iter(
        .iter()
        .enumerate()
        .flat_map(|(i, arrays)| {
-           (0..arrays[0].len())
-               .map(|r| CompositeIndex {
-                   // since we originally use UInt32Array to index the combined mono batch,
-                   // component record batches won't overflow either;
-                   // use u32 here for space efficiency.
-                   batch_idx: i as u32,
-                   row_idx: r as u32,
-               })
-               .collect::<Vec<CompositeIndex>>()
+           (0..arrays[0].len()).map(move |r| CompositeIndex {
+               // since we originally use UInt32Array to index the combined mono batch,
+               // component record batches won't overflow either;
+               // use u32 here for space efficiency.
+               batch_idx: i as u32,
+               row_idx: r as u32,
+           })
        })
        .collect::<Vec<CompositeIndex>>();
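This hunk drops the per-batch intermediate Vec: with a move closure the inner iterator no longer borrows the enumerate index, so flat_map can stream the composite indexes directly. A minimal standalone sketch of the same pattern, using plain (batch, row) tuples instead of CompositeIndex (illustrative only):

    let per_batch_lens = vec![2usize, 3];
    let idx: Vec<(u32, u32)> = per_batch_lens
        .iter()
        .enumerate()
        // `move` lets the inner closure own `i`, so no temporary Vec per batch is needed
        .flat_map(|(i, len)| (0..*len).map(move |r| (i as u32, r as u32)))
        .collect();
    assert_eq!(idx, vec![(0, 0), (0, 1), (1, 0), (1, 1), (1, 2)]);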

@@ -418,12 +416,14 @@ impl Iterator for SortedIterator {
        }

        let current_size = min(self.batch_size, self.length - self.pos);
-       let mut result = Vec::with_capacity(current_size);
-       for i in 0..current_size {
-           let p = self.pos + i;
-           let c_index = self.indices.value(p) as usize;
-           result.push(self.composite[c_index])
-       }
+       let result = (0..current_size)
+           .map(|i| {
+               let p = self.pos + i;
+               let c_index = self.indices.value(p) as usize;
+               self.composite[c_index]
+           })
+           .collect::<Vec<_>>();

        self.pos += current_size;
        Some(result)
    }
@@ -470,21 +470,21 @@ impl Stream for SortedSizedRecordBatchStream {
        match self.sorted_iter.next() {
            None => Poll::Ready(None),
            Some(combined) => {
+               let num_rows = combined.len();
+               let slices = combine_adjacent_indexes(combined);
                let mut output = Vec::with_capacity(self.num_cols);
                for i in 0..self.num_cols {
                    let arrays = self
                        .batches
                        .iter()
                        .map(|b| b.column(i).data())
                        .collect::<Vec<_>>();
-                   let mut mutable =
-                       MutableArrayData::new(arrays, false, combined.len());
-                   for x in combined.iter() {
-                       // we cannot extend with slice here, since the lexsort is unstable
+                   let mut mutable = MutableArrayData::new(arrays, false, num_rows);
+                   for x in slices.iter() {
                        mutable.extend(
Contributor: I think MutableArrayData is a bit more optimized for copying larger chunks of data, instead of single rows at a time. I guess that needs some work optimizing this case at the Arrow side (or somehow use another construction?)
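A minimal sketch of the copy pattern in question, assuming the arrow-rs API of this era (Array::data(), MutableArrayData::new / extend / freeze): extend(index, start, end) copies a contiguous range from one source array, so one call per run of adjacent rows is much cheaper than one call per row.

    use arrow::array::{make_array, Array, Int32Array, MutableArrayData};

    let a = Int32Array::from(vec![1, 2, 3, 4]);
    let b = Int32Array::from(vec![10, 20, 30]);
    let mut mutable = MutableArrayData::new(vec![a.data(), b.data()], false, 5);

    // one extend call per contiguous run, rather than one call per row
    mutable.extend(0, 1, 4); // rows 1..4 of `a`
    mutable.extend(1, 0, 2); // rows 0..2 of `b`

    let out = make_array(mutable.freeze());
    assert_eq!(out.len(), 5);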

Contributor: I think #2132 (comment) would result in copying larger chunks

Contributor: I think of something like take but supporting multiple batches.
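A hypothetical sketch of what such a helper could look like; the name take_multi and its signature are invented here purely for illustration and are not an arrow-rs API:

    use arrow::array::{make_array, Array, ArrayRef, MutableArrayData};

    /// Gather rows from several source arrays: each index is a (source, row) pair.
    fn take_multi(sources: &[ArrayRef], indices: &[(usize, usize)]) -> ArrayRef {
        let data = sources.iter().map(|a| a.data()).collect::<Vec<_>>();
        let mut mutable = MutableArrayData::new(data, false, indices.len());
        for &(src, row) in indices {
            mutable.extend(src, row, row + 1);
        }
        make_array(mutable.freeze())
    }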

Author: I've updated the SortedIterator above to produce a slice each time to avoid this per cell copy.

Contributor: I filed apache/arrow-rs#1523 to track adding something like this in arrow-rs.

                            x.batch_idx as usize,
-                           x.row_idx as usize,
-                           (x.row_idx + 1) as usize,
+                           x.start_row_idx as usize,
+                           x.start_row_idx as usize + x.len,
                        );
                    }
                    output.push(make_array(mutable.freeze()))
@@ -497,6 +497,49 @@ impl Stream for SortedSizedRecordBatchStream {
    }
}

+struct CompositeSlice {
+    batch_idx: u32,
+    start_row_idx: u32,
+    len: usize,
+}
+
+/// Combine adjacent indexes from the same batch to make a slice, for more efficient `extend` later.
+fn combine_adjacent_indexes(combined: Vec<CompositeIndex>) -> Vec<CompositeSlice> {
+    let mut last_batch_idx = 0;
+    let mut start_row_idx = 0;
+    let mut len = 0;
+
+    let mut slices = vec![];
+    for ci in combined {
+        if len == 0 {
+            last_batch_idx = ci.batch_idx;
+            start_row_idx = ci.row_idx;
+            len = 1;
+        } else if ci.batch_idx == last_batch_idx {
+            len += 1;
+            // since each incoming batch is pre-sorted, if we see indexes from the
+            // same batch arrive out of order, the rows must share the same sort
+            // key as the row pointed to by start_row_idx.
Contributor: If this happened, does it mean the sort wasn't stable (as in it rearranged inputs that sorted to equal keys)?

Author: Yes, that's the only case I think.

+            start_row_idx = min(start_row_idx, ci.row_idx);
+        } else {
+            slices.push(CompositeSlice {
+                batch_idx: last_batch_idx,
+                start_row_idx,
+                len,
+            });
+            last_batch_idx = ci.batch_idx;
+            start_row_idx = ci.row_idx;
+            len = 1;
+        }
+    }
+    slices.push(CompositeSlice {
Contributor: Is it important to check here that len > 0? I don't think it is possible to try to sort a 0-length RecordBatch, but I wonder what would happen in that case.

Author (@yjshen), Apr 5, 2022: I don't think it can be zero here, since we guard against inserting empty batches into the in-memory sorter in the first place (https://github.com/apache/arrow-datafusion/blob/2325d1aac2230182c4cb496327daa76ea0d80088/datafusion/core/src/physical_plan/sorts/sort.rs#L115), so in-memory batch merges won't see empty batches. I will add an assert here in case something unexpected happens.

+        batch_idx: last_batch_idx,
+        start_row_idx,
+        len,
+    });
+    slices
+}
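A quick illustrative check of the combining behaviour, written as a hypothetical test against the types above (not part of the PR): three consecutive rows from batch 0 collapse into one slice, and the switch to batch 1 starts a new one.

    #[test]
    fn combine_adjacent_indexes_collapses_runs() {
        let combined = vec![
            CompositeIndex { batch_idx: 0, row_idx: 4 },
            CompositeIndex { batch_idx: 0, row_idx: 5 },
            CompositeIndex { batch_idx: 0, row_idx: 6 },
            CompositeIndex { batch_idx: 1, row_idx: 0 },
        ];
        let slices = combine_adjacent_indexes(combined);
        assert_eq!(slices.len(), 2);
        // rows 4..7 of batch 0 become a single slice of length 3
        assert_eq!(
            (slices[0].batch_idx, slices[0].start_row_idx, slices[0].len),
            (0, 4, 3)
        );
        // the single row from batch 1 becomes its own slice
        assert_eq!(
            (slices[1].batch_idx, slices[1].start_row_idx, slices[1].len),
            (1, 0, 1)
        );
    }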

impl RecordBatchStream for SortedSizedRecordBatchStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()