diff --git a/core/src/nodes/collection.rs b/core/src/nodes/collection.rs index 12e8d1ca579..44d75fa0fa7 100644 --- a/core/src/nodes/collection.rs +++ b/core/src/nodes/collection.rs @@ -30,25 +30,27 @@ use crate::{ }; use fnv::FnvHashMap; use futures::prelude::*; -use std::{collections::hash_map::Entry, error, fmt, mem}; +use std::{collections::hash_map::Entry, error, fmt, hash::Hash, mem}; mod tests; -// TODO: make generic over PeerId - /// Implementation of `Stream` that handles a collection of nodes. -pub struct CollectionStream { +pub struct CollectionStream { /// Object that handles the tasks. - inner: HandledNodesTasks, + inner: HandledNodesTasks, /// List of nodes, with the task id that handles this node. The corresponding entry in `tasks` /// must always be in the `Connected` state. - nodes: FnvHashMap, + nodes: FnvHashMap, /// List of tasks and their state. If `Connected`, then a corresponding entry must be present /// in `nodes`. - tasks: FnvHashMap, + tasks: FnvHashMap>, } -impl fmt::Debug for CollectionStream { +impl fmt::Debug for + CollectionStream +where + TPeerId: fmt::Debug, +{ fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { let mut list = f.debug_list(); for (id, task) in &self.tasks { @@ -67,18 +69,18 @@ impl fmt::Debug for Colle /// State of a task. #[derive(Debug, Clone, PartialEq, Eq)] -enum TaskState { +enum TaskState { /// Task is attempting to reach a peer. Pending, /// The task is connected to a peer. - Connected(PeerId), + Connected(TPeerId), } /// Event that can happen on the `CollectionStream`. -pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr> { +pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr, TPeerId> { /// A connection to a node has succeeded. You must use the provided event in order to accept /// the connection. - NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>), + NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>), /// A connection to a node has been closed. /// @@ -86,7 +88,7 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr /// substream attempt is pending. NodeClosed { /// Identifier of the node. - peer_id: PeerId, + peer_id: TPeerId, }, /// A connection to a node has errored. @@ -94,7 +96,7 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr /// Can only happen after a node has been successfully reached. NodeError { /// Identifier of the node. - peer_id: PeerId, + peer_id: TPeerId, /// The error that happened. error: HandledNodeError, }, @@ -112,16 +114,18 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr /// A node has produced an event. NodeEvent { /// Identifier of the node. - peer_id: PeerId, + peer_id: TPeerId, /// The produced event. event: TOutEvent, }, } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for + CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> where TOutEvent: fmt::Debug, TReachErr: fmt::Debug, THandlerErr: fmt::Debug, + TPeerId: Eq + Hash + Clone + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { match *self { @@ -159,19 +163,23 @@ where TOutEvent: fmt::Debug, /// Event that happens when we reach a node. #[must_use = "The node reached event is used to accept the newly-opened connection"] -pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr: 'a> { +pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr: 'a, TPeerId: 'a = PeerId> { /// Peer id we connected to. - peer_id: PeerId, + peer_id: TPeerId, /// The task id that reached the node. id: TaskId, /// The `CollectionStream` we are referencing. - parent: &'a mut CollectionStream, + parent: &'a mut CollectionStream, } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> { +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> + CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> +where + TPeerId: Eq + Hash + Clone, +{ /// Returns the peer id of the node that has been reached. #[inline] - pub fn peer_id(&self) -> &PeerId { + pub fn peer_id(&self) -> &TPeerId { &self.peer_id } @@ -189,11 +197,11 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE } /// Accepts the new node. - pub fn accept(self) -> (CollectionNodeAccept, PeerId) { + pub fn accept(self) -> (CollectionNodeAccept, TPeerId) { // Set the state of the task to `Connected`. let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id); let _former_state = self.parent.tasks.insert(self.id, TaskState::Connected(self.peer_id.clone())); - debug_assert_eq!(_former_state, Some(TaskState::Pending)); + debug_assert!(_former_state == Some(TaskState::Pending)); // It is possible that we already have a task connected to the same peer. In this // case, we need to emit a `NodeReplaced` event. @@ -204,7 +212,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE self.nodes are valid tasks in the HandledNodesTasks; QED") .close(); let _former_other_state = self.parent.tasks.remove(&former_task_id); - debug_assert_eq!(_former_other_state, Some(TaskState::Connected(self.peer_id.clone()))); + debug_assert!(_former_other_state == Some(TaskState::Connected(self.peer_id.clone()))); // TODO: we unfortunately have to clone the peer id here (CollectionNodeAccept::ReplacedExisting, self.peer_id.clone()) @@ -223,7 +231,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE /// /// Has the same effect as dropping the event without accepting it. #[inline] - pub fn deny(self) -> PeerId { + pub fn deny(self) -> TPeerId { // TODO: we unfortunately have to clone the id here, in order to be explicit let peer_id = self.peer_id.clone(); drop(self); // Just to be explicit @@ -231,7 +239,11 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE } } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> { +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for + CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> +where + TPeerId: Eq + Hash + Clone + fmt::Debug, +{ fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { f.debug_struct("CollectionReachEvent") .field("peer_id", &self.peer_id) @@ -240,7 +252,9 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for C } } -impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> Drop for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> { +impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> Drop for + CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> +{ fn drop(&mut self) { let task_state = self.parent.tasks.remove(&self.id); debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false }); @@ -266,7 +280,11 @@ pub enum CollectionNodeAccept { #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct ReachAttemptId(TaskId); -impl CollectionStream { +impl + CollectionStream +where + TPeerId: Eq + Hash + Clone, +{ /// Creates a new empty collection. #[inline] pub fn new() -> Self { @@ -284,8 +302,8 @@ impl CollectionStream(&mut self, future: TFut, handler: THandler) -> ReachAttemptId where - TFut: Future + Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + TFut: Future + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? TReachErr: error::Error + Send + 'static, @@ -294,6 +312,7 @@ impl CollectionStream CollectionStream Option> { + pub fn peer_mut(&mut self, id: &TPeerId) -> Option> { let task = match self.nodes.get(id) { Some(&task) => task, None => return None, @@ -358,7 +377,7 @@ impl CollectionStream bool { + pub fn has_connection(&self, id: &TPeerId) -> bool { self.nodes.contains_key(id) } @@ -366,7 +385,7 @@ impl CollectionStream impl Iterator { + pub fn connections(&self) -> impl Iterator { self.nodes.keys() } @@ -375,7 +394,7 @@ impl CollectionStream **Note**: we use a regular `poll` method instead of implementing `Stream` in order to /// > remove the `Err` variant, but also because we want the `CollectionStream` to stay /// > borrowed if necessary. - pub fn poll(&mut self) -> Async> { + pub fn poll(&mut self) -> Async> { let item = match self.inner.poll() { Async::Ready(item) => item, Async::NotReady => return Async::NotReady, @@ -487,13 +506,16 @@ impl fmt::Display for InterruptError { impl error::Error for InterruptError {} /// Access to a peer in the collection. -pub struct PeerMut<'a, TInEvent: 'a> { +pub struct PeerMut<'a, TInEvent: 'a, TPeerId: 'a = PeerId> { inner: HandledNodesTask<'a, TInEvent>, - tasks: &'a mut FnvHashMap, - nodes: &'a mut FnvHashMap, + tasks: &'a mut FnvHashMap>, + nodes: &'a mut FnvHashMap, } -impl<'a, TInEvent> PeerMut<'a, TInEvent> { +impl<'a, TInEvent, TPeerId> PeerMut<'a, TInEvent, TPeerId> +where + TPeerId: Eq + Hash, +{ /// Sends an event to the given node. #[inline] pub fn send_event(&mut self, event: TInEvent) { diff --git a/core/src/nodes/handled_node_tasks.rs b/core/src/nodes/handled_node_tasks.rs index 32c713da359..def5ab5af45 100644 --- a/core/src/nodes/handled_node_tasks.rs +++ b/core/src/nodes/handled_node_tasks.rs @@ -40,8 +40,6 @@ use void::Void; mod tests; -// TODO: make generic over PeerId - // Implementor notes // ================= // @@ -59,7 +57,7 @@ mod tests; // conditions in the user's code. See similar comments in the documentation of `NodeStream`. /// Implementation of `Stream` that handles a collection of nodes. -pub struct HandledNodesTasks { +pub struct HandledNodesTasks { /// A map between active tasks to an unbounded sender, used to control the task. Closing the sender interrupts /// the task. It is possible that we receive messages from tasks that used to be in this list /// but no longer are, in which case we should ignore them. @@ -73,12 +71,14 @@ pub struct HandledNodesTasks + Send>; 8]>, /// Sender to emit events to the outside. Meant to be cloned and sent to tasks. - events_tx: mpsc::UnboundedSender<(InToExtMessage, TaskId)>, + events_tx: mpsc::UnboundedSender<(InToExtMessage, TaskId)>, /// Receiver side for the events. - events_rx: mpsc::UnboundedReceiver<(InToExtMessage, TaskId)>, + events_rx: mpsc::UnboundedReceiver<(InToExtMessage, TaskId)>, } -impl fmt::Debug for HandledNodesTasks { +impl fmt::Debug for + HandledNodesTasks +{ fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { f.debug_list() .entries(self.tasks.keys().cloned()) @@ -122,30 +122,30 @@ where } /// Prototype for a `NodeHandler`. -pub trait IntoNodeHandler { +pub trait IntoNodeHandler { /// The node handler. type Handler: NodeHandler; /// Builds the node handler. /// - /// The `PeerId` is the id of the node the handler is going to handle. - fn into_handler(self, remote_peer_id: &PeerId) -> Self::Handler; + /// The `TPeerId` is the id of the node the handler is going to handle. + fn into_handler(self, remote_peer_id: &TPeerId) -> Self::Handler; } -impl IntoNodeHandler for T +impl IntoNodeHandler for T where T: NodeHandler { type Handler = Self; #[inline] - fn into_handler(self, _: &PeerId) -> Self { + fn into_handler(self, _: &TPeerId) -> Self { self } } /// Event that can happen on the `HandledNodesTasks`. #[derive(Debug)] -pub enum HandledNodesEvent { +pub enum HandledNodesEvent { /// A task has been closed. /// /// This happens once the node handler closes or an error happens. @@ -165,7 +165,7 @@ pub enum HandledNodesEvent { /// Identifier of the task that succeeded. id: TaskId, /// Identifier of the node. - peer_id: PeerId, + peer_id: TPeerId, }, /// A task has produced an event. @@ -181,7 +181,9 @@ pub enum HandledNodesEvent { #[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)] pub struct TaskId(usize); -impl HandledNodesTasks { +impl + HandledNodesTasks +{ /// Creates a new empty collection. #[inline] pub fn new() -> Self { @@ -202,8 +204,8 @@ impl HandledNodesTask /// events. pub fn add_reach_attempt(&mut self, future: TFut, handler: TIntoHandler) -> TaskId where - TFut: Future + Send + 'static, - TIntoHandler: IntoNodeHandler + Send + 'static, + TFut: Future + Send + 'static, + TIntoHandler: IntoNodeHandler + Send + 'static, TIntoHandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, TReachErr: error::Error + Send + 'static, THandlerErr: error::Error + Send + 'static, @@ -212,6 +214,7 @@ impl HandledNodesTask ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required? TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required + TPeerId: Send + 'static, { let task_id = self.next_task_id; self.next_task_id.0 += 1; @@ -264,7 +267,7 @@ impl HandledNodesTask } /// Provides an API similar to `Stream`, except that it cannot produce an error. - pub fn poll(&mut self) -> Async> { + pub fn poll(&mut self) -> Async> { for to_spawn in self.to_spawn.drain() { tokio_executor::spawn(to_spawn); } @@ -350,8 +353,10 @@ impl<'a, TInEvent> fmt::Debug for Task<'a, TInEvent> { } } -impl Stream for HandledNodesTasks { - type Item = HandledNodesEvent; +impl Stream for + HandledNodesTasks +{ + type Item = HandledNodesEvent; type Error = Void; // TODO: use ! once stable #[inline] @@ -362,9 +367,9 @@ impl Stream for Handl /// Message to transmit from a task to the public API. #[derive(Debug)] -enum InToExtMessage { +enum InToExtMessage { /// A connection to a node has succeeded. - NodeReached(PeerId), + NodeReached(TPeerId), /// The task closed. TaskClosed(Result<(), TaskClosedEvent>, Option), /// An event from the node. @@ -373,26 +378,26 @@ enum InToExtMessage { /// Implementation of `Future` that handles a single node, and all the communications between /// the various components of the `HandledNodesTasks`. -struct NodeTask +struct NodeTask where TMuxer: StreamMuxer, - TIntoHandler: IntoNodeHandler, + TIntoHandler: IntoNodeHandler, TIntoHandler::Handler: NodeHandler>, { /// Sender to transmit events to the outside. - events_tx: mpsc::UnboundedSender<(InToExtMessage::Error>, TaskId)>, + events_tx: mpsc::UnboundedSender<(InToExtMessage::Error, TPeerId>, TaskId)>, /// Receiving end for events sent from the main `HandledNodesTasks`. in_events_rx: stream::Fuse>, /// Inner state of the `NodeTask`. - inner: NodeTaskInner, + inner: NodeTaskInner, /// Identifier of the attempt. id: TaskId, } -enum NodeTaskInner +enum NodeTaskInner where TMuxer: StreamMuxer, - TIntoHandler: IntoNodeHandler, + TIntoHandler: IntoNodeHandler, TIntoHandler::Handler: NodeHandler>, { /// Future to resolve to connect to the node. @@ -414,12 +419,12 @@ where Poisoned, } -impl Future for - NodeTask +impl Future for + NodeTask where TMuxer: StreamMuxer, - TFut: Future, - TIntoHandler: IntoNodeHandler, + TFut: Future, + TIntoHandler: IntoNodeHandler, TIntoHandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent>, { type Item = (); diff --git a/core/src/nodes/handled_node_tasks/tests.rs b/core/src/nodes/handled_node_tasks/tests.rs index ac00a7572bd..bd46c2cdedb 100644 --- a/core/src/nodes/handled_node_tasks/tests.rs +++ b/core/src/nodes/handled_node_tasks/tests.rs @@ -42,6 +42,7 @@ type TestNodeTask = NodeTask< InEvent, OutEvent, io::Error, + PeerId, >; struct NodeTaskTestBuilder { @@ -75,9 +76,9 @@ impl NodeTaskTestBuilder { fn node_task(&mut self) -> ( TestNodeTask, UnboundedSender, - UnboundedReceiver<(InToExtMessage, TaskId)>, + UnboundedReceiver<(InToExtMessage, TaskId)>, ) { - let (events_from_node_task_tx, events_from_node_task_rx) = mpsc::unbounded::<(InToExtMessage, TaskId)>(); + let (events_from_node_task_tx, events_from_node_task_rx) = mpsc::unbounded::<(InToExtMessage, TaskId)>(); let (events_to_node_task_tx, events_to_node_task_rx) = mpsc::unbounded::(); let inner = if self.inner_node.is_some() { NodeTaskInner::Node(self.inner_node.take().unwrap()) @@ -285,7 +286,7 @@ fn iterate_over_all_tasks() { #[test] fn add_reach_attempt_prepares_a_new_task() { - let mut handled_nodes = HandledNodesTasks::new(); + let mut handled_nodes: HandledNodesTasks<_, _, _, _, _> = HandledNodesTasks::new(); assert_eq!(handled_nodes.tasks().count(), 0); assert_eq!(handled_nodes.to_spawn.len(), 0); diff --git a/core/src/nodes/raw_swarm.rs b/core/src/nodes/raw_swarm.rs index 4325974a3a3..141286d9c7d 100644 --- a/core/src/nodes/raw_swarm.rs +++ b/core/src/nodes/raw_swarm.rs @@ -47,13 +47,13 @@ use std::{ collections::hash_map::{Entry, OccupiedEntry}, error, fmt, + hash::Hash, }; mod tests; /// Implementation of `Stream` that handles the nodes. -#[derive(Debug)] -pub struct RawSwarm +pub struct RawSwarm where TTrans: Transport, { @@ -61,30 +61,59 @@ where listeners: ListenersStream, /// The nodes currently active. - active_nodes: CollectionStream, THandlerErr>, + active_nodes: CollectionStream, THandlerErr, TPeerId>, /// The reach attempts of the swarm. /// This needs to be a separate struct in order to handle multiple mutable borrows issues. - reach_attempts: ReachAttempts, + reach_attempts: ReachAttempts, /// Max numer of incoming connections. incoming_limit: Option, } -#[derive(Debug)] -struct ReachAttempts { +impl fmt::Debug for + RawSwarm +where + TTrans: Transport + fmt::Debug, + TPeerId: fmt::Debug + Eq + Hash, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_struct("ReachAttempts") + .field("listeners", &self.listeners) + .field("active_nodes", &self.active_nodes) + .field("reach_attempts", &self.reach_attempts) + .field("incoming_limit", &self.incoming_limit) + .finish() + } +} + +struct ReachAttempts { /// Peer ID of the node we control. - local_peer_id: PeerId, + local_peer_id: TPeerId, /// Attempts to reach a peer. - out_reach_attempts: FnvHashMap, + out_reach_attempts: FnvHashMap, /// Reach attempts for incoming connections, and outgoing connections for which we don't know /// the peer ID. other_reach_attempts: Vec<(ReachAttemptId, ConnectedPoint)>, /// For each peer ID we're connected to, contains the endpoint we're connected to. - connected_points: FnvHashMap, + connected_points: FnvHashMap, +} + +impl fmt::Debug for ReachAttempts +where + TPeerId: fmt::Debug + Eq + Hash, +{ + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_struct("ReachAttempts") + .field("local_peer_id", &self.local_peer_id) + .field("out_reach_attempts", &self.out_reach_attempts) + .field("other_reach_attempts", &self.other_reach_attempts) + .field("connected_points", &self.connected_points) + .finish() + } } /// Attempt to reach a peer. @@ -99,7 +128,7 @@ struct OutReachAttempt { } /// Event that can happen on the `RawSwarm`. -pub enum RawSwarmEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a> +pub enum RawSwarmEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a = PeerId> where TTrans: Transport, { @@ -114,7 +143,7 @@ where }, /// A new connection arrived on a listener. - IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>), + IncomingConnection(IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>), /// A new connection was arriving on a listener, but an error happened when negotiating it. /// @@ -132,7 +161,7 @@ where /// A new connection to a peer has been opened. Connected { /// Id of the peer. - peer_id: PeerId, + peer_id: TPeerId, /// If `Listener`, then we received the connection. If `Dial`, then it's a connection that /// we opened. endpoint: ConnectedPoint, @@ -141,7 +170,7 @@ where /// A connection to a peer has been replaced with a new one. Replaced { /// Id of the peer. - peer_id: PeerId, + peer_id: TPeerId, /// Endpoint we were connected to. closed_endpoint: ConnectedPoint, /// If `Listener`, then we received the connection. If `Dial`, then it's a connection that @@ -155,7 +184,7 @@ where /// substream attempt is pending. NodeClosed { /// Identifier of the node. - peer_id: PeerId, + peer_id: TPeerId, /// Endpoint we were connected to. endpoint: ConnectedPoint, }, @@ -163,7 +192,7 @@ where /// The handler of a node has produced an error. NodeError { /// Identifier of the node. - peer_id: PeerId, + peer_id: TPeerId, /// Endpoint we were connected to. endpoint: ConnectedPoint, /// The error that happened. @@ -178,13 +207,13 @@ where remain_addrs_attempt: usize, /// Id of the peer we were trying to dial. - peer_id: PeerId, + peer_id: TPeerId, /// The multiaddr we failed to reach. multiaddr: Multiaddr, /// The error that happened. - error: RawSwarmReachError, + error: RawSwarmReachError, }, /// Failed to reach a peer that we were trying to dial. @@ -202,18 +231,20 @@ where /// A node produced a custom event. NodeEvent { /// Id of the node that produced the event. - peer_id: PeerId, + peer_id: TPeerId, /// Event that was produced by the node. event: TOutEvent, }, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> fmt::Debug for RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for + RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> where TOutEvent: fmt::Debug, TTrans: Transport, TTrans::Error: fmt::Debug, THandlerErr: fmt::Debug, + TPeerId: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { match *self { @@ -288,27 +319,29 @@ where /// Internal error type that contains all the possible errors that can happen in a reach attempt. #[derive(Debug)] -enum InternalReachErr { +enum InternalReachErr { /// Error in the transport layer. Transport(TransportError), /// We successfully reached the peer, but there was a mismatch between the expected id and the /// actual id of the peer. PeerIdMismatch { /// The peer id that the node reports. - obtained: PeerId, + obtained: TPeerId, }, /// The negotiated `PeerId` is the same as the one of the local node. FoundLocalPeerId, } -impl fmt::Display for InternalReachErr -where TTransErr: fmt::Display +impl fmt::Display for InternalReachErr +where + TTransErr: fmt::Display, + TPeerId: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { InternalReachErr::Transport(err) => write!(f, "{}", err), InternalReachErr::PeerIdMismatch { obtained } => { - write!(f, "Peer ID mismatch, obtained: {}", obtained.to_base58()) + write!(f, "Peer ID mismatch, obtained: {:?}", obtained) }, InternalReachErr::FoundLocalPeerId => { write!(f, "Remote has the same PeerId as us") @@ -317,8 +350,10 @@ where TTransErr: fmt::Display } } -impl error::Error for InternalReachErr -where TTransErr: error::Error + 'static +impl error::Error for InternalReachErr +where + TTransErr: error::Error + 'static, + TPeerId: fmt::Debug, { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { @@ -331,7 +366,7 @@ where TTransErr: error::Error + 'static /// Error that can happen when trying to reach a node. #[derive(Debug)] -pub enum RawSwarmReachError { +pub enum RawSwarmReachError { /// Error in the transport layer. Transport(TransportError), @@ -339,25 +374,29 @@ pub enum RawSwarmReachError { /// actual id of the peer. PeerIdMismatch { /// The peer id that the node reports. - obtained: PeerId, + obtained: TPeerId, } } -impl fmt::Display for RawSwarmReachError -where TTransErr: fmt::Display +impl fmt::Display for RawSwarmReachError +where + TTransErr: fmt::Display, + TPeerId: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { RawSwarmReachError::Transport(err) => write!(f, "{}", err), RawSwarmReachError::PeerIdMismatch { obtained } => { - write!(f, "Peer ID mismatch, obtained: {}", obtained.to_base58()) + write!(f, "Peer ID mismatch, obtained: {:?}", obtained) }, } } } -impl error::Error for RawSwarmReachError -where TTransErr: error::Error + 'static +impl error::Error for RawSwarmReachError +where + TTransErr: error::Error + 'static, + TPeerId: fmt::Debug, { fn source(&self) -> Option<&(dyn error::Error + 'static)> { match self { @@ -442,30 +481,30 @@ where TTransErr: error::Error + 'static } /// A new connection arrived on a listener. -pub struct IncomingConnectionEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a> +pub struct IncomingConnectionEvent<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a> where TTrans: Transport { /// The produced upgrade. upgrade: TTrans::ListenerUpgrade, /// PeerId of the local node. - local_peer_id: PeerId, + local_peer_id: TPeerId, /// Address of the listener which received the connection. listen_addr: Multiaddr, /// Address used to send back data to the remote. send_back_addr: Multiaddr, /// Reference to the `active_nodes` field of the swarm. - active_nodes: &'a mut CollectionStream, THandlerErr>, + active_nodes: &'a mut CollectionStream, THandlerErr, TPeerId>, /// Reference to the `other_reach_attempts` field of the swarm. other_reach_attempts: &'a mut Vec<(ReachAttemptId, ConnectedPoint)>, } -impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr> - IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> +impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TPeerId> + IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> where - TTrans: Transport, + TTrans: Transport, TTrans::Error: Send + 'static, TTrans::ListenerUpgrade: Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, @@ -474,6 +513,7 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, + TPeerId: fmt::Debug + Eq + Hash + Clone + Send + 'static, { /// Starts processing the incoming connection and sets the handler to use for it. #[inline] @@ -505,7 +545,8 @@ where } } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> + IncomingConnectionEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> where TTrans: Transport { /// Returns the `IncomingInfo` corresponding to this incoming connection. @@ -617,19 +658,20 @@ impl<'a> IncomingInfo<'a> { } } -impl - RawSwarm +impl + RawSwarm where TTrans: Transport + Clone, TMuxer: StreamMuxer, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, + TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, { /// Creates a new node events stream. #[inline] - pub fn new(transport: TTrans, local_peer_id: PeerId) -> Self { + pub fn new(transport: TTrans, local_peer_id: TPeerId) -> Self { // TODO: with_capacity? RawSwarm { listeners: ListenersStream::new(transport), @@ -647,7 +689,7 @@ where /// Creates a new node event stream with incoming connections limit. #[inline] pub fn new_with_incoming_limit(transport: TTrans, - local_peer_id: PeerId, incoming_limit: Option) -> Self + local_peer_id: TPeerId, incoming_limit: Option) -> Self { RawSwarm { incoming_limit, @@ -709,7 +751,7 @@ where /// /// This is the same value as was passed to `new()`. #[inline] - pub fn local_peer_id(&self) -> &PeerId { + pub fn local_peer_id(&self) -> &TPeerId { &self.reach_attempts.local_peer_id } @@ -718,7 +760,7 @@ where /// The second parameter is the handler to use if we manage to reach a node. pub fn dial(&mut self, addr: Multiaddr, handler: THandler) -> Result<(), TransportError> where - TTrans: Transport, + TTrans: Transport, TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, @@ -788,7 +830,7 @@ where /// Grants access to a struct that represents a peer. #[inline] - pub fn peer(&mut self, peer_id: PeerId) -> Peer { + pub fn peer(&mut self, peer_id: TPeerId) -> Peer { if peer_id == self.reach_attempts.local_peer_id { return Peer::LocalNode; } @@ -831,9 +873,9 @@ where /// /// It is a logic error to call this method if we already have an outgoing attempt to the /// given peer. - fn start_dial_out(&mut self, peer_id: PeerId, handler: THandler, first: Multiaddr, rest: Vec) + fn start_dial_out(&mut self, peer_id: TPeerId, handler: THandler, first: Multiaddr, rest: Vec) where - TTrans: Transport, + TTrans: Transport, TTrans::Dial: Send + 'static, TTrans::Error: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, @@ -875,9 +917,9 @@ where } /// Provides an API similar to `Stream`, except that it cannot error. - pub fn poll(&mut self) -> Async> + pub fn poll(&mut self) -> Async> where - TTrans: Transport, + TTrans: Transport, TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, TTrans::ListenerUpgrade: Send + 'static, @@ -886,7 +928,7 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, @@ -999,12 +1041,12 @@ where /// Internal struct indicating an action to perform of the swarm. #[derive(Debug)] #[must_use] -struct ActionItem { - start_dial_out: Option<(PeerId, THandler, Multiaddr, Vec)>, +struct ActionItem { + start_dial_out: Option<(TPeerId, THandler, Multiaddr, Vec)>, interrupt: Option, } -impl Default for ActionItem { +impl Default for ActionItem { fn default() -> Self { ActionItem { start_dial_out: None, @@ -1019,17 +1061,18 @@ impl Default for ActionItem { /// /// > **Note**: The event **must** have been produced by the collection of nodes, otherwise /// > panics will likely happen. -fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr>( - reach_attempts: &mut ReachAttempts, - event: CollectionReachEvent, THandlerErr> -) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>) +fn handle_node_reached<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>( + reach_attempts: &mut ReachAttempts, + event: CollectionReachEvent, THandlerErr, TPeerId>, +) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>) where - TTrans: Transport + Clone, + TTrans: Transport + Clone, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, + TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, { // We first start looking in the incoming attempts. While this makes the code less optimal, // it also makes the logic easier. @@ -1137,8 +1180,11 @@ where /// This means that if `local` and `other` both dial each other, the connection from `local` should /// be kept and the one from `other` will be dropped. #[inline] -fn has_dial_prio(local: &PeerId, other: &PeerId) -> bool { - local.as_bytes() < other.as_bytes() +fn has_dial_prio(local: &TPeerId, other: &TPeerId) -> bool +where + TPeerId: AsRef<[u8]>, +{ + local.as_ref() < other.as_ref() } /// Handles a reach error event from the collection. @@ -1147,13 +1193,15 @@ fn has_dial_prio(local: &PeerId, other: &PeerId) -> bool { /// /// > **Note**: The event **must** have been produced by the collection of nodes, otherwise /// > panics will likely happen. -fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>( - reach_attempts: &mut ReachAttempts, +fn handle_reach_error<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>( + reach_attempts: &mut ReachAttempts, reach_id: ReachAttemptId, - error: InternalReachErr, + error: InternalReachErr, handler: THandler, -) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>) -where TTrans: Transport +) -> (ActionItem, RawSwarmEvent<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>) +where + TTrans: Transport, + TPeerId: Eq + Hash + Clone, { // Search for the attempt in `out_reach_attempts`. // TODO: could be more optimal than iterating over everything @@ -1251,29 +1299,31 @@ where TTrans: Transport } /// State of a peer in the system. -pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a> +pub enum Peer<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a> where TTrans: Transport, { /// We are connected to this peer. - Connected(PeerConnected<'a, TInEvent>), + Connected(PeerConnected<'a, TInEvent, TPeerId>), /// We are currently attempting to connect to this peer. - PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>), + PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>), /// We are not connected to this peer at all. /// /// > **Note**: It is however possible that a pending incoming connection is being negotiated /// > and will connect to this peer, but we don't know it yet. - NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>), + NotConnected(PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>), /// The requested peer is the local node. LocalNode, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> fmt::Debug for Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for + Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> where TTrans: Transport, + TPeerId: Eq + Hash + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { match *self { @@ -1302,10 +1352,10 @@ where } // TODO: add other similar methods that wrap to the ones of `PeerNotConnected` -impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr> - Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> +impl<'a, TTrans, TMuxer, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> + Peer<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> where - TTrans: Transport + Clone, + TTrans: Transport + Clone, TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, @@ -1313,14 +1363,15 @@ where TMuxer::Substream: Send, TInEvent: Send + 'static, TOutEvent: Send + 'static, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, + TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, { /// If we are connected, returns the `PeerConnected`. #[inline] - pub fn as_connected(self) -> Option> { + pub fn as_connected(self) -> Option> { match self { Peer::Connected(peer) => Some(peer), _ => None, @@ -1329,7 +1380,7 @@ where /// If a connection is pending, returns the `PeerPendingConnect`. #[inline] - pub fn as_pending_connect(self) -> Option> { + pub fn as_pending_connect(self) -> Option> { match self { Peer::PendingConnect(peer) => Some(peer), _ => None, @@ -1338,7 +1389,7 @@ where /// If we are not connected, returns the `PeerNotConnected`. #[inline] - pub fn as_not_connected(self) -> Option> { + pub fn as_not_connected(self) -> Option> { match self { Peer::NotConnected(peer) => Some(peer), _ => None, @@ -1353,7 +1404,7 @@ where /// Returns an error if we are `LocalNode`. #[inline] pub fn or_connect(self, addr: Multiaddr, handler: THandler) - -> Result, Self> + -> Result, Self> { self.or_connect_with(move |_| addr, handler) } @@ -1367,9 +1418,9 @@ where /// Returns an error if we are `LocalNode`. #[inline] pub fn or_connect_with(self, addr: TFn, handler: THandler) - -> Result, Self> + -> Result, Self> where - TFn: FnOnce(&PeerId) -> Multiaddr, + TFn: FnOnce(&TPeerId) -> Multiaddr, { match self { Peer::Connected(peer) => Ok(PeerPotentialConnect::Connected(peer)), @@ -1384,21 +1435,22 @@ where } /// Peer we are potentially going to connect to. -pub enum PeerPotentialConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a> +pub enum PeerPotentialConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a> where TTrans: Transport { /// We are connected to this peer. - Connected(PeerConnected<'a, TInEvent>), + Connected(PeerConnected<'a, TInEvent, TPeerId>), /// We are currently attempting to connect to this peer. - PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr>), + PendingConnect(PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId>), } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> - PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> + PeerPotentialConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> where - TTrans: Transport + TTrans: Transport, + TPeerId: Eq + Hash + Clone, { /// Closes the connection or the connection attempt. // TODO: consider returning a `PeerNotConnected` @@ -1412,7 +1464,7 @@ where /// If we are connected, returns the `PeerConnected`. #[inline] - pub fn as_connected(self) -> Option> { + pub fn as_connected(self) -> Option> { match self { PeerPotentialConnect::Connected(peer) => Some(peer), _ => None, @@ -1421,7 +1473,7 @@ where /// If a connection is pending, returns the `PeerPendingConnect`. #[inline] - pub fn as_pending_connect(self) -> Option> { + pub fn as_pending_connect(self) -> Option> { match self { PeerPotentialConnect::PendingConnect(peer) => Some(peer), _ => None, @@ -1430,14 +1482,17 @@ where } /// Access to a peer we are connected to. -pub struct PeerConnected<'a, TInEvent: 'a> { - peer: CollecPeerMut<'a, TInEvent>, +pub struct PeerConnected<'a, TInEvent: 'a, TPeerId: 'a> { + peer: CollecPeerMut<'a, TInEvent, TPeerId>, /// Reference to the `connected_points` field of the parent. - connected_points: &'a mut FnvHashMap, - peer_id: PeerId, + connected_points: &'a mut FnvHashMap, + peer_id: TPeerId, } -impl<'a, TInEvent> PeerConnected<'a, TInEvent> { +impl<'a, TInEvent, TPeerId> PeerConnected<'a, TInEvent, TPeerId> +where + TPeerId: Eq + Hash, +{ /// Closes the connection to this node. /// /// No `NodeClosed` message will be generated for this node. @@ -1467,17 +1522,19 @@ impl<'a, TInEvent> PeerConnected<'a, TInEvent> { /// Access to a peer we are attempting to connect to. #[derive(Debug)] -pub struct PeerPendingConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a> +pub struct PeerPendingConnect<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a> where TTrans: Transport { - attempt: OccupiedEntry<'a, PeerId, OutReachAttempt>, - active_nodes: &'a mut CollectionStream, THandlerErr>, + attempt: OccupiedEntry<'a, TPeerId, OutReachAttempt>, + active_nodes: &'a mut CollectionStream, THandlerErr, TPeerId>, } -impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> + PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> where - TTrans: Transport + TTrans: Transport, + TPeerId: Eq + Hash + Clone, { /// Interrupt this connection attempt. // TODO: consider returning a PeerNotConnected; however that is really pain in terms of @@ -1520,25 +1577,37 @@ where } /// Access to a peer we're not connected to. -#[derive(Debug)] -pub struct PeerNotConnected<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a> +pub struct PeerNotConnected<'a, TTrans: 'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, THandlerErr: 'a, TPeerId: 'a> +where + TTrans: Transport, +{ + peer_id: TPeerId, + nodes: &'a mut RawSwarm, +} + +impl<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> fmt::Debug for + PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> where TTrans: Transport, + TPeerId: fmt::Debug, { - peer_id: PeerId, - nodes: &'a mut RawSwarm, + fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> { + f.debug_struct("PeerNotConnected") + .field("peer_id", &self.peer_id) + .finish() + } } -impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr> - PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> +impl<'a, TTrans, TInEvent, TOutEvent, TMuxer, THandler, THandlerErr, TPeerId> + PeerNotConnected<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> where - TTrans: Transport + Clone, + TTrans: Transport + Clone, TTrans::Error: Send + 'static, TTrans::Dial: Send + 'static, TMuxer: StreamMuxer + Send + Sync + 'static, TMuxer::OutboundSubstream: Send, TMuxer::Substream: Send, - THandler: IntoNodeHandler + Send + 'static, + THandler: IntoNodeHandler + Send + 'static, THandler::Handler: NodeHandler, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static, ::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be necessary THandlerErr: error::Error + Send + 'static, @@ -1551,7 +1620,9 @@ where /// the whole connection is immediately closed. #[inline] pub fn connect(self, addr: Multiaddr, handler: THandler) - -> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> + -> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> + where + TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, { self.connect_inner(handler, addr, Vec::new()) } @@ -1566,9 +1637,10 @@ where /// the whole connection is immediately closed. #[inline] pub fn connect_iter(self, addrs: TIter, handler: THandler) - -> Result, Self> + -> Result, Self> where TIter: IntoIterator, + TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, { let mut addrs = addrs.into_iter(); let first = match addrs.next() { @@ -1581,7 +1653,9 @@ where /// Inner implementation of `connect`. fn connect_inner(self, handler: THandler, first: Multiaddr, rest: Vec) - -> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr> + -> PeerPendingConnect<'a, TTrans, TInEvent, TOutEvent, THandler, THandlerErr, TPeerId> + where + TPeerId: fmt::Debug + Eq + Hash + Clone + AsRef<[u8]> + Send + 'static, { self.nodes.start_dial_out(self.peer_id.clone(), handler, first, rest); PeerPendingConnect { diff --git a/core/src/peer_id.rs b/core/src/peer_id.rs index b7283cd1f4e..c4920c5eecd 100644 --- a/core/src/peer_id.rs +++ b/core/src/peer_id.rs @@ -155,6 +155,13 @@ impl AsRef for PeerId { } } +impl AsRef<[u8]> for PeerId { + #[inline] + fn as_ref(&self) -> &[u8] { + self.as_bytes() + } +} + impl Into for PeerId { #[inline] fn into(self) -> multihash::Multihash { diff --git a/protocols/kad/src/behaviour.rs b/protocols/kad/src/behaviour.rs index 6d4771445f7..1f84f6fd25f 100644 --- a/protocols/kad/src/behaviour.rs +++ b/protocols/kad/src/behaviour.rs @@ -198,7 +198,7 @@ impl Kademlia { .map(|peer_id| build_kad_peer(peer_id, topology, &self.connected_peers)) .collect(); - let local_node_is_providing = self.providing_keys.iter().any(|k| k.as_ref() == &key); + let local_node_is_providing = self.providing_keys.iter().any(|k| k == &key); let provider_peers = topology .get_providers(&key) @@ -258,7 +258,7 @@ impl Kademlia { /// There doesn't exist any "remove provider" message to broadcast on the network, therefore we /// will still be registered as a provider in the DHT for as long as the timeout doesn't expire. pub fn remove_providing(&mut self, key: &Multihash) { - if let Some(position) = self.providing_keys.iter().position(|k| k.as_ref() == key) { + if let Some(position) = self.providing_keys.iter().position(|k| k == key) { self.providing_keys.remove(position); } } @@ -392,7 +392,7 @@ where Ok(Async::NotReady) => {}, Ok(Async::Ready(Some(_))) => { for provided in self.providing_keys.clone().into_iter() { - let purpose = QueryPurpose::AddProvider(provided.as_ref().clone()); + let purpose = QueryPurpose::AddProvider(provided.clone().into()); self.start_query(QueryTarget::FindPeer(provided), purpose); } },