Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

bitfield-signing: remove util::jobs usage #5523

Merged
merged 5 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 155 additions & 116 deletions node/core/bitfield-signing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,21 @@ use futures::{
channel::{mpsc, oneshot},
future,
lock::Mutex,
prelude::*,
Future,
FutureExt,
};
use polkadot_node_subsystem::{
errors::RuntimeApiError,
jaeger,
messages::{
AvailabilityStoreMessage, BitfieldDistributionMessage, BitfieldSigningMessage,
RuntimeApiMessage, RuntimeApiRequest,
AvailabilityStoreMessage, BitfieldDistributionMessage, RuntimeApiMessage, RuntimeApiRequest,
},
overseer, ActivatedLeaf, LeafStatus, PerLeafSpan, SubsystemSender,
overseer, ActivatedLeaf, FromOverseer, LeafStatus, OverseerSignal, PerLeafSpan,
SpawnedSubsystem, SubsystemError, SubsystemResult, SubsystemSender,
};
use polkadot_node_subsystem_util::{self as util, JobSender, JobSubsystem, JobTrait, Validator};
use polkadot_node_subsystem_util::{self as util, Validator};
use polkadot_primitives::v2::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
use std::{iter::FromIterator, pin::Pin, time::Duration};
use std::{collections::HashMap, iter::FromIterator, time::Duration};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FromIterator is part of the std prelude in rust 2021

use wasm_timer::{Delay, Instant};

mod metrics;
Expand All @@ -49,12 +48,10 @@ use self::metrics::Metrics;
mod tests;

/// Delay between starting a bitfield signing job and its attempting to create a bitfield.
const JOB_DELAY: Duration = Duration::from_millis(1500);
const SPAWNED_TASK_DELAY: Duration = Duration::from_millis(1500);
const LOG_TARGET: &str = "parachain::bitfield-signing";

/// Each `BitfieldSigningJob` prepares a signed bitfield for a single relay parent.
pub struct BitfieldSigningJob<Sender>(std::marker::PhantomData<Sender>);

// TODO: use `fatality` (https://github.com/paritytech/polkadot/issues/5540).
/// Errors we may encounter in the course of executing the `BitfieldSigningSubsystem`.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -182,114 +179,156 @@ async fn construct_availability_bitfield(
Ok(AvailabilityBitfield(core_bits))
}

impl<Sender> JobTrait for BitfieldSigningJob<Sender>
/// The bitfield signing subsystem.
pub struct BitfieldSigningSubsystem {
keystore: SyncCryptoStorePtr,
metrics: Metrics,
}

impl BitfieldSigningSubsystem {
/// Create a new instance of the `BitfieldSigningSubsystem`.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
Self { keystore, metrics }
}
}

#[overseer::subsystem(BitfieldSigning, error=SubsystemError, prefix=self::overseer)]
impl<Context> BitfieldSigningSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = async move {
run(ctx, self.keystore, self.metrics)
.await
.map_err(|e| SubsystemError::with_origin("bitfield-signing", e))
}
.boxed();

SpawnedSubsystem { name: "bitfield-signing-subsystem", future }
}
}

#[overseer::contextbounds(BitfieldSigning, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
) -> SubsystemResult<()> {
// Track spawned jobs per active leaf.
let mut running = HashMap::<Hash, future::AbortHandle>::new();

loop {
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
// Abort jobs for deactivated leaves.
for leaf in &update.deactivated {
if let Some(handle) = running.remove(leaf) {
handle.abort();
}
}

for leaf in update.activated {
let sender = ctx.sender().clone();
let leaf_hash = leaf.hash;

let (fut, handle) = future::abortable(handle_active_leaves_update(
sender,
leaf,
keystore.clone(),
metrics.clone(),
));

running.insert(leaf_hash, handle);

ctx.spawn("bitfield-signing-job", fut.map(drop).boxed())?;
}
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Communication { .. } => {},
}
}
}

