Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Top-K eager batch sorting #7180

Closed
wants to merge 11 commits into from
Closed

Conversation

gruuya
Copy link
Contributor

@gruuya gruuya commented Aug 2, 2023

Which issue does this PR close?

Partially addresses #7149

Rationale for this change

Try to take advantage of the fetch count known to the SortExec to reduce the size of the sorted batches that are later merged.

What changes are included in this PR?

Instead of accumulating the full batches prior to sorting/spilling them in preparation for the merge-sort, try to do the sorting ahead of time on each incoming batch inside of the ExternalSorter.

Are these changes tested?

The existing sort-spill tests pass. As for the timing and memory implications, using the setup:

  • jemallocator::Jemalloc for the global allocator in order to record the memory profiles using bytehound
  • https://seafowl-public.s3.eu-west-1.amazonaws.com/tutorial/trase-supply-chains.parquet as target (146M on-disk size) for the external table CREATE EXTERNAL TABLE supply_chains STORED AS PARQUET LOCATION '/home/ubuntu/supply-chains.parquet';
  • run SELECT * FROM supply_chains ORDER BY flow_id DESC LIMIT K for K=1, 10, 100, 1000

I've recorded the following:

  1. current main
slika 2. this PR slika

Are there any user-facing changes?

Only runtime/memory profiles.

@github-actions github-actions bot added the core Core DataFusion crate label Aug 2, 2023
@alamb
Copy link
Contributor

alamb commented Aug 2, 2023

FYI #7179 may also be interesting (I will happily update the docs to match this code if we pursue it)

@gruuya
Copy link
Contributor Author

gruuya commented Aug 2, 2023

I pushed some changes, so that now in the absence of the LIMIT clause (fetch is None) the external sorting algorithm defaults to the previous logic.

Also in case the LIMIT is present but it's lower than the batch row count skip eager batch sorting (since there's no memory benefit from it then), and defer it to afterwards (i.e. on spill, or the final sort call). Strangely this seems to cause a minor regression compared to eager-sort-always (39d6d16) for medium valued LIMITs (e.g. 10K and 100K), so some further investigation is needed.

Some numbers for the example above (all in release mode):

K Time main(e39b5ca) Time this PR(dcba28f) Peak memory main(e39b5ca) Peak memory this PR(dcba28f)
1 8.477 7.119 2.5G 27M
10 7.684 7.043 2.5G 32M
100 8.082 7.233 2.5G 70M
1000 9.312 9.248 2.5G 640M
10000 35.991 34.844 2.5G 2.5G

EDIT:
To get a broader picture about the performance I also tested with memory limits on

diff --git a/datafusion-cli/src/main.rs b/datafusion-cli/src/main.rs
index aea499d60..b479370d9 100644
--- a/datafusion-cli/src/main.rs
+++ b/datafusion-cli/src/main.rs
...
 fn create_runtime_env() -> Result<RuntimeEnv> {
-    let rn_config = RuntimeConfig::new();
+    let rn_config = RuntimeConfig::new().with_memory_limit(256 * 1024 * 1024, 0.7);
     RuntimeEnv::new(rn_config)
 }
K Time main(e39b5ca) Time this PR(dcba28f) Peak memory main(e39b5ca) Peak memory this PR(dcba28f)
1 7.385 6.981 200M 30M
10 7.106 7.107 200M 32M
100 7.495 7.561 200M 71M
1000 8.769 8.902 200M 205M
10000 22.544 22.951 225M 220M

The times are again comparable, but slightly in favor of the present approach this time around, though I suppose I'd need more than N=1 samples to compare those meaningfully. The memory benefits on the other hand are still clearly on the side of top-k eager sorting in this PR.

Testing on a larger external file

In addition I repeated the entire profiling for another larger file (on disk-size 2.4GB, num of rows 46.2 million, 12 columns).
Here are the results W/O memory limits

K Time main(e39b5ca) Time this PR(dcba28f) Peak memory main(e39b5ca) Peak memory this PR(dcba28f)
1 9.571 6.978 7G 2.57G
10 9.569 6.706 7G 2.57G
100 9.622 6.686 7G 2.62G
1000 9.845 7.566 7G 3.1G
10000 14.173 14.426 7G 7G

and again the results with same memory limits as above for this new file

K Time main(e39b5ca) Time this PR(dcba28f) Peak memory main(e39b5ca) Peak memory this PR(dcba28f)
1 7.809 6.908 2.7G 2.57G
10 7.627 6.731 2.7G 2.57G
100 7.610 6.687 2.7G 2.62G
1000 8.001 9.567 2.7G 2.7G
10000 11.677 11.867 2.7G 2.7G

Similar all-round story (though memory improvements are more modest now), with the exception that in this case the same memory limits are not translated to reality effectively as for the smaller file above (caps at 2.7G instead of ~200M).

@gruuya gruuya changed the title Eager external sorting Top-K eager external sorting Aug 3, 2023
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @gruuya -- I took a quick look at this code and it looks like a good idea to me. I do think we can improve the situation substantially more with a dedicated operator, but this seems like a good step forward

cc @NGA-TRAN this might help us in IOx as well

I am running some benchmarks against this branch and will report back shortly


// TODO: This should probably be try_grow (#5885)
reservation.resize(input.get_array_memory_size());
// Maybe we should perform sorting in a parallel task to unblock the caller
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if the parallelization is working correctly all the other cores should be busy doing something useful here -- and thus sorting in another task may not be warranted but I am not sure

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, I think that makes sense; will remove the comment.

@alamb
Copy link
Contributor

alamb commented Aug 3, 2023

I also was thinking we can probably write a test for this behavior in https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/memory_limit.rs

I have something similar in #7130 (review)

I can potentially help writing that test if you agree it would be helpful

@gruuya
Copy link
Contributor Author

gruuya commented Aug 3, 2023

I also was thinking we can probably write a test for this behavior in https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/memory_limit.rs

I have something similar in #7130 (review)

I can potentially help writing that test if you agree it would be helpful

Thanks, I certainly planned on adding tests, just wanted to concentrate on the benchmarks first. I'm thinking of starting with order_spill_fuzz.rs file for starters, extend the cases for some limit numbers, then if it's needed I can extend https://github.com/apache/arrow-datafusion/blob/main/datafusion/core/tests/memory_limit.rs as well.

