Skip to content

Commit

Permalink
unify notifier and upstreamnotifier trait
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 6, 2023
1 parent 7b1f793 commit 7e41139
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 40 deletions.
34 changes: 29 additions & 5 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use crate::{
dag::{order_rule::Notifier, storage::DAGStorage, CertifiedNode},
dag::{storage::DAGStorage, CertifiedNode},
experimental::buffer_manager::OrderedBlocks,
};
use aptos_consensus_types::{
Expand All @@ -14,17 +14,32 @@ use aptos_executor_types::StateComputeResult;
use aptos_logger::error;
use aptos_types::{
aggregate_signature::AggregateSignature,
epoch_change::EpochChangeProof,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
};
use async_trait::async_trait;
use futures_channel::mpsc::UnboundedSender;
use std::sync::Arc;

pub struct BufferManagerAdapter {
#[async_trait]
pub trait Notifier: Send {
fn send_ordered_nodes(
&mut self,
ordered_nodes: Vec<Arc<CertifiedNode>>,
failed_author: Vec<(Round, Author)>,
) -> anyhow::Result<()>;

async fn send_epoch_change(&self, proof: EpochChangeProof);

async fn send_commit_proof(&self, ledger_info: LedgerInfoWithSignatures);
}

pub struct NotificationAdapter {
executor_channel: UnboundedSender<OrderedBlocks>,
storage: Arc<dyn DAGStorage>,
}

impl BufferManagerAdapter {
impl NotificationAdapter {
pub fn new(
executor_channel: UnboundedSender<OrderedBlocks>,
storage: Arc<dyn DAGStorage>,
Expand All @@ -36,8 +51,9 @@ impl BufferManagerAdapter {
}
}

impl Notifier for BufferManagerAdapter {
fn send(
#[async_trait]
impl Notifier for NotificationAdapter {
fn send_ordered_nodes(
&mut self,
ordered_nodes: Vec<Arc<CertifiedNode>>,
failed_author: Vec<(Round, Author)>,
Expand Down Expand Up @@ -85,4 +101,12 @@ impl Notifier for BufferManagerAdapter {
),
})?)
}

async fn send_epoch_change(&self, _proof: EpochChangeProof) {
todo!()
}

async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) {
todo!()
}
}
4 changes: 2 additions & 2 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use super::{
types::DAGMessage,
};
use crate::{
dag::adapter::BufferManagerAdapter, experimental::buffer_manager::OrderedBlocks,
dag::adapter::NotificationAdapter, experimental::buffer_manager::OrderedBlocks,
network::IncomingDAGRequest, state_replication::PayloadClient,
};
use aptos_channels::{aptos_channel, message_queues::QueueStyle};
Expand Down Expand Up @@ -47,7 +47,7 @@ pub fn bootstrap_dag(
let current_round = latest_ledger_info.round();

let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let adapter = Box::new(BufferManagerAdapter::new(ordered_nodes_tx, storage.clone()));
let adapter = Box::new(NotificationAdapter::new(ordered_nodes_tx, storage.clone()));
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);

// A backoff policy that starts at 100ms and doubles each iteration.
Expand Down
32 changes: 16 additions & 16 deletions consensus/src/dag/dag_state_sync.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright © Aptos Foundation

use super::{
adapter::Notifier,
dag_fetcher::{DagFetcher, TDagFetcher},
dag_store::Dag,
storage::DAGStorage,
Expand All @@ -21,18 +22,15 @@ use std::sync::Arc;
// TODO: move this to onchain config
// TODO: temporarily setting DAG_WINDOW to 1 to maintain Shoal safety
pub const DAG_WINDOW: u64 = 1;
pub const STATE_SYNC_WINDOW_MULTIPLIER: u64 = 30;

#[async_trait]
pub trait TUpstreamNotifier: Send {
async fn send_epoch_change(&self, proof: EpochChangeProof);

async fn send_commit_proof(&self, ledger_info: LedgerInfoWithSignatures);
}
pub trait TUpstreamNotifier: Send {}

pub(super) struct StateSyncManager {
epoch_state: Arc<EpochState>,
network: Arc<dyn TDAGNetworkSender>,
upstream_notifier: Arc<dyn TUpstreamNotifier>,
notifier: Arc<dyn Notifier>,
time_service: TimeService,
state_computer: Arc<dyn StateComputer>,
storage: Arc<dyn DAGStorage>,
Expand All @@ -43,7 +41,7 @@ impl StateSyncManager {
pub fn new(
epoch_state: Arc<EpochState>,
network: Arc<dyn TDAGNetworkSender>,
upstream_notifier: Arc<dyn TUpstreamNotifier>,
notifier: Arc<dyn Notifier>,
time_service: TimeService,
state_computer: Arc<dyn StateComputer>,
storage: Arc<dyn DAGStorage>,
Expand All @@ -52,7 +50,7 @@ impl StateSyncManager {
Self {
epoch_state,
network,
upstream_notifier,
notifier,
time_service,
state_computer,
storage,
Expand All @@ -74,31 +72,33 @@ impl StateSyncManager {

// if the anchor exists between ledger info round and highest ordered round
// Note: ledger info round <= highest ordered round
if dag_reader.highest_committed_anchor_round() < ledger_info.commit_info().round()
if dag_reader
.highest_committed_anchor_round()
< ledger_info.commit_info().round()
&& dag_reader
.highest_ordered_anchor_round()
.unwrap_or_default()
>= ledger_info.commit_info().round()
{
self.upstream_notifier
.send_commit_proof(ledger_info.clone())
.await
self.notifier.send_commit_proof(ledger_info.clone()).await
}
}

/// Check if we're far away from this ledger info and need to sync.
/// This ensures that the block referred by the ledger info is not in buffer manager.
pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool {
let dag_reader = self.dag_store.read();
// check whether if DAG order round is behind the given ledger info round
// (meaning consensus is behind) or
// check whether if DAG order round is behind the given ledger info round
// (meaning consensus is behind) or
// the highest committed anchor round is 2*DAG_WINDOW behind the given ledger info round
// (meaning execution is behind the DAG window)
(dag_reader
.highest_ordered_anchor_round()
.unwrap_or_default()
< li.commit_info().round())
|| dag_reader.highest_committed_anchor_round() + 2 * DAG_WINDOW
|| dag_reader
.highest_committed_anchor_round()
+ STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW
< li.commit_info().round()
}

Expand Down Expand Up @@ -130,7 +130,7 @@ impl StateSyncManager {
let commit_li = node.ledger_info();

if commit_li.ledger_info().ends_epoch() {
self.upstream_notifier
self.notifier
.send_epoch_change(EpochChangeProof::new(
vec![commit_li.clone()],
/* more = */ false,
Expand Down
22 changes: 10 additions & 12 deletions consensus/src/dag/order_rule.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::dag_store::NodeStatus;
use crate::dag::{
anchor_election::AnchorElection, dag_store::Dag, storage::DAGStorage, types::NodeMetadata,
adapter::Notifier,
anchor_election::AnchorElection,
dag_store::{Dag, NodeStatus},
storage::DAGStorage,
types::NodeMetadata,
CertifiedNode,
};
use aptos_consensus_types::common::{Author, Round};
use aptos_consensus_types::common::Round;
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_types::{epoch_state::EpochState, ledger_info::LedgerInfo};
use std::sync::Arc;

pub trait Notifier: Send {
fn send(
&mut self,
ordered_nodes: Vec<Arc<CertifiedNode>>,
failed_author: Vec<(Round, Author)>,
) -> anyhow::Result<()>;
}

pub struct OrderRule {
epoch_state: Arc<EpochState>,
lowest_unordered_anchor_round: Round,
Expand Down Expand Up @@ -184,7 +179,10 @@ impl OrderRule {
if let Err(e) = self
.storage
.save_ordered_anchor_id(&anchor.id())
.and_then(|_| self.notifier.send(ordered_nodes, failed_authors))
.and_then(|_| {
self.notifier
.send_ordered_nodes(ordered_nodes, failed_authors)
})
{
error!("Failed to send ordered nodes {:?}", e);
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ async fn test_certified_node_handler() {
epoch_state.clone(),
storage.clone(),
1,
0
0,
)));

let network_sender = Arc::new(MockNetworkSender {});
Expand Down
18 changes: 14 additions & 4 deletions consensus/src/dag/tests/order_rule_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,20 @@ use crate::{
dag::{
anchor_election::RoundRobinAnchorElection,
dag_store::Dag,
order_rule::{Notifier, OrderRule},
order_rule::OrderRule,
tests::{dag_test::MockStorage, helpers::new_certified_node},
types::{NodeCertificate, NodeMetadata},
CertifiedNode,
CertifiedNode, adapter::Notifier,
},
test_utils::placeholder_ledger_info,
};
use aptos_consensus_types::common::{Author, Round};
use aptos_infallible::{Mutex, RwLock};
use aptos_types::{
aggregate_signature::AggregateSignature, epoch_state::EpochState,
validator_verifier::random_validator_verifier,
validator_verifier::random_validator_verifier, epoch_change::EpochChangeProof, ledger_info::LedgerInfoWithSignatures,
};
use async_trait::async_trait;
use futures_channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use proptest::prelude::*;
use std::sync::Arc;
Expand Down Expand Up @@ -121,14 +122,23 @@ pub struct TestNotifier {
pub tx: UnboundedSender<Vec<Arc<CertifiedNode>>>,
}

#[async_trait]
impl Notifier for TestNotifier {
fn send(
fn send_ordered_nodes(
&mut self,
ordered_nodes: Vec<Arc<CertifiedNode>>,
_failed_authors: Vec<(Round, Author)>,
) -> anyhow::Result<()> {
Ok(self.tx.unbounded_send(ordered_nodes)?)
}

async fn send_epoch_change(&self, _proof: EpochChangeProof) {
unimplemented!()
}

async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) {
unimplemented!()
}
}

fn create_order_rule(
Expand Down

0 comments on commit 7e41139

Please sign in to comment.