Skip to content

Commit

Permalink
Return ResourceExhausted errors when memory limit is exceed in `Gro…
Browse files Browse the repository at this point in the history
…upedHashAggregateStreamV2` (Row Hash) (#4202)

* refactor: remove needless async

* feat: wire memory management into `GroupedHashAggregateStreamV2`

Most of it is refactoring to allow us to call the async memory subsystem
while polling the stream. The actual memory accounting is rather easy
(since it's only ever growing except when the stream is dropped).

Helps with #3940. (not closing yet, also need to do V1)

Performance Impact:
-------------------

```text
❯ cargo bench -p datafusion --bench aggregate_query_sql -- --baseline issue3940a-pre
    Finished bench [optimized] target(s) in 0.08s
     Running benches/aggregate_query_sql.rs (target/release/deps/aggregate_query_sql-e9e315ab7a06a262)
aggregate_query_no_group_by 15 12
                        time:   [654.77 µs 655.49 µs 656.29 µs]
                        change: [-1.6711% -1.2910% -0.8435%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 9 outliers among 100 measurements (9.00%)
  1 (1.00%) low mild
  5 (5.00%) high mild
  3 (3.00%) high severe

aggregate_query_no_group_by_min_max_f64
                        time:   [579.93 µs 580.59 µs 581.27 µs]
                        change: [-3.8985% -3.2219% -2.6198%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 9 outliers among 100 measurements (9.00%)
  1 (1.00%) low severe
  3 (3.00%) low mild
  1 (1.00%) high mild
  4 (4.00%) high severe

aggregate_query_no_group_by_count_distinct_wide
                        time:   [2.4610 ms 2.4801 ms 2.4990 ms]
                        change: [-2.9300% -1.8414% -0.7493%] (p = 0.00 < 0.05)
                        Change within noise threshold.

Benchmarking aggregate_query_no_group_by_count_distinct_narrow: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 8.4s, enable flat sampling, or reduce sample count to 50.
aggregate_query_no_group_by_count_distinct_narrow
                        time:   [1.6578 ms 1.6661 ms 1.6743 ms]
                        change: [-4.5391% -3.5033% -2.5050%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 7 outliers among 100 measurements (7.00%)
  1 (1.00%) low severe
  2 (2.00%) low mild
  2 (2.00%) high mild
  2 (2.00%) high severe

aggregate_query_group_by
                        time:   [2.1767 ms 2.2045 ms 2.2486 ms]
                        change: [-4.1048% -2.5858% -0.3237%] (p = 0.00 < 0.05)
                        Change within noise threshold.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high severe

Benchmarking aggregate_query_group_by_with_filter: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter
                        time:   [1.0916 ms 1.0927 ms 1.0941 ms]
                        change: [-0.8524% -0.4230% -0.0724%] (p = 0.02 < 0.05)
                        Change within noise threshold.
Found 9 outliers among 100 measurements (9.00%)
  2 (2.00%) low severe
  1 (1.00%) low mild
  4 (4.00%) high mild
  2 (2.00%) high severe

aggregate_query_group_by_u64 15 12
                        time:   [2.2108 ms 2.2238 ms 2.2368 ms]
                        change: [-4.2142% -3.2743% -2.3523%] (p = 0.00 < 0.05)
                        Performance has improved.

Benchmarking aggregate_query_group_by_with_filter_u64 15 12: Warming up for 3.0000 s
Warning: Unable to complete 100 samples in 5.0s. You may wish to increase target time to 5.5s, enable flat sampling, or reduce sample count to 60.
aggregate_query_group_by_with_filter_u64 15 12
                        time:   [1.0922 ms 1.0931 ms 1.0940 ms]
                        change: [-0.6872% -0.3192% +0.1193%] (p = 0.12 > 0.05)
                        No change in performance detected.
Found 7 outliers among 100 measurements (7.00%)
  3 (3.00%) low mild
  4 (4.00%) high severe

aggregate_query_group_by_u64_multiple_keys
                        time:   [14.714 ms 15.023 ms 15.344 ms]
                        change: [-5.8337% -2.7471% +0.2798%] (p = 0.09 > 0.05)
                        No change in performance detected.

aggregate_query_approx_percentile_cont_on_u64
                        time:   [3.7776 ms 3.8049 ms 3.8329 ms]
                        change: [-4.4977% -3.4230% -2.3282%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 2 outliers among 100 measurements (2.00%)
  2 (2.00%) high mild

aggregate_query_approx_percentile_cont_on_f32
                        time:   [3.1769 ms 3.1997 ms 3.2230 ms]
                        change: [-4.4664% -3.2597% -2.0955%] (p = 0.00 < 0.05)
                        Performance has improved.
Found 1 outliers among 100 measurements (1.00%)
  1 (1.00%) high mild
```

I think the mild improvements are either flux or due to the somewhat
manual memory allocation pattern.

* refactor: simplify memory accounting

* refactor: de-couple memory allocation
  • Loading branch information
crepererum authored Nov 18, 2022
1 parent 09e1c91 commit f3a65c7
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 86 deletions.
8 changes: 3 additions & 5 deletions datafusion/core/src/execution/memory_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,8 @@ pub trait MemoryConsumer: Send + Sync {
self.id(),
);

let can_grow_directly = self
.memory_manager()
.can_grow_directly(required, current)
.await;
let can_grow_directly =
self.memory_manager().can_grow_directly(required, current);
if !can_grow_directly {
debug!(
"Failed to grow memory of {} directly from consumer {}, spilling first ...",
Expand Down Expand Up @@ -334,7 +332,7 @@ impl MemoryManager {
}

/// Grow memory attempt from a consumer, return if we could grant that much to it
async fn can_grow_directly(&self, required: usize, current: usize) -> bool {
fn can_grow_directly(&self, required: usize, current: usize) -> bool {
let num_rqt = self.requesters.lock().len();
let mut rqt_current_used = self.requesters_total.lock();
let mut rqt_max = self.max_mem_for_requesters();
Expand Down
66 changes: 63 additions & 3 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ impl ExecutionPlan for AggregateExec {
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let batch_size = context.session_config().batch_size();
let input = self.input.execute(partition, context)?;
let input = self.input.execute(partition, Arc::clone(&context))?;

let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);

Expand All @@ -369,6 +369,8 @@ impl ExecutionPlan for AggregateExec {
input,
baseline_metrics,
batch_size,
context,
partition,
)?))
} else {
Ok(Box::pin(GroupedHashAggregateStream::new(
Expand Down Expand Up @@ -689,7 +691,8 @@ fn evaluate_group_by(

#[cfg(test)]
mod tests {
use crate::execution::context::TaskContext;
use crate::execution::context::{SessionConfig, TaskContext};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::from_slice::FromSlice;
use crate::physical_plan::aggregates::{
AggregateExec, AggregateMode, PhysicalGroupBy,
Expand All @@ -700,7 +703,7 @@ mod tests {
use crate::{assert_batches_sorted_eq, physical_plan::common};
use arrow::array::{Float64Array, UInt32Array};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::error::Result as ArrowResult;
use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use datafusion_common::{DataFusionError, Result, ScalarValue};
use datafusion_physical_expr::expressions::{lit, Count};
Expand Down Expand Up @@ -1081,6 +1084,63 @@ mod tests {
check_grouping_sets(input).await
}

#[tokio::test]
async fn test_oom() -> Result<()> {
let input: Arc<dyn ExecutionPlan> =
Arc::new(TestYieldingExec { yield_first: true });
let input_schema = input.schema();

let session_ctx = SessionContext::with_config_rt(
SessionConfig::default(),
Arc::new(
RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0))
.unwrap(),
),
);
let task_ctx = session_ctx.task_ctx();

let groups = PhysicalGroupBy {
expr: vec![(col("a", &input_schema)?, "a".to_string())],
null_expr: vec![],
groups: vec![vec![false]],
};

let aggregates: Vec<Arc<dyn AggregateExpr>> = vec![Arc::new(Avg::new(
col("b", &input_schema)?,
"AVG(b)".to_string(),
DataType::Float64,
))];

let partial_aggregate = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
groups,
aggregates,
input,
input_schema.clone(),
)?);

let err = common::collect(partial_aggregate.execute(0, task_ctx.clone())?)
.await
.unwrap_err();

// error root cause traversal is a bit complicated, see #4172.
if let DataFusionError::ArrowError(ArrowError::ExternalError(err)) = err {
if let Some(err) = err.downcast_ref::<DataFusionError>() {
assert!(
matches!(err, DataFusionError::ResourcesExhausted(_)),
"Wrong inner error type: {}",
err,
);
} else {
panic!("Wrong arrow error type: {err}")
}
} else {
panic!("Wrong outer error type: {err}")
}

Ok(())
}

#[tokio::test]
async fn test_drop_cancel_without_groups() -> Result<()> {
let session_ctx = SessionContext::new();
Expand Down
Loading

0 comments on commit f3a65c7

Please sign in to comment.