Skip to content

Commit

Permalink
protocols/*: Update
Browse files Browse the repository at this point in the history
  • Loading branch information
mxinden committed Aug 19, 2021
1 parent 7d9285f commit 682f6be
Show file tree
Hide file tree
Showing 12 changed files with 69 additions and 90 deletions.
17 changes: 10 additions & 7 deletions protocols/floodsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ use std::{collections::VecDeque, iter};
/// Network behaviour that handles the floodsub protocol.
pub struct Floodsub {
/// Events that need to be yielded to the outside when polling.
events: VecDeque<NetworkBehaviourAction<FloodsubRpc, FloodsubEvent>>,
events: VecDeque<
NetworkBehaviourAction<
FloodsubEvent,
OneShotHandler<FloodsubProtocol, FloodsubRpc, InnerMessage>,
>,
>,

config: FloodsubConfig,

Expand Down Expand Up @@ -104,6 +109,7 @@ impl Floodsub {
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
handler: self.new_handler(),
});
}
}
Expand Down Expand Up @@ -302,9 +308,11 @@ impl NetworkBehaviour for Floodsub {
// We can be disconnected by the remote in case of inactivity for example, so we always
// try to reconnect.
if self.target_peers.contains(id) {
let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: *id,
condition: DialPeerCondition::Disconnected,
handler,
});
}
}
Expand Down Expand Up @@ -426,12 +434,7 @@ impl NetworkBehaviour for Floodsub {
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
Expand Down
63 changes: 17 additions & 46 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ use libp2p_core::{
multiaddr::Protocol::Ip6, ConnectedPoint, Multiaddr, PeerId,
};
use libp2p_swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
ProtocolsHandler,
DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters,
};

use crate::backoff::BackoffStorage;
Expand Down Expand Up @@ -193,7 +193,7 @@ impl From<MessageAuthenticity> for PublishConfig {
}

type GossipsubNetworkBehaviourAction =
NetworkBehaviourAction<Arc<GossipsubHandlerIn>, GossipsubEvent>;
NetworkBehaviourAction<GossipsubEvent, GossipsubHandler, Arc<GossipsubHandlerIn>>;

/// Network behaviour that handles the gossipsub protocol.
///
Expand Down Expand Up @@ -425,8 +425,8 @@ where

impl<D, F> Gossipsub<D, F>
where
D: DataTransform,
F: TopicSubscriptionFilter,
D: DataTransform + Send + 'static,
F: TopicSubscriptionFilter + Send + 'static,
{
/// Lists the hashes of the topics we are currently subscribed to.
pub fn topics(&self) -> impl Iterator<Item = &TopicHash> {
Expand Down Expand Up @@ -1043,9 +1043,11 @@ where
if !self.peer_topics.contains_key(peer_id) {
// Connect to peer
debug!("Connecting to explicit peer {:?}", peer_id);
let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id: *peer_id,
condition: DialPeerCondition::Disconnected,
handler,
});
}
}
Expand Down Expand Up @@ -1493,9 +1495,11 @@ where
self.px_peers.insert(peer_id);

// dial peer
let handler = self.new_handler();
self.events.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
handler,
});
}
}
Expand Down Expand Up @@ -2969,6 +2973,7 @@ where
peer_id: &PeerId,
connection_id: &ConnectionId,
endpoint: &ConnectedPoint,
_: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
// Remove IP from peer scoring system
if let Some((peer_score, ..)) = &mut self.peer_score {
Expand Down Expand Up @@ -3169,47 +3174,13 @@ where
&mut self,
cx: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(match event {
NetworkBehaviourAction::NotifyHandler {
peer_id,
handler,
event: send_event,
} => {
// clone send event reference if others references are present
let event = Arc::try_unwrap(send_event).unwrap_or_else(|e| (*e).clone());
NetworkBehaviourAction::NotifyHandler {
peer_id,
event,
handler,
}
}
NetworkBehaviourAction::GenerateEvent(e) => {
NetworkBehaviourAction::GenerateEvent(e)
}
NetworkBehaviourAction::DialAddress { address } => {
NetworkBehaviourAction::DialAddress { address }
}
NetworkBehaviourAction::DialPeer { peer_id, condition } => {
NetworkBehaviourAction::DialPeer { peer_id, condition }
}
NetworkBehaviourAction::ReportObservedAddr { address, score } => {
NetworkBehaviourAction::ReportObservedAddr { address, score }
}
NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
} => NetworkBehaviourAction::CloseConnection {
peer_id,
connection,
},
});
let event: NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler, Arc<GossipsubHandlerIn>> = event;
return Poll::Ready(event.map_in(|e: Arc<GossipsubHandlerIn>| {
// clone send event reference if others references are present
Arc::try_unwrap(e).unwrap_or_else(|e| (*e).clone())
}));
}

// update scores
Expand Down Expand Up @@ -3396,7 +3367,7 @@ impl<C: DataTransform, F: TopicSubscriptionFilter> fmt::Debug for Gossipsub<C, F
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Gossipsub")
.field("config", &self.config)
.field("events", &self.events)
.field("events", &self.events.len())
.field("control_pool", &self.control_pool)
.field("publish_config", &self.publish_config)
.field("topic_peers", &self.topic_peers)
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/behaviour/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ mod tests {
F: TopicSubscriptionFilter + Clone + Default + Send + 'static,
{
pub fn create_network(self) -> (Gossipsub<D, F>, Vec<PeerId>, Vec<TopicHash>) {
let keypair = libp2p_core::identity::Keypair::generate_secp256k1();
let keypair = libp2p_core::identity::Keypair::generate_ed25519();
// create a gossipsub struct
let mut gs: Gossipsub<D, F> = Gossipsub::new_with_subscription_filter_and_transform(
MessageAuthenticity::Signed(keypair),
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ mod tests {
fn arbitrary<G: Gen>(g: &mut G) -> Self {
let keypair = if g.gen() {
// Small enough to be inlined.
Keypair::generate_secp256k1()
Keypair::generate_ed25519()
} else {
// Too large to be inlined.
let mut rsa_key = hex::decode("308204bd020100300d06092a864886f70d0101010500048204a7308204a30201000282010100ef930f41a71288b643c1cbecbf5f72ab53992249e2b00835bf07390b6745419f3848cbcc5b030faa127bc88cdcda1c1d6f3ff699f0524c15ab9d2c9d8015f5d4bd09881069aad4e9f91b8b0d2964d215cdbbae83ddd31a7622a8228acee07079f6e501aea95508fa26c6122816ef7b00ac526d422bd12aed347c37fff6c1c307f3ba57bb28a7f28609e0bdcc839da4eedca39f5d2fa855ba4b0f9c763e9764937db929a1839054642175312a3de2d3405c9d27bdf6505ef471ce85c5e015eee85bf7874b3d512f715de58d0794fd8afe021c197fbd385bb88a930342fac8da31c27166e2edab00fa55dc1c3814448ba38363077f4e8fe2bdea1c081f85f1aa6f02030100010282010028ff427a1aac1a470e7b4879601a6656193d3857ea79f33db74df61e14730e92bf9ffd78200efb0c40937c3356cbe049cd32e5f15be5c96d5febcaa9bd3484d7fded76a25062d282a3856a1b3b7d2c525cdd8434beae147628e21adf241dd64198d5819f310d033743915ba40ea0b6acdbd0533022ad6daa1ff42de51885f9e8bab2306c6ef1181902d1cd7709006eba1ab0587842b724e0519f295c24f6d848907f772ae9a0953fc931f4af16a07df450fb8bfa94572562437056613647818c238a6ff3f606cffa0533e4b8755da33418dfbc64a85110b1a036623c947400a536bb8df65e5ebe46f2dfd0cfc86e7aeeddd7574c253e8fbf755562b3669525d902818100f9fff30c6677b78dd31ec7a634361438457e80be7a7faf390903067ea8355faa78a1204a82b6e99cb7d9058d23c1ecf6cfe4a900137a00cecc0113fd68c5931602980267ea9a95d182d48ba0a6b4d5dd32fdac685cb2e5d8b42509b2eb59c9579ea6a67ccc7547427e2bd1fb1f23b0ccb4dd6ba7d206c8dd93253d70a451701302818100f5530dfef678d73ce6a401ae47043af10a2e3f224c71ae933035ecd68ccbc4df52d72bc6ca2b17e8faf3e548b483a2506c0369ab80df3b137b54d53fac98f95547c2bc245b416e650ce617e0d29db36066f1335a9ba02ad3e0edf9dc3d58fd835835042663edebce81803972696c789012847cb1f854ab2ac0a1bd3867ac7fb502818029c53010d456105f2bf52a9a8482bca2224a5eac74bf3cc1a4d5d291fafcdffd15a6a6448cce8efdd661f6617ca5fc37c8c885cc3374e109ac6049bcbf72b37eabf44602a2da2d4a1237fd145c863e6d75059976de762d9d258c42b0984e2a2befa01c95217c3ee9c736ff209c355466ff99375194eff943bc402ea1d172a1ed02818027175bf493bbbfb8719c12b47d967bf9eac061c90a5b5711172e9095c38bb8cc493c063abffe4bea110b0a2f22ac9311b3947ba31b7ef6bfecf8209eebd6d86c316a2366bbafda7279b2b47d5bb24b6202254f249205dcad347b574433f6593733b806f84316276c1990a016ce1bbdbe5f650325acc7791aefe515ecc60063bd02818100b6a2077f4adcf15a17092d9c4a346d6022ac48f3861b73cf714f84c440a07419a7ce75a73b9cbff4597c53c128bf81e87b272d70428a272d99f90cd9b9ea1033298e108f919c6477400145a102df3fb5601ffc4588203cf710002517bfa24e6ad32f4d09c6b1a995fa28a3104131bedd9072f3b4fb4a5c2056232643d310453f").unwrap();
Expand Down
9 changes: 2 additions & 7 deletions protocols/identify/src/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ pub struct Identify {
/// Pending replies to send.
pending_replies: VecDeque<Reply>,
/// Pending events to be emitted when polled.
events: VecDeque<NetworkBehaviourAction<IdentifyPush, IdentifyEvent>>,
events: VecDeque<NetworkBehaviourAction<IdentifyEvent, IdentifyHandler>>,
/// Peers to which an active push with current information about
/// the local peer should be sent.
pending_push: HashSet<PeerId>,
Expand Down Expand Up @@ -292,12 +292,7 @@ impl NetworkBehaviour for Identify {
&mut self,
cx: &mut Context<'_>,
params: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
>,
> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Some(event) = self.events.pop_front() {
return Poll::Ready(event);
}
Expand Down
12 changes: 9 additions & 3 deletions protocols/kad/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ pub struct Kademlia<TStore> {
connection_idle_timeout: Duration,

/// Queued events to return when the behaviour is being polled.
queued_events: VecDeque<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, KademliaEvent>>,
queued_events: VecDeque<NetworkBehaviourAction<KademliaEvent, KademliaHandlerProto<QueryId>>>,

/// The currently known addresses of the local node.
local_addrs: HashSet<Multiaddr>,
Expand Down Expand Up @@ -561,10 +561,12 @@ where
RoutingUpdate::Failed
}
kbucket::InsertResult::Pending { disconnected } => {
let handler = self.new_handler(),
self.queued_events
.push_back(NetworkBehaviourAction::DialPeer {
peer_id: disconnected.into_preimage(),
condition: DialPeerCondition::Disconnected,
handler,
});
RoutingUpdate::Pending
}
Expand Down Expand Up @@ -1140,10 +1142,12 @@ where
//
// Only try dialing peer if not currently connected.
if !self.connected_peers.contains(disconnected.preimage()) {
let handler = self.new_handler(),
self.queued_events
.push_back(NetworkBehaviourAction::DialPeer {
peer_id: disconnected.into_preimage(),
condition: DialPeerCondition::Disconnected,
handler,
})
}
}
Expand Down Expand Up @@ -1859,7 +1863,7 @@ where
}
}

fn inject_dial_failure(&mut self, peer_id: &PeerId) {
fn inject_dial_failure(&mut self, peer_id: &PeerId, _: Self::ProtocolsHandler) {
for query in self.queries.iter_mut() {
query.on_failure(peer_id);
}
Expand Down Expand Up @@ -2156,7 +2160,7 @@ where
&mut self,
cx: &mut Context<'_>,
parameters: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<KademliaHandlerIn<QueryId>, Self::OutEvent>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
let now = Instant::now();

// Calculate the available capacity for queries triggered by background jobs.
Expand Down Expand Up @@ -2254,10 +2258,12 @@ where
});
} else if &peer_id != self.kbuckets.local_key().preimage() {
query.inner.pending_rpcs.push((peer_id, event));
let handler = self.new_handler(),
self.queued_events
.push_back(NetworkBehaviourAction::DialPeer {
peer_id,
condition: DialPeerCondition::Disconnected,
handler,
});
}
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/mdns/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -289,8 +289,8 @@ impl NetworkBehaviour for Mdns {
params: &mut impl PollParameters,
) -> Poll<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Self::OutEvent,
DummyProtocolsHandler,
>,
> {
while let Poll::Ready(event) = Pin::new(&mut self.if_watch).poll(cx) {
Expand Down
2 changes: 1 addition & 1 deletion protocols/ping/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ impl NetworkBehaviour for Ping {
&mut self,
_: &mut Context<'_>,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<Void, PingEvent>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if let Some(e) = self.events.pop_back() {
Poll::Ready(NetworkBehaviourAction::GenerateEvent(e))
} else {
Expand Down
13 changes: 9 additions & 4 deletions protocols/relay/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use libp2p_core::connection::{ConnectedPoint, ConnectionId, ListenerId};
use libp2p_core::multiaddr::Multiaddr;
use libp2p_core::PeerId;
use libp2p_swarm::{
DialPeerCondition, NetworkBehaviour, NetworkBehaviourAction, NotifyHandler, PollParameters,
DialPeerCondition, IntoProtocolsHandler, NetworkBehaviour, NetworkBehaviourAction,
NotifyHandler, PollParameters,
};
use std::collections::{hash_map::Entry, HashMap, HashSet, VecDeque};
use std::task::{Context, Poll};
Expand All @@ -45,7 +46,7 @@ pub struct Relay {
/// [`Self::listeners`] or [`Self::listener_any_relay`].
outbox_to_listeners: VecDeque<(PeerId, BehaviourToListenerMsg)>,
/// Events that need to be yielded to the outside when polling.
outbox_to_swarm: VecDeque<NetworkBehaviourAction<RelayHandlerIn, ()>>,
outbox_to_swarm: VecDeque<NetworkBehaviourAction<(), RelayHandlerProto>>,

/// List of peers the network is connected to.
connected_peers: HashMap<PeerId, HashSet<ConnectionId>>,
Expand Down Expand Up @@ -301,7 +302,7 @@ impl NetworkBehaviour for Relay {
}
}

fn inject_dial_failure(&mut self, peer_id: &PeerId) {
fn inject_dial_failure(&mut self, peer_id: &PeerId, _: Self::ProtocolsHandler) {
if let Entry::Occupied(o) = self.listeners.entry(*peer_id) {
if matches!(o.get(), RelayListener::Connecting { .. }) {
// By removing the entry, the channel to the listener is dropped and thus the
Expand Down Expand Up @@ -340,6 +341,7 @@ impl NetworkBehaviour for Relay {
peer: &PeerId,
connection: &ConnectionId,
_: &ConnectedPoint,
_: <Self::ProtocolsHandler as IntoProtocolsHandler>::Handler,
) {
// Remove connection from the set of connections for the given peer. In case the set is
// empty it will be removed in `inject_disconnected`.
Expand Down Expand Up @@ -476,6 +478,7 @@ impl NetworkBehaviour for Relay {
.push_back(NetworkBehaviourAction::DialPeer {
peer_id: dest_id,
condition: DialPeerCondition::NotDialing,
handler: self.new_handler(),
});
} else {
self.outbox_to_swarm
Expand Down Expand Up @@ -562,7 +565,7 @@ impl NetworkBehaviour for Relay {
&mut self,
cx: &mut Context<'_>,
poll_parameters: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<RelayHandlerIn, Self::OutEvent>> {
) -> Poll<NetworkBehaviourAction<Self::OutEvent, Self::ProtocolsHandler>> {
if !self.outbox_to_listeners.is_empty() {
let relay_peer_id = self.outbox_to_listeners[0].0;

Expand Down Expand Up @@ -668,6 +671,7 @@ impl NetworkBehaviour for Relay {
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: relay_peer_id,
condition: DialPeerCondition::Disconnected,
handler: self.new_handler(),
});
}
}
Expand Down Expand Up @@ -734,6 +738,7 @@ impl NetworkBehaviour for Relay {
return Poll::Ready(NetworkBehaviourAction::DialPeer {
peer_id: relay_peer_id,
condition: DialPeerCondition::Disconnected,
handler: self.new_handler(),
});
}
}
Expand Down
4 changes: 2 additions & 2 deletions protocols/relay/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,11 +1147,11 @@ enum CombinedEvent {
}

impl CombinedBehaviour {
fn poll<TEv>(
fn poll(
&mut self,
_: &mut Context,
_: &mut impl PollParameters,
) -> Poll<NetworkBehaviourAction<TEv, CombinedEvent>> {
) -> Poll<NetworkBehaviourAction<CombinedEvent, <Self as NetworkBehaviour>::ProtocolsHandler> {
if !self.events.is_empty() {
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
}
Expand Down
Loading

0 comments on commit 682f6be

Please sign in to comment.