From 8bddb82f52a99e56216a929afddda5596e19af20 Mon Sep 17 00:00:00 2001 From: Christian Schwarz Date: Wed, 7 Feb 2024 17:44:43 +0000 Subject: [PATCH] thread_local_system: test behavior when used in Handle::block_on inside a spawn_blocking Context: https://github.com/neondatabase/neon/issues/6667 --- .../src/system/lifecycle/thread_local.rs | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/tokio-epoll-uring/src/system/lifecycle/thread_local.rs b/tokio-epoll-uring/src/system/lifecycle/thread_local.rs index 365baca..973bb89 100644 --- a/tokio-epoll-uring/src/system/lifecycle/thread_local.rs +++ b/tokio-epoll-uring/src/system/lifecycle/thread_local.rs @@ -31,3 +31,98 @@ impl std::ops::Deref for Handle { .expect("must be already initialized when using this") } } + +#[cfg(test)] +mod tests { + use std::{sync::Arc, time::Duration}; + + #[test] + fn test_block_on_inside_spawn_blocking_1() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let weak = rt.block_on(async move { + tokio::task::spawn_blocking(|| { + tokio::runtime::Handle::current().block_on(async { + let system = crate::thread_local_system().await; + let ((), res) = system.nop().await; + res.unwrap(); + Arc::downgrade(&system.0) + }) + }) + .await + .unwrap() + }); + + drop(rt); + + assert!(weak.upgrade().is_none()); + } + + #[test] + fn test_block_on_inside_spawn_blocking_2() { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + + let task = rt.spawn(async move { + tokio::task::spawn_blocking(|| { + tokio::runtime::Handle::current().block_on(async { + let system = crate::thread_local_system().await; + let ((), res) = system.nop().await; + res.unwrap(); + Arc::downgrade(&system.0) + }) + }) + .await + .unwrap() + }); + + let weak = rt.block_on(task).unwrap(); + + drop(rt); + + assert!(weak.upgrade().is_none()); + } + + #[test] + fn test_block_on_inside_spawn_blocking_3() { + let keepalive = Duration::from_secs(4); + + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_keep_alive(keepalive) + .build() + .unwrap(); + + let task = rt.spawn(async move { + tokio::task::spawn_blocking(|| { + tokio::runtime::Handle::current().block_on(async { + let system = crate::thread_local_system().await; + let ((), res) = system.nop().await; + res.unwrap(); + Arc::downgrade(&system.0) + }) + }) + .await + .unwrap() + }); + + let weak = rt.block_on(task).unwrap(); + + // immediately after should succeed + // (technically this makes the test sensitive to CPU timing but 4 secs is a long time) + assert!(weak.upgrade().is_some()); + + // add 1 second slack time to make the test less sensitive to CPU timing + std::thread::sleep(keepalive + Duration::from_secs(1)); + + // the `rt` should have downsized its blocking pool after `keepalive` + assert!(weak.upgrade().is_none()); + + drop(rt); + } +}