threadpool: drop incomplete tasks on shutdown #722

Merged: 6 commits, merged Jan 17, 2019
2 changes: 2 additions & 0 deletions tokio-threadpool/Cargo.toml
@@ -20,11 +20,13 @@ categories = ["concurrency", "asynchronous"]
[dependencies]
tokio-executor = { version = "0.1.2", path = "../tokio-executor" }
futures = "0.1.19"
crossbeam = "0.6.0"
crossbeam-channel = "0.3.3"
crossbeam-deque = "0.6.1"
crossbeam-utils = "0.6.2"
num_cpus = "1.2"
rand = "0.6"
slab = "0.4.1"
log = "0.4"

[dev-dependencies]
2 changes: 2 additions & 0 deletions tokio-threadpool/src/lib.rs
@@ -79,13 +79,15 @@

extern crate tokio_executor;

extern crate crossbeam;
extern crate crossbeam_channel;
extern crate crossbeam_deque as deque;
extern crate crossbeam_utils;
#[macro_use]
extern crate futures;
extern crate num_cpus;
extern crate rand;
extern crate slab;

#[macro_use]
extern crate log;
8 changes: 0 additions & 8 deletions tokio-threadpool/src/pool/mod.rs
@@ -45,12 +45,6 @@ pub(crate) struct Pool {
// Stack tracking sleeping workers.
sleep_stack: CachePadded<worker::Stack>,

// Number of workers that haven't reached the final state of shutdown
//
// This is only used to know when to signal `shutdown_task` once the
// shutdown process has completed.
pub num_workers: AtomicUsize,

Author:
This is just an unused field left over from before ShutdownTrigger, so I'm removing it.

// Worker state
//
// A worker is a thread that is processing the work queue and polling
@@ -122,7 +116,6 @@ impl Pool {
let ret = Pool {
state: CachePadded::new(AtomicUsize::new(State::new().into())),
sleep_stack: CachePadded::new(worker::Stack::new()),
num_workers: AtomicUsize::new(0),
workers,
queue,
trigger,
@@ -313,7 +306,6 @@ impl Pool {
}

let trigger = match self.trigger.upgrade() {
// The pool is shutting down.
None => {
// The pool is shutting down.
return;
5 changes: 5 additions & 0 deletions tokio-threadpool/src/shutdown.rs
@@ -86,6 +86,11 @@ impl Drop for ShutdownTrigger {
// Drain the global task queue.
while self.queue.pop().is_some() {}

// Drop the remaining incomplete tasks and parkers associated with workers.
for worker in self.workers.iter() {
worker.shutdown();
}

// Notify the task interested in shutdown.
let mut inner = self.inner.lock().unwrap();
inner.completed = true;
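
To illustrate the effect of the change above, here is a rough usage sketch (not part of this PR), assuming the public tokio-threadpool 0.1 and futures 0.1 APIs; the `Guard` type and the sleep are illustrative only. Shutting the pool down now drops the futures of tasks that were polled but never completed, so their drop handlers run:

// Illustrative sketch only: observe that an incomplete task's future is
// dropped (and its drop handlers run) when the thread pool shuts down.
extern crate futures;
extern crate tokio_threadpool;

use std::time::Duration;

use futures::{future, Future};
use tokio_threadpool::ThreadPool;

struct Guard;

impl Drop for Guard {
    fn drop(&mut self) {
        println!("incomplete task dropped on shutdown");
    }
}

fn main() {
    let pool = ThreadPool::new();
    let guard = Guard;

    // `empty()` never resolves, so the `map` closure (and the guard it owns)
    // can only be dropped when the future itself is dropped.
    pool.spawn(future::empty::<(), ()>().map(move |()| drop(guard)));

    // Give the pool a moment to poll the task once so it gets registered
    // with a worker (illustrative only).
    std::thread::sleep(Duration::from_millis(100));

    // Shutting down aborts the incomplete task and runs the guard's Drop.
    pool.shutdown_now().wait().unwrap();
}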
60 changes: 57 additions & 3 deletions tokio-threadpool/src/task/mod.rs
@@ -15,10 +15,10 @@ use futures::{self, Future, Async};
use futures::executor::{self, Spawn};

use std::{fmt, panic, ptr};
use std::cell::{UnsafeCell};
use std::cell::{Cell, UnsafeCell};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, AtomicPtr};
use std::sync::atomic::Ordering::{AcqRel, Release, Relaxed};
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release, Relaxed};

/// Harness around a future.
///
@@ -34,6 +34,21 @@ pub(crate) struct Task {
/// Next pointer in the queue of tasks pending blocking capacity.
next_blocking: AtomicPtr<Task>,

/// ID of the worker that polled this task first.
///
/// This field can be a `Cell` because it's only accessed by the worker thread that is
/// executing the task.
///
/// The worker ID is represented by a `u32` rather than `usize` in order to save some space
/// on 64-bit platforms.
pub reg_worker: Cell<Option<u32>>,
Member:
It seems like this should be UnsafeCell? I'm surprised that Cell works. It must be because Task is !Send?

Author:
Actually, Task is Send because it can be stolen and moved between worker threads.

What is important here is that Task has Sync and !Sync parts:

  • state is Sync because it may be concurrently written to from multiple threads, e.g. during notification.
  • future is !Sync because it's only ever used by the thread which is currently executing it. reg_worker is !Sync for the same reason.

It seems to me that reg_worker can be a Cell here just fine, since it's never accessed concurrently. Is my understanding correct? Otherwise, I can switch to UnsafeCell, it's no problem.

Member:
It can be safely called; it was more a question of how to write idiomatic unsafe Rust code :)

Author:
I'm very interested in the answer to that question, too... this kind of pattern occurs in plenty of places.
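
To make the pattern being discussed concrete, here is a minimal sketch (illustrative only, not the PR's code; the `MiniTask` type and its explicit `unsafe impl Sync` are assumptions for the example) of a type whose atomic state may be shared across threads while a plain `Cell` field is reserved for whichever thread is currently executing the task:

use std::cell::Cell;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

struct MiniTask {
    // May be touched concurrently from multiple threads (e.g. during
    // notification), so it must be an atomic.
    state: AtomicUsize,
    // Only ever read or written by the single thread currently executing
    // the task, so a plain `Cell` is enough; no synchronization needed.
    reg_worker: Cell<Option<u32>>,
}

// SAFETY: sound only because callers uphold the invariant that `reg_worker`
// is accessed exclusively by the thread currently executing the task; the
// `Cell` field alone would otherwise make the type `!Sync`.
unsafe impl Sync for MiniTask {}

fn main() {
    let task = Arc::new(MiniTask {
        state: AtomicUsize::new(0),
        reg_worker: Cell::new(None),
    });

    let stolen = Arc::clone(&task);
    let handle = thread::spawn(move || {
        // This thread is now the sole executor of the task, so it may use
        // the non-atomic field directly.
        stolen.reg_worker.set(Some(1));
        stolen.state.store(1, Ordering::Release);
    });

    handle.join().unwrap();
    assert_eq!(task.state.load(Ordering::Acquire), 1);
}

The soundness argument lives entirely in the SAFETY comment: the compiler cannot check that only one thread touches `reg_worker` at a time, which is exactly the idiomatic-unsafe-code question raised above.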


/// The key associated with this task in the `Slab` it was registered in.
///
/// This field can be a `Cell` because it's only accessed by the worker thread that has
/// registered the task.
pub reg_index: Cell<usize>,

/// Store the future at the head of the struct
///
/// The future is dropped immediately when it transitions to Complete
@@ -61,6 +76,8 @@ impl Task {
state: AtomicUsize::new(State::new().into()),
blocking: AtomicUsize::new(BlockingState::new().into()),
next_blocking: AtomicPtr::new(ptr::null_mut()),
reg_worker: Cell::new(None),
reg_index: Cell::new(0),
future: UnsafeCell::new(Some(task_fut)),
}
}
@@ -75,6 +92,8 @@ impl Task {
state: AtomicUsize::new(State::stub().into()),
blocking: AtomicUsize::new(BlockingState::new().into()),
next_blocking: AtomicPtr::new(ptr::null_mut()),
reg_worker: Cell::new(None),
reg_index: Cell::new(0),
future: UnsafeCell::new(Some(task_fut)),
}
}
@@ -166,6 +185,41 @@ impl Task {
}
}

/// Aborts this task.
///
/// This is called when the threadpool shuts down and the task has already been polled but not
/// completed.
pub fn abort(&self) {
use self::State::*;

let mut state = self.state.load(Acquire).into();

loop {
match state {
Idle | Scheduled => {}
Running | Notified | Complete | Aborted => {
// It is assumed that no worker threads are running so the task must be either
// in the idle or scheduled state.
panic!("unexpected state while aborting task: {:?}", state);
}
}

let actual = self.state.compare_and_swap(
state.into(),
Aborted.into(),
AcqRel).into();

if actual == state {
// The future has been aborted. Drop it immediately to free resources and run drop
// handlers.
self.drop_future();
break;
}

state = actual;
}
}

/// Notify the task
pub fn notify(me: Arc<Task>, pool: &Arc<Pool>) {
if me.schedule() {
@@ -206,7 +260,7 @@ impl Task {
_ => return false,
}
}
Complete | Notified | Scheduled => return false,
Complete | Aborted | Notified | Scheduled => return false,
}
}
}
5 changes: 4 additions & 1 deletion tokio-threadpool/src/task/state.rs
@@ -15,6 +15,9 @@ pub(crate) enum State {

/// Task is complete
Complete = 4,

/// Task was aborted because the thread pool has been shut down
Aborted = 5,
}

// ===== impl State =====
@@ -39,7 +42,7 @@ impl From<usize> for State {

debug_assert!(
src >= Idle as usize &&
src <= Complete as usize, "actual={}", src);
src <= Aborted as usize, "actual={}", src);

unsafe { ::std::mem::transmute(src) }
}
120 changes: 108 additions & 12 deletions tokio-threadpool/src/worker/entry.rs
@@ -5,11 +5,14 @@ use worker::state::{State, PUSHED_MASK};
use std::cell::UnsafeCell;
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::atomic::Ordering::{AcqRel, Relaxed};
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::atomic::Ordering::{Acquire, AcqRel, Relaxed, Release};
use std::time::Duration;

use crossbeam::queue::SegQueue;
use crossbeam_utils::CachePadded;
use deque;
use slab::Slab;

// TODO: None of the fields should be public
//
@@ -32,10 +35,20 @@
stealer: deque::Stealer<Arc<Task>>,

// Thread parker
pub park: UnsafeCell<BoxPark>,
park: UnsafeCell<Option<BoxPark>>,

// Thread unparker
pub unpark: BoxUnpark,
unpark: UnsafeCell<Option<BoxUnpark>>,

// Tasks that have been first polled by this worker, but not completed yet.
running_tasks: UnsafeCell<Slab<Arc<Task>>>,
Member:
The problem with using a Slab here is that the memory will grow to hold the max number of tasks that were concurrently running, but never shrink afterwards. Slabs can't easily shrink due to their design.

Do you have thoughts on this?

Author:
It's hard for me to say whether that's really a problem in practice or not. This Slab should be pretty compact since Arc<Task> is just a single word. If you're worried about it, I can switch back to a Vec and we can shrink the underlying buffer when it gets less than half full.
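
For reference, here is a rough sketch of the Vec-based alternative floated above (an assumption about how a shrink-when-half-empty policy could look, not code from this PR; the `Registry` type and its policy are illustrative):

use std::sync::Arc;

/// Illustrative only: a registry of running tasks backed by a plain `Vec`
/// that shrinks its buffer once it is less than half full, so memory does
/// not stay pegged at the historical maximum the way a `Slab`'s does.
struct Registry<T> {
    tasks: Vec<Arc<T>>,
}

impl<T> Registry<T> {
    fn new() -> Self {
        Registry { tasks: Vec::new() }
    }

    /// Registers a task and returns its current index.
    fn register(&mut self, task: Arc<T>) -> usize {
        self.tasks.push(task);
        self.tasks.len() - 1
    }

    /// Unregisters the task at `index`.
    fn unregister(&mut self, index: usize) {
        // O(1) removal by swapping the last element into the freed slot.
        // Note: this moves another element into `index`, so its stored key
        // would need to be patched up; a `Slab` avoids this by handing out
        // stable keys.
        self.tasks.swap_remove(index);

        // Shrink the backing buffer once occupancy drops below half capacity.
        if self.tasks.len() < self.tasks.capacity() / 2 {
            self.tasks.shrink_to_fit();
        }
    }
}

fn main() {
    let mut reg: Registry<u32> = Registry::new();
    let keys: Vec<usize> = (0..1024).map(|i| reg.register(Arc::new(i))).collect();
    for key in keys.into_iter().rev() {
        reg.unregister(key);
    }
    // The buffer has been released rather than staying at ~1024 slots.
    println!("final capacity: {}", reg.tasks.capacity());
}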


// Tasks that have been first polled by this worker, but completed by another worker.
remotely_completed_tasks: SegQueue<Arc<Task>>,

// Set to `true` when `remotely_completed_tasks` has tasks that need to be removed from
// `running_tasks`.
needs_drain: AtomicBool,
}

impl WorkerEntry {
@@ -47,8 +60,11 @@ impl WorkerEntry {
next_sleeper: UnsafeCell::new(0),
worker: w,
stealer: s,
park: UnsafeCell::new(park),
unpark,
park: UnsafeCell::new(Some(park)),
unpark: UnsafeCell::new(Some(unpark)),
running_tasks: UnsafeCell::new(Slab::new()),
remotely_completed_tasks: SegQueue::new(),
needs_drain: AtomicBool::new(false),
}
}

@@ -100,7 +116,7 @@ impl WorkerEntry {
Sleeping => {
// The worker is currently sleeping, the condition variable must
// be signaled
self.wakeup();
self.unpark();
true
}
Shutdown => false,
@@ -163,7 +179,7 @@ impl WorkerEntry {
}

// Wakeup the worker
self.wakeup();
self.unpark();
}

/// Pop a task
@@ -202,14 +218,94 @@
}
}

/// Parks the worker thread.
pub fn park(&self) {
if let Some(park) = unsafe { (*self.park.get()).as_mut() } {
park.park().unwrap();
}
}

/// Parks the worker thread for at most `duration`.
pub fn park_timeout(&self, duration: Duration) {
if let Some(park) = unsafe { (*self.park.get()).as_mut() } {
park.park_timeout(duration).unwrap();
}
}

/// Unparks the worker thread.
#[inline]
pub fn push_internal(&self, task: Arc<Task>) {
self.worker.push(task);
pub fn unpark(&self) {
if let Some(park) = unsafe { (*self.unpark.get()).as_ref() } {
park.unpark();
}
}

/// Registers a task in this worker.
///
/// Called when the task is being polled for the first time.
#[inline]
pub fn register_task(&self, task: &Arc<Task>) {
let running_tasks = unsafe { &mut *self.running_tasks.get() };

let key = running_tasks.insert(task.clone());
task.reg_index.set(key);
}

/// Unregisters a task from this worker.
///
/// Called when the task is completed and was previously registered in this worker.
#[inline]
pub fn wakeup(&self) {
self.unpark.unpark();
pub fn unregister_task(&self, task: Arc<Task>) {
let running_tasks = unsafe { &mut *self.running_tasks.get() };
running_tasks.remove(task.reg_index.get());
self.drain_remotely_completed_tasks();
}

/// Unregisters a task from this worker.
///
/// Called when the task is completed by another worker and was previously registered in this
/// worker.
#[inline]
pub fn remotely_complete_task(&self, task: Arc<Task>) {
self.remotely_completed_tasks.push(task);
self.needs_drain.store(true, Release);
}

/// Drops the remaining incomplete tasks and the parker associated with this worker.
///
/// This function is called by the shutdown trigger.
pub fn shutdown(&self) {
self.drain_remotely_completed_tasks();

// Abort all incomplete tasks.
let running_tasks = unsafe { &mut *self.running_tasks.get() };
for (_, task) in running_tasks.iter() {
task.abort();
}
running_tasks.clear();

// Drop the parker.
unsafe {
*self.park.get() = None;
*self.unpark.get() = None;
}
}

/// Drains the `remotely_completed_tasks` queue and removes tasks from `running_tasks`.
#[inline]
fn drain_remotely_completed_tasks(&self) {
if self.needs_drain.compare_and_swap(true, false, Acquire) {
let running_tasks = unsafe { &mut *self.running_tasks.get() };

while let Some(task) = self.remotely_completed_tasks.try_pop() {
running_tasks.remove(task.reg_index.get());
}
}
}

#[inline]
pub fn push_internal(&self, task: Arc<Task>) {
self.worker.push(task);
}

#[inline]