diff --git a/consensus/consensus-types/src/block.rs b/consensus/consensus-types/src/block.rs index c3b5bbe7330e4b..6481822075b25a 100644 --- a/consensus/consensus-types/src/block.rs +++ b/consensus/consensus-types/src/block.rs @@ -70,7 +70,7 @@ impl Display for Block { author, self.epoch(), self.round(), - self.quorum_cert().certified_block().id(), + self.parent_id(), self.timestamp_usecs(), ) } diff --git a/consensus/consensus-types/src/executed_block.rs b/consensus/consensus-types/src/executed_block.rs index c2c71e5554d2d2..561e0e25ab9adc 100644 --- a/consensus/consensus-types/src/executed_block.rs +++ b/consensus/consensus-types/src/executed_block.rs @@ -75,7 +75,7 @@ impl ExecutedBlock { } pub fn parent_id(&self) -> HashValue { - self.quorum_cert().certified_block().id() + self.block.parent_id() } pub fn quorum_cert(&self) -> &QuorumCert { diff --git a/consensus/src/block_storage/block_store.rs b/consensus/src/block_storage/block_store.rs index c5a2cda12f6980..fdc7eecd56b658 100644 --- a/consensus/src/block_storage/block_store.rs +++ b/consensus/src/block_storage/block_store.rs @@ -13,7 +13,6 @@ use crate::{ persistent_liveness_storage::{ PersistentLivenessStorage, RecoveryData, RootInfo, RootMetadata, }, - quorum_store, state_replication::StateComputer, util::time_service::TimeService, }; @@ -26,9 +25,8 @@ use aptos_crypto::{hash::ACCUMULATOR_PLACEHOLDER_HASH, HashValue}; use aptos_executor_types::{ExecutorError, ExecutorResult, StateComputeResult}; use aptos_infallible::RwLock; use aptos_logger::prelude::*; -use aptos_types::{ledger_info::LedgerInfoWithSignatures, transaction::TransactionStatus}; +use aptos_types::ledger_info::LedgerInfoWithSignatures; use futures::executor::block_on; -use move_core_types::vm_status::DiscardedVMStatus; #[cfg(test)] use std::collections::VecDeque; #[cfg(any(test, feature = "fuzzing"))] @@ -50,49 +48,6 @@ fn update_counters_for_ordered_blocks(ordered_blocks: &[Arc]) { } } -pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc]) { - for block in blocks_to_commit { - observe_block(block.block().timestamp_usecs(), BlockStage::COMMITTED); - let txn_status = block.compute_result().compute_status(); - counters::NUM_TXNS_PER_BLOCK.observe(txn_status.len() as f64); - counters::COMMITTED_BLOCKS_COUNT.inc(); - counters::LAST_COMMITTED_ROUND.set(block.round() as i64); - counters::LAST_COMMITTED_VERSION.set(block.compute_result().num_leaves() as i64); - - let failed_rounds = block - .block() - .block_data() - .failed_authors() - .map(|v| v.len()) - .unwrap_or(0); - if failed_rounds > 0 { - counters::COMMITTED_FAILED_ROUNDS_COUNT.inc_by(failed_rounds as u64); - } - - // Quorum store metrics - quorum_store::counters::NUM_BATCH_PER_BLOCK.observe(block.block().payload_size() as f64); - - for status in txn_status.iter() { - let commit_status = match status { - TransactionStatus::Keep(_) => counters::TXN_COMMIT_SUCCESS_LABEL, - TransactionStatus::Discard(reason) => { - if *reason == DiscardedVMStatus::SEQUENCE_NUMBER_TOO_NEW { - counters::TXN_COMMIT_RETRY_LABEL - } else if *reason == DiscardedVMStatus::SEQUENCE_NUMBER_TOO_OLD { - counters::TXN_COMMIT_FAILED_DUPLICATE_LABEL - } else { - counters::TXN_COMMIT_FAILED_LABEL - } - }, - TransactionStatus::Retry => counters::TXN_COMMIT_RETRY_LABEL, - }; - counters::COMMITTED_TXNS_COUNT - .with_label_values(&[commit_status]) - .inc(); - } - } -} - /// Responsible for maintaining all the blocks of payload and the dependencies of those blocks /// (parent and previous QC links). 
It is expected to be accessed concurrently by multiple threads /// and is thread-safe. @@ -397,9 +352,10 @@ impl BlockStore { } self.time_service.wait_until(block_time).await; } - self.payload_manager - .prefetch_payload_data(executed_block.block()) - .await; + if let Some(payload) = executed_block.block().payload() { + self.payload_manager + .prefetch_payload_data(payload, executed_block.block().timestamp_usecs()); + } self.storage .save_tree(vec![executed_block.block().clone()], vec![]) .context("Insert block failed when saving block")?; diff --git a/consensus/src/block_storage/block_tree.rs b/consensus/src/block_storage/block_tree.rs index e215e3a12c7e87..21c8090c78892b 100644 --- a/consensus/src/block_storage/block_tree.rs +++ b/consensus/src/block_storage/block_tree.rs @@ -3,8 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - block_storage::block_store::update_counters_for_committed_blocks, counters, + counters::update_counters_for_committed_blocks, logging::{LogEvent, LogSchema}, persistent_liveness_storage::PersistentLivenessStorage, }; diff --git a/consensus/src/consensus_provider.rs b/consensus/src/consensus_provider.rs index 57dc70bac3a635..c1ab625eac7eec 100644 --- a/consensus/src/consensus_provider.rs +++ b/consensus/src/consensus_provider.rs @@ -69,10 +69,11 @@ pub fn start_consensus( timeout_sender, consensus_to_mempool_sender, state_computer, - storage, + storage.clone(), quorum_store_db, reconfig_events, bounded_executor, + aptos_time_service::TimeService::real(), ); let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver); diff --git a/consensus/src/counters.rs b/consensus/src/counters.rs index 33b2e1a893c498..960e2d502245a1 100644 --- a/consensus/src/counters.rs +++ b/consensus/src/counters.rs @@ -2,6 +2,11 @@ // Parts of the project are originally copyright © Meta Platforms, Inc. 
// SPDX-License-Identifier: Apache-2.0 +use crate::{ + block_storage::tracing::{observe_block, BlockStage}, + quorum_store, +}; +use aptos_consensus_types::executed_block::ExecutedBlock; use aptos_metrics_core::{ exponential_buckets, op_counters::DurationHistogram, register_avg_counter, register_counter, register_gauge, register_gauge_vec, register_histogram, register_histogram_vec, @@ -9,7 +14,10 @@ use aptos_metrics_core::{ Counter, Gauge, GaugeVec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, }; +use aptos_types::transaction::TransactionStatus; +use move_core_types::vm_status::DiscardedVMStatus; use once_cell::sync::Lazy; +use std::sync::Arc; /// Transaction commit was successful pub const TXN_COMMIT_SUCCESS_LABEL: &str = "success"; @@ -806,3 +814,47 @@ pub static BUFFER_MANAGER_PHASE_PROCESS_SECONDS: Lazy = Lazy::new( ) .unwrap() }); + +/// Update various counters for committed blocks +pub fn update_counters_for_committed_blocks(blocks_to_commit: &[Arc]) { + for block in blocks_to_commit { + observe_block(block.block().timestamp_usecs(), BlockStage::COMMITTED); + let txn_status = block.compute_result().compute_status(); + NUM_TXNS_PER_BLOCK.observe(txn_status.len() as f64); + COMMITTED_BLOCKS_COUNT.inc(); + LAST_COMMITTED_ROUND.set(block.round() as i64); + LAST_COMMITTED_VERSION.set(block.compute_result().num_leaves() as i64); + + let failed_rounds = block + .block() + .block_data() + .failed_authors() + .map(|v| v.len()) + .unwrap_or(0); + if failed_rounds > 0 { + COMMITTED_FAILED_ROUNDS_COUNT.inc_by(failed_rounds as u64); + } + + // Quorum store metrics + quorum_store::counters::NUM_BATCH_PER_BLOCK.observe(block.block().payload_size() as f64); + + for status in txn_status.iter() { + let commit_status = match status { + TransactionStatus::Keep(_) => TXN_COMMIT_SUCCESS_LABEL, + TransactionStatus::Discard(reason) => { + if *reason == DiscardedVMStatus::SEQUENCE_NUMBER_TOO_NEW { + TXN_COMMIT_RETRY_LABEL + } else if *reason == DiscardedVMStatus::SEQUENCE_NUMBER_TOO_OLD { + TXN_COMMIT_FAILED_DUPLICATE_LABEL + } else { + TXN_COMMIT_FAILED_LABEL + } + }, + TransactionStatus::Retry => TXN_COMMIT_RETRY_LABEL, + }; + COMMITTED_TXNS_COUNT + .with_label_values(&[commit_status]) + .inc(); + } + } +} diff --git a/consensus/src/dag/adapter.rs b/consensus/src/dag/adapter.rs index 8b05490962f5c8..e802d353dc484c 100644 --- a/consensus/src/dag/adapter.rs +++ b/consensus/src/dag/adapter.rs @@ -3,6 +3,7 @@ use crate::{ consensusdb::{CertifiedNodeSchema, ConsensusDB, DagVoteSchema, NodeSchema}, + counters::update_counters_for_committed_blocks, dag::{ storage::{CommitEvent, DAGStorage}, CertifiedNode, Node, NodeId, Vote, @@ -167,6 +168,7 @@ impl OrderedNotifier for OrderedNotifierAdapter { ledger_info_provider .write() .notify_commit_proof(commit_decision); + update_counters_for_committed_blocks(committed_blocks); for executed_block in committed_blocks { if let Some(node_digests) = executed_block.block().block_data().dag_nodes() { diff --git a/consensus/src/dag/anchor_election.rs b/consensus/src/dag/anchor_election.rs index e01b44198b4bde..ecbe51d4a7c61a 100644 --- a/consensus/src/dag/anchor_election.rs +++ b/consensus/src/dag/anchor_election.rs @@ -3,7 +3,7 @@ use aptos_consensus_types::common::{Author, Round}; -pub trait AnchorElection: Send { +pub trait AnchorElection: Send + Sync { fn get_anchor(&self, round: Round) -> Author; fn update_reputation( diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 92656e093e112b..17123be624e29f 100644 --- 
a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -22,6 +22,7 @@ use crate::{ }, experimental::buffer_manager::OrderedBlocks, network::IncomingDAGRequest, + payload_manager::PayloadManager, state_replication::{PayloadClient, StateComputer}, }; use aptos_channels::{ @@ -43,7 +44,7 @@ use std::{sync::Arc, time::Duration}; use tokio::{select, task::JoinHandle}; use tokio_retry::strategy::ExponentialBackoff; -struct DagBootstrapper { +pub struct DagBootstrapper { self_peer: Author, signer: Arc, epoch_state: Arc, @@ -52,12 +53,13 @@ struct DagBootstrapper { dag_network_sender: Arc, proof_notifier: Arc, time_service: aptos_time_service::TimeService, + payload_manager: Arc, payload_client: Arc, state_computer: Arc, } impl DagBootstrapper { - fn new( + pub fn new( self_peer: Author, signer: Arc, epoch_state: Arc, @@ -66,6 +68,7 @@ impl DagBootstrapper { dag_network_sender: Arc, proof_notifier: Arc, time_service: aptos_time_service::TimeService, + payload_manager: Arc, payload_client: Arc, state_computer: Arc, ) -> Self { @@ -78,6 +81,7 @@ impl DagBootstrapper { dag_network_sender, proof_notifier, time_service, + payload_manager, payload_client, state_computer, } @@ -152,6 +156,7 @@ impl DagBootstrapper { self.self_peer, self.epoch_state.clone(), dag.clone(), + self.payload_manager.clone(), self.payload_client.clone(), rb, self.time_service.clone(), @@ -182,11 +187,11 @@ impl DagBootstrapper { (dag_handler, dag_fetcher) } - async fn bootstrapper( + pub async fn start( self, mut dag_rpc_rx: Receiver, ordered_nodes_tx: UnboundedSender, - mut shutdown_rx: oneshot::Receiver<()>, + mut shutdown_rx: oneshot::Receiver>, ) { let sync_manager = DagStateSynchronizer::new( self.epoch_state.clone(), @@ -202,6 +207,12 @@ impl DagBootstrapper { .expect("latest ledger info must exist"); let (parent_block_info, ledger_info) = compute_initial_block_and_ledger_info(ledger_info_from_storage); + debug!( + "Starting DAG instance for epoch {} round {}", + self.epoch_state.epoch, + ledger_info.commit_info().round(), + ); + let ledger_info_provider = Arc::new(RwLock::new(LedgerInfoProvider::new(ledger_info))); let adapter = Arc::new(OrderedNotifierAdapter::new( @@ -240,28 +251,35 @@ impl DagBootstrapper { // poll the network handler while waiting for rebootstrap notification or shutdown notification select! { biased; - _ = &mut shutdown_rx => { + Ok(ack_tx) = &mut shutdown_rx => { df_handle.abort(); let _ = df_handle.await; + if let Err(e) = ack_tx.send(()) { + error!(error = ?e, "unable to ack to shutdown signal"); + } return; }, sync_status = handler.run(&mut dag_rpc_rx) => { - debug!("state sync notification received. {:?}", sync_status); df_handle.abort(); let _ = df_handle.await; match sync_status { StateSyncStatus::NeedsSync(certified_node_msg) => { + let highest_committed_anchor_round = ledger_info_provider.get_highest_committed_anchor_round(); + debug!("state sync notification received for round {}, dag round {}, ordered round {:?} commit round {} ", certified_node_msg.round(), dag_store.read().highest_round(), dag_store.read().highest_ordered_anchor_round(), highest_committed_anchor_round); let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone()); - let highest_committed_anchor_round = ledger_info_provider.get_highest_committed_anchor_round(); let sync_future = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone(), highest_committed_anchor_round); select! 
{ - Err(e) = sync_future => { - error!(error = ?e, "unable to sync"); + result = sync_future => { + match result { + Ok(_) => debug!("Sync finishes"), + Err(e) => error!(error = ?e, "unable to sync"), + } }, - Ok(_) = &mut shutdown_rx => { + Ok(ack_tx) = &mut shutdown_rx => { + let _ = ack_tx.send(()); return; } } @@ -270,7 +288,9 @@ impl DagBootstrapper { }, StateSyncStatus::EpochEnds => { // Wait for epoch manager to signal shutdown - _ = shutdown_rx.await; + if let Ok(ack_tx) = shutdown_rx.await { + let _ = ack_tx.send(()); + } return; }, _ => unreachable!() @@ -291,6 +311,7 @@ pub(super) fn bootstrap_dag_for_test( dag_network_sender: Arc, proof_notifier: Arc, time_service: aptos_time_service::TimeService, + payload_manager: Arc, payload_client: Arc, state_computer: Arc, ) -> ( @@ -308,6 +329,7 @@ pub(super) fn bootstrap_dag_for_test( dag_network_sender, proof_notifier.clone(), time_service, + payload_manager, payload_client, state_computer, ); diff --git a/consensus/src/dag/commit_signer.rs b/consensus/src/dag/commit_signer.rs index 54e978e5e41faf..df992b0678e7cf 100644 --- a/consensus/src/dag/commit_signer.rs +++ b/consensus/src/dag/commit_signer.rs @@ -1,15 +1,17 @@ // Copyright © Aptos Foundation +use std::sync::Arc; + use crate::experimental::signing_phase::CommitSignerProvider; use aptos_crypto::bls12381; use aptos_types::validator_signer::ValidatorSigner; pub struct DagCommitSigner { - signer: ValidatorSigner, + signer: Arc, } impl DagCommitSigner { - pub fn new(signer: ValidatorSigner) -> Self { + pub fn new(signer: Arc) -> Self { Self { signer } } } diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index 294c43e65e57a7..ee6ac6fab44d83 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -14,8 +14,9 @@ use crate::{ dag_fetcher::TFetchRequester, dag_state_sync::DAG_WINDOW, dag_store::Dag, - types::{CertificateAckState, CertifiedNode, Node, NodeCertificate, SignatureBuilder}, + types::{CertificateAckState, CertifiedNode, Node, SignatureBuilder}, }, + payload_manager::PayloadManager, state_replication::PayloadClient, }; use anyhow::bail; @@ -45,6 +46,7 @@ pub(crate) struct DagDriver { author: Author, epoch_state: Arc, dag: Arc>, + payload_manager: Arc, payload_client: Arc, reliable_broadcast: Arc>, current_round: Round, @@ -61,6 +63,7 @@ impl DagDriver { author: Author, epoch_state: Arc, dag: Arc>, + payload_manager: Arc, payload_client: Arc, reliable_broadcast: Arc>, time_service: TimeService, @@ -73,23 +76,24 @@ impl DagDriver { .get_pending_node() .expect("should be able to read dag storage"); let highest_round = dag.read().highest_round(); - let current_round = dag + let highest_strong_links_round = dag .read() .get_strong_links_for_round(highest_round, &epoch_state.verifier) .map_or_else(|| highest_round.saturating_sub(1), |_| highest_round); debug!( "highest_round: {}, current_round: {}", - highest_round, current_round + highest_round, highest_strong_links_round ); let mut driver = Self { author, epoch_state, dag, + payload_manager, payload_client, reliable_broadcast, - current_round, + current_round: highest_strong_links_round, time_service, rb_abort_handle: None, storage, @@ -99,25 +103,21 @@ impl DagDriver { }; // If we were broadcasting the node for the round already, resume it - if let Some(node) = pending_node.filter(|node| node.round() == current_round + 1) { + if let Some(node) = + pending_node.filter(|node| node.round() == highest_strong_links_round + 1) + { driver.current_round = node.round(); 
driver.broadcast_node(node); } else { // kick start a new round - let strong_links = driver - .dag - .read() - .get_strong_links_for_round(current_round, &driver.epoch_state.verifier) - .unwrap_or(vec![]); - block_on(driver.enter_new_round(current_round + 1, strong_links)); + block_on(driver.enter_new_round(highest_strong_links_round + 1)); } driver } pub async fn add_node(&mut self, node: CertifiedNode) -> anyhow::Result<()> { - let maybe_strong_links = { + let highest_strong_links_round = { let mut dag_writer = self.dag.write(); - let round = node.metadata().round(); if !dag_writer.all_exists(node.parents_metadata()) { if let Err(err) = self.fetch_requester.request_for_certified_node(node) { @@ -126,24 +126,32 @@ impl DagDriver { bail!(DagDriverError::MissingParents); } + self.payload_manager + .prefetch_payload_data(node.payload(), node.metadata().timestamp()); dag_writer.add_node(node)?; - if self.current_round == round { - dag_writer - .get_strong_links_for_round(self.current_round, &self.epoch_state.verifier) - } else { - None - } + + let highest_round = dag_writer.highest_round(); + dag_writer + .get_strong_links_for_round(highest_round, &self.epoch_state.verifier) + .map_or_else(|| highest_round.saturating_sub(1), |_| highest_round) }; - if let Some(strong_links) = maybe_strong_links { - self.enter_new_round(self.current_round + 1, strong_links) - .await; + if self.current_round <= highest_strong_links_round { + self.enter_new_round(highest_strong_links_round + 1).await; } Ok(()) } - pub async fn enter_new_round(&mut self, new_round: Round, strong_links: Vec) { + pub async fn enter_new_round(&mut self, new_round: Round) { debug!("entering new round {}", new_round); + let strong_links = self + .dag + .read() + .get_strong_links_for_round(new_round - 1, &self.epoch_state.verifier) + .unwrap_or_else(|| { + assert_eq!(new_round, 1, "Only expect empty strong links for round 1"); + vec![] + }); let payload_filter = { let dag_reader = self.dag.read(); let highest_commit_round = self @@ -168,8 +176,8 @@ impl DagDriver { .payload_client .pull_payload( Duration::from_secs(1), - 100, 1000, + 10 * 1024 * 1024, payload_filter, Box::pin(async {}), false, @@ -180,8 +188,8 @@ impl DagDriver { { Ok(payload) => payload, Err(e) => { - error!("error pulling payload: {}", e); - return; + // TODO: return empty payload instead + panic!("error pulling payload: {}", e); }, }; // TODO: need to wait to pass median of parents timestamp @@ -209,7 +217,8 @@ impl DagDriver { SignatureBuilder::new(node.metadata().clone(), self.epoch_state.clone()); let cert_ack_set = CertificateAckState::new(self.epoch_state.verifier.len()); let latest_ledger_info = self.ledger_info_provider.get_latest_ledger_info(); - let task = self + let round = node.round(); + let core_task = self .reliable_broadcast .broadcast(node.clone(), signature_builder) .then(move |certificate| { @@ -218,6 +227,11 @@ impl DagDriver { CertifiedNodeMessage::new(certified_node, latest_ledger_info); rb.broadcast(certified_node_msg, cert_ack_set) }); + let task = async move { + debug!("Start reliable broadcast for round {}", round); + core_task.await; + debug!("Finish reliable broadcast for round {}", round); + }; tokio::spawn(Abortable::new(task, abort_registration)); if let Some(prev_handle) = self.rb_abort_handle.replace(abort_handle) { prev_handle.abort(); diff --git a/consensus/src/dag/dag_fetcher.rs b/consensus/src/dag/dag_fetcher.rs index 30c9bdff407ae4..9979c2b842825e 100644 --- a/consensus/src/dag/dag_fetcher.rs +++ 
b/consensus/src/dag/dag_fetcher.rs @@ -10,7 +10,7 @@ use crate::dag::{ use anyhow::{anyhow, ensure}; use aptos_consensus_types::common::Author; use aptos_infallible::RwLock; -use aptos_logger::error; +use aptos_logger::{debug, error}; use aptos_time_service::TimeService; use aptos_types::epoch_state::EpochState; use async_trait::async_trait; @@ -187,6 +187,12 @@ impl DagFetcherService { ) -> anyhow::Result<()> { let remote_request = { let dag_reader = self.dag.read(); + ensure!( + node.round() > dag_reader.lowest_incomplete_round(), + "Already synced beyond requested round {}, lowest incomplete round {}", + node.round(), + dag_reader.lowest_incomplete_round() + ); let missing_parents: Vec = dag_reader .filter_missing(node.parents_metadata()) @@ -247,6 +253,7 @@ impl TDagFetcher for DagFetcher { responders: Vec, dag: Arc>, ) -> anyhow::Result<()> { + debug!("Start fetch request: {:?}", remote_request); let mut rpc = RpcWithFallback::new( responders, remote_request.clone().into(), diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 47124a3a9a8acf..6851a20d488f03 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -11,7 +11,7 @@ use crate::{ dag::{dag_network::RpcHandler, rb_handler::NodeBroadcastHandler, types::DAGMessage}, network::{IncomingDAGRequest, TConsensusMsg}, }; -use anyhow::bail; +use anyhow::ensure; use aptos_channels::aptos_channel; use aptos_consensus_types::common::Author; use aptos_logger::{debug, warn}; @@ -60,7 +60,7 @@ impl NetworkHandler { // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically. loop { select! { - Some(msg) = dag_rpc_rx.next() => { + msg = dag_rpc_rx.select_next_some() => { match self.process_rpc(msg).await { Ok(sync_status) => { if matches!(sync_status, StateSyncStatus::NeedsSync(_) | StateSyncStatus::EpochEnds) { @@ -95,16 +95,31 @@ impl NetworkHandler { } } - fn verify_incoming_rpc(&self, dag_message: &DAGMessage) -> Result<(), anyhow::Error> { + fn verify_incoming_rpc( + &self, + dag_message: &DAGMessage, + sender: Author, + ) -> Result<(), anyhow::Error> { match dag_message { - DAGMessage::NodeMsg(node) => node.verify(&self.epoch_state.verifier), + DAGMessage::NodeMsg(node) => { + ensure!( + *node.author() == sender, + "Message author mismatch network sender" + ); + node.verify(&self.epoch_state.verifier) + }, DAGMessage::CertifiedNodeMsg(certified_node) => { + ensure!( + *certified_node.author() == sender, + "Message author mismatch network sender" + ); certified_node.verify(&self.epoch_state.verifier) }, DAGMessage::FetchRequest(request) => request.verify(&self.epoch_state.verifier), _ => Err(anyhow::anyhow!( - "unexpected rpc message{:?}", - std::mem::discriminant(dag_message) + "unexpected rpc message {} from {}", + dag_message.name(), + sender )), } } @@ -115,15 +130,14 @@ impl NetworkHandler { ) -> anyhow::Result { let dag_message: DAGMessage = rpc_request.req.try_into()?; - let author = dag_message - .author() - .map_err(|_| anyhow::anyhow!("unexpected rpc message {:?}", dag_message))?; - if author != rpc_request.sender { - bail!("message author and network author mismatch"); - } + debug!( + "processing rpc message {} from {}", + dag_message.name(), + rpc_request.sender + ); let response: anyhow::Result = { - let verification_result = self.verify_incoming_rpc(&dag_message); + let verification_result = self.verify_incoming_rpc(&dag_message, rpc_request.sender); match verification_result { Ok(_) => match dag_message { DAGMessage::NodeMsg(node) => { @@ -150,7 
+164,10 @@ impl NetworkHandler { } }; - debug!("responding to rpc: {:?}", response); + debug!( + "responding to process_rpc {:?}", + response.as_ref().and_then(|r| Ok(r.name())) + ); let response = response .and_then(|response_msg| { diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs index 3fdab3faa70735..bdeaf3c351e9c4 100644 --- a/consensus/src/dag/dag_store.rs +++ b/consensus/src/dag/dag_store.rs @@ -9,7 +9,7 @@ use crate::dag::{ use anyhow::{anyhow, ensure}; use aptos_consensus_types::common::{Author, Round}; use aptos_crypto::HashValue; -use aptos_logger::error; +use aptos_logger::{debug, error}; use aptos_types::{epoch_state::EpochState, validator_verifier::ValidatorVerifier}; use std::{ collections::{BTreeMap, HashMap, HashSet}, @@ -59,12 +59,14 @@ impl Dag { let mut expired = vec![]; let mut nodes_by_round = BTreeMap::new(); for (digest, certified_node) in all_nodes { - if certified_node.metadata().epoch() == epoch { + if certified_node.metadata().epoch() == epoch && certified_node.round() >= initial_round + { let arc_node = Arc::new(certified_node); let index = *author_to_index .get(arc_node.metadata().author()) .expect("Author from certified node should exist"); let round = arc_node.metadata().round(); + debug!("Recovered node {} from storage", arc_node.id()); nodes_by_round .entry(round) .or_insert_with(|| vec![None; num_validators])[index] = @@ -140,6 +142,7 @@ impl Dag { // mutate after all checks pass self.storage.save_certified_node(&node)?; + debug!("Added node {}", node.id()); round_ref[index] = Some(NodeStatus::Unordered(node.clone())); Ok(()) } @@ -320,7 +323,7 @@ impl Dag { let bitmask = self .nodes_by_round - .range(lowest_round..target_round) + .range(lowest_round..=target_round) .map(|(_, round_nodes)| round_nodes.iter().map(|node| node.is_some()).collect()) .collect(); diff --git a/consensus/src/dag/mod.rs b/consensus/src/dag/mod.rs index 17b4d7c7511423..8e223064d002a0 100644 --- a/consensus/src/dag/mod.rs +++ b/consensus/src/dag/mod.rs @@ -19,6 +19,9 @@ mod storage; mod tests; mod types; -pub use adapter::ProofNotifier; +pub use adapter::{ProofNotifier, StorageAdapter}; +pub use bootstrap::DagBootstrapper; +pub use commit_signer::DagCommitSigner; pub use dag_network::{RpcHandler, RpcWithFallback, TDAGNetworkSender}; +pub use storage::DAGStorage; pub use types::{CertifiedNode, DAGMessage, DAGNetworkMessage, Extensions, Node, NodeId, Vote}; diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs index f5822caa86c76e..4a6aff99973060 100644 --- a/consensus/src/dag/order_rule.rs +++ b/consensus/src/dag/order_rule.rs @@ -12,7 +12,7 @@ use crate::dag::{ }; use aptos_consensus_types::common::Round; use aptos_infallible::RwLock; -use aptos_logger::error; +use aptos_logger::{debug, error}; use aptos_types::{epoch_state::EpochState, ledger_info::LedgerInfo}; use std::sync::Arc; @@ -194,6 +194,12 @@ impl OrderRule { }) .collect(); ordered_nodes.reverse(); + debug!( + "Ordered anchor {}, reached round {} with {} nodes", + anchor.id(), + lowest_round_to_reach, + ordered_nodes.len() + ); self.lowest_unordered_anchor_round = anchor.round() + 1; if let Err(e) = self diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 5b489b404889f4..58c3b0211c40ee 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -16,6 +16,7 @@ use crate::{ types::{CertifiedAck, DAGMessage}, RpcHandler, }, + payload_manager::PayloadManager, 
test_utils::MockPayloadManager, }; use aptos_consensus_types::common::{Author, Round}; @@ -139,6 +140,7 @@ async fn test_certified_node_handler() { signers[0].author(), epoch_state, dag, + Arc::new(PayloadManager::DirectMempool), Arc::new(MockPayloadManager::new(None)), rb, time_service, diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs index 54ad393de5c3f9..c5c1a0db5321c4 100644 --- a/consensus/src/dag/tests/integration_tests.rs +++ b/consensus/src/dag/tests/integration_tests.rs @@ -7,6 +7,7 @@ use crate::{ network::{IncomingDAGRequest, NetworkSender}, network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC}, network_tests::{NetworkPlayground, TwinId}, + payload_manager::PayloadManager, test_utils::{consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage}, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; @@ -71,6 +72,7 @@ impl DagBootstrapUnit { let network = Arc::new(network); let payload_client = Arc::new(MockPayloadManager::new(None)); + let payload_manager = Arc::new(PayloadManager::DirectMempool); let state_computer = Arc::new(EmptyStateComputer {}); @@ -85,6 +87,7 @@ impl DagBootstrapUnit { network.clone(), network.clone(), time_service, + payload_manager, payload_client, state_computer, ); diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs index b5dbac7afefeff..3301b6ab690c18 100644 --- a/consensus/src/dag/types.rs +++ b/consensus/src/dag/types.rs @@ -21,7 +21,13 @@ use aptos_types::{ validator_verifier::ValidatorVerifier, }; use serde::{Deserialize, Serialize}; -use std::{cmp::min, collections::HashSet, ops::Deref, sync::Arc}; +use std::{ + cmp::min, + collections::HashSet, + fmt::{Display, Formatter}, + ops::Deref, + sync::Arc, +}; pub trait TDAGMessage: Into + TryFrom { fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()>; @@ -349,6 +355,16 @@ impl NodeId { } } +impl Display for NodeId { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!( + f, + "NodeId: [epoch: {}, round: {}, author: {}]", + self.epoch, self.round, self.author + ) + } +} + /// Quorum signatures over the node digest #[derive(Clone, Serialize, Deserialize, Debug, PartialEq)] pub struct NodeCertificate { diff --git a/consensus/src/epoch_manager.rs b/consensus/src/epoch_manager.rs index a73664a3d26378..132599d3dba682 100644 --- a/consensus/src/epoch_manager.rs +++ b/consensus/src/epoch_manager.rs @@ -8,11 +8,12 @@ use crate::{ BlockStore, }, counters, + dag::{DagBootstrapper, DagCommitSigner, StorageAdapter}, error::{error_kind, DbError}, experimental::{ buffer_manager::{OrderedBlocks, ResetRequest}, decoupled_execution_utils::prepare_phases_and_buffer_manager, - ordering_state_computer::OrderingStateComputer, + ordering_state_computer::{DagStateSyncComputer, OrderingStateComputer}, signing_phase::CommitSignerProvider, }, liveness::{ @@ -34,7 +35,7 @@ use crate::{ monitor, network::{ IncomingBatchRetrievalRequest, IncomingBlockRetrievalRequest, IncomingCommitRequest, - IncomingRpcRequest, NetworkReceivers, NetworkSender, + IncomingDAGRequest, IncomingRpcRequest, NetworkReceivers, NetworkSender, }, network_interface::{ConsensusMsg, ConsensusNetworkClient}, payload_client::QuorumStoreClient, @@ -47,7 +48,7 @@ use crate::{ }, recovery_manager::RecoveryManager, round_manager::{RoundManager, UnverifiedEvent, VerifiedEvent}, - state_replication::StateComputer, + state_replication::{PayloadClient, StateComputer}, transaction_deduper::create_transaction_deduper, 
transaction_shuffler::create_transaction_shuffler, util::time_service::TimeService, @@ -147,6 +148,10 @@ pub struct EpochManager { bounded_executor: BoundedExecutor, // recovery_mode is set to true when the recovery manager is spawned recovery_mode: bool, + + aptos_time_service: aptos_time_service::TimeService, + dag_rpc_tx: Option>, + dag_shutdown_tx: Option>>, } impl EpochManager
<P>
{ @@ -162,6 +167,7 @@ impl EpochManager
<P>
{ quorum_store_storage: Arc, reconfig_events: ReconfigNotificationListener
<P>
, bounded_executor: BoundedExecutor, + aptos_time_service: aptos_time_service::TimeService, ) -> Self { let author = node_config.validator_network.as_ref().unwrap().peer_id(); let config = node_config.consensus.clone(); @@ -194,6 +200,9 @@ impl EpochManager
<P>
{ batch_retrieval_tx: None, bounded_executor, recovery_mode: false, + dag_rpc_tx: None, + dag_shutdown_tx: None, + aptos_time_service, } } @@ -290,32 +299,12 @@ impl EpochManager
<P>
{ vec![1; proposers.len()] }; - // Genesis is epoch=0 - // First block (after genesis) is epoch=1, and is the only block in that epoch. - // It has no votes, so we skip it unless we are in epoch 1, as otherwise it will - // skew leader elections for exclude_round number of rounds. - let first_epoch_to_consider = std::cmp::max( - if epoch_state.epoch == 1 { 1 } else { 2 }, - epoch_state - .epoch - .saturating_sub(use_history_from_previous_epoch_max_count as u64), + let epoch_to_proposers = self.extract_epoch_proposers( + epoch_state, + use_history_from_previous_epoch_max_count, + proposers, + (window_size + seek_len) as u64, ); - // If we are considering beyond the current epoch, we need to fetch validators for those epochs - let epoch_to_proposers = if epoch_state.epoch > first_epoch_to_consider { - self.storage - .aptos_db() - .get_epoch_ending_ledger_infos(first_epoch_to_consider - 1, epoch_state.epoch) - .and_then(|proof| { - ensure!(proof.ledger_info_with_sigs.len() as u64 == (epoch_state.epoch - (first_epoch_to_consider - 1))); - extract_epoch_to_proposers(proof, epoch_state.epoch, &proposers, (window_size + seek_len) as u64) - }) - .unwrap_or_else(|err| { - error!("Couldn't create leader reputation with history across epochs, {:?}", err); - HashMap::from([(epoch_state.epoch, proposers)]) - }) - } else { - HashMap::from([(epoch_state.epoch, proposers)]) - }; info!( "Starting epoch {}: proposers across epochs for leader election: {:?}", @@ -356,6 +345,48 @@ impl EpochManager
<P>
{ } } + fn extract_epoch_proposers( + &self, + epoch_state: &EpochState, + use_history_from_previous_epoch_max_count: u32, + proposers: Vec, + needed_rounds: u64, + ) -> HashMap> { + // Genesis is epoch=0 + // First block (after genesis) is epoch=1, and is the only block in that epoch. + // It has no votes, so we skip it unless we are in epoch 1, as otherwise it will + // skew leader elections for exclude_round number of rounds. + let first_epoch_to_consider = std::cmp::max( + if epoch_state.epoch == 1 { 1 } else { 2 }, + epoch_state + .epoch + .saturating_sub(use_history_from_previous_epoch_max_count as u64), + ); + // If we are considering beyond the current epoch, we need to fetch validators for those epochs + let epoch_to_proposers = if epoch_state.epoch > first_epoch_to_consider { + self.storage + .aptos_db() + .get_epoch_ending_ledger_infos(first_epoch_to_consider - 1, epoch_state.epoch) + .and_then(|proof| { + ensure!( + proof.ledger_info_with_sigs.len() as u64 + == (epoch_state.epoch - (first_epoch_to_consider - 1)) + ); + extract_epoch_to_proposers(proof, epoch_state.epoch, &proposers, needed_rounds) + }) + .unwrap_or_else(|err| { + error!( + "Couldn't create leader reputation with history across epochs, {:?}", + err + ); + HashMap::from([(epoch_state.epoch, proposers)]) + }) + } else { + HashMap::from([(epoch_state.epoch, proposers)]) + }; + epoch_to_proposers + } + fn process_epoch_retrieval( &mut self, request: EpochRetrievalRequest, @@ -493,7 +524,10 @@ impl EpochManager
<P>
{ &mut self, commit_signer_provider: Arc, verifier: ValidatorVerifier, - ) -> OrderingStateComputer { + ) -> ( + UnboundedSender, + UnboundedSender, + ) { let network_sender = NetworkSender::new( self.author, self.network_sender.clone(), @@ -538,7 +572,7 @@ impl EpochManager
<P>
{ tokio::spawn(persisting_phase.start()); tokio::spawn(buffer_manager.start()); - OrderingStateComputer::new(block_tx, self.commit_state_computer.clone(), reset_tx) + (block_tx, reset_tx) } async fn shutdown_current_processor(&mut self) { @@ -554,6 +588,18 @@ impl EpochManager
<P>
{ } self.round_manager_tx = None; + if let Some(close_tx) = self.dag_shutdown_tx.take() { + // Release the previous RoundManager, especially the SafetyRule client + let (ack_tx, ack_rx) = oneshot::channel(); + close_tx + .send(ack_tx) + .expect("[EpochManager] Fail to drop DAG bootstrapper"); + ack_rx + .await + .expect("[EpochManager] Fail to drop DAG bootstrapper"); + } + self.dag_shutdown_tx = None; + // Shutdown the previous buffer manager, to release the SafetyRule client self.buffer_manager_msg_tx = None; if let Some(mut tx) = self.buffer_manager_reset_tx.take() { @@ -589,13 +635,8 @@ impl EpochManager
<P>
{ &mut self, ledger_data: LedgerRecoveryData, epoch_state: EpochState, + network_sender: NetworkSender, ) { - let network_sender = NetworkSender::new( - self.author, - self.network_sender.clone(), - self.self_sender.clone(), - epoch_state.verifier.clone(), - ); let (recovery_manager_tx, recovery_manager_rx) = aptos_channel::new( QueueStyle::LIFO, 1, @@ -660,14 +701,12 @@ impl EpochManager
<P>
{ (payload_manager, payload_client, quorum_store_builder) } - fn init_state_computer( + fn init_commit_state_computer( &mut self, epoch_state: &EpochState, payload_manager: Arc, - onchain_consensus_config: &OnChainConsensusConfig, onchain_execution_config: &OnChainExecutionConfig, - commit_signer_provider: Arc, - ) -> Arc { + ) { let transaction_shuffler = create_transaction_shuffler(onchain_execution_config.transaction_shuffler_type()); let block_gas_limit = onchain_execution_config.block_gas_limit(); @@ -680,14 +719,22 @@ impl EpochManager
<P>
{ block_gas_limit, transaction_deduper, ); + } + fn init_ordering_state_computer( + &mut self, + epoch_state: &EpochState, + onchain_consensus_config: &OnChainConsensusConfig, + commit_signer_provider: Arc, + ) -> Arc { if onchain_consensus_config.decoupled_execution() { - Arc::new( - self.spawn_decoupled_execution( - commit_signer_provider, - epoch_state.verifier.clone(), - ), - ) + let (block_tx, reset_tx) = self + .spawn_decoupled_execution(commit_signer_provider, epoch_state.verifier.clone()); + Arc::new(OrderingStateComputer::new( + block_tx, + self.commit_state_computer.clone(), + reset_tx, + )) } else { self.commit_state_computer.clone() } @@ -719,7 +766,9 @@ impl EpochManager
<P>
{ recovery_data: RecoveryData, epoch_state: EpochState, onchain_consensus_config: OnChainConsensusConfig, - onchain_execution_config: OnChainExecutionConfig, + network_sender: NetworkSender, + payload_client: Arc, + payload_manager: Arc, ) { let epoch = epoch_state.epoch; info!( @@ -748,7 +797,6 @@ impl EpochManager
<P>
{ info!(epoch = epoch, "Create ProposerElection"); let proposer_election = self.create_proposer_election(&epoch_state, &onchain_consensus_config); - let network_sender = self.init_network_sender(&epoch_state); let chain_health_backoff_config = ChainHealthBackoffConfig::new(self.config.chain_health_backoff.clone()); let pipeline_backpressure_config = @@ -756,14 +804,9 @@ impl EpochManager
<P>
{ let safety_rules_container = Arc::new(Mutex::new(safety_rules)); - let (payload_manager, payload_client, quorum_store_builder) = self - .init_payload_provider(&epoch_state, network_sender.clone()) - .await; - let state_computer = self.init_state_computer( + let state_computer = self.init_ordering_state_computer( &epoch_state, - payload_manager.clone(), &onchain_consensus_config, - &onchain_execution_config, safety_rules_container.clone(), ); @@ -777,23 +820,16 @@ impl EpochManager
<P>
{ self.config.max_pruned_blocks_in_mem, Arc::clone(&self.time_service), self.config.vote_back_pressure_limit, - payload_manager.clone(), + payload_manager, )); - if let Some((quorum_store_coordinator_tx, batch_retrieval_rx)) = - quorum_store_builder.start() - { - self.quorum_store_coordinator_tx = Some(quorum_store_coordinator_tx); - self.batch_retrieval_tx = Some(batch_retrieval_rx); - } - info!(epoch = epoch, "Create ProposalGenerator"); // txn manager is required both by proposal generator (to pull the proposers) // and by event processor (to update their status). let proposal_generator = ProposalGenerator::new( self.author, block_store.clone(), - Arc::new(payload_client), + payload_client, self.time_service.clone(), Duration::from_millis(self.config.quorum_store_poll_time_ms), self.config @@ -849,8 +885,6 @@ impl EpochManager
<P>
{ } }); - self.set_epoch_start_metrics(&epoch_state); - let mut round_manager = RoundManager::new( epoch_state, block_store.clone(), @@ -874,7 +908,16 @@ impl EpochManager
<P>
{ self.spawn_block_retrieval_task(epoch, block_store); } - fn init_network_sender(&self, epoch_state: &EpochState) -> NetworkSender { + fn start_quorum_store(&mut self, quorum_store_builder: QuorumStoreBuilder) { + if let Some((quorum_store_coordinator_tx, batch_retrieval_rx)) = + quorum_store_builder.start() + { + self.quorum_store_coordinator_tx = Some(quorum_store_coordinator_tx); + self.batch_retrieval_tx = Some(batch_retrieval_rx); + } + } + + fn create_network_sender(&mut self, epoch_state: &EpochState) -> NetworkSender { NetworkSender::new( self.author, self.network_sender.clone(), @@ -907,35 +950,141 @@ impl EpochManager
<P>
{ let consensus_config = onchain_consensus_config.unwrap_or_default(); let execution_config = onchain_execution_config .unwrap_or_else(|_| OnChainExecutionConfig::default_if_missing()); - self.start_new_epoch_with_joltean(epoch_state, consensus_config, execution_config) + let (network_sender, payload_client, payload_manager) = self + .initialize_shared_component(&epoch_state, &consensus_config, &execution_config) + .await; + + if consensus_config.is_dag_enabled() { + self.start_new_epoch_with_dag( + epoch_state, + consensus_config, + network_sender, + payload_client, + payload_manager, + ) .await + } else { + self.start_new_epoch_with_joltean( + epoch_state, + consensus_config, + network_sender, + payload_client, + payload_manager, + ) + .await + } + } + + async fn initialize_shared_component( + &mut self, + epoch_state: &EpochState, + consensus_config: &OnChainConsensusConfig, + execution_config: &OnChainExecutionConfig, + ) -> (NetworkSender, Arc, Arc) { + self.set_epoch_start_metrics(&epoch_state); + self.quorum_store_enabled = self.enable_quorum_store(consensus_config); + let network_sender = self.create_network_sender(epoch_state); + let (payload_manager, payload_client, quorum_store_builder) = self + .init_payload_provider(epoch_state, network_sender.clone()) + .await; + + self.init_commit_state_computer(epoch_state, payload_manager.clone(), execution_config); + self.start_quorum_store(quorum_store_builder); + (network_sender, Arc::new(payload_client), payload_manager) } async fn start_new_epoch_with_joltean( &mut self, epoch_state: EpochState, consensus_config: OnChainConsensusConfig, - execution_config: OnChainExecutionConfig, + network_sender: NetworkSender, + payload_client: Arc, + payload_manager: Arc, ) { match self.storage.start() { LivenessStorageData::FullRecoveryData(initial_data) => { - self.quorum_store_enabled = self.enable_quorum_store(&consensus_config); self.recovery_mode = false; self.start_round_manager( initial_data, epoch_state, consensus_config, - execution_config, + network_sender, + payload_client, + payload_manager, ) .await }, LivenessStorageData::PartialRecoveryData(ledger_data) => { self.recovery_mode = true; - self.start_recovery_manager(ledger_data, epoch_state).await + self.start_recovery_manager(ledger_data, epoch_state, network_sender) + .await }, } } + async fn start_new_epoch_with_dag( + &mut self, + epoch_state: EpochState, + onchain_consensus_config: OnChainConsensusConfig, + network_sender: NetworkSender, + payload_client: Arc, + payload_manager: Arc, + ) { + let epoch = epoch_state.epoch; + + let signer = new_signer_from_storage(self.author, &self.config.safety_rules.backend); + let commit_signer = Arc::new(DagCommitSigner::new(signer)); + + assert!( + onchain_consensus_config.decoupled_execution(), + "decoupled execution must be enabled" + ); + let (block_tx, reset_tx) = + self.spawn_decoupled_execution(commit_signer, epoch_state.verifier.clone()); + let state_computer = Arc::new(DagStateSyncComputer::new( + self.commit_state_computer.clone(), + reset_tx, + )); + + let onchain_dag_consensus_config = onchain_consensus_config.unwrap_dag_config_v1(); + let epoch_to_validators = self.extract_epoch_proposers( + &epoch_state, + onchain_dag_consensus_config.dag_ordering_causal_history_window as u32, + epoch_state.verifier.get_ordered_account_addresses(), + onchain_dag_consensus_config.dag_ordering_causal_history_window as u64, + ); + let dag_storage = Arc::new(StorageAdapter::new( + epoch, + epoch_to_validators, + self.storage.consensus_db(), + 
self.storage.aptos_db(), + )); + + let signer = new_signer_from_storage(self.author, &self.config.safety_rules.backend); + let network_sender_arc = Arc::new(network_sender); + + let bootstrapper = DagBootstrapper::new( + self.author, + signer, + Arc::new(epoch_state), + dag_storage, + network_sender_arc.clone(), + network_sender_arc.clone(), + network_sender_arc, + self.aptos_time_service.clone(), + payload_manager, + payload_client, + state_computer, + ); + + let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 10, None); + self.dag_rpc_tx = Some(dag_rpc_tx); + let (dag_shutdown_tx, dag_shutdown_rx) = oneshot::channel(); + self.dag_shutdown_tx = Some(dag_shutdown_tx); + + tokio::spawn(bootstrapper.start(dag_rpc_rx, block_tx, dag_shutdown_rx)); + } + fn enable_quorum_store(&mut self, onchain_config: &OnChainConsensusConfig) -> bool { fail_point!("consensus::start_new_epoch::disable_qs", |_| false); onchain_config.quorum_store_enabled() @@ -1167,15 +1316,18 @@ impl EpochManager
<P>
{ } }, IncomingRpcRequest::DAGRequest(request) => { - let dag_message = request.req; + let dag_msg_epoch = request.req.epoch; - if dag_message.epoch == self.epoch() { - // TODO: send message to DAG handler - Ok(()) + if dag_msg_epoch == self.epoch() { + if let Some(tx) = &self.dag_rpc_tx { + tx.push(peer_id, request) + } else { + Err(anyhow::anyhow!("DAG not bootstrapped")) + } } else { monitor!( "process_different_epoch_dag_rpc", - self.process_different_epoch(dag_message.epoch, peer_id) + self.process_different_epoch(dag_msg_epoch, peer_id) ) } }, diff --git a/consensus/src/payload_manager.rs b/consensus/src/payload_manager.rs index 935bc55af3e8ea..f3eeec7a9e195e 100644 --- a/consensus/src/payload_manager.rs +++ b/consensus/src/payload_manager.rs @@ -30,7 +30,7 @@ pub enum PayloadManager { } impl PayloadManager { - async fn request_transactions( + fn request_transactions( proofs: Vec, block_timestamp: u64, batch_store: &BatchStore, @@ -95,11 +95,7 @@ impl PayloadManager { } /// Called from consensus to pre-fetch the transaction behind the batches in the block. - pub async fn prefetch_payload_data(&self, block: &Block) { - let payload = match block.payload() { - Some(p) => p, - None => return, - }; + pub fn prefetch_payload_data(&self, payload: &Payload, timestamp: u64) { match self { PayloadManager::DirectMempool => {}, PayloadManager::InQuorumStore(batch_store, _) => match payload { @@ -107,10 +103,9 @@ impl PayloadManager { if proof_with_status.status.lock().is_none() { let receivers = PayloadManager::request_transactions( proof_with_status.proofs.clone(), - block.timestamp_usecs(), + timestamp, batch_store, - ) - .await; + ); proof_with_status .status .lock() @@ -167,8 +162,7 @@ impl PayloadManager { proof_with_data.proofs.clone(), block.timestamp_usecs(), batch_store, - ) - .await; + ); // Could not get all data so requested again proof_with_data .status @@ -184,8 +178,7 @@ impl PayloadManager { proof_with_data.proofs.clone(), block.timestamp_usecs(), batch_store, - ) - .await; + ); // Could not get all data so requested again proof_with_data .status diff --git a/consensus/src/twins/twins_node.rs b/consensus/src/twins/twins_node.rs index 91c1018788180b..01aec817093e6e 100644 --- a/consensus/src/twins/twins_node.rs +++ b/consensus/src/twins/twins_node.rs @@ -157,6 +157,7 @@ impl SMRNode { quorum_store_storage, reconfig_listener, bounded_executor, + aptos_time_service::TimeService::real(), ); let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver); diff --git a/testsuite/smoke-test/src/aptos_cli/validator.rs b/testsuite/smoke-test/src/aptos_cli/validator.rs index 6944ec5df2ef08..8fd6174f7a151a 100644 --- a/testsuite/smoke-test/src/aptos_cli/validator.rs +++ b/testsuite/smoke-test/src/aptos_cli/validator.rs @@ -198,6 +198,7 @@ async fn test_onchain_config_change() { let inner = match genesis_config.consensus_config.clone() { OnChainConsensusConfig::V1(inner) => inner, OnChainConsensusConfig::V2(inner) => inner, + _ => unimplemented!(), }; let leader_reputation_type = @@ -255,6 +256,7 @@ async fn test_onchain_config_change() { let inner = match current_consensus_config { OnChainConsensusConfig::V1(inner) => inner, OnChainConsensusConfig::V2(inner) => inner, + _ => unimplemented!() }; let leader_reputation_type = if let ProposerElectionType::LeaderReputation(leader_reputation_type) = diff --git a/testsuite/smoke-test/src/consensus/quorum_store_fault_tolerance.rs b/testsuite/smoke-test/src/consensus/quorum_store_fault_tolerance.rs index 
a9eea2cd2b55d3..890ddb59ff5088 100644 --- a/testsuite/smoke-test/src/consensus/quorum_store_fault_tolerance.rs +++ b/testsuite/smoke-test/src/consensus/quorum_store_fault_tolerance.rs @@ -132,6 +132,7 @@ async fn test_onchain_config_quorum_store_enabled_and_disabled() { let inner = match current_consensus_config { OnChainConsensusConfig::V1(inner) => inner, OnChainConsensusConfig::V2(_) => panic!("Unexpected V2 config"), + _ => unimplemented!() }; // Change to V2 let new_consensus_config = OnChainConsensusConfig::V2(ConsensusConfigV1 { ..inner }); @@ -153,6 +154,7 @@ async fn test_onchain_config_quorum_store_enabled_and_disabled() { let inner = match current_consensus_config { OnChainConsensusConfig::V1(_) => panic!("Unexpected V1 config"), OnChainConsensusConfig::V2(inner) => inner, + _ => unimplemented!() }; // Disaster rollback to V1 diff --git a/testsuite/testcases/src/quorum_store_onchain_enable_test.rs b/testsuite/testcases/src/quorum_store_onchain_enable_test.rs index 69ec6a2e478f69..946cb439daa5c7 100644 --- a/testsuite/testcases/src/quorum_store_onchain_enable_test.rs +++ b/testsuite/testcases/src/quorum_store_onchain_enable_test.rs @@ -69,6 +69,7 @@ impl NetworkLoadTest for QuorumStoreOnChainEnableTest { let inner = match current_consensus_config { OnChainConsensusConfig::V1(inner) => inner, OnChainConsensusConfig::V2(_) => panic!("Unexpected V2 config"), + _ => unimplemented!() }; // Change to V2 diff --git a/types/src/on_chain_config/consensus_config.rs b/types/src/on_chain_config/consensus_config.rs index dca9738816be44..32e56285a72859 100644 --- a/types/src/on_chain_config/consensus_config.rs +++ b/types/src/on_chain_config/consensus_config.rs @@ -13,6 +13,7 @@ use std::collections::HashMap; pub enum OnChainConsensusConfig { V1(ConsensusConfigV1), V2(ConsensusConfigV1), + DagV1(DagConsensusConfigV1), } /// The public interface that exposes all values with safe fallback. @@ -23,6 +24,7 @@ impl OnChainConsensusConfig { OnChainConsensusConfig::V1(config) | OnChainConsensusConfig::V2(config) => { config.exclude_round }, + _ => unimplemented!("method not supported"), } } @@ -38,6 +40,7 @@ impl OnChainConsensusConfig { OnChainConsensusConfig::V1(config) | OnChainConsensusConfig::V2(config) => { config.max_failed_authors_to_store }, + _ => unimplemented!("method not supported"), } } @@ -47,6 +50,7 @@ impl OnChainConsensusConfig { OnChainConsensusConfig::V1(config) | OnChainConsensusConfig::V2(config) => { &config.proposer_election_type }, + _ => unimplemented!("method not supported"), } } @@ -54,6 +58,21 @@ impl OnChainConsensusConfig { match &self { OnChainConsensusConfig::V1(_config) => false, OnChainConsensusConfig::V2(_config) => true, + OnChainConsensusConfig::DagV1(_) => false, + } + } + + pub fn is_dag_enabled(&self) -> bool { + match &self { + OnChainConsensusConfig::DagV1(_) => true, + _ => false, + } + } + + pub fn unwrap_dag_config_v1(&self) -> &DagConsensusConfigV1 { + match &self { + OnChainConsensusConfig::DagV1(config) => config, + _ => unreachable!("not a dag config"), } } } @@ -61,7 +80,8 @@ impl OnChainConsensusConfig { /// This is used when on-chain config is not initialized. 
impl Default for OnChainConsensusConfig { fn default() -> Self { - OnChainConsensusConfig::V2(ConsensusConfigV1::default()) + // OnChainConsensusConfig::V2(ConsensusConfigV1::default()) + OnChainConsensusConfig::DagV1(DagConsensusConfigV1::default()) } } @@ -189,6 +209,19 @@ pub struct ProposerAndVoterConfig { pub use_history_from_previous_epoch_max_count: u32, } +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub struct DagConsensusConfigV1 { + pub dag_ordering_causal_history_window: usize, +} + +impl Default for DagConsensusConfigV1 { + fn default() -> Self { + Self { + dag_ordering_causal_history_window: 1, + } + } +} + #[cfg(test)] mod test { use super::*;
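For reference, a minimal sketch (not part of this patch) of how a caller branches on the new DagV1 variant, mirroring the EpochManager::start_new_epoch changes above. The import path and the pipeline_for helper are assumptions for illustration only.

use aptos_types::on_chain_config::{DagConsensusConfigV1, OnChainConsensusConfig};

// Hypothetical helper: pick which consensus pipeline to bootstrap for an epoch.
fn pipeline_for(config: &OnChainConsensusConfig) -> &'static str {
    if config.is_dag_enabled() {
        // DagV1 carries its own knob; the causal-history window bounds how far back
        // anchor ordering and leader reputation look across rounds.
        let dag: &DagConsensusConfigV1 = config.unwrap_dag_config_v1();
        assert!(dag.dag_ordering_causal_history_window >= 1);
        "dag"
    } else {
        // V1/V2 keep the existing Jolteon path (V2 additionally enables quorum store).
        "jolteon"
    }
}

fn main() {
    // With this patch the default config is DagV1, so this prints "dag".
    println!("{}", pipeline_for(&OnChainConsensusConfig::default()));
}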