Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Networking Service #49

Merged
merged 38 commits into from
Dec 3, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
25c550b
update gitignore
ec2 Nov 19, 2019
60ff052
basic gossipsub mdns chat
ec2 Nov 19, 2019
facc4ea
broke up into service
ec2 Nov 21, 2019
e9c7346
doesnt work, would appreciate a look
ec2 Nov 23, 2019
eb24916
it compiles! need to handle events now
ec2 Nov 25, 2019
0955993
msgs sending, cant peer though
ec2 Nov 25, 2019
f3c013b
can bootstrap now
ec2 Nov 25, 2019
1fe9402
got the loop to work.
ec2 Nov 27, 2019
a2e6fd2
linting
ec2 Nov 27, 2019
f0f2f7a
more linting
ec2 Nov 27, 2019
d90d686
merge master
ec2 Nov 27, 2019
eca7248
more linting
ec2 Nov 27, 2019
24263c0
add config
ec2 Nov 27, 2019
4076fbb
remove some prints
ec2 Nov 27, 2019
830ca7e
docs
ec2 Nov 27, 2019
c20a7fb
PR suggestions
ec2 Nov 27, 2019
96b5f12
more suggestions
ec2 Nov 27, 2019
21552e7
even more suggestions
ec2 Nov 27, 2019
8435720
more suggestions
ec2 Nov 28, 2019
0c2585d
added topics into default config
ec2 Nov 28, 2019
ec52afe
lint
ec2 Nov 28, 2019
ab037a0
remove types
ec2 Nov 28, 2019
723eedb
lint
ec2 Nov 28, 2019
efd0061
clippy fixes
ec2 Nov 28, 2019
f7f5962
remove CLI portion of app. if you wanna have a chat, just checkout th…
ec2 Nov 28, 2019
c9b4d32
fix PR suggesttions
ec2 Dec 2, 2019
cc8847f
merge master
ec2 Dec 2, 2019
092d782
add logger to network and libp2p crates
ec2 Dec 2, 2019
3a68f2f
fix doctest
ec2 Dec 2, 2019
54bfd04
linting
ec2 Dec 2, 2019
c03f05b
remove doctest caused more headaches than was beneficial
ec2 Dec 2, 2019
2a73a4e
pr suggestions
ec2 Dec 3, 2019
b0812f1
borrow instead of cloning logger where it doesnt need to be cloned
ec2 Dec 3, 2019
59c2477
linting
ec2 Dec 3, 2019
33bdf70
pr suggestions
ec2 Dec 3, 2019
8e29083
more pr suggestions
ec2 Dec 3, 2019
18c0844
add vscode to gitignore
ec2 Dec 3, 2019
46166fb
merge master
ec2 Dec 3, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
/target
**/*.rs.bk
/Cargo.lock
.idea/
.idea
.DS_STORE
.vscode
.idea/
7 changes: 7 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,13 @@ edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
network = { path = "network" }
ferret-libp2p = { path = "ferret-libp2p"}


libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" }
ec2 marked this conversation as resolved.
Show resolved Hide resolved
tokio = "0.1.22"
futures = "0.1.29"
clap = "2.33.0"
dirs = "2.0.2"
toml = "0.5.5"
Expand Down
14 changes: 14 additions & 0 deletions node/ferret-libp2p/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
[package]
name = "ferret-libp2p"
version = "0.1.0"
authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" }
tokio = "0.1.22"
futures = "0.1.29"
log = "0.4.8"
slog = "2.5.2"
94 changes: 94 additions & 0 deletions node/ferret-libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use futures::Async;
use libp2p::core::identity::Keypair;
use libp2p::core::PeerId;
use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash};
use libp2p::mdns::{Mdns, MdnsEvent};
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess};
use libp2p::tokio_io::{AsyncRead, AsyncWrite};
use libp2p::NetworkBehaviour;

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "MyBehaviourEvent", poll_method = "poll")]
pub struct MyBehaviour<TSubstream: AsyncRead + AsyncWrite> {
pub gossipsub: Gossipsub<TSubstream>,
pub mdns: Mdns<TSubstream>,
#[behaviour(ignore)]
Comment on lines +11 to +15
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious what #[behaviour(ignore)] and #[behaviour(out_event = "MyBehaviourEvent", poll_method = "poll")] do? Can you explain?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[derive(NetworkBehaviour)] imparts the NetworkBehaviour trait to MyBehaviour. Then #[behaviour(out_event = "MyBehaviourEvent", poll_method = "poll")] are the attributes that the macro expects, to give it more context.

events: Vec<MyBehaviourEvent>,
}

pub enum MyBehaviourEvent {
DiscoveredPeer(PeerId),
ExpiredPeer(PeerId),
GossipMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: Vec<u8>,
},
}

impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<MdnsEvent>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it would be beneficial to have comments for some of these functions below

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These functions are all inherited through a trait. Do you think that it would still need documentation? Also they are not functions that we EVER should call. They are called by Libp2p

for MyBehaviour<TSubstream>
{
fn inject_event(&mut self, event: MdnsEvent) {
match event {
MdnsEvent::Discovered(list) => {
for (peer, _) in list {
self.events.push(MyBehaviourEvent::DiscoveredPeer(peer))
}
}
MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !self.mdns.has_node(&peer) {
self.events.push(MyBehaviourEvent::ExpiredPeer(peer))
}
}
}
}
}
}

impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubEvent>
for MyBehaviour<TSubstream>
{
fn inject_event(&mut self, message: GossipsubEvent) {
if let GossipsubEvent::Message(_, message) = message {
self.events.push(MyBehaviourEvent::GossipMessage {
source: message.source,
topics: message.topics,
message: message.data,
})
}
}
}

impl<TSubstream: AsyncRead + AsyncWrite> MyBehaviour<TSubstream> {
/// Consumes the events list when polled.
fn poll<TBehaviourIn>(
&mut self,
) -> Async<NetworkBehaviourAction<TBehaviourIn, MyBehaviourEvent>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
}
Async::NotReady
}
}

impl<TSubstream: AsyncRead + AsyncWrite> MyBehaviour<TSubstream> {
pub fn new(local_key: &Keypair) -> Self {
let local_peer_id = local_key.public().into_peer_id();
let gossipsub_config = GossipsubConfig::default();
MyBehaviour {
gossipsub: Gossipsub::new(local_peer_id, gossipsub_config),
mdns: Mdns::new().expect("Failed to create mDNS service"),
events: vec![],
}
}

pub fn publish(&mut self, topic: &Topic, data: impl Into<Vec<u8>>) {
self.gossipsub.publish(topic, data);
}

pub fn subscribe(&mut self, topic: Topic) -> bool {
self.gossipsub.subscribe(topic)
}
}
20 changes: 20 additions & 0 deletions node/ferret-libp2p/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
use libp2p::gossipsub::Topic;

pub struct Libp2pConfig {
pub listening_multiaddr: String,
pub pubsub_topics: Vec<Topic>,
pub bootstrap_peers: Vec<String>,
}

impl Default for Libp2pConfig {
fn default() -> Self {
Libp2pConfig {
listening_multiaddr: "/ip4/0.0.0.0/tcp/0".to_owned(),
pubsub_topics: vec![
Topic::new("/fil/blocks".to_owned()),
Topic::new("/fil/messages".to_owned()),
],
bootstrap_peers: vec![],
}
}
}
3 changes: 3 additions & 0 deletions node/ferret-libp2p/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod behaviour;
pub mod config;
pub mod service;
120 changes: 120 additions & 0 deletions node/ferret-libp2p/src/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
use super::behaviour::{MyBehaviour, MyBehaviourEvent};
use super::config::Libp2pConfig;
use futures::{Async, Stream};
use libp2p::{
core, core::muxing::StreamMuxerBox, core::nodes::Substream, core::transport::boxed::Boxed,
gossipsub::TopicHash, identity, mplex, secio, yamux, PeerId, Swarm, Transport,
};
use slog::{debug, error, info, Logger};
use std::io::{Error, ErrorKind};
use std::time::Duration;
type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>;
type Libp2pBehaviour = MyBehaviour<Substream<StreamMuxerBox>>;

/// The Libp2pService listens to events from the Libp2p swarm.
pub struct Libp2pService {
pub swarm: Swarm<Libp2pStream, Libp2pBehaviour>,
}

impl Libp2pService {
/// Constructs a Libp2pService
pub fn new(log: &Logger, config: &Libp2pConfig) -> Self {
// TODO @Greg do local storage
let local_key = identity::Keypair::generate_ed25519();
let local_peer_id = PeerId::from(local_key.public());
info!(log, "Local peer id: {:?}", local_peer_id);

let transport = build_transport(local_key.clone());

let mut swarm = {
let be = MyBehaviour::new(&local_key);
Swarm::new(transport, be, local_peer_id)
};

for node in config.bootstrap_peers.clone() {
match node.parse() {
Ok(to_dial) => match Swarm::dial_addr(&mut swarm, to_dial) {
Ok(_) => debug!(log, "Dialed {:?}", node),
Err(e) => debug!(log, "Dial {:?} failed: {:?}", node, e),
},
Err(err) => error!(log, "Failed to parse address to dial: {:?}", err),
}
}

Swarm::listen_on(
&mut swarm,
config
.listening_multiaddr
.parse()
.expect("Incorrect MultiAddr Format"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

function returns a result, are you sure you want this to be a panic?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I want it to panic if the user gives an invalid address to listen on.

)
.unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, the function signature is for a result, but there are no error types returned anywhere in the function. If intended to be a panic for these, change the function signature to just return Self

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you're right. Ill change that to return just Self

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I want it to panic if the user gives an invalid address to listen on.


for topic in config.pubsub_topics.clone() {
swarm.subscribe(topic);
}

Libp2pService { swarm }
}
}

impl Stream for Libp2pService {
type Item = NetworkEvent;
type Error = ();
ec2 marked this conversation as resolved.
Show resolved Hide resolved

/// Continuously polls the Libp2p swarm to get events
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
loop {
match self.swarm.poll() {
Ok(Async::Ready(Some(event))) => match event {
MyBehaviourEvent::DiscoveredPeer(peer) => {
libp2p::Swarm::dial(&mut self.swarm, peer);
}
MyBehaviourEvent::ExpiredPeer(_) => {}
MyBehaviourEvent::GossipMessage {
source,
topics,
message,
} => {
return Ok(Async::Ready(Option::from(NetworkEvent::PubsubMessage {
source,
topics,
message,
})));
}
},
Ok(Async::Ready(None)) => break,
Ok(Async::NotReady) => break,
_ => break,
}
}
Ok(Async::NotReady)
}
}

/// Events emitted by this Service to be listened by the NetworkService.
#[derive(Clone)]
pub enum NetworkEvent {
PubsubMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: Vec<u8>,
},
}

fn build_transport(local_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> {
let transport = libp2p::tcp::TcpConfig::new().nodelay(true);
let transport = libp2p::dns::DnsConfig::new(transport);

transport
.upgrade(core::upgrade::Version::V1)
.authenticate(secio::SecioConfig::new(local_key))
.multiplex(core::upgrade::SelectUpgrade::new(
yamux::Config::default(),
mplex::MplexConfig::new(),
))
.map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer)))
.timeout(Duration::from_secs(20))
.map_err(|err| Error::new(ErrorKind::Other, err))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you changing the error to an Other io error? Are you sure this is the type of error this should return?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. We get some sort of libp2p transport error, and i want to convert it to an IO error

.boxed()
}
15 changes: 15 additions & 0 deletions node/network/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[package]
name = "network"
version = "0.1.0"
authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ferret-libp2p = { path = "../ferret-libp2p" }
futures = "0.1.29"
tokio = "0.1.22"
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" }
log = "0.4.8"
slog = "2.5.2"
1 change: 1 addition & 0 deletions node/network/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod service;
Loading