sc-consensus-beefy: reuse instead of recreate GossipEngine #1262

Merged (1 commit) on Aug 29, 2023
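The change, reduced to a minimal sketch: instead of recreating the gossip and justification machinery on every pass of the restart loop in `start_beefy_gadget`, it is built once up front and handed back by the worker whenever the error is recoverable. The sketch below uses placeholder types (`Comms`, `run_worker`, `start_gadget`) rather than the real substrate APIs, purely to illustrate the ownership flow.

```rust
// Minimal sketch (placeholder types, not the real substrate APIs) of the restart
// pattern in lib.rs: the communication components are created once, before the
// retry loop, and are handed back by the worker so they can be reused after a
// recoverable `ConsensusReset` error.

struct Comms; // stands in for gossip engine, validator, report stream, on-demand engine

enum Error {
    ConsensusReset, // recoverable: reinit the voter, keep the comms
    Fatal,          // anything else: bring down the task
}

// Stand-in for `BeefyWorker::run`: consumes the comms, returns them with the error.
fn run_worker(comms: Comms, attempt: u32) -> (Error, Comms) {
    if attempt == 0 {
        (Error::ConsensusReset, comms) // pretend consensus was reset once
    } else {
        (Error::Fatal, comms)
    }
}

fn start_gadget() {
    // Created once, outside the loop; previously this happened on every iteration.
    let mut comms = Comms;
    let mut attempt = 0;
    loop {
        match run_worker(comms, attempt) {
            (Error::ConsensusReset, reuse_comms) => {
                // Recoverable: keep the same (still connected) comms, restart the voter.
                comms = reuse_comms;
                attempt += 1;
                continue
            },
            (Error::Fatal, _) => return, // fatal: terminate the task
        }
    }
}

fn main() {
    start_gadget();
}
```

Because the comms are moved out of the failed worker rather than dropped and rebuilt, the gossip engine survives across recoverable restarts, which is what "reuse instead of recreate" refers to.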
66 changes: 35 additions & 31 deletions substrate/client/consensus/beefy/src/lib.rs
@@ -255,36 +255,42 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
let mut finality_notifications = client.finality_notification_stream().fuse();
let mut block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse();

let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
// Validator is updated later with correct starting round and set id.
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name.clone(),
gossip_validator.clone(),
None,
);

// The `GossipValidator` adds and removes known peers based on valid votes and network
// events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
justifications_protocol_name.clone(),
known_peers,
prometheus_registry.clone(),
);
let mut beefy_comms = worker::BeefyComms {
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
};

// We re-create and re-run the worker in this loop in order to quickly reinit and resume after
// select recoverable errors.
loop {
let known_peers = Arc::new(Mutex::new(KnownPeers::new()));
// Default votes filter is to discard everything.
// Validator is updated later with correct starting round and set id.
let (gossip_validator, gossip_report_stream) =
communication::gossip::GossipValidator::new(known_peers.clone());
let gossip_validator = Arc::new(gossip_validator);
let mut gossip_engine = GossipEngine::new(
network.clone(),
sync.clone(),
gossip_protocol_name.clone(),
gossip_validator.clone(),
None,
);

// The `GossipValidator` adds and removes known peers based on valid votes and network
// events.
let on_demand_justifications = OnDemandJustificationsEngine::new(
network.clone(),
justifications_protocol_name.clone(),
known_peers,
prometheus_registry.clone(),
);

// Wait for BEEFY pallet to be active before starting voter.
let persisted_state = match wait_for_runtime_pallet(
&*runtime,
&mut gossip_engine,
&mut beefy_comms.gossip_engine,
&mut finality_notifications,
)
.await
@@ -306,7 +312,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
// Update the gossip validator with the right starting round and set id.
if let Err(e) = persisted_state
.gossip_filter_config()
.map(|f| gossip_validator.update_filter(f))
.map(|f| beefy_comms.gossip_validator.update_filter(f))
{
error!(target: LOG_TARGET, "Error: {:?}. Terminating.", e);
return
@@ -318,10 +324,7 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
runtime: runtime.clone(),
sync: sync.clone(),
key_store: key_store.clone().into(),
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
comms: beefy_comms,
links: links.clone(),
metrics: metrics.clone(),
pending_justifications: BTreeMap::new(),
@@ -335,12 +338,13 @@ pub async fn start_beefy_gadget<B, BE, C, N, P, R, S>(
.await
{
// On `ConsensusReset` error, just reinit and restart voter.
futures::future::Either::Left((error::Error::ConsensusReset, _)) => {
futures::future::Either::Left(((error::Error::ConsensusReset, reuse_comms), _)) => {
error!(target: LOG_TARGET, "🥩 Error: {:?}. Restarting voter.", error::Error::ConsensusReset);
beefy_comms = reuse_comms;
continue
},
// On other errors, bring down / finish the task.
futures::future::Either::Left((worker_err, _)) =>
futures::future::Either::Left(((worker_err, _), _)) =>
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", worker_err),
futures::future::Either::Right((odj_handler_err, _)) =>
error!(target: LOG_TARGET, "🥩 Error: {:?}. Terminating.", odj_handler_err),
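One detail worth calling out in the lib.rs hunks above: because the worker's `run()` now resolves to the tuple `(Error, BeefyComms<B>)`, the `futures::future::select` arms gain an extra layer of destructuring, `((err, comms), _)` instead of `(err, _)`. A self-contained sketch of that pattern, with placeholder types (`Comms`, `demo`) standing in for the real worker and justifications-handler futures:

```rust
// Stand-alone illustration (not the real substrate types) of the nested pattern
// `Either::Left(((error, comms), _))` used in lib.rs: `future::select(a, b)`
// resolves to `Either::Left((a_output, b))` when `a` finishes first, and the
// worker's output is itself the tuple `(Error, BeefyComms<B>)`.
use futures::future::{self, Either};

#[allow(dead_code)]
#[derive(Debug)]
enum Error {
    ConsensusReset,
    Other,
}

struct Comms; // placeholder for the reusable communication components

async fn demo() {
    // Pretend the worker hit a consensus reset and handed its comms back.
    let worker = Box::pin(async { (Error::ConsensusReset, Comms) });
    let justif_handler = Box::pin(async { "on-demand justifications handler error" });

    match future::select(worker, justif_handler).await {
        // Worker finished first: its output is a tuple, hence `((err, comms), _)`.
        Either::Left(((Error::ConsensusReset, _reusable_comms), _)) =>
            println!("recoverable: reuse comms and restart the voter"),
        Either::Left(((err, _), _)) => println!("fatal worker error: {err:?}"),
        // The justifications handler finished first.
        Either::Right((handler_err, _)) => println!("fatal handler error: {handler_err}"),
    }
}

fn main() {
    futures::executor::block_on(demo());
}
```

The same nesting is why the non-recoverable worker arm in the diff reads `((worker_err, _), _)` rather than `(worker_err, _)`.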
83 changes: 52 additions & 31 deletions substrate/client/consensus/beefy/src/worker.rs
@@ -313,6 +313,16 @@ impl<B: Block> PersistedState<B> {
}
}

/// Helper object holding BEEFY worker communication/gossip components.
///
/// These are created once, but will be reused if worker is restarted/reinitialized.
pub(crate) struct BeefyComms<B: Block> {
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
}

/// A BEEFY worker plays the BEEFY protocol
pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
// utilities
@@ -322,11 +332,8 @@ pub(crate) struct BeefyWorker<B: Block, BE, P, RuntimeApi, S> {
pub sync: Arc<S>,
pub key_store: BeefyKeystore,

// communication
pub gossip_engine: GossipEngine<B>,
pub gossip_validator: Arc<GossipValidator<B>>,
pub gossip_report_stream: TracingUnboundedReceiver<PeerReport>,
pub on_demand_justifications: OnDemandJustificationsEngine<B>,
// communication (created once, but returned and reused if worker is restarted/reinitialized)
pub comms: BeefyComms<B>,

// channels
/// Links between the block importer, the background voter and the RPC layer.
@@ -475,7 +482,7 @@
if let Err(e) = self
.persisted_state
.gossip_filter_config()
.map(|filter| self.gossip_validator.update_filter(filter))
.map(|filter| self.comms.gossip_validator.update_filter(filter))
{
error!(target: LOG_TARGET, "🥩 Voter error: {:?}", e);
}
@@ -495,7 +502,11 @@ where
if let Some(finality_proof) = self.handle_vote(vote)? {
let gossip_proof = GossipMessage::<B>::FinalityProof(finality_proof);
let encoded_proof = gossip_proof.encode();
self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
self.comms.gossip_engine.gossip_message(
proofs_topic::<B>(),
encoded_proof,
true,
);
},
RoundAction::Drop => metric_inc!(self, beefy_stale_votes),
RoundAction::Enqueue => error!(target: LOG_TARGET, "🥩 unexpected vote: {:?}.", vote),
@@ -603,7 +614,7 @@ where

metric_set!(self, beefy_best_block, block_num);

self.on_demand_justifications.cancel_requests_older_than(block_num);
self.comms.on_demand_justifications.cancel_requests_older_than(block_num);

if let Err(e) = self
.backend
@@ -632,7 +643,7 @@ where
// Update gossip validator votes filter.
self.persisted_state
.gossip_filter_config()
.map(|filter| self.gossip_validator.update_filter(filter))?;
.map(|filter| self.comms.gossip_validator.update_filter(filter))?;
Ok(())
}

@@ -752,12 +763,14 @@ where
err
})? {
let encoded_proof = GossipMessage::<B>::FinalityProof(finality_proof).encode();
self.gossip_engine.gossip_message(proofs_topic::<B>(), encoded_proof, true);
self.comms
.gossip_engine
.gossip_message(proofs_topic::<B>(), encoded_proof, true);
} else {
metric_inc!(self, beefy_votes_sent);
debug!(target: LOG_TARGET, "🥩 Sent vote message: {:?}", vote);
let encoded_vote = GossipMessage::<B>::Vote(vote).encode();
self.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
self.comms.gossip_engine.gossip_message(votes_topic::<B>(), encoded_vote, false);
}

// Persist state after vote to avoid double voting in case of voter restarts.
@@ -783,7 +796,7 @@ where
// make sure there's also an on-demand justification request out for it.
if let Some((block, active)) = self.voting_oracle().mandatory_pending() {
// This only starts new request if there isn't already an active one.
self.on_demand_justifications.request(block, active);
self.comms.on_demand_justifications.request(block, active);
}
}
}
@@ -796,15 +809,16 @@ where
mut self,
block_import_justif: &mut Fuse<NotificationReceiver<BeefyVersionedFinalityProof<B>>>,
finality_notifications: &mut Fuse<FinalityNotifications<B>>,
) -> Error {
) -> (Error, BeefyComms<B>) {
info!(
target: LOG_TARGET,
"🥩 run BEEFY worker, best grandpa: #{:?}.",
self.best_grandpa_block()
);

let mut votes = Box::pin(
self.gossip_engine
self.comms
.gossip_engine
.messages_for(votes_topic::<B>())
.filter_map(|notification| async move {
let vote = GossipMessage::<B>::decode_all(&mut &notification.message[..])
@@ -816,7 +830,8 @@ where
.fuse(),
);
let mut gossip_proofs = Box::pin(
self.gossip_engine
self.comms
.gossip_engine
.messages_for(proofs_topic::<B>())
.filter_map(|notification| async move {
let proof = GossipMessage::<B>::decode_all(&mut &notification.message[..])
@@ -828,12 +843,12 @@ where
.fuse(),
);

loop {
let error = loop {
// Act on changed 'state'.
self.process_new_state();

// Mutable reference used to drive the gossip engine.
let mut gossip_engine = &mut self.gossip_engine;
let mut gossip_engine = &mut self.comms.gossip_engine;
// Use temp val and report after async section,
// to avoid having to Mutex-wrap `gossip_engine`.
let mut gossip_report: Option<PeerReport> = None;
@@ -847,18 +862,18 @@
notification = finality_notifications.next() => {
if let Some(notif) = notification {
if let Err(err) = self.handle_finality_notification(&notif) {
return err;
break err;
}
} else {
return Error::FinalityStreamTerminated;
break Error::FinalityStreamTerminated;
}
},
// Make sure to pump gossip engine.
_ = gossip_engine => {
return Error::GossipEngineTerminated;
break Error::GossipEngineTerminated;
},
// Process incoming justifications as these can make some in-flight votes obsolete.
response_info = self.on_demand_justifications.next().fuse() => {
response_info = self.comms.on_demand_justifications.next().fuse() => {
match response_info {
ResponseInfo::ValidProof(justif, peer_report) => {
if let Err(err) = self.triage_incoming_justif(justif) {
@@ -878,7 +893,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
return Error::BlockImportStreamTerminated;
break Error::BlockImportStreamTerminated;
}
},
justif = gossip_proofs.next() => {
Expand All @@ -888,7 +903,7 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
return Error::FinalityProofGossipStreamTerminated;
break Error::FinalityProofGossipStreamTerminated;
}
},
// Finally process incoming votes.
Expand All @@ -899,18 +914,21 @@ where
debug!(target: LOG_TARGET, "🥩 {}", err);
}
} else {
return Error::VotesGossipStreamTerminated;
break Error::VotesGossipStreamTerminated;
}
},
// Process peer reports.
report = self.gossip_report_stream.next() => {
report = self.comms.gossip_report_stream.next() => {
gossip_report = report;
},
}
if let Some(PeerReport { who, cost_benefit }) = gossip_report {
self.gossip_engine.report(who, cost_benefit);
self.comms.gossip_engine.report(who, cost_benefit);
}
}
};

// return error _and_ `comms` that can be reused
(error, self.comms)
}

/// Report the given equivocation to the BEEFY runtime module. This method
@@ -1146,18 +1164,21 @@ pub(crate) mod tests {
)
.unwrap();
let payload_provider = MmrRootProvider::new(api.clone());
let comms = BeefyComms {
gossip_engine,
gossip_validator,
gossip_report_stream,
on_demand_justifications,
};
BeefyWorker {
backend,
payload_provider,
runtime: api,
key_store: Some(keystore).into(),
links,
gossip_engine,
gossip_validator,
gossip_report_stream,
comms,
metrics,
sync: Arc::new(sync),
on_demand_justifications,
pending_justifications: BTreeMap::new(),
persisted_state,
}
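The other structural change in `run()` is how the early exits are expressed: each former `return Error::...` inside the select arms becomes `break Error::...`, so the `loop` evaluates to the error and there is a single exit point that can also hand `self.comms` back to the caller. A minimal sketch of that idiom, with placeholder types (`Comms`, `Worker`) rather than the real worker:

```rust
// Sketch of the control-flow change in worker.rs `run()`: early `return err`
// statements become `break err`, so every exit funnels through one point that
// can also return the reusable comms. Placeholder types, not the real worker.

#[allow(dead_code)]
enum Error {
    FinalityStreamTerminated,
    GossipEngineTerminated,
}

struct Comms; // stands in for BeefyComms<B>

struct Worker {
    comms: Comms,
}

impl Worker {
    // Mirrors the new signature: `run()` consumes the worker and returns the error
    // together with the comms so the caller can decide whether to restart.
    fn run(self) -> (Error, Comms) {
        // `loop` is an expression; `break value` makes the loop evaluate to `value`.
        let error = loop {
            let finality_stream_alive = false; // stand-in for polling the real streams
            if !finality_stream_alive {
                break Error::FinalityStreamTerminated;
            }
            // ... pump the gossip engine, triage votes, proofs, justifications ...
        };

        // Single exit point: return the error _and_ the comms that can be reused.
        (error, self.comms)
    }
}

fn main() {
    let worker = Worker { comms: Comms };
    let (_error, _reusable_comms) = worker.run();
    // `_reusable_comms` could now be moved into a fresh `Worker` for a restart.
}
```

In the actual diff the `(error, self.comms)` tuple is built once, after the loop, which is what lets lib.rs receive the comms alongside the error and feed them into the next worker instance.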