Skip to content

Commit

Permalink
Implements (partially) libp2p#889 for Gossipsub.
Browse files Browse the repository at this point in the history
  • Loading branch information
AgeManning committed Jan 29, 2019
1 parent 6df8e5f commit dea3929
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
13 changes: 4 additions & 9 deletions protocols/gossipsub/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ fn main() {

// Create a random PeerId
let local_key = secio::SecioKeyPair::ed25519_generated().unwrap();
let local_pub_key = local_key.to_public_key();
println!("Local peer id: {:?}", local_pub_key.clone().into_peer_id());
let local_peer_id = local_key.to_peer_id();
println!("Local peer id: {:?}", local_peer_id);

// Set up an encrypted TCP Transport over the Mplex and Yamux protocols
let transport = libp2p::build_development_transport(local_key);
Expand All @@ -44,14 +44,9 @@ fn main() {
Duration::from_secs(60),
);
// build a gossipsub network behaviour
let mut gossipsub =
gossipsub::Gossipsub::new(local_pub_key.clone().into_peer_id(), gossipsub_config);
let mut gossipsub = gossipsub::Gossipsub::new(local_peer_id.clone(), gossipsub_config);
gossipsub.subscribe(topic.clone());
libp2p::Swarm::new(
transport,
gossipsub,
libp2p::core::topology::MemoryTopology::empty(local_pub_key),
)
libp2p::Swarm::new(transport, gossipsub, local_peer_id)
};

// Listen on all interfaces and whatever port the OS assigns
Expand Down
16 changes: 11 additions & 5 deletions protocols/gossipsub/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use libp2p_core::swarm::{
};
use libp2p_core::{
protocols_handler::{OneShotHandler, ProtocolsHandler},
PeerId,
Multiaddr, PeerId,
};
use libp2p_floodsub::{Topic, TopicHash};
use mcache::MessageCache;
Expand Down Expand Up @@ -999,7 +999,7 @@ impl<TSubstream> Gossipsub<TSubstream> {
}
}

impl<TSubstream, TTopology> NetworkBehaviour<TTopology> for Gossipsub<TSubstream>
impl<TSubstream> NetworkBehaviour for Gossipsub<TSubstream>
where
TSubstream: AsyncRead + AsyncWrite,
{
Expand All @@ -1010,6 +1010,10 @@ where
Default::default()
}

fn addresses_of_peer(&self, _: &PeerId) -> Vec<Multiaddr> {
Vec::new()
}

fn inject_connected(&mut self, id: PeerId, _: ConnectedPoint) {
// We need to send our subscriptions to the newly-connected node.
let mut subscriptions = vec![];
Expand Down Expand Up @@ -1104,6 +1108,8 @@ where
// remove peer from peer_topics
let was_in = self.peer_topics.remove(id);
debug_assert!(was_in.is_some());

//TODO: Reconnect due to inactivity
}

fn inject_node_event(&mut self, propagation_source: PeerId, event: InnerMessage) {
Expand Down Expand Up @@ -1155,7 +1161,7 @@ where

fn poll(
&mut self,
_: &mut PollParameters<TTopology>,
_: &mut PollParameters,
) -> Async<
NetworkBehaviourAction<
<Self::ProtocolsHandler as ProtocolsHandler>::InEvent,
Expand Down Expand Up @@ -1234,7 +1240,6 @@ pub enum NodeType {
#[cfg(test)]
mod tests {
use super::*;
use libp2p_core::topology::MemoryTopology;
use libp2p_floodsub::TopicBuilder;

// helper functions for testing
Expand Down Expand Up @@ -1273,7 +1278,8 @@ mod tests {
for _ in 0..peer_no {
let peer = PeerId::random();
peers.push(peer.clone());
<Gossipsub<tokio::net::TcpStream> as NetworkBehaviour<MemoryTopology>>::inject_connected(&mut gs,
<Gossipsub<tokio::net::TcpStream> as NetworkBehaviour>::inject_connected(
&mut gs,
peer.clone(),
dummy_connected_point.clone(),
);
Expand Down

0 comments on commit dea3929

Please sign in to comment.