Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

sc-consensus-beefy: restart voter on pallet reset #14821

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/consensus/beefy/src/communication/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use std::collections::{HashMap, VecDeque};

/// Report specifying a reputation change for a given peer.
#[derive(Debug, PartialEq)]
pub(crate) struct PeerReport {
pub struct PeerReport {
pub who: PeerId,
pub cost_benefit: ReputationChange,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

use codec::DecodeAll;
use futures::{channel::oneshot, StreamExt};
use log::{debug, error, trace};
use log::{debug, trace};
use sc_client_api::BlockBackend;
use sc_network::{
config as netconfig, config::RequestResponseConfig, types::ProtocolName, PeerId,
Expand Down Expand Up @@ -182,7 +182,9 @@ where
}

/// Run [`BeefyJustifsRequestHandler`].
pub async fn run(mut self) {
///
/// Should never end, returns `Error` otherwise.
pub async fn run(&mut self) -> Error {
trace!(target: BEEFY_SYNC_LOG_TARGET, "🥩 Running BeefyJustifsRequestHandler");

while let Ok(request) = self
Expand Down Expand Up @@ -215,9 +217,6 @@ where
},
}
}
error!(
target: crate::LOG_TARGET,
"🥩 On-demand requests receiver stream terminated, closing worker."
);
Error::RequestsReceiverStreamClosed
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub struct JustificationRequest<B: Block> {
}

#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
pub enum Error {
#[error(transparent)]
Client(#[from] sp_blockchain::Error),

Expand All @@ -102,4 +102,7 @@ pub(crate) enum Error {

#[error("Internal error while getting response.")]
ResponseError,

#[error("On-demand requests receiver stream terminated.")]
RequestsReceiverStreamClosed,
}
12 changes: 11 additions & 1 deletion client/consensus/beefy/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,18 @@ pub enum Error {
Signature(String),
#[error("Session uninitialized")]
UninitSession,
#[error("pallet-beefy was reset, please restart voter")]
#[error("pallet-beefy was reset")]
ConsensusReset,
#[error("Block import stream terminated")]
BlockImportStreamTerminated,
#[error("Gossip Engine terminated")]
GossipEngineTerminated,
#[error("Finality proofs gossiping stream terminated")]
FinalityProofGossipStreamTerminated,
#[error("Finality stream terminated")]
FinalityStreamTerminated,
#[error("Votes gossiping stream terminated")]
VotesGossipStreamTerminated,
}

#[cfg(test)]
Expand Down
154 changes: 88 additions & 66 deletions client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
B: Block,
BE: Backend<B>,
C: Client<B, BE> + BlockBackend<B>,
P: PayloadProvider<B>,
P: PayloadProvider<B> + Clone,
R: ProvideRuntimeApi<B>,
R::Api: BeefyApi<B, AuthorityId> + MmrApi<B, MmrRootHash, NumberFor<B>>,
N: GossipNetwork<B> + NetworkRequest + Send + Sync + 'static,
Expand All @@ -237,7 +237,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
min_block_delta,
prometheus_registry,
links,
on_demand_justifications_handler,
mut on_demand_justifications_handler,
} = beefy_params;

let BeefyNetworkParams {
Expand All @@ -248,83 +248,105 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
..
} = network_params;

let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
// Validator is updated later with correct starting round and set id.
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name,
gossip_validator.clone(),
None,
);
let metrics = register_metrics(prometheus_registry.clone());

// The `GossipValidator` adds and removes known peers based on valid votes and network events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
justifications_protocol_name,
known_peers,
prometheus_registry.clone(),
);

// Subscribe to finality notifications and justifications before waiting for runtime pallet and
// reuse the streams, so we don't miss notifications while waiting for pallet to be available.
let mut finality_notifications = client.finality_notification_stream().fuse();
let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

// Wait for BEEFY pallet to be active before starting voter.
let persisted_state =
match wait_for_runtime_pallet(&*runtime, &mut gossip_engine, &mut finality_notifications)
.await
.and_then(|(beefy_genesis, best_grandpa)| {
load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
}) {
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

// We re-create and re-run the worker in this loop in order to quickly reinit and resume after
// select recoverable errors.
loop {
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
// Validator is updated later with correct starting round and set id.
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name.clone(),
gossip_validator.clone(),
None,
);

// The `GossipValidator` adds and removes known peers based on valid votes and network
// events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
justifications_protocol_name.clone(),
known_peers,
prometheus_registry.clone(),
);

// Wait for BEEFY pallet to be active before starting voter.
let persisted_state = match wait_for_runtime_pallet(
&*runtime,
&mut gossip_engine,
&mut finality_notifications,
)
.await
.and_then(|(beefy_genesis, best_grandpa)| {
load_or_init_voter_state(
&*backend,
&*runtime,
beefy_genesis,
best_grandpa,
min_block_delta,
)
}) {
Ok(state) => state,
Err(e) => {
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
},
};
// Update the gossip validator with the right starting round and set id.
if let Err(e) = persisted_state
.gossip_filter_config()
.map(|f| gossip_validator.update_filter(f))
{
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
}
// Update the gossip validator with the right starting round and set id.
if let Err(e) = persisted_state
.gossip_filter_config()
.map(|f| gossip_validator.update_filter(f))
{
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
}

