From 42dc1ed04b432ebc5e8604cab31eacec7d73883a Mon Sep 17 00:00:00 2001 From: Tien Dao <15717476+tiendn@users.noreply.github.com> Date: Tue, 28 Jan 2025 19:32:21 +0700 Subject: [PATCH] docs(exex): Expand documentation with examples, assumptions, and invariants. (#13581) Co-authored-by: Matthias Seitz --- crates/exex/exex/src/context.rs | 20 +++++++++-- crates/exex/exex/src/lib.rs | 60 +++++++++++++++++++++++++++++++-- crates/exex/types/src/head.rs | 7 ++++ 3 files changed, 81 insertions(+), 6 deletions(-) diff --git a/crates/exex/exex/src/context.rs b/crates/exex/exex/src/context.rs index e48abed6dc51..8e29a7abaf7f 100644 --- a/crates/exex/exex/src/context.rs +++ b/crates/exex/exex/src/context.rs @@ -6,9 +6,11 @@ use reth_node_core::node_config::NodeConfig; use reth_provider::BlockReader; use reth_tasks::TaskExecutor; use std::fmt::Debug; -use tokio::sync::mpsc::UnboundedSender; +use tokio::sync::mpsc::{error::SendError, UnboundedSender}; /// Captures the context that an `ExEx` has access to. +/// +/// This type wraps various node components that the `ExEx` has access to. pub struct ExExContext { /// The current head of the blockchain at launch. pub head: BlockNumHash, @@ -103,6 +105,8 @@ where } /// Returns the task executor. + /// + /// This type should be used to spawn (critical) tasks. pub fn task_executor(&self) -> &TaskExecutor { self.components.task_executor() } @@ -118,16 +122,26 @@ where pub fn set_notifications_with_head(&mut self, head: ExExHead) { self.notifications.set_with_head(head); } + + /// Sends an [`ExExEvent::FinishedHeight`] to the ExEx task manager letting it know that this + /// ExEx has processed the corresponding block. + /// + /// Returns an error if the channel was closed (ExEx task manager panicked). + pub fn send_finished_height( + &self, + height: BlockNumHash, + ) -> Result<(), SendError> { + self.events.send(ExExEvent::FinishedHeight(height)).map_err(|_| SendError(height)) + } } #[cfg(test)] mod tests { + use crate::ExExContext; use reth_exex_types::ExExHead; use reth_node_api::FullNodeComponents; use reth_provider::BlockReader; - use crate::ExExContext; - /// #[test] const fn issue_12054() { diff --git a/crates/exex/exex/src/lib.rs b/crates/exex/exex/src/lib.rs index ce6641ff6734..d5da6a18faa5 100644 --- a/crates/exex/exex/src/lib.rs +++ b/crates/exex/exex/src/lib.rs @@ -1,11 +1,11 @@ -// todo: expand this (examples, assumptions, invariants) //! Execution extensions (`ExEx`). //! -//! An execution extension is a task that derives its state from Reth's state. +//! An execution extension is a task that listens to state changes of the node. //! //! Some examples of such state derives are rollups, bridges, and indexers. //! -//! An `ExEx` is a [`Future`] resolving to a `Result<()>` that is run indefinitely alongside Reth. +//! An `ExEx` is a [`Future`] resolving to a `Result<()>` that is run indefinitely alongside the +//! node. //! //! `ExEx`'s are initialized using an async closure that resolves to the `ExEx`; this closure gets //! passed an [`ExExContext`] where it is possible to spawn additional tasks and modify Reth. @@ -23,6 +23,60 @@ //! event. To clarify: if the `ExEx` emits `ExExEvent::FinishedHeight(0)` it will receive //! notifications for any `block_number > 0`. //! +//! # Examples, Assumptions, and Invariants +//! +//! ## Examples +//! +//! ### Simple Indexer ExEx +//! ```no_run +//! use alloy_consensus::BlockHeader; +//! use futures::StreamExt; +//! use reth_exex::ExExContext; +//! use reth_node_api::FullNodeComponents; +//! use reth_provider::CanonStateNotification; +//! +//! async fn my_indexer( +//! mut ctx: ExExContext, +//! ) -> Result<(), Box> { +//! // Subscribe to canonical state notifications +//! +//! while let Some(Ok(notification)) = ctx.notifications.next().await { +//! if let Some(committed) = notification.committed_chain() { +//! for block in committed.blocks_iter() { +//! // Index or process block data +//! println!("Processed block: {}", block.number()); +//! } +//! +//! // Signal completion for pruning +//! ctx.send_finished_height(committed.tip().num_hash()); +//! } +//! } +//! +//! Ok(()) +//! } +//! ``` +//! +//! ## Assumptions +//! +//! - `ExExs` run indefinitely alongside Reth +//! - `ExExs` receive canonical state notifications for block execution +//! - `ExExs` should handle potential network or database errors gracefully +//! - `ExExs` must emit `FinishedHeight` events for proper state pruning +//! +//! ## Invariants +//! +//! - An ExEx must not block the main Reth execution +//! - Notifications are processed in canonical order +//! - `ExExs` should be able to recover from temporary failures +//! - Memory and resource usage must be controlled +//! +//! ## Performance Considerations +//! +//! - Minimize blocking operations +//! - Use efficient data structures for state tracking +//! - Implement proper error handling and logging +//! - Consider batching operations for better performance +//! //! [`Future`]: std::future::Future //! [`ExExContext`]: crate::ExExContext //! [`CanonStateNotification`]: reth_provider::CanonStateNotification diff --git a/crates/exex/types/src/head.rs b/crates/exex/types/src/head.rs index 8863ab327d06..a4b9f7e7bf0a 100644 --- a/crates/exex/types/src/head.rs +++ b/crates/exex/types/src/head.rs @@ -8,3 +8,10 @@ pub struct ExExHead { /// The head block. pub block: BlockNumHash, } + +impl ExExHead { + /// Creates a new instance for the given head block. + pub const fn new(block: BlockNumHash) -> Self { + Self { block } + } +}