Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom yamux mode #1691

Merged
merged 5 commits into from
Aug 17, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions core/src/transport/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,37 @@ where
Multiplex { info: Some(i), upgrade }
})
}

/// Like [`Builder::multiplex`] but accepts a function which returns the upgrade.
///
/// The supplied function is applied to [`ConnectionInfo`] and [`ConnectedPoint`]
/// and returns an upgrade which receives the I/O resource `C` and must
/// produce a [`StreamMuxer`] `M`. The transport must already be authenticated.
/// This ends the (regular) transport upgrade process, yielding the underlying,
/// configured transport.
///
/// ## Transitions
///
/// * I/O upgrade: `C -> M`.
/// * Transport output: `(I, C) -> (I, M)`.
pub fn multiplex_ext<C, M, U, I, E, F>(self, up: F)
-> AndThen<T, impl FnOnce((I, C), ConnectedPoint) -> Multiplex<C, U, I> + Clone>
where
T: Transport<Output = (I, C)>,
C: AsyncRead + AsyncWrite + Unpin,
M: StreamMuxer,
I: ConnectionInfo,
U: InboundUpgrade<Negotiated<C>, Output = M, Error = E>,
U: OutboundUpgrade<Negotiated<C>, Output = M, Error = E> + Clone,
E: Error + 'static,
F: for<'a> FnOnce(&'a I, &'a ConnectedPoint) -> U + Clone
{
let version = self.version;
self.inner.and_then(move |(i, c), endpoint| {
let upgrade = upgrade::apply(c, up(&i, &endpoint), endpoint, version);
Multiplex { info: Some(i), upgrade }
})
}
}

/// An upgrade that authenticates the remote peer, typically
Expand Down
33 changes: 23 additions & 10 deletions muxers/yamux/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use parking_lot::Mutex;
use std::{fmt, io, iter, ops::{Deref, DerefMut}, pin::Pin, task::Context};
use thiserror::Error;

pub use yamux::WindowUpdateMode;
pub use yamux::{Mode, WindowUpdateMode};

/// A Yamux connection.
///
Expand Down Expand Up @@ -165,15 +165,26 @@ where

/// The yamux configuration.
#[derive(Clone)]
pub struct Config(yamux::Config);
pub struct Config {
config: yamux::Config,
mode: Option<yamux::Mode>
}

/// The yamux configuration for upgrading I/O resources which are ![`Send`].
#[derive(Clone)]
pub struct LocalConfig(Config);

impl Config {
pub fn new(cfg: yamux::Config) -> Self {
Config(cfg)
Config { config: cfg, mode: None }
}

/// Override the connection mode.
///
/// This will always use the provided mode during the connection upgrade,
/// irrespective of whether an inbound or outbound upgrade happens.
pub fn override_mode(&mut self, mode: yamux::Mode) {
self.mode = Some(mode)
}

/// Turn this into a [`LocalConfig`] for use with upgrades of ![`Send`] resources.
Expand All @@ -184,21 +195,21 @@ impl Config {

impl Default for Config {
fn default() -> Self {
Config(yamux::Config::default())
Config::new(yamux::Config::default())
}
}

impl Deref for Config {
type Target = yamux::Config;

fn deref(&self) -> &Self::Target {
&self.0
&self.config
}
}

impl DerefMut for Config {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.0
&mut self.config
}
}

Expand Down Expand Up @@ -229,7 +240,7 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Server)))
future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Server))))
}
}

Expand All @@ -242,7 +253,8 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_inbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Server)))
let cfg = self.0;
future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Server))))
}
}

Expand All @@ -255,7 +267,7 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::new(io, self.0, yamux::Mode::Client)))
future::ready(Ok(Yamux::new(io, self.config, self.mode.unwrap_or(yamux::Mode::Client))))
}
}

Expand All @@ -268,7 +280,8 @@ where
type Future = future::Ready<Result<Self::Output, Self::Error>>;

fn upgrade_outbound(self, io: C, _: Self::Info) -> Self::Future {
future::ready(Ok(Yamux::local(io, (self.0).0, yamux::Mode::Client)))
let cfg = self.0;
future::ready(Ok(Yamux::local(io, cfg.config, cfg.mode.unwrap_or(yamux::Mode::Client))))
}
}

Expand Down