Skip to content

Commit

Permalink
Extend memory limit tests for sort with a limit
Browse files Browse the repository at this point in the history
  • Loading branch information
gruuya committed Aug 4, 2023
1 parent 5e110d5 commit c8d96b2
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 18 deletions.
12 changes: 8 additions & 4 deletions datafusion/core/src/physical_plan/sorts/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,14 +387,18 @@ impl ExternalSorter {

/// Sorts the in_mem_batches in place
async fn in_mem_sort(&mut self) -> Result<()> {
if self.in_mem_batches.iter().all(|(sorted, _)| *sorted) && self.fetch.is_none() {
if self.in_mem_batches.is_empty()
|| self.in_mem_batches.iter().all(|(sorted, _)| *sorted)
&& self.fetch.is_none()
{
// Do not sort if all the in-mem batches are sorted _and_ there was no `fetch` specified.
// If a `fetch` was specified we could hit a pathological case even if all the batches
// are sorted whereby we have ~100 batches with 1 row each (in case of `LIMIT 1`), and
// it turns out this is a problem when reading from the spills:
// are sorted whereby we have ~100 in-mem batches with 1 row each (in case of `LIMIT 1`),
// and then if this gets spilled to disk it turns out this is a problem when reading
// a series of 1-row batches from the spill:
// `Failure while reading spill file: NamedTempFile("/var..."). Error: Execution error: channel closed`
// Even if a larger `fetch` was used we would likely benefit from merging the individual
// batches together during sort.
// truncated batches together during sort.
return Ok(());
}

Expand Down
12 changes: 6 additions & 6 deletions datafusion/core/tests/fuzz_cases/order_spill_fuzz.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,18 +39,18 @@ use test_utils::{batches_to_vec, partitions_to_sorted_vec};
#[case::mem_10k_5_rows(10240, 5, None, false)]
#[case::mem_10k_20k_rows(10240, 20000, None, true)]
#[case::mem_10k_1m_rows(10240, 1000000, None, true)]
#[case::mem_10k_5_rows_fetch_10(10240, 5, Some(10), false)]
#[case::mem_10k_20k_rows_fetch_10(10240, 20000, Some(10), false)]
#[case::mem_10k_1m_rows_fetch_10(10240, 1000000, Some(10), false)]
#[case::mem_10k_5_rows_fetch_1(10240, 5, Some(1), false)]
#[case::mem_10k_20k_rows_fetch_1(10240, 20000, Some(1), false)]
#[case::mem_10k_1m_rows_fetch_1(10240, 1000000, Some(1), false)]
#[case::mem_10k_5_rows_fetch_1000(10240, 5, Some(1000), false)]
#[case::mem_10k_20k_rows_fetch_1000(10240, 20000, Some(1000), true)]
#[case::mem_10k_1m_rows_fetch_1000(10240, 1000000, Some(1000), true)]
#[case::mem_100k_5_rows(102400, 5, None, false)]
#[case::mem_100k_20k_rows(102400, 20000, None, false)]
#[case::mem_100k_1m_rows(102400, 1000000, None, true)]
#[case::mem_100k_5_rows_fetch_10(102400, 5, Some(10), false)]
#[case::mem_100k_20k_rows_fetch_10(102400, 20000, Some(10), false)]
#[case::mem_100k_1m_rows_fetch_10(102400, 1000000, Some(10), false)]
#[case::mem_100k_5_rows_fetch_1(102400, 5, Some(1), false)]
#[case::mem_100k_20k_rows_fetch_1(102400, 20000, Some(1), false)]
#[case::mem_100k_1m_rows_fetch_1(102400, 1000000, Some(1), false)]
#[case::mem_100k_5_rows_fetch_1000(102400, 5, Some(1000), false)]
#[case::mem_100k_20k_rows_fetch_1000(102400, 20000, Some(1000), false)]
#[case::mem_100k_1m_rows_fetch_1000(102400, 1000000, Some(1000), false)]
Expand Down
48 changes: 40 additions & 8 deletions datafusion/core/tests/memory_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use arrow::record_batch::RecordBatch;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::streaming::PartitionStream;
use futures::StreamExt;
use rstest::rstest;
use std::sync::Arc;

use datafusion::datasource::streaming::StreamingTable;
Expand All @@ -45,17 +46,38 @@ fn init() {
let _ = env_logger::try_init();
}

#[rstest]
#[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate additional", "ExternalSorter"], 100_000)]
#[case::cant_spill_to_disk(vec!["Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)"], 200_000)]
#[case::no_oom(vec![], 600_000)]
#[tokio::test]
async fn oom_sort() {
async fn sort(#[case] expected_errors: Vec<&str>, #[case] memory_limit: usize) {
TestCase::new(
"select * from t order by host DESC",
vec![
"Resources exhausted: Memory Exhausted while Sorting (DiskManager is disabled)",
],
200_000,
expected_errors,
memory_limit,
)
.run()
.await
}

// We expect to see lower memory thresholds in general when applying a `LIMIT` clause due to eager sorting
#[rstest]
#[case::cant_grow_reservation(vec!["Resources exhausted: Failed to allocate additional", "ExternalSorter"], 20_000)]
#[case::cant_spill_to_disk(vec!["Memory Exhausted while Sorting (DiskManager is disabled)"], 40_000)]
#[case::no_oom(vec![], 80_000)]
#[tokio::test]
async fn sort_with_limit(
#[case] expected_errors: Vec<&str>,
#[case] memory_limit: usize,
) {
TestCase::new(
"select * from t order by host DESC limit 10",
expected_errors,
memory_limit,
)
.run()
.await
.run()
.await
}

#[tokio::test]
Expand Down Expand Up @@ -267,9 +289,19 @@ impl TestCase {

match df.collect().await {
Ok(_batches) => {
panic!("Unexpected success when running, expected memory limit failure")
if !expected_errors.is_empty() {
panic!(
"Unexpected success when running, expected memory limit failure"
)
}
}
Err(e) => {
if expected_errors.is_empty() {
panic!(
"Unexpected failure when running, expected sufficient memory {e}"
)
}

for error_substring in expected_errors {
assert_contains!(e.to_string(), error_substring);
}
Expand Down

0 comments on commit c8d96b2

Please sign in to comment.