Incidentally, there's a failing test now, which I've been looking at and it's a bit curious—the difference comes down to the fact that previously processed batch size was a bit different (due to the fetch) and the intermediate sort call was stable (i.e. preserved order), while currently it's unstable (didn't preserve order).

To be more specific, in this particular test there are always 2 input batches (I've added the column C3 for more clarity).
One with 99 rows

[
    "+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+",
    "| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1                 | c3   |",
    "+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+",
    "| 0.01479305307777301                                                                                                            | 0.01479305307777301  | -86  |",
    "| 0.02182578039211991                                                                                                            | 0.02182578039211991  | -38  |",
    "| 0.03968347085780355                                                                                                            | 0.03968347085780355  | 30   |",
    "| 0.04429073092078406                                                                                                            | 0.04429073092078406  | 36   |",
    "| 0.047343434291126085                                                                                                           | 0.047343434291126085 | -95  |",
    ...
    "| 0.980809631269599                                                                                                              | 0.980809631269599    | 52   |",
    "| 0.991517828651004                                                                                                              | 0.991517828651004    | 29   |",
    "+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+",
]

and one with 1 row:

[
    "+--------------------------------------------------------------------------------------------------------------------------------+--------------------+-----+",
    "| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1               | c3  |",
    "+--------------------------------------------------------------------------------------------------------------------------------+--------------------+-----+",
    "| 0.9965400387585364                                                                                                             | 0.9965400387585364 | 120 |",
    "+--------------------------------------------------------------------------------------------------------------------------------+--------------------+-----+",
]

In the case of this PR the first batch is sorted to:

[
    "+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
    "| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1                | c3   |",
    "+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
    "| 0.8506721053047003                                                                                                             | 0.8506721053047003  | -117 |",
    "| 0.9706712283358269                                                                                                             | 0.9706712283358269  | -117 |",
    "| 0.152498292971736                                                                                                              | 0.152498292971736   | -111 |",
    "| 0.36936304600612724                                                                                                            | 0.36936304600612724 | -107 |",
    "| 0.565352842229935                                                                                                              | 0.565352842229935   | -106 |",
    "+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
]

I.e. without preserving the order for the same C3
On main this isn't manifested since the two batches get concatenated into a batch with 100 rows and the sorting of that batch turns out to be stable for some reason:

[
    "+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
    "| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1                | c3   |",
    "+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
    "| 0.9706712283358269                                                                                                             | 0.9706712283358269  | -117 |",
    "| 0.8506721053047003                                                                                                             | 0.8506721053047003  | -117 |",
    "| 0.152498292971736                                                                                                              | 0.152498292971736   | -111 |",
    "| 0.36936304600612724                                                                                                            | 0.36936304600612724 | -107 |",
    "| 0.565352842229935                                                                                                              | 0.565352842229935   | -106 |",
    "+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+",
]

Still need to look into this a bit more.

@gruuya
Copy link
Contributor Author

gruuya commented Aug 3, 2023

I consolidated/extended the fuzz tests (they all pass locally); I've ran into one more edge case while doing that: ca0786f#diff-c0e76bbcb3ed7bfbba2f99fedfdab7ebb9200746a835db51619a6b10e3e2adcfR255-R262

This might have some runtime implications on it's own, so it will be worthwhile to double check the benchmarks afterwards (i.e. once I resolve the other failing test). It could also be that there is a more elegant way of solving that.

@gruuya
Copy link
Contributor Author

gruuya commented Aug 4, 2023

Fwiw, I was able to narrow down the reason why that last test is failing; TLDR: unstable sort and changed batch sizes result in different order of items with the same value in the sort column between main and this PR.

Details

The repro involves an array 1 with 100 elements (this is basically column C3 from the test file) and array 2 with all elements from array 1 except one (which doesn't affect the output, namely 120)

use arrow::compute::SortOptions;
use arrow::compute::{lexsort_to_indices, SortColumn};
use arrow_array::{ArrayRef, Int32Array};
let options = SortOptions {
    descending: false,
    nulls_first: false,
};
let get_indices = |sort_column: SortColumn, fetch: Option<usize>| -> Vec<u32> {
    lexsort_to_indices(&vec![sort_column], fetch)
        .unwrap()
        .into_iter()
        .map(|maybe_index| maybe_index.unwrap_or_else(|| panic!("No index!")))
        .collect::<Vec<u32>>()
};
let array_1: ArrayRef = Arc::new(Int32Array::from(vec![
    1, -40, 29, -85, -82, -111, 104, 13, 38, -38, 57, -54, 112, 113, 54, 103, 49,
    -98, 77, 97, -56, -99, 36, -53, -29, -25, 123, -31, 45, 17, 97, -60, 36, -5,
    13, 41, 93, 73, -2, 22, 63, 102, -8, 17, 52, 68, 31, -24, 65, 125, 17, -106,
    -59, 55, -60, -76, 73, -117, -101, 62, -79, 68, 70, -61, 74, 122, 71, -94,
    -72, 71, 96, -48, -56, 52, -5, 12, 64, -90, -86, -117, 14, 29, -59, 83, -12,
    3, -72, -107, 118, 97, -101, -43, -101, -44, 5, 120, -95, 123, 47, 30,
]));
let sort_column_1 = SortColumn {
    values: array_1,
    options: Some(options.clone()),
};
let indices_1_no_fetch = get_indices(sort_column_1.clone(), None);
let indices_1_fetch_5 = get_indices(sort_column_1, Some(5));
println!("Indices for sorting array 1");
println!("{:#?}", indices_1_no_fetch[0..5].to_vec());
println!("{:#?}", indices_1_fetch_5);

let array_2: ArrayRef = Arc::new(Int32Array::from(vec![
    1, -40, 29, -85, -82, -111, 104, 13, 38, -38, 57, -54, 112, 113, 54, 103, 49,
    -98, 77, 97, -56, -99, 36, -53, -29, -25, 123, -31, 45, 17, 97, -60, 36, -5,
    13, 41, 93, 73, -2, 22, 63, 102, -8, 17, 52, 68, 31, -24, 65, 125, 17, -106,
    -59, 55, -60, -76, 73, -117, -101, 62, -79, 68, 70, -61, 74, 122, 71, -94,
    -72, 71, 96, -48, -56, 52, -5, 12, 64, -90, -86, -117, 14, 29, -59, 83, -12,
    3, -72, -107, 118, 97, -101, -43, -101, -44, 5, -95, 123, 47, 30,
]));
let sort_column_2 = SortColumn {
    values: array_2,
    options: Some(options),
};
let indices_2_no_fetch = get_indices(sort_column_2.clone(), None);
let indices_2_fetch_5 = get_indices(sort_column_2, Some(5));
println!("Indices for sorting array 2");
println!("{:#?}", indices_2_no_fetch[0..5].to_vec());
println!("{:#?}", indices_2_fetch_5);

This prints out:

Indices for sorting array 1
[
    57,
    79,
    5,
    87,
    51,
]
[
    57,
    79,
    5,
    87,
    51,
]
Indices for sorting array 2
[
    57,
    79,
    5,
    87,
    51,
]
[
    79,
    57,
    5,
    87,
    51,
]

Note that in the case of the second array the first two indices are swapped when setting some fetch size, as then the unstable sort gets used, and the two smallest array values are the same (-117).

Perhaps it's ok to change the test output to match the one being obtained now?

@alamb
Copy link
Contributor

alamb commented Aug 4, 2023

TLDR: unstable sort and changed batch sizes result in different order of items with the same value in the sort column between main and this PR.

very nice 🕵️ work

Perhaps it's ok to change the test output to match the one being obtained now?

SELECT
  MAX(c12) OVER window1,
  MIN(c12) OVER window2 as max1
  FROM aggregate_test_100
  WINDOW window1 AS (ORDER BY C12),
  window2 AS (PARTITION BY C11),
  window3 AS (ORDER BY C1)
  ORDER BY C3
  LIMIT 5

If you are saying there are more than 5 values with the same value of c5 then I think technically the query produces non specified answers and thus I think we should change the test to be deterministic -- perhaps we can increase the LIMIT to include all the first values of c3

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this PR is almost ready to go -- thank you very much @gruuya. I left a suggestion about the test and memory accounting, but otherwise 🚀

} else {
assert_eq!(sort.metrics().unwrap().spill_count().unwrap(), 0);
}
#[rstest]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

// Eagerly sort the batch to potentially reduce the number of rows
// after applying the fetch parameter; first perform a memory reservation
// for the sorting procedure.
let mut reservation =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need a new consumer here for each batch insertion -- we could just update the main reservation on self.reservation

So something like do the sort and then update the size on limit

            input = sort_batch(&input, &self.expr, self.fetch)?;
            reservation.try_grow(input.get_array_memory_size());

Note the accounting is reworked in #7130

Copy link
Contributor Author

@gruuya gruuya Aug 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the reasoning for the new consumer was that we only want to reserve a bit of memory briefly to account for the overhead of keeping both the sorted and original batch at the same time. Given that in this case we drop the old batch asap (due to input re-assignment) in favor of a smaller/truncated one I'd agree that a new consumer is not needed in that case. (Same could be said about the consumer/reservation in sort_batch_stream, though in that case since it isn't given that there is a LIMIT we could end up holding 2 same-sized batches at one point in time.)

That said, reservation.try_grow(input.get_array_memory_size()) does then get called immediately below to check whether the new (sorted/truncated) batch can be kept in memory without breaking the configured limit, so I don't think there's a need for another try_grow prior to that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah that makes sense

I think I think you could use (they very newly added) Reservation::split() to track the short allocation and free it on drop without needing an entirely new consumer:

https://github.com/apache/arrow-datafusion/blob/23547587c2773ddddb9b16cba1eb8ebf2eebd85a/datafusion/execution/src/memory_pool/mod.rs#L225-L231

perhaps like

let batch_reservation = self.reservation.split(input.get_arra_memory_size());
input = sort_batch(&input, &self.expr, self.fetch)?;
// free allocation for previous input
// (it would also be freed on drop so the free isn't necessary but may be clearer)
batch_reservation.free();

Copy link
Contributor Author

@gruuya gruuya Aug 5, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, so I went ahead to add this, but then it seemed to me that the logic of split is backwards (for this case at least)? In particular, the size of the reservation is not free memory, but instead used memory, right? And with split + free we'd effectively deduct some piece of memory in our accounting that was previously allocated elsewhere (without actually freeing that memory).

Atm I went with skipping reservation updates at all in this case; the reason being that we're already holding the input batch in memory and we're are about to (try to) allocate memory for it right after sorting it anyway. Hence, the sole purpose of that reservation seems to be to account for that extra piece of memory due to non-in-place sorting (i.e. having the old and the new bath present at the same time briefly), and that piece of memory is guaranteed to be less than the batch size. Let me know if you'd like me to do something else.

I also changed that tests output. Now all tests should be passing, so it's probably time for the regular benchmarks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Atm I went with skipping reservation updates at all in this case;

I think this make sense -- especially since the extra memory for batches is likely small given k is very often small.

.await
}

