From e156d001c6577593295f6eee417ea8758fbc4a84 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 23 Jul 2014 22:48:04 -0700 Subject: [PATCH 1/3] rustrt: Allow dropping a brand-new Task When a new task fails to spawn, it triggers a task failure of the spawning task. This ends up causing runtime aborts today because of the destructor bomb in the Task structure. The bomb doesn't actually need to go off until *after* the task has run at least once. This now prevents a runtime abort when a native thread fails to spawn. --- src/libgreen/sched.rs | 2 +- src/librustrt/local.rs | 4 ++-- src/librustrt/task.rs | 37 ++++++++++++++++++++++++++++++------- 3 files changed, 33 insertions(+), 10 deletions(-) diff --git a/src/libgreen/sched.rs b/src/libgreen/sched.rs index 38bb6e355a771..b9144047df5c0 100644 --- a/src/libgreen/sched.rs +++ b/src/libgreen/sched.rs @@ -219,7 +219,7 @@ impl Scheduler { let message = stask.sched.get_mut_ref().message_queue.pop(); rtassert!(match message { msgq::Empty => true, _ => false }); - stask.task.get_mut_ref().destroyed = true; + stask.task.take().unwrap().drop(); } // This does not return a scheduler, as the scheduler is placed diff --git a/src/librustrt/local.rs b/src/librustrt/local.rs index bdb1c60b6d6f8..e2a5eef0d99e1 100644 --- a/src/librustrt/local.rs +++ b/src/librustrt/local.rs @@ -125,8 +125,8 @@ mod test { }).join(); } - fn cleanup_task(mut t: Box) { - t.destroyed = true; + fn cleanup_task(t: Box) { + t.drop(); } } diff --git a/src/librustrt/task.rs b/src/librustrt/task.rs index d27a4f25b4e70..0f4d72c9b3268 100644 --- a/src/librustrt/task.rs +++ b/src/librustrt/task.rs @@ -100,12 +100,21 @@ pub struct Task { pub storage: LocalStorage, pub unwinder: Unwinder, pub death: Death, - pub destroyed: bool, pub name: Option, + state: TaskState, imp: Option>, } +// Once a task has entered the `Armed` state it must be destroyed via `drop`, +// and no other method. This state is used to track this transition. +#[deriving(PartialEq)] +enum TaskState { + New, + Armed, + Destroyed, +} + pub struct TaskOpts { /// Invoke this procedure with the result of the task when it finishes. pub on_exit: Option, @@ -159,7 +168,7 @@ impl Task { storage: LocalStorage(None), unwinder: Unwinder::new(), death: Death::new(), - destroyed: false, + state: New, name: None, imp: None, } @@ -203,7 +212,7 @@ impl Task { /// }).destroy(); /// # } /// ``` - pub fn run(self: Box, f: ||) -> Box { + pub fn run(mut self: Box, f: ||) -> Box { assert!(!self.is_destroyed(), "cannot re-use a destroyed task"); // First, make sure that no one else is in TLS. This does not allow @@ -212,6 +221,7 @@ impl Task { if Local::exists(None::) { fail!("cannot run a task recursively inside another"); } + self.state = Armed; Local::put(self); // There are two primary reasons that general try/catch is unsafe. The @@ -333,12 +343,12 @@ impl Task { // Now that we're done, we remove the task from TLS and flag it for // destruction. let mut task: Box = Local::take(); - task.destroyed = true; + task.state = Destroyed; return task; } /// Queries whether this can be destroyed or not. - pub fn is_destroyed(&self) -> bool { self.destroyed } + pub fn is_destroyed(&self) -> bool { self.state == Destroyed } /// Inserts a runtime object into this task, transferring ownership to the /// task. It is illegal to replace a previous runtime object in this task @@ -453,12 +463,20 @@ impl Task { pub fn can_block(&self) -> bool { self.imp.get_ref().can_block() } + + /// Consume this task, flagging it as a candidate for destruction. + /// + /// This function is required to be invoked to destroy a task. A task + /// destroyed through a normal drop will abort. + pub fn drop(mut self) { + self.state = Destroyed; + } } impl Drop for Task { fn drop(&mut self) { rtdebug!("called drop for a task: {}", self as *mut Task as uint); - rtassert!(self.destroyed); + rtassert!(self.state != Armed); } } @@ -634,12 +652,17 @@ mod test { begin_unwind("cause", file!(), line!()) } + #[test] + fn drop_new_task_ok() { + drop(Task::new()); + } + // Task blocking tests #[test] fn block_and_wake() { let task = box Task::new(); let mut task = BlockedTask::block(task).wake().unwrap(); - task.destroyed = true; + task.destroy(); } } From 355c798ac3eba15bb2d53a6c553c6149391f9615 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Wed, 23 Jul 2014 22:49:19 -0700 Subject: [PATCH 2/3] native: Don't deadlock the runtime on spawn failure Previously, the call to bookkeeping::increment() was never paired with a decrement when the spawn failed (due to unwinding). This fixes the problem by returning a "bomb" from increment() which will decrement on drop, and then moving the bomb into the child task's procedure which will be dropped naturally. --- src/libnative/task.rs | 4 ++-- src/librustrt/bookkeeping.rs | 12 +++++++++++- 2 files changed, 13 insertions(+), 3 deletions(-) diff --git a/src/libnative/task.rs b/src/libnative/task.rs index 35367ff2efab3..c72d6c24a7c16 100644 --- a/src/libnative/task.rs +++ b/src/libnative/task.rs @@ -71,7 +71,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) { // Note that this increment must happen *before* the spawn in order to // guarantee that if this task exits it will always end up waiting for the // spawned task to exit. - bookkeeping::increment(); + let token = bookkeeping::increment(); // Spawning a new OS thread guarantees that __morestack will never get // triggered, but we must manually set up the actual stack bounds once this @@ -93,7 +93,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc():Send) { let mut task = task; task.put_runtime(ops); drop(task.run(|| { f.take_unwrap()() }).destroy()); - bookkeeping::decrement(); + drop(token); }) } diff --git a/src/librustrt/bookkeeping.rs b/src/librustrt/bookkeeping.rs index fd290491eaf1e..ba9995e34ca3a 100644 --- a/src/librustrt/bookkeeping.rs +++ b/src/librustrt/bookkeeping.rs @@ -19,14 +19,24 @@ //! decrement() manually. use core::atomics; +use core::ops::Drop; use mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT; static mut TASK_LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; -pub fn increment() { +pub struct Token(()); + +impl Drop for Token { + fn drop(&mut self) { decrement() } +} + +/// Increment the number of live tasks, returning a token which will decrement +/// the count when dropped. +pub fn increment() -> Token { let _ = unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst) }; + Token(()) } pub fn decrement() { From 8643a0d61359dfb1ebe38d4aae615e7c836b3d18 Mon Sep 17 00:00:00 2001 From: Alex Crichton Date: Thu, 24 Jul 2014 07:32:14 -0700 Subject: [PATCH 3/3] green: Prevent runtime corruption on spawn failure Like with libnative, when a green task failed to spawn it would leave the world in a corrupt state where the local scheduler had been dropped as well as the local task. Also like libnative, this patch sets up a "bomb" which when it goes off will restore the state of the world. --- src/libgreen/task.rs | 21 +++++++++-- src/librustrt/bookkeeping.rs | 4 +- src/librustrt/task.rs | 2 +- src/test/run-pass/spawn-stack-too-big.rs | 47 ++++++++++++++++++++++++ 4 files changed, 68 insertions(+), 6 deletions(-) create mode 100644 src/test/run-pass/spawn-stack-too-big.rs diff --git a/src/libgreen/task.rs b/src/libgreen/task.rs index 3d3b413384050..12d7b75569782 100644 --- a/src/libgreen/task.rs +++ b/src/libgreen/task.rs @@ -442,15 +442,30 @@ impl Runtime for GreenTask { f: proc():Send) { self.put_task(cur_task); + // First, set up a bomb which when it goes off will restore the local + // task unless its disarmed. This will allow us to gracefully fail from + // inside of `configure` which allocates a new task. + struct Bomb { inner: Option> } + impl Drop for Bomb { + fn drop(&mut self) { + let _ = self.inner.take().map(|task| task.put()); + } + } + let mut bomb = Bomb { inner: Some(self) }; + // Spawns a task into the current scheduler. We allocate the new task's // stack from the scheduler's stack pool, and then configure it // accordingly to `opts`. Afterwards we bootstrap it immediately by // switching to it. // // Upon returning, our task is back in TLS and we're good to return. - let mut sched = self.sched.take_unwrap(); - let sibling = GreenTask::configure(&mut sched.stack_pool, opts, f); - sched.run_task(self, sibling) + let sibling = { + let sched = bomb.inner.get_mut_ref().sched.get_mut_ref(); + GreenTask::configure(&mut sched.stack_pool, opts, f) + }; + let mut me = bomb.inner.take().unwrap(); + let sched = me.sched.take().unwrap(); + sched.run_task(me, sibling) } // Local I/O is provided by the scheduler's event loop diff --git a/src/librustrt/bookkeeping.rs b/src/librustrt/bookkeeping.rs index ba9995e34ca3a..dc96aecff8017 100644 --- a/src/librustrt/bookkeeping.rs +++ b/src/librustrt/bookkeeping.rs @@ -26,7 +26,7 @@ use mutex::{StaticNativeMutex, NATIVE_MUTEX_INIT}; static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT; static mut TASK_LOCK: StaticNativeMutex = NATIVE_MUTEX_INIT; -pub struct Token(()); +pub struct Token { _private: () } impl Drop for Token { fn drop(&mut self) { decrement() } @@ -36,7 +36,7 @@ impl Drop for Token { /// the count when dropped. pub fn increment() -> Token { let _ = unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst) }; - Token(()) + Token { _private: () } } pub fn decrement() { diff --git a/src/librustrt/task.rs b/src/librustrt/task.rs index 0f4d72c9b3268..e3d9b7d136ec2 100644 --- a/src/librustrt/task.rs +++ b/src/librustrt/task.rs @@ -663,6 +663,6 @@ mod test { fn block_and_wake() { let task = box Task::new(); let mut task = BlockedTask::block(task).wake().unwrap(); - task.destroy(); + task.drop(); } } diff --git a/src/test/run-pass/spawn-stack-too-big.rs b/src/test/run-pass/spawn-stack-too-big.rs new file mode 100644 index 0000000000000..e1c4a480d1cc1 --- /dev/null +++ b/src/test/run-pass/spawn-stack-too-big.rs @@ -0,0 +1,47 @@ +// Copyright 2014 The Rust Project Developers. See the COPYRIGHT +// file at the top-level directory of this distribution and at +// http://rust-lang.org/COPYRIGHT. +// +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +// ignore-macos apparently gargantuan mmap requests are ok? + +#![feature(phase)] + +#[phase(plugin)] +extern crate green; +extern crate native; + +use std::task::TaskBuilder; +use native::NativeTaskBuilder; + +green_start!(main) + +fn main() { + test(); + + let (tx, rx) = channel(); + TaskBuilder::new().native().spawn(proc() { + tx.send(test()); + }); + rx.recv(); +} + +#[cfg(not(target_word_size = "64"))] +fn test() {} + +#[cfg(target_word_size = "64")] +fn test() { + let (tx, rx) = channel(); + spawn(proc() { + TaskBuilder::new().stack_size(1024 * 1024 * 1024 * 64).spawn(proc() { + }); + tx.send(()); + }); + + assert!(rx.recv_opt().is_err()); +}