# sync: fix possible dangling pointer in semaphore (#2340)
## Motivation

When cancelling futures that are waiting to acquire semaphore permits,
a dangling pointer is possible if notified futures are dropped after
their wait-queue nodes have been split off into a separate list of
waiters to notify. Because these nodes are no longer in the main list
guarded by the lock, the futures' `Drop` impls complete immediately,
and the nodes may be freed while they are still linked into the list of
tasks to notify.

## Solution

This branch fixes the bug by popping waiters from the wait list while
still inside the lock. The wakers of popped nodes are temporarily
stored in a fixed-size stack array so that they can be woken after the
lock is released. Since the array's size is fixed, we may in some cases
have to loop multiple times, re-acquiring and releasing the lock, until
all of the released permits have been assigned. This may also have the
side advantage of preventing a thread that releases a very large number
of permits from starving other threads that need the lock to enqueue
waiters.
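
For illustration, here is a minimal sketch of that batched-wakeup
pattern, using a plain `VecDeque<Waker>` behind a `std::sync::Mutex` in
place of tokio's intrusive wait list; the function name and the batch
size of eight are placeholders chosen to mirror the patch, not tokio
API:

```rust
use std::collections::VecDeque;
use std::sync::Mutex;
use std::task::Waker;

/// Wake queued waiters in batches of eight: take wakers out of the queue
/// while holding the lock, then invoke them only after the lock is released.
fn wake_all_batched(queue: &Mutex<VecDeque<Waker>>) {
    loop {
        // Fixed-size scratch space on the stack, as in the patch.
        let mut wakers: [Option<Waker>; 8] = Default::default();
        let mut popped = 0;

        {
            // Everything that touches the queue happens inside the lock.
            let mut waiters = queue.lock().unwrap();
            for slot in wakers.iter_mut() {
                match waiters.pop_back() {
                    Some(waker) => {
                        *slot = Some(waker);
                        popped += 1;
                    }
                    None => break,
                }
            }
        } // the MutexGuard is dropped here, releasing the lock

        // Wake outside the critical section, so no wake-up runs while the
        // queue lock is held.
        for waker in wakers.iter_mut().filter_map(Option::take) {
            waker.wake();
        }

        // Fewer than eight wakers means the queue was drained; otherwise
        // loop around and re-acquire the lock for the next batch.
        if popped < wakers.len() {
            break;
        }
    }
}
```

The fixed-size array keeps the wakers off the heap and bounds how long
the lock is held on any one pass, at the cost of re-taking the lock
when more than eight waiters become runnable at once.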

I've also added a loom test that can reliably reproduce a segfault
on master, but passes on this branch (after a lot of iterations).

Signed-off-by: Eliza Weisman <eliza@buoyant.io>
hawkw authored Mar 27, 2020
1 parent 5c71268 commit 00725f6
Showing 3 changed files with 135 additions and 233 deletions.
146 changes: 66 additions & 80 deletions tokio/src/sync/batch_semaphore.rs
@@ -129,13 +129,8 @@ impl Semaphore {
return;
}

// Assign permits to the wait queue, returning a list containing all the
// waiters at the back of the queue that received enough permits to wake
// up.
let notified = self.add_permits_locked(added, self.waiters.lock().unwrap());

// Once we release the lock, notify all woken waiters.
notify_all(notified);
// Assign permits to the wait queue
self.add_permits_locked(added, self.waiters.lock().unwrap());
}

/// Closes the semaphore. This prevents the semaphore from issuing new
@@ -144,20 +139,22 @@ impl Semaphore {
// semaphore implementation.
#[allow(dead_code)]
pub(crate) fn close(&self) {
let notified = {
let mut waiters = self.waiters.lock().unwrap();
// If the semaphore's permits counter has enough permits for an
// unqueued waiter to acquire all the permits it needs immediately,
// it won't touch the wait list. Therefore, we have to set a bit on
// the permit counter as well. However, we must do this while
// holding the lock --- otherwise, if we set the bit and then wait
// to acquire the lock we'll enter an inconsistent state where the
// permit counter is closed, but the wait list is not.
self.permits.fetch_or(Self::CLOSED, Release);
waiters.closed = true;
waiters.queue.take_all()
};
notify_all(notified)
let mut waiters = self.waiters.lock().unwrap();
// If the semaphore's permits counter has enough permits for an
// unqueued waiter to acquire all the permits it needs immediately,
// it won't touch the wait list. Therefore, we have to set a bit on
// the permit counter as well. However, we must do this while
// holding the lock --- otherwise, if we set the bit and then wait
// to acquire the lock we'll enter an inconsistent state where the
// permit counter is closed, but the wait list is not.
self.permits.fetch_or(Self::CLOSED, Release);
waiters.closed = true;
while let Some(mut waiter) = waiters.queue.pop_back() {
let waker = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
if let Some(waker) = waker {
waker.wake();
}
}
}

pub(crate) fn try_acquire(&self, num_permits: u16) -> Result<(), TryAcquireError> {
@@ -189,58 +186,60 @@ impl Semaphore {

/// Release `rem` permits to the semaphore's wait list, starting from the
/// end of the queue.
///
/// This returns a new `LinkedList` containing all the waiters that received
/// enough permits to be notified. Once the lock on the wait list is
/// released, this list should be drained and the waiters in it notified.
///
/// If `rem` exceeds the number of permits needed by the wait list, the
/// remainder are assigned back to the semaphore.
fn add_permits_locked(
&self,
mut rem: usize,
mut waiters: MutexGuard<'_, Waitlist>,
) -> LinkedList<Waiter> {
// Starting from the back of the wait queue, assign each waiter as many
// permits as it needs until we run out of permits to assign.
let mut last = None;
for waiter in waiters.queue.iter().rev() {
// Was the waiter assigned enough permits to wake it?
if !waiter.assign_permits(&mut rem) {
break;
fn add_permits_locked(&self, mut rem: usize, waiters: MutexGuard<'_, Waitlist>) {
let mut wakers: [Option<Waker>; 8] = Default::default();
let mut lock = Some(waiters);
let mut is_empty = false;
while rem > 0 {
let mut waiters = lock.take().unwrap_or_else(|| self.waiters.lock().unwrap());
'inner: for slot in &mut wakers[..] {
// Was the waiter assigned enough permits to wake it?
match waiters.queue.last() {
Some(waiter) => {
if !waiter.assign_permits(&mut rem) {
break 'inner;
}
}
None => {
is_empty = true;
// If we assigned permits to all the waiters in the queue, and there are
// still permits left over, assign them back to the semaphore.
break 'inner;
}
};
let mut waiter = waiters.queue.pop_back().unwrap();
*slot = unsafe { waiter.as_mut().waker.with_mut(|waker| (*waker).take()) };
}
last = Some(NonNull::from(waiter));
}

// If we assigned permits to all the waiters in the queue, and there are
// still permits left over, assign them back to the semaphore.
if rem > 0 {
let permits = rem << Self::PERMIT_SHIFT;
assert!(
permits < Self::MAX_PERMITS,
"cannot add more than MAX_PERMITS permits ({})",
Self::MAX_PERMITS
);
let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
assert!(
prev + permits <= Self::MAX_PERMITS,
"number of added permits ({}) would overflow MAX_PERMITS ({})",
rem,
Self::MAX_PERMITS
);
}
if rem > 0 && is_empty {
let permits = rem << Self::PERMIT_SHIFT;
assert!(
permits < Self::MAX_PERMITS,
"cannot add more than MAX_PERMITS permits ({})",
Self::MAX_PERMITS
);
let prev = self.permits.fetch_add(rem << Self::PERMIT_SHIFT, Release);
assert!(
prev + permits <= Self::MAX_PERMITS,
"number of added permits ({}) would overflow MAX_PERMITS ({})",
rem,
Self::MAX_PERMITS
);
rem = 0;
}

// Split off the queue at the last waiter that was satisfied, creating a
// new list. Once we release the lock, we'll drain this list and notify
// the waiters in it.
if let Some(waiter) = last {
// Safety: it's only safe to call `split_back` with a pointer to a
// node in the same list as the one we call `split_back` on. Since
// we got the waiter pointer from the list's iterator, this is fine.
unsafe { waiters.queue.split_back(waiter) }
} else {
LinkedList::new()
drop(waiters); // release the lock

wakers
.iter_mut()
.filter_map(Option::take)
.for_each(Waker::wake);
}

assert_eq!(rem, 0);
}

fn poll_acquire(
@@ -354,18 +353,6 @@ impl fmt::Debug for Semaphore {
}
}

/// Pop all waiters from `list`, starting at the end of the queue, and notify
/// them.
fn notify_all(mut list: LinkedList<Waiter>) {
while let Some(waiter) = list.pop_back() {
let waker = unsafe { waiter.as_ref().waker.with_mut(|waker| (*waker).take()) };

waker
.expect("if a node is in the wait list, it must have a waker")
.wake();
}
}

impl Waiter {
fn new(num_permits: u16) -> Self {
Waiter {
@@ -471,8 +458,7 @@ impl Drop for Acquire<'_> {

let acquired_permits = self.num_permits as usize - self.node.state.load(Acquire);
if acquired_permits > 0 {
let notified = self.semaphore.add_permits_locked(acquired_permits, waiters);
notify_all(notified);
self.semaphore.add_permits_locked(acquired_permits, waiters);
}
}
}
44 changes: 44 additions & 0 deletions tokio/src/sync/tests/loom_semaphore_batch.rs
@@ -113,6 +113,50 @@ fn concurrent_close() {
});
}

#[test]
fn concurrent_cancel() {
async fn poll_and_cancel(semaphore: Arc<Semaphore>) {
let mut acquire1 = Some(semaphore.acquire(1));
let mut acquire2 = Some(semaphore.acquire(1));
poll_fn(|cx| {
// poll the acquire future once, and then immediately throw
// it away. this simulates a situation where a future is
// polled and then cancelled, such as by a timeout.
if let Some(acquire) = acquire1.take() {
pin!(acquire);
let _ = acquire.poll(cx);
}
if let Some(acquire) = acquire2.take() {
pin!(acquire);
let _ = acquire.poll(cx);
}
Poll::Ready(())
})
.await
}

loom::model(|| {
let semaphore = Arc::new(Semaphore::new(0));
let t1 = {
let semaphore = semaphore.clone();
thread::spawn(move || block_on(poll_and_cancel(semaphore)))
};
let t2 = {
let semaphore = semaphore.clone();
thread::spawn(move || block_on(poll_and_cancel(semaphore)))
};
let t3 = {
let semaphore = semaphore.clone();
thread::spawn(move || block_on(poll_and_cancel(semaphore)))
};

t1.join().unwrap();
semaphore.release(10);
t2.join().unwrap();
t3.join().unwrap();
});
}

#[test]
fn batch() {
let mut b = loom::model::Builder::new();