diff --git a/Cargo.lock b/Cargo.lock index ec9853a31ea5..81ad8e8e91ee 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1037,6 +1037,7 @@ dependencies = [ "libp2p", "log", "lru", + "message_pool", "num-traits 0.2.14", "pretty_env_logger", "rand 0.7.3", @@ -2368,7 +2369,6 @@ dependencies = [ "libp2p-bitswap", "libp2p-request-response", "log", - "message_pool", "serde", "smallvec", "tiny-cid", diff --git a/blockchain/chain_sync/Cargo.toml b/blockchain/chain_sync/Cargo.toml index 73c9467c379d..6e027056fc92 100644 --- a/blockchain/chain_sync/Cargo.toml +++ b/blockchain/chain_sync/Cargo.toml @@ -39,6 +39,7 @@ rand = "0.7.3" smallvec = "1.1.0" actor = { path = "../../vm/actor" } interpreter = { path = "../../vm/interpreter/" } +message_pool = { path = "../message_pool" } [dev-dependencies] test_utils = { version = "0.1.0", path = "../../utils/test_utils/", features = ["test_constructors"] } diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index 02d3435013c9..61e3561bdb72 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -24,8 +24,9 @@ use futures::select; use futures::stream::StreamExt; use ipld_blockstore::BlockStore; use libp2p::core::PeerId; -use log::{debug, info, warn}; +use log::{debug, info, trace, warn}; use message::{SignedMessage, UnsignedMessage}; +use message_pool::{MessagePool, Provider}; use state_manager::StateManager; use std::marker::PhantomData; use std::sync::Arc; @@ -48,7 +49,7 @@ enum ChainSyncState { /// Struct that handles the ChainSync logic. This handles incoming network events such as /// gossipsub messages, Hello protocol requests, as well as sending and receiving BlockSync /// messages to be able to do the initial sync. -pub struct ChainSyncer { +pub struct ChainSyncer { /// State of general `ChainSync` protocol. state: ChainSyncState, @@ -84,17 +85,21 @@ pub struct ChainSyncer { /// Proof verification implementation. verifier: PhantomData, + + mpool: Arc>, } -impl ChainSyncer +impl ChainSyncer where TBeacon: Beacon + Sync + Send + 'static, DB: BlockStore + Sync + Send + 'static, V: ProofVerifier + Sync + Send + 'static, + M: Provider + Sync + Send + 'static, { pub fn new( state_manager: Arc>, beacon: Arc, + mpool: Arc>, network_send: Sender, network_rx: Receiver, genesis: Arc, @@ -118,6 +123,7 @@ where active_sync_tipsets: SyncBucketSet::default(), next_sync_target: None, verifier: Default::default(), + mpool, }) } @@ -229,9 +235,13 @@ where warn!("failed to inform new head from peer {}", source); } } - // ignore pubsub messages because they get handled in the service - // and get added into the mempool - _ => () + forest_libp2p::PubsubMessage::Message(m) => { + // add message to message pool + // TODO handle adding message to mempool in seperate task. + if let Err(e) = self.mpool.add(m).await { + trace!("Gossip Message failed to be added to Message pool: {}", e); + } + } } } // All other network events are being ignored currently @@ -487,10 +497,12 @@ mod tests { use super::*; use async_std::sync::channel; use async_std::sync::Sender; + use async_std::task; use beacon::MockBeacon; use db::MemoryDB; use fil_types::verifier::MockVerifier; use forest_libp2p::NetworkEvent; + use message_pool::{test_provider::TestApi, MessagePool}; use state_manager::StateManager; use std::sync::Arc; use std::time::Duration; @@ -499,12 +511,19 @@ mod tests { fn chain_syncer_setup( db: Arc, ) -> ( - ChainSyncer, + ChainSyncer, Sender, Receiver, ) { let chain_store = Arc::new(ChainStore::new(db.clone())); - + let test_provider = TestApi::default(); + let mpool = task::block_on(MessagePool::new( + test_provider, + "test".to_string(), + Default::default(), + )) + .unwrap(); + let mpool = Arc::new(mpool); let (local_sender, test_receiver) = channel(20); let (event_sender, event_receiver) = channel(20); @@ -518,6 +537,7 @@ mod tests { ChainSyncer::new( Arc::new(StateManager::new(chain_store)), beacon, + mpool, local_sender, event_receiver, genesis_ts, diff --git a/blockchain/chain_sync/src/sync/peer_test.rs b/blockchain/chain_sync/src/sync/peer_test.rs index 6881e59b09dd..5404accfa5b3 100644 --- a/blockchain/chain_sync/src/sync/peer_test.rs +++ b/blockchain/chain_sync/src/sync/peer_test.rs @@ -11,6 +11,7 @@ use db::MemoryDB; use fil_types::verifier::MockVerifier; use forest_libp2p::{hello::HelloRequest, rpc::ResponseChannel}; use libp2p::core::PeerId; +use message_pool::{test_provider::TestApi, MessagePool}; use state_manager::StateManager; use std::time::Duration; @@ -20,6 +21,14 @@ fn peer_manager_update() { let chain_store = Arc::new(ChainStore::new(db.clone())); + let mpool = task::block_on(MessagePool::new( + TestApi::default(), + "test".to_string(), + Default::default(), + )) + .unwrap(); + let mpool = Arc::new(mpool); + let (local_sender, _test_receiver) = channel(20); let (event_sender, event_receiver) = channel(20); @@ -37,9 +46,10 @@ fn peer_manager_update() { let genesis_ts = Arc::new(Tipset::new(vec![dummy_header]).unwrap()); let beacon = Arc::new(MockBeacon::new(Duration::from_secs(1))); let state_manager = Arc::new(StateManager::new(chain_store)); - let cs = ChainSyncer::<_, _, MockVerifier>::new( + let cs = ChainSyncer::<_, _, MockVerifier, TestApi>::new( state_manager, beacon, + mpool, local_sender, event_receiver, genesis_ts.clone(), diff --git a/forest/src/daemon.rs b/forest/src/daemon.rs index 73b7c84e9839..3d28a1b7d391 100644 --- a/forest/src/daemon.rs +++ b/forest/src/daemon.rs @@ -134,13 +134,7 @@ pub(super) async fn start(config: Config) { ); // Libp2p service setup - let p2p_service = Libp2pService::new( - config.network, - chain_store, - Arc::clone(&mpool), - net_keypair, - &network_name, - ); + let p2p_service = Libp2pService::new(config.network, chain_store, net_keypair, &network_name); let network_rx = p2p_service.network_receiver(); let network_send = p2p_service.network_sender(); @@ -158,9 +152,10 @@ pub(super) async fn start(config: Config) { // Initialize ChainSyncer // TODO allow for configuring validation strategy (defaulting to full validation) - let chain_syncer = ChainSyncer::<_, _, FullVerifier>::new( + let chain_syncer = ChainSyncer::<_, _, FullVerifier, _>::new( Arc::clone(&state_manager), Arc::new(beacon), + Arc::clone(&mpool), network_send.clone(), network_rx, Arc::new(genesis), diff --git a/node/forest_libp2p/Cargo.toml b/node/forest_libp2p/Cargo.toml index 025e0fbe11a0..ac1d3821c724 100644 --- a/node/forest_libp2p/Cargo.toml +++ b/node/forest_libp2p/Cargo.toml @@ -41,7 +41,6 @@ libp2p-bitswap = { git = "https://github.com/ChainSafe/libp2p-bitswap", rev = "b tiny-cid = "0.2.0" ipld_blockstore = { path = "../../ipld/blockstore" } async-trait = "0.1" -message_pool = { path = "../../blockchain/message_pool" } [dev-dependencies] forest_address = { path = "../../vm/address" } diff --git a/node/forest_libp2p/src/service.rs b/node/forest_libp2p/src/service.rs index 64736e87f1b0..654f1166689c 100644 --- a/node/forest_libp2p/src/service.rs +++ b/node/forest_libp2p/src/service.rs @@ -26,7 +26,6 @@ use libp2p::{ }; use libp2p_request_response::{RequestId, ResponseChannel}; use log::{debug, error, info, trace, warn}; -use message_pool::{MessagePool, Provider}; use std::collections::HashMap; use std::io::{Error, ErrorKind}; use std::sync::Arc; @@ -100,10 +99,9 @@ pub enum NetworkMessage { }, } /// The Libp2pService listens to events from the Libp2p swarm. -pub struct Libp2pService { +pub struct Libp2pService { pub swarm: Swarm, cs: Arc>, - mpool: Arc>, /// Keeps track of Blocksync requests to responses bs_request_table: HashMap>, network_receiver_in: Receiver, @@ -114,16 +112,14 @@ pub struct Libp2pService { bitswap_response_channels: HashMap>>, } -impl Libp2pService +impl Libp2pService where DB: BlockStore + Sync + Send + 'static, - T: Provider + Sync + Send + 'static, { /// Constructs a Libp2pService pub fn new( config: Libp2pConfig, cs: Arc>, - mpool: Arc>, net_keypair: Keypair, network_name: &str, ) -> Self { @@ -155,7 +151,6 @@ where Libp2pService { swarm, cs, - mpool, bs_request_table: HashMap::new(), network_receiver_in, network_sender_in, @@ -216,14 +211,8 @@ where Ok(m) => { emit_event(&self.network_sender_out, NetworkEvent::PubsubMessage{ source, - message: PubsubMessage::Message(m.clone()), + message: PubsubMessage::Message(m), }).await; - // add message to message pool - // TODO handle adding message to mempool in seperate task. - // Not ideal that it could potentially block network thread - if let Err(e) = self.mpool.add(m).await { - trace!("Gossip Message failed to be added to Message pool: {}", e); - } } Err(e) => warn!("Gossip Message from peer {:?} could not be deserialized: {}", source, e) }