Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 8, 2023
1 parent 882db15 commit c40e5e7
Show file tree
Hide file tree
Showing 9 changed files with 68 additions and 22 deletions.
11 changes: 9 additions & 2 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{storage::DAGStorage, types::CertifiedAck, RpcHandler};
use super::{
storage::DAGStorage,
types::{CertifiedAck, DAGMessage},
RpcHandler,
};
use crate::{
dag::{
dag_store::Dag,
Expand All @@ -20,8 +24,8 @@ use futures::{
FutureExt,
};
use std::sync::Arc;
use tokio_retry::strategy::ExponentialBackoff;
use thiserror::Error as ThisError;
use tokio_retry::strategy::ExponentialBackoff;

#[derive(Debug, ThisError)]
pub enum DagDriverError {
Expand Down Expand Up @@ -67,9 +71,12 @@ impl DagDriver {
}

pub fn try_enter_new_round(&mut self) {
// In case of a new epoch, kickstart building the DAG by entering the next round
// without any parents.
if self.current_round == 0 {
self.enter_new_round(vec![]);
}
// TODO: add logic to handle building DAG from the middle, etc.
}

pub fn add_node(&mut self, node: CertifiedNode) -> anyhow::Result<()> {
Expand Down
19 changes: 13 additions & 6 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,29 @@

use super::{
dag_driver::DagDriver, dag_fetcher::FetchRequestHandler, dag_network::DAGNetworkSender,
reliable_broadcast::ReliableBroadcast, storage::DAGStorage, types::TDAGMessage,
storage::DAGStorage, types::TDAGMessage,
};
use crate::{
dag::{
dag_network::RpcHandler, dag_store::Dag, reliable_broadcast::NodeBroadcastHandler,
types::DAGMessage,
},
network::{IncomingDAGRequest, TConsensusMsg},
state_replication::PayloadClient, util::time_service::TimeService,
state_replication::PayloadClient,
util::time_service::TimeService,
};
use anyhow::bail;
use aptos_channels::aptos_channel;
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_logger::{error, warn};
use aptos_network::protocols::network::RpcError;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{epoch_state::EpochState, validator_signer::ValidatorSigner};
use bytes::Bytes;
use futures::StreamExt;
use std::sync::Arc;
use tokio_retry::strategy::ExponentialBackoff;

struct NetworkHandler {
dag_rpc_rx: aptos_channel::Receiver<Author, IncomingDAGRequest>,
Expand All @@ -39,12 +42,16 @@ impl NetworkHandler {
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
payload_client: Arc<dyn PayloadClient>,
network_sender: Arc<dyn DAGNetworkSender>,
_dag_network_sender: Arc<dyn DAGNetworkSender>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
time_service: Arc<dyn TimeService>,
aptos_time_service: aptos_time_service::TimeService,
) -> Self {
let rb = Arc::new(ReliableBroadcast::new(
epoch_state.verifier.get_ordered_account_addresses().clone(),
network_sender,
rb_network_sender,
ExponentialBackoff::from_millis(10),
aptos_time_service,
));
Self {
dag_rpc_rx,
Expand All @@ -70,8 +77,8 @@ impl NetworkHandler {
}

async fn start(mut self) {
self.dag_driver.try_enter_new_round();
self.dag_driver.try_enter_new_round();

// TODO(ibalajiarun): clean up Reliable Broadcast storage periodically.
while let Some(msg) = self.dag_rpc_rx.next().await {
if let Err(e) = self.process_rpc(msg).await {
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/dag/dag_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use super::types::DAGMessage;
use aptos_consensus_types::common::Author;
use aptos_reliable_broadcast::RBNetworkSender;
use aptos_time_service::{Interval, TimeService, TimeServiceTrait};
use async_trait::async_trait;
use futures::{
Expand All @@ -24,7 +25,7 @@ pub trait RpcHandler {
}

#[async_trait]
pub trait DAGNetworkSender: Send + Sync {
pub trait DAGNetworkSender: Send + Sync + RBNetworkSender<DAGMessage> {
async fn send_rpc(
&self,
receiver: Author,
Expand Down
19 changes: 18 additions & 1 deletion consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
// Copyright © Aptos Foundation

use crate::{
dag::{
dag_driver::{DagDriver, DagDriverError},
dag_network::{DAGNetworkSender, RpcWithFallback},
dag_store::Dag,
reliable_broadcast::ReliableBroadcast,
tests::{dag_test::MockStorage, helpers::new_certified_node},
types::{CertifiedAck, DAGMessage},
RpcHandler,
Expand All @@ -13,13 +14,27 @@ use crate::{
};
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{epoch_state::EpochState, validator_verifier::random_validator_verifier};
use async_trait::async_trait;
use claims::{assert_ok, assert_ok_eq};
use std::{sync::Arc, time::Duration};
use tokio_retry::strategy::ExponentialBackoff;

struct MockNetworkSender {}

#[async_trait]
impl RBNetworkSender<DAGMessage> for MockNetworkSender {
async fn send_rb_rpc(
&self,
_receiver: Author,
_messagee: DAGMessage,
_timeout: Duration,
) -> anyhow::Result<DAGMessage> {
unimplemented!()
}
}

#[async_trait]
impl DAGNetworkSender for MockNetworkSender {
async fn send_rpc(
Expand Down Expand Up @@ -59,6 +74,8 @@ fn test_certified_node_handler() {
let rb = Arc::new(ReliableBroadcast::new(
signers.iter().map(|s| s.author()).collect(),
Arc::new(MockNetworkSender {}),
ExponentialBackoff::from_millis(10),
aptos_time_service::TimeService::mock(),
));
let time_service = Arc::new(SimulatedTimeService::new());
let mut driver = DagDriver::new(
Expand Down
13 changes: 13 additions & 0 deletions consensus/src/dag/tests/dag_network_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::dag::{
use anyhow::{anyhow, bail};
use aptos_consensus_types::common::Author;
use aptos_infallible::Mutex;
use aptos_reliable_broadcast::RBNetworkSender;
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::validator_verifier::random_validator_verifier;
use async_trait::async_trait;
Expand All @@ -27,6 +28,18 @@ struct MockDAGNetworkSender {
test_peer_state: Arc<Mutex<HashMap<Author, TestPeerState>>>,
}

#[async_trait]
impl RBNetworkSender<DAGMessage> for MockDAGNetworkSender {
async fn send_rb_rpc(
&self,
_receiver: Author,
_message: DAGMessage,
_timeout: Duration,
) -> anyhow::Result<DAGMessage> {
unimplemented!()
}
}

#[async_trait]
impl DAGNetworkSender for MockDAGNetworkSender {
async fn send_rpc(
Expand Down
12 changes: 3 additions & 9 deletions consensus/src/dag/tests/reliable_broadcast_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,10 @@

use crate::dag::{
dag_store::Dag,
reliable_broadcast::{
CertifiedNodeHandleError, CertifiedNodeHandler, NodeBroadcastHandleError,
NodeBroadcastHandler,
},
reliable_broadcast::{NodeBroadcastHandleError, NodeBroadcastHandler},
storage::DAGStorage,
tests::{
dag_test::MockStorage,
helpers::{new_certified_node, new_node},
},
types::{CertifiedAck, NodeCertificate},
tests::{dag_test::MockStorage, helpers::new_node},
types::NodeCertificate,
NodeId, RpcHandler, Vote,
};
use aptos_infallible::RwLock;
Expand Down
2 changes: 2 additions & 0 deletions consensus/src/dag/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ impl TDAGMessage for Node {

let current_round = self.metadata().round();

ensure!(current_round > 0, "current round cannot be zero");

if current_round == 1 {
ensure!(self.parents().is_empty(), "invalid parents for round 1");
return Ok(());
Expand Down
9 changes: 7 additions & 2 deletions crates/reliable-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ pub trait RBMessage: Send + Sync + Clone {}

#[async_trait]
pub trait RBNetworkSender<M: RBMessage>: Send + Sync {
async fn send_rpc(&self, receiver: Author, message: M, timeout: Duration) -> anyhow::Result<M>;
async fn send_rb_rpc(
&self,
receiver: Author,
message: M,
timeout: Duration,
) -> anyhow::Result<M>;
}

pub trait BroadcastStatus<M: RBMessage> {
Expand Down Expand Up @@ -74,7 +79,7 @@ where
(
receiver,
network_sender
.send_rpc(receiver, message, Duration::from_millis(500))
.send_rb_rpc(receiver, message, Duration::from_millis(500))
.await,
)
}
Expand Down
2 changes: 1 addition & 1 deletion crates/reliable-broadcast/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ where
TestAck: TryFrom<M> + Into<M>,
TestMessage: TryFrom<M, Error = anyhow::Error> + Into<M>,
{
async fn send_rpc(
async fn send_rb_rpc(
&self,
receiver: Author,
message: M,
Expand Down

0 comments on commit c40e5e7

Please sign in to comment.