Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
bitfield-signing: remove util::jobs usage (#5523)
Browse files Browse the repository at this point in the history
  • Loading branch information
slumber authored and vstakhov committed May 19, 2022
1 parent 0183579 commit 380356e
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 123 deletions.
271 changes: 155 additions & 116 deletions node/core/bitfield-signing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,21 @@ use futures::{
channel::{mpsc, oneshot},
future,
lock::Mutex,
prelude::*,
Future,
FutureExt,
};
use polkadot_node_subsystem::{
errors::RuntimeApiError,
jaeger,
messages::{
AvailabilityStoreMessage, BitfieldDistributionMessage, BitfieldSigningMessage,
RuntimeApiMessage, RuntimeApiRequest,
AvailabilityStoreMessage, BitfieldDistributionMessage, RuntimeApiMessage, RuntimeApiRequest,
},
overseer, ActivatedLeaf, LeafStatus, PerLeafSpan, SubsystemSender,
overseer, ActivatedLeaf, FromOverseer, LeafStatus, OverseerSignal, PerLeafSpan,
SpawnedSubsystem, SubsystemError, SubsystemResult, SubsystemSender,
};
use polkadot_node_subsystem_util::{self as util, JobSender, JobSubsystem, JobTrait, Validator};
use polkadot_node_subsystem_util::{self as util, Validator};
use polkadot_primitives::v2::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use sp_keystore::{Error as KeystoreError, SyncCryptoStorePtr};
use std::{iter::FromIterator, pin::Pin, time::Duration};
use std::{collections::HashMap, iter::FromIterator, time::Duration};
use wasm_timer::{Delay, Instant};

mod metrics;
Expand All @@ -49,12 +48,10 @@ use self::metrics::Metrics;
mod tests;

/// Delay between starting a bitfield signing job and its attempting to create a bitfield.
const JOB_DELAY: Duration = Duration::from_millis(1500);
const SPAWNED_TASK_DELAY: Duration = Duration::from_millis(1500);
const LOG_TARGET: &str = "parachain::bitfield-signing";

/// Each `BitfieldSigningJob` prepares a signed bitfield for a single relay parent.
pub struct BitfieldSigningJob<Sender>(std::marker::PhantomData<Sender>);

// TODO: use `fatality` (https://github.com/paritytech/polkadot/issues/5540).
/// Errors we may encounter in the course of executing the `BitfieldSigningSubsystem`.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -182,114 +179,156 @@ async fn construct_availability_bitfield(
Ok(AvailabilityBitfield(core_bits))
}

impl<Sender> JobTrait for BitfieldSigningJob<Sender>
/// The bitfield signing subsystem.
pub struct BitfieldSigningSubsystem {
keystore: SyncCryptoStorePtr,
metrics: Metrics,
}

impl BitfieldSigningSubsystem {
/// Create a new instance of the `BitfieldSigningSubsystem`.
pub fn new(keystore: SyncCryptoStorePtr, metrics: Metrics) -> Self {
Self { keystore, metrics }
}
}

#[overseer::subsystem(BitfieldSigning, error=SubsystemError, prefix=self::overseer)]
impl<Context> BitfieldSigningSubsystem {
fn start(self, ctx: Context) -> SpawnedSubsystem {
let future = async move {
run(ctx, self.keystore, self.metrics)
.await
.map_err(|e| SubsystemError::with_origin("bitfield-signing", e))
}
.boxed();

SpawnedSubsystem { name: "bitfield-signing-subsystem", future }
}
}

