Skip to content

Commit

Permalink
WIP: sequencer catchup
Browse files Browse the repository at this point in the history
- Add functions to fetch from peers.
  • Loading branch information
sveitser committed Mar 4, 2024
1 parent 9d9774f commit 0586141
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 18 deletions.
78 changes: 78 additions & 0 deletions sequencer/src/catchup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use crate::{
api::endpoints::{AccountQueryData, BlocksFrontier},
state::{BlockMerkleCommitment, BlockMerkleTree, FeeAccount, FeeMerkleTree},
};
use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime as _};
use jf_primitives::merkle_tree::{MerkleCommitment as _, MerkleTreeScheme as _};
use surf_disco::Client;
use tide_disco::error::ServerError;
use url::Url;

#[derive(Debug, Clone, Default)]
pub struct StatePeers {
clients: Vec<Client<ServerError>>,
}

impl StatePeers {
pub fn from_urls(urls: Vec<Url>) -> Self {
Self {
clients: urls.into_iter().map(Client::new).collect(),
}
}

pub async fn remember_account_balance(
&self,
view: ViewNumber,
account: FeeAccount,
fee_merkle_tree: &mut FeeMerkleTree,
) {
loop {
for client in self.clients.iter() {
match client
.get::<AccountQueryData>(&format!(
"state/catchup/{}/account/{}",
view.get_u64(),
account.address()
))
.send()
.await
{
Ok(res) => match res.proof.remember(fee_merkle_tree) {
Ok(_) => return,
Err(err) => tracing::warn!("Error verifying account proof: {}", err),
},
Err(err) => {
tracing::warn!("Error fetching account from peer: {}", err);
}
}
}
tracing::warn!("Could not fetch account from any peer, retrying");
// TODO: backoff? for testing a function that doesn't loop forever is probably more convenient.
}
}

pub async fn fetch_blocks_frontier(
&self,
view: ViewNumber,
root: BlockMerkleCommitment,
) -> Option<BlocksFrontier> {
for client in self.clients.iter() {
match client
.get::<BlocksFrontier>(&format!("state/catchup/{}/blocks", view.get_u64()))
.send()
.await
{
Ok(frontier) => {
match BlockMerkleTree::verify(root.digest(), root.size() - 1, &frontier) {
Ok(_) => return Some(frontier),
Err(err) => tracing::warn!("Error verifying block proof: {}", err),
}
}
Err(err) => {
tracing::warn!("Error fetching blocks from peer: {}", err);
}
}
}
None
}
}
12 changes: 8 additions & 4 deletions sequencer/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,7 @@ impl QueryableHeader<SeqTypes> for Header {
mod test_headers {
use super::*;
use crate::{
catchup::StatePeers,
l1_client::L1Client,
state::{validate_proposal, BlockMerkleTree, FeeMerkleTree},
NodeState, Payload,
Expand Down Expand Up @@ -554,8 +555,9 @@ mod test_headers {
let mut proposal = parent.clone();

// Advance `proposal.height` to trigger validation error.
let result =
validate_proposal(&mut validated_state, &parent.clone(), &proposal).unwrap_err();
let peers = StatePeers::default();
let result = validate_proposal(&peers, &mut validated_state, &parent.clone(), &proposal)
.unwrap_err();
assert_eq!(
format!("{}", result.root_cause()),
"Invalid Height Error: 0, 0"
Expand All @@ -564,8 +566,8 @@ mod test_headers {
// proposed `Header` root should include parent +
// parent.commit
proposal.height += 1;
let result =
validate_proposal(&mut validated_state, &parent.clone(), &proposal).unwrap_err();
let result = validate_proposal(&peers, &mut validated_state, &parent.clone(), &proposal)
.unwrap_err();
// Fails b/c `proposal` has not advanced from `parent`
assert!(format!("{}", result.root_cause()).contains("Invalid Block Root Error"));
}
Expand Down Expand Up @@ -618,7 +620,9 @@ mod test_headers {
let mut proposal_state = parent_state.clone();
let mut block_merkle_tree = proposal_state.block_merkle_tree.clone();
block_merkle_tree.push(proposal.commit()).unwrap();
let peers = StatePeers::from_urls(vec![]);
validate_proposal(
&peers,
&mut proposal_state,
&parent_header.clone(),
&proposal.clone(),
Expand Down
6 changes: 6 additions & 0 deletions sequencer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod api;
pub mod block;
pub mod catchup;
mod chain_variables;
pub mod context;
mod header;
Expand All @@ -8,6 +9,7 @@ pub mod options;
pub mod state_signature;

use block::entry::TxTableEntryWord;
use catchup::StatePeers;
use context::SequencerContext;
use ethers::{
core::k256::ecdsa::SigningKey,
Expand Down Expand Up @@ -177,6 +179,7 @@ impl<N: network::Type> NodeImplementation<SeqTypes> for Node<N> {
#[derive(Clone, Debug)]
pub struct NodeState {
l1_client: L1Client,
peers: StatePeers,
genesis_state: ValidatedState,
builder_address: Wallet<SigningKey>,
}
Expand All @@ -195,6 +198,7 @@ impl Default for NodeState {
genesis_state: ValidatedState::default(),
builder_address: wallet,
l1_client: L1Client::new("http://localhost:3331".parse().unwrap()),
peers: StatePeers::default(),
}
}
}
Expand Down Expand Up @@ -267,6 +271,7 @@ pub async fn init_node(
mut persistence: impl SequencerPersistence,
builder_params: BuilderParams,
l1_params: L1Params,
peers: Vec<Url>,
) -> anyhow::Result<SequencerContext<network::Web>> {
// Orchestrator client
let validator_args = ValidatorArgs {
Expand Down Expand Up @@ -347,6 +352,7 @@ pub async fn init_node(
l1_client,
builder_address: wallet,
genesis_state,
peers: StatePeers::from_urls(peers),
};

let mut ctx = SequencerContext::init(
Expand Down
2 changes: 2 additions & 0 deletions sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ where
storage,
builder_params,
l1_params,
vec![], // TODO: peers
)
.await
.unwrap()
Expand All @@ -102,6 +103,7 @@ where
storage_opt.create().await?,
builder_params,
l1_params,
vec![], // TODO: peers
)
.await
}
Expand Down
49 changes: 35 additions & 14 deletions sequencer/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use crate::catchup::StatePeers;
use crate::{Header, L1BlockInfo, Leaf, NodeState, SeqTypes};
use anyhow::{ensure, Context};
use ark_serialize::{
CanonicalDeserialize, CanonicalSerialize, Compress, Read, SerializationError, Valid, Validate,
};
use async_std::task::block_on;
use commit::{Commitment, Committable, RawCommitmentBuilder};
use derive_more::{Add, From, Into, Sub};
use ethers::{
Expand Down Expand Up @@ -58,6 +60,7 @@ impl ValidatedState {
}

pub fn validate_proposal(
peers: &StatePeers,
state: &mut ValidatedState,
parent: &Header,
proposal: &Header,
Expand Down Expand Up @@ -93,14 +96,17 @@ pub fn validate_proposal(
let receipts = fetch_fee_receipts(parent.l1_finalized, proposal.l1_finalized);
for FeeInfo { account, amount } in receipts {
// Get the balance in order to add amount, ignoring the proof.
match fee_merkle_tree.universal_lookup(account) {
LookupResult::Ok(balance, _) => fee_merkle_tree
.update(account, balance.add(amount))
.unwrap(),
// Handle `NotFound` and `NotInMemory` by initializing
// state.
_ => fee_merkle_tree.update(account, amount).unwrap(),
};
let lookup = fee_merkle_tree.universal_lookup(account);
if matches!(lookup, LookupResult::NotInMemory) {
let view = todo!(); // where to get the view number?
block_on(peers.remember_account_balance(view, account, fee_merkle_tree));
}
fee_merkle_tree
.update_with(account, |balance| {
Some(balance.cloned().unwrap_or_default().add(amount))
})
.expect("update_with succeeds");
// TODO: can this really not fail?
}

let fee_merkle_tree_root = fee_merkle_tree.commitment();
Expand All @@ -116,17 +122,26 @@ pub fn validate_proposal(
}

/// Fetch receipts from the l1 and add them to local balance.
fn update_balance(fee_merkle_tree: &mut FeeMerkleTree, parent: &Header, proposed: &Header) {
fn update_balance(
peers: &StatePeers,
fee_merkle_tree: &mut FeeMerkleTree,
parent: &Header,
proposed: &Header,
) {
let receipts = fetch_fee_receipts(parent.l1_finalized, proposed.l1_finalized);
for FeeInfo { account, amount } in receipts {
// Add `amount` to `balance`, if `balance` is `None` set it to `amount`
#[allow(clippy::single_match)]
match fee_merkle_tree.update_with(account, |balance| {
Some(balance.cloned().unwrap_or_default().add(amount))
}) {
Ok(LookupResult::Ok(..)) => (),
// TODO handle `LookupResult::NotInMemory` by looking up the value from
// a peer during catchup.
Ok(LookupResult::NotInMemory) => {
// This account is not in memory, attempt to fetch it from peers.
let view = todo!(); // where to get the view number?
block_on(peers.remember_account_balance(view, account, fee_merkle_tree));
// Now that the entry is "remembered" the update should succeed.
update_balance(peers, fee_merkle_tree, parent, proposed);
}
_ => (),
}
}
Expand Down Expand Up @@ -189,7 +204,7 @@ impl HotShotState<SeqTypes> for ValidatedState {
/// proposal descends from parent. Returns updated `ValidatedState`.
async fn validate_and_apply_header(
&self,
_instance: &Self::Instance,
instance: &Self::Instance,
parent_leaf: &Leaf,
proposed_header: &Header,
) -> Result<Self, Self::Error> {
Expand All @@ -198,7 +213,12 @@ impl HotShotState<SeqTypes> for ValidatedState {
let mut validated_state = self.clone();
let parent_header = parent_leaf.get_block_header();
// validate proposed header against parent
match validate_proposal(&mut validated_state, parent_header, proposed_header) {
match validate_proposal(
&instance.peers,
&mut validated_state,
parent_header,
proposed_header,
) {
// Note that currently only block state is updated.
Ok(validated_state) => validated_state,
Err(e) => {
Expand All @@ -209,6 +229,7 @@ impl HotShotState<SeqTypes> for ValidatedState {

// Update account balance from the l1
update_balance(
&instance.peers,
&mut validated_state.fee_merkle_tree,
parent_header,
proposed_header,
Expand Down

0 comments on commit 0586141

Please sign in to comment.