Skip to content

Commit

Permalink
Add a method to test if split streams come from the same stream. (#1762)
Browse files Browse the repository at this point in the history
* Add a method to test if split streams come from the same stream.

The exposed stream ID can also be used as key in associative containers.

* Document the fact that split stream IDs can dangle.
  • Loading branch information
de-vri-es authored and LucioFranco committed Jan 21, 2020
1 parent 3176d0a commit 5bf78d7
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 1 deletion.
42 changes: 41 additions & 1 deletion tokio/src/io/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,18 @@ cfg_io_util! {
}
}

/// An opaque ID for the parent stream of a split half.
///
/// If you keep a `SplitStreamId` around after both halves have been dropped or reunited,
/// the stream ID is dangling.
/// The same ID may then be used for other split streams.
/// To avoid this, do not keep `SplitStreamId` around after both half have been dropped.
///
/// Note that it is still impossible to unsplit two halves from a different stream,
/// since at-least one half has not been dropped in that scenario.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Debug)]
pub struct SplitStreamId(usize);

struct Inner<T> {
locked: AtomicBool,
stream: UnsafeCell<T>,
Expand All @@ -61,14 +73,28 @@ struct Guard<'a, T> {
}

impl<T> ReadHalf<T> {
/// Get an opaque ID for the parent stream.
///
/// This can be used to check if two halves have been split from the
/// same stream.
/// The stream ID can also be used as key in associative containers.
///
/// Note that stream IDs may dangle when both halves are dropped.
/// See [`SplitStreamId`] for more information.
pub fn stream_id(&self) -> SplitStreamId {
SplitStreamId(&*self.inner as *const Inner<T> as usize)
}

/// Reunite with a previously split `WriteHalf`.
///
/// # Panics
///
/// If this `ReadHalf` and the given `WriteHalf` do not originate from the
/// same `split` operation this method will panic.
/// This can be checked ahead of time by comparing the stream ID
/// of the two halves.
pub fn unsplit(self, wr: WriteHalf<T>) -> T {
if Arc::ptr_eq(&self.inner, &wr.inner) {
if self.stream_id() == wr.stream_id() {
drop(wr);

let inner = Arc::try_unwrap(self.inner)
Expand All @@ -82,6 +108,20 @@ impl<T> ReadHalf<T> {
}
}

impl<T> WriteHalf<T> {
/// Get an opaque ID for the parent stream.
///
/// This can be used to check if two halves have been split from the
/// same stream.
/// The stream ID can also be used as key in associative containers.
///
/// Note that stream IDs may dangle when both halves are dropped.
/// See [`SplitStreamId`] for more information.
pub fn stream_id(&self) -> SplitStreamId {
SplitStreamId(&*self.inner as *const Inner<T> as usize)
}
}

impl<T: AsyncRead> AsyncRead for ReadHalf<T> {
fn poll_read(
self: Pin<&mut Self>,
Expand Down
10 changes: 10 additions & 0 deletions tokio/tests/io_split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,16 @@ fn is_send_and_sync() {
assert_bound::<WriteHalf<RW>>();
}

#[test]
fn split_stream_id() {
let (r1, w1) = split(RW);
let (r2, w2) = split(RW);
assert_eq!(r1.stream_id(), w1.stream_id());
assert_eq!(r1.stream_id(), w1.stream_id());
assert_ne!(r1.stream_id(), w2.stream_id());
assert_ne!(r2.stream_id(), w1.stream_id());
}

#[test]
fn unsplit_ok() {
let (r, w) = split(RW);
Expand Down

0 comments on commit 5bf78d7

Please sign in to comment.