Skip to content

Commit

Permalink
Migration to Stable Futures and Network Refactor (#209)
Browse files Browse the repository at this point in the history
* wip failing derive

* missed some files

* compiles but doesnt run

* runs now!

* refactor to remove network crate. it works, but this doesnt compile because of dependancies. easy fix, but gotta go home

* compilesgit add .

* got it working with the channel

* out channels implemented

* change channel types and add some comments

* lint

* fix license header

* suggestions

* some more suggestions

* whitespace in cargo files

* remove MergeEvent enum

* change path name

* white spaces in cargo toml
  • Loading branch information
ec2 authored Feb 4, 2020
1 parent 8b1b61b commit 477930d
Show file tree
Hide file tree
Showing 12 changed files with 133 additions and 242 deletions.
2 changes: 1 addition & 1 deletion blockchain/chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

[dependencies]
forest_libp2p = { path = "../../node/forest_libp2p" }
blocks = { package = "forest_blocks", path = "../blocks" }
db = { path = "../../node/db" }
network = { path = "../../node/network" }
cid = { package = "forest_cid", path = "../../ipld/cid" }
clock = { path = "../../node/clock" }
encoding = { package = "forest_encoding", path = "../../encoding" }
Expand Down
2 changes: 1 addition & 1 deletion blockchain/chain_sync/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

[dependencies]
libp2p = "0.15.0"
blocks = { package = "forest_blocks", path = "../blocks" }
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "776d13ef046358964c7d64cda3295a3a3cb24743" }
7 changes: 3 additions & 4 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,15 @@ authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

[dependencies]
network = { path = "network" }
forest_libp2p = { path = "forest_libp2p" }
utils = { path = "utils" }
db = { path = "db" }
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "776d13ef046358964c7d64cda3295a3a3cb24743" }
tokio = "0.1.22"
futures = "0.1.29"
libp2p = "0.15.0"
futures = "0.3.1"
clap = "2.33.0"
log = "0.4.8"
slog = "2.5.2"
slog-async = "2.3.0"
slog-term = "2.4.2"
async-std = { version = "1.4.0", features = ["attributes"] }
serde = { version = "1.0", features = ["derive"] }
9 changes: 5 additions & 4 deletions node/forest_libp2p/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ authors = ["ChainSafe Systems <info@chainsafe.io>"]
edition = "2018"

[dependencies]
libp2p = { git = "https://github.com/SigP/rust-libp2p", rev = "776d13ef046358964c7d64cda3295a3a3cb24743" }
utils = { path = "../utils" }
tokio = "0.1.22"
futures = "0.1.29"
libp2p = "0.15.0"
futures = "0.3.1"
futures-util = "0.3.1"
log = "0.4.8"
slog = "2.5.2"
serde = {version = "1.0", features = ["derive"]}
async-std = { version = "1.4.0", features = ["unstable"] }
serde = { version = "1.0", features = ["derive"] }
50 changes: 26 additions & 24 deletions node/forest_libp2p/src/behaviour.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use futures::Async;
use futures::prelude::*;
use libp2p::core::identity::Keypair;
use libp2p::core::PeerId;
use libp2p::gossipsub::{Gossipsub, GossipsubConfig, GossipsubEvent, Topic, TopicHash};
Expand All @@ -12,24 +12,25 @@ use libp2p::ping::{
Ping, PingEvent,
};
use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess};
use libp2p::tokio_io::{AsyncRead, AsyncWrite};
use libp2p::NetworkBehaviour;
use slog::{debug, Logger};
use std::{task::Context, task::Poll};

