Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make the structs in nodes generic over PeerId #881

Merged
merged 2 commits into from
Jan 23, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 60 additions & 38 deletions core/src/nodes/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,27 @@ use crate::{
};
use fnv::FnvHashMap;
use futures::prelude::*;
use std::{collections::hash_map::Entry, error, fmt, mem};
use std::{collections::hash_map::Entry, error, fmt, hash::Hash, mem};

mod tests;

// TODO: make generic over PeerId

/// Implementation of `Stream` that handles a collection of nodes.
pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
pub struct CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId = PeerId> {
/// Object that handles the tasks.
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>,
inner: HandledNodesTasks<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>,
/// List of nodes, with the task id that handles this node. The corresponding entry in `tasks`
/// must always be in the `Connected` state.
nodes: FnvHashMap<PeerId, TaskId>,
nodes: FnvHashMap<TPeerId, TaskId>,
/// List of tasks and their state. If `Connected`, then a corresponding entry must be present
/// in `nodes`.
tasks: FnvHashMap<TaskId, TaskState>,
tasks: FnvHashMap<TaskId, TaskState<TPeerId>>,
}

impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
where
TPeerId: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
let mut list = f.debug_list();
for (id, task) in &self.tasks {
Expand All @@ -67,34 +69,34 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for Colle

/// State of a task.
#[derive(Debug, Clone, PartialEq, Eq)]
enum TaskState {
enum TaskState<TPeerId> {
/// Task is attempting to reach a peer.
Pending,
/// The task is connected to a peer.
Connected(PeerId),
Connected(TPeerId),
}

/// Event that can happen on the `CollectionStream`.
pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr> {
pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr, TPeerId> {
/// A connection to a node has succeeded. You must use the provided event in order to accept
/// the connection.
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>),
NodeReached(CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>),

/// A connection to a node has been closed.
///
/// This happens once both the inbound and outbound channels are closed, and no more outbound
/// substream attempt is pending.
NodeClosed {
/// Identifier of the node.
peer_id: PeerId,
peer_id: TPeerId,
},

/// A connection to a node has errored.
///
/// Can only happen after a node has been successfully reached.
NodeError {
/// Identifier of the node.
peer_id: PeerId,
peer_id: TPeerId,
/// The error that happened.
error: HandledNodeError<THandlerErr>,
},
Expand All @@ -112,16 +114,18 @@ pub enum CollectionEvent<'a, TInEvent:'a , TOutEvent: 'a, THandler: 'a, TReachEr
/// A node has produced an event.
NodeEvent {
/// Identifier of the node.
peer_id: PeerId,
peer_id: TPeerId,
/// The produced event.
event: TOutEvent,
},
}

impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for
CollectionEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
where TOutEvent: fmt::Debug,
TReachErr: fmt::Debug,
THandlerErr: fmt::Debug,
TPeerId: Eq + Hash + Clone + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
match *self {
Expand Down Expand Up @@ -159,19 +163,23 @@ where TOutEvent: fmt::Debug,

/// Event that happens when we reach a node.
#[must_use = "The node reached event is used to accept the newly-opened connection"]
pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr: 'a> {
pub struct CollectionReachEvent<'a, TInEvent: 'a, TOutEvent: 'a, THandler: 'a, TReachErr, THandlerErr: 'a, TPeerId: 'a = PeerId> {
/// Peer id we connected to.
peer_id: PeerId,
peer_id: TPeerId,
/// The task id that reached the node.
id: TaskId,
/// The `CollectionStream` we are referencing.
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>,
parent: &'a mut CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>,
}

impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
where
TPeerId: Eq + Hash + Clone,
{
/// Returns the peer id of the node that has been reached.
#[inline]
pub fn peer_id(&self) -> &PeerId {
pub fn peer_id(&self) -> &TPeerId {
&self.peer_id
}

Expand All @@ -189,11 +197,11 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE
}

/// Accepts the new node.
pub fn accept(self) -> (CollectionNodeAccept, PeerId) {
pub fn accept(self) -> (CollectionNodeAccept, TPeerId) {
// Set the state of the task to `Connected`.
let former_task_id = self.parent.nodes.insert(self.peer_id.clone(), self.id);
let _former_state = self.parent.tasks.insert(self.id, TaskState::Connected(self.peer_id.clone()));
debug_assert_eq!(_former_state, Some(TaskState::Pending));
debug_assert!(_former_state == Some(TaskState::Pending));
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the change from debug_assert_eq to debug_assert here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise we have to require Debug on the TPeerId.

Copy link

@senden9 senden9 Jan 22, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this not required here? https://github.com/libp2p/rust-libp2p/pull/881/files#diff-87276c9dd75246456f7330545b24dd12R128

       TPeerId: Eq + Hash + Clone + fmt::Debug,

Edit: Oh, I understand. Impl <…> fmt::Debug for … TPeerId: … + fmt::Debug


// It is possible that we already have a task connected to the same peer. In this
// case, we need to emit a `NodeReplaced` event.
Expand All @@ -204,7 +212,7 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE
self.nodes are valid tasks in the HandledNodesTasks; QED")
.close();
let _former_other_state = self.parent.tasks.remove(&former_task_id);
debug_assert_eq!(_former_other_state, Some(TaskState::Connected(self.peer_id.clone())));
debug_assert!(_former_other_state == Some(TaskState::Connected(self.peer_id.clone())));

// TODO: we unfortunately have to clone the peer id here
(CollectionNodeAccept::ReplacedExisting, self.peer_id.clone())
Expand All @@ -223,15 +231,19 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionReachE
///
/// Has the same effect as dropping the event without accepting it.
#[inline]
pub fn deny(self) -> PeerId {
pub fn deny(self) -> TPeerId {
// TODO: we unfortunately have to clone the id here, in order to be explicit
let peer_id = self.peer_id.clone();
drop(self); // Just to be explicit
peer_id
}
}

impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> fmt::Debug for
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
where
TPeerId: Eq + Hash + Clone + fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
f.debug_struct("CollectionReachEvent")
.field("peer_id", &self.peer_id)
Expand All @@ -240,7 +252,9 @@ impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> fmt::Debug for C
}
}

impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> Drop for CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
impl<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId> Drop for
CollectionReachEvent<'a, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
{
fn drop(&mut self) {
let task_state = self.parent.tasks.remove(&self.id);
debug_assert!(if let Some(TaskState::Pending) = task_state { true } else { false });
Expand All @@ -266,7 +280,11 @@ pub enum CollectionNodeAccept {
#[derive(Debug, Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ReachAttemptId(TaskId);

impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> {
impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
CollectionStream<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>
where
TPeerId: Eq + Hash + Clone,
{
/// Creates a new empty collection.
#[inline]
pub fn new() -> Self {
Expand All @@ -284,8 +302,8 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler)
-> ReachAttemptId
where
TFut: Future<Item = (PeerId, TMuxer), Error = TReachErr> + Send + 'static,
THandler: IntoNodeHandler + Send + 'static,
TFut: Future<Item = (TPeerId, TMuxer), Error = TReachErr> + Send + 'static,
THandler: IntoNodeHandler<TPeerId> + Send + 'static,
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static, // TODO: shouldn't be required?
TReachErr: error::Error + Send + 'static,
Expand All @@ -294,6 +312,7 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
TOutEvent: Send + 'static,
TMuxer: StreamMuxer + Send + Sync + 'static, // TODO: Send + Sync + 'static shouldn't be required
TMuxer::OutboundSubstream: Send + 'static, // TODO: shouldn't be required
TPeerId: Send + 'static,
{
let id = self.inner.add_reach_attempt(future, handler);
self.tasks.insert(id, TaskState::Pending);
Expand Down Expand Up @@ -338,7 +357,7 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
///
/// Returns `None` if we don't have a connection to this peer.
#[inline]
pub fn peer_mut(&mut self, id: &PeerId) -> Option<PeerMut<TInEvent>> {
pub fn peer_mut(&mut self, id: &TPeerId) -> Option<PeerMut<TInEvent, TPeerId>> {
let task = match self.nodes.get(id) {
Some(&task) => task,
None => return None,
Expand All @@ -358,15 +377,15 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
///
/// This will return true only after a `NodeReached` event has been produced by `poll()`.
#[inline]
pub fn has_connection(&self, id: &PeerId) -> bool {
pub fn has_connection(&self, id: &TPeerId) -> bool {
self.nodes.contains_key(id)
}

/// Returns a list of all the active connections.
///
/// Does not include reach attempts that haven't reached any target yet.
#[inline]
pub fn connections(&self) -> impl Iterator<Item = &PeerId> {
pub fn connections(&self) -> impl Iterator<Item = &TPeerId> {
self.nodes.keys()
}

Expand All @@ -375,7 +394,7 @@ impl<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr> CollectionStream<TIn
/// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to
/// > remove the `Err` variant, but also because we want the `CollectionStream` to stay
/// > borrowed if necessary.
pub fn poll(&mut self) -> Async<CollectionEvent<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr>> {
pub fn poll(&mut self) -> Async<CollectionEvent<TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TPeerId>> {
let item = match self.inner.poll() {
Async::Ready(item) => item,
Async::NotReady => return Async::NotReady,
Expand Down Expand Up @@ -487,13 +506,16 @@ impl fmt::Display for InterruptError {
impl error::Error for InterruptError {}

/// Access to a peer in the collection.
pub struct PeerMut<'a, TInEvent: 'a> {
pub struct PeerMut<'a, TInEvent: 'a, TPeerId: 'a = PeerId> {
inner: HandledNodesTask<'a, TInEvent>,
tasks: &'a mut FnvHashMap<TaskId, TaskState>,
nodes: &'a mut FnvHashMap<PeerId, TaskId>,
tasks: &'a mut FnvHashMap<TaskId, TaskState<TPeerId>>,
nodes: &'a mut FnvHashMap<TPeerId, TaskId>,
}

impl<'a, TInEvent> PeerMut<'a, TInEvent> {
impl<'a, TInEvent, TPeerId> PeerMut<'a, TInEvent, TPeerId>
where
TPeerId: Eq + Hash,
{
/// Sends an event to the given node.
#[inline]
pub fn send_event(&mut self, event: TInEvent) {
Expand Down
Loading