Skip to content

Commit

Permalink
Allow StreamMuxer to notify changes in the address (#1621)
Browse files Browse the repository at this point in the history
* Allow StreamMuxer to notify changes in the address

* Fix doc link

* Revert accidental rename

* Other accidental rename

Co-authored-by: Roman Borschel <romanb@users.noreply.github.com>
  • Loading branch information
tomaka and romanb authored Jun 30, 2020
1 parent d5b2635 commit 826f513
Show file tree
Hide file tree
Showing 23 changed files with 330 additions and 57 deletions.
9 changes: 9 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,12 @@
# 0.20.0 [????-??-??]

- Rename `StreamMuxer::poll_inbound` to `poll_event` and change the
return value to `StreamMuxerEvent`. This new `StreamMuxerEvent` makes
it possible for the multiplexing layer to notify the upper layers of
a change in the address of the underlying connection.

- Add `ConnectionHandler::inject_address_change`.

# 0.19.2 [2020-06-22]

- Add PartialOrd and Ord for PeerId
Expand Down
27 changes: 25 additions & 2 deletions core/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,16 @@ impl ConnectedPoint {
ConnectedPoint::Listener { .. } => true
}
}

/// Modifies the address of the remote stored in this struct.
///
/// For `Dialer`, this modifies `address`. For `Listener`, this modifies `send_back_addr`.
pub fn set_remote_address(&mut self, new_address: Multiaddr) {
match self {
ConnectedPoint::Dialer { address } => *address = new_address,
ConnectedPoint::Listener { send_back_addr, .. } => *send_back_addr = new_address,
}
}
}

/// Information about a successfully established connection.
Expand Down Expand Up @@ -169,6 +179,15 @@ impl ConnectionInfo for PeerId {
}
}

/// Event generated by a [`Connection`].
#[derive(Debug, Clone)]
pub enum Event<T> {
/// Event generated by the [`ConnectionHandler`].
Handler(T),
/// Address of the remote has changed.
AddressChange(Multiaddr),
}

/// A multiplexed connection to a peer with an associated `ConnectionHandler`.
pub struct Connection<TMuxer, THandler>
where
Expand Down Expand Up @@ -239,7 +258,7 @@ where
/// Polls the connection for events produced by the associated handler
/// as a result of I/O activity on the substream multiplexer.
pub fn poll(mut self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Result<THandler::OutEvent, ConnectionError<THandler::Error>>>
-> Poll<Result<Event<THandler::OutEvent>, ConnectionError<THandler::Error>>>
{
loop {
let mut io_pending = false;
Expand All @@ -255,6 +274,10 @@ where
let endpoint = SubstreamEndpoint::Dialer(user_data);
self.handler.inject_substream(substream, endpoint)
}
Poll::Ready(Ok(SubstreamEvent::AddressChange(address))) => {
self.handler.inject_address_change(&address);
return Poll::Ready(Ok(Event::AddressChange(address)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::IO(err))),
}

Expand All @@ -269,7 +292,7 @@ where
self.muxing.open_substream(user_data);
}
Poll::Ready(Ok(ConnectionHandlerEvent::Custom(event))) => {
return Poll::Ready(Ok(event));
return Poll::Ready(Ok(Event::Handler(event)));
}
Poll::Ready(Err(err)) => return Poll::Ready(Err(ConnectionError::Handler(err))),
}
Expand Down
5 changes: 4 additions & 1 deletion core/src/connection/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::PeerId;
use crate::{Multiaddr, PeerId};
use std::{task::Context, task::Poll};
use super::{Connected, SubstreamEndpoint};

Expand Down Expand Up @@ -58,6 +58,9 @@ pub trait ConnectionHandler {
/// Notifies the handler of an event.
fn inject_event(&mut self, event: Self::InEvent);

/// Notifies the handler of a change in the address of the remote.
fn inject_address_change(&mut self, new_address: &Multiaddr);

/// Polls the handler for events.
///
/// Returning an error will close the connection to the remote.
Expand Down
31 changes: 30 additions & 1 deletion core/src/connection/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,13 @@ use std::{
collections::hash_map,
error,
fmt,
mem,
pin::Pin,
task::{Context, Poll},
};
use super::{
Connected,
ConnectedPoint,
Connection,
ConnectionError,
ConnectionHandler,
Expand Down Expand Up @@ -220,7 +222,17 @@ pub enum Event<'a, I, O, H, TE, HE, C> {
entry: EstablishedEntry<'a, I, C>,
/// The produced event.
event: O
}
},

