From 1049decd94087b55f021e8f2cce80533ea0e0e3a Mon Sep 17 00:00:00 2001 From: ltitanb Date: Fri, 22 Nov 2024 11:05:27 +0000 Subject: [PATCH 1/2] add retries --- crates/common/src/pbs/error.rs | 5 ++ .../pbs/src/mev_boost/register_validator.rs | 53 ++++++++++++++++- crates/pbs/src/mev_boost/submit_block.rs | 57 +++++++++++++++++-- 3 files changed, 107 insertions(+), 8 deletions(-) diff --git a/crates/common/src/pbs/error.rs b/crates/common/src/pbs/error.rs index 8330d03a..f6f05ec0 100644 --- a/crates/common/src/pbs/error.rs +++ b/crates/common/src/pbs/error.rs @@ -34,6 +34,11 @@ impl PbsError { pub fn is_timeout(&self) -> bool { matches!(self, PbsError::Reqwest(err) if err.is_timeout()) } + + /// Whether the error is retryable in requests to relays + pub fn should_retry(&self) -> bool { + matches!(self, PbsError::RelayResponse { .. } | PbsError::Reqwest { .. }) + } } #[derive(Debug, Error, PartialEq, Eq)] diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 6c124693..0065549d 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -10,6 +10,7 @@ use eyre::bail; use futures::future::{join_all, select_ok}; use reqwest::header::USER_AGENT; use tracing::{debug, error, Instrument}; +use url::Url; use crate::{ constants::{MAX_SIZE_DEFAULT, REGISTER_VALIDATOR_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, @@ -35,7 +36,7 @@ pub async fn register_validator( let mut handles = Vec::with_capacity(relays.len()); for relay in relays { handles.push(tokio::spawn( - send_register_validator( + send_register_validator_with_timeout( registrations.clone(), relay, send_headers.clone(), @@ -63,15 +64,61 @@ pub async fn register_validator( } } -#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] -async fn send_register_validator( +/// Register validator to relay, retry connection errors until the +/// given timeout has passed +async fn send_register_validator_with_timeout( registrations: Vec, relay: RelayClient, headers: HeaderMap, timeout_ms: u64, ) -> Result<(), PbsError> { let url = relay.register_validator_url()?; + let mut remaining_timeout_ms = timeout_ms; + let mut retry = 0; + let mut backoff = Duration::from_millis(250); + + loop { + let start_request = Instant::now(); + match send_register_validator( + url.clone(), + ®istrations, + &relay, + headers.clone(), + remaining_timeout_ms, + retry, + ) + .await + { + Ok(_) => return Ok(()), + + Err(err) if err.should_retry() => { + tokio::time::sleep(backoff).await; + backoff += Duration::from_millis(250); + + remaining_timeout_ms = + timeout_ms.saturating_sub(start_request.elapsed().as_millis() as u64); + + if remaining_timeout_ms == 0 { + return Err(err); + } + } + + Err(err) => return Err(err), + }; + retry += 1; + } +} + +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref(), retry = retry))] +async fn send_register_validator( + url: Url, + registrations: &[ValidatorRegistration], + relay: &RelayClient, + headers: HeaderMap, + timeout_ms: u64, + retry: u32, +) -> Result<(), PbsError> { let start_request = Instant::now(); let res = match relay .client diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index 5fdedfce..7079d48b 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -12,6 +12,7 @@ use cb_common::{ use futures::future::select_ok; use reqwest::header::USER_AGENT; use tracing::{debug, warn}; +use url::Url; use crate::{ constants::{MAX_SIZE_SUBMIT_BLOCK, SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, @@ -37,11 +38,11 @@ pub async fn submit_block( let relays = state.relays(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays.iter() { - handles.push(Box::pin(send_submit_block( + handles.push(Box::pin(submit_block_with_timeout( &signed_blinded_block, relay, send_headers.clone(), - state.config.pbs_config.timeout_get_payload_ms, + state.pbs_config().timeout_get_payload_ms, ))); } @@ -52,17 +53,63 @@ pub async fn submit_block( } } +/// Submit blinded block to relay, retry connection errors until the +/// given timeout has passed +async fn submit_block_with_timeout( + signed_blinded_block: &SignedBlindedBeaconBlock, + relay: &RelayClient, + headers: HeaderMap, + timeout_ms: u64, +) -> Result { + let url = relay.submit_block_url()?; + let mut remaining_timeout_ms = timeout_ms; + let mut retry = 0; + let mut backoff = Duration::from_millis(250); + + loop { + let start_request = Instant::now(); + match send_submit_block( + url.clone(), + &signed_blinded_block, + relay, + headers.clone(), + remaining_timeout_ms, + retry, + ) + .await + { + Ok(response) => return Ok(response), + + Err(err) if err.should_retry() => { + tokio::time::sleep(backoff).await; + backoff += Duration::from_millis(250); + + remaining_timeout_ms = + timeout_ms.saturating_sub(start_request.elapsed().as_millis() as u64); + + if remaining_timeout_ms == 0 { + return Err(err); + } + } + + Err(err) => return Err(err), + }; + + retry += 1; + } +} + // submits blinded signed block and expects the execution payload + blobs bundle // back -#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref(), retry = retry))] async fn send_submit_block( + url: Url, signed_blinded_block: &SignedBlindedBeaconBlock, relay: &RelayClient, headers: HeaderMap, timeout_ms: u64, + retry: u32, ) -> Result { - let url = relay.submit_block_url()?; - let start_request = Instant::now(); let res = match relay .client From 8b6e7adfaca3cbf34ea74e75e9aa1fe34e8259cb Mon Sep 17 00:00:00 2001 From: ltitanb Date: Fri, 22 Nov 2024 11:13:53 +0000 Subject: [PATCH 2/2] clippy --- crates/pbs/src/mev_boost/submit_block.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index 7079d48b..7f39828e 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -70,7 +70,7 @@ async fn submit_block_with_timeout( let start_request = Instant::now(); match send_submit_block( url.clone(), - &signed_blinded_block, + signed_blinded_block, relay, headers.clone(), remaining_timeout_ms,