From 767fbef6323cbb560d2f7cf62b024a1a3b6cd524 Mon Sep 17 00:00:00 2001 From: Zekun Li Date: Thu, 5 Dec 2024 14:31:06 -0800 Subject: [PATCH] [Pipeline] add counters --- consensus/src/counters.rs | 15 ++++- consensus/src/lib.rs | 2 +- consensus/src/pipeline/pipeline_builder.rs | 74 ++++++++++++++++------ 3 files changed, 69 insertions(+), 22 deletions(-) diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index f55d9d4ff9f37..8c901c7bbd599 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -836,7 +836,7 @@ pub static NUM_BYTES_PER_BLOCK: Lazy = Lazy::new(|| { // * 0.3 to 2.0: step 0.1 // * 2.0 to 4.0: step 0.2 // * 4.0 to 7.5: step 0.5 -const BLOCK_TRACING_BUCKETS: &[f64] = &[ +const TRACING_BUCKETS: &[f64] = &[ 0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1, 1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0, 2.2, 2.4, 2.6, 2.8, 3.0, 3.2, 3.4, 3.6, 3.8, 4.0, 4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 10.0, @@ -848,7 +848,18 @@ pub static BLOCK_TRACING: Lazy = Lazy::new(|| { "aptos_consensus_block_tracing", "Histogram for different stages of a block", &["stage"], - BLOCK_TRACING_BUCKETS.to_vec() + TRACING_BUCKETS.to_vec() + ) + .unwrap() +}); + +/// Traces pipeline stages +pub static PIPELINE_TRACING: Lazy = Lazy::new(|| { + register_histogram_vec!( + "aptos_consensus_pipeline_tracing", + "Histogram for different stages of a block's pipeline", + &["stage", "type"], + TRACING_BUCKETS.to_vec() ) .unwrap() }); diff --git a/consensus/src/lib.rs b/consensus/src/lib.rs index aa1bc3234558e..a38b66912e191 100644 --- a/consensus/src/lib.rs +++ b/consensus/src/lib.rs @@ -75,7 +75,7 @@ pub use quorum_store::quorum_store_db::QUORUM_STORE_DB_NAME; #[cfg(feature = "fuzzing")] pub use round_manager::round_manager_fuzzing; -struct IntGaugeGuard { +pub struct IntGaugeGuard { gauge: IntGauge, } diff --git a/consensus/src/pipeline/pipeline_builder.rs b/consensus/src/pipeline/pipeline_builder.rs index 94577fad0511b..a71f9927fed3c 100644 --- a/consensus/src/pipeline/pipeline_builder.rs +++ b/consensus/src/pipeline/pipeline_builder.rs @@ -10,6 +10,7 @@ use crate::{ monitor, payload_manager::TPayloadManager, txn_notifier::TxnNotifier, + IntGaugeGuard, }; use anyhow::anyhow; use aptos_consensus_notifications::ConsensusNotificationSender; @@ -104,18 +105,30 @@ struct Tracker { block_id: HashValue, epoch: u64, round: Round, + created_at: Instant, + started_at: Option, + running_guard: Option, } impl Tracker { - pub fn new(name: &'static str, block: &Block) -> Self { - let ret = Self { + fn start_waiting(name: &'static str, block: &Block) -> Self { + Self { name, block_id: block.id(), epoch: block.epoch(), round: block.round(), - }; - ret.log_start(); - ret + created_at: Instant::now(), + started_at: None, + running_guard: None, + } + } + + fn start_working(&mut self) { + self.started_at = Some(Instant::now()); + self.running_guard = Some(IntGaugeGuard::new( + counters::OP_COUNTERS.gauge(&format!("{}_running", self.name)), + )); + self.log_start(); } fn log_start(&self) { @@ -126,9 +139,25 @@ impl Tracker { } fn log_end(&self) { + let Some(started_at) = self.started_at else { + return; + }; + let wait_time = started_at.duration_since(self.created_at); + let work_time = Instant::now().duration_since(started_at); + counters::PIPELINE_TRACING + .with_label_values(&[self.name, "wait_time"]) + .observe(wait_time.as_secs_f64()); + counters::PIPELINE_TRACING + .with_label_values(&[self.name, "work_time"]) + .observe(work_time.as_secs_f64()); info!( - "[Pipeline] Block {} {} {} finishes {}", - self.block_id, self.epoch, self.round, self.name + "[Pipeline] Block {} {} {} finishes {}, waits {}, takes {}", + self.block_id, + self.epoch, + self.round, + self.name, + wait_time.as_millis(), + work_time.as_millis() ); } } @@ -139,7 +168,6 @@ impl Drop for Tracker { } } -// TODO: add counters for each phase impl PipelineBuilder { pub fn new( block_preparer: Arc, @@ -379,7 +407,8 @@ impl PipelineBuilder { /// Precondition: Block is inserted into block tree (all ancestors are available) /// What it does: Wait for all data becomes available and verify transaction signatures async fn prepare(preparer: Arc, block: Arc) -> TaskResult { - let _tracker = Tracker::new("prepare", &block); + let mut tracker = Tracker::start_waiting("prepare", &block); + tracker.start_working(); // the loop can only be abort by the caller let input_txns = loop { match preparer.prepare_block(&block).await { @@ -420,13 +449,14 @@ impl PipelineBuilder { validator: Arc<[AccountAddress]>, onchain_execution_config: BlockExecutorConfigFromOnchain, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("execute", &block); parent_block_execute_phase.await?; let user_txns = prepare_phase.await?; let maybe_rand = randomness_rx .await .map_err(|_| anyhow!("randomness tx cancelled"))?; - let _tracker = Tracker::new("execute", &block); + tracker.start_working(); let metadata_txn = if is_randomness_enabled { block.new_metadata_with_randomness(&validator, maybe_rand) } else { @@ -471,9 +501,11 @@ impl PipelineBuilder { executor: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("ledger_update", &block); let (_, _, prev_epoch_end_timestamp) = parent_block_ledger_update_phase.await?; let execution_time = execute_phase.await?; - let _tracker = Tracker::new("ledger_update", &block); + + tracker.start_working(); let timestamp = block.timestamp_usecs(); let result = tokio::task::spawn_blocking(move || { executor @@ -501,10 +533,11 @@ impl PipelineBuilder { mempool_notifier: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("post_ledger_update", &block); let user_txns = prepare_fut.await?; let (compute_result, _, _) = ledger_update.await?; - let _tracker = Tracker::new("post_ledger_update", &block); + tracker.start_working(); let compute_status = compute_result.compute_status_for_input_txns(); // the length of compute_status is user_txns.len() + num_vtxns + 1 due to having blockmetadata if user_txns.len() >= compute_status.len() { @@ -551,6 +584,7 @@ impl PipelineBuilder { signer: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("sign_commit_vote", &block); let (compute_result, _, epoch_end_timestamp) = ledger_update_phase.await?; // either order_vote_rx or order_proof_fut can trigger the next phase select! { @@ -564,8 +598,8 @@ impl PipelineBuilder { return Err(anyhow!("all receivers dropped"))?; } } + tracker.start_working(); - let _tracker = Tracker::new("sign_commit_vote", &block); let mut block_info = block.gen_block_info( compute_result.root_hash(), compute_result.last_version_or_0(), @@ -601,6 +635,7 @@ impl PipelineBuilder { executor: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("pre_commit", &block); let (compute_result, _, _) = ledger_update_phase.await?; parent_block_pre_commit_phase.await?; @@ -610,7 +645,7 @@ impl PipelineBuilder { commit_proof_fut.await?; } - let _tracker = Tracker::new("pre_commit", &block); + tracker.start_working(); tokio::task::spawn_blocking(move || { executor .pre_commit_block(block.id()) @@ -630,12 +665,11 @@ impl PipelineBuilder { state_sync_notifier: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("post_pre_commit", &block); let compute_result = pre_commit.await?; parent_post_pre_commit.await?; - let _tracker = Tracker::new("post_pre_commit", &block); - let _timer = counters::OP_COUNTERS.timer("pre_commit_notify"); - + tracker.start_working(); let txns = compute_result.transactions_to_commit().to_vec(); let subscribable_events = compute_result.subscribable_events().to_vec(); if let Err(e) = monitor!( @@ -659,6 +693,7 @@ impl PipelineBuilder { executor: Arc, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("commit_ledger", &block); parent_block_commit_phase.await?; pre_commit_fut.await?; let ledger_info_with_sigs = commit_proof_fut.await?; @@ -668,7 +703,7 @@ impl PipelineBuilder { return Ok(None); } - let _tracker = Tracker::new("commit_ledger", &block); + tracker.start_working(); let ledger_info_with_sigs_clone = ledger_info_with_sigs.clone(); tokio::task::spawn_blocking(move || { executor @@ -691,12 +726,13 @@ impl PipelineBuilder { block_store_callback: Box, block: Arc, ) -> TaskResult { + let mut tracker = Tracker::start_waiting("post_commit_ledger", &block); parent_post_commit.await?; let maybe_ledger_info_with_sigs = commit_ledger_fut.await?; let compute_result = pre_commit_fut.await?; post_pre_commit_fut.await?; - let _tracker = Tracker::new("post_commit_ledger", &block); + tracker.start_working(); update_counters_for_block(&block); update_counters_for_compute_result(&compute_result);