From 16677d71e5e8aa1e3cd1ef9425a7610febed8440 Mon Sep 17 00:00:00 2001 From: Balaji Arun Date: Wed, 20 Sep 2023 15:13:51 -0700 Subject: [PATCH] [dag] introduce a ledger info provider trait --- consensus/src/dag/adapter.rs | 49 ++++++-- consensus/src/dag/bootstrap.rs | 111 +++++++++++-------- consensus/src/dag/dag_driver.rs | 9 +- consensus/src/dag/dag_handler.rs | 13 ++- consensus/src/dag/dag_state_sync.rs | 23 ++-- consensus/src/dag/dag_store.rs | 14 +-- consensus/src/dag/tests/dag_driver_tests.rs | 26 ++++- consensus/src/dag/tests/dag_test.rs | 2 +- consensus/src/dag/tests/integration_tests.rs | 4 +- consensus/src/dag/types.rs | 6 +- 10 files changed, 162 insertions(+), 95 deletions(-) diff --git a/consensus/src/dag/adapter.rs b/consensus/src/dag/adapter.rs index 6aab8c2d60bc1e..05bf0f30e79004 100644 --- a/consensus/src/dag/adapter.rs +++ b/consensus/src/dag/adapter.rs @@ -27,8 +27,7 @@ use aptos_types::{ aggregate_signature::AggregateSignature, block_info::BlockInfo, epoch_change::EpochChangeProof, - epoch_state::EpochState, - ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, + ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, epoch_state::EpochState, }; use async_trait::async_trait; use futures_channel::mpsc::UnboundedSender; @@ -80,28 +79,28 @@ pub(crate) fn compute_initial_block_and_ledger_info( } } -pub struct OrderedNotifierAdapter { +pub(super) struct OrderedNotifierAdapter { executor_channel: UnboundedSender, storage: Arc, parent_block_info: Arc>, epoch_state: Arc, - highest_committed_anchor_round: Arc>, + ledger_info_provider: Arc>, } impl OrderedNotifierAdapter { - pub fn new( + pub(super) fn new( executor_channel: UnboundedSender, storage: Arc, epoch_state: Arc, parent_block_info: BlockInfo, - highest_committed_anchor_round: Arc>, + ledger_info_provider: Arc>, ) -> Self { Self { executor_channel, storage, parent_block_info: Arc::new(RwLock::new(parent_block_info)), epoch_state, - highest_committed_anchor_round, + ledger_info_provider, } } } @@ -153,7 +152,7 @@ impl OrderedNotifier for OrderedNotifierAdapter { ); let block_info = block.block_info(); let storage = self.storage.clone(); - let highest_committed_anchor_round = self.highest_committed_anchor_round.clone(); + let ledger_info_provider = self.ledger_info_provider.clone(); *self.parent_block_info.write() = block_info.clone(); Ok(self.executor_channel.unbounded_send(OrderedBlocks { ordered_blocks: vec![block], @@ -164,7 +163,9 @@ impl OrderedNotifier for OrderedNotifierAdapter { callback: Box::new( move |_committed_blocks: &[Arc], commit_decision: LedgerInfoWithSignatures| { - *highest_committed_anchor_round.write() = commit_decision.commit_info().round(); + ledger_info_provider + .write() + .notify_commit_proof(commit_decision); // TODO: this doesn't really work since not every block will trigger a callback, // we need to update the buffer manager to invoke all callbacks instead of only last one @@ -330,3 +331,33 @@ impl DAGStorage for StorageAdapter { self.aptos_db.get_latest_ledger_info() } } + +pub(crate) trait TLedgerInfoProvider: Send + Sync { + fn get_latest_ledger_info(&self) -> LedgerInfoWithSignatures; + + fn get_highest_committed_anchor_round(&self) -> Round; +} + +pub(super) struct LedgerInfoProvider { + latest_ledger_info: LedgerInfoWithSignatures, +} + +impl LedgerInfoProvider { + pub(super) fn new(latest_ledger_info: LedgerInfoWithSignatures) -> Self { + Self { latest_ledger_info } + } + + pub(super) fn notify_commit_proof(&mut self, ledger_info: LedgerInfoWithSignatures) { + self.latest_ledger_info = ledger_info; + } +} + +impl TLedgerInfoProvider for RwLock { + fn get_latest_ledger_info(&self) -> LedgerInfoWithSignatures { + self.read().latest_ledger_info.clone() + } + + fn get_highest_committed_anchor_round(&self) -> Round { + self.read().latest_ledger_info.ledger_info().round() + } +} diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index 0f8e983b7a1a0b..2091f5b383a70f 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -1,22 +1,22 @@ // Copyright © Aptos Foundation use super::{ - adapter::{OrderedNotifier, OrderedNotifierAdapter}, + adapter::{OrderedNotifier, OrderedNotifierAdapter, TLedgerInfoProvider}, anchor_election::RoundRobinAnchorElection, dag_driver::DagDriver, dag_fetcher::{DagFetcher, DagFetcherService, FetchRequestHandler}, dag_handler::NetworkHandler, dag_network::TDAGNetworkSender, - dag_state_sync::{DagStateSynchronizer, StateSyncStatus, StateSyncTrigger, DAG_WINDOW}, + dag_state_sync::{DagStateSynchronizer, StateSyncTrigger, DAG_WINDOW}, dag_store::Dag, order_rule::OrderRule, rb_handler::NodeBroadcastHandler, storage::DAGStorage, - types::DAGMessage, + types::{CertifiedNodeMessage, DAGMessage}, ProofNotifier, }; use crate::{ - dag::adapter::compute_initial_block_and_ledger_info, + dag::adapter::{compute_initial_block_and_ledger_info, LedgerInfoProvider}, experimental::buffer_manager::OrderedBlocks, network::IncomingDAGRequest, state_replication::{PayloadClient, StateComputer}, @@ -25,17 +25,12 @@ use aptos_channels::{ aptos_channel::{self, Receiver}, message_queues::QueueStyle, }; -use aptos_consensus_types::common::Author; -use aptos_crypto::HashValue; +use aptos_consensus_types::common::{Author, Round}; use aptos_infallible::RwLock; -use aptos_logger::error; +use aptos_logger::{debug, error}; use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; use aptos_types::{ - aggregate_signature::AggregateSignature, - block_info::BlockInfo, - epoch_state::EpochState, - ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, - validator_signer::ValidatorSigner, + epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner, }; use futures_channel::{ mpsc::{UnboundedReceiver, UnboundedSender}, @@ -87,14 +82,23 @@ impl DagBootstrapper { fn bootstrap_dag_store( &self, - latest_ledger_info: LedgerInfo, + initial_ledger_info: LedgerInfo, notifier: Arc, + dag_window_size_config: usize, ) -> (Arc>, OrderRule) { + let initial_round = if initial_ledger_info.round() <= dag_window_size_config as Round { + 1 + } else { + initial_ledger_info + .round() + .saturating_sub(dag_window_size_config as Round) + }; + let dag = Arc::new(RwLock::new(Dag::new( self.epoch_state.clone(), self.storage.clone(), - latest_ledger_info.round(), - DAG_WINDOW, + initial_round, + dag_window_size_config, ))); let validators = self.epoch_state.verifier.get_ordered_account_addresses(); @@ -102,7 +106,7 @@ impl DagBootstrapper { let order_rule = OrderRule::new( self.epoch_state.clone(), - latest_ledger_info, + initial_ledger_info, dag.clone(), anchor_election, notifier, @@ -117,6 +121,7 @@ impl DagBootstrapper { dag: Arc>, order_rule: OrderRule, state_sync_trigger: StateSyncTrigger, + ledger_info_provider: Arc, ) -> (NetworkHandler, DagFetcherService) { let validators = self.epoch_state.verifier.get_ordered_account_addresses(); @@ -150,6 +155,7 @@ impl DagBootstrapper { self.storage.clone(), order_rule, fetch_requester.clone(), + ledger_info_provider, ); let rb_handler = NodeBroadcastHandler::new( dag.clone(), @@ -186,35 +192,45 @@ impl DagBootstrapper { self.storage.clone(), ); - loop { + loop { let ledger_info_from_storage = self .storage .get_latest_ledger_info() .expect("latest ledger info must exist"); let (parent_block_info, ledger_info) = compute_initial_block_and_ledger_info(ledger_info_from_storage); - let highest_committed_anchor_round = - Arc::new(RwLock::new(ledger_info.commit_info().round())); + let ledger_info_provider = Arc::new(RwLock::new(LedgerInfoProvider::new(ledger_info))); let adapter = Arc::new(OrderedNotifierAdapter::new( ordered_nodes_tx.clone(), self.storage.clone(), self.epoch_state.clone(), parent_block_info, - highest_committed_anchor_round.clone(), + ledger_info_provider.clone(), )); - let (dag_store, order_rule) = - self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone()); + let (dag_store, order_rule) = self.bootstrap_dag_store( + ledger_info_provider + .get_latest_ledger_info() + .ledger_info() + .clone(), + adapter.clone(), + DAG_WINDOW, + ); let state_sync_trigger = StateSyncTrigger::new( - highest_committed_anchor_round.clone(), + self.epoch_state.clone(), + ledger_info_provider.clone(), dag_store.clone(), self.proof_notifier.clone(), ); - let (handler, fetch_service) = - self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); + let (handler, fetch_service) = self.bootstrap_components( + dag_store.clone(), + order_rule, + state_sync_trigger, + ledger_info_provider.clone(), + ); let df_handle = tokio::spawn(fetch_service.start()); @@ -226,28 +242,26 @@ impl DagBootstrapper { let _ = df_handle.await; return; }, - sync_status = handler.run(&mut dag_rpc_rx) => { + certified_node_msg = handler.run(&mut dag_rpc_rx) => { + debug!("state sync notification received. {:?}", certified_node_msg); df_handle.abort(); let _ = df_handle.await; - match sync_status { - StateSyncStatus::NeedsSync(certified_node_msg) => { - let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone()); + let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone()); - let highest_committed_anchor_round = { *highest_committed_anchor_round.clone().read() }; - if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone(), highest_committed_anchor_round).await { - error!(error = ?e, "unable to sync"); - } + let highest_committed_anchor_round = ledger_info_provider.get_highest_committed_anchor_round(); + let sync_future = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone(), highest_committed_anchor_round); + + select! { + Err(e) = sync_future => { + error!(error = ?e, "unable to sync"); }, - StateSyncStatus::EpochEnds => { - // Wait for epoch manager to signal shutdown - _ = shutdown_rx.await; + Ok(_) = &mut shutdown_rx => { return; - }, - _ => unreachable!() + } } - + debug!("going to rebootstrap."); } } } @@ -267,7 +281,7 @@ pub(super) fn bootstrap_dag_for_test( payload_client: Arc, state_computer: Arc, ) -> ( - JoinHandle, + JoinHandle, JoinHandle<()>, aptos_channel::Sender, UnboundedReceiver, @@ -290,7 +304,7 @@ pub(super) fn bootstrap_dag_for_test( .expect("latest ledger info must exist"); let (parent_block_info, ledger_info) = compute_initial_block_and_ledger_info(ledger_info_from_storage); - let highest_committed_anchor_round = Arc::new(RwLock::new(ledger_info.commit_info().round())); + let ledger_info_provider = Arc::new(RwLock::new(LedgerInfoProvider::new(ledger_info))); let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); let adapter = Arc::new(OrderedNotifierAdapter::new( @@ -298,21 +312,26 @@ pub(super) fn bootstrap_dag_for_test( storage.clone(), epoch_state.clone(), parent_block_info, - highest_committed_anchor_round.clone(), + ledger_info_provider.clone(), )); let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); let (dag_store, order_rule) = - bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone()); + bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone(), DAG_WINDOW); let state_sync_trigger = StateSyncTrigger::new( - highest_committed_anchor_round, + epoch_state, + ledger_info_provider.clone(), dag_store.clone(), proof_notifier.clone(), ); - let (handler, fetch_service) = - bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger); + let (handler, fetch_service) = bootstraper.bootstrap_components( + dag_store.clone(), + order_rule, + state_sync_trigger, + ledger_info_provider, + ); let dh_handle = tokio::spawn(async move { let mut dag_rpc_rx = dag_rpc_rx; diff --git a/consensus/src/dag/dag_driver.rs b/consensus/src/dag/dag_driver.rs index fecf50bf620c63..a2a4b230b367b3 100644 --- a/consensus/src/dag/dag_driver.rs +++ b/consensus/src/dag/dag_driver.rs @@ -2,6 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use super::{ + adapter::TLedgerInfoProvider, dag_fetcher::FetchRequester, order_rule::OrderRule, storage::DAGStorage, @@ -51,6 +52,7 @@ pub(crate) struct DagDriver { storage: Arc, order_rule: OrderRule, fetch_requester: Arc, + ledger_info_provider: Arc, } impl DagDriver { @@ -64,6 +66,7 @@ impl DagDriver { storage: Arc, order_rule: OrderRule, fetch_requester: Arc, + ledger_info_provider: Arc, ) -> Self { let pending_node = storage .get_pending_node() @@ -91,6 +94,7 @@ impl DagDriver { storage, order_rule, fetch_requester, + ledger_info_provider, }; // If we were broadcasting the node for the round already, resume it @@ -184,10 +188,7 @@ impl DagDriver { let signature_builder = SignatureBuilder::new(node.metadata().clone(), self.epoch_state.clone()); let cert_ack_set = CertificateAckState::new(self.epoch_state.verifier.len()); - let latest_ledger_info = self - .storage - .get_latest_ledger_info() - .expect("latest ledger info must exist"); + let latest_ledger_info = self.ledger_info_provider.get_latest_ledger_info(); let task = self .reliable_broadcast .broadcast(node.clone(), signature_builder) diff --git a/consensus/src/dag/dag_handler.rs b/consensus/src/dag/dag_handler.rs index 0120409412106b..f111e16ef6b070 100644 --- a/consensus/src/dag/dag_handler.rs +++ b/consensus/src/dag/dag_handler.rs @@ -4,7 +4,7 @@ use super::{ dag_driver::DagDriver, dag_fetcher::{FetchRequestHandler, FetchWaiter}, dag_state_sync::{StateSyncStatus, StateSyncTrigger}, - types::TDAGMessage, + types::{TDAGMessage, CertifiedNodeMessage}, CertifiedNode, Node, }; use crate::{ @@ -56,15 +56,15 @@ impl NetworkHandler { pub async fn run( mut self, dag_rpc_rx: &mut aptos_channel::Receiver, - ) -> StateSyncStatus { + ) -> CertifiedNodeMessage { // TODO(ibalajiarun): clean up Reliable Broadcast storage periodically. loop { select! { Some(msg) = dag_rpc_rx.next() => { match self.process_rpc(msg).await { Ok(sync_status) => { - if matches!(sync_status, StateSyncStatus::NeedsSync(_) | StateSyncStatus::EpochEnds) { - return sync_status; + if let StateSyncStatus::NeedsSync(certified_node_msg) = sync_status { + return certified_node_msg; } }, Err(e) => { @@ -136,8 +136,7 @@ impl NetworkHandler { .process(certified_node_msg.certified_node()) .await .map(|r| r.into()), - status @ (StateSyncStatus::NeedsSync(_) - | StateSyncStatus::EpochEnds) => return Ok(status), + status @ StateSyncStatus::NeedsSync(_) => return Ok(status), _ => unreachable!(), } }, @@ -150,6 +149,8 @@ impl NetworkHandler { } }; + debug!("responding to rpc: {:?}", response); + let response = response .and_then(|response_msg| { rpc_request diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs index f7e11988587cea..a5bfbfaa14da85 100644 --- a/consensus/src/dag/dag_state_sync.rs +++ b/consensus/src/dag/dag_state_sync.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation use super::{ + adapter::TLedgerInfoProvider, dag_fetcher::TDagFetcher, dag_store::Dag, storage::DAGStorage, @@ -32,7 +33,7 @@ pub enum StateSyncStatus { pub(super) struct StateSyncTrigger { epoch_state: Arc, - highest_committed_anchor_round: Arc>, + ledger_info_provider: Arc, dag_store: Arc>, proof_notifier: Arc, } @@ -40,13 +41,13 @@ pub(super) struct StateSyncTrigger { impl StateSyncTrigger { pub(super) fn new( epoch_state: Arc, - highest_committed_anchor_round: Arc>, + ledger_info_provider: Arc, dag_store: Arc>, proof_notifier: Arc, ) -> Self { Self { epoch_state, - highest_committed_anchor_round, + ledger_info_provider, dag_store, proof_notifier, } @@ -98,7 +99,10 @@ impl StateSyncTrigger { async fn notify_commit_proof(&self, ledger_info: &LedgerInfoWithSignatures) { // if the anchor exists between ledger info round and highest ordered round // Note: ledger info round <= highest ordered round - if *self.highest_committed_anchor_round.read() < ledger_info.commit_info().round() + if self + .ledger_info_provider + .get_highest_committed_anchor_round() + < ledger_info.commit_info().round() && self .dag_store .read() @@ -115,7 +119,11 @@ impl StateSyncTrigger { /// Check if we're far away from this ledger info and need to sync. /// This ensures that the block referred by the ledger info is not in buffer manager. fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { - if li.commit_info().round() <= self.dag_store.read().highest_committed_anchor_round() { + if li.commit_info().round() + <= self + .ledger_info_provider + .get_highest_committed_anchor_round() + { return false; } @@ -127,7 +135,9 @@ impl StateSyncTrigger { dag_reader .highest_ordered_anchor_round() .is_some_and(|r| r < li.commit_info().round()) - || *self.highest_committed_anchor_round.read() + || self + .ledger_info_provider + .get_highest_committed_anchor_round() + ((STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW) as Round) < li.commit_info().round() } @@ -191,7 +201,6 @@ impl DagStateSynchronizer { self.epoch_state.clone(), self.storage.clone(), start_round, - commit_li.commit_info().round(), ))); let bitmask = { sync_dag_store.read().bitmask(target_round) }; let request = RemoteFetchRequest::new( diff --git a/consensus/src/dag/dag_store.rs b/consensus/src/dag/dag_store.rs index 60910b1bb4802f..35f51e0d5d3c84 100644 --- a/consensus/src/dag/dag_store.rs +++ b/consensus/src/dag/dag_store.rs @@ -43,16 +43,14 @@ pub struct Dag { storage: Arc, initial_round: Round, epoch_state: Arc, - - highest_committed_anchor_round: Round, } impl Dag { pub fn new( epoch_state: Arc, storage: Arc, - highest_committed_anchor_round: Round, - dag_window_size_config: usize, + initial_round: Round, + _dag_window_size_config: usize, ) -> Self { let epoch = epoch_state.epoch; let author_to_index = epoch_state.verifier.address_to_validator_index().clone(); @@ -78,18 +76,12 @@ impl Dag { if let Err(e) = storage.delete_certified_nodes(expired) { error!("Error deleting expired nodes: {:?}", e); } - let initial_round = if highest_committed_anchor_round <= dag_window_size_config as Round { - 1 - } else { - highest_committed_anchor_round.saturating_sub(dag_window_size_config as Round) - }; Self { nodes_by_round, author_to_index, storage, initial_round, epoch_state, - highest_committed_anchor_round, } } @@ -97,7 +89,6 @@ impl Dag { epoch_state: Arc, storage: Arc, initial_round: Round, - highest_committed_anchor_round: Round, ) -> Self { let author_to_index = epoch_state.verifier.address_to_validator_index().clone(); let nodes_by_round = BTreeMap::new(); @@ -107,7 +98,6 @@ impl Dag { storage, initial_round, epoch_state, - highest_committed_anchor_round, } } diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 3a1f55a55b6cf8..5b489b404889f4 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -2,6 +2,7 @@ use crate::{ dag::{ + adapter::TLedgerInfoProvider, anchor_election::RoundRobinAnchorElection, dag_driver::{DagDriver, DagDriverError}, dag_fetcher::DagFetcherService, @@ -17,13 +18,13 @@ use crate::{ }, test_utils::MockPayloadManager, }; -use aptos_consensus_types::common::Author; +use aptos_consensus_types::common::{Author, Round}; use aptos_infallible::RwLock; use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast}; use aptos_time_service::TimeService; use aptos_types::{ epoch_state::EpochState, - ledger_info::{generate_ledger_info_with_sig, LedgerInfo}, + ledger_info::{generate_ledger_info_with_sig, LedgerInfo, LedgerInfoWithSignatures}, validator_verifier::random_validator_verifier, }; use async_trait::async_trait; @@ -70,6 +71,20 @@ impl TDAGNetworkSender for MockNetworkSender { } } +struct MockLedgerInfoProvider { + latest_ledger_info: LedgerInfoWithSignatures, +} + +impl TLedgerInfoProvider for MockLedgerInfoProvider { + fn get_latest_ledger_info(&self) -> LedgerInfoWithSignatures { + self.latest_ledger_info.clone() + } + + fn get_highest_committed_anchor_round(&self) -> Round { + self.latest_ledger_info.ledger_info().round() + } +} + #[tokio::test] async fn test_certified_node_handler() { let (signers, validator_verifier) = random_validator_verifier(4, None, false); @@ -80,7 +95,7 @@ async fn test_certified_node_handler() { let mock_ledger_info = LedgerInfo::mock_genesis(None); let mock_ledger_info = generate_ledger_info_with_sig(&signers, mock_ledger_info); - let storage = Arc::new(MockStorage::new_with_ledger_info(mock_ledger_info)); + let storage = Arc::new(MockStorage::new_with_ledger_info(mock_ledger_info.clone())); let dag = Arc::new(RwLock::new(Dag::new( epoch_state.clone(), storage.clone(), @@ -116,6 +131,10 @@ async fn test_certified_node_handler() { ); let fetch_requester = Arc::new(fetch_requester); + let ledger_info_provider = Arc::new(MockLedgerInfoProvider { + latest_ledger_info: mock_ledger_info, + }); + let mut driver = DagDriver::new( signers[0].author(), epoch_state, @@ -126,6 +145,7 @@ async fn test_certified_node_handler() { storage, order_rule, fetch_requester, + ledger_info_provider, ); let first_round_node = new_certified_node(1, signers[0].author(), vec![]); diff --git a/consensus/src/dag/tests/dag_test.rs b/consensus/src/dag/tests/dag_test.rs index 5cf2bd8382b03c..98e4b1e3edc49c 100644 --- a/consensus/src/dag/tests/dag_test.rs +++ b/consensus/src/dag/tests/dag_test.rs @@ -116,7 +116,7 @@ fn setup() -> (Vec, Arc, Dag, Arc) { verifier: validator_verifier, }); let storage = Arc::new(MockStorage::new()); - let dag = Dag::new(epoch_state.clone(), storage.clone(), 0, DAG_WINDOW); + let dag = Dag::new(epoch_state.clone(), storage.clone(), 1, DAG_WINDOW); (signers, epoch_state, dag, storage) } diff --git a/consensus/src/dag/tests/integration_tests.rs b/consensus/src/dag/tests/integration_tests.rs index 54ad393de5c3f9..429c8fce783cb3 100644 --- a/consensus/src/dag/tests/integration_tests.rs +++ b/consensus/src/dag/tests/integration_tests.rs @@ -2,7 +2,7 @@ use super::dag_test; use crate::{ - dag::{bootstrap::bootstrap_dag_for_test, dag_state_sync::StateSyncStatus}, + dag::{bootstrap::bootstrap_dag_for_test, types::CertifiedNodeMessage}, experimental::buffer_manager::OrderedBlocks, network::{IncomingDAGRequest, NetworkSender}, network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC}, @@ -41,7 +41,7 @@ use std::sync::Arc; use tokio::task::JoinHandle; struct DagBootstrapUnit { - nh_task_handle: JoinHandle, + nh_task_handle: JoinHandle, df_task_handle: JoinHandle<()>, dag_rpc_tx: aptos_channel::Sender, network_events: diff --git a/consensus/src/dag/types.rs b/consensus/src/dag/types.rs index 927908f83564dc..1e4fb7fccfb971 100644 --- a/consensus/src/dag/types.rs +++ b/consensus/src/dag/types.rs @@ -450,11 +450,7 @@ impl Deref for CertifiedNodeMessage { impl TDAGMessage for CertifiedNodeMessage { fn verify(&self, verifier: &ValidatorVerifier) -> anyhow::Result<()> { - self.inner.verify(verifier)?; - - self.ledger_info - .verify_signatures(verifier) - .map_err(|e| anyhow::anyhow!("unable to verify ledger info: {}", e)) + self.inner.verify(verifier) } }