Skip to content

Commit

Permalink
Refcount tasks in packets to avoid races.
Browse files Browse the repository at this point in the history
Revert "Once again, revert "Use pipes in compiletest""

Fixes #3098
  • Loading branch information
eholk committed Aug 6, 2012
1 parent 86947e4 commit 9f287c2
Show file tree
Hide file tree
Showing 5 changed files with 61 additions and 39 deletions.
5 changes: 0 additions & 5 deletions src/compiletest/compiletest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,6 @@ import task;
import core::result;
import result::{ok, err};

import comm::port;
import comm::chan;
import comm::send;
import comm::recv;

import common::config;
import common::mode_run_pass;
import common::mode_run_fail;
Expand Down
27 changes: 15 additions & 12 deletions src/compiletest/procsrv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import run::spawn_process;
import io::{writer_util, reader_util};
import libc::{c_int, pid_t};

import pipes::chan;

export run;

#[cfg(target_os = "win32")]
Expand Down Expand Up @@ -58,29 +60,30 @@ fn run(lib_path: ~str,


writeclose(pipe_in.out, input);
let p = comm::port();
let ch = comm::chan(p);
let p = pipes::port_set();
let ch = p.chan();
do task::spawn_sched(task::single_threaded) {
let errput = readclose(pipe_err.in);
comm::send(ch, (2, errput));
ch.send((2, errput));
}
let ch = p.chan();
do task::spawn_sched(task::single_threaded) {
let output = readclose(pipe_out.in);
comm::send(ch, (1, output));
ch.send((1, output));
}
let status = run::waitpid(pid);
let mut errs = ~"";
let mut outs = ~"";
let mut count = 2;
while count > 0 {
let stream = comm::recv(p);
alt check stream {
(1, s) => {
outs = s;
}
(2, s) => {
errs = s;
}
alt p.recv() {
(1, s) => {
outs = s;
}
(2, s) => {
errs = s;
}
_ { fail }
};
count -= 1;
};
Expand Down
63 changes: 46 additions & 17 deletions src/libcore/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,27 +113,29 @@ type buffer<T: send> = {

struct packet_header {
let mut state: state;
let mut blocked_task: option<*rust_task>;
let mut blocked_task: *rust_task;

// This is a reinterpret_cast of a ~buffer, that can also be cast
// to a buffer_header if need be.
let mut buffer: *libc::c_void;

new() {
self.state = empty;
self.blocked_task = none;
self.blocked_task = ptr::null();
self.buffer = ptr::null();
}

// Returns the old state.
unsafe fn mark_blocked(this: *rust_task) -> state {
self.blocked_task = some(this);
rustrt::rust_task_ref(this);
let old_task = swap_task(self.blocked_task, this);
assert old_task.is_null();
swap_state_acq(self.state, blocked)
}

unsafe fn unblock() {
assert self.state != blocked || self.blocked_task != none;
self.blocked_task = none;
let old_task = swap_task(self.blocked_task, ptr::null());
if !old_task.is_null() { rustrt::rust_task_deref(old_task) }
alt swap_state_acq(self.state, empty) {
empty | blocked => (),
terminated => self.state = terminated,
Expand Down Expand Up @@ -240,12 +242,26 @@ fn atomic_sub_rel(&dst: int, src: int) -> int {
rusti::atomic_sub_rel(dst, src)
}

#[doc(hidden)]
fn swap_task(&dst: *rust_task, src: *rust_task) -> *rust_task {
// It might be worth making both acquire and release versions of
// this.
unsafe {
reinterpret_cast(rusti::atomic_xchng(
*(ptr::mut_addr_of(dst) as *mut int),
src as int))
}
}

#[doc(hidden)]
type rust_task = libc::c_void;

extern mod rustrt {
#[rust_stack]
fn rust_get_task() -> *rust_task;
#[rust_stack]
fn rust_task_ref(task: *rust_task);
fn rust_task_deref(task: *rust_task);

#[rust_stack]
fn task_clear_event_reject(task: *rust_task);
Expand Down Expand Up @@ -334,10 +350,11 @@ fn send<T: send, Tbuffer: send>(-p: send_packet_buffered<T, Tbuffer>,
full => fail ~"duplicate send",
blocked => {
debug!{"waking up task for %?", p_};
alt p.header.blocked_task {
some(task) => rustrt::task_signal_event(
task, ptr::addr_of(p.header) as *libc::c_void),
none => debug!{"just kidding!"}
let old_task = swap_task(p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::task_signal_event(
old_task, ptr::addr_of(p.header) as *libc::c_void);
rustrt::rust_task_deref(old_task);
}

// The receiver will eventually clean this up.
Expand Down Expand Up @@ -372,7 +389,9 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
let p = unsafe { &*p_ };
let this = rustrt::rust_get_task();
rustrt::task_clear_event_reject(this);
p.header.blocked_task = some(this);
rustrt::rust_task_ref(this);
let old_task = swap_task(p.header.blocked_task, this);
assert old_task.is_null();
let mut first = true;
let mut count = SPIN_COUNT;
loop {
Expand Down Expand Up @@ -402,14 +421,22 @@ fn try_recv<T: send, Tbuffer: send>(-p: recv_packet_buffered<T, Tbuffer>)
full => {
let mut payload = none;
payload <-> p.payload;
p.header.blocked_task = none;
let old_task = swap_task(p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::rust_task_deref(old_task);
}
p.header.state = empty;
return some(option::unwrap(payload))
}
terminated => {
// This assert detects when we've accidentally unsafely
// casted too big of a number to a state.
assert old_state == terminated;

let old_task = swap_task(p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::rust_task_deref(old_task);
}
return none;
}
}
Expand Down Expand Up @@ -437,17 +464,18 @@ fn sender_terminate<T: send>(p: *packet<T>) {
let p = unsafe { &*p };
alt swap_state_rel(p.header.state, terminated) {
empty => {
assert p.header.blocked_task.is_null();
// The receiver will eventually clean up.
//unsafe { forget(p) }
}
blocked => {
// wake up the target
alt p.header.blocked_task {
some(target) =>
let old_task = swap_task(p.header.blocked_task, ptr::null());
if !old_task.is_null() {
rustrt::task_signal_event(
target,
ptr::addr_of(p.header) as *libc::c_void),
none => { debug!{"receiver is already shutting down"} }
old_task,
ptr::addr_of(p.header) as *libc::c_void);
rustrt::rust_task_deref(old_task);
}
// The receiver will eventually clean up.
//unsafe { forget(p) }
Expand All @@ -457,6 +485,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
fail ~"you dun goofed"
}
terminated => {
assert p.header.blocked_task.is_null();
// I have to clean up, use drop_glue
}
}
Expand All @@ -465,7 +494,7 @@ fn sender_terminate<T: send>(p: *packet<T>) {
#[doc(hidden)]
fn receiver_terminate<T: send>(p: *packet<T>) {
let p = unsafe { &*p };
assert p.header.blocked_task == none;
assert p.header.blocked_task.is_null();
alt swap_state_rel(p.header.state, terminated) {
empty => {
// the sender will clean up
Expand Down
3 changes: 0 additions & 3 deletions src/rt/rust_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -678,9 +678,6 @@ MUST_CHECK bool rust_task::wait_event(void **result) {

void
rust_task::signal_event(void *event) {
assert(task_state_blocked == state ||
task_state_running == state);

scoped_lock with(lifecycle_lock);

this->event = event;
Expand Down
2 changes: 0 additions & 2 deletions src/test/bench/shootout-pfib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
// -*- rust -*-
// xfail-pretty

// xfail-test

/*
A parallel version of fibonacci numbers.
Expand Down

0 comments on commit 9f287c2

Please sign in to comment.