#[overseer::contextbounds(BitfieldSigning, prefix = self::overseer)]
async fn run<Context>(
mut ctx: Context,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
) -> SubsystemResult<()> {
// Track spawned jobs per active leaf.
let mut running = HashMap::<Hash, future::AbortHandle>::new();

loop {
match ctx.recv().await? {
FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => {
// Abort jobs for deactivated leaves.
for leaf in &update.deactivated {
if let Some(handle) = running.remove(leaf) {
handle.abort();
}
}

for leaf in update.activated {
let sender = ctx.sender().clone();
let leaf_hash = leaf.hash;

let (fut, handle) = future::abortable(handle_active_leaves_update(
sender,
leaf,
keystore.clone(),
metrics.clone(),
));

running.insert(leaf_hash, handle);

ctx.spawn("bitfield-signing-job", fut.map(drop).boxed())?;
}
},
FromOverseer::Signal(OverseerSignal::BlockFinalized(..)) => {},
FromOverseer::Signal(OverseerSignal::Conclude) => return Ok(()),
FromOverseer::Communication { .. } => {},
}
}
}

async fn handle_active_leaves_update<Sender>(
mut sender: Sender,
leaf: ActivatedLeaf,
keystore: SyncCryptoStorePtr,
metrics: Metrics,
) -> Result<(), Error>
where
Sender: overseer::BitfieldSigningSenderTrait + Unpin,
Sender: overseer::BitfieldSigningSenderTrait,
{
type ToJob = BitfieldSigningMessage;
type OutgoingMessages = overseer::BitfieldSigningOutgoingMessages;
type Sender = Sender;
type Error = Error;
type RunArgs = SyncCryptoStorePtr;
type Metrics = Metrics;

const NAME: &'static str = "bitfield-signing-job";

/// Run a job for the parent block indicated
fn run(
leaf: ActivatedLeaf,
keystore: Self::RunArgs,
metrics: Self::Metrics,
_receiver: mpsc::Receiver<BitfieldSigningMessage>,
mut sender: JobSender<Sender>,
) -> Pin<Box<dyn Future<Output = Result<(), Self::Error>> + Send>> {
let metrics = metrics.clone();
async move {
if let LeafStatus::Stale = leaf.status {
gum::debug!(
if let LeafStatus::Stale = leaf.status {
gum::debug!(
target: LOG_TARGET,
relay_parent = ?leaf.hash,
block_number = ?leaf.number,
"Skip bitfield signing for stale leaf"
);
return Ok(())
}

let span = PerLeafSpan::new(leaf.span, "bitfield-signing");
let span_delay = span.child("delay");
let wait_until = Instant::now() + SPAWNED_TASK_DELAY;

// now do all the work we can before we need to wait for the availability store
// if we're not a validator, we can just succeed effortlessly
let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
};

// wait a bit before doing anything else
Delay::new_at(wait_until).await?;

// this timer does not appear at the head of the function because we don't want to include
// SPAWNED_TASK_DELAY each time.
let _timer = metrics.time_run();

drop(span_delay);
let span_availability = span.child("availability");

let bitfield = match construct_availability_bitfield(
leaf.hash,
&span_availability,
validator.index(),
&mut sender,
)
.await
{
Err(Error::Runtime(runtime_err)) => {
// Don't take down the node on runtime API errors.
gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error");
return Ok(())
},
Err(err) => return Err(err),
Ok(bitfield) => bitfield,
};

drop(span_availability);
let span_signing = span.child("signing");

let signed_bitfield =
match validator.sign(keystore, bitfield).await.map_err(|e| Error::Keystore(e))? {
Some(b) => b,
None => {
gum::error!(
target: LOG_TARGET,
hash = ?leaf.hash,
block_number = ?leaf.number,
"Stale leaf - don't sign bitfields."
"Key was found at construction, but while signing it could not be found.",
);
return Ok(())
}

let span = PerLeafSpan::new(leaf.span, "bitfield-signing");
let _span = span.child("delay");
let wait_until = Instant::now() + JOB_DELAY;

// now do all the work we can before we need to wait for the availability store
// if we're not a validator, we can just succeed effortlessly
let validator = match Validator::new(leaf.hash, keystore.clone(), &mut sender).await {
Ok(validator) => validator,
Err(util::Error::NotAValidator) => return Ok(()),
Err(err) => return Err(Error::Util(err)),
};

// wait a bit before doing anything else
Delay::new_at(wait_until).await?;

// this timer does not appear at the head of the function because we don't want to include
// JOB_DELAY each time.
let _timer = metrics.time_run();

drop(_span);
let span_availability = span.child("availability");

let bitfield = match construct_availability_bitfield(
leaf.hash,
&span_availability,
validator.index(),
sender.subsystem_sender(),
)
.await
{
Err(Error::Runtime(runtime_err)) => {
// Don't take down the node on runtime API errors.
gum::warn!(target: LOG_TARGET, err = ?runtime_err, "Encountered a runtime API error");
return Ok(())
},
Err(err) => return Err(err),
Ok(bitfield) => bitfield,
};

drop(span_availability);
let _span = span.child("signing");

let signed_bitfield = match validator
.sign(keystore.clone(), bitfield)
.await
.map_err(|e| Error::Keystore(e))?
{
Some(b) => b,
None => {
gum::error!(
target: LOG_TARGET,
"Key was found at construction, but while signing it could not be found.",
);
return Ok(())
},
};

metrics.on_bitfield_signed();

drop(_span);
let _span = span.child("gossip");

sender
.send_message(BitfieldDistributionMessage::DistributeBitfield(
leaf.hash,
signed_bitfield,
))
.await;

Ok(())
}
.boxed()
}
}
},
};

