Skip to content
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ ethereum_ssz_derive = "0.8"
eyre = "0.6.12"
futures = "0.3.30"
headers = "0.4.0"
headers-accept = "0.2.1"
indexmap = "2.2.6"
jsonwebtoken = { version = "9.3.1", default-features = false }
lazy_static = "1.5.0"
lh_eth2_keystore = { package = "eth2_keystore", git = "https://github.com/sigp/lighthouse", tag = "v7.1.0" }
lh_types = { package = "types", git = "https://github.com/sigp/lighthouse", tag = "v7.1.0" }
mediatype = "0.20.0"
parking_lot = "0.12.3"
pbkdf2 = "0.12.2"
prometheus = "0.13.4"
Expand Down
2 changes: 2 additions & 0 deletions crates/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@ ethereum_ssz.workspace = true
ethereum_ssz_derive.workspace = true
eyre.workspace = true
futures.workspace = true
headers-accept.workspace = true
jsonwebtoken.workspace = true
lh_eth2_keystore.workspace = true
lh_types.workspace = true
mediatype.workspace = true
pbkdf2.workspace = true
rand.workspace = true
rayon.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/common/src/signer/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,7 @@ mod test {
.join(consensus_signer.pubkey().to_string())
.join("TEST_MODULE")
.join("bls")
.join(format!("{}.sig", proxy_signer.pubkey().to_string()))
.join(format!("{}.sig", proxy_signer.pubkey()))
)
.unwrap()
)
Expand Down
168 changes: 166 additions & 2 deletions crates/common/src/utils.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
#[cfg(test)]
use std::cell::Cell;
use std::{
fmt::Display,
net::Ipv4Addr,
str::FromStr,
time::{SystemTime, UNIX_EPOCH},
};

use alloy::{hex, primitives::U256};
use axum::http::HeaderValue;
use axum::{
extract::{FromRequest, Request},
http::HeaderValue,
response::{IntoResponse, Response as AxumResponse},
};
use bytes::Bytes;
use futures::StreamExt;
use headers_accept::Accept;
pub use lh_types::ForkName;
use lh_types::test_utils::{SeedableRng, TestRandom, XorShiftRng};
use rand::{Rng, distr::Alphanumeric};
use reqwest::{Response, header::HeaderMap};
use reqwest::{
Response,
header::{ACCEPT, CONTENT_TYPE, HeaderMap},
};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;
use ssz::{Decode, Encode};
Expand All @@ -30,7 +42,12 @@ use crate::{
types::{BlsPublicKey, Chain, Jwt, JwtClaims, ModuleId},
};

const APPLICATION_JSON: &str = "application/json";
const APPLICATION_OCTET_STREAM: &str = "application/octet-stream";
const WILDCARD: &str = "*/*";

const MILLIS_PER_SECOND: u64 = 1_000;
pub const CONSENSUS_VERSION_HEADER: &str = "Eth-Consensus-Version";

#[derive(Debug, Error)]
pub enum ResponseReadError {
Expand Down Expand Up @@ -408,6 +425,153 @@ pub fn get_user_agent_with_version(req_headers: &HeaderMap) -> eyre::Result<Head
Ok(HeaderValue::from_str(&format!("commit-boost/{HEADER_VERSION_VALUE} {ua}"))?)
}

/// Parse the ACCEPT header to get the type of response to encode the body with,
/// defaulting to JSON if missing. Returns an error if malformed or unsupported
/// types are requested. Supports requests with multiple ACCEPT headers or
/// headers with multiple media types.
pub fn get_accept_type(req_headers: &HeaderMap) -> eyre::Result<EncodingType> {
let accept = Accept::from_str(
req_headers.get(ACCEPT).and_then(|value| value.to_str().ok()).unwrap_or(APPLICATION_JSON),
)
.map_err(|e| eyre::eyre!("invalid accept header: {e}"))?;

if accept.media_types().count() == 0 {
// No valid media types found, default to JSON
return Ok(EncodingType::Json);
}

// Get the SSZ and JSON media types if present
let mut ssz_type = false;
let mut json_type = false;
let mut unsupported_type = false;
accept.media_types().for_each(|mt| match mt.essence().to_string().as_str() {
APPLICATION_OCTET_STREAM => ssz_type = true,
APPLICATION_JSON | WILDCARD => json_type = true,
_ => unsupported_type = true,
});

// If SSZ is present, prioritize it
if ssz_type {
return Ok(EncodingType::Ssz);
}
// If there aren't any unsupported types, use JSON
if !unsupported_type {
return Ok(EncodingType::Json);
}
Err(eyre::eyre!("unsupported accept type"))
}

