Skip to content

Commit

Permalink
[dag] basic state sync impl
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Aug 29, 2023
1 parent b6557f5 commit abf8a81
Show file tree
Hide file tree
Showing 15 changed files with 502 additions and 99 deletions.
3 changes: 1 addition & 2 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use aptos_consensus_types::{
common::{Author, Payload, Round},
executed_block::ExecutedBlock,
};
use aptos_crypto::HashValue;
use aptos_executor_types::StateComputeResult;
use aptos_logger::error;
use aptos_types::{
Expand Down Expand Up @@ -65,7 +64,7 @@ impl Notifier for BufferManagerAdapter {
Ok(self.executor_channel.unbounded_send(OrderedBlocks {
ordered_blocks: vec![block],
ordered_proof: LedgerInfoWithSignatures::new(
LedgerInfo::new(block_info, HashValue::zero()),
LedgerInfo::new(block_info, anchor.digest()),
AggregateSignature::empty(),
),
callback: Box::new(
Expand Down
4 changes: 2 additions & 2 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use super::{
anchor_election::RoundRobinAnchorElection,
dag_driver::DagDriver,
dag_fetcher::{DagFetcher, FetchRequestHandler},
dag_fetcher::{DagFetcherService, FetchRequestHandler},
dag_handler::NetworkHandler,
dag_network::TDAGNetworkSender,
dag_store::Dag,
Expand Down Expand Up @@ -76,7 +76,7 @@ pub fn bootstrap_dag(
);

let (dag_fetcher, fetch_requester, node_fetch_waiter, certified_node_fetch_waiter) =
DagFetcher::new(
DagFetcherService::new(
epoch_state.clone(),
dag_network_sender,
dag.clone(),
Expand Down
227 changes: 148 additions & 79 deletions consensus/src/dag/dag_fetcher.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
// Copyright © Aptos Foundation
// SPDX-License-Identifier: Apache-2.0

use super::{dag_network::RpcWithFallback, types::NodeMetadata, RpcHandler};
use super::{
dag_network::RpcWithFallback,
types::{NodeMetadata, RemoteFetchRequestTargets},
RpcHandler,
};
use crate::dag::{
dag_network::TDAGNetworkSender,
dag_store::Dag,
types::{CertifiedNode, FetchResponse, Node, RemoteFetchRequest},
};
use anyhow::ensure;
use anyhow::{anyhow, ensure};
use aptos_consensus_types::common::Author;
use aptos_infallible::RwLock;
use aptos_infallible::{RwLock, RwLockReadGuard};
use aptos_logger::error;
use aptos_time_service::TimeService;
use aptos_types::epoch_state::EpochState;
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, Stream, StreamExt};
use std::{
collections::HashMap,
Expand Down Expand Up @@ -124,15 +129,13 @@ impl LocalFetchRequest {
}
}

pub struct DagFetcher {
epoch_state: Arc<EpochState>,
network: Arc<dyn TDAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
pub struct DagFetcherService {
inner: DagFetcher,
ordered_authors: Vec<Author>,
request_rx: Receiver<LocalFetchRequest>,
time_service: TimeService,
}

impl DagFetcher {
impl DagFetcherService {
pub fn new(
epoch_state: Arc<EpochState>,
network: Arc<dyn TDAGNetworkSender>,
Expand All @@ -147,13 +150,12 @@ impl DagFetcher {
let (request_tx, request_rx) = tokio::sync::mpsc::channel(16);
let (node_tx, node_rx) = tokio::sync::mpsc::channel(100);
let (certified_node_tx, certified_node_rx) = tokio::sync::mpsc::channel(100);
let ordered_authors = epoch_state.verifier.get_ordered_account_addresses();
(
Self {
epoch_state,
network,
dag,
inner: DagFetcher::new(epoch_state, network, dag, time_service),
request_rx,
time_service,
ordered_authors,
},
FetchRequester {
request_tx,
Expand All @@ -167,75 +169,130 @@ impl DagFetcher {

pub async fn start(mut self) {
while let Some(local_request) = self.request_rx.recv().await {
let responders = local_request
.responders(&self.epoch_state.verifier.get_ordered_account_addresses());
let remote_request = {
let dag_reader = self.dag.read();

let missing_parents: Vec<NodeMetadata> = dag_reader
.filter_missing(local_request.node().parents_metadata())
.cloned()
.collect();

if missing_parents.is_empty() {
local_request.notify();
continue;
}

let target = local_request.node();
RemoteFetchRequest::new(
target.metadata().epoch(),
missing_parents,
dag_reader.bitmask(local_request.node().round()),
match self
.fetch_for_node(
local_request.node(),
local_request.responders(&self.ordered_authors),
)
};

let mut rpc = RpcWithFallback::new(
responders,
remote_request.clone().into(),
Duration::from_millis(500),
Duration::from_secs(1),
self.network.clone(),
self.time_service.clone(),
);
while let Some(response) = rpc.next().await {
if let Ok(response) =
response
.and_then(FetchResponse::try_from)
.and_then(|response| {
response.verify(&remote_request, &self.epoch_state.verifier)
})
.await
{
Ok(_) => local_request.notify(),
Err(err) => error!("unable to complete fetch successfully: {}", err),
}
}
}

pub(super) async fn fetch_for_node(
&mut self,
node: &Node,
responders: Vec<Author>,
) -> anyhow::Result<()> {
let remote_request = {
let dag_reader = self.inner.dag.read();

let missing_parents: Vec<NodeMetadata> = dag_reader
.filter_missing(node.parents_metadata())
.cloned()
.collect();

if missing_parents.is_empty() {
return Ok(());
}

RemoteFetchRequest::new(
node.metadata().epoch(),
RemoteFetchRequestTargets::ByMetadata(missing_parents),
dag_reader.bitmask(node.round()),
)
};
self.inner
.fetch(remote_request, responders, |dag_reader| {
dag_reader.all_exists(node.parents_metadata())
})
.await
}
}

#[async_trait]
pub trait TDagFetcher {
async fn fetch(
&self,
remote_request: RemoteFetchRequest,
responders: Vec<Author>,
predicate: impl Fn(RwLockReadGuard<Dag>) -> bool + Send,
) -> anyhow::Result<()>;
}

pub(crate) struct DagFetcher {
network: Arc<dyn TDAGNetworkSender>,
time_service: TimeService,
epoch_state: Arc<EpochState>,
dag: Arc<RwLock<Dag>>,
}

impl DagFetcher {
pub(crate) fn new(
epoch_state: Arc<EpochState>,
network: Arc<dyn TDAGNetworkSender>,
dag: Arc<RwLock<Dag>>,
time_service: TimeService,
) -> Self {
Self {
network,
time_service,
epoch_state,
dag,
}
}
}

#[async_trait]
impl TDagFetcher for DagFetcher {
async fn fetch(
&self,
remote_request: RemoteFetchRequest,
responders: Vec<Author>,
predicate: impl Fn(RwLockReadGuard<Dag>) -> bool + Send,
) -> anyhow::Result<()> {
let mut rpc = RpcWithFallback::new(
responders,
remote_request.clone().into(),
Duration::from_millis(500),
Duration::from_secs(1),
self.network.clone(),
self.time_service.clone(),
);

// TODO retry
while let Some(response) = rpc.next().await {
if let Ok(response) = response
.and_then(FetchResponse::try_from)
.and_then(|response| response.verify(&remote_request, &self.epoch_state.verifier))
{
let certified_nodes = response.certified_nodes();
// TODO: support chunk response or fallback to state sync
{
let certified_nodes = response.certified_nodes();
// TODO: support chunk response or fallback to state sync
{
let mut dag_writer = self.dag.write();
for node in certified_nodes {
if let Err(e) = dag_writer.add_node(node) {
error!("Failed to add node {}", e);
}
let mut dag_writer = self.dag.write();
for node in certified_nodes {
if let Err(e) = dag_writer.add_node(node) {
error!("Failed to add node {}", e);
}
}
}

if self
.dag
.read()
.all_exists(local_request.node().parents_metadata())
{
local_request.notify();
break;
}
if predicate(self.dag.read()) {
return Ok(());
}
}
// TODO retry
}
Err(anyhow!("fetch failed"))
}
}

#[derive(Debug, ThisError)]
pub enum FetchRequestHandleError {
#[error("parents are missing")]
ParentsMissing,
#[error("target nodes are missing")]
TargetsMissing,
}

pub struct FetchRequestHandler {
Expand All @@ -259,19 +316,31 @@ impl RpcHandler for FetchRequestHandler {
fn process(&mut self, message: Self::Request) -> anyhow::Result<Self::Response> {
let dag_reader = self.dag.read();

// `Certified Node`: In the good case, there should exist at least one honest validator that
// signed the Certified Node that has the all the parents to fulfil this
// request.
// `Node`: In the good case, the sender of the Node should have the parents in its local DAG
// to satisfy this request.
ensure!(
dag_reader.all_exists(message.targets().iter()),
FetchRequestHandleError::ParentsMissing
);
let targets = match message.targets() {
RemoteFetchRequestTargets::ByMetadata(target_metadata) => {
// `Certified Node`: In the good case, there should exist at least one honest validator that
// signed the Certified Node that has the all the parents to fulfil this
// request.
// `Node`: In the good case, the sender of the Node should have the parents in its local DAG
// to satisfy this request.
ensure!(
dag_reader.all_exists(target_metadata.iter()),
FetchRequestHandleError::TargetsMissing
);

target_metadata.clone()
},
RemoteFetchRequestTargets::ByRoundDigest(round, digest) => {
vec![dag_reader
.get_node_by_round_digest(*round, *digest)
.map(|node| node.metadata().clone())
.ok_or_else(|| anyhow::anyhow!("unable to find node"))?]
},
};

let certified_nodes: Vec<_> = dag_reader
.reachable(
message.targets(),
&targets,
Some(message.exists_bitmask().first_round()),
|_| true,
)
Expand Down
5 changes: 5 additions & 0 deletions consensus/src/dag/dag_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::types::DAGMessage;
use aptos_consensus_types::common::Author;
use aptos_reliable_broadcast::RBNetworkSender;
use aptos_time_service::{Interval, TimeService, TimeServiceTrait};
use aptos_types::{epoch_change::EpochChangeProof, ledger_info::LedgerInfoWithSignatures};
use async_trait::async_trait;
use futures::{
stream::{FusedStream, FuturesUnordered},
Expand Down Expand Up @@ -42,6 +43,10 @@ pub trait TDAGNetworkSender: Send + Sync + RBNetworkSender<DAGMessage> {
retry_interval: Duration,
rpc_timeout: Duration,
) -> RpcWithFallback;

async fn send_epoch_change(&self, proof: EpochChangeProof);

async fn send_commit_proof(&self, ledger_info: LedgerInfoWithSignatures);
}

struct Responders {
Expand Down
Loading

0 comments on commit abf8a81

Please sign in to comment.