// We expect to see lower memory thresholds in general when applying a `LIMIT` clause due to eager sorting
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is nice

@gruuya
Copy link
Contributor Author

gruuya commented Aug 4, 2023

If you are saying there are more than 5 values with the same value of c5 then I think technically the query produces non specified answers and thus I think we should change the test to be deterministic -- perhaps we can increase the LIMIT to include all the first values of c3

Oh actually I meant that there are only 2 identical values for the sorting column (C3) which end up in the top-5 according to the test query, but their order is unstable when using a LIMIT as opposed to without it:

SELECT
  MAX(c12) OVER window1,
  MIN(c12) OVER window2 as max1,
  C3
  FROM aggregate_test_100
  WINDOW window1 AS (ORDER BY C12),
  window2 AS (PARTITION BY C11),
  window3 AS (ORDER BY C1)
  ORDER BY C3;
+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+
| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1                 | c3   |
+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+
| 0.9706712283358269                                                                                                             | 0.9706712283358269   | -117 |
| 0.8506721053047003                                                                                                             | 0.8506721053047003   | -117 |
| 0.152498292971736                                                                                                              | 0.152498292971736    | -111 |
| 0.36936304600612724                                                                                                            | 0.36936304600612724  | -107 |
| 0.565352842229935                                                                                                              | 0.565352842229935    | -106 |
...
| 0.7631239070049998                                                                                                             | 0.7631239070049998   | 125  |
+--------------------------------------------------------------------------------------------------------------------------------+----------------------+------+
100 rows in set. Query took 0.154 seconds.

❯ SELECT
  MAX(c12) OVER window1,
  MIN(c12) OVER window2 as max1,
  C3
  FROM aggregate_test_100
  WINDOW window1 AS (ORDER BY C12),
  window2 AS (PARTITION BY C11),
  window3 AS (ORDER BY C1)
  ORDER BY C3
  LIMIT 5;
+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+
| MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW | max1                | c3   |
+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+
| 0.8506721053047003                                                                                                             | 0.8506721053047003  | -117 |
| 0.9706712283358269                                                                                                             | 0.9706712283358269  | -117 |
| 0.152498292971736                                                                                                              | 0.152498292971736   | -111 |
| 0.36936304600612724                                                                                                            | 0.36936304600612724 | -107 |
| 0.565352842229935                                                                                                              | 0.565352842229935   | -106 |
+--------------------------------------------------------------------------------------------------------------------------------+---------------------+------+
5 rows in set. Query took 0.042 seconds.

