Skip to content

Commit

Permalink
broken retry attempt
Browse files Browse the repository at this point in the history
  • Loading branch information
Mirko-von-Leipzig committed Jun 10, 2022
1 parent 7468e32 commit 5dc5384
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 9 deletions.
30 changes: 21 additions & 9 deletions crates/pathfinder/src/sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,15 +239,13 @@ impl ClientApi for Client {
&self,
block_number: BlockNumberOrTag,
) -> Result<reply::Block, SequencerError> {
retry(|| async move {
self.request()
.feeder_gateway()
.get_block()
.at_block(block_number)
.get()
.await
})
.await
self.request()
.feeder_gateway()
.get_block()
.at_block(block_number)
.auto_retry()
.get()
.await
}

/// Get block by hash.
Expand All @@ -261,6 +259,7 @@ impl ClientApi for Client {
.feeder_gateway()
.get_block()
.at_block(block_hash)
.auto_retry()
.get()
.await
})
Expand All @@ -279,6 +278,7 @@ impl ClientApi for Client {
.feeder_gateway()
.call_contract()
.at_block(block_hash)
.auto_retry()
.post_with_json(&payload)
.await
})
Expand All @@ -296,6 +296,7 @@ impl ClientApi for Client {
.feeder_gateway()
.get_full_contract()
.with_contract_address(contract_addr)
.auto_retry()
.get_as_bytes()
.await
})
Expand All @@ -310,6 +311,7 @@ impl ClientApi for Client {
.feeder_gateway()
.get_class_by_hash()
.with_class_hash(class_hash)
.auto_retry()
.get_as_bytes()
.await
})
Expand All @@ -327,6 +329,7 @@ impl ClientApi for Client {
.feeder_gateway()
.get_class_hash_at()
.with_contract_address(contract_address)
.auto_retry()
.get()
.await
})
Expand All @@ -348,6 +351,7 @@ impl ClientApi for Client {
.with_contract_address(contract_addr)
.with_storage_address(key)
.at_block(block_hash)
.auto_retry()
.get()
.await
})
Expand All @@ -365,6 +369,7 @@ impl ClientApi for Client {
.feeder_gateway()
.get_transaction()
.with_transaction_hash(transaction_hash)
.auto_retry()
.get()
.await
})
Expand All @@ -382,6 +387,7 @@ impl ClientApi for Client {
.feeder_gateway()
.get_transaction_status()
.with_transaction_hash(transaction_hash)
.auto_retry()
.get()
.await
})
Expand All @@ -399,6 +405,7 @@ impl ClientApi for Client {
.feeder_gateway()
.get_state_update()
.at_block(block_hash)
.auto_retry()
.get()
.await
})
Expand All @@ -416,6 +423,7 @@ impl ClientApi for Client {
.feeder_gateway()
.get_state_update()
.at_block(block_number)
.auto_retry()
.get()
.await
})
Expand All @@ -429,6 +437,7 @@ impl ClientApi for Client {
self.request()
.feeder_gateway()
.get_contract_addresses()
.auto_retry()
.get()
.await
})
Expand Down Expand Up @@ -461,6 +470,7 @@ impl ClientApi for Client {
self.request()
.gateway()
.add_transaction()
.without_retry()
.post_with_json(&req)
.await
}
Expand Down Expand Up @@ -496,6 +506,7 @@ impl ClientApi for Client {
.add_transaction()
// mainnet requires a token (but testnet does not so its optional).
.with_optional_token(token.as_deref())
.without_retry()
.post_with_json(&req)
.await
}
Expand Down Expand Up @@ -526,6 +537,7 @@ impl ClientApi for Client {
.add_transaction()
// mainnet requires a token (but testnet does not so its optional).
.with_optional_token(token.as_deref())
.without_retry()
.post_with_json(&req)
.await
}
Expand Down
125 changes: 125 additions & 0 deletions crates/pathfinder/src/sequencer/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ pub struct Start;
pub struct WithUrl;
pub struct WithGateWay;
pub struct WithMethod;
pub struct WithRetry;
pub struct WithoutRetry;

pub enum BlockId {
Number(StarknetBlockNumber),
Expand Down Expand Up @@ -182,6 +184,24 @@ impl<'a> Request<'a, WithMethod> {
self
}

pub fn auto_retry(self) -> Request<'a, WithRetry> {
Request {
url: self.url,
client: self.client,
marker: PhantomData::default(),
}
}