/// A connection to a node has changed its address.
AddressChange {
/// The entry associated with the connection that changed address.
entry: EstablishedEntry<'a, I, C>,
/// The former [`ConnectedPoint`].
old_endpoint: ConnectedPoint,
/// The new [`ConnectedPoint`].
new_endpoint: ConnectedPoint,
},
}

impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
Expand Down Expand Up @@ -369,6 +381,23 @@ impl<I, O, H, TE, HE, C> Manager<I, O, H, TE, HE, C> {
let _ = task.remove();
Event::PendingConnectionError { id, error, handler }
}
task::Event::AddressChange { id: _, new_address } => {
let (new, old) = if let TaskState::Established(c) = &mut task.get_mut().state {
let mut new_endpoint = c.endpoint.clone();
new_endpoint.set_remote_address(new_address);
let old_endpoint = mem::replace(&mut c.endpoint, new_endpoint.clone());
(new_endpoint, old_endpoint)
} else {
unreachable!(
"`Event::AddressChange` implies (2) occurred on that task and thus (3)."
)
};
Event::AddressChange {
entry: EstablishedEntry { task },
old_endpoint: old,
new_endpoint: new,
}
},
task::Event::Error { id, error } => {
let id = ConnectionId(id);
let task = task.remove();
Expand Down
18 changes: 15 additions & 3 deletions core/src/connection/manager/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
Multiaddr,
muxing::StreamMuxer,
connection::{
self,
Close,
Connected,
Connection,
Expand Down Expand Up @@ -55,17 +57,20 @@ pub enum Event<T, H, TE, HE, C> {
Error { id: TaskId, error: ConnectionError<HE> },
/// A pending connection failed.
Failed { id: TaskId, error: PendingConnectionError<TE>, handler: H },
/// A node we are connected to has changed its address.
AddressChange { id: TaskId, new_address: Multiaddr },
/// Notify the manager of an event from the connection.
Notify { id: TaskId, event: T }
Notify { id: TaskId, event: T },
}

impl<T, H, TE, HE, C> Event<T, H, TE, HE, C> {
pub fn id(&self) -> &TaskId {
match self {
Event::Established { id, .. } => id,
Event::Error { id, .. } => id,
Event::Notify { id, .. } => id,
Event::Failed { id, .. } => id,
Event::AddressChange { id, .. } => id,
Event::Notify { id, .. } => id,
}
}
}
Expand Down Expand Up @@ -245,13 +250,20 @@ where
this.state = State::EstablishedPending(connection);
return Poll::Pending
}
Poll::Ready(Ok(event)) => {
Poll::Ready(Ok(connection::Event::Handler(event))) => {
this.state = State::EstablishedReady {
connection: Some(connection),
event: Event::Notify { id, event }
};
continue 'poll
}
Poll::Ready(Ok(connection::Event::AddressChange(new_address))) => {
this.state = State::EstablishedReady {
connection: Some(connection),
event: Event::AddressChange { id, new_address }
};
continue 'poll
}
Poll::Ready(Err(error)) => {
// Notify the manager of the error via an event,
// dropping the connection.
Expand Down
39 changes: 38 additions & 1 deletion core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ pub enum PoolEvent<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TC
/// The produced event.
event: TOutEvent,
},

/// The connection to a node has changed its address.
AddressChange {
/// The connection that has changed address.
connection: EstablishedConnection<'a, TInEvent, TConnInfo, TPeerId>,
/// The new endpoint.
new_endpoint: ConnectedPoint,
/// The old endpoint.
old_endpoint: ConnectedPoint,
},
}

impl<'a, TInEvent, TOutEvent, THandler, TTransErr, THandlerErr, TConnInfo, TPeerId> fmt::Debug
Expand Down Expand Up @@ -162,6 +172,13 @@ where
.field("event", event)
.finish()
},
PoolEvent::AddressChange { ref connection, ref new_endpoint, ref old_endpoint } => {
f.debug_struct("PoolEvent::AddressChange")
.field("conn_info", connection.info())
.field("new_endpoint", new_endpoint)
.field("old_endpoint", old_endpoint)
.finish()
},
}
}
}
Expand Down Expand Up @@ -639,7 +656,27 @@ where
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
}
},
manager::Event::AddressChange { entry, new_endpoint, old_endpoint } => {
let id = entry.id();

match self.established.get_mut(entry.connected().peer_id()) {
Some(list) => *list.get_mut(&id)
.expect("state inconsistency: entry is `EstablishedEntry` but absent \
from `established`") = new_endpoint.clone(),
None => unreachable!("since `entry` is an `EstablishedEntry`.")
};

match self.get(id) {
Some(PoolConnection::Established(connection)) =>
return Poll::Ready(PoolEvent::AddressChange {
connection,
new_endpoint,
old_endpoint,
}),
_ => unreachable!("since `entry` is an `EstablishedEntry`.")
}
},
}
}
}
Expand Down
20 changes: 17 additions & 3 deletions core/src/connection/substream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::muxing::{StreamMuxer, SubstreamRef, substream_from_ref};
use crate::muxing::{StreamMuxer, StreamMuxerEvent, SubstreamRef, substream_from_ref};
use futures::prelude::*;
use multiaddr::Multiaddr;
use smallvec::SmallVec;
use std::sync::Arc;
use std::{fmt, io::Error as IoError, pin::Pin, task::Context, task::Poll};
Expand Down Expand Up @@ -95,6 +96,12 @@ where
/// destroyed or `close_graceful` is called.
substream: Substream<TMuxer>,
},

