Reduce SortExec memory usage by avoiding the construction of a single huge batch #2132
Conversation
Hardware Settings:
A modified version of TPC-H q1:

select
    l_returnflag,
    l_linestatus,
    l_quantity,
    l_extendedprice,
    l_discount,
    l_tax
from
    lineitem
order by
    l_extendedprice,
    l_discount;
Without this PR:
With this PR:
Performance is similar.

cargo criterion --bench sort_limit_query_sql

Without this PR:
With this PR:
Similar performance as well.
// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
fn get_sorted_iter(
The main changes:
- concat all sort-columns (instead of all columns)
- sort to get the index array (same as the original sort)
- use CompositeIndex to avoid constructing the huge batch (to access records scattered across different batches) and construct a small batch at a time (a rough sketch of this follows below).
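A rough, self-contained sketch of the index-mapping idea (the struct fields, helper name, and example below are illustrative assumptions, not the PR's actual code): after lexsorting the concatenated sort columns, each flat position is mapped back to a (batch, row) pair so that output rows can later be gathered from the buffered batches a small batch at a time.

```rust
/// Identifies one row scattered across the buffered input batches
/// (hypothetical field names; the PR's struct may differ).
#[derive(Clone, Copy, Debug, PartialEq)]
struct CompositeIndex {
    batch_idx: u32,
    row_idx: u32,
}

/// Given the row counts of the buffered batches and the positions produced by
/// lexsorting the concatenated sort columns, map each flat position back to a
/// (batch, row) pair so rows can later be gathered a small batch at a time.
fn to_composite_indices(batch_lens: &[usize], sorted_positions: &[u32]) -> Vec<CompositeIndex> {
    // prefix offsets: flat position `p` lives in the batch whose offset range contains it
    let mut offsets = Vec::with_capacity(batch_lens.len() + 1);
    let mut acc = 0u32;
    offsets.push(acc);
    for &len in batch_lens {
        acc += len as u32;
        offsets.push(acc);
    }
    sorted_positions
        .iter()
        .map(|&p| {
            // index of the batch containing flat position `p`
            let batch_idx = offsets.partition_point(|&o| o <= p) - 1;
            CompositeIndex {
                batch_idx: batch_idx as u32,
                row_idx: p - offsets[batch_idx],
            }
        })
        .collect()
}

fn main() {
    // two buffered batches of 3 and 2 rows; the positions come from a lexsort
    // over the concatenated sort columns
    let idx = to_composite_indices(&[3, 2], &[4, 0, 3, 2, 1]);
    assert_eq!(idx[0], CompositeIndex { batch_idx: 1, row_idx: 1 });
    assert_eq!(idx[1], CompositeIndex { batch_idx: 0, row_idx: 0 });
    println!("{:?}", idx);
}
```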
What about the extreme case where the number of sort columns is equal or close to the total number of columns?
In that case, are we a bit worse off now than before, because we need to concat the sort columns anyway?
I think in both cases (this PR and the current master), we need to concat the sort columns before the current lexsort. For the current master, the concat is done while constructing the single huge record batch.
For select a, b from table order by a, b, we consume memory with Vec<CompositeIndex> in this PR, but we also avoid take-ing huge arrays to do the actual reorder. So I think this PR's behavior stays consistent across cases where the split between sort columns and payload columns varies.
I would say that the peak memory usage is no different in the worst case (all columns are being sorted), as the implementation on master copies the entire input into a new record batch with all columns as well as evaluating the SortExprs into their own area.
However, in the common case where not all columns are part of the sort key, this implementation will use significantly less peak memory.
Yeah, my intuition is that the worst-case memory usage will be similar. My concern was that we use MutableArrayData in the new implementation, but I realize we only use it for the non-sort columns. So in the above case (sort by all columns) the implementation is almost the same.
// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
let partial = sort_batch(input, self.schema.clone(), &self.expr)?;
A change here: sort each batch before buffering it in memory.
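A minimal sketch of sorting one incoming batch before buffering it, assuming the arrow-rs compute kernels (lexsort_to_indices, take) and a hypothetical sort_column_indices parameter standing in for evaluating the operator's sort expressions:

```rust
use arrow::array::ArrayRef;
use arrow::compute::{lexsort_to_indices, take, SortColumn};
use arrow::error::Result;
use arrow::record_batch::RecordBatch;

/// Sort a single RecordBatch by the given key columns before buffering it.
/// `sort_column_indices` is an illustrative stand-in for evaluating the
/// operator's sort expressions against the batch.
fn sort_one_batch(batch: &RecordBatch, sort_column_indices: &[usize]) -> Result<RecordBatch> {
    // Build the sort key from this batch only (default sort options assumed).
    let sort_columns: Vec<SortColumn> = sort_column_indices
        .iter()
        .map(|&i| SortColumn {
            values: batch.column(i).clone(),
            options: None,
        })
        .collect();

    // Indices that would sort this one batch.
    let indices = lexsort_to_indices(&sort_columns, None)?;

    // Reorder every column while the batch is still hot in cache.
    let sorted: Vec<ArrayRef> = batch
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &indices, None))
        .collect::<Result<_>>()?;

    RecordBatch::try_new(batch.schema(), sorted)
}
```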
Performance would deteriorate significantly without this change:
Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 2, batch_size: 4096, path: "/home/yijie/sort_test/tpch-parquet", file_format: "parquet", mem_table: false, output_path: None }
Query 1 iteration 0 took 4619.9 ms and returned 6001214 rows
Query 1 iteration 1 took 4561.0 ms and returned 6001214 rows
Query 1 iteration 2 took 4527.7 ms and returned 6001214 rows
The main reason, I think, is random memory access while constructing output batches. Without this per-batch sort, collecting cells from unsorted batches makes the memory accesses fully randomized. With this per-batch sort, we access memory linearly for each column in each batch, which results in a much more predictable memory access pattern and benefits the CPU cache.
I think the perf counter confirms the above speculation:
sudo perf stat -a -e cache-misses,cache-references,l3_cache_accesses,l3_misses,dTLB-load-misses,dTLB-loads target/release/tpch benchmark datafusion --iterations 3 --path /home/yijie/sort_test/tpch-parquet --format parquet --query 1 --batch-size 4096
Without this per-batch sort:
Performance counter stats for 'system wide':
1,340,359,889 cache-misses # 35.817 % of all cache refs
3,742,289,458 cache-references
1,984,089,839 l3_cache_accesses
540,429,658 l3_misses
303,508,234 dTLB-load-misses # 49.51% of all dTLB cache accesses
613,048,439 dTLB-loads
14.222309739 seconds time elapsed
With this per-batch sort:
Performance counter stats for 'system wide':
1,059,913,512 cache-misses # 30.715 % of all cache refs
3,450,839,405 cache-references
1,388,975,765 l3_cache_accesses
235,570,805 l3_misses
239,390,511 dTLB-load-misses # 51.36% of all dTLB cache accesses
466,141,655 dTLB-loads
8.675278258 seconds time elapsed
Yes, I think the performance gains come from two sources:
- Sorting and reordering the batch in the same thread, while it is still in the cache, as you mentioned.
- The memory access pattern of the final output phase: we serially access columns for each batch. The sort-order materialization done for each incoming batch turns purely randomized gathering into sequential access of each column from all the batches, which yields better cache behavior.
Very nice @yjshen 👍. It is a pleasure to read.
Is it fair to say that this PR's core change is to only copy data for the "sort keys", rather than all of the columns? If so, I think this is a good approach and state of the art.
There are likely some other improvements that can still be made (I pointed out some below), but this seems like a (great) step in the right direction.
I also tested it out using the IOx suite and that passed 🎉 : https://github.com/influxdata/influxdb_iox/pull/4230
Maybe we can eventually write a blog about your sorting adventures (in the vein of
https://duckdb.org/2021/08/27/external-sorting.html) -- you have just as much good stuff to report.
// NB timer records time taken on drop, so there are no
// calls to `timer.done()` below.
let _timer = tracking_metrics.elapsed_compute().timer();
let partial = sort_batch(input, self.schema.clone(), &self.expr)?;
(0..arrays[0].len())
    .map(|r| CompositeIndex {
        // since we originally use UInt32Array to index the combined mono batch,
        // component record batches won't overflow either,
👍 yeah I agree this approach is no more prone to overflow than the implementation on master
.iter()
    .map(|b| b.column(i).data())
    .collect::<Vec<_>>();
let mut mutable =
this is very clever 👍 👍
cc @Dandandan who I think is also interested in clever database internals :)
MutableArrayData::new(arrays, false, combined.len());
for x in combined.iter() {
    // we cannot extend with slice here, since the lexsort is unstable
    mutable.extend(
I think MutableArrayData is a bit more optimized for copying larger chunks of data rather than single rows at a time. I guess this case needs some optimization work on the Arrow side (or somehow use another construction?)
I think #2132 (comment) would result in copying larger chunks
I'm thinking of something like take, but supporting multiple batches.
I've updated the SortedIterator above to produce a slice each time, to avoid this per-cell copy.
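Roughly, the difference looks like the sketch below (the CompositeSlice fields and the helper are illustrative, and it assumes the arrow-rs MutableArrayData API of this era, i.e. Array::data, extend, freeze): extending with whole runs lets MutableArrayData copy contiguous chunks instead of single rows.

```rust
use arrow::array::{make_array, ArrayRef, MutableArrayData};
use arrow::record_batch::RecordBatch;

/// One run of consecutive rows taken from a single buffered batch
/// (hypothetical field names).
struct CompositeSlice {
    batch_idx: usize,
    start_row_idx: usize,
    len: usize,
}

/// Gather column `col` of the sorted output by extending with whole slices
/// instead of one row at a time.
fn gather_column(batches: &[RecordBatch], col: usize, slices: &[CompositeSlice]) -> ArrayRef {
    let arrays: Vec<_> = batches.iter().map(|b| b.column(col).data()).collect();
    let total_len: usize = slices.iter().map(|s| s.len).sum();
    let mut mutable = MutableArrayData::new(arrays, false, total_len);
    for s in slices {
        // copy the contiguous run [start, start + len) from one source array
        mutable.extend(s.batch_idx, s.start_row_idx, s.start_row_idx + s.len);
    }
    make_array(mutable.freeze())
}
```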
I filed apache/arrow-rs#1523 to track adding something like this in arrow-rs
Co-authored-by: Andrew Lamb <andrew@nerdnetworks.org>
Thanks @alamb @Dandandan for your review!
Yes, that's the change in this PR.
I've updated the implementation with
Sounds great!
LGTM
len += 1;
// since we have pre-sorted each of the incoming batches,
// if we witness a wrong order of indexes from the same batch,
// they must share the same key as the row pointed to by start_row_index.
if this happened, does it mean the sort wasn't stable (as in it rearranged inputs that sorted to equal keys)?
Yes, that's the only case I think.
len = 1;
    }
}
slices.push(CompositeSlice {
Is it important to check here that len > 0? I don't think it is possible to try and sort a 0-length RecordBatch, but I wonder what would happen in that case.
I think it's not possible to be zero here, since we guard against inserting empty batches into the in-memory sorter in the first place (https://github.com/apache/arrow-datafusion/blob/2325d1aac2230182c4cb496327daa76ea0d80088/datafusion/core/src/physical_plan/sorts/sort.rs#L115), so for the in-memory batch merges there won't be empty rows. I will add an assert here in case anything unexpected happens.
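A simplified, hypothetical sketch of grouping globally sorted composite indices into contiguous same-batch runs, including an assertion along the lines discussed above; it is not the PR's exact logic (the real code also reasons about equal-key rows within a pre-sorted batch), but it shows where an empty run could only arise from a bug:

```rust
#[derive(Clone, Copy)]
struct CompositeIndex {
    batch_idx: u32,
    row_idx: u32,
}

struct CompositeSlice {
    batch_idx: u32,
    start_row_idx: u32,
    len: usize,
}

/// Group globally sorted composite indices into runs of consecutive rows
/// from the same batch, so each run can later be copied as one slice.
fn group_into_slices(sorted: &[CompositeIndex]) -> Vec<CompositeSlice> {
    let mut slices = Vec::new();
    if sorted.is_empty() {
        return slices;
    }
    let mut start = sorted[0];
    let mut len = 1usize;
    for idx in &sorted[1..] {
        let run_end = start.row_idx + len as u32;
        if idx.batch_idx == start.batch_idx && idx.row_idx == run_end {
            // contiguous row in the same batch: extend the current run
            len += 1;
        } else {
            // a zero-length run should be impossible, since empty batches are
            // never buffered in the first place
            assert!(len > 0, "unexpected empty slice");
            slices.push(CompositeSlice {
                batch_idx: start.batch_idx,
                start_row_idx: start.row_idx,
                len,
            });
            start = *idx;
            len = 1;
        }
    }
    // flush the final run
    slices.push(CompositeSlice {
        batch_idx: start.batch_idx,
        start_row_idx: start.row_idx,
        len,
    });
    slices
}
```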
I updated https://github.com/influxdata/influxdb_iox/pull/4230 to use this branch and the tests still pass 👍
I think this is good to go, so merging it in 🚀
Thanks @yjshen
Which issue does this PR close?
Closes #2149.
Rationale for this change
It's a known issue that we have "double memory" usage behavior for SortExec. To articulate a little bit, while doing in-memory sorting we buffer all incoming batches, combine them into a single huge record batch with combine_batches, and then sort. This behavior is not critical for pure in-memory processing while memory is sufficient, but when handling datasets much bigger than memory, it becomes crucial not to double the memory use when we are already holding a lot of memory and need to spill before handling more incoming batches.
Currently, we are experiencing exit code 137 while using DataFusion's sort in Blaze. When handling a dataset much bigger than memory, the double-memory sort gives queries no chance to finish.
What changes are included in this PR?
- Construct sorted output batches of at most batch_size rows at a time, instead of one single huge batch.
- Add SortedSizedRecordBatchStream to produce the sorted output.
Are there any user-facing changes?
No.