/// Parse CONTENT TYPE header to get the encoding type of the body, defaulting
/// to JSON if missing or malformed.
pub fn get_content_type(req_headers: &HeaderMap) -> EncodingType {
EncodingType::from_str(
req_headers
.get(CONTENT_TYPE)
.and_then(|value| value.to_str().ok())
.unwrap_or(APPLICATION_JSON),
)
.unwrap_or(EncodingType::Json)
}

/// Parse CONSENSUS_VERSION header
pub fn get_consensus_version_header(req_headers: &HeaderMap) -> Option<ForkName> {
ForkName::from_str(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

woth double checking this is not case sensitive

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just realized we have our own ForkName, we could also import it from lighthouse

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For case sensitivity, it's defined the way the spec defines it (https://github.com/ethereum/beacon-APIs/blob/672e03e25ace85a3bacaea553fbf374f4f844435/apis/beacon/blocks/blocks.yaml#L21) but Rust will convert everything to lower case regardless.

For ForkName, done in 86fa858.

req_headers
.get(CONSENSUS_VERSION_HEADER)
.and_then(|value| value.to_str().ok())
.unwrap_or(""),
)
.ok()
}

/// Enum for types that can be used to encode incoming request bodies or
/// outgoing response bodies
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum EncodingType {
/// Body is UTF-8 encoded as JSON
Json,

/// Body is raw bytes representing an SSZ object
Ssz,
}

impl std::fmt::Display for EncodingType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
EncodingType::Json => write!(f, "application/json"),
EncodingType::Ssz => write!(f, "application/octet-stream"),
}
}
}

impl FromStr for EncodingType {
type Err = String;
fn from_str(value: &str) -> Result<Self, Self::Err> {
match value {
"application/json" | "" => Ok(EncodingType::Json),
"application/octet-stream" => Ok(EncodingType::Ssz),
_ => Err(format!("unsupported encoding type: {value}")),
}
}
}

pub enum BodyDeserializeError {
SerdeJsonError(serde_json::Error),
SszDecodeError(ssz::DecodeError),
UnsupportedMediaType,
}

impl Display for BodyDeserializeError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
BodyDeserializeError::SerdeJsonError(e) => write!(f, "JSON deserialization error: {e}"),
BodyDeserializeError::SszDecodeError(e) => {
write!(f, "SSZ deserialization error: {e:?}")
}
BodyDeserializeError::UnsupportedMediaType => write!(f, "unsupported media type"),
}
}
}

pub async fn deserialize_body<T>(
headers: &HeaderMap,
body: Bytes,
) -> Result<T, BodyDeserializeError>
where
T: serde::de::DeserializeOwned + ssz::Decode + 'static,
{
if headers.contains_key(CONTENT_TYPE) {
return match get_content_type(headers) {
EncodingType::Json => {
serde_json::from_slice::<T>(&body).map_err(BodyDeserializeError::SerdeJsonError)
}
EncodingType::Ssz => {
T::from_ssz_bytes(&body).map_err(BodyDeserializeError::SszDecodeError)
}
};
}

Err(BodyDeserializeError::UnsupportedMediaType)
}

#[must_use]
#[derive(Debug, Clone, Default)]
pub struct RawRequest {
pub body_bytes: Bytes,
}

impl<S> FromRequest<S> for RawRequest
where
S: Send + Sync,
{
type Rejection = AxumResponse;

async fn from_request(req: Request, _state: &S) -> Result<Self, Self::Rejection> {
let bytes = Bytes::from_request(req, _state).await.map_err(IntoResponse::into_response)?;
Ok(Self { body_bytes: bytes })
}
}

