From ba96a2515aaf4e62e7999d4a2a537daf9215c50a Mon Sep 17 00:00:00 2001 From: ltitanb Date: Thu, 9 Jan 2025 16:24:12 +0000 Subject: [PATCH 1/3] fix default relays --- crates/common/src/config/pbs.rs | 43 ++++++++++++++++++- crates/common/src/pbs/relay.rs | 2 +- .../pbs/src/mev_boost/register_validator.rs | 2 +- crates/pbs/src/mev_boost/status.rs | 2 +- crates/pbs/src/mev_boost/submit_block.rs | 8 ++-- crates/pbs/src/state.rs | 10 +++-- tests/tests/pbs_integration.rs | 27 ++++++++---- 7 files changed, 75 insertions(+), 19 deletions(-) diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index 1922552f..6a5308bc 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -171,8 +171,11 @@ pub struct PbsModuleConfig { pub endpoint: SocketAddr, /// Pbs default config pub pbs_config: Arc, - /// List of relays + /// List of default relays pub relays: Vec, + /// List of all default relays plus additional relays from muxes (based on URL) + /// DO NOT use this for get_header calls, use `relays` or `muxes` instead + pub all_relays: Vec, /// Signer client to call Signer API pub signer_client: Option, /// Event publisher @@ -208,12 +211,31 @@ pub async fn load_pbs_config() -> Result { let relay_clients = config.relays.into_iter().map(RelayClient::new).collect::>>()?; let maybe_publiher = BuilderEventPublisher::new_from_env()?; + let mut all_relays = HashMap::with_capacity(relay_clients.len()); + + if let Some(muxes) = &muxes { + for (_, mux) in muxes.iter() { + for relay in mux.relays.iter() { + all_relays.insert(&relay.config.entry.url, relay.clone()); + } + } + } + + // insert default relays after to make sure we keep these as defaults, + // this means we override timing games which is ok since this won't be used for get_header + // we also override headers if the same relays has two definitions (in muxes and default) + for relay in relay_clients.iter() { + all_relays.insert(&relay.config.entry.url, relay.clone()); + } + + let all_relays = all_relays.into_values().collect(); Ok(PbsModuleConfig { chain: config.chain, endpoint, pbs_config: Arc::new(config.pbs.pbs_config), relays: relay_clients, + all_relays, signer_client: None, event_publisher: maybe_publiher, muxes, @@ -264,6 +286,24 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC let relay_clients = cb_config.relays.into_iter().map(RelayClient::new).collect::>>()?; let maybe_publiher = BuilderEventPublisher::new_from_env()?; + let mut all_relays = HashMap::with_capacity(relay_clients.len()); + + if let Some(muxes) = &muxes { + for (_, mux) in muxes.iter() { + for relay in mux.relays.iter() { + all_relays.insert(&relay.config.entry.url, relay.clone()); + } + } + } + + // insert default relays after to make sure we keep these as defaults, + // this also means we override timing games which is ok since this won't be used for get header + // we also override headers if the same relays has two definitions (in muxes and default) + for relay in relay_clients.iter() { + all_relays.insert(&relay.config.entry.url, relay.clone()); + } + + let all_relays = all_relays.into_values().collect(); let signer_client = if cb_config.pbs.static_config.with_signer { // if custom pbs requires a signer client, load jwt @@ -280,6 +320,7 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC endpoint, pbs_config: Arc::new(cb_config.pbs.static_config.pbs_config), relays: relay_clients, + all_relays, signer_client, event_publisher: maybe_publiher, muxes, diff --git a/crates/common/src/pbs/relay.rs b/crates/common/src/pbs/relay.rs index db18466a..88be167b 100644 --- a/crates/common/src/pbs/relay.rs +++ b/crates/common/src/pbs/relay.rs @@ -50,7 +50,7 @@ impl<'de> Deserialize<'de> for RelayEntry { } } -/// A client to interact with a relay, safe to share across threads +/// A client to interact with a relay, safe to share across threads and cheaply cloneable #[derive(Debug, Clone)] pub struct RelayClient { /// ID of the relay diff --git a/crates/pbs/src/mev_boost/register_validator.rs b/crates/pbs/src/mev_boost/register_validator.rs index 9f995a86..6a745319 100644 --- a/crates/pbs/src/mev_boost/register_validator.rs +++ b/crates/pbs/src/mev_boost/register_validator.rs @@ -32,7 +32,7 @@ pub async fn register_validator( .insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - let relays = state.relays().to_vec(); + let relays = state.all_relays().to_vec(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays { handles.push(tokio::spawn( diff --git a/crates/pbs/src/mev_boost/status.rs b/crates/pbs/src/mev_boost/status.rs index 5fdf7db5..7d9d67d2 100644 --- a/crates/pbs/src/mev_boost/status.rs +++ b/crates/pbs/src/mev_boost/status.rs @@ -31,7 +31,7 @@ pub async fn get_status( let mut send_headers = HeaderMap::new(); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - let relays = state.relays(); + let relays = state.all_relays(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays { handles.push(Box::pin(send_relay_check(relay, send_headers.clone()))); diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index 751bf2e4..555a0ee7 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -32,7 +32,7 @@ pub async fn submit_block( send_headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(utcnow_ms())); send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?); - let relays = state.relays(); + let relays = state.all_relays(); let mut handles = Vec::with_capacity(relays.len()); for relay in relays.iter() { handles.push(Box::pin(submit_block_with_timeout( @@ -177,9 +177,9 @@ async fn send_submit_block( if let Some(blobs) = &block_response.data.blobs_bundle { let expected_committments = &signed_blinded_block.message.body.blob_kzg_commitments; - if expected_committments.len() != blobs.blobs.len() || - expected_committments.len() != blobs.commitments.len() || - expected_committments.len() != blobs.proofs.len() + if expected_committments.len() != blobs.blobs.len() + || expected_committments.len() != blobs.commitments.len() + || expected_committments.len() != blobs.proofs.len() { return Err(PbsError::Validation(ValidationError::KzgCommitments { expected_blobs: expected_committments.len(), diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index 9eb06eb6..c57eddca 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -43,9 +43,12 @@ where &self.config.pbs_config } - pub fn relays(&self) -> &[RelayClient] { - &self.config.relays + /// Returns all the relays (including those in muxes) + /// DO NOT use this through the PBS module, use [`PbsState::mux_config_and_relays`] instead + pub fn all_relays(&self) -> &[RelayClient] { + &self.config.all_relays } + /// Returns the PBS config and relay clients for the given validator pubkey. /// If the pubkey is not found in any mux, the default configs are /// returned @@ -55,7 +58,8 @@ where ) -> (&PbsConfig, &[RelayClient], Option<&str>) { match self.config.muxes.as_ref().and_then(|muxes| muxes.get(pubkey)) { Some(mux) => (&mux.config, mux.relays.as_slice(), Some(&mux.id)), - None => (self.pbs_config(), self.relays(), None), + // return only the default relays if there's no match + None => (self.pbs_config(), &self.config.relays, None), } } diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index 88dea73c..745b44e5 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -48,6 +48,7 @@ fn to_pbs_config(chain: Chain, pbs_config: PbsConfig, relays: Vec) pbs_config: Arc::new(pbs_config), signer_client: None, event_publisher: None, + all_relays: relays.clone(), relays, muxes: None, } @@ -204,24 +205,28 @@ async fn test_submit_block_too_large() -> Result<()> { async fn test_mux() -> Result<()> { setup_test_env(); let signer = random_secret(); - let pubkey_1: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); - let signer_2 = random_secret(); - let pubkey_2: BlsPublicKey = blst_pubkey_to_alloy(&signer_2.sk_to_pk()).into(); + let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into(); let chain = Chain::Holesky; let port = 3600; - let mux_relay = generate_mock_relay(port + 1, *pubkey_1)?; - let relays = vec![mux_relay.clone(), generate_mock_relay(port + 2, *pubkey_2)?]; + let mux_relay_1 = generate_mock_relay(port + 1, *pubkey)?; + let mux_relay_2 = generate_mock_relay(port + 2, *pubkey)?; + let default_relay = generate_mock_relay(port + 3, *pubkey)?; + let mock_state = Arc::new(MockRelayState::new(chain, signer)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1)); tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 2)); + tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 3)); + let relays = vec![default_relay.clone()]; let mut config = to_pbs_config(chain, get_pbs_static_config(port), relays); + config.all_relays = vec![mux_relay_1.clone(), mux_relay_2.clone(), default_relay.clone()]; + let mux = RuntimeMuxConfig { id: String::from("test"), config: config.pbs_config.clone(), - relays: vec![mux_relay], + relays: vec![mux_relay_1, mux_relay_2], }; let validator_pubkey = blst_pubkey_to_alloy(&random_secret().sk_to_pk()); @@ -239,12 +244,18 @@ async fn test_mux() -> Result<()> { let res = mock_validator.do_get_header(None).await; assert!(res.is_ok()); - assert_eq!(mock_state.received_get_header(), 2); // both relays were used + assert_eq!(mock_state.received_get_header(), 1); // only default relay was used info!("Sending get header with mux"); let res = mock_validator.do_get_header(Some(validator_pubkey)).await; assert!(res.is_ok()); - assert_eq!(mock_state.received_get_header(), 3); // only one relay was used + assert_eq!(mock_state.received_get_header(), 3); // two mux relays were used + + let res = mock_validator.do_get_status().await; + + assert!(res.is_ok()); + assert_eq!(mock_state.received_get_status(), 3); // default + 2 mux relays were used + Ok(()) } From be4249a1b2534f0c7f85904b2d4e7a6d2222640e Mon Sep 17 00:00:00 2001 From: ltitanb Date: Thu, 9 Jan 2025 16:27:45 +0000 Subject: [PATCH 2/3] add others in test --- tests/tests/pbs_integration.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/tests/pbs_integration.rs b/tests/tests/pbs_integration.rs index 745b44e5..cc8f5d27 100644 --- a/tests/tests/pbs_integration.rs +++ b/tests/tests/pbs_integration.rs @@ -257,5 +257,15 @@ async fn test_mux() -> Result<()> { assert!(res.is_ok()); assert_eq!(mock_state.received_get_status(), 3); // default + 2 mux relays were used + let res = mock_validator.do_register_validator().await; + + assert!(res.is_ok()); + assert_eq!(mock_state.received_register_validator(), 3); // default + 2 mux relays were used + + let res = mock_validator.do_submit_block().await; + + assert!(res.is_err()); + assert_eq!(mock_state.received_submit_block(), 3); // default + 2 mux relays were used + Ok(()) } From acc5932e33424db57a8d8f4620c6d928de641407 Mon Sep 17 00:00:00 2001 From: ltitanb Date: Thu, 9 Jan 2025 17:54:34 +0000 Subject: [PATCH 3/3] fmt --- crates/common/src/config/pbs.rs | 15 +++++++++------ crates/common/src/pbs/relay.rs | 3 ++- crates/pbs/src/mev_boost/submit_block.rs | 6 +++--- crates/pbs/src/state.rs | 3 ++- 4 files changed, 16 insertions(+), 11 deletions(-) diff --git a/crates/common/src/config/pbs.rs b/crates/common/src/config/pbs.rs index 6a5308bc..5e42b6c6 100644 --- a/crates/common/src/config/pbs.rs +++ b/crates/common/src/config/pbs.rs @@ -173,8 +173,9 @@ pub struct PbsModuleConfig { pub pbs_config: Arc, /// List of default relays pub relays: Vec, - /// List of all default relays plus additional relays from muxes (based on URL) - /// DO NOT use this for get_header calls, use `relays` or `muxes` instead + /// List of all default relays plus additional relays from muxes (based on + /// URL) DO NOT use this for get_header calls, use `relays` or `muxes` + /// instead pub all_relays: Vec, /// Signer client to call Signer API pub signer_client: Option, @@ -222,8 +223,9 @@ pub async fn load_pbs_config() -> Result { } // insert default relays after to make sure we keep these as defaults, - // this means we override timing games which is ok since this won't be used for get_header - // we also override headers if the same relays has two definitions (in muxes and default) + // this means we override timing games which is ok since this won't be used for + // get_header we also override headers if the same relays has two + // definitions (in muxes and default) for relay in relay_clients.iter() { all_relays.insert(&relay.config.entry.url, relay.clone()); } @@ -297,8 +299,9 @@ pub async fn load_pbs_custom_config() -> Result<(PbsModuleC } // insert default relays after to make sure we keep these as defaults, - // this also means we override timing games which is ok since this won't be used for get header - // we also override headers if the same relays has two definitions (in muxes and default) + // this also means we override timing games which is ok since this won't be used + // for get header we also override headers if the same relays has two + // definitions (in muxes and default) for relay in relay_clients.iter() { all_relays.insert(&relay.config.entry.url, relay.clone()); } diff --git a/crates/common/src/pbs/relay.rs b/crates/common/src/pbs/relay.rs index 88be167b..c74a6516 100644 --- a/crates/common/src/pbs/relay.rs +++ b/crates/common/src/pbs/relay.rs @@ -50,7 +50,8 @@ impl<'de> Deserialize<'de> for RelayEntry { } } -/// A client to interact with a relay, safe to share across threads and cheaply cloneable +/// A client to interact with a relay, safe to share across threads and cheaply +/// cloneable #[derive(Debug, Clone)] pub struct RelayClient { /// ID of the relay diff --git a/crates/pbs/src/mev_boost/submit_block.rs b/crates/pbs/src/mev_boost/submit_block.rs index 555a0ee7..0d03816b 100644 --- a/crates/pbs/src/mev_boost/submit_block.rs +++ b/crates/pbs/src/mev_boost/submit_block.rs @@ -177,9 +177,9 @@ async fn send_submit_block( if let Some(blobs) = &block_response.data.blobs_bundle { let expected_committments = &signed_blinded_block.message.body.blob_kzg_commitments; - if expected_committments.len() != blobs.blobs.len() - || expected_committments.len() != blobs.commitments.len() - || expected_committments.len() != blobs.proofs.len() + if expected_committments.len() != blobs.blobs.len() || + expected_committments.len() != blobs.commitments.len() || + expected_committments.len() != blobs.proofs.len() { return Err(PbsError::Validation(ValidationError::KzgCommitments { expected_blobs: expected_committments.len(), diff --git a/crates/pbs/src/state.rs b/crates/pbs/src/state.rs index c57eddca..6b3f15c9 100644 --- a/crates/pbs/src/state.rs +++ b/crates/pbs/src/state.rs @@ -44,7 +44,8 @@ where } /// Returns all the relays (including those in muxes) - /// DO NOT use this through the PBS module, use [`PbsState::mux_config_and_relays`] instead + /// DO NOT use this through the PBS module, use + /// [`PbsState::mux_config_and_relays`] instead pub fn all_relays(&self) -> &[RelayClient] { &self.config.all_relays }