pub fn without_retry(self) -> Request<'a, WithoutRetry> {
Request {
url: self.url,
client: self.client,
marker: PhantomData::default(),
}
}
}

impl<'a> Request<'a, WithoutRetry> {
pub async fn get<T>(self) -> Result<T, SequencerError>
where
T: serde::de::DeserializeOwned,
Expand All @@ -206,6 +226,36 @@ impl<'a> Request<'a, WithMethod> {
}
}

impl<'a> Request<'a, WithRetry> {
pub async fn get<T>(self) -> Result<T, SequencerError>
where
T: serde::de::DeserializeOwned,
{
let without_retry = Request::<WithoutRetry> {
url: self.url,
client: self.client,
marker: PhantomData::default(),
};

retry0(|| async move { without_retry.get().await }, retry_condition).await
}

pub async fn get_as_bytes(self) -> Result<bytes::Bytes, SequencerError> {
let response = self.client.get(self.url).send().await?;
let bytes = parse_raw(response).await?.bytes().await?;
Ok(bytes)
}

pub async fn post_with_json<T, J>(self, json: &J) -> Result<T, SequencerError>
where
T: serde::de::DeserializeOwned,
J: serde::Serialize + ?Sized,
{
let response = self.client.post(self.url).json(json).send().await?;
parse::<T>(response).await
}
}

pub async fn parse<T>(response: reqwest::Response) -> Result<T, SequencerError>
where
T: ::serde::de::DeserializeOwned,
Expand Down Expand Up @@ -235,6 +285,8 @@ impl RequestState for Start {}
impl RequestState for WithUrl {}
impl RequestState for WithGateWay {}
impl RequestState for WithMethod {}
impl RequestState for WithRetry {}
impl RequestState for WithoutRetry {}

impl From<crate::rpc::types::BlockNumberOrTag> for BlockId {
fn from(block: crate::rpc::types::BlockNumberOrTag) -> Self {
Expand All @@ -261,3 +313,76 @@ impl From<crate::rpc::types::BlockHashOrTag> for BlockId {
}
}
}

/// Wrapper function to allow retrying sequencer queries in an exponential manner.
///
/// Does not retry in tests.
async fn retry<T, Fut, FutureFactory>(future_factory: FutureFactory) -> Result<T, SequencerError>
where
Fut: futures::Future<Output = Result<T, SequencerError>>,
FutureFactory: FnMut() -> Fut,
{
if cfg!(test) {
retry0(future_factory, |_| false).await
} else {
retry0(future_factory, retry_condition).await
}
}

/// Wrapper function to allow retrying sequencer queries in an exponential manner.
async fn retry0<T, Fut, FutureFactory, Ret>(
future_factory: FutureFactory,
retry_condition: Ret,
) -> Result<T, SequencerError>
where
Fut: futures::Future<Output = Result<T, SequencerError>>,
FutureFactory: FnMut() -> Fut,
Ret: FnMut(&SequencerError) -> bool,
{
use crate::retry::Retry;
use std::num::NonZeroU64;

Retry::exponential(future_factory, NonZeroU64::new(2).unwrap())
.factor(NonZeroU64::new(15).unwrap())
.max_delay(std::time::Duration::from_secs(60 * 60))
.when(retry_condition)
.await
}

/// Determines if an error is retryable or not.
fn retry_condition(e: &SequencerError) -> bool {
use reqwest::StatusCode;
use tracing::{debug, error, info, warn};

match e {
SequencerError::ReqwestError(e) => {
if e.is_body() || e.is_connect() || e.is_timeout() {
info!(reason=%e, "Request failed, retrying");
} else if e.is_status() {
match e.status() {
Some(
StatusCode::NOT_FOUND
| StatusCode::TOO_MANY_REQUESTS
| StatusCode::BAD_GATEWAY
| StatusCode::SERVICE_UNAVAILABLE
| StatusCode::GATEWAY_TIMEOUT,
) => {
debug!(reason=%e, "Request failed, retrying");
}
Some(StatusCode::INTERNAL_SERVER_ERROR) => {
error!(reason=%e, "Request failed, retrying");
}
Some(_) => warn!(reason=%e, "Request failed, retrying"),
None => unreachable!(),
}
} else if e.is_decode() {
error!(reason=%e, "Request failed, retrying");
} else {
warn!(reason=%e, "Request failed, retrying");
}

true
}
SequencerError::StarknetError(_) => false,
}
}

0 comments on commit 5dc5384

Please sign in to comment.