diff --git a/.gitignore b/.gitignore index 419691178107..a9eba72f6620 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ **/*.rs.bk /Cargo.lock .idea/ -config.toml +config*.toml .DS_STORE .vscode -/chain_db +/test_dir* diff --git a/blockchain/chain/src/store/chain_store.rs b/blockchain/chain/src/store/chain_store.rs index 89808f39c829..afcce827052b 100644 --- a/blockchain/chain/src/store/chain_store.rs +++ b/blockchain/chain/src/store/chain_store.rs @@ -14,13 +14,13 @@ use std::sync::Arc; const GENESIS_KEY: &str = "gen_block"; /// Generic implementation of the datastore trait and structures -pub struct ChainStore<'db, DB> { +pub struct ChainStore { // TODO add IPLD Store // TODO add StateTreeLoader // TODO add a pubsub channel that publishes an event every time the head changes. // key-value datastore - db: &'db DB, + db: Arc, // Tipset at the head of the best-known chain. // TODO revisit if this should be pointer to tipset on heap @@ -30,12 +30,12 @@ pub struct ChainStore<'db, DB> { tip_index: TipIndex, } -impl<'db, DB> ChainStore<'db, DB> +impl ChainStore where DB: BlockStore, { /// constructor - pub fn new(db: &'db DB) -> Self { + pub fn new(db: Arc) -> Self { // TODO pull heaviest tipset from data storage let heaviest = Arc::new(Tipset::new(vec![BlockHeader::default()]).unwrap()); Self { @@ -171,7 +171,7 @@ mod tests { fn genesis_test() { let db = db::MemoryDB::default(); - let cs = ChainStore::new(&db); + let cs = ChainStore::new(Arc::new(db)); let gen_block = BlockHeader::builder() .epoch(1.into()) .weight((2 as u32).into()) diff --git a/blockchain/chain_sync/src/lib.rs b/blockchain/chain_sync/src/lib.rs index 11a92d0f8e18..4796c4efa908 100644 --- a/blockchain/chain_sync/src/lib.rs +++ b/blockchain/chain_sync/src/lib.rs @@ -6,6 +6,7 @@ mod errors; mod manager; mod network_context; mod network_handler; +mod peer_manager; mod sync; pub use self::errors::Error; diff --git a/blockchain/chain_sync/src/network_handler.rs b/blockchain/chain_sync/src/network_handler.rs index 3221444022fc..7d2a292950be 100644 --- a/blockchain/chain_sync/src/network_handler.rs +++ b/blockchain/chain_sync/src/network_handler.rs @@ -1,12 +1,14 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +use super::peer_manager::PeerManager; use async_std::prelude::*; use async_std::sync::{Receiver, Sender}; use async_std::task; use forest_libp2p::rpc::{RPCResponse, RequestId}; use forest_libp2p::NetworkEvent; use log::trace; +use std::sync::Arc; pub(crate) type RPCReceiver = Receiver<(RequestId, RPCResponse)>; pub(crate) type RPCSender = Sender<(RequestId, RPCResponse)>; @@ -31,10 +33,11 @@ impl NetworkHandler { } } - pub(crate) fn spawn(&self) { + pub(crate) fn spawn(&self, peer_manager: Arc) { let mut receiver = self.receiver.clone(); let rpc_send = self.rpc_send.clone(); let event_send = self.event_send.clone(); + task::spawn(async move { loop { match receiver.next().await { @@ -44,6 +47,11 @@ impl NetworkHandler { } // Pass any non RPC responses through event channel Some(event) => { + // Update peer on this thread before sending hello + if let NetworkEvent::Hello { source, .. } = &event { + peer_manager.add_peer(source.clone()).await; + } + // TODO revisit, doing this to avoid blocking this thread but can handle better if !event_send.is_full() { event_send.send(event).await diff --git a/blockchain/chain_sync/src/peer_manager.rs b/blockchain/chain_sync/src/peer_manager.rs new file mode 100644 index 000000000000..13457f21dc21 --- /dev/null +++ b/blockchain/chain_sync/src/peer_manager.rs @@ -0,0 +1,44 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use async_std::sync::RwLock; +use libp2p::core::PeerId; +use log::debug; +use std::collections::HashSet; + +/// Thread safe peer manager +#[derive(Default)] +pub struct PeerManager { + /// Hash set of full peers available + full_peers: RwLock>, +} + +impl PeerManager { + /// Adds a PeerId to the set of managed peers + pub async fn add_peer(&self, peer_id: PeerId) { + debug!("Added PeerId to full peers list: {}", &peer_id); + self.full_peers.write().await.insert(peer_id); + } + + /// Returns true if peer set is empty + pub async fn is_empty(&self) -> bool { + self.full_peers.read().await.is_empty() + } + + /// Retrieves a cloned PeerId to be used to send network request + pub async fn get_peer(&self) -> Option { + // TODO replace this with a shuffled or more random sample + self.full_peers.read().await.iter().next().cloned() + } + + /// Removes a peer from the set and returns true if the value was present previously + pub async fn remove_peer(&self, peer_id: &PeerId) -> bool { + // TODO replace this with a shuffled or more random sample + self.full_peers.write().await.remove(&peer_id) + } + + /// Gets count of full peers managed + pub async fn len(&self) -> usize { + self.full_peers.read().await.len() + } +} diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 0315bda366f1..8a76e6f46e03 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -1,22 +1,27 @@ // Copyright 2020 ChainSafe Systems // SPDX-License-Identifier: Apache-2.0, MIT +#[cfg(test)] +mod peer_test; + use super::network_handler::NetworkHandler; +use super::peer_manager::PeerManager; use super::{Error, SyncManager, SyncNetworkContext}; use address::Address; use amt::AMT; -use async_std::sync::channel; -use async_std::sync::{Receiver, Sender}; +use async_std::sync::{channel, Receiver, Sender}; +use async_std::task; use blocks::{Block, BlockHeader, FullTipset, TipSetKeys, Tipset, TxMeta}; use chain::ChainStore; use cid::Cid; +use core::time::Duration; use crypto::is_valid_signature; use db::Error as DBError; use encoding::{Cbor, Error as EncodingError}; use forest_libp2p::{NetworkEvent, NetworkMessage}; use ipld_blockstore::BlockStore; use libp2p::core::PeerId; -use log::{info, warn}; +use log::{debug, info, warn}; use lru::LruCache; use message::Message; use num_bigint::BigUint; @@ -24,6 +29,7 @@ use state_manager::StateManager; use state_tree::{HamtStateTree, StateTree}; use std::cmp::min; use std::collections::HashMap; +use std::sync::Arc; #[derive(PartialEq, Debug, Clone)] /// Current state of the ChainSyncer @@ -42,18 +48,18 @@ enum SyncState { _Follow, } -pub struct ChainSyncer<'db, DB, ST> { +pub struct ChainSyncer { /// Syncing state of chain sync _state: SyncState, /// manages retrieving and updates state objects - state_manager: StateManager<'db, DB, ST>, + state_manager: StateManager, /// manages sync buckets sync_manager: SyncManager, /// access and store tipsets / blocks / messages - chain_store: ChainStore<'db, DB>, + chain_store: ChainStore, /// Context to be able to send requests to p2p network network: SyncNetworkContext, @@ -67,6 +73,9 @@ pub struct ChainSyncer<'db, DB, ST> { /// incoming network events to be handled by syncer net_handler: NetworkHandler, + + /// Peer manager to handle full peers to send ChainSync requests to + peer_manager: Arc, } /// Message data used to ensure valid state transition @@ -75,18 +84,18 @@ struct MsgMetaData { sequence: u64, } -impl<'db, DB> ChainSyncer<'db, DB, HamtStateTree> +impl ChainSyncer where DB: BlockStore, { pub fn new( - db: &'db DB, + db: Arc, network_send: Sender, network_rx: Receiver, ) -> Result { let sync_manager = SyncManager::default(); - let chain_store = ChainStore::new(db); + let chain_store = ChainStore::new(db.clone()); let _genesis = match chain_store.genesis()? { Some(gen) => Tipset::new(vec![gen])?, None => { @@ -104,6 +113,8 @@ where let network = SyncNetworkContext::new(network_send, rpc_rx, event_rx); + let peer_manager = Arc::new(PeerManager::default()); + let net_handler = NetworkHandler::new(network_rx, rpc_send, event_send); Ok(Self { @@ -115,18 +126,34 @@ where sync_manager, bad_blocks: LruCache::new(1 << 15), net_handler, + peer_manager, }) } } -impl<'db, DB, ST> ChainSyncer<'db, DB, ST> +impl ChainSyncer where DB: BlockStore, ST: StateTree, { /// Starts syncing process pub async fn sync(mut self) -> Result<(), Error> { - self.net_handler.spawn(); + self.net_handler.spawn(Arc::clone(&self.peer_manager)); + + info!("Bootstrapping peers to sync"); + + // Bootstrap peers before syncing + // TODO increase bootstrap peer count before syncing + const MIN_PEERS: usize = 1; + loop { + let peer_count = self.peer_manager.len().await; + if peer_count < MIN_PEERS { + debug!("bootstrapping peers, have {}", peer_count); + task::sleep(Duration::from_secs(2)).await; + } else { + break; + } + } info!("Starting chain sync"); @@ -391,7 +418,15 @@ where let window = min(epoch_diff, REQUEST_WINDOW); // TODO change from using random peerID to managed - let peer_id = PeerId::random(); + while self.peer_manager.is_empty().await { + warn!("No valid peers to sync, waiting for other nodes"); + task::sleep(Duration::from_secs(5)).await; + } + let peer_id = self + .peer_manager + .get_peer() + .await + .expect("Peer set is not empty here"); // Load blocks from network using blocksync let tipsets: Vec = match self @@ -402,6 +437,7 @@ where Ok(ts) => ts, Err(e) => { warn!("Failed blocksync request to peer {:?}: {}", peer_id, e); + self.peer_manager.remove_peer(&peer_id).await; continue; } }; diff --git a/blockchain/chain_sync/src/sync/peer_test.rs b/blockchain/chain_sync/src/sync/peer_test.rs new file mode 100644 index 000000000000..16e6a3543398 --- /dev/null +++ b/blockchain/chain_sync/src/sync/peer_test.rs @@ -0,0 +1,42 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: Apache-2.0, MIT + +use super::*; +use async_std::sync::channel; +use async_std::task; +use db::MemoryDB; +use forest_libp2p::hello::HelloMessage; +use libp2p::core::PeerId; +use std::time::Duration; + +#[test] +fn peer_manager_update() { + let db = MemoryDB::default(); + let (local_sender, _test_receiver) = channel(20); + let (event_sender, event_receiver) = channel(20); + + let cs = ChainSyncer::new(Arc::new(db), local_sender, event_receiver).unwrap(); + let peer_manager = Arc::clone(&cs.peer_manager); + + task::spawn(async { + cs.sync().await.unwrap(); + }); + + let source = PeerId::random(); + let source_clone = source.clone(); + + task::block_on(async { + event_sender + .send(NetworkEvent::Hello { + message: HelloMessage::default(), + source, + }) + .await; + + // Would be ideal to not have to sleep here and have it deterministic + task::sleep(Duration::from_millis(50)).await; + + assert_eq!(peer_manager.len().await, 1); + assert_eq!(peer_manager.get_peer().await, Some(source_clone)); + }); +} diff --git a/blockchain/chain_sync/tests/syncer_test.rs b/blockchain/chain_sync/tests/syncer_test.rs index 847a5d9c1886..15becf6706a2 100644 --- a/blockchain/chain_sync/tests/syncer_test.rs +++ b/blockchain/chain_sync/tests/syncer_test.rs @@ -4,6 +4,7 @@ use async_std::sync::channel; use chain_sync::ChainSyncer; use db::MemoryDB; +use std::sync::Arc; #[test] fn chainsync_constructor() { @@ -13,5 +14,5 @@ fn chainsync_constructor() { // Test just makes sure that the chain syncer can be created without using a live database or // p2p network (local channels to simulate network messages and responses) - let _chain_syncer = ChainSyncer::new(&db, local_sender, event_receiver).unwrap(); + let _chain_syncer = ChainSyncer::new(Arc::new(db), local_sender, event_receiver).unwrap(); } diff --git a/blockchain/state_manager/src/lib.rs b/blockchain/state_manager/src/lib.rs index 73e373e6df6b..53c6ab52daed 100644 --- a/blockchain/state_manager/src/lib.rs +++ b/blockchain/state_manager/src/lib.rs @@ -9,20 +9,21 @@ use address::Address; use blockstore::BlockStore; use encoding::de::DeserializeOwned; use state_tree::StateTree; +use std::sync::Arc; /// Intermediary for retrieving state objects and updating actor states -pub struct StateManager<'db, DB, ST> { - bs: &'db DB, +pub struct StateManager { + bs: Arc, tree: ST, } -impl<'db, DB, ST> StateManager<'db, DB, ST> +impl StateManager where ST: StateTree, DB: BlockStore, { /// constructor - pub fn new(bs: &'db DB, tree: ST) -> Self { + pub fn new(bs: Arc, tree: ST) -> Self { Self { bs, tree } } /// Loads actor state from IPLD Store diff --git a/forest/src/cli/config.rs b/forest/src/cli/config.rs index b5dc358dd04e..4b1a8330adac 100644 --- a/forest/src/cli/config.rs +++ b/forest/src/cli/config.rs @@ -3,10 +3,20 @@ use forest_libp2p::Libp2pConfig; use serde::Deserialize; +use utils::get_home_dir; -#[derive(Debug, Deserialize, Default)] +#[derive(Debug, Deserialize)] #[serde(default)] pub struct Config { pub network: Libp2pConfig, pub data_dir: String, } + +impl Default for Config { + fn default() -> Self { + Self { + network: Libp2pConfig::default(), + data_dir: get_home_dir() + "/.forest", + } + } +} diff --git a/forest/src/main.rs b/forest/src/main.rs index c98642f0de00..aebdaa784fdb 100644 --- a/forest/src/main.rs +++ b/forest/src/main.rs @@ -46,14 +46,18 @@ fn main() { // Capture CLI inputs let config = cli().expect("CLI error"); - let net_keypair = match get_keypair(&"/.forest/libp2p/keypair") { + let net_keypair = match get_keypair(&format!("{}{}", &config.data_dir, "/libp2p/keypair")) { Some(kp) => kp, None => { // Keypair not found, generate and save generated keypair let gen_keypair = ed25519::Keypair::generate(); // Save Ed25519 keypair to file // TODO rename old file to keypair.old(?) - if let Err(e) = write_to_file(&gen_keypair.encode(), &"/.forest/libp2p/", "keypair") { + if let Err(e) = write_to_file( + &gen_keypair.encode(), + &format!("{}{}", &config.data_dir, "/libp2p/"), + "keypair", + ) { info!("Could not write keystore to disk!"); trace!("Error {:?}", e); }; @@ -72,10 +76,10 @@ fn main() { }); let sync_thread = task::spawn(async { // Initialize database - let mut db = RocksDb::new("chain_db"); + let mut db = RocksDb::new(config.data_dir + "/db"); db.open().unwrap(); - let chain_syncer = ChainSyncer::new(&db, network_send, network_rx).unwrap(); + let chain_syncer = ChainSyncer::new(Arc::new(db), network_send, network_rx).unwrap(); chain_syncer.sync().await.unwrap(); }); diff --git a/node/forest_libp2p/src/hello/message.rs b/node/forest_libp2p/src/hello/message.rs index d062d194ae06..0b94274bdcf8 100644 --- a/node/forest_libp2p/src/hello/message.rs +++ b/node/forest_libp2p/src/hello/message.rs @@ -92,3 +92,16 @@ impl<'de> Deserialize<'de> for HelloResponse { Ok(HelloResponse { arrival, sent }) } } + +#[cfg(test)] +mod tests { + use super::*; + use forest_encoding::*; + + #[test] + fn hello_default_ser() { + let bz = to_vec(&HelloMessage::default()).unwrap(); + let msg: HelloMessage = from_slice(&bz).unwrap(); + assert_eq!(msg, HelloMessage::default()); + } +} diff --git a/node/forest_libp2p/src/rpc/handler.rs b/node/forest_libp2p/src/rpc/handler.rs index 111c7387055d..e5ad87388e7f 100644 --- a/node/forest_libp2p/src/rpc/handler.rs +++ b/node/forest_libp2p/src/rpc/handler.rs @@ -12,6 +12,7 @@ use libp2p::swarm::{ KeepAlive, ProtocolsHandler, ProtocolsHandlerEvent, ProtocolsHandlerUpgrErr, SubstreamProtocol, }; use libp2p::{InboundUpgrade, OutboundUpgrade}; +use log::error; use smallvec::SmallVec; use std::{ pin::Pin, @@ -242,8 +243,7 @@ where > { if let Some(err) = self.pending_error.take() { // Log error, shouldn't necessarily return error and drop peer here - // TODO add logger to RPCHandler and use here - dbg!(&err); + error!("{}", err); } // return any events that need to be reported @@ -341,21 +341,18 @@ where } // establish outbound substreams - if !self.dial_queue.is_empty() { - if self.dial_negotiated < self.max_dial_negotiated { - self.dial_negotiated += 1; - let event = self.dial_queue.remove(0); - self.dial_queue.shrink_to_fit(); - if let RPCEvent::Request(id, req) = event { - return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { - protocol: SubstreamProtocol::new(req.clone()), - info: RPCEvent::Request(id, req), - }); - } - } - } else { + if !self.dial_queue.is_empty() && self.dial_negotiated < self.max_dial_negotiated { + self.dial_negotiated += 1; + let event = self.dial_queue.remove(0); self.dial_queue.shrink_to_fit(); + if let RPCEvent::Request(id, req) = event { + return Poll::Ready(ProtocolsHandlerEvent::OutboundSubstreamRequest { + protocol: SubstreamProtocol::new(req.clone()), + info: RPCEvent::Request(id, req), + }); + } } + Poll::Pending } } diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index dde784e9f8bc..8fc81a78c1bc 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -17,10 +17,10 @@ use libp2p::{ identity::{ed25519, Keypair}, mplex, secio, yamux, PeerId, Swarm, Transport, }; -use log::{debug, error, info, trace}; +use log::{debug, info, trace, warn}; use std::io::{Error, ErrorKind}; use std::time::Duration; -use utils::{get_home_dir, read_file_to_vec}; +use utils::read_file_to_vec; type Libp2pStream = Boxed<(PeerId, StreamMuxerBox), Error>; type Libp2pBehaviour = ForestBehaviour>; @@ -68,8 +68,6 @@ impl Libp2pService { pub fn new(config: &Libp2pConfig, net_keypair: Keypair) -> Self { let peer_id = PeerId::from(net_keypair.public()); - info!("Local peer id: {:?}", peer_id); - let transport = build_transport(net_keypair.clone()); let mut swarm = { @@ -77,13 +75,13 @@ impl Libp2pService { Swarm::new(transport, be, peer_id) }; - for node in config.bootstrap_peers.clone() { + for node in &config.bootstrap_peers { match node.parse() { Ok(to_dial) => match Swarm::dial_addr(&mut swarm, to_dial) { Ok(_) => debug!("Dialed {:?}", node), - Err(e) => debug!("Dial {:?} failed: {:?}", node, e), + Err(e) => warn!("Dial {:?} failed: {:?}", node, e), }, - Err(err) => error!("Failed to parse address to dial: {:?}", err), + Err(err) => warn!("Failed to parse address to dial: {:?}", err), } } @@ -115,19 +113,24 @@ impl Libp2pService { pub async fn run(self) { let mut swarm_stream = self.swarm.fuse(); let mut network_stream = self.network_receiver_in.fuse(); + loop { select! { swarm_event = swarm_stream.next() => match swarm_event { Some(event) => match event { ForestBehaviourEvent::PeerDialed(peer_id) => { - info!("Peer dialed, {:?}", peer_id); - // TODO add sending hello after genesis setup + debug!("Peer dialed, {:?}", peer_id); + // TODO send non-default Hello Message + swarm_stream.get_mut().send_rpc( + peer_id, + RPCEvent::Request(0, RPCRequest::Hello(HelloMessage::default())), + ); } ForestBehaviourEvent::PeerDisconnected(peer_id) => { - info!("Peer disconnected, {:?}", peer_id); + debug!("Peer disconnected, {:?}", peer_id); } ForestBehaviourEvent::DiscoveredPeer(peer) => { - info!("Discovered: {:?}", peer); + debug!("Discovered: {:?}", peer); libp2p::Swarm::dial(&mut swarm_stream.get_mut(), peer); } ForestBehaviourEvent::ExpiredPeer(_) => {} @@ -136,7 +139,7 @@ impl Libp2pService { topics, message, } => { - info!("Got a Gossip Message from {:?}", source); + debug!("Got a Gossip Message from {:?}", source); self.network_sender_out.send(NetworkEvent::PubsubMessage { source, topics, @@ -144,7 +147,7 @@ impl Libp2pService { }).await; } ForestBehaviourEvent::RPC(peer_id, event) => { - info!("RPC event {:?}", event); + debug!("RPC event {:?}", event); match event { RPCEvent::Response(req_id, res) => { self.network_sender_out.send(NetworkEvent::RPCResponse { @@ -215,8 +218,7 @@ pub fn build_transport(local_key: Keypair) -> Boxed<(PeerId, StreamMuxerBox), Er /// Fetch keypair from disk, returning none if it cannot be decoded pub fn get_keypair(path: &str) -> Option { - let path_to_keystore = get_home_dir() + path; - match read_file_to_vec(&path_to_keystore) { + match read_file_to_vec(&path) { Err(e) => { info!("Networking keystore not found!"); trace!("Error {:?}", e); @@ -224,7 +226,7 @@ pub fn get_keypair(path: &str) -> Option { } Ok(mut vec) => match ed25519::Keypair::decode(&mut vec) { Ok(kp) => { - info!("Recovered keystore from {:?}", &path_to_keystore); + info!("Recovered keystore from {:?}", &path); Some(Keypair::Ed25519(kp)) } Err(e) => { diff --git a/vm/interpreter/src/lib.rs b/vm/interpreter/src/lib.rs index 7ddbf04af350..bf2a596463fb 100644 --- a/vm/interpreter/src/lib.rs +++ b/vm/interpreter/src/lib.rs @@ -27,9 +27,9 @@ impl VMInterpreter { /// Applies the state transition for a single message /// Returns result StateTree, receipts from the transaction, and the miner penalty token amount - pub fn apply_message<'db, DB, ST>( + pub fn apply_message( _in_tree: &ST, - _chain: &ChainStore<'db, DB>, + _chain: &ChainStore, _msg: &UnsignedMessage, _miner_addr: &Address, ) -> (ST, MessageReceipt, TokenAmount)