From 562557a9489d25e398bfc7c2b02138d99da11ea9 Mon Sep 17 00:00:00 2001 From: Adrian Catangiu Date: Tue, 29 Aug 2023 18:47:05 +0300 Subject: [PATCH] sc-consensus-beefy: reuse instead of recreate GossipEngine (#1262) "sc-consensus-beefy: restart voter on pallet reset #14821" introduced a mechanism to reinitialize the BEEFY worker on certain errors; but re-creating the GossipEngine doesn't play well with "Rework the event system of sc-network #14197". So this PR slightly changes the re-initialization logic to reuse the original GossipEngine and not recreate it. Signed-off-by: Adrian Catangiu --- substrate/client/consensus/beefy/src/lib.rs | 66 ++++++++------- .../client/consensus/beefy/src/worker.rs | 83 ++++++++++++------- 2 files changed, 87 insertions(+), 62 deletions(-) diff --git a/substrate/client/consensus/beefy/src/lib.rs b/substrate/client/consensus/beefy/src/lib.rs index 0b3baa007c1c..72df3cab8550 100644 --- a/substrate/client/consensus/beefy/src/lib.rs +++ b/substrate/client/consensus/beefy/src/lib.rs @@ -255,36 +255,42 @@ pub async fn start_beefy_gadget( let mut finality_notifications = client.finality_notification_stream().fuse(); let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse(); + let known_peers = Arc::new(Mutex::new(KnownPeers::new())); + // Default votes filter is to discard everything. + // Validator is updated later with correct starting round and set id. + let (gossip_validator, gossip_report_stream) = + communication::gossip::GossipValidator::new(known_peers.clone()); + let gossip_validator = Arc::new(gossip_validator); + let gossip_engine = GossipEngine::new( + network.clone(), + sync.clone(), + gossip_protocol_name.clone(), + gossip_validator.clone(), + None, + ); + + // The `GossipValidator` adds and removes known peers based on valid votes and network + // events. 
+ let on_demand_justifications = OnDemandJustificationsEngine::new( + network.clone(), + justifications_protocol_name.clone(), + known_peers, + prometheus_registry.clone(), + ); + let mut beefy_comms = worker::BeefyComms { + gossip_engine, + gossip_validator, + gossip_report_stream, + on_demand_justifications, + }; + // We re-create and re-run the worker in this loop in order to quickly reinit and resume after // select recoverable errors. loop { - let known_peers = Arc::new(Mutex::new(KnownPeers::new())); - // Default votes filter is to discard everything. - // Validator is updated later with correct starting round and set id. - let (gossip_validator, gossip_report_stream) = - communication::gossip::GossipValidator::new(known_peers.clone()); - let gossip_validator = Arc::new(gossip_validator); - let mut gossip_engine = GossipEngine::new( - network.clone(), - sync.clone(), - gossip_protocol_name.clone(), - gossip_validator.clone(), - None, - ); - - // The `GossipValidator` adds and removes known peers based on valid votes and network - // events. - let on_demand_justifications = OnDemandJustificationsEngine::new( - network.clone(), - justifications_protocol_name.clone(), - known_peers, - prometheus_registry.clone(), - ); - // Wait for BEEFY pallet to be active before starting voter. let persisted_state = match wait_for_runtime_pallet( &*runtime, - &mut gossip_engine, + &mut beefy_comms.gossip_engine, &mut finality_notifications, ) .await @@ -306,7 +312,7 @@ pub async fn start_beefy_gadget( // Update the gossip validator with the right starting round and set id. if let Err(e) = persisted_state .gossip_filter_config() - .map(|f| gossip_validator.update_filter(f)) + .map(|f| beefy_comms.gossip_validator.update_filter(f)) { error!(target: LOG_TARGET, "Error: {:?}. 
Terminating.", e); return @@ -318,10 +324,7 @@ pub async fn start_beefy_gadget( runtime: runtime.clone(), sync: sync.clone(), key_store: key_store.clone().into(), - gossip_engine, - gossip_validator, - gossip_report_stream, - on_demand_justifications, + comms: beefy_comms, links: links.clone(), metrics: metrics.clone(), pending_justifications: BTreeMap::new(), @@ -335,12 +338,13 @@ pub async fn start_beefy_gadget( .await { // On `ConsensusReset` error, just reinit and restart voter. - futures::future::Either::Left((error::Error::ConsensusReset, _)) => { + futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => { error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset); + beefy_comms = reuse_comms; continue }, // On other errors, bring down / finish the task. - futures::future::Either::Left((worker_err, _)) => + futures::future::Either::Left(((worker_err, _), _)) => error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err), futures::future::Either::Right((odj_handler_err, _)) => error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err), diff --git a/substrate/client/consensus/beefy/src/worker.rs b/substrate/client/consensus/beefy/src/worker.rs index 0d3845a27036..a239e34030c2 100644 --- a/substrate/client/consensus/beefy/src/worker.rs +++ b/substrate/client/consensus/beefy/src/worker.rs @@ -313,6 +313,16 @@ impl PersistedState { } } +/// Helper object holding BEEFY worker communication/gossip components. +/// +/// These are created once, but will be reused if worker is restarted/reinitialized. 
+pub(crate) struct BeefyComms { + pub gossip_engine: GossipEngine, + pub gossip_validator: Arc>, + pub gossip_report_stream: TracingUnboundedReceiver, + pub on_demand_justifications: OnDemandJustificationsEngine, +} + /// A BEEFY worker plays the BEEFY protocol pub(crate) struct BeefyWorker { // utilities @@ -322,11 +332,8 @@ pub(crate) struct BeefyWorker { pub sync: Arc, pub key_store: BeefyKeystore, - // communication - pub gossip_engine: GossipEngine, - pub gossip_validator: Arc>, - pub gossip_report_stream: TracingUnboundedReceiver, - pub on_demand_justifications: OnDemandJustificationsEngine, + // communication (created once, but returned and reused if worker is restarted/reinitialized) + pub comms: BeefyComms, // channels /// Links between the block importer, the background voter and the RPC layer. @@ -475,7 +482,7 @@ where if let Err(e) = self .persisted_state .gossip_filter_config() - .map(|filter| self.gossip_validator.update_filter(filter)) + .map(|filter| self.comms.gossip_validator.update_filter(filter)) { error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e); } @@ -495,7 +502,11 @@ where if let Some(finality_proof) = self.handle_vote(vote)? { let gossip_proof = GossipMessage::::FinalityProof(finality_proof); let encoded_proof = gossip_proof.encode(); - self.gossip_engine.gossip_message(proofs_topic::(), encoded_proof, true); + self.comms.gossip_engine.gossip_message( + proofs_topic::(), + encoded_proof, + true, + ); }, RoundAction::Drop => metric_inc!(self, beefy_stale_votes), RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote), @@ -603,7 +614,7 @@ where metric_set!(self, beefy_best_block, block_num); - self.on_demand_justifications.cancel_requests_older_than(block_num); + self.comms.on_demand_justifications.cancel_requests_older_than(block_num); if let Err(e) = self .backend @@ -632,7 +643,7 @@ where // Update gossip validator votes filter. 
self.persisted_state .gossip_filter_config() - .map(|filter| self.gossip_validator.update_filter(filter))?; + .map(|filter| self.comms.gossip_validator.update_filter(filter))?; Ok(()) } @@ -752,12 +763,14 @@ where err })? { let encoded_proof = GossipMessage::::FinalityProof(finality_proof).encode(); - self.gossip_engine.gossip_message(proofs_topic::(), encoded_proof, true); + self.comms + .gossip_engine + .gossip_message(proofs_topic::(), encoded_proof, true); } else { metric_inc!(self, beefy_votes_sent); debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote); let encoded_vote = GossipMessage::::Vote(vote).encode(); - self.gossip_engine.gossip_message(votes_topic::(), encoded_vote, false); + self.comms.gossip_engine.gossip_message(votes_topic::(), encoded_vote, false); } // Persist state after vote to avoid double voting in case of voter restarts. @@ -783,7 +796,7 @@ where // make sure there's also an on-demand justification request out for it. if let Some((block, active)) = self.voting_oracle().mandatory_pending() { // This only starts new request if there isn't already an active one. 
- self.on_demand_justifications.request(block, active); + self.comms.on_demand_justifications.request(block, active); } } } @@ -796,7 +809,7 @@ where mut self, block_import_justif: &mut Fuse>>, finality_notifications: &mut Fuse>, - ) -> Error { + ) -> (Error, BeefyComms) { info!( target: LOG_TARGET, "🥩 run BEEFY worker, best grandpa: #{:?}.", @@ -804,7 +817,8 @@ where ); let mut votes = Box::pin( - self.gossip_engine + self.comms + .gossip_engine .messages_for(votes_topic::()) .filter_map(|notification| async move { let vote = GossipMessage::::decode_all(&mut &notification.message[..]) @@ -816,7 +830,8 @@ where .fuse(), ); let mut gossip_proofs = Box::pin( - self.gossip_engine + self.comms + .gossip_engine .messages_for(proofs_topic::()) .filter_map(|notification| async move { let proof = GossipMessage::::decode_all(&mut &notification.message[..]) @@ -828,12 +843,12 @@ where .fuse(), ); - loop { + let error = loop { // Act on changed 'state'. self.process_new_state(); // Mutable reference used to drive the gossip engine. - let mut gossip_engine = &mut self.gossip_engine; + let mut gossip_engine = &mut self.comms.gossip_engine; // Use temp val and report after async section, // to avoid having to Mutex-wrap `gossip_engine`. let mut gossip_report: Option = None; @@ -847,18 +862,18 @@ where notification = finality_notifications.next() => { if let Some(notif) = notification { if let Err(err) = self.handle_finality_notification(&notif) { - return err; + break err; } } else { - return Error::FinalityStreamTerminated; + break Error::FinalityStreamTerminated; } }, // Make sure to pump gossip engine. _ = gossip_engine => { - return Error::GossipEngineTerminated; + break Error::GossipEngineTerminated; }, // Process incoming justifications as these can make some in-flight votes obsolete. 
- response_info = self.on_demand_justifications.next().fuse() => { + response_info = self.comms.on_demand_justifications.next().fuse() => { match response_info { ResponseInfo::ValidProof(justif, peer_report) => { if let Err(err) = self.triage_incoming_justif(justif) { @@ -878,7 +893,7 @@ where debug!(target: LOG_TARGET, "🥩 {}", err); } } else { - return Error::BlockImportStreamTerminated; + break Error::BlockImportStreamTerminated; } }, justif = gossip_proofs.next() => { @@ -888,7 +903,7 @@ where debug!(target: LOG_TARGET, "🥩 {}", err); } } else { - return Error::FinalityProofGossipStreamTerminated; + break Error::FinalityProofGossipStreamTerminated; } }, // Finally process incoming votes. @@ -899,18 +914,21 @@ where debug!(target: LOG_TARGET, "🥩 {}", err); } } else { - return Error::VotesGossipStreamTerminated; + break Error::VotesGossipStreamTerminated; } }, // Process peer reports. - report = self.gossip_report_stream.next() => { + report = self.comms.gossip_report_stream.next() => { gossip_report = report; }, } if let Some(PeerReport { who, cost_benefit }) = gossip_report { - self.gossip_engine.report(who, cost_benefit); + self.comms.gossip_engine.report(who, cost_benefit); } - } + }; + + // return error _and_ `comms` that can be reused + (error, self.comms) } /// Report the given equivocation to the BEEFY runtime module. This method @@ -1146,18 +1164,21 @@ pub(crate) mod tests { ) .unwrap(); let payload_provider = MmrRootProvider::new(api.clone()); + let comms = BeefyComms { + gossip_engine, + gossip_validator, + gossip_report_stream, + on_demand_justifications, + }; BeefyWorker { backend, payload_provider, runtime: api, key_store: Some(keystore).into(), links, - gossip_engine, - gossip_validator, - gossip_report_stream, + comms, metrics, sync: Arc::new(sync), - on_demand_justifications, pending_justifications: BTreeMap::new(), persisted_state, }