Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

thread_local_system: test behavior when used in Handle::block_on inside a spawn_blocking #44

Merged
merged 1 commit into from
Feb 7, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
95 changes: 95 additions & 0 deletions tokio-epoll-uring/src/system/lifecycle/thread_local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,98 @@ impl std::ops::Deref for Handle {
.expect("must be already initialized when using this")
}
}

#[cfg(test)]
mod tests {
use std::{sync::Arc, time::Duration};

#[test]
fn test_block_on_inside_spawn_blocking_1() {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let weak = rt.block_on(async move {
tokio::task::spawn_blocking(|| {
tokio::runtime::Handle::current().block_on(async {
let system = crate::thread_local_system().await;
let ((), res) = system.nop().await;
res.unwrap();
Arc::downgrade(&system.0)
})
})
.await
.unwrap()
});

drop(rt);

assert!(weak.upgrade().is_none());
}

#[test]
fn test_block_on_inside_spawn_blocking_2() {
let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();

let task = rt.spawn(async move {
tokio::task::spawn_blocking(|| {
tokio::runtime::Handle::current().block_on(async {
let system = crate::thread_local_system().await;
let ((), res) = system.nop().await;
res.unwrap();
Arc::downgrade(&system.0)
})
})
.await
.unwrap()
});

let weak = rt.block_on(task).unwrap();

drop(rt);

assert!(weak.upgrade().is_none());
}

#[test]
fn test_block_on_inside_spawn_blocking_3() {
let keepalive = Duration::from_secs(4);

let rt = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.thread_keep_alive(keepalive)
.build()
.unwrap();

let task = rt.spawn(async move {
tokio::task::spawn_blocking(|| {
tokio::runtime::Handle::current().block_on(async {
let system = crate::thread_local_system().await;
let ((), res) = system.nop().await;
res.unwrap();
Arc::downgrade(&system.0)
})
})
.await
.unwrap()
});

let weak = rt.block_on(task).unwrap();

// immediately after should succeed
// (technically this makes the test sensitive to CPU timing but 4 secs is a long time)
assert!(weak.upgrade().is_some());

// add 1 second slack time to make the test less sensitive to CPU timing
std::thread::sleep(keepalive + Duration::from_secs(1));

// the `rt` should have downsized its blocking pool after `keepalive`
assert!(weak.upgrade().is_none());

drop(rt);
}
}
Loading