Skip to content

Commit

Permalink
[dag] Integrate Dag Fetcher with Dag Driver (#9453)
Browse files Browse the repository at this point in the history
* [dag] Integrate Dag Fetcher with Dag Driver
* [dag] Handle fetch response in dag handler
  • Loading branch information
ibalajiarun authored Aug 11, 2023
1 parent d814913 commit 20fb11f
Show file tree
Hide file tree
Showing 8 changed files with 147 additions and 18 deletions.
11 changes: 9 additions & 2 deletions consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-License-Identifier: Apache-2.0

use super::{
dag_fetcher::FetchRequester,
order_rule::OrderRule,
storage::DAGStorage,
types::{CertifiedAck, DAGMessage},
Expand All @@ -14,9 +15,10 @@ use crate::{
},
state_replication::PayloadClient,
};
use anyhow::bail;
use anyhow::{bail, Ok};
use aptos_consensus_types::common::{Author, Payload};
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::{block_info::Round, epoch_state::EpochState};
Expand Down Expand Up @@ -45,6 +47,7 @@ pub(crate) struct DagDriver {
rb_abort_handle: Option<AbortHandle>,
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
fetch_requester: Arc<FetchRequester>,
}

impl DagDriver {
Expand All @@ -58,6 +61,7 @@ impl DagDriver {
time_service: TimeService,
storage: Arc<dyn DAGStorage>,
order_rule: OrderRule,
fetch_requester: Arc<FetchRequester>,
) -> Self {
// TODO: rebroadcast nodes after recovery
Self {
Expand All @@ -71,6 +75,7 @@ impl DagDriver {
rb_abort_handle: None,
storage,
order_rule,
fetch_requester,
}
}

Expand All @@ -88,7 +93,9 @@ impl DagDriver {
let round = node.metadata().round();

if !dag_writer.all_exists(node.parents_metadata()) {
// TODO(ibalajiarun): implement fetching logic.
if let Err(err) = self.fetch_requester.request_for_certified_node(node) {
error!("request to fetch failed: {}", err);
}
bail!(DagDriverError::MissingParents);
}

Expand Down
87 changes: 82 additions & 5 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,78 @@ use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_time_service::TimeService;
use aptos_types::epoch_state::EpochState;
use futures::StreamExt;
use std::{collections::HashMap, sync::Arc, time::Duration};
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use std::{
collections::HashMap,
pin::Pin,
sync::Arc,
task::{Context, Poll},
time::Duration,
};
use thiserror::Error as ThisError;
use tokio::sync::{
mpsc::{Receiver, Sender},
oneshot,
};

pub struct FetchWaiter<T> {
rx: Receiver<oneshot::Receiver<T>>,
futures: Pin<Box<FuturesUnordered<oneshot::Receiver<T>>>>,
}

impl<T> FetchWaiter<T> {
fn new(rx: Receiver<oneshot::Receiver<T>>) -> Self {
Self {
rx,
futures: Box::pin(FuturesUnordered::new()),
}
}
}

impl<T> Stream for FetchWaiter<T> {
type Item = Result<T, oneshot::error::RecvError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Poll::Ready(Some(rx)) = self.rx.poll_recv(cx) {
self.futures.push(rx);
}

self.futures.as_mut().poll_next(cx)
}
}

pub struct FetchRequester {
request_tx: Sender<LocalFetchRequest>,
node_waiter_tx: Sender<oneshot::Receiver<Node>>,
certified_node_waiter_tx: Sender<oneshot::Receiver<CertifiedNode>>,
}

impl FetchRequester {
pub fn request_for_node(&self, node: Node) -> anyhow::Result<()> {
let (res_tx, res_rx) = oneshot::channel();
let fetch_req = LocalFetchRequest::Node(node, res_tx);
self.request_tx
.try_send(fetch_req)
.map_err(|e| anyhow::anyhow!("unable to send node fetch request to channel: {}", e))?;
self.node_waiter_tx.try_send(res_rx)?;
Ok(())
}

pub fn request_for_certified_node(&self, node: CertifiedNode) -> anyhow::Result<()> {
let (res_tx, res_rx) = oneshot::channel();
let fetch_req = LocalFetchRequest::CertifiedNode(node, res_tx);
self.request_tx.try_send(fetch_req).map_err(|e| {
anyhow::anyhow!(
"unable to send certified node fetch request to channel: {}",
e
)
})?;
self.certified_node_waiter_tx.try_send(res_rx)?;
Ok(())
}
}

#[derive(Debug)]
pub enum LocalFetchRequest {
Node(Node, oneshot::Sender<Node>),
CertifiedNode(CertifiedNode, oneshot::Sender<CertifiedNode>),
Expand Down Expand Up @@ -55,7 +119,7 @@ impl LocalFetchRequest {
}
}

struct DagFetcher {
pub struct DagFetcher {
epoch_state: Arc<EpochState>,
network: Arc<dyn DAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
Expand All @@ -69,8 +133,15 @@ impl DagFetcher {
network: Arc<dyn DAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
time_service: TimeService,
) -> (Self, Sender<LocalFetchRequest>) {
) -> (
Self,
FetchRequester,
FetchWaiter<Node>,
FetchWaiter<CertifiedNode>,
) {
let (request_tx, request_rx) = tokio::sync::mpsc::channel(16);
let (node_tx, node_rx) = tokio::sync::mpsc::channel(100);
let (certified_node_tx, certified_node_rx) = tokio::sync::mpsc::channel(100);
(
Self {
epoch_state,
Expand All @@ -79,7 +150,13 @@ impl DagFetcher {
request_rx,
time_service,
},
request_tx,
FetchRequester {
request_tx,
node_waiter_tx: node_tx,
certified_node_waiter_tx: certified_node_tx,
},
FetchWaiter::new(node_rx),
FetchWaiter::new(certified_node_rx),
)
}

Expand Down
47 changes: 40 additions & 7 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
// Copyright © Aptos Foundation

use super::{
dag_driver::DagDriver, dag_fetcher::FetchRequestHandler, dag_network::DAGNetworkSender,
order_rule::OrderRule, storage::DAGStorage, types::TDAGMessage,
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler, FetchWaiter},
dag_network::DAGNetworkSender,
order_rule::OrderRule,
storage::DAGStorage,
types::TDAGMessage,
CertifiedNode, Node,
};
use crate::{
dag::{
dag_network::RpcHandler, dag_store::Dag, reliable_broadcast::NodeBroadcastHandler,
dag_network::RpcHandler, dag_store::Dag, rb_handler::NodeBroadcastHandler,
types::DAGMessage,
},
network::{IncomingDAGRequest, TConsensusMsg},
Expand All @@ -24,6 +29,7 @@ use aptos_types::{epoch_state::EpochState, validator_signer::ValidatorSigner};
use bytes::Bytes;
use futures::StreamExt;
use std::sync::Arc;
use tokio::select;
use tokio_retry::strategy::ExponentialBackoff;

struct NetworkHandler {
Expand All @@ -32,6 +38,8 @@ struct NetworkHandler {
dag_driver: DagDriver,
fetch_receiver: FetchRequestHandler,
epoch_state: Arc<EpochState>,
node_fetch_waiter: FetchWaiter<Node>,
certified_node_fetch_waiter: FetchWaiter<CertifiedNode>,
}

impl NetworkHandler {
Expand All @@ -42,7 +50,7 @@ impl NetworkHandler {
epoch_state: Arc<EpochState>,
storage: Arc<dyn DAGStorage>,
payload_client: Arc<dyn PayloadClient>,
_dag_network_sender: Arc<dyn DAGNetworkSender>,
dag_network_sender: Arc<dyn DAGNetworkSender>,
rb_network_sender: Arc<dyn RBNetworkSender<DAGMessage>>,
time_service: TimeService,
order_rule: OrderRule,
Expand All @@ -53,6 +61,14 @@ impl NetworkHandler {
ExponentialBackoff::from_millis(10),
time_service.clone(),
));
let (_dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcher::new(
epoch_state.clone(),
dag_network_sender,
dag.clone(),
time_service.clone(),
);
let fetch_requester = Arc::new(fetch_requester);
Self {
dag_rpc_rx,
node_receiver: NodeBroadcastHandler::new(
Expand All @@ -71,19 +87,36 @@ impl NetworkHandler {
time_service,
storage,
order_rule,
fetch_requester,
),
epoch_state: epoch_state.clone(),
fetch_receiver: FetchRequestHandler::new(dag, epoch_state),
node_fetch_waiter,
certified_node_fetch_waiter,
}
}

async fn start(mut self) {
self.dag_driver.try_enter_new_round();

// TODO(ibalajiarun): clean up Reliable Broadcast storage periodically.
while let Some(msg) = self.dag_rpc_rx.next().await {
if let Err(e) = self.process_rpc(msg).await {
warn!(error = ?e, "error processing rpc");
loop {
select! {
Some(msg) = self.dag_rpc_rx.next() => {
if let Err(e) = self.process_rpc(msg).await {
warn!(error = ?e, "error processing rpc");
}
},
Some(res) = self.node_fetch_waiter.next() => {
if let Err(e) = res.map_err(|e| anyhow::anyhow!("recv error: {}", e)).and_then(|node| self.node_receiver.process(node)) {
warn!(error = ?e, "error processing node fetch notification");
}
},
Some(res) = self.certified_node_fetch_waiter.next() => {
if let Err(e) = res.map_err(|e| anyhow::anyhow!("recv error: {}", e)).and_then(|certified_node| self.dag_driver.process(certified_node)) {
warn!(error = ?e, "error processing certified node fetch notification");
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ mod dag_handler;
mod dag_network;
mod dag_store;
mod order_rule;
mod reliable_broadcast;
mod rb_handler;
mod storage;
#[cfg(test)]
mod tests;
Expand Down
File renamed without changes.
14 changes: 13 additions & 1 deletion consensus/src/dag/tests/dag_driver_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::{
dag::{
anchor_election::RoundRobinAnchorElection,
dag_driver::{DagDriver, DagDriverError},
dag_fetcher::DagFetcher,
dag_network::{DAGNetworkSender, RpcWithFallback},
dag_store::Dag,
order_rule::OrderRule,
Expand Down Expand Up @@ -75,9 +76,10 @@ fn test_certified_node_handler() {

let zeroth_round_node = new_certified_node(0, signers[0].author(), vec![]);

let network_sender = Arc::new(MockNetworkSender {});
let rb = Arc::new(ReliableBroadcast::new(
signers.iter().map(|s| s.author()).collect(),
Arc::new(MockNetworkSender {}),
network_sender.clone(),
ExponentialBackoff::from_millis(10),
aptos_time_service::TimeService::mock(),
));
Expand All @@ -91,6 +93,15 @@ fn test_certified_node_handler() {
Box::new(RoundRobinAnchorElection::new(validators)),
ordered_nodes_sender,
);

let (_, fetch_requester, _, _) = DagFetcher::new(
epoch_state.clone(),
network_sender,
dag.clone(),
aptos_time_service::TimeService::mock(),
);
let fetch_requester = Arc::new(fetch_requester);

let mut driver = DagDriver::new(
signers[0].author(),
epoch_state,
Expand All @@ -101,6 +112,7 @@ fn test_certified_node_handler() {
time_service,
storage,
order_rule,
fetch_requester,
);

// expect an ack for a valid message
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,5 @@ mod dag_test;
mod fetcher_test;
mod helpers;
mod order_rule_tests;
mod reliable_broadcast_tests;
mod rb_handler_tests;
mod types_test;
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use crate::dag::{
dag_store::Dag,
reliable_broadcast::{NodeBroadcastHandleError, NodeBroadcastHandler},
rb_handler::{NodeBroadcastHandleError, NodeBroadcastHandler},
storage::DAGStorage,
tests::{dag_test::MockStorage, helpers::new_node},
types::NodeCertificate,
Expand Down

0 comments on commit 20fb11f

Please sign in to comment.