From 8b43b52fb8e1ebc086e1dd22e5fb9df2edb21ae7 Mon Sep 17 00:00:00 2001 From: sveitser Date: Mon, 26 Feb 2024 20:06:48 +0100 Subject: [PATCH 1/4] Sequencer catchup - Add functions to fetch from peers. - Re-organize block building 1. Check what's missing 2. Fetch missing data from peers 3. Build block - Restore merkle tree frontier - dd backoff to state peers - Make state peers configurable - Add nightly dev shell - Initial mock implementation of StateCatchup - Address review comments --- docker-compose.yaml | 5 + flake.nix | 18 +++ process-compose.yaml | 5 + sequencer/src/api/endpoints.rs | 6 + sequencer/src/block.rs | 4 +- sequencer/src/catchup.rs | 210 ++++++++++++++++++++++++++ sequencer/src/header.rs | 245 ++++++++++++++++++------------ sequencer/src/lib.rs | 19 +-- sequencer/src/main.rs | 1 + sequencer/src/options.rs | 4 + sequencer/src/state.rs | 267 +++++++++++++++++++++++---------- 11 files changed, 595 insertions(+), 189 deletions(-) create mode 100644 sequencer/src/catchup.rs diff --git a/docker-compose.yaml b/docker-compose.yaml index 9a26705b1..e39fee343 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -95,6 +95,7 @@ services: - ESPRESSO_SEQUENCER_DA_SERVER_URL - ESPRESSO_SEQUENCER_CONSENSUS_SERVER_URL - ESPRESSO_SEQUENCER_API_PORT + - ESPRESSO_SEQUENCER_STATE_PEERS=http://sequencer1:$ESPRESSO_SEQUENCER_API_PORT - ESPRESSO_SEQUENCER_STORAGE_PATH - ESPRESSO_SEQUENCER_L1_PROVIDER - ESPRESSO_SEQUENCER_L1_USE_LATEST_BLOCK_TAG @@ -128,6 +129,7 @@ services: - ESPRESSO_SEQUENCER_CONSENSUS_SERVER_URL - ESPRESSO_SEQUENCER_API_PORT - ESPRESSO_SEQUENCER_API_PEERS=http://sequencer2:$ESPRESSO_SEQUENCER_API_PORT + - ESPRESSO_SEQUENCER_STATE_PEERS=http://sequencer2:$ESPRESSO_SEQUENCER_API_PORT - ESPRESSO_SEQUENCER_POSTGRES_HOST=sequencer-db - ESPRESSO_SEQUENCER_POSTGRES_USER=postgres - ESPRESSO_SEQUENCER_POSTGRES_PASSWORD=password @@ -161,6 +163,7 @@ services: - ESPRESSO_SEQUENCER_CONSENSUS_SERVER_URL - ESPRESSO_SEQUENCER_API_PORT - ESPRESSO_SEQUENCER_API_PEERS=http://sequencer1:$ESPRESSO_SEQUENCER_API_PORT + - ESPRESSO_SEQUENCER_STATE_PEERS=http://sequencer3:$ESPRESSO_SEQUENCER_API_PORT - ESPRESSO_SEQUENCER_L1_PROVIDER - ESPRESSO_SEQUENCER_L1_USE_LATEST_BLOCK_TAG - ESPRESSO_STATE_RELAY_SERVER_URL @@ -190,6 +193,7 @@ services: - ESPRESSO_SEQUENCER_DA_SERVER_URL - ESPRESSO_SEQUENCER_CONSENSUS_SERVER_URL - ESPRESSO_SEQUENCER_API_PORT + - ESPRESSO_SEQUENCER_STATE_PEERS=http://sequencer4:$ESPRESSO_SEQUENCER_API_PORT - ESPRESSO_SEQUENCER_L1_PROVIDER - ESPRESSO_SEQUENCER_L1_USE_LATEST_BLOCK_TAG - ESPRESSO_STATE_RELAY_SERVER_URL @@ -219,6 +223,7 @@ services: - ESPRESSO_SEQUENCER_DA_SERVER_URL - ESPRESSO_SEQUENCER_CONSENSUS_SERVER_URL - ESPRESSO_SEQUENCER_API_PORT + - ESPRESSO_SEQUENCER_STATE_PEERS=http://sequencer0:$ESPRESSO_SEQUENCER_API_PORT - ESPRESSO_SEQUENCER_L1_PROVIDER - ESPRESSO_SEQUENCER_L1_USE_LATEST_BLOCK_TAG - ESPRESSO_STATE_RELAY_SERVER_URL diff --git a/flake.nix b/flake.nix index a62a993c2..22a3f53ee 100644 --- a/flake.nix +++ b/flake.nix @@ -229,6 +229,24 @@ crossShell { config = "x86_64-unknown-linux-musl"; }; devShells.armCrossShell = crossShell { config = "aarch64-unknown-linux-musl"; }; + devShells.nightly = + let + toolchain = pkgs.rust-bin.nightly.latest.minimal.override { + extensions = [ "rustfmt" "clippy" "llvm-tools-preview" "rust-src" ]; + }; + in + mkShell { + buildInputs = [ + # Rust dependencies + pkg-config + openssl + curl + protobuf # to compile libp2p-autonat + toolchain + ]; + inherit RUST_LOG RUST_BACKTRACE RUSTFLAGS CARGO_TARGET_DIR; + }; + devShells.rustShell = let stableToolchain = pkgs.rust-bin.stable.latest.minimal.override { diff --git a/process-compose.yaml b/process-compose.yaml index 91025246a..0e6fbfb61 100644 --- a/process-compose.yaml +++ b/process-compose.yaml @@ -102,6 +102,7 @@ processes: environment: - ESPRESSO_SEQUENCER_API_PORT=$ESPRESSO_SEQUENCER_API_PORT - ESPRESSO_SEQUENCER_API_PEERS=http://localhost:$ESPRESSO_SEQUENCER1_API_PORT + - ESPRESSO_SEQUENCER_STATE_PEERS=http://localhost:$ESPRESSO_SEQUENCER1_API_PORT - ESPRESSO_SEQUENCER_STORAGE_PATH=$ESPRESSO_BASE_STORAGE_PATH/seq0 - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_0 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_0 @@ -129,6 +130,7 @@ processes: environment: - ESPRESSO_SEQUENCER_API_PORT=$ESPRESSO_SEQUENCER1_API_PORT - ESPRESSO_SEQUENCER_API_PEERS=http://localhost:$ESPRESSO_SEQUENCER_API_PORT + - ESPRESSO_SEQUENCER_STATE_PEERS=http://localhost:$ESPRESSO_SEQUENCER2_API_PORT - ESPRESSO_SEQUENCER_STORAGE_PATH=$ESPRESSO_BASE_STORAGE_PATH/seq1 - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_1 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_1 @@ -155,6 +157,7 @@ processes: command: sequencer -- http -- status environment: - ESPRESSO_SEQUENCER_API_PORT=$ESPRESSO_SEQUENCER2_API_PORT + - ESPRESSO_SEQUENCER_STATE_PEERS=http://localhost:$ESPRESSO_SEQUENCER3_API_PORT - ESPRESSO_SEQUENCER_STORAGE_PATH=$ESPRESSO_BASE_STORAGE_PATH/seq2 - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_2 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_2 @@ -181,6 +184,7 @@ processes: command: sequencer -- http -- status environment: - ESPRESSO_SEQUENCER_API_PORT=$ESPRESSO_SEQUENCER3_API_PORT + - ESPRESSO_SEQUENCER_STATE_PEERS=http://localhost:$ESPRESSO_SEQUENCER4_API_PORT - ESPRESSO_SEQUENCER_STORAGE_PATH=$ESPRESSO_BASE_STORAGE_PATH/seq3 - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_3 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_3 @@ -207,6 +211,7 @@ processes: command: sequencer -- http -- status environment: - ESPRESSO_SEQUENCER_API_PORT=$ESPRESSO_SEQUENCER4_API_PORT + - ESPRESSO_SEQUENCER_STATE_PEERS=http://localhost:$ESPRESSO_SEQUENCER_API_PORT - ESPRESSO_SEQUENCER_STORAGE_PATH=$ESPRESSO_BASE_STORAGE_PATH/seq4 - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_4 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_4 diff --git a/sequencer/src/api/endpoints.rs b/sequencer/src/api/endpoints.rs index 032a576c1..c66c87447 100644 --- a/sequencer/src/api/endpoints.rs +++ b/sequencer/src/api/endpoints.rs @@ -60,6 +60,12 @@ pub struct AccountQueryData { pub proof: FeeAccountProof, } +impl From<(FeeAccountProof, U256)> for AccountQueryData { + fn from((proof, balance): (FeeAccountProof, U256)) -> Self { + Self { balance, proof } + } +} + pub type BlocksFrontier = ::MembershipProof; pub(super) type AvailState = Arc>>; diff --git a/sequencer/src/block.rs b/sequencer/src/block.rs index 2f8d685a7..0e760b4b8 100644 --- a/sequencer/src/block.rs +++ b/sequencer/src/block.rs @@ -16,10 +16,12 @@ use entry::TxTableEntryWord; use payload::Payload; use tables::NameSpaceTable; +pub type NsTable = NameSpaceTable; + impl BlockPayload for Payload { type Error = crate::Error; type Transaction = Transaction; - type Metadata = NameSpaceTable; + type Metadata = NsTable; // TODO change `BlockPayload::Encode` trait bounds to enable copyless encoding such as AsRef<[u8]> // https://github.com/EspressoSystems/HotShot/issues/2115 diff --git a/sequencer/src/catchup.rs b/sequencer/src/catchup.rs new file mode 100644 index 000000000..4d5146fb6 --- /dev/null +++ b/sequencer/src/catchup.rs @@ -0,0 +1,210 @@ +use crate::{ + api::endpoints::{AccountQueryData, BlocksFrontier}, + state::{BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment}, + Header, ValidatedState, +}; +use async_trait::async_trait; +use commit::Commitment; +use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime as _}; +use jf_primitives::merkle_tree::{ForgetableMerkleTreeScheme, MerkleTreeScheme as _}; +use serde::de::DeserializeOwned; +use std::time::Duration; +use surf_disco::Request; +use tide_disco::error::ServerError; +use url::Url; + +// This newtype is probably not worth having. It's only used to be able to log +// URLs before doing requests. +#[derive(Debug, Clone)] +struct Client { + inner: surf_disco::Client, + url: Url, +} + +impl Client { + pub fn new(url: Url) -> Self { + Self { + inner: surf_disco::Client::new(url.clone()), + url, + } + } + + pub fn get(&self, route: &str) -> Request { + self.inner.get(route) + } +} + +#[async_trait] +pub trait StateCatchup: Send + Sync + std::fmt::Debug { + async fn fetch_accounts( + &self, + view: ViewNumber, + fee_merkle_tree_root: FeeMerkleCommitment, + accounts: Vec, + ) -> Vec; + + async fn remember_blocks_merkle_tree( + &self, + view: ViewNumber, + mt: &mut BlockMerkleTree, + elem: &Commitment
, + ); +} + +#[derive(Debug, Clone, Default)] +pub struct MockStateCatchup { + state: ValidatedState, +} + +impl From for MockStateCatchup { + fn from(state: ValidatedState) -> Self { + Self { state } + } +} + +#[async_trait] +impl StateCatchup for MockStateCatchup { + async fn fetch_accounts( + &self, + view: ViewNumber, + _fee_merkle_tree_root: FeeMerkleCommitment, + accounts: Vec, + ) -> Vec { + accounts + .into_iter() + .map(|account| { + tracing::info!("catchup: fetching account {account:?} for view {view:?}"); + FeeAccountProof::prove(&self.state.fee_merkle_tree, account.into()) + .unwrap_or_else(|| panic!("Account {account:?} not in memory")) + .into() + }) + .collect() + } + + async fn remember_blocks_merkle_tree( + &self, + view: ViewNumber, + mt: &mut BlockMerkleTree, + elem: &Commitment
, + ) { + tracing::info!("catchup: fetching frontier for view {view:?}"); + let view = view.get_u64(); + let (_, proof) = self + .state + .block_merkle_tree + .lookup(view) + .expect_ok() + .unwrap(); + mt.remember(view, elem, proof.clone()) + .expect("Proof verifies"); + } +} + +#[derive(Debug, Clone, Default)] +pub struct StatePeers { + clients: Vec>, + interval: Duration, +} + +impl StatePeers { + pub fn from_urls(urls: Vec) -> Self { + if urls.is_empty() { + panic!("Cannot create StatePeers with no peers"); + } + + Self { + clients: urls.into_iter().map(Client::new).collect(), + interval: Duration::from_secs(1), + } + } + + async fn fetch_account( + &self, + view: ViewNumber, + fee_merkle_tree_root: FeeMerkleCommitment, + account: FeeAccount, + ) -> AccountQueryData { + if self.clients.is_empty() { + panic!("No peers to fetch account from"); + } + loop { + for client in self.clients.iter() { + tracing::info!( + "Fetching account {account:?} for view {view:?} from {}", + client.url + ); + match client + .get::(&format!( + "state/catchup/{}/account/{}", + view.get_u64(), + account.address() + )) + .send() + .await + { + Ok(res) => match res.proof.verify(&fee_merkle_tree_root) { + Ok(_) => return res, + Err(err) => tracing::warn!("Error verifying account proof: {}", err), + }, + Err(err) => { + tracing::warn!("Error fetching account from peer: {}", err); + } + } + } + tracing::warn!("Could not fetch account from any peer, retrying"); + async_std::task::sleep(self.interval).await; + } + } +} + +#[async_trait] +impl StateCatchup for StatePeers { + async fn fetch_accounts( + &self, + view: ViewNumber, + fee_merkle_tree_root: FeeMerkleCommitment, + accounts: Vec, + ) -> Vec { + let mut ret = vec![]; + for account in accounts { + tracing::info!("Fetching account {account:?} for view {view:?}"); + ret.push( + self.fetch_account(view, fee_merkle_tree_root, account) + .await, + ) + } + ret + } + + async fn remember_blocks_merkle_tree( + &self, + view: ViewNumber, + mt: &mut BlockMerkleTree, + elem: &Commitment
, + ) { + if self.clients.is_empty() { + panic!("No peers to fetch frontier from"); + } + loop { + for client in self.clients.iter() { + tracing::info!("Fetching frontier for view {view:?} from {}", client.url); + match client + .get::(&format!("state/catchup/{}/blocks", view.get_u64())) + .send() + .await + { + // TODO: is this the right way to remember the frontier? + Ok(frontier) => match mt.remember(view.get_u64(), elem, &frontier) { + Ok(_) => return, + Err(err) => tracing::warn!("Error verifying block proof: {}", err), + }, + Err(err) => { + tracing::warn!("Error fetching blocks from peer: {}", err); + } + } + } + tracing::warn!("Could not fetch frontier from any peer, retrying"); + async_std::task::sleep(self.interval).await; + } + } +} diff --git a/sequencer/src/header.rs b/sequencer/src/header.rs index 63c738029..96216c602 100644 --- a/sequencer/src/header.rs +++ b/sequencer/src/header.rs @@ -1,7 +1,7 @@ use crate::{ - block::{entry::TxTableEntryWord, tables::NameSpaceTable}, + block::{entry::TxTableEntryWord, tables::NameSpaceTable, NsTable}, l1_client::L1Snapshot, - state::{fetch_fee_receipts, BlockMerkleCommitment, FeeInfo, FeeMerkleCommitment}, + state::{fetch_fee_receipts, BlockMerkleCommitment, FeeAccount, FeeInfo, FeeMerkleCommitment}, L1BlockInfo, Leaf, NodeState, SeqTypes, ValidatedState, }; use ark_serialize::CanonicalSerialize; @@ -24,7 +24,6 @@ use hotshot_types::{ use jf_primitives::merkle_tree::prelude::*; use serde::{Deserialize, Serialize}; -use std::{fmt::Debug, ops::Add}; use time::OffsetDateTime; /// A header is like a [`Block`] with the body replaced by a digest. @@ -131,9 +130,10 @@ impl Header { // TODO pub or merely pub(super)? pub fn from_info( payload_commitment: VidCommitment, - ns_table: NameSpaceTable, + ns_table: NsTable, parent_leaf: &Leaf, mut l1: L1Snapshot, + l1_deposits: &[FeeInfo], mut timestamp: u64, parent_state: &ValidatedState, builder_address: Wallet, @@ -182,33 +182,25 @@ impl Header { } } - let ValidatedState { - mut block_merkle_tree, - mut fee_merkle_tree, - } = parent_state.clone(); - block_merkle_tree.push(parent_header.commit()).unwrap(); - let block_merkle_tree_root = block_merkle_tree.commitment(); - - // fetch receipts from the l1 - let receipts = fetch_fee_receipts(parent_header.l1_finalized, l1.finalized); - for receipt in receipts { - let account = receipt.account(); - let amount = receipt.amount(); - - // Get the balance in order to add amount, ignoring the proof. - match fee_merkle_tree.update_with(account, |balance| { - Some(balance.cloned().unwrap_or_default().add(amount)) - }) { - Ok(LookupResult::Ok(..)) => (), - // Handle `NotFound` and `NotInMemory` by initializing - // state. - _ => { - fee_merkle_tree.update(account, amount).unwrap(); - } - } + let mut state = parent_state.clone(); + state + .block_merkle_tree + .push(parent_header.commit()) + .unwrap(); + let block_merkle_tree_root = state.block_merkle_tree.commitment(); + + // Insert the new L1 deposits + for fee_info in l1_deposits { + state + .insert_fee_deposit(*fee_info) + .expect("fee deposit previously verified"); + // TODO: Check LookupResult } - let fee_merkle_tree_root = fee_merkle_tree.commitment(); + // TODO Check that we have the fee to pay for the block. + // We currently can't return an error from Header::new. + + let fee_merkle_tree_root = state.fee_merkle_tree.commitment(); let header = Self { height, @@ -244,21 +236,62 @@ impl BlockHeader for Header { payload_commitment: VidCommitment, metadata: <::BlockPayload as BlockPayload>::Metadata, ) -> Self { - // The HotShot APIs should be redesigned so that - // * they are async - // * new blocks being created have access to the application state, which in our case could - // contain an already connected L1 client. - // For now, as a workaround, we will create a new L1 client based on environment variables - // and use `block_on` to query it. - let l1 = instance_state.l1_client().snapshot().await; + // Fetch the latest L1 snapshot. + let l1_snapshot = instance_state.l1_client().snapshot().await; + // Fetch the new L1 deposits between parent and current finalized L1 block. + let l1_deposits = fetch_fee_receipts( + parent_leaf.get_block_header().l1_finalized, + l1_snapshot.finalized, + ); + + // Find missing fee state entries + let missing_accounts = parent_state.forgotten_accounts( + std::iter::once(FeeAccount::from(instance_state.builder_address.address())) + .chain(l1_deposits.iter().map(|info| info.account())), + ); + + // Fetch missing fee state entries + let missing_account_proofs = instance_state + .peers + .as_ref() + .fetch_accounts( + parent_leaf.get_view_number(), + parent_state.fee_merkle_tree.commitment(), + missing_accounts, + ) + .await; + + // Insert missing fee state entries + let mut validated_state = parent_state.clone(); + for account in missing_account_proofs.iter() { + account + .proof + .remember(&mut validated_state.fee_merkle_tree) + .expect("proof previously verified"); + } + + // Ensure merkle tree has frontier + let view = parent_leaf.get_view_number(); + if validated_state.need_to_fetch_blocks_mt_frontier(view) { + instance_state + .peers + .as_ref() + .remember_blocks_merkle_tree( + view, + &mut validated_state.block_merkle_tree, + &parent_leaf.get_block_header().commit(), + ) + .await; + } Self::from_info( payload_commitment, metadata, parent_leaf, - l1, + l1_snapshot, + &l1_deposits, OffsetDateTime::now_utc().unix_timestamp() as u64, - parent_state, + &validated_state, instance_state.builder_address.clone(), ) } @@ -312,18 +345,20 @@ impl QueryableHeader for Header { #[cfg(test)] mod test_headers { + use std::sync::Arc; + use super::*; use crate::{ + catchup::MockStateCatchup, l1_client::L1Client, - state::{validate_proposal, BlockMerkleTree, FeeMerkleTree}, - NodeState, Payload, + state::{validate_and_apply_proposal, BlockMerkleTree, FeeMerkleTree}, + NodeState, }; use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; use ethers::{ types::{Address, RecoveryMessage}, utils::Anvil, }; - use hotshot_types::traits::block_contents::{vid_commitment, GENESIS_VID_NUM_STORAGE_NODES}; #[derive(Debug, Default)] #[must_use] @@ -344,6 +379,31 @@ mod test_headers { expected_l1_finalized: Option, } + struct GenesisForTest { + pub instance_state: NodeState, + pub validated_state: ValidatedState, + pub leaf: Leaf, + pub header: Header, + pub ns_table: NameSpaceTable, + } + + impl Default for GenesisForTest { + fn default() -> Self { + let instance_state = NodeState::default(); + let validated_state = ValidatedState::genesis(&instance_state); + let leaf = Leaf::genesis(&instance_state); + let header = leaf.get_block_header().clone(); + let ns_table = leaf.get_block_payload().unwrap().get_ns_table().clone(); + Self { + instance_state, + validated_state, + leaf, + header, + ns_table, + } + } + } + impl TestCase { fn run(self) { setup_logging(); @@ -354,21 +414,13 @@ mod test_headers { assert!(self.expected_l1_head >= self.parent_l1_head); assert!(self.expected_l1_finalized >= self.parent_l1_finalized); - let genesis_state = NodeState::default(); - let genesis_leaf = Leaf::genesis(&genesis_state); - let genesis_header = genesis_leaf.get_block_header(); - let genesis_ns_table = genesis_leaf - .get_block_payload() - .unwrap() - .get_ns_table() - .clone(); - - let mut parent = genesis_header.clone(); + let genesis = GenesisForTest::default(); + let mut parent = genesis.header.clone(); parent.timestamp = self.parent_timestamp; parent.l1_head = self.parent_l1_head; parent.l1_finalized = self.parent_l1_finalized; - let mut parent_leaf = genesis_leaf.clone(); + let mut parent_leaf = genesis.leaf.clone(); parent_leaf.block_header = parent.clone(); let block_merkle_tree = @@ -386,16 +438,17 @@ mod test_headers { }; let header = Header::from_info( - genesis_header.payload_commitment, - genesis_ns_table, + genesis.header.payload_commitment, + genesis.ns_table, &parent_leaf, L1Snapshot { head: self.l1_head, finalized: self.l1_finalized, }, + &[], // TODO: l1 deposits self.timestamp, &validated_state, - genesis_state.builder_address, + genesis.instance_state.builder_address, ); assert_eq!(header.height, parent.height + 1); assert_eq!(header.timestamp, self.expected_timestamp); @@ -530,35 +583,26 @@ mod test_headers { #[test] fn test_validate_proposal_error_cases() { - let mut genesis_header = { - // TODO refactor repeated code from other tests - let (genesis_payload, genesis_ns_table) = Payload::genesis(); - let genesis_commitment = { - // TODO we should not need to collect payload bytes just to compute vid_commitment - let payload_bytes = genesis_payload - .encode() - .expect("unable to encode genesis payload") - .collect(); - vid_commitment(&payload_bytes, GENESIS_VID_NUM_STORAGE_NODES) - }; - let genesis_state = NodeState::default(); - Header::genesis(&genesis_state, genesis_commitment, genesis_ns_table) - }; + let genesis = GenesisForTest::default(); let mut validated_state = ValidatedState::default(); let mut block_merkle_tree = validated_state.block_merkle_tree.clone(); + let mut parent_header = genesis.header.clone(); + let mut parent_leaf = genesis.leaf.clone(); + parent_leaf.block_header = parent_header.clone(); + // Populate the tree with an initial `push`. - block_merkle_tree.push(genesis_header.commit()).unwrap(); + block_merkle_tree.push(genesis.header.commit()).unwrap(); let block_merkle_tree_root = block_merkle_tree.commitment(); validated_state.block_merkle_tree = block_merkle_tree.clone(); - genesis_header.block_merkle_tree_root = block_merkle_tree_root; - let parent = genesis_header.clone(); - let mut proposal = parent.clone(); + parent_header.block_merkle_tree_root = block_merkle_tree_root; + let mut proposal = parent_header.clone(); // Advance `proposal.height` to trigger validation error. let result = - validate_proposal(&mut validated_state, &parent.clone(), &proposal).unwrap_err(); + validate_and_apply_proposal(&mut validated_state, &parent_leaf, &proposal, vec![]) + .unwrap_err(); assert_eq!( format!("{}", result.root_cause()), "Invalid Height Error: 0, 0" @@ -568,65 +612,72 @@ mod test_headers { // parent.commit proposal.height += 1; let result = - validate_proposal(&mut validated_state, &parent.clone(), &proposal).unwrap_err(); + validate_and_apply_proposal(&mut validated_state, &parent_leaf, &proposal, vec![]) + .unwrap_err(); // Fails b/c `proposal` has not advanced from `parent` assert!(format!("{}", result.root_cause()).contains("Invalid Block Root Error")); } #[async_std::test] async fn test_validate_proposal_success() { + setup_logging(); + setup_backtrace(); + let anvil = Anvil::new().block_time(1u32).spawn(); - let genesis_state = NodeState { + let mut genesis_state = NodeState { l1_client: L1Client::new(anvil.endpoint().parse().unwrap(), Address::default()), ..Default::default() }; - // TODO refactor repeated code from other tests - let genesis_leaf = Leaf::genesis(&genesis_state); - let genesis_header = genesis_leaf.get_block_header().clone(); - let genesis_ns_table = genesis_leaf - .get_block_payload() - .unwrap() - .get_ns_table() - .clone(); + let genesis = GenesisForTest::default(); + + let mut parent_state = genesis.validated_state.clone(); - let mut parent_state = ValidatedState::genesis(&genesis_state); let mut block_merkle_tree = parent_state.block_merkle_tree.clone(); let fee_merkle_tree = parent_state.fee_merkle_tree.clone(); // Populate the tree with an initial `push`. - block_merkle_tree.push(genesis_header.commit()).unwrap(); + block_merkle_tree.push(genesis.header.commit()).unwrap(); let block_merkle_tree_root = block_merkle_tree.commitment(); let fee_merkle_tree_root = fee_merkle_tree.commitment(); parent_state.block_merkle_tree = block_merkle_tree.clone(); parent_state.fee_merkle_tree = fee_merkle_tree.clone(); - let mut parent_header = genesis_header.clone(); + let mut parent_header = genesis.header.clone(); parent_header.block_merkle_tree_root = block_merkle_tree_root; parent_header.fee_merkle_tree_root = fee_merkle_tree_root; - let mut parent_leaf = genesis_leaf.clone(); + let mut parent_leaf = genesis.leaf.clone(); parent_leaf.block_header = parent_header.clone(); - // get a proposal from a parent + // Forget the state to trigger lookups in Header::new + let forgotten_state = parent_state.forget(); + genesis_state.peers = Arc::new(MockStateCatchup::from(parent_state.clone())); + // Get a proposal from a parent + + // TODO this currently fails because after fetching the blocks frontier + // the element (header commitment) does not match the one in the proof. let proposal = Header::new( - &parent_state, + &forgotten_state, &genesis_state, &parent_leaf, parent_header.payload_commitment, - genesis_ns_table, + genesis.ns_table, ) .await; let mut proposal_state = parent_state.clone(); + // The current fake implementation of fetch_fee_receipts returns + // some fee info. To validate the proposal we need to insert these + // records here. + for fee_info in fetch_fee_receipts(None, None) { + proposal_state.insert_fee_deposit(fee_info).unwrap(); + } + let mut block_merkle_tree = proposal_state.block_merkle_tree.clone(); block_merkle_tree.push(proposal.commit()).unwrap(); - validate_proposal( - &mut proposal_state, - &parent_header.clone(), - &proposal.clone(), - ) - .unwrap(); + validate_and_apply_proposal(&mut proposal_state, &parent_leaf, &proposal.clone(), vec![]) + .unwrap(); assert_eq!( proposal_state.block_merkle_tree.commitment(), proposal.block_merkle_tree_root diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index a7956d42b..37df39dc2 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -1,5 +1,6 @@ pub mod api; pub mod block; +pub mod catchup; mod chain_variables; pub mod context; mod header; @@ -8,6 +9,7 @@ pub mod options; pub mod state_signature; use block::entry::TxTableEntryWord; +use catchup::{MockStateCatchup, StateCatchup, StatePeers}; use context::SequencerContext; use ethers::{ core::k256::ecdsa::SigningKey, @@ -173,9 +175,10 @@ impl NodeImplementation for Node { type CommitteeNetwork = N::DAChannel; } -#[derive(Clone, Debug)] +#[derive(Debug, Clone)] pub struct NodeState { l1_client: L1Client, + peers: Arc, genesis_state: ValidatedState, builder_address: Wallet, } @@ -186,18 +189,6 @@ impl NodeState { } } -impl Default for NodeState { - fn default() -> Self { - let wallet = FeeAccount::test_wallet(); - - Self { - genesis_state: ValidatedState::default(), - builder_address: wallet, - l1_client: L1Client::new("http://localhost:3331".parse().unwrap(), Address::default()), - } - } -} - impl InstanceState for NodeState {} impl NodeType for SeqTypes { @@ -246,6 +237,7 @@ pub struct NetworkParams { pub webserver_poll_interval: Duration, pub private_staking_key: BLSPrivKey, pub private_state_key: SchnorrPrivKey, + pub state_peers: Vec, } #[derive(Clone, Debug)] @@ -346,6 +338,7 @@ pub async fn init_node( l1_client, builder_address: wallet, genesis_state, + peers: Arc::new(StatePeers::from_urls(network_params.state_peers)), }; let mut ctx = SequencerContext::init( diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs index 5e3ba18a2..e95249c69 100644 --- a/sequencer/src/main.rs +++ b/sequencer/src/main.rs @@ -60,6 +60,7 @@ where webserver_poll_interval: opt.webserver_poll_interval, private_staking_key: opt.private_staking_key, private_state_key: opt.private_state_key, + state_peers: opt.state_peers, }; // Inititialize HotShot. If the user requested the HTTP module, we must initialize the handle in diff --git a/sequencer/src/options.rs b/sequencer/src/options.rs index b57de8310..c0b2ee8ea 100644 --- a/sequencer/src/options.rs +++ b/sequencer/src/options.rs @@ -133,6 +133,10 @@ pub struct Options { /// Url we will use for RPC communication with L1. #[clap(long, env = "ESPRESSO_SEQUENCER_L1_PROVIDER")] pub l1_provider_url: Url, + + /// Peer nodes use to fetch missing state + #[clap(long, env = "ESPRESSO_SEQUENCER_STATE_PEERS", value_delimiter = ',')] + pub state_peers: Vec, } impl Options { diff --git a/sequencer/src/state.rs b/sequencer/src/state.rs index 04b82d856..e9bd4dd66 100644 --- a/sequencer/src/state.rs +++ b/sequencer/src/state.rs @@ -1,5 +1,5 @@ use crate::{Header, L1BlockInfo, Leaf, NodeState, SeqTypes}; -use anyhow::{ensure, Context}; +use anyhow::{bail, ensure, Context}; use ark_serialize::{ CanonicalDeserialize, CanonicalSerialize, Compress, Read, SerializationError, Valid, Validate, }; @@ -13,14 +13,21 @@ use ethers::{ types::{self, RecoveryMessage, U256}, }; use hotshot::traits::ValidatedState as HotShotState; -use hotshot_types::data::{BlockError, ViewNumber}; -use jf_primitives::merkle_tree::{ - prelude::{LightWeightSHA3MerkleTree, Sha3Digest, Sha3Node}, - universal_merkle_tree::UniversalMerkleTree, - AppendableMerkleTreeScheme, ForgetableMerkleTreeScheme, ForgetableUniversalMerkleTreeScheme, - LookupResult, MerkleCommitment, MerkleTreeScheme, +use hotshot_types::{ + data::{BlockError, ViewNumber}, + traits::node_implementation::ConsensusTime as _, }; +use itertools::Itertools; use jf_primitives::merkle_tree::{ToTraversalPath, UniversalMerkleTreeScheme}; +use jf_primitives::{ + errors::PrimitivesError, + merkle_tree::{ + prelude::{LightWeightSHA3MerkleTree, Sha3Digest, Sha3Node}, + universal_merkle_tree::UniversalMerkleTree, + AppendableMerkleTreeScheme, ForgetableMerkleTreeScheme, + ForgetableUniversalMerkleTreeScheme, LookupResult, MerkleCommitment, MerkleTreeScheme, + }, +}; use num_traits::CheckedSub; use serde::{Deserialize, Serialize}; use std::ops::Add; @@ -56,19 +63,72 @@ impl ValidatedState { pub fn prefund_account(&mut self, account: FeeAccount, amount: FeeAmount) { self.fee_merkle_tree.update(account, amount).unwrap(); } + + /// Find accounts that are not in memory. + /// + /// As an optimization we could try to apply updates and return the + /// forgotten accounts to be fetched from peers and update them later. + pub fn forgotten_accounts( + &self, + accounts: impl IntoIterator, + ) -> Vec { + accounts + .into_iter() + .unique() + .filter(|account| { + self.fee_merkle_tree + .lookup(*account) + .expect_not_in_memory() + .is_ok() + }) + .collect() + } + + /// Check if the merkle tree is available + pub fn need_to_fetch_blocks_mt_frontier(&self, view: ViewNumber) -> bool { + self.block_merkle_tree + .lookup(view.get_u64()) + .expect_ok() + .is_err() + } + + /// Insert a fee deposit receipt + pub fn insert_fee_deposit( + &mut self, + fee_info: FeeInfo, + ) -> Result, PrimitivesError> { + self.fee_merkle_tree + .update_with(fee_info.account, |balance| { + Some(balance.cloned().unwrap_or_default().add(fee_info.amount)) + }) + } } -pub fn validate_proposal( +#[cfg(any(test, feature = "testing"))] +impl ValidatedState { + pub fn forget(&self) -> Self { + Self { + fee_merkle_tree: FeeMerkleTree::from_commitment(self.fee_merkle_tree.commitment()), + block_merkle_tree: BlockMerkleTree::from_commitment( + self.block_merkle_tree.commitment(), + ), + } + } +} + +pub fn validate_and_apply_proposal( state: &mut ValidatedState, - parent: &Header, + parent_leaf: &Leaf, proposal: &Header, + receipts: Vec, ) -> anyhow::Result<()> { + let parent_header = parent_leaf.get_block_header(); // validate height anyhow::ensure!( - proposal.height == parent.height + 1, + proposal.height == parent_header.height + 1, anyhow::anyhow!( "Invalid Height Error: {}, {}", - parent.height, + parent_header.height, proposal.height ) ); @@ -79,36 +139,31 @@ pub fn validate_proposal( } = state; // validate proposal is descendent of parent by appending to parent - block_merkle_tree.push(parent.commit()).unwrap(); + block_merkle_tree.push(parent_header.commit()).unwrap(); let block_merkle_tree_root = block_merkle_tree.commitment(); anyhow::ensure!( proposal.block_merkle_tree_root == block_merkle_tree_root, anyhow::anyhow!( - "Invalid Block Root Error: {}, {}", + "Invalid Block Root Error: local={}, proposal={}", block_merkle_tree_root, proposal.block_merkle_tree_root ) ); - // fetch receipts from the l1 - let receipts = fetch_fee_receipts(parent.l1_finalized, proposal.l1_finalized); + // Insert the fee deposits for FeeInfo { account, amount } in receipts { - // Get the balance in order to add amount, ignoring the proof. - match fee_merkle_tree.universal_lookup(account) { - LookupResult::Ok(balance, _) => fee_merkle_tree - .update(account, balance.add(amount)) - .unwrap(), - // Handle `NotFound` and `NotInMemory` by initializing - // state. - _ => fee_merkle_tree.update(account, amount).unwrap(), - }; + fee_merkle_tree + .update_with(account, |balance| { + Some(balance.cloned().unwrap_or_default().add(amount)) + }) + .expect("update_with succeeds"); } let fee_merkle_tree_root = fee_merkle_tree.commitment(); anyhow::ensure!( proposal.fee_merkle_tree_root == fee_merkle_tree_root, anyhow::anyhow!( - "Invalid Fee Root Error: {}, {}", + "Invalid Fee Root Error: local={}, proposal={}", fee_merkle_tree_root, proposal.fee_merkle_tree_root ) @@ -116,24 +171,37 @@ pub fn validate_proposal( Ok(()) } -/// Fetch receipts from the l1 and add them to local balance. -fn update_balance(fee_merkle_tree: &mut FeeMerkleTree, parent: &Header, proposed: &Header) { - let receipts = fetch_fee_receipts(parent.l1_finalized, proposed.l1_finalized); - for FeeInfo { account, amount } in receipts { - // Add `amount` to `balance`, if `balance` is `None` set it to `amount` - #[allow(clippy::single_match)] - match fee_merkle_tree.update_with(account, |balance| { - Some(balance.cloned().unwrap_or_default().add(amount)) - }) { - Ok(LookupResult::Ok(..)) => (), - // TODO handle `LookupResult::NotInMemory` by looking up the value from - // a peer during catchup. - _ => (), +#[derive(Debug)] +enum ChargeFeeError { + /// Account not in memory, needs to be fetched from peer + NotInMemory, + /// Account exists but has insufficient funds + InsufficientFunds, +} + +fn charge_fee( + fee_merkle_tree: &mut FeeMerkleTree, + fee_info: FeeInfo, +) -> Result<(), ChargeFeeError> { + let FeeInfo { account, amount } = fee_info; + let lookup = fee_merkle_tree.universal_lookup(account); + match lookup { + LookupResult::Ok(balance, _) => { + let updated = balance + .checked_sub(&amount) + .ok_or(ChargeFeeError::InsufficientFunds)?; + fee_merkle_tree + .update(account, updated) + .expect("update succeeds"); + Ok(()) } + LookupResult::NotInMemory => Err(ChargeFeeError::NotInMemory), + LookupResult::NotFound(..) => Err(ChargeFeeError::InsufficientFunds), } } + /// Validate builder account by verifiying signature and charging the account. -fn validate_builder( +fn validate_and_charge_builder( fee_merkle_tree: &mut FeeMerkleTree, proposed_header: &Header, ) -> anyhow::Result<()> { @@ -155,27 +223,39 @@ fn validate_builder( ); // charge the fee to the builder - let mut fee_merkle_tree = fee_merkle_tree.clone(); - match fee_merkle_tree.universal_lookup(fee_info.account) { - LookupResult::Ok(balance, _) => { - let updated = balance - .checked_sub(&fee_info.amount) - .ok_or_else(|| anyhow::anyhow!("Insufficient funds"))?; - fee_merkle_tree.update(fee_info.account, updated).unwrap(); - } - LookupResult::NotFound(_) => { - anyhow::bail!(format!( - "Account Not Found {:?}", - fee_info.account.address() - )); - } - LookupResult::NotInMemory => { - anyhow::bail!(format!( - "Invalid Builder Account {:?}", - fee_info.account.address() - )); + if charge_fee(fee_merkle_tree, fee_info).is_err() { + bail!("Insufficient funds") + }; + Ok(()) +} + +/// A pure function to validate and apply a header to the state. +/// +/// It assumes that all state required to validate and apply the header +/// is available in the `validated_state`. +fn validate_and_apply_header( + validated_state: &mut ValidatedState, + parent_leaf: &Leaf, + proposed_header: &Header, + l1_deposits: Vec, +) -> Result<(), BlockError> { + // validate proposed header against parent + match validate_and_apply_proposal(validated_state, parent_leaf, proposed_header, l1_deposits) { + // Note that currently only block state is updated. + Ok(validated_state) => validated_state, + Err(e) => { + tracing::warn!("Invalid Proposal: {}", e); + return Err(BlockError::InvalidBlockHeader); } }; + + // Validate builder by verifying signature and charging account + if let Err(e) = + validate_and_charge_builder(&mut validated_state.fee_merkle_tree, proposed_header) + { + tracing::warn!("Invalid Builder: {}", e); + return Err(BlockError::InvalidBlockHeader); + }; Ok(()) } @@ -190,37 +270,68 @@ impl HotShotState for ValidatedState { /// proposal descends from parent. Returns updated `ValidatedState`. async fn validate_and_apply_header( &self, - _instance: &Self::Instance, + instance: &Self::Instance, parent_leaf: &Leaf, proposed_header: &Header, ) -> Result { // Clone state to avoid mutation. Consumer can take update // through returned value. let mut validated_state = self.clone(); - let parent_header = parent_leaf.get_block_header(); - // validate proposed header against parent - match validate_proposal(&mut validated_state, parent_header, proposed_header) { - // Note that currently only block state is updated. - Ok(validated_state) => validated_state, - Err(e) => { - tracing::warn!("Invalid Proposal: {}", e); - return Err(BlockError::InvalidBlockHeader); - } - }; - // Update account balance from the l1 - update_balance( - &mut validated_state.fee_merkle_tree, - parent_header, - proposed_header, + // Fetch the new L1 deposits between parent and current finalized L1 block. + let l1_deposits = fetch_fee_receipts( + parent_leaf.get_block_header().l1_finalized, + proposed_header.l1_finalized, ); - // Validate builder by verifying signature and charging account - if let Err(e) = validate_builder(&mut validated_state.fee_merkle_tree, proposed_header) { - tracing::warn!("Invalid Builder: {}", e); - return Err(BlockError::InvalidBlockHeader); + // Find missing state entries + let missing_accounts = self.forgotten_accounts( + std::iter::once(proposed_header.fee_info.account) + .chain(l1_deposits.iter().map(|fee_info| fee_info.account)), + ); + + let view = parent_leaf.get_view_number(); + + // Ensure merkle tree has frontier + if self.need_to_fetch_blocks_mt_frontier(view) { + instance + .peers + .as_ref() + .remember_blocks_merkle_tree( + view, + &mut validated_state.block_merkle_tree, + &parent_leaf.get_block_header().commit(), + ) + .await; + } + + // Fetch missing fee state entries + let missing_account_proofs = instance + .peers + .as_ref() + .fetch_accounts( + view, + validated_state.fee_merkle_tree.commitment(), + missing_accounts, + ) + .await; + + // Remember the fee state entries + for account in missing_account_proofs.iter() { + account + .proof + .remember(&mut validated_state.fee_merkle_tree) + .expect("proof previously verified"); } + // Lastly validate and apply the header + validate_and_apply_header( + &mut validated_state, + parent_leaf, + proposed_header, + l1_deposits, + )?; + Ok(validated_state) } /// Construct the state with the given block header. From 05df1aa0c9c09c169e57385a2bf0aafe34c3ee19 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Fri, 8 Mar 2024 12:26:55 -0800 Subject: [PATCH 2/4] Fix blocks catchup; feature-flag test code --- sequencer/src/catchup.rs | 140 ++++++++++++++-------------- sequencer/src/header.rs | 69 ++++++++------ sequencer/src/hotshot_commitment.rs | 24 ++--- sequencer/src/l1_client.rs | 8 +- sequencer/src/lib.rs | 42 +++++++-- sequencer/src/persistence.rs | 2 +- sequencer/src/state.rs | 66 +++++++------ 7 files changed, 195 insertions(+), 156 deletions(-) diff --git a/sequencer/src/catchup.rs b/sequencer/src/catchup.rs index 4d5146fb6..ef3768c15 100644 --- a/sequencer/src/catchup.rs +++ b/sequencer/src/catchup.rs @@ -1,10 +1,9 @@ use crate::{ api::endpoints::{AccountQueryData, BlocksFrontier}, state::{BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment}, - Header, ValidatedState, + ValidatedState, }; use async_trait::async_trait; -use commit::Commitment; use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime as _}; use jf_primitives::merkle_tree::{ForgetableMerkleTreeScheme, MerkleTreeScheme as _}; use serde::de::DeserializeOwned; @@ -43,61 +42,7 @@ pub trait StateCatchup: Send + Sync + std::fmt::Debug { accounts: Vec, ) -> Vec; - async fn remember_blocks_merkle_tree( - &self, - view: ViewNumber, - mt: &mut BlockMerkleTree, - elem: &Commitment
, - ); -} - -#[derive(Debug, Clone, Default)] -pub struct MockStateCatchup { - state: ValidatedState, -} - -impl From for MockStateCatchup { - fn from(state: ValidatedState) -> Self { - Self { state } - } -} - -#[async_trait] -impl StateCatchup for MockStateCatchup { - async fn fetch_accounts( - &self, - view: ViewNumber, - _fee_merkle_tree_root: FeeMerkleCommitment, - accounts: Vec, - ) -> Vec { - accounts - .into_iter() - .map(|account| { - tracing::info!("catchup: fetching account {account:?} for view {view:?}"); - FeeAccountProof::prove(&self.state.fee_merkle_tree, account.into()) - .unwrap_or_else(|| panic!("Account {account:?} not in memory")) - .into() - }) - .collect() - } - - async fn remember_blocks_merkle_tree( - &self, - view: ViewNumber, - mt: &mut BlockMerkleTree, - elem: &Commitment
, - ) { - tracing::info!("catchup: fetching frontier for view {view:?}"); - let view = view.get_u64(); - let (_, proof) = self - .state - .block_merkle_tree - .lookup(view) - .expect_ok() - .unwrap(); - mt.remember(view, elem, proof.clone()) - .expect("Proof verifies"); - } + async fn remember_blocks_merkle_tree(&self, view: ViewNumber, mt: &mut BlockMerkleTree); } #[derive(Debug, Clone, Default)] @@ -176,12 +121,7 @@ impl StateCatchup for StatePeers { ret } - async fn remember_blocks_merkle_tree( - &self, - view: ViewNumber, - mt: &mut BlockMerkleTree, - elem: &Commitment
, - ) { + async fn remember_blocks_merkle_tree(&self, view: ViewNumber, mt: &mut BlockMerkleTree) { if self.clients.is_empty() { panic!("No peers to fetch frontier from"); } @@ -193,11 +133,16 @@ impl StateCatchup for StatePeers { .send() .await { - // TODO: is this the right way to remember the frontier? - Ok(frontier) => match mt.remember(view.get_u64(), elem, &frontier) { - Ok(_) => return, - Err(err) => tracing::warn!("Error verifying block proof: {}", err), - }, + Ok(frontier) => { + let Some(elem) = frontier.elem() else { + tracing::warn!("Provided frontier is missing leaf element"); + continue; + }; + match mt.remember(view.get_u64(), *elem, &frontier) { + Ok(_) => return, + Err(err) => tracing::warn!("Error verifying block proof: {}", err), + } + } Err(err) => { tracing::warn!("Error fetching blocks from peer: {}", err); } @@ -208,3 +153,62 @@ impl StateCatchup for StatePeers { } } } + +#[cfg(any(test, feature = "testing"))] +pub mod mock { + use super::*; + use std::collections::HashMap; + + #[derive(Debug, Clone, Default)] + pub struct MockStateCatchup { + state: HashMap, + } + + impl FromIterator<(ViewNumber, ValidatedState)> for MockStateCatchup { + fn from_iter>(iter: I) -> Self { + Self { + state: iter.into_iter().collect(), + } + } + } + + #[async_trait] + impl StateCatchup for MockStateCatchup { + async fn fetch_accounts( + &self, + view: ViewNumber, + fee_merkle_tree_root: FeeMerkleCommitment, + accounts: Vec, + ) -> Vec { + tracing::info!("catchup: fetching account data for view {view:?}"); + let src = &self.state[&view].fee_merkle_tree; + assert_eq!(src.commitment(), fee_merkle_tree_root); + + accounts + .into_iter() + .map(|account| { + tracing::info!("catchup: fetching account {account:?} for view {view:?}"); + FeeAccountProof::prove(src, account.into()) + .unwrap_or_else(|| panic!("Account {account:?} not in memory")) + .into() + }) + .collect() + } + + async fn remember_blocks_merkle_tree(&self, view: ViewNumber, mt: &mut BlockMerkleTree) { + tracing::info!("catchup: fetching frontier for view {view:?}"); + let src = &self.state[&view].block_merkle_tree; + + assert_eq!(src.commitment(), mt.commitment()); + assert!( + src.num_leaves() > 0, + "catchup should not be triggered when blocks tree is empty" + ); + + let index = src.num_leaves() - 1; + let (elem, proof) = src.lookup(index).expect_ok().unwrap(); + mt.remember(index, elem, proof.clone()) + .expect("Proof verifies"); + } + } +} diff --git a/sequencer/src/header.rs b/sequencer/src/header.rs index 96216c602..7d93dfaba 100644 --- a/sequencer/src/header.rs +++ b/sequencer/src/header.rs @@ -229,6 +229,10 @@ impl Header { } impl BlockHeader for Header { + #[tracing::instrument( + skip_all, + fields(view = ?parent_leaf.view_number, height = parent_leaf.block_header.height), + )] async fn new( parent_state: &ValidatedState, instance_state: &NodeState, @@ -236,6 +240,8 @@ impl BlockHeader for Header { payload_commitment: VidCommitment, metadata: <::BlockPayload as BlockPayload>::Metadata, ) -> Self { + let mut validated_state = parent_state.clone(); + // Fetch the latest L1 snapshot. let l1_snapshot = instance_state.l1_client().snapshot().await; // Fetch the new L1 deposits between parent and current finalized L1 block. @@ -249,37 +255,35 @@ impl BlockHeader for Header { std::iter::once(FeeAccount::from(instance_state.builder_address.address())) .chain(l1_deposits.iter().map(|info| info.account())), ); + if !missing_accounts.is_empty() { + // Fetch missing fee state entries + let missing_account_proofs = instance_state + .peers + .as_ref() + .fetch_accounts( + parent_leaf.get_view_number(), + parent_state.fee_merkle_tree.commitment(), + missing_accounts, + ) + .await; - // Fetch missing fee state entries - let missing_account_proofs = instance_state - .peers - .as_ref() - .fetch_accounts( - parent_leaf.get_view_number(), - parent_state.fee_merkle_tree.commitment(), - missing_accounts, - ) - .await; - - // Insert missing fee state entries - let mut validated_state = parent_state.clone(); - for account in missing_account_proofs.iter() { - account - .proof - .remember(&mut validated_state.fee_merkle_tree) - .expect("proof previously verified"); + // Insert missing fee state entries + for account in missing_account_proofs.iter() { + account + .proof + .remember(&mut validated_state.fee_merkle_tree) + .expect("proof previously verified"); + } } // Ensure merkle tree has frontier - let view = parent_leaf.get_view_number(); - if validated_state.need_to_fetch_blocks_mt_frontier(view) { + if validated_state.need_to_fetch_blocks_mt_frontier() { instance_state .peers .as_ref() .remember_blocks_merkle_tree( - view, + parent_leaf.get_view_number(), &mut validated_state.block_merkle_tree, - &parent_leaf.get_block_header().commit(), ) .await; } @@ -349,7 +353,7 @@ mod test_headers { use super::*; use crate::{ - catchup::MockStateCatchup, + catchup::mock::MockStateCatchup, l1_client::L1Client, state::{validate_and_apply_proposal, BlockMerkleTree, FeeMerkleTree}, NodeState, @@ -389,7 +393,7 @@ mod test_headers { impl Default for GenesisForTest { fn default() -> Self { - let instance_state = NodeState::default(); + let instance_state = NodeState::mock(); let validated_state = ValidatedState::genesis(&instance_state); let leaf = Leaf::genesis(&instance_state); let header = leaf.get_block_header().clone(); @@ -624,10 +628,10 @@ mod test_headers { setup_backtrace(); let anvil = Anvil::new().block_time(1u32).spawn(); - let mut genesis_state = NodeState { - l1_client: L1Client::new(anvil.endpoint().parse().unwrap(), Address::default()), - ..Default::default() - }; + let mut genesis_state = NodeState::mock().with_l1(L1Client::new( + anvil.endpoint().parse().unwrap(), + Address::default(), + )); let genesis = GenesisForTest::default(); @@ -652,7 +656,10 @@ mod test_headers { // Forget the state to trigger lookups in Header::new let forgotten_state = parent_state.forget(); - genesis_state.peers = Arc::new(MockStateCatchup::from(parent_state.clone())); + genesis_state.peers = Arc::new(MockStateCatchup::from_iter([( + parent_leaf.view_number, + parent_state.clone(), + )])); // Get a proposal from a parent // TODO this currently fails because after fetching the blocks frontier @@ -692,7 +699,7 @@ mod test_headers { use ethers::signers::Wallet; // easy way to get a wallet: - let state = NodeState::default(); + let state = NodeState::mock(); let message = ";)"; // let address = state.builder_address.address(); let address: Wallet = state.builder_address; @@ -714,7 +721,7 @@ mod test_headers { use ethers::types; // easy way to get a wallet: - let state = NodeState::default(); + let state = NodeState::mock(); // simulate a fixed size hash by padding our message let message = ";)"; diff --git a/sequencer/src/hotshot_commitment.rs b/sequencer/src/hotshot_commitment.rs index fb26b903f..994792273 100644 --- a/sequencer/src/hotshot_commitment.rs +++ b/sequencer/src/hotshot_commitment.rs @@ -378,10 +378,10 @@ mod test { let num_batches = l1.hotshot.max_blocks().call().await.unwrap().as_usize(); let mut data = MockDataSource::default(); - let node_state = NodeState { - l1_client: L1Client::new(anvil.provider().url().clone(), Address::default()), - ..Default::default() - }; + let node_state = NodeState::mock().with_l1(L1Client::new( + anvil.provider().url().clone(), + Address::default(), + )); for i in 0..num_batches { data.leaves.push(Some(mock_leaf(i as u64, &node_state))); @@ -450,10 +450,10 @@ mod test { // Create a test batch. let mut data = MockDataSource::default(); - let node_state = NodeState { - l1_client: L1Client::new(anvil.provider().url().clone(), Address::default()), - ..Default::default() - }; + let node_state = NodeState::mock().with_l1(L1Client::new( + anvil.provider().url().clone(), + Address::default(), + )); data.leaves.push(Some(mock_leaf(0, &node_state))); // Connect to the HotShot contract with the expected L1 client. @@ -514,10 +514,10 @@ mod test { .unwrap(), ); - let node_state = NodeState { - l1_client: L1Client::new(anvil.provider().url().clone(), Address::default()), - ..Default::default() - }; + let node_state = NodeState::mock().with_l1(L1Client::new( + anvil.provider().url().clone(), + Address::default(), + )); // Create a sequence of leaves, some of which are missing. let mut data = MockDataSource::default(); diff --git a/sequencer/src/l1_client.rs b/sequencer/src/l1_client.rs index dfe292566..ece487917 100644 --- a/sequencer/src/l1_client.rs +++ b/sequencer/src/l1_client.rs @@ -224,10 +224,10 @@ mod test { // Test that nothing funky is happening to the provider when // passed along in state. - let state = NodeState { - l1_client: L1Client::new(anvil.endpoint().parse().unwrap(), Address::default()), - ..Default::default() - }; + let state = NodeState::mock().with_l1(L1Client::new( + anvil.endpoint().parse().unwrap(), + Address::default(), + )); let version = state.l1_client().provider.client_version().await.unwrap(); assert_eq!("anvil/v0.2.0", version); diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 37df39dc2..1997081fc 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -9,7 +9,7 @@ pub mod options; pub mod state_signature; use block::entry::TxTableEntryWord; -use catchup::{MockStateCatchup, StateCatchup, StatePeers}; +use catchup::{StateCatchup, StatePeers}; use context::SequencerContext; use ethers::{ core::k256::ecdsa::SigningKey, @@ -184,6 +184,33 @@ pub struct NodeState { } impl NodeState { + pub fn new( + l1_client: L1Client, + builder_address: Wallet, + catchup: impl StateCatchup + 'static, + ) -> Self { + Self { + l1_client, + peers: Arc::new(catchup), + genesis_state: Default::default(), + builder_address, + } + } + + #[cfg(any(test, feature = "testing"))] + pub fn mock() -> Self { + Self::new( + L1Client::new("http://localhost:3331".parse().unwrap(), Address::default()), + FeeAccount::test_wallet(), + catchup::mock::MockStateCatchup::default(), + ) + } + + pub fn with_l1(mut self, l1_client: L1Client) -> Self { + self.l1_client = l1_client; + self + } + fn l1_client(&self) -> &L1Client { &self.l1_client } @@ -486,13 +513,10 @@ pub mod testing { _pd: Default::default(), }; - let node_state = NodeState { - l1_client: L1Client::new( - self.anvil.endpoint().parse().unwrap(), - Address::default(), - ), - ..Default::default() - }; + let node_state = NodeState::mock().with_l1(L1Client::new( + self.anvil.endpoint().parse().unwrap(), + Address::default(), + )); SequencerContext::init( config, @@ -608,7 +632,7 @@ mod test { .collect(); vid_commitment(&payload_bytes, GENESIS_VID_NUM_STORAGE_NODES) }; - let genesis_state = NodeState::default(); + let genesis_state = NodeState::mock(); Header::genesis(&genesis_state, genesis_commitment, genesis_ns_table) }; diff --git a/sequencer/src/persistence.rs b/sequencer/src/persistence.rs index 8dd838be5..b9c6f46d7 100644 --- a/sequencer/src/persistence.rs +++ b/sequencer/src/persistence.rs @@ -190,7 +190,7 @@ mod persistence_tests { assert_eq!(storage.load_anchor_leaf().await.unwrap(), None); // Store a leaf. - let leaf1 = Leaf::genesis(&NodeState::default()); + let leaf1 = Leaf::genesis(&NodeState::mock()); storage.save_anchor_leaf(&leaf1).await.unwrap(); assert_eq!(storage.load_anchor_leaf().await.unwrap().unwrap(), leaf1); diff --git a/sequencer/src/state.rs b/sequencer/src/state.rs index e9bd4dd66..9975982f9 100644 --- a/sequencer/src/state.rs +++ b/sequencer/src/state.rs @@ -13,10 +13,7 @@ use ethers::{ types::{self, RecoveryMessage, U256}, }; use hotshot::traits::ValidatedState as HotShotState; -use hotshot_types::{ - data::{BlockError, ViewNumber}, - traits::node_implementation::ConsensusTime as _, -}; +use hotshot_types::data::{BlockError, ViewNumber}; use itertools::Itertools; use jf_primitives::merkle_tree::{ToTraversalPath, UniversalMerkleTreeScheme}; use jf_primitives::{ @@ -85,11 +82,16 @@ impl ValidatedState { } /// Check if the merkle tree is available - pub fn need_to_fetch_blocks_mt_frontier(&self, view: ViewNumber) -> bool { - self.block_merkle_tree - .lookup(view.get_u64()) - .expect_ok() - .is_err() + pub fn need_to_fetch_blocks_mt_frontier(&self) -> bool { + let num_leaves = self.block_merkle_tree.num_leaves(); + if num_leaves == 0 { + false + } else { + self.block_merkle_tree + .lookup(num_leaves - 1) + .expect_ok() + .is_err() + } } /// Insert a fee deposit receipt @@ -268,6 +270,10 @@ impl HotShotState for ValidatedState { fn on_commit(&self) {} /// Validate parent against known values (from state) and validate /// proposal descends from parent. Returns updated `ValidatedState`. + #[tracing::instrument( + skip_all, + fields(view = ?parent_leaf.view_number, height = parent_leaf.block_header.height), + )] async fn validate_and_apply_header( &self, instance: &Self::Instance, @@ -293,35 +299,33 @@ impl HotShotState for ValidatedState { let view = parent_leaf.get_view_number(); // Ensure merkle tree has frontier - if self.need_to_fetch_blocks_mt_frontier(view) { + if self.need_to_fetch_blocks_mt_frontier() { instance .peers .as_ref() - .remember_blocks_merkle_tree( - view, - &mut validated_state.block_merkle_tree, - &parent_leaf.get_block_header().commit(), - ) + .remember_blocks_merkle_tree(view, &mut validated_state.block_merkle_tree) .await; } // Fetch missing fee state entries - let missing_account_proofs = instance - .peers - .as_ref() - .fetch_accounts( - view, - validated_state.fee_merkle_tree.commitment(), - missing_accounts, - ) - .await; - - // Remember the fee state entries - for account in missing_account_proofs.iter() { - account - .proof - .remember(&mut validated_state.fee_merkle_tree) - .expect("proof previously verified"); + if !missing_accounts.is_empty() { + let missing_account_proofs = instance + .peers + .as_ref() + .fetch_accounts( + view, + validated_state.fee_merkle_tree.commitment(), + missing_accounts, + ) + .await; + + // Remember the fee state entries + for account in missing_account_proofs.iter() { + account + .proof + .remember(&mut validated_state.fee_merkle_tree) + .expect("proof previously verified"); + } } // Lastly validate and apply the header From 1ec5e6bf07411b749470ebef865042c16fa7da50 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Fri, 8 Mar 2024 16:24:52 -0800 Subject: [PATCH 3/4] Add end-to-end catchup test, make things work when nodes have different builder IDs --- .env | 1 - docker-compose.yaml | 10 ++-- process-compose.yaml | 10 ++-- sequencer/src/api.rs | 106 ++++++++++++++++++++++++++++++++++++--- sequencer/src/catchup.rs | 10 ++-- sequencer/src/header.rs | 12 +++-- sequencer/src/lib.rs | 52 +++++++++++++++---- sequencer/src/state.rs | 88 +++++++++++++++++++++----------- 8 files changed, 223 insertions(+), 66 deletions(-) diff --git a/.env b/.env index e0c9ece86..5660255e4 100644 --- a/.env +++ b/.env @@ -35,7 +35,6 @@ ESPRESSO_SEQUENCER_L1_USE_LATEST_BLOCK_TAG=true ESPRESSO_SEQUENCER_ETH_MNEMONIC="test test test test test test test test test test test junk" ESPRESSO_SEQUENCER_HOTSHOT_ACCOUNT_INDEX=5 ESPRESSO_SEQUENCER_HOTSHOT_NUM_BLOCKS_PER_EPOCH=4294967295 # u32::MAX for now as we do not handle epochs -ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=8 ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS=0x23618e81E3f5cdF7f54C3d65f7FBc0aBf5B21E8f ESPRESSO_COMMITMENT_TASK_PORT=60000 ESPRESSO_SEQUENCER_DB_PORT=5432 diff --git a/docker-compose.yaml b/docker-compose.yaml index e39fee343..292041a05 100644 --- a/docker-compose.yaml +++ b/docker-compose.yaml @@ -103,7 +103,7 @@ services: - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_0 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_0 - ESPRESSO_SEQUENCER_ETH_MNEMONIC - - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX + - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=10 - ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS - RUST_LOG - RUST_LOG_FORMAT @@ -139,7 +139,7 @@ services: - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_1 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_1 - ESPRESSO_SEQUENCER_ETH_MNEMONIC - - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX + - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=11 - ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS - RUST_LOG - RUST_LOG_FORMAT @@ -170,7 +170,7 @@ services: - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_2 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_2 - ESPRESSO_SEQUENCER_ETH_MNEMONIC - - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX + - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=12 - ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS - RUST_LOG - RUST_LOG_FORMAT @@ -200,7 +200,7 @@ services: - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_3 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_3 - ESPRESSO_SEQUENCER_ETH_MNEMONIC - - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX + - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=13 - ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS - RUST_LOG - RUST_LOG_FORMAT @@ -230,7 +230,7 @@ services: - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_4 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_4 - ESPRESSO_SEQUENCER_ETH_MNEMONIC - - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX + - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=14 - ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS - RUST_LOG - RUST_LOG_FORMAT diff --git a/process-compose.yaml b/process-compose.yaml index 0e6fbfb61..b6e931b8b 100644 --- a/process-compose.yaml +++ b/process-compose.yaml @@ -107,7 +107,7 @@ processes: - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_0 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_0 - ESPRESSO_SEQUENCER_ETH_MNEMONIC - - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX + - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=10 - ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS - ESPRESSO_SEQUENCER_L1_PROVIDER depends_on: @@ -135,7 +135,7 @@ processes: - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_1 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_1 - ESPRESSO_SEQUENCER_ETH_MNEMONIC - - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX + - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=11 - ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS - ESPRESSO_SEQUENCER_L1_PROVIDER depends_on: @@ -162,7 +162,7 @@ processes: - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_2 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_2 - ESPRESSO_SEQUENCER_ETH_MNEMONIC - - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX + - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=12 - ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS - ESPRESSO_SEQUENCER_L1_PROVIDER depends_on: @@ -189,7 +189,7 @@ processes: - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_3 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_3 - ESPRESSO_SEQUENCER_ETH_MNEMONIC - - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX + - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=13 - ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS - ESPRESSO_SEQUENCER_L1_PROVIDER depends_on: @@ -216,7 +216,7 @@ processes: - ESPRESSO_SEQUENCER_PRIVATE_STAKING_KEY=$ESPRESSO_DEMO_SEQUENCER_STAKING_PRIVATE_KEY_4 - ESPRESSO_SEQUENCER_PRIVATE_STATE_KEY=$ESPRESSO_DEMO_SEQUENCER_STATE_PRIVATE_KEY_4 - ESPRESSO_SEQUENCER_ETH_MNEMONIC - - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX + - ESPRESSO_SEQUENCER_ETH_ACCOUNT_INDEX=14 - ESPRESSO_SEQUENCER_PREFUNDED_BUILDER_ACCOUNTS - ESPRESSO_SEQUENCER_L1_PROVIDER depends_on: diff --git a/sequencer/src/api.rs b/sequencer/src/api.rs index 61a8fdb4b..265cc2565 100644 --- a/sequencer/src/api.rs +++ b/sequencer/src/api.rs @@ -86,6 +86,7 @@ mod test_helpers { use super::*; use crate::{ api::endpoints::{AccountQueryData, BlocksFrontier}, + catchup::{mock::MockStateCatchup, StateCatchup}, persistence::{no_storage::NoStorage, SequencerPersistence}, state::BlockMerkleTree, testing::{wait_for_decide_on_handle, TestConfig}, @@ -116,28 +117,36 @@ mod test_helpers { impl TestNetwork { pub async fn with_state( opt: Options, + state: [ValidatedState; TestConfig::NUM_NODES], persistence: [impl SequencerPersistence; TestConfig::NUM_NODES], + catchup: impl StateCatchup + Clone + 'static, ) -> Self { let cfg = TestConfig::default(); - let mut nodes = - join_all(persistence.into_iter().enumerate().map(|(i, persistence)| { + let mut nodes = join_all(state.into_iter().zip(persistence).enumerate().map( + |(i, (state, persistence))| { let opt = opt.clone(); let cfg = &cfg; + let catchup = catchup.clone(); async move { if i == 0 { opt.serve(|metrics| { let cfg = cfg.clone(); - async move { cfg.init_node(0, persistence, &*metrics).await } - .boxed() + async move { + cfg.init_node(0, state, persistence, catchup, &*metrics) + .await + } + .boxed() }) .await .unwrap() } else { - cfg.init_node(i, persistence, &NoMetrics).await + cfg.init_node(i, state, persistence, catchup, &NoMetrics) + .await } } - })) - .await; + }, + )) + .await; for ctx in &nodes { ctx.start_consensus().await; @@ -150,7 +159,13 @@ mod test_helpers { } pub async fn new(opt: Options) -> Self { - Self::with_state(opt, [NoStorage; TestConfig::NUM_NODES]).await + Self::with_state( + opt, + Default::default(), + [NoStorage; TestConfig::NUM_NODES], + MockStateCatchup::default(), + ) + .await } pub async fn stop_consensus(&mut self) { @@ -409,6 +424,7 @@ mod test_helpers { mod api_tests { use super::*; use crate::{ + catchup::mock::MockStateCatchup, testing::{wait_for_decide_on_handle, TestConfig}, Header, Transaction, }; @@ -547,7 +563,9 @@ mod api_tests { let port = pick_unused_port().unwrap(); let mut network = TestNetwork::with_state( D::options(&storage[0], options::Http { port }.into()).status(Default::default()), + Default::default(), persistence, + MockStateCatchup::default(), ) .await; @@ -600,7 +618,9 @@ mod api_tests { .unwrap(); let _network = TestNetwork::with_state( D::options(&storage[0], options::Http { port }.into()), + Default::default(), persistence, + MockStateCatchup::default(), ) .await; let client: Client = @@ -815,7 +835,17 @@ mod api_tests { #[cfg(test)] mod test { use super::*; + use crate::{ + catchup::StatePeers, persistence::no_storage::NoStorage, testing::TestConfig, Header, + NodeState, + }; use async_compatibility_layer::logging::{setup_backtrace, setup_logging}; + use commit::Committable; + use ethers::prelude::Signer; + use futures::stream::StreamExt; + use hotshot::types::EventType; + use hotshot_types::traits::block_contents::BlockHeader; + use jf_primitives::merkle_tree::AppendableMerkleTreeScheme; use portpicker::pick_unused_port; use surf_disco::Client; use test_helpers::{ @@ -859,4 +889,64 @@ mod test { async fn state_test_without_query_module() { state_test_helper(|opt| opt).await } + + #[async_std::test] + async fn test_catchup() { + setup_logging(); + setup_backtrace(); + + // Create some non-trivial initial state. We will give all the nodes in the network this + // state, except for one, which will have a forgotten state and need to catch up. + let mut state = ValidatedState::default(); + // Prefund an arbitrary account so the fee state has some data to forget. + state.prefund_account(Default::default(), 1000.into()); + // Push an arbitrary header commitment so the block state has some data to forget. + state + .block_merkle_tree + .push( + Header::genesis(&NodeState::mock(), Default::default(), Default::default()) + .commit(), + ) + .unwrap(); + let states = std::array::from_fn(|i| { + if i == TestConfig::NUM_NODES - 1 { + state.forget() + } else { + state.clone() + } + }); + + // Start a sequencer network, using the query service for catchup. + let port = pick_unused_port().expect("No ports free"); + let network = TestNetwork::with_state( + Options::from(options::Http { port }).state(Default::default()), + states, + [NoStorage; TestConfig::NUM_NODES], + StatePeers::from_urls(vec![format!("http://localhost:{port}").parse().unwrap()]), + ) + .await; + let mut events = network.server.get_event_stream(); + + // Wait for a (non-genesis) block proposed by the lagging node, to prove that it has caught + // up. + let builder = TestConfig::builder_wallet(TestConfig::NUM_NODES - 1) + .address() + .into(); + 'outer: loop { + let event = events.next().await.unwrap(); + let EventType::Decide { leaf_chain, .. } = event.event else { + continue; + }; + for (leaf, _) in leaf_chain.iter().rev() { + let height = leaf.block_header.height; + let leaf_builder = leaf.block_header.fee_info.account(); + tracing::info!( + "waiting for block from {builder}, block {height} is from {leaf_builder}", + ); + if height > 1 && leaf_builder == builder { + break 'outer; + } + } + } + } } diff --git a/sequencer/src/catchup.rs b/sequencer/src/catchup.rs index ef3768c15..f48954a1a 100644 --- a/sequencer/src/catchup.rs +++ b/sequencer/src/catchup.rs @@ -1,11 +1,10 @@ use crate::{ api::endpoints::{AccountQueryData, BlocksFrontier}, - state::{BlockMerkleTree, FeeAccount, FeeAccountProof, FeeMerkleCommitment}, - ValidatedState, + state::{BlockMerkleTree, FeeAccount, FeeMerkleCommitment}, }; use async_trait::async_trait; use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime as _}; -use jf_primitives::merkle_tree::{ForgetableMerkleTreeScheme, MerkleTreeScheme as _}; +use jf_primitives::merkle_tree::ForgetableMerkleTreeScheme; use serde::de::DeserializeOwned; use std::time::Duration; use surf_disco::Request; @@ -80,9 +79,8 @@ impl StatePeers { ); match client .get::(&format!( - "state/catchup/{}/account/{}", + "state/catchup/{}/account/{account}", view.get_u64(), - account.address() )) .send() .await @@ -157,6 +155,8 @@ impl StateCatchup for StatePeers { #[cfg(any(test, feature = "testing"))] pub mod mock { use super::*; + use crate::state::{FeeAccountProof, ValidatedState}; + use jf_primitives::merkle_tree::MerkleTreeScheme; use std::collections::HashMap; #[derive(Debug, Clone, Default)] diff --git a/sequencer/src/header.rs b/sequencer/src/header.rs index 7d93dfaba..57bb4f05e 100644 --- a/sequencer/src/header.rs +++ b/sequencer/src/header.rs @@ -211,7 +211,7 @@ impl Header { ns_table, fee_merkle_tree_root, block_merkle_tree_root, - fee_info: parent_header.fee_info, + fee_info: FeeInfo::base_fee(builder_address.address().into()), builder_signature: None, }; @@ -256,6 +256,11 @@ impl BlockHeader for Header { .chain(l1_deposits.iter().map(|info| info.account())), ); if !missing_accounts.is_empty() { + tracing::warn!( + "fetching {} missing accounts from peers", + missing_accounts.len() + ); + // Fetch missing fee state entries let missing_account_proofs = instance_state .peers @@ -278,6 +283,7 @@ impl BlockHeader for Header { // Ensure merkle tree has frontier if validated_state.need_to_fetch_blocks_mt_frontier() { + tracing::warn!("fetching block frontier from peers"); instance_state .peers .as_ref() @@ -323,7 +329,7 @@ impl BlockHeader for Header { ns_table, block_merkle_tree_root, fee_merkle_tree_root, - fee_info: FeeInfo::new(instance_state.builder_address.address().into()), + fee_info: FeeInfo::genesis(), builder_signature: None, } } @@ -430,7 +436,7 @@ mod test_headers { let block_merkle_tree = BlockMerkleTree::from_elems(Some(32), Vec::>::new()).unwrap(); - let fee_info = FeeInfo::default(); + let fee_info = FeeInfo::genesis(); let fee_merkle_tree = FeeMerkleTree::from_kv_set( 20, Vec::from([(fee_info.account(), fee_info.amount())]), diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 1997081fc..7052e325f 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -21,7 +21,6 @@ use ethers::{ use l1_client::L1Client; -use state::FeeAccount; use state_signature::static_stake_table_commitment; use url::Url; pub mod bytes; @@ -201,7 +200,7 @@ impl NodeState { pub fn mock() -> Self { Self::new( L1Client::new("http://localhost:3331".parse().unwrap(), Address::default()), - FeeAccount::test_wallet(), + state::FeeAccount::test_wallet(), catchup::mock::MockStateCatchup::default(), ) } @@ -211,6 +210,16 @@ impl NodeState { self } + pub fn with_builder(mut self, wallet: Wallet) -> Self { + self.builder_address = wallet; + self + } + + pub fn with_genesis(mut self, state: ValidatedState) -> Self { + self.genesis_state = state; + self + } + fn l1_client(&self) -> &L1Client { &self.l1_client } @@ -387,7 +396,7 @@ pub async fn init_node( #[cfg(any(test, feature = "testing"))] pub mod testing { use super::*; - use crate::persistence::no_storage::NoStorage; + use crate::{catchup::mock::MockStateCatchup, persistence::no_storage::NoStorage}; use commit::Committable; use ethers::utils::{Anvil, AnvilInstance}; use futures::{ @@ -477,17 +486,25 @@ pub mod testing { } pub async fn init_nodes(&self) -> Vec> { - join_all( - (0..self.num_nodes()) - .map(|i| async move { self.init_node(i, NoStorage, &NoMetrics).await }), - ) + join_all((0..self.num_nodes()).map(|i| async move { + self.init_node( + i, + ValidatedState::default(), + NoStorage, + MockStateCatchup::default(), + &NoMetrics, + ) + .await + })) .await } pub async fn init_node( &self, i: usize, + state: ValidatedState, persistence: impl SequencerPersistence, + catchup: impl StateCatchup + 'static, metrics: &dyn Metrics, ) -> SequencerContext { let mut config = self.config.clone(); @@ -513,10 +530,14 @@ pub mod testing { _pd: Default::default(), }; - let node_state = NodeState::mock().with_l1(L1Client::new( - self.anvil.endpoint().parse().unwrap(), - Address::default(), - )); + let wallet = Self::builder_wallet(i); + tracing::info!("node {i} is builder {:x}", wallet.address()); + let node_state = NodeState::new( + L1Client::new(self.anvil.endpoint().parse().unwrap(), Address::default()), + wallet, + catchup, + ) + .with_genesis(state); SequencerContext::init( config, @@ -530,6 +551,15 @@ pub mod testing { .await .unwrap() } + + pub fn builder_wallet(i: usize) -> Wallet { + MnemonicBuilder::::default() + .phrase("test test test test test test test test test test test junk") + .index(i as u32) + .unwrap() + .build() + .unwrap() + } } // Wait for decide event, make sure it matches submitted transaction. Return the block number diff --git a/sequencer/src/state.rs b/sequencer/src/state.rs index 9975982f9..db58eb57c 100644 --- a/sequencer/src/state.rs +++ b/sequencer/src/state.rs @@ -5,11 +5,11 @@ use ark_serialize::{ }; use commit::{Commitment, Committable, RawCommitmentBuilder}; use contract_bindings::fee_contract::DepositFilter; -use derive_more::{Add, From, Into, Sub}; +use derive_more::{Add, Display, From, Into, Sub}; use ethers::{ abi::Address, core::k256::ecdsa::SigningKey, - signers::{coins_bip39::English, MnemonicBuilder, Signer, Wallet}, + signers::{coins_bip39::English, MnemonicBuilder, Wallet}, types::{self, RecoveryMessage, U256}, }; use hotshot::traits::ValidatedState as HotShotState; @@ -186,19 +186,32 @@ fn charge_fee( fee_info: FeeInfo, ) -> Result<(), ChargeFeeError> { let FeeInfo { account, amount } = fee_info; - let lookup = fee_merkle_tree.universal_lookup(account); - match lookup { - LookupResult::Ok(balance, _) => { - let updated = balance - .checked_sub(&amount) - .ok_or(ChargeFeeError::InsufficientFunds)?; - fee_merkle_tree - .update(account, updated) - .expect("update succeeds"); - Ok(()) - } - LookupResult::NotInMemory => Err(ChargeFeeError::NotInMemory), - LookupResult::NotFound(..) => Err(ChargeFeeError::InsufficientFunds), + let mut err = None; + let res = fee_merkle_tree + .update_with(account, |balance| { + let balance = balance.copied(); + let Some(updated) = balance.unwrap_or_default().checked_sub(&amount) else { + // Return an error without updating the account. + err = Some(ChargeFeeError::InsufficientFunds); + return balance; + }; + if updated == FeeAmount::default() { + // Delete the account from the tree if its balance ended up at 0; this saves some + // space since the account is no longer carrying any information. + None + } else { + // Otherwise store the updated balance. + Some(updated) + } + }) + .expect("updated succeeds"); + if res.expect_not_in_memory().is_ok() { + return Err(ChargeFeeError::NotInMemory); + } + if let Some(err) = err { + Err(err) + } else { + Ok(()) } } @@ -300,6 +313,7 @@ impl HotShotState for ValidatedState { // Ensure merkle tree has frontier if self.need_to_fetch_blocks_mt_frontier() { + tracing::warn!("fetching block frontier from peers"); instance .peers .as_ref() @@ -309,6 +323,11 @@ impl HotShotState for ValidatedState { // Fetch missing fee state entries if !missing_accounts.is_empty() { + tracing::warn!( + "fetching {} missing accounts from peers", + missing_accounts.len() + ); + let missing_account_proofs = instance .peers .as_ref() @@ -396,13 +415,27 @@ pub struct FeeInfo { amount: FeeAmount, } impl FeeInfo { - pub fn new(account: FeeAccount) -> Self { - let amount = FeeAmount::default(); // TODO grab from config (instance_state?) - Self { account, amount } + /// The minimum fee paid by the given builder account for a proposed block. + // TODO this function should take the block size as an input, we need to get this information + // from HotShot. + pub fn base_fee(account: FeeAccount) -> Self { + Self { + account, + amount: FeeAmount::default(), + } + } + + pub fn genesis() -> Self { + Self { + account: Default::default(), + amount: Default::default(), + } } + pub fn account(&self) -> FeeAccount { self.account } + pub fn amount(&self) -> FeeAmount { self.amount } @@ -417,15 +450,6 @@ impl From for FeeInfo { } } -impl Default for FeeInfo { - fn default() -> Self { - Self { - amount: FeeAmount::default(), - account: FeeAccount::test_wallet().address().into(), - } - } -} - impl Committable for FeeInfo { fn commit(&self) -> Commitment { RawCommitmentBuilder::new(&Self::tag()) @@ -453,6 +477,12 @@ impl FeeAmount { } } +impl From for FeeAmount { + fn from(amt: u64) -> Self { + Self(amt.into()) + } +} + impl CheckedSub for FeeAmount { fn checked_sub(&self, v: &Self) -> Option { self.0.checked_sub(v.0).map(FeeAmount) @@ -467,6 +497,7 @@ impl CheckedSub for FeeAmount { Copy, Clone, Debug, + Display, Deserialize, Serialize, PartialEq, @@ -476,6 +507,7 @@ impl CheckedSub for FeeAmount { From, Into, )] +#[display(fmt = "{_0:x}")] pub struct FeeAccount(Address); impl FeeAccount { /// Return inner `Address` @@ -580,7 +612,7 @@ pub fn fetch_fee_receipts( _prev_l1_finalized: Option, _new_l1_finalized: Option, ) -> Vec { - Vec::from([FeeInfo::default()]) + vec![] } pub type FeeMerkleTree = From f1cee9bd831b0cc5e41515e3eddc238b760981f4 Mon Sep 17 00:00:00 2001 From: Jeb Bearer Date: Fri, 8 Mar 2024 17:13:40 -0800 Subject: [PATCH 4/4] Update `test_restart` This commit adds catchup to the restartability test, so that it should in theory pass now. However, it is still failing due to an error from HotShot: "Couldn't find parent view in state map". I will look into this more next week. For now, the test remains ignored. --- sequencer/src/api.rs | 39 +++++++++++++++++++++++++++-------- sequencer/src/catchup.rs | 44 ++++++++++++++++++++++++++++++++++++---- sequencer/src/header.rs | 2 +- 3 files changed, 71 insertions(+), 14 deletions(-) diff --git a/sequencer/src/api.rs b/sequencer/src/api.rs index 265cc2565..db8756a02 100644 --- a/sequencer/src/api.rs +++ b/sequencer/src/api.rs @@ -102,6 +102,7 @@ mod test_helpers { }; use hotshot::types::{Event, EventType}; use hotshot_types::traits::{metrics::NoMetrics, node_implementation::ConsensusTime}; + use itertools::izip; use jf_primitives::merkle_tree::{MerkleCommitment, MerkleTreeScheme}; use portpicker::pick_unused_port; use std::time::Duration; @@ -119,14 +120,13 @@ mod test_helpers { opt: Options, state: [ValidatedState; TestConfig::NUM_NODES], persistence: [impl SequencerPersistence; TestConfig::NUM_NODES], - catchup: impl StateCatchup + Clone + 'static, + catchup: [impl StateCatchup + 'static; TestConfig::NUM_NODES], ) -> Self { let cfg = TestConfig::default(); - let mut nodes = join_all(state.into_iter().zip(persistence).enumerate().map( - |(i, (state, persistence))| { + let mut nodes = join_all(izip!(state, persistence, catchup).enumerate().map( + |(i, (state, persistence, catchup))| { let opt = opt.clone(); let cfg = &cfg; - let catchup = catchup.clone(); async move { if i == 0 { opt.serve(|metrics| { @@ -163,7 +163,7 @@ mod test_helpers { opt, Default::default(), [NoStorage; TestConfig::NUM_NODES], - MockStateCatchup::default(), + std::array::from_fn(|_| MockStateCatchup::default()), ) .await } @@ -424,7 +424,7 @@ mod test_helpers { mod api_tests { use super::*; use crate::{ - catchup::mock::MockStateCatchup, + catchup::{mock::MockStateCatchup, StateCatchup, StatePeers}, testing::{wait_for_decide_on_handle, TestConfig}, Header, Transaction, }; @@ -565,7 +565,7 @@ mod api_tests { D::options(&storage[0], options::Http { port }.into()).status(Default::default()), Default::default(), persistence, - MockStateCatchup::default(), + std::array::from_fn(|_| MockStateCatchup::default()), ) .await; @@ -606,6 +606,10 @@ mod api_tests { .try_collect() .await .unwrap(); + let decided_view = chain.last().unwrap().leaf().view_number; + + // Get the most recent state, for catchup. + let state = network.server.consensus().get_decided_state().await; // Fully shut down the API servers. drop(network); @@ -620,7 +624,22 @@ mod api_tests { D::options(&storage[0], options::Http { port }.into()), Default::default(), persistence, - MockStateCatchup::default(), + std::array::from_fn(|i| { + let catchup: Box = if i == 0 { + // Give the server node a copy of the full state to use for catchup. This + // simulates a node with archival state storage, which is then able to seed the + // rest of the network after a restart. + Box::new(MockStateCatchup::from_iter([(decided_view, state.clone())])) + } else { + // The remaining nodes should use this archival node as a peer for catchup. + Box::new(StatePeers::from_urls(vec![format!( + "http://localhost:{port}" + ) + .parse() + .unwrap()])) + }; + catchup + }), ) .await; let client: Client = @@ -922,7 +941,9 @@ mod test { Options::from(options::Http { port }).state(Default::default()), states, [NoStorage; TestConfig::NUM_NODES], - StatePeers::from_urls(vec![format!("http://localhost:{port}").parse().unwrap()]), + std::array::from_fn(|_| { + StatePeers::from_urls(vec![format!("http://localhost:{port}").parse().unwrap()]) + }), ) .await; let mut events = network.server.get_event_stream(); diff --git a/sequencer/src/catchup.rs b/sequencer/src/catchup.rs index f48954a1a..0f4202834 100644 --- a/sequencer/src/catchup.rs +++ b/sequencer/src/catchup.rs @@ -6,7 +6,7 @@ use async_trait::async_trait; use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime as _}; use jf_primitives::merkle_tree::ForgetableMerkleTreeScheme; use serde::de::DeserializeOwned; -use std::time::Duration; +use std::{sync::Arc, time::Duration}; use surf_disco::Request; use tide_disco::error::ServerError; use url::Url; @@ -152,6 +152,42 @@ impl StateCatchup for StatePeers { } } +#[async_trait] +impl StateCatchup for Box { + async fn fetch_accounts( + &self, + view: ViewNumber, + fee_merkle_tree_root: FeeMerkleCommitment, + accounts: Vec, + ) -> Vec { + (**self) + .fetch_accounts(view, fee_merkle_tree_root, accounts) + .await + } + + async fn remember_blocks_merkle_tree(&self, view: ViewNumber, mt: &mut BlockMerkleTree) { + (**self).remember_blocks_merkle_tree(view, mt).await + } +} + +#[async_trait] +impl StateCatchup for Arc { + async fn fetch_accounts( + &self, + view: ViewNumber, + fee_merkle_tree_root: FeeMerkleCommitment, + accounts: Vec, + ) -> Vec { + (**self) + .fetch_accounts(view, fee_merkle_tree_root, accounts) + .await + } + + async fn remember_blocks_merkle_tree(&self, view: ViewNumber, mt: &mut BlockMerkleTree) { + (**self).remember_blocks_merkle_tree(view, mt).await + } +} + #[cfg(any(test, feature = "testing"))] pub mod mock { use super::*; @@ -161,11 +197,11 @@ pub mod mock { #[derive(Debug, Clone, Default)] pub struct MockStateCatchup { - state: HashMap, + state: HashMap>, } - impl FromIterator<(ViewNumber, ValidatedState)> for MockStateCatchup { - fn from_iter>(iter: I) -> Self { + impl FromIterator<(ViewNumber, Arc)> for MockStateCatchup { + fn from_iter)>>(iter: I) -> Self { Self { state: iter.into_iter().collect(), } diff --git a/sequencer/src/header.rs b/sequencer/src/header.rs index 57bb4f05e..36f4a7f43 100644 --- a/sequencer/src/header.rs +++ b/sequencer/src/header.rs @@ -664,7 +664,7 @@ mod test_headers { let forgotten_state = parent_state.forget(); genesis_state.peers = Arc::new(MockStateCatchup::from_iter([( parent_leaf.view_number, - parent_state.clone(), + Arc::new(parent_state.clone()), )])); // Get a proposal from a parent