Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add metric to measure the time it takes to gather enough assignments #4587

Merged
merged 9 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion polkadot/node/core/approval-voting/src/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -607,7 +607,7 @@ pub(crate) mod tests {
use super::*;
use crate::{
approval_db::common::{load_block_entry, DbBackend},
RuntimeInfo, RuntimeInfoConfig,
RuntimeInfo, RuntimeInfoConfig, MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
};
use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
use assert_matches::assert_matches;
Expand All @@ -622,6 +622,7 @@ pub(crate) mod tests {
node_features::FeatureIndex, ExecutorParams, Id as ParaId, IndexedVec, NodeFeatures,
SessionInfo, ValidatorId, ValidatorIndex,
};
use schnellru::{ByLength, LruMap};
pub(crate) use sp_consensus_babe::{
digests::{CompatibleDigestItem, PreDigest, SecondaryVRFPreDigest},
AllowedSlots, BabeEpochConfiguration, Epoch as BabeEpoch,
Expand Down Expand Up @@ -658,6 +659,9 @@ pub(crate) mod tests {
clock: Box::new(MockClock::default()),
assignment_criteria: Box::new(MockAssignmentCriteria::default()),
spans: HashMap::new(),
per_block_assignments_gathering_times: LruMap::new(ByLength::new(
MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
)),
}
}

Expand Down
162 changes: 158 additions & 4 deletions polkadot/node/core/approval-voting/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,12 @@ use sc_keystore::LocalKeystore;
use sp_application_crypto::Pair;
use sp_consensus::SyncOracle;
use sp_consensus_slots::Slot;
use std::time::Instant;

// The maximum number of blocks for which we keep track of assignment gathering times.
// In normal operation
alexggh marked this conversation as resolved.
Show resolved Hide resolved
// this would never be reached because we prune the data on finalization, but we need
// to also ensure the data is not growing unnecessarily large.
const MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS: u32 = 100;
alexggh marked this conversation as resolved.
Show resolved Hide resolved

use futures::{
channel::oneshot,
Expand Down Expand Up @@ -182,6 +188,14 @@ struct MetricsInner {
time_recover_and_approve: prometheus::Histogram,
candidate_signatures_requests_total: prometheus::Counter<prometheus::U64>,
unapproved_candidates_in_unfinalized_chain: prometheus::Gauge<prometheus::U64>,
// The time it takes in each stage to gather enough assignments.
// We define a `stage` as the entire process of gathering enough assignments to
// be able to approve a candidate:
// E.g:
// - Stage 0: We wait for the needed_approvals assignments to be gathered.
// - Stage 1: We wait for enough tranches to cover all no-shows in stage 0.
// - Stage 2: We wait for enough tranches to cover all no-shows of stage 1.
assignments_gathering_time_by_stage: prometheus::HistogramVec,
alexggh marked this conversation as resolved.
Show resolved Hide resolved
}

/// Approval Voting metrics.
Expand Down Expand Up @@ -302,6 +316,20 @@ impl Metrics {
metrics.unapproved_candidates_in_unfinalized_chain.set(count as u64);
}
}

pub fn observe_assignment_gathering_time(&self, stage: usize, elapsed_as_millis: usize) {
if let Some(metrics) = &self.0 {
let stage_string = stage.to_string();
// We don't want to have too many metrics entries with this label to not put unncessary
// pressure on the metrics infrastructure, so we cap the stage at 10, which is
// equivalent to having already a finalization lag to 10 * no_show_slots, so it should
// be more than enough.
metrics
.assignments_gathering_time_by_stage
.with_label_values(&[if stage < 10 { stage_string.as_str() } else { "inf" }])
alexggh marked this conversation as resolved.
Show resolved Hide resolved
.observe(elapsed_as_millis as f64);
}
}
}

