diff --git a/crates/consensus/beacon/src/engine/forkchoice.rs b/crates/consensus/beacon/src/engine/forkchoice.rs index 491d0ff8aade..ba09dff6c017 100644 --- a/crates/consensus/beacon/src/engine/forkchoice.rs +++ b/crates/consensus/beacon/src/engine/forkchoice.rs @@ -20,7 +20,7 @@ impl ForkchoiceStateTracker { /// /// If the status is `VALID`, we also update the last valid forkchoice state and set the /// `sync_target` to `None`, since we're now fully synced. - pub(crate) fn set_latest(&mut self, state: ForkchoiceState, status: ForkchoiceStatus) { + pub fn set_latest(&mut self, state: ForkchoiceState, status: ForkchoiceStatus) { if status.is_valid() { self.set_valid(state); } else if status.is_syncing() { diff --git a/crates/consensus/beacon/src/engine/message.rs b/crates/consensus/beacon/src/engine/message.rs index 052b275c181d..f58f620b44ac 100644 --- a/crates/consensus/beacon/src/engine/message.rs +++ b/crates/consensus/beacon/src/engine/message.rs @@ -48,7 +48,7 @@ impl OnForkChoiceUpdated { /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update succeeded and no /// payload attributes were provided. - pub(crate) fn valid(status: PayloadStatus) -> Self { + pub fn valid(status: PayloadStatus) -> Self { Self { forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status), fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))), @@ -57,7 +57,7 @@ impl OnForkChoiceUpdated { /// Creates a new instance of `OnForkChoiceUpdated` with the given payload status, if the /// forkchoice update failed due to an invalid payload. - pub(crate) fn with_invalid(status: PayloadStatus) -> Self { + pub fn with_invalid(status: PayloadStatus) -> Self { Self { forkchoice_status: ForkchoiceStatus::from_payload_status(&status.status), fut: Either::Left(futures::future::ready(Ok(ForkchoiceUpdated::new(status)))), @@ -66,7 +66,7 @@ impl OnForkChoiceUpdated { /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update failed because the /// given state is considered invalid - pub(crate) fn invalid_state() -> Self { + pub fn invalid_state() -> Self { Self { forkchoice_status: ForkchoiceStatus::Invalid, fut: Either::Left(futures::future::ready(Err(ForkchoiceUpdateError::InvalidState))), @@ -75,7 +75,7 @@ impl OnForkChoiceUpdated { /// Creates a new instance of `OnForkChoiceUpdated` if the forkchoice update was successful but /// payload attributes were invalid. - pub(crate) fn invalid_payload_attributes() -> Self { + pub fn invalid_payload_attributes() -> Self { Self { // This is valid because this is only reachable if the state and payload is valid forkchoice_status: ForkchoiceStatus::Valid, @@ -86,7 +86,7 @@ impl OnForkChoiceUpdated { } /// If the forkchoice update was successful and no payload attributes were provided, this method - pub(crate) const fn updated_with_pending_payload_id( + pub const fn updated_with_pending_payload_id( payload_status: PayloadStatus, pending_payload_id: oneshot::Receiver>, ) -> Self { diff --git a/crates/engine/tree/src/tree/mod.rs b/crates/engine/tree/src/tree/mod.rs index 9ac472961033..d5d035e3af19 100644 --- a/crates/engine/tree/src/tree/mod.rs +++ b/crates/engine/tree/src/tree/mod.rs @@ -12,7 +12,7 @@ use reth_blockchain_tree::{ use reth_blockchain_tree_api::{error::InsertBlockError, InsertPayloadOk}; use reth_consensus::{Consensus, PostExecutionInput}; use reth_engine_primitives::EngineTypes; -use reth_errors::{ConsensusError, ProviderResult, RethError}; +use reth_errors::{ConsensusError, ProviderResult}; use reth_evm::execute::{BlockExecutorProvider, Executor}; use reth_payload_primitives::PayloadTypes; use reth_payload_validator::ExecutionPayloadValidator; @@ -188,7 +188,7 @@ pub trait EngineApiTreeHandler { &mut self, state: ForkchoiceState, attrs: Option<::PayloadAttributes>, - ) -> TreeOutcome>; + ) -> ProviderResult>; } /// The outcome of a tree operation. @@ -312,7 +312,8 @@ where FromEngine::Request(request) => match request { BeaconEngineMessage::ForkchoiceUpdated { state, payload_attrs, tx } => { let output = self.on_forkchoice_updated(state, payload_attrs); - if let Err(err) = tx.send(output.outcome) { + if let Err(err) = tx.send(output.map(|o| o.outcome).map_err(Into::into)) + { error!("Failed to send event: {err:?}"); } } @@ -466,6 +467,15 @@ where Ok(Some(status)) } + /// Checks if the given `head` points to an invalid header, which requires a specific response + /// to a forkchoice update. + fn check_invalid_ancestor(&mut self, head: B256) -> ProviderResult> { + // check if the head was previously marked as invalid + let Some(header) = self.state.invalid_headers.get(&head) else { return Ok(None) }; + // populate the latest valid hash field + Ok(Some(self.prepare_invalid_response(header.parent_hash)?)) + } + /// Validate if block is correct and satisfies all the consensus rules that concern the header /// and block body itself. fn validate_block(&self, block: &SealedBlockWithSenders) -> Result<(), ConsensusError> { @@ -570,6 +580,35 @@ where let attachment = BlockAttachment::Canonical; // TODO: remove or revise attachment Ok(InsertPayloadOk::Inserted(BlockStatus::Valid(attachment))) } + + /// Pre-validate forkchoice update and check whether it can be processed. + /// + /// This method returns the update outcome if validation fails or + /// the node is syncing and the update cannot be processed at the moment. + fn pre_validate_forkchoice_update( + &mut self, + state: ForkchoiceState, + ) -> ProviderResult> { + if state.head_block_hash.is_zero() { + return Ok(Some(OnForkChoiceUpdated::invalid_state())) + } + + // check if the new head hash is connected to any ancestor that we previously marked as + // invalid + let lowest_buffered_ancestor_fcu = self.lowest_buffered_ancestor_or(state.head_block_hash); + if let Some(status) = self.check_invalid_ancestor(lowest_buffered_ancestor_fcu)? { + return Ok(Some(OnForkChoiceUpdated::with_invalid(status))) + } + + if self.is_pipeline_active { + // We can only process new forkchoice updates if the pipeline is idle, since it requires + // exclusive access to the database + trace!(target: "consensus::engine", "Pipeline is syncing, skipping forkchoice update"); + return Ok(Some(OnForkChoiceUpdated::syncing())) + } + + Ok(None) + } } impl EngineApiTreeHandler for EngineApiTreeHandlerImpl @@ -699,7 +738,12 @@ where &mut self, state: ForkchoiceState, attrs: Option<::PayloadAttributes>, - ) -> TreeOutcome> { + ) -> ProviderResult> { + if let Some(on_updated) = self.pre_validate_forkchoice_update(state)? { + self.state.forkchoice_state_tracker.set_latest(state, on_updated.forkchoice_status()); + return Ok(TreeOutcome::new(on_updated)) + } + todo!() } }