Skip to content

Commit

Permalink
use provider to initialize persistence state
Browse files Browse the repository at this point in the history
  • Loading branch information
Rjected committed Jul 25, 2024
1 parent de16178 commit e82a424
Showing 1 changed file with 28 additions and 23 deletions.
51 changes: 28 additions & 23 deletions crates/engine/tree/src/tree.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ where
state: EngineApiTreeState,
canonical_in_memory_state: CanonicalInMemoryState,
persistence: PersistenceHandle,
persistence_state: PersistenceState,
payload_builder: PayloadBuilderHandle<T>,
) -> Self {
Self {
Expand All @@ -424,7 +425,7 @@ where
incoming,
outgoing,
persistence,
persistence_state: PersistenceState::default(),
persistence_state,
is_backfill_active: false,
state,
canonical_in_memory_state,
Expand All @@ -449,6 +450,12 @@ where
let best_block_number = provider.best_block_number().unwrap_or(0);
let header = provider.sealed_header(best_block_number).ok().flatten().unwrap_or_default();

let persistence_state = PersistenceState {
last_persisted_block_hash: header.hash(),
last_persisted_block_number: best_block_number,
rx: None,
};

let (tx, outgoing) = tokio::sync::mpsc::unbounded_channel();
let state = EngineApiTreeState::new(
DEFAULT_BLOCK_BUFFER_LIMIT,
Expand All @@ -466,6 +473,7 @@ where
state,
canonical_in_memory_state,
persistence,
persistence_state,
payload_builder,
);
std::thread::Builder::new().name("Tree Task".to_string()).spawn(|| task.run()).unwrap();
Expand Down Expand Up @@ -663,25 +671,13 @@ where
/// Returns true if the canonical chain length minus the last persisted
/// block is greater than or equal to the persistence threshold.
fn should_persist(&self) -> bool {
let min_block = if let Some(last_persisted_block_number) =
self.persistence_state.last_persisted_block_number
{
last_persisted_block_number
} else {
self.state.tree_state.min_block_number()
};
let min_block = self.persistence_state.last_persisted_block_number;

self.state.tree_state.max_block_number() - min_block >= PERSISTENCE_THRESHOLD
}

fn get_blocks_to_persist(&self) -> Vec<ExecutedBlock> {
let start = if let Some(last_persisted_block_number) =
self.persistence_state.last_persisted_block_number
{
last_persisted_block_number
} else {
self.state.tree_state.min_block_number()
};
let start = self.persistence_state.last_persisted_block_number;
let end = start + PERSISTENCE_THRESHOLD;

// NOTE: this is an exclusive range, to try to include exactly PERSISTENCE_THRESHOLD blocks
Expand All @@ -703,7 +699,7 @@ where
fn on_new_persisted_block(&mut self) {
self.remove_persisted_blocks_from_tree_state();
self.canonical_in_memory_state
.remove_persisted_blocks(self.persistence_state.last_persisted_block_number.unwrap());
.remove_persisted_blocks(self.persistence_state.last_persisted_block_number);
}

/// Clears persisted blocks from the in-memory tree state.
Expand All @@ -714,7 +710,7 @@ where
.state
.tree_state
.blocks_by_number
.range(..=self.persistence_state.last_persisted_block_number.unwrap())
.range(..=self.persistence_state.last_persisted_block_number)
.map(|(&k, _)| k)
.collect();

Expand Down Expand Up @@ -1535,18 +1531,18 @@ where

/// The state of the persistence task.
#[derive(Default, Debug)]
struct PersistenceState {
pub struct PersistenceState {
/// Hash of the last block persisted.
///
/// A `None` value means no persistence task has been completed yet.
last_persisted_block_hash: Option<B256>,
last_persisted_block_hash: B256,
/// Receiver end of channel where the result of the persistence task will be
/// sent when done. A None value means there's no persistence task in progress.
rx: Option<oneshot::Receiver<B256>>,
/// The last persisted block number.
///
/// A `None` value means no persistence task has been completed yet
last_persisted_block_number: Option<u64>,
last_persisted_block_number: u64,
}

impl PersistenceState {
Expand All @@ -1564,8 +1560,8 @@ impl PersistenceState {
/// Sets state for a finished persistence task.
fn finish(&mut self, last_persisted_block_hash: B256, last_persisted_block_number: u64) {
self.rx = None;
self.last_persisted_block_number = Some(last_persisted_block_number);
self.last_persisted_block_hash = Some(last_persisted_block_hash);
self.last_persisted_block_number = last_persisted_block_number;
self.last_persisted_block_hash = last_persisted_block_hash;
}
}

Expand Down Expand Up @@ -1632,6 +1628,7 @@ mod tests {
engine_api_tree_state,
canonical_in_memory_state,
persistence_handle,
PersistenceState::default(),
payload_builder,
);

Expand Down Expand Up @@ -1687,6 +1684,14 @@ mod tests {

let header = blocks.first().unwrap().block().header.clone();
let canonical_in_memory_state = CanonicalInMemoryState::with_head(header);
let last_executed_block = blocks.last().unwrap().clone();
let last_header = last_executed_block.block().header();

let persistence_state = PersistenceState {
last_persisted_block_number: last_header.number,
last_persisted_block_hash: last_header.hash_slow(),
rx: None,
};

let (to_payload_service, payload_command_rx) = unbounded_channel();
let payload_builder = PayloadBuilderHandle::new(to_payload_service);
Expand All @@ -1700,9 +1705,9 @@ mod tests {
engine_api_tree_state,
canonical_in_memory_state,
persistence_handle,
persistence_state,
payload_builder,
);
let last_executed_block = blocks.last().unwrap().clone();
let pending = Some(BlockState::new(last_executed_block));
tree.canonical_in_memory_state =
CanonicalInMemoryState::new(state_by_hash, hash_by_number, pending);
Expand Down

0 comments on commit e82a424

Please sign in to comment.