Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return ResourceExhausted errors when memory limit is exceed in GroupedHashAggregateStreamV2 (Row Hash) #4202

Merged
merged 4 commits into from
Nov 18, 2022

Conversation

crepererum
Copy link
Contributor

Which issue does this PR close?

Doesn't close, but works towards #3940 (need to migrate V1 as well

Rationale for this change

Ensure that users don't run out of memory while performing group-by operations. This is esp. important for servers or multi-tenant systems.

What changes are included in this PR?

  • small clean up regarding async usage (first commit)
  • use a a nested construct (BoxStream) for GroupedHashAggregateStreamV2 so we can call into the async memory manager (I thought about NOT doing this but I think it's worth to consider because on the long run a group-by can get another splillable operation to spill)

Are these changes tested?

  • new test (test_oom)
  • perf results (see below)

Perf results:

❯ cargo bench -p datafusion --bench aggregate_query_sql -- --baseline issue3940a-pre
    Finished bench [optimized] target(s) in 0.08s
     Running benches/aggregate_query_sql.rs (target/release/deps/aggregate_query_sql-e9e315ab7a06a262)
aggregate_query_no_group_by 15 12
                        time:   [654.77 µs 655.49 µs 656.29 µs]
                        change: [-1.6711% -1.2910% -0.8435%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 9 outliers among 100 measurements (9.00%)
  1 (1.00%) low mild
  5 (5.00%) high mild
  3 (3.00%) high severe

aggregate_query_no_group_by_min_max_f64
                        time:   [579.93 µs 580.59 µs 581.27 µs]
                        change: [-3.8985% -3.2219% -2.6198%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  1 (1.00%) low severe
  3 (3.00%) low mild
  1 (1.00%) high mild
  4 (4.00%) high severe

aggregate_query_no_group_by_count_distinct_wide
                        time:   [2.4610 ms 2.4801 ms 2.4990 ms]
                        change: [-2.9300% -1.8414% -0.7493%] (p = 0.00 < 0.05)
                        Change within noise threshold.

Benchmarking aggregate_query_no_group_by_count_distinct_narrow: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.4s, enable flat sampling, or reduce sample count to 50.
aggregate_query_no_group_by_count_distinct_narrow
                        time:   [1.6578 ms 1.6661 ms 1.6743 ms]
                        change: [-4.5391% -3.5033% -2.5050%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low severe
  2 (2.00%) low mild
  2 (2.00%) high mild
  2 (2.00%) high severe

aggregate_query_group_by
                        time:   [2.1767 ms 2.2045 ms 2.2486 ms]
                        change: [-4.1048% -2.5858% -0.3237%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe

Benchmarking aggregate_query_group_by_with_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter
                        time:   [1.0916 ms 1.0927 ms 1.0941 ms]
                        change: [-0.8524% -0.4230% -0.0724%] (p = 0.02 < 0.05)
                        Change within noise threshold.
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) low severe
  1 (1.00%) low mild
  4 (4.00%) high mild
  2 (2.00%) high severe

aggregate_query_group_by_u64 15 12
                        time:   [2.2108 ms 2.2238 ms 2.2368 ms]
                        change: [-4.2142% -3.2743% -2.3523%] (p = 0.00 < 0.05)
                        Performance has improved.

Benchmarking aggregate_query_group_by_with_filter_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter_u64 15 12
                        time:   [1.0922 ms 1.0931 ms 1.0940 ms]
                        change: [-0.6872% -0.3192% +0.1193%] (p = 0.12 > 0.05)
                        No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) low mild
  4 (4.00%) high severe

aggregate_query_group_by_u64_multiple_keys
                        time:   [14.714 ms 15.023 ms 15.344 ms]
                        change: [-5.8337% -2.7471% +0.2798%] (p = 0.09 > 0.05)
                        No change in performance detected.

aggregate_query_approx_percentile_cont_on_u64
                        time:   [3.7776 ms 3.8049 ms 3.8329 ms]
                        change: [-4.4977% -3.4230% -2.3282%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

aggregate_query_approx_percentile_cont_on_f32
                        time:   [3.1769 ms 3.1997 ms 3.2230 ms]
                        change: [-4.4664% -3.2597% -2.0955%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild

I think the mild improvements are either flux or due to the somewhat
manual memory allocation pattern.

Are there any user-facing changes?

The V2 group-by op an now emit a ResourceExhausted error if it runs out of memory. Note that the error is kinda nested/wrapped due to #4172.

Most of it is refactoring to allow us to call the async memory subsystem
while polling the stream. The actual memory accounting is rather easy
(since it's only ever growing except when the stream is dropped).

Helps with apache#3940. (not closing yet, also need to do V1)

Performance Impact:
-------------------

```text
❯ cargo bench -p datafusion --bench aggregate_query_sql -- --baseline issue3940a-pre
    Finished bench [optimized] target(s) in 0.08s
     Running benches/aggregate_query_sql.rs (target/release/deps/aggregate_query_sql-e9e315ab7a06a262)
aggregate_query_no_group_by 15 12
                        time:   [654.77 µs 655.49 µs 656.29 µs]
                        change: [-1.6711% -1.2910% -0.8435%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 9 outliers among 100 measurements (9.00%)
  1 (1.00%) low mild
  5 (5.00%) high mild
  3 (3.00%) high severe

aggregate_query_no_group_by_min_max_f64
                        time:   [579.93 µs 580.59 µs 581.27 µs]
                        change: [-3.8985% -3.2219% -2.6198%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  1 (1.00%) low severe
  3 (3.00%) low mild
  1 (1.00%) high mild
  4 (4.00%) high severe

aggregate_query_no_group_by_count_distinct_wide
                        time:   [2.4610 ms 2.4801 ms 2.4990 ms]
                        change: [-2.9300% -1.8414% -0.7493%] (p = 0.00 < 0.05)
                        Change within noise threshold.

Benchmarking aggregate_query_no_group_by_count_distinct_narrow: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.4s, enable flat sampling, or reduce sample count to 50.
aggregate_query_no_group_by_count_distinct_narrow
                        time:   [1.6578 ms 1.6661 ms 1.6743 ms]
                        change: [-4.5391% -3.5033% -2.5050%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low severe
  2 (2.00%) low mild
  2 (2.00%) high mild
  2 (2.00%) high severe

aggregate_query_group_by
                        time:   [2.1767 ms 2.2045 ms 2.2486 ms]
                        change: [-4.1048% -2.5858% -0.3237%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe

Benchmarking aggregate_query_group_by_with_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter
                        time:   [1.0916 ms 1.0927 ms 1.0941 ms]
                        change: [-0.8524% -0.4230% -0.0724%] (p = 0.02 < 0.05)
                        Change within noise threshold.
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) low severe
  1 (1.00%) low mild
  4 (4.00%) high mild
  2 (2.00%) high severe

aggregate_query_group_by_u64 15 12
                        time:   [2.2108 ms 2.2238 ms 2.2368 ms]
                        change: [-4.2142% -3.2743% -2.3523%] (p = 0.00 < 0.05)
                        Performance has improved.

Benchmarking aggregate_query_group_by_with_filter_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter_u64 15 12
                        time:   [1.0922 ms 1.0931 ms 1.0940 ms]
                        change: [-0.6872% -0.3192% +0.1193%] (p = 0.12 > 0.05)
                        No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) low mild
  4 (4.00%) high severe

aggregate_query_group_by_u64_multiple_keys
                        time:   [14.714 ms 15.023 ms 15.344 ms]
                        change: [-5.8337% -2.7471% +0.2798%] (p = 0.09 > 0.05)
                        No change in performance detected.

aggregate_query_approx_percentile_cont_on_u64
                        time:   [3.7776 ms 3.8049 ms 3.8329 ms]
                        change: [-4.4977% -3.4230% -2.3282%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

aggregate_query_approx_percentile_cont_on_f32
                        time:   [3.1769 ms 3.1997 ms 3.2230 ms]
                        change: [-4.4664% -3.2597% -2.0955%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
```

I think the mild improvements are either flux or due to the somewhat
manual memory allocation pattern.
@github-actions github-actions bot added the core Core DataFusion crate label Nov 14, 2022
@alamb alamb requested a review from yjshen November 15, 2022 18:28
@alamb alamb changed the title wire memory management into GroupedHashAggregateStreamV2 Return ResourceExhausted errors when memory limit is exceed in GroupedHashAggregateStreamV2 (Row Hash) Nov 15, 2022
Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @crepererum I went through this PR carefully and I think it could be merged as is. Thank you for the performance results

Note to other reviewers is that the memory limits are not enabled by default so the additional accounting will not be used except if the memory manager limits are engaged

I had some small suggestions but none that I think are required

I also found this PR easier to review using whitespace blind diff https://github.com/apache/arrow-datafusion/pull/4202/files?w=1

cc @yjshen and @milenkovicm who I think has been working in this area

ALso cc @Dandandan as I know you are often interested in this type of code

/// high due to lock contention) and pre-calculating the entire allocation for a whole [`RecordBatch`] is complicated or
/// expensive.
///
/// The pool will try to allocate a whole block of memory and gives back overallocated memory on [drop](Self::drop).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👌

@@ -70,6 +72,16 @@ use hashbrown::raw::RawTable;
/// [Compact]: datafusion_row::layout::RowType::Compact
/// [WordAligned]: datafusion_row::layout::RowType::WordAligned
pub(crate) struct GroupedHashAggregateStreamV2 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks very much like other stream adapters we have in DataFusion -- perhaps we can name it something more general like SendableRecordBatchStreamWrapper or something and put it in

https://github.com/apache/arrow-datafusion/blob/c9361e0210861962074eb10d7e480949bb862b97/datafusion/core/src/physical_plan/stream.rs#L34

we can always do this as a follow on PR as well

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do that in a follow-up, since migrating V1 will probably end up with the same helper.

// allocate more

// growth factor: 2, but at least 2 elements
let bump_elements = (group_state.indices.capacity() * 2).max(2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could somehow encapsulate the memory manager interactions into functions on GroupAggrState rather than treating it like a struct. I don't think that is necessary .

However encapsulating might:

  1. Keep this code manageable for future readers
  2. Allow the memory allocation routines to be unit tested (like that when new groups are added that the memory allocation is incremented correctly)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tend to agree with with @alamb here, IMHO group_aggregate_batch is too busy at the moment, and some kind of separation of concerns would help.

What if group_aggregate_batch returns how much more memory it allocated, and accounting is done after method call? This would help with encapsulation of aggregation algorithm and make it easier to swap. I'm aware that it might not produce 100% correct results but as we discussed in #3941 it is ok to have small discrepancy for short period of time

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, this way end of the batch would be a "safe point" at which we could trigger spill

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we could somehow encapsulate the memory manager interactions into functions on GroupAggrState rather than treating it like a struct.

That only works if all interactions with GroupState go throw methods, not only a few of them due to how Rust handles borrowing (= fn f(&self) and fn f(&mut self) borrow the whole struct, so you cannot mutable borrow any member at the same time).

What if group_aggregate_batch returns how much more memory it allocated, and accounting is done after method call? ... Also, this way end of the batch would be a "safe point" at which we could trigger spill

Fair, let me try that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

Let me know if this looks better. I will pull out + document all the helper structs and traits when I port V1 (I want at least a 2nd consumer so I can make sure the interface makes sense).

if group_state.indices.capacity() == group_state.indices.len() {
// allocate more

// growth factor: 2, but at least 2 elements
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Growth factors like this are sometimes capped at some large value (like 1G) to avoid the 2x memory overhead associated at large memory levels.

If we use 2x growth with no cap, you can get into situations like the table would fit in 36GB but the code is trying to go from 32GB to 64GB and hits the limit even when the query could complete. This could always be handled in a follow on PR -- users can always disable the memory manager and let the allocations happen and suffer OOMs if they want the current behavior

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timer.done();

match result {
Ok(_) => continue,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, this would be place to do something like:

 Ok(_) => {
    let new_data_size = this.aggr_state.get_current_size();
    let acquired = this.memory_manager.can_grow_directly(new_data_size - data_size_before_batch, data_size_before_batch);
    if !acquired {
        this.aggr_state.spill();
        this.memory_manager.record_free_then_acquire(data_size, 0);
    }
    continue;
}

we basically assume that group_aggregate_batch can get all the memory it needs, no need to do per row interaction with memory manager.

this would decouple process and accounting

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The interaction is not per row. It's per batch. I can place the accounting here. The code you propose is basically the same that currently runs, just inlined (it's the default impl. of MemoryConsumer::try_grow).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies you're right @crepererum it is per batch.

The reason why I believe moving it out makes sense is separation of concerns, but it's up to you.

for example, at line 363

        // allocate memory
        // This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
        // overshooting a bit. Also this means we either store the whole record batch or not.
        memory_consumer.alloc(allocated).await?;

can this trigger spill? will the state be consistent if spill is triggered. My guess it will be not, it might be implementation specific, but hard to tell without understanding memory management implementation, and store implementation.

fn insert_accounted(
&mut self,
x: Self::T,
hasher: impl Fn(&Self::T) -> u64,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is coupling with current implementation. for example, what if we decide to keep state in b-tree rather than hash map (we need it sorted due to spill)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, memory accounting is ALWAYS coupled to the data structures that are used.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my bad @crepererum ignore my comment, apologies

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Long-term I would wish that Rust stabilizes the Allocator trait so we could plug this into the data structures and measure their usage (no need to guess).

// allocate memory
// This happens AFTER we actually used the memory, but simplifies the whole accounting and we are OK with
// overshooting a bit. Also this means we either store the whole record batch or not.
memory_consumer.alloc(allocated).await?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as i mentioned above, should this call go before return statement? if it triggers spill we internal state should be consistent.

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is looking great

https://github.com/apache/arrow-datafusion/pull/4202/files?w=1 shows the diff clearly

What are your thoughts @milenkovicm ?

@@ -418,6 +487,130 @@ impl std::fmt::Debug for AggregationState {
}
}

/// Accounting data structure for memory usage.
struct AggregationStateMemoryConsumer {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❤️

fn push_accounted(&mut self, x: Self::T, accounting: &mut usize);
}

impl<T> VecAllocExt for Vec<T> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is very nice

@milenkovicm
Copy link
Contributor

I think this is looking great

https://github.com/apache/arrow-datafusion/pull/4202/files?w=1 shows the diff clearly

What are your thoughts @milenkovicm ?

I think @crepererum did fine job here.

Not sure if he will move

memory_consumer.alloc(allocated).await?;

Just before return statement, otherwise it is spot on.

@crepererum
Copy link
Contributor Author

I'll move the alloc statement, give me a few minutes...

@crepererum
Copy link
Contributor Author

I'll move the alloc statement, give me a few minutes...

done

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great -- thank you @milenkovicm and @crepererum -- I will plan to merge this tomorrow unless I hear otherwise

FYI @liukun4515 @Dandandan @avantgardnerio @andygrove

@alamb alamb merged commit f3a65c7 into apache:master Nov 18, 2022
@ursabot
Copy link

ursabot commented Nov 18, 2022

Benchmark runs are scheduled for baseline = 09e1c91 and contender = f3a65c7. f3a65c7 is a master commit associated with this PR. Results will be available as each benchmark for each run completes.
Conbench compare runs links:
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ec2-t3-xlarge-us-east-2] ec2-t3-xlarge-us-east-2
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on test-mac-arm] test-mac-arm
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-i9-9960x] ursa-i9-9960x
[Skipped ⚠️ Benchmarking of arrow-datafusion-commits is not supported on ursa-thinkcentre-m75q] ursa-thinkcentre-m75q
Buildkite builds:
Supported benchmarks:
ec2-t3-xlarge-us-east-2: Supported benchmark langs: Python, R. Runs only benchmarks with cloud = True
test-mac-arm: Supported benchmark langs: C++, Python, R
ursa-i9-9960x: Supported benchmark langs: Python, R, JavaScript
ursa-thinkcentre-m75q: Supported benchmark langs: C++, Java

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
core Core DataFusion crate
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants