Skip to content

Commit

Permalink
lint/feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 18, 2023
1 parent 0808e25 commit 64ea42f
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 25 deletions.
7 changes: 5 additions & 2 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ pub(super) fn bootstrap_dag_for_test(

let (ordered_nodes_tx, ordered_nodes_rx) = futures_channel::mpsc::unbounded();
let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, storage.clone()));
let (dag_rpc_tx, mut dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);
let (dag_rpc_tx, dag_rpc_rx) = aptos_channel::new(QueueStyle::FIFO, 64, None);

let (dag_store, order_rule) =
bootstraper.bootstrap_dag_store(latest_ledger_info, adapter.clone());
Expand All @@ -266,7 +266,10 @@ pub(super) fn bootstrap_dag_for_test(
let (handler, fetch_service) =
bootstraper.bootstrap_components(dag_store.clone(), order_rule, state_sync_trigger);

let dh_handle = tokio::spawn(async move { handler.run(&mut dag_rpc_rx).await });
let dh_handle = tokio::spawn(async move {
let mut dag_rpc_rx = dag_rpc_rx;
handler.run(&mut dag_rpc_rx).await
});
let df_handle = tokio::spawn(fetch_service.start());

(dh_handle, df_handle, dag_rpc_tx, ordered_nodes_rx)
Expand Down
8 changes: 6 additions & 2 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::{block_info::Round, epoch_state::EpochState};
use async_trait::async_trait;
use futures::{
executor::block_on,
future::{AbortHandle, Abortable},
FutureExt, executor::block_on,
FutureExt,
};
use std::{sync::Arc, time::Duration};
use thiserror::Error as ThisError;
Expand Down Expand Up @@ -73,7 +74,10 @@ impl DagDriver {
.get_strong_links_for_round(highest_round, &epoch_state.verifier)
.map_or_else(|| highest_round.saturating_sub(1), |_| highest_round);

debug!("highest_round: {}, current_round: {}", highest_round, current_round);
debug!(
"highest_round: {}, current_round: {}",
highest_round, current_round
);

let mut driver = Self {
author,
Expand Down
34 changes: 16 additions & 18 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
use anyhow::bail;
use aptos_channels::aptos_channel;
use aptos_consensus_types::common::Author;
use aptos_logger::{error, warn};
use aptos_logger::{debug, warn};
use aptos_network::protocols::network::RpcError;
use aptos_types::epoch_state::EpochState;
use bytes::Bytes;
Expand Down Expand Up @@ -78,7 +78,7 @@ impl NetworkHandler {
Some(res) = self.node_fetch_waiter.next() => {
match res {
Ok(node) => if let Err(e) = self.node_receiver.process(node).await {
warn!(error = ?e, "error processing node fetch notification");
warn!(error = ?e, "error processing node fetch notification");
},
Err(e) => {
debug!("sender dropped channel: {}", e);
Expand All @@ -105,13 +105,10 @@ impl NetworkHandler {
certified_node.verify(&self.epoch_state.verifier)
},
DAGMessage::FetchRequest(request) => request.verify(&self.epoch_state.verifier),
_ => {
error!(
"unknown rpc message {:?}",
std::mem::discriminant(dag_message)
);
Err(anyhow::anyhow!("unexpected rpc message"))
},
_ => Err(anyhow::anyhow!(
"unexpected rpc message{:?}",
std::mem::discriminant(dag_message)
)),
}
}

Expand All @@ -129,22 +126,23 @@ impl NetworkHandler {
}

let response: anyhow::Result<DAGMessage> = {
match self.verify_incoming_rpc(&dag_message) {
let verification_result = self.verify_incoming_rpc(&dag_message);
match verification_result {
Ok(_) => match dag_message {
DAGMessage::NodeMsg(node) => {
self.node_receiver.process(node).await.map(|r| r.into())
},
DAGMessage::CertifiedNodeMsg(certified_node_msg) => {
DAGMessage::CertifiedNodeMsg(certified_node_msg) => {
match self.state_sync_trigger.check(certified_node_msg).await {
ret @ (NeedsSync(_), None) => return Ok(ret.0),
(Synced, Some(certified_node_msg)) => self
.dag_driver
.process(certified_node_msg.certified_node())
ret @ (NeedsSync(_), None) => return Ok(ret.0),
(Synced, Some(certified_node_msg)) => self
.dag_driver
.process(certified_node_msg.certified_node())
.await
.map(|r| r.into()),
_ => unreachable!(),
.map(|r| r.into()),
_ => unreachable!(),
}
},
},
DAGMessage::FetchRequest(request) => {
self.fetch_receiver.process(request).await.map(|r| r.into())
},
Expand Down
3 changes: 1 addition & 2 deletions consensus/src/dag/dag_state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ impl StateSyncTrigger {
}

/// This method checks if a state sync is required, and if so,
/// notifies the bootstraper and yields the current task infinitely,
/// to let the bootstraper can abort this task.
/// notifies the bootstraper, to let the bootstraper can abort this task.
pub(super) async fn check(
&self,
node: CertifiedNodeMessage,
Expand Down
5 changes: 4 additions & 1 deletion consensus/src/dag/tests/rb_handler_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,10 @@ async fn test_node_broadcast_receiver_succeed() {
// expect an ack for a valid message
assert_ok_eq!(rb_receiver.process(wellformed_node).await, expected_result);
// expect the original ack for any future message from same author
assert_ok_eq!(rb_receiver.process(equivocating_node).await, expected_result);
assert_ok_eq!(
rb_receiver.process(equivocating_node).await,
expected_result
);
}

// TODO: Unit test node broad receiver with a pruned DAG store. Possibly need a validator verifier trait.
Expand Down

0 comments on commit 64ea42f

Please sign in to comment.