From 0546f88f2a6fdc2f2816d3a984bee7ea0c582d60 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig Date: Fri, 10 Jun 2022 15:00:54 +0200 Subject: [PATCH] feat(sequencer): embed retry logic in builder --- crates/pathfinder/src/sequencer.rs | 421 +++++---------------- crates/pathfinder/src/sequencer/builder.rs | 312 +++++++++++++++ 2 files changed, 401 insertions(+), 332 deletions(-) diff --git a/crates/pathfinder/src/sequencer.rs b/crates/pathfinder/src/sequencer.rs index fea44ab6f9..43fd9a3af9 100644 --- a/crates/pathfinder/src/sequencer.rs +++ b/crates/pathfinder/src/sequencer.rs @@ -16,7 +16,7 @@ use crate::{ sequencer::error::SequencerError, }; use reqwest::Url; -use std::{fmt::Debug, future::Future, result::Result, time::Duration}; +use std::{fmt::Debug, result::Result, time::Duration}; #[cfg_attr(test, mockall::automock)] #[async_trait::async_trait] @@ -124,79 +124,6 @@ pub struct Client { sequencer_url: Url, } -/// Wrapper function to allow retrying sequencer queries in an exponential manner. -/// -/// Does not retry in tests. -async fn retry(future_factory: FutureFactory) -> Result -where - Fut: Future>, - FutureFactory: FnMut() -> Fut, -{ - if cfg!(test) { - retry0(future_factory, |_| false).await - } else { - retry0(future_factory, retry_condition).await - } -} - -/// Wrapper function to allow retrying sequencer queries in an exponential manner. -async fn retry0( - future_factory: FutureFactory, - retry_condition: Ret, -) -> Result -where - Fut: Future>, - FutureFactory: FnMut() -> Fut, - Ret: FnMut(&SequencerError) -> bool, -{ - use crate::retry::Retry; - use std::num::NonZeroU64; - - Retry::exponential(future_factory, NonZeroU64::new(2).unwrap()) - .factor(NonZeroU64::new(15).unwrap()) - .max_delay(Duration::from_secs(60 * 60)) - .when(retry_condition) - .await -} - -/// Determines if an error is retryable or not. -fn retry_condition(e: &SequencerError) -> bool { - use reqwest::StatusCode; - use tracing::{debug, error, info, warn}; - - match e { - SequencerError::ReqwestError(e) => { - if e.is_body() || e.is_connect() || e.is_timeout() { - info!(reason=%e, "Request failed, retrying"); - } else if e.is_status() { - match e.status() { - Some( - StatusCode::NOT_FOUND - | StatusCode::TOO_MANY_REQUESTS - | StatusCode::BAD_GATEWAY - | StatusCode::SERVICE_UNAVAILABLE - | StatusCode::GATEWAY_TIMEOUT, - ) => { - debug!(reason=%e, "Request failed, retrying"); - } - Some(StatusCode::INTERNAL_SERVER_ERROR) => { - error!(reason=%e, "Request failed, retrying"); - } - Some(_) => warn!(reason=%e, "Request failed, retrying"), - None => unreachable!(), - } - } else if e.is_decode() { - error!(reason=%e, "Request failed, retrying"); - } else { - warn!(reason=%e, "Request failed, retrying"); - } - - true - } - SequencerError::StarknetError(_) => false, - } -} - impl Client { /// Creates a new Sequencer client for the given chain. pub fn new(chain: Chain) -> reqwest::Result { @@ -239,15 +166,13 @@ impl ClientApi for Client { &self, block_number: BlockNumberOrTag, ) -> Result { - retry(|| async move { - self.request() - .feeder_gateway() - .get_block() - .at_block(block_number) - .get() - .await - }) - .await + self.request() + .feeder_gateway() + .get_block() + .at_block(block_number) + .auto_retry() + .get() + .await } /// Get block by hash. @@ -256,15 +181,13 @@ impl ClientApi for Client { &self, block_hash: BlockHashOrTag, ) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .get_block() - .at_block(block_hash) - .get() - .await - }) - .await + self.request() + .feeder_gateway() + .get_block() + .at_block(block_hash) + .auto_retry() + .get() + .await } /// Performs a `call` on contract's function. Call result is not stored in L2, as opposed to `invoke`. @@ -274,15 +197,13 @@ impl ClientApi for Client { payload: request::Call, block_hash: BlockHashOrTag, ) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .call_contract() - .at_block(block_hash) - .post_with_json(&payload) - .await - }) - .await + self.request() + .feeder_gateway() + .call_contract() + .at_block(block_hash) + .auto_retry() + .post_with_json(&payload) + .await } /// Gets full contract definition. @@ -291,29 +212,25 @@ impl ClientApi for Client { &self, contract_addr: ContractAddress, ) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .get_full_contract() - .with_contract_address(contract_addr) - .get_as_bytes() - .await - }) - .await + self.request() + .feeder_gateway() + .get_full_contract() + .with_contract_address(contract_addr) + .auto_retry() + .get_as_bytes() + .await } /// Gets class for a particular class hash. #[tracing::instrument(skip(self))] async fn class_by_hash(&self, class_hash: ClassHash) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .get_class_by_hash() - .with_class_hash(class_hash) - .get_as_bytes() - .await - }) - .await + self.request() + .feeder_gateway() + .get_class_by_hash() + .with_class_hash(class_hash) + .auto_retry() + .get_as_bytes() + .await } /// Gets class hash for a particular contract address. @@ -322,15 +239,13 @@ impl ClientApi for Client { &self, contract_address: ContractAddress, ) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .get_class_hash_at() - .with_contract_address(contract_address) - .get() - .await - }) - .await + self.request() + .feeder_gateway() + .get_class_hash_at() + .with_contract_address(contract_address) + .auto_retry() + .get() + .await } /// Gets storage value associated with a `key` for a prticular contract. @@ -341,17 +256,15 @@ impl ClientApi for Client { key: StorageAddress, block_hash: BlockHashOrTag, ) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .get_storage_at() - .with_contract_address(contract_addr) - .with_storage_address(key) - .at_block(block_hash) - .get() - .await - }) - .await + self.request() + .feeder_gateway() + .get_storage_at() + .with_contract_address(contract_addr) + .with_storage_address(key) + .at_block(block_hash) + .auto_retry() + .get() + .await } /// Gets transaction by hash. @@ -360,15 +273,13 @@ impl ClientApi for Client { &self, transaction_hash: StarknetTransactionHash, ) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .get_transaction() - .with_transaction_hash(transaction_hash) - .get() - .await - }) - .await + self.request() + .feeder_gateway() + .get_transaction() + .with_transaction_hash(transaction_hash) + .auto_retry() + .get() + .await } /// Gets transaction status by transaction hash. @@ -377,15 +288,13 @@ impl ClientApi for Client { &self, transaction_hash: StarknetTransactionHash, ) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .get_transaction_status() - .with_transaction_hash(transaction_hash) - .get() - .await - }) - .await + self.request() + .feeder_gateway() + .get_transaction_status() + .with_transaction_hash(transaction_hash) + .auto_retry() + .get() + .await } /// Gets state update for a particular block hash. @@ -394,15 +303,13 @@ impl ClientApi for Client { &self, block_hash: BlockHashOrTag, ) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .get_state_update() - .at_block(block_hash) - .get() - .await - }) - .await + self.request() + .feeder_gateway() + .get_state_update() + .at_block(block_hash) + .auto_retry() + .get() + .await } /// Gets state update for a particular block number. @@ -411,28 +318,24 @@ impl ClientApi for Client { &self, block_number: BlockNumberOrTag, ) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .get_state_update() - .at_block(block_number) - .get() - .await - }) - .await + self.request() + .feeder_gateway() + .get_state_update() + .at_block(block_number) + .auto_retry() + .get() + .await } /// Gets addresses of the Ethereum contracts crucial to Starknet operation. #[tracing::instrument(skip(self))] async fn eth_contract_addresses(&self) -> Result { - retry(|| async { - self.request() - .feeder_gateway() - .get_contract_addresses() - .get() - .await - }) - .await + self.request() + .feeder_gateway() + .get_contract_addresses() + .auto_retry() + .get() + .await } /// Adds a transaction invoking a contract. @@ -461,6 +364,7 @@ impl ClientApi for Client { self.request() .gateway() .add_transaction() + .without_retry() .post_with_json(&req) .await } @@ -496,6 +400,7 @@ impl ClientApi for Client { .add_transaction() // mainnet requires a token (but testnet does not so its optional). .with_optional_token(token.as_deref()) + .without_retry() .post_with_json(&req) .await } @@ -526,6 +431,7 @@ impl ClientApi for Client { .add_transaction() // mainnet requires a token (but testnet does not so its optional). .with_optional_token(token.as_deref()) + .without_retry() .post_with_json(&req) .await } @@ -2060,153 +1966,4 @@ mod tests { } } } - - mod retry { - use super::{SequencerError, StarknetErrorCode}; - use assert_matches::assert_matches; - use http::{response::Builder, StatusCode}; - use pretty_assertions::assert_eq; - use std::{ - collections::VecDeque, convert::Infallible, net::SocketAddr, sync::Arc, time::Duration, - }; - use tokio::{sync::Mutex, task::JoinHandle}; - use warp::Filter; - - // A test helper - fn status_queue_server( - statuses: VecDeque<(StatusCode, &'static str)>, - ) -> (JoinHandle<()>, SocketAddr) { - use std::cell::RefCell; - - let statuses = Arc::new(Mutex::new(RefCell::new(statuses))); - let any = warp::any().and_then(move || { - let s = statuses.clone(); - async move { - let s = s.lock().await; - let s = s.borrow_mut().pop_front().unwrap(); - Result::<_, Infallible>::Ok(Builder::new().status(s.0).body(s.1)) - } - }); - - let (addr, run_srv) = warp::serve(any).bind_ephemeral(([127, 0, 0, 1], 0)); - let server_handle = tokio::spawn(run_srv); - (server_handle, addr) - } - - // A test helper - fn slow_server() -> (tokio::task::JoinHandle<()>, std::net::SocketAddr) { - async fn slow() -> Result { - tokio::time::sleep(Duration::from_secs(1)).await; - Ok(Builder::new().status(200).body("")) - } - - let any = warp::any().and_then(slow); - let (addr, run_srv) = warp::serve(any).bind_ephemeral(([127, 0, 0, 1], 0)); - let server_handle = tokio::spawn(run_srv); - (server_handle, addr) - } - - #[test_log::test(tokio::test)] - async fn stop_on_ok() { - use crate::sequencer::builder; - - let statuses = VecDeque::from([ - (StatusCode::TOO_MANY_REQUESTS, ""), - (StatusCode::BAD_GATEWAY, ""), - (StatusCode::SERVICE_UNAVAILABLE, ""), - (StatusCode::GATEWAY_TIMEOUT, ""), - (StatusCode::OK, r#""Finally!""#), - (StatusCode::TOO_MANY_REQUESTS, ""), - (StatusCode::BAD_GATEWAY, ""), - (StatusCode::SERVICE_UNAVAILABLE, ""), - ]); - - let (_jh, addr) = status_queue_server(statuses); - let result = super::retry0( - || async { - let mut url = reqwest::Url::parse("http://localhost/").unwrap(); - url.set_port(Some(addr.port())).unwrap(); - let response = reqwest::get(url).await?; - builder::parse::(response).await - }, - super::retry_condition, - ) - .await - .unwrap(); - assert_eq!(result, "Finally!"); - } - - #[test_log::test(tokio::test)] - async fn stop_on_fatal() { - use crate::sequencer::builder; - - let statuses = VecDeque::from([ - (StatusCode::TOO_MANY_REQUESTS, ""), - (StatusCode::BAD_GATEWAY, ""), - (StatusCode::SERVICE_UNAVAILABLE, ""), - (StatusCode::GATEWAY_TIMEOUT, ""), - ( - StatusCode::INTERNAL_SERVER_ERROR, - r#"{"code":"StarknetErrorCode.BLOCK_NOT_FOUND","message":""}"#, - ), - (StatusCode::TOO_MANY_REQUESTS, ""), - (StatusCode::BAD_GATEWAY, ""), - (StatusCode::SERVICE_UNAVAILABLE, ""), - ]); - - let (_jh, addr) = status_queue_server(statuses); - let error = super::retry0( - || async { - let mut url = reqwest::Url::parse("http://localhost/").unwrap(); - url.set_port(Some(addr.port())).unwrap(); - let response = reqwest::get(url).await?; - builder::parse::(response).await - }, - super::retry_condition, - ) - .await - .unwrap_err(); - assert_matches!( - error, - SequencerError::StarknetError(se) => assert_eq!(se.code, StarknetErrorCode::BlockNotFound) - ); - } - - #[tokio::test(flavor = "current_thread", start_paused = true)] - async fn request_timeout() { - use crate::sequencer::builder; - - use std::sync::atomic::{AtomicUsize, Ordering}; - - let (_jh, addr) = slow_server(); - static CNT: AtomicUsize = AtomicUsize::new(0); - - let fut = super::retry0( - || async { - let mut url = reqwest::Url::parse("http://localhost/").unwrap(); - url.set_port(Some(addr.port())).unwrap(); - - let client = reqwest::Client::builder().build().unwrap(); - - CNT.fetch_add(1, Ordering::Relaxed); - - // This is the same as using Client::builder().timeout() - let response = client - .get(url) - .timeout(Duration::from_millis(1)) - .send() - .await?; - builder::parse::(response).await - }, - super::retry_condition, - ); - - // The retry loops forever, so wrap it in a timeout and check the counter. - tokio::time::timeout(Duration::from_millis(250), fut) - .await - .unwrap_err(); - // 4th try should have timedout if this is really exponential backoff - assert_eq!(CNT.load(Ordering::Relaxed), 4); - } - } } diff --git a/crates/pathfinder/src/sequencer/builder.rs b/crates/pathfinder/src/sequencer/builder.rs index a76935b370..0cdef3421e 100644 --- a/crates/pathfinder/src/sequencer/builder.rs +++ b/crates/pathfinder/src/sequencer/builder.rs @@ -20,6 +20,8 @@ pub struct Start; pub struct WithUrl; pub struct WithGateWay; pub struct WithMethod; +pub struct WithRetry; +pub struct WithoutRetry; pub enum BlockId { Number(StarknetBlockNumber), @@ -182,6 +184,24 @@ impl<'a> Request<'a, WithMethod> { self } + pub fn auto_retry(self) -> Request<'a, WithRetry> { + Request { + url: self.url, + client: self.client, + marker: PhantomData::default(), + } + } + + pub fn without_retry(self) -> Request<'a, WithoutRetry> { + Request { + url: self.url, + client: self.client, + marker: PhantomData::default(), + } + } +} + +impl<'a> Request<'a, WithoutRetry> { pub async fn get(self) -> Result where T: serde::de::DeserializeOwned, @@ -206,6 +226,69 @@ impl<'a> Request<'a, WithMethod> { } } +impl<'a> Request<'a, WithRetry> { + pub async fn get(self) -> Result + where + T: serde::de::DeserializeOwned, + { + retry0( + || { + let clone_url = self.url.clone(); + async move { + let r = Request:: { + url: clone_url, + client: self.client, + marker: PhantomData::default(), + }; + r.get().await + } + }, + retry_condition, + ) + .await + } + + pub async fn get_as_bytes(self) -> Result { + retry0( + || { + let clone_url = self.url.clone(); + async move { + let r = Request:: { + url: clone_url, + client: self.client, + marker: PhantomData::default(), + }; + r.get_as_bytes().await + } + }, + retry_condition, + ) + .await + } + + pub async fn post_with_json(self, json: &J) -> Result + where + T: serde::de::DeserializeOwned, + J: serde::Serialize + ?Sized, + { + retry0( + || { + let clone_url = self.url.clone(); + async move { + let r = Request:: { + url: clone_url, + client: self.client, + marker: PhantomData::default(), + }; + r.post_with_json(json).await + } + }, + retry_condition, + ) + .await + } +} + pub async fn parse(response: reqwest::Response) -> Result where T: ::serde::de::DeserializeOwned, @@ -235,6 +318,8 @@ impl RequestState for Start {} impl RequestState for WithUrl {} impl RequestState for WithGateWay {} impl RequestState for WithMethod {} +impl RequestState for WithRetry {} +impl RequestState for WithoutRetry {} impl From for BlockId { fn from(block: crate::rpc::types::BlockNumberOrTag) -> Self { @@ -261,3 +346,230 @@ impl From for BlockId { } } } + +/// Wrapper function to allow retrying sequencer queries in an exponential manner. +/// +/// Does not retry in tests. +async fn retry(future_factory: FutureFactory) -> Result +where + Fut: futures::Future>, + FutureFactory: FnMut() -> Fut, +{ + if cfg!(test) { + retry0(future_factory, |_| false).await + } else { + retry0(future_factory, retry_condition).await + } +} + +/// Wrapper function to allow retrying sequencer queries in an exponential manner. +async fn retry0( + future_factory: FutureFactory, + retry_condition: Ret, +) -> Result +where + Fut: futures::Future>, + FutureFactory: FnMut() -> Fut, + Ret: FnMut(&SequencerError) -> bool, +{ + use crate::retry::Retry; + use std::num::NonZeroU64; + + Retry::exponential(future_factory, NonZeroU64::new(2).unwrap()) + .factor(NonZeroU64::new(15).unwrap()) + .max_delay(std::time::Duration::from_secs(60 * 60)) + .when(retry_condition) + .await +} + +/// Determines if an error is retryable or not. +fn retry_condition(e: &SequencerError) -> bool { + use reqwest::StatusCode; + use tracing::{debug, error, info, warn}; + + match e { + SequencerError::ReqwestError(e) => { + if e.is_body() || e.is_connect() || e.is_timeout() { + info!(reason=%e, "Request failed, retrying"); + } else if e.is_status() { + match e.status() { + Some( + StatusCode::NOT_FOUND + | StatusCode::TOO_MANY_REQUESTS + | StatusCode::BAD_GATEWAY + | StatusCode::SERVICE_UNAVAILABLE + | StatusCode::GATEWAY_TIMEOUT, + ) => { + debug!(reason=%e, "Request failed, retrying"); + } + Some(StatusCode::INTERNAL_SERVER_ERROR) => { + error!(reason=%e, "Request failed, retrying"); + } + Some(_) => warn!(reason=%e, "Request failed, retrying"), + None => unreachable!(), + } + } else if e.is_decode() { + error!(reason=%e, "Request failed, retrying"); + } else { + warn!(reason=%e, "Request failed, retrying"); + } + + true + } + SequencerError::StarknetError(_) => false, + } +} + +#[cfg(test)] +mod tests { + mod retry { + use assert_matches::assert_matches; + use http::{response::Builder, StatusCode}; + use pretty_assertions::assert_eq; + use std::{ + collections::VecDeque, convert::Infallible, net::SocketAddr, sync::Arc, time::Duration, + }; + use tokio::{sync::Mutex, task::JoinHandle}; + use warp::Filter; + + use crate::sequencer::builder::{retry0, retry_condition}; + + // A test helper + fn status_queue_server( + statuses: VecDeque<(StatusCode, &'static str)>, + ) -> (JoinHandle<()>, SocketAddr) { + use std::cell::RefCell; + + let statuses = Arc::new(Mutex::new(RefCell::new(statuses))); + let any = warp::any().and_then(move || { + let s = statuses.clone(); + async move { + let s = s.lock().await; + let s = s.borrow_mut().pop_front().unwrap(); + Result::<_, Infallible>::Ok(Builder::new().status(s.0).body(s.1)) + } + }); + + let (addr, run_srv) = warp::serve(any).bind_ephemeral(([127, 0, 0, 1], 0)); + let server_handle = tokio::spawn(run_srv); + (server_handle, addr) + } + + // A test helper + fn slow_server() -> (tokio::task::JoinHandle<()>, std::net::SocketAddr) { + async fn slow() -> Result { + tokio::time::sleep(Duration::from_secs(1)).await; + Ok(Builder::new().status(200).body("")) + } + + let any = warp::any().and_then(slow); + let (addr, run_srv) = warp::serve(any).bind_ephemeral(([127, 0, 0, 1], 0)); + let server_handle = tokio::spawn(run_srv); + (server_handle, addr) + } + + #[test_log::test(tokio::test)] + async fn stop_on_ok() { + use crate::sequencer::builder; + + let statuses = VecDeque::from([ + (StatusCode::TOO_MANY_REQUESTS, ""), + (StatusCode::BAD_GATEWAY, ""), + (StatusCode::SERVICE_UNAVAILABLE, ""), + (StatusCode::GATEWAY_TIMEOUT, ""), + (StatusCode::OK, r#""Finally!""#), + (StatusCode::TOO_MANY_REQUESTS, ""), + (StatusCode::BAD_GATEWAY, ""), + (StatusCode::SERVICE_UNAVAILABLE, ""), + ]); + + let (_jh, addr) = status_queue_server(statuses); + let result = retry0( + || async { + let mut url = reqwest::Url::parse("http://localhost/").unwrap(); + url.set_port(Some(addr.port())).unwrap(); + let response = reqwest::get(url).await?; + builder::parse::(response).await + }, + retry_condition, + ) + .await + .unwrap(); + assert_eq!(result, "Finally!"); + } + + #[test_log::test(tokio::test)] + async fn stop_on_fatal() { + use crate::sequencer::builder; + use crate::sequencer::error::{SequencerError, StarknetErrorCode}; + + let statuses = VecDeque::from([ + (StatusCode::TOO_MANY_REQUESTS, ""), + (StatusCode::BAD_GATEWAY, ""), + (StatusCode::SERVICE_UNAVAILABLE, ""), + (StatusCode::GATEWAY_TIMEOUT, ""), + ( + StatusCode::INTERNAL_SERVER_ERROR, + r#"{"code":"StarknetErrorCode.BLOCK_NOT_FOUND","message":""}"#, + ), + (StatusCode::TOO_MANY_REQUESTS, ""), + (StatusCode::BAD_GATEWAY, ""), + (StatusCode::SERVICE_UNAVAILABLE, ""), + ]); + + let (_jh, addr) = status_queue_server(statuses); + let error = retry0( + || async { + let mut url = reqwest::Url::parse("http://localhost/").unwrap(); + url.set_port(Some(addr.port())).unwrap(); + let response = reqwest::get(url).await?; + builder::parse::(response).await + }, + retry_condition, + ) + .await + .unwrap_err(); + assert_matches!( + error, + SequencerError::StarknetError(se) => assert_eq!(se.code, StarknetErrorCode::BlockNotFound) + ); + } + + #[tokio::test(flavor = "current_thread", start_paused = true)] + async fn request_timeout() { + use crate::sequencer::builder; + + use std::sync::atomic::{AtomicUsize, Ordering}; + + let (_jh, addr) = slow_server(); + static CNT: AtomicUsize = AtomicUsize::new(0); + + let fut = retry0( + || async { + let mut url = reqwest::Url::parse("http://localhost/").unwrap(); + url.set_port(Some(addr.port())).unwrap(); + + let client = reqwest::Client::builder().build().unwrap(); + + CNT.fetch_add(1, Ordering::Relaxed); + + // This is the same as using Client::builder().timeout() + let response = client + .get(url) + .timeout(Duration::from_millis(1)) + .send() + .await?; + builder::parse::(response).await + }, + retry_condition, + ); + + // The retry loops forever, so wrap it in a timeout and check the counter. + tokio::time::timeout(Duration::from_millis(250), fut) + .await + .unwrap_err(); + // 4th try should have timedout if this is really exponential backoff + assert_eq!(CNT.load(Ordering::Relaxed), 4); + } + } +}