Skip to content

Commit

Permalink
[dag] simplify DAGNetworkSender impl (#10042)
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun authored Sep 17, 2023
1 parent 44aa832 commit a4d42ba
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 43 deletions.
2 changes: 1 addition & 1 deletion consensus/src/dag/dag_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub trait TDAGNetworkSender: Send + Sync + RBNetworkSender<DAGMessage> {
/// Given a list of potential responders, sending rpc to get response from any of them and could
/// fallback to more in case of failures.
async fn send_rpc_with_fallbacks(
&self,
self: Arc<Self>,
responders: Vec<Author>,
message: DAGMessage,
retry_interval: Duration,
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl TDAGNetworkSender for MockNetworkSender {
/// Given a list of potential responders, sending rpc to get response from any of them and could
/// fallback to more in case of failures.
async fn send_rpc_with_fallbacks(
&self,
self: Arc<Self>,
_responders: Vec<Author>,
_message: DAGMessage,
_retry_interval: Duration,
Expand Down
6 changes: 3 additions & 3 deletions consensus/src/dag/tests/dag_network_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl TDAGNetworkSender for MockDAGNetworkSender {
}

async fn send_rpc_with_fallbacks(
&self,
self: Arc<Self>,
responders: Vec<Author>,
message: DAGMessage,
retry_interval: Duration,
Expand All @@ -81,7 +81,7 @@ impl TDAGNetworkSender for MockDAGNetworkSender {
message,
retry_interval,
rpc_timeout,
Arc::new(self.clone()),
self.clone(),
self.time_service.clone(),
)
}
Expand Down Expand Up @@ -111,7 +111,7 @@ async fn test_send_rpc_with_fallback() {
};

let message = TestMessage(vec![42; validators.len() - 1]);
let mut rpc = sender
let mut rpc = Arc::new(sender)
.send_rpc_with_fallbacks(
validators,
message.into(),
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/dag_state_sync_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl TDAGNetworkSender for MockDAGNetworkSender {
/// Given a list of potential responders, sending rpc to get response from any of them and could
/// fallback to more in case of failures.
async fn send_rpc_with_fallbacks(
&self,
self: Arc<Self>,
_responders: Vec<Author>,
_message: DAGMessage,
_retry_interval: Duration,
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::dag_test;
use crate::{
dag::{bootstrap::bootstrap_dag_for_test, types::CertifiedNodeMessage},
experimental::buffer_manager::OrderedBlocks,
network::{DAGNetworkSenderImpl, IncomingDAGRequest, NetworkSender},
network::{IncomingDAGRequest, NetworkSender},
network_interface::{ConsensusMsg, ConsensusNetworkClient, DIRECT_SEND, RPC},
network_tests::{NetworkPlayground, TwinId},
test_utils::{consensus_runtime, EmptyStateComputer, MockPayloadManager, MockStorage},
Expand Down Expand Up @@ -68,7 +68,7 @@ impl DagBootstrapUnit {
let ledger_info = generate_ledger_info_with_sig(&all_signers, storage.get_ledger_info());
let dag_storage = dag_test::MockStorage::new_with_ledger_info(ledger_info);

let network = Arc::new(DAGNetworkSenderImpl::new(Arc::new(network)));
let network = Arc::new(network);

let payload_client = Arc::new(MockPayloadManager::new(None));

Expand Down
47 changes: 12 additions & 35 deletions consensus/src/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use aptos_network::{
protocols::{network::Event, rpc::error::RpcError},
ProtocolId,
};
use aptos_reliable_broadcast::{RBMessage, RBNetworkSender};
use aptos_reliable_broadcast::RBNetworkSender;
use aptos_types::{
account_address::AccountAddress, epoch_change::EpochChangeProof,
ledger_info::LedgerInfoWithSignatures, validator_verifier::ValidatorVerifier,
Expand Down Expand Up @@ -154,6 +154,7 @@ pub struct NetworkSender {
// (self sending is not supported by the networking API).
self_sender: aptos_channels::Sender<Event<ConsensusMsg>>,
validators: ValidatorVerifier,
time_service: aptos_time_service::TimeService,
}

impl NetworkSender {
Expand All @@ -168,6 +169,7 @@ impl NetworkSender {
consensus_network_client,
self_sender,
validators,
time_service: aptos_time_service::TimeService::real(),
}
}

Expand Down Expand Up @@ -441,34 +443,15 @@ impl QuorumStoreSender for NetworkSender {
}
}

// TODO: this can be improved
#[derive(Clone)]
pub struct DAGNetworkSenderImpl {
sender: Arc<NetworkSender>,
time_service: aptos_time_service::TimeService,
}

impl DAGNetworkSenderImpl {
#[allow(unused)]
pub fn new(sender: Arc<NetworkSender>) -> Self {
Self {
sender,
time_service: aptos_time_service::TimeService::real(),
}
}
}

#[async_trait]
impl TDAGNetworkSender for DAGNetworkSenderImpl {
impl TDAGNetworkSender for NetworkSender {
async fn send_rpc(
&self,
receiver: Author,
message: DAGMessage,
timeout: Duration,
) -> anyhow::Result<DAGMessage> {
self.sender
.consensus_network_client
.send_rpc(receiver, message.into_network_message(), timeout)
self.send_rpc(receiver, message.into_network_message(), timeout)
.await
.map_err(|e| anyhow!("invalid rpc response: {}", e))
.and_then(TConsensusMsg::from_network_message)
Expand All @@ -477,41 +460,35 @@ impl TDAGNetworkSender for DAGNetworkSenderImpl {
/// Given a list of potential responders, sending rpc to get response from any of them and could
/// fallback to more in case of failures.
async fn send_rpc_with_fallbacks(
&self,
self: Arc<Self>,
responders: Vec<Author>,
message: DAGMessage,
retry_interval: Duration,
rpc_timeout: Duration,
) -> RpcWithFallback {
let sender = Arc::new(self.clone());
RpcWithFallback::new(
responders,
message,
retry_interval,
rpc_timeout,
sender,
self.clone(),
self.time_service.clone(),
)
}
}

#[async_trait]
impl<M> RBNetworkSender<M> for DAGNetworkSenderImpl
where
M: RBMessage + TConsensusMsg + 'static,
{
impl RBNetworkSender<DAGMessage> for NetworkSender {
async fn send_rb_rpc(
&self,
receiver: Author,
message: M,
message: DAGMessage,
timeout: Duration,
) -> anyhow::Result<M> {
self.sender
.consensus_network_client
.send_rpc(receiver, message.into_network_message(), timeout)
) -> anyhow::Result<DAGMessage> {
self.send_rpc(receiver, message.into_network_message(), timeout)
.await
.map_err(|e| anyhow!("invalid rpc response: {}", e))
.and_then(|msg| TConsensusMsg::from_network_message(msg))
.and_then(TConsensusMsg::from_network_message)
}
}

Expand Down

0 comments on commit a4d42ba

Please sign in to comment.