Skip to content

Commit

Permalink
remove committed node status enum
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 5, 2023
1 parent caee6f6 commit 7b1f793
Show file tree
Hide file tree
Showing 8 changed files with 28 additions and 69 deletions.
1 change: 1 addition & 0 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub fn bootstrap_dag(
epoch_state.clone(),
storage.clone(),
current_round,
0,
)));

let anchor_election = Box::new(RoundRobinAnchorElection::new(validators));
Expand Down
33 changes: 8 additions & 25 deletions consensus/src/dag/dag_state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use super::{
TDAGNetworkSender,
};
use crate::state_replication::StateComputer;
use anyhow::anyhow;
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_time_service::TimeService;
Expand Down Expand Up @@ -75,10 +74,7 @@ impl StateSyncManager {

// if the anchor exists between ledger info round and highest ordered round
// Note: ledger info round <= highest ordered round
if dag_reader
.highest_committed_anchor_round()
.unwrap_or_default()
< ledger_info.commit_info().round()
if dag_reader.highest_committed_anchor_round() < ledger_info.commit_info().round()
&& dag_reader
.highest_ordered_anchor_round()
.unwrap_or_default()
Expand All @@ -94,14 +90,15 @@ impl StateSyncManager {
/// This ensures that the block referred by the ledger info is not in buffer manager.
pub fn need_sync_for_ledger_info(&self, li: &LedgerInfoWithSignatures) -> bool {
let dag_reader = self.dag_store.read();
// check whether if DAG order round is behind the given ledger info round
// (meaning consensus is behind) or
// the highest committed anchor round is 2*DAG_WINDOW behind the given ledger info round
// (meaning execution is behind the DAG window)
(dag_reader
.highest_ordered_anchor_round()
.unwrap_or_default()
< li.commit_info().round())
|| dag_reader
.highest_committed_anchor_round()
.unwrap_or_default()
+ 2 * DAG_WINDOW
|| dag_reader.highest_committed_anchor_round() + 2 * DAG_WINDOW
< li.commit_info().round()
}

Expand Down Expand Up @@ -153,6 +150,7 @@ impl StateSyncManager {
self.epoch_state.clone(),
self.storage.clone(),
start_round,
commit_li.commit_info().round(),
)));
let bitmask = { sync_dag_store.read().bitmask(target_round) };
let request = RemoteFetchRequest::new(
Expand Down Expand Up @@ -180,22 +178,7 @@ impl StateSyncManager {
// State sync
self.state_computer.sync_to(commit_li.clone()).await?;

{
let mut dag_writer = sync_dag_store.write();
dag_writer.prune();
if let Some(node_status) = dag_writer.get_node_ref_mut_by_round_digest(
commit_li.ledger_info().round(),
commit_li.ledger_info().consensus_data_hash(),
) {
node_status.mark_as_committed();
} else {
error!(
"node for commit ledger info does not exist in DAG: {}",
commit_li
);
return Err(anyhow!("commit ledger info node not found"));
}
}
// TODO: the caller should rebootstrap the order rule

Ok(Some(sync_dag_store))
}
Expand Down
47 changes: 9 additions & 38 deletions consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,30 +20,20 @@ use std::{
pub enum NodeStatus {
Unordered(Arc<CertifiedNode>),
Ordered(Arc<CertifiedNode>),
Committed(Arc<CertifiedNode>),
}

impl NodeStatus {
pub fn as_node(&self) -> &Arc<CertifiedNode> {
match self {
NodeStatus::Unordered(node)
| NodeStatus::Ordered(node)
| NodeStatus::Committed(node) => node,
NodeStatus::Unordered(node) | NodeStatus::Ordered(node) => node,
}
}

pub fn mark_as_ordered(&mut self) {
assert!(matches!(self, NodeStatus::Unordered(_)));
*self = NodeStatus::Ordered(self.as_node().clone());
}

pub fn mark_as_committed(&mut self) {
assert!(!matches!(self, NodeStatus::Committed(_)));
// TODO: try to avoid clone
*self = NodeStatus::Committed(self.as_node().clone());
}
}

/// Data structure that stores the DAG representation, it maintains round based index.
#[derive(Clone)]
pub struct Dag {
Expand All @@ -53,13 +43,16 @@ pub struct Dag {
storage: Arc<dyn DAGStorage>,
initial_round: Round,
epoch_state: Arc<EpochState>,

highest_committed_anchor_round: Round,
}

impl Dag {
pub fn new(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
initial_round: Round,
highest_committed_anchor_round: Round,
) -> Self {
let epoch = epoch_state.epoch;
let author_to_index = epoch_state.verifier.address_to_validator_index().clone();
Expand Down Expand Up @@ -91,13 +84,15 @@ impl Dag {
storage,
initial_round,
epoch_state,
highest_committed_anchor_round,
}
}

pub fn new_empty(
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
initial_round: Round,
highest_committed_anchor_round: Round,
) -> Self {
let author_to_index = epoch_state.verifier.address_to_validator_index().clone();
let nodes_by_round = BTreeMap::new();
Expand All @@ -107,6 +102,7 @@ impl Dag {
storage,
initial_round,
epoch_state,
highest_committed_anchor_round,
}
}

Expand Down Expand Up @@ -175,15 +171,6 @@ impl Dag {
nodes.filter(|node_metadata| !self.exists(node_metadata))
}

pub fn get_node_ref_mut_by_round_digest(
&mut self,
round: Round,
digest: HashValue,
) -> Option<&mut NodeStatus> {
self.get_round_iter_mut(round)?
.find(|node_status| node_status.as_node().digest() == digest)
}

fn get_node_ref_by_metadata(&self, metadata: &NodeMetadata) -> Option<&NodeStatus> {
self.get_node_ref(metadata.round(), metadata.author())
}
Expand All @@ -206,15 +193,6 @@ impl Dag {
.map(|round_ref| round_ref.iter().flatten())
}

fn get_round_iter_mut(
&mut self,
round: Round,
) -> Option<impl Iterator<Item = &mut NodeStatus>> {
self.nodes_by_round
.get_mut(&round)
.map(|round_ref| round_ref.iter_mut().flatten())
}

pub fn get_node(&self, metadata: &NodeMetadata) -> Option<Arc<CertifiedNode>> {
self.get_node_ref_by_metadata(metadata)
.map(|node_status| node_status.as_node().clone())
Expand Down Expand Up @@ -382,14 +360,7 @@ impl Dag {
None
}

pub(super) fn highest_committed_anchor_round(&self) -> Option<Round> {
for (round, round_nodes) in self.nodes_by_round.iter().rev() {
for maybe_node_status in round_nodes {
if matches!(maybe_node_status, Some(NodeStatus::Committed(_))) {
return Some(*round);
}
}
}
None
pub(super) fn highest_committed_anchor_round(&self) -> Round {
self.highest_committed_anchor_round
}
}
1 change: 1 addition & 0 deletions consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ async fn test_certified_node_handler() {
epoch_state.clone(),
storage.clone(),
1,
0
)));

let network_sender = Arc::new(MockNetworkSender {});
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/dag/tests/dag_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ fn setup() -> (Vec<ValidatorSigner>, Arc<EpochState>, Dag, Arc<MockStorage>) {
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Dag::new(epoch_state.clone(), storage.clone(), 1);
let dag = Dag::new(epoch_state.clone(), storage.clone(), 1, 0);
(signers, epoch_state, dag, storage)
}

Expand Down Expand Up @@ -194,7 +194,7 @@ fn test_dag_recover_from_storage() {
assert!(dag.add_node(node).is_ok());
}
}
let new_dag = Dag::new(epoch_state.clone(), storage.clone(), 1);
let new_dag = Dag::new(epoch_state.clone(), storage.clone(), 1, 0);

for metadata in &metadatas {
assert!(new_dag.exists(metadata));
Expand All @@ -205,7 +205,7 @@ fn test_dag_recover_from_storage() {
verifier: epoch_state.verifier.clone(),
});

let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone(), 1);
let _new_epoch_dag = Dag::new(new_epoch_state, storage.clone(), 1, 0);
assert!(storage.certified_node_data.lock().is_empty());
}

Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/fetcher_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ fn test_dag_fetcher_receiver() {
verifier: validator_verifier,
});
let storage = Arc::new(MockStorage::new());
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage, 1)));
let dag = Arc::new(RwLock::new(Dag::new(epoch_state.clone(), storage, 1, 0)));

