From 380356ec0208692f93bfd0cefe77aceddb2d2b6b Mon Sep 17 00:00:00 2001 From: Chris Sosnin <48099298+slumber@users.noreply.github.com> Date: Thu, 19 May 2022 14:01:22 +0300 Subject: [PATCH] bitfield-signing: remove util::jobs usage (#5523) --- node/core/bitfield-signing/src/lib.rs | 271 ++++++++++++++---------- node/core/bitfield-signing/src/tests.rs | 2 +- node/service/src/overseer.rs | 8 +- 3 files changed, 158 insertions(+), 123 deletions(-) diff --git a/node/core/bitfield-signing/src/lib.rs b/node/core/bitfield-signing/src/lib.rs index 3b082de92109..ad9f93f3d598 100644 --- a/node/core/bitfield-signing/src/lib.rs +++ b/node/core/bitfield-signing/src/lib.rs @@ -24,22 +24,21 @@ use futures::{ channel::{mpsc, oneshot}, future, lock::Mutex, - prelude::*, - Future, + FutureExt, }; use polkadot_node_subsystem::{ errors::RuntimeApiError, jaeger, messages::{ - AvailabilityStoreMessage, BitfieldDistributionMessage, BitfieldSigningMessage, - RuntimeApiMessage, RuntimeApiRequest, + AvailabilityStoreMessage, BitfieldDistributionMessage, RuntimeApiMessage, RuntimeApiRequest, }, - overseer, ActivatedLeaf, LeafStatus, PerLeafSpan, SubsystemSender, + overseer, ActivatedLeaf, FromOverseer, LeafStatus, OverseerSignal, PerLeafSpan, + SpawnedSubsystem, SubsystemError, SubsystemResult, SubsystemSender, }; -use polkadot_node_subsystem_util::{self as util, JobSender, JobSubsystem, JobTrait, Validator}; +use polkadot_node_subsystem_util::{self as util, Validator}; use polkadot_primitives::v2::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex}; use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr}; -use std::{iter::FromIterator, pin::Pin, time::Duration}; +use std::{collections::HashMap, iter::FromIterator, time::Duration}; use wasm_timer::{Delay, Instant}; mod metrics; @@ -49,12 +48,10 @@ use self::metrics::Metrics; mod tests; /// Delay between starting a bitfield signing job and its attempting to create a bitfield. 
-const JOB_DELAY: Duration = Duration::from_millis(1500); +const SPAWNED_TASK_DELAY: Duration = Duration::from_millis(1500); const LOG_TARGET: &str = "parachain::bitfield-signing"; -/// Each `BitfieldSigningJob` prepares a signed bitfield for a single relay parent. -pub struct BitfieldSigningJob(std::marker::PhantomData); - +// TODO: use `fatality` (https://github.com/paritytech/polkadot/issues/5540). /// Errors we may encounter in the course of executing the `BitfieldSigningSubsystem`. #[derive(Debug, thiserror::Error)] #[allow(missing_docs)] @@ -182,114 +179,156 @@ async fn construct_availability_bitfield( Ok(AvailabilityBitfield(core_bits)) } -impl JobTrait for BitfieldSigningJob +/// The bitfield signing subsystem. +pub struct BitfieldSigningSubsystem { + keystore: SyncCryptoStorePtr, + metrics: Metrics, +} + +impl BitfieldSigningSubsystem { + /// Create a new instance of the `BitfieldSigningSubsystem`. + pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self { + Self { keystore, metrics } + } +} + +#[overseer::subsystem(BitfieldSigning, error=SubsystemError, prefix=self::overseer)] +impl BitfieldSigningSubsystem { + fn start(self, ctx: Context) -> SpawnedSubsystem { + let future = async move { + run(ctx, self.keystore, self.metrics) + .await + .map_err(|e| SubsystemError::with_origin("bitfield-signing", e)) + } + .boxed(); + + SpawnedSubsystem { name: "bitfield-signing-subsystem", future } + } +} + +#[overseer::contextbounds(BitfieldSigning, prefix = self::overseer)] +async fn run( + mut ctx: Context, + keystore: SyncCryptoStorePtr, + metrics: Metrics, +) -> SubsystemResult<()> { + // Track spawned jobs per active leaf. + let mut running = HashMap::::new(); + + loop { + match ctx.recv().await? { + FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { + // Abort jobs for deactivated leaves. 
+ for leaf in &update.deactivated { + if let Some(handle) = running.remove(leaf) { + handle.abort(); + } + } + + for leaf in update.activated { + let sender = ctx.sender().clone(); + let leaf_hash = leaf.hash; + + let (fut, handle) = future::abortable(handle_active_leaves_update( + sender, + leaf, + keystore.clone(), + metrics.clone(), + )); + + running.insert(leaf_hash, handle); + + ctx.spawn("bitfield-signing-job", fut.map(drop).boxed())?; + } + }, + FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {}, + FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()), + FromOverseer::Communication { .. } => {}, + } + } +} + +async fn handle_active_leaves_update( + mut sender: Sender, + leaf: ActivatedLeaf, + keystore: SyncCryptoStorePtr, + metrics: Metrics, +) -> Result<(), Error> where - Sender: overseer::BitfieldSigningSenderTrait + Unpin, + Sender: overseer::BitfieldSigningSenderTrait, { - type ToJob = BitfieldSigningMessage; - type OutgoingMessages = overseer::BitfieldSigningOutgoingMessages; - type Sender = Sender; - type Error = Error; - type RunArgs = SyncCryptoStorePtr; - type Metrics = Metrics; - - const NAME: &'static str = "bitfield-signing-job"; - - /// Run a job for the parent block indicated - fn run( - leaf: ActivatedLeaf, - keystore: Self::RunArgs, - metrics: Self::Metrics, - _receiver: mpsc::Receiver, - mut sender: JobSender, - ) -> Pin> + Send>> { - let metrics = metrics.clone(); - async move { - if let LeafStatus::Stale = leaf.status { - gum::debug!( + if let LeafStatus::Stale = leaf.status { + gum::debug!( + target: LOG_TARGET, + relay_parent = ?leaf.hash, + block_number = ?leaf.number, + "Skip bitfield signing for stale leaf" + ); + return Ok(()) + } + + let span = PerLeafSpan::new(leaf.span, "bitfield-signing"); + let span_delay = span.child("delay"); + let wait_until = Instant::now() + SPAWNED_TASK_DELAY; + + // now do all the work we can before we need to wait for the availability store + // if we're not a validator, we can just 
succeed effortlessly + let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await { + Ok(validator) => validator, + Err(util::Error::NotAValidator) => return Ok(()), + Err(err) => return Err(Error::Util(err)), + }; + + // wait a bit before doing anything else + Delay::new_at(wait_until).await?; + + // this timer does not appear at the head of the function because we don't want to include + // SPAWNED_TASK_DELAY each time. + let _timer = metrics.time_run(); + + drop(span_delay); + let span_availability = span.child("availability"); + + let bitfield = match construct_availability_bitfield( + leaf.hash, + &span_availability, + validator.index(), + &mut sender, + ) + .await + { + Err(Error::Runtime(runtime_err)) => { + // Don't take down the node on runtime API errors. + gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error"); + return Ok(()) + }, + Err(err) => return Err(err), + Ok(bitfield) => bitfield, + }; + + drop(span_availability); + let span_signing = span.child("signing"); + + let signed_bitfield = + match validator.sign(keystore, bitfield).await.map_err(|e| Error::Keystore(e))? { + Some(b) => b, + None => { + gum::error!( target: LOG_TARGET, - hash = ?leaf.hash, - block_number = ?leaf.number, - "Stale leaf - don't sign bitfields." 
+ "Key was found at construction, but while signing it could not be found.", ); return Ok(()) - } - - let span = PerLeafSpan::new(leaf.span, "bitfield-signing"); - let _span = span.child("delay"); - let wait_until = Instant::now() + JOB_DELAY; - - // now do all the work we can before we need to wait for the availability store - // if we're not a validator, we can just succeed effortlessly - let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await { - Ok(validator) => validator, - Err(util::Error::NotAValidator) => return Ok(()), - Err(err) => return Err(Error::Util(err)), - }; - - // wait a bit before doing anything else - Delay::new_at(wait_until).await?; - - // this timer does not appear at the head of the function because we don't want to include - // JOB_DELAY each time. - let _timer = metrics.time_run(); - - drop(_span); - let span_availability = span.child("availability"); - - let bitfield = match construct_availability_bitfield( - leaf.hash, - &span_availability, - validator.index(), - sender.subsystem_sender(), - ) - .await - { - Err(Error::Runtime(runtime_err)) => { - // Don't take down the node on runtime API errors. - gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error"); - return Ok(()) - }, - Err(err) => return Err(err), - Ok(bitfield) => bitfield, - }; - - drop(span_availability); - let _span = span.child("signing"); - - let signed_bitfield = match validator - .sign(keystore.clone(), bitfield) - .await - .map_err(|e| Error::Keystore(e))? 
- { - Some(b) => b, - None => { - gum::error!( - target: LOG_TARGET, - "Key was found at construction, but while signing it could not be found.", - ); - return Ok(()) - }, - }; - - metrics.on_bitfield_signed(); - - drop(_span); - let _span = span.child("gossip"); - - sender - .send_message(BitfieldDistributionMessage::DistributeBitfield( - leaf.hash, - signed_bitfield, - )) - .await; - - Ok(()) - } - .boxed() - } -} + }, + }; + + metrics.on_bitfield_signed(); -/// `BitfieldSigningSubsystem` manages a number of bitfield signing jobs. -pub type BitfieldSigningSubsystem<Spawner, Sender> = - JobSubsystem<BitfieldSigningJob<Sender>, Spawner>; + drop(span_signing); + let _span_gossip = span.child("gossip"); + + sender + .send_message(BitfieldDistributionMessage::DistributeBitfield(leaf.hash, signed_bitfield)) + .await; + + Ok(()) +} diff --git a/node/core/bitfield-signing/src/tests.rs b/node/core/bitfield-signing/src/tests.rs index b2c0c4e174d9..19777c29157f 100644 --- a/node/core/bitfield-signing/src/tests.rs +++ b/node/core/bitfield-signing/src/tests.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
use super::*; -use futures::{executor::block_on, pin_mut}; +use futures::{executor::block_on, pin_mut, StreamExt}; use polkadot_node_subsystem::messages::AllMessages; use polkadot_primitives::v2::{CandidateHash, OccupiedCore}; use test_helpers::dummy_candidate_descriptor; diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index 2001076aa1db..06d47d7de0c5 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -26,7 +26,7 @@ use polkadot_node_core_chain_selection::Config as ChainSelectionConfig; use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig; use polkadot_node_core_provisioner::ProvisionerConfig; use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver}; -use polkadot_node_subsystem_types::messages::{BitfieldSigningMessage, ProvisionerMessage}; +use polkadot_node_subsystem_types::messages::ProvisionerMessage; #[cfg(any(feature = "malus", test))] pub use polkadot_overseer::{ dummy::{dummy_overseer_builder, DummySubsystem}, @@ -156,10 +156,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>( StatementDistributionSubsystem, AvailabilityDistributionSubsystem, AvailabilityRecoverySubsystem, - BitfieldSigningSubsystem< - SpawnGlue, - as SubsystemContext>::Sender, - >, + BitfieldSigningSubsystem, BitfieldDistributionSubsystem, ProvisionerSubsystem< SpawnGlue, @@ -211,7 +208,6 @@ where )) .bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?)) .bitfield_signing(BitfieldSigningSubsystem::new( - spawner.clone(), keystore.clone(), Metrics::register(registry)?, ))