From 169ee81bd14d4cb9156412363ecd360b6bda827f Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 23 Aug 2022 15:30:21 -0700 Subject: [PATCH 1/2] rt, refactor: extract basic_scheduler::Config The `basic_scheduler` uses a `Config` struct to pass in runtime configuration options. These config options are set at the public `runtime::Builder` level and are shared between the basic scheduler and the multi-threaded scheduler. This change extracts the struct and uses it to configure the multi-threaded scheduler as well --- tokio/src/runtime/basic_scheduler.rs | 20 +--------------- tokio/src/runtime/builder.rs | 17 +++++++------ tokio/src/runtime/config.rs | 19 +++++++++++++++ tokio/src/runtime/mod.rs | 3 +++ tokio/src/runtime/thread_pool/mod.rs | 17 +++---------- tokio/src/runtime/thread_pool/worker.rs | 32 +++++++------------------ 6 files changed, 45 insertions(+), 63 deletions(-) create mode 100644 tokio/src/runtime/config.rs diff --git a/tokio/src/runtime/basic_scheduler.rs b/tokio/src/runtime/basic_scheduler.rs index 9792ef57bd5..4b6a7b97587 100644 --- a/tokio/src/runtime/basic_scheduler.rs +++ b/tokio/src/runtime/basic_scheduler.rs @@ -5,7 +5,7 @@ use crate::park::{Park, Unpark}; use crate::runtime::context::EnterGuard; use crate::runtime::driver::Driver; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; -use crate::runtime::{Callback, HandleInner}; +use crate::runtime::{Config, HandleInner}; use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::sync::notify::Notify; use crate::util::atomic_cell::AtomicCell; @@ -68,24 +68,6 @@ pub(crate) struct Spawner { shared: Arc, } -pub(crate) struct Config { - /// How many ticks before pulling a task from the global/remote queue? - pub(crate) global_queue_interval: u32, - - /// How many ticks before yielding to the driver for timer and I/O events? - pub(crate) event_interval: u32, - - /// Callback for a worker parking itself - pub(crate) before_park: Option, - - /// Callback for a worker unparking itself - pub(crate) after_unpark: Option, - - #[cfg(tokio_unstable)] - /// How to respond to unhandled task panics. - pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, -} - /// Scheduler state shared between threads. struct Shared { /// Remote run queue. None if the `Runtime` has been dropped. diff --git a/tokio/src/runtime/builder.rs b/tokio/src/runtime/builder.rs index 2f678aa72e2..29ae2f8d104 100644 --- a/tokio/src/runtime/builder.rs +++ b/tokio/src/runtime/builder.rs @@ -784,8 +784,7 @@ impl Builder { } fn build_basic_runtime(&mut self) -> io::Result { - use crate::runtime::basic_scheduler::Config; - use crate::runtime::{BasicScheduler, HandleInner, Kind}; + use crate::runtime::{BasicScheduler, Config, HandleInner, Kind}; let (driver, resources) = driver::Driver::new(self.get_cfg())?; @@ -903,7 +902,7 @@ cfg_rt_multi_thread! { impl Builder { fn build_threaded_runtime(&mut self) -> io::Result { use crate::loom::sys::num_cpus; - use crate::runtime::{HandleInner, Kind, ThreadPool}; + use crate::runtime::{Config, HandleInner, Kind, ThreadPool}; let core_threads = self.worker_threads.unwrap_or_else(num_cpus); @@ -926,10 +925,14 @@ cfg_rt_multi_thread! { core_threads, driver, handle_inner, - self.before_park.clone(), - self.after_unpark.clone(), - self.global_queue_interval, - self.event_interval, + Config { + before_park: self.before_park.clone(), + after_unpark: self.after_unpark.clone(), + global_queue_interval: self.global_queue_interval, + event_interval: self.event_interval, + #[cfg(tokio_unstable)] + unhandled_panic: self.unhandled_panic.clone(), + }, ); let spawner = Spawner::ThreadPool(scheduler.spawner().clone()); diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs new file mode 100644 index 00000000000..c2a5b63103b --- /dev/null +++ b/tokio/src/runtime/config.rs @@ -0,0 +1,19 @@ +use crate::runtime::Callback; + +pub(crate) struct Config { + /// How many ticks before pulling a task from the global/remote queue? + pub(crate) global_queue_interval: u32, + + /// How many ticks before yielding to the driver for timer and I/O events? + pub(crate) event_interval: u32, + + /// Callback for a worker parking itself + pub(crate) before_park: Option, + + /// Callback for a worker unparking itself + pub(crate) after_unpark: Option, + + #[cfg(tokio_unstable)] + /// How to respond to unhandled task panics. + pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, +} \ No newline at end of file diff --git a/tokio/src/runtime/mod.rs b/tokio/src/runtime/mod.rs index ff06b70eb2d..7fb907f5a04 100644 --- a/tokio/src/runtime/mod.rs +++ b/tokio/src/runtime/mod.rs @@ -202,6 +202,9 @@ cfg_rt! { mod basic_scheduler; use basic_scheduler::BasicScheduler; + mod config; + use config::Config; + mod blocking; use blocking::BlockingPool; #[cfg_attr(tokio_wasi, allow(unused_imports))] diff --git a/tokio/src/runtime/thread_pool/mod.rs b/tokio/src/runtime/thread_pool/mod.rs index 5ac71e139d9..b39d9af5386 100644 --- a/tokio/src/runtime/thread_pool/mod.rs +++ b/tokio/src/runtime/thread_pool/mod.rs @@ -15,7 +15,7 @@ pub(crate) use worker::block_in_place; use crate::loom::sync::Arc; use crate::runtime::task::{self, JoinHandle}; -use crate::runtime::{Callback, Driver, HandleInner}; +use crate::runtime::{Config, Driver, HandleInner}; use std::fmt; use std::future::Future; @@ -49,21 +49,10 @@ impl ThreadPool { size: usize, driver: Driver, handle_inner: HandleInner, - before_park: Option, - after_unpark: Option, - global_queue_interval: u32, - event_interval: u32, + config: Config, ) -> (ThreadPool, Launch) { let parker = Parker::new(driver); - let (shared, launch) = worker::create( - size, - parker, - handle_inner, - before_park, - after_unpark, - global_queue_interval, - event_interval, - ); + let (shared, launch) = worker::create(size, parker, handle_inner, config); let spawner = Spawner { shared }; let thread_pool = ThreadPool { spawner }; diff --git a/tokio/src/runtime/thread_pool/worker.rs b/tokio/src/runtime/thread_pool/worker.rs index b01c5bc4749..2e4a810d8e0 100644 --- a/tokio/src/runtime/thread_pool/worker.rs +++ b/tokio/src/runtime/thread_pool/worker.rs @@ -65,7 +65,7 @@ use crate::runtime; use crate::runtime::enter::EnterContext; use crate::runtime::task::{Inject, JoinHandle, OwnedTasks}; use crate::runtime::thread_pool::{queue, Idle, Parker, Unparker}; -use crate::runtime::{task, Callback, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics}; +use crate::runtime::{task, Config, HandleInner, MetricsBatch, SchedulerMetrics, WorkerMetrics}; use crate::util::atomic_cell::AtomicCell; use crate::util::FastRand; @@ -117,12 +117,6 @@ struct Core { /// Fast random number generator. rand: FastRand, - - /// How many ticks before pulling a task from the global/remote queue? - global_queue_interval: u32, - - /// How many ticks before yielding to the driver for timer and I/O events? - event_interval: u32, } /// State shared across all workers @@ -152,10 +146,8 @@ pub(super) struct Shared { #[allow(clippy::vec_box)] // we're moving an already-boxed value shutdown_cores: Mutex>>, - /// Callback for a worker parking itself - before_park: Option, - /// Callback for a worker unparking itself - after_unpark: Option, + /// Scheduler configuration options + config: Config, /// Collects metrics from the runtime. pub(super) scheduler_metrics: SchedulerMetrics, @@ -202,10 +194,7 @@ pub(super) fn create( size: usize, park: Parker, handle_inner: HandleInner, - before_park: Option, - after_unpark: Option, - global_queue_interval: u32, - event_interval: u32, + config: Config, ) -> (Arc, Launch) { let mut cores = Vec::with_capacity(size); let mut remotes = Vec::with_capacity(size); @@ -227,8 +216,6 @@ pub(super) fn create( park: Some(park), metrics: MetricsBatch::new(), rand: FastRand::new(seed()), - global_queue_interval, - event_interval, })); remotes.push(Remote { steal, unpark }); @@ -242,8 +229,7 @@ pub(super) fn create( idle: Idle::new(size), owned: OwnedTasks::new(), shutdown_cores: Mutex::new(vec![]), - before_park, - after_unpark, + config, scheduler_metrics: SchedulerMetrics::new(), worker_metrics: worker_metrics.into_boxed_slice(), }); @@ -468,7 +454,7 @@ impl Context { } fn maintenance(&self, mut core: Box) -> Box { - if core.tick % core.event_interval == 0 { + if core.tick % self.worker.shared.config.event_interval == 0 { // Call `park` with a 0 timeout. This enables the I/O driver, timer, ... // to run without actually putting the thread to sleep. core = self.park_timeout(core, Some(Duration::from_millis(0))); @@ -492,7 +478,7 @@ impl Context { /// Also, we rely on the workstealing algorithm to spread the tasks amongst workers /// after all the IOs get dispatched fn park(&self, mut core: Box) -> Box { - if let Some(f) = &self.worker.shared.before_park { + if let Some(f) = &self.worker.shared.config.before_park { f(); } @@ -511,7 +497,7 @@ impl Context { } } - if let Some(f) = &self.worker.shared.after_unpark { + if let Some(f) = &self.worker.shared.config.after_unpark { f(); } core @@ -555,7 +541,7 @@ impl Core { /// Return the next notified task available to this worker. fn next_task(&mut self, worker: &Worker) -> Option { - if self.tick % self.global_queue_interval == 0 { + if self.tick % worker.shared.config.global_queue_interval == 0 { worker.inject().pop().or_else(|| self.next_local_task()) } else { self.next_local_task().or_else(|| worker.inject().pop()) From 782f9d3b540ef9c6e17e614af92b2c761214e8ad Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Tue, 23 Aug 2022 15:38:25 -0700 Subject: [PATCH 2/2] fmt --- tokio/src/runtime/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tokio/src/runtime/config.rs b/tokio/src/runtime/config.rs index c2a5b63103b..b5ff6eadd8d 100644 --- a/tokio/src/runtime/config.rs +++ b/tokio/src/runtime/config.rs @@ -16,4 +16,4 @@ pub(crate) struct Config { #[cfg(tokio_unstable)] /// How to respond to unhandled task panics. pub(crate) unhandled_panic: crate::runtime::UnhandledPanic, -} \ No newline at end of file +}