(Note that I've included the C3 column for clarity.)

One quirky thing that also contributes to this difference is that the incoming batches are always split into first with 99 rows and the second with 1 row, and that seems kind of strange. While previously they would have first been concatenated into 99 + 1 = 100 rows batch and then sorted, they are now sorted first and so the effect from #7180 (comment) kicks in.

@gruuya gruuya marked this pull request as ready for review August 4, 2023 20:46
This holds for both no-fetch and fetch cases equally, unlike the case of multiple sorted batches.
@alamb
Copy link
Contributor

alamb commented Aug 5, 2023

Oh actually I meant that there are only 2 identical values for the sorting column (C3) which end up in the top-5 according to the test query, but their order is unstable when using a LIMIT as opposed to without it:

Yeah, I agree this test is not well designed as it is "non deterministic" (there is no one single right answer)

I think either updating the output or updating the test are both appropriate in this case

We could also add rowsort to the query, but that might obscure real ordering bugs given the query has an ORDER BY.

The new output has swapped frist two rows since the sort column value is the same and with LIMIT in place the sort in unstable.
@github-actions github-actions bot added the sqllogictest SQL Logic Tests (.slt) label Aug 5, 2023
@gruuya gruuya changed the title Top-K eager external sorting Top-K eager batch sorting Aug 5, 2023
@yjshen
Copy link
Member

yjshen commented Aug 7, 2023

Would it be possible to compare the sorting performance of this pull request with the main branch by modifying the test sets in the description of #6163 with different values of K?

I am interested in these tests because we are reintroducing per-batch sorting (for TopK), which was noted as a source of inefficiency in #6163 (comment):

The cost of sorting and copying all the values an additional time far outweighs any cache locality effects.

By analyzing the results of the suggested tests, we can determine whether we need to implement simple heuristics to determine when to use eager sorting (if K is similar to batch_size) or if the performance improvement in #6163 is mainly due to the adaptive sorting algorithm over index-based sorting, and thus eager sorting will not cause performance regression.

@gruuya
Copy link
Contributor Author

gruuya commented Aug 7, 2023

Would it be possible to compare the sorting performance of this pull request with the main branch by modifying the test sets in the description of #6163 with different values of K?

Certainly; I started up a slightly modified sorting benchmark with an additional outer loop for fetch in &[None, Some(1), Some(10), Some(100), Some(1000), Some(10000)]. The idea is that the first and last case should not hit the eager sorting code heuristic (because of the missing fetch, and fetch being larger than batch sizes respectively), so they should be same as on main. Will report back today hopefully.

@yjshen
Copy link
Member

yjshen commented Aug 7, 2023

How about K slightly smaller than the batch size? Say K=8000 when batch size=8192.

@gruuya
Copy link
Contributor Author

gruuya commented Aug 7, 2023

How about K slightly smaller than the batch size? Say K=8000 when batch size=8192.

Sorry didn't see this comment in time, maybe I could add that case later on.

Anyway I've got some preliminary results and they're not very promising (if there's a change it's non-insignificant slowdown)

sorting benchmarks
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ top-k-eager-sorting ┃       Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━┩
│ Qsort utf8   │ 35484.89ms │          37195.91ms │    no change │
│ with fetch   │            │                     │              │
│ None         │            │                     │              │
│ Qsort int    │ 46779.63ms │          46817.47ms │    no change │
│ with fetch   │            │                     │              │
│ None         │            │                     │              │
│ Qsort        │ 36037.43ms │          36471.88ms │    no change │
│ decimal with │            │                     │              │
│ fetch None   │            │                     │              │
│ Qsort        │ 51767.49ms │          52308.29ms │    no change │
│ integer      │            │                     │              │
│ tuple with   │            │                     │              │
│ fetch None   │            │                     │              │
│ Qsort utf8   │ 35248.66ms │          35866.21ms │    no change │
│ tuple with   │            │                     │              │
│ fetch None   │            │                     │              │
│ Qsort mixed  │ 41124.15ms │          42406.47ms │    no change │
│ tuple with   │            │                     │              │
│ fetch None   │            │                     │              │
│ Qsort utf8   │  1894.58ms │           2081.23ms │ 1.10x slower │
│ with fetch   │            │                     │              │
│ Some(1)      │            │                     │              │
│ Qsort int    │  1995.16ms │           1992.60ms │    no change │
│ with fetch   │            │                     │              │
│ Some(1)      │            │                     │              │
│ Qsort        │  1811.48ms │           1924.61ms │ 1.06x slower │
│ decimal with │            │                     │              │
│ fetch        │            │                     │              │
│ Some(1)      │            │                     │              │
│ Qsort        │  1873.19ms │           2138.09ms │ 1.14x slower │
│ integer      │            │                     │              │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(1)      │            │                     │              │
│ Qsort utf8   │  2106.81ms │           2584.92ms │ 1.23x slower │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(1)      │            │                     │              │
│ Qsort mixed  │  1948.55ms │           2304.72ms │ 1.18x slower │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(1)      │            │                     │              │
│ Qsort utf8   │  1841.42ms │           2084.12ms │ 1.13x slower │
│ with fetch   │            │                     │              │
│ Some(10)     │            │                     │              │
│ Qsort int    │  1967.58ms │           1994.95ms │    no change │
│ with fetch   │            │                     │              │
│ Some(10)     │            │                     │              │
│ Qsort        │  1811.81ms │           1906.41ms │ 1.05x slower │
│ decimal with │            │                     │              │
│ fetch        │            │                     │              │
│ Some(10)     │            │                     │              │
│ Qsort        │  1902.72ms │           2107.02ms │ 1.11x slower │
│ integer      │            │                     │              │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(10)     │            │                     │              │
│ Qsort utf8   │  2096.74ms │           2567.12ms │ 1.22x slower │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(10)     │            │                     │              │
│ Qsort mixed  │  1983.58ms │           2285.19ms │ 1.15x slower │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(10)     │            │                     │              │
│ Qsort utf8   │  1860.15ms │           2098.58ms │ 1.13x slower │
│ with fetch   │            │                     │              │
│ Some(100)    │            │                     │              │
│ Qsort int    │  2017.14ms │           2018.33ms │    no change │
│ with fetch   │            │                     │              │
│ Some(100)    │            │                     │              │
│ Qsort        │  1831.45ms │           1980.09ms │ 1.08x slower │
│ decimal with │            │                     │              │
│ fetch        │            │                     │              │
│ Some(100)    │            │                     │              │
│ Qsort        │  1933.97ms │           2171.31ms │ 1.12x slower │
│ integer      │            │                     │              │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(100)    │            │                     │              │
│ Qsort utf8   │  2155.04ms │           2657.61ms │ 1.23x slower │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(100)    │            │                     │              │
│ Qsort mixed  │  2028.39ms │           2365.85ms │ 1.17x slower │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(100)    │            │                     │              │
│ Qsort utf8   │  1970.11ms │           2160.91ms │ 1.10x slower │
│ with fetch   │            │                     │              │
│ Some(1000)   │            │                     │              │
│ Qsort int    │  2142.49ms │           2123.38ms │    no change │
│ with fetch   │            │                     │              │
│ Some(1000)   │            │                     │              │
│ Qsort        │  1891.53ms │           2007.44ms │ 1.06x slower │
│ decimal with │            │                     │              │
│ fetch        │            │                     │              │
│ Some(1000)   │            │                     │              │
│ Qsort        │  2089.55ms │           2424.50ms │ 1.16x slower │
│ integer      │            │                     │              │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(1000)   │            │                     │              │
│ Qsort utf8   │  2251.87ms │           2860.59ms │ 1.27x slower │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(1000)   │            │                     │              │
│ Qsort mixed  │  2215.35ms │           2739.82ms │ 1.24x slower │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(1000)   │            │                     │              │
│ Qsort utf8   │  2273.12ms │           2440.54ms │ 1.07x slower │
│ with fetch   │            │                     │              │
│ Some(10000)  │            │                     │              │
│ Qsort int    │  2352.27ms │           2540.47ms │ 1.08x slower │
│ with fetch   │            │                     │              │
│ Some(10000)  │            │                     │              │
│ Qsort        │  2062.62ms │           2255.48ms │ 1.09x slower │
│ decimal with │            │                     │              │
│ fetch        │            │                     │              │
│ Some(10000)  │            │                     │              │
│ Qsort        │  2741.88ms │           2937.61ms │ 1.07x slower │
│ integer      │            │                     │              │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(10000)  │            │                     │              │
│ Qsort utf8   │  2554.86ms │           2733.78ms │ 1.07x slower │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(10000)  │            │                     │              │
│ Qsort mixed  │  3408.55ms │           3589.02ms │ 1.05x slower │
│ tuple with   │            │                     │              │
│ fetch        │            │                     │              │
│ Some(10000)  │            │                     │              │
└──────────────┴────────────┴─────────────────────┴──────────────┘

Strangely I see slowdown even for base cases when eager sorting does not apply. I'll need to think about it a bit.

@alamb
Copy link
Contributor

alamb commented Aug 7, 2023

Strangely I see slowdown even for base cases when eager sorting does not apply. I'll need to think about it a bit.

I wonder if it could be related to the benchmark itself somehow -- perhaps you could run the benchmark again main twice and see if you got consistent results?

I believe I was seeing some non consistent results when I was running last week myself

Even when a fetch value is specified do not try to merge-stream sorted in-mem batches during the sorting procedure,\n as this seems to introduce time regressions.
@gruuya
Copy link
Contributor Author

gruuya commented Aug 7, 2023

Ok, I went back and re-visited a change I made previously, and it looks like it was definitely contributing to consistent systemic regression I initially observed (as it was adding some extra sort calls).

I also think I found a better way to solve that issue (basically convert all the batches from a single spill into individual streams) that I pushed now. This still results in the optimized memory profiles depicted in the PR description.

Here are the new sorting benchmarks (10 iterations, for fetch in None, Some(1), Some(10), Some(100), Some(1000), Some(8000), Some(10000)):

sorting benchmarks
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ top-k-eager-sorting ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Qsort utf8   │ 37157.19ms │          37730.09ms │     no change │
│ fetch None   │            │                     │               │
│ Qsort int    │ 48290.35ms │          49269.24ms │     no change │
│ fetch None   │            │                     │               │
│ Qsort        │ 38306.38ms │          38645.93ms │     no change │
│ decimal      │            │                     │               │
│ fetch None   │            │                     │               │
│ Qsort        │ 53457.21ms │          54205.26ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort utf8   │ 38104.45ms │          38006.01ms │     no change │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort mixed  │ 43931.69ms │          43924.41ms │     no change │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort utf8   │  1869.92ms │           1899.74ms │     no change │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort int    │  2011.87ms │           1821.10ms │ +1.10x faster │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort        │  1851.76ms │           1760.68ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort        │  1914.27ms │           1953.97ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort utf8   │  2138.20ms │           2417.24ms │  1.13x slower │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort mixed  │  1999.94ms │           2128.49ms │  1.06x slower │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort utf8   │  1904.48ms │           1906.43ms │     no change │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort int    │  2025.38ms │           1833.70ms │ +1.10x faster │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort        │  1837.30ms │           1787.43ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort        │  1905.57ms │           1962.79ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort utf8   │  2125.98ms │           2436.01ms │  1.15x slower │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort mixed  │  2010.12ms │           2144.80ms │  1.07x slower │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort utf8   │  1910.96ms │           1961.26ms │     no change │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort int    │  2033.24ms │           1869.98ms │ +1.09x faster │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort        │  1845.95ms │           1782.47ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort        │  1962.15ms │           1986.78ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort utf8   │  2157.68ms │           2467.28ms │  1.14x slower │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort mixed  │  2046.18ms │           2199.65ms │  1.08x slower │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort utf8   │  1983.56ms │           2035.27ms │     no change │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort int    │  2151.34ms │           1995.38ms │ +1.08x faster │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort        │  1899.04ms │           1855.03ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort        │  2103.93ms │           2243.71ms │  1.07x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort utf8   │  2274.01ms │           2662.95ms │  1.17x slower │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort mixed  │  2224.79ms │           2587.77ms │  1.16x slower │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort utf8   │  2176.51ms │           2745.07ms │  1.26x slower │
│ fetch        │            │                     │               │
│ Some(8000)   │            │                     │               │
│ Qsort int    │  2300.06ms │           2695.76ms │  1.17x slower │
│ fetch        │            │                     │               │
│ Some(8000)   │            │                     │               │
│ Qsort        │  1990.73ms │           2343.61ms │  1.18x slower │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(8000)   │            │                     │               │
│ Qsort        │  2758.47ms │           4018.36ms │  1.46x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(8000)   │            │                     │               │
│ Qsort utf8   │  2903.87ms │           4070.53ms │  1.40x slower │
│ tuple fetch  │            │                     │               │
│ Some(8000)   │            │                     │               │
│ Qsort mixed  │  3540.07ms │           5640.86ms │  1.59x slower │
│ tuple fetch  │            │                     │               │
│ Some(8000)   │            │                     │               │
│ Qsort utf8   │  2255.55ms │           2181.42ms │     no change │
│ fetch        │            │                     │               │
│ Some(10000)  │            │                     │               │
│ Qsort int    │  2380.78ms │           2367.10ms │     no change │
│ fetch        │            │                     │               │
│ Some(10000)  │            │                     │               │
│ Qsort        │  2108.30ms │           2022.66ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(10000)  │            │                     │               │
│ Qsort        │  2817.34ms │           2758.43ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(10000)  │            │                     │               │
│ Qsort utf8   │  2625.46ms │           2564.71ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(10000)  │            │                     │               │
│ Qsort mixed  │  3457.03ms │           3418.22ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(10000)  │            │                     │               │
└──────────────┴────────────┴─────────────────────┴───────────────┘

Note that the base cases now show no changes compared to main, which is what I expect. There are a couple of speedups, but still the overall direction is regression-y. This is especially pronounced for K = 8000 (good idea for including that value @yjshen).

It could be that there are some further micro-optimizations waiting that would push the runtimes for this branch to merge-acceptable territory while keeping the memory usage as is, though I doubt it. I'll give it some more thought and report back if I come up with something.

alamb
alamb previously approved these changes Aug 7, 2023
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for your diligence @gruuya -- in my opinion this code is now good enough to be merged.

I had one comment that could save a sort that might be worth looking at, but I don't think it is required.

Really nice work

return self.sort_batch_stream(batch, metrics);
// Even if all individual batches were themselves sorted the resulting concatenated one
// isn't guaranteed to be sorted, so we must perform sorting on the stream.
return self.sort_batch_stream(batch, false, metrics);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another approach might be be to not use the concat in place heuristic if there are any previously sorted batches -- now the code only checks for the overall size less than 1MB -- it could also check if there was any true in in_mem_batches and if there are use the merge path below

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point; will try to benchmark that change too.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I did try to test this approach as well, and then saw some improvements that seemed too good to be true. I went and re-ran the benchmarks again and the improvements held, until they didn't at some point 🤷🏻‍♂️ (fwiw I'm running the benchmarks on a cloud VM, not dedicated hardware).

