diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs index 34e4dd38c2..c0365c5fed 100644 --- a/benches/sync_mpsc.rs +++ b/benches/sync_mpsc.rs @@ -1,5 +1,6 @@ #![feature(test)] +#[macro_use] extern crate futures; extern crate test; @@ -106,7 +107,6 @@ impl Stream for TestSender { Err(_) => panic!(), Ok(AsyncSink::Ready) => { self.last += 1; - assert_eq!(Ok(Async::Ready(())), self.tx.poll_complete()); Ok(Async::Ready(Some(self.last))) } Ok(AsyncSink::NotReady(_)) => { diff --git a/src/sync/mpsc/mod.rs b/src/sync/mpsc/mod.rs index 7187579666..c0f9a8857d 100644 --- a/src/sync/mpsc/mod.rs +++ b/src/sync/mpsc/mod.rs @@ -649,7 +649,14 @@ impl Sink for Sender { } fn poll_complete(&mut self) -> Poll<(), SendError> { - Ok(Async::Ready(())) + self.poll_ready() + // At this point, the value cannot be returned and `SendError` + // cannot be created with a `T` without breaking backwards + // comptibility. This means we cannot return an error. + // + // That said, there is also no guarantee that a `poll_complete` + // returning `Ok` implies the receiver sees the message. + .or_else(|_| Ok(().into())) } fn close(&mut self) -> Poll<(), SendError> { diff --git a/tests/mpsc.rs b/tests/mpsc.rs index 8df98d490f..faeb614608 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -548,3 +548,19 @@ fn try_send_fail() { assert_eq!(rx.next(), Some(Ok("goodbye"))); assert!(rx.next().is_none()); } + +#[test] +fn bounded_is_really_bounded() { + use futures::Async::*; + let (mut tx, mut rx) = mpsc::channel(0); + lazy(|| { + assert!(tx.start_send(1).unwrap().is_ready()); + // Not ready until we receive + assert!(!tx.poll_complete().unwrap().is_ready()); + // Receive the value + assert_eq!(rx.poll().unwrap(), Ready(Some(1))); + // Now the sender is ready + assert!(tx.poll_complete().unwrap().is_ready()); + Ok::<_, ()>(()) + }).wait().unwrap(); +} diff --git a/tests/sink.rs b/tests/sink.rs index cb2fdcf26d..c8a34d9e03 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -392,6 +392,12 @@ fn fanout_backpressure() { let (item, right_recv) = right_recv.into_future().wait().unwrap(); assert_eq!(item, Some(1)); assert!(flag.get()); + let (item, left_recv) = left_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(2)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, right_recv) = right_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(2)); match task.poll_future_notify(&flag, 0).unwrap() { Async::Ready(_) => { },