Skip to content

Commit

Permalink
sc-consensus-beefy: reuse instead of recreate GossipEngine (#1262)
Browse files Browse the repository at this point in the history
"sc-consensus-beefy: restart voter on pallet reset #14821" introduced
a mechanism to reinitialize the BEEFY worker on certain errors; but
re-creating the GossipEngine doesn't play well with
"Rework the event system of sc-network #14197".

So this PR slightly changes the re-initialization logic to reuse the original
GossipEngine and not recreate it.

Signed-off-by: Adrian Catangiu <adrian@parity.io>
  • Loading branch information
acatangiu authored and Daanvdplas committed Sep 11, 2023
1 parent 5b80211 commit 3750d85
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 62 deletions.
66 changes: 35 additions & 31 deletions substrate/client/consensus/beefy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,36 +255,42 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
let mut finality_notifications = client.finality_notification_stream().fuse();
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
// Validator is updated later with correct starting round and set id.
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name.clone(),
gossip_validator.clone(),
None,
);

// The `GossipValidator` adds and removes known peers based on valid votes and network
// events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
justifications_protocol_name.clone(),
known_peers,
prometheus_registry.clone(),
);
let mut beefy_comms = worker::BeefyComms {
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
};

// We re-create and re-run the worker in this loop in order to quickly reinit and resume after
// select recoverable errors.
loop {
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
// Validator is updated later with correct starting round and set id.
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name.clone(),
gossip_validator.clone(),
None,
);

// The `GossipValidator` adds and removes known peers based on valid votes and network
// events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
justifications_protocol_name.clone(),
known_peers,
prometheus_registry.clone(),
);

// Wait for BEEFY pallet to be active before starting voter.
let persisted_state = match wait_for_runtime_pallet(
&*runtime,
&mut gossip_engine,
&mut beefy_comms.gossip_engine,
&mut finality_notifications,
)
.await
Expand All @@ -306,7 +312,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
// Update the gossip validator with the right starting round and set id.
if let Err(e) = persisted_state
.gossip_filter_config()
.map(|f| gossip_validator.update_filter(f))
.map(|f| beefy_comms.gossip_validator.update_filter(f))
{
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
Expand All @@ -318,10 +324,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
runtime: runtime.clone(),
sync: sync.clone(),
key_store: key_store.clone().into(),
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
comms: beefy_comms,
links: links.clone(),
metrics: metrics.clone(),
pending_justifications: BTreeMap::new(),
Expand All @@ -335,12 +338,13 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
.await
{
// On `ConsensusReset` error, just reinit and restart voter.
futures::future::Either::Left((error::Error::ConsensusReset, _)) => {
futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
beefy_comms = reuse_comms;
continue
},
// On other errors, bring down / finish the task.
futures::future::Either::Left((worker_err, _)) =>
futures::future::Either::Left(((worker_err, _), _)) =>
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err),
futures::future::Either::Right((odj_handler_err, _)) =>
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err),
Expand Down
83 changes: 52 additions & 31 deletions substrate/client/consensus/beefy/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,16 @@ impl<B: Block> PersistedState<B> {
}
}

/// Helper object holding BEEFY worker communication/gossip components.
///
/// These are created once, but will be reused if worker is restarted/reinitialized.
pub(crate) struct BeefyComms<B: Block> {
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
}

/// A BEEFY worker plays the BEEFY protocol
pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
// utilities
Expand All @@ -322,11 +332,8 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
pub sync: Arc<S>,
pub key_store: BeefyKeystore,

// communication
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
// communication (created once, but returned and reused if worker is restarted/reinitialized)
pub comms: BeefyComms<B>,

// channels
/// Links between the block importer, the background voter and the RPC layer.
Expand Down Expand Up @@ -475,7 +482,7 @@ where
if let Err(e) = self
.persisted_state
.gossip_filter_config()
.map(|filter| self.gossip_validator.update_filter(filter))
.map(|filter| self.comms.gossip_validator.update_filter(filter))
{
error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e);
}
Expand All @@ -495,7 +502,11 @@ where
if let Some(finality_proof) = self.handle_vote(vote)? {
let gossip_proof = GossipMessage::<B>::FinalityProof(finality_proof);
let encoded_proof = gossip_proof.encode();
self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
self.comms.gossip_engine.gossip_message(
proofs_topic::<B>(),
encoded_proof,
true,
);
},
RoundAction::Drop => metric_inc!(self, beefy_stale_votes),
RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
Expand Down Expand Up @@ -603,7 +614,7 @@ where

metric_set!(self, beefy_best_block, block_num);

self.on_demand_justifications.cancel_requests_older_than(block_num);
self.comms.on_demand_justifications.cancel_requests_older_than(block_num);

if let Err(e) = self
.backend
Expand Down Expand Up @@ -632,7 +643,7 @@ where
// Update gossip validator votes filter.
self.persisted_state
.gossip_filter_config()
.map(|filter| self.gossip_validator.update_filter(filter))?;
.map(|filter| self.comms.gossip_validator.update_filter(filter))?;
Ok(())
}

