From 3a420fbc4244596c8ab608d52621ea0d603a1228 Mon Sep 17 00:00:00 2001 From: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> Date: Tue, 28 May 2024 18:53:53 +0300 Subject: [PATCH] Add metric to measure the time it takes to gather enough assignments (#4587) To understand with high granularity how many assignment tranches are triggered before we concur that we have enough assignments. This metric is important because the triggering of an assignment creates a lot of work in the system for approving the candidate and gossiping the necessary messages. --------- Signed-off-by: Alexandru Gheorghe Co-authored-by: ordian --- .../node/core/approval-voting/src/import.rs | 6 +- polkadot/node/core/approval-voting/src/lib.rs | 170 ++++++++++++- .../node/core/approval-voting/src/tests.rs | 236 +++++++++++++++++- 3 files changed, 406 insertions(+), 6 deletions(-) diff --git a/polkadot/node/core/approval-voting/src/import.rs b/polkadot/node/core/approval-voting/src/import.rs index f4be42a484508..13b0b1bae1bc3 100644 --- a/polkadot/node/core/approval-voting/src/import.rs +++ b/polkadot/node/core/approval-voting/src/import.rs @@ -607,7 +607,7 @@ pub(crate) mod tests { use super::*; use crate::{ approval_db::common::{load_block_entry, DbBackend}, - RuntimeInfo, RuntimeInfoConfig, + RuntimeInfo, RuntimeInfoConfig, MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, }; use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; use assert_matches::assert_matches; @@ -622,6 +622,7 @@ pub(crate) mod tests { node_features::FeatureIndex, ExecutorParams, Id as ParaId, IndexedVec, NodeFeatures, SessionInfo, ValidatorId, ValidatorIndex, }; + use schnellru::{ByLength, LruMap}; pub(crate) use sp_consensus_babe::{ digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest}, AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch, @@ -658,6 +659,9 @@ pub(crate) mod tests { clock: Box::new(MockClock::default()), assignment_criteria: Box::new(MockAssignmentCriteria::default()), spans: HashMap::new(), + per_block_assignments_gathering_times: LruMap::new(ByLength::new( + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, + )), } } diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index c667aee73613f..eece6b15805c2 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -63,6 +63,12 @@ use sc_keystore::LocalKeystore; use sp_application_crypto::Pair; use sp_consensus::SyncOracle; use sp_consensus_slots::Slot; +use std::time::Instant; + +// The max number of blocks we keep track of assignments gathering times. Normally, +// this would never be reached because we prune the data on finalization, but we need +// to also ensure the data is not growing unecessarily large. +const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100; use futures::{ channel::oneshot, @@ -182,6 +188,14 @@ struct MetricsInner { time_recover_and_approve: prometheus::Histogram, candidate_signatures_requests_total: prometheus::Counter, unapproved_candidates_in_unfinalized_chain: prometheus::Gauge, + // The time it takes in each stage to gather enough assignments. + // We defined a `stage` as being the entire process of gathering enough assignments to + // be able to approve a candidate: + // E.g: + // - Stage 0: We wait for the needed_approvals assignments to be gathered. + // - Stage 1: We wait for enough tranches to cover all no-shows in stage 0. + // - Stage 2: We wait for enough tranches to cover all no-shows of stage 1. + assignments_gathering_time_by_stage: prometheus::HistogramVec, } /// Approval Voting metrics. @@ -302,6 +316,20 @@ impl Metrics { metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64); } } + + pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) { + if let Some(metrics) = &self.0 { + let stage_string = stage.to_string(); + // We don't want to have too many metrics entries with this label to not put unncessary + // pressure on the metrics infrastructure, so we cap the stage at 10, which is + // equivalent to having already a finalization lag to 10 * no_show_slots, so it should + // be more than enough. + metrics + .assignments_gathering_time_by_stage + .with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }]) + .observe(elapsed_as_millis as f64); + } + } } impl metrics::Metrics for Metrics { @@ -431,6 +459,17 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + assignments_gathering_time_by_stage: prometheus::register( + prometheus::HistogramVec::new( + prometheus::HistogramOpts::new( + "polkadot_parachain_assignments_gather_time_by_stage_ms", + "The time in ms it takes for each stage to gather enough assignments needed for approval", + ) + .buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]), + &["stage"], + )?, + registry, + )?, }; Ok(Metrics(Some(metrics))) @@ -788,6 +827,28 @@ struct State { clock: Box, assignment_criteria: Box, spans: HashMap, + // Per block, candidate records about how long we take until we gather enough + // assignments, this is relevant because it gives us a good idea about how many + // tranches we trigger and why. + per_block_assignments_gathering_times: + LruMap>, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +struct AssignmentGatheringRecord { + // The stage we are in. + // Candidate assignment gathering goes in stages, first we wait for needed_approvals(stage 0) + // Then if we have no-shows, we move into stage 1 and wait for enough tranches to cover all + // no-shows. + stage: usize, + // The time we started the stage. + stage_start: Option, +} + +impl Default for AssignmentGatheringRecord { + fn default() -> Self { + AssignmentGatheringRecord { stage: 0, stage_start: Some(Instant::now()) } + } } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] @@ -893,6 +954,96 @@ impl State { }, } } + + fn mark_begining_of_gathering_assignments( + &mut self, + block_number: BlockNumber, + block_hash: Hash, + candidate: CandidateHash, + ) { + if let Some(record) = self + .per_block_assignments_gathering_times + .get_or_insert(block_number, HashMap::new) + .and_then(|records| Some(records.entry((block_hash, candidate)).or_default())) + { + if record.stage_start.is_none() { + record.stage += 1; + gum::debug!( + target: LOG_TARGET, + stage = ?record.stage, + ?block_hash, + ?candidate, + "Started a new assignment gathering stage", + ); + record.stage_start = Some(Instant::now()); + } + } + } + + fn mark_gathered_enough_assignments( + &mut self, + block_number: BlockNumber, + block_hash: Hash, + candidate: CandidateHash, + ) -> AssignmentGatheringRecord { + let record = self + .per_block_assignments_gathering_times + .get(&block_number) + .and_then(|entry| entry.get_mut(&(block_hash, candidate))); + let stage = record.as_ref().map(|record| record.stage).unwrap_or_default(); + AssignmentGatheringRecord { + stage, + stage_start: record.and_then(|record| record.stage_start.take()), + } + } + + fn cleanup_assignments_gathering_timestamp(&mut self, remove_lower_than: BlockNumber) { + while let Some((block_number, _)) = self.per_block_assignments_gathering_times.peek_oldest() + { + if *block_number < remove_lower_than { + self.per_block_assignments_gathering_times.pop_oldest(); + } else { + break + } + } + } + + fn observe_assignment_gathering_status( + &mut self, + metrics: &Metrics, + required_tranches: &RequiredTranches, + block_hash: Hash, + block_number: BlockNumber, + candidate_hash: CandidateHash, + ) { + match required_tranches { + RequiredTranches::All | RequiredTranches::Pending { .. } => { + self.mark_begining_of_gathering_assignments( + block_number, + block_hash, + candidate_hash, + ); + }, + RequiredTranches::Exact { .. } => { + let time_to_gather = + self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash); + if let Some(gathering_started) = time_to_gather.stage_start { + if gathering_started.elapsed().as_millis() > 6000 { + gum::trace!( + target: LOG_TARGET, + ?block_hash, + ?candidate_hash, + "Long assignment gathering time", + ); + } + metrics.observe_assignment_gathering_time( + time_to_gather.stage, + gathering_started.elapsed().as_millis() as usize, + ) + } + }, + } + } } #[derive(Debug, Clone)] @@ -942,6 +1093,9 @@ where clock: subsystem.clock, assignment_criteria, spans: HashMap::new(), + per_block_assignments_gathering_times: LruMap::new(ByLength::new( + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, + )), }; // `None` on start-up. Gets initialized/updated on leaf update @@ -973,7 +1127,7 @@ where subsystem.metrics.on_wakeup(); process_wakeup( &mut ctx, - &state, + &mut state, &mut overlayed_db, &mut session_info_provider, woken_block, @@ -1632,6 +1786,7 @@ async fn handle_from_overseer( // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans // accordingly. wakeups.prune_finalized_wakeups(block_number, &mut state.spans); + state.cleanup_assignments_gathering_timestamp(block_number); // // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans // accordingly. let hash_set = @@ -2478,7 +2633,7 @@ where async fn check_and_import_approval( sender: &mut Sender, - state: &State, + state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, metrics: &Metrics, @@ -2710,7 +2865,7 @@ impl ApprovalStateTransition { // as necessary and schedules any further wakeups. async fn advance_approval_state( sender: &mut Sender, - state: &State, + state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, metrics: &Metrics, @@ -2761,6 +2916,13 @@ where approval_entry, status.required_tranches.clone(), ); + state.observe_assignment_gathering_status( + &metrics, + &status.required_tranches, + block_hash, + block_entry.block_number(), + candidate_hash, + ); // Check whether this is approved, while allowing a maximum // assignment tick of `now - APPROVAL_DELAY` - that is, that @@ -2941,7 +3103,7 @@ fn should_trigger_assignment( #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] async fn process_wakeup( ctx: &mut Context, - state: &State, + state: &mut State, db: &mut OverlayedBackend<'_, impl Backend>, session_info_provider: &mut RuntimeInfo, relay_block: Hash, diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index c3709de59e8bc..43af8d476a6ba 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -17,6 +17,10 @@ use self::test_helpers::mock::new_leaf; use super::*; use crate::backend::V1ReadBackend; +use overseer::prometheus::{ + prometheus::{IntCounter, IntCounterVec}, + Histogram, HistogramOpts, HistogramVec, Opts, +}; use polkadot_node_primitives::{ approval::{ v1::{ @@ -40,7 +44,7 @@ use polkadot_primitives::{ ApprovalVote, CandidateCommitments, CandidateEvent, CoreIndex, GroupIndex, Header, Id as ParaId, IndexedVec, NodeFeatures, ValidationCode, ValidatorSignature, }; -use std::time::Duration; +use std::{cmp::max, time::Duration}; use assert_matches::assert_matches; use async_trait::async_trait; @@ -5049,3 +5053,233 @@ fn subsystem_sends_pending_approvals_on_approval_restart() { virtual_overseer }); } + +// Test we correctly update the timer when we mark the beginning of gathering assignments. +#[test] +fn test_gathering_assignments_statements() { + let mut state = State { + keystore: Arc::new(LocalKeystore::in_memory()), + slot_duration_millis: 6_000, + clock: Box::new(MockClock::default()), + assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), + spans: HashMap::new(), + per_block_assignments_gathering_times: LruMap::new(ByLength::new( + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, + )), + }; + + for i in 0..200i32 { + state.mark_begining_of_gathering_assignments( + i as u32, + Hash::repeat_byte(i as u8), + CandidateHash(Hash::repeat_byte(i as u8)), + ); + assert!( + state.per_block_assignments_gathering_times.len() <= + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize + ); + + assert_eq!( + state + .per_block_assignments_gathering_times + .iter() + .map(|(block_number, _)| block_number) + .min(), + Some(max(0, i - MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as i32 + 1) as u32).as_ref() + ) + } + assert_eq!( + state.per_block_assignments_gathering_times.len(), + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS as usize + ); + + let nothing_changes = state + .per_block_assignments_gathering_times + .iter() + .map(|(block_number, _)| *block_number) + .sorted() + .collect::>(); + + for i in 150..200i32 { + state.mark_begining_of_gathering_assignments( + i as u32, + Hash::repeat_byte(i as u8), + CandidateHash(Hash::repeat_byte(i as u8)), + ); + assert_eq!( + nothing_changes, + state + .per_block_assignments_gathering_times + .iter() + .map(|(block_number, _)| *block_number) + .sorted() + .collect::>() + ); + } + + for i in 110..120 { + let block_hash = Hash::repeat_byte(i as u8); + let candidate_hash = CandidateHash(Hash::repeat_byte(i as u8)); + + state.mark_gathered_enough_assignments(i as u32, block_hash, candidate_hash); + + assert!(state + .per_block_assignments_gathering_times + .get(&i) + .unwrap() + .get(&(block_hash, candidate_hash)) + .unwrap() + .stage_start + .is_none()); + state.mark_begining_of_gathering_assignments(i as u32, block_hash, candidate_hash); + let record = state + .per_block_assignments_gathering_times + .get(&i) + .unwrap() + .get(&(block_hash, candidate_hash)) + .unwrap(); + + assert!(record.stage_start.is_some()); + assert_eq!(record.stage, 1); + } + + state.cleanup_assignments_gathering_timestamp(200); + assert_eq!(state.per_block_assignments_gathering_times.len(), 0); +} + +// Test we note the time we took to transition RequiredTranche from Pending to Exact and +// that we increase the stage when we transition from Exact to Pending. +#[test] +fn test_observe_assignment_gathering_status() { + let mut state = State { + keystore: Arc::new(LocalKeystore::in_memory()), + slot_duration_millis: 6_000, + clock: Box::new(MockClock::default()), + assignment_criteria: Box::new(MockAssignmentCriteria::check_only(|_| Ok(0))), + spans: HashMap::new(), + per_block_assignments_gathering_times: LruMap::new(ByLength::new( + MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS, + )), + }; + + let metrics_inner = MetricsInner { + imported_candidates_total: IntCounter::new("dummy", "dummy").unwrap(), + assignments_produced: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")).unwrap(), + approvals_produced_total: IntCounterVec::new(Opts::new("dummy", "dummy"), &["dummy"]) + .unwrap(), + no_shows_total: IntCounter::new("dummy", "dummy").unwrap(), + observed_no_shows: IntCounter::new("dummy", "dummy").unwrap(), + approved_by_one_third: IntCounter::new("dummy", "dummy").unwrap(), + wakeups_triggered_total: IntCounter::new("dummy", "dummy").unwrap(), + coalesced_approvals_buckets: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")) + .unwrap(), + coalesced_approvals_delay: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")) + .unwrap(), + candidate_approval_time_ticks: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")) + .unwrap(), + block_approval_time_ticks: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")) + .unwrap(), + time_db_transaction: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")).unwrap(), + time_recover_and_approve: Histogram::with_opts(HistogramOpts::new("dummy", "dummy")) + .unwrap(), + candidate_signatures_requests_total: IntCounter::new("dummy", "dummy").unwrap(), + unapproved_candidates_in_unfinalized_chain: prometheus::Gauge::::new( + "dummy", "dummy", + ) + .unwrap(), + assignments_gathering_time_by_stage: HistogramVec::new( + HistogramOpts::new("test", "test"), + &["stage"], + ) + .unwrap(), + }; + + let metrics = Metrics(Some(metrics_inner)); + let block_hash = Hash::repeat_byte(1); + let candidate_hash = CandidateHash(Hash::repeat_byte(1)); + let block_number = 1; + + // Transition from Pending to Exact and check stage 0 time is recorded. + state.observe_assignment_gathering_status( + &metrics, + &RequiredTranches::Pending { + considered: 0, + next_no_show: None, + maximum_broadcast: 0, + clock_drift: 0, + }, + block_hash, + block_number, + candidate_hash, + ); + + state.observe_assignment_gathering_status( + &metrics, + &RequiredTranches::Exact { + needed: 2, + tolerated_missing: 2, + next_no_show: None, + last_assignment_tick: None, + }, + block_hash, + block_number, + candidate_hash, + ); + + let value = metrics + .0 + .as_ref() + .unwrap() + .assignments_gathering_time_by_stage + .get_metric_with_label_values(&["0"]) + .unwrap(); + + assert_eq!(value.get_sample_count(), 1); + + // Transition from Exact to Pending to Exact and check stage 1 time is recorded. + state.observe_assignment_gathering_status( + &metrics, + &RequiredTranches::Pending { + considered: 0, + next_no_show: None, + maximum_broadcast: 0, + clock_drift: 0, + }, + block_hash, + block_number, + candidate_hash, + ); + + state.observe_assignment_gathering_status( + &metrics, + &RequiredTranches::Exact { + needed: 2, + tolerated_missing: 2, + next_no_show: None, + last_assignment_tick: None, + }, + block_hash, + block_number, + candidate_hash, + ); + + let value = metrics + .0 + .as_ref() + .unwrap() + .assignments_gathering_time_by_stage + .get_metric_with_label_values(&["0"]) + .unwrap(); + + assert_eq!(value.get_sample_count(), 1); + + let value = metrics + .0 + .as_ref() + .unwrap() + .assignments_gathering_time_by_stage + .get_metric_with_label_values(&["1"]) + .unwrap(); + + assert_eq!(value.get_sample_count(), 1); +}