Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wrap ChainSyncer::state in an Arc<Mutex<_>> and set it to Follow accordingly #914

Merged
merged 1 commit into from
Jan 7, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions blockchain/chain_sync/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use super::sync_state::SyncState;
use super::sync_worker::SyncWorker;
use super::{Error, SyncNetworkContext};
use amt::Amt;
use async_std::sync::{channel, Receiver, RwLock, Sender};
use async_std::sync::{channel, Mutex, Receiver, RwLock, Sender};
use async_std::task::{self, JoinHandle};
use beacon::Beacon;
use blocks::{Block, FullTipset, GossipBlock, Tipset, TipsetKeys, TxMeta};
Expand Down Expand Up @@ -38,7 +38,7 @@ use std::sync::Arc;
type WorkerState = Arc<RwLock<Vec<Arc<RwLock<SyncState>>>>>;

#[derive(Debug, PartialEq)]
enum ChainSyncState {
pub enum ChainSyncState {
/// Bootstrapping peers before starting sync.
Bootstrap,
/// Syncing chain with ChainExchange protocol.
Expand Down Expand Up @@ -78,7 +78,7 @@ impl Default for SyncConfig {
/// messages to be able to do the initial sync.
pub struct ChainSyncer<DB, TBeacon, V, M> {
/// State of general `ChainSync` protocol.
state: ChainSyncState,
state: Arc<Mutex<ChainSyncState>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is mutex over rwlock required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is not, but does RwLock have any benefits if we're not expecting concurrent reads?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I just was asking because there doesn't seem to be a reason to have the additional constraint (just thinking for future proofing)

It's definitely fine if this stays as is, this will probably be replaced in the future anyway


/// Syncing state of chain sync workers.
worker_state: WorkerState,
Expand Down Expand Up @@ -142,7 +142,7 @@ where
);

Ok(Self {
state: ChainSyncState::Bootstrap,
state: Arc::new(Mutex::new(ChainSyncState::Bootstrap)),
worker_state: Default::default(),
beacon,
network,
Expand Down Expand Up @@ -221,7 +221,7 @@ where
.await
}
NetworkEvent::PubsubMessage { source, message } => {
if self.state != ChainSyncState::Follow {
if *self.state.lock().await != ChainSyncState::Follow {
// Ignore gossipsub events if not in following state
return;
}
Expand Down Expand Up @@ -375,7 +375,7 @@ where
verifier: PhantomData::<V>::default(),
req_window: self.sync_config.req_window,
}
.spawn(channel)
.spawn(channel, Arc::clone(&self.state))
.await
}

Expand Down Expand Up @@ -424,10 +424,10 @@ where
.await;

// Only update target on initial sync
if self.state == ChainSyncState::Bootstrap {
if *self.state.lock().await == ChainSyncState::Bootstrap {
if let Some(best_target) = self.select_sync_target().await {
self.schedule_tipset(best_target).await;
self.state = ChainSyncState::Initial;
*self.state.lock().await = ChainSyncState::Initial;
Comment on lines +427 to +430
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this keeps the lock and will deadlock here, has this been tested?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not tested, but I'm quite certain that if *self.state.lock().await does not keep the guard around since it's not bound to a variable, if that's what you're referring to. (playground that uses std's Mutex)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've had issues with async-std mutexes holding the lock in this exact case in the past, not sure if it's changed or only applies to RwLock but can double check

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, there was a specific case where it deadlocked similar to this, and moving the read outside of the if removed the deadlock, I just tested and this works

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh yeah, I can totally see that happening. Thanks for pointing it out either way, I could have easily messed this up

return;
}
}
Expand Down
21 changes: 14 additions & 7 deletions blockchain/chain_sync/src/sync_worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ mod full_sync_test;
#[cfg(test)]
mod validate_block_test;

use super::bad_block_cache::BadBlockCache;
use super::sync_state::{SyncStage, SyncState};
use super::{bad_block_cache::BadBlockCache, sync::ChainSyncState};
use super::{Error, SyncNetworkContext};
use actor::{is_account_actor, power};
use address::Address;
use amt::Amt;
use async_std::sync::{Receiver, RwLock};
use async_std::sync::{Mutex, Receiver, RwLock};
use async_std::task::{self, JoinHandle};
use beacon::{Beacon, BeaconEntry, IGNORE_DRAND_VAR};
use blocks::{Block, BlockHeader, FullTipset, Tipset, TipsetKeys, TxMeta};
Expand Down Expand Up @@ -77,13 +77,20 @@ where
self.state_manager.chain_store()
}

pub async fn spawn(self, mut inbound_channel: Receiver<Arc<Tipset>>) -> JoinHandle<()> {
pub async fn spawn(
self,
mut inbound_channel: Receiver<Arc<Tipset>>,
state: Arc<Mutex<ChainSyncState>>,
) -> JoinHandle<()> {
task::spawn(async move {
while let Some(ts) = inbound_channel.next().await {
if let Err(e) = self.sync(ts).await {
let err = e.to_string();
warn!("failed to sync tipset: {}", &err);
self.state.write().await.error(err);
match self.sync(ts).await {
Ok(()) => *state.lock().await = ChainSyncState::Follow,
Err(e) => {
let err = e.to_string();
warn!("failed to sync tipset: {}", &err);
self.state.write().await.error(err);
}
}
}
})
Expand Down