From 3be11de1190b547f40a782c2ac7f56bbddcc9e3b Mon Sep 17 00:00:00 2001 From: driftluo Date: Tue, 13 Oct 2020 10:53:35 +0800 Subject: [PATCH] feat: make yamux independent of the specific runtime --- yamux/Cargo.toml | 18 ++++++++++-- yamux/src/session.rs | 66 +++++++++++++++++++++++++++++++++++++++----- yamux/src/stream.rs | 15 ++++------ 3 files changed, 80 insertions(+), 19 deletions(-) diff --git a/yamux/Cargo.toml b/yamux/Cargo.toml index a04da953..06466ce6 100644 --- a/yamux/Cargo.toml +++ b/yamux/Cargo.toml @@ -10,12 +10,24 @@ edition = "2018" [dependencies] bytes = "0.5.0" futures = { version = "0.3.0" } -tokio = { version = "0.2.0", features = ["time", "rt-core"] } +tokio = { version = "0.2.0" } tokio-util = { version = "0.3.0", features = ["codec"] } log = "0.4" +futures-timer = { version = "3.0.2", optional = true } + [dev-dependencies] env_logger = "0.6" -rand = "0.6" +rand = "0.7" bytesize = "1" -tokio = { version = "0.2.0", features = ["time", "dns", "tcp", "io-util"] } +tokio = { version = "0.2.0", features = ["time", "dns", "tcp", "io-util", "rt-core"] } + +[features] +default = ["tokio-timer"] +# use tokio timer +tokio-timer = ["tokio/time"] +# generic timer, this means that yamux can run under any runtime +# the difference of `AsyncRead/AsyncWrite` can be converted by `tokio-util` +generic-timer = ["futures-timer"] +# use futures-timer's wasm feature +wasm = ["generic-timer", "futures-timer/wasm-bindgen"] diff --git a/yamux/src/session.rs b/yamux/src/session.rs index 0261b7cd..b17ab356 100644 --- a/yamux/src/session.rs +++ b/yamux/src/session.rs @@ -13,10 +13,7 @@ use futures::{ Sink, Stream, }; use log::debug; -use tokio::{ - prelude::{AsyncRead, AsyncWrite}, - time::Interval, -}; +use tokio::prelude::{AsyncRead, AsyncWrite}; use tokio_util::codec::Framed; use crate::{ @@ -28,6 +25,8 @@ use crate::{ StreamId, }; +use timer::{interval, Interval}; + const BUF_SHRINK_THRESHOLD: usize = u8::max_value() as usize; const TIMEOUT: Duration = Duration::from_secs(30); @@ -40,11 +39,11 @@ pub struct Session { eof: bool, // remoteGoAway indicates the remote side does - // not want futher connections. Must be first for alignment. + // not want further connections. Must be first for alignment. remote_go_away: bool, // localGoAway indicates that we should stop - // accepting futher connections. Must be first for alignment. + // accepting further connections. Must be first for alignment. local_go_away: bool, // nextStreamID is the next stream we should @@ -118,7 +117,7 @@ where FrameCodec::default().max_frame_size(config.max_stream_window_size), ); let keepalive = if config.enable_keepalive { - Some(tokio::time::interval(config.keepalive_interval)) + Some(interval(config.keepalive_interval)) } else { None }; @@ -620,6 +619,59 @@ where } } +mod timer { + #[cfg(feature = "generic-timer")] + pub use generic_time::{interval, Interval}; + #[cfg(feature = "tokio-timer")] + pub use tokio::time::{interval, Interval}; + + #[cfg(feature = "generic-timer")] + mod generic_time { + use futures::{Future, Stream}; + use futures_timer::Delay; + use std::{ + pin::Pin, + task::{Context, Poll}, + time::Duration, + }; + + pub struct Interval { + delay: Delay, + period: Duration, + } + + impl Interval { + fn new(period: Duration) -> Self { + Self { + delay: Delay::new(period), + period, + } + } + } + + impl Stream for Interval { + type Item = (); + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + match Pin::new(&mut self.delay).poll(cx) { + Poll::Ready(_) => { + let dur = self.period; + self.delay.reset(dur); + Poll::Ready(Some(())) + } + Poll::Pending => Poll::Pending, + } + } + } + + pub fn interval(period: Duration) -> Interval { + assert!(period > Duration::new(0, 0), "`period` must be non-zero."); + + Interval::new(period) + } + } +} + #[cfg(test)] mod test { use super::Session; diff --git a/yamux/src/stream.rs b/yamux/src/stream.rs index ac336730..114a9387 100644 --- a/yamux/src/stream.rs +++ b/yamux/src/stream.rs @@ -5,7 +5,7 @@ use futures::{ channel::mpsc::{Receiver, Sender}, stream::FusedStream, task::AtomicWaker, - SinkExt, Stream, + Stream, }; use std::{ @@ -114,9 +114,8 @@ impl StreamHandle { } fn send_go_away(&mut self) { - let mut sender = self.event_sender.clone(); self.state = StreamState::LocalClosing; - tokio::spawn(async move { sender.send(StreamEvent::GoAway).await }); + let _ignore = self.event_sender.try_send(StreamEvent::GoAway); } #[inline] @@ -516,12 +515,10 @@ impl Drop for StreamHandle { let frame = Frame::new_window_update(flags, self.id, 0); let rst_event = StreamEvent::Frame(frame); let event = StreamEvent::StateChanged((self.id, StreamState::Closed)); - let mut sender = self.event_sender.clone(); - - tokio::spawn(async move { - let _ignore = sender.send(rst_event).await; - let _ignore = sender.send(event).await; - }); + // It is indeed possible that it cannot be sent here, but ignore it for now + // we should wait a `async drop` api to do this instead of any runtime + let _ignore = self.event_sender.try_send(rst_event); + let _ignore = self.event_sender.try_send(event); } } }