Skip to content

Commit

Permalink
[dag] introduce a ledger info provider trait
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 21, 2023
1 parent 29c998b commit 16677d7
Show file tree
Hide file tree
Showing 10 changed files with 162 additions and 95 deletions.
49 changes: 40 additions & 9 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ use aptos_types::{
aggregate_signature::AggregateSignature,
block_info::BlockInfo,
epoch_change::EpochChangeProof,
epoch_state::EpochState,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
ledger_info::{LedgerInfo, LedgerInfoWithSignatures}, epoch_state::EpochState,
};
use async_trait::async_trait;
use futures_channel::mpsc::UnboundedSender;
Expand Down Expand Up @@ -80,28 +79,28 @@ pub(crate) fn compute_initial_block_and_ledger_info(
}
}

pub struct OrderedNotifierAdapter {
pub(super) struct OrderedNotifierAdapter {
executor_channel: UnboundedSender<OrderedBlocks>,
storage: Arc<dyn DAGStorage>,
parent_block_info: Arc<RwLock<BlockInfo>>,
epoch_state: Arc<EpochState>,
highest_committed_anchor_round: Arc<RwLock<Round>>,
ledger_info_provider: Arc<RwLock<LedgerInfoProvider>>,
}

impl OrderedNotifierAdapter {
pub fn new(
pub(super) fn new(
executor_channel: UnboundedSender<OrderedBlocks>,
storage: Arc<dyn DAGStorage>,
epoch_state: Arc<EpochState>,
parent_block_info: BlockInfo,
highest_committed_anchor_round: Arc<RwLock<Round>>,
ledger_info_provider: Arc<RwLock<LedgerInfoProvider>>,
) -> Self {
Self {
executor_channel,
storage,
parent_block_info: Arc::new(RwLock::new(parent_block_info)),
epoch_state,
highest_committed_anchor_round,
ledger_info_provider,
}
}
}
Expand Down Expand Up @@ -153,7 +152,7 @@ impl OrderedNotifier for OrderedNotifierAdapter {
);
let block_info = block.block_info();
let storage = self.storage.clone();
let highest_committed_anchor_round = self.highest_committed_anchor_round.clone();
let ledger_info_provider = self.ledger_info_provider.clone();
*self.parent_block_info.write() = block_info.clone();
Ok(self.executor_channel.unbounded_send(OrderedBlocks {
ordered_blocks: vec![block],
Expand All @@ -164,7 +163,9 @@ impl OrderedNotifier for OrderedNotifierAdapter {
callback: Box::new(
move |_committed_blocks: &[Arc<ExecutedBlock>],
commit_decision: LedgerInfoWithSignatures| {
*highest_committed_anchor_round.write() = commit_decision.commit_info().round();
ledger_info_provider
.write()
.notify_commit_proof(commit_decision);

// TODO: this doesn't really work since not every block will trigger a callback,
// we need to update the buffer manager to invoke all callbacks instead of only last one
Expand Down Expand Up @@ -330,3 +331,33 @@ impl DAGStorage for StorageAdapter {
self.aptos_db.get_latest_ledger_info()
}
}

pub(crate) trait TLedgerInfoProvider: Send + Sync {
fn get_latest_ledger_info(&self) -> LedgerInfoWithSignatures;

fn get_highest_committed_anchor_round(&self) -> Round;
}

pub(super) struct LedgerInfoProvider {
latest_ledger_info: LedgerInfoWithSignatures,
}

impl LedgerInfoProvider {
pub(super) fn new(latest_ledger_info: LedgerInfoWithSignatures) -> Self {
Self { latest_ledger_info }
}

pub(super) fn notify_commit_proof(&mut self, ledger_info: LedgerInfoWithSignatures) {
self.latest_ledger_info = ledger_info;
}
}

impl TLedgerInfoProvider for RwLock<LedgerInfoProvider> {
fn get_latest_ledger_info(&self) -> LedgerInfoWithSignatures {
self.read().latest_ledger_info.clone()
}

fn get_highest_committed_anchor_round(&self) -> Round {
self.read().latest_ledger_info.ledger_info().round()
}
}
111 changes: 65 additions & 46 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
// Copyright © Aptos Foundation

use super::{
adapter::{OrderedNotifier, OrderedNotifierAdapter},
adapter::{OrderedNotifier, OrderedNotifierAdapter, TLedgerInfoProvider},
anchor_election::RoundRobinAnchorElection,
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, DagFetcherService, FetchRequestHandler},
dag_handler::NetworkHandler,
dag_network::TDAGNetworkSender,
dag_state_sync::{DagStateSynchronizer, StateSyncStatus, StateSyncTrigger, DAG_WINDOW},
dag_state_sync::{DagStateSynchronizer, StateSyncTrigger, DAG_WINDOW},
dag_store::Dag,
order_rule::OrderRule,
rb_handler::NodeBroadcastHandler,
storage::DAGStorage,
types::DAGMessage,
types::{CertifiedNodeMessage, DAGMessage},
ProofNotifier,
};
use crate::{
dag::adapter::compute_initial_block_and_ledger_info,
dag::adapter::{compute_initial_block_and_ledger_info, LedgerInfoProvider},
experimental::buffer_manager::OrderedBlocks,
network::IncomingDAGRequest,
state_replication::{PayloadClient, StateComputer},
Expand All @@ -25,17 +25,12 @@ use aptos_channels::{
aptos_channel::{self, Receiver},
message_queues::QueueStyle,
};
use aptos_consensus_types::common::Author;
use aptos_crypto::HashValue;
use aptos_consensus_types::common::{Author, Round};
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_logger::{debug, error};
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{
aggregate_signature::AggregateSignature,
block_info::BlockInfo,
epoch_state::EpochState,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
validator_signer::ValidatorSigner,
epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
};
use futures_channel::{
mpsc::{UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -87,22 +82,31 @@ impl DagBootstrapper {

fn bootstrap_dag_store(
&self,
latest_ledger_info: LedgerInfo,
initial_ledger_info: LedgerInfo,
notifier: Arc<dyn OrderedNotifier>,
dag_window_size_config: usize,
) -> (Arc<RwLock<Dag>>, OrderRule) {
let initial_round = if initial_ledger_info.round() <= dag_window_size_config as Round {
1
} else {
initial_ledger_info
.round()
.saturating_sub(dag_window_size_config as Round)
};

let dag = Arc::new(RwLock::new(Dag::new(
self.epoch_state.clone(),
self.storage.clone(),
latest_ledger_info.round(),
DAG_WINDOW,
initial_round,
dag_window_size_config,
)));

let validators = self.epoch_state.verifier.get_ordered_account_addresses();
let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));

let order_rule = OrderRule::new(
self.epoch_state.clone(),
latest_ledger_info,
initial_ledger_info,
dag.clone(),
anchor_election,
notifier,
Expand All @@ -117,6 +121,7 @@ impl DagBootstrapper {
dag: Arc<RwLock<Dag>>,
order_rule: OrderRule,
state_sync_trigger: StateSyncTrigger,
ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
) -> (NetworkHandler, DagFetcherService) {
let validators = self.epoch_state.verifier.get_ordered_account_addresses();

Expand Down Expand Up @@ -150,6 +155,7 @@ impl DagBootstrapper {
self.storage.clone(),
order_rule,
fetch_requester.clone(),
ledger_info_provider,
);
let rb_handler = NodeBroadcastHandler::new(
dag.clone(),
Expand Down Expand Up @@ -186,35 +192,45 @@ impl DagBootstrapper {
self.storage.clone(),
);

loop {
loop {
let ledger_info_from_storage = self
.storage
.get_latest_ledger_info()
.expect("latest ledger info must exist");
let (parent_block_info, ledger_info) =
compute_initial_block_and_ledger_info(ledger_info_from_storage);
let highest_committed_anchor_round =
Arc::new(RwLock::new(ledger_info.commit_info().round()));
let ledger_info_provider = Arc::new(RwLock::new(LedgerInfoProvider::new(ledger_info)));

let adapter = Arc::new(OrderedNotifierAdapter::new(
ordered_nodes_tx.clone(),
self.storage.clone(),
self.epoch_state.clone(),
parent_block_info,
highest_committed_anchor_round.clone(),
ledger_info_provider.clone(),
));

let (dag_store, order_rule) =
self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone());
let (dag_store, order_rule) = self.bootstrap_dag_store(
ledger_info_provider
.get_latest_ledger_info()
.ledger_info()
.clone(),
adapter.clone(),
DAG_WINDOW,
);

let state_sync_trigger = StateSyncTrigger::new(
highest_committed_anchor_round.clone(),
self.epoch_state.clone(),
ledger_info_provider.clone(),
dag_store.clone(),
self.proof_notifier.clone(),
);

let (handler, fetch_service) =
self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);
let (handler, fetch_service) = self.bootstrap_components(
dag_store.clone(),
order_rule,
state_sync_trigger,
ledger_info_provider.clone(),
);

let df_handle = tokio::spawn(fetch_service.start());

Expand All @@ -226,28 +242,26 @@ impl DagBootstrapper {
let _ = df_handle.await;
return;
},
sync_status = handler.run(&mut dag_rpc_rx) => {
certified_node_msg = handler.run(&mut dag_rpc_rx) => {
debug!("state sync notification received. {:?}", certified_node_msg);
df_handle.abort();
let _ = df_handle.await;

match sync_status {
StateSyncStatus::NeedsSync(certified_node_msg) => {
let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone());
let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone());

let highest_committed_anchor_round = { *highest_committed_anchor_round.clone().read() };
if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone(), highest_committed_anchor_round).await {
error!(error = ?e, "unable to sync");
}
let highest_committed_anchor_round = ledger_info_provider.get_highest_committed_anchor_round();
let sync_future = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone(), highest_committed_anchor_round);

select! {
Err(e) = sync_future => {
error!(error = ?e, "unable to sync");
},
StateSyncStatus::EpochEnds => {
// Wait for epoch manager to signal shutdown
_ = shutdown_rx.await;
Ok(_) = &mut shutdown_rx => {
return;
},
_ => unreachable!()
}
}


debug!("going to rebootstrap.");
}
}
}
Expand All @@ -267,7 +281,7 @@ pub(super) fn bootstrap_dag_for_test(
payload_client: Arc<dyn PayloadClient>,
state_computer: Arc<dyn StateComputer>,
) -> (
JoinHandle<StateSyncStatus>,
JoinHandle<CertifiedNodeMessage>,
JoinHandle<()>,
aptos_channel::Sender<Author, IncomingDAGRequest>,
UnboundedReceiver<OrderedBlocks>,
Expand All @@ -290,29 +304,34 @@ pub(super) fn bootstrap_dag_for_test(
.expect("latest ledger info must exist");
let (parent_block_info, ledger_info) =
compute_initial_block_and_ledger_info(ledger_info_from_storage);
let highest_committed_anchor_round = Arc::new(RwLock::new(ledger_info.commit_info().round()));
let ledger_info_provider = Arc::new(RwLock::new(LedgerInfoProvider::new(ledger_info)));

let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let adapter = Arc::new(OrderedNotifierAdapter::new(
ordered_nodes_tx,
storage.clone(),
epoch_state.clone(),
parent_block_info,
highest_committed_anchor_round.clone(),
ledger_info_provider.clone(),
));
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);

let (dag_store, order_rule) =
bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone());
bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone(), DAG_WINDOW);

let state_sync_trigger = StateSyncTrigger::new(
highest_committed_anchor_round,
epoch_state,
ledger_info_provider.clone(),
dag_store.clone(),
proof_notifier.clone(),
);

let (handler, fetch_service) =
bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);
let (handler, fetch_service) = bootstraper.bootstrap_components(
dag_store.clone(),
order_rule,
state_sync_trigger,
ledger_info_provider,
);

let dh_handle = tokio::spawn(async move {
let mut dag_rpc_rx = dag_rpc_rx;
Expand Down
9 changes: 5 additions & 4 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
adapter::TLedgerInfoProvider,
dag_fetcher::FetchRequester,
order_rule::OrderRule,
storage::DAGStorage,
Expand Down Expand Up @@ -51,6 +52,7 @@ pub(crate) struct DagDriver {
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
fetch_requester: Arc<FetchRequester>,
ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
}

impl DagDriver {
Expand All @@ -64,6 +66,7 @@ impl DagDriver {
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
fetch_requester: Arc<FetchRequester>,
ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
) -> Self {
let pending_node = storage
.get_pending_node()
Expand Down Expand Up @@ -91,6 +94,7 @@ impl DagDriver {
storage,
order_rule,
fetch_requester,
ledger_info_provider,
};

// If we were broadcasting the node for the round already, resume it
Expand Down Expand Up @@ -184,10 +188,7 @@ impl DagDriver {
let signature_builder =
SignatureBuilder::new(node.metadata().clone(), self.epoch_state.clone());
let cert_ack_set = CertificateAckState::new(self.epoch_state.verifier.len());
let latest_ledger_info = self
.storage
.get_latest_ledger_info()
.expect("latest ledger info must exist");
let latest_ledger_info = self.ledger_info_provider.get_latest_ledger_info();
let task = self
.reliable_broadcast
.broadcast(node.clone(), signature_builder)
Expand Down
Loading

0 comments on commit 16677d7

Please sign in to comment.