#[derive(NetworkBehaviour)]
#[behaviour(out_event = "MyBehaviourEvent", poll_method = "poll")]
pub struct MyBehaviour<TSubstream: AsyncRead + AsyncWrite> {
#[behaviour(out_event = "ForestBehaviourEvent", poll_method = "poll")]
pub struct ForestBehaviour<TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static> {
pub gossipsub: Gossipsub<TSubstream>,
pub mdns: Mdns<TSubstream>,
pub ping: Ping<TSubstream>,
pub identify: Identify<TSubstream>,
#[behaviour(ignore)]
events: Vec<MyBehaviourEvent>,
events: Vec<ForestBehaviourEvent>,
#[behaviour(ignore)]
log: Logger,
}

pub enum MyBehaviourEvent {
#[derive(Debug)]
pub enum ForestBehaviourEvent {
DiscoveredPeer(PeerId),
ExpiredPeer(PeerId),
GossipMessage {
Expand All @@ -39,33 +40,33 @@ pub enum MyBehaviourEvent {
},
}

impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<MdnsEvent>
for MyBehaviour<TSubstream>
impl<TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static>
NetworkBehaviourEventProcess<MdnsEvent> for ForestBehaviour<TSubstream>
{
fn inject_event(&mut self, event: MdnsEvent) {
match event {
MdnsEvent::Discovered(list) => {
for (peer, _) in list {
self.events.push(MyBehaviourEvent::DiscoveredPeer(peer))
self.events.push(ForestBehaviourEvent::DiscoveredPeer(peer))
}
}
MdnsEvent::Expired(list) => {
for (peer, _) in list {
if !self.mdns.has_node(&peer) {
self.events.push(MyBehaviourEvent::ExpiredPeer(peer))
self.events.push(ForestBehaviourEvent::ExpiredPeer(peer))
}
}
}
}
}
}

impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubEvent>
for MyBehaviour<TSubstream>
impl<TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static>
NetworkBehaviourEventProcess<GossipsubEvent> for ForestBehaviour<TSubstream>
{
fn inject_event(&mut self, message: GossipsubEvent) {
if let GossipsubEvent::Message(_, _, message) = message {
self.events.push(MyBehaviourEvent::GossipMessage {
self.events.push(ForestBehaviourEvent::GossipMessage {
source: message.source,
topics: message.topics,
message: message.data,
Expand All @@ -74,8 +75,8 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<GossipsubE
}
}

impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<PingEvent>
for MyBehaviour<TSubstream>
impl<TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static>
NetworkBehaviourEventProcess<PingEvent> for ForestBehaviour<TSubstream>
{
fn inject_event(&mut self, event: PingEvent) {
match event.result {
Expand Down Expand Up @@ -109,8 +110,8 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<PingEvent>
}
}

impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEvent>
for MyBehaviour<TSubstream>
impl<TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static>
NetworkBehaviourEventProcess<IdentifyEvent> for ForestBehaviour<TSubstream>
{
fn inject_event(&mut self, event: IdentifyEvent) {
match event {
Expand All @@ -132,25 +133,26 @@ impl<TSubstream: AsyncRead + AsyncWrite> NetworkBehaviourEventProcess<IdentifyEv
}
}

impl<TSubstream: AsyncRead + AsyncWrite> MyBehaviour<TSubstream> {
impl<TSubstream: AsyncRead + AsyncWrite + Send + Unpin + 'static> ForestBehaviour<TSubstream> {
/// Consumes the events list when polled.
fn poll<TBehaviourIn>(
&mut self,
) -> Async<NetworkBehaviourAction<TBehaviourIn, MyBehaviourEvent>> {
_: &mut Context,
) -> Poll<NetworkBehaviourAction<TBehaviourIn, ForestBehaviourEvent>> {
if !self.events.is_empty() {
return Async::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
return Poll::Ready(NetworkBehaviourAction::GenerateEvent(self.events.remove(0)));
}
Async::NotReady
Poll::Pending
}
}

impl<TSubstream: AsyncRead + AsyncWrite> MyBehaviour<TSubstream> {
impl<TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static> ForestBehaviour<TSubstream> {
pub fn new(log: Logger, local_key: &Keypair) -> Self {
let local_peer_id = local_key.public().into_peer_id();
let gossipsub_config = GossipsubConfig::default();
MyBehaviour {
ForestBehaviour {
gossipsub: Gossipsub::new(local_peer_id, gossipsub_config),
mdns: Mdns::new().expect("Failed to create mDNS service"),
mdns: Mdns::new().expect("Could not start mDNS"),
ping: Ping::default(),
identify: Identify::new("forest/libp2p".into(), "0.0.1".into(), local_key.public()),
log,
Expand Down
1 change: 0 additions & 1 deletion node/forest_libp2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ use serde::Deserialize;
pub struct Libp2pConfig {
pub listening_multiaddr: String,
pub bootstrap_peers: Vec<String>,

#[serde(skip_deserializing)] // Always use default
pub pubsub_topics: Vec<Topic>,
}
Expand Down
2 changes: 2 additions & 0 deletions node/forest_libp2p/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

#![recursion_limit = "1024"]

pub mod behaviour;
pub mod config;
pub mod service;
130 changes: 85 additions & 45 deletions node/forest_libp2p/src/service.rs
Original file line number Diff line number Diff line change
@@ -1,42 +1,68 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::behaviour::{MyBehaviour, MyBehaviourEvent};
use super::behaviour::{ForestBehaviour, ForestBehaviourEvent};
use super::config::Libp2pConfig;
use futures::{Async, Stream};
use async_std::sync::{channel, Receiver, Sender};
use futures::select;
use futures_util::stream::StreamExt;
use libp2p::{
core,
core::muxing::StreamMuxerBox,
core::nodes::Substream,
core::transport::boxed::Boxed,
gossipsub::TopicHash,
gossipsub::{Topic, TopicHash},
identity::{ed25519, Keypair},
mplex, secio, yamux, PeerId, Swarm, Transport,
};
use slog::{debug, error, info, trace, Logger};
use std::io::{Error, ErrorKind};
use std::time::Duration;
use utils::{get_home_dir, read_file_to_vec, write_to_file};

type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>;
type Libp2pBehaviour = MyBehaviour<Substream<StreamMuxerBox>>;
type Libp2pBehaviour = ForestBehaviour<Substream<StreamMuxerBox>>;

/// Events emitted by this Service
#[derive(Clone, Debug)]
pub enum NetworkEvent {
PubsubMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: Vec<u8>,
},
}

/// Events into this Service
#[derive(Clone, Debug)]
pub enum NetworkMessage {
PubsubMessage { topic: Topic, message: Vec<u8> },
}
/// The Libp2pService listens to events from the Libp2p swarm.
pub struct Libp2pService {
pub swarm: Swarm<Libp2pStream, Libp2pBehaviour>,
swarm: Swarm<Libp2pStream, Libp2pBehaviour>,

pubsub_receiver_in: Receiver<NetworkMessage>,
pubsub_sender_in: Sender<NetworkMessage>,

pubsub_receiver_out: Receiver<NetworkEvent>,
pubsub_sender_out: Sender<NetworkEvent>,

log: Logger,
}

impl Libp2pService {
/// Constructs a Libp2pService
pub fn new(log: &Logger, config: &Libp2pConfig) -> Self {
let net_keypair = get_keypair(log);
pub fn new(log: Logger, config: &Libp2pConfig) -> Self {
let net_keypair = get_keypair(&log);
let peer_id = PeerId::from(net_keypair.public());

info!(log, "Local peer id: {:?}", peer_id);

let transport = build_transport(net_keypair.clone());

let mut swarm = {
let be = MyBehaviour::new(log.clone(), &net_keypair);
let be = ForestBehaviour::new(log.clone(), &net_keypair);
Swarm::new(transport, be, peer_id)
};

Expand All @@ -63,58 +89,72 @@ impl Libp2pService {
swarm.subscribe(topic);
}

Libp2pService { swarm }
let (pubsub_sender_in, pubsub_receiver_in) = channel(20);
let (pubsub_sender_out, pubsub_receiver_out) = channel(20);
Libp2pService {
swarm,
pubsub_receiver_in,
pubsub_sender_in,
pubsub_receiver_out,
pubsub_sender_out,
log,
}
}
}

impl Stream for Libp2pService {
type Item = NetworkEvent;
type Error = ();

/// Continuously polls the Libp2p swarm to get events
fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
/// Starts the `Libp2pService` networking stack. This Future resolves when shutdown occurs.
pub async fn run(self) {
let mut swarm_stream = self.swarm.fuse();
let mut pubsub_stream = self.pubsub_receiver_in.fuse();
loop {
match self.swarm.poll() {
Ok(Async::Ready(Some(event))) => match event {
MyBehaviourEvent::DiscoveredPeer(peer) => {
libp2p::Swarm::dial(&mut self.swarm, peer);
}
MyBehaviourEvent::ExpiredPeer(_) => {}
MyBehaviourEvent::GossipMessage {
source,
topics,
message,
} => {
return Ok(Async::Ready(Option::from(NetworkEvent::PubsubMessage {
select! {
swarm_event = swarm_stream.next() => match swarm_event {
Some(event) => match event {
ForestBehaviourEvent::DiscoveredPeer(peer) => {
libp2p::Swarm::dial(&mut swarm_stream.get_mut(), peer);
}
ForestBehaviourEvent::ExpiredPeer(_) => {}
ForestBehaviourEvent::GossipMessage {
source,
topics,
message,
})));
} => {
info!(self.log, "Got a Gossip Message from {:?}", source);
self.pubsub_sender_out.send(NetworkEvent::PubsubMessage {
source,
topics,
message
}).await;
}
}
None => {break;}
},
Ok(Async::Ready(None)) => break,
Ok(Async::NotReady) => break,
_ => break,
}
rpc_message = pubsub_stream.next() => match rpc_message {
Some(message) => match message {
NetworkMessage::PubsubMessage{topic, message} => {
swarm_stream.get_mut().publish(&topic, message);
}
}
None => {break;}
}
};
}
Ok(Async::NotReady)
}
}

/// Events emitted by this Service to be listened by the NetworkService.
#[derive(Clone)]
pub enum NetworkEvent {
PubsubMessage {
source: PeerId,
topics: Vec<TopicHash>,
message: Vec<u8>,
},
/// Returns a `Sender` allowing you to send messages over GossipSub
pub fn pubsub_sender(&self) -> Sender<NetworkMessage> {
self.pubsub_sender_in.clone()
}

/// Returns a `Receiver` to listen to GossipSub messages
pub fn pubsub_receiver(&self) -> Receiver<NetworkEvent> {
self.pubsub_receiver_out.clone()
}
}

/// Builds the transport stack that LibP2P will communicate over
pub fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Error> {
let transport = libp2p::tcp::TcpConfig::new().nodelay(true);
let transport = libp2p::dns::DnsConfig::new(transport);

let transport = libp2p::dns::DnsConfig::new(transport).unwrap();
transport
.upgrade(core::upgrade::Version::V1)
.authenticate(secio::SecioConfig::new(local_key))
Expand Down
Loading

0 comments on commit 477930d

Please sign in to comment.