From 0874df275cdaed03a072ac0407727b98e2606c54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Mon, 26 Dec 2022 20:28:01 +0100 Subject: [PATCH 1/3] Print "stalled" task on shutdown When the node is shutting down, we give the Tokio runtime 60 seconds to shutdown. If after these 60 seconds there are still running tasks, we now print these tasks. This should help debugging nodes that have stalled tasks. This pr introduces a `TaskRegistry` that keeps track of all running tasks. Each task registers and unregisters itself in this `TaskRegistry`. --- Cargo.lock | 1 + client/cli/Cargo.toml | 1 + client/cli/src/runner.rs | 117 ++++++++++++++++++++----- client/service/src/task_manager/mod.rs | 98 ++++++++++++++++++++- 4 files changed, 192 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 19abb44e5ff9c..e739c1a1892da 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7256,6 +7256,7 @@ dependencies = [ "sp-keystore", "sp-panic-handler", "sp-runtime", + "sp-tracing", "sp-version", "tempfile", "thiserror", diff --git a/client/cli/Cargo.toml b/client/cli/Cargo.toml index a5de78da487c8..ee4426b91dbb8 100644 --- a/client/cli/Cargo.toml +++ b/client/cli/Cargo.toml @@ -50,6 +50,7 @@ sp-version = { version = "5.0.0", path = "../../primitives/version" } [dev-dependencies] tempfile = "3.1.0" futures-timer = "3.0.1" +sp-tracing = { version = "6.0.0", path = "../../primitives/tracing" } [features] default = ["rocksdb"] diff --git a/client/cli/src/runner.rs b/client/cli/src/runner.rs index d4191feddfa90..1a532b3bbc6fb 100644 --- a/client/cli/src/runner.rs +++ b/client/cli/src/runner.rs @@ -152,10 +152,38 @@ impl Runner { // // This is important to be done before we instruct the tokio runtime to shutdown. Otherwise // the tokio runtime will wait the full 60 seconds for all tasks to stop. - drop(task_manager); + let task_registry = task_manager.into_task_registry(); // Give all futures 60 seconds to shutdown, before tokio "leaks" them. - self.tokio_runtime.shutdown_timeout(Duration::from_secs(60)); + let shutdown_timeout = Duration::from_secs(60); + self.tokio_runtime.shutdown_timeout(shutdown_timeout); + + let running_tasks = task_registry.running_tasks(); + + if !running_tasks.is_empty() { + log::error!("Detected running(potentially stalled) tasks on shutdown:"); + running_tasks.iter().for_each(|(task, count)| { + let instances_desc = + if *count > 1 { format!("with {} instances ", count) } else { "".to_string() }; + + if task.is_default_group() { + log::error!( + "Task \"{}\" was still running {}after waiting {} seconds to finish.", + task.name, + instances_desc, + shutdown_timeout.as_secs(), + ); + } else { + log::error!( + "Task \"{}\" (Group: {}) was still running {}after waiting {} seconds to finish.", + task.name, + task.group, + instances_desc, + shutdown_timeout.as_secs(), + ); + } + }); + } res.map_err(Into::into) } @@ -388,34 +416,75 @@ mod tests { assert!((count as u128) < (Duration::from_secs(30).as_millis() / 50)); } + fn run_test_in_another_process( + test_name: &str, + test_body: impl FnOnce(), + ) -> Option { + if std::env::var("RUN_FORKED_TEST").is_ok() { + test_body(); + None + } else { + let output = std::process::Command::new(std::env::current_exe().unwrap()) + .arg(test_name) + .env("RUN_FORKED_TEST", "1") + .output() + .unwrap(); + + assert!(output.status.success()); + Some(output) + } + } + /// This test ensures that `run_node_until_exit` aborts waiting for "stuck" tasks after 60 /// seconds, aka doesn't wait until they are finished (which may never happen). #[test] fn ensure_run_until_exit_is_not_blocking_indefinitely() { - let runner = create_runner(); + let output = run_test_in_another_process( + "ensure_run_until_exit_is_not_blocking_indefinitely", + || { + sp_tracing::try_init_simple(); + + let runner = create_runner(); + + runner + .run_node_until_exit(move |cfg| async move { + let task_manager = + TaskManager::new(cfg.tokio_handle.clone(), None).unwrap(); + let (sender, receiver) = futures::channel::oneshot::channel(); + + // We need to use `spawn_blocking` here so that we get a dedicated thread + // for our future. This future is more blocking code that will never end. + task_manager.spawn_handle().spawn_blocking("test", None, async move { + let _ = sender.send(()); + loop { + std::thread::sleep(Duration::from_secs(30)); + } + }); + + task_manager.spawn_essential_handle().spawn_blocking( + "test2", + None, + async { + // Let's stop this essential task directly when our other task + // started. It will signal that the task manager should end. + let _ = receiver.await; + }, + ); + + Ok::<_, sc_service::Error>(task_manager) + }) + .unwrap_err(); + }, + ); - runner - .run_node_until_exit(move |cfg| async move { - let task_manager = TaskManager::new(cfg.tokio_handle.clone(), None).unwrap(); - let (sender, receiver) = futures::channel::oneshot::channel(); + let Some(output) = output else { return } ; - // We need to use `spawn_blocking` here so that we get a dedicated thread for our - // future. This future is more blocking code that will never end. - task_manager.spawn_handle().spawn_blocking("test", None, async move { - let _ = sender.send(()); - loop { - std::thread::sleep(Duration::from_secs(30)); - } - }); - - task_manager.spawn_essential_handle().spawn_blocking("test2", None, async { - // Let's stop this essential task directly when our other task started. - // It will signal that the task manager should end. - let _ = receiver.await; - }); + let stderr = dbg!(String::from_utf8(output.stderr).unwrap()); - Ok::<_, sc_service::Error>(task_manager) - }) - .unwrap_err(); + assert!( + stderr.contains("Task \"test\" was still running after waiting 60 seconds to finish.") + ); + assert!(!stderr + .contains("Task \"test2\" was still running after waiting 60 seconds to finish.")); } } diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 23265f9672555..d792122576444 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -24,12 +24,19 @@ use futures::{ future::{pending, select, try_join_all, BoxFuture, Either}, Future, FutureExt, StreamExt, }; +use parking_lot::Mutex; use prometheus_endpoint::{ exponential_buckets, register, CounterVec, HistogramOpts, HistogramVec, Opts, PrometheusError, Registry, U64, }; use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; -use std::{panic, pin::Pin, result::Result}; +use std::{ + collections::{hash_map::Entry, HashMap}, + panic, + pin::Pin, + result::Result, + sync::Arc, +}; use tokio::runtime::Handle; use tracing_futures::Instrument; @@ -72,6 +79,7 @@ pub struct SpawnTaskHandle { on_exit: exit_future::Exit, tokio_handle: Handle, metrics: Option, + task_registry: TaskRegistry, } impl SpawnTaskHandle { @@ -113,6 +121,7 @@ impl SpawnTaskHandle { ) { let on_exit = self.on_exit.clone(); let metrics = self.metrics.clone(); + let registry = self.task_registry.clone(); let group = match group.into() { GroupName::Specific(var) => var, @@ -129,6 +138,10 @@ impl SpawnTaskHandle { } let future = async move { + // Register the task and keep the "token" alive until the task is ended. Then this + // "token" will unregister this task. + let _registry_token = registry.register_task(name, group); + if let Some(metrics) = metrics { // Add some wrappers around `task`. let task = { @@ -298,6 +311,8 @@ pub struct TaskManager { /// terminates and gracefully shutdown. Also ends the parent `future()` if a child's essential /// task fails. children: Vec, + /// The registry of all running tasks. + task_registry: TaskRegistry, } impl TaskManager { @@ -324,6 +339,7 @@ impl TaskManager { essential_failed_rx, keep_alive: Box::new(()), children: Vec::new(), + task_registry: Default::default(), }) } @@ -333,6 +349,7 @@ impl TaskManager { on_exit: self.on_exit.clone(), tokio_handle: self.tokio_handle.clone(), metrics: self.metrics.clone(), + task_registry: self.task_registry.clone(), } } @@ -385,6 +402,14 @@ impl TaskManager { pub fn add_child(&mut self, child: TaskManager) { self.children.push(child); } + + /// Consume `self` and return the [`TaskRegistry`]. + /// + /// This [`TaskRegistry`] can be used to check for still running tasks after this task manager + /// was dropped. + pub fn into_task_registry(self) -> TaskRegistry { + self.task_registry + } } #[derive(Clone)] @@ -434,3 +459,74 @@ impl Metrics { }) } } + +/// Ensures that a [`Task`] is unregistered when this object is dropped. +struct UnregisterOnDrop { + task: Task, + registry: TaskRegistry, +} + +impl Drop for UnregisterOnDrop { + fn drop(&mut self) { + let mut tasks = self.registry.tasks.lock(); + + if let Entry::Occupied(mut entry) = (*tasks).entry(self.task.clone()) { + *entry.get_mut() -= 1; + + if *entry.get() == 0 { + entry.remove(); + } + } + } +} + +/// Represents a running async task in the [`TaskManager`]. +/// +/// As a task is identified by a name and a group, it is totally valid that there exists multiple +/// tasks with the same name and group. +#[derive(Clone, Hash, Eq, PartialEq)] +pub struct Task { + /// The name of the task. + pub name: &'static str, + /// The group this task is associated to. + pub group: &'static str, +} + +impl Task { + /// Returns if the `group` is the [`DEFAULT_GROUP_NAME`]. + pub fn is_default_group(&self) -> bool { + self.group == DEFAULT_GROUP_NAME + } +} + +/// Keeps track of all running [`Task`]s in [`TaskManager`]. +#[derive(Clone, Default)] +pub struct TaskRegistry { + tasks: Arc>>, +} + +impl TaskRegistry { + /// Register a task with the given `name` and `group`. + /// + /// Returns [`UnregisterOnDrop`] that ensures that the task is unregistered when this value is + /// dropped. + fn register_task(&self, name: &'static str, group: &'static str) -> UnregisterOnDrop { + let task = Task { name, group }; + + { + let mut tasks = self.tasks.lock(); + + *(*tasks).entry(task.clone()).or_default() += 1; + } + + UnregisterOnDrop { task, registry: self.clone() } + } + + /// Returns the running tasks. + /// + /// As a task is only identified by its `name` and `group`, there can be duplicate tasks. The + /// number per task represents the concurrently running tasks with the same identifier. + pub fn running_tasks(&self) -> HashMap { + (*self.tasks.lock()).clone() + } +} From c521493e41d03454f1cb19d5ca0104fb40c2ff30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Tue, 27 Dec 2022 23:12:52 +0100 Subject: [PATCH 2/3] Fix rustdoc --- client/service/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 8b3a29ba4032a..7a8c7dd0b31f3 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -83,7 +83,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions; pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool}; #[doc(hidden)] pub use std::{ops::Deref, result::Result, sync::Arc}; -pub use task_manager::{SpawnTaskHandle, TaskManager, DEFAULT_GROUP_NAME}; +pub use task_manager::{SpawnTaskHandle, Task, Taskmanager, TaskRegistry, DEFAULT_GROUP_NAME}; const DEFAULT_PROTOCOL_ID: &str = "sup"; From 521dfce52e52c1b890fd23c2a22f6751754c60c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bastian=20K=C3=B6cher?= Date: Wed, 28 Dec 2022 00:20:40 +0100 Subject: [PATCH 3/3] Update client/service/src/lib.rs --- client/service/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/service/src/lib.rs b/client/service/src/lib.rs index 7a8c7dd0b31f3..1529b822ade32 100644 --- a/client/service/src/lib.rs +++ b/client/service/src/lib.rs @@ -83,7 +83,7 @@ pub use sc_transaction_pool::Options as TransactionPoolOptions; pub use sc_transaction_pool_api::{error::IntoPoolError, InPoolTransaction, TransactionPool}; #[doc(hidden)] pub use std::{ops::Deref, result::Result, sync::Arc}; -pub use task_manager::{SpawnTaskHandle, Task, Taskmanager, TaskRegistry, DEFAULT_GROUP_NAME}; +pub use task_manager::{SpawnTaskHandle, Task, TaskManager, TaskRegistry, DEFAULT_GROUP_NAME}; const DEFAULT_PROTOCOL_ID: &str = "sup";