Skip to content

Commit

Permalink
WIP: sequencer catchup
Browse files Browse the repository at this point in the history
- Add functions to fetch from peers.
  • Loading branch information
sveitser committed Feb 26, 2024
1 parent 13cf93d commit 525719e
Show file tree
Hide file tree
Showing 5 changed files with 136 additions and 19 deletions.
78 changes: 78 additions & 0 deletions sequencer/src/catchup.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use crate::{
api::endpoints::{AccountQueryData, BlocksFrontier},
state::{BlockMerkleCommitment, BlockMerkleTree, FeeAccount, FeeMerkleTree},
};
use hotshot_types::{data::ViewNumber, traits::node_implementation::ConsensusTime as _};
use jf_primitives::merkle_tree::{MerkleCommitment as _, MerkleTreeScheme as _};
use surf_disco::Client;
use tide_disco::error::ServerError;
use url::Url;

#[derive(Debug, Clone, Default)]
pub struct StatePeers {
clients: Vec<Client<ServerError>>,
}

impl StatePeers {
pub fn from_urls(urls: Vec<Url>) -> Self {
Self {
clients: urls.into_iter().map(Client::new).collect(),
}
}

pub async fn remember_account_balance(
&self,
view: ViewNumber,
account: FeeAccount,
fee_merkle_tree: &mut FeeMerkleTree,
) {
loop {
for client in self.clients.iter() {
match client
.get::<AccountQueryData>(&format!(
"state/catchup/{}/account/{}",
view.get_u64(),
account.address()
))
.send()
.await
{
Ok(res) => match res.proof.remember(fee_merkle_tree) {
Ok(_) => return,
Err(err) => tracing::warn!("Error verifying account proof: {}", err),
},
Err(err) => {
tracing::warn!("Error fetching account from peer: {}", err);
}
}
}
tracing::warn!("Could not fetch account from any peer, retrying");
// TODO: backoff? for testing a function that doesn't loop forever is probably more convenient.
}
}

pub async fn fetch_blocks_frontier(
&self,
view: ViewNumber,
root: BlockMerkleCommitment,
) -> Option<BlocksFrontier> {
for client in self.clients.iter() {
match client
.get::<BlocksFrontier>(&format!("state/catchup/{}/blocks", view.get_u64()))
.send()
.await
{
Ok(frontier) => {
match BlockMerkleTree::verify(root.digest(), root.size() - 1, &frontier) {
Ok(_) => return Some(frontier),
Err(err) => tracing::warn!("Error verifying block proof: {}", err),
}
}
Err(err) => {
tracing::warn!("Error fetching blocks from peer: {}", err);
}
}
}
None
}
}
19 changes: 14 additions & 5 deletions sequencer/src/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ impl BlockHeader for Header {
#[cfg(test)]
mod test_headers {
use crate::{
catchup::StatePeers,
l1_client::L1Client,
state::{validate_proposal, BlockMerkleTree, FeeMerkleTree},
NodeState,
Expand Down Expand Up @@ -531,8 +532,9 @@ mod test_headers {
let mut proposal = parent.clone();

// Advance `proposal.height` to trigger validation error.
let result =
validate_proposal(&mut validated_state, &parent.clone(), &proposal).unwrap_err();
let peers = StatePeers::default();
let result = validate_proposal(&peers, &mut validated_state, &parent.clone(), &proposal)
.unwrap_err();
assert_eq!(
format!("{}", result.root_cause()),
"Invalid Height Error: 0, 0"
Expand All @@ -541,8 +543,8 @@ mod test_headers {
// proposed `Header` root should include parent +
// parent.commit
proposal.height += 1;
let result =
validate_proposal(&mut validated_state, &parent.clone(), &proposal).unwrap_err();
let result = validate_proposal(&peers, &mut validated_state, &parent.clone(), &proposal)
.unwrap_err();
// Fails b/c `proposal` has not advanced from `parent`
assert!(format!("{}", result.root_cause()).contains("Invalid Block Root Error"));
}
Expand Down Expand Up @@ -583,7 +585,14 @@ mod test_headers {
let mut proposal_state = parent_state.clone();
let mut block_merkle_tree = proposal_state.block_merkle_tree.clone();
block_merkle_tree.push(proposal.commit()).unwrap();
validate_proposal(&mut proposal_state, &parent.clone(), &proposal.clone()).unwrap();
let peers = StatePeers::default();
validate_proposal(
&peers,
&mut proposal_state,
&parent.clone(),
&proposal.clone(),
)
.unwrap();
assert_eq!(
proposal_state.block_merkle_tree.commitment(),
proposal.block_merkle_tree_root
Expand Down
6 changes: 6 additions & 0 deletions sequencer/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod api;
pub mod block;
pub mod catchup;
mod chain_variables;
pub mod context;
mod header;
Expand All @@ -8,6 +9,7 @@ pub mod options;
pub mod state_signature;

use block::entry::TxTableEntryWord;
use catchup::StatePeers;
use context::SequencerContext;
use ethers::{
core::k256::ecdsa::SigningKey,
Expand Down Expand Up @@ -145,6 +147,7 @@ impl<N: network::Type> NodeImplementation<SeqTypes> for Node<N> {
#[derive(Clone, Debug)]
pub struct NodeState {
l1_client: L1Client,
peers: StatePeers,
genesis_state: ValidatedState,
builder_address: Wallet<SigningKey>,
}
Expand All @@ -163,6 +166,7 @@ impl Default for NodeState {
genesis_state: ValidatedState::default(),
builder_address: wallet,
l1_client: L1Client::new("http://localhost:3331".parse().unwrap()),
peers: StatePeers::default(),
}
}
}
Expand Down Expand Up @@ -269,6 +273,7 @@ pub async fn init_node(
persistence: &mut impl SequencerPersistence,
builder_params: BuilderParams,
l1_params: L1Params,
peers: Vec<Url>,
) -> anyhow::Result<SequencerContext<network::Web>> {
// Orchestrator client
let validator_args = ValidatorArgs {
Expand Down Expand Up @@ -372,6 +377,7 @@ pub async fn init_node(
l1_client,
builder_address: wallet,
genesis_state,
peers: StatePeers::from_urls(peers),
};
let hotshot = init_hotshot(
pub_keys.clone(),
Expand Down
2 changes: 2 additions & 0 deletions sequencer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ where
&mut storage,
builder_params,
l1_params,
vec![], // TODO: peers
)
.await
.unwrap()
Expand All @@ -109,6 +110,7 @@ where
&mut storage_opt.create().await?,
builder_params,
l1_params,
vec![], // TODO: peers
)
.await?
}
Expand Down
50 changes: 36 additions & 14 deletions sequencer/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::block::entry::TxTableEntryWord;
use crate::catchup::StatePeers;
use crate::{Header, L1BlockInfo, NodeState, Payload};
use anyhow::{ensure, Context};
use ark_serialize::{
CanonicalDeserialize, CanonicalSerialize, Compress, Read, SerializationError, Valid, Validate,
};
use async_std::task::block_on;
use commit::{Commitment, Committable, RawCommitmentBuilder};
use derive_more::{Add, From, Into, Sub};
use ethers::{
Expand All @@ -13,6 +15,7 @@ use ethers::{
types::{self, RecoveryMessage, U256},
};
use hotshot::traits::ValidatedState as HotShotState;
use hotshot_query_service::data_source::VersionedDataSource;
use hotshot_types::data::{BlockError, ViewNumber};
use jf_primitives::merkle_tree::{
prelude::{LightWeightSHA3MerkleTree, Sha3Digest, Sha3Node},
Expand Down Expand Up @@ -59,6 +62,7 @@ impl ValidatedState {
}

pub fn validate_proposal(
peers: &StatePeers,
state: &mut ValidatedState,
parent: &Header,
proposal: &Header,
Expand Down Expand Up @@ -94,14 +98,17 @@ pub fn validate_proposal(
let receipts = fetch_fee_receipts(parent.l1_finalized, proposal.l1_finalized);
for FeeInfo { account, amount } in receipts {
// Get the balance in order to add amount, ignoring the proof.
match fee_merkle_tree.universal_lookup(account) {
LookupResult::Ok(balance, _) => fee_merkle_tree
.update(account, balance.add(amount))
.unwrap(),
// Handle `NotFound` and `NotInMemory` by initializing
// state.
_ => fee_merkle_tree.update(account, amount).unwrap(),
};
let lookup = fee_merkle_tree.universal_lookup(account);
if matches!(lookup, LookupResult::NotInMemory) {
let view = todo!(); // where to get the view number?
block_on(peers.remember_account_balance(view, account, fee_merkle_tree));
}
fee_merkle_tree
.update_with(account, |balance| {
Some(balance.cloned().unwrap_or_default().add(amount))
})
.expect("update_with succeeds");
// TODO: can this really not fail?
}

let fee_merkle_tree_root = fee_merkle_tree.commitment();
Expand All @@ -117,17 +124,26 @@ pub fn validate_proposal(
}

/// Fetch receipts from the l1 and add them to local balance.
fn update_balance(fee_merkle_tree: &mut FeeMerkleTree, parent: &Header, proposed: &Header) {
fn update_balance(
peers: &StatePeers,
fee_merkle_tree: &mut FeeMerkleTree,
parent: &Header,
proposed: &Header,
) {
let receipts = fetch_fee_receipts(parent.l1_finalized, proposed.l1_finalized);
for FeeInfo { account, amount } in receipts {
// Add `amount` to `balance`, if `balance` is `None` set it to `amount`
#[allow(clippy::single_match)]
match fee_merkle_tree.update_with(account, |balance| {
Some(balance.cloned().unwrap_or_default().add(amount))
}) {
Ok(LookupResult::Ok(..)) => (),
// TODO handle `LookupResult::NotInMemory` by looking up the value from
// a peer during catchup.
Ok(LookupResult::NotInMemory) => {
// This account is not in memory, attempt to fetch it from peers.
let view = todo!(); // where to get the view number?
block_on(peers.remember_account_balance(view, account, fee_merkle_tree));
// Now that the entry is "remembered" the update should succeed.
update_balance(peers, fee_merkle_tree, parent, proposed);
}
_ => (),
}
}
Expand Down Expand Up @@ -192,15 +208,20 @@ impl HotShotState for ValidatedState {
/// proposal descends from parent. Returns updated `ValidatedState`.
fn validate_and_apply_header(
&self,
_instance: &Self::Instance,
instance: &Self::Instance,
parent_header: &Self::BlockHeader,
proposed_header: &Self::BlockHeader,
) -> Result<Self, Self::Error> {
// Clone state to avoid mutation. Consumer can take update
// through returned value.
let mut validated_state = self.clone();
// validate proposed header against parent
match validate_proposal(&mut validated_state, parent_header, proposed_header) {
match validate_proposal(
&instance.peers,
&mut validated_state,
parent_header,
proposed_header,
) {
// Note that currently only block state is updated.
Ok(validated_state) => validated_state,
Err(e) => {
Expand All @@ -211,6 +232,7 @@ impl HotShotState for ValidatedState {

// Update account balance from the l1
update_balance(
&instance.peers,
&mut validated_state.fee_merkle_tree,
parent_header,
proposed_header,
Expand Down

0 comments on commit 525719e

Please sign in to comment.