metrics.on_bitfield_signed();

/// `BitfieldSigningSubsystem` manages a number of bitfield signing jobs.
pub type BitfieldSigningSubsystem<Spawner, Sender> =
JobSubsystem<BitfieldSigningJob<Sender>, Spawner>;
drop(span_signing);
let _span_gossip = span.child("gossip");

sender
.send_message(BitfieldDistributionMessage::DistributeBitfield(leaf.hash, signed_bitfield))
.await;

Ok(())
}
2 changes: 1 addition & 1 deletion node/core/bitfield-signing/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use super::*;
use futures::{executor::block_on, pin_mut};
use futures::{executor::block_on, pin_mut, StreamExt};
use polkadot_node_subsystem::messages::AllMessages;
use polkadot_primitives::v2::{CandidateHash, OccupiedCore};
use test_helpers::dummy_candidate_descriptor;
Expand Down
8 changes: 2 additions & 6 deletions node/service/src/overseer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use polkadot_node_core_chain_selection::Config as ChainSelectionConfig;
use polkadot_node_core_dispute_coordinator::Config as DisputeCoordinatorConfig;
use polkadot_node_core_provisioner::ProvisionerConfig;
use polkadot_node_network_protocol::request_response::{v1 as request_v1, IncomingRequestReceiver};
use polkadot_node_subsystem_types::messages::{BitfieldSigningMessage, ProvisionerMessage};
use polkadot_node_subsystem_types::messages::ProvisionerMessage;
#[cfg(any(feature = "malus", test))]
pub use polkadot_overseer::{
dummy::{dummy_overseer_builder, DummySubsystem},
Expand Down Expand Up @@ -156,10 +156,7 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>(
StatementDistributionSubsystem<rand::rngs::StdRng>,
AvailabilityDistributionSubsystem,
AvailabilityRecoverySubsystem,
BitfieldSigningSubsystem<
SpawnGlue<Spawner>,
<OverseerSubsystemContext<BitfieldSigningMessage> as SubsystemContext>::Sender,
>,
BitfieldSigningSubsystem,
BitfieldDistributionSubsystem,
ProvisionerSubsystem<
SpawnGlue<Spawner>,
Expand Down Expand Up @@ -211,7 +208,6 @@ where
))
.bitfield_distribution(BitfieldDistributionSubsystem::new(Metrics::register(registry)?))
.bitfield_signing(BitfieldSigningSubsystem::new(
spawner.clone(),
keystore.clone(),
Metrics::register(registry)?,
))
Expand Down

0 comments on commit 380356e

Please sign in to comment.