let mut fetcher = FetchRequestHandler::new(dag.clone(), epoch_state);

Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/tests/order_rule_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ proptest! {
epoch: 1,
verifier: validator_verifier,
});
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1);
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0);
for round_nodes in &nodes {
for node in round_nodes.iter().flatten() {
dag.add_node(node.clone()).unwrap();
Expand Down Expand Up @@ -261,7 +261,7 @@ fn test_order_rule_basic() {
epoch: 1,
verifier: validator_verifier,
});
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1);
let mut dag = Dag::new(epoch_state.clone(), Arc::new(MockStorage::new()), 1, 0);
for round_nodes in &nodes {
for node in round_nodes.iter().flatten() {
dag.add_node(node.clone()).unwrap();
Expand Down
3 changes: 3 additions & 0 deletions consensus/src/dag/tests/rb_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ async fn test_node_broadcast_receiver_succeed() {
epoch_state.clone(),
storage.clone(),
1,
0
)));

let wellformed_node = new_node(1, 10, signers[0].author(), vec![]);
Expand Down Expand Up @@ -87,6 +88,7 @@ async fn test_node_broadcast_receiver_failure() {
epoch_state.clone(),
storage.clone(),
1,
0
)));

NodeBroadcastHandler::new(
Expand Down Expand Up @@ -162,6 +164,7 @@ fn test_node_broadcast_receiver_storage() {
epoch_state.clone(),
storage.clone(),
1,
0
)));

let node = new_node(1, 10, signers[0].author(), vec![]);
Expand Down

0 comments on commit 7b1f793

Please sign in to comment.