Top-K eager batch sorting #7180
Conversation
FYI #7179 may also be interesting (I will happily update the docs to match this code if we pursue it)
I pushed some changes. Some numbers for the example above (all in release mode):
EDIT:

diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index aea499d60..b479370d9 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
...
fn create_runtime_env() -> Result<RuntimeEnv> {
- let rn_config = RuntimeConfig::new();
+ let rn_config = RuntimeConfig::new().with_memory_limit(256 * 1024 * 1024, 0.7);
RuntimeEnv::new(rn_config)
}
The times are again comparable, but slightly in favor of the present approach this time around, though I suppose I'd need more than N=1 samples to compare those meaningfully. The memory benefits, on the other hand, are still clearly on the side of top-k eager sorting in this PR.

Testing on a larger external file

In addition, I repeated the entire profiling for another, larger file (on-disk size 2.4GB, 46.2 million rows, 12 columns).
and again the results with same memory limits as above for this new file
Similar all-round story (though memory improvements are more modest now), with the exception that in this case the same memory limits are not enforced as effectively as for the smaller file above (caps at 2.7G instead of ~200M).
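For intuition, the top-k eager batch sorting being measured here can be sketched in plain Rust. This is a simplified model only, under assumptions of my own: a `Vec<i32>` stands in for an Arrow record batch, ascending `i32` order for the sort expressions, and `top_k_eager` is an illustrative name, not DataFusion's `ExternalSorter` code.

```rust
// Sketch: sort each batch eagerly on arrival and keep only its first k rows,
// so memory held per buffered batch is bounded by roughly k rows.
fn top_k_eager(batches: Vec<Vec<i32>>, k: usize) -> Vec<i32> {
    let mut survivors: Vec<i32> = Vec::new();
    for mut batch in batches {
        batch.sort_unstable();
        batch.truncate(k); // this is the "potential row reduction"
        survivors.extend(batch);
    }
    // Final pass over at most k * num_batches surviving rows,
    // applying the fetch one last time.
    survivors.sort_unstable();
    survivors.truncate(k);
    survivors
}

fn main() {
    let batches = vec![vec![5, 1, 9, 7], vec![3, 8, 2, 6], vec![4, 0]];
    assert_eq!(top_k_eager(batches, 3), vec![0, 1, 2]);
    println!("ok");
}
```

The memory savings in the profiles above come from the `truncate(k)` step: untruncated batches never accumulate, at the cost of one extra sort per batch.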
Force-pushed from bcf1318 to c4a40f2.
Thanks @gruuya -- I took a quick look at this code and it looks like a good idea to me. I do think we can improve the situation substantially more with a dedicated operator, but this seems like a good step forward
cc @NGA-TRAN this might help us in IOx as well
I am running some benchmarks against this branch and will report back shortly
// TODO: This should probably be try_grow (#5885)
reservation.resize(input.get_array_memory_size());
// Maybe we should perform sorting in a parallel task to unblock the caller |
if the parallelization is working correctly, all the other cores should be busy doing something useful here -- and thus sorting in another task may not be warranted, but I am not sure
Ah yeah, I think that makes sense; will remove the comment.
I also was thinking we can probably write a test for this behavior in https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/memory_limit.rs -- I have something similar in #7130 (review). I can potentially help writing that test if you agree it would be helpful.
Thanks, I certainly planned on adding tests; I just wanted to concentrate on the benchmarks first.

Incidentally, there's a failing test now, which I've been looking at, and it's a bit curious: the difference comes down to the fact that previously the processed batch size was a bit different (due to the fetch) and the intermediate sort call was stable (i.e. preserved order), while currently it's unstable (doesn't preserve order). To be more specific, in this particular test there are always 2 input batches (I've added the column C3 for more clarity):

[
"+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+",
"| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1 | c3 |",
"+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+",
"| 0.01479305307777301 | 0.01479305307777301 | -86 |",
"| 0.02182578039211991 | 0.02182578039211991 | -38 |",
"| 0.03968347085780355 | 0.03968347085780355 | 30 |",
"| 0.04429073092078406 | 0.04429073092078406 | 36 |",
"| 0.047343434291126085 | 0.047343434291126085 | -95 |",
...
"| 0.980809631269599 | 0.980809631269599 | 52 |",
"| 0.991517828651004 | 0.991517828651004 | 29 |",
"+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+",
]

and one with 1 row:

[
"+--------------------------------------------------------------------------------------------------------------------------------+--------------------+-----+",
"| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1 | c3 |",
"+--------------------------------------------------------------------------------------------------------------------------------+--------------------+-----+",
"| 0.9965400387585364 | 0.9965400387585364 | 120 |",
"+--------------------------------------------------------------------------------------------------------------------------------+--------------------+-----+",
]

In the case of this PR the first batch is sorted to:

[
"+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
"| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1 | c3 |",
"+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
"| 0.8506721053047003 | 0.8506721053047003 | -117 |",
"| 0.9706712283358269 | 0.9706712283358269 | -117 |",
"| 0.152498292971736 | 0.152498292971736 | -111 |",
"| 0.36936304600612724 | 0.36936304600612724 | -107 |",
"| 0.565352842229935 | 0.565352842229935 | -106 |",
"+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
]

i.e. without preserving the order for the same C3 value:

[
"+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
"| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1 | c3 |",
"+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
"| 0.9706712283358269 | 0.9706712283358269 | -117 |",
"| 0.8506721053047003 | 0.8506721053047003 | -117 |",
"| 0.152498292971736 | 0.152498292971736 | -111 |",
"| 0.36936304600612724 | 0.36936304600612724 | -107 |",
"| 0.565352842229935 | 0.565352842229935 | -106 |",
"+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
]

Still need to look into this a bit more.
I consolidated/extended the fuzz tests (they all pass locally); I ran into one more edge case while doing that: ca0786f#diff-c0e76bbcb3ed7bfbba2f99fedfdab7ebb9200746a835db51619a6b10e3e2adcfR255-R262

This might have some runtime implications on its own, so it will be worthwhile to double-check the benchmarks afterwards (i.e. once I resolve the other failing test). It could also be that there is a more elegant way of solving that.
Force-pushed from c8d96b2 to 7b1a4d2.
Fwiw, I was able to narrow down the reason why that last test is failing; TLDR: the unstable sort and changed batch sizes result in a different order of items with the same value in the sort column between main and this PR.

Details

The repro involves an array 1 with 100 elements (this is basically column C3 from the test file) and an array 2 with all elements from array 1 except one (which doesn't affect the output, namely 120):

use arrow::compute::SortOptions;
use arrow::compute::{lexsort_to_indices, SortColumn};
use arrow_array::{ArrayRef, Int32Array};
let options = SortOptions {
descending: false,
nulls_first: false,
};
let get_indices = |sort_column: SortColumn, fetch: Option<usize>| -> Vec<u32> {
lexsort_to_indices(&vec![sort_column], fetch)
.unwrap()
.into_iter()
.map(|maybe_index| maybe_index.unwrap_or_else(|| panic!("No index!")))
.collect::<Vec<u32>>()
};
let array_1: ArrayRef = Arc::new(Int32Array::from(vec![
1, -40, 29, -85, -82, -111, 104, 13, 38, -38, 57, -54, 112, 113, 54, 103, 49,
-98, 77, 97, -56, -99, 36, -53, -29, -25, 123, -31, 45, 17, 97, -60, 36, -5,
13, 41, 93, 73, -2, 22, 63, 102, -8, 17, 52, 68, 31, -24, 65, 125, 17, -106,
-59, 55, -60, -76, 73, -117, -101, 62, -79, 68, 70, -61, 74, 122, 71, -94,
-72, 71, 96, -48, -56, 52, -5, 12, 64, -90, -86, -117, 14, 29, -59, 83, -12,
3, -72, -107, 118, 97, -101, -43, -101, -44, 5, 120, -95, 123, 47, 30,
]));
let sort_column_1 = SortColumn {
values: array_1,
options: Some(options.clone()),
};
let indices_1_no_fetch = get_indices(sort_column_1.clone(), None);
let indices_1_fetch_5 = get_indices(sort_column_1, Some(5));
println!("Indices for sorting array 1");
println!("{:#?}", indices_1_no_fetch[0..5].to_vec());
println!("{:#?}", indices_1_fetch_5);
let array_2: ArrayRef = Arc::new(Int32Array::from(vec![
1, -40, 29, -85, -82, -111, 104, 13, 38, -38, 57, -54, 112, 113, 54, 103, 49,
-98, 77, 97, -56, -99, 36, -53, -29, -25, 123, -31, 45, 17, 97, -60, 36, -5,
13, 41, 93, 73, -2, 22, 63, 102, -8, 17, 52, 68, 31, -24, 65, 125, 17, -106,
-59, 55, -60, -76, 73, -117, -101, 62, -79, 68, 70, -61, 74, 122, 71, -94,
-72, 71, 96, -48, -56, 52, -5, 12, 64, -90, -86, -117, 14, 29, -59, 83, -12,
3, -72, -107, 118, 97, -101, -43, -101, -44, 5, -95, 123, 47, 30,
]));
let sort_column_2 = SortColumn {
values: array_2,
options: Some(options),
};
let indices_2_no_fetch = get_indices(sort_column_2.clone(), None);
let indices_2_fetch_5 = get_indices(sort_column_2, Some(5));
println!("Indices for sorting array 2");
println!("{:#?}", indices_2_no_fetch[0..5].to_vec());
println!("{:#?}", indices_2_fetch_5);

This prints out:
Note that in the case of the second array the first two indices are swapped when setting some fetch size, since then the unstable sort gets used and the two smallest array values are the same (-117). Perhaps it's ok to change the test output to match the one being obtained now?
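The tie-breaking difference described above can be reproduced with std's slice sorts alone. A sketch under my own framing: the duplicated key -117 mirrors the duplicated C3 value, and `sorted_labels` is an illustrative helper, not code from this PR (arrow's no-fetch path behaved stably here, while the fetch path did not).

```rust
// Returns the row labels after sorting on the integer key,
// using either a stable or an unstable sort.
fn sorted_labels(stable: bool) -> Vec<&'static str> {
    let mut data = vec![(-117, "row a"), (-106, "row c"), (-117, "row b"), (-111, "row d")];
    if stable {
        data.sort_by_key(|&(k, _)| k); // stable: equal keys keep their input order
    } else {
        data.sort_unstable_by_key(|&(k, _)| k); // equal keys may end up in either order
    }
    data.into_iter().map(|(_, label)| label).collect()
}

fn main() {
    // Stable: "row a" is guaranteed to stay ahead of "row b" (both key -117).
    assert_eq!(sorted_labels(true), vec!["row a", "row b", "row d", "row c"]);
    // Unstable: the key order is deterministic, but the a/b tie-break is not specified.
    let u = sorted_labels(false);
    assert_eq!(u[2], "row d");
    assert_eq!(u[3], "row c");
    println!("ok");
}
```

This is why the test output differs only in rows that share a C3 value: both orders are valid answers to the same query.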
very nice 🕵️ work
SELECT
MAX(c12) OVER window1,
MIN(c12) OVER window2 as max1
FROM aggregate_test_100
WINDOW window1 AS (ORDER BY C12),
window2 AS (PARTITION BY C11),
window3 AS (ORDER BY C1)
ORDER BY C3
LIMIT 5

If you are saying there are more than 5 values with the same value of c5, then I think technically the query produces unspecified answers, and thus I think we should change the test to be deterministic -- perhaps we can increase the
I think this PR is almost ready to go -- thank you very much @gruuya. I left a suggestion about the test and memory accounting, but otherwise 🚀
} else {
    assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
}
#[rstest]
❤️
// Eagerly sort the batch to potentially reduce the number of rows
// after applying the fetch parameter; first perform a memory reservation
// for the sorting procedure.
let mut reservation =
I don't think we need a new consumer here for each batch insertion -- we could just update the main reservation on self.reservation
So something like: do the sort and then update the size after the limit:
input = sort_batch(&input, &self.expr, self.fetch)?;
reservation.try_grow(input.get_array_memory_size());
Note the accounting is reworked in #7130
I think the reasoning for the new consumer was that we only want to reserve a bit of memory briefly, to account for the overhead of keeping both the sorted and the original batch at the same time. Given that in this case we drop the old batch asap (due to the input re-assignment) in favor of a smaller/truncated one, I'd agree that a new consumer is not needed in that case. (The same could be said about the consumer/reservation in sort_batch_stream, though in that case, since it isn't given that there is a LIMIT, we could end up holding 2 same-sized batches at one point in time.)

That said, reservation.try_grow(input.get_array_memory_size()) does then get called immediately below to check whether the new (sorted/truncated) batch can be kept in memory without breaking the configured limit, so I don't think there's a need for another try_grow prior to that.
Ah that makes sense
I think you could use the (very newly added) Reservation::split() to track the short allocation and free it on drop, without needing an entirely new consumer:
perhaps like
let batch_reservation = self.reservation.split(input.get_array_memory_size());
input = sort_batch(&input, &self.expr, self.fetch)?;
// free allocation for previous input
// (it would also be freed on drop so the free isn't necessary but may be clearer)
batch_reservation.free();
Hmm, so I went ahead to add this, but then it seemed to me that the logic of split is backwards (for this case at least)? In particular, the size of the reservation is not free memory, but instead used memory, right? And with split + free we'd effectively deduct some piece of memory in our accounting that was previously allocated elsewhere (without actually freeing that memory).

Atm I went with skipping reservation updates altogether in this case; the reason being that we're already holding the input batch in memory, and we are about to (try to) allocate memory for it right after sorting it anyway. Hence, the sole purpose of that reservation seems to be to account for the extra piece of memory due to non-in-place sorting (i.e. briefly having the old and the new batch present at the same time), and that piece of memory is guaranteed to be less than the batch size. Let me know if you'd like me to do something else.
I also changed that test's output. Now all tests should be passing, so it's probably time for the regular benchmarks.
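The "split is backwards" point can be made concrete with a toy accounting model. This is an assumption-laden sketch of the semantics discussed above, not DataFusion's actual MemoryReservation API: split carves bytes out of memory that is already accounted as used, so freeing the split piece shrinks the total accounting without any real allocation having been released.

```rust
// Toy reservation: `used` is accounted (consumed) memory, not headroom.
struct Reservation {
    used: usize,
}

impl Reservation {
    // split() moves n already-accounted bytes into a child reservation;
    // it does NOT reserve anything new from the pool.
    fn split(&mut self, n: usize) -> Reservation {
        assert!(n <= self.used);
        self.used -= n;
        Reservation { used: n }
    }

    fn free(&mut self) {
        self.used = 0;
    }
}

fn main() {
    let mut main_res = Reservation { used: 1000 };
    let mut batch_res = main_res.split(400);
    // Freeing the child makes 400 bytes vanish from the accounting entirely,
    // even though no actual memory was released anywhere.
    batch_res.free();
    assert_eq!(main_res.used + batch_res.used, 600);
    println!("ok");
}
```

Under this model, split + free under-counts by the split size, which matches the objection that it deducts memory "that was previously allocated elsewhere."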
Atm I went with skipping reservation updates at all in this case;
I think this makes sense -- especially since the extra memory for batches is likely small, given k is very often small.
.await
}

// We expect to see lower memory thresholds in general when applying a `LIMIT` clause due to eager sorting |
this is nice
Oh, actually I meant that there are only 2 identical values for the sorting column (C3) which end up in the top-5 according to the test query, but their order is unstable when a fetch is applied:

❯ SELECT
MAX(c12) OVER window1,
MIN(c12) OVER window2 as max1,
C3
FROM aggregate_test_100
WINDOW window1 AS (ORDER BY C12),
window2 AS (PARTITION BY C11),
window3 AS (ORDER BY C1)
ORDER BY C3;
+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+
| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1 | c3 |
+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+
| 0.9706712283358269 | 0.9706712283358269 | -117 |
| 0.8506721053047003 | 0.8506721053047003 | -117 |
| 0.152498292971736 | 0.152498292971736 | -111 |
| 0.36936304600612724 | 0.36936304600612724 | -107 |
| 0.565352842229935 | 0.565352842229935 | -106 |
...
| 0.7631239070049998 | 0.7631239070049998 | 125 |
+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+
100 rows in set. Query took 0.154 seconds.
❯ SELECT
MAX(c12) OVER window1,
MIN(c12) OVER window2 as max1,
C3
FROM aggregate_test_100
WINDOW window1 AS (ORDER BY C12),
window2 AS (PARTITION BY C11),
window3 AS (ORDER BY C1)
ORDER BY C3
LIMIT 5;
+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+
| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1 | c3 |
+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+
| 0.8506721053047003 | 0.8506721053047003 | -117 |
| 0.9706712283358269 | 0.9706712283358269 | -117 |
| 0.152498292971736 | 0.152498292971736 | -111 |
| 0.36936304600612724 | 0.36936304600612724 | -107 |
| 0.565352842229935 | 0.565352842229935 | -106 |
+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+
5 rows in set. Query took 0.042 seconds.

(Note that I've included the C3 column for clarity.)

One quirky thing that also contributes to this difference is that the incoming batches are always split into a first one with 99 rows and a second with 1 row, which seems kind of strange. While previously they would first have been concatenated into a 99 + 1 = 100-row batch and then sorted, they are now sorted first, and so the effect from #7180 (comment) kicks in.
This holds for both no-fetch and fetch cases equally, unlike the case of multiple sorted batches.
Yeah, I agree this test is not well designed, as it is "non-deterministic" (there is no one single right answer). I think either updating the output or updating the test would be appropriate in this case. We could also add
The new output has the first two rows swapped, since the sort column value is the same and with LIMIT in place the sort is unstable.
Would it be possible to compare the sorting performance of this pull request with the main branch by modifying the test sets in the description of #6163 with different values of K?

I am interested in these tests because we are reintroducing per-batch sorting (for TopK), which was noted as a source of inefficiency in #6163 (comment):
By analyzing the results of the suggested tests, we can determine whether we need to implement simple heuristics for when to use eager sorting (e.g. only for small K).
Certainly; I started up a slightly modified sorting benchmark with an additional outer loop.
How about K slightly smaller than the batch size? Say K=8000 when batch size=8192.
Sorry, didn't see this comment in time; maybe I could add that case later on. Anyway, I've got some preliminary results and they're not very promising (where there's a change, it's a non-negligible slowdown):

sorting benchmarks
Strangely, I see slowdown even for base cases where eager sorting does not apply. I'll need to think about it a bit.
I wonder if it could be related to the benchmark itself somehow -- perhaps you could run the benchmark against main twice and see if you get consistent results? I believe I was seeing some inconsistent results when I was running it last week myself.
Even when a fetch value is specified, do not try to merge-stream sorted in-mem batches during the sorting procedure, as this seems to introduce time regressions.
Ok, I went back and revisited a change I made previously, and it looks like it was definitely contributing to the consistent systemic regression I initially observed (as it was adding some extra sort calls). I also think I found a better way to solve that issue (basically convert all the batches from a single spill into individual streams), which I pushed now. This still results in the optimized memory profiles depicted in the PR description. Here are the new sorting benchmarks (10 iterations, for each fetch value):

sorting benchmarks
Note that the base cases now show no changes compared to main, which is what I expect. There are a couple of speedups, but still the overall direction is regression-y. This is especially pronounced for K = 8000 (good idea including that value, @yjshen). It could be that there are some further micro-optimizations waiting that would push the runtimes for this branch into merge-acceptable territory while keeping the memory usage as is, though I doubt it. I'll give it some more thought and report back if I come up with something.
Thank you for your diligence @gruuya -- in my opinion this code is now good enough to be merged.
I had one comment that could save a sort that might be worth looking at, but I don't think it is required.
Really nice work
return self.sort_batch_stream(batch, metrics);
// Even if all individual batches were themselves sorted the resulting concatenated one
// isn't guaranteed to be sorted, so we must perform sorting on the stream.
return self.sort_batch_stream(batch, false, metrics); |
Another approach might be to not use the concat-in-place heuristic if there are any previously sorted batches -- right now the code only checks for the overall size being less than 1MB -- it could also check if there was any true in in_mem_batches, and if there are, use the merge path below.
Good point; will try to benchmark that change too.
So I did try to test this approach as well, and then saw some improvements that seemed too good to be true. I went and re-ran the benchmarks again and the improvements held, until they didn't at some point 🤷🏻♂️ (fwiw I'm running the benchmarks on a cloud VM, not dedicated hardware).
In hindsight, the sorting benchmarks actually do not use a memory limit, so there were no spills and this code path wasn't exercised. I did try running the benchmarks with memory limits on, but then I hit a "Dictionary replacement detected when writing IPC file format" arrow error during spilling. It seems like this is a general problem, as it happens on the main branch too, though I haven't investigated further.
Either way, I'll add this check now even without doing benchmarking on it because it seems it can only help.
#[rstest]
#[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate additional", "ExternalSorter"], 20_000)]
#[case::cant_spill_to_disk(vec!["Memory Exhausted while Sorting (DiskManager is disabled)"], 40_000)]
//#[case::no_oom(vec![], 80_000)] |
I don't quite understand why atm, but this test case now fails. In fact, the no-oom threshold is like 10 times that value which is even weirder. I did double-check with bytehound and the memory profiles are still looking good outside of this test. Will need to think about it.
Thanks @alamb for the reviews and timely feedback.
I'd like to emphasize that there are still regressions with this approach. In fact, in the case of larger files (> 1GB) with K in the 1000-8000 range, the runtime seems to be hit the most, with probably negligible memory improvements (if any). Anecdotally, the original file I've been testing does now show considerable speedup, but that is perhaps not a typical file size (146M). So it's a mixed bag really, and I'm not sure it's best for this to be merged as is.
if input.num_rows() == 0 {
    return Ok(());
}

let mut batch_sorted = false;
if self.fetch.map_or(false, |f| f < input.num_rows()) {
I'm thinking: can we make the heuristic f < input.num_rows() / 10, or some similar magic number, to only do the eager sort for small K's?
I was also thinking something similar, but along the lines of f < input.num_rows() && f <= 100, so that we effectively have a hard cut-off for eager sorting after 100 rows.
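For illustration, the two proposed cut-offs could be combined into a single predicate. A sketch under my own naming: should_eager_sort and EAGER_SORT_CUTOFF are hypothetical, though the constant 100 matches the hard cut-off discussed in this thread.

```rust
// Hard cut-off for eager per-batch sorting: only eager-sort when the fetch
// actually reduces the batch AND is small in absolute terms.
const EAGER_SORT_CUTOFF: usize = 100;

fn should_eager_sort(fetch: Option<usize>, num_rows: usize) -> bool {
    fetch.map_or(false, |f| f < num_rows && f <= EAGER_SORT_CUTOFF)
}

fn main() {
    assert!(should_eager_sort(Some(5), 8192)); // tiny K: row reduction pays off
    assert!(!should_eager_sort(Some(8000), 8192)); // K near batch size: skip
    assert!(!should_eager_sort(None, 8192)); // no fetch at all: skip
    println!("ok");
}
```

A relative cut-off like `f < num_rows / 10` would behave similarly for default 8192-row batches but adapts to unusual batch sizes; the benchmarks above are what would decide between the two.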
///
/// Each spill file has one or more batches. Intra-batch order is guaranteed (each one is sorted),
/// but the inter-batch ordering is not guaranteed, hence why we need to convert each batch from the
/// spill to a separate input stream for the merge-sort procedure.
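The per-batch merge this doc comment describes can be sketched as a k-way merge with a binary heap over one cursor per sorted batch. This is a simplified model of my own: each `Vec<i32>` stands in for one sorted batch pulled from a spill file, whereas DataFusion merges Arrow record-batch streams.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

// K-way merge of individually sorted batches whose inter-batch order
// is NOT guaranteed.
fn merge_sorted_batches(batches: &[Vec<i32>]) -> Vec<i32> {
    // Min-heap of (next value, batch index, offset) for each non-empty batch.
    let mut heap = BinaryHeap::new();
    for (i, b) in batches.iter().enumerate() {
        if let Some(&v) = b.first() {
            heap.push(Reverse((v, i, 0usize)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i, off))) = heap.pop() {
        out.push(v);
        if let Some(&next) = batches[i].get(off + 1) {
            heap.push(Reverse((next, i, off + 1)));
        }
    }
    out
}

fn main() {
    let batches = vec![vec![1, 4, 9], vec![2, 3, 8], vec![0, 7]];
    assert_eq!(merge_sorted_batches(&batches), vec![0, 1, 2, 3, 4, 7, 8, 9]);
    println!("ok");
}
```

The question raised below is whether feeding every batch to such a merge does wasted comparisons when a spill's batches are already globally ordered and a single cursor per spill file would suffice.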
Will this produce unnecessary comparisons when the inter-batch ordering in spill files is guaranteed, as in the normal case without K?
I think not, because then it should be a single record batch per spill. I can add a sort counter to double-check though.
A spill file is generated by in_mem_sort() and then spilled. And spill_sorted_batches takes batches: Vec<RecordBatch> as one of the arguments.
Ah, good point; for some reason I thought that only 1 batch gets collected from in_mem_sort_stream in case of no fetch, even though that is not a given.
Fwiw, I did revise the approach to stream the entire spill in one stream when the inter-batch order is guaranteed (which involved keeping track of this as well). I also added the hard cut-off for eager sorting at 100 rows.
sorting benchmarks
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query ┃ main ┃ top-k-eager-sorting ┃ Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Qsort utf8 │ 28233.61ms │ 29251.16ms │ no change │
│ fetch None │ │ │ │
│ Qsort int │ 36739.73ms │ 36870.39ms │ no change │
│ fetch None │ │ │ │
│ Qsort │ 29974.54ms │ 30480.98ms │ no change │
│ decimal │ │ │ │
│ fetch None │ │ │ │
│ Qsort │ 40214.81ms │ 40804.29ms │ no change │
│ integer │ │ │ │
│ tuple fetch │ │ │ │
│ None │ │ │ │
│ Qsort utf8 │ 29429.65ms │ 29875.64ms │ no change │
│ tuple fetch │ │ │ │
│ None │ │ │ │
│ Qsort mixed │ 34435.12ms │ 34415.10ms │ no change │
│ tuple fetch │ │ │ │
│ None │ │ │ │
│ Qsort utf8 │ 1709.85ms │ 1829.22ms │ 1.07x slower │
│ fetch │ │ │ │
│ Some(1) │ │ │ │
│ Qsort int │ 1842.45ms │ 1749.96ms │ +1.05x faster │
│ fetch │ │ │ │
│ Some(1) │ │ │ │
│ Qsort │ 1680.46ms │ 1677.47ms │ no change │
│ decimal │ │ │ │
│ fetch │ │ │ │
│ Some(1) │ │ │ │
│ Qsort │ 1745.56ms │ 1871.46ms │ 1.07x slower │
│ integer │ │ │ │
│ tuple fetch │ │ │ │
│ Some(1) │ │ │ │
│ Qsort utf8 │ 1965.73ms │ 2331.78ms │ 1.19x slower │
│ tuple fetch │ │ │ │
│ Some(1) │ │ │ │
│ Qsort mixed │ 1813.14ms │ 2061.60ms │ 1.14x slower │
│ tuple fetch │ │ │ │
│ Some(1) │ │ │ │
│ Qsort utf8 │ 1712.66ms │ 1829.49ms │ 1.07x slower │
│ fetch │ │ │ │
│ Some(10) │ │ │ │
│ Qsort int │ 1832.73ms │ 1759.00ms │ no change │
│ fetch │ │ │ │
│ Some(10) │ │ │ │
│ Qsort │ 1699.42ms │ 1686.28ms │ no change │
│ decimal │ │ │ │
│ fetch │ │ │ │
│ Some(10) │ │ │ │
│ Qsort │ 1762.86ms │ 1877.71ms │ 1.07x slower │
│ integer │ │ │ │
│ tuple fetch │ │ │ │
│ Some(10) │ │ │ │
│ Qsort utf8 │ 1974.07ms │ 2337.96ms │ 1.18x slower │
│ tuple fetch │ │ │ │
│ Some(10) │ │ │ │
│ Qsort mixed │ 1854.64ms │ 2078.75ms │ 1.12x slower │
│ tuple fetch │ │ │ │
│ Some(10) │ │ │ │
│ Qsort utf8 │ 1733.28ms │ 1853.74ms │ 1.07x slower │
│ fetch │ │ │ │
│ Some(100) │ │ │ │
│ Qsort int │ 1845.74ms │ 1797.14ms │ no change │
│ fetch │ │ │ │
│ Some(100) │ │ │ │
│ Qsort │ 1685.38ms │ 1712.97ms │ no change │
│ decimal │ │ │ │
│ fetch │ │ │ │
│ Some(100) │ │ │ │
│ Qsort │ 1799.54ms │ 1907.62ms │ 1.06x slower │
│ integer │ │ │ │
│ tuple fetch │ │ │ │
│ Some(100) │ │ │ │
│ Qsort utf8 │ 2008.06ms │ 2371.06ms │ 1.18x slower │
│ tuple fetch │ │ │ │
│ Some(100) │ │ │ │
│ Qsort mixed │ 1855.62ms │ 2110.55ms │ 1.14x slower │
│ tuple fetch │ │ │ │
│ Some(100) │ │ │ │
│ Qsort utf8 │ 1728.54ms │ 1760.71ms │ no change │
│ fetch │ │ │ │
│ Some(101) │ │ │ │
│ Qsort int │ 1867.64ms │ 1902.02ms │ no change │
│ fetch │ │ │ │
│ Some(101) │ │ │ │
│ Qsort │ 1691.19ms │ 1708.74ms │ no change │
│ decimal │ │ │ │
│ fetch │ │ │ │
│ Some(101) │ │ │ │
│ Qsort │ 1784.79ms │ 1830.82ms │ no change │
│ integer │ │ │ │
│ tuple fetch │ │ │ │
│ Some(101) │ │ │ │
│ Qsort utf8 │ 1999.16ms │ 2034.22ms │ no change │
│ tuple fetch │ │ │ │
│ Some(101) │ │ │ │
│ Qsort mixed │ 1869.82ms │ 1914.20ms │ no change │
│ tuple fetch │ │ │ │
│ Some(101) │ │ │ │
│ Qsort utf8 │ 1774.79ms │ 1814.19ms │ no change │
│ fetch │ │ │ │
│ Some(1000) │ │ │ │
│ Qsort int │ 1959.51ms │ 1999.51ms │ no change │
│ fetch │ │ │ │
│ Some(1000) │ │ │ │
│ Qsort │ 1714.06ms │ 1730.36ms │ no change │
│ decimal │ │ │ │
│ fetch │ │ │ │
│ Some(1000) │ │ │ │
│ Qsort │ 1940.85ms │ 1969.82ms │ no change │
│ integer │ │ │ │
│ tuple fetch │ │ │ │
│ Some(1000) │ │ │ │
│ Qsort utf8 │ 2103.61ms │ 2132.59ms │ no change │
│ tuple fetch │ │ │ │
│ Some(1000) │ │ │ │
│ Qsort mixed │ 2032.56ms │ 2087.27ms │ no change │
│ tuple fetch │ │ │ │
│ Some(1000) │ │ │ │
└──────────────┴────────────┴─────────────────────┴───────────────┘
Overall the times are not that impressive, and the implementation is quite convoluted at this point, so I wouldn't merge this either.
I agree. A lot of us are thinking about solving the "top K" problem these days; I feel we should be able to find a solution that achieves the desirable results without the regressions.
I will not merge this until we get consensus on the approach
Wait until performance regressions are resolved / consensus is reached
This is to cover the case when the inter-batch order is guaranteed.
Alright, @alamb @yjshen, this is probably the final, best-effort attempt so far, with the simplest implementation. I've set a hard cut-off for eager sorting at 100 rows, so that the pronounced regressions for bigger fetches are avoided (at least for the 1.0 scale factor in the benchmarks). It also happens that most of the memory savings occur for K <= 100, in addition to probably 80% of usages in practice falling into that range.

sorting benchmarks
Still not sure whether this is merge-worthy, but it seems like the best trade-off between run-times and memory profiles so far.

EDIT: Curiously, this seems to result in a speedup for larger file sizes (as controlled by the benchmark scale factor); see for instance:

sorting benchmarks at 3x scale factor
and sorting benchmarks at 5x scale factor
// Eagerly sort the batch to potentially reduce the number of rows
// after applying the fetch parameter.
// Currently only applied for fetch of 100 rows or less.
input = sort_batch(&input, &self.expr, self.fetch)?;
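The effect of pushing the `fetch` limit into the per-batch sort can be sketched with plain `std` types (a `Vec<i64>` stands in for a record batch; `sort_batch` and the arrow sort kernels are not reproduced here, and `sort_and_truncate` is a hypothetical name):

```rust
/// Hypothetical stand-in for sorting one incoming batch and truncating it
/// to at most `fetch` rows, mirroring `sort_batch(&input, &self.expr, self.fetch)`.
/// A real implementation would use arrow's sort kernels with a limit instead.
fn sort_and_truncate(mut batch: Vec<i64>, fetch: Option<usize>) -> Vec<i64> {
    // Sort descending, matching `ORDER BY flow_id DESC`.
    batch.sort_unstable_by(|a, b| b.cmp(a));
    // Keep only the rows that can still appear in the final top-K result,
    // so the later merge sees fewer rows per batch.
    if let Some(k) = fetch {
        batch.truncate(k);
    }
    batch
}
```

With `fetch = Some(100)` every buffered batch shrinks to at most 100 rows before spilling/merging, which is where the memory savings above come from.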
I wonder whether you could recover part of the perf difference by concat + sorting only once every `n` (say every 10) batches. The selectivity of the limit relative to the total work performed is much better when sorting 81920 vs 8192 rows; also the merging to be performed will be over fewer batches.
Yeah, that's worth trying to benchmark as well. It could also be something like every `n` unsorted rows or every `size` unsorted bytes, to accommodate variability in batch row count.
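A rough sketch of that buffering strategy, under the same `Vec<i64>`-as-batch simplification (`BufferedTopK` and its fields are hypothetical names, not DataFusion APIs): accumulate unsorted batches and only concat + sort + truncate once every `n` batches, so the sort runs over fewer, larger inputs.

```rust
struct BufferedTopK {
    n: usize,               // how many batches to buffer before sorting
    fetch: usize,           // K, the limit pushed into the sort
    pending: Vec<Vec<i64>>, // unsorted batches accumulated so far
    sorted: Vec<i64>,       // current top-K candidates, kept sorted descending
}

impl BufferedTopK {
    fn new(n: usize, fetch: usize) -> Self {
        Self { n, fetch, pending: Vec::new(), sorted: Vec::new() }
    }

    fn push(&mut self, batch: Vec<i64>) {
        self.pending.push(batch);
        if self.pending.len() >= self.n {
            self.flush();
        }
    }

    /// Concat all pending batches with the current candidates,
    /// sort once (descending), and truncate back down to K rows.
    fn flush(&mut self) {
        let mut all: Vec<i64> = self.pending.drain(..).flatten().collect();
        all.extend(self.sorted.drain(..));
        all.sort_unstable_by(|a, b| b.cmp(a));
        all.truncate(self.fetch);
        self.sorted = all;
    }

    fn finish(mut self) -> Vec<i64> {
        self.flush();
        self.sorted
    }
}
```

The trade-off is between sort invocations (fewer with larger `n`) and the peak size of the unsorted buffer (larger with larger `n`), which is exactly the parameter space discussed above.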
Yes, definitely.
Fwiw, I did try out a variety of approaches along these lines, but still failed to eliminate the perf difference. There's probably a combination of parameters that is just right and could yield better results, but with the native Top-K operator in the works that likely won't be necessary.
I have been thinking about this PR and its implementation. It is my opinion that we could keep DataFusion simpler, and make this query faster and use less memory, by implementing a real `TopK` operator. The basic idea would be to keep a heap of the top K items (using the Row Format / specialization for single columns), and I think we could even prototype the idea using the same `SortExec`. Given how important Sort / Limit is, I would like to help. What do you think @gruuya and @Dandandan and @ozankabak ?
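The heap idea can be prototyped with `std::collections::BinaryHeap` (a sketch only, on scalar `i64` values rather than arrow rows; `TopK` here is an illustrative type, not the eventual operator): keep a min-heap of size K, so each new row costs O(log K) and memory stays bounded by K regardless of input size.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Keep the K largest values seen so far (matching ORDER BY ... DESC LIMIT K).
/// `Reverse` turns the max-heap into a min-heap, so the heap root is always
/// the smallest of the current top-K candidates.
struct TopK {
    k: usize,
    heap: BinaryHeap<Reverse<i64>>,
}

impl TopK {
    fn new(k: usize) -> Self {
        Self { k, heap: BinaryHeap::with_capacity(k + 1) }
    }

    fn insert(&mut self, value: i64) {
        if self.heap.len() < self.k {
            self.heap.push(Reverse(value));
        } else if let Some(&Reverse(min)) = self.heap.peek() {
            if value > min {
                // Evict the smallest candidate and admit the new row.
                self.heap.pop();
                self.heap.push(Reverse(value));
            }
        }
    }

    /// Drain into descending order, as the LIMIT output would be.
    fn finish(self) -> Vec<i64> {
        let mut out: Vec<i64> = self.heap.into_iter().map(|Reverse(v)| v).collect();
        out.sort_unstable_by(|a, b| b.cmp(a));
        out
    }
}
```

Unlike the eager per-batch sort in this PR, rows that cannot make the top K are rejected immediately, which is why a dedicated operator can bound memory by K rather than by K times the number of buffered batches.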
I think trying out the `TopK` idea makes sense. Even if it turns out we can't, I think that effort will result in some interesting learnings we should be able to leverage to optimize the LIMIT case via other approaches.
Agreed, a real `TopK` operator would be ideal. That said, that may take some time, and this PR feels to be on the verge of breaking even when it comes to performance, and could thus serve as a bandaid/base case until the real thing is implemented. So I'm compelled to continue experimenting with a couple of related approaches (i.e. things along the lines of @Dandandan's batched eager sorting) for a short while, if you don't mind (and I'll share if I find something promising).
Sounds like a good plan @gruuya -- I believe I have some time this afternoon I can devote to trying to get a prototype for TopK up and running. I'll report back here with any progress
I have made some progress on a native TopK operator -- I need to run some performance tests and polish it up and then I'll share it. I am sure it will need some additional work but I think it is looking promising
Glad to hear that @alamb! I think we can close this PR now, as it seems it's obsolete (as well as too verbose at this point), and continue the discussion in a dedicated issue/PR for a native Top-K operator.
FWIW I made a very simplistic POC for a special TopK operator and for my test (K=10) it goes almost 2x faster and uses 1/3 the peak memory. There is also plenty of room for both algorithmic and engineering optimizations. I'll be hacking on the idea in #7250 and will make a broader announcement when ready
Which issue does this PR close?
Partially addresses #7149
Rationale for this change
Try to take advantage of the `fetch` count known to the `SortExec` to reduce the size of the sorted batches that are later merged.

What changes are included in this PR?
Instead of accumulating the full batches prior to sorting/spilling them in preparation for the merge-sort, try to do the sorting ahead of time on each incoming batch inside of the `ExternalSorter`.

Are these changes tested?
The existing sort-spill tests pass. As for the timing and memory implications, using the setup:

- `jemallocator::Jemalloc` for the global allocator, in order to record the memory profiles using bytehound
- https://seafowl-public.s3.eu-west-1.amazonaws.com/tutorial/trase-supply-chains.parquet as the target (146M on-disk size) for the external table: `CREATE EXTERNAL TABLE supply_chains STORED AS PARQUET LOCATION '/home/ubuntu/supply-chains.parquet';`
- `SELECT * FROM supply_chains ORDER BY flow_id DESC LIMIT K` for K=1, 10, 100, 1000

I've recorded the following:
Are there any user-facing changes?
Only runtime/memory profiles.