let worker = worker::BeefyWorker {
backend,
payload_provider,
runtime,
sync,
key_store: key_store.into(),
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
links,
metrics,
pending_justifications: BTreeMap::new(),
persisted_state,
};
let worker = worker::BeefyWorker {
backend: backend.clone(),
payload_provider: payload_provider.clone(),
runtime: runtime.clone(),
sync: sync.clone(),
key_store: key_store.clone().into(),
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
links: links.clone(),
metrics: metrics.clone(),
pending_justifications: BTreeMap::new(),
persisted_state,
};

futures::future::select(
Box::pin(worker.run(block_import_justif, finality_notifications)),
Box::pin(on_demand_justifications_handler.run()),
)
.await;
match futures::future::select(
Box::pin(worker.run(&mut block_import_justif, &mut finality_notifications)),
Box::pin(on_demand_justifications_handler.run()),
)
.await
{
// On `ConsensusReset` error, just reinit and restart voter.
futures::future::Either::Left((error::Error::ConsensusReset, _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
continue
},
// On other errors, bring down / finish the task.
futures::future::Either::Left((worker_err, _)) =>
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err),
futures::future::Either::Right((odj_handler_err, _)) =>
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err),
};
return
}
}

fn load_or_init_voter_state<B, BE, R>(
Expand Down
42 changes: 15 additions & 27 deletions client/consensus/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -447,11 +447,7 @@ where
.ok()
.flatten()
.filter(|genesis| *genesis == self.persisted_state.pallet_genesis)
.ok_or_else(|| {
let err = Error::ConsensusReset;
error!(target: LOG_TARGET, "🥩 Error: {}", err);
err
})?;
.ok_or(Error::ConsensusReset)?;

if *header.number() > self.best_grandpa_block() {
// update best GRANDPA finalized block we have seen
Expand Down Expand Up @@ -795,11 +791,12 @@ where
/// Main loop for BEEFY worker.
///
/// Run the main async loop which is driven by finality notifications and gossiped votes.
/// Should never end, returns `Error` otherwise.
pub(crate) async fn run(
mut self,
mut block_import_justif: Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
mut finality_notifications: Fuse<FinalityNotifications<B>>,
) {
block_import_justif: &mut Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
finality_notifications: &mut Fuse<FinalityNotifications<B>>,
) -> Error {
info!(
target: LOG_TARGET,
"🥩 run BEEFY worker, best grandpa: #{:?}.",
Expand Down Expand Up @@ -848,17 +845,17 @@ where
// Use `select_biased!` to prioritize order below.
// Process finality notifications first since these drive the voter.
notification = finality_notifications.next() => {
if notification.and_then(|notif| {
self.handle_finality_notification(&notif).ok()
}).is_none() {
error!(target: LOG_TARGET, "🥩 Finality stream terminated, closing worker.");
return;
if let Some(notif) = notification {
if let Err(err) = self.handle_finality_notification(&notif) {
return err;
}
} else {
return Error::FinalityStreamTerminated;
}
},
// Make sure to pump gossip engine.
_ = gossip_engine => {
error!(target: LOG_TARGET, "🥩 Gossip engine has terminated, closing worker.");
return;
return Error::GossipEngineTerminated;
},
// Process incoming justifications as these can make some in-flight votes obsolete.
response_info = self.on_demand_justifications.next().fuse() => {
Expand All @@ -881,8 +878,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
error!(target: LOG_TARGET, "🥩 Block import stream terminated, closing worker.");
return;
return Error::BlockImportStreamTerminated;
}
},
justif = gossip_proofs.next() => {
Expand All @@ -892,11 +888,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
error!(
target: LOG_TARGET,
"🥩 Finality proofs gossiping stream terminated, closing worker."
);
return;
return Error::FinalityProofGossipStreamTerminated;
}
},
// Finally process incoming votes.
Expand All @@ -907,11 +899,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
error!(
target: LOG_TARGET,
"🥩 Votes gossiping stream terminated, closing worker."
);
return;
return Error::VotesGossipStreamTerminated;
}
},
// Process peer reports.
Expand Down
6 changes: 6 additions & 0 deletions primitives/consensus/beefy/src/mmr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ mod mmr_root_provider {
_phantom: PhantomData<B>,
}

impl<B, R> Clone for MmrRootProvider<B, R> {
fn clone(&self) -> Self {
Self { runtime: self.runtime.clone(), _phantom: PhantomData }
}
}

impl<B, R> MmrRootProvider<B, R>
where
B: Block,
Expand Down