diff --git a/Cargo.lock b/Cargo.lock index b2e23a26f7e7..2ea7933e2a42 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4870,6 +4870,7 @@ version = "0.1.0" dependencies = [ "jsonrpsee-types", "lru 0.9.0", + "reth-interfaces", "reth-network-api", "reth-primitives", "reth-rlp", diff --git a/crates/rpc/rpc-api/src/engine.rs b/crates/rpc/rpc-api/src/engine.rs index e30da5f3d35b..c8fb3b642db0 100644 --- a/crates/rpc/rpc-api/src/engine.rs +++ b/crates/rpc/rpc-api/src/engine.rs @@ -1,8 +1,8 @@ use jsonrpsee::{core::RpcResult as Result, proc_macros::rpc}; use reth_primitives::{BlockHash, BlockNumber, H64}; use reth_rpc_types::engine::{ - ExecutionPayload, ExecutionPayloadBody, ForkchoiceState, ForkchoiceUpdated, PayloadAttributes, - PayloadStatus, TransitionConfiguration, + ExecutionPayload, ExecutionPayloadBodies, ForkchoiceState, ForkchoiceUpdated, + PayloadAttributes, PayloadStatus, TransitionConfiguration, }; #[cfg_attr(not(feature = "client"), rpc(server))] @@ -50,7 +50,7 @@ pub trait EngineApi { async fn get_payload_bodies_by_hash_v1( &self, block_hashes: Vec, - ) -> Result>; + ) -> Result; /// See also #[method(name = "engine_getPayloadBodiesByRangeV1")] @@ -58,7 +58,7 @@ pub trait EngineApi { &self, start: BlockNumber, count: u64, - ) -> Result>; + ) -> Result; /// See also #[method(name = "engine_exchangeTransitionConfigurationV1")] diff --git a/crates/rpc/rpc-engine-api/src/engine_api.rs b/crates/rpc/rpc-engine-api/src/engine_api.rs index 0b8dd4e8f59f..2fc4d789fb4f 100644 --- a/crates/rpc/rpc-engine-api/src/engine_api.rs +++ b/crates/rpc/rpc-engine-api/src/engine_api.rs @@ -1,4 +1,4 @@ -use crate::{EngineApiError, EngineApiMessage, EngineApiResult}; +use crate::{message::EngineApiMessageVersion, EngineApiError, EngineApiMessage, EngineApiResult}; use futures::StreamExt; use reth_executor::{ executor, @@ -7,16 +7,16 @@ use reth_executor::{ use reth_interfaces::consensus::ForkchoiceState; use reth_primitives::{ proofs::{self, EMPTY_LIST_HASH}, - ChainSpec, Hardfork, Header, SealedBlock, TransactionSigned, H64, U256, + BlockHash, BlockId, BlockNumber, ChainSpec, Hardfork, Header, SealedBlock, TransactionSigned, + H64, U256, }; use reth_provider::{BlockProvider, HeaderProvider, StateProvider}; use reth_rlp::Decodable; use reth_rpc_types::engine::{ - ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, PayloadStatusEnum, - TransitionConfiguration, + ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, + PayloadStatusEnum, TransitionConfiguration, }; use std::{ - collections::HashMap, future::Future, pin::Pin, sync::Arc, @@ -28,6 +28,9 @@ use tokio_stream::wrappers::UnboundedReceiverStream; /// The Engine API response sender pub type EngineApiSender = oneshot::Sender>; +/// The upper limit for payload bodies request. +const MAX_PAYLOAD_BODIES_LIMIT: u64 = 1024; + /// The Engine API implementation that grants the Consensus layer access to data and /// functions in the Execution layer that are crucial for the consensus process. #[must_use = "EngineApi does nothing unless polled."] @@ -37,9 +40,8 @@ pub struct EngineApi { chain_spec: ChainSpec, message_rx: UnboundedReceiverStream, forkchoice_state_tx: watch::Sender, - // TODO: Placeholder for storing future blocks. Make cache bounded. - // Use [lru](https://crates.io/crates/lru) crate - local_store: HashMap, + // TODO: Placeholder for storing future blocks. Make cache bounded. Use lru + // local_store: HashMap, // remote_store: HashMap, } @@ -49,10 +51,35 @@ impl EngineApi { EngineApiMessage::GetPayload(payload_id, tx) => { let _ = tx.send(self.get_payload(payload_id).ok_or(EngineApiError::PayloadUnknown)); } - EngineApiMessage::NewPayload(payload, tx) => { + EngineApiMessage::GetPayloadBodiesByHash(hashes, tx) => { + let _ = tx.send(self.get_payload_bodies_by_hash(hashes)); + } + EngineApiMessage::GetPayloadBodiesByRange(start, count, tx) => { + let _ = tx.send(self.get_payload_bodies_by_range(start, count)); + } + EngineApiMessage::NewPayload(version, payload, tx) => { + if let Err(err) = self.validate_withdrawals_presence( + version, + payload.timestamp.as_u64(), + payload.withdrawals.is_some(), + ) { + let _ = tx.send(Err(err)); + return + } let _ = tx.send(self.new_payload(payload)); } - EngineApiMessage::ForkchoiceUpdated(state, attrs, tx) => { + EngineApiMessage::ForkchoiceUpdated(version, state, attrs, tx) => { + if let Some(attributes) = &attrs { + if let Err(err) = self.validate_withdrawals_presence( + version, + attributes.timestamp.as_u64(), + attributes.withdrawals.is_some(), + ) { + let _ = tx.send(Err(err)); + return + } + } + let _ = tx.send(self.fork_choice_updated(state, attrs)); } EngineApiMessage::ExchangeTransitionConfiguration(config, tx) => { @@ -61,6 +88,35 @@ impl EngineApi { } } + /// Validates the presence of the `withdrawals` field according to the payload timestamp. + /// After Shanghai, withdrawals field must be [Some]. + /// Before Shanghai, withdrawals field must be [None]; + fn validate_withdrawals_presence( + &self, + version: EngineApiMessageVersion, + timestamp: u64, + has_withdrawals: bool, + ) -> EngineApiResult<()> { + let is_shanghai = self.chain_spec.fork(Hardfork::Shanghai).active_at_timestamp(timestamp); + + match version { + EngineApiMessageVersion::V1 => { + if is_shanghai || has_withdrawals { + return Err(EngineApiError::InvalidParams) + } + } + EngineApiMessageVersion::V2 => { + let shanghai_with_no_withdrawals = is_shanghai && !has_withdrawals; + let not_shanghai_with_withdrawals = !is_shanghai && has_withdrawals; + if shanghai_with_no_withdrawals || not_shanghai_with_withdrawals { + return Err(EngineApiError::InvalidParams) + } + } + }; + + Ok(()) + } + /// Try to construct a block from given payload. Perform addition validation of `extra_data` and /// `base_fee_per_gas` fields. /// @@ -130,8 +186,50 @@ impl EngineApi { /// /// NOTE: Will always result in `PayloadUnknown` since we don't support block /// building for now. - pub fn get_payload(&self, payload_id: H64) -> Option { - self.local_store.get(&payload_id).cloned() + pub fn get_payload(&self, _payload_id: H64) -> Option { + None + } + + /// Called to retrieve execution payload bodies by range. + pub fn get_payload_bodies_by_range( + &self, + start: BlockNumber, + count: u64, + ) -> EngineApiResult { + if count > MAX_PAYLOAD_BODIES_LIMIT { + return Err(EngineApiError::PayloadRequestTooLarge { len: count }) + } + + if start == 0 || count == 0 { + return Err(EngineApiError::InvalidParams) + } + + let mut result = Vec::with_capacity(count as usize); + for num in start..start + count { + let block = self.client.block(BlockId::Number(num.into()))?; + result.push(block.map(Into::into)); + } + + Ok(result) + } + + /// Called to retrieve execution payload bodies by hashes. + pub fn get_payload_bodies_by_hash( + &self, + hashes: Vec, + ) -> EngineApiResult { + let len = hashes.len() as u64; + if len > MAX_PAYLOAD_BODIES_LIMIT { + return Err(EngineApiError::PayloadRequestTooLarge { len }) + } + + let mut result = Vec::with_capacity(hashes.len()); + for hash in hashes { + let block = self.client.block(BlockId::Hash(hash.into()))?; + result.push(block.map(Into::into)); + } + + Ok(result) } /// When the Consensus layer receives a new block via the consensus gossip protocol, @@ -321,7 +419,46 @@ mod tests { use reth_interfaces::test_utils::generators::random_block; use reth_primitives::{H256, MAINNET}; use reth_provider::test_utils::MockEthProvider; - use tokio::sync::mpsc::unbounded_channel; + use tokio::sync::{ + mpsc::{unbounded_channel, UnboundedSender}, + watch::Receiver as WatchReceiver, + }; + + fn setup_engine_api() -> (EngineApiTestHandle, EngineApi) { + let chain_spec = MAINNET.clone(); + let client = Arc::new(MockEthProvider::default()); + let (msg_tx, msg_rx) = unbounded_channel(); + let (forkchoice_state_tx, forkchoice_state_rx) = watch::channel(ForkchoiceState::default()); + let api = EngineApi { + client: client.clone(), + chain_spec: chain_spec.clone(), + message_rx: UnboundedReceiverStream::new(msg_rx), + forkchoice_state_tx, + }; + let handle = EngineApiTestHandle { chain_spec, client, msg_tx, forkchoice_state_rx }; + (handle, api) + } + + struct EngineApiTestHandle { + chain_spec: ChainSpec, + client: Arc, + msg_tx: UnboundedSender, + forkchoice_state_rx: WatchReceiver, + } + + impl EngineApiTestHandle { + fn send_message(&self, msg: EngineApiMessage) { + self.msg_tx.send(msg).expect("failed to send engine msg"); + } + + fn forkchoice_state(&self) -> ForkchoiceState { + self.forkchoice_state_rx.borrow().clone() + } + + fn forkchoice_state_has_changed(&self) -> bool { + self.forkchoice_state_rx.has_changed().unwrap() + } + } mod new_payload { use super::*; @@ -350,15 +487,7 @@ mod tests { #[tokio::test] async fn payload_validation() { - let (_msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); - let engine = EngineApi { - client: Arc::new(MockEthProvider::default()), - chain_spec: MAINNET.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; + let (_, api) = setup_engine_api(); let block = random_block(100, Some(H256::random()), Some(3), Some(0)); @@ -367,7 +496,7 @@ mod tests { b.header.extra_data = BytesMut::zeroed(32).freeze().into(); b }); - assert_matches!(engine.try_construct_block(block_with_valid_extra_data.into()), Ok(_)); + assert_matches!(api.try_construct_block(block_with_valid_extra_data.into()), Ok(_)); // Invalid extra data let block_with_invalid_extra_data: Bytes = BytesMut::zeroed(33).freeze(); @@ -376,7 +505,7 @@ mod tests { b }); assert_matches!( - engine.try_construct_block(invalid_extra_data_block.into()), + api.try_construct_block(invalid_extra_data_block.into()), Err(EngineApiError::PayloadExtraData(data)) if data == block_with_invalid_extra_data ); @@ -386,7 +515,7 @@ mod tests { b }); assert_matches!( - engine.try_construct_block(block_with_zero_base_fee.into()), + api.try_construct_block(block_with_zero_base_fee.into()), Err(EngineApiError::PayloadBaseFee(val)) if val == U256::ZERO ); @@ -396,7 +525,7 @@ mod tests { *tx = Bytes::new().into(); }); assert_matches!( - engine.try_construct_block(payload_with_invalid_txs), + api.try_construct_block(payload_with_invalid_txs), Err(EngineApiError::Decode(DecodeError::InputTooShort)) ); @@ -406,7 +535,7 @@ mod tests { b }); assert_matches!( - engine.try_construct_block(block_with_ommers.clone().into()), + api.try_construct_block(block_with_ommers.clone().into()), Err(EngineApiError::PayloadBlockHash { consensus, .. }) if consensus == block_with_ommers.hash() ); @@ -417,7 +546,7 @@ mod tests { b }); assert_matches!( - engine.try_construct_block(block_with_difficulty.clone().into()), + api.try_construct_block(block_with_difficulty.clone().into()), Err(EngineApiError::PayloadBlockHash { consensus, .. }) if consensus == block_with_difficulty.hash() ); @@ -428,135 +557,93 @@ mod tests { b }); assert_matches!( - engine.try_construct_block(block_with_nonce.clone().into()), + api.try_construct_block(block_with_nonce.clone().into()), Err(EngineApiError::PayloadBlockHash { consensus, .. }) if consensus == block_with_nonce.hash() ); // Valid block let valid_block = block; - assert_matches!(engine.try_construct_block(valid_block.into()), Ok(_)); + assert_matches!(api.try_construct_block(valid_block.into()), Ok(_)); } #[tokio::test] async fn payload_known() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); - let client = Arc::new(MockEthProvider::default()); - let engine = EngineApi { - client: client.clone(), - chain_spec: MAINNET.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; - - tokio::spawn(engine); + let (handle, api) = setup_engine_api(); + tokio::spawn(api); let block = random_block(100, Some(H256::random()), None, Some(0)); // payload must have no ommers let block_hash = block.hash(); let execution_payload = block.clone().into(); - client.add_header(block_hash, block.header.unseal()); + handle.client.add_header(block_hash, block.header.unseal()); let (result_tx, result_rx) = oneshot::channel(); - msg_tx - .send(EngineApiMessage::NewPayload(execution_payload, result_tx)) - .expect("failed to send engine msg"); + handle.send_message(EngineApiMessage::NewPayload( + EngineApiMessageVersion::V1, + execution_payload, + result_tx, + )); - let result = result_rx.await; - assert_matches!(result, Ok(Ok(_))); let expected_result = PayloadStatus::new(PayloadStatusEnum::Valid, block_hash); - assert_eq!(result.unwrap().unwrap(), expected_result); + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); } #[tokio::test] async fn payload_parent_unknown() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); - let engine = EngineApi { - client: Arc::new(MockEthProvider::default()), - chain_spec: MAINNET.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; - - tokio::spawn(engine); + let (handle, api) = setup_engine_api(); + tokio::spawn(api); let (result_tx, result_rx) = oneshot::channel(); let block = random_block(100, Some(H256::random()), None, Some(0)); // payload must have no ommers - msg_tx - .send(EngineApiMessage::NewPayload(block.into(), result_tx)) - .expect("failed to send engine msg"); + handle.send_message(EngineApiMessage::NewPayload( + EngineApiMessageVersion::V1, + block.into(), + result_tx, + )); - let result = result_rx.await; - assert_matches!(result, Ok(Ok(_))); let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Syncing); - assert_eq!(result.unwrap().unwrap(), expected_result); + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); } #[tokio::test] async fn payload_pre_merge() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); - let chain_spec = MAINNET.clone(); - let client = Arc::new(MockEthProvider::default()); - let engine = EngineApi { - client: client.clone(), - chain_spec: chain_spec.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; - - tokio::spawn(engine); + let (handle, api) = setup_engine_api(); + tokio::spawn(api); - let (result_tx, result_rx) = oneshot::channel(); let parent = transform_block(random_block(100, None, None, Some(0)), |mut b| { b.header.difficulty = - chain_spec.fork(Hardfork::Paris).ttd().unwrap() - U256::from(1); + handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap() - U256::from(1); b }); let block = random_block(101, Some(parent.hash()), None, Some(0)); - client.add_block(parent.hash(), parent.clone().unseal()); + handle.client.add_block(parent.hash(), parent.clone().unseal()); - msg_tx - .send(EngineApiMessage::NewPayload(block.clone().into(), result_tx)) - .expect("failed to send engine msg"); + let (result_tx, result_rx) = oneshot::channel(); + handle.send_message(EngineApiMessage::NewPayload( + EngineApiMessageVersion::V1, + block.clone().into(), + result_tx, + )); - let result = result_rx.await; - assert_matches!(result, Ok(Ok(_))); let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid { validation_error: EngineApiError::PayloadPreMerge.to_string(), }); - assert_eq!(result.unwrap().unwrap(), expected_result); + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); } #[tokio::test] async fn invalid_payload_timestamp() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); - let chain_spec = MAINNET.clone(); - let client = Arc::new(MockEthProvider::default()); - let engine = EngineApi { - client: client.clone(), - chain_spec: chain_spec.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; - - tokio::spawn(engine); + let (handle, api) = setup_engine_api(); + tokio::spawn(api); - let (result_tx, result_rx) = oneshot::channel(); let block_timestamp = 100; let parent_timestamp = block_timestamp + 10; let parent = transform_block(random_block(100, None, None, Some(0)), |mut b| { b.header.timestamp = parent_timestamp; b.header.difficulty = - chain_spec.fork(Hardfork::Paris).ttd().unwrap() + U256::from(1); + handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap() + U256::from(1); b }); let block = @@ -565,14 +652,15 @@ mod tests { b }); - client.add_block(parent.hash(), parent.clone().unseal()); + handle.client.add_block(parent.hash(), parent.clone().unseal()); - msg_tx - .send(EngineApiMessage::NewPayload(block.clone().into(), result_tx)) - .expect("failed to send engine msg"); + let (result_tx, result_rx) = oneshot::channel(); + handle.send_message(EngineApiMessage::NewPayload( + EngineApiMessageVersion::V1, + block.clone().into(), + result_tx, + )); - let result = result_rx.await; - assert_matches!(result, Ok(Ok(_))); let expected_result = PayloadStatus::from_status(PayloadStatusEnum::Invalid { validation_error: EngineApiError::PayloadTimestamp { invalid: block_timestamp, @@ -580,7 +668,7 @@ mod tests { } .to_string(), }); - assert_eq!(result.unwrap().unwrap(), expected_result); + assert_matches!( result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); } // TODO: add execution tests @@ -593,115 +681,187 @@ mod tests { #[tokio::test] async fn payload_unknown() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); - let engine = EngineApi { - client: Arc::new(MockEthProvider::default()), - chain_spec: MAINNET.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; - - tokio::spawn(engine); + let (handle, api) = setup_engine_api(); + tokio::spawn(api); let payload_id = H64::random(); - let (result_tx, result_rx) = oneshot::channel(); - msg_tx - .send(EngineApiMessage::GetPayload(payload_id, result_tx)) - .expect("failed to send engine msg"); + handle.send_message(EngineApiMessage::GetPayload(payload_id, result_tx)); assert_matches!(result_rx.await, Ok(Err(EngineApiError::PayloadUnknown))); } } - mod fork_choice_updated { - use reth_interfaces::test_utils::generators::random_header; - + // tests covering `engine_getPayloadBodiesByRange` and `engine_getPayloadBodiesByHash` + mod get_payload_bodies { use super::*; + use reth_interfaces::test_utils::generators::random_block_range; #[tokio::test] - async fn empty_head() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default()); - let engine = EngineApi { - client: Arc::new(MockEthProvider::default()), - chain_spec: MAINNET.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; + async fn invalid_params() { + let (handle, api) = setup_engine_api(); + tokio::spawn(api); + + let by_range_tests = [ + // (start, count) + (0, 0), + (0, 1), + (1, 0), + ]; + + // test [EngineApiMessage::GetPayloadBodiesByRange] + for (start, count) in by_range_tests { + let (result_tx, result_rx) = oneshot::channel(); + handle.send_message(EngineApiMessage::GetPayloadBodiesByRange( + start, count, result_tx, + )); + assert_matches!(result_rx.await, Ok(Err(EngineApiError::InvalidParams))); + } + } + + #[tokio::test] + async fn request_too_large() { + let (handle, api) = setup_engine_api(); + tokio::spawn(api); - tokio::spawn(engine); + let request_count = MAX_PAYLOAD_BODIES_LIMIT + 1; let (result_tx, result_rx) = oneshot::channel(); - msg_tx - .send(EngineApiMessage::ForkchoiceUpdated( - ForkchoiceState::default(), - None, - result_tx, - )) - .expect("failed to send engine msg"); - - let result = result_rx.await; - assert_matches!(result, Ok(Ok(_))); - assert_eq!( - result.unwrap().unwrap(), - ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { - validation_error: EngineApiError::ForkchoiceEmptyHead.to_string(), - }) + handle.send_message(EngineApiMessage::GetPayloadBodiesByRange( + 0, + request_count, + result_tx, + )); + assert_matches!( + result_rx.await, + Ok(Err(EngineApiError::PayloadRequestTooLarge { .. })) ); - assert!(!tip_rx.has_changed().unwrap()); + + let (result_tx, result_rx) = oneshot::channel(); + let hashes = std::iter::repeat(H256::default()).take(request_count as usize).collect(); + handle.send_message(EngineApiMessage::GetPayloadBodiesByHash(hashes, result_tx)); + assert_matches!(result_rx.await, Ok(Err(EngineApiError::PayloadRequestTooLarge { .. }))) } #[tokio::test] - async fn unknown_head_hash() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default()); - let engine = EngineApi { - client: Arc::new(MockEthProvider::default()), - chain_spec: MAINNET.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; + async fn returns_payload_bodies() { + let (handle, api) = setup_engine_api(); + tokio::spawn(api); - tokio::spawn(engine); + let (start, count) = (1, 10); + let blocks = random_block_range(start..start + count, H256::default(), 0..2); + handle.client.extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.unseal()))); - let state = ForkchoiceState { head_block_hash: H256::random(), ..Default::default() }; + let expected = + blocks.iter().cloned().map(|b| Some(b.unseal().into())).collect::>(); let (result_tx, result_rx) = oneshot::channel(); - msg_tx - .send(EngineApiMessage::ForkchoiceUpdated(state, None, result_tx)) - .expect("failed to send engine msg"); - - let result = result_rx.await; - assert_matches!(result, Ok(Ok(_))); - assert_eq!( - result.unwrap().unwrap(), - ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing) + handle.send_message(EngineApiMessage::GetPayloadBodiesByRange(start, count, result_tx)); + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected)); + + let (result_tx, result_rx) = oneshot::channel(); + let hashes = blocks.iter().map(|b| b.hash()).collect(); + handle.send_message(EngineApiMessage::GetPayloadBodiesByHash(hashes, result_tx)); + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected)); + } + + #[tokio::test] + async fn returns_payload_bodies_with_gaps() { + let (handle, api) = setup_engine_api(); + tokio::spawn(api); + + let (start, count) = (1, 100); + let blocks = random_block_range(start..start + count, H256::default(), 0..2); + + // Insert only blocks in ranges 1-25 and 50-75 + let first_missing_range = 26..=50; + let second_missing_range = 76..=100; + handle.client.extend_blocks( + blocks + .iter() + .filter(|b| { + !first_missing_range.contains(&b.number) && + !second_missing_range.contains(&b.number) + }) + .map(|b| (b.hash(), b.clone().unseal())), ); - assert!(!tip_rx.has_changed().unwrap()); + + let expected = blocks + .iter() + .cloned() + .map(|b| { + if first_missing_range.contains(&b.number) || + second_missing_range.contains(&b.number) + { + None + } else { + Some(b.unseal().into()) + } + }) + .collect::>(); + + let (result_tx, result_rx) = oneshot::channel(); + handle.send_message(EngineApiMessage::GetPayloadBodiesByRange(start, count, result_tx)); + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected)); + + let (result_tx, result_rx) = oneshot::channel(); + let hashes = blocks.iter().map(|b| b.hash()).collect(); + handle.send_message(EngineApiMessage::GetPayloadBodiesByHash(hashes, result_tx)); + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected)); + } + } + + mod fork_choice_updated { + use super::*; + use reth_interfaces::test_utils::generators::random_header; + + #[tokio::test] + async fn empty_head() { + let (handle, api) = setup_engine_api(); + tokio::spawn(api); + + let (result_tx, result_rx) = oneshot::channel(); + handle.send_message(EngineApiMessage::ForkchoiceUpdated( + EngineApiMessageVersion::V1, + ForkchoiceState::default(), + None, + result_tx, + )); + + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Invalid { + validation_error: EngineApiError::ForkchoiceEmptyHead.to_string(), + }); + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert!(!handle.forkchoice_state_has_changed()); } #[tokio::test] - async fn unknown_finalized_hash() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default()); - let client = Arc::new(MockEthProvider::default()); - let engine = EngineApi { - client: client.clone(), - chain_spec: MAINNET.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; + async fn unknown_head_hash() { + let (handle, api) = setup_engine_api(); + tokio::spawn(api); - tokio::spawn(engine); + let state = ForkchoiceState { head_block_hash: H256::random(), ..Default::default() }; + + let (result_tx, result_rx) = oneshot::channel(); + handle.send_message(EngineApiMessage::ForkchoiceUpdated( + EngineApiMessageVersion::V1, + state, + None, + result_tx, + )); + + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert!(!handle.forkchoice_state_has_changed()); + } + + #[tokio::test] + async fn unknown_finalized_hash() { + let (handle, api) = setup_engine_api(); + tokio::spawn(api); let head = random_header(100, None); - client.add_header(head.hash(), head.clone().unseal()); + handle.client.add_header(head.hash(), head.clone().unseal()); let state = ForkchoiceState { head_block_hash: head.hash(), @@ -710,37 +870,26 @@ mod tests { }; let (result_tx, result_rx) = oneshot::channel(); - msg_tx - .send(EngineApiMessage::ForkchoiceUpdated(state, None, result_tx)) - .expect("failed to send engine msg"); - - let result = result_rx.await; - assert_matches!(result, Ok(Ok(_))); - assert_eq!( - result.unwrap().unwrap(), - ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing) - ); - assert!(!tip_rx.has_changed().unwrap()); + handle.send_message(EngineApiMessage::ForkchoiceUpdated( + EngineApiMessageVersion::V1, + state, + None, + result_tx, + )); + + let expected_result = ForkchoiceUpdated::from_status(PayloadStatusEnum::Syncing); + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); + assert!(!handle.forkchoice_state_has_changed()); } #[tokio::test] async fn forkchoice_state_is_updated() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, tip_rx) = watch::channel(ForkchoiceState::default()); - let client = Arc::new(MockEthProvider::default()); - let engine = EngineApi { - client: client.clone(), - chain_spec: MAINNET.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; - - tokio::spawn(engine); + let (handle, api) = setup_engine_api(); + tokio::spawn(api); let finalized = random_header(90, None); let head = random_header(100, None); - client.extend_headers([ + handle.client.extend_headers([ (head.hash(), head.clone().unseal()), (finalized.hash(), finalized.clone().unseal()), ]); @@ -752,25 +901,24 @@ mod tests { }; let (result_tx, result_rx) = oneshot::channel(); - msg_tx - .send(EngineApiMessage::ForkchoiceUpdated(state.clone(), None, result_tx)) - .expect("failed to send engine msg"); - - let result = result_rx.await; - assert_matches!(result, Ok(Ok(_))); - assert_eq!( - result.unwrap().unwrap(), - ForkchoiceUpdated { - payload_id: None, - payload_status: PayloadStatus { - status: PayloadStatusEnum::Valid, - latest_valid_hash: Some(head.hash()) - } - } - ); + handle.send_message(EngineApiMessage::ForkchoiceUpdated( + EngineApiMessageVersion::V1, + state.clone(), + None, + result_tx, + )); + + let expected_result = ForkchoiceUpdated { + payload_id: None, + payload_status: PayloadStatus { + status: PayloadStatusEnum::Valid, + latest_valid_hash: Some(head.hash()), + }, + }; + assert_matches!(result_rx.await, Ok(Ok(result)) => assert_eq!(result, expected_result)); - assert!(tip_rx.has_changed().unwrap()); - assert_eq!(tip_rx.borrow().clone(), state); + assert!(handle.forkchoice_state_has_changed()); + assert_eq!(handle.forkchoice_state(), state); } } @@ -780,144 +928,98 @@ mod tests { #[tokio::test] async fn terminal_td_mismatch() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); - let chain_spec = MAINNET.clone(); - let engine = EngineApi { - client: Arc::new(MockEthProvider::default()), - chain_spec: chain_spec.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; - - tokio::spawn(engine); + let (handle, api) = setup_engine_api(); + tokio::spawn(api); let transition_config = TransitionConfiguration { - terminal_total_difficulty: chain_spec.fork(Hardfork::Paris).ttd().unwrap() + + terminal_total_difficulty: handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap() + U256::from(1), ..Default::default() }; let (result_tx, result_rx) = oneshot::channel(); - msg_tx - .send(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )) - .expect("failed to send engine msg"); - - assert_matches!( - result_rx.await, - Ok(Err(EngineApiError::TerminalTD { execution, consensus })) - if execution == chain_spec.fork(Hardfork::Paris).ttd().unwrap() - && consensus == U256::from(transition_config.terminal_total_difficulty) - ); + handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration( + transition_config.clone(), + result_tx, + )); + + let expected_error = EngineApiError::TerminalTD { + execution: handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap(), + consensus: U256::from(transition_config.terminal_total_difficulty), + }; + assert_matches!(result_rx.await, Ok(Err(error)) => assert_eq!(error, expected_error)); } #[tokio::test] async fn terminal_block_hash_mismatch() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); - let client = Arc::new(MockEthProvider::default()); - let chain_spec = MAINNET.clone(); - let engine = EngineApi { - client: client.clone(), - chain_spec: chain_spec.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; - - tokio::spawn(engine); + let (handle, api) = setup_engine_api(); + tokio::spawn(api); let terminal_block_number = 1000; let consensus_terminal_block = random_block(terminal_block_number, None, None, None); let execution_terminal_block = random_block(terminal_block_number, None, None, None); let transition_config = TransitionConfiguration { - terminal_total_difficulty: chain_spec.fork(Hardfork::Paris).ttd().unwrap(), + terminal_total_difficulty: handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap(), terminal_block_hash: consensus_terminal_block.hash(), terminal_block_number: terminal_block_number.into(), }; // Unknown block number let (result_tx, result_rx) = oneshot::channel(); - msg_tx - .send(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )) - .expect("failed to send engine msg"); - - assert_matches!( - result_rx.await, - Ok(Err(EngineApiError::TerminalBlockHash { execution, consensus })) - if execution.is_none() - && consensus == transition_config.terminal_block_hash - ); + handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration( + transition_config.clone(), + result_tx, + )); + + let expected_error = EngineApiError::TerminalBlockHash { + execution: None, + consensus: transition_config.terminal_block_hash, + }; + assert_matches!(result_rx.await, Ok(Err(error)) => assert_eq!(error, expected_error)); // Add block and to provider local store and test for mismatch - client.add_block( + handle.client.add_block( execution_terminal_block.hash(), execution_terminal_block.clone().unseal(), ); let (result_tx, result_rx) = oneshot::channel(); - msg_tx - .send(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )) - .expect("failed to send engine msg"); - - assert_matches!( - result_rx.await, - Ok(Err(EngineApiError::TerminalBlockHash { execution, consensus })) - if execution == Some(execution_terminal_block.hash()) - && consensus == transition_config.terminal_block_hash - ); + handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration( + transition_config.clone(), + result_tx, + )); + + let expected_error = EngineApiError::TerminalBlockHash { + execution: Some(execution_terminal_block.hash()), + consensus: transition_config.terminal_block_hash, + }; + assert_matches!(result_rx.await, Ok(Err(error)) => assert_eq!(error, expected_error)); } #[tokio::test] async fn configurations_match() { - let (msg_tx, msg_rx) = unbounded_channel(); - let (tip_tx, _tip_rx) = watch::channel(ForkchoiceState::default()); - let client = Arc::new(MockEthProvider::default()); - let chain_spec = MAINNET.clone(); - let engine = EngineApi { - client: client.clone(), - chain_spec: chain_spec.clone(), - local_store: Default::default(), - message_rx: UnboundedReceiverStream::new(msg_rx), - forkchoice_state_tx: tip_tx, - }; - - tokio::spawn(engine); + let (handle, api) = setup_engine_api(); + tokio::spawn(api); let terminal_block_number = 1000; let terminal_block = random_block(terminal_block_number, None, None, None); let transition_config = TransitionConfiguration { - terminal_total_difficulty: chain_spec.fork(Hardfork::Paris).ttd().unwrap(), + terminal_total_difficulty: handle.chain_spec.fork(Hardfork::Paris).ttd().unwrap(), terminal_block_hash: terminal_block.hash(), terminal_block_number: terminal_block_number.into(), }; - client.add_block(terminal_block.hash(), terminal_block.clone().unseal()); + handle.client.add_block(terminal_block.hash(), terminal_block.clone().unseal()); let (result_tx, result_rx) = oneshot::channel(); - msg_tx - .send(EngineApiMessage::ExchangeTransitionConfiguration( - transition_config.clone(), - result_tx, - )) - .expect("failed to send engine msg"); + handle.send_message(EngineApiMessage::ExchangeTransitionConfiguration( + transition_config.clone(), + result_tx, + )); - assert_matches!( - result_rx.await, - Ok(Ok(config)) if config == transition_config - ); + assert_matches!(result_rx.await, Ok(Ok(config)) => assert_eq!(config, transition_config)); } } } diff --git a/crates/rpc/rpc-engine-api/src/error.rs b/crates/rpc/rpc-engine-api/src/error.rs index 86eab5d7886f..49c7c1afce1f 100644 --- a/crates/rpc/rpc-engine-api/src/error.rs +++ b/crates/rpc/rpc-engine-api/src/error.rs @@ -4,8 +4,13 @@ use thiserror::Error; /// The Engine API result type pub type EngineApiResult = Result; +/// Payload unknown error code. +pub const UNKNOWN_PAYLOAD_CODE: i32 = -38001; +/// Request too large error code. +pub const REQUEST_TOO_LARGE_CODE: i32 = -38004; + /// Error returned by [`EngineApi`][crate::EngineApi] -#[derive(Error, Debug)] +#[derive(Error, PartialEq, Debug)] pub enum EngineApiError { /// Invalid payload extra data. #[error("Invalid payload extra data: {0}")] @@ -41,6 +46,15 @@ pub enum EngineApiError { /// Unknown payload requested. #[error("Unknown payload")] PayloadUnknown, + /// The payload body request length is too large. + #[error("Payload request too large: {len}")] + PayloadRequestTooLarge { + /// The length that was requested. + len: u64, + }, + /// The params are invalid. + #[error("Invalid params")] + InvalidParams, /// Terminal total difficulty mismatch during transition configuration exchange. #[error( "Invalid transition terminal total difficulty. Execution: {execution}. Consensus: {consensus}" diff --git a/crates/rpc/rpc-engine-api/src/lib.rs b/crates/rpc/rpc-engine-api/src/lib.rs index e7cadc0b06c1..daeab9bb2f88 100644 --- a/crates/rpc/rpc-engine-api/src/lib.rs +++ b/crates/rpc/rpc-engine-api/src/lib.rs @@ -9,14 +9,14 @@ //! [Read more](https://github.com/ethereum/execution-apis/tree/main/src/engine). /// The Engine API implementation. -pub mod engine_api; +mod engine_api; /// The Engine API message type. -pub mod message; +mod message; /// Engine API error. -pub mod error; +mod error; pub use engine_api::{EngineApi, EngineApiSender}; -pub use error::{EngineApiError, EngineApiResult}; -pub use message::EngineApiMessage; +pub use error::*; +pub use message::{EngineApiMessage, EngineApiMessageVersion}; diff --git a/crates/rpc/rpc-engine-api/src/message.rs b/crates/rpc/rpc-engine-api/src/message.rs index 5f5f123de887..2f6707ced10b 100644 --- a/crates/rpc/rpc-engine-api/src/message.rs +++ b/crates/rpc/rpc-engine-api/src/message.rs @@ -1,19 +1,25 @@ use crate::EngineApiSender; use reth_interfaces::consensus::ForkchoiceState; -use reth_primitives::H64; +use reth_primitives::{BlockHash, BlockNumber, H64}; use reth_rpc_types::engine::{ - ExecutionPayload, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, TransitionConfiguration, + ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, + TransitionConfiguration, }; /// Message type for communicating with [`EngineApi`][crate::EngineApi]. #[derive(Debug)] pub enum EngineApiMessage { /// New payload message - NewPayload(ExecutionPayload, EngineApiSender), + NewPayload(EngineApiMessageVersion, ExecutionPayload, EngineApiSender), /// Get payload message GetPayload(H64, EngineApiSender), + /// Get payload bodies by range message + GetPayloadBodiesByRange(BlockNumber, u64, EngineApiSender), + /// Get payload bodies by hash message + GetPayloadBodiesByHash(Vec, EngineApiSender), /// Forkchoice updated message ForkchoiceUpdated( + EngineApiMessageVersion, ForkchoiceState, Option, EngineApiSender, @@ -24,3 +30,12 @@ pub enum EngineApiMessage { EngineApiSender, ), } + +/// The version of Engine API message. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum EngineApiMessageVersion { + /// Version 1 + V1, + /// Version 2 + V2, +} diff --git a/crates/rpc/rpc-types/Cargo.toml b/crates/rpc/rpc-types/Cargo.toml index 13996c5266df..d3d5a9d1c18a 100644 --- a/crates/rpc/rpc-types/Cargo.toml +++ b/crates/rpc/rpc-types/Cargo.toml @@ -25,3 +25,6 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" jsonrpsee-types = { version = "0.16" } lru = "0.9" + +[dev-dependencies] +reth-interfaces = { path = "../../interfaces", features = ["test-utils"] } diff --git a/crates/rpc/rpc-types/src/eth/engine.rs b/crates/rpc/rpc-types/src/eth/engine.rs index a536d86125b8..4d0d7667bb14 100644 --- a/crates/rpc/rpc-types/src/eth/engine.rs +++ b/crates/rpc/rpc-types/src/eth/engine.rs @@ -3,7 +3,7 @@ #![allow(missing_docs)] use reth_primitives::{ - bytes::BytesMut, Address, Bloom, Bytes, SealedBlock, Withdrawal, H256, H64, U256, U64, + Address, Block, Bloom, Bytes, SealedBlock, Withdrawal, H256, H64, U256, U64, }; use reth_rlp::Encodable; use serde::{Deserialize, Serialize}; @@ -40,9 +40,9 @@ impl From for ExecutionPayload { .body .iter() .map(|tx| { - let mut encoded = BytesMut::new(); + let mut encoded = Vec::new(); tx.encode(&mut encoded); - encoded.freeze().into() + encoded.into() }) .collect(); ExecutionPayload { @@ -70,10 +70,27 @@ impl From for ExecutionPayload { /// See also: #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] pub struct ExecutionPayloadBody { - transactions: Vec, - withdrawals: Vec, + pub transactions: Vec, + pub withdrawals: Vec, +} + +impl From for ExecutionPayloadBody { + fn from(value: Block) -> Self { + let transactions = value.body.into_iter().map(|tx| { + let mut out = Vec::new(); + tx.encode(&mut out); + out.into() + }); + ExecutionPayloadBody { + transactions: transactions.collect(), + withdrawals: value.withdrawals.unwrap_or_default(), + } + } } +/// The execution payload body response that allows for `null` values. +pub type ExecutionPayloadBodies = Vec>; + /// This structure encapsulates the fork choice state #[derive(Default, Clone, Debug, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] @@ -171,3 +188,30 @@ impl ForkchoiceUpdated { self } } + +#[cfg(test)] +mod tests { + use super::*; + use reth_interfaces::test_utils::generators::random_block_range; + use reth_primitives::{TransactionSigned, H256}; + use reth_rlp::Decodable; + + #[test] + fn payload_body_roundtrip() { + for block in random_block_range(0..100, H256::default(), 0..2) { + let unsealed = block.clone().unseal(); + let payload_body: ExecutionPayloadBody = unsealed.into(); + + assert_eq!( + Ok(block.body), + payload_body + .transactions + .iter() + .map(|x| TransactionSigned::decode(&mut &x[..])) + .collect::, _>>(), + ); + + assert_eq!(block.withdrawals.unwrap_or_default(), payload_body.withdrawals); + } + } +} diff --git a/crates/rpc/rpc/src/engine/mod.rs b/crates/rpc/rpc/src/engine/mod.rs index a2d7aea28738..e7c492d9638a 100644 --- a/crates/rpc/rpc/src/engine/mod.rs +++ b/crates/rpc/rpc/src/engine/mod.rs @@ -1,12 +1,18 @@ -use crate::result::{internal_rpc_err, rpc_err}; +use crate::result::rpc_err; use async_trait::async_trait; -use jsonrpsee::core::{Error, RpcResult as Result}; +use jsonrpsee::{ + core::{Error, RpcResult as Result}, + types::error::INVALID_PARAMS_CODE, +}; use reth_interfaces::consensus::ForkchoiceState; use reth_primitives::{BlockHash, BlockNumber, H64}; use reth_rpc_api::EngineApiServer; -use reth_rpc_engine_api::{EngineApiError, EngineApiMessage, EngineApiResult}; +use reth_rpc_engine_api::{ + EngineApiError, EngineApiMessage, EngineApiMessageVersion, EngineApiResult, + REQUEST_TOO_LARGE_CODE, UNKNOWN_PAYLOAD_CODE, +}; use reth_rpc_types::engine::{ - ExecutionPayload, ExecutionPayloadBody, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, + ExecutionPayload, ExecutionPayloadBodies, ForkchoiceUpdated, PayloadAttributes, PayloadStatus, TransitionConfiguration, }; use tokio::sync::{ @@ -35,7 +41,9 @@ impl EngineApi { let _ = self.engine_tx.send(msg); rx.await.map_err(|err| Error::Custom(err.to_string()))?.map_err(|err| { let code = match err { - EngineApiError::PayloadUnknown => -38001, + EngineApiError::InvalidParams => INVALID_PARAMS_CODE, + EngineApiError::PayloadUnknown => UNKNOWN_PAYLOAD_CODE, + EngineApiError::PayloadRequestTooLarge { .. } => REQUEST_TOO_LARGE_CODE, // Any other server error _ => jsonrpsee::types::error::INTERNAL_ERROR_CODE, }; @@ -50,13 +58,21 @@ impl EngineApiServer for EngineApi { /// Caution: This should not accept the `withdrawals` field async fn new_payload_v1(&self, payload: ExecutionPayload) -> Result { let (tx, rx) = oneshot::channel(); - self.delegate_request(EngineApiMessage::NewPayload(payload, tx), rx).await + self.delegate_request( + EngineApiMessage::NewPayload(EngineApiMessageVersion::V1, payload, tx), + rx, + ) + .await } /// See also - async fn new_payload_v2(&self, _payload: ExecutionPayload) -> Result { - // TODO: - Err(internal_rpc_err("unimplemented")) + async fn new_payload_v2(&self, payload: ExecutionPayload) -> Result { + let (tx, rx) = oneshot::channel(); + self.delegate_request( + EngineApiMessage::NewPayload(EngineApiMessageVersion::V2, payload, tx), + rx, + ) + .await } /// See also @@ -69,7 +85,12 @@ impl EngineApiServer for EngineApi { ) -> Result { let (tx, rx) = oneshot::channel(); self.delegate_request( - EngineApiMessage::ForkchoiceUpdated(fork_choice_state, payload_attributes, tx), + EngineApiMessage::ForkchoiceUpdated( + EngineApiMessageVersion::V1, + fork_choice_state, + payload_attributes, + tx, + ), rx, ) .await @@ -78,11 +99,20 @@ impl EngineApiServer for EngineApi { /// See also async fn fork_choice_updated_v2( &self, - _fork_choice_state: ForkchoiceState, - _payload_attributes: Option, + fork_choice_state: ForkchoiceState, + payload_attributes: Option, ) -> Result { - // TODO: - Err(internal_rpc_err("unimplemented")) + let (tx, rx) = oneshot::channel(); + self.delegate_request( + EngineApiMessage::ForkchoiceUpdated( + EngineApiMessageVersion::V2, + fork_choice_state, + payload_attributes, + tx, + ), + rx, + ) + .await } /// See also @@ -94,28 +124,28 @@ impl EngineApiServer for EngineApi { } /// See also - async fn get_payload_v2(&self, _payload_id: H64) -> Result { - // TODO: - Err(internal_rpc_err("unimplemented")) + async fn get_payload_v2(&self, payload_id: H64) -> Result { + let (tx, rx) = oneshot::channel(); + self.delegate_request(EngineApiMessage::GetPayload(payload_id, tx), rx).await } /// See also async fn get_payload_bodies_by_hash_v1( &self, - _block_hashes: Vec, - ) -> Result> { - // TODO: - Err(internal_rpc_err("unimplemented")) + block_hashes: Vec, + ) -> Result { + let (tx, rx) = oneshot::channel(); + self.delegate_request(EngineApiMessage::GetPayloadBodiesByHash(block_hashes, tx), rx).await } /// See also async fn get_payload_bodies_by_range_v1( &self, - _start: BlockNumber, - _count: u64, - ) -> Result> { - // TODO: - Err(internal_rpc_err("unimplemented")) + start: BlockNumber, + count: u64, + ) -> Result { + let (tx, rx) = oneshot::channel(); + self.delegate_request(EngineApiMessage::GetPayloadBodiesByRange(start, count, tx), rx).await } /// See also diff --git a/crates/storage/provider/src/traits/header.rs b/crates/storage/provider/src/traits/header.rs index 347c9713a639..720ed3011114 100644 --- a/crates/storage/provider/src/traits/header.rs +++ b/crates/storage/provider/src/traits/header.rs @@ -28,6 +28,6 @@ pub trait HeaderProvider: Send + Sync { /// Get total difficulty by block hash. fn header_td(&self, hash: &BlockHash) -> Result>; - /// Get headers in range of block hashes or numbers + /// Get headers in range of block numbers fn headers_range(&self, range: impl RangeBounds) -> Result>; }