Skip to content

Commit

Permalink
[dag] split notifier into Order and Proof Notifier
Browse files Browse the repository at this point in the history
[dag] additional ledger info verification checks

[dag] separate out highest committed round provider

[dag] introduce a ledger info provider trait
  • Loading branch information
ibalajiarun authored and zekun000 committed Sep 26, 2023
1 parent d4a3541 commit 414ffff
Show file tree
Hide file tree
Showing 10 changed files with 217 additions and 95 deletions.
100 changes: 78 additions & 22 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use aptos_consensus_types::{
block::Block,
common::{Author, Payload, Round},
executed_block::ExecutedBlock,
quorum_cert::QuorumCert,
};
use aptos_crypto::HashValue;
use aptos_executor_types::StateComputeResult;
Expand All @@ -24,6 +25,7 @@ use aptos_storage_interface::{DbReader, Order};
use aptos_types::{
account_config::{new_block_event_key, NewBlockEvent},
aggregate_signature::AggregateSignature,
block_info::BlockInfo,
epoch_change::EpochChangeProof,
epoch_state::EpochState,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
Expand All @@ -47,39 +49,59 @@ pub trait ProofNotifier: Send + Sync {
async fn send_commit_proof(&self, ledger_info: LedgerInfoWithSignatures);
}

pub struct OrderedNotifierAdapter {
pub(crate) fn compute_initial_block_and_ledger_info(
ledger_info_from_storage: LedgerInfoWithSignatures,
) -> (BlockInfo, LedgerInfoWithSignatures) {
// We start from the block that storage's latest ledger info, if storage has end-epoch
// LedgerInfo, we generate the virtual genesis block
if ledger_info_from_storage.ledger_info().ends_epoch() {
let genesis =
Block::make_genesis_block_from_ledger_info(ledger_info_from_storage.ledger_info());

let ledger_info = ledger_info_from_storage.ledger_info();
let genesis_qc = QuorumCert::certificate_for_genesis_from_ledger_info(
ledger_info_from_storage.ledger_info(),
genesis.id(),
);
let genesis_ledger_info = genesis_qc.ledger_info().clone();
(
genesis.gen_block_info(
ledger_info.transaction_accumulator_hash(),
ledger_info.version(),
ledger_info.next_epoch_state().cloned(),
),
genesis_ledger_info,
)
} else {
(
ledger_info_from_storage.ledger_info().commit_info().clone(),
ledger_info_from_storage,
)
}
}

pub(super) struct OrderedNotifierAdapter {
executor_channel: UnboundedSender<OrderedBlocks>,
storage: Arc<dyn DAGStorage>,
parent_block_id: Arc<RwLock<HashValue>>,
parent_block_info: Arc<RwLock<BlockInfo>>,
epoch_state: Arc<EpochState>,
ledger_info_provider: Arc<RwLock<LedgerInfoProvider>>,
}

impl OrderedNotifierAdapter {
pub fn new(
pub(super) fn new(
executor_channel: UnboundedSender<OrderedBlocks>,
storage: Arc<dyn DAGStorage>,
epoch_state: Arc<EpochState>,
parent_block_info: BlockInfo,
ledger_info_provider: Arc<RwLock<LedgerInfoProvider>>,
) -> Self {
let ledger_info_from_storage = storage
.get_latest_ledger_info()
.expect("latest ledger info must exist");

// We start from the block that storage's latest ledger info, if storage has end-epoch
// LedgerInfo, we generate the virtual genesis block
let parent_block_id = if ledger_info_from_storage.ledger_info().ends_epoch() {
let genesis =
Block::make_genesis_block_from_ledger_info(ledger_info_from_storage.ledger_info());

genesis.id()
} else {
ledger_info_from_storage.ledger_info().commit_info().id()
};

Self {
executor_channel,
storage,
parent_block_id: Arc::new(RwLock::new(parent_block_id)),
parent_block_info: Arc::new(RwLock::new(parent_block_info)),
epoch_state,
ledger_info_provider,
}
}
}
Expand All @@ -101,7 +123,7 @@ impl OrderedNotifier for OrderedNotifierAdapter {
payload.extend(node.payload().clone());
node_digests.push(node.digest());
}
let parent_block_id = *self.parent_block_id.read();
let parent_block_id = self.parent_block_info.read().id();
// construct the bitvec that indicates which nodes present in the previous round in CommitEvent
let mut parents_bitvec = BitVec::with_num_bits(self.epoch_state.verifier.len() as u16);
for parent in anchor.parents().iter() {
Expand Down Expand Up @@ -131,7 +153,8 @@ impl OrderedNotifier for OrderedNotifierAdapter {
);
let block_info = block.block_info();
let storage = self.storage.clone();
*self.parent_block_id.write() = block.id();
let ledger_info_provider = self.ledger_info_provider.clone();
*self.parent_block_info.write() = block_info.clone();
Ok(self.executor_channel.unbounded_send(OrderedBlocks {
ordered_blocks: vec![block],
ordered_proof: LedgerInfoWithSignatures::new(
Expand All @@ -140,7 +163,10 @@ impl OrderedNotifier for OrderedNotifierAdapter {
),
callback: Box::new(
move |committed_blocks: &[Arc<ExecutedBlock>],
_commit_decision: LedgerInfoWithSignatures| {
commit_decision: LedgerInfoWithSignatures| {
ledger_info_provider
.write()
.notify_commit_proof(commit_decision);
for executed_block in committed_blocks {
if let Some(node_digests) = executed_block.block().block_data().dag_nodes()
{
Expand Down Expand Up @@ -309,3 +335,33 @@ impl DAGStorage for StorageAdapter {
self.aptos_db.get_latest_ledger_info()
}
}

pub(crate) trait TLedgerInfoProvider: Send + Sync {
fn get_latest_ledger_info(&self) -> LedgerInfoWithSignatures;

fn get_highest_committed_anchor_round(&self) -> Round;
}

pub(super) struct LedgerInfoProvider {
latest_ledger_info: LedgerInfoWithSignatures,
}

impl LedgerInfoProvider {
pub(super) fn new(latest_ledger_info: LedgerInfoWithSignatures) -> Self {
Self { latest_ledger_info }
}

pub(super) fn notify_commit_proof(&mut self, ledger_info: LedgerInfoWithSignatures) {
self.latest_ledger_info = ledger_info;
}
}

impl TLedgerInfoProvider for RwLock<LedgerInfoProvider> {
fn get_latest_ledger_info(&self) -> LedgerInfoWithSignatures {
self.read().latest_ledger_info.clone()
}

fn get_highest_committed_anchor_round(&self) -> Round {
self.read().latest_ledger_info.ledger_info().round()
}
}
116 changes: 83 additions & 33 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation

use super::{
adapter::{OrderedNotifier, OrderedNotifierAdapter},
adapter::{OrderedNotifier, OrderedNotifierAdapter, TLedgerInfoProvider},
anchor_election::RoundRobinAnchorElection,
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, DagFetcherService, FetchRequestHandler},
Expand All @@ -16,7 +16,10 @@ use super::{
ProofNotifier,
};
use crate::{
dag::dag_state_sync::StateSyncStatus,
dag::{
adapter::{compute_initial_block_and_ledger_info, LedgerInfoProvider},
dag_state_sync::StateSyncStatus,
},
experimental::buffer_manager::OrderedBlocks,
network::IncomingDAGRequest,
state_replication::{PayloadClient, StateComputer},
Expand All @@ -25,17 +28,12 @@ use aptos_channels::{
aptos_channel::{self, Receiver},
message_queues::QueueStyle,
};
use aptos_consensus_types::common::Author;
use aptos_crypto::HashValue;
use aptos_consensus_types::common::{Author, Round};
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_logger::{debug, error};
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{
aggregate_signature::AggregateSignature,
block_info::BlockInfo,
epoch_state::EpochState,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
validator_signer::ValidatorSigner,
epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
};
use futures_channel::{
mpsc::{UnboundedReceiver, UnboundedSender},
Expand Down Expand Up @@ -87,22 +85,31 @@ impl DagBootstrapper {

fn bootstrap_dag_store(
&self,
latest_ledger_info: LedgerInfo,
initial_ledger_info: LedgerInfo,
notifier: Arc<dyn OrderedNotifier>,
dag_window_size_config: usize,
) -> (Arc<RwLock<Dag>>, OrderRule) {
let initial_round = if initial_ledger_info.round() <= dag_window_size_config as Round {
1
} else {
initial_ledger_info
.round()
.saturating_sub(dag_window_size_config as Round)
};

let dag = Arc::new(RwLock::new(Dag::new(
self.epoch_state.clone(),
self.storage.clone(),
latest_ledger_info.round(),
DAG_WINDOW,
initial_round,
dag_window_size_config,
)));

let validators = self.epoch_state.verifier.get_ordered_account_addresses();
let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));

let order_rule = OrderRule::new(
self.epoch_state.clone(),
latest_ledger_info,
initial_ledger_info,
dag.clone(),
anchor_election,
notifier,
Expand All @@ -117,6 +124,7 @@ impl DagBootstrapper {
dag: Arc<RwLock<Dag>>,
order_rule: OrderRule,
state_sync_trigger: StateSyncTrigger,
ledger_info_provider: Arc<dyn TLedgerInfoProvider>,
) -> (NetworkHandler, DagFetcherService) {
let validators = self.epoch_state.verifier.get_ordered_account_addresses();

Expand Down Expand Up @@ -150,6 +158,7 @@ impl DagBootstrapper {
self.storage.clone(),
order_rule,
fetch_requester.clone(),
ledger_info_provider,
);
let rb_handler = NodeBroadcastHandler::new(
dag.clone(),
Expand Down Expand Up @@ -186,30 +195,45 @@ impl DagBootstrapper {
self.storage.clone(),
);

// TODO: fetch the correct block info
let ledger_info = LedgerInfoWithSignatures::new(
LedgerInfo::new(BlockInfo::empty(), HashValue::zero()),
AggregateSignature::empty(),
);

loop {
let ledger_info_from_storage = self
.storage
.get_latest_ledger_info()
.expect("latest ledger info must exist");
let (parent_block_info, ledger_info) =
compute_initial_block_and_ledger_info(ledger_info_from_storage);
let ledger_info_provider = Arc::new(RwLock::new(LedgerInfoProvider::new(ledger_info)));

let adapter = Arc::new(OrderedNotifierAdapter::new(
ordered_nodes_tx.clone(),
self.storage.clone(),
self.epoch_state.clone(),
parent_block_info,
ledger_info_provider.clone(),
));

let (dag_store, order_rule) =
self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone());
let (dag_store, order_rule) = self.bootstrap_dag_store(
ledger_info_provider
.get_latest_ledger_info()
.ledger_info()
.clone(),
adapter.clone(),
DAG_WINDOW,
);

let state_sync_trigger = StateSyncTrigger::new(
self.epoch_state.clone(),
ledger_info_provider.clone(),
dag_store.clone(),
self.proof_notifier.clone(),
);

let (handler, fetch_service) =
self.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);
let (handler, fetch_service) = self.bootstrap_components(
dag_store.clone(),
order_rule,
state_sync_trigger,
ledger_info_provider.clone(),
);

let df_handle = tokio::spawn(fetch_service.start());

Expand All @@ -222,16 +246,27 @@ impl DagBootstrapper {
return;
},
sync_status = handler.run(&mut dag_rpc_rx) => {
debug!("state sync notification received. {:?}", sync_status);
df_handle.abort();
let _ = df_handle.await;

match sync_status {
StateSyncStatus::NeedsSync(certified_node_msg) => {
let dag_fetcher = DagFetcher::new(self.epoch_state.clone(), self.dag_network_sender.clone(), self.time_service.clone());

if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone()).await {
error!(error = ?e, "unable to sync");
let highest_committed_anchor_round = ledger_info_provider.get_highest_committed_anchor_round();
let sync_future = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone(), highest_committed_anchor_round);

select! {
Err(e) = sync_future => {
error!(error = ?e, "unable to sync");
},
Ok(_) = &mut shutdown_rx => {
return;
}
}

debug!("going to rebootstrap.");
},
StateSyncStatus::EpochEnds => {
// Wait for epoch manager to signal shutdown
Expand All @@ -240,8 +275,6 @@ impl DagBootstrapper {
},
_ => unreachable!()
}


}
}
}
Expand Down Expand Up @@ -279,22 +312,39 @@ pub(super) fn bootstrap_dag_for_test(
state_computer,
);

let ledger_info_from_storage = storage
.get_latest_ledger_info()
.expect("latest ledger info must exist");
let (parent_block_info, ledger_info) =
compute_initial_block_and_ledger_info(ledger_info_from_storage);
let ledger_info_provider = Arc::new(RwLock::new(LedgerInfoProvider::new(ledger_info)));

let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let adapter = Arc::new(OrderedNotifierAdapter::new(
ordered_nodes_tx,
storage.clone(),
epoch_state.clone(),
parent_block_info,
ledger_info_provider.clone(),
));
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);

let (dag_store, order_rule) =
bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone());
bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone(), DAG_WINDOW);

let state_sync_trigger =
StateSyncTrigger::new(epoch_state, dag_store.clone(), proof_notifier.clone());
let state_sync_trigger = StateSyncTrigger::new(
epoch_state,
ledger_info_provider.clone(),
dag_store.clone(),
proof_notifier.clone(),
);

let (handler, fetch_service) =
bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);
let (handler, fetch_service) = bootstraper.bootstrap_components(
dag_store.clone(),
order_rule,
state_sync_trigger,
ledger_info_provider,
);

let dh_handle = tokio::spawn(async move {
let mut dag_rpc_rx = dag_rpc_rx;
Expand Down
Loading

0 comments on commit 414ffff

Please sign in to comment.