
Request for Comment: Native TopK Operator #7250

Closed · wants to merge 15 commits

Conversation

alamb (Contributor, Author) commented Aug 9, 2023:

Which issue does this PR close?

Related to #7196

This PR contains a proof of concept for a general purpose TopK operator -- I do not intend to merge this PR as is, but rather use it to gather feedback / refine the idea.

When we are happy with that, I will prepare an actual PR for consideration and review

Specific questions for reviewers:

  1. Do you know of a better structure than the one in this PR (a sorted Vec) for keeping a top-k heap?
  2. Do you have thoughts / opinions on a TopKExec (vs. reusing SortExec)?

Rationale for this change

The idea is that when sorting with a limit, DataFusion currently spends:

  1. a lot of memory retaining data that will eventually be discarded
  2. more CPU time than necessary sorting / merging rows that will ultimately be discarded

What changes are included in this PR?

This PR contains a single TopK implementation that uses the RowFormat. This new TopK is used in place of SortExec with a fetch in this PR, though I think it should eventually get its own TopKExec

Note: I considered a specialized implementation for primitive types but didn't do it because:

  1. The overall time is now pretty small (in my mini benchmarks the row format takes 2-3% of the total time)
  2. It would take non-trivial complexity to handle nulls first / nulls last and asc / desc correctly and efficiently

Known TODOs:

  • Implement a better structure than a sorted Vec (maybe)
  • Tests
  • Performance Test / verify with larger Ks

When I ran this with K=1000 I got a panic when creating the output batches (I think I need to optimize the output creation for dictionary batches):

thread 'tokio-runtime-worker' panicked at 'offset overflow', /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-45.0.0/src/transform/utils.rs:42:56
stack backtrace:
   0: rust_begin_unwind
             at /rustc/8ede3aae28fe6e4d52b38157d7bfe0d3bceef225/library/std/src/panicking.rs:593:5
   1: core::panicking::panic_fmt
             at /rustc/8ede3aae28fe6e4d52b38157d7bfe0d3bceef225/library/core/src/panicking.rs:67:14
   2: core::panicking::panic_display
             at /rustc/8ede3aae28fe6e4d52b38157d7bfe0d3bceef225/library/core/src/panicking.rs:150:5
   3: core::panicking::panic_str
             at /rustc/8ede3aae28fe6e4d52b38157d7bfe0d3bceef225/library/core/src/panicking.rs:134:5
   4: core::option::expect_failed
             at /rustc/8ede3aae28fe6e4d52b38157d7bfe0d3bceef225/library/core/src/option.rs:1932:5
   5: core::option::Option<T>::expect
             at /rustc/8ede3aae28fe6e4d52b38157d7bfe0d3bceef225/library/core/src/option.rs:898:21
   6: arrow_data::transform::utils::extend_offsets::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-45.0.0/src/transform/utils.rs:42:56
   7: core::iter::traits::iterator::Iterator::for_each::call::{{closure}}
             at /rustc/8ede3aae28fe6e4d52b38157d7bfe0d3bceef225/library/core/src/iter/traits/iterator.rs:853:29
   8: core::iter::traits::iterator::Iterator::fold
             at /rustc/8ede3aae28fe6e4d52b38157d7bfe0d3bceef225/library/core/src/iter/traits/iterator.rs:2481:21
   9: core::iter::traits::iterator::Iterator::for_each
             at /rustc/8ede3aae28fe6e4d52b38157d7bfe0d3bceef225/library/core/src/iter/traits/iterator.rs:856:9
  10: arrow_data::transform::utils::extend_offsets
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-45.0.0/src/transform/utils.rs:36:5
  11: arrow_data::transform::variable_size::build_extend::{{closure}}
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-45.0.0/src/transform/variable_size.rs:57:13
  12: <alloc::boxed::Box<F,A> as core::ops::function::Fn<Args>>::call
             at /rustc/8ede3aae28fe6e4d52b38157d7bfe0d3bceef225/library/alloc/src/boxed.rs:1999:9
  13: arrow_data::transform::MutableArrayData::extend
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-45.0.0/src/transform/mod.rs:631:9
  14: arrow_data::transform::MutableArrayData::with_capacities
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-45.0.0/src/transform/mod.rs:553:29
  15: arrow_data::transform::MutableArrayData::new
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-data-45.0.0/src/transform/mod.rs:353:9
  16: arrow_select::interleave::interleave_fallback
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-select-45.0.0/src/interleave.rs:198:26
  17: arrow_select::interleave::interleave
             at /Users/alamb/.cargo/registry/src/index.crates.io-6f17d22bba15001f/arrow-select-45.0.0/src/interleave.rs:90:14
  18: datafusion::physical_plan::topk::TopKHeap::emit::{{closure}}

Open Questions:

  1. How (and should) we hook this into the planner? It could avoid multi-way sorting / merging

Performance testing

I tested with this dataset: traces.zip (240MB), using a release build (cargo build --release).

I used two queries, run with k=10 and k=1000:

select * from traces order by time desc limit 10
# also ordering by a high cardinality column:
select * from traces order by time desc, span_id limit 10

Performance

Using a high cardinality dictionary column (span_id):

query | main (s) | this branch (s) | improvement
select * from traces order by time desc limit 10 | 1.791 | 0.801 | 2.2x faster
select * from traces order by time desc, span_id limit 10 | 66.333 | 3.043 | 21x faster
select * from traces order by time desc limit 10000 | 3.393 | (see above) |
select * from traces order by time desc, span_id limit 10000 | 3.291 | DNF (see above) |

For the queries where this branch is ~20x faster: I didn't profile main; however, I noticed that a large amount of the time was spent using only a single core, which I believe is some sort of merge inside the ExternalSort operator.

Memory Usage

(memory usage screenshots, 2023-08-10)

@alamb alamb changed the title [NOT READY FOR REVIEW] [NOT READY FOR REVIEW] Native TopK Operator Aug 9, 2023
@alamb alamb marked this pull request as draft August 10, 2023 14:39
@github-actions github-actions bot added the core (Core DataFusion crate) and sqllogictest (SQL Logic Tests (.slt)) labels Aug 10, 2023
@alamb alamb force-pushed the alamb/topk branch 2 times, most recently from d2eeaae to 764a2ae Compare August 11, 2023 15:00
@@ -3208,8 +3209,8 @@ SELECT
ORDER BY C3
LIMIT 5
----
0.970671228336 0.970671228336
alamb (Contributor, Author):

these have the same problem -- that the top 5 values have the same value for the c3 column

SELECT
ts,
alamb (Contributor, Author):

I added ts to this query to show that the first two rows have a tie (the same value, 264), and thus it is OK that the output order changes. We should probably fix the test (or add rowsort), but that might obscure real errors in the future 🤔

&self.metrics_set,
context.runtime_env(),
);
if let Some(fetch) = self.fetch.as_ref() {
alamb (Contributor, Author):

In this prototype I simply used the TopK implementation for SortExec when there was a fetch as it produces the same output.

However, longer term I think we should make a real TopKExec ExecutionPlan so that the optimizers know about it and can avoid repartitioning / trying to do anything else fancy

Contributor:

I wonder if there is some threshold at which the fetch is large enough that sorting is the better approach, certainly if the fetch is large enough that we need to spill

Contributor:

One possible approach for starters (i.e. prior to TopKExec) could be to run the new TopK when fetch is Some and there was no memory limits set (i.e. RuntimeEnv uses UnboundedMemoryPool), given that the ExternalSorter does a relatively good job of obeying the memory limits (though it will use it all up in this scenario).

alamb (Contributor, Author):

Thank you @tustvold and @gruuya -- these are good ideas.

Maybe we can figure out how to switch between sort and TopK dynamically -- like if TopK exceeds some memory threshold revert back to the normal ExternalSorter. Let me think about this 🤔

// 2. only do one update through top_k

let mut batch_entry = self.heap.register_batch(batch);
for (index, row) in rows.iter().enumerate() {
alamb (Contributor, Author):

This is the core of the algorithm -- it is pretty unfancy, but it avoids allocations unless there is an actual new top value. I will file some potential arrow-rs improvements to make this even faster
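To make this concrete, here is a minimal, self-contained sketch of the fail-fast insert (not the PR's actual code: plain `Vec<u8>` row encodings and a sorted `Vec` stand in for the row format and the heap structure):

```rust
// Keep the k smallest rows, each represented by its byte-comparable
// row-format encoding. Comparing `&[u8]` slices is cheap; an owned copy is
// allocated only when the candidate actually enters the top k.
fn topk_insert(heap: &mut Vec<Vec<u8>>, k: usize, candidate: &[u8]) {
    if k == 0 {
        return;
    }
    if heap.len() == k {
        // fail fast: compare against the current worst (last) without allocating
        if candidate >= heap.last().unwrap().as_slice() {
            return;
        }
        heap.pop();
    }
    // binary search for the insertion point that keeps the Vec sorted ascending
    let pos = heap.partition_point(|r| r.as_slice() < candidate);
    heap.insert(pos, candidate.to_vec()); // the only allocation
}
```

In the real operator the comparison would be against row-format bytes produced by arrow's RowConverter; the fail-fast branch is what keeps already-sorted input cheap.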

Contributor:

What sort of data are you testing with? I did some modeling in python so I could have a "realistic" fake data generator, and using "fail fast" strategies like this seems to provide a ~10x performance boost.

However, when query planning this, should we take into account worst-case scenarios? Because that is where I've spent my time struggling with my PR.

When confronted with worst-case data, I can't seem to do better than being 40% worse than the existing aggregate->sort->limit.

avantgardnerio (Contributor) commented Aug 13, 2023:

This is the distribution I've seen in our data:


Edit: image uploads seem to be broken. Caption: Gaussian distribution shifted imperceptibly to the right

The mean is ~0.1, so for most records time isn't advancing at all (in milliseconds); it's frequently regressing too, but the curve is shifted slightly to the right, so overall time marches on. I assume this is a typical case in time-series databases.

But if these operators get invoked on other sorts of data, or if time data were clean and continually advancing, it could be the worst case and possibly cause performance regressions if we haven't accounted for that.

alamb (Contributor, Author):

However, when query planning this, should we take into account worst-case scenarios? Because that is where I've spent my time struggling with my PR.

Yes, I think we should -- the worst case is going to be reverse-sorted data (where every value is a new top-k entry, I think).

For this particular PR, I think with some more work this approach can do better in all cases than the existing Sort w/ fetch, partly due to how Sort w/ fetch is implemented. I do think there will need to be some decision about when to fall back to sort with limit, as sort with limit handles spilling to disk.

Contributor:

FWIW, I now have an implementation of the limited aggregator that seems to be always faster than sort with limit, at least for small limits. I think I can also make it ~40% faster with research I have in other branches.

So in short, I no longer think we need to fall back, at least if we spend enough time optimizing. Also of note for testing: my data generator produces simulated time-series data, or worst-case data with a flag, if you find this helpful for your work.

alamb (Contributor, Author):

Yes that data generator sounds very helpful. Where is it?

Contributor:

It is here. I can PR it separately if you think it's useful.

It does things like look at real distributions for number of records per trace:

image

and approximate it with a Pareto distribution:

image

and similar for timings, so I think it exercises branches in approximately the way real time-series data would.
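As a sketch of how such a generator might draw Pareto-distributed record counts (an assumption on my part -- the linked generator may work differently), inverse-transform sampling is enough:

```rust
// Inverse-transform sampling: if `u` is uniform in (0, 1], then
// x_min * u^(-1/alpha) follows a Pareto(x_min, alpha) distribution.
fn pareto_sample(x_min: f64, alpha: f64, u: f64) -> f64 {
    x_min * u.powf(-1.0 / alpha)
}
```

In a real generator `u` would come from an RNG (e.g. `rand::random::<f64>()`); here it is a parameter so the function stays deterministic.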

/// Storage for at most `k` items, in ascending
/// order. `inner[0]` holds the smallest of the top k
/// so far; `inner[len-1]` holds the largest of the top k so far.
inner: Vec<TopKRow>,
alamb (Contributor, Author):

I feel like there must be some more clever algorithmic trick (a sorted list / balanced tree) but this one seems to work pretty well 🤷 I do think it will have O(N * K) worst case performance (if the input is reverse ordered, so every new value is a new top-k entry).

tustvold (Contributor) commented Aug 11, 2023:

https://doc.rust-lang.org/std/collections/struct.BinaryHeap.html perhaps? I believe it likely boils down to the same thing, as it is just a wrapper around a Vec

alamb (Contributor, Author):

The reason I couldn't figure out how to use a BinaryHeap is that I also need to get access to the largest (or smallest) value in the heap

The way BinaryHeap seems to have been set up is that you push an owned something into it and then pull the top item.

However, for this implementation it is critical not to have to actually construct an owned something unless it is a new top K (because constructing the item is expensive, given it contains an OwnedRow)

alamb (Contributor, Author):

BTW @avantgardnerio suggested elsewhere that using a BTreeSet or similar structure would be better algorithmically (which we would see with larger k) -- I'll give it a try over the next day or two

Contributor:

critical not to have to actually construct an owned something

So if I understand correctly, you are keeping OwnedRows in your Vec/BTreeSet, but doing a comparison like:

let new_row: Row = ...;              // candidate from the incoming batch
let vec: SortedVec<OwnedRow> = ...;  // current top k
if new_row > vec.last().as_row() {
   // fail fast
}

If so, I think you could use the BinaryHeap the same way, just by calling .peek() to get the "worst" value.

largest (or smallest) value in the heap

They have an example of making it a min-heap with Reverse.
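For illustration, a minimal sketch of how std's max-heap plus `peek()` supports that fail-fast check for a top-k-smallest query (using plain `i32`s rather than rows; this is my sketch, not code from the PR):

```rust
use std::collections::BinaryHeap;

// Top k *smallest* values: std's BinaryHeap is a max-heap, so peek() returns
// the largest retained value -- i.e. the current "worst" -- which gives the
// cheap fail-fast comparison before doing any work for a losing candidate.
fn topk_smallest(values: &[i32], k: usize) -> Vec<i32> {
    if k == 0 {
        return Vec::new();
    }
    let mut heap: BinaryHeap<i32> = BinaryHeap::with_capacity(k);
    for &v in values {
        if heap.len() < k {
            heap.push(v);
        } else if v < *heap.peek().unwrap() {
            // candidate beats the current worst: evict and replace
            heap.pop();
            heap.push(v);
        }
    }
    heap.into_sorted_vec() // ascending order
}
```

For a top-k *largest* query, wrapping values in `std::cmp::Reverse` turns the same structure into a min-heap, so `peek()` yields the smallest retained value instead.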

Contributor:

I agree with @tustvold: unless there's something funny going on with branch mispredicts or cache locality, the BinaryHeap should be the most efficient for this use-case.

I was unable to use the out-of-the box one because I am doing aggregation as well, which requires either pointers or random-lookups into the tree.

alamb (Contributor, Author):

🤔 I was writing a response, but I think you are right: we could use BinaryHeap with peek() -- I'll try that and see how it compares to a BTreeSet -- it might actually do very well 🤔


// put the new row into the correct location to maintain that
// self.inner is sorted in descending order
let insertion_point = self
alamb (Contributor, Author):

here is the other algorithmic piece -- finding the insertion point that keeps the Vec sorted

@@ -0,0 +1,202 @@
# Licensed to the Apache Software Foundation (ASF) under one
alamb (Contributor, Author):

this is for testing -- it would not be checked in to a final PR

@@ -2597,6 +2597,7 @@ SELECT
# test_source_sorted_builtin
query TT
EXPLAIN SELECT
ts,
alamb (Contributor, Author):

this change is to demonstrate that the ts column is not unique in the first 5 values, and thus that a different output order is acceptable

@alamb alamb changed the title [NOT READY FOR REVIEW] Native TopK Operator Request for Comment: Native TopK Operator Aug 11, 2023
alamb (Contributor, Author) commented Aug 11, 2023:

This PR is now ready for review by anyone who might be interested

/// Size of memory owned by `row` until row::size() is available
/// TODO file upstream ticket in arrow-rs to add this
fn owned_row_size(row: &OwnedRow) -> usize {
std::mem::size_of_val(row) + row.as_ref().len() // underlying data, doesn't account for capacity
Contributor:

FWIW OwnedRow is an exact sized container, so there is no capacity to account for

/// the index in this record batch the row came from
index: usize,
/// the RecordBatch this row came from: an id into a [`RecordBatchStore`]
batch_id: u32,
tustvold (Contributor) commented Aug 11, 2023:

It occurs to me that you could potentially just store Arc<RecordBatch> here, potentially avoiding the need for RecordBatchStore. An atomic increment should be significantly less expensive than a hash table lookup 😄

Memory accounting would be a bit more complex, but could probably make use of Arc::into_inner to detect if the last reference.
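A tiny sketch of the `Arc::into_inner` idea, with a hypothetical `Batch` type standing in for arrow's `RecordBatch` (my illustration, not the PR's code; `Arc::into_inner` needs Rust 1.70+):

```rust
use std::sync::Arc;

// Hypothetical stand-in for a RecordBatch.
struct Batch {
    bytes: usize,
}

// Returns Some(bytes to release) only when this was the last reference,
// mirroring how the operator could shrink its memory reservation when the
// final TopKRow pointing at a batch is dropped.
fn release_if_last(batch: Arc<Batch>) -> Option<usize> {
    Arc::into_inner(batch).map(|b| b.bytes)
}
```

The atomic decrement inside `Arc` replaces the hash-table lookup that a `RecordBatchStore` id would require.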

alamb (Contributor, Author):

Yeah, the memory accounting was what I was worried about -- maybe I could use a HashSet and compare the underlying pointers or something 🤔

For what it is worth, when I profiled this, most of the time seems to have gone into managing Rows:

(profiler screenshot, 2023-08-11)

(FWIW I plan to file a suggestion in arrow-rs shortly about how to avoid the Row allocations by reusing an existing OwnedRow)

tustvold (Contributor) commented Aug 11, 2023:

You could also just use Vec<u8>, the only reason to use OwnedRow is if you want to be able to go back, which given we retain the source RecordBatch seems unlikely to be necessary.

I'm not a massive fan of adding reuse of OwnedRow if we can avoid it

alamb (Contributor, Author):

Vec<u8> would be fine -- the owned row is simply used for comparison. As you say this PR doesn't use it to go back to Arrays. I'll give it a try

alamb (Contributor, Author):

Update: using Vec<u8> worked great

gruuya (Contributor) left a comment:

I've profiled this on the data set I used before, and the results are great! Very impressive.

K | Peak memory main (e39b5ca) | Peak memory eager sorting PR (dcba28f) | Peak memory this PR (524af05)
1 | 2.5G | 27M | 15M
10 | 2.5G | 32M | 22M
100 | 2.5G | 70M | 24M
1000 | 2.5G | 640M | 24M
10000 | 2.5G | 2.5G | 30M

.map(|k| {
let entry =
self.store.get(k.batch_id).expect("invalid stored batch id");
entry.batch.column(col) as &dyn Array
gruuya (Contributor) commented Aug 11, 2023:

Seems like until the very end we keep around the full batches that contain some of the top-k rows. This means that there is a potential edge case whereby the top 1000 rows are all distributed across 1000 different batches, thus negating the memory benefits to this operator. Even if they were distributed across 100 different batches this could mean keeping 819x more rows than we need to. The less sorted the input data is (the more uniformly distributed), the more this effect comes into play.

Can we extract/keep track of only the Rows that are in the top-k (maybe that way the BinaryHeap could be used as well)? Or alternatively, perform Array slicing and splicing eagerly, as we iterate over the batches?

alamb (Contributor, Author):

This is an excellent point @gruuya

Or alternatively, perform Array slicing and splicing eagerly, as we iterate over the batches?

I think this is a great idea, and I think it would be relatively straightforward to "compact" the RecordBatches held in the RecordBatchStore -- namely, copy all the stored rows into a new, single RecordBatch.

I'll give this a try as well over the next day or two
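A minimal sketch of that compaction step, with plain `Vec`s standing in for single-column `RecordBatch`es (my illustration; the real version would presumably apply arrow's `interleave` kernel to each column):

```rust
// Gather only the retained (batch_id, row_index) pairs into one new "batch",
// so the store no longer has to pin every source batch in memory.
fn compact(batches: &[Vec<i64>], retained: &[(usize, usize)]) -> Vec<i64> {
    retained
        .iter()
        .map(|&(batch_id, row)| batches[batch_id][row])
        .collect()
}
```

After compaction the store can drop every original batch and keep just the one output batch, bounding memory by k rows rather than by the number of source batches touched.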

alamb (Contributor, Author):

Compaction is now implemented and seems to work well, FWIW

alamb (Contributor, Author) commented Aug 13, 2023:

I plan to try the following things with this PR, likely tomorrow:

  • update storage to use Vec<u8> rather than OwnedRow (to avoid per row allocation)
  • Implement a "compact" type operation for RecordBatchStorage to reduce memory needs
  • Test using BTreeSet (to handle high row count) /BinaryHeap

alamb (Contributor, Author) commented Aug 16, 2023:

My next plan is to implement record batch compaction, and then once I have that, test the performance of this branch with large values of K and adversarial (reverse sorted) input

alamb (Contributor, Author) commented Aug 23, 2023:

Update:

  1. I updated the core algorithm to use a BinaryHeap from the Rust std library; it works very well and is faster than main for LIMIT 10000 (aka "large k" queries), including the worst case / adversarial input where the data is reverse sorted
  2. I have started working on "compaction" to improve memory usage for "large k" type queries

Current status

Query | time / CPU compared to main | memory compared to main
select * from 'traces_nd_adversarial.parquet' order by time desc limit 10 | better | better
select * from 'traces_nd_adversarial.parquet' order by time desc limit 10000 | better | same
select * from 'traces.parquet' order by time desc limit 10 | better | better
select * from 'traces.parquet' order by time desc limit 10000 | better | same

Current remaining todos:

  • Improve memory usage compared to main for "large k" (via "compaction")
  • debug some issues with high cardinality dictionaries

alamb (Contributor, Author) commented Aug 23, 2023:

Ok, I did some testing and I think the compaction is working well too (faster and better memory usage). All I need to do is handle the high cardinality dictionary case, which is important, and then I think I'll be ready to make a PR for review

@github-actions github-actions bot added the sql (SQL Planner), optimizer (Optimizer rules), and substrait labels Aug 24, 2023
alamb (Contributor, Author) commented Aug 24, 2023:

ok, I think I am now happy with the performance and feature set of this branch. I need to work out some interleave + dictionary nonsense with @tustvold but I will begin trying to create a real PR shortly

alamb (Contributor, Author) commented Aug 25, 2023:

An update here:

  1. I think this code is now ready to propose adding to DataFusion and I plan to polish it up, write more tests / docs , and get a real PR up over the next few weeks

Some schedule items:

  1. I am out-ish for the next 1.5 weeks so I will likely not make much progress on this project until I return in September
  2. For this implementation, avoiding memory explosions with DictionaryArrays (as we do in IOx) requires "Optimize" Dictionary contents in DictionaryArray / concat_batches (arrow-rs#506), which @tustvold is working on, so I expect that will be ready in arrow 47.0.0, in about 2 weeks' time

If anyone else needs this feature faster you are welcome to turn this PR into a real PR to DataFusion

Labels: core (Core DataFusion crate), optimizer (Optimizer rules), sql (SQL Planner), sqllogictest (SQL Logic Tests (.slt)), substrait
4 participants