Skip to content

Commit

Permalink
[Pipeline] add counters
Browse files Browse the repository at this point in the history
  • Loading branch information
Zekun Li authored and Zekun Li committed Dec 6, 2024
1 parent 3c6e75b commit 767fbef
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 22 deletions.
15 changes: 13 additions & 2 deletions consensus/src/counters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -836,7 +836,7 @@ pub static NUM_BYTES_PER_BLOCK: Lazy<Histogram> = Lazy::new(|| {
// * 0.3 to 2.0: step 0.1
// * 2.0 to 4.0: step 0.2
// * 4.0 to 7.5: step 0.5
const BLOCK_TRACING_BUCKETS: &[f64] = &[
const TRACING_BUCKETS: &[f64] = &[
0.005, 0.01, 0.025, 0.05, 0.1, 0.15, 0.2, 0.25, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0, 1.1,
1.2, 1.3, 1.4, 1.5, 1.6, 1.7, 1.8, 1.9, 2.0, 2.2, 2.4, 2.6, 2.8, 3.0, 3.2, 3.4, 3.6, 3.8, 4.0,
4.5, 5.0, 5.5, 6.0, 6.5, 7.0, 7.5, 10.0,
Expand All @@ -848,7 +848,18 @@ pub static BLOCK_TRACING: Lazy<HistogramVec> = Lazy::new(|| {
"aptos_consensus_block_tracing",
"Histogram for different stages of a block",
&["stage"],
BLOCK_TRACING_BUCKETS.to_vec()
TRACING_BUCKETS.to_vec()
)
.unwrap()
});

/// Traces pipeline stages
pub static PIPELINE_TRACING: Lazy<HistogramVec> = Lazy::new(|| {
register_histogram_vec!(
"aptos_consensus_pipeline_tracing",
"Histogram for different stages of a block's pipeline",
&["stage", "type"],
TRACING_BUCKETS.to_vec()
)
.unwrap()
});
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub use quorum_store::quorum_store_db::QUORUM_STORE_DB_NAME;
#[cfg(feature = "fuzzing")]
pub use round_manager::round_manager_fuzzing;

struct IntGaugeGuard {
pub struct IntGaugeGuard {
gauge: IntGauge,
}

Expand Down
74 changes: 55 additions & 19 deletions consensus/src/pipeline/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use crate::{
monitor,
payload_manager::TPayloadManager,
txn_notifier::TxnNotifier,
IntGaugeGuard,
};
use anyhow::anyhow;
use aptos_consensus_notifications::ConsensusNotificationSender;
Expand Down Expand Up @@ -104,18 +105,30 @@ struct Tracker {
block_id: HashValue,
epoch: u64,
round: Round,
created_at: Instant,
started_at: Option<Instant>,
running_guard: Option<IntGaugeGuard>,
}

impl Tracker {
pub fn new(name: &'static str, block: &Block) -> Self {
let ret = Self {
fn start_waiting(name: &'static str, block: &Block) -> Self {
Self {
name,
block_id: block.id(),
epoch: block.epoch(),
round: block.round(),
};
ret.log_start();
ret
created_at: Instant::now(),
started_at: None,
running_guard: None,
}
}

fn start_working(&mut self) {
self.started_at = Some(Instant::now());
self.running_guard = Some(IntGaugeGuard::new(
counters::OP_COUNTERS.gauge(&format!("{}_running", self.name)),
));
self.log_start();
}

fn log_start(&self) {
Expand All @@ -126,9 +139,25 @@ impl Tracker {
}

fn log_end(&self) {
let Some(started_at) = self.started_at else {
return;
};
let wait_time = started_at.duration_since(self.created_at);
let work_time = Instant::now().duration_since(started_at);
counters::PIPELINE_TRACING
.with_label_values(&[self.name, "wait_time"])
.observe(wait_time.as_secs_f64());
counters::PIPELINE_TRACING
.with_label_values(&[self.name, "work_time"])
.observe(work_time.as_secs_f64());
info!(
"[Pipeline] Block {} {} {} finishes {}",
self.block_id, self.epoch, self.round, self.name
"[Pipeline] Block {} {} {} finishes {}, waits {}, takes {}",
self.block_id,
self.epoch,
self.round,
self.name,
wait_time.as_millis(),
work_time.as_millis()
);
}
}
Expand All @@ -139,7 +168,6 @@ impl Drop for Tracker {
}
}

// TODO: add counters for each phase
impl PipelineBuilder {
pub fn new(
block_preparer: Arc<BlockPreparer>,
Expand Down Expand Up @@ -379,7 +407,8 @@ impl PipelineBuilder {
/// Precondition: Block is inserted into block tree (all ancestors are available)
/// What it does: Wait for all data becomes available and verify transaction signatures
async fn prepare(preparer: Arc<BlockPreparer>, block: Arc<Block>) -> TaskResult<PrepareResult> {
let _tracker = Tracker::new("prepare", &block);
let mut tracker = Tracker::start_waiting("prepare", &block);
tracker.start_working();
// the loop can only be abort by the caller
let input_txns = loop {
match preparer.prepare_block(&block).await {
Expand Down Expand Up @@ -420,13 +449,14 @@ impl PipelineBuilder {
validator: Arc<[AccountAddress]>,
onchain_execution_config: BlockExecutorConfigFromOnchain,
) -> TaskResult<ExecuteResult> {
let mut tracker = Tracker::start_waiting("execute", &block);
parent_block_execute_phase.await?;
let user_txns = prepare_phase.await?;
let maybe_rand = randomness_rx
.await
.map_err(|_| anyhow!("randomness tx cancelled"))?;

let _tracker = Tracker::new("execute", &block);
tracker.start_working();
let metadata_txn = if is_randomness_enabled {
block.new_metadata_with_randomness(&validator, maybe_rand)
} else {
Expand Down Expand Up @@ -471,9 +501,11 @@ impl PipelineBuilder {
executor: Arc<dyn BlockExecutorTrait>,
block: Arc<Block>,
) -> TaskResult<LedgerUpdateResult> {
let mut tracker = Tracker::start_waiting("ledger_update", &block);
let (_, _, prev_epoch_end_timestamp) = parent_block_ledger_update_phase.await?;
let execution_time = execute_phase.await?;
let _tracker = Tracker::new("ledger_update", &block);

tracker.start_working();
let timestamp = block.timestamp_usecs();
let result = tokio::task::spawn_blocking(move || {
executor
Expand Down Expand Up @@ -501,10 +533,11 @@ impl PipelineBuilder {
mempool_notifier: Arc<dyn TxnNotifier>,
block: Arc<Block>,
) -> TaskResult<PostLedgerUpdateResult> {
let mut tracker = Tracker::start_waiting("post_ledger_update", &block);
let user_txns = prepare_fut.await?;
let (compute_result, _, _) = ledger_update.await?;

let _tracker = Tracker::new("post_ledger_update", &block);
tracker.start_working();
let compute_status = compute_result.compute_status_for_input_txns();
// the length of compute_status is user_txns.len() + num_vtxns + 1 due to having blockmetadata
if user_txns.len() >= compute_status.len() {
Expand Down Expand Up @@ -551,6 +584,7 @@ impl PipelineBuilder {
signer: Arc<ValidatorSigner>,
block: Arc<Block>,
) -> TaskResult<CommitVoteResult> {
let mut tracker = Tracker::start_waiting("sign_commit_vote", &block);
let (compute_result, _, epoch_end_timestamp) = ledger_update_phase.await?;
// either order_vote_rx or order_proof_fut can trigger the next phase
select! {
Expand All @@ -564,8 +598,8 @@ impl PipelineBuilder {
return Err(anyhow!("all receivers dropped"))?;
}
}
tracker.start_working();

let _tracker = Tracker::new("sign_commit_vote", &block);
let mut block_info = block.gen_block_info(
compute_result.root_hash(),
compute_result.last_version_or_0(),
Expand Down Expand Up @@ -601,6 +635,7 @@ impl PipelineBuilder {
executor: Arc<dyn BlockExecutorTrait>,
block: Arc<Block>,
) -> TaskResult<PreCommitResult> {
let mut tracker = Tracker::start_waiting("pre_commit", &block);
let (compute_result, _, _) = ledger_update_phase.await?;
parent_block_pre_commit_phase.await?;

Expand All @@ -610,7 +645,7 @@ impl PipelineBuilder {
commit_proof_fut.await?;
}

let _tracker = Tracker::new("pre_commit", &block);
tracker.start_working();
tokio::task::spawn_blocking(move || {
executor
.pre_commit_block(block.id())
Expand All @@ -630,12 +665,11 @@ impl PipelineBuilder {
state_sync_notifier: Arc<dyn ConsensusNotificationSender>,
block: Arc<Block>,
) -> TaskResult<PostPreCommitResult> {
let mut tracker = Tracker::start_waiting("post_pre_commit", &block);
let compute_result = pre_commit.await?;
parent_post_pre_commit.await?;

let _tracker = Tracker::new("post_pre_commit", &block);
let _timer = counters::OP_COUNTERS.timer("pre_commit_notify");

tracker.start_working();
let txns = compute_result.transactions_to_commit().to_vec();
let subscribable_events = compute_result.subscribable_events().to_vec();
if let Err(e) = monitor!(
Expand All @@ -659,6 +693,7 @@ impl PipelineBuilder {
executor: Arc<dyn BlockExecutorTrait>,
block: Arc<Block>,
) -> TaskResult<CommitLedgerResult> {
let mut tracker = Tracker::start_waiting("commit_ledger", &block);
parent_block_commit_phase.await?;
pre_commit_fut.await?;
let ledger_info_with_sigs = commit_proof_fut.await?;
Expand All @@ -668,7 +703,7 @@ impl PipelineBuilder {
return Ok(None);
}

let _tracker = Tracker::new("commit_ledger", &block);
tracker.start_working();
let ledger_info_with_sigs_clone = ledger_info_with_sigs.clone();
tokio::task::spawn_blocking(move || {
executor
Expand All @@ -691,12 +726,13 @@ impl PipelineBuilder {
block_store_callback: Box<dyn FnOnce(LedgerInfoWithSignatures) + Send + Sync>,
block: Arc<Block>,
) -> TaskResult<PostCommitResult> {
let mut tracker = Tracker::start_waiting("post_commit_ledger", &block);
parent_post_commit.await?;
let maybe_ledger_info_with_sigs = commit_ledger_fut.await?;
let compute_result = pre_commit_fut.await?;
post_pre_commit_fut.await?;

let _tracker = Tracker::new("post_commit_ledger", &block);
tracker.start_working();
update_counters_for_block(&block);
update_counters_for_compute_result(&compute_result);

Expand Down

0 comments on commit 767fbef

Please sign in to comment.