Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 11, 2023
1 parent 73c0bd6 commit a2a16b5
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 40 deletions.
5 changes: 5 additions & 0 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -280,4 +280,9 @@ impl DAGStorage for StorageAdapter {
}
Ok(commit_events)
}

fn get_latest_ledger_info(&self) -> anyhow::Result<LedgerInfoWithSignatures> {
// TODO: use callback from notifier to cache the latest ledger info
self.aptos_db.get_latest_ledger_info()
}
}
3 changes: 0 additions & 3 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use aptos_channels::{aptos_channel, message_queues::QueueStyle};
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_storage_interface::DbReader;
use aptos_types::{
epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
};
Expand All @@ -39,7 +38,6 @@ pub fn bootstrap_dag(
dag_network_sender: Arc<dyn TDAGNetworkSender>,
time_service: aptos_time_service::TimeService,
payload_client: Arc<dyn PayloadClient>,
db: Arc<dyn DbReader>,
) -> (
AbortHandle,
AbortHandle,
Expand Down Expand Up @@ -100,7 +98,6 @@ pub fn bootstrap_dag(
storage.clone(),
order_rule,
fetch_requester.clone(),
db,
);
let rb_handler = NodeBroadcastHandler::new(
dag.clone(),
Expand Down
6 changes: 1 addition & 5 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use aptos_consensus_types::common::{Author, Payload};
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_storage_interface::DbReader;
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::{block_info::Round, epoch_state::EpochState};
use futures::{
Expand Down Expand Up @@ -50,7 +49,6 @@ pub(crate) struct DagDriver {
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
fetch_requester: Arc<FetchRequester>,
db: Arc<dyn DbReader>,
}

impl DagDriver {
Expand All @@ -64,7 +62,6 @@ impl DagDriver {
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
fetch_requester: Arc<FetchRequester>,
db: Arc<dyn DbReader>,
) -> Self {
let pending_node = storage
.get_pending_node()
Expand All @@ -86,7 +83,6 @@ impl DagDriver {
storage,
order_rule,
fetch_requester,
db,
};

// If we were broadcasting the node for the round already, resume it
Expand Down Expand Up @@ -156,7 +152,7 @@ impl DagDriver {
SignatureBuilder::new(node.metadata().clone(), self.epoch_state.clone());
let cert_ack_set = CertificateAckState::new(self.epoch_state.verifier.len());
let latest_ledger_info = self
.db
.storage
.get_latest_ledger_info()
.expect("latest ledger info must exist");
let task = self
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/dag/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use super::{types::Vote, NodeId};
use crate::dag::{CertifiedNode, Node};
use aptos_consensus_types::common::Author;
use aptos_crypto::HashValue;
use aptos_types::ledger_info::LedgerInfoWithSignatures;

pub struct CommitEvent {
node_id: NodeId,
Expand Down Expand Up @@ -48,4 +49,6 @@ pub trait DAGStorage: Send + Sync {
fn delete_ordered_anchor_ids(&self, node_ids: Vec<NodeId>) -> anyhow::Result<()>;

fn get_latest_k_committed_events(&self, k: u64) -> anyhow::Result<Vec<CommitEvent>>;

fn get_latest_ledger_info(&self) -> anyhow::Result<LedgerInfoWithSignatures>;
}
27 changes: 6 additions & 21 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ use crate::{
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_storage_interface::DbReader;
use aptos_time_service::TimeService;
use aptos_types::{
epoch_state::EpochState,
ledger_info::{generate_ledger_info_with_sig, LedgerInfo, LedgerInfoWithSignatures},
ledger_info::{generate_ledger_info_with_sig, LedgerInfo},
validator_verifier::random_validator_verifier,
};
use async_trait::async_trait;
Expand Down Expand Up @@ -71,24 +70,17 @@ impl TDAGNetworkSender for MockNetworkSender {
}
}

pub(super) struct MockDbReader {
pub(super) ledger_info: LedgerInfoWithSignatures,
}

impl DbReader for MockDbReader {
fn get_latest_ledger_info(&self) -> anyhow::Result<LedgerInfoWithSignatures> {
Ok(self.ledger_info.clone())
}
}

#[tokio::test]
async fn test_certified_node_handler() {
let (signers, validator_verifier) = random_validator_verifier(4, None, false);
let epoch_state = Arc::new(EpochState {
epoch: 1,
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());

let mock_ledger_info = LedgerInfo::mock_genesis(None);
let mock_ledger_info = generate_ledger_info_with_sig(&signers, mock_ledger_info);
let storage = Arc::new(MockStorage::new_with_ledger_info(mock_ledger_info));
let dag = Arc::new(RwLock::new(Dag::new(
epoch_state.clone(),
storage.clone(),
Expand Down Expand Up @@ -122,13 +114,7 @@ async fn test_certified_node_handler() {
dag.clone(),
aptos_time_service::TimeService::mock(),
);
let fetch_requester = Arc::new(fetch_requester);

let mock_ledger_info = LedgerInfo::mock_genesis(None);
let mock_ledger_info = generate_ledger_info_with_sig(&signers, mock_ledger_info);
let db = Arc::new(MockDbReader {
ledger_info: mock_ledger_info,
});
let fetch_requester = Arc::new(fetch_requester);

let mut driver = DagDriver::new(
signers[0].author(),
Expand All @@ -140,7 +126,6 @@ async fn test_certified_node_handler() {
storage,
order_rule,
fetch_requester,
db,
);

let first_round_node = new_certified_node(1, signers[0].author(), vec![]);
Expand Down
21 changes: 19 additions & 2 deletions consensus/src/dag/tests/dag_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,16 @@ use crate::dag::{
use aptos_crypto::HashValue;
use aptos_infallible::Mutex;
use aptos_types::{
epoch_state::EpochState, validator_signer::ValidatorSigner,
validator_verifier::random_validator_verifier,
epoch_state::EpochState, ledger_info::LedgerInfoWithSignatures,
validator_signer::ValidatorSigner, validator_verifier::random_validator_verifier,
};
use std::{collections::HashMap, sync::Arc};

pub struct MockStorage {
node_data: Mutex<Option<Node>>,
vote_data: Mutex<HashMap<NodeId, Vote>>,
certified_node_data: Mutex<HashMap<HashValue, CertifiedNode>>,
latest_ledger_info: Option<LedgerInfoWithSignatures>,
}

impl MockStorage {
Expand All @@ -29,6 +30,16 @@ impl MockStorage {
node_data: Mutex::new(None),
vote_data: Mutex::new(HashMap::new()),
certified_node_data: Mutex::new(HashMap::new()),
latest_ledger_info: None,
}
}

pub fn new_with_ledger_info(ledger_info: LedgerInfoWithSignatures) -> Self {
Self {
node_data: Mutex::new(None),
vote_data: Mutex::new(HashMap::new()),
certified_node_data: Mutex::new(HashMap::new()),
latest_ledger_info: Some(ledger_info),
}
}
}
Expand Down Expand Up @@ -102,6 +113,12 @@ impl DAGStorage for MockStorage {
fn get_latest_k_committed_events(&self, _k: u64) -> anyhow::Result<Vec<CommitEvent>> {
Ok(vec![])
}

fn get_latest_ledger_info(&self) -> anyhow::Result<LedgerInfoWithSignatures> {
self.latest_ledger_info
.clone()
.ok_or_else(|| anyhow::anyhow!("ledger info not set"))
}
}

fn setup() -> (Vec<ValidatorSigner>, Arc<EpochState>, Dag, Arc<MockStorage>) {
Expand Down
13 changes: 4 additions & 9 deletions consensus/src/dag/tests/integration_tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
// Copyright © Aptos Foundation

use super::{dag_driver_tests::MockDbReader, dag_test};
use super::dag_test;
use crate::{
dag::bootstrap::bootstrap_dag,
experimental::buffer_manager::OrderedBlocks,
Expand All @@ -26,9 +26,8 @@ use aptos_network::{
use aptos_time_service::TimeService;
use aptos_types::{
epoch_state::EpochState,
ledger_info::generate_ledger_info_with_sig,
validator_signer::ValidatorSigner,
validator_verifier::{random_validator_verifier, ValidatorVerifier},
validator_verifier::{random_validator_verifier, ValidatorVerifier}, ledger_info::generate_ledger_info_with_sig,
};
use claims::assert_gt;
use futures::{
Expand Down Expand Up @@ -64,16 +63,13 @@ impl DagBootstrapUnit {
epoch,
verifier: storage.get_validator_set().into(),
};
let dag_storage = dag_test::MockStorage::new();
let ledger_info = generate_ledger_info_with_sig(&all_signers, storage.get_ledger_info());
let dag_storage = dag_test::MockStorage::new_with_ledger_info(ledger_info);

let network = Arc::new(DAGNetworkSenderImpl::new(Arc::new(network)));

let payload_client = Arc::new(MockPayloadManager::new(None));

let mock_storage = Arc::new(MockDbReader {
ledger_info: generate_ledger_info_with_sig(&all_signers, storage.get_ledger_info()),
});

let (nh_abort_handle, df_abort_handle, dag_rpc_tx, ordered_nodes_rx) = bootstrap_dag(
self_peer,
signer,
Expand All @@ -84,7 +80,6 @@ impl DagBootstrapUnit {
network.clone(),
time_service,
payload_client,
mock_storage,
);

(
Expand Down

0 comments on commit a2a16b5

Please sign in to comment.