Expand Down Expand Up @@ -752,12 +763,14 @@ where
err
})? {
let encoded_proof = GossipMessage::<B>::FinalityProof(finality_proof).encode();
self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
self.comms
.gossip_engine
.gossip_message(proofs_topic::<B>(), encoded_proof, true);
} else {
metric_inc!(self, beefy_votes_sent);
debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
let encoded_vote = GossipMessage::<B>::Vote(vote).encode();
self.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
self.comms.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
}

// Persist state after vote to avoid double voting in case of voter restarts.
Expand All @@ -783,7 +796,7 @@ where
// make sure there's also an on-demand justification request out for it.
if let Some((block, active)) = self.voting_oracle().mandatory_pending() {
// This only starts new request if there isn't already an active one.
self.on_demand_justifications.request(block, active);
self.comms.on_demand_justifications.request(block, active);
}
}
}
Expand All @@ -796,15 +809,16 @@ where
mut self,
block_import_justif: &mut Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
finality_notifications: &mut Fuse<FinalityNotifications<B>>,
) -> Error {
) -> (Error, BeefyComms<B>) {
info!(
target: LOG_TARGET,
"🥩 run BEEFY worker, best grandpa: #{:?}.",
self.best_grandpa_block()
);

let mut votes = Box::pin(
self.gossip_engine
self.comms
.gossip_engine
.messages_for(votes_topic::<B>())
.filter_map(|notification| async move {
let vote = GossipMessage::<B>::decode_all(&mut &notification.message[..])
Expand All @@ -816,7 +830,8 @@ where
.fuse(),
);
let mut gossip_proofs = Box::pin(
self.gossip_engine
self.comms
.gossip_engine
.messages_for(proofs_topic::<B>())
.filter_map(|notification| async move {
let proof = GossipMessage::<B>::decode_all(&mut &notification.message[..])
Expand All @@ -828,12 +843,12 @@ where
.fuse(),
);

loop {
let error = loop {
// Act on changed 'state'.
self.process_new_state();

// Mutable reference used to drive the gossip engine.
let mut gossip_engine = &mut self.gossip_engine;
let mut gossip_engine = &mut self.comms.gossip_engine;
// Use temp val and report after async section,
// to avoid having to Mutex-wrap `gossip_engine`.
let mut gossip_report: Option<PeerReport> = None;
Expand All @@ -847,18 +862,18 @@ where
notification = finality_notifications.next() => {
if let Some(notif) = notification {
if let Err(err) = self.handle_finality_notification(&notif) {
return err;
break err;
}
} else {
return Error::FinalityStreamTerminated;
break Error::FinalityStreamTerminated;
}
},
// Make sure to pump gossip engine.
_ = gossip_engine => {
return Error::GossipEngineTerminated;
break Error::GossipEngineTerminated;
},
// Process incoming justifications as these can make some in-flight votes obsolete.
response_info = self.on_demand_justifications.next().fuse() => {
response_info = self.comms.on_demand_justifications.next().fuse() => {
match response_info {
ResponseInfo::ValidProof(justif, peer_report) => {
if let Err(err) = self.triage_incoming_justif(justif) {
Expand All @@ -878,7 +893,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
return Error::BlockImportStreamTerminated;
break Error::BlockImportStreamTerminated;
}
},
justif = gossip_proofs.next() => {
Expand All @@ -888,7 +903,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
return Error::FinalityProofGossipStreamTerminated;
break Error::FinalityProofGossipStreamTerminated;
}
},
// Finally process incoming votes.
Expand All @@ -899,18 +914,21 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
return Error::VotesGossipStreamTerminated;
break Error::VotesGossipStreamTerminated;
}
},
// Process peer reports.
report = self.gossip_report_stream.next() => {
report = self.comms.gossip_report_stream.next() => {
gossip_report = report;
},
}
if let Some(PeerReport { who, cost_benefit }) = gossip_report {
self.gossip_engine.report(who, cost_benefit);
self.comms.gossip_engine.report(who, cost_benefit);
}
}
};

// return error _and_ `comms` that can be reused
(error, self.comms)
}

/// Report the given equivocation to the BEEFY runtime module. This method
Expand Down Expand Up @@ -1146,18 +1164,21 @@ pub(crate) mod tests {
)
.unwrap();
let payload_provider = MmrRootProvider::new(api.clone());
let comms = BeefyComms {
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
};
BeefyWorker {
backend,
payload_provider,
runtime: api,
key_store: Some(keystore).into(),
links,
gossip_engine,
gossip_validator,
gossip_report_stream,
comms,
metrics,
sync: Arc::new(sync),
on_demand_justifications,
pending_justifications: BTreeMap::new(),
persisted_state,
}
Expand Down

0 comments on commit 3750d85

Please sign in to comment.