Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement basic peer manager #252

Merged
merged 11 commits into from
Mar 4, 2020
4 changes: 2 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
**/*.rs.bk
/Cargo.lock
.idea/
config.toml
config*.toml
.DS_STORE
.vscode
/chain_db
/test_dir*
10 changes: 5 additions & 5 deletions blockchain/chain/src/store/chain_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use std::sync::Arc;
const GENESIS_KEY: &str = "gen_block";

/// Generic implementation of the datastore trait and structures
pub struct ChainStore<'db, DB> {
pub struct ChainStore<DB> {
// TODO add IPLD Store
// TODO add StateTreeLoader
// TODO add a pubsub channel that publishes an event every time the head changes.

// key-value datastore
db: &'db DB,
db: Arc<DB>,

// Tipset at the head of the best-known chain.
// TODO revisit if this should be pointer to tipset on heap
Expand All @@ -30,12 +30,12 @@ pub struct ChainStore<'db, DB> {
tip_index: TipIndex,
}

impl<'db, DB> ChainStore<'db, DB>
impl<DB> ChainStore<DB>
where
DB: BlockStore,
{
/// constructor
pub fn new(db: &'db DB) -> Self {
pub fn new(db: Arc<DB>) -> Self {
// TODO pull heaviest tipset from data storage
let heaviest = Arc::new(Tipset::new(vec![BlockHeader::default()]).unwrap());
Self {
Expand Down Expand Up @@ -171,7 +171,7 @@ mod tests {
fn genesis_test() {
let db = db::MemoryDB::default();

let cs = ChainStore::new(&db);
let cs = ChainStore::new(Arc::new(db));
let gen_block = BlockHeader::builder()
.epoch(1.into())
.weight((2 as u32).into())
Expand Down
1 change: 1 addition & 0 deletions blockchain/chain_sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ mod errors;
mod manager;
mod network_context;
mod network_handler;
mod peer_manager;
mod sync;

pub use self::errors::Error;
Expand Down
10 changes: 9 additions & 1 deletion blockchain/chain_sync/src/network_handler.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::peer_manager::PeerManager;
use async_std::prelude::*;
use async_std::sync::{Receiver, Sender};
use async_std::task;
use forest_libp2p::rpc::{RPCResponse, RequestId};
use forest_libp2p::NetworkEvent;
use log::trace;
use std::sync::Arc;

pub(crate) type RPCReceiver = Receiver<(RequestId, RPCResponse)>;
pub(crate) type RPCSender = Sender<(RequestId, RPCResponse)>;
Expand All @@ -31,10 +33,11 @@ impl NetworkHandler {
}
}

pub(crate) fn spawn(&self) {
pub(crate) fn spawn(&self, peer_manager: Arc<PeerManager>) {
let mut receiver = self.receiver.clone();
let rpc_send = self.rpc_send.clone();
let event_send = self.event_send.clone();

task::spawn(async move {
loop {
match receiver.next().await {
Expand All @@ -44,6 +47,11 @@ impl NetworkHandler {
}
// Pass any non RPC responses through event channel
Some(event) => {
// Update peer on this thread before sending hello
if let NetworkEvent::Hello { source, .. } = &event {
peer_manager.add_peer(source.clone()).await;
}

// TODO revisit, doing this to avoid blocking this thread but can handle better
if !event_send.is_full() {
event_send.send(event).await
Expand Down
44 changes: 44 additions & 0 deletions blockchain/chain_sync/src/peer_manager.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use async_std::sync::RwLock;
use libp2p::core::PeerId;
use log::debug;
use std::collections::HashSet;

/// Thread safe peer manager
#[derive(Default)]
pub struct PeerManager {
/// Hash set of full peers available
full_peers: RwLock<HashSet<PeerId>>,
}

impl PeerManager {
/// Adds a PeerId to the set of managed peers
pub async fn add_peer(&self, peer_id: PeerId) {
debug!("Added PeerId to full peers list: {}", &peer_id);
self.full_peers.write().await.insert(peer_id);
}

/// Returns true if peer set is empty
pub async fn is_empty(&self) -> bool {
self.full_peers.read().await.is_empty()
}

/// Retrieves a cloned PeerId to be used to send network request
pub async fn get_peer(&self) -> Option<PeerId> {
// TODO replace this with a shuffled or more random sample
self.full_peers.read().await.iter().next().cloned()
}

/// Removes a peer from the set and returns true if the value was present previously
pub async fn remove_peer(&self, peer_id: &PeerId) -> bool {
// TODO replace this with a shuffled or more random sample
self.full_peers.write().await.remove(&peer_id)
}

/// Gets count of full peers managed
pub async fn len(&self) -> usize {
self.full_peers.read().await.len()
}
}
60 changes: 48 additions & 12 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,35 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

#[cfg(test)]
mod peer_test;

use super::network_handler::NetworkHandler;
use super::peer_manager::PeerManager;
use super::{Error, SyncManager, SyncNetworkContext};
use address::Address;
use amt::AMT;
use async_std::sync::channel;
use async_std::sync::{Receiver, Sender};
use async_std::sync::{channel, Receiver, Sender};
use async_std::task;
use blocks::{Block, BlockHeader, FullTipset, TipSetKeys, Tipset, TxMeta};
use chain::ChainStore;
use cid::Cid;
use core::time::Duration;
use crypto::is_valid_signature;
use db::Error as DBError;
use encoding::{Cbor, Error as EncodingError};
use forest_libp2p::{NetworkEvent, NetworkMessage};
use ipld_blockstore::BlockStore;
use libp2p::core::PeerId;
use log::{info, warn};
use log::{debug, info, warn};
use lru::LruCache;
use message::Message;
use num_bigint::BigUint;
use state_manager::StateManager;
use state_tree::{HamtStateTree, StateTree};
use std::cmp::min;
use std::collections::HashMap;
use std::sync::Arc;

#[derive(PartialEq, Debug, Clone)]
/// Current state of the ChainSyncer
Expand All @@ -42,18 +48,18 @@ enum SyncState {
_Follow,
}

pub struct ChainSyncer<'db, DB, ST> {
pub struct ChainSyncer<DB, ST> {
/// Syncing state of chain sync
_state: SyncState,

/// manages retrieving and updates state objects
state_manager: StateManager<'db, DB, ST>,
state_manager: StateManager<DB, ST>,

/// manages sync buckets
sync_manager: SyncManager,

/// access and store tipsets / blocks / messages
chain_store: ChainStore<'db, DB>,
chain_store: ChainStore<DB>,

/// Context to be able to send requests to p2p network
network: SyncNetworkContext,
Expand All @@ -67,6 +73,9 @@ pub struct ChainSyncer<'db, DB, ST> {

/// incoming network events to be handled by syncer
net_handler: NetworkHandler,

/// Peer manager to handle full peers to send ChainSync requests to
peer_manager: Arc<PeerManager>,
}

/// Message data used to ensure valid state transition
Expand All @@ -75,18 +84,18 @@ struct MsgMetaData {
sequence: u64,
}

impl<'db, DB> ChainSyncer<'db, DB, HamtStateTree>
impl<DB> ChainSyncer<DB, HamtStateTree>
where
DB: BlockStore,
{
pub fn new(
db: &'db DB,
db: Arc<DB>,
network_send: Sender<NetworkMessage>,
network_rx: Receiver<NetworkEvent>,
) -> Result<Self, Error> {
let sync_manager = SyncManager::default();

let chain_store = ChainStore::new(db);
let chain_store = ChainStore::new(db.clone());
let _genesis = match chain_store.genesis()? {
Some(gen) => Tipset::new(vec![gen])?,
None => {
Expand All @@ -104,6 +113,8 @@ where

let network = SyncNetworkContext::new(network_send, rpc_rx, event_rx);

let peer_manager = Arc::new(PeerManager::default());

let net_handler = NetworkHandler::new(network_rx, rpc_send, event_send);

Ok(Self {
Expand All @@ -115,18 +126,34 @@ where
sync_manager,
bad_blocks: LruCache::new(1 << 15),
net_handler,
peer_manager,
})
}
}

impl<'db, DB, ST> ChainSyncer<'db, DB, ST>
impl<DB, ST> ChainSyncer<DB, ST>
where
DB: BlockStore,
ST: StateTree,
{
/// Starts syncing process
pub async fn sync(mut self) -> Result<(), Error> {
self.net_handler.spawn();
self.net_handler.spawn(Arc::clone(&self.peer_manager));

info!("Bootstrapping peers to sync");

// Bootstrap peers before syncing
// TODO increase bootstrap peer count before syncing
const MIN_PEERS: usize = 1;
loop {
let peer_count = self.peer_manager.len().await;
if peer_count < MIN_PEERS {
debug!("bootstrapping peers, have {}", peer_count);
task::sleep(Duration::from_secs(2)).await;
} else {
break;
}
}

info!("Starting chain sync");

Expand Down Expand Up @@ -391,7 +418,15 @@ where
let window = min(epoch_diff, REQUEST_WINDOW);

// TODO change from using random peerID to managed
let peer_id = PeerId::random();
while self.peer_manager.is_empty().await {
warn!("No valid peers to sync, waiting for other nodes");
task::sleep(Duration::from_secs(5)).await;
}
let peer_id = self
.peer_manager
.get_peer()
.await
.expect("Peer set is not empty here");

// Load blocks from network using blocksync
let tipsets: Vec<Tipset> = match self
Expand All @@ -402,6 +437,7 @@ where
Ok(ts) => ts,
Err(e) => {
warn!("Failed blocksync request to peer {:?}: {}", peer_id, e);
self.peer_manager.remove_peer(&peer_id).await;
continue;
}
};
Expand Down
42 changes: 42 additions & 0 deletions blockchain/chain_sync/src/sync/peer_test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
// Copyright 2020 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use super::*;
use async_std::sync::channel;
use async_std::task;
use db::MemoryDB;
use forest_libp2p::hello::HelloMessage;
use libp2p::core::PeerId;
use std::time::Duration;

#[test]
fn peer_manager_update() {
let db = MemoryDB::default();
let (local_sender, _test_receiver) = channel(20);
let (event_sender, event_receiver) = channel(20);

let cs = ChainSyncer::new(Arc::new(db), local_sender, event_receiver).unwrap();
let peer_manager = Arc::clone(&cs.peer_manager);

task::spawn(async {
cs.sync().await.unwrap();
});

let source = PeerId::random();
let source_clone = source.clone();

task::block_on(async {
event_sender
.send(NetworkEvent::Hello {
message: HelloMessage::default(),
source,
})
.await;

// Would be ideal to not have to sleep here and have it deterministic
task::sleep(Duration::from_millis(50)).await;

assert_eq!(peer_manager.len().await, 1);
assert_eq!(peer_manager.get_peer().await, Some(source_clone));
});
}
3 changes: 2 additions & 1 deletion blockchain/chain_sync/tests/syncer_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
use async_std::sync::channel;
use chain_sync::ChainSyncer;
use db::MemoryDB;
use std::sync::Arc;

#[test]
fn chainsync_constructor() {
Expand All @@ -13,5 +14,5 @@ fn chainsync_constructor() {

// Test just makes sure that the chain syncer can be created without using a live database or
// p2p network (local channels to simulate network messages and responses)
let _chain_syncer = ChainSyncer::new(&db, local_sender, event_receiver).unwrap();
let _chain_syncer = ChainSyncer::new(Arc::new(db), local_sender, event_receiver).unwrap();
}
9 changes: 5 additions & 4 deletions blockchain/state_manager/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,21 @@ use address::Address;
use blockstore::BlockStore;
use encoding::de::DeserializeOwned;
use state_tree::StateTree;
use std::sync::Arc;

/// Intermediary for retrieving state objects and updating actor states
pub struct StateManager<'db, DB, ST> {
bs: &'db DB,
pub struct StateManager<DB, ST> {
bs: Arc<DB>,
tree: ST,
}

impl<'db, DB, ST> StateManager<'db, DB, ST>
impl<DB, ST> StateManager<DB, ST>
where
ST: StateTree,
DB: BlockStore,
{
/// constructor
pub fn new(bs: &'db DB, tree: ST) -> Self {
pub fn new(bs: Arc<DB>, tree: ST) -> Self {
Self { bs, tree }
}
/// Loads actor state from IPLD Store
Expand Down
Loading