Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Full support for multiple connections per peer in libp2p-swarm. #1519

Merged
merged 14 commits into from
Mar 31, 2020
8 changes: 4 additions & 4 deletions core/src/connection/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ where
TPeerId: Clone + Send + 'static,
{
let endpoint = info.to_connected_point();
if let Some(limit) = self.limits.max_pending_incoming {
if let Some(limit) = self.limits.max_incoming {
let current = self.iter_pending_incoming().count();
if current >= limit {
return Err(ConnectionLimit { limit, current })
Expand Down Expand Up @@ -845,8 +845,8 @@ where
/// The configurable limits of a connection [`Pool`].
#[derive(Debug, Clone, Default)]
pub struct PoolLimits {
pub max_pending_outgoing: Option<usize>,
pub max_pending_incoming: Option<usize>,
pub max_outgoing: Option<usize>,
pub max_incoming: Option<usize>,
pub max_established_per_peer: Option<usize>,
}

Expand All @@ -862,7 +862,7 @@ impl PoolLimits {
where
F: FnOnce() -> usize
{
Self::check(current, self.max_pending_outgoing)
Self::check(current, self.max_outgoing)
}

fn check<F>(current: F, limit: Option<usize>) -> Result<(), ConnectionLimit>
Expand Down
25 changes: 19 additions & 6 deletions core/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ where
/// Gets the configured limit on pending incoming connections,
/// i.e. concurrent incoming connection attempts.
pub fn incoming_limit(&self) -> Option<usize> {
self.pool.limits().max_pending_incoming
self.pool.limits().max_incoming
}

/// The total number of established connections in the `Network`.
Expand Down Expand Up @@ -391,8 +391,9 @@ where
}
event
}
Poll::Ready(PoolEvent::ConnectionError { connected, error, num_established, .. }) => {
Poll::Ready(PoolEvent::ConnectionError { id, connected, error, num_established, .. }) => {
NetworkEvent::ConnectionError {
id,
connected,
error,
num_established,
Expand Down Expand Up @@ -621,17 +622,29 @@ impl NetworkConfig {
self
}

/// Shortcut for calling `executor` with an object that calls the given closure.
pub fn set_executor_fn(mut self, f: impl Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + 'static) -> Self {
struct SpawnImpl<F>(F);
impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for SpawnImpl<F> {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
(self.0)(f)
}
}
self.set_executor(Box::new(SpawnImpl(f)));
self
}

pub fn executor(&self) -> Option<&Box<dyn Executor + Send>> {
self.executor.as_ref()
}

pub fn set_pending_incoming_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_pending_incoming = Some(n);
pub fn set_incoming_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_incoming = Some(n);
self
}

pub fn set_pending_outgoing_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_pending_outgoing = Some(n);
pub fn set_outgoing_limit(&mut self, n: usize) -> &mut Self {
self.pool_limits.max_outgoing = Some(n);
self
}

Expand Down
2 changes: 2 additions & 0 deletions core/src/network/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ where
///
/// The connection is closed as a result of the error.
ConnectionError {
/// The ID of the connection that encountered an error.
id: ConnectionId,
/// Information about the connection that encountered the error.
connected: Connected<TConnInfo>,
/// The error that occurred.
Expand Down
70 changes: 43 additions & 27 deletions misc/core-derive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,44 +131,52 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {

// Build the list of statements to put in the body of `inject_connected()`.
let inject_connected_stmts = {
let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count();
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}

Some(if field_n == num_fields - 1 {
match field.ident {
Some(ref i) => quote!{ self.#i.inject_connected(peer_id, endpoint); },
None => quote!{ self.#field_n.inject_connected(peer_id, endpoint); },
}
} else {
match field.ident {
Some(ref i) => quote!{ self.#i.inject_connected(peer_id.clone(), endpoint.clone()); },
None => quote!{ self.#field_n.inject_connected(peer_id.clone(), endpoint.clone()); },
}
Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_connected(peer_id); },
None => quote!{ self.#field_n.inject_connected(peer_id); },
})
})
};

// Build the list of statements to put in the body of `inject_disconnected()`.
let inject_disconnected_stmts = {
let num_fields = data_struct.fields.iter().filter(|f| !is_ignored(f)).count();
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}
Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_disconnected(peer_id); },
None => quote!{ self.#field_n.inject_disconnected(peer_id); },
})
})
};

Some(if field_n == num_fields - 1 {
match field.ident {
Some(ref i) => quote!{ self.#i.inject_disconnected(peer_id, endpoint); },
None => quote!{ self.#field_n.inject_disconnected(peer_id, endpoint); },
}
} else {
match field.ident {
Some(ref i) => quote!{ self.#i.inject_disconnected(peer_id, endpoint.clone()); },
None => quote!{ self.#field_n.inject_disconnected(peer_id, endpoint.clone()); },
}
// Build the list of statements to put in the body of `inject_connection_established()`.
let inject_connection_established_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}
Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_connection_established(peer_id, connection_id, endpoint); },
None => quote!{ self.#field_n.inject_connection_established(peer_id, connection_id, endpoint); },
})
})
};

// Build the list of statements to put in the body of `inject_connection_closed()`.
let inject_connection_closed_stmts = {
data_struct.fields.iter().enumerate().filter_map(move |(field_n, field)| {
if is_ignored(&field) {
return None;
}
Some(match field.ident {
Some(ref i) => quote!{ self.#i.inject_connection_closed(peer_id, connection_id, endpoint); },
None => quote!{ self.#field_n.inject_connection_closed(peer_id, connection_id, endpoint); },
})
})
};
Expand Down Expand Up @@ -383,8 +391,8 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
std::task::Poll::Ready(#network_behaviour_action::DialAddress { address }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialAddress { address });
}
std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id });
std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition }) => {
return std::task::Poll::Ready(#network_behaviour_action::DialPeer { peer_id, condition });
}
std::task::Poll::Ready(#network_behaviour_action::NotifyHandler { peer_id, handler, event }) => {
return std::task::Poll::Ready(#network_behaviour_action::NotifyHandler {
Expand Down Expand Up @@ -421,14 +429,22 @@ fn build_struct(ast: &DeriveInput, data_struct: &DataStruct) -> TokenStream {
out
}

fn inject_connected(&mut self, peer_id: #peer_id, endpoint: #connected_point) {
fn inject_connected(&mut self, peer_id: &#peer_id) {
#(#inject_connected_stmts);*
}

fn inject_disconnected(&mut self, peer_id: &#peer_id, endpoint: #connected_point) {
fn inject_disconnected(&mut self, peer_id: &#peer_id) {
#(#inject_disconnected_stmts);*
}

fn inject_connection_established(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point) {
#(#inject_connection_established_stmts);*
}

fn inject_connection_closed(&mut self, peer_id: &#peer_id, connection_id: &#connection_id, endpoint: &#connected_point) {
#(#inject_connection_closed_stmts);*
}

fn inject_addr_reach_failure(&mut self, peer_id: Option<&#peer_id>, addr: &#multiaddr, error: &dyn std::error::Error) {
#(#inject_addr_reach_failure_stmts);*
}
Expand Down
20 changes: 13 additions & 7 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ use crate::protocol::{FloodsubConfig, FloodsubMessage, FloodsubRpc, FloodsubSubs
use crate::topic::Topic;
use cuckoofilter::CuckooFilter;
use fnv::FnvHashSet;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
use libp2p_swarm::{
NetworkBehaviour,
NetworkBehaviourAction,
PollParameters,
ProtocolsHandler,
OneShotHandler,
NotifyHandler
NotifyHandler,
DialPeerCondition,
};
use rand;
use smallvec::SmallVec;
Expand Down Expand Up @@ -96,7 +97,9 @@ impl Floodsub {
}

if self.target_peers.insert(peer_id.clone()) {
self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id });
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id, condition: DialPeerCondition::Disconnected
});
}
}

Expand Down Expand Up @@ -236,9 +239,9 @@ impl NetworkBehaviour for Floodsub {
Vec::new()
}

fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
fn inject_connected(&mut self, id: &PeerId) {
// We need to send our subscriptions to the newly-connected node.
if self.target_peers.contains(&id) {
if self.target_peers.contains(id) {
for topic in self.subscribed_topics.iter().cloned() {
self.events.push_back(NetworkBehaviourAction::NotifyHandler {
peer_id: id.clone(),
Expand All @@ -257,14 +260,17 @@ impl NetworkBehaviour for Floodsub {
self.connected_peers.insert(id.clone(), SmallVec::new());
}

fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) {
fn inject_disconnected(&mut self, id: &PeerId) {
let was_in = self.connected_peers.remove(id);
debug_assert!(was_in.is_some());

// We can be disconnected by the remote in case of inactivity for example, so we always
// try to reconnect.
if self.target_peers.contains(id) {
self.events.push_back(NetworkBehaviourAction::DialPeer { peer_id: id.clone() });
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: id.clone(),
condition: DialPeerCondition::Disconnected
});
}
}

Expand Down
10 changes: 5 additions & 5 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::protocol::{
};
use crate::topic::{Topic, TopicHash};
use futures::prelude::*;
use libp2p_core::{ConnectedPoint, Multiaddr, PeerId, connection::ConnectionId};
use libp2p_core::{Multiaddr, PeerId, connection::ConnectionId};
use libp2p_swarm::{
NetworkBehaviour,
NetworkBehaviourAction,
Expand Down Expand Up @@ -1012,7 +1012,7 @@ impl NetworkBehaviour for Gossipsub {
Vec::new()
}

fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
fn inject_connected(&mut self, id: &PeerId) {
info!("New peer connected: {:?}", id);
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
Expand Down Expand Up @@ -1040,7 +1040,7 @@ impl NetworkBehaviour for Gossipsub {
self.peer_topics.insert(id.clone(), Vec::new());
}

fn inject_disconnected(&mut self, id: &PeerId, _: ConnectedPoint) {
fn inject_disconnected(&mut self, id: &PeerId) {
// remove from mesh, topic_peers, peer_topic and fanout
debug!("Peer disconnected: {:?}", id);
{
Expand Down Expand Up @@ -1164,8 +1164,8 @@ impl NetworkBehaviour for Gossipsub {
NetworkBehaviourAction::DialAddress { address } => {
return Poll::Ready(NetworkBehaviourAction::DialAddress { address });
}
NetworkBehaviourAction::DialPeer { peer_id } => {
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id });
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition });
}
NetworkBehaviourAction::ReportObservedAddr { address } => {
return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { address });
Expand Down
6 changes: 1 addition & 5 deletions protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,17 +49,13 @@ mod tests {

// build and connect peer_no random peers
let mut peers = vec![];
let dummy_connected_point = ConnectedPoint::Dialer {
address: "/ip4/0.0.0.0/tcp/0".parse().unwrap(),
};

for _ in 0..peer_no {
let peer = PeerId::random();
peers.push(peer.clone());
<Gossipsub as NetworkBehaviour>::inject_connected(
&mut gs,
peer.clone(),
dummy_connected_point.clone(),
&peer,
);
if to_subscribe {
gs.handle_received_subscriptions(
Expand Down
39 changes: 27 additions & 12 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ use libp2p_swarm::{
ProtocolsHandler,
ProtocolsHandlerUpgrErr
};
use std::{collections::HashMap, collections::VecDeque, io, pin::Pin, task::Context, task::Poll};
use std::{
collections::{HashMap, VecDeque},
io,
pin::Pin,
task::Context,
task::Poll
};

/// Network behaviour that automatically identifies nodes periodically, returns information
/// about them, and answers identify queries from other nodes.
Expand All @@ -49,7 +55,7 @@ pub struct Identify {
/// The public key of the local node. To report on the wire.
local_public_key: PublicKey,
/// For each peer we're connected to, the observed address to send back to it.
observed_addresses: HashMap<PeerId, Multiaddr>,
observed_addresses: HashMap<PeerId, HashMap<ConnectionId, Multiaddr>>,
/// Pending replies to send.
pending_replies: VecDeque<Reply>,
/// Pending events to be emitted when polled.
Expand Down Expand Up @@ -97,23 +103,32 @@ impl NetworkBehaviour for Identify {
Vec::new()
}

fn inject_connected(&mut self, peer_id: PeerId, endpoint: ConnectedPoint) {
let observed = match endpoint {
ConnectedPoint::Dialer { address } => address,
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr,
fn inject_connected(&mut self, _: &PeerId) {
}

fn inject_connection_established(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
let addr = match endpoint {
ConnectedPoint::Dialer { address } => address.clone(),
ConnectedPoint::Listener { send_back_addr, .. } => send_back_addr.clone(),
};

self.observed_addresses.insert(peer_id, observed);
self.observed_addresses.entry(peer_id.clone()).or_default().insert(*conn, addr);
}

fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) {
if let Some(addrs) = self.observed_addresses.get_mut(peer_id) {
addrs.remove(conn);
}
}

fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) {
fn inject_disconnected(&mut self, peer_id: &PeerId) {
self.observed_addresses.remove(peer_id);
}

fn inject_event(
&mut self,
peer_id: PeerId,
_connection: ConnectionId,
connection: ConnectionId,
event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent,
) {
match event {
Expand All @@ -132,9 +147,9 @@ impl NetworkBehaviour for Identify {
}
IdentifyHandlerEvent::Identify(sender) => {
let observed = self.observed_addresses.get(&peer_id)
.expect("We only receive events from nodes we're connected to. We insert \
into the hashmap when we connect to a node and remove only when we \
disconnect; QED");
.and_then(|addrs| addrs.get(&connection))
.expect("`inject_event` is only called with an established connection \
and `inject_connection_established` ensures there is an entry; qed");
self.pending_replies.push_back(
Reply::Queued {
peer: peer_id,
Expand Down
Loading