[dag] epoch manager integration; dag is here

ibalajiarun authored and zekun000 committed Sep 29, 2023
1 parent 62de329 commit d01dc7d
Showing 26 changed files with 495 additions and 205 deletions.
2 changes: 1 addition & 1 deletion consensus/consensus-types/src/block.rs
@@ -70,7 +70,7 @@ impl Display for Block {
             author,
             self.epoch(),
             self.round(),
-            self.quorum_cert().certified_block().id(),
+            self.parent_id(),
             self.timestamp_usecs(),
         )
     }
2 changes: 1 addition & 1 deletion consensus/consensus-types/src/executed_block.rs
@@ -75,7 +75,7 @@ impl ExecutedBlock {
     }

     pub fn parent_id(&self) -> HashValue {
-        self.quorum_cert().certified_block().id()
+        self.block.parent_id()
     }

     pub fn quorum_cert(&self) -> &QuorumCert {
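Note on the two hunks above: `parent_id` now comes from the block's own data rather than from the quorum cert chain, presumably because DAG-ordered blocks need not carry a meaningful quorum cert. A self-contained sketch of the delegation shape, with stand-in types rather than the real aptos-consensus ones:

// Illustrative stand-ins, not the real aptos-consensus types.
#[derive(Clone, Copy, PartialEq, Debug)]
struct HashValue(u64);

struct BlockData {
    parent_id: HashValue,
}

struct Block {
    block_data: BlockData,
}

impl Block {
    fn parent_id(&self) -> HashValue {
        self.block_data.parent_id
    }
}

struct ExecutedBlock {
    block: Block,
}

impl ExecutedBlock {
    // Mirrors the diff: delegate to the block itself, so blocks ordered
    // by the DAG (without a conventional QC chain) still report a parent.
    fn parent_id(&self) -> HashValue {
        self.block.parent_id()
    }
}

fn main() {
    let executed = ExecutedBlock {
        block: Block {
            block_data: BlockData {
                parent_id: HashValue(42),
            },
        },
    };
    assert_eq!(executed.parent_id(), HashValue(42));
}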
54 changes: 5 additions & 49 deletions consensus/src/block_storage/block_store.rs
@@ -13,7 +13,6 @@ use crate::{
     persistent_liveness_storage::{
         PersistentLivenessStorage, RecoveryData, RootInfo, RootMetadata,
     },
-    quorum_store,
     state_replication::StateComputer,
     util::time_service::TimeService,
 };
@@ -26,9 +25,8 @@ use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue};
 use aptos_executor_types::{ExecutorError, ExecutorResult, StateComputeResult};
 use aptos_infallible::RwLock;
 use aptos_logger::prelude::*;
-use aptos_types::{ledger_info::LedgerInfoWithSignatures, transaction::TransactionStatus};
+use aptos_types::ledger_info::LedgerInfoWithSignatures;
 use futures::executor::block_on;
-use move_core_types::vm_status::DiscardedVMStatus;
 #[cfg(test)]
 use std::collections::VecDeque;
 #[cfg(any(test, feature = "fuzzing"))]
@@ -50,49 +48,6 @@ fn update_counters_for_ordered_blocks(ordered_blocks: &[Arc<ExecutedBlock>]) {
     }
 }

-pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc<ExecutedBlock>]) {
-    for block in blocks_to_commit {
-        observe_block(block.block().timestamp_usecs(), BlockStage::COMMITTED);
-        let txn_status = block.compute_result().compute_status();
-        counters::NUM_TXNS_PER_BLOCK.observe(txn_status.len() as f64);
-        counters::COMMITTED_BLOCKS_COUNT.inc();
-        counters::LAST_COMMITTED_ROUND.set(block.round() as i64);
-        counters::LAST_COMMITTED_VERSION.set(block.compute_result().num_leaves() as i64);
-
-        let failed_rounds = block
-            .block()
-            .block_data()
-            .failed_authors()
-            .map(|v| v.len())
-            .unwrap_or(0);
-        if failed_rounds > 0 {
-            counters::COMMITTED_FAILED_ROUNDS_COUNT.inc_by(failed_rounds as u64);
-        }
-
-        // Quorum store metrics
-        quorum_store::counters::NUM_BATCH_PER_BLOCK.observe(block.block().payload_size() as f64);
-
-        for status in txn_status.iter() {
-            let commit_status = match status {
-                TransactionStatus::Keep(_) => counters::TXN_COMMIT_SUCCESS_LABEL,
-                TransactionStatus::Discard(reason) => {
-                    if *reason == DiscardedVMStatus::SEQUENCE_NUMBER_TOO_NEW {
-                        counters::TXN_COMMIT_RETRY_LABEL
-                    } else if *reason == DiscardedVMStatus::SEQUENCE_NUMBER_TOO_OLD {
-                        counters::TXN_COMMIT_FAILED_DUPLICATE_LABEL
-                    } else {
-                        counters::TXN_COMMIT_FAILED_LABEL
-                    }
-                },
-                TransactionStatus::Retry => counters::TXN_COMMIT_RETRY_LABEL,
-            };
-            counters::COMMITTED_TXNS_COUNT
-                .with_label_values(&[commit_status])
-                .inc();
-        }
-    }
-}
-
 /// Responsible for maintaining all the blocks of payload and the dependencies of those blocks
 /// (parent and previous QC links). It is expected to be accessed concurrently by multiple threads
 /// and is thread-safe.
@@ -397,9 +352,10 @@ impl BlockStore {
             }
             self.time_service.wait_until(block_time).await;
         }
-        self.payload_manager
-            .prefetch_payload_data(executed_block.block())
-            .await;
+        if let Some(payload) = executed_block.block().payload() {
+            self.payload_manager
+                .prefetch_payload_data(payload, executed_block.block().timestamp_usecs());
+        }
         self.storage
             .save_tree(vec![executed_block.block().clone()], vec![])
             .context("Insert block failed when saving block")?;
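The prefetch call site changes shape here: it is now gated on the block actually carrying a payload, takes the payload plus the proposal timestamp instead of the whole block, and is no longer awaited. A minimal sketch of that call-site pattern, with stand-in types rather than the real `PayloadManager`:

// Stand-in types sketching the new call-site shape; the real
// PayloadManager lives in consensus/src/payload_manager.rs.
struct Payload(Vec<u8>);

struct Block {
    payload: Option<Payload>,
    timestamp_usecs: u64,
}

struct PayloadManager;

impl PayloadManager {
    // Assumed shape of the new signature: payload plus proposal
    // timestamp, and synchronous, so the caller no longer awaits it.
    fn prefetch_payload_data(&self, payload: &Payload, timestamp_usecs: u64) {
        println!(
            "prefetching {} payload bytes proposed at {}",
            payload.0.len(),
            timestamp_usecs
        );
    }
}

fn main() {
    let manager = PayloadManager;
    let block = Block {
        payload: Some(Payload(vec![0u8; 64])),
        timestamp_usecs: 1_695_945_600_000_000,
    };
    // Mirrors the diff: prefetch only when the block carries a payload,
    // so payload-less (e.g. NIL) blocks skip the call entirely.
    if let Some(payload) = block.payload.as_ref() {
        manager.prefetch_payload_data(payload, block.timestamp_usecs);
    }
}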
2 changes: 1 addition & 1 deletion consensus/src/block_storage/block_tree.rs
@@ -3,8 +3,8 @@
 // SPDX-License-Identifier: Apache-2.0

 use crate::{
-    block_storage::block_store::update_counters_for_committed_blocks,
     counters,
+    counters::update_counters_for_committed_blocks,
     logging::{LogEvent, LogSchema},
     persistent_liveness_storage::PersistentLivenessStorage,
 };
3 changes: 2 additions & 1 deletion consensus/src/consensus_provider.rs
@@ -69,10 +69,11 @@ pub fn start_consensus(
         timeout_sender,
         consensus_to_mempool_sender,
         state_computer,
-        storage,
+        storage.clone(),
         quorum_store_db,
         reconfig_events,
         bounded_executor,
+        aptos_time_service::TimeService::real(),
     );

     let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver);
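Besides cloning `storage` for reuse, `start_consensus` now threads a time service handle (`aptos_time_service::TimeService::real()`) into the epoch manager, which suggests the DAG code paths take an injectable clock so tests can substitute a simulated one. A generic sketch of that injection pattern, assuming nothing about the actual `aptos_time_service` API:

use std::time::{Duration, Instant};

// Illustrative clock-injection pattern; the trait and types below are
// stand-ins, not the aptos_time_service API.
trait Clock: Send + Sync {
    fn now(&self) -> Instant;
}

struct SystemClock;

impl Clock for SystemClock {
    fn now(&self) -> Instant {
        Instant::now()
    }
}

struct FixedClock(Instant);

impl Clock for FixedClock {
    fn now(&self) -> Instant {
        self.0
    }
}

// Hypothetical component taking its clock at construction, the way the
// epoch manager now receives a TimeService.
struct EpochTimer {
    clock: Box<dyn Clock>,
}

impl EpochTimer {
    fn new(clock: Box<dyn Clock>) -> Self {
        Self { clock }
    }

    fn elapsed_since(&self, start: Instant) -> Duration {
        self.clock.now().saturating_duration_since(start)
    }
}

fn main() {
    let start = Instant::now();
    // Production: real clock, as with TimeService::real() in the diff.
    let timer = EpochTimer::new(Box::new(SystemClock));
    assert!(timer.elapsed_since(start) < Duration::from_secs(1));
    // Tests: a frozen clock makes timing deterministic.
    let frozen = EpochTimer::new(Box::new(FixedClock(start)));
    assert_eq!(frozen.elapsed_since(start), Duration::ZERO);
}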
52 changes: 52 additions & 0 deletions consensus/src/counters.rs
@@ -2,14 +2,22 @@
 // Parts of the project are originally copyright © Meta Platforms, Inc.
 // SPDX-License-Identifier: Apache-2.0

+use crate::{
+    block_storage::tracing::{observe_block, BlockStage},
+    quorum_store,
+};
+use aptos_consensus_types::executed_block::ExecutedBlock;
 use aptos_metrics_core::{
     exponential_buckets, op_counters::DurationHistogram, register_avg_counter, register_counter,
     register_gauge, register_gauge_vec, register_histogram, register_histogram_vec,
     register_int_counter, register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
     Counter, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
     IntGaugeVec,
 };
+use aptos_types::transaction::TransactionStatus;
+use move_core_types::vm_status::DiscardedVMStatus;
 use once_cell::sync::Lazy;
+use std::sync::Arc;

 /// Transaction commit was successful
 pub const TXN_COMMIT_SUCCESS_LABEL: &str = "success";
@@ -806,3 +814,47 @@ pub static BUFFER_MANAGER_PHASE_PROCESS_SECONDS: Lazy<HistogramVec> = Lazy::new(
     )
     .unwrap()
 });
+
+/// Update various counters for committed blocks
+pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc<ExecutedBlock>]) {
+    for block in blocks_to_commit {
+        observe_block(block.block().timestamp_usecs(), BlockStage::COMMITTED);
+        let txn_status = block.compute_result().compute_status();
+        NUM_TXNS_PER_BLOCK.observe(txn_status.len() as f64);
+        COMMITTED_BLOCKS_COUNT.inc();
+        LAST_COMMITTED_ROUND.set(block.round() as i64);
+        LAST_COMMITTED_VERSION.set(block.compute_result().num_leaves() as i64);
+
+        let failed_rounds = block
+            .block()
+            .block_data()
+            .failed_authors()
+            .map(|v| v.len())
+            .unwrap_or(0);
+        if failed_rounds > 0 {
+            COMMITTED_FAILED_ROUNDS_COUNT.inc_by(failed_rounds as u64);
+        }
+
+        // Quorum store metrics
+        quorum_store::counters::NUM_BATCH_PER_BLOCK.observe(block.block().payload_size() as f64);
+
+        for status in txn_status.iter() {
+            let commit_status = match status {
+                TransactionStatus::Keep(_) => TXN_COMMIT_SUCCESS_LABEL,
+                TransactionStatus::Discard(reason) => {
+                    if *reason == DiscardedVMStatus::SEQUENCE_NUMBER_TOO_NEW {
+                        TXN_COMMIT_RETRY_LABEL
+                    } else if *reason == DiscardedVMStatus::SEQUENCE_NUMBER_TOO_OLD {
+                        TXN_COMMIT_FAILED_DUPLICATE_LABEL
+                    } else {
+                        TXN_COMMIT_FAILED_LABEL
+                    }
+                },
+                TransactionStatus::Retry => TXN_COMMIT_RETRY_LABEL,
+            };
+            COMMITTED_TXNS_COUNT
+                .with_label_values(&[commit_status])
+                .inc();
+        }
+    }
+}
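The function is moved here verbatim from block_store.rs (deleted above) so that both the BFT block tree and the new DAG adapter below can record identical commit metrics. The status-to-label mapping is the subtle part; a reduced, self-contained sketch of it, where the enums and every label string except "success" are stand-ins for the real constants:

// Reduced sketch of the status-to-label mapping above; the enums and
// all label strings except "success" are stand-ins.
enum DiscardReason {
    SequenceNumberTooNew,
    SequenceNumberTooOld,
    Other,
}

enum TxnStatus {
    Keep,
    Discard(DiscardReason),
    Retry,
}

fn commit_label(status: &TxnStatus) -> &'static str {
    match status {
        TxnStatus::Keep => "success",
        // Too-new sequence numbers may commit later, so they count as
        // retries; too-old ones mean the transaction already committed,
        // i.e. a duplicate.
        TxnStatus::Discard(DiscardReason::SequenceNumberTooNew) => "retry",
        TxnStatus::Discard(DiscardReason::SequenceNumberTooOld) => "failed_duplicate",
        TxnStatus::Discard(DiscardReason::Other) => "failed",
        TxnStatus::Retry => "retry",
    }
}

fn main() {
    let statuses = [
        TxnStatus::Keep,
        TxnStatus::Discard(DiscardReason::SequenceNumberTooOld),
        TxnStatus::Retry,
    ];
    for status in &statuses {
        println!("{}", commit_label(status));
    }
}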
2 changes: 2 additions & 0 deletions consensus/src/dag/adapter.rs
@@ -3,6 +3,7 @@

 use crate::{
     consensusdb::{CertifiedNodeSchema, ConsensusDB, DagVoteSchema, NodeSchema},
+    counters::update_counters_for_committed_blocks,
     dag::{
         storage::{CommitEvent, DAGStorage},
         CertifiedNode, Node, NodeId, Vote,
@@ -167,6 +168,7 @@ impl OrderedNotifier for OrderedNotifierAdapter {
                 ledger_info_provider
                     .write()
                     .notify_commit_proof(commit_decision);
+                update_counters_for_committed_blocks(committed_blocks);
                 for executed_block in committed_blocks {
                     if let Some(node_digests) = executed_block.block().block_data().dag_nodes()
                     {
2 changes: 1 addition & 1 deletion consensus/src/dag/anchor_election.rs
@@ -3,7 +3,7 @@

 use aptos_consensus_types::common::{Author, Round};

-pub trait AnchorElection: Send {
+pub trait AnchorElection: Send + Sync {
     fn get_anchor(&self, round: Round) -> Author;

     fn update_reputation(
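Tightening the bound to `Send + Sync` lets an `AnchorElection` trait object be shared across threads behind an `Arc`, not just moved into a single task. A small sketch of why `Sync` is required, using a hypothetical round-robin implementation rather than one of the crate's own:

use std::sync::Arc;
use std::thread;

// Stand-ins for the aptos types; the trait bound is the point here.
type Author = u64;
type Round = u64;

trait AnchorElection: Send + Sync {
    fn get_anchor(&self, round: Round) -> Author;
}

// Hypothetical round-robin election, not one of the crate's implementations.
struct RoundRobinElection {
    validators: Vec<Author>,
}

impl AnchorElection for RoundRobinElection {
    fn get_anchor(&self, round: Round) -> Author {
        self.validators[(round as usize) % self.validators.len()]
    }
}

fn main() {
    let election: Arc<dyn AnchorElection> = Arc::new(RoundRobinElection {
        validators: vec![1, 2, 3, 4],
    });
    // Cloning the Arc and reading from another thread needs Sync;
    // with `Send` alone the trait object could only be moved, not shared.
    let shared = Arc::clone(&election);
    let handle = thread::spawn(move || shared.get_anchor(7));
    assert_eq!(handle.join().unwrap(), election.get_anchor(7));
}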
44 changes: 33 additions & 11 deletions consensus/src/dag/bootstrap.rs
@@ -22,6 +22,7 @@ use crate::{
     },
     experimental::buffer_manager::OrderedBlocks,
     network::IncomingDAGRequest,
+    payload_manager::PayloadManager,
     state_replication::{PayloadClient, StateComputer},
 };
 use aptos_channels::{
@@ -43,7 +44,7 @@ use std::{sync::Arc, time::Duration};
 use tokio::{select, task::JoinHandle};
 use tokio_retry::strategy::ExponentialBackoff;

-struct DagBootstrapper {
+pub struct DagBootstrapper {
     self_peer: Author,
     signer: Arc<ValidatorSigner>,
     epoch_state: Arc<EpochState>,
@@ -52,12 +53,13 @@
     dag_network_sender: Arc<dyn TDAGNetworkSender>,
     proof_notifier: Arc<dyn ProofNotifier>,
     time_service: aptos_time_service::TimeService,
+    payload_manager: Arc<PayloadManager>,
     payload_client: Arc<dyn PayloadClient>,
     state_computer: Arc<dyn StateComputer>,
 }

 impl DagBootstrapper {
-    fn new(
+    pub fn new(
         self_peer: Author,
         signer: Arc<ValidatorSigner>,
         epoch_state: Arc<EpochState>,
@@ -66,6 +68,7 @@ impl DagBootstrapper {
         dag_network_sender: Arc<dyn TDAGNetworkSender>,
         proof_notifier: Arc<dyn ProofNotifier>,
         time_service: aptos_time_service::TimeService,
+        payload_manager: Arc<PayloadManager>,
         payload_client: Arc<dyn PayloadClient>,
         state_computer: Arc<dyn StateComputer>,
     ) -> Self {
@@ -78,6 +81,7 @@
             dag_network_sender,
             proof_notifier,
             time_service,
+            payload_manager,
             payload_client,
             state_computer,
         }
@@ -152,6 +156,7 @@ impl DagBootstrapper {
             self.self_peer,
             self.epoch_state.clone(),
             dag.clone(),
+            self.payload_manager.clone(),
             self.payload_client.clone(),
             rb,
             self.time_service.clone(),
@@ -182,11 +187,11 @@ impl DagBootstrapper {
         (dag_handler, dag_fetcher)
     }

-    async fn bootstrapper(
+    pub async fn start(
         self,
         mut dag_rpc_rx: Receiver<Author, IncomingDAGRequest>,
         ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
-        mut shutdown_rx: oneshot::Receiver<()>,
+        mut shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>,
     ) {
         let sync_manager = DagStateSynchronizer::new(
             self.epoch_state.clone(),
@@ -202,6 +207,12 @@
             .expect("latest ledger info must exist");
         let (parent_block_info, ledger_info) =
             compute_initial_block_and_ledger_info(ledger_info_from_storage);
+        debug!(
+            "Starting DAG instance for epoch {} round {}",
+            self.epoch_state.epoch,
+            ledger_info.commit_info().round(),
+        );
+
         let ledger_info_provider = Arc::new(RwLock::new(LedgerInfoProvider::new(ledger_info)));

         let adapter = Arc::new(OrderedNotifierAdapter::new(
@@ -240,28 +251,35 @@ impl DagBootstrapper {
         // poll the network handler while waiting for rebootstrap notification or shutdown notification
         select! {
             biased;
-            _ = &mut shutdown_rx => {
+            Ok(ack_tx) = &mut shutdown_rx => {
                 df_handle.abort();
                 let _ = df_handle.await;
+                if let Err(e) = ack_tx.send(()) {
+                    error!(error = ?e, "unable to ack to shutdown signal");
+                }
                 return;
             },
             sync_status = handler.run(&mut dag_rpc_rx) => {
-                debug!("state sync notification received. {:?}", sync_status);
                 df_handle.abort();
                 let _ = df_handle.await;

                 match sync_status {
                     StateSyncStatus::NeedsSync(certified_node_msg) => {
+                        let highest_committed_anchor_round = ledger_info_provider.get_highest_committed_anchor_round();
+                        debug!("state sync notification received for round {}, dag round {}, ordered round {:?} commit round {} ", certified_node_msg.round(), dag_store.read().highest_round(), dag_store.read().highest_ordered_anchor_round(), highest_committed_anchor_round);
                         let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone());

-                        let highest_committed_anchor_round = ledger_info_provider.get_highest_committed_anchor_round();
                         let sync_future = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone(), highest_committed_anchor_round);

                         select! {
-                            Err(e) = sync_future => {
-                                error!(error = ?e, "unable to sync");
+                            result = sync_future => {
+                                match result {
+                                    Ok(_) => debug!("Sync finishes"),
+                                    Err(e) => error!(error = ?e, "unable to sync"),
+                                }
                             },
-                            Ok(_) = &mut shutdown_rx => {
+                            Ok(ack_tx) = &mut shutdown_rx => {
+                                let _ = ack_tx.send(());
                                 return;
                             }
                         }
@@ -270,7 +288,9 @@ impl DagBootstrapper {
                     },
                     StateSyncStatus::EpochEnds => {
                         // Wait for epoch manager to signal shutdown
-                        _ = shutdown_rx.await;
+                        if let Ok(ack_tx) = shutdown_rx.await {
+                            let _ = ack_tx.send(());
+                        }
                         return;
                     },
                     _ => unreachable!()
Expand All @@ -291,6 +311,7 @@ pub(super) fn bootstrap_dag_for_test(
dag_network_sender: Arc<dyn TDAGNetworkSender>,
proof_notifier: Arc<dyn ProofNotifier>,
time_service: aptos_time_service::TimeService,
payload_manager: Arc<PayloadManager>,
payload_client: Arc<dyn PayloadClient>,
state_computer: Arc<dyn StateComputer>,
) -> (
@@ -308,6 +329,7 @@
         dag_network_sender,
         proof_notifier.clone(),
         time_service,
+        payload_manager,
         payload_client,
         state_computer,
     );
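The shutdown channel's payload changes from `()` to `oneshot::Sender<()>`: the epoch manager now sends an ack channel as the shutdown signal, and the DAG instance replies on it once it has fully wound down, presumably so the epoch manager can wait for teardown before starting the next epoch. A minimal sketch of that handshake, with illustrative names rather than the epoch manager's actual code:

use tokio::sync::oneshot;

// Minimal sketch of the ack'd shutdown handshake implied by
// oneshot::Receiver<oneshot::Sender<()>>; names are illustrative.
async fn dag_task(shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>) {
    // ... run the DAG until the epoch manager signals shutdown ...
    if let Ok(ack_tx) = shutdown_rx.await {
        // Tear down local state first, then confirm completion.
        let _ = ack_tx.send(());
    }
}

#[tokio::main]
async fn main() {
    let (shutdown_tx, shutdown_rx) = oneshot::channel();
    let dag = tokio::spawn(dag_task(shutdown_rx));

    // Epoch manager side: the shutdown signal carries the ack channel.
    let (ack_tx, ack_rx) = oneshot::channel();
    let _ = shutdown_tx.send(ack_tx);
    let _ = ack_rx.await; // resolves once the DAG task has shut down
    let _ = dag.await;
}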
6 changes: 4 additions & 2 deletions consensus/src/dag/commit_signer.rs
@@ -1,15 +1,17 @@
 // Copyright © Aptos Foundation

+use std::sync::Arc;
+
 use crate::experimental::signing_phase::CommitSignerProvider;
 use aptos_crypto::bls12381;
 use aptos_types::validator_signer::ValidatorSigner;

 pub struct DagCommitSigner {
-    signer: ValidatorSigner,
+    signer: Arc<ValidatorSigner>,
 }

 impl DagCommitSigner {
-    pub fn new(signer: ValidatorSigner) -> Self {
+    pub fn new(signer: Arc<ValidatorSigner>) -> Self {
         Self { signer }
     }
 }
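Wrapping the signer in `Arc` lets one `ValidatorSigner` instance be shared by the commit signer and the other DAG components (the bootstrapper above also holds `Arc<ValidatorSigner>`) without duplicating key material. A sketch with a stand-in signer type:

use std::sync::Arc;

// Stand-in for aptos_types::validator_signer::ValidatorSigner, to show
// the sharing this change enables: one signer, many holders.
struct ValidatorSigner {
    author: String,
}

struct DagCommitSigner {
    signer: Arc<ValidatorSigner>,
}

impl DagCommitSigner {
    fn new(signer: Arc<ValidatorSigner>) -> Self {
        Self { signer }
    }
}

fn main() {
    let signer = Arc::new(ValidatorSigner {
        author: "validator-0".to_string(),
    });
    // The same instance can back the commit signer and, say, the DAG
    // bootstrapper, without cloning the underlying key material.
    let commit_signer = DagCommitSigner::new(Arc::clone(&signer));
    assert_eq!(commit_signer.signer.author, signer.author);
    assert_eq!(Arc::strong_count(&signer), 2);
}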