+ ::core::marker::Send + 'async_trait>,
+ >
+ where
+ P: 'async_trait + Priority,
+ 'life0: 'async_trait,
+ Self: 'async_trait,
+ {
+ self.0.send_message_with_priority::(msg.into())
+ }
+
+ fn try_send_message_with_priority(
+ &mut self,
+ msg: ApprovalDistributionMessage,
+ ) -> Result<(), metered::TrySendError> {
+ self.0.try_send_message_with_priority::(msg.into()).map_err(|err| match err {
+ // Safe to unwrap because it was built from the same type.
+ metered::TrySendError::Closed(msg) =>
+ metered::TrySendError::Closed(msg.try_into().unwrap()),
+ metered::TrySendError::Full(msg) =>
+ metered::TrySendError::Full(msg.try_into().unwrap()),
+ })
+ }
+}
diff --git a/polkadot/node/core/approval-voting-parallel/src/metrics.rs b/polkadot/node/core/approval-voting-parallel/src/metrics.rs
new file mode 100644
index 000000000000..1b4ab4bd9b88
--- /dev/null
+++ b/polkadot/node/core/approval-voting-parallel/src/metrics.rs
@@ -0,0 +1,236 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! The Metrics for Approval Voting Parallel Subsystem.
+
+use std::collections::HashMap;
+
+use polkadot_node_metrics::{metered::Meter, metrics};
+use polkadot_overseer::prometheus;
+
+#[derive(Default, Clone)]
+pub struct Metrics(Option);
+
+/// Approval Voting parallel metrics.
+#[derive(Clone)]
+pub struct MetricsInner {
+ // The inner metrics of the approval distribution workers.
+ approval_distribution: polkadot_approval_distribution::metrics::Metrics,
+ // The inner metrics of the approval voting workers.
+ approval_voting: polkadot_node_core_approval_voting::Metrics,
+
+ // Time of flight metrics for bounded channels.
+ to_worker_bounded_tof: prometheus::HistogramVec,
+ // Number of elements sent to the worker's bounded queue.
+ to_worker_bounded_sent: prometheus::GaugeVec,
+ // Number of elements received by the worker's bounded queue.
+ to_worker_bounded_received: prometheus::GaugeVec,
+ // Number of times senders blocked while sending messages to the worker.
+ to_worker_bounded_blocked: prometheus::GaugeVec,
+ // Time of flight metrics for unbounded channels.
+ to_worker_unbounded_tof: prometheus::HistogramVec,
+ // Number of elements sent to the worker's unbounded queue.
+ to_worker_unbounded_sent: prometheus::GaugeVec,
+ // Number of elements received by the worker's unbounded queue.
+ to_worker_unbounded_received: prometheus::GaugeVec,
+}
+
+impl Metrics {
+ /// Get the approval distribution metrics.
+ pub fn approval_distribution_metrics(
+ &self,
+ ) -> polkadot_approval_distribution::metrics::Metrics {
+ self.0
+ .as_ref()
+ .map(|metrics_inner| metrics_inner.approval_distribution.clone())
+ .unwrap_or_default()
+ }
+
+ /// Get the approval voting metrics.
+ pub fn approval_voting_metrics(&self) -> polkadot_node_core_approval_voting::Metrics {
+ self.0
+ .as_ref()
+ .map(|metrics_inner| metrics_inner.approval_voting.clone())
+ .unwrap_or_default()
+ }
+}
+
+impl metrics::Metrics for Metrics {
+ /// Try to register the metrics.
+ fn try_register(
+ registry: &prometheus::Registry,
+ ) -> std::result::Result {
+ Ok(Metrics(Some(MetricsInner {
+ approval_distribution: polkadot_approval_distribution::metrics::Metrics::try_register(
+ registry,
+ )?,
+ approval_voting: polkadot_node_core_approval_voting::Metrics::try_register(registry)?,
+ to_worker_bounded_tof: prometheus::register(
+ prometheus::HistogramVec::new(
+ prometheus::HistogramOpts::new(
+ "polkadot_approval_voting_parallel_worker_bounded_tof",
+ "Duration spent in a particular approval voting worker channel from entrance to removal",
+ )
+ .buckets(vec![
+ 0.0001, 0.0004, 0.0016, 0.0064, 0.0256, 0.1024, 0.4096, 1.6384, 3.2768,
+ 4.9152, 6.5536,
+ ]),
+ &["worker_name"],
+ )?,
+ registry,
+ )?,
+ to_worker_bounded_sent: prometheus::register(
+ prometheus::GaugeVec::::new(
+ prometheus::Opts::new(
+ "polkadot_approval_voting_parallel_worker_bounded_sent",
+ "Number of elements sent to approval voting workers' bounded queues",
+ ),
+ &["worker_name"],
+ )?,
+ registry,
+ )?,
+ to_worker_bounded_received: prometheus::register(
+ prometheus::GaugeVec::::new(
+ prometheus::Opts::new(
+ "polkadot_approval_voting_parallel_worker_bounded_received",
+ "Number of elements received by approval voting workers' bounded queues",
+ ),
+ &["worker_name"],
+ )?,
+ registry,
+ )?,
+ to_worker_bounded_blocked: prometheus::register(
+ prometheus::GaugeVec::::new(
+ prometheus::Opts::new(
+ "polkadot_approval_voting_parallel_worker_bounded_blocked",
+ "Number of times approval voting workers blocked while sending messages to a subsystem",
+ ),
+ &["worker_name"],
+ )?,
+ registry,
+ )?,
+ to_worker_unbounded_tof: prometheus::register(
+ prometheus::HistogramVec::new(
+ prometheus::HistogramOpts::new(
+ "polkadot_approval_voting_parallel_worker_unbounded_tof",
+ "Duration spent in a particular approval voting worker channel from entrance to removal",
+ )
+ .buckets(vec![
+ 0.0001, 0.0004, 0.0016, 0.0064, 0.0256, 0.1024, 0.4096, 1.6384, 3.2768,
+ 4.9152, 6.5536,
+ ]),
+ &["worker_name"],
+ )?,
+ registry,
+ )?,
+ to_worker_unbounded_sent: prometheus::register(
+ prometheus::GaugeVec::::new(
+ prometheus::Opts::new(
+ "polkadot_approval_voting_parallel_worker_unbounded_sent",
+ "Number of elements sent to approval voting workers' unbounded queues",
+ ),
+ &["worker_name"],
+ )?,
+ registry,
+ )?,
+ to_worker_unbounded_received: prometheus::register(
+ prometheus::GaugeVec::::new(
+ prometheus::Opts::new(
+ "polkadot_approval_voting_parallel_worker_unbounded_received",
+ "Number of elements received by approval voting workers' unbounded queues",
+ ),
+ &["worker_name"],
+ )?,
+ registry,
+ )?,
+ })))
+ }
+}
+
+/// The meters to watch.
+#[derive(Clone)]
+pub struct Meters {
+ bounded: Meter,
+ unbounded: Meter,
+}
+
+impl Meters {
+ pub fn new(bounded: &Meter, unbounded: &Meter) -> Self {
+ Self { bounded: bounded.clone(), unbounded: unbounded.clone() }
+ }
+}
+
+/// A metrics watcher that watches the meters and updates the metrics.
+pub struct MetricsWatcher {
+ to_watch: HashMap,
+ metrics: Metrics,
+}
+
+impl MetricsWatcher {
+ /// Create a new metrics watcher.
+ pub fn new(metrics: Metrics) -> Self {
+ Self { to_watch: HashMap::new(), metrics }
+ }
+
+ /// Watch the meters of a worker with this name.
+ pub fn watch(&mut self, worker_name: String, meters: Meters) {
+ self.to_watch.insert(worker_name, meters);
+ }
+
+ /// Collect all the metrics.
+ pub fn collect_metrics(&self) {
+ for (name, meter) in &self.to_watch {
+ let bounded_readouts = meter.bounded.read();
+ let unbounded_readouts = meter.unbounded.read();
+ if let Some(metrics) = self.metrics.0.as_ref() {
+ metrics
+ .to_worker_bounded_sent
+ .with_label_values(&[name])
+ .set(bounded_readouts.sent as u64);
+
+ metrics
+ .to_worker_bounded_received
+ .with_label_values(&[name])
+ .set(bounded_readouts.received as u64);
+
+ metrics
+ .to_worker_bounded_blocked
+ .with_label_values(&[name])
+ .set(bounded_readouts.blocked as u64);
+
+ metrics
+ .to_worker_unbounded_sent
+ .with_label_values(&[name])
+ .set(unbounded_readouts.sent as u64);
+
+ metrics
+ .to_worker_unbounded_received
+ .with_label_values(&[name])
+ .set(unbounded_readouts.received as u64);
+
+ let hist_bounded = metrics.to_worker_bounded_tof.with_label_values(&[name]);
+ for tof in bounded_readouts.tof {
+ hist_bounded.observe(tof.as_f64());
+ }
+
+ let hist_unbounded = metrics.to_worker_unbounded_tof.with_label_values(&[name]);
+ for tof in unbounded_readouts.tof {
+ hist_unbounded.observe(tof.as_f64());
+ }
+ }
+ }
+ }
+}
diff --git a/polkadot/node/core/approval-voting-parallel/src/tests.rs b/polkadot/node/core/approval-voting-parallel/src/tests.rs
new file mode 100644
index 000000000000..215a707147fc
--- /dev/null
+++ b/polkadot/node/core/approval-voting-parallel/src/tests.rs
@@ -0,0 +1,1178 @@
+// Copyright (C) Parity Technologies (UK) Ltd.
+// This file is part of Polkadot.
+
+// Polkadot is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+
+// Polkadot is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with Polkadot. If not, see .
+
+//! The tests for Approval Voting Parallel Subsystem.
+
+use std::{
+ collections::{HashMap, HashSet},
+ future::Future,
+ sync::Arc,
+ time::Duration,
+};
+
+use crate::{
+ build_worker_handles, metrics::MetricsWatcher, prio_right, run_main_loop, start_workers,
+ validator_index_for_msg, ApprovalVotingParallelSubsystem, Metrics, WorkProvider,
+};
+use assert_matches::assert_matches;
+use futures::{channel::oneshot, future, stream::PollNext, StreamExt};
+use itertools::Itertools;
+use polkadot_node_core_approval_voting::{ApprovalVotingWorkProvider, Config};
+use polkadot_node_network_protocol::{peer_set::ValidationVersion, ObservedRole, PeerId, View};
+use polkadot_node_primitives::approval::{
+ time::SystemClock,
+ v1::{
+ AssignmentCert, AssignmentCertKind, IndirectAssignmentCert, IndirectSignedApprovalVote,
+ RELAY_VRF_MODULO_CONTEXT,
+ },
+ v2::{
+ AssignmentCertKindV2, AssignmentCertV2, CoreBitfield, IndirectAssignmentCertV2,
+ IndirectSignedApprovalVoteV2,
+ },
+};
+use polkadot_node_subsystem::{
+ messages::{ApprovalDistributionMessage, ApprovalVotingMessage, ApprovalVotingParallelMessage},
+ FromOrchestra,
+};
+use polkadot_node_subsystem_test_helpers::{mock::new_leaf, TestSubsystemContext};
+use polkadot_overseer::{ActiveLeavesUpdate, OverseerSignal, SpawnGlue, TimeoutExt};
+use polkadot_primitives::{CandidateHash, CoreIndex, Hash, ValidatorIndex};
+use sc_keystore::{Keystore, LocalKeystore};
+use sp_consensus::SyncOracle;
+use sp_consensus_babe::{VrfPreOutput, VrfProof, VrfSignature};
+use sp_core::{testing::TaskExecutor, H256};
+use sp_keyring::Sr25519Keyring;
+type VirtualOverseer =
+ polkadot_node_subsystem_test_helpers::TestSubsystemContextHandle;
+
+const SLOT_DURATION_MILLIS: u64 = 6000;
+
+pub mod test_constants {
+ pub(crate) const DATA_COL: u32 = 0;
+ pub(crate) const NUM_COLUMNS: u32 = 1;
+}
+
+fn fake_assignment_cert(block_hash: Hash, validator: ValidatorIndex) -> IndirectAssignmentCert {
+ let ctx = schnorrkel::signing_context(RELAY_VRF_MODULO_CONTEXT);
+ let msg = b"WhenParachains?";
+ let mut prng = rand_core::OsRng;
+ let keypair = schnorrkel::Keypair::generate_with(&mut prng);
+ let (inout, proof, _) = keypair.vrf_sign(ctx.bytes(msg));
+ let preout = inout.to_preout();
+
+ IndirectAssignmentCert {
+ block_hash,
+ validator,
+ cert: AssignmentCert {
+ kind: AssignmentCertKind::RelayVRFModulo { sample: 1 },
+ vrf: VrfSignature { pre_output: VrfPreOutput(preout), proof: VrfProof(proof) },
+ },
+ }
+}
+
+fn fake_assignment_cert_v2(
+ block_hash: Hash,
+ validator: ValidatorIndex,
+ core_bitfield: CoreBitfield,
+) -> IndirectAssignmentCertV2 {
+ let ctx = schnorrkel::signing_context(RELAY_VRF_MODULO_CONTEXT);
+ let msg = b"WhenParachains?";
+ let mut prng = rand_core::OsRng;
+ let keypair = schnorrkel::Keypair::generate_with(&mut prng);
+ let (inout, proof, _) = keypair.vrf_sign(ctx.bytes(msg));
+ let preout = inout.to_preout();
+
+ IndirectAssignmentCertV2 {
+ block_hash,
+ validator,
+ cert: AssignmentCertV2 {
+ kind: AssignmentCertKindV2::RelayVRFModuloCompact { core_bitfield },
+ vrf: VrfSignature { pre_output: VrfPreOutput(preout), proof: VrfProof(proof) },
+ },
+ }
+}
+
+/// Creates a meaningless signature
+pub fn dummy_signature() -> polkadot_primitives::ValidatorSignature {
+ sp_core::crypto::UncheckedFrom::unchecked_from([1u8; 64])
+}
+
+fn build_subsystem(
+ sync_oracle: Box,
+) -> (
+ ApprovalVotingParallelSubsystem,
+ TestSubsystemContext>,
+ VirtualOverseer,
+) {
+ sp_tracing::init_for_tests();
+
+ let pool = sp_core::testing::TaskExecutor::new();
+ let (context, virtual_overseer) = polkadot_node_subsystem_test_helpers::make_subsystem_context::<
+ ApprovalVotingParallelMessage,
+ _,
+ >(pool.clone());
+
+ let keystore = LocalKeystore::in_memory();
+ let _ = keystore.sr25519_generate_new(
+ polkadot_primitives::PARACHAIN_KEY_TYPE_ID,
+ Some(&Sr25519Keyring::Alice.to_seed()),
+ );
+
+ let clock = Arc::new(SystemClock {});
+ let db = kvdb_memorydb::create(test_constants::NUM_COLUMNS);
+ let db = polkadot_node_subsystem_util::database::kvdb_impl::DbAdapter::new(db, &[]);
+
+ (
+ ApprovalVotingParallelSubsystem::with_config_and_clock(
+ Config {
+ col_approval_data: test_constants::DATA_COL,
+ slot_duration_millis: SLOT_DURATION_MILLIS,
+ },
+ Arc::new(db),
+ Arc::new(keystore),
+ sync_oracle,
+ Metrics::default(),
+ clock.clone(),
+ SpawnGlue(pool),
+ None,
+ ),
+ context,
+ virtual_overseer,
+ )
+}
+
+#[derive(Clone)]
+struct TestSyncOracle {}
+
+impl SyncOracle for TestSyncOracle {
+ fn is_major_syncing(&self) -> bool {
+ false
+ }
+
+ fn is_offline(&self) -> bool {
+ unimplemented!("not used in network bridge")
+ }
+}
+
+fn test_harness(
+ num_approval_distro_workers: usize,
+ prio_right: Clos,
+ subsystem_gracefully_exits: bool,
+ test_fn: impl FnOnce(
+ VirtualOverseer,
+ WorkProvider,
+ Vec>,
+ ) -> T,
+) where
+ T: Future