impl metrics::Metrics for Metrics {
Expand Down Expand Up @@ -431,6 +459,17 @@ impl metrics::Metrics for Metrics {
)?,
registry,
)?,
assignments_gathering_time_by_stage: prometheus::register(
prometheus::HistogramVec::new(
prometheus::HistogramOpts::new(
"polkadot_parachain_assignments_gather_time_by_stage_ms",
"The time in ms it takes for each stage to gather enough assignments needed for approval",
)
.buckets(vec![0.0, 250.0, 500.0, 1000.0, 2000.0, 4000.0, 8000.0, 16000.0, 32000.0]),
&["stage"],
)?,
registry,
)?,
};

Ok(Metrics(Some(metrics)))
Expand Down Expand Up @@ -788,6 +827,28 @@ struct State {
clock: Box<dyn Clock + Send + Sync>,
assignment_criteria: Box<dyn AssignmentCriteria + Send + Sync>,
spans: HashMap<Hash, jaeger::PerLeafSpan>,
// Per block and candidate, records of how long it takes until we gather enough
// assignments. This is relevant because it gives us a good idea about how many
// tranches we trigger and why.
per_block_assignments_gathering_times:
LruMap<BlockNumber, HashMap<(Hash, CandidateHash), AssignmentGatheringRecord>>,
}

/// Tracks, for one candidate, which assignment gathering stage is in progress and when that
/// stage started.
#[derive(Clone, Debug, Eq, PartialEq)]
struct AssignmentGatheringRecord {
    /// The stage we are in.
    ///
    /// Candidate assignment gathering goes in stages: first we wait for `needed_approvals`
    /// (stage 0). Then, if we have no-shows, we move into stage 1 and wait for enough
    /// tranches to cover all no-shows.
    stage: usize,
    /// The time at which the current stage started, if one is recorded.
    stage_start: Option<Instant>,
}

impl Default for AssignmentGatheringRecord {
    /// Builds a record for stage 0 whose start time is the moment of construction.
    fn default() -> Self {
        Self { stage_start: Some(Instant::now()), stage: 0 }
    }
}

#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
Expand Down Expand Up @@ -893,6 +954,88 @@ impl State {
},
}
}

/// Marks that `candidate`, included in block `block_hash` at height `block_number`, is
/// gathering assignments.
///
/// If no record exists yet for `(block_hash, candidate)`, a default one is created (stage 0,
/// started now). If a record exists but has no start time, the stage counter is bumped and
/// a new start time is taken. A record that already has a start time is left untouched.
/// Nothing happens when the per-block map yields no entry for `block_number`.
fn mark_begining_of_gathering_assignments(
    &mut self,
    block_number: BlockNumber,
    block_hash: Hash,
    candidate: CandidateHash,
) {
    let records = match self
        .per_block_assignments_gathering_times
        .get_or_insert(block_number, HashMap::new)
    {
        Some(records) => records,
        // No entry could be obtained for this block, so there is nothing to track.
        None => return,
    };

    let record = records.entry((block_hash, candidate)).or_default();
    // `None` means there is no stage currently being timed, so a new stage starts here.
    if record.stage_start.is_none() {
        record.stage += 1;
        gum::debug!(
            target: LOG_TARGET,
            stage = ?record.stage,
            ?block_hash,
            ?candidate,
            "Started a new assignment gathering stage",
        );
        record.stage_start = Some(Instant::now());
    }
}

/// Marks that enough assignments were gathered for `candidate` in block `block_hash` and
/// returns a snapshot of the stage that just ended.
///
/// The returned record carries the stored stage index and the stored start time. The start
/// time is taken out of the stored record, leaving `None` behind. When no record is found,
/// the result is stage 0 with no start time.
fn mark_gathered_enough_assignments(
    &mut self,
    block_number: BlockNumber,
    block_hash: Hash,
    candidate: CandidateHash,
) -> AssignmentGatheringRecord {
    let stored = self
        .per_block_assignments_gathering_times
        .get(&block_number)
        .and_then(|records| records.get_mut(&(block_hash, candidate)));

    match stored {
        Some(record) => AssignmentGatheringRecord {
            stage: record.stage,
            // Taking the start time marks the stage as finished in the stored record.
            stage_start: record.stage_start.take(),
        },
        None => AssignmentGatheringRecord { stage: 0, stage_start: None },
    }
}

