Skip to content

Commit

Permalink
test(11523): demonstrate the underlying pool behavior on deregister
Browse files Browse the repository at this point in the history
  • Loading branch information
wiedld committed Jul 26, 2024
1 parent e71a710 commit ca32c1a
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,4 +565,71 @@ mod tests {
"should provide proper error with different hashed consumer (foo(can_spill=false)=30 bytes and foo(can_spill=true)=0 bytes, available=70)"
);
}

#[test]
fn test_tracked_consumers_pool_deregister() {
fn test_per_pool_type(pool: Arc<dyn MemoryPool>) {
// Baseline: see the 2 memory consumers
let mut r0 = MemoryConsumer::new("r0").register(&pool);
r0.grow(10);
let r1_consumer = MemoryConsumer::new("r1");
let mut r1 = r1_consumer.clone().register(&pool);
r1.grow(20);
let expected = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70. The top memory consumers (across reservations) are: r1 consumed 20 bytes, r0 consumed 10 bytes";
assert!(
matches!(
r0.try_grow(150),
Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected)
),
"should provide proper error with both consumers"
);

// Test: unregister one
// only the remaining one should be listed
pool.unregister(&r1_consumer);
let expected_consumers = "The top memory consumers (across reservations) are: r0 consumed 10 bytes";
assert!(
matches!(
r0.try_grow(150),
Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected_consumers)
),
"should provide proper error with only 1 consumer left registered"
);

// Test: actual message we see is the `available is 70`. When it should be `available is 90`.
// This is because the pool.shrink() does not automatically occur within the inner_pool.deregister().
let expected_70_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 70.";
assert!(
matches!(
r0.try_grow(150),
Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected_70_available)
),
"should provide proper error with both consumers"
);

// Test: the registration needs to resize itself to zero,
// for the proper error message
r1.resize(0);
let expected_90_available = "Failed to allocate additional 150 bytes for r0 with 10 bytes already allocated - maximum available is 90.";
assert!(
matches!(
r0.try_grow(150),
Err(DataFusionError::ResourcesExhausted(e)) if e.to_string().contains(expected_90_available)
),
"should provide proper error with both consumers"
);
}

let tracked_spill_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
FairSpillPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
test_per_pool_type(tracked_spill_pool);

let tracked_greedy_pool: Arc<dyn MemoryPool> = Arc::new(TrackConsumersPool::new(
GreedyMemoryPool::new(100),
NonZeroUsize::new(3).unwrap(),
));
test_per_pool_type(tracked_greedy_pool);
}
}

0 comments on commit ca32c1a

Please sign in to comment.