diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 3a5b83b7aa5..bbc350feab9 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -281,6 +281,8 @@ jobs: run: cargo install cargo-hack - name: check --each-feature run: cargo hack check --all --each-feature -Z avoid-dev-deps + - name: check net,time + run: cargo check -p tokio --no-default-features --features net,time -Z avoid-dev-deps # Try with unstable feature flags - name: check --each-feature --unstable run: cargo hack check --all --each-feature -Z avoid-dev-deps diff --git a/tokio/src/loom/mocked.rs b/tokio/src/loom/mocked.rs index 367d59b43a4..1c4a32dd863 100644 --- a/tokio/src/loom/mocked.rs +++ b/tokio/src/loom/mocked.rs @@ -38,3 +38,8 @@ pub(crate) mod sys { 2 } } + +pub(crate) mod thread { + pub use loom::lazy_static::AccessError; + pub use loom::thread::*; +} diff --git a/tokio/src/loom/std/mod.rs b/tokio/src/loom/std/mod.rs index 0c70bee74eb..1141e4dc07e 100644 --- a/tokio/src/loom/std/mod.rs +++ b/tokio/src/loom/std/mod.rs @@ -102,7 +102,7 @@ pub(crate) mod thread { #[allow(unused_imports)] pub(crate) use std::thread::{ - current, panicking, park, park_timeout, sleep, spawn, Builder, JoinHandle, LocalKey, - Result, Thread, ThreadId, + current, panicking, park, park_timeout, sleep, spawn, AccessError, Builder, JoinHandle, + LocalKey, Result, Thread, ThreadId, }; } diff --git a/tokio/src/park/either.rs b/tokio/src/park/either.rs deleted file mode 100644 index ee02ec158b0..00000000000 --- a/tokio/src/park/either.rs +++ /dev/null @@ -1,74 +0,0 @@ -#![cfg_attr(not(feature = "full"), allow(dead_code))] - -use crate::park::{Park, Unpark}; - -use std::fmt; -use std::time::Duration; - -pub(crate) enum Either { - A(A), - B(B), -} - -impl Park for Either -where - A: Park, - B: Park, -{ - type Unpark = Either; - type Error = Either; - - fn unpark(&self) -> Self::Unpark { - match self { - Either::A(a) => Either::A(a.unpark()), - Either::B(b) => Either::B(b.unpark()), - } - } - - fn park(&mut self) -> Result<(), Self::Error> { - match self { - Either::A(a) => a.park().map_err(Either::A), - Either::B(b) => b.park().map_err(Either::B), - } - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - match self { - Either::A(a) => a.park_timeout(duration).map_err(Either::A), - Either::B(b) => b.park_timeout(duration).map_err(Either::B), - } - } - - fn shutdown(&mut self) { - match self { - Either::A(a) => a.shutdown(), - Either::B(b) => b.shutdown(), - } - } -} - -impl Unpark for Either -where - A: Unpark, - B: Unpark, -{ - fn unpark(&self) { - match self { - Either::A(a) => a.unpark(), - Either::B(b) => b.unpark(), - } - } -} - -impl fmt::Debug for Either -where - A: fmt::Debug, - B: fmt::Debug, -{ - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - Either::A(a) => a.fmt(fmt), - Either::B(b) => b.fmt(fmt), - } - } -} diff --git a/tokio/src/park/mod.rs b/tokio/src/park/mod.rs index 9284702a681..a88b33ac15b 100644 --- a/tokio/src/park/mod.rs +++ b/tokio/src/park/mod.rs @@ -34,84 +34,4 @@ //! * `park_timeout` does the same as `park` but allows specifying a maximum //! time to block the thread for. -cfg_rt! { - pub(crate) mod either; -} - -#[cfg(any(feature = "rt", feature = "sync"))] pub(crate) mod thread; - -use std::fmt::Debug; -use std::sync::Arc; -use std::time::Duration; - -/// Blocks the current thread. -pub(crate) trait Park { - /// Unpark handle type for the `Park` implementation. - type Unpark: Unpark; - - /// Error returned by `park`. - type Error: Debug; - - /// Gets a new `Unpark` handle associated with this `Park` instance. - fn unpark(&self) -> Self::Unpark; - - /// Blocks the current thread unless or until the token is available. - /// - /// A call to `park` does not guarantee that the thread will remain blocked - /// forever, and callers should be prepared for this possibility. This - /// function may wakeup spuriously for any reason. - /// - /// # Panics - /// - /// This function **should** not panic, but ultimately, panics are left as - /// an implementation detail. Refer to the documentation for the specific - /// `Park` implementation. - fn park(&mut self) -> Result<(), Self::Error>; - - /// Parks the current thread for at most `duration`. - /// - /// This function is the same as `park` but allows specifying a maximum time - /// to block the thread for. - /// - /// Same as `park`, there is no guarantee that the thread will remain - /// blocked for any amount of time. Spurious wakeups are permitted for any - /// reason. - /// - /// # Panics - /// - /// This function **should** not panic, but ultimately, panics are left as - /// an implementation detail. Refer to the documentation for the specific - /// `Park` implementation. - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>; - - /// Releases all resources held by the parker for proper leak-free shutdown. - fn shutdown(&mut self); -} - -/// Unblock a thread blocked by the associated `Park` instance. -pub(crate) trait Unpark: Sync + Send + 'static { - /// Unblocks a thread that is blocked by the associated `Park` handle. - /// - /// Calling `unpark` atomically makes available the unpark token, if it is - /// not already available. - /// - /// # Panics - /// - /// This function **should** not panic, but ultimately, panics are left as - /// an implementation detail. Refer to the documentation for the specific - /// `Unpark` implementation. - fn unpark(&self); -} - -impl Unpark for Box { - fn unpark(&self) { - (**self).unpark() - } -} - -impl Unpark for Arc { - fn unpark(&self) { - (**self).unpark() - } -} diff --git a/tokio/src/park/thread.rs b/tokio/src/park/thread.rs index 4db1c1b31b6..abcdcb9c5cd 100644 --- a/tokio/src/park/thread.rs +++ b/tokio/src/park/thread.rs @@ -2,7 +2,6 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Condvar, Mutex}; -use crate::park::{Park, Unpark}; use std::sync::atomic::Ordering::SeqCst; use std::time::Duration; @@ -12,8 +11,6 @@ pub(crate) struct ParkThread { inner: Arc, } -pub(crate) type ParkError = (); - /// Unblocks a thread that was blocked by `ParkThread`. #[derive(Clone, Debug)] pub(crate) struct UnparkThread { @@ -47,32 +44,25 @@ impl ParkThread { }), } } -} - -impl Park for ParkThread { - type Unpark = UnparkThread; - type Error = ParkError; - fn unpark(&self) -> Self::Unpark { + pub(crate) fn unpark(&self) -> UnparkThread { let inner = self.inner.clone(); UnparkThread { inner } } - fn park(&mut self) -> Result<(), Self::Error> { + pub(crate) fn park(&mut self) { self.inner.park(); - Ok(()) } - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + pub(crate) fn park_timeout(&mut self, duration: Duration) { // Wasm doesn't have threads, so just sleep. #[cfg(not(tokio_wasm))] self.inner.park_timeout(duration); #[cfg(tokio_wasm)] std::thread::sleep(duration); - Ok(()) } - fn shutdown(&mut self) { + pub(crate) fn shutdown(&mut self) { self.inner.shutdown(); } } @@ -212,12 +202,13 @@ impl Default for ParkThread { // ===== impl UnparkThread ===== -impl Unpark for UnparkThread { - fn unpark(&self) { +impl UnparkThread { + pub(crate) fn unpark(&self) { self.inner.unpark(); } } +use crate::loom::thread::AccessError; use std::future::Future; use std::marker::PhantomData; use std::mem; @@ -241,24 +232,38 @@ impl CachedParkThread { } } - pub(crate) fn get_unpark(&self) -> Result { + pub(crate) fn waker(&self) -> Result { + self.unpark().map(|unpark| unpark.into_waker()) + } + + fn unpark(&self) -> Result { self.with_current(|park_thread| park_thread.unpark()) } + pub(crate) fn park(&mut self) { + self.with_current(|park_thread| park_thread.inner.park()) + .unwrap(); + } + + pub(crate) fn park_timeout(&mut self, duration: Duration) { + self.with_current(|park_thread| park_thread.inner.park_timeout(duration)) + .unwrap(); + } + /// Gets a reference to the `ParkThread` handle for this thread. - fn with_current(&self, f: F) -> Result + fn with_current(&self, f: F) -> Result where F: FnOnce(&ParkThread) -> R, { - CURRENT_PARKER.try_with(|inner| f(inner)).map_err(|_| ()) + CURRENT_PARKER.try_with(|inner| f(inner)) } - pub(crate) fn block_on(&mut self, f: F) -> Result { + pub(crate) fn block_on(&mut self, f: F) -> Result { use std::task::Context; use std::task::Poll::Ready; // `get_unpark()` should not return a Result - let waker = self.get_unpark()?.into_waker(); + let waker = self.waker()?; let mut cx = Context::from_waker(&waker); pin!(f); @@ -268,34 +273,11 @@ impl CachedParkThread { return Ok(v); } - self.park()?; + self.park(); } } } -impl Park for CachedParkThread { - type Unpark = UnparkThread; - type Error = ParkError; - - fn unpark(&self) -> Self::Unpark { - self.get_unpark().unwrap() - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.with_current(|park_thread| park_thread.inner.park())?; - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.with_current(|park_thread| park_thread.inner.park_timeout(duration))?; - Ok(()) - } - - fn shutdown(&mut self) { - let _ = self.with_current(|park_thread| park_thread.inner.shutdown()); - } -} - impl UnparkThread { pub(crate) fn into_waker(self) -> Waker { unsafe { diff --git a/tokio/src/process/unix/driver.rs b/tokio/src/process/unix/driver.rs index 84dc8fbd027..2e1a36ade73 100644 --- a/tokio/src/process/unix/driver.rs +++ b/tokio/src/process/unix/driver.rs @@ -2,11 +2,10 @@ //! Process driver. -use crate::park::Park; use crate::process::unix::GlobalOrphanQueue; +use crate::runtime::io::Handle; use crate::signal::unix::driver::{Driver as SignalDriver, Handle as SignalHandle}; -use std::io; use std::time::Duration; /// Responsible for cleaning up orphaned child processes on Unix platforms. @@ -28,31 +27,22 @@ impl Driver { signal_handle, } } -} - -// ===== impl Park for Driver ===== - -impl Park for Driver { - type Unpark = ::Unpark; - type Error = io::Error; - fn unpark(&self) -> Self::Unpark { - self.park.unpark() + pub(crate) fn handle(&self) -> Handle { + self.park.io_handle() } - fn park(&mut self) -> Result<(), Self::Error> { - self.park.park()?; + pub(crate) fn park(&mut self) { + self.park.park(); GlobalOrphanQueue::reap_orphans(&self.signal_handle); - Ok(()) } - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.park.park_timeout(duration)?; + pub(crate) fn park_timeout(&mut self, duration: Duration) { + self.park.park_timeout(duration); GlobalOrphanQueue::reap_orphans(&self.signal_handle); - Ok(()) } - fn shutdown(&mut self) { + pub(crate) fn shutdown(&mut self) { self.park.shutdown() } } diff --git a/tokio/src/runtime/driver.rs b/tokio/src/runtime/driver.rs index 74ee772506e..cfb3bd4922e 100644 --- a/tokio/src/runtime/driver.rs +++ b/tokio/src/runtime/driver.rs @@ -1,6 +1,5 @@ //! Abstracts out the entire chain of runtime sub-drivers into common types. -use crate::park::thread::ParkThread; -use crate::park::Park; +use crate::park::thread::{ParkThread, UnparkThread}; use std::io; use std::time::Duration; @@ -8,13 +7,21 @@ use std::time::Duration; // ===== io driver ===== cfg_io_driver! { - type IoDriver = crate::runtime::io::Driver; - type IoStack = crate::park::either::Either; + pub(crate) type IoDriver = crate::runtime::io::Driver; pub(crate) type IoHandle = Option; - fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { - use crate::park::either::Either; + #[derive(Debug)] + pub(crate) enum IoStack { + Enabled(ProcessDriver), + Disabled(ParkThread), + } + pub(crate) enum IoUnpark { + Enabled(crate::runtime::io::Handle), + Disabled(UnparkThread), + } + + fn create_io_stack(enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { #[cfg(loom)] assert!(!enabled); @@ -25,18 +32,58 @@ cfg_io_driver! { let (signal_driver, signal_handle) = create_signal_driver(io_driver)?; let process_driver = create_process_driver(signal_driver); - (Either::A(process_driver), Some(io_handle), signal_handle) + (IoStack::Enabled(process_driver), Some(io_handle), signal_handle) } else { - (Either::B(ParkThread::new()), Default::default(), Default::default()) + (IoStack::Disabled(ParkThread::new()), Default::default(), Default::default()) }; Ok(ret) } + + impl IoStack { + pub(crate) fn unpark(&self) -> IoUnpark { + match self { + IoStack::Enabled(v) => IoUnpark::Enabled(v.handle()), + IoStack::Disabled(v) => IoUnpark::Disabled(v.unpark()), + } + } + + pub(crate) fn park(&mut self) { + match self { + IoStack::Enabled(v) => v.park(), + IoStack::Disabled(v) => v.park(), + } + } + + pub(crate) fn park_timeout(&mut self, duration: Duration) { + match self { + IoStack::Enabled(v) => v.park_timeout(duration), + IoStack::Disabled(v) => v.park_timeout(duration), + } + } + + pub(crate) fn shutdown(&mut self) { + match self { + IoStack::Enabled(v) => v.shutdown(), + IoStack::Disabled(v) => v.shutdown(), + } + } + } + + impl IoUnpark { + pub(crate) fn unpark(&self) { + match self { + IoUnpark::Enabled(v) => v.unpark(), + IoUnpark::Disabled(v) => v.unpark(), + } + } + } } cfg_not_io_driver! { pub(crate) type IoHandle = (); - type IoStack = ParkThread; + pub(crate) type IoStack = ParkThread; + pub(crate) type IoUnpark = UnparkThread; fn create_io_stack(_enabled: bool) -> io::Result<(IoStack, IoHandle, SignalHandle)> { Ok((ParkThread::new(), Default::default(), Default::default())) @@ -98,7 +145,16 @@ cfg_not_process_driver! { // ===== time driver ===== cfg_time! { - type TimeDriver = crate::park::either::Either, IoStack>; + #[derive(Debug)] + pub(crate) enum TimeDriver { + Enabled(crate::runtime::time::Driver), + Disabled(IoStack), + } + + pub(crate) enum TimerUnpark { + Enabled(crate::runtime::time::TimerUnpark), + Disabled(IoUnpark), + } pub(crate) type Clock = crate::time::Clock; pub(crate) type TimeHandle = Option; @@ -112,21 +168,62 @@ cfg_time! { io_stack: IoStack, clock: Clock, ) -> (TimeDriver, TimeHandle) { - use crate::park::either::Either; - if enable { let driver = crate::runtime::time::Driver::new(io_stack, clock); let handle = driver.handle(); - (Either::A(driver), Some(handle)) + (TimeDriver::Enabled(driver), Some(handle)) } else { - (Either::B(io_stack), None) + (TimeDriver::Disabled(io_stack), None) + } + } + + impl TimeDriver { + pub(crate) fn unpark(&self) -> TimerUnpark { + match self { + TimeDriver::Enabled(v) => TimerUnpark::Enabled(v.unpark()), + TimeDriver::Disabled(v) => TimerUnpark::Disabled(v.unpark()), + } + } + + pub(crate) fn park(&mut self) { + match self { + TimeDriver::Enabled(v) => v.park(), + TimeDriver::Disabled(v) => v.park(), + } + } + + pub(crate) fn park_timeout(&mut self, duration: Duration) { + match self { + TimeDriver::Enabled(v) => v.park_timeout(duration), + TimeDriver::Disabled(v) => v.park_timeout(duration), + } + } + + // TODO: tokio-rs/tokio#4990, should the `current_thread` scheduler call this? + cfg_rt_multi_thread! { + pub(crate) fn shutdown(&mut self) { + match self { + TimeDriver::Enabled(v) => v.shutdown(), + TimeDriver::Disabled(v) => v.shutdown(), + } + } + } + } + + impl TimerUnpark { + pub(crate) fn unpark(&self) { + match self { + TimerUnpark::Enabled(v) => v.unpark(), + TimerUnpark::Disabled(v) => v.unpark(), + } } } } cfg_not_time! { type TimeDriver = IoStack; + type TimerUnpark = IoUnpark; pub(crate) type Clock = (); pub(crate) type TimeHandle = (); @@ -151,6 +248,8 @@ pub(crate) struct Driver { inner: TimeDriver, } +pub(crate) type Unpark = TimerUnpark; + pub(crate) struct Resources { pub(crate) io_handle: IoHandle, pub(crate) signal_handle: SignalHandle, @@ -184,25 +283,23 @@ impl Driver { }, )) } -} -impl Park for Driver { - type Unpark = ::Unpark; - type Error = ::Error; - - fn unpark(&self) -> Self::Unpark { + pub(crate) fn unpark(&self) -> TimerUnpark { self.inner.unpark() } - fn park(&mut self) -> Result<(), Self::Error> { + pub(crate) fn park(&mut self) { self.inner.park() } - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + pub(crate) fn park_timeout(&mut self, duration: Duration) { self.inner.park_timeout(duration) } - fn shutdown(&mut self) { - self.inner.shutdown() + // TODO: tokio-rs/tokio#4990, should the `current_thread` scheduler call this? + cfg_rt_multi_thread! { + pub(crate) fn shutdown(&mut self) { + self.inner.shutdown() + } } } diff --git a/tokio/src/runtime/enter.rs b/tokio/src/runtime/enter.rs index 6e4e37d70ff..66b17868b95 100644 --- a/tokio/src/runtime/enter.rs +++ b/tokio/src/runtime/enter.rs @@ -25,8 +25,6 @@ pub(crate) struct Enter { } cfg_rt! { - use crate::park::thread::ParkError; - use std::time::Duration; /// Marks the current thread as being within the dynamic extent of an @@ -139,10 +137,12 @@ cfg_rt_multi_thread! { } cfg_rt! { + use crate::loom::thread::AccessError; + impl Enter { /// Blocks the thread on the specified future, returning the value with /// which that future completes. - pub(crate) fn block_on(&mut self, f: F) -> Result + pub(crate) fn block_on(&mut self, f: F) -> Result where F: std::future::Future, { @@ -156,18 +156,17 @@ cfg_rt! { /// /// If the future completes before `timeout`, the result is returned. If /// `timeout` elapses, then `Err` is returned. - pub(crate) fn block_on_timeout(&mut self, f: F, timeout: Duration) -> Result + pub(crate) fn block_on_timeout(&mut self, f: F, timeout: Duration) -> Result where F: std::future::Future, { - use crate::park::Park; use crate::park::thread::CachedParkThread; use std::task::Context; use std::task::Poll::Ready; use std::time::Instant; let mut park = CachedParkThread::new(); - let waker = park.get_unpark()?.into_waker(); + let waker = park.waker().map_err(|_| ())?; let mut cx = Context::from_waker(&waker); pin!(f); @@ -184,7 +183,7 @@ cfg_rt! { return Err(()); } - park.park_timeout(when - now)?; + park.park_timeout(when - now); } } } diff --git a/tokio/src/runtime/io/mod.rs b/tokio/src/runtime/io/mod.rs index d447b87eba4..ef335723b19 100644 --- a/tokio/src/runtime/io/mod.rs +++ b/tokio/src/runtime/io/mod.rs @@ -10,7 +10,6 @@ mod metrics; use crate::io::interest::Interest; use crate::io::ready::Ready; -use crate::park::{Park, Unpark}; use crate::util::slab::{self, Slab}; use crate::{loom::sync::RwLock, util::bit}; @@ -145,7 +144,35 @@ impl Driver { } } - fn turn(&mut self, max_wait: Option) -> io::Result<()> { + // TODO: remove this in a later refactor + cfg_not_rt! { + cfg_time! { + pub(crate) fn unpark(&self) -> Handle { + self.handle() + } + } + } + + pub(crate) fn park(&mut self) { + self.turn(None); + } + + pub(crate) fn park_timeout(&mut self, duration: Duration) { + self.turn(Some(duration)); + } + + pub(crate) fn shutdown(&mut self) { + if self.inner.shutdown() { + self.resources.for_each(|io| { + // If a task is waiting on the I/O resource, notify it. The task + // will then attempt to use the I/O resource and fail due to the + // driver being shutdown. And shutdown will clear all wakers. + io.shutdown(); + }); + } + } + + fn turn(&mut self, max_wait: Option) { // How often to call `compact()` on the resource slab const COMPACT_INTERVAL: u8 = 255; @@ -167,7 +194,7 @@ impl Driver { // In case of wasm32_wasi this error happens, when trying to poll without subscriptions // just return from the park, as there would be nothing, which wakes us up. } - Err(e) => return Err(e), + Err(e) => panic!("unexpected error when polling the I/O driver: {:?}", e), } // Process all the events that came in, dispatching appropriately @@ -184,8 +211,6 @@ impl Driver { self.inner.metrics.incr_ready_count_by(ready_count); self.events = Some(events); - - Ok(()) } fn dispatch(&mut self, token: mio::Token, ready: Ready) { @@ -215,36 +240,6 @@ impl Drop for Driver { } } -impl Park for Driver { - type Unpark = Handle; - type Error = io::Error; - - fn unpark(&self) -> Self::Unpark { - self.handle() - } - - fn park(&mut self) -> io::Result<()> { - self.turn(None)?; - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> io::Result<()> { - self.turn(Some(duration))?; - Ok(()) - } - - fn shutdown(&mut self) { - if self.inner.shutdown() { - self.resources.for_each(|io| { - // If a task is waiting on the I/O resource, notify it. The task - // will then attempt to use the I/O resource and fail due to the - // driver being shutdown. And shutdown will clear all wakers. - io.shutdown(); - }); - } - } -} - impl fmt::Debug for Driver { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Driver") @@ -303,18 +298,12 @@ impl Handle { /// after this method has been called. If the reactor is not currently /// blocked in `turn`, then the next call to `turn` will not block and /// return immediately. - fn wakeup(&self) { + pub(crate) fn unpark(&self) { #[cfg(not(tokio_wasi))] self.inner.waker.wake().expect("failed to wake I/O driver"); } } -impl Unpark for Handle { - fn unpark(&self) { - self.wakeup(); - } -} - impl fmt::Debug for Handle { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "Handle") diff --git a/tokio/src/runtime/scheduler/current_thread.rs b/tokio/src/runtime/scheduler/current_thread.rs index 59472cf240e..2bc19fef048 100644 --- a/tokio/src/runtime/scheduler/current_thread.rs +++ b/tokio/src/runtime/scheduler/current_thread.rs @@ -1,9 +1,8 @@ use crate::future::poll_fn; use crate::loom::sync::atomic::AtomicBool; use crate::loom::sync::{Arc, Mutex}; -use crate::park::{Park, Unpark}; use crate::runtime::context::EnterGuard; -use crate::runtime::driver::Driver; +use crate::runtime::driver::{Driver, Unpark}; use crate::runtime::task::{self, JoinHandle, OwnedTasks, Schedule, Task}; use crate::runtime::{Config, HandleInner}; use crate::runtime::{MetricsBatch, SchedulerMetrics, WorkerMetrics}; @@ -77,7 +76,7 @@ struct Shared { owned: OwnedTasks>, /// Unpark the blocked thread. - unpark: ::Unpark, + unpark: Unpark, /// Indicates whether the blocked on thread was woken. woken: AtomicBool, @@ -305,7 +304,7 @@ impl Context { core.metrics.submit(&core.spawner.shared.worker_metrics); let (c, _) = self.enter(core, || { - driver.park().expect("failed to park"); + driver.park(); }); core = c; @@ -330,9 +329,7 @@ impl Context { core.metrics.submit(&core.spawner.shared.worker_metrics); let (mut core, _) = self.enter(core, || { - driver - .park_timeout(Duration::from_millis(0)) - .expect("failed to park"); + driver.park_timeout(Duration::from_millis(0)); }); core.driver = Some(driver); diff --git a/tokio/src/runtime/scheduler/multi_thread/park.rs b/tokio/src/runtime/scheduler/multi_thread/park.rs index 033b9f20bee..b3ef15c1f80 100644 --- a/tokio/src/runtime/scheduler/multi_thread/park.rs +++ b/tokio/src/runtime/scheduler/multi_thread/park.rs @@ -5,8 +5,7 @@ use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::{Arc, Condvar, Mutex}; use crate::loom::thread; -use crate::park::{Park, Unpark}; -use crate::runtime::driver::Driver; +use crate::runtime::driver::{Driver, Unpark}; use crate::util::TryLock; use std::sync::atomic::Ordering::SeqCst; @@ -45,7 +44,7 @@ struct Shared { driver: TryLock, /// Unpark handle - handle: ::Unpark, + handle: Unpark, } impl Parker { @@ -64,54 +63,46 @@ impl Parker { }), } } -} - -impl Clone for Parker { - fn clone(&self) -> Parker { - Parker { - inner: Arc::new(Inner { - state: AtomicUsize::new(EMPTY), - mutex: Mutex::new(()), - condvar: Condvar::new(), - shared: self.inner.shared.clone(), - }), - } - } -} - -impl Park for Parker { - type Unpark = Unparker; - type Error = (); - fn unpark(&self) -> Unparker { + pub(crate) fn unpark(&self) -> Unparker { Unparker { inner: self.inner.clone(), } } - fn park(&mut self) -> Result<(), Self::Error> { + pub(crate) fn park(&mut self) { self.inner.park(); - Ok(()) } - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { + pub(crate) fn park_timeout(&mut self, duration: Duration) { // Only parking with zero is supported... assert_eq!(duration, Duration::from_millis(0)); if let Some(mut driver) = self.inner.shared.driver.try_lock() { - driver.park_timeout(duration).map_err(|_| ()) - } else { - Ok(()) + driver.park_timeout(duration) } } - fn shutdown(&mut self) { + pub(crate) fn shutdown(&mut self) { self.inner.shutdown(); } } -impl Unpark for Unparker { - fn unpark(&self) { +impl Clone for Parker { + fn clone(&self) -> Parker { + Parker { + inner: Arc::new(Inner { + state: AtomicUsize::new(EMPTY), + mutex: Mutex::new(()), + condvar: Condvar::new(), + shared: self.inner.shared.clone(), + }), + } + } +} + +impl Unparker { + pub(crate) fn unpark(&self) { self.inner.unpark(); } } @@ -201,8 +192,7 @@ impl Inner { Err(actual) => panic!("inconsistent park state; actual = {}", actual), } - // TODO: don't unwrap - driver.park().unwrap(); + driver.park(); match self.state.swap(EMPTY, SeqCst) { NOTIFIED => {} // got a notification, hurray! diff --git a/tokio/src/runtime/scheduler/multi_thread/worker.rs b/tokio/src/runtime/scheduler/multi_thread/worker.rs index 6e2f4fed87f..44615d26215 100644 --- a/tokio/src/runtime/scheduler/multi_thread/worker.rs +++ b/tokio/src/runtime/scheduler/multi_thread/worker.rs @@ -60,7 +60,6 @@ use crate::coop; use crate::future::Future; use crate::loom::rand::seed; use crate::loom::sync::{Arc, Mutex}; -use crate::park::{Park, Unpark}; use crate::runtime; use crate::runtime::enter::EnterContext; use crate::runtime::scheduler::multi_thread::{queue, Idle, Parker, Unparker}; @@ -512,9 +511,9 @@ impl Context { // Park thread if let Some(timeout) = duration { - park.park_timeout(timeout).expect("park failed"); + park.park_timeout(timeout); } else { - park.park().expect("park failed"); + park.park(); } // Remove `core` from context diff --git a/tokio/src/runtime/time/mod.rs b/tokio/src/runtime/time/mod.rs index 0d898470ea8..d7b7d71bf93 100644 --- a/tokio/src/runtime/time/mod.rs +++ b/tokio/src/runtime/time/mod.rs @@ -20,10 +20,22 @@ mod wheel; use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::{Arc, Mutex}; -use crate::park::{Park, Unpark}; use crate::time::error::Error; use crate::time::{Clock, Duration}; +// This duplication should be cleaned up in a later refactor +cfg_io_driver! { + cfg_rt! { + use crate::runtime::driver::{IoStack, IoUnpark}; + } + cfg_not_rt! { + use crate::runtime::io::{Driver as IoStack, Handle as IoUnpark}; + } +} +cfg_not_io_driver! { + use crate::park::thread::{ParkThread as IoStack, UnparkThread as IoUnpark}; +} + use std::fmt; use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; @@ -83,7 +95,7 @@ use std::{num::NonZeroU64, ptr::NonNull, task::Waker}; /// [timeout]: crate::time::Timeout /// [interval]: crate::time::Interval #[derive(Debug)] -pub(crate) struct Driver { +pub(crate) struct Driver { /// Timing backend in use. time_source: TimeSource, @@ -91,7 +103,7 @@ pub(crate) struct Driver { handle: Handle, /// Parker to delegate to. - park: P, + park: IoStack, // When `true`, a call to `park_timeout` should immediately return and time // should not advance. One reason for this to be `true` is if the task @@ -112,7 +124,7 @@ struct Inner { pub(super) is_shutdown: AtomicBool, /// Unparker that can be used to wake the time driver. - unpark: Box, + unpark: IoUnpark, } /// Time state shared which must be protected by a `Mutex` @@ -132,18 +144,15 @@ struct InnerState { // ===== impl Driver ===== -impl

