Skip to content

Commit

Permalink
Fix a race condition in the async code
Browse files Browse the repository at this point in the history
The race condition was happening when the Sender sent a value and was
immediatelly dropped between the calls
`self.receiver.shared.recv_sync(None)` and
`self.receiver.shared.is_disconnected()`. More information on github
issue zesterer#120
  • Loading branch information
inetic committed Mar 8, 2023
1 parent ad3f450 commit 5a17b3b
Showing 1 changed file with 27 additions and 20 deletions.
47 changes: 27 additions & 20 deletions src/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,29 +374,36 @@ impl<'a, T> RecvFut<'a, T> {
stream: bool,
) -> Poll<Result<T, RecvError>> {
if self.hook.is_some() {
if let Ok(msg) = self.receiver.shared.recv_sync(None) {
Poll::Ready(Ok(msg))
} else if self.receiver.shared.is_disconnected() {
Poll::Ready(Err(RecvError::Disconnected))
} else {
let hook = self.hook.as_ref().map(Arc::clone).unwrap();
if hook.update_waker(cx.waker()) {
// If the previous hook was awakened, we need to insert it back to the
// queue, otherwise, it remains valid.
wait_lock(&self.receiver.shared.chan).waiting.push_back(hook);
match self.receiver.shared.recv_sync(None) {
Ok(msg) => return Poll::Ready(Ok(msg)),
Err(TryRecvTimeoutError::Disconnected) => {
return Poll::Ready(Err(RecvError::Disconnected))
}
// To avoid a missed wakeup, re-check disconnect status here because the channel might have
// gotten shut down before we had a chance to push our hook
if self.receiver.shared.is_disconnected() {
// And now, to avoid a race condition between the first recv attempt and the disconnect check we
// just performed, attempt to recv again just in case we missed something.
Poll::Ready(self.receiver.shared
_ => (),
}

let hook = self.hook.as_ref().map(Arc::clone).unwrap();
if hook.update_waker(cx.waker()) {
// If the previous hook was awakened, we need to insert it back to the
// queue, otherwise, it remains valid.
wait_lock(&self.receiver.shared.chan)
.waiting
.push_back(hook);
}
// To avoid a missed wakeup, re-check disconnect status here because the channel might have
// gotten shut down before we had a chance to push our hook
if self.receiver.shared.is_disconnected() {
// And now, to avoid a race condition between the first recv attempt and the disconnect check we
// just performed, attempt to recv again just in case we missed something.
Poll::Ready(
self.receiver
.shared
.recv_sync(None)
.map(Ok)
.unwrap_or(Err(RecvError::Disconnected)))
} else {
Poll::Pending
}
.unwrap_or(Err(RecvError::Disconnected)),
)
} else {
Poll::Pending
}
} else {
let mut_self = self.get_mut();
Expand Down

0 comments on commit 5a17b3b

Please sign in to comment.