async fn handle_active_leaves_update<Sender>(
mut sender: Sender,
leaf: ActivatedLeaf,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
) -> Result<(), Error>
where
Sender: overseer::BitfieldSigningSenderTrait + Unpin,
Sender: overseer::BitfieldSigningSenderTrait,
{
type ToJob = BitfieldSigningMessage;
type OutgoingMessages = overseer::BitfieldSigningOutgoingMessages;
type Sender = Sender;
type Error = Error;
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;

const NAME: &'static str = "bitfield-signing-job";

/// Run a job for the parent block indicated
fn run(
leaf: ActivatedLeaf,
keystore: Self::RunArgs,
metrics: Self::Metrics,
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
mut sender: JobSender<Sender>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let metrics = metrics.clone();
async move {
if let LeafStatus::Stale = leaf.status {
gum::debug!(
if let LeafStatus::Stale = leaf.status {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
block_number = ?leaf.number,
"Skip bitfield signing for stale leaf"
);
return Ok(())
}

let span = PerLeafSpan::new(leaf.span, "bitfield-signing");
let span_delay = span.child("delay");
let wait_until = Instant::now() + SPAWNED_TASK_DELAY;

// now do all the work we can before we need to wait for the availability store
// if we're not a validator, we can just succeed effortlessly
let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
};

// wait a bit before doing anything else
Delay::new_at(wait_until).await?;

// this timer does not appear at the head of the function because we don't want to include
// SPAWNED_TASK_DELAY each time.
let _timer = metrics.time_run();

drop(span_delay);
let span_availability = span.child("availability");

let bitfield = match construct_availability_bitfield(
leaf.hash,
&span_availability,
validator.index(),
&mut sender,
)
.await
{
Err(Error::Runtime(runtime_err)) => {
slumber marked this conversation as resolved.
Show resolved Hide resolved
// Don't take down the node on runtime API errors.
gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error");
return Ok(())
},
Err(err) => return Err(err),
Ok(bitfield) => bitfield,
};

drop(span_availability);
let span_signing = span.child("signing");

let signed_bitfield =
match validator.sign(keystore, bitfield).await.map_err(|e| Error::Keystore(e))? {
Some(b) => b,
None => {
gum::error!(
target: LOG_TARGET,
hash = ?leaf.hash,
block_number = ?leaf.number,
"Stale leaf - don't sign bitfields."
"Key was found at construction, but while signing it could not be found.",
);
return Ok(())
}

let span = PerLeafSpan::new(leaf.span, "bitfield-signing");
let _span = span.child("delay");
let wait_until = Instant::now() + JOB_DELAY;

// now do all the work we can before we need to wait for the availability store
// if we're not a validator, we can just succeed effortlessly
let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
};

// wait a bit before doing anything else
Delay::new_at(wait_until).await?;

// this timer does not appear at the head of the function because we don't want to include
// JOB_DELAY each time.
let _timer = metrics.time_run();

drop(_span);
let span_availability = span.child("availability");

let bitfield = match construct_availability_bitfield(
leaf.hash,
&span_availability,
validator.index(),
sender.subsystem_sender(),
)
.await
{
Err(Error::Runtime(runtime_err)) => {
// Don't take down the node on runtime API errors.
gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error");
return Ok(())
},
Err(err) => return Err(err),
Ok(bitfield) => bitfield,
};

drop(span_availability);
let _span = span.child("signing");

let signed_bitfield = match validator
.sign(keystore.clone(), bitfield)
.await
.map_err(|e| Error::Keystore(e))?
{
Some(b) => b,
None => {
gum::error!(
target: LOG_TARGET,
"Key was found at construction, but while signing it could not be found.",
);
return Ok(())
},
};

metrics.on_bitfield_signed();

drop(_span);
let _span = span.child("gossip");

sender
.send_message(BitfieldDistributionMessage::DistributeBitfield(
leaf.hash,
signed_bitfield,
))
.await;

Ok(())
}
.boxed()
}
}
},
};

metrics.on_bitfield_signed();

/// `BitfieldSigningSubsystem` manages a number of bitfield signing jobs.
pub type BitfieldSigningSubsystem<Spawner, Sender> =
JobSubsystem<BitfieldSigningJob<Sender>, Spawner>;
drop(span_signing);
let _span_gossip = span.child("gossip");

sender
.send_message(BitfieldDistributionMessage::DistributeBitfield(leaf.hash, signed_bitfield))
.await;

Ok(())
}
2 changes: 1 addition & 1 deletion node/core/bitfield-signing/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use super::*;
use futures::{executor::block_on, pin_mut};
use futures::{executor::block_on, pin_mut, StreamExt};
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_primitives::v2::{CandidateHash, OccupiedCore};
use test_helpers::dummy_candidate_descriptor;
Expand Down
8 changes: 2 additions & 6 deletions node/service/src/overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
use polkadot_node_core_provisioner::ProvisionerConfig;
use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver};
use polkadot_node_subsystem_types::messages::{BitfieldSigningMessage, ProvisionerMessage};
use polkadot_node_subsystem_types::messages::ProvisionerMessage;
#[cfg(any(feature = "malus", test))]
pub use polkadot_overseer::{
dummy::{dummy_overseer_builder, DummySubsystem},
Expand Down Expand Up @@ -154,10 +154,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
StatementDistributionSubsystem<rand::rngs::StdRng>,
AvailabilityDistributionSubsystem,
AvailabilityRecoverySubsystem,
BitfieldSigningSubsystem<
Spawner,
<OverseerSubsystemContext<BitfieldSigningMessage> as SubsystemContext>::Sender,
>,
BitfieldSigningSubsystem,
BitfieldDistributionSubsystem,
ProvisionerSubsystem<
Spawner,
Expand Down Expand Up @@ -207,7 +204,6 @@ where
))
.bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
.bitfield_signing(BitfieldSigningSubsystem::new(
spawner.clone(),
keystore.clone(),
Metrics::register(registry)?,
))
Expand Down