Skip to content

Commit

Permalink
Fix missed task notifications. (libp2p#1210)
Browse files Browse the repository at this point in the history
Addresses libp2p#1206 by always
registering the current task before calling poll_*_notify functions.
This is in the same spirit as the corresponding fix for yamux
in libp2p/rust-yamux#54.

Also adds missing registration of the current task in close()
and flush_all(), which have been observed to cause stalls
when trying to do a graceful connection shutdown / close.
  • Loading branch information
romanb authored Jul 24, 2019
1 parent 4b6d1f8 commit bcc7c4d
Showing 1 changed file with 10 additions and 19 deletions.
29 changes: 10 additions & 19 deletions muxers/mplex/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,13 +283,11 @@ where C: AsyncRead + AsyncWrite,
}
}

inner.notifier_read.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
let elem = match inner.inner.poll_stream_notify(&inner.notifier_read, 0) {
Ok(Async::Ready(Some(item))) => item,
Ok(Async::Ready(None)) => return Err(IoErrorKind::BrokenPipe.into()),
Ok(Async::NotReady) => {
inner.notifier_read.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
return Ok(Async::NotReady);
},
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(err) => {
let err2 = IoError::new(err.kind(), err.to_string());
inner.error = Err(err);
Expand Down Expand Up @@ -333,14 +331,10 @@ where C: AsyncRead + AsyncWrite
if inner.is_shutdown {
return Err(IoError::new(IoErrorKind::Other, "connection is shut down"))
}
inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
match inner.inner.start_send_notify(elem, &inner.notifier_write, 0) {
Ok(AsyncSink::Ready) => {
Ok(Async::Ready(()))
},
Ok(AsyncSink::NotReady(_)) => {
inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
Ok(Async::NotReady)
},
Ok(AsyncSink::Ready) => Ok(Async::Ready(())),
Ok(AsyncSink::NotReady(_)) => Ok(Async::NotReady),
Err(err) => Err(err)
}
}
Expand Down Expand Up @@ -410,6 +404,7 @@ where C: AsyncRead + AsyncWrite
return Err(IoError::new(IoErrorKind::Other, "connection is shut down"))
}
let inner = &mut *inner; // Avoids borrow errors
inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
inner.inner.poll_flush_notify(&inner.notifier_write, 0)
},
OutboundSubstreamState::Done => {
Expand All @@ -420,7 +415,6 @@ where C: AsyncRead + AsyncWrite
match polling {
Ok(Async::Ready(())) => (),
Ok(Async::NotReady) => {
inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
return Ok(Async::NotReady)
},
Err(err) => {
Expand Down Expand Up @@ -545,13 +539,8 @@ where C: AsyncRead + AsyncWrite
}

let inner = &mut *inner; // Avoids borrow errors
match inner.inner.poll_flush_notify(&inner.notifier_write, 0)? {
Async::Ready(()) => Ok(Async::Ready(())),
Async::NotReady => {
inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
Ok(Async::NotReady)
}
}
inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
inner.inner.poll_flush_notify(&inner.notifier_write, 0)
}

fn shutdown_substream(&self, sub: &mut Self::Substream) -> Poll<(), IoError> {
Expand Down Expand Up @@ -585,6 +574,7 @@ where C: AsyncRead + AsyncWrite
#[inline]
fn close(&self) -> Poll<(), IoError> {
let inner = &mut *self.inner.lock();
inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
try_ready!(inner.inner.close_notify(&inner.notifier_write, 0));
inner.is_shutdown = true;
Ok(Async::Ready(()))
Expand All @@ -596,6 +586,7 @@ where C: AsyncRead + AsyncWrite
if inner.is_shutdown {
return Ok(Async::Ready(()))
}
inner.notifier_write.to_notify.lock().insert(TASK_ID.with(|&t| t), task::current());
inner.inner.poll_flush_notify(&inner.notifier_write, 0)
}
}
Expand Down

0 comments on commit bcc7c4d

Please sign in to comment.