Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion crates/common/src/config/pbs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,12 @@ pub struct PbsModuleConfig {
pub endpoint: SocketAddr,
/// Pbs default config
pub pbs_config: Arc<PbsConfig>,
/// List of relays
/// List of default relays
pub relays: Vec<RelayClient>,
/// List of all default relays plus additional relays from muxes (based on
/// URL) DO NOT use this for get_header calls, use `relays` or `muxes`
/// instead
pub all_relays: Vec<RelayClient>,
/// Signer client to call Signer API
pub signer_client: Option<SignerClient>,
/// Event publisher
Expand Down Expand Up @@ -208,12 +212,32 @@ pub async fn load_pbs_config() -> Result<PbsModuleConfig> {
let relay_clients =
config.relays.into_iter().map(RelayClient::new).collect::<Result<Vec<_>>>()?;
let maybe_publiher = BuilderEventPublisher::new_from_env()?;
let mut all_relays = HashMap::with_capacity(relay_clients.len());

if let Some(muxes) = &muxes {
for (_, mux) in muxes.iter() {
for relay in mux.relays.iter() {
all_relays.insert(&relay.config.entry.url, relay.clone());
}
}
}

// insert default relays after to make sure we keep these as defaults,
// this means we override timing games which is ok since this won't be used for
// get_header we also override headers if the same relays has two
// definitions (in muxes and default)
for relay in relay_clients.iter() {
all_relays.insert(&relay.config.entry.url, relay.clone());
}

let all_relays = all_relays.into_values().collect();

Ok(PbsModuleConfig {
chain: config.chain,
endpoint,
pbs_config: Arc::new(config.pbs.pbs_config),
relays: relay_clients,
all_relays,
signer_client: None,
event_publisher: maybe_publiher,
muxes,
Expand Down Expand Up @@ -264,6 +288,25 @@ pub async fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<(PbsModuleC
let relay_clients =
cb_config.relays.into_iter().map(RelayClient::new).collect::<Result<Vec<_>>>()?;
let maybe_publiher = BuilderEventPublisher::new_from_env()?;
let mut all_relays = HashMap::with_capacity(relay_clients.len());

if let Some(muxes) = &muxes {
for (_, mux) in muxes.iter() {
for relay in mux.relays.iter() {
all_relays.insert(&relay.config.entry.url, relay.clone());
}
}
}

// insert default relays after to make sure we keep these as defaults,
// this also means we override timing games which is ok since this won't be used
// for get header we also override headers if the same relays has two
// definitions (in muxes and default)
for relay in relay_clients.iter() {
all_relays.insert(&relay.config.entry.url, relay.clone());
}

let all_relays = all_relays.into_values().collect();

let signer_client = if cb_config.pbs.static_config.with_signer {
// if custom pbs requires a signer client, load jwt
Expand All @@ -280,6 +323,7 @@ pub async fn load_pbs_custom_config<T: DeserializeOwned>() -> Result<(PbsModuleC
endpoint,
pbs_config: Arc::new(cb_config.pbs.static_config.pbs_config),
relays: relay_clients,
all_relays,
signer_client,
event_publisher: maybe_publiher,
muxes,
Expand Down
3 changes: 2 additions & 1 deletion crates/common/src/pbs/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ impl<'de> Deserialize<'de> for RelayEntry {
}
}

/// A client to interact with a relay, safe to share across threads
/// A client to interact with a relay, safe to share across threads and cheaply
/// cloneable
#[derive(Debug, Clone)]
pub struct RelayClient {
/// ID of the relay
Expand Down
2 changes: 1 addition & 1 deletion crates/pbs/src/mev_boost/register_validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn register_validator<S: BuilderApiState>(
.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from_str(&utcnow_ms().to_string())?);
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);

let relays = state.relays().to_vec();
let relays = state.all_relays().to_vec();
let mut handles = Vec::with_capacity(relays.len());
for relay in relays {
handles.push(tokio::spawn(
Expand Down
2 changes: 1 addition & 1 deletion crates/pbs/src/mev_boost/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub async fn get_status<S: BuilderApiState>(
let mut send_headers = HeaderMap::new();
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);

let relays = state.relays();
let relays = state.all_relays();
let mut handles = Vec::with_capacity(relays.len());
for relay in relays {
handles.push(Box::pin(send_relay_check(relay, send_headers.clone())));
Expand Down
2 changes: 1 addition & 1 deletion crates/pbs/src/mev_boost/submit_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ pub async fn submit_block<S: BuilderApiState>(
send_headers.insert(HEADER_START_TIME_UNIX_MS, HeaderValue::from(utcnow_ms()));
send_headers.insert(USER_AGENT, get_user_agent_with_version(&req_headers)?);

let relays = state.relays();
let relays = state.all_relays();
let mut handles = Vec::with_capacity(relays.len());
for relay in relays.iter() {
handles.push(Box::pin(submit_block_with_timeout(
Expand Down
11 changes: 8 additions & 3 deletions crates/pbs/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,13 @@ where
&self.config.pbs_config
}

pub fn relays(&self) -> &[RelayClient] {
&self.config.relays
/// Returns all the relays (including those in muxes)
/// DO NOT use this through the PBS module, use
/// [`PbsState::mux_config_and_relays`] instead
pub fn all_relays(&self) -> &[RelayClient] {
&self.config.all_relays
}

/// Returns the PBS config and relay clients for the given validator pubkey.
/// If the pubkey is not found in any mux, the default configs are
/// returned
Expand All @@ -55,7 +59,8 @@ where
) -> (&PbsConfig, &[RelayClient], Option<&str>) {
match self.config.muxes.as_ref().and_then(|muxes| muxes.get(pubkey)) {
Some(mux) => (&mux.config, mux.relays.as_slice(), Some(&mux.id)),
None => (self.pbs_config(), self.relays(), None),
// return only the default relays if there's no match
None => (self.pbs_config(), &self.config.relays, None),
}
}

Expand Down
37 changes: 29 additions & 8 deletions tests/tests/pbs_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ fn to_pbs_config(chain: Chain, pbs_config: PbsConfig, relays: Vec<RelayClient>)
pbs_config: Arc::new(pbs_config),
signer_client: None,
event_publisher: None,
all_relays: relays.clone(),
relays,
muxes: None,
}
Expand Down Expand Up @@ -204,24 +205,28 @@ async fn test_submit_block_too_large() -> Result<()> {
async fn test_mux() -> Result<()> {
setup_test_env();
let signer = random_secret();
let pubkey_1: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into();
let signer_2 = random_secret();
let pubkey_2: BlsPublicKey = blst_pubkey_to_alloy(&signer_2.sk_to_pk()).into();
let pubkey: BlsPublicKey = blst_pubkey_to_alloy(&signer.sk_to_pk()).into();

let chain = Chain::Holesky;
let port = 3600;

let mux_relay = generate_mock_relay(port + 1, *pubkey_1)?;
let relays = vec![mux_relay.clone(), generate_mock_relay(port + 2, *pubkey_2)?];
let mux_relay_1 = generate_mock_relay(port + 1, *pubkey)?;
let mux_relay_2 = generate_mock_relay(port + 2, *pubkey)?;
let default_relay = generate_mock_relay(port + 3, *pubkey)?;

let mock_state = Arc::new(MockRelayState::new(chain, signer));
tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 1));
tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 2));
tokio::spawn(start_mock_relay_service(mock_state.clone(), port + 3));

let relays = vec![default_relay.clone()];
let mut config = to_pbs_config(chain, get_pbs_static_config(port), relays);
config.all_relays = vec![mux_relay_1.clone(), mux_relay_2.clone(), default_relay.clone()];

let mux = RuntimeMuxConfig {
id: String::from("test"),
config: config.pbs_config.clone(),
relays: vec![mux_relay],
relays: vec![mux_relay_1, mux_relay_2],
};

let validator_pubkey = blst_pubkey_to_alloy(&random_secret().sk_to_pk());
Expand All @@ -239,12 +244,28 @@ async fn test_mux() -> Result<()> {
let res = mock_validator.do_get_header(None).await;

assert!(res.is_ok());
assert_eq!(mock_state.received_get_header(), 2); // both relays were used
assert_eq!(mock_state.received_get_header(), 1); // only default relay was used

info!("Sending get header with mux");
let res = mock_validator.do_get_header(Some(validator_pubkey)).await;

assert!(res.is_ok());
assert_eq!(mock_state.received_get_header(), 3); // only one relay was used
assert_eq!(mock_state.received_get_header(), 3); // two mux relays were used

let res = mock_validator.do_get_status().await;

assert!(res.is_ok());
assert_eq!(mock_state.received_get_status(), 3); // default + 2 mux relays were used

let res = mock_validator.do_register_validator().await;

assert!(res.is_ok());
assert_eq!(mock_state.received_register_validator(), 3); // default + 2 mux relays were used

let res = mock_validator.do_submit_block().await;

assert!(res.is_err());
assert_eq!(mock_state.received_submit_block(), 3); // default + 2 mux relays were used

Ok(())
}
Loading