/// Address to the remote has changed. The previous one is now obsolete.
///
/// > **Note**: This can for example happen when using the QUIC protocol, where the two nodes
/// > can change their IP address while retaining the same QUIC connection.
AddressChange(Multiaddr),
}

/// Identifier for a substream being opened.
Expand Down Expand Up @@ -145,13 +152,15 @@ where
/// Provides an API similar to `Future`.
pub fn poll(&mut self, cx: &mut Context) -> Poll<Result<SubstreamEvent<TMuxer, TUserData>, IoError>> {
// Polling inbound substream.
match self.inner.poll_inbound(cx) {
Poll::Ready(Ok(substream)) => {
match self.inner.poll_event(cx) {
Poll::Ready(Ok(StreamMuxerEvent::InboundSubstream(substream))) => {
let substream = substream_from_ref(self.inner.clone(), substream);
return Poll::Ready(Ok(SubstreamEvent::InboundSubstream {
substream,
}));
}
Poll::Ready(Ok(StreamMuxerEvent::AddressChange(addr))) =>
return Poll::Ready(Ok(SubstreamEvent::AddressChange(addr))),
Poll::Ready(Err(err)) => return Poll::Ready(Err(err.into())),
Poll::Pending => {}
}
Expand Down Expand Up @@ -253,6 +262,11 @@ where
.field("substream", substream)
.finish()
},
SubstreamEvent::AddressChange(address) => {
f.debug_struct("SubstreamEvent::AddressChange")
.field("address", address)
.finish()
},
}
}
}
24 changes: 20 additions & 4 deletions core/src/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use crate::{
muxing::StreamMuxer,
muxing::{StreamMuxer, StreamMuxerEvent},
ProtocolName,
transport::{Transport, ListenerEvent, TransportError},
Multiaddr
Expand Down Expand Up @@ -189,10 +189,26 @@ where
type OutboundSubstream = EitherOutbound<A, B>;
type Error = IoError;

fn poll_inbound(&self, cx: &mut Context) -> Poll<Result<Self::Substream, Self::Error>> {
fn poll_event(&self, cx: &mut Context) -> Poll<Result<StreamMuxerEvent<Self::Substream>, Self::Error>> {
match self {
EitherOutput::First(inner) => inner.poll_inbound(cx).map(|p| p.map(EitherOutput::First)).map_err(|e| e.into()),
EitherOutput::Second(inner) => inner.poll_inbound(cx).map(|p| p.map(EitherOutput::Second)).map_err(|e| e.into()),
EitherOutput::First(inner) => inner.poll_event(cx).map(|result| {
result.map_err(|e| e.into()).map(|event| {
match event {
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
StreamMuxerEvent::InboundSubstream(substream) =>
StreamMuxerEvent::InboundSubstream(EitherOutput::First(substream))
}
})
}),
EitherOutput::Second(inner) => inner.poll_event(cx).map(|result| {
result.map_err(|e| e.into()).map(|event| {
match event {
StreamMuxerEvent::AddressChange(addr) => StreamMuxerEvent::AddressChange(addr),
StreamMuxerEvent::InboundSubstream(substream) =>
StreamMuxerEvent::InboundSubstream(EitherOutput::Second(substream))
}
})
}),
}
}

Expand Down
Loading

0 comments on commit 826f513

Please sign in to comment.