Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix thread panic when "unreachable" SpawnedTask code is reachable. #12086

Merged
merged 9 commits into from
Aug 23, 2024
3 changes: 3 additions & 0 deletions datafusion/common-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,3 +37,6 @@ path = "src/lib.rs"

[dependencies]
tokio = { workspace = true }

[dev-dependencies]
tokio = { version = "1.36", features = ["rt", "rt-multi-thread", "time"] }
wiedld marked this conversation as resolved.
Show resolved Hide resolved
53 changes: 53 additions & 0 deletions datafusion/common-runtime/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,56 @@ impl<R: 'static> SpawnedTask<R> {
})
}
}

#[cfg(test)]
mod tests {
use super::*;

use std::{
future::{pending, Pending},
sync::{Arc, Mutex},
};

use tokio::runtime::Runtime;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This idea came a bit late, so this is optional:

Could you add a test that shows that the panic is propagated, e.g.:

#[tokio::test]
#[should_panic(expected = "foo")]
async fn panic_resume() {
    // this should panic w/o an `unwrap`
    SpawnedTask::spawn(async {panic!("foo")}).join_unwind().await.ok();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good idea -- I'll make a follow on PR to do so (as it adds additional test coverage for an existing feature rather than something that was changed in this PR)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[tokio::test]
#[should_panic(
expected = "entered unreachable code: SpawnedTask was cancelled unexpectedly"
wiedld marked this conversation as resolved.
Show resolved Hide resolved
)]
async fn runtime_shutdown() {
// capture the panic message
let panic_msg = Arc::new(Mutex::new(None));
let captured_panic_msg = Arc::clone(&panic_msg);
std::panic::set_hook(Box::new(move |e| {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we remove this hook? IIRC it is a global process state (similar to env variables) and is shared with all the other tests that run in the same process (so all unit tests in common-runtime). This will be a pain to debug for others.

See my comment below on an alternative.

let mut guard = captured_panic_msg.lock().unwrap();
*guard = Some(e.to_string());
}));

for _ in 0..30 {
let rt = Runtime::new().unwrap();
let join = rt.spawn(async {
let task = SpawnedTask::spawn(async {
let fut: Pending<()> = pending();
fut.await;
unreachable!("should never return");
});
task.join_unwind().await;
});

// caller shutdown their DF runtime (e.g. timeout, error in caller, etc)
rt.shutdown_background();

// race condition
// poll occurs during shutdown (buffered stream poll calls, etc)
let _ = join.await;
}

// demonstrate that we hit the unreachable code
let maybe_panic = panic_msg.lock().unwrap().clone();
assert_eq!(
maybe_panic, None,
"should not have rt thread panic, instead found {:?}",
maybe_panic
);
}
}
Loading