Skip to content

Commit

Permalink
[dag] epoch manager integration; dag is here
Browse files Browse the repository at this point in the history
  • Loading branch information
ibalajiarun committed Sep 13, 2023
1 parent f31b871 commit 8a03f22
Show file tree
Hide file tree
Showing 28 changed files with 466 additions and 135 deletions.
3 changes: 2 additions & 1 deletion consensus/consensus-types/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,9 +210,10 @@ impl Block {
payload: Payload,
author: Author,
failed_authors: Vec<(Round, Author)>,
parent_block_info: BlockInfo,
) -> anyhow::Result<Self> {
let block_data =
BlockData::new_for_dag(epoch, round, timestamp, payload, author, failed_authors);
BlockData::new_for_dag(epoch, round, timestamp, payload, author, failed_authors, parent_block_info);
Self::new_proposal_from_block_data(block_data, &ValidatorSigner::from_int(0))
}

Expand Down
3 changes: 2 additions & 1 deletion consensus/consensus-types/src/block_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,13 +218,14 @@ impl BlockData {
payload: Payload,
author: Author,
failed_authors: Vec<(Round, Author)>,
parent_block_info: BlockInfo
) -> Self {
Self {
epoch,
round,
timestamp_usecs,
quorum_cert: QuorumCert::new(
VoteData::new(BlockInfo::empty(), BlockInfo::empty()),
VoteData::new(parent_block_info, BlockInfo::empty()),
LedgerInfoWithSignatures::new(
LedgerInfo::new(BlockInfo::empty(), HashValue::zero()),
AggregateSignature::empty(),
Expand Down
3 changes: 2 additions & 1 deletion consensus/src/consensus_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,11 @@ pub fn start_consensus(
timeout_sender,
consensus_to_mempool_sender,
state_computer,
storage,
storage.clone(),
quorum_store_db,
reconfig_events,
bounded_executor,
aptos_time_service::TimeService::real(),
);

let (network_task, network_receiver) = NetworkTask::new(network_service_events, self_receiver);
Expand Down
40 changes: 37 additions & 3 deletions consensus/src/dag/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@ use aptos_consensus_types::{
};
use aptos_crypto::HashValue;
use aptos_executor_types::StateComputeResult;
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_storage_interface::{DbReader, Order};
use aptos_types::{
account_config::{new_block_event_key, NewBlockEvent},
aggregate_signature::AggregateSignature,
block_info::BlockInfo,
epoch_change::EpochChangeProof,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
};
Expand All @@ -47,16 +49,38 @@ pub trait Notifier: Send + Sync {
pub struct NotifierAdapter {
executor_channel: UnboundedSender<OrderedBlocks>,
storage: Arc<dyn DAGStorage>,
parent_block_info: Arc<RwLock<BlockInfo>>,
}

impl NotifierAdapter {
pub fn new(
executor_channel: UnboundedSender<OrderedBlocks>,
storage: Arc<dyn DAGStorage>,
) -> Self {
let ledger_info_from_storage = storage
.get_latest_ledger_info()
.expect("latest ledger info must exist");

// We start from the block that storage's latest ledger info, if storage has end-epoch
// LedgerInfo, we generate the virtual genesis block
let parent_block_info = if ledger_info_from_storage.ledger_info().ends_epoch() {
let genesis =
Block::make_genesis_block_from_ledger_info(ledger_info_from_storage.ledger_info());

let ledger_info = ledger_info_from_storage.ledger_info();
genesis.gen_block_info(
ledger_info.transaction_accumulator_hash(),
ledger_info.version(),
ledger_info.next_epoch_state().cloned(),
)
} else {
ledger_info_from_storage.ledger_info().commit_info().clone()
};

Self {
executor_channel,
storage,
parent_block_info: Arc::new(RwLock::new(parent_block_info)),
}
}
}
Expand All @@ -80,13 +104,23 @@ impl Notifier for NotifierAdapter {
payload.extend(node.payload().clone());
node_digests.push(node.digest());
}
let parent_block_info = self.parent_block_info.read().clone();
// TODO: we may want to split payload into multiple blocks
let block = ExecutedBlock::new(
Block::new_for_dag(epoch, round, timestamp, payload, author, failed_author)?,
Block::new_for_dag(
epoch,
round,
timestamp,
payload,
author,
failed_author,
parent_block_info,
)?,
StateComputeResult::new_dummy(),
);
let block_info = block.block_info();
let storage = self.storage.clone();
*self.parent_block_info.write() = block_info.clone();
Ok(self.executor_channel.unbounded_send(OrderedBlocks {
ordered_blocks: vec![block],
ordered_proof: LedgerInfoWithSignatures::new(
Expand All @@ -113,11 +147,11 @@ impl Notifier for NotifierAdapter {
}

async fn send_epoch_change(&self, _proof: EpochChangeProof) {
todo!()
// todo!()
}

async fn send_commit_proof(&self, _ledger_info: LedgerInfoWithSignatures) {
todo!()
// todo!()
}
}

Expand Down
49 changes: 27 additions & 22 deletions consensus/src/dag/bootstrap.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright © Aptos Foundation

use super::{
adapter::Notifier,
adapter::{Notifier, NotifierAdapter},
anchor_election::RoundRobinAnchorElection,
dag_driver::DagDriver,
dag_fetcher::{DagFetcherService, FetchRequestHandler},
Expand All @@ -15,7 +15,7 @@ use super::{
types::{CertifiedNodeMessage, DAGMessage},
};
use crate::{
dag::{adapter::NotifierAdapter, dag_fetcher::DagFetcher},
dag::dag_fetcher::DagFetcher,
experimental::buffer_manager::OrderedBlocks,
network::IncomingDAGRequest,
state_replication::{PayloadClient, StateComputer},
Expand All @@ -25,16 +25,11 @@ use aptos_channels::{
message_queues::QueueStyle,
};
use aptos_consensus_types::common::Author;
use aptos_crypto::HashValue;
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_logger::{debug, error};
use aptos_reliable_broadcast::{RBNetworkSender, ReliableBroadcast};
use aptos_types::{
aggregate_signature::AggregateSignature,
block_info::BlockInfo,
epoch_state::EpochState,
ledger_info::{LedgerInfo, LedgerInfoWithSignatures},
validator_signer::ValidatorSigner,
epoch_state::EpochState, ledger_info::LedgerInfo, validator_signer::ValidatorSigner,
};
use futures::{FutureExt, StreamExt};
use futures_channel::{
Expand All @@ -45,7 +40,7 @@ use std::{sync::Arc, time::Duration};
use tokio::{select, task::JoinHandle};
use tokio_retry::strategy::ExponentialBackoff;

struct DagBootstrapper {
pub struct DagBootstrapper {
self_peer: Author,
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
Expand All @@ -58,7 +53,7 @@ struct DagBootstrapper {
}

impl DagBootstrapper {
fn new(
pub fn new(
self_peer: Author,
signer: ValidatorSigner,
epoch_state: Arc<EpochState>,
Expand Down Expand Up @@ -170,12 +165,12 @@ impl DagBootstrapper {
(dag_handler, dag_fetcher)
}

async fn bootstrapper(
pub async fn bootstrapper(
self,
mut dag_rpc_rx: Receiver<Author, IncomingDAGRequest>,
ordered_nodes_tx: UnboundedSender<OrderedBlocks>,
shutdown_rx: oneshot::Receiver<()>,
) -> anyhow::Result<()> {
shutdown_rx: oneshot::Receiver<oneshot::Sender<()>>,
) {
let adapter = Arc::new(NotifierAdapter::new(ordered_nodes_tx, self.storage.clone()));

let sync_manager = DagStateSynchronizer::new(
Expand All @@ -186,15 +181,20 @@ impl DagBootstrapper {
self.storage.clone(),
);

// TODO: fetch the correct block info
let ledger_info = LedgerInfoWithSignatures::new(
LedgerInfo::new(BlockInfo::empty(), HashValue::zero()),
AggregateSignature::empty(),
);

let mut shutdown_rx = shutdown_rx.into_stream();

loop {
debug!(
"Bootstrapping DAG instance for epoch {}",
self.epoch_state.epoch
);

// TODO: make sure this will get the correct ledger info after state sync
let ledger_info = self
.storage
.get_latest_ledger_info()
.expect("latest ledger info must exist");

let (dag_store, order_rule) =
self.bootstrap_dag_store(ledger_info.ledger_info().clone(), adapter.clone());

Expand All @@ -208,12 +208,16 @@ impl DagBootstrapper {
// poll the network handler while waiting for rebootstrap notification or shutdown notification
select! {
biased;
_ = shutdown_rx.select_next_some() => {
Ok(ack_tx) = shutdown_rx.select_next_some() => {
df_handle.abort();
let _ = df_handle.await;
return Ok(());
if let Err(e) = ack_tx.send(()) {
error!(error = ?e, "unable to ack to shutdown signal");
}
return;
},
certified_node_msg = handler.run(&mut dag_rpc_rx) => {
debug!("state sync notification received. {:?}", certified_node_msg);
df_handle.abort();
let _ = df_handle.await;

Expand All @@ -222,6 +226,7 @@ impl DagBootstrapper {
if let Err(e) = sync_manager.sync_dag_to(&certified_node_msg, dag_fetcher, dag_store.clone()).await {
error!(error = ?e, "unable to sync");
}
debug!("going to rebootstrap.");
}
}
}
Expand Down
28 changes: 28 additions & 0 deletions consensus/src/dag/commit_signer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
use crate::experimental::signing_phase::CommitSignerProvider;
use aptos_crypto::bls12381;
use aptos_types::validator_signer::ValidatorSigner;

pub struct DagCommitSigner {
signer: ValidatorSigner,
}

impl DagCommitSigner {
pub fn new(signer: ValidatorSigner) -> Self {
Self { signer }
}
}

impl CommitSignerProvider for DagCommitSigner {
fn sign_commit_vote(
&self,
_ledger_info: aptos_types::ledger_info::LedgerInfoWithSignatures,
new_ledger_info: aptos_types::ledger_info::LedgerInfo,
) -> Result<bls12381::Signature, aptos_safety_rules::Error> {
let signature = self
.signer
.sign(&new_ledger_info)
.map_err(|err| aptos_safety_rules::Error::SerializationError(err.to_string()))?;

Ok(signature)
}
}
3 changes: 2 additions & 1 deletion consensus/src/dag/dag_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use crate::{
use anyhow::{bail, Ok};
use aptos_consensus_types::common::{Author, Payload};
use aptos_infallible::RwLock;
use aptos_logger::error;
use aptos_logger::{error, debug};
use aptos_reliable_broadcast::ReliableBroadcast;
use aptos_time_service::{TimeService, TimeServiceTrait};
use aptos_types::{block_info::Round, epoch_state::EpochState};
Expand Down Expand Up @@ -125,6 +125,7 @@ impl DagDriver {
}

pub fn enter_new_round(&mut self, new_round: Round, strong_links: Vec<NodeCertificate>) {
debug!("entering new round {}", new_round);
// TODO: support pulling payload
let payload = Payload::empty(false);
// TODO: need to wait to pass median of parents timestamp
Expand Down
8 changes: 6 additions & 2 deletions consensus/src/dag/dag_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use crate::{
use anyhow::bail;
use aptos_channels::aptos_channel;
use aptos_consensus_types::common::Author;
use aptos_logger::{error, warn};
use aptos_logger::{debug, error, warn};
use aptos_network::protocols::network::RpcError;
use aptos_types::epoch_state::EpochState;
use bytes::Bytes;
Expand Down Expand Up @@ -63,7 +63,7 @@ impl NetworkHandler {
// TODO(ibalajiarun): clean up Reliable Broadcast storage periodically.
loop {
select! {
Some(msg) = dag_rpc_rx.next() => {
msg = dag_rpc_rx.select_next_some() => {
match self.process_rpc(msg).await {
Ok(sync_status) => {
if let StateSyncStatus::NeedsSync(certified_node_msg) = sync_status {
Expand Down Expand Up @@ -95,6 +95,8 @@ impl NetworkHandler {
) -> anyhow::Result<StateSyncStatus> {
let dag_message: DAGMessage = rpc_request.req.try_into()?;

debug!("processing rpc: {:?}", dag_message);

let author = dag_message
.author()
.map_err(|_| anyhow::anyhow!("unexpected rpc message {:?}", dag_message))?;
Expand Down Expand Up @@ -130,6 +132,8 @@ impl NetworkHandler {
},
};

debug!("responding to process_rpc {:?}", response);

let response = response
.and_then(|response_msg| {
rpc_request
Expand Down
2 changes: 1 addition & 1 deletion consensus/src/dag/dag_network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub trait TDAGNetworkSender: Send + Sync + RBNetworkSender<DAGMessage> {
/// Given a list of potential responders, sending rpc to get response from any of them and could
/// fallback to more in case of failures.
async fn send_rpc_with_fallbacks(
&self,
self: Arc<Self>,
responders: Vec<Author>,
message: DAGMessage,
retry_interval: Duration,
Expand Down
5 changes: 2 additions & 3 deletions consensus/src/dag/dag_state_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,10 +85,9 @@ impl StateSyncTrigger {
// (meaning consensus is behind) or
// the highest committed anchor round is 2*DAG_WINDOW behind the given ledger info round
// (meaning execution is behind the DAG window)
(dag_reader
dag_reader
.highest_ordered_anchor_round()
.unwrap_or_default()
< li.commit_info().round())
.is_some_and(|r| r < li.commit_info().round())
|| dag_reader.highest_committed_anchor_round()
+ ((STATE_SYNC_WINDOW_MULTIPLIER * DAG_WINDOW) as Round)
< li.commit_info().round()
Expand Down
7 changes: 6 additions & 1 deletion consensus/src/dag/dag_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,11 @@ impl Dag {
}

pub(super) fn highest_committed_anchor_round(&self) -> Round {
self.highest_committed_anchor_round
// TODO: update this info
self.storage
.get_latest_ledger_info()
.expect("must exist")
.commit_info()
.round()
}
}
5 changes: 5 additions & 0 deletions consensus/src/dag/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
mod adapter;
mod anchor_election;
mod bootstrap;
mod commit_signer;
mod dag_driver;
mod dag_fetcher;
mod dag_handler;
Expand All @@ -18,5 +19,9 @@ mod storage;
mod tests;
mod types;

pub use adapter::StorageAdapter;
pub use bootstrap::DagBootstrapper;
pub use dag_network::{RpcHandler, RpcWithFallback, TDAGNetworkSender};
pub use storage::DAGStorage;
pub use types::{CertifiedNode, DAGMessage, DAGNetworkMessage, Extensions, Node, NodeId, Vote};
pub use commit_signer::DagCommitSigner;
Loading

0 comments on commit 8a03f22

Please sign in to comment.