From 8a159121fe8d00ec0921bbd531670926be4a6080 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 31 Aug 2018 15:11:42 -0700 Subject: [PATCH 1/3] fix mpsc::Sender::poll_complete impl This fixes the "bounded senders are actually unbounded" problem. The problem is discussed here: rust-lang-nursery/futures-rs#984 --- src/sync/mpsc/mod.rs | 9 ++++++++- tests/mpsc.rs | 16 ++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/src/sync/mpsc/mod.rs b/src/sync/mpsc/mod.rs index 7187579666..c0f9a8857d 100644 --- a/src/sync/mpsc/mod.rs +++ b/src/sync/mpsc/mod.rs @@ -649,7 +649,14 @@ impl Sink for Sender { } fn poll_complete(&mut self) -> Poll<(), SendError> { - Ok(Async::Ready(())) + self.poll_ready() + // At this point, the value cannot be returned and `SendError` + // cannot be created with a `T` without breaking backwards + // comptibility. This means we cannot return an error. + // + // That said, there is also no guarantee that a `poll_complete` + // returning `Ok` implies the receiver sees the message. + .or_else(|_| Ok(().into())) } fn close(&mut self) -> Poll<(), SendError> { diff --git a/tests/mpsc.rs b/tests/mpsc.rs index 8df98d490f..faeb614608 100644 --- a/tests/mpsc.rs +++ b/tests/mpsc.rs @@ -548,3 +548,19 @@ fn try_send_fail() { assert_eq!(rx.next(), Some(Ok("goodbye"))); assert!(rx.next().is_none()); } + +#[test] +fn bounded_is_really_bounded() { + use futures::Async::*; + let (mut tx, mut rx) = mpsc::channel(0); + lazy(|| { + assert!(tx.start_send(1).unwrap().is_ready()); + // Not ready until we receive + assert!(!tx.poll_complete().unwrap().is_ready()); + // Receive the value + assert_eq!(rx.poll().unwrap(), Ready(Some(1))); + // Now the sender is ready + assert!(tx.poll_complete().unwrap().is_ready()); + Ok::<_, ()>(()) + }).wait().unwrap(); +} From f659cb6309f72f89bd57369bf2a772280cd2c5d7 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Fri, 31 Aug 2018 17:20:06 -0700 Subject: [PATCH 2/3] Fix test that relied on broken behavior --- tests/sink.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/sink.rs b/tests/sink.rs index cb2fdcf26d..c8a34d9e03 100644 --- a/tests/sink.rs +++ b/tests/sink.rs @@ -392,6 +392,12 @@ fn fanout_backpressure() { let (item, right_recv) = right_recv.into_future().wait().unwrap(); assert_eq!(item, Some(1)); assert!(flag.get()); + let (item, left_recv) = left_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(2)); + assert!(flag.get()); + assert!(task.poll_future_notify(&flag, 0).unwrap().is_not_ready()); + let (item, right_recv) = right_recv.into_future().wait().unwrap(); + assert_eq!(item, Some(2)); match task.poll_future_notify(&flag, 0).unwrap() { Async::Ready(_) => { }, From 4707a1aa1bc840caa83843f6b316959a151dd218 Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Sat, 1 Sep 2018 10:51:31 -0700 Subject: [PATCH 3/3] Fix bench The bench is written assuming internal implementation details of the channel which have changed after this fix. I removed the assertion which isn't neecessary. --- benches/sync_mpsc.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/benches/sync_mpsc.rs b/benches/sync_mpsc.rs index 34e4dd38c2..c0365c5fed 100644 --- a/benches/sync_mpsc.rs +++ b/benches/sync_mpsc.rs @@ -1,5 +1,6 @@ #![feature(test)] +#[macro_use] extern crate futures; extern crate test; @@ -106,7 +107,6 @@ impl Stream for TestSender { Err(_) => panic!(), Ok(AsyncSink::Ready) => { self.last += 1; - assert_eq!(Ok(Async::Ready(())), self.tx.poll_complete()); Ok(Async::Ready(Some(self.last))) } Ok(AsyncSink::NotReady(_)) => {