Driver

-where - P: Park + 'static, -{ +impl Driver { /// Creates a new `Driver` instance that uses `park` to block the current /// thread and `time_source` to get the current time and convert to ticks. /// /// Specifying the source of time is useful when testing. - pub(crate) fn new(park: P, clock: Clock) -> Driver

{ + pub(crate) fn new(park: IoStack, clock: Clock) -> Driver { let time_source = TimeSource::new(clock); - let inner = Inner::new(time_source.clone(), Box::new(park.unpark())); + let inner = Inner::new(time_source.clone(), park.unpark()); Driver { time_source, @@ -164,7 +173,33 @@ where self.handle.clone() } - fn park_internal(&mut self, limit: Option) -> Result<(), P::Error> { + pub(crate) fn unpark(&self) -> TimerUnpark { + TimerUnpark::new(self) + } + + pub(crate) fn park(&mut self) { + self.park_internal(None) + } + + pub(crate) fn park_timeout(&mut self, duration: Duration) { + self.park_internal(Some(duration)) + } + + pub(crate) fn shutdown(&mut self) { + if self.handle.is_shutdown() { + return; + } + + self.handle.get().is_shutdown.store(true, Ordering::SeqCst); + + // Advance time forward to the end of time. + + self.handle.process_at_time(u64::MAX); + + self.park.shutdown(); + } + + fn park_internal(&mut self, limit: Option) { let mut lock = self.handle.get().state.lock(); assert!(!self.handle.is_shutdown()); @@ -188,32 +223,30 @@ where duration = std::cmp::min(limit, duration); } - self.park_timeout(duration)?; + self.park_thread_timeout(duration); } else { - self.park.park_timeout(Duration::from_secs(0))?; + self.park.park_timeout(Duration::from_secs(0)); } } None => { if let Some(duration) = limit { - self.park_timeout(duration)?; + self.park_thread_timeout(duration); } else { - self.park.park()?; + self.park.park(); } } } // Process pending timers after waking up self.handle.process(); - - Ok(()) } cfg_test_util! { - fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { + fn park_thread_timeout(&mut self, duration: Duration) { let clock = &self.time_source.clock; if clock.is_paused() { - self.park.park_timeout(Duration::from_secs(0))?; + self.park.park_timeout(Duration::from_secs(0)); // If the time driver was woken, then the park completed // before the "duration" elapsed (usually caused by a @@ -224,10 +257,8 @@ where clock.advance(duration); } } else { - self.park.park_timeout(duration)?; + self.park.park_timeout(duration); } - - Ok(()) } fn did_wake(&self) -> bool { @@ -236,8 +267,8 @@ where } cfg_not_test_util! { - fn park_timeout(&mut self, duration: Duration) -> Result<(), P::Error> { - self.park.park_timeout(duration) + fn park_thread_timeout(&mut self, duration: Duration) { + self.park.park_timeout(duration); } } } @@ -383,58 +414,21 @@ impl Handle { } } -impl

Park for Driver

-where - P: Park + 'static, -{ - type Unpark = TimerUnpark

; - type Error = P::Error; - - fn unpark(&self) -> Self::Unpark { - TimerUnpark::new(self) - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.park_internal(None) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.park_internal(Some(duration)) - } - - fn shutdown(&mut self) { - if self.handle.is_shutdown() { - return; - } - - self.handle.get().is_shutdown.store(true, Ordering::SeqCst); - - // Advance time forward to the end of time. - - self.handle.process_at_time(u64::MAX); - - self.park.shutdown(); - } -} - -impl

Drop for Driver

-where - P: Park + 'static, -{ +impl Drop for Driver { fn drop(&mut self) { self.shutdown(); } } -pub(crate) struct TimerUnpark { - inner: P::Unpark, +pub(crate) struct TimerUnpark { + inner: IoUnpark, #[cfg(feature = "test-util")] did_wake: Arc, } -impl TimerUnpark

{ - fn new(driver: &Driver

) -> TimerUnpark

{ +impl TimerUnpark { + fn new(driver: &Driver) -> TimerUnpark { TimerUnpark { inner: driver.park.unpark(), @@ -442,10 +436,8 @@ impl TimerUnpark

{ did_wake: driver.did_wake.clone(), } } -} -impl Unpark for TimerUnpark

{ - fn unpark(&self) { + pub(crate) fn unpark(&self) { #[cfg(feature = "test-util")] self.did_wake.store(true, Ordering::SeqCst); @@ -456,7 +448,7 @@ impl Unpark for TimerUnpark

{ // ===== impl Inner ===== impl Inner { - pub(self) fn new(time_source: TimeSource, unpark: Box) -> Self { + pub(self) fn new(time_source: TimeSource, unpark: IoUnpark) -> Self { Inner { state: Mutex::new(InnerState { time_source, diff --git a/tokio/src/runtime/time/tests/mod.rs b/tokio/src/runtime/time/tests/mod.rs index ff69c8407fa..31ed198e2f7 100644 --- a/tokio/src/runtime/time/tests/mod.rs +++ b/tokio/src/runtime/time/tests/mod.rs @@ -5,25 +5,13 @@ use std::{task::Context, time::Duration}; #[cfg(not(loom))] use futures::task::noop_waker_ref; +use crate::loom::sync::atomic::{AtomicBool, Ordering}; use crate::loom::sync::Arc; use crate::loom::thread; -use crate::{ - loom::sync::atomic::{AtomicBool, Ordering}, - park::Unpark, -}; +use crate::runtime::driver::IoUnpark; use super::{Handle, TimerEntry}; -struct MockUnpark {} -impl Unpark for MockUnpark { - fn unpark(&self) {} -} -impl MockUnpark { - fn mock() -> Box { - Box::new(Self {}) - } -} - fn block_on(f: impl std::future::Future) -> T { #[cfg(loom)] return loom::future::block_on(f); @@ -45,13 +33,25 @@ fn model(f: impl Fn() + Send + Sync + 'static) { f(); } +#[cfg(not(tokio_wasm))] +fn unpark() -> IoUnpark { + use crate::park::thread::ParkThread; + IoUnpark::Disabled(ParkThread::new().unpark()) +} + +#[cfg(tokio_wasm)] +fn unpark() -> IoUnpark { + use crate::park::thread::ParkThread; + ParkThread::new().unpark() +} + #[test] fn single_timer() { model(|| { let clock = crate::time::Clock::new(true, false); let time_source = super::TimeSource::new(clock.clone()); - let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let inner = super::Inner::new(time_source.clone(), unpark()); let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); @@ -82,7 +82,7 @@ fn drop_timer() { let clock = crate::time::Clock::new(true, false); let time_source = super::TimeSource::new(clock.clone()); - let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let inner = super::Inner::new(time_source.clone(), unpark()); let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); @@ -113,7 +113,7 @@ fn change_waker() { let clock = crate::time::Clock::new(true, false); let time_source = super::TimeSource::new(clock.clone()); - let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let inner = super::Inner::new(time_source.clone(), unpark()); let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); @@ -148,7 +148,7 @@ fn reset_future() { let clock = crate::time::Clock::new(true, false); let time_source = super::TimeSource::new(clock.clone()); - let inner = super::Inner::new(time_source.clone(), MockUnpark::mock()); + let inner = super::Inner::new(time_source.clone(), unpark()); let handle = Handle::new(Arc::new(inner)); let handle_ = handle.clone(); @@ -206,7 +206,7 @@ fn poll_process_levels() { let time_source = super::TimeSource::new(clock.clone()); - let inner = super::Inner::new(time_source, MockUnpark::mock()); + let inner = super::Inner::new(time_source, unpark()); let handle = Handle::new(Arc::new(inner)); let mut entries = vec![]; @@ -247,7 +247,7 @@ fn poll_process_levels_targeted() { let time_source = super::TimeSource::new(clock.clone()); - let inner = super::Inner::new(time_source, MockUnpark::mock()); + let inner = super::Inner::new(time_source, unpark()); let handle = Handle::new(Arc::new(inner)); let e1 = TimerEntry::new(&handle, clock.now() + Duration::from_millis(193)); diff --git a/tokio/src/signal/unix/driver.rs b/tokio/src/signal/unix/driver.rs index 54959e04df5..e30519350af 100644 --- a/tokio/src/signal/unix/driver.rs +++ b/tokio/src/signal/unix/driver.rs @@ -4,12 +4,11 @@ use crate::io::interest::Interest; use crate::io::PollEvented; -use crate::park::Park; -use crate::runtime::io::Driver as IoDriver; +use crate::runtime::io; use crate::signal::registry::globals; use mio::net::UnixStream; -use std::io::{self, Read}; +use std::io::{self as std_io, Read}; use std::ptr; use std::sync::{Arc, Weak}; use std::task::{Context, Poll, RawWaker, RawWakerVTable, Waker}; @@ -23,7 +22,7 @@ use std::time::Duration; #[derive(Debug)] pub(crate) struct Driver { /// Thread parker. The `Driver` park implementation delegates to this. - park: IoDriver, + park: io::Driver, /// A pipe for receiving wake events from the signal handler receiver: PollEvented, @@ -44,7 +43,7 @@ pub(super) struct Inner(()); impl Driver { /// Creates a new signal `Driver` instance that delegates wakeups to `park`. - pub(crate) fn new(park: IoDriver) -> io::Result { + pub(crate) fn new(park: io::Driver) -> std_io::Result { use std::mem::ManuallyDrop; use std::os::unix::io::{AsRawFd, FromRawFd}; @@ -93,6 +92,24 @@ impl Driver { } } + pub(crate) fn io_handle(&self) -> io::Handle { + self.park.handle() + } + + pub(crate) fn park(&mut self) { + self.park.park(); + self.process(); + } + + pub(crate) fn park_timeout(&mut self, duration: Duration) { + self.park.park_timeout(duration); + self.process(); + } + + pub(crate) fn shutdown(&mut self) { + self.park.shutdown() + } + fn process(&self) { // Check if the pipe is ready to read and therefore has "woken" us up // @@ -114,7 +131,7 @@ impl Driver { match (&*self.receiver).read(&mut buf) { Ok(0) => panic!("EOF on self-pipe"), Ok(_) => continue, // Keep reading - Err(e) if e.kind() == io::ErrorKind::WouldBlock => break, + Err(e) if e.kind() == std_io::ErrorKind::WouldBlock => break, Err(e) => panic!("Bad read on self-pipe: {}", e), } } @@ -134,41 +151,17 @@ unsafe fn noop_clone(_data: *const ()) -> RawWaker { unsafe fn noop(_data: *const ()) {} -// ===== impl Park for Driver ===== - -impl Park for Driver { - type Unpark = ::Unpark; - type Error = io::Error; - - fn unpark(&self) -> Self::Unpark { - self.park.unpark() - } - - fn park(&mut self) -> Result<(), Self::Error> { - self.park.park()?; - self.process(); - Ok(()) - } - - fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.park.park_timeout(duration)?; - self.process(); - Ok(()) - } - - fn shutdown(&mut self) { - self.park.shutdown() - } -} - // ===== impl Handle ===== impl Handle { - pub(super) fn check_inner(&self) -> io::Result<()> { + pub(super) fn check_inner(&self) -> std_io::Result<()> { if self.inner.strong_count() > 0 { Ok(()) } else { - Err(io::Error::new(io::ErrorKind::Other, "signal driver gone")) + Err(std_io::Error::new( + std_io::ErrorKind::Other, + "signal driver gone", + )) } } } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index a10ffb7d797..f0e9dc27f33 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -3,7 +3,6 @@ use crate::loom::future::AtomicWaker; use crate::loom::sync::atomic::AtomicUsize; use crate::loom::sync::Arc; use crate::park::thread::CachedParkThread; -use crate::park::Park; use crate::sync::mpsc::error::TryRecvError; use crate::sync::mpsc::list; use crate::sync::notify::Notify; @@ -326,13 +325,13 @@ impl Rx { // Park the thread until the problematic send has completed. let mut park = CachedParkThread::new(); - let waker = park.unpark().into_waker(); + let waker = park.waker().unwrap(); loop { self.inner.rx_waker.register_by_ref(&waker); // It is possible that the problematic send has now completed, // so we have to check for messages again. try_recv!(); - park.park().expect("park failed"); + park.park(); } }) }