From 8894162bd929d1dc2ab2a794b3b9b2574ea44b57 Mon Sep 17 00:00:00 2001 From: Tim Vermeulen Date: Wed, 6 Jan 2021 17:29:46 +0100 Subject: [PATCH] Wrap ChainSyncer::state in an Arc and set it to Follow accordingly --- blockchain/chain_sync/src/sync.rs | 16 ++++++++-------- blockchain/chain_sync/src/sync_worker.rs | 21 ++++++++++++++------- 2 files changed, 22 insertions(+), 15 deletions(-) diff --git a/blockchain/chain_sync/src/sync.rs b/blockchain/chain_sync/src/sync.rs index ca36a52b85ca..f052d6113452 100644 --- a/blockchain/chain_sync/src/sync.rs +++ b/blockchain/chain_sync/src/sync.rs @@ -10,7 +10,7 @@ use super::sync_state::SyncState; use super::sync_worker::SyncWorker; use super::{Error, SyncNetworkContext}; use amt::Amt; -use async_std::sync::{channel, Receiver, RwLock, Sender}; +use async_std::sync::{channel, Mutex, Receiver, RwLock, Sender}; use async_std::task::{self, JoinHandle}; use beacon::Beacon; use blocks::{Block, FullTipset, GossipBlock, Tipset, TipsetKeys, TxMeta}; @@ -38,7 +38,7 @@ use std::sync::Arc; type WorkerState = Arc>>>>; #[derive(Debug, PartialEq)] -enum ChainSyncState { +pub enum ChainSyncState { /// Bootstrapping peers before starting sync. Bootstrap, /// Syncing chain with ChainExchange protocol. @@ -78,7 +78,7 @@ impl Default for SyncConfig { /// messages to be able to do the initial sync. pub struct ChainSyncer { /// State of general `ChainSync` protocol. - state: ChainSyncState, + state: Arc>, /// Syncing state of chain sync workers. worker_state: WorkerState, @@ -142,7 +142,7 @@ where ); Ok(Self { - state: ChainSyncState::Bootstrap, + state: Arc::new(Mutex::new(ChainSyncState::Bootstrap)), worker_state: Default::default(), beacon, network, @@ -221,7 +221,7 @@ where .await } NetworkEvent::PubsubMessage { source, message } => { - if self.state != ChainSyncState::Follow { + if *self.state.lock().await != ChainSyncState::Follow { // Ignore gossipsub events if not in following state return; } @@ -375,7 +375,7 @@ where verifier: PhantomData::::default(), req_window: self.sync_config.req_window, } - .spawn(channel) + .spawn(channel, Arc::clone(&self.state)) .await } @@ -424,10 +424,10 @@ where .await; // Only update target on initial sync - if self.state == ChainSyncState::Bootstrap { + if *self.state.lock().await == ChainSyncState::Bootstrap { if let Some(best_target) = self.select_sync_target().await { self.schedule_tipset(best_target).await; - self.state = ChainSyncState::Initial; + *self.state.lock().await = ChainSyncState::Initial; return; } } diff --git a/blockchain/chain_sync/src/sync_worker.rs b/blockchain/chain_sync/src/sync_worker.rs index 78d6c2dd0a35..cdd5ba77e775 100644 --- a/blockchain/chain_sync/src/sync_worker.rs +++ b/blockchain/chain_sync/src/sync_worker.rs @@ -6,13 +6,13 @@ mod full_sync_test; #[cfg(test)] mod validate_block_test; -use super::bad_block_cache::BadBlockCache; use super::sync_state::{SyncStage, SyncState}; +use super::{bad_block_cache::BadBlockCache, sync::ChainSyncState}; use super::{Error, SyncNetworkContext}; use actor::{is_account_actor, power}; use address::Address; use amt::Amt; -use async_std::sync::{Receiver, RwLock}; +use async_std::sync::{Mutex, Receiver, RwLock}; use async_std::task::{self, JoinHandle}; use beacon::{Beacon, BeaconEntry, IGNORE_DRAND_VAR}; use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta}; @@ -77,13 +77,20 @@ where self.state_manager.chain_store() } - pub async fn spawn(self, mut inbound_channel: Receiver>) -> JoinHandle<()> { + pub async fn spawn( + self, + mut inbound_channel: Receiver>, + state: Arc>, + ) -> JoinHandle<()> { task::spawn(async move { while let Some(ts) = inbound_channel.next().await { - if let Err(e) = self.sync(ts).await { - let err = e.to_string(); - warn!("failed to sync tipset: {}", &err); - self.state.write().await.error(err); + match self.sync(ts).await { + Ok(()) => *state.lock().await = ChainSyncState::Follow, + Err(e) => { + let err = e.to_string(); + warn!("failed to sync tipset: {}", &err); + self.state.write().await.error(err); + } } } })