diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index 6c993716..f41eef12 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -27,11 +27,12 @@ use crate::{ }, pbs::{ BuilderEventPublisher, DefaultTimeout, RelayClient, RelayEntry, DEFAULT_PBS_PORT, - LATE_IN_SLOT_TIME_MS, + LATE_IN_SLOT_TIME_MS, REGISTER_VALIDATOR_RETRY_LIMIT, }, types::{Chain, Jwt, ModuleId}, utils::{ - as_eth_str, default_bool, default_host, default_u16, default_u256, default_u64, WEI_PER_ETH, + as_eth_str, default_bool, default_host, default_u16, default_u256, default_u32, + default_u64, WEI_PER_ETH, }, }; @@ -122,6 +123,9 @@ pub struct PbsConfig { pub extra_validation_enabled: bool, /// Execution Layer RPC url to use for extra validation pub rpc_url: Option, + /// Maximum number of retries for validator registration request per relay + #[serde(default = "default_u32::")] + pub register_validator_retry_limit: u32, } impl PbsConfig { @@ -140,6 +144,10 @@ impl PbsConfig { self.timeout_get_header_ms < self.late_in_slot_time_ms, "timeout_get_header_ms must be less than late_in_slot_time_ms" ); + ensure!( + self.register_validator_retry_limit > 0, + "register_validator_retry_limit must be greater than 0" + ); ensure!( self.min_bid_wei < U256::from(WEI_PER_ETH), diff --git a/crates/common/src/pbs/constants.rs b/crates/common/src/pbs/constants.rs index 1311223a..314915ce 100644 --- a/crates/common/src/pbs/constants.rs +++ b/crates/common/src/pbs/constants.rs @@ -30,3 +30,6 @@ impl DefaultTimeout { } pub const LATE_IN_SLOT_TIME_MS: u64 = 2000; + +// Maximum number of retries for validator registration request per relay +pub const REGISTER_VALIDATOR_RETRY_LIMIT: u32 = 3; diff --git a/crates/common/src/pbs/error.rs b/crates/common/src/pbs/error.rs index 2798586c..242cb90e 100644 --- a/crates/common/src/pbs/error.rs +++ b/crates/common/src/pbs/error.rs @@ -37,7 +37,18 @@ impl PbsError { /// Whether the error is retryable in requests to relays pub fn should_retry(&self) -> bool { - matches!(self, PbsError::RelayResponse { .. } | PbsError::Reqwest { .. }) + match self { + PbsError::Reqwest(err) => { + // Retry on timeout or connection error + err.is_timeout() || err.is_connect() + } + PbsError::RelayResponse { code, .. } => match *code { + 500..509 => true, // Retry on server errors + 400 | 429 => false, // Do not retry if rate limited or bad request + _ => false, + }, + _ => false, + } } } diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index 37119580..a1dcb7cb 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -137,6 +137,10 @@ pub const fn default_u64() -> u64 { U } +pub const fn default_u32() -> u32 { + U +} + pub const fn default_u16() -> u16 { U } diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 8a82777e..c99f5d5f 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -43,6 +43,7 @@ pub async fn register_validator( relay.clone(), send_headers.clone(), state.pbs_config().timeout_register_validator_ms, + state.pbs_config().register_validator_retry_limit, ) .in_current_span(), )); @@ -54,6 +55,7 @@ pub async fn register_validator( relay.clone(), send_headers.clone(), state.pbs_config().timeout_register_validator_ms, + state.pbs_config().register_validator_retry_limit, ) .in_current_span(), )); @@ -85,6 +87,7 @@ async fn send_register_validator_with_timeout( relay: RelayClient, headers: HeaderMap, timeout_ms: u64, + retry_limit: u32, ) -> Result<(), PbsError> { let url = relay.register_validator_url()?; let mut remaining_timeout_ms = timeout_ms; @@ -106,6 +109,14 @@ async fn send_register_validator_with_timeout( Ok(_) => return Ok(()), Err(err) if err.should_retry() => { + retry += 1; + if retry >= retry_limit { + error!( + relay_id = relay.id.as_str(), + retry, "reached retry limit for validator registration" + ); + return Err(err); + } tokio::time::sleep(backoff).await; backoff += Duration::from_millis(250); @@ -119,8 +130,6 @@ async fn send_register_validator_with_timeout( Err(err) => return Err(err), }; - - retry += 1; } } diff --git a/tests/src/mock_relay.rs b/tests/src/mock_relay.rs index beedb5d7..c70efa56 100644 --- a/tests/src/mock_relay.rs +++ b/tests/src/mock_relay.rs @@ -2,7 +2,7 @@ use std::{ net::SocketAddr, sync::{ atomic::{AtomicU64, Ordering}, - Arc, + Arc, RwLock, }, }; @@ -48,6 +48,7 @@ pub struct MockRelayState { received_get_status: Arc, received_register_validator: Arc, received_submit_block: Arc, + response_override: RwLock>, } impl MockRelayState { @@ -66,6 +67,9 @@ impl MockRelayState { pub fn large_body(&self) -> bool { self.large_body } + pub fn set_response_override(&self, status: StatusCode) { + *self.response_override.write().unwrap() = Some(status); + } } impl MockRelayState { @@ -78,6 +82,7 @@ impl MockRelayState { received_get_status: Default::default(), received_register_validator: Default::default(), received_submit_block: Default::default(), + response_override: RwLock::new(None), } } @@ -130,7 +135,12 @@ async fn handle_register_validator( ) -> impl IntoResponse { state.received_register_validator.fetch_add(1, Ordering::Relaxed); debug!("Received {} registrations", validators.len()); - StatusCode::OK + + if let Some(status) = state.response_override.read().unwrap().as_ref() { + return (*status).into_response(); + } + + StatusCode::OK.into_response() } async fn handle_submit_block(State(state): State>) -> Response { diff --git a/tests/src/utils.rs b/tests/src/utils.rs index f2ae9157..66524514 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -72,6 +72,7 @@ pub fn get_pbs_static_config(port: u16) -> PbsConfig { late_in_slot_time_ms: u64::MAX, extra_validation_enabled: false, rpc_url: None, + register_validator_retry_limit: u32::MAX, } } diff --git a/tests/tests/pbs_post_validators.rs b/tests/tests/pbs_post_validators.rs index c0a27c93..4c70ac01 100644 --- a/tests/tests/pbs_post_validators.rs +++ b/tests/tests/pbs_post_validators.rs @@ -46,7 +46,7 @@ async fn test_register_validators() -> Result<()> { "message": { "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "gas_limit": "100000", - "timestamp": "1000000", + "timestamp": "1000000", "pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" }, "signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" @@ -93,7 +93,7 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul "message": { "fee_recipient": "0xaa", "gas_limit": "100000", - "timestamp": "1000000", + "timestamp": "1000000", "pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" }, "signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" @@ -115,7 +115,7 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul "message": { "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "gas_limit": "100000", - "timestamp": "1000000", + "timestamp": "1000000", "pubkey": "0xbbb" }, "signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" @@ -137,7 +137,7 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul "message": { "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "gas_limit": "100000", - "timestamp": "1000000", + "timestamp": "1000000", "pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" }, "signature": "0xcccc" @@ -159,7 +159,7 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul "message": { "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "gas_limit": "10000000000000000000000000000000000000000000000000000000", - "timestamp": "1000000", + "timestamp": "1000000", "pubkey": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" }, "signature": "0xcccc" @@ -181,7 +181,7 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul "message": { "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", "gas_limit": "1000000", - "timestamp": "10000000000000000000000000000000000000000000000000000000", + "timestamp": "10000000000000000000000000000000000000000000000000000000", "pubkey": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" }, "signature": "0xcccc" @@ -201,3 +201,106 @@ async fn test_register_validators_returns_422_if_request_is_malformed() -> Resul assert_eq!(mock_state.received_register_validator(), 0); Ok(()) } + +#[tokio::test] +async fn test_register_validators_does_not_retry_on_429() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let pbs_port = 4200; + + // Set up mock relay state and override response to 429 + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + mock_state.set_response_override(StatusCode::TOO_MANY_REQUESTS); + + // Run a mock relay + let relays = vec![generate_mock_relay(pbs_port + 1, pubkey)?]; + tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1)); + + // Run the PBS service + let config = to_pbs_config(chain, get_pbs_static_config(pbs_port), relays); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state.clone())); + + // Leave some time to start servers + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending register validator to test 429 response"); + + let registration: ValidatorRegistration = serde_json::from_str( + r#"{ + "message": { + "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "gas_limit": "100000", + "timestamp": "1000000", + "pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + }, + "signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" + }"#, + )?; + + let registrations = vec![registration]; + let res = mock_validator.do_register_custom_validators(registrations).await?; + + // Should only be called once (no retry) + assert_eq!(mock_state.received_register_validator(), 1); + // Expected to return 429 status code + // But it returns `No relay passed register_validator successfully` with 502 + // status code + assert_eq!(res.status(), StatusCode::BAD_GATEWAY); + + Ok(()) +} + +#[tokio::test] +async fn test_register_validators_retries_on_500() -> Result<()> { + setup_test_env(); + let signer = random_secret(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); + + let chain = Chain::Holesky; + let pbs_port = 4300; + + // Set up internal mock relay with 500 response override + let mock_state = Arc::new(MockRelayState::new(chain, signer)); + mock_state.set_response_override(StatusCode::INTERNAL_SERVER_ERROR); // 500 + + let relays = vec![generate_mock_relay(pbs_port + 1, pubkey)?]; + tokio::spawn(start_mock_relay_service(mock_state.clone(), pbs_port + 1)); + + // Set retry limit to 3 + let mut pbs_config = get_pbs_static_config(pbs_port); + pbs_config.register_validator_retry_limit = 3; + + let config = to_pbs_config(chain, pbs_config, relays); + let state = PbsState::new(config); + tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state.clone())); + + tokio::time::sleep(Duration::from_millis(100)).await; + + let mock_validator = MockValidator::new(pbs_port)?; + info!("Sending register validator to test retry on 500"); + + let registration: ValidatorRegistration = serde_json::from_str( + r#"{ + "message": { + "fee_recipient": "0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa", + "gas_limit": "100000", + "timestamp": "1000000", + "pubkey": "0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + }, + "signature": "0xcccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccccc" + }"#, + )?; + + let registrations = vec![registration]; + let _ = mock_validator.do_register_custom_validators(registrations).await; + + // Should retry 3 times (0, 1, 2) → total 3 calls + assert_eq!(mock_state.received_register_validator(), 3); + + Ok(()) +}