Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adaptive in-memory sort (~2x faster) (#5879) #6163

Merged
merged 5 commits into from
May 2, 2023

Conversation

tustvold
Copy link
Contributor

@tustvold tustvold commented Apr 29, 2023

Which issue does this PR close?

Closes #5879
Closes #5230

Rationale for this change

merge sorted i64        time:   [4.7568 ms 4.7616 ms 4.7669 ms]
                        change: [+2.1068% +2.3410% +2.5729%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 7 outliers among 100 measurements (7.00%)
  4 (4.00%) high mild
  3 (3.00%) high severe

sort merge i64          time:   [4.7577 ms 4.7668 ms 4.7756 ms]
                        change: [-3.8818% -3.5709% -3.2819%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 10 outliers among 100 measurements (10.00%)
  8 (8.00%) low mild
  2 (2.00%) high mild

sort i64                time:   [3.5604 ms 3.5695 ms 3.5787 ms]
                        change: [-47.559% -47.338% -47.118%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

sort partitioned i64    time:   [189.26 µs 191.22 µs 193.29 µs]
                        change: [-67.019% -44.594% -9.0868%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  9 (9.00%) high severe

merge sorted f64        time:   [4.9416 ms 4.9477 ms 4.9549 ms]
                        change: [+1.0481% +1.2864% +1.5176%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 5 outliers among 100 measurements (5.00%)
  2 (2.00%) high mild
  3 (3.00%) high severe

sort merge f64          time:   [4.9364 ms 4.9466 ms 4.9567 ms]
                        change: [-4.5491% -4.2663% -3.9838%] (p = 0.00 < 0.05)
                        Performance has improved.

sort f64                time:   [4.5859 ms 4.6085 ms 4.6335 ms]
                        change: [-41.877% -41.536% -41.182%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 13 outliers among 100 measurements (13.00%)
  3 (3.00%) high mild
  10 (10.00%) high severe

sort partitioned f64    time:   [193.25 µs 194.84 µs 196.60 µs]
                        change: [-66.364% -49.820% -24.707%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) high mild
  6 (6.00%) high severe

merge sorted utf8 low cardinality
                        time:   [5.2695 ms 5.2795 ms 5.2905 ms]
                        change: [+0.1782% +0.5459% +0.8824%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 8 outliers among 100 measurements (8.00%)
  3 (3.00%) high mild
  5 (5.00%) high severe

sort merge utf8 low cardinality
                        time:   [5.4749 ms 5.4859 ms 5.4968 ms]
                        change: [-9.5832% -9.1290% -8.6668%] (p = 0.00 < 0.05)
                        Performance has improved.

sort utf8 low cardinality
                        time:   [10.327 ms 10.352 ms 10.379 ms]
                        change: [+12.671% +13.176% +13.689%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 10 outliers among 100 measurements (10.00%)
  10 (10.00%) high mild

sort partitioned utf8 low cardinality
                        time:   [380.82 µs 390.19 µs 400.01 µs]
                        change: [-58.561% -51.365% -41.516%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) high mild
  3 (3.00%) high severe

merge sorted utf8 high cardinality
                        time:   [8.0970 ms 8.1371 ms 8.1811 ms]
                        change: [+1.7218% +2.4401% +3.1634%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 12 outliers among 100 measurements (12.00%)
  4 (4.00%) high mild
  8 (8.00%) high severe

sort merge utf8 high cardinality
                        time:   [8.2608 ms 8.2747 ms 8.2892 ms]
                        change: [-10.764% -10.105% -9.4476%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

sort utf8 high cardinality
                        time:   [14.481 ms 14.601 ms 14.726 ms]
                        change: [-24.598% -23.812% -23.012%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

sort partitioned utf8 high cardinality
                        time:   [457.02 µs 466.43 µs 475.95 µs]
                        change: [-60.870% -50.562% -36.727%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 6 outliers among 100 measurements (6.00%)
  1 (1.00%) high mild
  5 (5.00%) high severe

merge sorted utf8 tuple time:   [14.555 ms 14.572 ms 14.591 ms]
                        change: [-0.4969% -0.2681% -0.0295%] (p = 0.02 < 0.05)
                        Change within noise threshold.
Found 5 outliers among 100 measurements (5.00%)
  4 (4.00%) high mild
  1 (1.00%) high severe

sort merge utf8 tuple   time:   [16.312 ms 16.388 ms 16.464 ms]
                        change: [-8.7208% -8.0755% -7.4565%] (p = 0.00 < 0.05)
                        Performance has improved.

sort utf8 tuple         time:   [27.043 ms 27.188 ms 27.334 ms]
                        change: [-59.129% -58.917% -58.688%] (p = 0.00 < 0.05)
                        Performance has improved.

Benchmarking sort partitioned utf8 tuple: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.0s, enable flat sampling, or reduce sample count to 70.
sort partitioned utf8 tuple
                        time:   [1.0555 ms 1.0934 ms 1.1338 ms]
                        change: [-56.637% -52.161% -44.365%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

merge sorted utf8 dictionary
                        time:   [5.5580 ms 5.5623 ms 5.5667 ms]
                        change: [+2.0292% +2.2143% +2.3711%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

sort merge utf8 dictionary
                        time:   [5.2240 ms 5.2377 ms 5.2515 ms]
                        change: [-6.0528% -5.7105% -5.3791%] (p = 0.00 < 0.05)
                        Performance has improved.

Benchmarking sort utf8 dictionary: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.3s, enable flat sampling, or reduce sample count to 50.
sort utf8 dictionary    time:   [1.6521 ms 1.6562 ms 1.6608 ms]
                        change: [-71.444% -71.351% -71.261%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 13 outliers among 100 measurements (13.00%)
  10 (10.00%) high mild
  3 (3.00%) high severe

sort partitioned utf8 dictionary
                        time:   [191.38 µs 193.07 µs 194.76 µs]
                        change: [-67.496% -48.064% -16.445%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 8 outliers among 100 measurements (8.00%)
  2 (2.00%) high mild
  6 (6.00%) high severe

merge sorted utf8 dictionary tuple
                        time:   [8.3299 ms 8.3399 ms 8.3514 ms]
                        change: [+2.1649% +2.3555% +2.5497%] (p = 0.00 < 0.05)
                        Performance has regressed.
Found 4 outliers among 100 measurements (4.00%)
  3 (3.00%) high mild
  1 (1.00%) high severe

sort merge utf8 dictionary tuple
                        time:   [8.3788 ms 8.4157 ms 8.4522 ms]
                        change: [-11.462% -10.581% -9.7110%] (p = 0.00 < 0.05)
                        Performance has improved.

sort utf8 dictionary tuple
                        time:   [18.048 ms 18.103 ms 18.159 ms]
                        change: [-33.080% -32.800% -32.502%] (p = 0.00 < 0.05)
                        Performance has improved.

sort partitioned utf8 dictionary tuple
                        time:   [695.23 µs 717.90 µs 741.22 µs]
                        change: [-58.425% -50.360% -40.576%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 4 outliers among 100 measurements (4.00%)
  1 (1.00%) high mild
  3 (3.00%) high severe

merge sorted mixed dictionary tuple
                        time:   [13.847 ms 13.864 ms 13.883 ms]
                        change: [-0.1635% +0.0555% +0.2611%] (p = 0.60 > 0.05)
                        No change in performance detected.
Found 3 outliers among 100 measurements (3.00%)
  2 (2.00%) high mild
  1 (1.00%) high severe

sort merge mixed dictionary tuple
                        time:   [14.056 ms 14.103 ms 14.150 ms]
                        change: [-9.5911% -9.0131% -8.4159%] (p = 0.00 < 0.05)
                        Performance has improved.

sort mixed dictionary tuple
                        time:   [25.585 ms 25.672 ms 25.759 ms]
                        change: [-60.308% -60.150% -59.983%] (p = 0.00 < 0.05)
                        Performance has improved.

sort partitioned mixed dictionary tuple
                        time:   [753.92 µs 781.26 µs 811.65 µs]
                        change: [-56.610% -50.505% -43.248%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 5 outliers among 100 measurements (5.00%)
  1 (1.00%) high mild
  4 (4.00%) high severe

merge sorted mixed tuple
                        time:   [13.707 ms 13.724 ms 13.744 ms]
                        change: [-0.7392% -0.5536% -0.3534%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 3 outliers among 100 measurements (3.00%)
  1 (1.00%) high mild
  2 (2.00%) high severe

sort merge mixed tuple  time:   [14.666 ms 14.728 ms 14.792 ms]
                        change: [-7.0426% -6.5315% -6.0608%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

sort mixed tuple        time:   [22.520 ms 22.655 ms 22.792 ms]
                        change: [-27.945% -27.394% -26.833%] (p = 0.00 < 0.05)
                        Performance has improved.

sort partitioned mixed tuple
                        time:   [661.24 µs 678.44 µs 696.29 µs]
                        change: [-57.005% -49.694% -40.292%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  2 (2.00%) high mild
  5 (5.00%) high severe

What changes are included in this PR?

Previously ExternalSorter would sort every batch it receives, but then perform an in-memory sort by concatenating all the inputs together.

This PR modifies ExternalSorter to not presort batches, and to instead perform an in-memory sort prior to either spilling or performing the final sort. It also adds an adaptive strategy that falls back to SortPreservingMerge based on the size of the input batches.

With this PR ExternalSorter will now use the default Rust sort, currently pattern-defeating quicksort, for small inputs, switching to merge sort based on SortPreservingMerge for larger inputs.

Are these changes tested?

Are there any user-facing changes?

@github-actions github-actions bot added the core Core DataFusion crate label Apr 29, 2023
@@ -1089,7 +820,7 @@ mod tests {
#[tokio::test]
async fn test_sort_fetch_memory_calculation() -> Result<()> {
// This test mirrors down the size from the example above.
let avg_batch_size = 5000;
let avg_batch_size = 4000;
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to adjust this down slightly, as the memory characteristics have changed slightly (for the better)

self.fetch,
)?;
// TODO: More accurate, dynamic memory accounting (#5885)
merge_metrics.init_mem_used(self.reservation.free());
Copy link
Contributor Author

@tustvold tustvold Apr 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is redundant as the mem sort stream now accounts for its memory usage

Comment on lines +292 to +293
// TODO: Run batch sorts concurrently (#6162)
// TODO: Pushdown fetch to streaming merge (#6000)
Copy link
Contributor Author

@tustvold tustvold Apr 29, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are some ideas for further improvements, both should be relatively straightforward to implement

#6162 is potentially a big win, it will effectively get us parallel merge sort

@yjshen
Copy link
Member

yjshen commented Apr 30, 2023

Previously ExternalSorter would sort every batch it receives, but then perform an in-memory sort by concatenating all the inputs together.

This PR modifies ExternalSorter to not presort batches

I think that sorting the batch while it is still in the cache, immediately after its generation, can effectively utilize the cache and enhance performance?

@@ -118,7 +118,7 @@ fn make_staggered_batches(len: usize) -> Vec<RecordBatch> {
RecordBatch::try_from_iter(vec![(
"x",
Arc::new(Int32Array::from_iter_values(
std::iter::from_fn(|| Some(rng.gen())).take(to_read),
(0..to_read).map(|_| rng.gen()),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seemingly innocuous change is necessary because the previous formulation results in a size hint of 0, resulting in bump allocation of the Int32Array. This in turn causes it to be larger than it should be. Previously as we sorted on ingest this wasn't a problem, as it would rewrite the array, we no longer do this and so the memory accounting is impacted by this

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I recommend a comment explaining this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems strange to explain what was previously just an implementation bug, I think it is fine?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess I was thinking that without a comment it could easily regress in the future 🤷 maybe the memory accounting failures would catch it

@tustvold
Copy link
Contributor Author

tustvold commented Apr 30, 2023

can effectively utilize the cache and enhance performance

At least empirically with the benchmarks I have available, the cost of sorting and therefore copying all the values an additional time far outweighs any cache locality effects.

"| 289 | 269 | 305 | 305 | 305 | 283 | 100 | 100 | 99 | 99 | 86 | 86 | 301 | 296 | 301 | 1004 | 305 | 305 | 301 | 301 | 1001 | 1002 | 1001 | 289 |",
"| 289 | 266 | 305 | 305 | 305 | 278 | 99 | 99 | 99 | 99 | 86 | 86 | 296 | 291 | 296 | 1004 | 305 | 305 | 301 | 296 | 305 | 1002 | 305 | 286 |",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have to confess to not really understanding what is going on in this test. I think the output order was just not guaranteed, but I don't really understand what is going on with these functions 😅

Perhaps @mustafasrepo might be able to verify this?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since ts doesn't consist of unique values, result is not deterministic. In another Pr I will fix it. As far as this PR is concerned you are right, output order is not guaranteed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for looking into

/// Stream of sorted record batches
struct SortedSizedRecordBatchStream {
schema: SchemaRef,
async fn spill_sorted_batches(
batches: Vec<RecordBatch>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for curiosity, why do you prefer vector instead of stream here? to avoid channels overheads?

Copy link
Contributor Author

@tustvold tustvold Apr 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I switched to an in-place sort so that we can potentially avoid needing to spill - https://github.com/apache/arrow-datafusion/pull/6163/files#diff-c0e76bbcb3ed7bfbba2f99fedfdab7ebb9200746a835db51619a6b10e3e2adcfR128

Given this, the stream was unnecessary added complexity

@alamb
Copy link
Contributor

alamb commented Apr 30, 2023

I ran the sort benchmarks (shout out to @jaylmiller for adding them) on this branch https://github.com/apache/arrow-datafusion/tree/main/benchmarks#parquet-benchmarks

They certainly look very promising:

--------------------
Benchmark sort.json
--------------------
┏━━━━━━━━━━━━━━┳━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━┓
┃ Query        ┃        sort ┃       sort ┃        Change ┃
┡━━━━━━━━━━━━━━╇━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━┩
│ Qsort utf8   │  80037.05ms │ 55724.44ms │ +1.44x faster │
│ Qsort int    │  96166.00ms │ 69435.53ms │ +1.38x faster │
│ Qsort        │  85487.77ms │ 57109.82ms │ +1.50x faster │
│ decimal      │             │            │               │
│ Qsort        │ 103824.30ms │ 78792.01ms │ +1.32x faster │
│ integer      │             │            │               │
│ tuple        │             │            │               │
│ Qsort utf8   │  80706.73ms │ 63156.00ms │ +1.28x faster │
│ tuple        │             │            │               │
│ Qsort mixed  │  97100.16ms │ 68542.10ms │ +1.42x faster │
│ tuple        │             │            │               │
└──────────────┴─────────────┴────────────┴───────────────┘

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I read this PR carefully and it is really nice. Thank you so much @tustvold -- it is a mark of expertise that the system is sped with a net deletion of code ❤️

its-so-beautiful-crying-gif

cc @yjshen / @Dandandan in case you are interested in this

Comment on lines +238 to +242
self.in_mem_batches = self
.in_mem_sort_stream(tracking_metrics)?
.try_collect()
.await?;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this use of streams is downright beautiful. It just is so easy to read

// The factor of 2 aims to avoid a degenerate case where the
// memory required for `fetch` is just under the memory available,
// causing repeated resorting of data
if self.reservation.size() > before / 2
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in the case where self.reservation.size() > before / 2 is true, (and we have freed a bunch of memory and skop the spilling) it seems like self.reservation_try_grow(size) needs to be called again and return successfully

Maybe we could just reverse the order of this check (self.reservation.try_grow(size).is_err() || self.reservation.size() > before / 2) and add a comment noting it relies on the side effects

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

needs to be called again and return successfully

It is called in the loop body following the spill


struct SortedIterator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This merging uses the streaming_merge implementation now, right?

@tustvold tustvold merged commit 9a0ab5f into apache:main May 2, 2023
@andygrove andygrove added performance Make DataFusion faster enhancement New feature or request labels May 6, 2023
@yjshen yjshen mentioned this pull request Aug 7, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate enhancement New feature or request performance Make DataFusion faster
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Unify Sorting Implementations Use Arrow Row Format in SortExec to improve performance
6 participants