#[cfg(unix)]
pub async fn wait_for_signal() -> eyre::Result<()> {
use tokio::signal::unix::{SignalKind, signal};
Expand Down
1 change: 1 addition & 0 deletions crates/pbs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ axum.workspace = true
axum-extra.workspace = true
cb-common.workspace = true
cb-metrics.workspace = true
ethereum_ssz.workspace = true
eyre.workspace = true
futures.workspace = true
lazy_static.workspace = true
Expand Down
3 changes: 3 additions & 0 deletions crates/pbs/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ pub enum PbsClientError {
NoResponse,
NoPayload,
Internal,
DecodeError(String),
}

impl PbsClientError {
Expand All @@ -14,6 +15,7 @@ impl PbsClientError {
PbsClientError::NoResponse => StatusCode::BAD_GATEWAY,
PbsClientError::NoPayload => StatusCode::BAD_GATEWAY,
PbsClientError::Internal => StatusCode::INTERNAL_SERVER_ERROR,
PbsClientError::DecodeError(_) => StatusCode::BAD_REQUEST,
}
}
}
Expand All @@ -24,6 +26,7 @@ impl IntoResponse for PbsClientError {
PbsClientError::NoResponse => "no response from relays".to_string(),
PbsClientError::NoPayload => "no payload from relays".to_string(),
PbsClientError::Internal => "internal server error".to_string(),
PbsClientError::DecodeError(e) => format!("error decoding request: {e}"),
};

(self.status_code(), msg).into_response()
Expand Down
48 changes: 42 additions & 6 deletions crates/pbs/src/routes/get_header.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
use alloy::primitives::utils::format_ether;
use axum::{
extract::{Path, State},
http::HeaderMap,
http::{HeaderMap, HeaderValue},
response::IntoResponse,
};
use cb_common::{
pbs::GetHeaderParams,
utils::{get_user_agent, ms_into_slot},
pbs::{GetHeaderParams, VersionedResponse},
utils::{
CONSENSUS_VERSION_HEADER, EncodingType, get_accept_type, get_user_agent, ms_into_slot,
},
};
use reqwest::StatusCode;
use reqwest::{StatusCode, header::CONTENT_TYPE};
use ssz::Encode;
use tracing::{error, info};

use crate::{
Expand All @@ -32,16 +35,49 @@ pub async fn handle_get_header<S: BuilderApiState, A: BuilderApi<S>>(

let ua = get_user_agent(&req_headers);
let ms_into_slot = ms_into_slot(params.slot, state.config.chain);
let accept_type = get_accept_type(&req_headers).map_err(|e| {
error!(%e, "error parsing accept header");
PbsClientError::DecodeError(format!("error parsing accept header: {e}"))
});
if let Err(e) = accept_type {
return Ok((StatusCode::BAD_REQUEST, e).into_response());
}
let accept_type = accept_type.unwrap();

info!(ua, ms_into_slot, "new request");

match A::get_header(params, req_headers, state.clone()).await {
Ok(res) => {
if let Some(max_bid) = res {
info!(value_eth = format_ether(max_bid.value()), block_hash =% max_bid.block_hash(), "received header");

BEACON_NODE_STATUS.with_label_values(&["200", GET_HEADER_ENDPOINT_TAG]).inc();
Ok((StatusCode::OK, axum::Json(max_bid)).into_response())
let response = match accept_type {
EncodingType::Ssz => {
let mut res = match &max_bid {
VersionedResponse::Electra(max_bid) => {
(StatusCode::OK, max_bid.as_ssz_bytes()).into_response()
}
};
let Ok(consensus_version_header) = HeaderValue::from_str(max_bid.version())
else {
info!("sending response as JSON");
return Ok((StatusCode::OK, axum::Json(max_bid)).into_response());
};
let Ok(content_type_header) =
HeaderValue::from_str(&format!("{}", EncodingType::Ssz))
else {
info!("sending response as JSON");
return Ok((StatusCode::OK, axum::Json(max_bid)).into_response());
};
res.headers_mut()
.insert(CONSENSUS_VERSION_HEADER, consensus_version_header);
res.headers_mut().insert(CONTENT_TYPE, content_type_header);
info!("sending response as SSZ");
res
}
EncodingType::Json => (StatusCode::OK, axum::Json(max_bid)).into_response(),
};
Ok(response)
} else {
// spec: return 204 if request is valid but no bid available
info!("no header available for slot");
Expand Down
Loading
Loading