diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..1de56593 --- /dev/null +++ b/.dockerignore @@ -0,0 +1 @@ +target \ No newline at end of file diff --git a/config.example.toml b/config.example.toml index ae0b0cdf..bfd1add3 100644 --- a/config.example.toml +++ b/config.example.toml @@ -2,7 +2,6 @@ chain = "Holesky" [pbs] port = 18550 -relays = [] relay_check = true timeout_get_header_ms = 950 timeout_get_payload_ms = 4000 @@ -10,8 +9,15 @@ timeout_register_validator_ms = 3000 skip_sigverify = true min_bid_eth = 0.0 -[headers] -X-MyCustomHeader = "MyCustomValue" +late_in_slot_time_ms = 2000 + +[[relays]] +id = "example-relay" +url = "http://0xa1cec75a3f0661e99299274182938151e8433c61a19222347ea1313d839229cb4ce4e3e5aa2bdeb71c8fcf1b084963c2@abc.xyz" +headers = { X-MyCustomHeader = "MyCustomValue" } +enable_timing_games = false +target_first_request_ms = 200 +frequency_get_header_ms = 300 [signer] [signer.loader] diff --git a/crates/common/src/commit/client.rs b/crates/common/src/commit/client.rs index 9b6f7c27..b452fa4e 100644 --- a/crates/common/src/commit/client.rs +++ b/crates/common/src/commit/client.rs @@ -21,7 +21,7 @@ pub struct GetPubkeysResponse { #[derive(Debug, Clone)] pub struct SignerClient { /// Url endpoint of the Signer Module - url: Arc, + url: Arc, client: reqwest::Client, } diff --git a/crates/common/src/config.rs b/crates/common/src/config.rs index e037dd87..dd8aad42 100644 --- a/crates/common/src/config.rs +++ b/crates/common/src/config.rs @@ -1,11 +1,15 @@ use std::{collections::HashMap, sync::Arc}; -use alloy::primitives::U256; use eyre::{eyre, ContextCompat}; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use super::utils::as_eth_str; -use crate::{commit::client::SignerClient, loader::SignerLoader, pbs::RelayEntry, types::Chain}; +use crate::{ + commit::client::SignerClient, + loader::SignerLoader, + pbs::{PbsConfig, RelayClient, RelayConfig}, + types::Chain, + utils::default_bool, +}; pub const MODULE_ID_ENV: &str = "CB_MODULE_ID"; pub const MODULE_JWT_ENV: &str = "CB_SIGNER_JWT"; @@ -32,6 +36,7 @@ pub const SIGNER_IMAGE: &str = "commitboost_signer"; pub struct CommitBoostConfig { // TODO: generalize this with a spec file pub chain: Chain, + pub relays: Vec, pub pbs: StaticPbsConfig, pub modules: Option>, pub signer: Option, @@ -136,55 +141,23 @@ pub struct StaticPbsConfig { pub with_signer: bool, } -#[derive(Debug, Clone, Default, Deserialize, Serialize)] -pub struct PbsConfig { - /// Port to receive BuilderAPI calls from CL - pub port: u16, - /// Which relay to register/subscribe to - pub relays: Vec, - /// Whether to forward getStatus to relays or skip it - pub relay_check: bool, - #[serde(default = "default_u64::<950>")] - pub timeout_get_header_ms: u64, - #[serde(default = "default_u64::<4000>")] - pub timeout_get_payload_ms: u64, - #[serde(default = "default_u64::<3000>")] - pub timeout_register_validator_ms: u64, - /// Whether to skip the relay signature verification - #[serde(default = "default_bool::")] - pub skip_sigverify: bool, - /// Minimum bid that will be accepted from get_header - #[serde(rename = "min_bid_eth", with = "as_eth_str", default = "default_u256")] - pub min_bid_wei: U256, - /// Custom headers to send to relays - pub headers: Option>, -} - /// Runtime config for the pbs module with support for custom extra config +/// This will be shared across threads, so the `extra` should be thread safe, +/// e.g. wrapped in an Arc #[derive(Debug, Clone)] pub struct PbsModuleConfig { /// Chain spec pub chain: Chain, /// Pbs default config pub pbs_config: Arc, + /// List of relays + pub relays: Vec, /// Signer client to call Signer API pub signer_client: Option, /// Opaque module config pub extra: T, } -const fn default_u64() -> u64 { - U -} - -const fn default_bool() -> bool { - U -} - -const fn default_u256() -> U256 { - U256::ZERO -} - fn default_pbs() -> String { PBS_DEFAULT_IMAGE.to_string() } @@ -192,9 +165,12 @@ fn default_pbs() -> String { /// Loads the default pbs config, i.e. with no signer client or custom data pub fn load_pbs_config() -> eyre::Result> { let config = CommitBoostConfig::from_env_path(); + let relay_clients = config.relays.into_iter().map(RelayClient::new).collect(); + Ok(PbsModuleConfig { chain: config.chain, pbs_config: Arc::new(config.pbs.pbs_config), + relays: relay_clients, signer_client: None, extra: (), }) @@ -213,11 +189,13 @@ pub fn load_pbs_custom_config() -> eyre::Result { chain: Chain, + relays: Vec, pbs: CustomPbsConfig, } // load module config including the extra data (if any) let cb_config: StubConfig = load_file_from_env(CB_CONFIG_ENV); + let relay_clients = cb_config.relays.into_iter().map(RelayClient::new).collect(); let signer_client = if cb_config.pbs.static_config.with_signer { // if custom pbs requires a signer client, load jwt @@ -231,6 +209,7 @@ pub fn load_pbs_custom_config() -> eyre::Result, + /// Relay in the form of pubkey@url + #[serde(rename = "url")] + pub entry: RelayEntry, + /// Optional headers to send with each request + pub headers: Option>, + /// Whether to enable timing games + #[serde(default = "default_bool::")] + pub enable_timing_games: bool, + /// Target time in slot when to send the first header request + pub target_first_request_ms: Option, + /// Frequency in ms to send get_header requests + pub frequency_get_header_ms: Option, +} + +#[derive(Debug, Clone, Default, Deserialize, Serialize)] +pub struct PbsConfig { + /// Port to receive BuilderAPI calls from beacon node + pub port: u16, + /// Whether to forward `get_status`` to relays or skip it + pub relay_check: bool, + /// Timeout for get_header request in milliseconds + #[serde(default = "default_u64::<{ DefaultTimeout::GET_HEADER_MS }>")] + pub timeout_get_header_ms: u64, + /// Timeout for get_payload request in milliseconds + #[serde(default = "default_u64::<{ DefaultTimeout::GET_PAYLOAD_MS }>")] + pub timeout_get_payload_ms: u64, + /// Timeout for register_validator request in milliseconds + #[serde(default = "default_u64::<{ DefaultTimeout::REGISTER_VALIDATOR_MS }>")] + pub timeout_register_validator_ms: u64, + /// Whether to skip the relay signature verification + #[serde(default = "default_bool::")] + pub skip_sigverify: bool, + /// Minimum bid that will be accepted from get_header + #[serde(rename = "min_bid_eth", with = "as_eth_str", default = "default_u256")] + pub min_bid_wei: U256, + /// How late in the slot we consider to be "late" + #[serde(default = "default_u64::")] + pub late_in_slot_time_ms: u64, +} diff --git a/crates/common/src/pbs/constants.rs b/crates/common/src/pbs/constants.rs index ce3f0306..80f68a18 100644 --- a/crates/common/src/pbs/constants.rs +++ b/crates/common/src/pbs/constants.rs @@ -14,3 +14,13 @@ pub const HEADER_START_TIME_UNIX_MS: &str = "X-MEVBoost-StartTimeUnixMS"; pub const BUILDER_EVENTS_PATH: &str = "/events"; pub const DEFAULT_PBS_JWT_KEY: &str = "DEFAULT_PBS"; + +#[non_exhaustive] +pub struct DefaultTimeout; +impl DefaultTimeout { + pub const GET_HEADER_MS: u64 = 950; + pub const GET_PAYLOAD_MS: u64 = 4000; + pub const REGISTER_VALIDATOR_MS: u64 = 3000; +} + +pub const LATE_IN_SLOT_TIME_MS: u64 = 2000; diff --git a/crates/common/src/pbs/mod.rs b/crates/common/src/pbs/mod.rs index 174564b0..d4b0a907 100644 --- a/crates/common/src/pbs/mod.rs +++ b/crates/common/src/pbs/mod.rs @@ -1,5 +1,7 @@ +mod config; mod constants; mod types; +pub use config::*; pub use constants::*; -pub use types::RelayEntry; +pub use types::*; diff --git a/crates/common/src/pbs/types.rs b/crates/common/src/pbs/types.rs index b41c59aa..f770afd3 100644 --- a/crates/common/src/pbs/types.rs +++ b/crates/common/src/pbs/types.rs @@ -1,24 +1,99 @@ +use std::{str::FromStr, sync::Arc}; + use alloy::{ primitives::{hex::FromHex, B256}, rpc::types::beacon::BlsPublicKey, }; +use reqwest::header::{HeaderMap, HeaderName, HeaderValue}; use serde::{Deserialize, Serialize}; use url::Url; -use super::constants::{ - BULDER_API_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, +use super::{ + constants::{BULDER_API_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH}, + RelayConfig, HEADER_VERSION_KEY, HEAVER_VERSION_VALUE, }; - +use crate::DEFAULT_REQUEST_TIMEOUT; +/// A parsed entry of the relay url in the format: scheme://pubkey@host #[derive(Debug, Default, Clone)] pub struct RelayEntry { + /// Default if of the relay, the hostname of the url pub id: String, + /// Public key of the relay pub pubkey: BlsPublicKey, + /// Full url of the relay pub url: String, } -impl RelayEntry { - fn get_url(&self, path: &str) -> String { - format!("{}{path}", &self.url) +impl Serialize for RelayEntry { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + serializer.serialize_str(&self.url) + } +} + +impl<'de> Deserialize<'de> for RelayEntry { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let str = String::deserialize(deserializer)?; + let url = Url::parse(&str).map_err(serde::de::Error::custom)?; + let pubkey = BlsPublicKey::from_hex(url.username()).map_err(serde::de::Error::custom)?; + let id = url.host().ok_or(serde::de::Error::custom("missing host"))?.to_string(); + + Ok(RelayEntry { pubkey, url: str, id }) + } +} + +/// A client to interact with a relay, safe to share across threads +#[derive(Debug, Clone)] +pub struct RelayClient { + /// ID of the relay + pub id: Arc, + /// HTTP client to send requests + pub client: reqwest::Client, + /// Configuration of the relay + pub config: Arc, +} + +impl RelayClient { + pub fn new(config: RelayConfig) -> Self { + let mut headers = HeaderMap::new(); + headers.insert(HEADER_VERSION_KEY, HeaderValue::from_static(HEAVER_VERSION_VALUE)); + + if let Some(custom_headers) = &config.headers { + for (key, value) in custom_headers { + headers.insert( + HeaderName::from_str(key) + .unwrap_or_else(|_| panic!("{key} is an invalid header name")), + HeaderValue::from_str(value) + .unwrap_or_else(|_| panic!("{key} has an invalid header value")), + ); + } + } + + let client = reqwest::Client::builder() + .default_headers(headers) + .timeout(DEFAULT_REQUEST_TIMEOUT) + .build() + .expect("failed to build relay client"); + + Self { + id: Arc::new(config.id.clone().unwrap_or(config.entry.id.clone())), + client, + config: Arc::new(config), + } + } + + pub fn pubkey(&self) -> BlsPublicKey { + self.config.entry.pubkey + } + + // URL builders + pub fn get_url(&self, path: &str) -> String { + format!("{}{path}", &self.config.entry.url) } pub fn get_header_url( @@ -43,29 +118,6 @@ impl RelayEntry { } } -impl Serialize for RelayEntry { - fn serialize(&self, serializer: S) -> Result - where - S: serde::Serializer, - { - serializer.serialize_str(&self.url) - } -} - -impl<'de> Deserialize<'de> for RelayEntry { - fn deserialize(deserializer: D) -> Result - where - D: serde::Deserializer<'de>, - { - let str = String::deserialize(deserializer)?; - let url = Url::parse(&str).map_err(serde::de::Error::custom)?; - let pubkey = BlsPublicKey::from_hex(url.username()).map_err(serde::de::Error::custom)?; - let id = url.host().ok_or(serde::de::Error::custom("missing host"))?.to_string(); - - Ok(RelayEntry { pubkey, url: str, id }) - } -} - #[cfg(test)] mod tests { use alloy::{primitives::hex::FromHex, rpc::types::beacon::BlsPublicKey}; diff --git a/crates/common/src/types.rs b/crates/common/src/types.rs index 043782ab..6767164b 100644 --- a/crates/common/src/types.rs +++ b/crates/common/src/types.rs @@ -7,7 +7,7 @@ use crate::constants::{ RHEA_BUILDER_DOMAIN, RHEA_FORK_VERSION, RHEA_GENESIS_TIME_SECONDS, }; -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] +#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] pub enum Chain { Mainnet, Holesky, diff --git a/crates/common/src/utils.rs b/crates/common/src/utils.rs index 1ffb70d2..6cb17cfc 100644 --- a/crates/common/src/utils.rs +++ b/crates/common/src/utils.rs @@ -16,9 +16,12 @@ const MILLIS_PER_SECOND: u64 = 1_000; pub fn timestamp_of_slot_start_millis(slot: u64, chain: Chain) -> u64 { let seconds_since_genesis = chain.genesis_time_sec() + slot * SECONDS_PER_SLOT; - seconds_since_genesis * MILLIS_PER_SECOND } +pub fn ms_into_slot(slot: u64, chain: Chain) -> u64 { + let slot_start_ms = timestamp_of_slot_start_millis(slot, chain); + utcnow_ms().saturating_sub(slot_start_ms) +} /// Seconds pub fn utcnow_sec() -> u64 { @@ -94,6 +97,18 @@ pub mod as_eth_str { } } +pub const fn default_u64() -> u64 { + U +} + +pub const fn default_bool() -> bool { + U +} + +pub const fn default_u256() -> U256 { + U256::ZERO +} + // LOGGING // TODO: more customized logging + logging guard pub fn initialize_tracing_log() { diff --git a/crates/pbs/src/constants.rs b/crates/pbs/src/constants.rs index 050be538..d0d4d8d6 100644 --- a/crates/pbs/src/constants.rs +++ b/crates/pbs/src/constants.rs @@ -2,3 +2,7 @@ pub(crate) const STATUS_ENDPOINT_TAG: &str = "status"; pub(crate) const REGISTER_VALIDATOR_ENDPOINT_TAG: &str = "register_validator"; pub(crate) const SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG: &str = "submit_blinded_block"; pub(crate) const GET_HEADER_ENDPOINT_TAG: &str = "get_header"; + +/// For metrics recorded when a request times out +pub(crate) const TIMEOUT_ERROR_CODE: u16 = 555; +pub(crate) const TIMEOUT_ERROR_CODE_STR: &str = "555"; diff --git a/crates/pbs/src/error.rs b/crates/pbs/src/error.rs index 9b2c2a07..79216676 100644 --- a/crates/pbs/src/error.rs +++ b/crates/pbs/src/error.rs @@ -43,13 +43,19 @@ pub enum PbsError { #[error("serde decode error: {0}")] SerdeDecodeError(#[from] serde_json::Error), - #[error("relay response error. Code: {code}, text: {error_msg}")] + #[error("relay response error. Code: {code}, err: {error_msg}")] RelayResponse { error_msg: String, code: u16 }, #[error("failed validating relay response: {0}")] Validation(#[from] ValidationError), } +impl PbsError { + pub fn is_timeout(&self) -> bool { + matches!(self, PbsError::Reqwest(err) if err.is_timeout()) + } +} + #[derive(Debug, Error, PartialEq, Eq)] pub enum ValidationError { #[error("empty blockhash")] diff --git a/crates/pbs/src/mev_boost/get_header.rs b/crates/pbs/src/mev_boost/get_header.rs index 49b86bca..8e711f85 100644 --- a/crates/pbs/src/mev_boost/get_header.rs +++ b/crates/pbs/src/mev_boost/get_header.rs @@ -1,4 +1,4 @@ -use std::{ops::Mul, sync::Arc, time::Duration}; +use std::time::{Duration, Instant}; use alloy::{ primitives::{utils::format_ether, B256, U256}, @@ -6,18 +6,18 @@ use alloy::{ }; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ - config::PbsConfig, - pbs::{RelayEntry, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS}, + pbs::{PbsConfig, RelayClient, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS}, signature::verify_signed_builder_message, types::Chain, - utils::{get_user_agent, utcnow_ms}, + utils::{get_user_agent, ms_into_slot, utcnow_ms}, }; use futures::future::join_all; use reqwest::{header::USER_AGENT, StatusCode}; -use tracing::{debug, error}; +use tokio::time::sleep; +use tracing::{debug, error, warn, Instrument}; use crate::{ - constants::GET_HEADER_ENDPOINT_TAG, + constants::{GET_HEADER_ENDPOINT_TAG, TIMEOUT_ERROR_CODE, TIMEOUT_ERROR_CODE_STR}, error::{PbsError, ValidationError}, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, @@ -32,14 +32,27 @@ pub async fn get_header( req_headers: HeaderMap, state: PbsState, ) -> eyre::Result> { - let GetHeaderParams { slot, parent_hash, pubkey: validator_pubkey } = params; - let slot_uuid = state.get_or_update_slot_uuid(slot); + let ms_into_slot = ms_into_slot(params.slot, state.config.chain); + let max_timeout_ms = state + .pbs_config() + .timeout_get_header_ms + .min(state.pbs_config().late_in_slot_time_ms.saturating_sub(ms_into_slot)); + + if max_timeout_ms == 0 { + warn!( + ms_into_slot, + threshold = state.pbs_config().late_in_slot_time_ms, + "late in slot, skipping relay requests" + ); + + return Ok(None) + } + + let (_, slot_uuid) = state.get_slot_and_uuid(); - // prepare headers + // prepare headers, except for start time which is set in `send_one_get_header` let mut send_headers = HeaderMap::new(); send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?); - send_headers - .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); if let Some(ua) = get_user_agent(&req_headers) { send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?); } @@ -47,55 +60,176 @@ pub async fn get_header( let relays = state.relays(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays.iter() { - handles.push(send_get_header( - send_headers.clone(), - slot, - parent_hash, - validator_pubkey, + handles.push(send_timed_get_header( + params, relay.clone(), state.config.chain, - state.config.pbs_config.clone(), - state.relay_client(), + state.pbs_config(), + send_headers.clone(), + ms_into_slot, + max_timeout_ms, )); } let results = join_all(handles).await; let mut relay_bids = Vec::with_capacity(relays.len()); for (i, res) in results.into_iter().enumerate() { - let relay_id = relays[i].id.clone(); + let relay_id = relays[i].id.as_ref(); match res { Ok(Some(res)) => relay_bids.push(res), Ok(_) => {} + Err(err) if err.is_timeout() => error!(err = "Timed Out", relay_id), Err(err) => error!(?err, relay_id), } } - Ok(state.add_bids(slot, relay_bids)) + Ok(state.add_bids(params.slot, relay_bids)) } -#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] -async fn send_get_header( - headers: HeaderMap, - slot: u64, - parent_hash: B256, - validator_pubkey: BlsPublicKey, - relay: RelayEntry, +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] +async fn send_timed_get_header( + params: GetHeaderParams, + relay: RelayClient, chain: Chain, - config: Arc, - client: reqwest::Client, + pbs_config: &PbsConfig, + headers: HeaderMap, + ms_into_slot: u64, + mut timeout_left_ms: u64, ) -> Result, PbsError> { - let url = relay.get_header_url(slot, parent_hash, validator_pubkey); + let url = relay.get_header_url(params.slot, params.parent_hash, params.pubkey); + + if relay.config.enable_timing_games { + if let Some(target_ms) = relay.config.target_first_request_ms { + // sleep until target time in slot + + let delay = target_ms.saturating_sub(ms_into_slot); + if delay > 0 { + debug!(target_ms, ms_into_slot, "TG: waiting to send first header request"); + timeout_left_ms = timeout_left_ms.saturating_sub(delay); + sleep(Duration::from_millis(delay)).await; + } else { + debug!(target_ms, ms_into_slot, "TG: request already late enough in slot"); + } + } + + if let Some(send_freq_ms) = relay.config.frequency_get_header_ms { + let mut handles = Vec::new(); + + debug!(send_freq_ms, timeout_left_ms, "TG: sending multiple header requests"); + + loop { + handles.push(tokio::spawn( + send_one_get_header( + url.clone(), + params, + relay.clone(), + chain, + headers.clone(), + timeout_left_ms, + pbs_config.skip_sigverify, + pbs_config.min_bid_wei, + ) + .in_current_span(), + )); + + if timeout_left_ms > send_freq_ms { + // enough time for one more + timeout_left_ms = timeout_left_ms.saturating_sub(send_freq_ms); + sleep(Duration::from_millis(send_freq_ms)).await; + } else { + break; + } + } + + let results = join_all(handles).await; + let mut n_headers = 0; + + if let Some((_, maybe_header)) = results + .into_iter() + .filter_map(|res| { + // ignore join error and timeouts, log other errors + res.ok().and_then(|inner_res| match inner_res { + Ok(maybe_header) => { + n_headers += 1; + Some(maybe_header) + } + Err(err) if err.is_timeout() => None, + Err(err) => { + error!(?err, "TG: error sending header request"); + None + } + }) + }) + .max_by_key(|(start_time, _)| *start_time) + { + debug!(n_headers, "TG: received headers from relay"); + return Ok(maybe_header) + } else { + // all requests failed + warn!("TG: no headers received"); + + return Err(PbsError::RelayResponse { + error_msg: "no headers received".to_string(), + code: TIMEOUT_ERROR_CODE, + }) + } + } + } + + // if no timing games or no repeated send, just send one request + send_one_get_header( + url, + params, + relay, + chain, + headers, + timeout_left_ms, + pbs_config.skip_sigverify, + pbs_config.min_bid_wei, + ) + .await + .map(|(_, maybe_header)| maybe_header) +} - let timer = - RELAY_LATENCY.with_label_values(&[GET_HEADER_ENDPOINT_TAG, &relay.id]).start_timer(); - let res = client +async fn send_one_get_header( + url: String, + params: GetHeaderParams, + relay: RelayClient, + chain: Chain, + mut headers: HeaderMap, + timeout_ms: u64, + skip_sigverify: bool, + min_bid_wei: U256, +) -> Result<(u64, Option), PbsError> { + // the timestamp in the header is the consensus block time which is fixed, + // use the beginning of the request as proxy to make sure we use only the + // last one received + let start_request_time = utcnow_ms(); + headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(start_request_time)); + + let start_request = Instant::now(); + let res = match relay + .client .get(url) - .timeout(Duration::from_millis(config.timeout_get_header_ms)) + .timeout(Duration::from_millis(timeout_ms)) .headers(headers) .send() - .await?; - let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; + .await + { + Ok(res) => res, + Err(err) => { + RELAY_STATUS_CODE + .with_label_values(&[TIMEOUT_ERROR_CODE_STR, GET_HEADER_ENDPOINT_TAG, &relay.id]) + .inc(); + return Err(err.into()); + } + }; + + let request_latency = start_request.elapsed(); + RELAY_LATENCY + .with_label_values(&[GET_HEADER_ENDPOINT_TAG, &relay.id]) + .observe(request_latency.as_secs_f64()); let code = res.status(); RELAY_STATUS_CODE.with_label_values(&[code.as_str(), GET_HEADER_ENDPOINT_TAG, &relay.id]).inc(); @@ -111,18 +245,17 @@ async fn send_get_header( if code == StatusCode::NO_CONTENT { debug!( ?code, - latency_ms, + latency = ?request_latency, response = ?response_bytes, "no header from relay" ); - return Ok(None) + return Ok((start_request_time, None)) } let get_header_response: GetHeaderReponse = serde_json::from_slice(&response_bytes)?; debug!( - ?code, - latency_ms, + latency = ?request_latency, block_hash = %get_header_response.block_hash(), value_eth = format_ether(get_header_response.value()), "received new header" @@ -131,25 +264,25 @@ async fn send_get_header( validate_header( &get_header_response.data, chain, - &relay, - parent_hash, - config.skip_sigverify, - config.min_bid_wei, + relay.pubkey(), + params.parent_hash, + skip_sigverify, + min_bid_wei, )?; - Ok(Some(get_header_response)) + Ok((start_request_time, Some(get_header_response))) } fn validate_header( signed_header: &SignedExecutionPayloadHeader, chain: Chain, - relay: &RelayEntry, + expected_relay_pubkey: BlsPublicKey, parent_hash: B256, skip_sig_verify: bool, minimum_bid_wei: U256, ) -> Result<(), ValidationError> { let block_hash = signed_header.message.header.block_hash; - let relay_pubkey = signed_header.message.pubkey; + let received_relay_pubkey = signed_header.message.pubkey; let tx_root = signed_header.message.header.transactions_root; let value = signed_header.message.value(); @@ -172,14 +305,17 @@ fn validate_header( return Err(ValidationError::BidTooLow { min: minimum_bid_wei, got: value }); } - if relay.pubkey != relay_pubkey { - return Err(ValidationError::PubkeyMismatch { expected: relay.pubkey, got: relay_pubkey }) + if expected_relay_pubkey != received_relay_pubkey { + return Err(ValidationError::PubkeyMismatch { + expected: expected_relay_pubkey, + got: received_relay_pubkey, + }) } if !skip_sig_verify { verify_signed_builder_message( chain, - &relay_pubkey, + &received_relay_pubkey, &signed_header.message, &signed_header.signature, ) @@ -196,7 +332,7 @@ mod tests { rpc::types::beacon::BlsPublicKey, }; use blst::min_pk; - use cb_common::{pbs::RelayEntry, signature::sign_builder_message, types::Chain}; + use cb_common::{signature::sign_builder_message, types::Chain}; use super::validate_header; use crate::{ @@ -207,23 +343,44 @@ mod tests { #[test] fn test_validate_header() { let mut mock_header = SignedExecutionPayloadHeader::default(); - let mut mock_relay = RelayEntry::default(); + let parent_hash = B256::from_slice(&[1; 32]); let chain = Chain::Holesky; let min_bid = U256::ZERO; + let secret_key = min_pk::SecretKey::from_bytes(&[ + 0, 136, 227, 100, 165, 57, 106, 129, 181, 15, 235, 189, 200, 120, 70, 99, 251, 144, + 137, 181, 230, 124, 189, 193, 115, 153, 26, 0, 197, 135, 103, 63, + ]) + .unwrap(); + let pubkey = BlsPublicKey::from_slice(&secret_key.sk_to_pk().to_bytes()); + mock_header.message.header.transactions_root = alloy::primitives::FixedBytes(EMPTY_TX_ROOT_HASH); assert_eq!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header( + &mock_header, + chain, + BlsPublicKey::default(), + parent_hash, + false, + min_bid + ), Err(ValidationError::EmptyBlockhash) ); mock_header.message.header.block_hash.0[1] = 1; assert_eq!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header( + &mock_header, + chain, + BlsPublicKey::default(), + parent_hash, + false, + min_bid + ), Err(ValidationError::ParentHashMismatch { expected: parent_hash, got: B256::default() @@ -233,46 +390,55 @@ mod tests { mock_header.message.header.parent_hash = parent_hash; assert_eq!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header( + &mock_header, + chain, + BlsPublicKey::default(), + parent_hash, + false, + min_bid + ), Err(ValidationError::EmptyTxRoot) ); mock_header.message.header.transactions_root = Default::default(); assert_eq!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header( + &mock_header, + chain, + BlsPublicKey::default(), + parent_hash, + false, + min_bid + ), Err(ValidationError::BidTooLow { min: min_bid, got: U256::ZERO }) ); mock_header.message.set_value(U256::from(1)); - let secret_key = min_pk::SecretKey::from_bytes(&[ - 0, 136, 227, 100, 165, 57, 106, 129, 181, 15, 235, 189, 200, 120, 70, 99, 251, 144, - 137, 181, 230, 124, 189, 193, 115, 153, 26, 0, 197, 135, 103, 63, - ]) - .unwrap(); - let pubkey = BlsPublicKey::from_slice(&secret_key.sk_to_pk().to_bytes()); mock_header.message.pubkey = pubkey; assert_eq!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header( + &mock_header, + chain, + BlsPublicKey::default(), + parent_hash, + false, + min_bid + ), Err(ValidationError::PubkeyMismatch { expected: BlsPublicKey::default(), got: pubkey }) ); - mock_relay.pubkey = pubkey; - assert!(matches!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid), + validate_header(&mock_header, chain, pubkey, parent_hash, false, min_bid), Err(ValidationError::Sigverify(_)) )); - assert!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, true, min_bid).is_ok() - ); + assert!(validate_header(&mock_header, chain, pubkey, parent_hash, true, min_bid).is_ok()); mock_header.signature = sign_builder_message(chain, &secret_key, &mock_header.message); - assert!( - validate_header(&mock_header, chain, &mock_relay, parent_hash, false, min_bid).is_ok() - ) + assert!(validate_header(&mock_header, chain, pubkey, parent_hash, false, min_bid).is_ok()) } } diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 98e63fbb..7b8c3d93 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -1,9 +1,9 @@ -use std::{ops::Mul, time::Duration}; +use std::time::{Duration, Instant}; use alloy::rpc::types::beacon::relay::ValidatorRegistration; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ - pbs::{RelayEntry, HEADER_START_TIME_UNIX_MS}, + pbs::{RelayClient, HEADER_START_TIME_UNIX_MS}, utils::{get_user_agent, utcnow_ms}, }; use eyre::bail; @@ -12,7 +12,7 @@ use reqwest::header::USER_AGENT; use tracing::{debug, error}; use crate::{ - constants::REGISTER_VALIDATOR_ENDPOINT_TAG, + constants::{REGISTER_VALIDATOR_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, error::PbsError, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, @@ -37,11 +37,10 @@ pub async fn register_validator( let mut handles = Vec::with_capacity(relays.len()); for relay in relays { handles.push(send_register_validator( - send_headers.clone(), - relay.clone(), registrations.clone(), - state.config.pbs_config.timeout_register_validator_ms, - state.relay_client(), + relay, + send_headers.clone(), + state.pbs_config().timeout_register_validator_ms, )); } @@ -54,27 +53,41 @@ pub async fn register_validator( } } -#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] async fn send_register_validator( - headers: HeaderMap, - relay: RelayEntry, registrations: Vec, + relay: &RelayClient, + headers: HeaderMap, timeout_ms: u64, - client: reqwest::Client, ) -> Result<(), PbsError> { let url = relay.register_validator_url(); - let timer = RELAY_LATENCY - .with_label_values(&[REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id]) - .start_timer(); - let res = client + let start_request = Instant::now(); + let res = match relay + .client .post(url) .timeout(Duration::from_millis(timeout_ms)) .headers(headers) .json(®istrations) .send() - .await?; - let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; + .await + { + Ok(res) => res, + Err(err) => { + RELAY_STATUS_CODE + .with_label_values(&[ + TIMEOUT_ERROR_CODE_STR, + REGISTER_VALIDATOR_ENDPOINT_TAG, + &relay.id, + ]) + .inc(); + return Err(err.into()); + } + }; + let request_latency = start_request.elapsed(); + RELAY_LATENCY + .with_label_values(&[REGISTER_VALIDATOR_ENDPOINT_TAG, &relay.id]) + .observe(request_latency.as_secs_f64()); let code = res.status(); RELAY_STATUS_CODE @@ -93,7 +106,7 @@ async fn send_register_validator( return Err(err); }; - debug!(?code, latency_ms, "registration successful"); + debug!(?code, latency = ?request_latency, "registration successful"); Ok(()) } diff --git a/crates/pbs/src/mev_boost/status.rs b/crates/pbs/src/mev_boost/status.rs index 5f71359d..463198c7 100644 --- a/crates/pbs/src/mev_boost/status.rs +++ b/crates/pbs/src/mev_boost/status.rs @@ -1,13 +1,13 @@ -use std::{ops::Mul, time::Duration}; +use std::time::{Duration, Instant}; use axum::http::{HeaderMap, HeaderValue}; -use cb_common::{pbs::RelayEntry, utils::get_user_agent}; +use cb_common::{pbs::RelayClient, utils::get_user_agent}; use futures::future::select_ok; use reqwest::header::USER_AGENT; use tracing::{debug, error}; use crate::{ - constants::STATUS_ENDPOINT_TAG, + constants::{STATUS_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, error::PbsError, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, @@ -33,11 +33,7 @@ pub async fn get_status( let relays = state.relays(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays { - handles.push(Box::pin(send_relay_check( - send_headers.clone(), - relay.clone(), - state.relay_client(), - ))); + handles.push(Box::pin(send_relay_check(relay, send_headers.clone()))); } // return ok if at least one relay returns 200 @@ -49,17 +45,31 @@ pub async fn get_status( } } -#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] -async fn send_relay_check( - headers: HeaderMap, - relay: RelayEntry, - client: reqwest::Client, -) -> Result<(), PbsError> { +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] +async fn send_relay_check(relay: &RelayClient, headers: HeaderMap) -> Result<(), PbsError> { let url = relay.get_status_url(); - let timer = RELAY_LATENCY.with_label_values(&[STATUS_ENDPOINT_TAG, &relay.id]).start_timer(); - let res = client.get(url).timeout(Duration::from_secs(30)).headers(headers).send().await?; - let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; + let start_request = Instant::now(); + let res = match relay + .client + .get(url) + .timeout(Duration::from_secs(30)) + .headers(headers) + .send() + .await + { + Ok(res) => res, + Err(err) => { + RELAY_STATUS_CODE + .with_label_values(&[TIMEOUT_ERROR_CODE_STR, STATUS_ENDPOINT_TAG, &relay.id]) + .inc(); + return Err(err.into()) + } + }; + let request_latency = start_request.elapsed(); + RELAY_LATENCY + .with_label_values(&[STATUS_ENDPOINT_TAG, &relay.id]) + .observe(request_latency.as_secs_f64()); let code = res.status(); RELAY_STATUS_CODE.with_label_values(&[code.as_str(), STATUS_ENDPOINT_TAG, &relay.id]).inc(); @@ -75,7 +85,7 @@ async fn send_relay_check( return Err(err) }; - debug!(?code, latency_ms, "status passed"); + debug!(?code, latency = ?request_latency, "status passed"); Ok(()) } diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index eb49ece5..853eccb6 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -1,17 +1,16 @@ -use std::{ops::Mul, sync::Arc, time::Duration}; +use std::time::{Duration, Instant}; use axum::http::{HeaderMap, HeaderValue}; use cb_common::{ - config::PbsConfig, - pbs::{RelayEntry, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS}, + pbs::{RelayClient, HEADER_SLOT_UUID_KEY, HEADER_START_TIME_UNIX_MS}, utils::{get_user_agent, utcnow_ms}, }; use futures::future::select_ok; use reqwest::header::USER_AGENT; -use tracing::{debug, error}; +use tracing::{debug, warn}; use crate::{ - constants::SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, + constants::{SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, TIMEOUT_ERROR_CODE_STR}, error::{PbsError, ValidationError}, metrics::{RELAY_LATENCY, RELAY_STATUS_CODE}, state::{BuilderApiState, PbsState}, @@ -29,8 +28,7 @@ pub async fn submit_block( // prepare headers let mut send_headers = HeaderMap::new(); send_headers.insert(HEADER_SLOT_UUID_KEY, HeaderValue::from_str(&slot_uuid.to_string())?); - send_headers - .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); + send_headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(utcnow_ms())); if let Some(ua) = get_user_agent(&req_headers) { send_headers.insert(USER_AGENT, HeaderValue::from_str(&ua)?); } @@ -39,11 +37,10 @@ pub async fn submit_block( let mut handles = Vec::with_capacity(relays.len()); for relay in relays.iter() { handles.push(Box::pin(send_submit_block( - send_headers.clone(), - relay.clone(), &signed_blinded_block, - state.config.pbs_config.clone(), - state.relay_client(), + relay, + send_headers.clone(), + state.config.pbs_config.timeout_get_payload_ms, ))); } @@ -56,27 +53,41 @@ pub async fn submit_block( // submits blinded signed block and expects the execution payload + blobs bundle // back -#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id))] +#[tracing::instrument(skip_all, name = "handler", fields(relay_id = relay.id.as_ref()))] async fn send_submit_block( - headers: HeaderMap, - relay: RelayEntry, signed_blinded_block: &SignedBlindedBeaconBlock, - config: Arc, - client: reqwest::Client, + relay: &RelayClient, + headers: HeaderMap, + timeout_ms: u64, ) -> Result { let url = relay.submit_block_url(); - let timer = RELAY_LATENCY - .with_label_values(&[SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id]) - .start_timer(); - let res = client + let start_request = Instant::now(); + let res = match relay + .client .post(url) - .timeout(Duration::from_millis(config.timeout_get_payload_ms)) + .timeout(Duration::from_millis(timeout_ms)) .headers(headers) .json(&signed_blinded_block) .send() - .await?; - let latency_ms = timer.stop_and_record().mul(1000.0).ceil() as u64; + .await + { + Ok(res) => res, + Err(err) => { + RELAY_STATUS_CODE + .with_label_values(&[ + TIMEOUT_ERROR_CODE_STR, + SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, + &relay.id, + ]) + .inc(); + return Err(err.into()) + } + }; + let request_latency = start_request.elapsed(); + RELAY_LATENCY + .with_label_values(&[SUBMIT_BLINDED_BLOCK_ENDPOINT_TAG, &relay.id]) + .observe(request_latency.as_secs_f64()); let code = res.status(); RELAY_STATUS_CODE @@ -90,15 +101,15 @@ async fn send_submit_block( code: code.as_u16(), }; - error!(?err, "failed submit block"); + // we request payload to all relays, but some may have not received it + warn!(?err, "failed to get payload (this might be ok if other relays have it)"); return Err(err) }; let block_response: SubmitBlindedBlockResponse = serde_json::from_slice(&response_bytes)?; debug!( - ?code, - latency_ms, + latency = ?request_latency, block_hash = %block_response.block_hash(), "received unblinded block" ); diff --git a/crates/pbs/src/routes/get_header.rs b/crates/pbs/src/routes/get_header.rs index 082e43a5..1691a6b5 100644 --- a/crates/pbs/src/routes/get_header.rs +++ b/crates/pbs/src/routes/get_header.rs @@ -4,7 +4,7 @@ use axum::{ http::HeaderMap, response::IntoResponse, }; -use cb_common::utils::{get_user_agent, timestamp_of_slot_start_millis, utcnow_ms}; +use cb_common::utils::{get_user_agent, ms_into_slot}; use reqwest::StatusCode; use tracing::{error, info}; use uuid::Uuid; @@ -25,12 +25,12 @@ pub async fn handle_get_header>( Path(params): Path, ) -> Result { state.publish_event(BuilderEvent::GetHeaderRequest(params)); + state.get_or_update_slot_uuid(params.slot); - let now = utcnow_ms(); - let slot_start_ms = timestamp_of_slot_start_millis(params.slot, state.config.chain); let ua = get_user_agent(&req_headers); + let ms_into_slot = ms_into_slot(params.slot, state.config.chain); - info!(?ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot=now.saturating_sub(slot_start_ms)); + info!(?ua, parent_hash=%params.parent_hash, validator_pubkey=%params.pubkey, ms_into_slot); match T::get_header(params, req_headers, state.clone()).await { Ok(res) => { diff --git a/crates/pbs/src/routes/router.rs b/crates/pbs/src/routes/router.rs index df67ed4f..13af21c6 100644 --- a/crates/pbs/src/routes/router.rs +++ b/crates/pbs/src/routes/router.rs @@ -1,14 +1,10 @@ use axum::{ - extract::Request, - http::{Response, StatusCode}, - middleware::{map_request, map_response}, routing::{get, post}, Router, }; use cb_common::pbs::{ BULDER_API_PATH, GET_HEADER_PATH, GET_STATUS_PATH, REGISTER_VALIDATOR_PATH, SUBMIT_BLOCK_PATH, }; -use tracing::debug; use super::{handle_get_header, handle_get_status, handle_register_validator, handle_submit_block}; use crate::{ @@ -31,24 +27,5 @@ pub fn create_app_router>(state: PbsState StatusCode { - StatusCode::NOT_FOUND -} - -// TODO: remove -async fn log_all_responses(response: Response) -> Response { - debug!("RECEIVED RESPONSE: {response:?}"); - response -} - -// TODO: remove -async fn log_all_requests(request: Request) -> Request { - debug!("SENDING REQUEST: {request:?}"); - request + app.with_state(state) } diff --git a/crates/pbs/src/routes/submit_block.rs b/crates/pbs/src/routes/submit_block.rs index 4c7c17fe..49360826 100644 --- a/crates/pbs/src/routes/submit_block.rs +++ b/crates/pbs/src/routes/submit_block.rs @@ -51,8 +51,8 @@ pub async fn handle_submit_block>( let fault_relays = state .relays() .iter() - .filter(|relay| fault_pubkeys.contains(&relay.pubkey)) - .map(|relay| relay.id.clone()) + .filter(|relay| fault_pubkeys.contains(&relay.pubkey())) + .map(|relay| &**relay.id) .collect::>() .join(","); diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index c270ace4..9daa1dcb 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -1,15 +1,13 @@ use std::{ collections::HashSet, - str::FromStr, + fmt, sync::{Arc, Mutex}, }; use alloy::{primitives::B256, rpc::types::beacon::BlsPublicKey}; -use axum::http::{HeaderMap, HeaderName, HeaderValue}; use cb_common::{ - config::{PbsConfig, PbsModuleConfig}, - pbs::{RelayEntry, HEADER_VERSION_KEY, HEAVER_VERSION_VALUE}, - DEFAULT_REQUEST_TIMEOUT, + config::PbsModuleConfig, + pbs::{PbsConfig, RelayClient}, }; use dashmap::DashMap; use tokio::sync::broadcast; @@ -17,7 +15,7 @@ use uuid::Uuid; use crate::{types::GetHeaderReponse, BuilderEvent}; -pub trait BuilderApiState: std::fmt::Debug + Default + Clone + Sync + Send + 'static {} +pub trait BuilderApiState: fmt::Debug + Default + Clone + Sync + Send + 'static {} impl BuilderApiState for () {} pub type BuilderEventReceiver = broadcast::Receiver; @@ -28,11 +26,9 @@ pub type BuilderEventReceiver = broadcast::Receiver; #[derive(Debug, Clone)] pub struct PbsState { /// Config data for the Pbs service - pub config: Arc>, + pub config: PbsModuleConfig, /// Opaque extra data for library use pub data: S, - /// Relay client to reuse across requests - relay_client: reqwest::Client, /// Pubsliher for builder events event_publisher: broadcast::Sender, /// Info about the latest slot and its uuid @@ -48,28 +44,9 @@ where pub fn new(config: PbsModuleConfig) -> Self { let (tx, _) = broadcast::channel(10); - let mut headers = HeaderMap::new(); - headers.insert(HEADER_VERSION_KEY, HeaderValue::from_static(HEAVER_VERSION_VALUE)); - - if let Some(custom_headers) = &config.pbs_config.headers { - for (key, value) in custom_headers.iter() { - headers.insert( - HeaderName::from_str(key).expect("invalid header name"), - HeaderValue::from_str(&value).expect("invalid header value"), - ); - } - } - - let relay_client = reqwest::Client::builder() - .default_headers(headers) - .timeout(DEFAULT_REQUEST_TIMEOUT) - .build() - .expect("failed to build relay client"); - Self { - config: Arc::new(config), + config, data: S::default(), - relay_client, event_publisher: tx, current_slot_info: Arc::new(Mutex::new((0, Uuid::default()))), bid_cache: Arc::new(DashMap::new()), @@ -109,17 +86,12 @@ where pub fn pbs_config(&self) -> &PbsConfig { &self.config.pbs_config } - pub fn relays(&self) -> &[RelayEntry] { - &self.pbs_config().relays - } - pub fn relay_client(&self) -> reqwest::Client { - self.relay_client.clone() + pub fn relays(&self) -> &[RelayClient] { + &self.config.relays } /// Add some bids to the cache, the bids are all assumed to be for the /// provided slot Returns the bid with the max value - /// TODO: this doesnt handle cancellations if we call multiple times - /// get_header pub fn add_bids(&self, slot: u64, bids: Vec) -> Option { let mut slot_entry = self.bid_cache.entry(slot).or_default(); slot_entry.extend(bids); diff --git a/tests/src/mock_validator.rs b/tests/src/mock_validator.rs index 75e78369..67d332d2 100644 --- a/tests/src/mock_validator.rs +++ b/tests/src/mock_validator.rs @@ -1,33 +1,25 @@ -use std::net::SocketAddr; - -use alloy::primitives::B256; -use alloy::rpc::types::beacon::{relay::ValidatorRegistration, BlsPublicKey}; -use cb_common::pbs::RelayEntry; +use alloy::{ + primitives::B256, + rpc::types::beacon::{relay::ValidatorRegistration, BlsPublicKey}, +}; +use cb_common::pbs::RelayClient; use cb_pbs::{GetHeaderReponse, SignedBlindedBeaconBlock}; use reqwest::Error; +use crate::utils::generate_mock_relay; + pub struct MockValidator { - comm_boost: RelayEntry, - client: reqwest::Client, + comm_boost: RelayClient, } impl MockValidator { - pub fn new(address: SocketAddr) -> Self { - let client = reqwest::Client::new(); - - Self { - comm_boost: RelayEntry { - id: "".to_owned(), - pubkey: BlsPublicKey::ZERO, - url: format!("http://{address}"), - }, - client, - } + pub fn new(port: u16) -> Self { + Self { comm_boost: generate_mock_relay(port, BlsPublicKey::default()) } } pub async fn do_get_header(&self) -> Result<(), Error> { let url = self.comm_boost.get_header_url(0, B256::ZERO, BlsPublicKey::ZERO); - let res = self.client.get(url).send().await?.bytes().await?; + let res = self.comm_boost.client.get(url).send().await?.bytes().await?; assert!(serde_json::from_slice::(&res).is_ok()); Ok(()) @@ -35,7 +27,7 @@ impl MockValidator { pub async fn do_get_status(&self) -> Result<(), Error> { let url = self.comm_boost.get_status_url(); - let _res = self.client.get(url).send().await?; + let _res = self.comm_boost.client.get(url).send().await?; // assert!(res.status().is_success()); Ok(()) @@ -46,7 +38,8 @@ impl MockValidator { let registration: Vec = vec![]; - self.client + self.comm_boost + .client .post(url) .header("Content-Type", "application/json") .body(serde_json::to_string(®istration).unwrap()) @@ -62,7 +55,8 @@ impl MockValidator { let signed_blinded_block = SignedBlindedBeaconBlock::default(); - self.client + self.comm_boost + .client .post(url) .header("Content-Type", "application/json") .body(serde_json::to_string(&signed_blinded_block).unwrap()) diff --git a/tests/src/utils.rs b/tests/src/utils.rs index 7b04140c..b1975bac 100644 --- a/tests/src/utils.rs +++ b/tests/src/utils.rs @@ -1,5 +1,8 @@ use std::sync::Once; +use alloy::rpc::types::beacon::BlsPublicKey; +use cb_common::pbs::{RelayClient, RelayConfig, RelayEntry}; + pub fn get_local_address(port: u16) -> String { format!("http://0.0.0.0:{port}") } @@ -10,3 +13,9 @@ pub fn setup_test_env() { tracing_subscriber::fmt().with_max_level(tracing::Level::DEBUG).init(); }); } + +pub fn generate_mock_relay(port: u16, pubkey: BlsPublicKey) -> RelayClient { + let entry = RelayEntry { id: format!("mock_{port}"), pubkey, url: get_local_address(port) }; + let config = RelayConfig { entry, ..RelayConfig::default() }; + RelayClient::new(config) +} diff --git a/tests/tests/config.rs b/tests/tests/config.rs new file mode 100644 index 00000000..090ff103 --- /dev/null +++ b/tests/tests/config.rs @@ -0,0 +1,10 @@ +use cb_common::{config::CommitBoostConfig, types::Chain}; + +#[tokio::test] +async fn test_load_config() { + let config = CommitBoostConfig::from_file("../config.example.toml"); + + assert_eq!(config.chain, Chain::Holesky); + assert!(config.relays[0].headers.is_some()) + // TODO: add more +} diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index 329899ed..72e4873e 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -1,10 +1,9 @@ -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, sync::Arc, time::Duration, u64}; use alloy::primitives::U256; -use alloy::rpc::types::beacon::BlsPublicKey; use cb_common::{ - config::{PbsConfig, PbsModuleConfig}, - pbs::RelayEntry, + config::PbsModuleConfig, + pbs::{PbsConfig, RelayClient}, signer::Signer, types::Chain, }; @@ -12,15 +11,11 @@ use cb_pbs::{DefaultBuilderApi, PbsService, PbsState}; use cb_tests::{ mock_relay::{mock_relay_app_router, MockRelayState}, mock_validator::MockValidator, - utils::{get_local_address, setup_test_env}, + utils::{generate_mock_relay, setup_test_env}, }; use tokio::net::TcpListener; use tracing::info; -fn generate_mock_relay(port: u16, pubkey: BlsPublicKey) -> RelayEntry { - RelayEntry { id: format!("mock_{port}"), pubkey, url: get_local_address(port) } -} - async fn start_mock_relay_service(state: Arc, port: u16) { let app = mock_relay_app_router(state); @@ -31,22 +26,31 @@ async fn start_mock_relay_service(state: Arc, port: u16) { axum::serve(listener, app).await.unwrap(); } -fn get_pbs_static_config(port: u16, relays: Vec) -> PbsConfig { +fn get_pbs_static_config(port: u16) -> PbsConfig { PbsConfig { port, - relays, relay_check: true, timeout_get_header_ms: u64::MAX, timeout_get_payload_ms: u64::MAX, timeout_register_validator_ms: u64::MAX, skip_sigverify: false, min_bid_wei: U256::ZERO, - headers: None, + late_in_slot_time_ms: u64::MAX, } } -fn to_pbs_config(chain: Chain, pbs_config: PbsConfig) -> PbsModuleConfig<()> { - PbsModuleConfig { chain, pbs_config: Arc::new(pbs_config), signer_client: None, extra: () } +fn to_pbs_config( + chain: Chain, + pbs_config: PbsConfig, + relays: Vec, +) -> PbsModuleConfig<()> { + PbsModuleConfig { + chain, + pbs_config: Arc::new(pbs_config), + signer_client: None, + extra: (), + relays, + } } #[tokio::test] @@ -61,15 +65,14 @@ async fn test_get_header() { let mock_state = Arc::new(MockRelayState::new(chain, signer, 0)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - let config = to_pbs_config(chain, get_pbs_static_config(port, vec![mock_relay])); + let config = to_pbs_config(chain, get_pbs_static_config(port), vec![mock_relay]); let state = PbsState::new(config); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers tokio::time::sleep(Duration::from_millis(100)).await; - let address = format!("0.0.0.0:{port}").parse().unwrap(); - let mock_validator = MockValidator::new(address); + let mock_validator = MockValidator::new(port); info!("Sending get header"); let res = mock_validator.do_get_header().await; @@ -93,15 +96,14 @@ async fn test_get_status() { tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 2)); - let config = to_pbs_config(chain, get_pbs_static_config(port, relays)); + let config = to_pbs_config(chain, get_pbs_static_config(port), relays); let state = PbsState::new(config); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers tokio::time::sleep(Duration::from_millis(100)).await; - let address = format!("0.0.0.0:{port}").parse().unwrap(); - let mock_validator = MockValidator::new(address); + let mock_validator = MockValidator::new(port); info!("Sending get status"); let res = mock_validator.do_get_status().await; @@ -121,15 +123,14 @@ async fn test_register_validators() { let mock_state = Arc::new(MockRelayState::new(chain, signer, 0)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - let config = to_pbs_config(chain, get_pbs_static_config(port, relays)); + let config = to_pbs_config(chain, get_pbs_static_config(port), relays); let state = PbsState::new(config); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers tokio::time::sleep(Duration::from_millis(100)).await; - let address = format!("0.0.0.0:{port}").parse().unwrap(); - let mock_validator = MockValidator::new(address); + let mock_validator = MockValidator::new(port); info!("Sending register validator"); let res = mock_validator.do_register_validator().await; @@ -149,15 +150,14 @@ async fn test_submit_block() { let mock_state = Arc::new(MockRelayState::new(chain, signer, 0)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); - let config = to_pbs_config(chain, get_pbs_static_config(port, relays)); + let config = to_pbs_config(chain, get_pbs_static_config(port), relays); let state = PbsState::new(config); tokio::spawn(PbsService::run::<(), DefaultBuilderApi>(state)); // leave some time to start servers tokio::time::sleep(Duration::from_millis(100)).await; - let address = format!("0.0.0.0:{port}").parse().unwrap(); - let mock_validator = MockValidator::new(address); + let mock_validator = MockValidator::new(port); info!("Sending submit block"); let res = mock_validator.do_submit_block().await;