In hindsight, the sorting benchmarks actually do not use a memory limit and so there were no spills and this code path wasn't exercised. I did try running the benchmarks with memory limits on, but then I hit Dictionary replacement detected when writing IPC file format. arrow error during spilling. It seems like this is a general problem as it happens on the main branch too, though I haven't investigated further.

Either way, I'll add this check now even without doing benchmarking on it because it seems it can only help.

#[rstest]
#[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate additional", "ExternalSorter"], 20_000)]
#[case::cant_spill_to_disk(vec!["Memory Exhausted while Sorting (DiskManager is disabled)"], 40_000)]
//#[case::no_oom(vec![], 80_000)]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand why atm, but this test case now fails. In fact, the no-oom threshold is like 10 times that value which is even weirder. I did double-check with bytehound and the memory profiles are still looking good outside of this test. Will need to think about it.

@gruuya
Copy link
Contributor Author

gruuya commented Aug 7, 2023

Thanks @alamb for the reviews and timely feedback.

in my opinion this code is now good enough to be merged.

I'd like to emphasize that there are still regressions with this approach. In fact in case of larger files (> 1GB) with K in 1000-8000 range, the runtime seems to be hit the most, with probably negligible memory improvements (if any). Anecdotally, the original file I've been testing does now show considerable speedup though, but that is perhaps not a typical file size (146M). So it's a mixed bag really, and I'm not sure it's best for this to be merged as is.

if input.num_rows() == 0 {
return Ok(());
}

