diff --git a/crates/pathfinder/src/core.rs b/crates/pathfinder/src/core.rs index f2bff1cb84..aafbbe14fc 100644 --- a/crates/pathfinder/src/core.rs +++ b/crates/pathfinder/src/core.rs @@ -185,6 +185,15 @@ pub struct EthereumTransactionIndex(pub u64); #[derive(Debug, Copy, Clone, PartialEq, Hash, Eq)] pub struct EthereumLogIndex(pub u64); +/// A way of identifying a specific block. +#[derive(Debug, Copy, Clone, PartialEq)] +pub enum BlockId { + Number(StarknetBlockNumber), + Hash(StarknetBlockHash), + Latest, + Pending, +} + impl StarknetBlockNumber { pub const GENESIS: StarknetBlockNumber = StarknetBlockNumber(0); } @@ -276,3 +285,41 @@ impl From for GasPrice { Self(u128::from(src)) } } + +impl From for BlockId { + fn from(block: crate::rpc::types::BlockNumberOrTag) -> Self { + use crate::rpc::types::BlockNumberOrTag::*; + use crate::rpc::types::Tag::*; + + match block { + Number(number) => Self::Number(number), + Tag(Latest) => Self::Latest, + Tag(Pending) => Self::Pending, + } + } +} + +impl From for BlockId { + fn from(block: crate::rpc::types::BlockHashOrTag) -> Self { + use crate::rpc::types::BlockHashOrTag::*; + use crate::rpc::types::Tag::*; + + match block { + Hash(hash) => Self::Hash(hash), + Tag(Latest) => Self::Latest, + Tag(Pending) => Self::Pending, + } + } +} + +impl From for BlockId { + fn from(number: StarknetBlockNumber) -> Self { + Self::Number(number) + } +} + +impl From for BlockId { + fn from(hash: StarknetBlockHash) -> Self { + Self::Hash(hash) + } +} diff --git a/crates/pathfinder/src/rpc/api.rs b/crates/pathfinder/src/rpc/api.rs index be3f0ee019..731a816f06 100644 --- a/crates/pathfinder/src/rpc/api.rs +++ b/crates/pathfinder/src/rpc/api.rs @@ -99,7 +99,7 @@ impl RpcApi { BlockHashOrTag::Tag(Tag::Pending) => { let block = self .sequencer - .block_by_hash(block_hash) + .block(block_hash.into()) .await .map_err(internal_server_error)?; @@ -218,7 +218,7 @@ impl RpcApi { BlockNumberOrTag::Tag(Tag::Pending) => { let block = self .sequencer - .block_by_number(block_number) + .block(block_number.into()) .await .context("Fetch block from sequencer") .map_err(internal_server_error)?; @@ -514,7 +514,7 @@ impl RpcApi { BlockHashOrTag::Tag(Tag::Pending) => { let block = self .sequencer - .block_by_hash(block_hash) + .block(block_hash.into()) .await .context("Fetch block from sequencer") .map_err(internal_server_error)?; @@ -589,7 +589,7 @@ impl RpcApi { BlockNumberOrTag::Tag(Tag::Pending) => { let block = self .sequencer - .block_by_number(block_number) + .block(block_number.into()) .await .context("Fetch block from sequencer") .map_err(internal_server_error)?; @@ -747,7 +747,7 @@ impl RpcApi { BlockHashOrTag::Tag(Tag::Pending) => { let block = self .sequencer - .block_by_hash(block_hash) + .block(block_hash.into()) .await .context("Fetch block from sequencer") .map_err(internal_server_error)?; @@ -814,7 +814,7 @@ impl RpcApi { BlockNumberOrTag::Tag(Tag::Pending) => { let block = self .sequencer - .block_by_number(block_number) + .block(block_number.into()) .await .context("Fetch block from sequencer") .map_err(internal_server_error)?; diff --git a/crates/pathfinder/src/sequencer.rs b/crates/pathfinder/src/sequencer.rs index 5eff84489a..2adea1f12a 100644 --- a/crates/pathfinder/src/sequencer.rs +++ b/crates/pathfinder/src/sequencer.rs @@ -1,37 +1,27 @@ //! StarkNet L2 sequencer client. 
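// A minimal sketch (hypothetical, not part of this change) of the `BlockId` conversions
// introduced in core.rs above, which let call sites pass a block number, hash or tag uniformly:
//
//     use crate::core::{BlockId, StarknetBlockNumber};
//
//     // StarknetBlockNumber -> BlockId::Number via the new `From` impl.
//     let by_number: BlockId = StarknetBlockNumber(231579).into();
//     assert_eq!(by_number, BlockId::Number(StarknetBlockNumber(231579)));
//
//     // Tags are expressed directly as enum variants.
//     let latest = BlockId::Latest;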
+mod builder; pub mod error; pub mod reply; pub mod request; -use self::{ - error::StarknetError, - request::{add_transaction::ContractDefinition, Call}, -}; +use self::request::{add_transaction::ContractDefinition, Call}; use crate::{ core::{ - CallSignatureElem, ClassHash, ConstructorParam, ContractAddress, ContractAddressSalt, Fee, - StarknetTransactionHash, StorageAddress, StorageValue, TransactionNonce, - TransactionVersion, + BlockId, CallSignatureElem, ClassHash, ConstructorParam, ContractAddress, + ContractAddressSalt, Fee, StarknetTransactionHash, StorageAddress, StorageValue, + TransactionNonce, TransactionVersion, }, ethereum::Chain, - rpc::types::{BlockHashOrTag, BlockNumberOrTag, Tag}, + rpc::types::BlockHashOrTag, sequencer::error::SequencerError, }; use reqwest::Url; -use std::{borrow::Cow, fmt::Debug, future::Future, result::Result, time::Duration}; +use std::{fmt::Debug, result::Result, time::Duration}; #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] pub trait ClientApi { - async fn block_by_number( - &self, - block_number: BlockNumberOrTag, - ) -> Result; - - async fn block_by_hash( - &self, - block_hash: BlockHashOrTag, - ) -> Result; + async fn block(&self, block: BlockId) -> Result; async fn call( &self, @@ -68,15 +58,7 @@ pub trait ClientApi { transaction_hash: StarknetTransactionHash, ) -> Result; - async fn state_update_by_hash( - &self, - block_hash: BlockHashOrTag, - ) -> Result; - - async fn state_update_by_number( - &self, - block_number: BlockNumberOrTag, - ) -> Result; + async fn state_update(&self, block: BlockId) -> Result; async fn eth_contract_addresses(&self) -> Result; @@ -126,123 +108,12 @@ pub struct Client { sequencer_url: Url, } -/// Helper function which simplifies the handling of optional block hashes in queries. -fn block_hash_str(hash: BlockHashOrTag) -> (&'static str, Cow<'static, str>) { - match hash { - BlockHashOrTag::Hash(h) => ("blockHash", h.0.to_hex_str()), - BlockHashOrTag::Tag(Tag::Latest) => ("blockNumber", Cow::from("latest")), - BlockHashOrTag::Tag(Tag::Pending) => ("blockNumber", Cow::from("pending")), - } -} - -/// Helper function which simplifies the handling of optional block numbers in queries. -fn block_number_str(number: BlockNumberOrTag) -> Cow<'static, str> { - match number { - BlockNumberOrTag::Number(n) => Cow::from(n.0.to_string()), - BlockNumberOrTag::Tag(Tag::Latest) => Cow::from("latest"), - BlockNumberOrTag::Tag(Tag::Pending) => Cow::from("pending"), - } -} - -/// __Mandatory__ function to parse every sequencer query response and deserialize -/// to expected output type. -async fn parse(resp: reqwest::Response) -> Result -where - T: ::serde::de::DeserializeOwned, -{ - let resp = parse_raw(resp).await?; - // Attempt to deserialize the actual data we are looking for - let resp = resp.json::().await?; - Ok(resp) -} - -/// Helper function which allows skipping deserialization when required. -async fn parse_raw(resp: reqwest::Response) -> Result { - // Starknet specific errors end with a 500 status code - // but the body contains a JSON object with the error description - if resp.status() == reqwest::StatusCode::INTERNAL_SERVER_ERROR { - let starknet_error = resp.json::().await?; - return Err(SequencerError::StarknetError(starknet_error)); - } - // Status codes <400;499> and <501;599> are mapped to SequencerError::TransportError - resp.error_for_status_ref().map(|_| ())?; - Ok(resp) -} - -/// Wrapper function to allow retrying sequencer queries in an exponential manner. -/// -/// Does not retry in tests. 
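// Usage sketch of the slimmed-down trait (mirrors the call sites updated in api.rs and
// state/sync.rs later in this diff); `sequencer` is assumed to be any `ClientApi` impl:
//
//     let latest = sequencer.block(BlockId::Latest).await?;
//     let genesis = sequencer.state_update(StarknetBlockNumber::GENESIS.into()).await?;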
-async fn retry(future_factory: FutureFactory) -> Result -where - Fut: Future>, - FutureFactory: FnMut() -> Fut, -{ - if cfg!(test) { - retry0(future_factory, |_| false).await - } else { - retry0(future_factory, retry_condition).await - } -} - -/// Wrapper function to allow retrying sequencer queries in an exponential manner. -async fn retry0( - future_factory: FutureFactory, - retry_condition: Ret, -) -> Result -where - Fut: Future>, - FutureFactory: FnMut() -> Fut, - Ret: FnMut(&SequencerError) -> bool, -{ - use crate::retry::Retry; - use std::num::NonZeroU64; - - Retry::exponential(future_factory, NonZeroU64::new(2).unwrap()) - .factor(NonZeroU64::new(15).unwrap()) - .max_delay(Duration::from_secs(60 * 60)) - .when(retry_condition) - .await -} - -/// Determines if an error is retryable or not. -fn retry_condition(e: &SequencerError) -> bool { - use reqwest::StatusCode; - use tracing::{debug, error, info, warn}; - - match e { - SequencerError::ReqwestError(e) => { - if e.is_body() || e.is_connect() || e.is_timeout() { - info!(reason=%e, "Request failed, retrying"); - } else if e.is_status() { - match e.status() { - Some( - StatusCode::NOT_FOUND - | StatusCode::TOO_MANY_REQUESTS - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT, - ) => { - debug!(reason=%e, "Request failed, retrying"); - } - Some(StatusCode::INTERNAL_SERVER_ERROR) => { - error!(reason=%e, "Request failed, retrying"); - } - Some(_) => warn!(reason=%e, "Request failed, retrying"), - None => unreachable!(), - } - } else if e.is_decode() { - error!(reason=%e, "Request failed, retrying"); - } else { - warn!(reason=%e, "Request failed, retrying"); - } - - true - } - SequencerError::StarknetError(_) => false, - } -} - impl Client { + #[cfg(not(test))] + const RETRY: builder::Retry = builder::Retry::Enabled; + #[cfg(test)] + const RETRY: builder::Retry = builder::Retry::Disabled; + /// Creates a new Sequencer client for the given chain. pub fn new(chain: Chain) -> reqwest::Result { let url = match chain { @@ -271,58 +142,22 @@ impl Client { }) } - /// Helper function that constructs a URL for particular query. - fn build_query(&self, path_segments: &[&str], params: &[(&str, &str)]) -> Url { - let mut query_url = self.sequencer_url.clone(); - query_url - .path_segments_mut() - .expect("Base URL is valid") - .extend(path_segments); - query_url.query_pairs_mut().extend_pairs(params); - tracing::trace!(%query_url); - query_url + fn request(&self) -> builder::Request { + builder::Request::builder(&self.inner, self.sequencer_url.clone()) } } #[async_trait::async_trait] impl ClientApi for Client { - /// Gets block by number. - #[tracing::instrument(skip(self))] - async fn block_by_number( - &self, - block_number: BlockNumberOrTag, - ) -> Result { - let number = block_number_str(block_number); - retry(|| async { - let resp = self - .inner - .get(self.build_query( - &["feeder_gateway", "get_block"], - &[("blockNumber", &number)], - )) - .send() - .await?; - parse::(resp).await - }) - .await - } - - /// Get block by hash. 
#[tracing::instrument(skip(self))] - async fn block_by_hash( - &self, - block_hash: BlockHashOrTag, - ) -> Result { - let (tag, hash) = block_hash_str(block_hash); - retry(|| async { - let resp = self - .inner - .get(self.build_query(&["feeder_gateway", "get_block"], &[(tag, &hash)])) - .send() - .await?; - parse::(resp).await - }) - .await + async fn block(&self, block: BlockId) -> Result { + self.request() + .feeder_gateway() + .get_block() + .with_block(block) + .with_retry(Self::RETRY) + .get() + .await } /// Performs a `call` on contract's function. Call result is not stored in L2, as opposed to `invoke`. @@ -332,17 +167,13 @@ impl ClientApi for Client { payload: request::Call, block_hash: BlockHashOrTag, ) -> Result { - let (tag, hash) = block_hash_str(block_hash); - retry(|| async { - let resp = self - .inner - .post(self.build_query(&["feeder_gateway", "call_contract"], &[(tag, &hash)])) - .json(&payload) - .send() - .await?; - parse(resp).await - }) - .await + self.request() + .feeder_gateway() + .call_contract() + .with_block(block_hash) + .with_retry(Self::RETRY) + .post_with_json(&payload) + .await } /// Gets full contract definition. @@ -351,39 +182,25 @@ impl ClientApi for Client { &self, contract_addr: ContractAddress, ) -> Result { - retry(|| async { - let resp = self - .inner - .get(self.build_query( - &["feeder_gateway", "get_full_contract"], - &[("contractAddress", &contract_addr.0.to_hex_str())], - )) - .send() - .await?; - let resp = parse_raw(resp).await?; - let resp = resp.bytes().await?; - Ok(resp) - }) - .await + self.request() + .feeder_gateway() + .get_full_contract() + .with_contract_address(contract_addr) + .with_retry(Self::RETRY) + .get_as_bytes() + .await } /// Gets class for a particular class hash. #[tracing::instrument(skip(self))] async fn class_by_hash(&self, class_hash: ClassHash) -> Result { - retry(|| async { - let resp = self - .inner - .get(self.build_query( - &["feeder_gateway", "get_class_by_hash"], - &[("classHash", &class_hash.0.to_hex_str())], - )) - .send() - .await?; - let resp = parse_raw(resp).await?; - let resp = resp.bytes().await?; - Ok(resp) - }) - .await + self.request() + .feeder_gateway() + .get_class_by_hash() + .with_class_hash(class_hash) + .with_retry(Self::RETRY) + .get_as_bytes() + .await } /// Gets class hash for a particular contract address. @@ -392,18 +209,13 @@ impl ClientApi for Client { &self, contract_address: ContractAddress, ) -> Result { - retry(|| async { - let resp = self - .inner - .get(self.build_query( - &["feeder_gateway", "get_class_hash_at"], - &[("contractAddress", &contract_address.0.to_hex_str())], - )) - .send() - .await?; - parse(resp).await - }) - .await + self.request() + .feeder_gateway() + .get_class_hash_at() + .with_contract_address(contract_address) + .with_retry(Self::RETRY) + .get() + .await } /// Gets storage value associated with a `key` for a prticular contract. 
@@ -414,25 +226,15 @@ impl ClientApi for Client { key: StorageAddress, block_hash: BlockHashOrTag, ) -> Result { - use crate::rpc::serde::starkhash_to_dec_str; - - let (tag, hash) = block_hash_str(block_hash); - retry(|| async { - let resp = self - .inner - .get(self.build_query( - &["feeder_gateway", "get_storage_at"], - &[ - ("contractAddress", &contract_addr.0.to_hex_str()), - ("key", &starkhash_to_dec_str(&key.0)), - (tag, &hash), - ], - )) - .send() - .await?; - parse::(resp).await - }) - .await + self.request() + .feeder_gateway() + .get_storage_at() + .with_contract_address(contract_addr) + .with_storage_address(key) + .with_block(block_hash) + .with_retry(Self::RETRY) + .get() + .await } /// Gets transaction by hash. @@ -441,18 +243,13 @@ impl ClientApi for Client { &self, transaction_hash: StarknetTransactionHash, ) -> Result { - retry(|| async { - let resp = self - .inner - .get(self.build_query( - &["feeder_gateway", "get_transaction"], - &[("transactionHash", &transaction_hash.0.to_hex_str())], - )) - .send() - .await?; - parse(resp).await - }) - .await + self.request() + .feeder_gateway() + .get_transaction() + .with_transaction_hash(transaction_hash) + .with_retry(Self::RETRY) + .get() + .await } /// Gets transaction status by transaction hash. @@ -461,70 +258,35 @@ impl ClientApi for Client { &self, transaction_hash: StarknetTransactionHash, ) -> Result { - retry(|| async { - let resp = self - .inner - .get(self.build_query( - &["feeder_gateway", "get_transaction_status"], - &[("transactionHash", &transaction_hash.0.to_hex_str())], - )) - .send() - .await?; - parse(resp).await - }) - .await - } - - /// Gets state update for a particular block hash. - #[tracing::instrument(skip(self))] - async fn state_update_by_hash( - &self, - block_hash: BlockHashOrTag, - ) -> Result { - let (tag, hash) = block_hash_str(block_hash); - retry(|| async { - let resp = self - .inner - .get(self.build_query(&["feeder_gateway", "get_state_update"], &[(tag, &hash)])) - .send() - .await?; - parse(resp).await - }) - .await + self.request() + .feeder_gateway() + .get_transaction_status() + .with_transaction_hash(transaction_hash) + .with_retry(Self::RETRY) + .get() + .await } - /// Gets state update for a particular block number. #[tracing::instrument(skip(self))] - async fn state_update_by_number( - &self, - block_number: BlockNumberOrTag, - ) -> Result { - retry(|| async { - let resp = self - .inner - .get(self.build_query( - &["feeder_gateway", "get_state_update"], - &[("blockNumber", &block_number_str(block_number))], - )) - .send() - .await?; - parse(resp).await - }) - .await + async fn state_update(&self, block: BlockId) -> Result { + self.request() + .feeder_gateway() + .get_state_update() + .with_block(block) + .with_retry(Self::RETRY) + .get() + .await } /// Gets addresses of the Ethereum contracts crucial to Starknet operation. #[tracing::instrument(skip(self))] async fn eth_contract_addresses(&self) -> Result { - retry(|| async { - let resp = self - .inner - .get(self.build_query(&["feeder_gateway", "get_contract_addresses"], &[])) - .send() - .await?; - parse(resp).await - }) - .await + self.request() + .feeder_gateway() + .get_contract_addresses() + .with_retry(Self::RETRY) + .get() + .await } /// Adds a transaction invoking a contract. @@ -550,13 +312,12 @@ impl ClientApi for Client { // This method is used to proxy an add transaction operation from the JSON-RPC // API to the sequencer. Retries should be implemented in the JSON-RPC // client instead. 
- let resp = self - .inner - .post(self.build_query(&["gateway", "add_transaction"], &[])) - .json(&req) - .send() - .await?; - parse(resp).await + self.request() + .gateway() + .add_transaction() + .with_retry(builder::Retry::Disabled) + .post_with_json(&req) + .await } /// Adds a transaction declaring a class. @@ -580,17 +341,19 @@ impl ClientApi for Client { nonce, version, }); - let mut url = self.build_query(&["gateway", "add_transaction"], &[]); - // this is an optional token currently required on mainnet - if let Some(token) = token { - url.query_pairs_mut().append_pair("token", &token); - } + // Note that we don't do retries here. // This method is used to proxy an add transaction operation from the JSON-RPC // API to the sequencer. Retries should be implemented in the JSON-RPC // client instead. - let resp = self.inner.post(url).json(&req).send().await?; - parse(resp).await + self.request() + .gateway() + .add_transaction() + // mainnet requires a token (but testnet does not so its optional). + .with_optional_token(token.as_deref()) + .with_retry(builder::Retry::Disabled) + .post_with_json(&req) + .await } /// Deploys a contract. @@ -608,17 +371,20 @@ impl ClientApi for Client { contract_definition, constructor_calldata, }); - let mut url = self.build_query(&["gateway", "add_transaction"], &[]); - // this is an optional token currently required on mainnet - if let Some(token) = token { - url.query_pairs_mut().append_pair("token", &token); - } + // Note that we don't do retries here. // This method is used to proxy an add transaction operation from the JSON-RPC // API to the sequencer. Retries should be implemented in the JSON-RPC // client instead. - let resp = self.inner.post(url).json(&req).send().await?; - parse(resp).await + + self.request() + .gateway() + .add_transaction() + // mainnet requires a token (but testnet does not so its optional). + .with_optional_token(token.as_deref()) + .with_retry(builder::Retry::Disabled) + .post_with_json(&req) + .await } } @@ -680,7 +446,10 @@ pub mod test_utils { #[cfg(test)] mod tests { use super::{error::StarknetErrorCode, test_utils::*, *}; - use crate::core::{StarknetBlockHash, StarknetBlockNumber}; + use crate::{ + core::{StarknetBlockHash, StarknetBlockNumber}, + rpc::types::Tag, + }; use assert_matches::assert_matches; use stark_hash::StarkHash; @@ -733,6 +502,8 @@ mod tests { /// The `message` field is always an empty string. /// The HTTP status code for this response is always `500` (`Internal Server Error`). 
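// For example, assuming `StarknetError { code, message }` serializes in field order,
// `StarknetErrorCode::BlockNotFound.into_response()` yields something like
// (r#"{"code":"StarknetErrorCode.BLOCK_NOT_FOUND","message":""}"#.to_owned(), 500),
// matching the fixture used by the retry tests in builder.rs.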
fn into_response(self) -> (String, u16) { + use crate::sequencer::error::StarknetError; + let e = StarknetError { code: self, message: "".to_string(), @@ -770,12 +541,17 @@ mod tests { (None, Client::new(Chain::Goerli).unwrap()) } else { use warp::Filter; - let path = warp::any() - .and(warp::path::full()) - .and(warp::query::raw()) - .map(move |full_path: warp::path::FullPath, raw_query| { - let actual_full_path_and_query = - format!("{}?{}", full_path.as_str(), raw_query); + let opt_query_raw = warp::query::raw() + .map(Some) + .or_else(|_| async { Ok::<(Option,), std::convert::Infallible>((None,)) }); + let path = warp::any().and(warp::path::full()).and(opt_query_raw).map( + move |full_path: warp::path::FullPath, raw_query: Option| { + let actual_full_path_and_query = match raw_query { + Some(some_raw_query) => { + format!("{}?{}", full_path.as_str(), some_raw_query.as_str()) + } + None => full_path.as_str().to_owned(), + }; match url_paths_queries_and_response_fixtures .iter() @@ -793,7 +569,8 @@ mod tests { .collect::>() ), } - }); + }, + ); let (addr, serve_fut) = warp::serve(path).bind_ephemeral(([127, 0, 0, 1], 0)); let server_handle = tokio::spawn(serve_fut); @@ -845,12 +622,12 @@ mod tests { let url = Url::parse(&url).unwrap(); let client = Client::with_url(url).unwrap(); - let _ = client.block_by_hash(BlockHashOrTag::Tag(Tag::Latest)).await; + let _ = client.block(BlockId::Latest).await; shutdown_tx.send(()).unwrap(); server_handle.await.unwrap(); } - mod block_by_number_matches_by_hash_on { + mod block_matches_by_hash_on { use super::*; #[tokio::test] @@ -871,8 +648,14 @@ mod tests { response!("0.9.0/block/genesis.json"), ), ]); - let by_hash = client.block_by_hash(*GENESIS_BLOCK_HASH).await.unwrap(); - let by_number = client.block_by_number(*GENESIS_BLOCK_NUMBER).await.unwrap(); + let by_hash = client + .block(BlockId::from(*GENESIS_BLOCK_HASH)) + .await + .unwrap(); + let by_number = client + .block(BlockId::from(*GENESIS_BLOCK_NUMBER)) + .await + .unwrap(); assert_eq!(by_hash, by_number); } @@ -889,52 +672,51 @@ mod tests { ), ]); let by_hash = client - .block_by_hash(BlockHashOrTag::Hash( + .block( StarknetBlockHash::from_hex_str( "0x40ffdbd9abbc4fc64652c50db94a29bce65c183316f304a95df624de708e746", ) - .unwrap(), - )) + .unwrap() + .into(), + ) .await .unwrap(); let by_number = client - .block_by_number(BlockNumberOrTag::Number(StarknetBlockNumber(231579))) + .block(StarknetBlockNumber(231579).into()) .await .unwrap(); assert_eq!(by_hash, by_number); } } - mod block_by_hash { + mod block { use super::*; use pretty_assertions::assert_eq; #[tokio::test] async fn latest() { + use crate::core::BlockId; + let (_jh, client) = setup([( "/feeder_gateway/get_block?blockNumber=latest", response!("0.9.0/block/231579.json"), )]); - client - .block_by_hash(BlockHashOrTag::Tag(Tag::Latest)) - .await - .unwrap(); + client.block(BlockId::Latest).await.unwrap(); } #[tokio::test] async fn pending() { + use crate::core::BlockId; + let (_jh, client) = setup([( "/feeder_gateway/get_block?blockNumber=pending", response!("0.9.0/block/pending.json"), )]); - client - .block_by_hash(BlockHashOrTag::Tag(Tag::Pending)) - .await - .unwrap(); + client.block(BlockId::Pending).await.unwrap(); } #[test_log::test(tokio::test)] - async fn invalid() { + async fn invalid_hash() { let (_jh, client) = setup([( format!( "/feeder_gateway/get_block?blockHash={}", @@ -942,43 +724,18 @@ mod tests { ), StarknetErrorCode::BlockNotFound.into_response(), )]); - let error = 
client.block_by_hash(*INVALID_BLOCK_HASH).await.unwrap_err(); + let error = client + .block(BlockId::from(*INVALID_BLOCK_HASH)) + .await + .unwrap_err(); assert_matches!( error, SequencerError::StarknetError(e) => assert_eq!(e.code, StarknetErrorCode::BlockNotFound) ); } - } - - mod block_by_number { - use super::*; - - #[tokio::test] - async fn latest() { - let (_jh, client) = setup([( - "/feeder_gateway/get_block?blockNumber=latest", - response!("0.9.0/block/231579.json"), - )]); - client - .block_by_number(BlockNumberOrTag::Tag(Tag::Latest)) - .await - .unwrap(); - } - - #[tokio::test] - async fn pending() { - let (_jh, client) = setup([( - "/feeder_gateway/get_block?blockNumber=pending", - response!("0.9.0/block/pending.json"), - )]); - client - .block_by_number(BlockNumberOrTag::Tag(Tag::Pending)) - .await - .unwrap(); - } #[test_log::test(tokio::test)] - async fn invalid() { + async fn invalid_number() { let (_jh, client) = setup([( format!( "/feeder_gateway/get_block?blockNumber={}", @@ -987,7 +744,7 @@ mod tests { StarknetErrorCode::BlockNotFound.into_response(), )]); let error = client - .block_by_number(*INVALID_BLOCK_NUMBER) + .block(BlockId::from(*INVALID_BLOCK_NUMBER)) .await .unwrap_err(); assert_matches!( @@ -995,18 +752,6 @@ mod tests { SequencerError::StarknetError(e) => assert_eq!(e.code, StarknetErrorCode::BlockNotFound) ); } - - #[tokio::test] - async fn contains_receipts_without_status_field() { - let (_jh, client) = setup([( - "/feeder_gateway/get_block?blockNumber=1716", - response!("0.9.0/block/1716.json"), - )]); - client - .block_by_number(BlockNumberOrTag::Number(StarknetBlockNumber(1716))) - .await - .unwrap(); - } } mod call { @@ -1573,7 +1318,7 @@ mod tests { } } - mod state_update_by_number_matches_by_hash_on { + mod state_update_matches_by_hash_on { use super::{ reply::{ state_update::{Contract, StorageDiff}, @@ -1632,12 +1377,12 @@ mod tests { ), ]); let by_number: OrderedStateUpdate = client - .state_update_by_number(*GENESIS_BLOCK_NUMBER) + .state_update(BlockId::from(*GENESIS_BLOCK_NUMBER)) .await .unwrap() .into(); let by_hash: OrderedStateUpdate = client - .state_update_by_hash(*GENESIS_BLOCK_HASH) + .state_update(BlockId::from(*GENESIS_BLOCK_HASH)) .await .unwrap() .into(); @@ -1658,17 +1403,18 @@ mod tests { ), ]); let by_number: OrderedStateUpdate = client - .state_update_by_number(BlockNumberOrTag::Number(StarknetBlockNumber(231579))) + .state_update(StarknetBlockNumber(231579).into()) .await .unwrap() .into(); let by_hash: OrderedStateUpdate = client - .state_update_by_hash(BlockHashOrTag::Hash( + .state_update( StarknetBlockHash::from_hex_str( "0x40ffdbd9abbc4fc64652c50db94a29bce65c183316f304a95df624de708e746", ) - .unwrap(), - )) + .unwrap() + .into(), + ) .await .unwrap() .into(); @@ -1677,7 +1423,7 @@ mod tests { } } - mod state_update_by_number { + mod state_update { use super::*; #[test_log::test(tokio::test)] @@ -1690,7 +1436,7 @@ mod tests { StarknetErrorCode::BlockNotFound.into_response(), )]); let error = client - .state_update_by_number(*INVALID_BLOCK_NUMBER) + .state_update(BlockId::from(*INVALID_BLOCK_NUMBER)) .await .unwrap_err(); assert_matches!( @@ -1699,34 +1445,6 @@ mod tests { ); } - #[tokio::test] - async fn latest() { - let (_jh, client) = setup([( - "/feeder_gateway/get_state_update?blockNumber=latest", - response!("0.9.0/state_update/231579.json"), - )]); - client - .state_update_by_number(BlockNumberOrTag::Tag(Tag::Latest)) - .await - .unwrap(); - } - - #[tokio::test] - async fn pending() { - let (_jh, client) = setup([( - 
"/feeder_gateway/get_state_update?blockNumber=pending", - response!("0.9.0/state_update/pending.json"), - )]); - client - .state_update_by_number(BlockNumberOrTag::Tag(Tag::Pending)) - .await - .unwrap(); - } - } - - mod state_update_by_hash { - use super::*; - #[tokio::test] async fn invalid_hash() { let (_jh, client) = setup([( @@ -1737,7 +1455,7 @@ mod tests { StarknetErrorCode::BlockNotFound.into_response(), )]); let error = client - .state_update_by_hash(*INVALID_BLOCK_HASH) + .state_update(BlockId::from(*INVALID_BLOCK_HASH)) .await .unwrap_err(); assert_matches!( @@ -1752,10 +1470,7 @@ mod tests { "/feeder_gateway/get_state_update?blockNumber=latest", response!("0.9.0/state_update/231579.json"), )]); - client - .state_update_by_hash(BlockHashOrTag::Tag(Tag::Latest)) - .await - .unwrap(); + client.state_update(BlockId::Latest).await.unwrap(); } #[tokio::test] @@ -1764,17 +1479,14 @@ mod tests { "/feeder_gateway/get_state_update?blockNumber=pending", response!("0.9.0/state_update/pending.json"), )]); - client - .state_update_by_hash(BlockHashOrTag::Tag(Tag::Pending)) - .await - .unwrap(); + client.state_update(BlockId::Pending).await.unwrap(); } } #[tokio::test] async fn eth_contract_addresses() { let (_jh, client) = setup([( - "/feeder_gateway/get_contract_addresses?", + "/feeder_gateway/get_contract_addresses", ( r#"{"Starknet":"0xde29d060d45901fb19ed6c6e959eb22d8626708e","GpsStatementVerifier":"0xab43ba48c9edf4c2c4bb01237348d1d7b28ef168"}"#, 200, @@ -1799,7 +1511,7 @@ mod tests { // test with values dumped from `starknet invoke` for a test contract, // except for an invalid entry point value let (_jh, client) = setup([( - "/gateway/add_transaction?", + "/gateway/add_transaction", StarknetErrorCode::UnsupportedSelectorForFee.into_response(), )]); let error = client @@ -1861,7 +1573,7 @@ mod tests { #[tokio::test] async fn invoke_function() { let (_jh, client) = setup([( - "/gateway/add_transaction?", + "/gateway/add_transaction", ( r#"{"code":"TRANSACTION_RECEIVED","transaction_hash":"0x0389DD0629F42176CC8B6C43ACEFC0713D0064ECDFC0470E0FC179F53421A38B"}"#, 200, @@ -1947,7 +1659,7 @@ mod tests { let contract_class = get_contract_class_from_fixture(); let (_jh, client) = setup([( - "/gateway/add_transaction?", + "/gateway/add_transaction", ( r#"{"code": "TRANSACTION_RECEIVED", "transaction_hash": "0x77ccba4df42cf0f74a8eb59a96d7880fae371edca5d000ca5f9985652c8a8ed", @@ -1976,7 +1688,7 @@ mod tests { let contract_definition = get_contract_class_from_fixture(); let (_jh, client) = setup([( - "/gateway/add_transaction?", + "/gateway/add_transaction", ( r#"{"code":"TRANSACTION_RECEIVED","transaction_hash":"0x057ED4B4C76A1CA0BA044A654DD3EE2D0D3E550343D739350A22AACDD524110D", "address":"0x03926AEA98213EC34FE9783D803237D221C54C52344422E1F4942A5B340FA6AD"}"#, @@ -2145,147 +1857,4 @@ mod tests { } } } - - mod retry { - use super::{SequencerError, StarknetErrorCode}; - use assert_matches::assert_matches; - use http::{response::Builder, StatusCode}; - use pretty_assertions::assert_eq; - use std::{ - collections::VecDeque, convert::Infallible, net::SocketAddr, sync::Arc, time::Duration, - }; - use tokio::{sync::Mutex, task::JoinHandle}; - use warp::Filter; - - // A test helper - fn status_queue_server( - statuses: VecDeque<(StatusCode, &'static str)>, - ) -> (JoinHandle<()>, SocketAddr) { - use std::cell::RefCell; - - let statuses = Arc::new(Mutex::new(RefCell::new(statuses))); - let any = warp::any().and_then(move || { - let s = statuses.clone(); - async move { - let s = s.lock().await; - let s = 
s.borrow_mut().pop_front().unwrap(); - Result::<_, Infallible>::Ok(Builder::new().status(s.0).body(s.1)) - } - }); - - let (addr, run_srv) = warp::serve(any).bind_ephemeral(([127, 0, 0, 1], 0)); - let server_handle = tokio::spawn(run_srv); - (server_handle, addr) - } - - // A test helper - fn slow_server() -> (tokio::task::JoinHandle<()>, std::net::SocketAddr) { - async fn slow() -> Result { - tokio::time::sleep(Duration::from_secs(1)).await; - Ok(Builder::new().status(200).body("")) - } - - let any = warp::any().and_then(slow); - let (addr, run_srv) = warp::serve(any).bind_ephemeral(([127, 0, 0, 1], 0)); - let server_handle = tokio::spawn(run_srv); - (server_handle, addr) - } - - #[test_log::test(tokio::test)] - async fn stop_on_ok() { - let statuses = VecDeque::from([ - (StatusCode::TOO_MANY_REQUESTS, ""), - (StatusCode::BAD_GATEWAY, ""), - (StatusCode::SERVICE_UNAVAILABLE, ""), - (StatusCode::GATEWAY_TIMEOUT, ""), - (StatusCode::OK, r#""Finally!""#), - (StatusCode::TOO_MANY_REQUESTS, ""), - (StatusCode::BAD_GATEWAY, ""), - (StatusCode::SERVICE_UNAVAILABLE, ""), - ]); - - let (_jh, addr) = status_queue_server(statuses); - let result = super::retry0( - || async { - let mut url = reqwest::Url::parse("http://localhost/").unwrap(); - url.set_port(Some(addr.port())).unwrap(); - let resp = reqwest::get(url).await?; - super::parse::(resp).await - }, - super::retry_condition, - ) - .await - .unwrap(); - assert_eq!(result, "Finally!"); - } - - #[test_log::test(tokio::test)] - async fn stop_on_fatal() { - let statuses = VecDeque::from([ - (StatusCode::TOO_MANY_REQUESTS, ""), - (StatusCode::BAD_GATEWAY, ""), - (StatusCode::SERVICE_UNAVAILABLE, ""), - (StatusCode::GATEWAY_TIMEOUT, ""), - ( - StatusCode::INTERNAL_SERVER_ERROR, - r#"{"code":"StarknetErrorCode.BLOCK_NOT_FOUND","message":""}"#, - ), - (StatusCode::TOO_MANY_REQUESTS, ""), - (StatusCode::BAD_GATEWAY, ""), - (StatusCode::SERVICE_UNAVAILABLE, ""), - ]); - - let (_jh, addr) = status_queue_server(statuses); - let error = super::retry0( - || async { - let mut url = reqwest::Url::parse("http://localhost/").unwrap(); - url.set_port(Some(addr.port())).unwrap(); - let resp = reqwest::get(url).await?; - super::parse::(resp).await - }, - super::retry_condition, - ) - .await - .unwrap_err(); - assert_matches!( - error, - SequencerError::StarknetError(se) => assert_eq!(se.code, StarknetErrorCode::BlockNotFound) - ); - } - - #[tokio::test(flavor = "current_thread", start_paused = true)] - async fn request_timeout() { - use std::sync::atomic::{AtomicUsize, Ordering}; - - let (_jh, addr) = slow_server(); - static CNT: AtomicUsize = AtomicUsize::new(0); - - let fut = super::retry0( - || async { - let mut url = reqwest::Url::parse("http://localhost/").unwrap(); - url.set_port(Some(addr.port())).unwrap(); - - let client = reqwest::Client::builder().build().unwrap(); - - CNT.fetch_add(1, Ordering::Relaxed); - - // This is the same as using Client::builder().timeout() - let resp = client - .get(url) - .timeout(Duration::from_millis(1)) - .send() - .await?; - super::parse::(resp).await - }, - super::retry_condition, - ); - - // The retry loops forever, so wrap it in a timeout and check the counter. 
- tokio::time::timeout(Duration::from_millis(250), fut) - .await - .unwrap_err(); - // 4th try should have timedout if this is really exponential backoff - assert_eq!(CNT.load(Ordering::Relaxed), 4); - } - } } diff --git a/crates/pathfinder/src/sequencer/builder.rs b/crates/pathfinder/src/sequencer/builder.rs new file mode 100644 index 0000000000..8ccb54c86c --- /dev/null +++ b/crates/pathfinder/src/sequencer/builder.rs @@ -0,0 +1,561 @@ +//! Provides a builder API for creating and sending Sequencer REST requests. +//! +//! This builder utilises a type state builder pattern with generics to only allow valid operations at each stage of the build process. +//! Each stage is consumed to generate the next stage and the final stage executes the query. +//! +//! Here is an overview of the five builder stages. +//! +//! 1. [Init](stage::Init) which provides the entry point of the [builder](Request). +//! 2. [Gateway](stage::Gateway) where you select between the read and write gateways. +//! 3. [Method](stage::Method) where you select the REST API method. +//! 4. [Params](stage::Params) where you select the retry behavior. +//! 5. [Final](stage::Final) where you select the REST operation type, which is then executed. + +use crate::{ + core::{ClassHash, ContractAddress, StarknetTransactionHash, StorageAddress}, + sequencer::error::SequencerError, +}; + +/// A Sequencer Request builder. +pub struct Request<'a, S: RequestState> { + state: S, + url: reqwest::Url, + client: &'a reqwest::Client, +} + +/// Describes the retry behavior of a [Request] and is specified using +#[allow(dead_code)] +pub enum Retry { + Enabled, + Disabled, +} + +pub mod stage { + /// Provides the [builder](super::Request::builder) entry-point. + pub struct Init; + + /// Select between the [read](super::Request::feeder_gateway) and [write](super::Request::gateway) Sequencer gateways. + pub struct Gateway; + + /// Select the Sequencer API method to call: + /// - [add_transaction](super::Request::add_transaction) + /// - [call_contract](super::Request::call_contract) + /// - [get_block](super::Request::get_block) + /// - [get_full_contract](super::Request::get_full_contract) + /// - [get_class_by_hash](super::Request::get_class_by_hash) + /// - [get_class_hash_at](super::Request::get_class_hash_at) + /// - [get_storage_at](super::Request::get_storage_at) + /// - [get_transaction](super::Request::get_transaction) + /// - [get_transaction_status](super::Request::get_transaction_status) + /// - [get_state_update](super::Request::get_state_update) + /// - [get_contract_addresses](super::Request::get_contract_addresses) + pub struct Method; + + /// Specify the request parameters: + /// - [at_block](super::Request::with_block) + /// - [with_contract_address](super::Request::with_contract_address) + /// - [with_class_hash](super::Request::with_class_hash) + /// - [with_optional_token](super::Request::with_optional_token) + /// - [with_storage_address](super::Request::with_storage_address) + /// - [with_transaction_hash](super::Request::with_transaction_hash) + /// - [add_param](super::Request::add_param) (allows adding custom (name, value) parameter) + /// + /// and then specify the [retry behavior](super::Request::with_retry). 
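// A minimal end-to-end sketch of the five stages (hypothetical helper; assumes the same
// `reqwest::Client` and base URL that `Client` in sequencer.rs holds):
//
//     async fn latest_block(
//         client: &reqwest::Client,
//         base: reqwest::Url,
//     ) -> Result<crate::sequencer::reply::Block, crate::sequencer::error::SequencerError> {
//         super::Request::builder(client, base)  // Init    -> Gateway
//             .feeder_gateway()                  // Gateway -> Method
//             .get_block()                       // Method  -> Params
//             .with_block(crate::core::BlockId::Latest)
//             .with_retry(super::Retry::Enabled) // Params  -> Final
//             .get()                             // send the GET and deserialize
//             .await
//     }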
+ pub struct Params; + + /// Specify the REST operation send the request: + /// - [get](super::Request::get) + /// - [get_as_bytes](super::Request::get_as_bytes) + /// - [post_with_json](super::Request::post_with_json) + pub struct Final { + pub retry: super::Retry, + } + + impl super::RequestState for Init {} + impl super::RequestState for Gateway {} + impl super::RequestState for Method {} + impl super::RequestState for Params {} + impl super::RequestState for Final {} +} + +impl<'a> Request<'a, stage::Init> { + /// Initialize a [Request] builder. + pub fn builder(client: &'a reqwest::Client, url: reqwest::Url) -> Request<'a, stage::Gateway> { + Request { + url, + client, + state: stage::Gateway, + } + } +} + +impl<'a> Request<'a, stage::Gateway> { + /// The Sequencer write gateway, typically only used for submitting StarkNet transactions. + pub fn gateway(self) -> Request<'a, stage::Method> { + self.with_gateway("gateway") + } + + /// The Sequencer read gateway, used for all queries which are not submitting transactions. + pub fn feeder_gateway(self) -> Request<'a, stage::Method> { + self.with_gateway("feeder_gateway") + } + + fn with_gateway(mut self, gateway: &str) -> Request<'a, stage::Method> { + self.url + .path_segments_mut() + .expect("Base URL is valid") + .push(gateway); + Request { + url: self.url, + client: self.client, + state: stage::Method, + } + } +} + +impl<'a> Request<'a, stage::Method> { + pub fn add_transaction(self) -> Request<'a, stage::Params> { + self.with_method("add_transaction") + } + + pub fn call_contract(self) -> Request<'a, stage::Params> { + self.with_method("call_contract") + } + + pub fn get_block(self) -> Request<'a, stage::Params> { + self.with_method("get_block") + } + + pub fn get_full_contract(self) -> Request<'a, stage::Params> { + self.with_method("get_full_contract") + } + + pub fn get_class_by_hash(self) -> Request<'a, stage::Params> { + self.with_method("get_class_by_hash") + } + + pub fn get_class_hash_at(self) -> Request<'a, stage::Params> { + self.with_method("get_class_hash_at") + } + + pub fn get_storage_at(self) -> Request<'a, stage::Params> { + self.with_method("get_storage_at") + } + + pub fn get_transaction(self) -> Request<'a, stage::Params> { + self.with_method("get_transaction") + } + + pub fn get_transaction_status(self) -> Request<'a, stage::Params> { + self.with_method("get_transaction_status") + } + + pub fn get_state_update(self) -> Request<'a, stage::Params> { + self.with_method("get_state_update") + } + + pub fn get_contract_addresses(self) -> Request<'a, stage::Params> { + self.with_method("get_contract_addresses") + } + + /// Appends the given method to the request url. + fn with_method(mut self, method: &str) -> Request<'a, stage::Params> { + self.url + .path_segments_mut() + .expect("Base URL is valid") + .push(method); + + Request { + url: self.url, + client: self.client, + state: stage::Params, + } + } +} + +impl<'a> Request<'a, stage::Params> { + pub fn with_block>(self, block: B) -> Self { + use crate::core::BlockId; + use std::borrow::Cow; + + let block: BlockId = block.into(); + let (name, value) = match block { + BlockId::Number(number) => ("blockNumber", Cow::from(number.0.to_string())), + BlockId::Hash(hash) => ("blockHash", hash.0.to_hex_str()), + // These have to use "blockNumber", "blockHash" does not accept tags. 
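// For example, the resulting query parameters are (values mirroring the test
// fixtures later in this diff):
//   BlockId::Number(StarknetBlockNumber(231579)) -> blockNumber=231579
//   BlockId::Hash(h)                             -> blockHash=<h as a hex string>
//   BlockId::Latest                              -> blockNumber=latest
//   BlockId::Pending                             -> blockNumber=pending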
+ BlockId::Latest => ("blockNumber", Cow::from("latest")), + BlockId::Pending => ("blockNumber", Cow::from("pending")), + }; + + self.add_param(name, &value) + } + + pub fn with_contract_address(self, address: ContractAddress) -> Self { + self.add_param("contractAddress", &address.0.to_hex_str()) + } + + pub fn with_class_hash(self, class_hash: ClassHash) -> Self { + self.add_param("classHash", &class_hash.0.to_hex_str()) + } + + pub fn with_optional_token(self, token: Option<&str>) -> Self { + match token { + Some(token) => self.add_param("token", token), + None => self, + } + } + + pub fn with_storage_address(self, address: StorageAddress) -> Self { + use crate::rpc::serde::starkhash_to_dec_str; + self.add_param("key", &starkhash_to_dec_str(&address.0)) + } + + pub fn with_transaction_hash(self, hash: StarknetTransactionHash) -> Self { + self.add_param("transactionHash", &hash.0.to_hex_str()) + } + + pub fn add_param(mut self, name: &str, value: &str) -> Self { + self.url.query_pairs_mut().append_pair(name, value); + self + } + + /// Sets the request retry behavior. + pub fn with_retry(self, retry: Retry) -> Request<'a, stage::Final> { + Request { + url: self.url, + client: self.client, + state: stage::Final { retry }, + } + } +} + +impl<'a> Request<'a, stage::Final> { + /// Sends the Sequencer request as a REST `GET` operation and parses the response into `T`. + pub async fn get(self) -> Result + where + T: serde::de::DeserializeOwned, + { + async fn send_request( + url: reqwest::Url, + client: &reqwest::Client, + ) -> Result { + let response = client.get(url).send().await?; + parse::(response).await + } + + match self.state.retry { + Retry::Disabled => send_request(self.url, self.client).await, + Retry::Enabled => { + retry0( + || async { + let clone_url = self.url.clone(); + send_request(clone_url, self.client).await + }, + retry_condition, + ) + .await + } + } + } + + /// Sends the Sequencer request as a REST `GET` operation and returns the response's bytes. + pub async fn get_as_bytes(self) -> Result { + async fn get_as_bytes_inner( + url: reqwest::Url, + client: &reqwest::Client, + ) -> Result { + let response = client.get(url).send().await?; + let response = parse_raw(response).await?; + let bytes = response.bytes().await?; + Ok(bytes) + } + + match self.state.retry { + Retry::Disabled => get_as_bytes_inner(self.url, self.client).await, + Retry::Enabled => { + retry0( + || async { + let clone_url = self.url.clone(); + get_as_bytes_inner(clone_url, self.client).await + }, + retry_condition, + ) + .await + } + } + } + + /// Sends the Sequencer request as a REST `POST` operation, in addition to the specified + /// JSON body. The response is parsed as type `T`. 
+ pub async fn post_with_json(self, json: &J) -> Result + where + T: serde::de::DeserializeOwned, + J: serde::Serialize + ?Sized, + { + async fn post_with_json_inner( + url: reqwest::Url, + client: &reqwest::Client, + json: &J, + ) -> Result + where + T: serde::de::DeserializeOwned, + J: serde::Serialize + ?Sized, + { + let response = client.post(url).json(json).send().await?; + parse::(response).await + } + + match self.state.retry { + Retry::Disabled => post_with_json_inner(self.url, self.client, json).await, + Retry::Enabled => { + retry0( + || async { + let clone_url = self.url.clone(); + post_with_json_inner(clone_url, self.client, json).await + }, + retry_condition, + ) + .await + } + } + } +} + +async fn parse(response: reqwest::Response) -> Result +where + T: ::serde::de::DeserializeOwned, +{ + let response = parse_raw(response).await?; + // Attempt to deserialize the actual data we are looking for + let response = response.json::().await?; + Ok(response) +} + +/// Helper function which allows skipping deserialization when required. +async fn parse_raw(response: reqwest::Response) -> Result { + use crate::sequencer::error::StarknetError; + + // Starknet specific errors end with a 500 status code + // but the body contains a JSON object with the error description + if dbg!(response.status()) == reqwest::StatusCode::INTERNAL_SERVER_ERROR { + let starknet_error = response.json::().await?; + return Err(SequencerError::StarknetError(starknet_error)); + } + // Status codes 400..499 and 501..599 are mapped to SequencerError::TransportError + response.error_for_status_ref().map(|_| ())?; + Ok(response) +} + +pub trait RequestState {} + +/// Wrapper function to allow retrying sequencer queries in an exponential manner. +async fn retry0( + future_factory: FutureFactory, + retry_condition: Ret, +) -> Result +where + Fut: futures::Future>, + FutureFactory: FnMut() -> Fut, + Ret: FnMut(&SequencerError) -> bool, +{ + use crate::retry::Retry; + use std::num::NonZeroU64; + + Retry::exponential(future_factory, NonZeroU64::new(2).unwrap()) + .factor(NonZeroU64::new(15).unwrap()) + .max_delay(std::time::Duration::from_secs(60 * 60)) + .when(retry_condition) + .await +} + +/// Determines if an error is retryable or not. 
+fn retry_condition(e: &SequencerError) -> bool { + use reqwest::StatusCode; + use tracing::{debug, error, info, warn}; + + match e { + SequencerError::ReqwestError(e) => { + if e.is_body() || e.is_connect() || e.is_timeout() { + info!(reason=%e, "Request failed, retrying"); + } else if e.is_status() { + match e.status() { + Some( + StatusCode::NOT_FOUND + | StatusCode::TOO_MANY_REQUESTS + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT, + ) => { + debug!(reason=%e, "Request failed, retrying"); + } + Some(StatusCode::INTERNAL_SERVER_ERROR) => { + error!(reason=%e, "Request failed, retrying"); + } + Some(_) => warn!(reason=%e, "Request failed, retrying"), + None => unreachable!(), + } + } else if e.is_decode() { + error!(reason=%e, "Request failed, retrying"); + } else { + warn!(reason=%e, "Request failed, retrying"); + } + + true + } + SequencerError::StarknetError(_) => false, + } +} + +#[cfg(test)] +mod tests { + mod retry { + use assert_matches::assert_matches; + use http::{response::Builder, StatusCode}; + use pretty_assertions::assert_eq; + use std::{ + collections::VecDeque, convert::Infallible, net::SocketAddr, sync::Arc, time::Duration, + }; + use tokio::{sync::Mutex, task::JoinHandle}; + use warp::Filter; + + use crate::sequencer::builder::{retry0, retry_condition}; + + // A test helper + fn status_queue_server( + statuses: VecDeque<(StatusCode, &'static str)>, + ) -> (JoinHandle<()>, SocketAddr) { + use std::cell::RefCell; + + let statuses = Arc::new(Mutex::new(RefCell::new(statuses))); + let any = warp::any().and_then(move || { + let s = statuses.clone(); + async move { + let s = s.lock().await; + let s = s.borrow_mut().pop_front().unwrap(); + Result::<_, Infallible>::Ok(Builder::new().status(s.0).body(s.1)) + } + }); + + let (addr, run_srv) = warp::serve(any).bind_ephemeral(([127, 0, 0, 1], 0)); + let server_handle = tokio::spawn(run_srv); + (server_handle, addr) + } + + // A test helper + fn slow_server() -> (tokio::task::JoinHandle<()>, std::net::SocketAddr) { + async fn slow() -> Result { + tokio::time::sleep(Duration::from_secs(1)).await; + Ok(Builder::new().status(200).body("")) + } + + let any = warp::any().and_then(slow); + let (addr, run_srv) = warp::serve(any).bind_ephemeral(([127, 0, 0, 1], 0)); + let server_handle = tokio::spawn(run_srv); + (server_handle, addr) + } + + #[test_log::test(tokio::test)] + async fn stop_on_ok() { + use crate::sequencer::builder; + + let statuses = VecDeque::from([ + (StatusCode::TOO_MANY_REQUESTS, ""), + (StatusCode::BAD_GATEWAY, ""), + (StatusCode::SERVICE_UNAVAILABLE, ""), + (StatusCode::GATEWAY_TIMEOUT, ""), + (StatusCode::OK, r#""Finally!""#), + (StatusCode::TOO_MANY_REQUESTS, ""), + (StatusCode::BAD_GATEWAY, ""), + (StatusCode::SERVICE_UNAVAILABLE, ""), + ]); + + let (_jh, addr) = status_queue_server(statuses); + let result = retry0( + || async { + let mut url = reqwest::Url::parse("http://localhost/").unwrap(); + url.set_port(Some(addr.port())).unwrap(); + let response = reqwest::get(url).await?; + builder::parse::(response).await + }, + retry_condition, + ) + .await + .unwrap(); + assert_eq!(result, "Finally!"); + } + + #[test_log::test(tokio::test)] + async fn stop_on_fatal() { + use crate::sequencer::builder; + use crate::sequencer::error::{SequencerError, StarknetErrorCode}; + + let statuses = VecDeque::from([ + (StatusCode::TOO_MANY_REQUESTS, ""), + (StatusCode::BAD_GATEWAY, ""), + (StatusCode::SERVICE_UNAVAILABLE, ""), + (StatusCode::GATEWAY_TIMEOUT, ""), + ( + 
StatusCode::INTERNAL_SERVER_ERROR, + r#"{"code":"StarknetErrorCode.BLOCK_NOT_FOUND","message":""}"#, + ), + (StatusCode::TOO_MANY_REQUESTS, ""), + (StatusCode::BAD_GATEWAY, ""), + (StatusCode::SERVICE_UNAVAILABLE, ""), + ]); + + let (_jh, addr) = status_queue_server(statuses); + let error = retry0( + || async { + let mut url = reqwest::Url::parse("http://localhost/").unwrap(); + url.set_port(Some(addr.port())).unwrap(); + let response = reqwest::get(url).await?; + builder::parse::(response).await + }, + retry_condition, + ) + .await + .unwrap_err(); + assert_matches!( + error, + SequencerError::StarknetError(se) => assert_eq!(se.code, StarknetErrorCode::BlockNotFound) + ); + } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn request_timeout() { + use crate::sequencer::builder; + + use std::sync::atomic::{AtomicUsize, Ordering}; + + let (_jh, addr) = slow_server(); + static CNT: AtomicUsize = AtomicUsize::new(0); + + let fut = retry0( + || async { + let mut url = reqwest::Url::parse("http://localhost/").unwrap(); + url.set_port(Some(addr.port())).unwrap(); + + let client = reqwest::Client::builder().build().unwrap(); + + CNT.fetch_add(1, Ordering::Relaxed); + + // This is the same as using Client::builder().timeout() + let response = client + .get(url) + .timeout(Duration::from_millis(1)) + .send() + .await?; + builder::parse::(response).await + }, + retry_condition, + ); + + // The retry loops forever, so wrap it in a timeout and check the counter. + tokio::time::timeout(Duration::from_millis(250), fut) + .await + .unwrap_err(); + // 4th try should have timedout if this is really exponential backoff + assert_eq!(CNT.load(Ordering::Relaxed), 4); + } + } +} diff --git a/crates/pathfinder/src/sequencer/reply.rs b/crates/pathfinder/src/sequencer/reply.rs index 0f69719905..97a66c91f9 100644 --- a/crates/pathfinder/src/sequencer/reply.rs +++ b/crates/pathfinder/src/sequencer/reply.rs @@ -9,8 +9,7 @@ use crate::{ use serde::Deserialize; use serde_with::serde_as; -/// Used to deserialize replies to [ClientApi::block_by_hash](crate::sequencer::ClientApi::block_by_hash) and -/// [ClientApi::block_by_number](crate::sequencer::ClientApi::block_by_number). +/// Used to deserialize replies to [ClientApi::block](crate::sequencer::ClientApi::block). #[serde_as] #[derive(Clone, Debug, Deserialize, PartialEq)] #[cfg_attr(test, derive(serde::Serialize))] @@ -303,7 +302,7 @@ pub mod transaction { } /// Used to deserialize a reply from -/// [ClientApi::state_update_by_hash](crate::sequencer::ClientApi::state_update_by_hash). +/// [ClientApi::state_update](crate::sequencer::ClientApi::state_update). 
#[derive(Clone, Debug, Deserialize, PartialEq)] #[serde(deny_unknown_fields)] pub struct StateUpdate { diff --git a/crates/pathfinder/src/state/sync.rs b/crates/pathfinder/src/state/sync.rs index 614784445a..6be1e9dfca 100644 --- a/crates/pathfinder/src/state/sync.rs +++ b/crates/pathfinder/src/state/sync.rs @@ -325,16 +325,14 @@ async fn update_sync_status_latest( starting_block_num: StarknetBlockNumber, chain: Chain, ) -> anyhow::Result<()> { - use crate::rpc::types::{BlockNumberOrTag, Tag}; + use crate::core::BlockId; + let poll_interval = head_poll_interval(chain); let starting = NumberedBlock::from((starting_block_hash, starting_block_num)); loop { - match sequencer - .block_by_number(BlockNumberOrTag::Tag(Tag::Latest)) - .await - { + match sequencer.block(BlockId::Latest).await { Ok(block) => { let latest = { let latest_hash = block.block_hash.unwrap(); @@ -631,7 +629,7 @@ mod tests { TransactionVersion, }, ethereum, - rpc::types::{BlockHashOrTag, BlockNumberOrTag}, + rpc::types::BlockHashOrTag, sequencer::{ self, error::SequencerError, @@ -690,15 +688,11 @@ mod tests { #[async_trait::async_trait] impl sequencer::ClientApi for FakeSequencer { - async fn block_by_number( - &self, - _: BlockNumberOrTag, - ) -> Result { - Ok(BLOCK0.clone()) - } - - async fn block_by_hash(&self, _: BlockHashOrTag) -> Result { - unimplemented!() + async fn block(&self, block: crate::core::BlockId) -> Result { + match block { + crate::core::BlockId::Number(_) => Ok(BLOCK0.clone()), + _ => unimplemented!(), + } } async fn call( @@ -744,16 +738,9 @@ mod tests { unimplemented!() } - async fn state_update_by_hash( - &self, - _: BlockHashOrTag, - ) -> Result { - unimplemented!() - } - - async fn state_update_by_number( + async fn state_update( &self, - _: BlockNumberOrTag, + _: crate::core::BlockId, ) -> Result { unimplemented!() } diff --git a/crates/pathfinder/src/state/sync/l2.rs b/crates/pathfinder/src/state/sync/l2.rs index 30eac45730..a3a28919c3 100644 --- a/crates/pathfinder/src/state/sync/l2.rs +++ b/crates/pathfinder/src/state/sync/l2.rs @@ -6,7 +6,6 @@ use tokio::sync::{mpsc, oneshot}; use crate::core::{ClassHash, StarknetBlockHash, StarknetBlockNumber}; use crate::ethereum::state_update::{ContractUpdate, DeployedContract, StateUpdate, StorageUpdate}; -use crate::rpc::types::{BlockNumberOrTag, Tag}; use crate::sequencer::error::SequencerError; use crate::sequencer::reply::state_update::{Contract, StateDiff}; use crate::sequencer::reply::Block; @@ -97,7 +96,7 @@ pub async fn sync( let block_hash = block.block_hash.unwrap(); let t_update = std::time::Instant::now(); let state_update = sequencer - .state_update_by_hash(block_hash.into()) + .state_update(block_hash.into()) .await .with_context(|| format!("Fetch state diff for block {:?} from sequencer", next))?; let state_update_block_hash = state_update.block_hash.unwrap(); @@ -180,9 +179,10 @@ async fn download_block( prev_block_hash: Option, sequencer: &impl sequencer::ClientApi, ) -> anyhow::Result { + use crate::core::BlockId; use sequencer::error::StarknetErrorCode::BlockNotFound; - let result = sequencer.block_by_number(block_number.into()).await; + let result = sequencer.block(block_number.into()).await; match result { Ok(block) => Ok(DownloadBlock::Block(Box::new(block))), @@ -191,7 +191,7 @@ async fn download_block( // a reorg hasn't put us too far in the future. This does run into race conditions with // the sequencer but this is the best we can do I think. 
let latest = sequencer - .block_by_number(BlockNumberOrTag::Tag(Tag::Latest)) + .block(BlockId::Latest) .await .context("Query sequencer for latest block")?; @@ -387,12 +387,11 @@ mod tests { use super::super::{sync, Event}; use crate::{ core::{ - ClassHash, ContractAddress, GasPrice, GlobalRoot, SequencerAddress, + BlockId, ClassHash, ContractAddress, GasPrice, GlobalRoot, SequencerAddress, StarknetBlockHash, StarknetBlockNumber, StarknetBlockTimestamp, StorageAddress, StorageValue, }, ethereum::state_update, - rpc::types::{BlockHashOrTag, BlockNumberOrTag, Tag}, sequencer::{ error::{SequencerError, StarknetError, StarknetErrorCode}, reply, MockClientApi, @@ -670,24 +669,13 @@ mod tests { fn expect_block( mock: &mut MockClientApi, seq: &mut mockall::Sequence, - block_number: StarknetBlockNumber, + block: BlockId, returned_result: Result, ) { - mock.expect_block_by_number() - .withf(move |x| x == &BlockNumberOrTag::Number(block_number)) - .times(1) - .in_sequence(seq) - .return_once(move |_| returned_result); - } + use mockall::predicate::eq; - /// Convenience wrapper - fn expect_latest_block( - mock: &mut MockClientApi, - seq: &mut mockall::Sequence, - returned_result: Result, - ) { - mock.expect_block_by_number() - .withf(move |x| x == &BlockNumberOrTag::Tag(Tag::Latest)) + mock.expect_block() + .with(eq(block)) .times(1) .in_sequence(seq) .return_once(move |_| returned_result); @@ -697,11 +685,13 @@ mod tests { fn expect_state_update( mock: &mut MockClientApi, seq: &mut mockall::Sequence, - block_hash: StarknetBlockHash, + block: BlockId, returned_result: Result, ) { - mock.expect_state_update_by_hash() - .withf(move |x| x == &BlockHashOrTag::Hash(block_hash)) + use mockall::predicate::eq; + + mock.expect_state_update() + .with(eq(block)) .times(1) .in_sequence(seq) .return_once(|_| returned_result); @@ -741,8 +731,18 @@ mod tests { let mut seq = mockall::Sequence::new(); // Downlad the genesis block with respective state update and contracts - expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0.clone())); - expect_state_update(&mut mock, &mut seq, *BLOCK0_HASH, Ok(STATE_UPDATE0.clone())); + expect_block( + &mut mock, + &mut seq, + BLOCK0_NUMBER.into(), + Ok(BLOCK0.clone()), + ); + expect_state_update( + &mut mock, + &mut seq, + (*BLOCK0_HASH).into(), + Ok(STATE_UPDATE0.clone()), + ); expect_full_contract( &mut mock, &mut seq, @@ -750,8 +750,18 @@ mod tests { Ok(CONTRACT0_DEF.clone()), ); // Downlad block #1 with respective state update and contracts - expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(BLOCK1.clone())); - expect_state_update(&mut mock, &mut seq, *BLOCK1_HASH, Ok(STATE_UPDATE1.clone())); + expect_block( + &mut mock, + &mut seq, + BLOCK1_NUMBER.into(), + Ok(BLOCK1.clone()), + ); + expect_state_update( + &mut mock, + &mut seq, + (*BLOCK1_HASH).into(), + Ok(STATE_UPDATE1.clone()), + ); expect_full_contract( &mut mock, &mut seq, @@ -759,8 +769,13 @@ mod tests { Ok(CONTRACT1_DEF.clone()), ); // Stay at head, no more blocks available - expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Err(block_not_found())); - expect_latest_block(&mut mock, &mut seq, Ok(BLOCK1.clone())); + expect_block( + &mut mock, + &mut seq, + BLOCK2_NUMBER.into(), + Err(block_not_found()), + ); + expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(BLOCK1.clone())); // Let's run the UUT let _jh = tokio::spawn(sync(tx_event, mock, None, Chain::Goerli)); @@ -809,8 +824,18 @@ mod tests { let mut seq = mockall::Sequence::new(); // Start with downloading block #1 - expect_block(&mut mock, &mut seq, 
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(BLOCK1.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK1_HASH, Ok(STATE_UPDATE1.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(BLOCK1.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK1_HASH).into(),
+ Ok(STATE_UPDATE1.clone()),
+ );
expect_full_contract(
&mut mock,
&mut seq,
@@ -819,8 +844,13 @@
);
// Stay at head, no more blocks available
- expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Err(block_not_found()));
- expect_latest_block(&mut mock, &mut seq, Ok(BLOCK1.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK2_NUMBER.into(),
+ Err(block_not_found()),
+ );
+ expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(BLOCK1.clone()));
// Let's run the UUT
let _jh = tokio::spawn(sync(
@@ -878,8 +908,18 @@
let mut seq = mockall::Sequence::new();
// Fetch the genesis block with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK0_HASH, Ok(STATE_UPDATE0.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK0_HASH).into(),
+ Ok(STATE_UPDATE0.clone()),
+ );
expect_full_contract(
&mut mock,
&mut seq,
@@ -888,19 +928,29 @@
);
// Block #1 is not there
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Err(block_not_found()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Err(block_not_found()),
+ );
// L2 sync task is then looking if reorg occurred
// We indicate that reorg started at genesis
- expect_latest_block(&mut mock, &mut seq, Ok(BLOCK0_V2.clone()));
+ expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(BLOCK0_V2.clone()));
// Finally the L2 sync task is downloading the new genesis block
// from the fork with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0_V2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0_V2.clone()),
+ );
expect_state_update(
&mut mock,
&mut seq,
- *BLOCK0_HASH_V2,
+ (*BLOCK0_HASH_V2).into(),
Ok(STATE_UPDATE0_V2.clone()),
);
expect_full_contract(
@@ -911,10 +961,15 @@
);
// Indicate that we are still staying at the head - no new blocks
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Err(block_not_found()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Err(block_not_found()),
+ );
// Indicate that we are still staying at the head - the latest block matches our head
- expect_latest_block(&mut mock, &mut seq, Ok(BLOCK0_V2.clone()));
+ expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(BLOCK0_V2.clone()));
// Let's run the UUT
let _jh = tokio::spawn(sync(tx_event, mock, None, Chain::Goerli));
@@ -995,8 +1050,18 @@
};
// Fetch the genesis block with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK0_HASH, Ok(STATE_UPDATE0.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK0_HASH).into(),
+ Ok(STATE_UPDATE0.clone()),
+ );
expect_full_contract(
&mut mock,
&mut seq,
@@ -1004,8 +1069,18 @@
Ok(CONTRACT0_DEF.clone()),
);
// Fetch block #1 with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(BLOCK1.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK1_HASH, Ok(STATE_UPDATE1.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(BLOCK1.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK1_HASH).into(),
+ Ok(STATE_UPDATE1.clone()),
+ );
expect_full_contract(
&mut mock,
&mut seq,
@@ -1013,27 +1088,57 @@
Ok(CONTRACT1_DEF.clone()),
);
// Fetch block #2 with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Ok(BLOCK2.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK2_HASH, Ok(STATE_UPDATE2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK2_NUMBER.into(),
+ Ok(BLOCK2.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK2_HASH).into(),
+ Ok(STATE_UPDATE2.clone()),
+ );
// Block #3 is not there
- expect_block(&mut mock, &mut seq, BLOCK3_NUMBER, Err(block_not_found()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK3_NUMBER.into(),
+ Err(block_not_found()),
+ );
// L2 sync task is then looking if reorg occurred
// We indicate that reorg started at genesis by setting the latest on the new genesis block
- expect_latest_block(&mut mock, &mut seq, Ok(BLOCK0_V2.clone()));
+ expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(BLOCK0_V2.clone()));
// Then the L2 sync task goes back block by block to find the last block where the block hash matches the DB
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(block1_v2.clone()));
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0_V2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(block1_v2.clone()),
+ );
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0_V2.clone()),
+ );
// Once the L2 sync task has found where reorg occurred,
// it can get back to downloading the new blocks
// Fetch the new genesis block from the fork with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0_V2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0_V2.clone()),
+ );
expect_state_update(
&mut mock,
&mut seq,
- *BLOCK0_HASH_V2,
+ (*BLOCK0_HASH_V2).into(),
Ok(STATE_UPDATE0_V2.clone()),
);
expect_full_contract(
@@ -1043,18 +1148,28 @@
Ok(CONTRACT0_DEF_V2.clone()),
);
// Fetch the new block #1 from the fork with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(block1_v2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(block1_v2.clone()),
+ );
expect_state_update(
&mut mock,
&mut seq,
- *BLOCK1_HASH_V2,
+ (*BLOCK1_HASH_V2).into(),
Ok(STATE_UPDATE1_V2.clone()),
);
// Indicate that we are still staying at the head
// No new blocks found and the latest block matches our head
- expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Err(block_not_found()));
- expect_latest_block(&mut mock, &mut seq, Ok(block1_v2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK2_NUMBER.into(),
+ Err(block_not_found()),
+ );
+ expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(block1_v2.clone()));
// Run the UUT
let _jh = tokio::spawn(sync(tx_event, mock, None, Chain::Goerli));
@@ -1205,8 +1320,18 @@
};
// Fetch the genesis block with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK0_HASH, Ok(STATE_UPDATE0.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK0_HASH).into(),
+ Ok(STATE_UPDATE0.clone()),
+ );
expect_full_contract(
&mut mock,
&mut seq,
@@ -1214,8 +1339,18 @@
Ok(CONTRACT0_DEF.clone()),
);
// Fetch block #1 with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(BLOCK1.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK1_HASH, Ok(STATE_UPDATE1.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(BLOCK1.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK1_HASH).into(),
+ Ok(STATE_UPDATE1.clone()),
+ );
expect_full_contract(
&mut mock,
&mut seq,
@@ -1223,44 +1358,99 @@
Ok(CONTRACT1_DEF.clone()),
);
// Fetch block #2 with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Ok(BLOCK2.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK2_HASH, Ok(STATE_UPDATE2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK2_NUMBER.into(),
+ Ok(BLOCK2.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK2_HASH).into(),
+ Ok(STATE_UPDATE2.clone()),
+ );
// Fetch block #3 with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK3_NUMBER, Ok(block3.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK3_HASH, Ok(STATE_UPDATE3.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK3_NUMBER.into(),
+ Ok(block3.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK3_HASH).into(),
+ Ok(STATE_UPDATE3.clone()),
+ );
// Block #4 is not there
- expect_block(&mut mock, &mut seq, BLOCK4_NUMBER, Err(block_not_found()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK4_NUMBER.into(),
+ Err(block_not_found()),
+ );
// L2 sync task is then looking if reorg occurred
// We indicate that reorg started at block #1
- expect_latest_block(&mut mock, &mut seq, Ok(block1_v2.clone()));
+ expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(block1_v2.clone()));
// L2 sync task goes back block by block to find where the block hash matches the DB
- expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Ok(block2_v2.clone()));
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(block1_v2.clone()));
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK2_NUMBER.into(),
+ Ok(block2_v2.clone()),
+ );
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(block1_v2.clone()),
+ );
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0.clone()),
+ );
// Finally the L2 sync task is downloading the new blocks once it knows where to start again
// Fetch the new block #1 from the fork with respective state update
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(block1_v2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(block1_v2.clone()),
+ );
expect_state_update(
&mut mock,
&mut seq,
- *BLOCK1_HASH_V2,
+ (*BLOCK1_HASH_V2).into(),
Ok(STATE_UPDATE1_V2.clone()),
);
// Fetch the new block #2 from the fork with respective state update
- expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Ok(block2_v2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK2_NUMBER.into(),
+ Ok(block2_v2.clone()),
+ );
expect_state_update(
&mut mock,
&mut seq,
- *BLOCK2_HASH_V2,
+ (*BLOCK2_HASH_V2).into(),
Ok(STATE_UPDATE2_V2.clone()),
);
// Indicate that we are still staying at the head - no new blocks and the latest block matches our head
- expect_block(&mut mock, &mut seq, BLOCK3_NUMBER, Err(block_not_found()));
- expect_latest_block(&mut mock, &mut seq, Ok(block2_v2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK3_NUMBER.into(),
+ Err(block_not_found()),
+ );
+ expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(block2_v2.clone()));
// Run the UUT
let _jh = tokio::spawn(sync(tx_event, mock, None, Chain::Goerli));
@@ -1375,8 +1565,18 @@
};
// Fetch the genesis block with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK0_HASH, Ok(STATE_UPDATE0.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK0_HASH).into(),
+ Ok(STATE_UPDATE0.clone()),
+ );
expect_full_contract(
&mut mock,
&mut seq,
@@ -1384,8 +1584,18 @@
Ok(CONTRACT0_DEF.clone()),
);
// Fetch block #1 with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(BLOCK1.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK1_HASH, Ok(STATE_UPDATE1.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(BLOCK1.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK1_HASH).into(),
+ Ok(STATE_UPDATE1.clone()),
+ );
expect_full_contract(
&mut mock,
&mut seq,
@@ -1393,31 +1603,61 @@
Ok(CONTRACT1_DEF.clone()),
);
// Fetch block #2 with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Ok(BLOCK2.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK2_HASH, Ok(STATE_UPDATE2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK2_NUMBER.into(),
+ Ok(BLOCK2.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK2_HASH).into(),
+ Ok(STATE_UPDATE2.clone()),
+ );
// Block #3 is not there
- expect_block(&mut mock, &mut seq, BLOCK3_NUMBER, Err(block_not_found()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK3_NUMBER.into(),
+ Err(block_not_found()),
+ );
// L2 sync task is then looking if reorg occurred
// We indicate that reorg started at block #2
- expect_latest_block(&mut mock, &mut seq, Ok(block2_v2.clone()));
+ expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(block2_v2.clone()));
// L2 sync task goes back block by block to find where the block hash matches the DB
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(BLOCK1.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(BLOCK1.clone()),
+ );
// Finally the L2 sync task is downloading the new blocks once it knows where to start again
// Fetch the new block #2 from the fork with respective state update
- expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Ok(block2_v2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK2_NUMBER.into(),
+ Ok(block2_v2.clone()),
+ );
expect_state_update(
&mut mock,
&mut seq,
- *BLOCK2_HASH_V2,
+ (*BLOCK2_HASH_V2).into(),
Ok(STATE_UPDATE2_V2.clone()),
);
// Indicate that we are still staying at the head - no new blocks and the latest block matches our head
- expect_block(&mut mock, &mut seq, BLOCK3_NUMBER, Err(block_not_found()));
- expect_latest_block(&mut mock, &mut seq, Ok(block2_v2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK3_NUMBER.into(),
+ Err(block_not_found()),
+ );
+ expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(block2_v2.clone()));
// Run the UUT
let _jh = tokio::spawn(sync(tx_event, mock, None, Chain::Goerli));
@@ -1526,8 +1766,18 @@ mod tests {
};
// Fetch the genesis block with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK0_HASH, Ok(STATE_UPDATE0.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK0_HASH).into(),
+ Ok(STATE_UPDATE0.clone()),
+ );
expect_full_contract(
&mut mock,
&mut seq,
@@ -1535,8 +1785,18 @@
Ok(CONTRACT0_DEF.clone()),
);
// Fetch block #1 with respective state update and contracts
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(BLOCK1.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK1_HASH, Ok(STATE_UPDATE1.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(BLOCK1.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK1_HASH).into(),
+ Ok(STATE_UPDATE1.clone()),
+ );
expect_full_contract(
&mut mock,
&mut seq,
@@ -1544,28 +1804,58 @@
Ok(CONTRACT1_DEF.clone()),
);
// Fetch block #2 whose parent hash does not match block #1 hash
- expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Ok(block2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK2_NUMBER.into(),
+ Ok(block2.clone()),
+ );
// L2 sync task goes back block by block to find where the block hash matches the DB
// It starts at the previous block to which the mismatch happened
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0.clone()),
+ );
// Finally the L2 sync task is downloading the new blocks once it knows where to start again
// Fetch the new block #1 from the fork with respective state update
- expect_block(&mut mock, &mut seq, BLOCK1_NUMBER, Ok(block1_v2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK1_NUMBER.into(),
+ Ok(block1_v2.clone()),
+ );
expect_state_update(
&mut mock,
&mut seq,
- *BLOCK1_HASH_V2,
+ (*BLOCK1_HASH_V2).into(),
Ok(STATE_UPDATE1_V2.clone()),
);
// Fetch the block #2 again, now with respective state update
- expect_block(&mut mock, &mut seq, BLOCK2_NUMBER, Ok(block2.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK2_HASH, Ok(STATE_UPDATE2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK2_NUMBER.into(),
+ Ok(block2.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK2_HASH).into(),
+ Ok(STATE_UPDATE2.clone()),
+ );
// Indicate that we are still staying at the head - no new blocks and the latest block matches our head
- expect_block(&mut mock, &mut seq, BLOCK3_NUMBER, Err(block_not_found()));
- expect_latest_block(&mut mock, &mut seq, Ok(block2.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK3_NUMBER.into(),
+ Err(block_not_found()),
+ );
+ expect_block(&mut mock, &mut seq, BlockId::Latest, Ok(block2.clone()));
// Run the UUT
let _jh = tokio::spawn(sync(tx_event, mock, None, Chain::Goerli));
@@ -1640,8 +1930,18 @@ mod tests {
let mut mock = MockClientApi::new();
let mut seq = mockall::Sequence::new();
- expect_block(&mut mock, &mut seq, BLOCK0_NUMBER, Ok(BLOCK0.clone()));
- expect_state_update(&mut mock, &mut seq, *BLOCK0_HASH, Ok(STATE_UPDATE0.clone()));
+ expect_block(
+ &mut mock,
+ &mut seq,
+ BLOCK0_NUMBER.into(),
+ Ok(BLOCK0.clone()),
+ );
+ expect_state_update(
+ &mut mock,
+ &mut seq,
+ (*BLOCK0_HASH).into(),
+ Ok(STATE_UPDATE0.clone()),
+ );
// Run the UUT
let jh = tokio::spawn(sync(tx_event, mock, None, Chain::Goerli));
diff --git a/crates/stark_hash/src/hash.rs b/crates/stark_hash/src/hash.rs
index dcae16ea2a..f8c8365e5f 100644
--- a/crates/stark_hash/src/hash.rs
+++ b/crates/stark_hash/src/hash.rs
@@ -369,8 +369,6 @@ mod tests {
bits.set(3, false);
bits.set(4, false);
- dbg!(bits.len());
-
let res = StarkHash::from_bits(&bits).unwrap();
let x = res.view_bits();
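For reference, the reworked expect_block/expect_state_update helpers in the tests above match mock arguments with mockall's `with(eq(...))` predicate and enforce call order with `mockall::Sequence`. The following is a minimal, self-contained sketch of that pattern only; `BlockId`, `Block`, and `ClientApi` here are simplified, hypothetical stand-ins, not pathfinder's real types.

// Illustrative sketch of the mockall expectation pattern used by the tests
// above; the types below are simplified stand-ins, not the crate's own.
use mockall::{automock, predicate::eq, Sequence};

#[derive(Debug, Clone, Copy, PartialEq)]
enum BlockId {
    Number(u64),
    Latest,
}

#[derive(Debug, Clone, PartialEq)]
struct Block(u64);

#[automock]
trait ClientApi {
    fn block(&self, block: BlockId) -> Result<Block, String>;
}

fn main() {
    let mut mock = MockClientApi::new();
    let mut seq = Sequence::new();

    // Expect a fetch by number first, then a fetch of the latest block,
    // each exactly once and in exactly that order.
    mock.expect_block()
        .with(eq(BlockId::Number(0)))
        .times(1)
        .in_sequence(&mut seq)
        .return_once(|_| Ok(Block(0)));
    mock.expect_block()
        .with(eq(BlockId::Latest))
        .times(1)
        .in_sequence(&mut seq)
        .return_once(|_| Ok(Block(0)));

    assert_eq!(mock.block(BlockId::Number(0)), Ok(Block(0)));
    assert_eq!(mock.block(BlockId::Latest), Ok(Block(0)));
}

Because `eq` compares the whole argument, a single `PartialEq` enum (plus `Debug` for the predicate's diagnostics) is enough to cover both the by-number and latest-block expectations that previously needed separate helper functions.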