{
+ /// Provider for querying headers.
+ provider: P,
+
/// Handles to communicate with the `ExEx`'s.
exex_handles: Vec,
@@ -217,7 +223,7 @@ pub struct ExExManager {
metrics: ExExManagerMetrics,
}
-impl ExExManager {
+impl
ExExManager
{
/// Create a new [`ExExManager`].
///
/// You must provide an [`ExExHandle`] for each `ExEx` and the maximum capacity of the
@@ -226,6 +232,7 @@ impl ExExManager {
/// When the capacity is exceeded (which can happen if an `ExEx` is slow) no one can send
/// notifications over [`ExExManagerHandle`]s until there is capacity again.
pub fn new(
+ provider: P,
handles: Vec,
max_capacity: usize,
wal: Wal,
@@ -248,6 +255,8 @@ impl ExExManager {
metrics.num_exexs.set(num_exexs as f64);
Self {
+ provider,
+
exex_handles: handles,
handle_rx,
@@ -303,83 +312,152 @@ impl ExExManager {
}
}
-impl Future for ExExManager {
+impl
ExExManager
+where
+ P: HeaderProvider,
+{
+ /// Finalizes the WAL according to the passed finalized header.
+ ///
+ /// This function checks if all ExExes are on the canonical chain and finalizes the WAL if
+ /// necessary.
+ fn finalize_wal(&self, finalized_header: SealedHeader) -> eyre::Result<()> {
+ debug!(header = ?finalized_header.num_hash(), "Received finalized header");
+
+ // Check if all ExExes are on the canonical chain
+ let exex_finished_heights = self
+ .exex_handles
+ .iter()
+ // Get ExEx ID and hash of the finished height for each ExEx
+ .map(|exex_handle| {
+ (&exex_handle.id, exex_handle.finished_height.map(|block| block.hash))
+ })
+ // Deduplicate all hashes
+ .unique_by(|(_, hash)| *hash)
+ // Check if hashes are canonical
+ .map(|(exex_id, hash)| {
+ hash.map_or(Ok((exex_id, hash, false)), |hash| {
+ self.provider
+ .is_known(&hash)
+ // Save the ExEx ID, hash of the finished height, and whether the hash
+ // is canonical
+ .map(|is_canonical| (exex_id, Some(hash), is_canonical))
+ })
+ })
+ // We collect here to be able to log the unfinalized ExExes below
+ .collect::, _>>()?;
+ if exex_finished_heights.iter().all(|(_, _, is_canonical)| *is_canonical) {
+ // If there is a finalized header and all ExExs are on the canonical chain, finalize
+ // the WAL with the new finalized header
+ self.wal.finalize(finalized_header.num_hash())?;
+ } else {
+ let unfinalized_exexes = exex_finished_heights
+ .into_iter()
+ .filter_map(|(exex_id, hash, is_canonical)| {
+ is_canonical.not().then_some((exex_id, hash))
+ })
+ .format_with(", ", |(exex_id, hash), f| f(&format_args!("{exex_id:?} = {hash:?}")));
+ debug!(
+ %unfinalized_exexes,
+ "Not all ExExes are on the canonical chain, can't finalize the WAL"
+ );
+ }
+
+ Ok(())
+ }
+}
+
+impl
Future for ExExManager
+where
+ P: HeaderProvider + Unpin + 'static,
+{
type Output = eyre::Result<()>;
- fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
- // Drain the finalized header stream and grab the last finalized header
+ /// Main loop of the [`ExExManager`]. The order of operations is as follows:
+ /// 1. Handle incoming ExEx events. We do it before finalizing the WAL, because it depends on
+ /// the latest state of [`ExExEvent::FinishedHeight`] events.
+ /// 2. Finalize the WAL with the finalized header, if necessary.
+ /// 3. Drain [`ExExManagerHandle`] notifications, push them to the internal buffer and update
+ /// the internal buffer capacity.
+ /// 5. Send notifications from the internal buffer to those ExExes that are ready to receive new
+ /// notifications.
+ /// 5. Remove notifications from the internal buffer that have been sent to **all** ExExes and
+ /// update the internal buffer capacity.
+ /// 6. Update the channel with the lowest [`FinishedExExHeight`] among all ExExes.
+ fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll {
+ let this = self.get_mut();
+
+ // Handle incoming ExEx events
+ for exex in &mut this.exex_handles {
+ while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
+ debug!(exex_id = %exex.id, ?event, "Received event from ExEx");
+ exex.metrics.events_sent_total.increment(1);
+ match event {
+ ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
+ }
+ }
+ }
+
+ // Drain the finalized header stream and finalize the WAL with the last header
let mut last_finalized_header = None;
- while let Poll::Ready(finalized_header) = self.finalized_header_stream.poll_next_unpin(cx) {
+ while let Poll::Ready(finalized_header) = this.finalized_header_stream.poll_next_unpin(cx) {
last_finalized_header = finalized_header;
}
- // If there is a finalized header, finalize the WAL with it
if let Some(header) = last_finalized_header {
- self.wal.finalize((header.number, header.hash()).into())?;
+ this.finalize_wal(header)?;
}
- // drain handle notifications
- while self.buffer.len() < self.max_capacity {
- if let Poll::Ready(Some(notification)) = self.handle_rx.poll_recv(cx) {
+ // Drain handle notifications
+ while this.buffer.len() < this.max_capacity {
+ if let Poll::Ready(Some(notification)) = this.handle_rx.poll_recv(cx) {
debug!(
committed_tip = ?notification.committed_chain().map(|chain| chain.tip().number),
reverted_tip = ?notification.reverted_chain().map(|chain| chain.tip().number),
"Received new notification"
);
- self.push_notification(notification);
+ this.push_notification(notification);
continue
}
break
}
- // update capacity
- self.update_capacity();
+ // Update capacity
+ this.update_capacity();
- // advance all poll senders
+ // Advance all poll senders
let mut min_id = usize::MAX;
- for idx in (0..self.exex_handles.len()).rev() {
- let mut exex = self.exex_handles.swap_remove(idx);
+ for idx in (0..this.exex_handles.len()).rev() {
+ let mut exex = this.exex_handles.swap_remove(idx);
- // it is a logic error for this to ever underflow since the manager manages the
+ // It is a logic error for this to ever underflow since the manager manages the
// notification IDs
let notification_index = exex
.next_notification_id
- .checked_sub(self.min_id)
+ .checked_sub(this.min_id)
.expect("exex expected notification ID outside the manager's range");
- if let Some(notification) = self.buffer.get(notification_index) {
+ if let Some(notification) = this.buffer.get(notification_index) {
if let Poll::Ready(Err(err)) = exex.send(cx, notification) {
- // the channel was closed, which is irrecoverable for the manager
+ // The channel was closed, which is irrecoverable for the manager
return Poll::Ready(Err(err.into()))
}
}
min_id = min_id.min(exex.next_notification_id);
- self.exex_handles.push(exex);
+ this.exex_handles.push(exex);
}
- // remove processed buffered notifications
+ // Remove processed buffered notifications
debug!(%min_id, "Updating lowest notification id in buffer");
- self.buffer.retain(|&(id, _)| id >= min_id);
- self.min_id = min_id;
+ this.buffer.retain(|&(id, _)| id >= min_id);
+ this.min_id = min_id;
- // update capacity
- self.update_capacity();
-
- // handle incoming exex events
- for exex in &mut self.exex_handles {
- while let Poll::Ready(Some(event)) = exex.receiver.poll_recv(cx) {
- debug!(exex_id = %exex.id, ?event, "Received event from exex");
- exex.metrics.events_sent_total.increment(1);
- match event {
- ExExEvent::FinishedHeight(height) => exex.finished_height = Some(height),
- }
- }
- }
+ // Update capacity
+ this.update_capacity();
- // update watch channel block number
- let finished_height = self.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
+ // Update watch channel block number
+ let finished_height = this.exex_handles.iter_mut().try_fold(u64::MAX, |curr, exex| {
exex.finished_height.map_or(Err(()), |height| Ok(height.number.min(curr)))
});
if let Ok(finished_height) = finished_height {
- let _ = self.finished_height.send(FinishedExExHeight::Height(finished_height));
+ let _ = this.finished_height.send(FinishedExExHeight::Height(finished_height));
}
Poll::Pending
@@ -511,8 +589,9 @@ mod tests {
use alloy_primitives::B256;
use eyre::OptionExt;
use futures::StreamExt;
+ use rand::Rng;
use reth_primitives::SealedBlockWithSenders;
- use reth_provider::Chain;
+ use reth_provider::{test_utils::create_test_provider_factory, BlockWriter, Chain};
use reth_testing_utils::generators::{self, random_block};
fn empty_finalized_header_stream() -> ForkChoiceStream {
@@ -545,11 +624,11 @@ mod tests {
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
- assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream())
+ assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
.handle
.has_exexs());
- assert!(ExExManager::new(vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
+ assert!(ExExManager::new((), vec![exex_handle_1], 0, wal, empty_finalized_header_stream())
.handle
.has_exexs());
}
@@ -562,13 +641,19 @@ mod tests {
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
- assert!(!ExExManager::new(vec![], 0, wal.clone(), empty_finalized_header_stream())
+ assert!(!ExExManager::new((), vec![], 0, wal.clone(), empty_finalized_header_stream())
.handle
.has_capacity());
- assert!(ExExManager::new(vec![exex_handle_1], 10, wal, empty_finalized_header_stream())
- .handle
- .has_capacity());
+ assert!(ExExManager::new(
+ (),
+ vec![exex_handle_1],
+ 10,
+ wal,
+ empty_finalized_header_stream()
+ )
+ .handle
+ .has_capacity());
}
#[test]
@@ -581,7 +666,7 @@ mod tests {
// Create a mock ExExManager and add the exex_handle to it
let mut exex_manager =
- ExExManager::new(vec![exex_handle], 10, wal, empty_finalized_header_stream());
+ ExExManager::new((), vec![exex_handle], 10, wal, empty_finalized_header_stream());
// Define the notification for testing
let mut block1 = SealedBlockWithSenders::default();
@@ -631,8 +716,13 @@ mod tests {
// Create a mock ExExManager and add the exex_handle to it
let max_capacity = 5;
- let mut exex_manager =
- ExExManager::new(vec![exex_handle], max_capacity, wal, empty_finalized_header_stream());
+ let mut exex_manager = ExExManager::new(
+ (),
+ vec![exex_handle],
+ max_capacity,
+ wal,
+ empty_finalized_header_stream(),
+ );
// Push some notifications to fill part of the buffer
let mut block1 = SealedBlockWithSenders::default();
@@ -665,6 +755,8 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
+ let provider_factory = create_test_provider_factory();
+
let (exex_handle, event_tx, mut _notification_rx) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
@@ -677,6 +769,7 @@ mod tests {
// Create a mock ExExManager and add the exex_handle to it
let exex_manager = ExExManager::new(
+ provider_factory,
vec![exex_handle],
10,
Wal::new(temp_dir.path()).unwrap(),
@@ -711,6 +804,8 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
+ let provider_factory = create_test_provider_factory();
+
// Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
@@ -725,6 +820,7 @@ mod tests {
event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
let exex_manager = ExExManager::new(
+ provider_factory,
vec![exex_handle1, exex_handle2],
10,
Wal::new(temp_dir.path()).unwrap(),
@@ -755,6 +851,8 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
+ let provider_factory = create_test_provider_factory();
+
// Create two `ExExHandle` instances
let (exex_handle1, event_tx1, _) =
ExExHandle::new("test_exex1".to_string(), Head::default(), (), (), wal.handle());
@@ -772,6 +870,7 @@ mod tests {
event_tx2.send(ExExEvent::FinishedHeight(block2)).unwrap();
let exex_manager = ExExManager::new(
+ provider_factory,
vec![exex_handle1, exex_handle2],
10,
Wal::new(temp_dir.path()).unwrap(),
@@ -806,12 +905,15 @@ mod tests {
let temp_dir = tempfile::tempdir().unwrap();
let wal = Wal::new(temp_dir.path()).unwrap();
+ let provider_factory = create_test_provider_factory();
+
let (exex_handle_1, _, _) =
ExExHandle::new("test_exex_1".to_string(), Head::default(), (), (), wal.handle());
// Create an ExExManager with a small max capacity
let max_capacity = 2;
let mut exex_manager = ExExManager::new(
+ provider_factory,
vec![exex_handle_1],
max_capacity,
Wal::new(temp_dir.path()).unwrap(),
@@ -1008,37 +1110,75 @@ mod tests {
async fn test_exex_wal_finalize() -> eyre::Result<()> {
reth_tracing::init_test_tracing();
+ let mut rng = generators::rng();
+
let temp_dir = tempfile::tempdir().unwrap();
let mut wal = Wal::new(temp_dir.path()).unwrap();
- let block = random_block(&mut generators::rng(), 0, Default::default())
+ let provider_factory = create_test_provider_factory();
+
+ let block = random_block(&mut rng, 0, Default::default())
.seal_with_senders()
.ok_or_eyre("failed to recover senders")?;
+ let provider_rw = provider_factory.provider_rw()?;
+ provider_rw.insert_block(block.clone())?;
+ provider_rw.commit()?;
+
let notification = ExExNotification::ChainCommitted {
new: Arc::new(Chain::new(vec![block.clone()], Default::default(), None)),
};
wal.commit(¬ification)?;
- let (tx, rx) = watch::channel(None);
+ let (finalized_headers_tx, rx) = watch::channel(None);
let finalized_header_stream = ForkChoiceStream::new(rx);
- let (exex_handle, _, _) =
+ let (exex_handle, events_tx, _) =
ExExHandle::new("test_exex".to_string(), Head::default(), (), (), wal.handle());
- let mut exex_manager =
- std::pin::pin!(ExExManager::new(vec![exex_handle], 1, wal, finalized_header_stream));
+ let mut exex_manager = std::pin::pin!(ExExManager::new(
+ provider_factory,
+ vec![exex_handle],
+ 1,
+ wal,
+ finalized_header_stream
+ ));
let mut cx = Context::from_waker(futures::task::noop_waker_ref());
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
+ assert_eq!(
+ exex_manager.wal.iter_notifications()?.collect::>>()?,
+ [notification.clone()]
+ );
+
+ finalized_headers_tx.send(Some(block.header.clone()))?;
+ assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
+ // WAL isn't finalized because the ExEx didn't emit the `FinishedHeight` event
+ assert_eq!(
+ exex_manager.wal.iter_notifications()?.collect::>>()?,
+ [notification.clone()]
+ );
+
+ // Send a `FinishedHeight` event with a non-canonical block
+ events_tx
+ .send(ExExEvent::FinishedHeight((rng.gen::(), rng.gen::()).into()))
+ .unwrap();
+
+ finalized_headers_tx.send(Some(block.header.clone()))?;
+ assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
+ // WAL isn't finalized because the ExEx emitted a `FinishedHeight` event with a
+ // non-canonical block
assert_eq!(
exex_manager.wal.iter_notifications()?.collect::>>()?,
[notification]
);
- tx.send(Some(block.header.clone()))?;
+ // Send a `FinishedHeight` event with a canonical block
+ events_tx.send(ExExEvent::FinishedHeight(block.num_hash())).unwrap();
+ finalized_headers_tx.send(Some(block.header.clone()))?;
assert!(exex_manager.as_mut().poll(&mut cx).is_pending());
+ // WAL is finalized
assert!(exex_manager.wal.iter_notifications()?.next().is_none());
Ok(())
diff --git a/crates/exex/exex/src/notifications.rs b/crates/exex/exex/src/notifications.rs
index 369a0586c0c5..9e9ee78e6cd7 100644
--- a/crates/exex/exex/src/notifications.rs
+++ b/crates/exex/exex/src/notifications.rs
@@ -179,7 +179,7 @@ where
/// If the head block is not found in the database, it means we're not on the canonical chain
/// and we need to revert the notification with the ExEx head block.
fn check_canonical(&mut self) -> eyre::Result