diff --git a/src/async.rs b/src/async.rs index 5293f8d..fdd6bad 100644 --- a/src/async.rs +++ b/src/async.rs @@ -274,6 +274,11 @@ impl<'a, T> SendSink<'a, T> { pub fn capacity(&self) -> Option { self.0.capacity() } + + /// Returns whether the SendSinks are belong to the same channel. + pub fn same_channel(&self, other: &Self) -> bool { + self.sender().same_channel(other.sender()) + } } impl<'a, T> Sink for SendSink<'a, T> { @@ -503,6 +508,11 @@ impl<'a, T> RecvStream<'a, T> { pub fn capacity(&self) -> Option { self.0.capacity() } + + /// Returns whether the SendSinks are belong to the same channel. + pub fn same_channel(&self, other: &Self) -> bool { + self.0.receiver.same_channel(&*other.0.receiver) + } } impl<'a, T> Clone for RecvStream<'a, T> { diff --git a/src/lib.rs b/src/lib.rs index ff75ce3..f388399 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -756,6 +756,11 @@ impl Sender { shared: Arc::downgrade(&self.shared), } } + + /// Returns whether the senders are belong to the same channel. + pub fn same_channel(&self, other: &Sender) -> bool { + Arc::ptr_eq(&self.shared, &other.shared) + } } impl Clone for Sender { @@ -927,6 +932,11 @@ impl Receiver { pub fn receiver_count(&self) -> usize { self.shared.receiver_count() } + + /// Returns whether the receivers are belong to the same channel. + pub fn same_channel(&self, other: &Receiver) -> bool { + Arc::ptr_eq(&self.shared, &other.shared) + } } impl Clone for Receiver { diff --git a/tests/check_same_channel.rs b/tests/check_same_channel.rs new file mode 100644 index 0000000..edb82c3 --- /dev/null +++ b/tests/check_same_channel.rs @@ -0,0 +1,57 @@ +#[test] +fn same_sender() { + let (tx1, _rx) = flume::unbounded::<()>(); + let tx2 = tx1.clone(); + + assert!(tx1.same_channel(&tx2)); + + let (tx3, _rx) = flume::unbounded::<()>(); + + assert!(!tx1.same_channel(&tx3)); + assert!(!tx2.same_channel(&tx3)); +} + +#[test] +fn same_receiver() { + let (_tx, rx1) = flume::unbounded::<()>(); + let rx2 = rx1.clone(); + + assert!(rx1.same_channel(&rx2)); + + let (_tx, rx3) = flume::unbounded::<()>(); + + assert!(!rx1.same_channel(&rx3)); + assert!(!rx2.same_channel(&rx3)); +} + +#[cfg(feature = "async")] +#[test] +fn same_send_sink() { + let (tx1, _rx) = flume::unbounded::<()>(); + let tx1 = tx1.into_sink(); + let tx2 = tx1.clone(); + + assert!(tx1.same_channel(&tx2)); + + let (tx3, _rx) = flume::unbounded::<()>(); + let tx3 = tx3.into_sink(); + + assert!(!tx1.same_channel(&tx3)); + assert!(!tx2.same_channel(&tx3)); +} + +#[cfg(feature = "async")] +#[test] +fn same_recv_stream() { + let (_tx, rx1) = flume::unbounded::<()>(); + let rx1 = rx1.into_stream(); + let rx2 = rx1.clone(); + + assert!(rx1.same_channel(&rx2)); + + let (_tx, rx3) = flume::unbounded::<()>(); + let rx3 = rx3.into_stream(); + + assert!(!rx1.same_channel(&rx3)); + assert!(!rx2.same_channel(&rx3)); +}