diff --git a/consensus/src/dag/adapter.rs b/consensus/src/dag/adapter.rs index 3ea876970490a9..d96eb6d293e483 100644 --- a/consensus/src/dag/adapter.rs +++ b/consensus/src/dag/adapter.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 use crate::{ - dag::{order_rule::Notifier, storage::DAGStorage, CertifiedNode}, + dag::{storage::DAGStorage, CertifiedNode}, experimental::buffer_manager::OrderedBlocks, }; use aptos_consensus_types::{ @@ -14,17 +14,32 @@ use aptos_executor_types::StateComputeResult; use aptos_logger::error; use aptos_types::{ aggregate_signature::AggregateSignature, + epoch_change::EpochChangeProof, ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, }; +use async_trait::async_trait; use futures_channel::mpsc::UnboundedSender; use std::sync::Arc; -pub struct BufferManagerAdapter { +#[async_trait] +pub trait Notifier: Send { + fn send_ordered_nodes( + &mut self, + ordered_nodes: Vec>, + failed_author: Vec<(Round, Author)>, + ) -> anyhow::Result<()>; + + async fn send_epoch_change(&self, proof: EpochChangeProof); + + async fn send_commit_proof(&self, ledger_info: LedgerInfoWithSignatures); +} + +pub struct NotificationAdapter { executor_channel: UnboundedSender, storage: Arc, } -impl BufferManagerAdapter { +impl NotificationAdapter { pub fn new( executor_channel: UnboundedSender, storage: Arc, @@ -36,8 +51,9 @@ impl BufferManagerAdapter { } } -impl Notifier for BufferManagerAdapter { - fn send( +#[async_trait] +impl Notifier for NotificationAdapter { + fn send_ordered_nodes( &mut self, ordered_nodes: Vec>, failed_author: Vec<(Round, Author)>, @@ -85,4 +101,12 @@ impl Notifier for BufferManagerAdapter { ), })?) } + + async fn send_epoch_change(&self, _proof: EpochChangeProof) { + todo!() + } + + async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) { + todo!() + } } diff --git a/consensus/src/dag/bootstrap.rs b/consensus/src/dag/bootstrap.rs index bfa1dcdbead517..e8a8674abb9435 100644 --- a/consensus/src/dag/bootstrap.rs +++ b/consensus/src/dag/bootstrap.rs @@ -13,7 +13,7 @@ use super::{ types::DAGMessage, }; use crate::{ - dag::adapter::BufferManagerAdapter, experimental::buffer_manager::OrderedBlocks, + dag::adapter::NotificationAdapter, experimental::buffer_manager::OrderedBlocks, network::IncomingDAGRequest, state_replication::PayloadClient, }; use aptos_channels::{aptos_channel, message_queues::QueueStyle}; @@ -47,7 +47,7 @@ pub fn bootstrap_dag( let current_round = latest_ledger_info.round(); let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded(); - let adapter = Box::new(BufferManagerAdapter::new(ordered_nodes_tx, storage.clone())); + let adapter = Box::new(NotificationAdapter::new(ordered_nodes_tx, storage.clone())); let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None); // A backoff policy that starts at 100ms and doubles each iteration. diff --git a/consensus/src/dag/dag_state_sync.rs b/consensus/src/dag/dag_state_sync.rs index b4c422d5b1ac12..c6c9b8bfea078d 100644 --- a/consensus/src/dag/dag_state_sync.rs +++ b/consensus/src/dag/dag_state_sync.rs @@ -1,6 +1,7 @@ // Copyright © Aptos Foundation use super::{ + adapter::Notifier, dag_fetcher::{DagFetcher, TDagFetcher}, dag_store::Dag, storage::DAGStorage, @@ -21,18 +22,15 @@ use std::sync::Arc; // TODO: move this to onchain config // TODO: temporarily setting DAG_WINDOW to 1 to maintain Shoal safety pub const DAG_WINDOW: u64 = 1; +pub const STATE_SYNC_WINDOW_MULTIPLIER: u64 = 30; #[async_trait] -pub trait TUpstreamNotifier: Send { - async fn send_epoch_change(&self, proof: EpochChangeProof); - - async fn send_commit_proof(&self, ledger_info: LedgerInfoWithSignatures); -} +pub trait TUpstreamNotifier: Send {} pub(super) struct StateSyncManager { epoch_state: Arc, network: Arc, - upstream_notifier: Arc, + notifier: Arc, time_service: TimeService, state_computer: Arc, storage: Arc, @@ -43,7 +41,7 @@ impl StateSyncManager { pub fn new( epoch_state: Arc, network: Arc, - upstream_notifier: Arc, + notifier: Arc, time_service: TimeService, state_computer: Arc, storage: Arc, @@ -52,7 +50,7 @@ impl StateSyncManager { Self { epoch_state, network, - upstream_notifier, + notifier, time_service, state_computer, storage, @@ -74,15 +72,15 @@ impl StateSyncManager { // if the anchor exists between ledger info round and highest ordered round // Note: ledger info round <= highest ordered round - if dag_reader.highest_committed_anchor_round() < ledger_info.commit_info().round() + if dag_reader + .highest_committed_anchor_round() + < ledger_info.commit_info().round() && dag_reader .highest_ordered_anchor_round() .unwrap_or_default() >= ledger_info.commit_info().round() { - self.upstream_notifier - .send_commit_proof(ledger_info.clone()) - .await + self.notifier.send_commit_proof(ledger_info.clone()).await } } @@ -90,15 +88,17 @@ impl StateSyncManager { /// This ensures that the block referred by the ledger info is not in buffer manager. pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool { let dag_reader = self.dag_store.read(); - // check whether if DAG order round is behind the given ledger info round - // (meaning consensus is behind) or + // check whether if DAG order round is behind the given ledger info round + // (meaning consensus is behind) or // the highest committed anchor round is 2*DAG_WINDOW behind the given ledger info round // (meaning execution is behind the DAG window) (dag_reader .highest_ordered_anchor_round() .unwrap_or_default() < li.commit_info().round()) - || dag_reader.highest_committed_anchor_round() + 2 * DAG_WINDOW + || dag_reader + .highest_committed_anchor_round() + + STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW < li.commit_info().round() } @@ -130,7 +130,7 @@ impl StateSyncManager { let commit_li = node.ledger_info(); if commit_li.ledger_info().ends_epoch() { - self.upstream_notifier + self.notifier .send_epoch_change(EpochChangeProof::new( vec![commit_li.clone()], /* more = */ false, diff --git a/consensus/src/dag/order_rule.rs b/consensus/src/dag/order_rule.rs index fe8d533e6c35e9..52687efd80cdef 100644 --- a/consensus/src/dag/order_rule.rs +++ b/consensus/src/dag/order_rule.rs @@ -1,25 +1,20 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use super::dag_store::NodeStatus; use crate::dag::{ - anchor_election::AnchorElection, dag_store::Dag, storage::DAGStorage, types::NodeMetadata, + adapter::Notifier, + anchor_election::AnchorElection, + dag_store::{Dag, NodeStatus}, + storage::DAGStorage, + types::NodeMetadata, CertifiedNode, }; -use aptos_consensus_types::common::{Author, Round}; +use aptos_consensus_types::common::Round; use aptos_infallible::RwLock; use aptos_logger::error; use aptos_types::{epoch_state::EpochState, ledger_info::LedgerInfo}; use std::sync::Arc; -pub trait Notifier: Send { - fn send( - &mut self, - ordered_nodes: Vec>, - failed_author: Vec<(Round, Author)>, - ) -> anyhow::Result<()>; -} - pub struct OrderRule { epoch_state: Arc, lowest_unordered_anchor_round: Round, @@ -184,7 +179,10 @@ impl OrderRule { if let Err(e) = self .storage .save_ordered_anchor_id(&anchor.id()) - .and_then(|_| self.notifier.send(ordered_nodes, failed_authors)) + .and_then(|_| { + self.notifier + .send_ordered_nodes(ordered_nodes, failed_authors) + }) { error!("Failed to send ordered nodes {:?}", e); } diff --git a/consensus/src/dag/tests/dag_driver_tests.rs b/consensus/src/dag/tests/dag_driver_tests.rs index 51d1d6f8f4a368..94e1f9036cd1c5 100644 --- a/consensus/src/dag/tests/dag_driver_tests.rs +++ b/consensus/src/dag/tests/dag_driver_tests.rs @@ -79,7 +79,7 @@ async fn test_certified_node_handler() { epoch_state.clone(), storage.clone(), 1, - 0 + 0, ))); let network_sender = Arc::new(MockNetworkSender {}); diff --git a/consensus/src/dag/tests/order_rule_tests.rs b/consensus/src/dag/tests/order_rule_tests.rs index 42b749c4ded924..4772366f345acf 100644 --- a/consensus/src/dag/tests/order_rule_tests.rs +++ b/consensus/src/dag/tests/order_rule_tests.rs @@ -5,10 +5,10 @@ use crate::{ dag::{ anchor_election::RoundRobinAnchorElection, dag_store::Dag, - order_rule::{Notifier, OrderRule}, + order_rule::OrderRule, tests::{dag_test::MockStorage, helpers::new_certified_node}, types::{NodeCertificate, NodeMetadata}, - CertifiedNode, + CertifiedNode, adapter::Notifier, }, test_utils::placeholder_ledger_info, }; @@ -16,8 +16,9 @@ use aptos_consensus_types::common::{Author, Round}; use aptos_infallible::{Mutex, RwLock}; use aptos_types::{ aggregate_signature::AggregateSignature, epoch_state::EpochState, - validator_verifier::random_validator_verifier, + validator_verifier::random_validator_verifier, epoch_change::EpochChangeProof, ledger_info::LedgerInfoWithSignatures, }; +use async_trait::async_trait; use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use proptest::prelude::*; use std::sync::Arc; @@ -121,14 +122,23 @@ pub struct TestNotifier { pub tx: UnboundedSender>>, } +#[async_trait] impl Notifier for TestNotifier { - fn send( + fn send_ordered_nodes( &mut self, ordered_nodes: Vec>, _failed_authors: Vec<(Round, Author)>, ) -> anyhow::Result<()> { Ok(self.tx.unbounded_send(ordered_nodes)?) } + + async fn send_epoch_change(&self, _proof: EpochChangeProof) { + unimplemented!() + } + + async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) { + unimplemented!() + } } fn create_order_rule(