Skip to content

Commit

Permalink
docs(exex): Expand documentation with examples, assumptions, and inva…
Browse files Browse the repository at this point in the history
…riants. (#13581)

Co-authored-by: Matthias Seitz <matthias.seitz@outlook.de>
  • Loading branch information
tiendn and mattsse authored Jan 28, 2025
1 parent 2e4376f commit 42dc1ed
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 6 deletions.
20 changes: 17 additions & 3 deletions crates/exex/exex/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ use reth_node_core::node_config::NodeConfig;
use reth_provider::BlockReader;
use reth_tasks::TaskExecutor;
use std::fmt::Debug;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::mpsc::{error::SendError, UnboundedSender};

/// Captures the context that an `ExEx` has access to.
///
/// This type wraps various node components that the `ExEx` has access to.
pub struct ExExContext<Node: FullNodeComponents> {
/// The current head of the blockchain at launch.
pub head: BlockNumHash,
Expand Down Expand Up @@ -103,6 +105,8 @@ where
}

/// Returns the task executor.
///
/// This type should be used to spawn (critical) tasks.
pub fn task_executor(&self) -> &TaskExecutor {
self.components.task_executor()
}
Expand All @@ -118,16 +122,26 @@ where
pub fn set_notifications_with_head(&mut self, head: ExExHead) {
self.notifications.set_with_head(head);
}

/// Sends an [`ExExEvent::FinishedHeight`] to the ExEx task manager letting it know that this
/// ExEx has processed the corresponding block.
///
/// Returns an error if the channel was closed (ExEx task manager panicked).
pub fn send_finished_height(
&self,
height: BlockNumHash,
) -> Result<(), SendError<BlockNumHash>> {
self.events.send(ExExEvent::FinishedHeight(height)).map_err(|_| SendError(height))
}
}

#[cfg(test)]
mod tests {
use crate::ExExContext;
use reth_exex_types::ExExHead;
use reth_node_api::FullNodeComponents;
use reth_provider::BlockReader;

use crate::ExExContext;

/// <https://github.com/paradigmxyz/reth/issues/12054>
#[test]
const fn issue_12054() {
Expand Down
60 changes: 57 additions & 3 deletions crates/exex/exex/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
// todo: expand this (examples, assumptions, invariants)
//! Execution extensions (`ExEx`).
//!
//! An execution extension is a task that derives its state from Reth's state.
//! An execution extension is a task that listens to state changes of the node.
//!
//! Some examples of such state derives are rollups, bridges, and indexers.
//!
//! An `ExEx` is a [`Future`] resolving to a `Result<()>` that is run indefinitely alongside Reth.
//! An `ExEx` is a [`Future`] resolving to a `Result<()>` that is run indefinitely alongside the
//! node.
//!
//! `ExEx`'s are initialized using an async closure that resolves to the `ExEx`; this closure gets
//! passed an [`ExExContext`] where it is possible to spawn additional tasks and modify Reth.
Expand All @@ -23,6 +23,60 @@
//! event. To clarify: if the `ExEx` emits `ExExEvent::FinishedHeight(0)` it will receive
//! notifications for any `block_number > 0`.
//!
//! # Examples, Assumptions, and Invariants
//!
//! ## Examples
//!
//! ### Simple Indexer ExEx
//! ```no_run
//! use alloy_consensus::BlockHeader;
//! use futures::StreamExt;
//! use reth_exex::ExExContext;
//! use reth_node_api::FullNodeComponents;
//! use reth_provider::CanonStateNotification;
//!
//! async fn my_indexer<N: FullNodeComponents>(
//! mut ctx: ExExContext<N>,
//! ) -> Result<(), Box<dyn std::error::Error>> {
//! // Subscribe to canonical state notifications
//!
//! while let Some(Ok(notification)) = ctx.notifications.next().await {
//! if let Some(committed) = notification.committed_chain() {
//! for block in committed.blocks_iter() {
//! // Index or process block data
//! println!("Processed block: {}", block.number());
//! }
//!
//! // Signal completion for pruning
//! ctx.send_finished_height(committed.tip().num_hash());
//! }
//! }
//!
//! Ok(())
//! }
//! ```
//!
//! ## Assumptions
//!
//! - `ExExs` run indefinitely alongside Reth
//! - `ExExs` receive canonical state notifications for block execution
//! - `ExExs` should handle potential network or database errors gracefully
//! - `ExExs` must emit `FinishedHeight` events for proper state pruning
//!
//! ## Invariants
//!
//! - An ExEx must not block the main Reth execution
//! - Notifications are processed in canonical order
//! - `ExExs` should be able to recover from temporary failures
//! - Memory and resource usage must be controlled
//!
//! ## Performance Considerations
//!
//! - Minimize blocking operations
//! - Use efficient data structures for state tracking
//! - Implement proper error handling and logging
//! - Consider batching operations for better performance
//!
//! [`Future`]: std::future::Future
//! [`ExExContext`]: crate::ExExContext
//! [`CanonStateNotification`]: reth_provider::CanonStateNotification
Expand Down
7 changes: 7 additions & 0 deletions crates/exex/types/src/head.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,10 @@ pub struct ExExHead {
/// The head block.
pub block: BlockNumHash,
}

impl ExExHead {
/// Creates a new instance for the given head block.
pub const fn new(block: BlockNumHash) -> Self {
Self { block }
}
}

0 comments on commit 42dc1ed

Please sign in to comment.