let mut batch_sorted = false;
if self.fetch.map_or(false, |f| f < input.num_rows()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking can we make the heuristic f < input.num_rows() / 10 or something magic numbers to only do eager sort for small `K's?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was also thinking something similar but along the lines of f < input.num_rows() && f <= 100, so that we effectively have a hard cur-off for eager sorting after 100 rows.

///
/// Each spill file has one or more batches. Intra-batch order is guaranteed (each one is sorted),
/// but the inter-batch ordering is not guaranteed, hence why we need to convert each batch from the
/// spill to a separate input stream for the merge-sort procedure.
Copy link
Member

@yjshen yjshen Aug 7, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this produce unnecessary comparison when the inter-batch ordering in spillfiles are guaranteed for the normal case without K?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think not, because than it should be single record batch per spill. I can add a sort counter to double-check though.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A spill file is generated by in_mem_sort() and then spill. And spill_sorted_batches takes batches: Vec<RecordBatch> as one of the arguments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, good point; for some reason I thought that only 1 batch gets collected from in_mem_sort_stream in case of no fetch, even though that is not a given.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fwiw, I did revise the approach to stream the entire spill in one stream when inter-batch order is guaranteed (which involved keeping track of this as well). I also added the hard cut-off for eager batching to 100 rows.

sorting benchmarks
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ top-k-eager-sorting ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Qsort utf8   │ 28233.61ms │          29251.16ms │     no change │
│ fetch None   │            │                     │               │
│ Qsort int    │ 36739.73ms │          36870.39ms │     no change │
│ fetch None   │            │                     │               │
│ Qsort        │ 29974.54ms │          30480.98ms │     no change │
│ decimal      │            │                     │               │
│ fetch None   │            │                     │               │
│ Qsort        │ 40214.81ms │          40804.29ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort utf8   │ 29429.65ms │          29875.64ms │     no change │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort mixed  │ 34435.12ms │          34415.10ms │     no change │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort utf8   │  1709.85ms │           1829.22ms │  1.07x slower │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort int    │  1842.45ms │           1749.96ms │ +1.05x faster │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort        │  1680.46ms │           1677.47ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort        │  1745.56ms │           1871.46ms │  1.07x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort utf8   │  1965.73ms │           2331.78ms │  1.19x slower │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort mixed  │  1813.14ms │           2061.60ms │  1.14x slower │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort utf8   │  1712.66ms │           1829.49ms │  1.07x slower │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort int    │  1832.73ms │           1759.00ms │     no change │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort        │  1699.42ms │           1686.28ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort        │  1762.86ms │           1877.71ms │  1.07x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort utf8   │  1974.07ms │           2337.96ms │  1.18x slower │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort mixed  │  1854.64ms │           2078.75ms │  1.12x slower │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort utf8   │  1733.28ms │           1853.74ms │  1.07x slower │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort int    │  1845.74ms │           1797.14ms │     no change │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort        │  1685.38ms │           1712.97ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort        │  1799.54ms │           1907.62ms │  1.06x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort utf8   │  2008.06ms │           2371.06ms │  1.18x slower │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort mixed  │  1855.62ms │           2110.55ms │  1.14x slower │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort utf8   │  1728.54ms │           1760.71ms │     no change │
│ fetch        │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort int    │  1867.64ms │           1902.02ms │     no change │
│ fetch        │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort        │  1691.19ms │           1708.74ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort        │  1784.79ms │           1830.82ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort utf8   │  1999.16ms │           2034.22ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort mixed  │  1869.82ms │           1914.20ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort utf8   │  1774.79ms │           1814.19ms │     no change │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort int    │  1959.51ms │           1999.51ms │     no change │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort        │  1714.06ms │           1730.36ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort        │  1940.85ms │           1969.82ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort utf8   │  2103.61ms │           2132.59ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort mixed  │  2032.56ms │           2087.27ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
└──────────────┴────────────┴─────────────────────┴───────────────┘

Overall the times are not that impressive, and the implementation is quite convoluted at this point, so I wouldn't merge this either.

@ozankabak
Copy link
Contributor

I'd like to emphasize that there are still regressions with this approach. In fact in case of larger files (> 1GB) with K in 1000-8000 range, the runtime seems to be hit the most, with probably negligible memory improvements (if any). Anecdotally, the original file I've been testing does now show considerable speedup though, but that is perhaps not a typical file size (146M). So it's a mixed bag really, and I'm not sure it's best for this to be merged as is.

I agree. A lot of us are thinking about solving the "top K" problem these days, I feel we should be able to find a solution that will achieve desirable results without the regressions.

@alamb
Copy link
Contributor

alamb commented Aug 8, 2023

I will not merge this until we get consensus on the approach

@alamb alamb dismissed their stale review August 8, 2023 14:08

Wait until performance regressions are resolved / consensus is reached

@gruuya
Copy link
Contributor Author

gruuya commented Aug 8, 2023

Alright, @alamb @yjshen, this is probably the final, best-effort, attempt so far, with the simplest implementation.

I've set a hard cut-off for eager sorting at 100 rows so that the pronounced regressions for bigger fetches are avoided (at least for the 1.0 scale factor in the benchmarks). It also happens that most of the memory savings occur for K <= 100, in addition to probably 80% of usages in practice falling into that range.

sorting benchmarks
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃       main ┃ top-k-eager-sorting ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Qsort utf8   │ 28233.61ms │          29131.55ms │     no change │
│ fetch None   │            │                     │               │
│ Qsort int    │ 36739.73ms │          37069.77ms │     no change │
│ fetch None   │            │                     │               │
│ Qsort        │ 29974.54ms │          30505.64ms │     no change │
│ decimal      │            │                     │               │
│ fetch None   │            │                     │               │
│ Qsort        │ 40214.81ms │          40837.69ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort utf8   │ 29429.65ms │          29552.35ms │     no change │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort mixed  │ 34435.12ms │          34964.91ms │     no change │
│ tuple fetch  │            │                     │               │
│ None         │            │                     │               │
│ Qsort utf8   │  1709.85ms │           1813.60ms │  1.06x slower │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort int    │  1842.45ms │           1734.15ms │ +1.06x faster │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort        │  1680.46ms │           1669.21ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort        │  1745.56ms │           1869.70ms │  1.07x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort utf8   │  1965.73ms │           2339.40ms │  1.19x slower │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort mixed  │  1813.14ms │           2046.39ms │  1.13x slower │
│ tuple fetch  │            │                     │               │
│ Some(1)      │            │                     │               │
│ Qsort utf8   │  1712.66ms │           1823.54ms │  1.06x slower │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort int    │  1832.73ms │           1738.45ms │ +1.05x faster │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort        │  1699.42ms │           1662.90ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort        │  1762.86ms │           1855.61ms │  1.05x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort utf8   │  1974.07ms │           2323.84ms │  1.18x slower │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort mixed  │  1854.64ms │           2025.10ms │  1.09x slower │
│ tuple fetch  │            │                     │               │
│ Some(10)     │            │                     │               │
│ Qsort utf8   │  1733.28ms │           1834.81ms │  1.06x slower │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort int    │  1845.74ms │           1751.06ms │ +1.05x faster │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort        │  1685.38ms │           1680.20ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort        │  1799.54ms │           1911.41ms │  1.06x slower │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort utf8   │  2008.06ms │           2368.46ms │  1.18x slower │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort mixed  │  1855.62ms │           2085.46ms │  1.12x slower │
│ tuple fetch  │            │                     │               │
│ Some(100)    │            │                     │               │
│ Qsort utf8   │  1728.54ms │           1728.53ms │     no change │
│ fetch        │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort int    │  1867.64ms │           1865.83ms │     no change │
│ fetch        │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort        │  1691.19ms │           1693.69ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort        │  1784.79ms │           1801.06ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort utf8   │  1999.16ms │           2000.34ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort mixed  │  1869.82ms │           1877.08ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(101)    │            │                     │               │
│ Qsort utf8   │  1774.79ms │           1786.39ms │     no change │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort int    │  1959.51ms │           1933.94ms │     no change │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort        │  1714.06ms │           1715.05ms │     no change │
│ decimal      │            │                     │               │
│ fetch        │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort        │  1940.85ms │           1943.97ms │     no change │
│ integer      │            │                     │               │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort utf8   │  2103.61ms │           2108.73ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
│ Qsort mixed  │  2032.56ms │           2060.30ms │     no change │
│ tuple fetch  │            │                     │               │
│ Some(1000)   │            │                     │               │
└──────────────┴────────────┴─────────────────────┴───────────────┘

Still not sure whether this is merge-worthy, but seems like the best trade-off between run-times and memory profiles so far.

EDIT:

Curiously, this seems to result in a speedup for larger file sizes (as controlled by the benchmark scale factor), see for instance:

sorting benchmarks at 3x scale factor
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃   main-3x ┃ top-k-eager-sorting-3x ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Qsort utf8   │ 5933.24ms │              5012.70ms │ +1.18x faster │
│ fetch        │           │                        │               │
│ Some(1)      │           │                        │               │
│ Qsort int    │ 6496.29ms │              5202.70ms │ +1.25x faster │
│ fetch        │           │                        │               │
│ Some(1)      │           │                        │               │
│ Qsort        │ 5411.53ms │              5013.61ms │ +1.08x faster │
│ decimal      │           │                        │               │
│ fetch        │           │                        │               │
│ Some(1)      │           │                        │               │
│ Qsort        │ 6113.03ms │              5614.54ms │ +1.09x faster │
│ integer      │           │                        │               │
│ tuple fetch  │           │                        │               │
│ Some(1)      │           │                        │               │
│ Qsort utf8   │ 6913.41ms │              7080.71ms │     no change │
│ tuple fetch  │           │                        │               │
│ Some(1)      │           │                        │               │
│ Qsort mixed  │ 5981.34ms │              6190.89ms │     no change │
│ tuple fetch  │           │                        │               │
│ Some(1)      │           │                        │               │
│ Qsort utf8   │ 5966.03ms │              5189.21ms │ +1.15x faster │
│ fetch        │           │                        │               │
│ Some(10)     │           │                        │               │
│ Qsort int    │ 6612.69ms │              5177.90ms │ +1.28x faster │
│ fetch        │           │                        │               │
│ Some(10)     │           │                        │               │
│ Qsort        │ 5832.09ms │              5008.34ms │ +1.16x faster │
│ decimal      │           │                        │               │
│ fetch        │           │                        │               │
│ Some(10)     │           │                        │               │
│ Qsort        │ 6242.88ms │              5371.00ms │ +1.16x faster │
│ integer      │           │                        │               │
│ tuple fetch  │           │                        │               │
│ Some(10)     │           │                        │               │
│ Qsort utf8   │ 6993.52ms │              7042.05ms │     no change │
│ tuple fetch  │           │                        │               │
│ Some(10)     │           │                        │               │
│ Qsort mixed  │ 6695.45ms │              6222.60ms │ +1.08x faster │
│ tuple fetch  │           │                        │               │
│ Some(10)     │           │                        │               │
│ Qsort utf8   │ 6058.16ms │              5655.45ms │ +1.07x faster │
│ fetch        │           │                        │               │
│ Some(100)    │           │                        │               │
│ Qsort int    │ 7305.25ms │              5350.27ms │ +1.37x faster │
│ fetch        │           │                        │               │
│ Some(100)    │           │                        │               │
│ Qsort        │ 6034.78ms │              5128.72ms │ +1.18x faster │
│ decimal      │           │                        │               │
│ fetch        │           │                        │               │
│ Some(100)    │           │                        │               │
│ Qsort        │ 6499.52ms │              5827.22ms │ +1.12x faster │
│ integer      │           │                        │               │
│ tuple fetch  │           │                        │               │
│ Some(100)    │           │                        │               │
│ Qsort utf8   │ 7032.34ms │              7152.44ms │     no change │
│ tuple fetch  │           │                        │               │
│ Some(100)    │           │                        │               │
│ Qsort mixed  │ 6475.82ms │              6388.50ms │     no change │
│ tuple fetch  │           │                        │               │
│ Some(100)    │           │                        │               │
│ Qsort utf8   │ 5924.83ms │              5770.70ms │     no change │
│ fetch        │           │                        │               │
│ Some(101)    │           │                        │               │
│ Qsort int    │ 6995.09ms │              6754.10ms │     no change │
│ fetch        │           │                        │               │
│ Some(101)    │           │                        │               │
│ Qsort        │ 5792.79ms │              5595.89ms │     no change │
│ decimal      │           │                        │               │
│ fetch        │           │                        │               │
│ Some(101)    │           │                        │               │
│ Qsort        │ 6289.37ms │              6303.32ms │     no change │
│ integer      │           │                        │               │
│ tuple fetch  │           │                        │               │
│ Some(101)    │           │                        │               │
│ Qsort utf8   │ 6978.01ms │              6910.07ms │     no change │
│ tuple fetch  │           │                        │               │
│ Some(101)    │           │                        │               │
│ Qsort mixed  │ 6591.68ms │              6640.04ms │     no change │
│ tuple fetch  │           │                        │               │
│ Some(101)    │           │                        │               │
└──────────────┴───────────┴────────────────────────┴───────────────┘

and

sorting benchmarks at 5x scale factor
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃    main-5x ┃ top-k-eager-sorting-5x ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Qsort utf8   │  9255.76ms │              8771.16ms │ +1.06x faster │
│ fetch        │            │                        │               │
│ Some(1)      │            │                        │               │
│ Qsort int    │ 12739.78ms │              8254.16ms │ +1.54x faster │
│ fetch        │            │                        │               │
│ Some(1)      │            │                        │               │
│ Qsort        │  9347.40ms │              7924.59ms │ +1.18x faster │
│ decimal      │            │                        │               │
│ fetch        │            │                        │               │
│ Some(1)      │            │                        │               │
│ Qsort        │ 10666.33ms │              8960.77ms │ +1.19x faster │
│ integer      │            │                        │               │
│ tuple fetch  │            │                        │               │
│ Some(1)      │            │                        │               │
│ Qsort utf8   │ 12187.03ms │             11218.87ms │ +1.09x faster │
│ tuple fetch  │            │                        │               │
│ Some(1)      │            │                        │               │
│ Qsort mixed  │ 10572.96ms │              9952.78ms │ +1.06x faster │
│ tuple fetch  │            │                        │               │
│ Some(1)      │            │                        │               │
│ Qsort utf8   │  9302.46ms │              8641.35ms │ +1.08x faster │
│ fetch        │            │                        │               │
│ Some(10)     │            │                        │               │
│ Qsort int    │ 12219.65ms │              8198.37ms │ +1.49x faster │
│ fetch        │            │                        │               │
│ Some(10)     │            │                        │               │
│ Qsort        │  9174.84ms │              7883.77ms │ +1.16x faster │
│ decimal      │            │                        │               │
│ fetch        │            │                        │               │
│ Some(10)     │            │                        │               │
│ Qsort        │ 10248.38ms │              8918.93ms │ +1.15x faster │
│ integer      │            │                        │               │
│ tuple fetch  │            │                        │               │
│ Some(10)     │            │                        │               │
│ Qsort utf8   │ 11875.86ms │             11407.02ms │     no change │
│ tuple fetch  │            │                        │               │
│ Some(10)     │            │                        │               │
│ Qsort mixed  │ 11078.38ms │              9921.77ms │ +1.12x faster │
│ tuple fetch  │            │                        │               │
│ Some(10)     │            │                        │               │
│ Qsort utf8   │  9767.50ms │              8829.77ms │ +1.11x faster │
│ fetch        │            │                        │               │
│ Some(100)    │            │                        │               │
│ Qsort int    │ 12666.82ms │              8383.07ms │ +1.51x faster │
│ fetch        │            │                        │               │
│ Some(100)    │            │                        │               │
│ Qsort        │  9221.71ms │              8194.20ms │ +1.13x faster │
│ decimal      │            │                        │               │
│ fetch        │            │                        │               │
│ Some(100)    │            │                        │               │
│ Qsort        │ 11187.49ms │              9075.40ms │ +1.23x faster │
│ integer      │            │                        │               │
│ tuple fetch  │            │                        │               │
│ Some(100)    │            │                        │               │
│ Qsort utf8   │ 12335.79ms │             11420.75ms │ +1.08x faster │
│ tuple fetch  │            │                        │               │
│ Some(100)    │            │                        │               │
│ Qsort mixed  │ 11624.44ms │             10292.47ms │ +1.13x faster │
│ tuple fetch  │            │                        │               │
│ Some(100)    │            │                        │               │
│ Qsort utf8   │  9642.31ms │              9515.85ms │     no change │
│ fetch        │            │                        │               │
│ Some(101)    │            │                        │               │
│ Qsort int    │ 13492.60ms │             12820.44ms │     no change │
│ fetch        │            │                        │               │
│ Some(101)    │            │                        │               │
│ Qsort        │  9377.99ms │              9237.80ms │     no change │
│ decimal      │            │                        │               │
│ fetch        │            │                        │               │
│ Some(101)    │            │                        │               │
│ Qsort        │ 11192.37ms │             10938.08ms │     no change │
│ integer      │            │                        │               │
│ tuple fetch  │            │                        │               │
│ Some(101)    │            │                        │               │
│ Qsort utf8   │ 12397.22ms │             12163.53ms │     no change │
│ tuple fetch  │            │                        │               │
│ Some(101)    │            │                        │               │
│ Qsort mixed  │ 11874.67ms │             11567.16ms │     no change │
│ tuple fetch  │            │                        │               │
│ Some(101)    │            │                        │               │
└──────────────┴────────────┴────────────────────────┴───────────────┘

// Eagerly sort the batch to potentially reduce the number of rows
// after applying the fetch parameter.
// Currently only applied for fetch of 100 rows or less.
input = sort_batch(&input, &self.expr, self.fetch)?;
Copy link
Contributor

@Dandandan Dandandan Aug 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether you could recover part of the perf difference by concat + sorting only once every n (say every 10) batches. The selectivity of the limit and total work to be performed is much bigger for sorting 81920 vs 8192 rows, also the merging to be performed will be over fewer batches.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, that's worth trying to benchmark as well. It could also be something like every n unsorted rows or every size unsorted bytes to accommodate for variability in batch row count.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, definitely.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fwiw, I did try out a variety of approaches along these lines, but still failed to eliminate the perf difference. There's probably a combination of parameters that is just right and could yield better results, but with the native Top-K operator in the works that likely won't be necessary.

@alamb
Copy link
Contributor

alamb commented Aug 9, 2023

I have been thinking about this PR and implementing LIMIT in general

It is my opinion that we could keep DataFusion simpler and make this query faster and use less memory by implementing a real TopKExec rather than trying to optimize SortExec more

The basic idea would be to keep a heap of the top K items (using Row Format / specialization for single columns) and I think we could even prototype the idea using the same SortExec but using a different stream when there was a fetch.

Given how important Sort / Limit is I would like to help.

What do you think @gruuya and @Dandandan and @ozankabak ?

@ozankabak
Copy link
Contributor

I think trying out the TopK operator idea is definitely worthwhile. I think we will need to be very careful with the implementation (e.g. the heap part) to get it to perform, but I have a feeling that we should be able to get decent performance if we get the implementation right.

Even if it turns out we can't, I think that effort will result in some interesting learnings we should be able to leverage to optimize the LIMIT case via other approaches.

@gruuya
Copy link
Contributor Author

gruuya commented Aug 9, 2023

Agreed, real TopKExec would be fantastic (actually I was convinced that this was already implemented in 28.0.0 after a cursory code/PR glance, hence my original issue #7149).

That said, that may take some time, and this PR feels to be on the verge of breaking even when it comes to performance and thus serve as a bandaid/base case until the real thing is implemented. So I'm compelled to continue experimenting with a couple of related approaches (i.e. things along the line of @Dandandan's batched eager sorting) for a short while if you don't mind (and share if I find something promising).

@alamb
Copy link
Contributor

alamb commented Aug 9, 2023

Sounds like a good plan @gruuya -- I believe I have some time this afternoon I can devote to trying to get a prototype for TopK up and running. I'll report back here with any progress

@alamb
Copy link
Contributor

alamb commented Aug 9, 2023

I have made some progress on a native TopK operator -- I need to run some performance tests and polish it up and then I'll share it.

I am sure it will need some additional work but I think it is looking promising

@gruuya
Copy link
Contributor Author

gruuya commented Aug 10, 2023

Glad to hear that @alamb!

I think we can close this PR now as it seems it's obsolete (as well as too verbose at this point), and continue discussion in a dedicated issue/PR for a native Top-K operator.

@gruuya gruuya closed this Aug 10, 2023
@alamb
Copy link
Contributor

alamb commented Aug 10, 2023

FWIW I made a very simplistic POC for a special TopK operator and for my test (K=10) it goes almost 2x faster and uses 1/3 the peak memory. There is also plenty of room for both algorithmic and engineering optimizations. I'll be hacking on the idea in #7250 and will make a broader announcement when ready

@alamb
Copy link
Contributor

alamb commented Aug 11, 2023

BTW I think #7250 is ready for some feedback -- it seems to work quite well, both with memory consumed as well as performance wise.

@gruuya perhaps you could test your example query with #7250 and see if it helps your usecase too?

@gruuya
Copy link
Contributor Author

gruuya commented Aug 11, 2023

@gruuya perhaps you could test your example query with #7250 and see if it helps your usecase too?

Oh, I'd love to! I'll report back to that PR when I get some numbers.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate sqllogictest SQL Logic Tests (.slt)
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants