From 25c550bd1fef5e88316e18cd063369b1106489fc Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Tue, 19 Nov 2019 10:37:52 -0500 Subject: [PATCH 01/35] update gitignore --- .gitignore | 4 +++- node/ferret-libp2p/src/service.rs | 0 2 files changed, 3 insertions(+), 1 deletion(-) create mode 100644 node/ferret-libp2p/src/service.rs diff --git a/.gitignore b/.gitignore index 212181d360e4..9910f4d50da3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ /target **/*.rs.bk -/Cargo.lock \ No newline at end of file +/Cargo.lock +.idea +.DS_STORE diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs new file mode 100644 index 000000000000..e69de29bb2d1 From 60ff0527bbd0795350da5a6669bcd8ef1ff1a6d0 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Tue, 19 Nov 2019 10:38:11 -0500 Subject: [PATCH 02/35] basic gossipsub mdns chat --- node/Cargo.toml | 9 +++ node/ferret-libp2p/Cargo.toml | 15 +++++ node/ferret-libp2p/src/behaviour.rs | 100 ++++++++++++++++++++++++++++ node/ferret-libp2p/src/lib.rs | 9 +++ node/ferret-libp2p/src/service.rs | 1 + node/src/main.rs | 93 +++++++++++++++++++++++++- vm/types/Cargo.toml | 11 +++ vm/types/src/actor/mod.rs | 28 ++++++++ vm/types/src/lib.rs | 9 +++ vm/types/src/message.rs | 40 +++++++++++ 10 files changed, 312 insertions(+), 3 deletions(-) create mode 100644 node/ferret-libp2p/Cargo.toml create mode 100644 node/ferret-libp2p/src/behaviour.rs create mode 100644 node/ferret-libp2p/src/lib.rs create mode 100644 vm/types/Cargo.toml create mode 100644 vm/types/src/actor/mod.rs create mode 100644 vm/types/src/lib.rs create mode 100644 vm/types/src/message.rs diff --git a/node/Cargo.toml b/node/Cargo.toml index 92f4df347a09..5f18a7eee1fb 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -12,3 +12,12 @@ keystore = { path = "keystore" } network = { path = "network" } repository = { path = "repository" } types = { path = "types" } +ferret-libp2p = { path = "ferret-libp2p"} + + +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } +tokio = "0.1" +tokio-stdin-stdout = "*" +futures = "0.1.29" +log = "0.4.8" +env_logger = "*" \ No newline at end of file diff --git a/node/ferret-libp2p/Cargo.toml b/node/ferret-libp2p/Cargo.toml new file mode 100644 index 000000000000..2e06aebb0fea --- /dev/null +++ b/node/ferret-libp2p/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "ferret-libp2p" +version = "0.1.0" +authors = ["Eric Tu "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } +tokio = "0.1" +tokio-stdin-stdout = "*" +futures = "0.1.29" +log = "0.4.8" +env_logger = "*" \ No newline at end of file diff --git a/node/ferret-libp2p/src/behaviour.rs b/node/ferret-libp2p/src/behaviour.rs new file mode 100644 index 000000000000..e394756dc398 --- /dev/null +++ b/node/ferret-libp2p/src/behaviour.rs @@ -0,0 +1,100 @@ +use libp2p::NetworkBehaviour; +use libp2p::core::{Multiaddr, PeerId, }; +use libp2p::kad::record; +use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}; +use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; +use log::{debug, warn}; +use libp2p::gossipsub::{Gossipsub, GossipsubEvent, GossipsubConfig, Topic, TopicHash}; +use libp2p::mdns::{Mdns, MdnsEvent}; +use libp2p::tokio_io::{AsyncRead, AsyncWrite}; +use libp2p::core::identity::Keypair; +use libp2p::{swarm::toggle::Toggle, }; +use futures::prelude::*; +use futures::Async; + +#[derive(NetworkBehaviour)] +#[behaviour(out_event = "MyBehaviourEvent", poll_method = "poll")] +pub struct MyBehaviour { + pub gossipsub: Gossipsub, + pub mdns: Mdns, + #[behaviour(ignore)] + events: Vec, +} + +pub enum MyBehaviourEvent { + DiscoveredPeer(PeerId), + ExpiredPeer(PeerId), + GossipMessage { + source: PeerId, + topics: Vec, + message: Vec, + }, +} + +impl NetworkBehaviourEventProcess for MyBehaviour { + fn inject_event(&mut self, event: MdnsEvent) { + match event { + MdnsEvent::Discovered(list) => { + for (peer, _) in list { + self.events.push(MyBehaviourEvent::DiscoveredPeer(peer)) + } + }, + MdnsEvent::Expired(list) => { + for (peer, _) in list { + if !self.mdns.has_node(&peer) { + self.events.push(MyBehaviourEvent::ExpiredPeer(peer)) + } + } + } + } + } +} + +impl NetworkBehaviourEventProcess for MyBehaviour { + // Called when `floodsub` produces an event. + fn inject_event(&mut self, message: GossipsubEvent) { + if let GossipsubEvent::Message(_, message) = message { + self.events.push(MyBehaviourEvent::GossipMessage { + source: message.source, + topics: message.topics, + message: message.data, + }) + } + } +} + +impl MyBehaviour { + /// Consumes the events list when polled. + fn poll( + &mut self, + ) -> Async> { + if !self.events.is_empty() { + return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); + } + + Async::NotReady + } +} + +impl MyBehaviour { + pub fn new(local_key: &Keypair) -> Self { + let local_peer_id = local_key.public().clone().into_peer_id(); + let gossipsub_config = GossipsubConfig::default(); + MyBehaviour { + gossipsub: Gossipsub::new(local_peer_id.clone(), gossipsub_config), + mdns: Mdns::new().expect("Failed to create mDNS service"), + events: vec![] + } + } + + pub fn publish(&mut self, topic: &Topic, data: impl Into>) { + println!("Pubishing a message"); + self.gossipsub.publish(topic, data); + } + + pub fn subscribe(&mut self, topic: Topic) -> bool { + self.gossipsub.subscribe(topic) + } + +} + diff --git a/node/ferret-libp2p/src/lib.rs b/node/ferret-libp2p/src/lib.rs new file mode 100644 index 000000000000..89524ede2664 --- /dev/null +++ b/node/ferret-libp2p/src/lib.rs @@ -0,0 +1,9 @@ +pub mod behaviour; +pub mod service; +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + assert_eq!(2 + 2, 4); + } +} diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index e69de29bb2d1..b3bd64ce599f 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -0,0 +1 @@ +use crate::behaviour::{MyBehaviour, MyBehaviourEvent}; \ No newline at end of file diff --git a/node/src/main.rs b/node/src/main.rs index e7a11a969c03..3e61947ccad6 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -1,3 +1,90 @@ -fn main() { - println!("Hello, world!"); -} +use libp2p::{ + identity, + PeerId, + gossipsub::{ + Topic, + }, + swarm::{ + Swarm, + }, + tokio_codec::{FramedRead, LinesCodec}, +}; +use tokio; + + +use ferret_libp2p::behaviour::*; + +use futures::prelude::*; + +use env_logger::{Builder, Env}; + + +fn main(){ + Builder::from_env(Env::default().default_filter_or("info")).init(); + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + println!("Local peer id: {:?}", local_peer_id); + + // Set up an encrypted TCP Transport over the Mplex and Yamux protocols + let transport = libp2p::build_development_transport(local_key.clone()); + + // Create a Floodsub/Gossipsub topic + let topic = Topic::new("test-net".into()); + + let mut swarm = { + let be = MyBehaviour::new(&local_key); + Swarm::new(transport, be, local_peer_id) + }; + swarm.gossipsub.subscribe(topic.clone()); + + if let Some(x) = std::env::args().nth(1) { + println!("Hello, world! {}", x); + } else { + println!("Nothing"); + } + + Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); + + let stdin = tokio_stdin_stdout::stdin(0); + let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); + let mut listening = false; + tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { + loop { + match framed_stdin.poll().expect("Error while polling stdin") { + Async::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()), + Async::Ready(None) => panic!("Stdin closed"), + Async::NotReady => break, + }; + } + loop { + match swarm.poll() { + Ok(Async::Ready(Some(event))) => match event { + MyBehaviourEvent::DiscoveredPeer(peer) => { + libp2p::Swarm::dial(&mut swarm, peer); + }, + MyBehaviourEvent::ExpiredPeer(peer) => { + }, + MyBehaviourEvent::GossipMessage { + source, + topics, + message, + } => { + println!("Received Gossip: {:?} {:?} {:?}", source, topics, String::from_utf8(message).unwrap()); + } + + }, + Ok(Async::Ready(None)) | Ok(Async::NotReady) => { + if !listening { + if let Some(a) = Swarm::listeners(&swarm).next() { + println!("Listening on {:?}", a); + listening = true; + } + } + break + }, + _ => {} + } + } + Ok(Async::NotReady) + })); +} \ No newline at end of file diff --git a/vm/types/Cargo.toml b/vm/types/Cargo.toml new file mode 100644 index 000000000000..265ec529df8f --- /dev/null +++ b/vm/types/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "types" +version = "0.1.0" +authors = ["Eric Tu "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +cid = "*" +bytes = "*" \ No newline at end of file diff --git a/vm/types/src/actor/mod.rs b/vm/types/src/actor/mod.rs new file mode 100644 index 000000000000..caaa55463f16 --- /dev/null +++ b/vm/types/src/actor/mod.rs @@ -0,0 +1,28 @@ +extern crate cid; +use cid::{Cid}; +use bytes::{Bytes}; + +//Might have to be some bignum +pub type UVarint = u64; + +pub type MethodNum = UVarint; +pub type MethodParam = Bytes; +pub type MethodParams = Vec; +pub type Code = Bytes; + +pub type TokenAmount = UVarint; +pub type CallSeqNum = UVarint; + +pub type CodeCID = Cid; +pub type ActorSubstateCID = Cid; + +pub struct ActorState { + codeCID: CodeCID, + state: ActorSubstateCID, + balance: TokenAmount, + callSeqNum: CallSeqNum, +} + +pub struct Actor { + pub state: ActorState, +} \ No newline at end of file diff --git a/vm/types/src/lib.rs b/vm/types/src/lib.rs new file mode 100644 index 000000000000..41a74be78360 --- /dev/null +++ b/vm/types/src/lib.rs @@ -0,0 +1,9 @@ +pub mod message; +pub mod actor; +#[cfg(test)] +mod tests { + #[test] + fn it_works() { + assert_eq!(2 + 2, 4); + } +} diff --git a/vm/types/src/message.rs b/vm/types/src/message.rs new file mode 100644 index 000000000000..aaab93e37387 --- /dev/null +++ b/vm/types/src/message.rs @@ -0,0 +1,40 @@ +use super::actor::{MethodNum, MethodParams, CallSeqNum, TokenAmount}; + +//Should probably be a bignum +pub type GasAmount = u64; + +pub type GasPrice = TokenAmount; + +pub type Address = String; + +pub type FilCryptoSignature = String; + +#[derive (Debug, Clone)] +pub struct Message { + from: Address, //addr.Address + to: Address, // addr.address + method: MethodNum, + params: MethodParams, + call_seq_num: CallSeqNum, + value: TokenAmount, + gas_price: GasPrice, + gas_limit: GasAmount, +} + +#[derive (Debug, Clone)] +pub struct SignedMessage { + message: Message, + signature: FilCryptoSignature, +} + +impl From for Message { + fn from (msg: SignedMessage) -> Self{ + msg.message + } +} + +impl Message { + fn sign () -> SignedMessage { + unimplemented!(); + } +} From facc4ea40f5845f24f342fc917f2a5726b3871de Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Thu, 21 Nov 2019 15:47:05 -0500 Subject: [PATCH 03/35] broke up into service --- node/Cargo.toml | 2 +- node/ferret-libp2p/Cargo.toml | 2 +- node/ferret-libp2p/src/service.rs | 160 +++++++++++++++++++++++++++++- node/src/main.rs | 95 ++++++------------ 4 files changed, 192 insertions(+), 67 deletions(-) diff --git a/node/Cargo.toml b/node/Cargo.toml index 5f18a7eee1fb..fac2dc34576c 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -16,7 +16,7 @@ ferret-libp2p = { path = "ferret-libp2p"} libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } -tokio = "0.1" +tokio = "0.1.22" tokio-stdin-stdout = "*" futures = "0.1.29" log = "0.4.8" diff --git a/node/ferret-libp2p/Cargo.toml b/node/ferret-libp2p/Cargo.toml index 2e06aebb0fea..650c0af40d43 100644 --- a/node/ferret-libp2p/Cargo.toml +++ b/node/ferret-libp2p/Cargo.toml @@ -8,7 +8,7 @@ edition = "2018" [dependencies] libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } -tokio = "0.1" +tokio = "0.1.22" tokio-stdin-stdout = "*" futures = "0.1.29" log = "0.4.8" diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index b3bd64ce599f..3718d336bbd0 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -1 +1,159 @@ -use crate::behaviour::{MyBehaviour, MyBehaviourEvent}; \ No newline at end of file +use crate::behaviour::{MyBehaviour, MyBehaviourEvent}; +use libp2p::{self, Swarm, core::transport::boxed::Boxed, core, secio, yamux, mplex, PeerId, core::muxing::StreamMuxerBox, core::nodes::Substream, identity, gossipsub::{Topic, TopicHash}, Transport, build_development_transport}; +use futures::{Stream, Async}; +use std::io::{Error, ErrorKind}; +use std::time::Duration; +use futures::sync::mpsc; +use futures::future::Future; + +use std::sync::Arc; + +type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; +type Libp2pBehaviour = MyBehaviour>; + +pub struct Service { + pub swarm: Swarm, + network_receiver: mpsc::UnboundedReceiver, + outbound_transmitter: Arc>, +} + +impl Service { + pub fn new (outbound_transmitter: Arc>) -> Result<(Self, Arc>), Error> + { + // Starting Libp2p Service + + // TODO @Greg do local storage + let local_key = identity::Keypair::generate_ed25519(); + let local_peer_id = PeerId::from(local_key.public()); + + + let transport = build_transport(local_key.clone()); +// let transport = build_development_transport(&local_key); + let mut swarm = { + let be = MyBehaviour::new(&local_key); + Swarm::new(transport, be, local_peer_id) + }; + + // TODO be able to specify port aand listening addr with proper error handling + Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); + + // TODO be able to bootstrap peers + // TODO build list of topics + + let topic = Topic::new("test-net".into()); + swarm.subscribe(topic.clone()); + let (tx, rx) = mpsc::unbounded(); + let tx = Arc::new(tx); + Ok((Service{ + swarm: swarm, + network_receiver: rx, + outbound_transmitter + }, tx.clone())) + } +} + +pub enum NetworkEvent { + PubsubMessage { + source: PeerId, + topics: Vec, + message: Vec, + }, +} + +pub enum NetworkMessage { + PubsubMessage{ + topic: Topic, + message: Vec, + } +} + +impl Service { + pub fn start(&mut self) { + loop { + match self.swarm.poll() { + Ok(Async::Ready(Some(event))) => match event { + MyBehaviourEvent::DiscoveredPeer(peer) => { + libp2p::Swarm::dial(&mut self.swarm, peer); + }, + MyBehaviourEvent::ExpiredPeer(peer) => { + }, + MyBehaviourEvent::GossipMessage { + source, + topics, + message, + } => { + // TODO proper error handling + self.outbound_transmitter.unbounded_send(NetworkEvent::PubsubMessage { + source, + topics, + message, + }).unwrap_or_else(|e| { + panic!( + "failed to send in network_transmitter" + ); + }); + } + }, + Ok(Async::Ready(None)) => {} + Ok(Async::NotReady) => {}, + _ => {} + } + } + } +} + + +fn build_transport(local_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { + let transport = libp2p::tcp::TcpConfig::new().nodelay(true); + let transport = libp2p::dns::DnsConfig::new(transport); + + transport.upgrade(core::upgrade::Version::V1) + .authenticate(secio::SecioConfig::new(local_key)) + .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())) + .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) + .timeout(Duration::from_secs(20)) + .map_err(|err| Error::new(ErrorKind::Other, err)) + .boxed() +} + + + +//tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { +//loop { +//match framed_stdin.poll().expect("Error while polling stdin") { +//Async::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()), +//Async::Ready(None) => panic!("Stdin closed"), +//Async::NotReady => break, +//}; +//} +//loop { +//match swarm.poll() { +//Ok(Async::Ready(Some(event))) => match event { +//MyBehaviourEvent::DiscoveredPeer(peer) => { +//libp2p::Swarm::dial(&mut swarm, peer); +//}, +//MyBehaviourEvent::ExpiredPeer(peer) => { +//}, +//MyBehaviourEvent::GossipMessage { +//source, +//topics, +//message, +//} => { +//println!("Received Gossip: {:?} {:?} {:?}", source, topics, String::from_utf8(message).unwrap()); +//} +// +//}, +//Ok(Async::Ready(None)) | Ok(Async::NotReady) => { +//if !listening { +//if let Some(a) = Swarm::listeners(&swarm).next() { +//println!("Listening on {:?}", a); +//listening = true; +//} +//} +//break +//}, +//_ => {} +//} +//} +//Ok(Async::NotReady) +//})); \ No newline at end of file diff --git a/node/src/main.rs b/node/src/main.rs index 3e61947ccad6..dffdcf2fbe4c 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -9,82 +9,49 @@ use libp2p::{ }, tokio_codec::{FramedRead, LinesCodec}, }; -use tokio; +use futures::sync::mpsc; use ferret_libp2p::behaviour::*; +use ferret_libp2p::service::*; use futures::prelude::*; use env_logger::{Builder, Env}; - +use ferret_libp2p::service; +use std::sync::Arc; +use tokio::prelude::*; +use tokio; +use futures::future::lazy; +use tokio::runtime::current_thread::Runtime; fn main(){ Builder::from_env(Env::default().default_filter_or("info")).init(); - let local_key = identity::Keypair::generate_ed25519(); - let local_peer_id = PeerId::from(local_key.public()); - println!("Local peer id: {:?}", local_peer_id); - - // Set up an encrypted TCP Transport over the Mplex and Yamux protocols - let transport = libp2p::build_development_transport(local_key.clone()); - - // Create a Floodsub/Gossipsub topic - let topic = Topic::new("test-net".into()); - - let mut swarm = { - let be = MyBehaviour::new(&local_key); - Swarm::new(transport, be, local_peer_id) - }; - swarm.gossipsub.subscribe(topic.clone()); - - if let Some(x) = std::env::args().nth(1) { - println!("Hello, world! {}", x); - } else { - println!("Nothing"); - } - - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); - + let (tx, rx) = mpsc::unbounded::(); + let tx = Arc::new(tx); + let (mut network_service, net_tx) = service::Service::new(tx.clone()).unwrap(); let stdin = tokio_stdin_stdout::stdin(0); let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); let mut listening = false; - tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { - loop { - match framed_stdin.poll().expect("Error while polling stdin") { - Async::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()), - Async::Ready(None) => panic!("Stdin closed"), - Async::NotReady => break, - }; - } - loop { - match swarm.poll() { - Ok(Async::Ready(Some(event))) => match event { - MyBehaviourEvent::DiscoveredPeer(peer) => { - libp2p::Swarm::dial(&mut swarm, peer); - }, - MyBehaviourEvent::ExpiredPeer(peer) => { - }, - MyBehaviourEvent::GossipMessage { - source, - topics, - message, - } => { - println!("Received Gossip: {:?} {:?} {:?}", source, topics, String::from_utf8(message).unwrap()); - } - }, - Ok(Async::Ready(None)) | Ok(Async::NotReady) => { - if !listening { - if let Some(a) = Swarm::listeners(&swarm).next() { - println!("Listening on {:?}", a); - listening = true; - } - } - break - }, - _ => {} - } - } - Ok(Async::NotReady) - })); +// let mut rt = Runtime::new().unwrap(); + + // tokio::runtime::run(lazy (|| { + // network_service.start(); + // Ok(()) + // })); +// rt.block_on(network_service.start()); + network_service.start(); + // tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { + // loop { + // match framed_stdin.poll().expect("Error while polling stdin") { + // Async::Ready(Some(line)) => println!("aaa"), + // // Async::Ready(Some(line)) => network_service.publish(&topic, line.as_bytes()), + // Async::Ready(None) => panic!("Stdin closed"), + // Async::NotReady => break, + // }; + // } + // Ok(Async::NotReady) + // }).map_err(|e| println!("error = {:?}", e))); + } \ No newline at end of file From e9c73463f42f9f4ba19b2dffc252ccee9858fd2c Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Sat, 23 Nov 2019 11:34:23 -0500 Subject: [PATCH 04/35] doesnt work, would appreciate a look --- node/ferret-libp2p/src/service.rs | 70 ++++++++++++++++++------------- node/src/main.rs | 26 ++++-------- 2 files changed, 48 insertions(+), 48 deletions(-) diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 3718d336bbd0..2405b6daa366 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -7,7 +7,10 @@ use futures::sync::mpsc; use futures::future::Future; use std::sync::Arc; - +use futures::future::PollFn; +use futures::task::Spawn; +use futures::Lazy; +use futures::IntoFuture; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = MyBehaviour>; @@ -18,8 +21,12 @@ pub struct Service { } impl Service { + pub fn publish(&mut self, topic: &Topic, data: impl Into>) { + println!("Pubishing a message"); + self.swarm.gossipsub.publish(topic, data); + } pub fn new (outbound_transmitter: Arc>) -> Result<(Self, Arc>), Error> - { + { // Starting Libp2p Service // TODO @Greg do local storage @@ -68,37 +75,42 @@ pub enum NetworkMessage { } impl Service { - pub fn start(&mut self) { - loop { - match self.swarm.poll() { - Ok(Async::Ready(Some(event))) => match event { - MyBehaviourEvent::DiscoveredPeer(peer) => { - libp2p::Swarm::dial(&mut self.swarm, peer); - }, - MyBehaviourEvent::ExpiredPeer(peer) => { - }, - MyBehaviourEvent::GossipMessage { - source, - topics, - message, - } => { - // TODO proper error handling - self.outbound_transmitter.unbounded_send(NetworkEvent::PubsubMessage { + + pub fn start(&'static mut self) -> SpawnIntoFuture, dyn IntoFuture>>{ + futures::executor::spawn(futures::lazy(|| { + loop { + match self.swarm.poll() { + Ok(Async::Ready(Some(event))) => match event { + MyBehaviourEvent::DiscoveredPeer(peer) => { + libp2p::Swarm::dial(&mut self.swarm, peer); + }, + MyBehaviourEvent::ExpiredPeer(peer) => { + }, + MyBehaviourEvent::GossipMessage { source, topics, message, - }).unwrap_or_else(|e| { - panic!( - "failed to send in network_transmitter" - ); - }); - } - }, - Ok(Async::Ready(None)) => {} - Ok(Async::NotReady) => {}, - _ => {} + } => { + // TODO proper error handling + self.outbound_transmitter.unbounded_send(NetworkEvent::PubsubMessage { + source, + topics, + message, + }).unwrap_or_else(|e| { + panic!( + "failed to send in network_transmitter" + ); + }); + } + }, + Ok(Async::Ready(None)) => {} + Ok(Async::NotReady) => {}, + _ => {} + } } - } + Ok(Async::NotReady) + })) + } } diff --git a/node/src/main.rs b/node/src/main.rs index dffdcf2fbe4c..33e5acd67509 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -34,24 +34,12 @@ fn main(){ let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); let mut listening = false; -// let mut rt = Runtime::new().unwrap(); - - // tokio::runtime::run(lazy (|| { - // network_service.start(); - // Ok(()) - // })); -// rt.block_on(network_service.start()); - network_service.start(); - // tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { - // loop { - // match framed_stdin.poll().expect("Error while polling stdin") { - // Async::Ready(Some(line)) => println!("aaa"), - // // Async::Ready(Some(line)) => network_service.publish(&topic, line.as_bytes()), - // Async::Ready(None) => panic!("Stdin closed"), - // Async::NotReady => break, - // }; - // } - // Ok(Async::NotReady) - // }).map_err(|e| println!("error = {:?}", e))); + let mut rt = Runtime::new().unwrap(); + + + println!("???"); + tokio::run(network_service.start()); + + println!("???"); } \ No newline at end of file From eb249168173e00b310fda45e89caa8a2cb9fdc78 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Mon, 25 Nov 2019 16:33:57 -0500 Subject: [PATCH 05/35] it compiles! need to handle events now --- node/ferret-libp2p/src/service.rs | 163 +++++++++++------------------- node/network/Cargo.toml | 4 + node/network/src/lib.rs | 2 +- node/network/src/service.rs | 90 +++++++++++++++++ node/src/main.rs | 41 +++++--- 5 files changed, 182 insertions(+), 118 deletions(-) create mode 100644 node/network/src/service.rs diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 2405b6daa366..4231edf4359f 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -1,31 +1,28 @@ use crate::behaviour::{MyBehaviour, MyBehaviourEvent}; use libp2p::{self, Swarm, core::transport::boxed::Boxed, core, secio, yamux, mplex, PeerId, core::muxing::StreamMuxerBox, core::nodes::Substream, identity, gossipsub::{Topic, TopicHash}, Transport, build_development_transport}; -use futures::{Stream, Async}; +use futures::{Stream, Async, Future}; use std::io::{Error, ErrorKind}; use std::time::Duration; use futures::sync::mpsc; -use futures::future::Future; +use tokio::runtime::TaskExecutor; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use futures::future::PollFn; use futures::task::Spawn; use futures::Lazy; use futures::IntoFuture; +use std::ops::{Deref, DerefMut}; + type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = MyBehaviour>; -pub struct Service { +pub struct Libp2pService{ pub swarm: Swarm, - network_receiver: mpsc::UnboundedReceiver, - outbound_transmitter: Arc>, } -impl Service { - pub fn publish(&mut self, topic: &Topic, data: impl Into>) { - println!("Pubishing a message"); - self.swarm.gossipsub.publish(topic, data); - } - pub fn new (outbound_transmitter: Arc>) -> Result<(Self, Arc>), Error> +impl Libp2pService{ + // TODO Allow bootstrap and topics + pub fn new () -> Result { // Starting Libp2p Service @@ -35,7 +32,6 @@ impl Service { let transport = build_transport(local_key.clone()); -// let transport = build_development_transport(&local_key); let mut swarm = { let be = MyBehaviour::new(&local_key); Swarm::new(transport, be, local_peer_id) @@ -49,71 +45,69 @@ impl Service { let topic = Topic::new("test-net".into()); swarm.subscribe(topic.clone()); - let (tx, rx) = mpsc::unbounded(); - let tx = Arc::new(tx); - Ok((Service{ + + Ok((Libp2pService{ swarm: swarm, - network_receiver: rx, - outbound_transmitter - }, tx.clone())) + })) } } -pub enum NetworkEvent { - PubsubMessage { - source: PeerId, - topics: Vec, - message: Vec, - }, -} +impl Stream for Libp2pService { -pub enum NetworkMessage { - PubsubMessage{ - topic: Topic, - message: Vec, - } -} + type Item = NetworkEvent; + type Error = (); -impl Service { - - pub fn start(&'static mut self) -> SpawnIntoFuture, dyn IntoFuture>>{ - futures::executor::spawn(futures::lazy(|| { - loop { - match self.swarm.poll() { - Ok(Async::Ready(Some(event))) => match event { - MyBehaviourEvent::DiscoveredPeer(peer) => { - libp2p::Swarm::dial(&mut self.swarm, peer); - }, - MyBehaviourEvent::ExpiredPeer(peer) => { - }, - MyBehaviourEvent::GossipMessage { + fn poll(&mut self) -> Result>, Self::Error> { + let mut listening = false; + loop { + match self.swarm.poll() { + Ok(Async::Ready(Some(event))) => match event { + MyBehaviourEvent::DiscoveredPeer(peer) => { + libp2p::Swarm::dial(&mut self.swarm, peer); + break; + }, + MyBehaviourEvent::ExpiredPeer(peer) => { + break; + }, + MyBehaviourEvent::GossipMessage { + source, + topics, + message, + } => { + let message = String::from_utf8(message).unwrap(); + println!("Received Gossip: {:?} {:?} {:?}", source, topics, message); + return Ok(Async::Ready(Option::from(NetworkEvent::PubsubMessage { source, topics, - message, - } => { - // TODO proper error handling - self.outbound_transmitter.unbounded_send(NetworkEvent::PubsubMessage { - source, - topics, - message, - }).unwrap_or_else(|e| { - panic!( - "failed to send in network_transmitter" - ); - }); + message + }))); + } + + }, + Ok(Async::Ready(None)) | Ok(Async::NotReady) => { + if !listening { + if let Some(a) = Swarm::listeners(&self.swarm).next() { + println!("Listening on {:?}", a); + listening = true; } - }, - Ok(Async::Ready(None)) => {} - Ok(Async::NotReady) => {}, - _ => {} - } + } + break + }, + _ => {} } - Ok(Async::NotReady) - })) - + } + Ok(Async::NotReady) } } +#[derive(Clone)] +pub enum NetworkEvent { + PubsubMessage { + source: PeerId, + topics: Vec, + message: String, + }, +} fn build_transport(local_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> { let transport = libp2p::tcp::TcpConfig::new().nodelay(true); @@ -130,42 +124,3 @@ fn build_transport(local_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBo -//tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { -//loop { -//match framed_stdin.poll().expect("Error while polling stdin") { -//Async::Ready(Some(line)) => swarm.publish(&topic, line.as_bytes()), -//Async::Ready(None) => panic!("Stdin closed"), -//Async::NotReady => break, -//}; -//} -//loop { -//match swarm.poll() { -//Ok(Async::Ready(Some(event))) => match event { -//MyBehaviourEvent::DiscoveredPeer(peer) => { -//libp2p::Swarm::dial(&mut swarm, peer); -//}, -//MyBehaviourEvent::ExpiredPeer(peer) => { -//}, -//MyBehaviourEvent::GossipMessage { -//source, -//topics, -//message, -//} => { -//println!("Received Gossip: {:?} {:?} {:?}", source, topics, String::from_utf8(message).unwrap()); -//} -// -//}, -//Ok(Async::Ready(None)) | Ok(Async::NotReady) => { -//if !listening { -//if let Some(a) = Swarm::listeners(&swarm).next() { -//println!("Listening on {:?}", a); -//listening = true; -//} -//} -//break -//}, -//_ => {} -//} -//} -//Ok(Async::NotReady) -//})); \ No newline at end of file diff --git a/node/network/Cargo.toml b/node/network/Cargo.toml index eb0fe20db24a..828884c9bba6 100644 --- a/node/network/Cargo.toml +++ b/node/network/Cargo.toml @@ -7,3 +7,7 @@ edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +ferret-libp2p = { path = "../ferret-libp2p" } +futures = "0.1.29" +tokio = "0.1.22" +libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index 8b137891791f..cc097b599434 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -1 +1 @@ - +pub mod service; \ No newline at end of file diff --git a/node/network/src/service.rs b/node/network/src/service.rs new file mode 100644 index 000000000000..3cc7d79b900e --- /dev/null +++ b/node/network/src/service.rs @@ -0,0 +1,90 @@ +use ferret_libp2p::service::{Libp2pService, NetworkEvent}; +use tokio::sync::mpsc; + +use std::sync::{Arc, Mutex}; +//use std::error::Error; +use libp2p::{self, Swarm, core::transport::boxed::Boxed, core, secio, yamux, mplex, PeerId, core::muxing::StreamMuxerBox, core::nodes::Substream, identity, gossipsub::{Topic, TopicHash}, Transport, build_development_transport}; +use tokio::runtime::TaskExecutor; +use futures::Async; +use futures::stream::Stream; +use futures::Future; + + +pub enum NetworkMessage { + PubsubMessage { + topics: Topic, + message: Vec, + }, +} + +pub struct NetworkService{ + pub libp2p: Arc>, +} + +impl NetworkService{ + pub fn new ( + outbound_transmitter: Arc>, + executor: &TaskExecutor + ) -> (Self, mpsc::UnboundedSender) { + let (tx, rx) = mpsc::unbounded_channel(); + + let libp2p_service = Arc::new(Mutex::new(Libp2pService::new().unwrap())); + + start(libp2p_service.clone(), executor, outbound_transmitter, rx); + + return (NetworkService{ + libp2p: libp2p_service, + }, tx); + } +} + +enum Error{ + aaa (u8) +} + + + +pub fn start ( + libp2p_service: Arc>, + executor: &TaskExecutor, + outbound_transmitter: Arc>, + mut message_receiver: mpsc::UnboundedReceiver, +) -> tokio::sync::oneshot::Sender { + let (network_exit, exit_rx) = tokio::sync::oneshot::channel(); + executor.spawn( + poll(libp2p_service,outbound_transmitter,message_receiver) + .select(exit_rx.then(|_| Ok(()))) + .then(move |_| { + Ok(()) + }), + ); + + network_exit +} + +fn poll( + libp2p_service: Arc>, + outbound_transmitter: Arc>, + mut message_receiver: mpsc::UnboundedReceiver, +) -> impl futures::Future { + futures::future::poll_fn(move || -> Result<_, Error> { + loop { + match message_receiver.poll() { + Ok(Async::Ready(Some(event))) => match event { + NetworkMessage::PubsubMessage { + topics, + message + } => { + libp2p_service.lock().unwrap().swarm.publish(&topics, message); + } + } + _ => {} + } + match libp2p_service.lock().unwrap().poll() { + _ => break + } + return Err(Error::aaa(2)); + } + Ok(Async::NotReady) + }) +} \ No newline at end of file diff --git a/node/src/main.rs b/node/src/main.rs index 33e5acd67509..78bfe76aa899 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -10,10 +10,11 @@ use libp2p::{ tokio_codec::{FramedRead, LinesCodec}, }; -use futures::sync::mpsc; +use tokio::sync::mpsc; use ferret_libp2p::behaviour::*; -use ferret_libp2p::service::*; +use ferret_libp2p::service::{NetworkEvent}; +use network::service::*; use futures::prelude::*; @@ -23,23 +24,37 @@ use std::sync::Arc; use tokio::prelude::*; use tokio; use futures::future::lazy; -use tokio::runtime::current_thread::Runtime; +use tokio::runtime::Runtime; +use std::sync::Mutex; fn main(){ Builder::from_env(Env::default().default_filter_or("info")).init(); - let (tx, rx) = mpsc::unbounded::(); - let tx = Arc::new(tx); - let (mut network_service, net_tx) = service::Service::new(tx.clone()).unwrap(); - let stdin = tokio_stdin_stdout::stdin(0); - let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); - let mut listening = false; - let mut rt = Runtime::new().unwrap(); + let (tx, rx) = mpsc::unbounded_channel::(); + let mut tx = Arc::new(tx); - println!("???"); - tokio::run(network_service.start()); + let (mut network_service, mut net_tx) = NetworkService::new(tx.clone(),&rt.executor()); - println!("???"); + let network_service = Arc::new(network_service); + let stdin = tokio_stdin_stdout::stdin(0); + let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); + let mut listening = false; + let topic = Topic::new("test-net".into()); + + tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { + loop { + match framed_stdin.poll().expect("Error while polling stdin") { + Async::Ready(Some(line)) => net_tx.try_send( + NetworkMessage::PubsubMessage { + topics: topic.clone(), + message: line.as_bytes().to_vec() + }), + Async::Ready(None) => panic!("Stdin closed"), + Async::NotReady => break, + }; + } + Ok(Async::NotReady) + })); } \ No newline at end of file From 09559931b35e5a38b1bf32eca4c46ab0713b8c8d Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Mon, 25 Nov 2019 17:35:55 -0500 Subject: [PATCH 06/35] msgs sending, cant peer though --- node/ferret-libp2p/src/behaviour.rs | 2 -- node/ferret-libp2p/src/service.rs | 12 +++--------- node/network/src/service.rs | 24 ++++++++++++++++-------- node/src/main.rs | 14 ++++++++++---- 4 files changed, 29 insertions(+), 23 deletions(-) diff --git a/node/ferret-libp2p/src/behaviour.rs b/node/ferret-libp2p/src/behaviour.rs index e394756dc398..e4f59065cabc 100644 --- a/node/ferret-libp2p/src/behaviour.rs +++ b/node/ferret-libp2p/src/behaviour.rs @@ -51,7 +51,6 @@ impl NetworkBehaviourEventProcess } impl NetworkBehaviourEventProcess for MyBehaviour { - // Called when `floodsub` produces an event. fn inject_event(&mut self, message: GossipsubEvent) { if let GossipsubEvent::Message(_, message) = message { self.events.push(MyBehaviourEvent::GossipMessage { @@ -71,7 +70,6 @@ impl MyBehaviour { if !self.events.is_empty() { return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0))); } - Async::NotReady } } diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 4231edf4359f..5fec61b93e12 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -60,14 +60,14 @@ impl Stream for Libp2pService { fn poll(&mut self) -> Result>, Self::Error> { let mut listening = false; loop { + println!("loop poll"); match self.swarm.poll() { Ok(Async::Ready(Some(event))) => match event { MyBehaviourEvent::DiscoveredPeer(peer) => { + println!("LIBP2P DISCOVERED PEER {:?}", peer); libp2p::Swarm::dial(&mut self.swarm, peer); - break; }, MyBehaviourEvent::ExpiredPeer(peer) => { - break; }, MyBehaviourEvent::GossipMessage { source, @@ -85,15 +85,9 @@ impl Stream for Libp2pService { }, Ok(Async::Ready(None)) | Ok(Async::NotReady) => { - if !listening { - if let Some(a) = Swarm::listeners(&self.swarm).next() { - println!("Listening on {:?}", a); - listening = true; - } - } break }, - _ => {} + _ => {break} } } Ok(Async::NotReady) diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 3cc7d79b900e..ea4d2d9c29dc 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -24,17 +24,17 @@ pub struct NetworkService{ impl NetworkService{ pub fn new ( outbound_transmitter: Arc>, - executor: &TaskExecutor - ) -> (Self, mpsc::UnboundedSender) { + executor: &TaskExecutor, + ) -> (Self, mpsc::UnboundedSender, tokio::sync::oneshot::Sender) { let (tx, rx) = mpsc::unbounded_channel(); let libp2p_service = Arc::new(Mutex::new(Libp2pService::new().unwrap())); - start(libp2p_service.clone(), executor, outbound_transmitter, rx); + let exit_tx = start(libp2p_service.clone(), executor, outbound_transmitter, rx); return (NetworkService{ libp2p: libp2p_service, - }, tx); + }, tx, exit_tx); } } @@ -75,15 +75,23 @@ fn poll( topics, message } => { + println!("Got a msg from msgchannel"); libp2p_service.lock().unwrap().swarm.publish(&topics, message); } - } - _ => {} + }, + Ok(Async::NotReady) => break, + _ => {break} } match libp2p_service.lock().unwrap().poll() { - _ => break + Ok(Async::Ready(Some(event))) => match event { + NetworkEvent::PubsubMessage { source, topics, message } => { + println!("ASDFASDFSADFSAF") + } + } + Ok(Async::Ready(None)) => unreachable!("Stream never ends"), + Ok(Async::NotReady) => break, + _ => {break} } - return Err(Error::aaa(2)); } Ok(Async::NotReady) }) diff --git a/node/src/main.rs b/node/src/main.rs index 78bfe76aa899..8cca54126faf 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -34,7 +34,7 @@ fn main(){ let (tx, rx) = mpsc::unbounded_channel::(); let mut tx = Arc::new(tx); - let (mut network_service, mut net_tx) = NetworkService::new(tx.clone(),&rt.executor()); + let (mut network_service, mut net_tx, mut exit_tx) = NetworkService::new(tx.clone(),&rt.executor()); let network_service = Arc::new(network_service); let stdin = tokio_stdin_stdout::stdin(0); @@ -43,18 +43,24 @@ fn main(){ let topic = Topic::new("test-net".into()); - tokio::run(futures::future::poll_fn(move || -> Result<_, ()> { + println!("Polling for stdin"); + rt.executor().spawn(futures::future::poll_fn(move || -> Result<_, ()> { loop { match framed_stdin.poll().expect("Error while polling stdin") { - Async::Ready(Some(line)) => net_tx.try_send( + Async::Ready(Some(line)) => { + println!("Got msg from stdin"); + net_tx.try_send( NetworkMessage::PubsubMessage { topics: topic.clone(), message: line.as_bytes().to_vec() - }), + }) + }, Async::Ready(None) => panic!("Stdin closed"), Async::NotReady => break, }; } Ok(Async::NotReady) })); + rt.shutdown_on_idle() + .wait().unwrap(); } \ No newline at end of file From f3c013bf49897150c96133ea18f13adb03df2b1b Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Mon, 25 Nov 2019 17:56:41 -0500 Subject: [PATCH 07/35] can bootstrap now --- node/ferret-libp2p/src/service.rs | 12 ++++++++++-- node/network/src/service.rs | 2 +- node/src/main.rs | 17 +++++++++++++++++ 3 files changed, 28 insertions(+), 3 deletions(-) diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 5fec61b93e12..6cbfe256196f 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -30,8 +30,12 @@ impl Libp2pService{ let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); + println!("Local peer id: {:?}", local_peer_id); let transport = build_transport(local_key.clone()); +// let transport = build_development_transport(local_key.clone()) +// .map_err(|err| Error::new(ErrorKind::Other, err)) +// .boxed(); let mut swarm = { let be = MyBehaviour::new(&local_key); Swarm::new(transport, be, local_peer_id) @@ -84,8 +88,12 @@ impl Stream for Libp2pService { } }, - Ok(Async::Ready(None)) | Ok(Async::NotReady) => { - break + Ok(Async::Ready(None)) => break, + Ok(Async::NotReady) => { + if let Some(a) = Swarm::listeners(&self.swarm).next() { + println!("Listening on {:?}", a); + } + break; }, _ => {break} } diff --git a/node/network/src/service.rs b/node/network/src/service.rs index ea4d2d9c29dc..d8e90d189640 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -85,7 +85,7 @@ fn poll( match libp2p_service.lock().unwrap().poll() { Ok(Async::Ready(Some(event))) => match event { NetworkEvent::PubsubMessage { source, topics, message } => { - println!("ASDFASDFSADFSAF") + println!("ASDFASDFSADFSAF"); } } Ok(Async::Ready(None)) => unreachable!("Stream never ends"), diff --git a/node/src/main.rs b/node/src/main.rs index 8cca54126faf..2922b88aac2d 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -29,6 +29,8 @@ use std::sync::Mutex; fn main(){ Builder::from_env(Env::default().default_filter_or("info")).init(); + + let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::unbounded_channel::(); @@ -36,6 +38,21 @@ fn main(){ let (mut network_service, mut net_tx, mut exit_tx) = NetworkService::new(tx.clone(),&rt.executor()); + + // Reach out to another node if specified + if let Some(to_dial) = std::env::args().nth(1) { + let dialing = to_dial.clone(); + match to_dial.parse() { + Ok(to_dial) => { + match libp2p::Swarm::dial_addr(&mut network_service.libp2p.lock().unwrap().swarm, to_dial) { + Ok(_) => println!("Dialed {:?}", dialing), + Err(e) => println!("Dial {:?} failed: {:?}", dialing, e) + } + }, + Err(err) => println!("Failed to parse address to dial: {:?}", err), + } + } + let network_service = Arc::new(network_service); let stdin = tokio_stdin_stdout::stdin(0); let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); From 1fe9402c3438c81c21f2bd1f2109341a9c0f37f9 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 00:28:03 -0500 Subject: [PATCH 08/35] got the loop to work. --- node/ferret-libp2p/src/service.rs | 1 + node/network/src/service.rs | 10 +++++++--- node/src/main.rs | 1 + 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 6cbfe256196f..9c6353a9b02d 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -98,6 +98,7 @@ impl Stream for Libp2pService { _ => {break} } } + println!("Libp2p Not ready"); Ok(Async::NotReady) } } diff --git a/node/network/src/service.rs b/node/network/src/service.rs index d8e90d189640..91af8c121291 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -8,7 +8,7 @@ use tokio::runtime::TaskExecutor; use futures::Async; use futures::stream::Stream; use futures::Future; - +use tokio::prelude::*; pub enum NetworkMessage { PubsubMessage { @@ -80,8 +80,10 @@ fn poll( } }, Ok(Async::NotReady) => break, - _ => {break} + _ => { break } } + } + loop { match libp2p_service.lock().unwrap().poll() { Ok(Async::Ready(Some(event))) => match event { NetworkEvent::PubsubMessage { source, topics, message } => { @@ -89,7 +91,9 @@ fn poll( } } Ok(Async::Ready(None)) => unreachable!("Stream never ends"), - Ok(Async::NotReady) => break, + Ok(Async::NotReady) => { + break + }, _ => {break} } } diff --git a/node/src/main.rs b/node/src/main.rs index 2922b88aac2d..7a26f9ee4882 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -78,6 +78,7 @@ fn main(){ } Ok(Async::NotReady) })); + rt.shutdown_on_idle() .wait().unwrap(); } \ No newline at end of file From a2e6fd213a0822cc766913588a34fe3272c1a009 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 13:25:44 -0500 Subject: [PATCH 09/35] linting --- node/ferret-libp2p/src/behaviour.rs | 36 +++++++------ node/ferret-libp2p/src/service.rs | 58 ++++++++++---------- node/network/src/lib.rs | 2 +- node/network/src/service.rs | 83 ++++++++++++++++------------- node/src/main.rs | 73 ++++++++++++------------- 5 files changed, 131 insertions(+), 121 deletions(-) diff --git a/node/ferret-libp2p/src/behaviour.rs b/node/ferret-libp2p/src/behaviour.rs index e4f59065cabc..9bb03d717881 100644 --- a/node/ferret-libp2p/src/behaviour.rs +++ b/node/ferret-libp2p/src/behaviour.rs @@ -1,16 +1,16 @@ -use libp2p::NetworkBehaviour; -use libp2p::core::{Multiaddr, PeerId, }; +use futures::prelude::*; +use futures::Async; +use libp2p::core::identity::Keypair; +use libp2p::core::{muxing::StreamMuxerBox, nodes::Substream}; +use libp2p::core::{Multiaddr, PeerId}; +use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash}; use libp2p::kad::record; -use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}; -use libp2p::core::{nodes::Substream, muxing::StreamMuxerBox}; -use log::{debug, warn}; -use libp2p::gossipsub::{Gossipsub, GossipsubEvent, GossipsubConfig, Topic, TopicHash}; use libp2p::mdns::{Mdns, MdnsEvent}; +use libp2p::swarm::toggle::Toggle; +use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}; use libp2p::tokio_io::{AsyncRead, AsyncWrite}; -use libp2p::core::identity::Keypair; -use libp2p::{swarm::toggle::Toggle, }; -use futures::prelude::*; -use futures::Async; +use libp2p::NetworkBehaviour; +use log::{debug, warn}; #[derive(NetworkBehaviour)] #[behaviour(out_event = "MyBehaviourEvent", poll_method = "poll")] @@ -31,14 +31,16 @@ pub enum MyBehaviourEvent { }, } -impl NetworkBehaviourEventProcess for MyBehaviour { +impl NetworkBehaviourEventProcess + for MyBehaviour +{ fn inject_event(&mut self, event: MdnsEvent) { match event { MdnsEvent::Discovered(list) => { for (peer, _) in list { - self.events.push(MyBehaviourEvent::DiscoveredPeer(peer)) + self.events.push(MyBehaviourEvent::DiscoveredPeer(peer)) } - }, + } MdnsEvent::Expired(list) => { for (peer, _) in list { if !self.mdns.has_node(&peer) { @@ -50,7 +52,9 @@ impl NetworkBehaviourEventProcess } } -impl NetworkBehaviourEventProcess for MyBehaviour { +impl NetworkBehaviourEventProcess + for MyBehaviour +{ fn inject_event(&mut self, message: GossipsubEvent) { if let GossipsubEvent::Message(_, message) = message { self.events.push(MyBehaviourEvent::GossipMessage { @@ -81,7 +85,7 @@ impl MyBehaviour { MyBehaviour { gossipsub: Gossipsub::new(local_peer_id.clone(), gossipsub_config), mdns: Mdns::new().expect("Failed to create mDNS service"), - events: vec![] + events: vec![], } } @@ -93,6 +97,4 @@ impl MyBehaviour { pub fn subscribe(&mut self, topic: Topic) -> bool { self.gossipsub.subscribe(topic) } - } - diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 9c6353a9b02d..3a32b43052a8 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -1,29 +1,35 @@ use crate::behaviour::{MyBehaviour, MyBehaviourEvent}; -use libp2p::{self, Swarm, core::transport::boxed::Boxed, core, secio, yamux, mplex, PeerId, core::muxing::StreamMuxerBox, core::nodes::Substream, identity, gossipsub::{Topic, TopicHash}, Transport, build_development_transport}; -use futures::{Stream, Async, Future}; +use futures::sync::mpsc; +use futures::{Async, Future, Stream}; +use libp2p::{ + self, build_development_transport, core, + core::muxing::StreamMuxerBox, + core::nodes::Substream, + core::transport::boxed::Boxed, + gossipsub::{Topic, TopicHash}, + identity, mplex, secio, yamux, PeerId, Swarm, Transport, +}; use std::io::{Error, ErrorKind}; use std::time::Duration; -use futures::sync::mpsc; use tokio::runtime::TaskExecutor; -use std::sync::{Arc, Mutex}; use futures::future::PollFn; use futures::task::Spawn; -use futures::Lazy; use futures::IntoFuture; +use futures::Lazy; use std::ops::{Deref, DerefMut}; +use std::sync::{Arc, Mutex}; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = MyBehaviour>; -pub struct Libp2pService{ +pub struct Libp2pService { pub swarm: Swarm, } -impl Libp2pService{ +impl Libp2pService { // TODO Allow bootstrap and topics - pub fn new () -> Result - { + pub fn new() -> Result { // Starting Libp2p Service // TODO @Greg do local storage @@ -33,15 +39,13 @@ impl Libp2pService{ println!("Local peer id: {:?}", local_peer_id); let transport = build_transport(local_key.clone()); -// let transport = build_development_transport(local_key.clone()) -// .map_err(|err| Error::new(ErrorKind::Other, err)) -// .boxed(); + let mut swarm = { let be = MyBehaviour::new(&local_key); Swarm::new(transport, be, local_peer_id) }; - // TODO be able to specify port aand listening addr with proper error handling + // TODO be able to specify port and listening addr with proper error handling Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); // TODO be able to bootstrap peers @@ -50,14 +54,11 @@ impl Libp2pService{ let topic = Topic::new("test-net".into()); swarm.subscribe(topic.clone()); - Ok((Libp2pService{ - swarm: swarm, - })) + Ok((Libp2pService { swarm: swarm })) } } impl Stream for Libp2pService { - type Item = NetworkEvent; type Error = (); @@ -70,9 +71,8 @@ impl Stream for Libp2pService { MyBehaviourEvent::DiscoveredPeer(peer) => { println!("LIBP2P DISCOVERED PEER {:?}", peer); libp2p::Swarm::dial(&mut self.swarm, peer); - }, - MyBehaviourEvent::ExpiredPeer(peer) => { - }, + } + MyBehaviourEvent::ExpiredPeer(peer) => {} MyBehaviourEvent::GossipMessage { source, topics, @@ -83,10 +83,9 @@ impl Stream for Libp2pService { return Ok(Async::Ready(Option::from(NetworkEvent::PubsubMessage { source, topics, - message + message, }))); } - }, Ok(Async::Ready(None)) => break, Ok(Async::NotReady) => { @@ -94,8 +93,8 @@ impl Stream for Libp2pService { println!("Listening on {:?}", a); } break; - }, - _ => {break} + } + _ => break, } } println!("Libp2p Not ready"); @@ -116,14 +115,15 @@ fn build_transport(local_key: identity::Keypair) -> Boxed<(PeerId, StreamMuxerBo let transport = libp2p::tcp::TcpConfig::new().nodelay(true); let transport = libp2p::dns::DnsConfig::new(transport); - transport.upgrade(core::upgrade::Version::V1) + transport + .upgrade(core::upgrade::Version::V1) .authenticate(secio::SecioConfig::new(local_key)) - .multiplex(core::upgrade::SelectUpgrade::new(yamux::Config::default(), mplex::MplexConfig::new())) + .multiplex(core::upgrade::SelectUpgrade::new( + yamux::Config::default(), + mplex::MplexConfig::new(), + )) .map(|(peer, muxer), _| (peer, core::muxing::StreamMuxerBox::new(muxer))) .timeout(Duration::from_secs(20)) .map_err(|err| Error::new(ErrorKind::Other, err)) .boxed() } - - - diff --git a/node/network/src/lib.rs b/node/network/src/lib.rs index cc097b599434..1f278a4d5194 100644 --- a/node/network/src/lib.rs +++ b/node/network/src/lib.rs @@ -1 +1 @@ -pub mod service; \ No newline at end of file +pub mod service; diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 91af8c121291..13705ccce69a 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -3,48 +3,58 @@ use tokio::sync::mpsc; use std::sync::{Arc, Mutex}; //use std::error::Error; -use libp2p::{self, Swarm, core::transport::boxed::Boxed, core, secio, yamux, mplex, PeerId, core::muxing::StreamMuxerBox, core::nodes::Substream, identity, gossipsub::{Topic, TopicHash}, Transport, build_development_transport}; -use tokio::runtime::TaskExecutor; -use futures::Async; use futures::stream::Stream; +use futures::Async; use futures::Future; +use libp2p::{ + self, build_development_transport, core, + core::muxing::StreamMuxerBox, + core::nodes::Substream, + core::transport::boxed::Boxed, + gossipsub::{Topic, TopicHash}, + identity, mplex, secio, yamux, PeerId, Swarm, Transport, +}; use tokio::prelude::*; +use tokio::runtime::TaskExecutor; pub enum NetworkMessage { - PubsubMessage { - topics: Topic, - message: Vec, - }, + PubsubMessage { topics: Topic, message: Vec }, } -pub struct NetworkService{ +pub struct NetworkService { pub libp2p: Arc>, } -impl NetworkService{ - pub fn new ( +impl NetworkService { + pub fn new( outbound_transmitter: Arc>, executor: &TaskExecutor, - ) -> (Self, mpsc::UnboundedSender, tokio::sync::oneshot::Sender) { + ) -> ( + Self, + mpsc::UnboundedSender, + tokio::sync::oneshot::Sender, + ) { let (tx, rx) = mpsc::unbounded_channel(); let libp2p_service = Arc::new(Mutex::new(Libp2pService::new().unwrap())); let exit_tx = start(libp2p_service.clone(), executor, outbound_transmitter, rx); - return (NetworkService{ - libp2p: libp2p_service, - }, tx, exit_tx); + return ( + NetworkService { + libp2p: libp2p_service, + }, + tx, + exit_tx, + ); } } -enum Error{ - aaa (u8) +enum Error { + aaa(u8), } - - -pub fn start ( +pub fn start( libp2p_service: Arc>, executor: &TaskExecutor, outbound_transmitter: Arc>, @@ -52,11 +62,9 @@ pub fn start ( ) -> tokio::sync::oneshot::Sender { let (network_exit, exit_rx) = tokio::sync::oneshot::channel(); executor.spawn( - poll(libp2p_service,outbound_transmitter,message_receiver) + poll(libp2p_service, outbound_transmitter, message_receiver) .select(exit_rx.then(|_| Ok(()))) - .then(move |_| { - Ok(()) - }), + .then(move |_| Ok(())), ); network_exit @@ -71,32 +79,35 @@ fn poll( loop { match message_receiver.poll() { Ok(Async::Ready(Some(event))) => match event { - NetworkMessage::PubsubMessage { - topics, - message - } => { + NetworkMessage::PubsubMessage { topics, message } => { println!("Got a msg from msgchannel"); - libp2p_service.lock().unwrap().swarm.publish(&topics, message); + libp2p_service + .lock() + .unwrap() + .swarm + .publish(&topics, message); } }, Ok(Async::NotReady) => break, - _ => { break } + _ => break, } } loop { match libp2p_service.lock().unwrap().poll() { Ok(Async::Ready(Some(event))) => match event { - NetworkEvent::PubsubMessage { source, topics, message } => { + NetworkEvent::PubsubMessage { + source, + topics, + message, + } => { println!("ASDFASDFSADFSAF"); } - } - Ok(Async::Ready(None)) => unreachable!("Stream never ends"), - Ok(Async::NotReady) => { - break }, - _ => {break} + Ok(Async::Ready(None)) => unreachable!("Stream never ends"), + Ok(Async::NotReady) => break, + _ => break, } } Ok(Async::NotReady) }) -} \ No newline at end of file +} diff --git a/node/src/main.rs b/node/src/main.rs index 7a26f9ee4882..ab57ab536573 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -1,54 +1,52 @@ use libp2p::{ + gossipsub::Topic, identity, - PeerId, - gossipsub::{ - Topic, - }, - swarm::{ - Swarm, - }, + swarm::Swarm, tokio_codec::{FramedRead, LinesCodec}, + PeerId, }; use tokio::sync::mpsc; use ferret_libp2p::behaviour::*; -use ferret_libp2p::service::{NetworkEvent}; +use ferret_libp2p::service::NetworkEvent; use network::service::*; use futures::prelude::*; use env_logger::{Builder, Env}; use ferret_libp2p::service; +use futures::future::lazy; use std::sync::Arc; -use tokio::prelude::*; +use std::sync::Mutex; use tokio; -use futures::future::lazy; +use tokio::prelude::*; use tokio::runtime::Runtime; -use std::sync::Mutex; -fn main(){ +fn main() { Builder::from_env(Env::default().default_filter_or("info")).init(); - let mut rt = Runtime::new().unwrap(); let (tx, rx) = mpsc::unbounded_channel::(); let mut tx = Arc::new(tx); - let (mut network_service, mut net_tx, mut exit_tx) = NetworkService::new(tx.clone(),&rt.executor()); - + let (mut network_service, mut net_tx, mut exit_tx) = + NetworkService::new(tx.clone(), &rt.executor()); // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { let dialing = to_dial.clone(); match to_dial.parse() { Ok(to_dial) => { - match libp2p::Swarm::dial_addr(&mut network_service.libp2p.lock().unwrap().swarm, to_dial) { + match libp2p::Swarm::dial_addr( + &mut network_service.libp2p.lock().unwrap().swarm, + to_dial, + ) { Ok(_) => println!("Dialed {:?}", dialing), - Err(e) => println!("Dial {:?} failed: {:?}", dialing, e) + Err(e) => println!("Dial {:?} failed: {:?}", dialing, e), } - }, + } Err(err) => println!("Failed to parse address to dial: {:?}", err), } } @@ -61,24 +59,23 @@ fn main(){ let topic = Topic::new("test-net".into()); println!("Polling for stdin"); - rt.executor().spawn(futures::future::poll_fn(move || -> Result<_, ()> { - loop { - match framed_stdin.poll().expect("Error while polling stdin") { - Async::Ready(Some(line)) => { - println!("Got msg from stdin"); - net_tx.try_send( - NetworkMessage::PubsubMessage { - topics: topic.clone(), - message: line.as_bytes().to_vec() - }) - }, - Async::Ready(None) => panic!("Stdin closed"), - Async::NotReady => break, - }; - } - Ok(Async::NotReady) - })); + rt.executor() + .spawn(futures::future::poll_fn(move || -> Result<_, ()> { + loop { + match framed_stdin.poll().expect("Error while polling stdin") { + Async::Ready(Some(line)) => { + println!("Got msg from stdin"); + net_tx.try_send(NetworkMessage::PubsubMessage { + topics: topic.clone(), + message: line.as_bytes().to_vec(), + }) + } + Async::Ready(None) => panic!("Stdin closed"), + Async::NotReady => break, + }; + } + Ok(Async::NotReady) + })); - rt.shutdown_on_idle() - .wait().unwrap(); -} \ No newline at end of file + rt.shutdown_on_idle().wait().unwrap(); +} From f0f2f7a068a8aa1efc5bb8617440a9fa4623b4e1 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 13:28:11 -0500 Subject: [PATCH 10/35] more linting --- node/ferret-libp2p/src/behaviour.rs | 13 ++++++------- node/ferret-libp2p/src/service.rs | 20 ++++++-------------- node/network/src/service.rs | 16 ++++++++-------- node/src/main.rs | 21 +++++++++------------ 4 files changed, 29 insertions(+), 41 deletions(-) diff --git a/node/ferret-libp2p/src/behaviour.rs b/node/ferret-libp2p/src/behaviour.rs index 9bb03d717881..21fcf47d2131 100644 --- a/node/ferret-libp2p/src/behaviour.rs +++ b/node/ferret-libp2p/src/behaviour.rs @@ -1,16 +1,15 @@ use futures::prelude::*; use futures::Async; use libp2p::core::identity::Keypair; -use libp2p::core::{muxing::StreamMuxerBox, nodes::Substream}; -use libp2p::core::{Multiaddr, PeerId}; + +use libp2p::core::PeerId; use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash}; -use libp2p::kad::record; + use libp2p::mdns::{Mdns, MdnsEvent}; -use libp2p::swarm::toggle::Toggle; + use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}; use libp2p::tokio_io::{AsyncRead, AsyncWrite}; use libp2p::NetworkBehaviour; -use log::{debug, warn}; #[derive(NetworkBehaviour)] #[behaviour(out_event = "MyBehaviourEvent", poll_method = "poll")] @@ -80,10 +79,10 @@ impl MyBehaviour { impl MyBehaviour { pub fn new(local_key: &Keypair) -> Self { - let local_peer_id = local_key.public().clone().into_peer_id(); + let local_peer_id = local_key.public().into_peer_id(); let gossipsub_config = GossipsubConfig::default(); MyBehaviour { - gossipsub: Gossipsub::new(local_peer_id.clone(), gossipsub_config), + gossipsub: Gossipsub::new(local_peer_id, gossipsub_config), mdns: Mdns::new().expect("Failed to create mDNS service"), events: vec![], } diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 3a32b43052a8..9c96f9a38177 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -1,8 +1,8 @@ use crate::behaviour::{MyBehaviour, MyBehaviourEvent}; -use futures::sync::mpsc; + use futures::{Async, Future, Stream}; use libp2p::{ - self, build_development_transport, core, + self, core, core::muxing::StreamMuxerBox, core::nodes::Substream, core::transport::boxed::Boxed, @@ -11,14 +11,6 @@ use libp2p::{ }; use std::io::{Error, ErrorKind}; use std::time::Duration; -use tokio::runtime::TaskExecutor; - -use futures::future::PollFn; -use futures::task::Spawn; -use futures::IntoFuture; -use futures::Lazy; -use std::ops::{Deref, DerefMut}; -use std::sync::{Arc, Mutex}; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = MyBehaviour>; @@ -52,9 +44,9 @@ impl Libp2pService { // TODO build list of topics let topic = Topic::new("test-net".into()); - swarm.subscribe(topic.clone()); + swarm.subscribe(topic); - Ok((Libp2pService { swarm: swarm })) + Ok(Libp2pService { swarm }) } } @@ -63,7 +55,7 @@ impl Stream for Libp2pService { type Error = (); fn poll(&mut self) -> Result>, Self::Error> { - let mut listening = false; + let _listening = false; loop { println!("loop poll"); match self.swarm.poll() { @@ -72,7 +64,7 @@ impl Stream for Libp2pService { println!("LIBP2P DISCOVERED PEER {:?}", peer); libp2p::Swarm::dial(&mut self.swarm, peer); } - MyBehaviourEvent::ExpiredPeer(peer) => {} + MyBehaviourEvent::ExpiredPeer(_peer) => {} MyBehaviourEvent::GossipMessage { source, topics, diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 13705ccce69a..686aa404c937 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -14,7 +14,7 @@ use libp2p::{ gossipsub::{Topic, TopicHash}, identity, mplex, secio, yamux, PeerId, Swarm, Transport, }; -use tokio::prelude::*; + use tokio::runtime::TaskExecutor; pub enum NetworkMessage { @@ -40,13 +40,13 @@ impl NetworkService { let exit_tx = start(libp2p_service.clone(), executor, outbound_transmitter, rx); - return ( + ( NetworkService { libp2p: libp2p_service, }, tx, exit_tx, - ); + ) } } @@ -58,7 +58,7 @@ pub fn start( libp2p_service: Arc>, executor: &TaskExecutor, outbound_transmitter: Arc>, - mut message_receiver: mpsc::UnboundedReceiver, + message_receiver: mpsc::UnboundedReceiver, ) -> tokio::sync::oneshot::Sender { let (network_exit, exit_rx) = tokio::sync::oneshot::channel(); executor.spawn( @@ -72,7 +72,7 @@ pub fn start( fn poll( libp2p_service: Arc>, - outbound_transmitter: Arc>, + _outbound_transmitter: Arc>, mut message_receiver: mpsc::UnboundedReceiver, ) -> impl futures::Future { futures::future::poll_fn(move || -> Result<_, Error> { @@ -96,9 +96,9 @@ fn poll( match libp2p_service.lock().unwrap().poll() { Ok(Async::Ready(Some(event))) => match event { NetworkEvent::PubsubMessage { - source, - topics, - message, + source: _, + topics: _, + message: _, } => { println!("ASDFASDFSADFSAF"); } diff --git a/node/src/main.rs b/node/src/main.rs index ab57ab536573..9ae1d477d9fd 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -8,31 +8,28 @@ use libp2p::{ use tokio::sync::mpsc; -use ferret_libp2p::behaviour::*; use ferret_libp2p::service::NetworkEvent; use network::service::*; use futures::prelude::*; use env_logger::{Builder, Env}; -use ferret_libp2p::service; -use futures::future::lazy; + use std::sync::Arc; -use std::sync::Mutex; + use tokio; -use tokio::prelude::*; + use tokio::runtime::Runtime; fn main() { Builder::from_env(Env::default().default_filter_or("info")).init(); - let mut rt = Runtime::new().unwrap(); + let rt = Runtime::new().unwrap(); - let (tx, rx) = mpsc::unbounded_channel::(); - let mut tx = Arc::new(tx); + let (tx, _rx) = mpsc::unbounded_channel::(); + let tx = Arc::new(tx); - let (mut network_service, mut net_tx, mut exit_tx) = - NetworkService::new(tx.clone(), &rt.executor()); + let (network_service, mut net_tx, _exit_tx) = NetworkService::new(tx, &rt.executor()); // Reach out to another node if specified if let Some(to_dial) = std::env::args().nth(1) { @@ -51,10 +48,10 @@ fn main() { } } - let network_service = Arc::new(network_service); + let _network_service = Arc::new(network_service); let stdin = tokio_stdin_stdout::stdin(0); let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); - let mut listening = false; + let _listening = false; let topic = Topic::new("test-net".into()); From eca7248bb34ba2e479ee64c0f89022b036ff02b3 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 13:51:35 -0500 Subject: [PATCH 11/35] more linting --- node/ferret-libp2p/src/behaviour.rs | 1 - node/ferret-libp2p/src/service.rs | 2 +- node/network/src/main.rs | 75 ----------------------------- node/network/src/service.rs | 17 +++---- node/src/clock/test.rs | 2 +- node/src/main.rs | 4 -- 6 files changed, 8 insertions(+), 93 deletions(-) delete mode 100644 node/network/src/main.rs diff --git a/node/ferret-libp2p/src/behaviour.rs b/node/ferret-libp2p/src/behaviour.rs index 21fcf47d2131..66832985e393 100644 --- a/node/ferret-libp2p/src/behaviour.rs +++ b/node/ferret-libp2p/src/behaviour.rs @@ -1,4 +1,3 @@ -use futures::prelude::*; use futures::Async; use libp2p::core::identity::Keypair; diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 9c96f9a38177..a00b77a7437d 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -1,6 +1,6 @@ use crate::behaviour::{MyBehaviour, MyBehaviourEvent}; -use futures::{Async, Future, Stream}; +use futures::{Async, Stream}; use libp2p::{ self, core, core::muxing::StreamMuxerBox, diff --git a/node/network/src/main.rs b/node/network/src/main.rs deleted file mode 100644 index f9f9fb1c0131..000000000000 --- a/node/network/src/main.rs +++ /dev/null @@ -1,75 +0,0 @@ -use libp2p::{ - gossipsub::Topic, - identity, - swarm::Swarm, - tokio_codec::{FramedRead, LinesCodec}, - PeerId, -}; - -use tokio::sync::mpsc; - -use ferret_libp2p::service::NetworkEvent; -use network::service::*; - -use futures::prelude::*; - - -use std::sync::Arc; - -use tokio; - -use tokio::runtime::Runtime; - -fn main() { - let rt = Runtime::new().unwrap(); - - let (tx, _rx) = mpsc::unbounded_channel::(); - let tx = Arc::new(tx); - - let (network_service, mut net_tx, _exit_tx) = NetworkService::new(tx, &rt.executor()); - - // Reach out to another node if specified - if let Some(to_dial) = std::env::args().nth(1) { - let dialing = to_dial.clone(); - match to_dial.parse() { - Ok(to_dial) => { - match libp2p::Swarm::dial_addr( - &mut network_service.libp2p.lock().unwrap().swarm, - to_dial, - ) { - Ok(_) => println!("Dialed {:?}", dialing), - Err(e) => println!("Dial {:?} failed: {:?}", dialing, e), - } - } - Err(err) => println!("Failed to parse address to dial: {:?}", err), - } - } - - let _network_service = Arc::new(network_service); - let stdin = tokio_stdin_stdout::stdin(0); - let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); - let _listening = false; - - let topic = Topic::new("test-net".into()); - - println!("Polling for stdin"); - rt.executor() - .spawn(futures::future::poll_fn(move || -> Result<_, ()> { - loop { - match framed_stdin.poll().expect("Error while polling stdin") { - Async::Ready(Some(line)) => { - println!("Got msg from stdin"); - net_tx.try_send(NetworkMessage::PubsubMessage { - topics: topic.clone(), - message: line.as_bytes().to_vec(), - }) - } - Async::Ready(None) => panic!("Stdin closed"), - Async::NotReady => break, - }; - } - Ok(Async::NotReady) - })); - - rt.shutdown_on_idle().wait().unwrap(); -} \ No newline at end of file diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 686aa404c937..d2093c611237 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -7,12 +7,8 @@ use futures::stream::Stream; use futures::Async; use futures::Future; use libp2p::{ - self, build_development_transport, core, - core::muxing::StreamMuxerBox, - core::nodes::Substream, - core::transport::boxed::Boxed, - gossipsub::{Topic, TopicHash}, - identity, mplex, secio, yamux, PeerId, Swarm, Transport, + self, + gossipsub::{Topic, }, }; use tokio::runtime::TaskExecutor; @@ -51,7 +47,6 @@ impl NetworkService { } enum Error { - aaa(u8), } pub fn start( @@ -96,11 +91,11 @@ fn poll( match libp2p_service.lock().unwrap().poll() { Ok(Async::Ready(Some(event))) => match event { NetworkEvent::PubsubMessage { - source: _, - topics: _, - message: _, + source, + topics, + message, } => { - println!("ASDFASDFSADFSAF"); + println!("Received a message from GossipSub! {:?}, {:?}, {:?}", source, topics, message); } }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), diff --git a/node/src/clock/test.rs b/node/src/clock/test.rs index a5895107c852..f21687452f07 100644 --- a/node/src/clock/test.rs +++ b/node/src/clock/test.rs @@ -3,7 +3,7 @@ use crate::clock::ChainEpochClock; #[test] fn create_chain_epoch_clock() { - let utc_timestamp = 1574286946904; + let utc_timestamp = 1_574_286_946_904; let clock = ChainEpochClock::new(utc_timestamp); assert_eq!(clock.get_time().timestamp(), utc_timestamp); } diff --git a/node/src/main.rs b/node/src/main.rs index 94b86bf1cb77..b15c5cfe85a5 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -4,10 +4,7 @@ use cli::cli; use libp2p::{ gossipsub::Topic, - identity, - swarm::Swarm, tokio_codec::{FramedRead, LinesCodec}, - PeerId, }; use tokio::sync::mpsc; @@ -17,7 +14,6 @@ use network::service::*; use futures::prelude::*; - use std::sync::Arc; use tokio; From 24263c0b08fed0ebee0b95bb5b7d0575387d577d Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 14:27:45 -0500 Subject: [PATCH 12/35] add config --- node/ferret-libp2p/src/config.rs | 18 ++++++++++++++++++ node/ferret-libp2p/src/lib.rs | 1 + node/ferret-libp2p/src/service.rs | 30 +++++++++++++++++++++--------- node/network/src/service.rs | 6 +++--- node/src/main.rs | 26 +++++++------------------- 5 files changed, 50 insertions(+), 31 deletions(-) create mode 100644 node/ferret-libp2p/src/config.rs diff --git a/node/ferret-libp2p/src/config.rs b/node/ferret-libp2p/src/config.rs new file mode 100644 index 000000000000..3604c23b3dd9 --- /dev/null +++ b/node/ferret-libp2p/src/config.rs @@ -0,0 +1,18 @@ +use libp2p::gossipsub::Topic; + +pub struct Libp2pConfig{ + pub listening_multiaddr: String, + pub pubsub_topics: Vec, + pub bootstrap_peers: Vec, +} + +impl Default for Libp2pConfig{ + fn default() -> Self { + Libp2pConfig{ + listening_multiaddr: "/ip4/0.0.0.0/tcp/0".parse().unwrap(), + pubsub_topics: vec![], + bootstrap_peers: vec![] + } + } +} + diff --git a/node/ferret-libp2p/src/lib.rs b/node/ferret-libp2p/src/lib.rs index 89524ede2664..50d7f3ac9c76 100644 --- a/node/ferret-libp2p/src/lib.rs +++ b/node/ferret-libp2p/src/lib.rs @@ -1,5 +1,6 @@ pub mod behaviour; pub mod service; +pub mod config; #[cfg(test)] mod tests { #[test] diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index a00b77a7437d..02bbe6dc279b 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -1,5 +1,4 @@ use crate::behaviour::{MyBehaviour, MyBehaviourEvent}; - use futures::{Async, Stream}; use libp2p::{ self, core, @@ -11,6 +10,7 @@ use libp2p::{ }; use std::io::{Error, ErrorKind}; use std::time::Duration; +use super::config::Libp2pConfig; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = MyBehaviour>; @@ -20,8 +20,7 @@ pub struct Libp2pService { } impl Libp2pService { - // TODO Allow bootstrap and topics - pub fn new() -> Result { + pub fn new(config: &Libp2pConfig) -> Result { // Starting Libp2p Service // TODO @Greg do local storage @@ -37,14 +36,27 @@ impl Libp2pService { Swarm::new(transport, be, local_peer_id) }; - // TODO be able to specify port and listening addr with proper error handling - Swarm::listen_on(&mut swarm, "/ip4/0.0.0.0/tcp/0".parse().unwrap()).unwrap(); + for node in config.bootstrap_peers.clone() { + let dialing = node.clone(); + match node.parse() { + Ok(to_dial) => { + match libp2p::Swarm::dial_addr( + &mut swarm, + to_dial, + ) { + Ok(_) => println!("Dialed {:?}", dialing), + Err(e) => println!("Dial {:?} failed: {:?}", dialing, e), + } + } + Err(err) => println!("Failed to parse address to dial: {:?}", err), + } + } - // TODO be able to bootstrap peers - // TODO build list of topics + Swarm::listen_on(&mut swarm, config.listening_multiaddr.parse().unwrap()).unwrap(); - let topic = Topic::new("test-net".into()); - swarm.subscribe(topic); + for topic in config.pubsub_topics.clone() { + swarm.subscribe(topic); + } Ok(Libp2pService { swarm }) } diff --git a/node/network/src/service.rs b/node/network/src/service.rs index d2093c611237..61a23a2f8ca9 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -1,8 +1,7 @@ use ferret_libp2p::service::{Libp2pService, NetworkEvent}; +use ferret_libp2p::config::Libp2pConfig; use tokio::sync::mpsc; - use std::sync::{Arc, Mutex}; -//use std::error::Error; use futures::stream::Stream; use futures::Async; use futures::Future; @@ -23,6 +22,7 @@ pub struct NetworkService { impl NetworkService { pub fn new( + config: &Libp2pConfig, outbound_transmitter: Arc>, executor: &TaskExecutor, ) -> ( @@ -32,7 +32,7 @@ impl NetworkService { ) { let (tx, rx) = mpsc::unbounded_channel(); - let libp2p_service = Arc::new(Mutex::new(Libp2pService::new().unwrap())); + let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(config).unwrap())); let exit_tx = start(libp2p_service.clone(), executor, outbound_transmitter, rx); diff --git a/node/src/main.rs b/node/src/main.rs index b15c5cfe85a5..90021e2555ee 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -10,6 +10,7 @@ use libp2p::{ use tokio::sync::mpsc; use ferret_libp2p::service::NetworkEvent; +use ferret_libp2p::config::Libp2pConfig; use network::service::*; use futures::prelude::*; @@ -28,31 +29,18 @@ fn main() { let (tx, _rx) = mpsc::unbounded_channel::(); let tx = Arc::new(tx); - let (network_service, mut net_tx, _exit_tx) = NetworkService::new(tx, &rt.executor()); - - // Reach out to another node if specified - if let Some(to_dial) = std::env::args().nth(1) { - let dialing = to_dial.clone(); - match to_dial.parse() { - Ok(to_dial) => { - match libp2p::Swarm::dial_addr( - &mut network_service.libp2p.lock().unwrap().swarm, - to_dial, - ) { - Ok(_) => println!("Dialed {:?}", dialing), - Err(e) => println!("Dial {:?} failed: {:?}", dialing, e), - } - } - Err(err) => println!("Failed to parse address to dial: {:?}", err), - } - } + let mut netcfg = Libp2pConfig::default(); + let topic = Topic::new("test-net".into()); + netcfg.pubsub_topics.push(topic.clone()); + + let (network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg,tx, &rt.executor()); + let _network_service = Arc::new(network_service); let stdin = tokio_stdin_stdout::stdin(0); let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); let _listening = false; - let topic = Topic::new("test-net".into()); println!("Polling for stdin"); rt.executor() From 4076fbb38e6733bd92c3bc2ba11c7126723eb4f8 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 15:38:17 -0500 Subject: [PATCH 13/35] remove some prints --- node/ferret-libp2p/src/service.rs | 14 ++------------ node/network/src/service.rs | 1 - node/src/main.rs | 7 ++----- 3 files changed, 4 insertions(+), 18 deletions(-) diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 02bbe6dc279b..2da5be5439b9 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -5,7 +5,7 @@ use libp2p::{ core::muxing::StreamMuxerBox, core::nodes::Substream, core::transport::boxed::Boxed, - gossipsub::{Topic, TopicHash}, + gossipsub::{TopicHash}, identity, mplex, secio, yamux, PeerId, Swarm, Transport, }; use std::io::{Error, ErrorKind}; @@ -21,8 +21,6 @@ pub struct Libp2pService { impl Libp2pService { pub fn new(config: &Libp2pConfig) -> Result { - // Starting Libp2p Service - // TODO @Greg do local storage let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); @@ -69,11 +67,9 @@ impl Stream for Libp2pService { fn poll(&mut self) -> Result>, Self::Error> { let _listening = false; loop { - println!("loop poll"); match self.swarm.poll() { Ok(Async::Ready(Some(event))) => match event { MyBehaviourEvent::DiscoveredPeer(peer) => { - println!("LIBP2P DISCOVERED PEER {:?}", peer); libp2p::Swarm::dial(&mut self.swarm, peer); } MyBehaviourEvent::ExpiredPeer(_peer) => {} @@ -92,16 +88,10 @@ impl Stream for Libp2pService { } }, Ok(Async::Ready(None)) => break, - Ok(Async::NotReady) => { - if let Some(a) = Swarm::listeners(&self.swarm).next() { - println!("Listening on {:?}", a); - } - break; - } + Ok(Async::NotReady) => break, _ => break, } } - println!("Libp2p Not ready"); Ok(Async::NotReady) } } diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 61a23a2f8ca9..df35066d2b80 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -75,7 +75,6 @@ fn poll( match message_receiver.poll() { Ok(Async::Ready(Some(event))) => match event { NetworkMessage::PubsubMessage { topics, message } => { - println!("Got a msg from msgchannel"); libp2p_service .lock() .unwrap() diff --git a/node/src/main.rs b/node/src/main.rs index 90021e2555ee..0fd532d0923d 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -9,8 +9,8 @@ use libp2p::{ use tokio::sync::mpsc; -use ferret_libp2p::service::NetworkEvent; use ferret_libp2p::config::Libp2pConfig; +use ferret_libp2p::service::NetworkEvent; use network::service::*; use futures::prelude::*; @@ -33,16 +33,13 @@ fn main() { let topic = Topic::new("test-net".into()); netcfg.pubsub_topics.push(topic.clone()); - let (network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg,tx, &rt.executor()); - + let (network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg, tx, &rt.executor()); let _network_service = Arc::new(network_service); let stdin = tokio_stdin_stdout::stdin(0); let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); let _listening = false; - - println!("Polling for stdin"); rt.executor() .spawn(futures::future::poll_fn(move || -> Result<_, ()> { loop { From 830ca7ed2e194142bd00f8e4b57f6a4472df1d3e Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 16:34:06 -0500 Subject: [PATCH 14/35] docs --- node/ferret-libp2p/src/config.rs | 15 +++++------ node/ferret-libp2p/src/lib.rs | 2 +- node/ferret-libp2p/src/service.rs | 35 +++++++++++++------------ node/network/src/service.rs | 43 +++++++++++++++++++++++-------- node/src/main.rs | 2 +- 5 files changed, 59 insertions(+), 38 deletions(-) diff --git a/node/ferret-libp2p/src/config.rs b/node/ferret-libp2p/src/config.rs index 3604c23b3dd9..cd217b604e32 100644 --- a/node/ferret-libp2p/src/config.rs +++ b/node/ferret-libp2p/src/config.rs @@ -1,18 +1,17 @@ use libp2p::gossipsub::Topic; -pub struct Libp2pConfig{ +pub struct Libp2pConfig { pub listening_multiaddr: String, pub pubsub_topics: Vec, pub bootstrap_peers: Vec, } -impl Default for Libp2pConfig{ +impl Default for Libp2pConfig { fn default() -> Self { - Libp2pConfig{ - listening_multiaddr: "/ip4/0.0.0.0/tcp/0".parse().unwrap(), - pubsub_topics: vec![], - bootstrap_peers: vec![] - } + Libp2pConfig { + listening_multiaddr: "/ip4/0.0.0.0/tcp/0".parse().unwrap(), + pubsub_topics: vec![], + bootstrap_peers: vec![], + } } } - diff --git a/node/ferret-libp2p/src/lib.rs b/node/ferret-libp2p/src/lib.rs index 50d7f3ac9c76..0429457acf01 100644 --- a/node/ferret-libp2p/src/lib.rs +++ b/node/ferret-libp2p/src/lib.rs @@ -1,6 +1,6 @@ pub mod behaviour; -pub mod service; pub mod config; +pub mod service; #[cfg(test)] mod tests { #[test] diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 2da5be5439b9..4ffeefc05eb9 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -1,30 +1,34 @@ +use super::config::Libp2pConfig; use crate::behaviour::{MyBehaviour, MyBehaviourEvent}; use futures::{Async, Stream}; use libp2p::{ - self, core, - core::muxing::StreamMuxerBox, - core::nodes::Substream, - core::transport::boxed::Boxed, - gossipsub::{TopicHash}, - identity, mplex, secio, yamux, PeerId, Swarm, Transport, + self, core, core::muxing::StreamMuxerBox, core::nodes::Substream, + core::transport::boxed::Boxed, gossipsub::TopicHash, identity, mplex, secio, yamux, PeerId, + Swarm, Transport, }; use std::io::{Error, ErrorKind}; use std::time::Duration; -use super::config::Libp2pConfig; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = MyBehaviour>; +/// The Libp2pService listens to events from the Libp2p swarm. pub struct Libp2pService { pub swarm: Swarm, } impl Libp2pService { + /// Constructs a Libp2pService + /// + /// # Example + /// ``` + /// let mut netcfg = Libp2pConfig::default(); + /// let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(config).unwrap())); + /// ``` pub fn new(config: &Libp2pConfig) -> Result { // TODO @Greg do local storage let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); - println!("Local peer id: {:?}", local_peer_id); let transport = build_transport(local_key.clone()); @@ -37,15 +41,10 @@ impl Libp2pService { for node in config.bootstrap_peers.clone() { let dialing = node.clone(); match node.parse() { - Ok(to_dial) => { - match libp2p::Swarm::dial_addr( - &mut swarm, - to_dial, - ) { - Ok(_) => println!("Dialed {:?}", dialing), - Err(e) => println!("Dial {:?} failed: {:?}", dialing, e), - } - } + Ok(to_dial) => match libp2p::Swarm::dial_addr(&mut swarm, to_dial) { + Ok(_) => println!("Dialed {:?}", dialing), + Err(e) => println!("Dial {:?} failed: {:?}", dialing, e), + }, Err(err) => println!("Failed to parse address to dial: {:?}", err), } } @@ -64,6 +63,7 @@ impl Stream for Libp2pService { type Item = NetworkEvent; type Error = (); + /// Continuously polls the Libp2p swarm to get events fn poll(&mut self) -> Result>, Self::Error> { let _listening = false; loop { @@ -96,6 +96,7 @@ impl Stream for Libp2pService { } } +/// Events emitted by this Service to be listened by the NetworkService. #[derive(Clone)] pub enum NetworkEvent { PubsubMessage { diff --git a/node/network/src/service.rs b/node/network/src/service.rs index df35066d2b80..1f4d42e55d33 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -1,26 +1,44 @@ -use ferret_libp2p::service::{Libp2pService, NetworkEvent}; use ferret_libp2p::config::Libp2pConfig; -use tokio::sync::mpsc; -use std::sync::{Arc, Mutex}; +use ferret_libp2p::service::{Libp2pService, NetworkEvent}; use futures::stream::Stream; use futures::Async; use futures::Future; -use libp2p::{ - self, - gossipsub::{Topic, }, -}; +use libp2p::{self, gossipsub::Topic}; +use std::sync::{Arc, Mutex}; +use tokio::sync::mpsc; use tokio::runtime::TaskExecutor; +/// Ingress events to the NetworkService pub enum NetworkMessage { PubsubMessage { topics: Topic, message: Vec }, } +/// The NetworkService receives commands through a channel which communicates with Libp2p. +/// It also listens to the Libp2p service for pub struct NetworkService { pub libp2p: Arc>, } impl NetworkService { + /// Starts a Libp2pService with a given config, UnboundedSender, and tokio executor. + /// Returns an UnboundedSender channel so messages can come in. + /// + /// # Example + /// ``` + /// use tokio::runtime::Runtime; + /// use tokio::sync::mpsc; + /// use ferret_libp2p::service::NetworkEvent; + /// use ferret_libp2p::config::Libp2pConfig; + /// use std::sync::Arc; + /// + /// + /// let rt = Runtime::new().unwrap(); + /// let (tx, _rx) = mpsc::unbounded_channel::(); + /// let tx = Arc::new(tx); + /// let mut netcfg = Libp2pConfig::default(); + /// let (network_service, mut net_tx, _exit_tx) = new(&netcfg, tx, &rt.executor()); + /// ``` pub fn new( config: &Libp2pConfig, outbound_transmitter: Arc>, @@ -46,10 +64,10 @@ impl NetworkService { } } -enum Error { -} +enum Error {} -pub fn start( +/// Spawns the NetworkService service. +fn start( libp2p_service: Arc>, executor: &TaskExecutor, outbound_transmitter: Arc>, @@ -94,7 +112,10 @@ fn poll( topics, message, } => { - println!("Received a message from GossipSub! {:?}, {:?}, {:?}", source, topics, message); + println!( + "Received a message from GossipSub! {:?}, {:?}, {:?}", + source, topics, message + ); } }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), diff --git a/node/src/main.rs b/node/src/main.rs index 0fd532d0923d..6a4e9c444a91 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -24,11 +24,11 @@ use tokio::runtime::Runtime; fn main() { cli(); + /// TODO Everything below should be run in a function somewhere, but since we only have this main right now, should be ok to leave here let rt = Runtime::new().unwrap(); let (tx, _rx) = mpsc::unbounded_channel::(); let tx = Arc::new(tx); - let mut netcfg = Libp2pConfig::default(); let topic = Topic::new("test-net".into()); netcfg.pubsub_topics.push(topic.clone()); From c20a7fb6297f5fcedebe402ca9421b204932ad08 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 17:59:29 -0500 Subject: [PATCH 15/35] PR suggestions --- node/ferret-libp2p/Cargo.toml | 2 +- node/ferret-libp2p/src/service.rs | 5 +++-- node/network/src/service.rs | 2 +- node/src/main.rs | 14 ++++++++------ 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/node/ferret-libp2p/Cargo.toml b/node/ferret-libp2p/Cargo.toml index 650c0af40d43..64e090c219d8 100644 --- a/node/ferret-libp2p/Cargo.toml +++ b/node/ferret-libp2p/Cargo.toml @@ -1,7 +1,7 @@ [package] name = "ferret-libp2p" version = "0.1.0" -authors = ["Eric Tu "] +authors = ["ChainSafe Systems "] edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 4ffeefc05eb9..bca258f79aec 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -49,7 +49,9 @@ impl Libp2pService { } } - Swarm::listen_on(&mut swarm, config.listening_multiaddr.parse().unwrap()).unwrap(); + Swarm::listen_on( + &mut swarm, + config.listening_multiaddr.parse().expect("Incorrect MultiAddr Format"))?; for topic in config.pubsub_topics.clone() { swarm.subscribe(topic); @@ -65,7 +67,6 @@ impl Stream for Libp2pService { /// Continuously polls the Libp2p swarm to get events fn poll(&mut self) -> Result>, Self::Error> { - let _listening = false; loop { match self.swarm.poll() { Ok(Async::Ready(Some(event))) => match event { diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 1f4d42e55d33..78e405e34c36 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -88,7 +88,7 @@ fn poll( _outbound_transmitter: Arc>, mut message_receiver: mpsc::UnboundedReceiver, ) -> impl futures::Future { - futures::future::poll_fn(move || -> Result<_, Error> { + futures::future::poll_fn(move || -> Result<_, _> { loop { match message_receiver.poll() { Ok(Async::Ready(Some(event))) => match event { diff --git a/node/src/main.rs b/node/src/main.rs index 6a4e9c444a91..5b32bbbcbd3d 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -24,24 +24,25 @@ use tokio::runtime::Runtime; fn main() { cli(); - /// TODO Everything below should be run in a function somewhere, but since we only have this main right now, should be ok to leave here + // TODO Everything below should be run in a function somewhere, but since we only have this + // main right now, should be ok to leave here let rt = Runtime::new().unwrap(); let (tx, _rx) = mpsc::unbounded_channel::(); let tx = Arc::new(tx); let mut netcfg = Libp2pConfig::default(); - let topic = Topic::new("test-net".into()); + let topic = Topic::new("/fil/blocks".into()); + netcfg.pubsub_topics.push(topic.clone()); + let topic = Topic::new("/fil/messages".into()); netcfg.pubsub_topics.push(topic.clone()); - let (network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg, tx, &rt.executor()); + let (_network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg, tx, &rt.executor()); - let _network_service = Arc::new(network_service); let stdin = tokio_stdin_stdout::stdin(0); let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); - let _listening = false; rt.executor() - .spawn(futures::future::poll_fn(move || -> Result<_, ()> { + .spawn(futures::future::poll_fn(move || -> Result<_, _> { loop { match framed_stdin.poll().expect("Error while polling stdin") { Async::Ready(Some(line)) => { @@ -60,3 +61,4 @@ fn main() { rt.shutdown_on_idle().wait().unwrap(); } + From 96b5f12cb4635c8cbd1e6f958f5ceb1cc4853d7c Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 18:01:09 -0500 Subject: [PATCH 16/35] more suggestions --- node/Cargo.toml | 4 ++-- node/ferret-libp2p/Cargo.toml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/node/Cargo.toml b/node/Cargo.toml index 0a68798b7f9f..a67e6eb364d9 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -13,9 +13,9 @@ ferret-libp2p = { path = "ferret-libp2p"} libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } tokio = "0.1.22" -tokio-stdin-stdout = "*" +tokio-stdin-stdout = "0.1.5" futures = "0.1.29" log = "0.4.8" -env_logger = "*" +env_logger = "0.7.1" clap = "2.33.0" chrono = "0.4.9" diff --git a/node/ferret-libp2p/Cargo.toml b/node/ferret-libp2p/Cargo.toml index 64e090c219d8..e69b58cad017 100644 --- a/node/ferret-libp2p/Cargo.toml +++ b/node/ferret-libp2p/Cargo.toml @@ -9,7 +9,7 @@ edition = "2018" [dependencies] libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } tokio = "0.1.22" -tokio-stdin-stdout = "*" +tokio-stdin-stdout = "0.1.5" futures = "0.1.29" log = "0.4.8" -env_logger = "*" \ No newline at end of file +env_logger = "0.7.1" \ No newline at end of file From 21552e7f855befde216a16dba9a7e732e553e389 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 18:11:12 -0500 Subject: [PATCH 17/35] even more suggestions --- node/ferret-libp2p/src/config.rs | 2 +- node/ferret-libp2p/src/lib.rs | 7 ------- node/ferret-libp2p/src/service.rs | 8 ++++---- 3 files changed, 5 insertions(+), 12 deletions(-) diff --git a/node/ferret-libp2p/src/config.rs b/node/ferret-libp2p/src/config.rs index cd217b604e32..1a3831fc5901 100644 --- a/node/ferret-libp2p/src/config.rs +++ b/node/ferret-libp2p/src/config.rs @@ -9,7 +9,7 @@ pub struct Libp2pConfig { impl Default for Libp2pConfig { fn default() -> Self { Libp2pConfig { - listening_multiaddr: "/ip4/0.0.0.0/tcp/0".parse().unwrap(), + listening_multiaddr: "/ip4/0.0.0.0/tcp/0".to_owned(), pubsub_topics: vec![], bootstrap_peers: vec![], } diff --git a/node/ferret-libp2p/src/lib.rs b/node/ferret-libp2p/src/lib.rs index 0429457acf01..9caf6c2a5388 100644 --- a/node/ferret-libp2p/src/lib.rs +++ b/node/ferret-libp2p/src/lib.rs @@ -1,10 +1,3 @@ pub mod behaviour; pub mod config; pub mod service; -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index bca258f79aec..5fb1cb3b7f9e 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -1,8 +1,8 @@ use super::config::Libp2pConfig; -use crate::behaviour::{MyBehaviour, MyBehaviourEvent}; +use super::behaviour::{MyBehaviour, MyBehaviourEvent}; use futures::{Async, Stream}; use libp2p::{ - self, core, core::muxing::StreamMuxerBox, core::nodes::Substream, + core, core::muxing::StreamMuxerBox, core::nodes::Substream, core::transport::boxed::Boxed, gossipsub::TopicHash, identity, mplex, secio, yamux, PeerId, Swarm, Transport, }; @@ -41,7 +41,7 @@ impl Libp2pService { for node in config.bootstrap_peers.clone() { let dialing = node.clone(); match node.parse() { - Ok(to_dial) => match libp2p::Swarm::dial_addr(&mut swarm, to_dial) { + Ok(to_dial) => match Swarm::dial_addr(&mut swarm, to_dial) { Ok(_) => println!("Dialed {:?}", dialing), Err(e) => println!("Dial {:?} failed: {:?}", dialing, e), }, @@ -51,7 +51,7 @@ impl Libp2pService { Swarm::listen_on( &mut swarm, - config.listening_multiaddr.parse().expect("Incorrect MultiAddr Format"))?; + config.listening_multiaddr.parse().expect("Incorrect MultiAddr Format")).unwrap(); for topic in config.pubsub_topics.clone() { swarm.subscribe(topic); From 8435720c9f7c4930ee98499b95b7befac91d2563 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 19:22:22 -0500 Subject: [PATCH 18/35] more suggestions --- node/ferret-libp2p/src/behaviour.rs | 1 - node/ferret-libp2p/src/service.rs | 3 +-- node/network/src/service.rs | 15 +++++++-------- node/src/main.rs | 25 +++++++++++++++++-------- 4 files changed, 25 insertions(+), 19 deletions(-) diff --git a/node/ferret-libp2p/src/behaviour.rs b/node/ferret-libp2p/src/behaviour.rs index 66832985e393..ba447a61510a 100644 --- a/node/ferret-libp2p/src/behaviour.rs +++ b/node/ferret-libp2p/src/behaviour.rs @@ -88,7 +88,6 @@ impl MyBehaviour { } pub fn publish(&mut self, topic: &Topic, data: impl Into>) { - println!("Pubishing a message"); self.gossipsub.publish(topic, data); } diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 5fb1cb3b7f9e..5fb46a2e7917 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -73,14 +73,13 @@ impl Stream for Libp2pService { MyBehaviourEvent::DiscoveredPeer(peer) => { libp2p::Swarm::dial(&mut self.swarm, peer); } - MyBehaviourEvent::ExpiredPeer(_peer) => {} + MyBehaviourEvent::ExpiredPeer(_) => {} MyBehaviourEvent::GossipMessage { source, topics, message, } => { let message = String::from_utf8(message).unwrap(); - println!("Received Gossip: {:?} {:?} {:?}", source, topics, message); return Ok(Async::Ready(Option::from(NetworkEvent::PubsubMessage { source, topics, diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 78e405e34c36..bde838d7797a 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -3,7 +3,7 @@ use ferret_libp2p::service::{Libp2pService, NetworkEvent}; use futures::stream::Stream; use futures::Async; use futures::Future; -use libp2p::{self, gossipsub::Topic}; +use libp2p::{gossipsub::Topic}; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; @@ -41,7 +41,7 @@ impl NetworkService { /// ``` pub fn new( config: &Libp2pConfig, - outbound_transmitter: Arc>, + outbound_transmitter: mpsc::UnboundedSender, executor: &TaskExecutor, ) -> ( Self, @@ -70,7 +70,7 @@ enum Error {} fn start( libp2p_service: Arc>, executor: &TaskExecutor, - outbound_transmitter: Arc>, + outbound_transmitter: mpsc::UnboundedSender, message_receiver: mpsc::UnboundedReceiver, ) -> tokio::sync::oneshot::Sender { let (network_exit, exit_rx) = tokio::sync::oneshot::channel(); @@ -85,7 +85,7 @@ fn start( fn poll( libp2p_service: Arc>, - _outbound_transmitter: Arc>, + mut outbound_transmitter: mpsc::UnboundedSender, mut message_receiver: mpsc::UnboundedReceiver, ) -> impl futures::Future { futures::future::poll_fn(move || -> Result<_, _> { @@ -112,10 +112,9 @@ fn poll( topics, message, } => { - println!( - "Received a message from GossipSub! {:?}, {:?}, {:?}", - source, topics, message - ); + outbound_transmitter.try_send(NetworkEvent::PubsubMessage{ + source, topics, message + }); } }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), diff --git a/node/src/main.rs b/node/src/main.rs index 5b32bbbcbd3d..e9490ddb5a15 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -15,8 +15,6 @@ use network::service::*; use futures::prelude::*; -use std::sync::Arc; - use tokio; use tokio::runtime::Runtime; @@ -28,14 +26,12 @@ fn main() { // main right now, should be ok to leave here let rt = Runtime::new().unwrap(); - let (tx, _rx) = mpsc::unbounded_channel::(); - let tx = Arc::new(tx); + let (tx, mut rx) = mpsc::unbounded_channel::(); let mut netcfg = Libp2pConfig::default(); - let topic = Topic::new("/fil/blocks".into()); - netcfg.pubsub_topics.push(topic.clone()); - let topic = Topic::new("/fil/messages".into()); + let topic = Topic::new("test-net".into()); netcfg.pubsub_topics.push(topic.clone()); + let (_network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg, tx, &rt.executor()); let stdin = tokio_stdin_stdout::stdin(0); @@ -46,7 +42,6 @@ fn main() { loop { match framed_stdin.poll().expect("Error while polling stdin") { Async::Ready(Some(line)) => { - println!("Got msg from stdin"); net_tx.try_send(NetworkMessage::PubsubMessage { topics: topic.clone(), message: line.as_bytes().to_vec(), @@ -56,6 +51,20 @@ fn main() { Async::NotReady => break, }; } + loop { + match rx.poll() { + Ok(Async::Ready(Some(message))) => match message { + NetworkEvent::PubsubMessage { + source, + topics, + message, + } => { + println!("Got msg! {:?} {:?} {:?}", source, topics, message); + } + } + _ => {break} + } + } Ok(Async::NotReady) })); From 0c2585ddb948ea438704138bd48484305b145fca Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 19:34:39 -0500 Subject: [PATCH 19/35] added topics into default config --- node/ferret-libp2p/src/config.rs | 3 ++- node/src/main.rs | 11 ++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/node/ferret-libp2p/src/config.rs b/node/ferret-libp2p/src/config.rs index 1a3831fc5901..0f33969e05ad 100644 --- a/node/ferret-libp2p/src/config.rs +++ b/node/ferret-libp2p/src/config.rs @@ -10,7 +10,8 @@ impl Default for Libp2pConfig { fn default() -> Self { Libp2pConfig { listening_multiaddr: "/ip4/0.0.0.0/tcp/0".to_owned(), - pubsub_topics: vec![], + pubsub_topics: vec![Topic::new("/fil/blocks".to_owned()), + Topic::new("/fil/messages".to_owned())], bootstrap_peers: vec![], } } diff --git a/node/src/main.rs b/node/src/main.rs index e9490ddb5a15..d8cc189d1a3d 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -24,18 +24,19 @@ fn main() { // TODO Everything below should be run in a function somewhere, but since we only have this // main right now, should be ok to leave here + /// Create the tokio runtime let rt = Runtime::new().unwrap(); + /// Create the channel so we can receive messages from NetworkService let (tx, mut rx) = mpsc::unbounded_channel::(); + /// Create the default libp2p config let mut netcfg = Libp2pConfig::default(); - let topic = Topic::new("test-net".into()); - netcfg.pubsub_topics.push(topic.clone()); - - let (_network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg, tx, &rt.executor()); + let stdin = tokio_stdin_stdout::stdin(0); let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); + let topics = netcfg.pubsub_topics.clone(); rt.executor() .spawn(futures::future::poll_fn(move || -> Result<_, _> { @@ -43,7 +44,7 @@ fn main() { match framed_stdin.poll().expect("Error while polling stdin") { Async::Ready(Some(line)) => { net_tx.try_send(NetworkMessage::PubsubMessage { - topics: topic.clone(), + topics: topics[0].clone(), message: line.as_bytes().to_vec(), }) } From ec52afe7afc26bfc0b4c342937cb4820ffb45d6d Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 19:37:00 -0500 Subject: [PATCH 20/35] lint --- node/src/main.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/node/src/main.rs b/node/src/main.rs index d8cc189d1a3d..6c36d6c3143a 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -33,7 +33,6 @@ fn main() { let mut netcfg = Libp2pConfig::default(); let (_network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg, tx, &rt.executor()); - let stdin = tokio_stdin_stdout::stdin(0); let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); let topics = netcfg.pubsub_topics.clone(); @@ -42,12 +41,10 @@ fn main() { .spawn(futures::future::poll_fn(move || -> Result<_, _> { loop { match framed_stdin.poll().expect("Error while polling stdin") { - Async::Ready(Some(line)) => { - net_tx.try_send(NetworkMessage::PubsubMessage { - topics: topics[0].clone(), - message: line.as_bytes().to_vec(), - }) - } + Async::Ready(Some(line)) => net_tx.try_send(NetworkMessage::PubsubMessage { + topics: topics[0].clone(), + message: line.as_bytes().to_vec(), + }), Async::Ready(None) => panic!("Stdin closed"), Async::NotReady => break, }; @@ -60,10 +57,10 @@ fn main() { topics, message, } => { - println!("Got msg! {:?} {:?} {:?}", source, topics, message); + println!("Got msg! {:?} {:?} {:?}", source, topics, message); } - } - _ => {break} + }, + _ => break, } } Ok(Async::NotReady) @@ -71,4 +68,3 @@ fn main() { rt.shutdown_on_idle().wait().unwrap(); } - From ab037a01cde0afedb69bf015f1a5fa0898bf2560 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 19:47:41 -0500 Subject: [PATCH 21/35] remove types --- vm/types/Cargo.toml | 11 ----------- vm/types/src/actor/mod.rs | 28 --------------------------- vm/types/src/lib.rs | 9 --------- vm/types/src/message.rs | 40 --------------------------------------- 4 files changed, 88 deletions(-) delete mode 100644 vm/types/Cargo.toml delete mode 100644 vm/types/src/actor/mod.rs delete mode 100644 vm/types/src/lib.rs delete mode 100644 vm/types/src/message.rs diff --git a/vm/types/Cargo.toml b/vm/types/Cargo.toml deleted file mode 100644 index 265ec529df8f..000000000000 --- a/vm/types/Cargo.toml +++ /dev/null @@ -1,11 +0,0 @@ -[package] -name = "types" -version = "0.1.0" -authors = ["Eric Tu "] -edition = "2018" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -cid = "*" -bytes = "*" \ No newline at end of file diff --git a/vm/types/src/actor/mod.rs b/vm/types/src/actor/mod.rs deleted file mode 100644 index caaa55463f16..000000000000 --- a/vm/types/src/actor/mod.rs +++ /dev/null @@ -1,28 +0,0 @@ -extern crate cid; -use cid::{Cid}; -use bytes::{Bytes}; - -//Might have to be some bignum -pub type UVarint = u64; - -pub type MethodNum = UVarint; -pub type MethodParam = Bytes; -pub type MethodParams = Vec; -pub type Code = Bytes; - -pub type TokenAmount = UVarint; -pub type CallSeqNum = UVarint; - -pub type CodeCID = Cid; -pub type ActorSubstateCID = Cid; - -pub struct ActorState { - codeCID: CodeCID, - state: ActorSubstateCID, - balance: TokenAmount, - callSeqNum: CallSeqNum, -} - -pub struct Actor { - pub state: ActorState, -} \ No newline at end of file diff --git a/vm/types/src/lib.rs b/vm/types/src/lib.rs deleted file mode 100644 index 41a74be78360..000000000000 --- a/vm/types/src/lib.rs +++ /dev/null @@ -1,9 +0,0 @@ -pub mod message; -pub mod actor; -#[cfg(test)] -mod tests { - #[test] - fn it_works() { - assert_eq!(2 + 2, 4); - } -} diff --git a/vm/types/src/message.rs b/vm/types/src/message.rs deleted file mode 100644 index aaab93e37387..000000000000 --- a/vm/types/src/message.rs +++ /dev/null @@ -1,40 +0,0 @@ -use super::actor::{MethodNum, MethodParams, CallSeqNum, TokenAmount}; - -//Should probably be a bignum -pub type GasAmount = u64; - -pub type GasPrice = TokenAmount; - -pub type Address = String; - -pub type FilCryptoSignature = String; - -#[derive (Debug, Clone)] -pub struct Message { - from: Address, //addr.Address - to: Address, // addr.address - method: MethodNum, - params: MethodParams, - call_seq_num: CallSeqNum, - value: TokenAmount, - gas_price: GasPrice, - gas_limit: GasAmount, -} - -#[derive (Debug, Clone)] -pub struct SignedMessage { - message: Message, - signature: FilCryptoSignature, -} - -impl From for Message { - fn from (msg: SignedMessage) -> Self{ - msg.message - } -} - -impl Message { - fn sign () -> SignedMessage { - unimplemented!(); - } -} From 723eedba2c4ff32536b64c67b886d26ed5300624 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 19:49:30 -0500 Subject: [PATCH 22/35] lint --- node/src/main.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/node/src/main.rs b/node/src/main.rs index 6c36d6c3143a..6d5ebd004f62 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -24,12 +24,12 @@ fn main() { // TODO Everything below should be run in a function somewhere, but since we only have this // main right now, should be ok to leave here - /// Create the tokio runtime + // Create the tokio runtime let rt = Runtime::new().unwrap(); - /// Create the channel so we can receive messages from NetworkService + // Create the channel so we can receive messages from NetworkService let (tx, mut rx) = mpsc::unbounded_channel::(); - /// Create the default libp2p config + // Create the default libp2p config let mut netcfg = Libp2pConfig::default(); let (_network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg, tx, &rt.executor()); From efd006197c2419719002a6beb5b518bd91d83e41 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 19:50:10 -0500 Subject: [PATCH 23/35] clippy fixes --- node/src/main.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/node/src/main.rs b/node/src/main.rs index 6d5ebd004f62..cd8536462be5 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -3,7 +3,6 @@ mod cli; use cli::cli; use libp2p::{ - gossipsub::Topic, tokio_codec::{FramedRead, LinesCodec}, }; @@ -30,7 +29,7 @@ fn main() { // Create the channel so we can receive messages from NetworkService let (tx, mut rx) = mpsc::unbounded_channel::(); // Create the default libp2p config - let mut netcfg = Libp2pConfig::default(); + let netcfg = Libp2pConfig::default(); let (_network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg, tx, &rt.executor()); let stdin = tokio_stdin_stdout::stdin(0); From f7f59622d65142a9edb67e70170352bc78679f01 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Wed, 27 Nov 2019 20:02:37 -0500 Subject: [PATCH 24/35] remove CLI portion of app. if you wanna have a chat, just checkout the previous commit --- node/Cargo.toml | 1 - node/ferret-libp2p/Cargo.toml | 1 - node/network/src/service.rs | 2 +- node/src/main.rs | 42 +++-------------------------------- 4 files changed, 4 insertions(+), 42 deletions(-) diff --git a/node/Cargo.toml b/node/Cargo.toml index a67e6eb364d9..2f8a158d82de 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -13,7 +13,6 @@ ferret-libp2p = { path = "ferret-libp2p"} libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } tokio = "0.1.22" -tokio-stdin-stdout = "0.1.5" futures = "0.1.29" log = "0.4.8" env_logger = "0.7.1" diff --git a/node/ferret-libp2p/Cargo.toml b/node/ferret-libp2p/Cargo.toml index e69b58cad017..7fa1d2d843e9 100644 --- a/node/ferret-libp2p/Cargo.toml +++ b/node/ferret-libp2p/Cargo.toml @@ -9,7 +9,6 @@ edition = "2018" [dependencies] libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } tokio = "0.1.22" -tokio-stdin-stdout = "0.1.5" futures = "0.1.29" log = "0.4.8" env_logger = "0.7.1" \ No newline at end of file diff --git a/node/network/src/service.rs b/node/network/src/service.rs index bde838d7797a..6777b9a79426 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -114,7 +114,7 @@ fn poll( } => { outbound_transmitter.try_send(NetworkEvent::PubsubMessage{ source, topics, message - }); + }).map_err(|e| println!("Can't handle message")); } }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), diff --git a/node/src/main.rs b/node/src/main.rs index cd8536462be5..338fdc0741d7 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -2,10 +2,6 @@ mod cli; use cli::cli; -use libp2p::{ - tokio_codec::{FramedRead, LinesCodec}, -}; - use tokio::sync::mpsc; use ferret_libp2p::config::Libp2pConfig; @@ -27,43 +23,11 @@ fn main() { let rt = Runtime::new().unwrap(); // Create the channel so we can receive messages from NetworkService - let (tx, mut rx) = mpsc::unbounded_channel::(); + let (tx, _rx) = mpsc::unbounded_channel::(); // Create the default libp2p config let netcfg = Libp2pConfig::default(); - let (_network_service, mut net_tx, _exit_tx) = NetworkService::new(&netcfg, tx, &rt.executor()); - - let stdin = tokio_stdin_stdout::stdin(0); - let mut framed_stdin = FramedRead::new(stdin, LinesCodec::new()); - let topics = netcfg.pubsub_topics.clone(); - - rt.executor() - .spawn(futures::future::poll_fn(move || -> Result<_, _> { - loop { - match framed_stdin.poll().expect("Error while polling stdin") { - Async::Ready(Some(line)) => net_tx.try_send(NetworkMessage::PubsubMessage { - topics: topics[0].clone(), - message: line.as_bytes().to_vec(), - }), - Async::Ready(None) => panic!("Stdin closed"), - Async::NotReady => break, - }; - } - loop { - match rx.poll() { - Ok(Async::Ready(Some(message))) => match message { - NetworkEvent::PubsubMessage { - source, - topics, - message, - } => { - println!("Got msg! {:?} {:?} {:?}", source, topics, message); - } - }, - _ => break, - } - } - Ok(Async::NotReady) - })); + // Start the NetworkService. Returns net_tx so you can pass messages in. + let (_network_service, _net_tx, _exit_tx) = NetworkService::new(&netcfg, tx, &rt.executor()); rt.shutdown_on_idle().wait().unwrap(); } From c9b4d329ffe7fa8983774917330962a14a2e78fd Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Mon, 2 Dec 2019 16:44:32 -0500 Subject: [PATCH 25/35] fix PR suggesttions --- node/network/src/service.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 6777b9a79426..93a177613bcc 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -112,9 +112,16 @@ fn poll( topics, message, } => { - outbound_transmitter.try_send(NetworkEvent::PubsubMessage{ - source, topics, message - }).map_err(|e| println!("Can't handle message")); + if outbound_transmitter + .try_send(NetworkEvent::PubsubMessage { + source, + topics, + message, + }) + .is_err() + { + println!("Can't handle message") + } } }, Ok(Async::Ready(None)) => unreachable!("Stream never ends"), From 092d782c5367276134a1a0f763e75313e84ab79c Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Mon, 2 Dec 2019 17:42:03 -0500 Subject: [PATCH 26/35] add logger to network and libp2p crates --- node/ferret-libp2p/Cargo.toml | 2 +- node/ferret-libp2p/src/service.rs | 18 ++++++------------ node/network/Cargo.toml | 2 ++ node/network/src/service.rs | 13 +++++++++---- node/src/main.rs | 4 ++-- 5 files changed, 20 insertions(+), 19 deletions(-) diff --git a/node/ferret-libp2p/Cargo.toml b/node/ferret-libp2p/Cargo.toml index 7fa1d2d843e9..4806d396a6fd 100644 --- a/node/ferret-libp2p/Cargo.toml +++ b/node/ferret-libp2p/Cargo.toml @@ -11,4 +11,4 @@ libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01 tokio = "0.1.22" futures = "0.1.29" log = "0.4.8" -env_logger = "0.7.1" \ No newline at end of file +slog = "2.5.2" \ No newline at end of file diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 5fb46a2e7917..ba41815b19a7 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -8,7 +8,7 @@ use libp2p::{ }; use std::io::{Error, ErrorKind}; use std::time::Duration; - +use slog::{Logger, info, debug, error}; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = MyBehaviour>; @@ -19,17 +19,11 @@ pub struct Libp2pService { impl Libp2pService { /// Constructs a Libp2pService - /// - /// # Example - /// ``` - /// let mut netcfg = Libp2pConfig::default(); - /// let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(config).unwrap())); - /// ``` - pub fn new(config: &Libp2pConfig) -> Result { + pub fn new(log: Logger, config: &Libp2pConfig) -> Result { // TODO @Greg do local storage let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); - println!("Local peer id: {:?}", local_peer_id); + info!(log, "Local peer id: {:?}", local_peer_id); let transport = build_transport(local_key.clone()); @@ -42,10 +36,10 @@ impl Libp2pService { let dialing = node.clone(); match node.parse() { Ok(to_dial) => match Swarm::dial_addr(&mut swarm, to_dial) { - Ok(_) => println!("Dialed {:?}", dialing), - Err(e) => println!("Dial {:?} failed: {:?}", dialing, e), + Ok(_) => debug!(log, "Dialed {:?}", dialing), + Err(e) => debug!(log, "Dial {:?} failed: {:?}", dialing, e), }, - Err(err) => println!("Failed to parse address to dial: {:?}", err), + Err(err) => error!(log, "Failed to parse address to dial: {:?}", err), } } diff --git a/node/network/Cargo.toml b/node/network/Cargo.toml index 828884c9bba6..10d21e2eb851 100644 --- a/node/network/Cargo.toml +++ b/node/network/Cargo.toml @@ -11,3 +11,5 @@ ferret-libp2p = { path = "../ferret-libp2p" } futures = "0.1.29" tokio = "0.1.22" libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "cdd5251d29e21a01aa2ffed8cb577a37a0f9e2eb" } +log = "0.4.8" +slog = "2.5.2" \ No newline at end of file diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 93a177613bcc..9101cd590610 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -9,6 +9,8 @@ use tokio::sync::mpsc; use tokio::runtime::TaskExecutor; +use slog::{Logger, warn}; + /// Ingress events to the NetworkService pub enum NetworkMessage { PubsubMessage { topics: Topic, message: Vec }, @@ -41,6 +43,7 @@ impl NetworkService { /// ``` pub fn new( config: &Libp2pConfig, + log: Logger, outbound_transmitter: mpsc::UnboundedSender, executor: &TaskExecutor, ) -> ( @@ -50,9 +53,9 @@ impl NetworkService { ) { let (tx, rx) = mpsc::unbounded_channel(); - let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(config).unwrap())); + let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(log.clone(), config).unwrap())); - let exit_tx = start(libp2p_service.clone(), executor, outbound_transmitter, rx); + let exit_tx = start(log.clone(), libp2p_service.clone(), executor, outbound_transmitter, rx); ( NetworkService { @@ -68,6 +71,7 @@ enum Error {} /// Spawns the NetworkService service. fn start( + log: Logger, libp2p_service: Arc>, executor: &TaskExecutor, outbound_transmitter: mpsc::UnboundedSender, @@ -75,7 +79,7 @@ fn start( ) -> tokio::sync::oneshot::Sender { let (network_exit, exit_rx) = tokio::sync::oneshot::channel(); executor.spawn( - poll(libp2p_service, outbound_transmitter, message_receiver) + poll(log, libp2p_service, outbound_transmitter, message_receiver) .select(exit_rx.then(|_| Ok(()))) .then(move |_| Ok(())), ); @@ -84,6 +88,7 @@ fn start( } fn poll( + log: Logger, libp2p_service: Arc>, mut outbound_transmitter: mpsc::UnboundedSender, mut message_receiver: mpsc::UnboundedReceiver, @@ -120,7 +125,7 @@ fn poll( }) .is_err() { - println!("Can't handle message") + warn!(log, "Cant handle message"); } } }, diff --git a/node/src/main.rs b/node/src/main.rs index 7d792b70a948..8cbab3a20211 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -17,7 +17,6 @@ use tokio; use tokio::runtime::Runtime; fn main() { - let log = log::setup_logging(); info!(log, "Starting Ferret"); cli(&log); @@ -30,7 +29,8 @@ fn main() { // Create the default libp2p config let netcfg = Libp2pConfig::default(); // Start the NetworkService. Returns net_tx so you can pass messages in. - let (_network_service, _net_tx, _exit_tx) = NetworkService::new(&netcfg, log.clone(), tx, &rt.executor()); + let (_network_service, _net_tx, _exit_tx) = + NetworkService::new(&netcfg, log.clone(), tx, &rt.executor()); rt.shutdown_on_idle().wait().unwrap(); info!(log, "Ferret finish shutdown"); From 3a68f2f30817d78b8741977ac86ea6c1c3daaed8 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Mon, 2 Dec 2019 18:17:17 -0500 Subject: [PATCH 27/35] fix doctest --- node/network/src/service.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 9101cd590610..4377e2142c13 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -33,13 +33,14 @@ impl NetworkService { /// use ferret_libp2p::service::NetworkEvent; /// use ferret_libp2p::config::Libp2pConfig; /// use std::sync::Arc; + /// use slog::Logger; /// /// /// let rt = Runtime::new().unwrap(); /// let (tx, _rx) = mpsc::unbounded_channel::(); /// let tx = Arc::new(tx); /// let mut netcfg = Libp2pConfig::default(); - /// let (network_service, mut net_tx, _exit_tx) = new(&netcfg, tx, &rt.executor()); + /// let (network_service, mut net_tx, _exit_tx) = new(&netcfg, None, tx, &rt.executor()); /// ``` pub fn new( config: &Libp2pConfig, From 54bfd046f67f15f89462885661746290a5b8c0fc Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Mon, 2 Dec 2019 18:22:04 -0500 Subject: [PATCH 28/35] linting --- node/ferret-libp2p/src/config.rs | 6 ++++-- node/ferret-libp2p/src/service.rs | 16 ++++++++++------ node/network/src/service.rs | 12 +++++++++--- 3 files changed, 23 insertions(+), 11 deletions(-) diff --git a/node/ferret-libp2p/src/config.rs b/node/ferret-libp2p/src/config.rs index 0f33969e05ad..66438a2e2d69 100644 --- a/node/ferret-libp2p/src/config.rs +++ b/node/ferret-libp2p/src/config.rs @@ -10,8 +10,10 @@ impl Default for Libp2pConfig { fn default() -> Self { Libp2pConfig { listening_multiaddr: "/ip4/0.0.0.0/tcp/0".to_owned(), - pubsub_topics: vec![Topic::new("/fil/blocks".to_owned()), - Topic::new("/fil/messages".to_owned())], + pubsub_topics: vec![ + Topic::new("/fil/blocks".to_owned()), + Topic::new("/fil/messages".to_owned()), + ], bootstrap_peers: vec![], } } diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index ba41815b19a7..bf9c625baebb 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -1,14 +1,13 @@ -use super::config::Libp2pConfig; use super::behaviour::{MyBehaviour, MyBehaviourEvent}; +use super::config::Libp2pConfig; use futures::{Async, Stream}; use libp2p::{ - core, core::muxing::StreamMuxerBox, core::nodes::Substream, - core::transport::boxed::Boxed, gossipsub::TopicHash, identity, mplex, secio, yamux, PeerId, - Swarm, Transport, + core, core::muxing::StreamMuxerBox, core::nodes::Substream, core::transport::boxed::Boxed, + gossipsub::TopicHash, identity, mplex, secio, yamux, PeerId, Swarm, Transport, }; +use slog::{debug, error, info, Logger}; use std::io::{Error, ErrorKind}; use std::time::Duration; -use slog::{Logger, info, debug, error}; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = MyBehaviour>; @@ -45,7 +44,12 @@ impl Libp2pService { Swarm::listen_on( &mut swarm, - config.listening_multiaddr.parse().expect("Incorrect MultiAddr Format")).unwrap(); + config + .listening_multiaddr + .parse() + .expect("Incorrect MultiAddr Format"), + ) + .unwrap(); for topic in config.pubsub_topics.clone() { swarm.subscribe(topic); diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 4377e2142c13..0b33947b6fc2 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -3,13 +3,13 @@ use ferret_libp2p::service::{Libp2pService, NetworkEvent}; use futures::stream::Stream; use futures::Async; use futures::Future; -use libp2p::{gossipsub::Topic}; +use libp2p::gossipsub::Topic; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; use tokio::runtime::TaskExecutor; -use slog::{Logger, warn}; +use slog::{warn, Logger}; /// Ingress events to the NetworkService pub enum NetworkMessage { @@ -56,7 +56,13 @@ impl NetworkService { let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(log.clone(), config).unwrap())); - let exit_tx = start(log.clone(), libp2p_service.clone(), executor, outbound_transmitter, rx); + let exit_tx = start( + log.clone(), + libp2p_service.clone(), + executor, + outbound_transmitter, + rx, + ); ( NetworkService { From c03f05bcff6d88fb59f2037d7c07ce0cbc70a3db Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Mon, 2 Dec 2019 18:32:28 -0500 Subject: [PATCH 29/35] remove doctest caused more headaches than was beneficial --- node/network/src/service.rs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 0b33947b6fc2..a9368d172401 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -25,23 +25,6 @@ pub struct NetworkService { impl NetworkService { /// Starts a Libp2pService with a given config, UnboundedSender, and tokio executor. /// Returns an UnboundedSender channel so messages can come in. - /// - /// # Example - /// ``` - /// use tokio::runtime::Runtime; - /// use tokio::sync::mpsc; - /// use ferret_libp2p::service::NetworkEvent; - /// use ferret_libp2p::config::Libp2pConfig; - /// use std::sync::Arc; - /// use slog::Logger; - /// - /// - /// let rt = Runtime::new().unwrap(); - /// let (tx, _rx) = mpsc::unbounded_channel::(); - /// let tx = Arc::new(tx); - /// let mut netcfg = Libp2pConfig::default(); - /// let (network_service, mut net_tx, _exit_tx) = new(&netcfg, None, tx, &rt.executor()); - /// ``` pub fn new( config: &Libp2pConfig, log: Logger, From 2a73a4e9d050da8e1ad0ad8100aaa6d14bacbe17 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Tue, 3 Dec 2019 15:04:31 -0500 Subject: [PATCH 30/35] pr suggestions --- node/ferret-libp2p/src/service.rs | 4 ++-- node/network/src/service.rs | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index bf9c625baebb..2b6939746408 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -18,7 +18,7 @@ pub struct Libp2pService { impl Libp2pService { /// Constructs a Libp2pService - pub fn new(log: Logger, config: &Libp2pConfig) -> Result { + pub fn new(log: Logger, config: &Libp2pConfig) -> Self { // TODO @Greg do local storage let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); @@ -55,7 +55,7 @@ impl Libp2pService { swarm.subscribe(topic); } - Ok(Libp2pService { swarm }) + Libp2pService { swarm } } } diff --git a/node/network/src/service.rs b/node/network/src/service.rs index a9368d172401..d8e1381de238 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -37,7 +37,7 @@ impl NetworkService { ) { let (tx, rx) = mpsc::unbounded_channel(); - let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(log.clone(), config).unwrap())); + let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(log.clone(), config))); let exit_tx = start( log.clone(), From b0812f13d0aeb5ceedd793bc27d6e3f0e389db84 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Tue, 3 Dec 2019 15:20:16 -0500 Subject: [PATCH 31/35] borrow instead of cloning logger where it doesnt need to be cloned --- node/ferret-libp2p/src/behaviour.rs | 3 --- node/ferret-libp2p/src/service.rs | 2 +- node/network/src/service.rs | 9 +++------ node/src/main.rs | 2 +- 4 files changed, 5 insertions(+), 11 deletions(-) diff --git a/node/ferret-libp2p/src/behaviour.rs b/node/ferret-libp2p/src/behaviour.rs index ba447a61510a..0ad2c81e69c0 100644 --- a/node/ferret-libp2p/src/behaviour.rs +++ b/node/ferret-libp2p/src/behaviour.rs @@ -1,11 +1,8 @@ use futures::Async; use libp2p::core::identity::Keypair; - use libp2p::core::PeerId; use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash}; - use libp2p::mdns::{Mdns, MdnsEvent}; - use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess}; use libp2p::tokio_io::{AsyncRead, AsyncWrite}; use libp2p::NetworkBehaviour; diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index 2b6939746408..cc058d7b704e 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -18,7 +18,7 @@ pub struct Libp2pService { impl Libp2pService { /// Constructs a Libp2pService - pub fn new(log: Logger, config: &Libp2pConfig) -> Self { + pub fn new(log: &Logger, config: &Libp2pConfig) -> Self { // TODO @Greg do local storage let local_key = identity::Keypair::generate_ed25519(); let local_peer_id = PeerId::from(local_key.public()); diff --git a/node/network/src/service.rs b/node/network/src/service.rs index d8e1381de238..8012a33c63da 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -1,14 +1,11 @@ use ferret_libp2p::config::Libp2pConfig; use ferret_libp2p::service::{Libp2pService, NetworkEvent}; use futures::stream::Stream; -use futures::Async; -use futures::Future; +use futures::{Async, Future}; use libp2p::gossipsub::Topic; use std::sync::{Arc, Mutex}; use tokio::sync::mpsc; - use tokio::runtime::TaskExecutor; - use slog::{warn, Logger}; /// Ingress events to the NetworkService @@ -27,7 +24,7 @@ impl NetworkService { /// Returns an UnboundedSender channel so messages can come in. pub fn new( config: &Libp2pConfig, - log: Logger, + log: &Logger, outbound_transmitter: mpsc::UnboundedSender, executor: &TaskExecutor, ) -> ( @@ -37,7 +34,7 @@ impl NetworkService { ) { let (tx, rx) = mpsc::unbounded_channel(); - let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(log.clone(), config))); + let libp2p_service = Arc::new(Mutex::new(Libp2pService::new(log, config))); let exit_tx = start( log.clone(), diff --git a/node/src/main.rs b/node/src/main.rs index 8cbab3a20211..5687cc848733 100644 --- a/node/src/main.rs +++ b/node/src/main.rs @@ -30,7 +30,7 @@ fn main() { let netcfg = Libp2pConfig::default(); // Start the NetworkService. Returns net_tx so you can pass messages in. let (_network_service, _net_tx, _exit_tx) = - NetworkService::new(&netcfg, log.clone(), tx, &rt.executor()); + NetworkService::new(&netcfg, &log, tx, &rt.executor()); rt.shutdown_on_idle().wait().unwrap(); info!(log, "Ferret finish shutdown"); From 59c24779b39eade3d5a6f8215fb09281f08446c4 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Tue, 3 Dec 2019 15:20:44 -0500 Subject: [PATCH 32/35] linting --- node/network/src/service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/network/src/service.rs b/node/network/src/service.rs index 8012a33c63da..193f94ad52b6 100644 --- a/node/network/src/service.rs +++ b/node/network/src/service.rs @@ -3,10 +3,10 @@ use ferret_libp2p::service::{Libp2pService, NetworkEvent}; use futures::stream::Stream; use futures::{Async, Future}; use libp2p::gossipsub::Topic; +use slog::{warn, Logger}; use std::sync::{Arc, Mutex}; -use tokio::sync::mpsc; use tokio::runtime::TaskExecutor; -use slog::{warn, Logger}; +use tokio::sync::mpsc; /// Ingress events to the NetworkService pub enum NetworkMessage { From 33bdf70b95e094a6909f9ba95eaf859b1fa0111a Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Tue, 3 Dec 2019 15:27:59 -0500 Subject: [PATCH 33/35] pr suggestions --- node/ferret-libp2p/src/service.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index cc058d7b704e..f509aaf99da9 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -32,11 +32,10 @@ impl Libp2pService { }; for node in config.bootstrap_peers.clone() { - let dialing = node.clone(); match node.parse() { Ok(to_dial) => match Swarm::dial_addr(&mut swarm, to_dial) { - Ok(_) => debug!(log, "Dialed {:?}", dialing), - Err(e) => debug!(log, "Dial {:?} failed: {:?}", dialing, e), + Ok(_) => debug!(log, "Dialed {:?}", node), + Err(e) => debug!(log, "Dial {:?} failed: {:?}", node, e), }, Err(err) => error!(log, "Failed to parse address to dial: {:?}", err), } From 8e29083801d9884db19d6b3dcb547b0c108d86d7 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Tue, 3 Dec 2019 15:30:05 -0500 Subject: [PATCH 34/35] more pr suggestions --- node/ferret-libp2p/src/service.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/node/ferret-libp2p/src/service.rs b/node/ferret-libp2p/src/service.rs index f509aaf99da9..4e8aeb2919a7 100644 --- a/node/ferret-libp2p/src/service.rs +++ b/node/ferret-libp2p/src/service.rs @@ -76,7 +76,6 @@ impl Stream for Libp2pService { topics, message, } => { - let message = String::from_utf8(message).unwrap(); return Ok(Async::Ready(Option::from(NetworkEvent::PubsubMessage { source, topics, @@ -99,7 +98,7 @@ pub enum NetworkEvent { PubsubMessage { source: PeerId, topics: Vec, - message: String, + message: Vec, }, } From 18c0844d25af15a5805316d51d48bda4c8d8ef07 Mon Sep 17 00:00:00 2001 From: Eric Tu Date: Tue, 3 Dec 2019 15:57:24 -0500 Subject: [PATCH 35/35] add vscode to gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index 9910f4d50da3..b9a1cb1edc4c 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ /Cargo.lock .idea .DS_STORE +.vscode \ No newline at end of file