Skip to content

Commit

Permalink
thread_local_system: test behavior when used in Handle::block_on insi…
Browse files Browse the repository at this point in the history
…de a spawn_blocking

Context: neondatabase/neon#6667
  • Loading branch information
problame committed Feb 7, 2024
1 parent 0e1af4c commit 8bddb82
Showing 1 changed file with 95 additions and 0 deletions.
95 changes: 95 additions & 0 deletions tokio-epoll-uring/src/system/lifecycle/thread_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,98 @@ impl std::ops::Deref for Handle {
.expect("must be already initialized when using this")
}
}

#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};

#[test]
fn test_block_on_inside_spawn_blocking_1() {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let weak = rt.block_on(async move {
tokio::task::spawn_blocking(|| {
tokio::runtime::Handle::current().block_on(async {
let system = crate::thread_local_system().await;
let ((), res) = system.nop().await;
res.unwrap();
Arc::downgrade(&system.0)
})
})
.await
.unwrap()
});

drop(rt);

assert!(weak.upgrade().is_none());
}

#[test]
fn test_block_on_inside_spawn_blocking_2() {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let task = rt.spawn(async move {
tokio::task::spawn_blocking(|| {
tokio::runtime::Handle::current().block_on(async {
let system = crate::thread_local_system().await;
let ((), res) = system.nop().await;
res.unwrap();
Arc::downgrade(&system.0)
})
})
.await
.unwrap()
});

let weak = rt.block_on(task).unwrap();

drop(rt);

assert!(weak.upgrade().is_none());
}

#[test]
fn test_block_on_inside_spawn_blocking_3() {
let keepalive = Duration::from_secs(4);

let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_keep_alive(keepalive)
.build()
.unwrap();

let task = rt.spawn(async move {
tokio::task::spawn_blocking(|| {
tokio::runtime::Handle::current().block_on(async {
let system = crate::thread_local_system().await;
let ((), res) = system.nop().await;
res.unwrap();
Arc::downgrade(&system.0)
})
})
.await
.unwrap()
});

let weak = rt.block_on(task).unwrap();

// immediately after should succeed
// (technically this makes the test sensitive to CPU timing but 4 secs is a long time)
assert!(weak.upgrade().is_some());

// add 1 second slack time to make the test less sensitive to CPU timing
std::thread::sleep(keepalive + Duration::from_secs(1));

// the `rt` should have downsized its blocking pool after `keepalive`
assert!(weak.upgrade().is_none());

drop(rt);
}
}

0 comments on commit 8bddb82

Please sign in to comment.