From 9f287c211eb3a92ae3aa31b6f767bb9998f2f147 Mon Sep 17 00:00:00 2001 From: Eric Holk Date: Fri, 3 Aug 2012 18:57:43 -0700 Subject: [PATCH] Refcount tasks in packets to avoid races. Revert "Once again, revert "Use pipes in compiletest"" Fixes #3098 --- src/compiletest/compiletest.rs | 5 --- src/compiletest/procsrv.rs | 27 +++++++------- src/libcore/pipes.rs | 63 ++++++++++++++++++++++++--------- src/rt/rust_task.cpp | 3 -- src/test/bench/shootout-pfib.rs | 2 -- 5 files changed, 61 insertions(+), 39 deletions(-) diff --git a/src/compiletest/compiletest.rs b/src/compiletest/compiletest.rs index 39c4f0f81d097..33b9655aeb2f0 100644 --- a/src/compiletest/compiletest.rs +++ b/src/compiletest/compiletest.rs @@ -8,11 +8,6 @@ import task; import core::result; import result::{ok, err}; -import comm::port; -import comm::chan; -import comm::send; -import comm::recv; - import common::config; import common::mode_run_pass; import common::mode_run_fail; diff --git a/src/compiletest/procsrv.rs b/src/compiletest/procsrv.rs index bb9080becffe6..99b18a67e1e52 100644 --- a/src/compiletest/procsrv.rs +++ b/src/compiletest/procsrv.rs @@ -2,6 +2,8 @@ import run::spawn_process; import io::{writer_util, reader_util}; import libc::{c_int, pid_t}; +import pipes::chan; + export run; #[cfg(target_os = "win32")] @@ -58,29 +60,30 @@ fn run(lib_path: ~str, writeclose(pipe_in.out, input); - let p = comm::port(); - let ch = comm::chan(p); + let p = pipes::port_set(); + let ch = p.chan(); do task::spawn_sched(task::single_threaded) { let errput = readclose(pipe_err.in); - comm::send(ch, (2, errput)); + ch.send((2, errput)); } + let ch = p.chan(); do task::spawn_sched(task::single_threaded) { let output = readclose(pipe_out.in); - comm::send(ch, (1, output)); + ch.send((1, output)); } let status = run::waitpid(pid); let mut errs = ~""; let mut outs = ~""; let mut count = 2; while count > 0 { - let stream = comm::recv(p); - alt check stream { - (1, s) => { - outs = s; - } - (2, s) => { - errs = s; - } + alt p.recv() { + (1, s) => { + outs = s; + } + (2, s) => { + errs = s; + } + _ { fail } }; count -= 1; }; diff --git a/src/libcore/pipes.rs b/src/libcore/pipes.rs index cd4265ada8e6e..963be25b69ff1 100644 --- a/src/libcore/pipes.rs +++ b/src/libcore/pipes.rs @@ -113,7 +113,7 @@ type buffer = { struct packet_header { let mut state: state; - let mut blocked_task: option<*rust_task>; + let mut blocked_task: *rust_task; // This is a reinterpret_cast of a ~buffer, that can also be cast // to a buffer_header if need be. @@ -121,19 +121,21 @@ struct packet_header { new() { self.state = empty; - self.blocked_task = none; + self.blocked_task = ptr::null(); self.buffer = ptr::null(); } // Returns the old state. unsafe fn mark_blocked(this: *rust_task) -> state { - self.blocked_task = some(this); + rustrt::rust_task_ref(this); + let old_task = swap_task(self.blocked_task, this); + assert old_task.is_null(); swap_state_acq(self.state, blocked) } unsafe fn unblock() { - assert self.state != blocked || self.blocked_task != none; - self.blocked_task = none; + let old_task = swap_task(self.blocked_task, ptr::null()); + if !old_task.is_null() { rustrt::rust_task_deref(old_task) } alt swap_state_acq(self.state, empty) { empty | blocked => (), terminated => self.state = terminated, @@ -240,12 +242,26 @@ fn atomic_sub_rel(&dst: int, src: int) -> int { rusti::atomic_sub_rel(dst, src) } +#[doc(hidden)] +fn swap_task(&dst: *rust_task, src: *rust_task) -> *rust_task { + // It might be worth making both acquire and release versions of + // this. + unsafe { + reinterpret_cast(rusti::atomic_xchng( + *(ptr::mut_addr_of(dst) as *mut int), + src as int)) + } +} + #[doc(hidden)] type rust_task = libc::c_void; extern mod rustrt { #[rust_stack] fn rust_get_task() -> *rust_task; + #[rust_stack] + fn rust_task_ref(task: *rust_task); + fn rust_task_deref(task: *rust_task); #[rust_stack] fn task_clear_event_reject(task: *rust_task); @@ -334,10 +350,11 @@ fn send(-p: send_packet_buffered, full => fail ~"duplicate send", blocked => { debug!{"waking up task for %?", p_}; - alt p.header.blocked_task { - some(task) => rustrt::task_signal_event( - task, ptr::addr_of(p.header) as *libc::c_void), - none => debug!{"just kidding!"} + let old_task = swap_task(p.header.blocked_task, ptr::null()); + if !old_task.is_null() { + rustrt::task_signal_event( + old_task, ptr::addr_of(p.header) as *libc::c_void); + rustrt::rust_task_deref(old_task); } // The receiver will eventually clean this up. @@ -372,7 +389,9 @@ fn try_recv(-p: recv_packet_buffered) let p = unsafe { &*p_ }; let this = rustrt::rust_get_task(); rustrt::task_clear_event_reject(this); - p.header.blocked_task = some(this); + rustrt::rust_task_ref(this); + let old_task = swap_task(p.header.blocked_task, this); + assert old_task.is_null(); let mut first = true; let mut count = SPIN_COUNT; loop { @@ -402,7 +421,10 @@ fn try_recv(-p: recv_packet_buffered) full => { let mut payload = none; payload <-> p.payload; - p.header.blocked_task = none; + let old_task = swap_task(p.header.blocked_task, ptr::null()); + if !old_task.is_null() { + rustrt::rust_task_deref(old_task); + } p.header.state = empty; return some(option::unwrap(payload)) } @@ -410,6 +432,11 @@ fn try_recv(-p: recv_packet_buffered) // This assert detects when we've accidentally unsafely // casted too big of a number to a state. assert old_state == terminated; + + let old_task = swap_task(p.header.blocked_task, ptr::null()); + if !old_task.is_null() { + rustrt::rust_task_deref(old_task); + } return none; } } @@ -437,17 +464,18 @@ fn sender_terminate(p: *packet) { let p = unsafe { &*p }; alt swap_state_rel(p.header.state, terminated) { empty => { + assert p.header.blocked_task.is_null(); // The receiver will eventually clean up. //unsafe { forget(p) } } blocked => { // wake up the target - alt p.header.blocked_task { - some(target) => + let old_task = swap_task(p.header.blocked_task, ptr::null()); + if !old_task.is_null() { rustrt::task_signal_event( - target, - ptr::addr_of(p.header) as *libc::c_void), - none => { debug!{"receiver is already shutting down"} } + old_task, + ptr::addr_of(p.header) as *libc::c_void); + rustrt::rust_task_deref(old_task); } // The receiver will eventually clean up. //unsafe { forget(p) } @@ -457,6 +485,7 @@ fn sender_terminate(p: *packet) { fail ~"you dun goofed" } terminated => { + assert p.header.blocked_task.is_null(); // I have to clean up, use drop_glue } } @@ -465,7 +494,7 @@ fn sender_terminate(p: *packet) { #[doc(hidden)] fn receiver_terminate(p: *packet) { let p = unsafe { &*p }; - assert p.header.blocked_task == none; + assert p.header.blocked_task.is_null(); alt swap_state_rel(p.header.state, terminated) { empty => { // the sender will clean up diff --git a/src/rt/rust_task.cpp b/src/rt/rust_task.cpp index 58a0e3eae6c95..061e87ebff82d 100644 --- a/src/rt/rust_task.cpp +++ b/src/rt/rust_task.cpp @@ -678,9 +678,6 @@ MUST_CHECK bool rust_task::wait_event(void **result) { void rust_task::signal_event(void *event) { - assert(task_state_blocked == state || - task_state_running == state); - scoped_lock with(lifecycle_lock); this->event = event; diff --git a/src/test/bench/shootout-pfib.rs b/src/test/bench/shootout-pfib.rs index 9db4020574299..5b309659ee505 100644 --- a/src/test/bench/shootout-pfib.rs +++ b/src/test/bench/shootout-pfib.rs @@ -1,8 +1,6 @@ // -*- rust -*- // xfail-pretty -// xfail-test - /* A parallel version of fibonacci numbers.