/// Drops gathering-time records from the oldest end of the per-block map.
///
/// Entries are popped while the oldest entry's block number is strictly lower than
/// `keep_greater_than`. Note that, despite the parameter name, an entry whose block number
/// equals `keep_greater_than` is kept. The loop stops at the first oldest entry that does
/// not qualify, or when the map is empty.
fn cleanup_assignments_gathering_timestamp(&mut self, keep_greater_than: BlockNumber) {
    // NOTE(review): "oldest" is whatever order `peek_oldest` defines for the map; confirm it
    // matches block-number order for the way entries are inserted and accessed here.
    while self
        .per_block_assignments_gathering_times
        .peek_oldest()
        .map_or(false, |(block_number, _)| *block_number < keep_greater_than)
    {
        self.per_block_assignments_gathering_times.pop_oldest();
    }
}

fn observe_assignment_gathering_status(
alexggh marked this conversation as resolved.
Show resolved Hide resolved
&mut self,
metrics: &Metrics,
required_tranches: &RequiredTranches,
block_hash: Hash,
block_number: BlockNumber,
candidate_hash: CandidateHash,
) {
match required_tranches {
RequiredTranches::All | RequiredTranches::Pending { .. } => {
self.mark_begining_of_gathering_assignments(
block_number,
block_hash,
candidate_hash,
);
},
RequiredTranches::Exact { .. } => {
let time_to_gather =
self.mark_gathered_enough_assignments(block_number, block_hash, candidate_hash);
if let Some(gathering_started) = time_to_gather.stage_start {
metrics.observe_assignment_gathering_time(
time_to_gather.stage,
gathering_started.elapsed().as_millis() as usize,
)
}
},
}
}
}

#[derive(Debug, Clone)]
Expand Down Expand Up @@ -941,6 +1084,9 @@ where
clock: subsystem.clock,
assignment_criteria,
spans: HashMap::new(),
per_block_assignments_gathering_times: LruMap::new(ByLength::new(
MAX_BLOCKS_WITH_ASSIGNMENT_TIMESTAMPS,
)),
};

// `None` on start-up. Gets initialized/updated on leaf update
Expand Down Expand Up @@ -972,7 +1118,7 @@ where
subsystem.metrics.on_wakeup();
process_wakeup(
&mut ctx,
&state,
&mut state,
&mut overlayed_db,
&mut session_info_provider,
woken_block,
Expand Down Expand Up @@ -1628,6 +1774,7 @@ async fn handle_from_overseer<Context>(
// `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
// accordingly.
wakeups.prune_finalized_wakeups(block_number, &mut state.spans);
state.cleanup_assignments_gathering_timestamp(block_number);

// // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans
// accordingly. let hash_set =
Expand Down Expand Up @@ -2474,7 +2621,7 @@ where

async fn check_and_import_approval<T, Sender>(
sender: &mut Sender,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
metrics: &Metrics,
Expand Down Expand Up @@ -2706,7 +2853,7 @@ impl ApprovalStateTransition {
// as necessary and schedules any further wakeups.
async fn advance_approval_state<Sender>(
sender: &mut Sender,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
metrics: &Metrics,
Expand Down Expand Up @@ -2757,6 +2904,13 @@ where
approval_entry,
status.required_tranches.clone(),
);
state.observe_assignment_gathering_status(
&metrics,
&status.required_tranches,
block_hash,
block_entry.block_number(),
candidate_hash,
);

// Check whether this is approved, while allowing a maximum
// assignment tick of `now - APPROVAL_DELAY` - that is, that
Expand Down Expand Up @@ -2937,7 +3091,7 @@ fn should_trigger_assignment(
#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)]
async fn process_wakeup<Context>(
ctx: &mut Context,
state: &State,
state: &mut State,
db: &mut OverlayedBackend<'_, impl Backend>,
session_info_provider: &mut RuntimeInfo,
relay_block: Hash,
Expand Down
Loading
Loading