From 93ab09821b0d24aa56f58a1386c901fa4d782bd7 Mon Sep 17 00:00:00 2001
From: Dan Cline <6798349+Rjected@users.noreply.github.com>
Date: Thu, 1 Aug 2024 05:59:35 -0400
Subject: [PATCH] feat: combine block writing in persistence task (#9960)

Co-authored-by: joshieDo <93316087+joshieDo@users.noreply.github.com>
Co-authored-by: Federico Gimenez
---
 crates/engine/tree/src/persistence.rs | 175 +++++++++++---------------
 1 file changed, 71 insertions(+), 104 deletions(-)

diff --git a/crates/engine/tree/src/persistence.rs b/crates/engine/tree/src/persistence.rs
index 99cbab79b995..316e9330b8aa 100644
--- a/crates/engine/tree/src/persistence.rs
+++ b/crates/engine/tree/src/persistence.rs
@@ -3,11 +3,12 @@
 use reth_chain_state::ExecutedBlock;
 use reth_db::{models::CompactU256, tables, transaction::DbTxMut, Database};
 use reth_errors::ProviderResult;
-use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256, U256};
+use reth_primitives::{SealedBlock, StaticFileSegment, TransactionSignedNoHash, B256};
 use reth_provider::{
-    writer::StorageWriter, BlockExecutionWriter, BlockNumReader, BlockWriter, DatabaseProviderRW,
-    HistoryWriter, OriginalValuesKnown, ProviderFactory, StageCheckpointWriter, StateChangeWriter,
-    StateWriter, StaticFileProviderFactory, StaticFileWriter, TransactionsProviderExt, TrieWriter,
+    providers::StaticFileProvider, writer::StorageWriter, BlockExecutionWriter, BlockNumReader,
+    BlockWriter, DatabaseProviderRW, HistoryWriter, OriginalValuesKnown, ProviderFactory,
+    StageCheckpointWriter, StateChangeWriter, StateWriter, StaticFileProviderFactory,
+    StaticFileWriter, TransactionsProviderExt, TrieWriter,
 };
 use reth_prune::{Pruner, PrunerOutput};
 use reth_stages_types::{StageCheckpoint, StageId};
@@ -45,64 +46,6 @@ impl<DB: Database> PersistenceService<DB> {
         Self { provider, incoming, pruner }
     }
 
-    /// Writes the cloned tree state to database
-    fn write(
-        &self,
-        blocks: &[ExecutedBlock],
-        provider_rw: &DatabaseProviderRW<DB>,
-    ) -> ProviderResult<()> {
-        if blocks.is_empty() {
-            debug!(target: "tree::persistence", "Attempted to write empty block range");
-            return Ok(())
-        }
-
-        debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks to database");
-        let first_number = blocks.first().unwrap().block().number;
-
-        let last = blocks.last().unwrap().block();
-        let last_block_number = last.number;
-
-        // TODO: remove all the clones and do performant / batched writes for each type of object
-        // instead of a loop over all blocks,
-        // meaning:
-        // * blocks
-        // * state
-        // * hashed state
-        // * trie updates (cannot naively extend, need helper)
-        // * indices (already done basically)
-        // Insert the blocks
-        for block in blocks {
-            let sealed_block =
-                block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap();
-            provider_rw.insert_block(sealed_block)?;
-
-            // Write state and changesets to the database.
-            // Must be written after blocks because of the receipt lookup.
-            let execution_outcome = block.execution_outcome().clone();
-            // TODO: do we provide a static file producer here?
-            let mut storage_writer = StorageWriter::new(Some(provider_rw), None);
-            storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
-
-            // insert hashes and intermediate merkle nodes
-            {
-                let trie_updates = block.trie_updates().clone();
-                let hashed_state = block.hashed_state();
-                provider_rw.write_hashed_state(&hashed_state.clone().into_sorted())?;
-                provider_rw.write_trie_updates(&trie_updates)?;
-            }
-        }
-
-        // update history indices
-        provider_rw.update_history_indices(first_number..=last_block_number)?;
-
-        // Update pipeline progress
-        provider_rw.update_pipeline_stages(last_block_number, false)?;
-
-        debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data");
-
-        Ok(())
-    }
-
     /// Removes block data above the given block number from the database.
     /// This is exclusive, i.e., it only removes blocks above `block_number`, and does not remove
     /// `block_number`.
@@ -126,42 +69,22 @@ impl<DB: Database> PersistenceService<DB> {
         self.pruner.run(block_num).expect("todo: handle errors")
     }
 
-    /// Updates checkpoints related to block headers and bodies. This should be called after new
-    /// transactions have been successfully written to disk.
-    fn update_transaction_meta(
-        &self,
-        block_num: u64,
-        td: U256,
-        provider_rw: &DatabaseProviderRW<DB>,
-    ) -> ProviderResult<()> {
-        debug!(target: "tree::persistence", ?block_num, "Updating transaction metadata after writing");
-        provider_rw
-            .tx_ref()
-            .put::<tables::HeaderTerminalDifficulties>(block_num, CompactU256(td))?;
-        provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block_num))?;
-        provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block_num))?;
-        Ok(())
-    }
-
     /// Writes the transactions to static files.
     ///
     /// Returns the block number and new total difficulty.
-    ///
-    /// The [`update_transaction_meta`](Self::update_transaction_meta) method should be called
-    /// after this, to update the checkpoints for headers and block bodies.
     #[instrument(level = "trace", skip_all, fields(block = ?block.num_hash()), target = "engine")]
     fn write_transactions(
         &self,
         block: Arc<SealedBlock>,
         provider_rw: &DatabaseProviderRW<DB>,
-    ) -> ProviderResult<(u64, U256)> {
+    ) -> ProviderResult<()> {
         debug!(target: "tree::persistence", "Writing transactions");
         let provider = self.provider.static_file_provider();
 
-        let new_td = {
+        let td = {
             let header_writer = provider.get_writer(block.number, StaticFileSegment::Headers)?;
             let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(header_writer));
-            let new_td = storage_writer.append_headers_from_blocks(
+            let td = storage_writer.append_headers_from_blocks(
                 block.header().number,
                 std::iter::once(&(block.header(), block.hash())),
             )?;
@@ -177,33 +100,41 @@ impl<DB: Database> PersistenceService<DB> {
                 std::iter::once(&no_hash_transactions),
             )?;
 
-            new_td
+            td
         };
 
-        Ok((block.number, new_td))
+        debug!(target: "tree::persistence", block_num=block.number, "Updating transaction metadata after writing");
+        provider_rw
+            .tx_ref()
+            .put::<tables::HeaderTerminalDifficulties>(block.number, CompactU256(td))?;
+        provider_rw.save_stage_checkpoint(StageId::Headers, StageCheckpoint::new(block.number))?;
+        provider_rw.save_stage_checkpoint(StageId::Bodies, StageCheckpoint::new(block.number))?;
+
+        Ok(())
     }
 
-    /// Write execution-related block data to database and/or static files.
-    fn write_execution_data(
+    /// Writes the cloned tree state to database
+    fn save_blocks(
         &self,
         blocks: &[ExecutedBlock],
         provider_rw: &DatabaseProviderRW<DB>,
+        static_file_provider: &StaticFileProvider,
     ) -> ProviderResult<()> {
         if blocks.is_empty() {
+            debug!(target: "tree::persistence", "Attempted to write empty block range");
             return Ok(())
         }
-        let provider = self.provider.static_file_provider();
 
         // NOTE: checked non-empty above
         let first_block = blocks.first().unwrap().block();
         let last_block = blocks.last().unwrap().block().clone();
 
-        // use the storage writer
+        // use the storage writer to write receipts
         let current_block = first_block.number;
         debug!(target: "tree::persistence", len=blocks.len(), ?current_block, "Writing execution data to static files");
 
         let receipts_writer =
-            provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;
+            static_file_provider.get_writer(first_block.number, StaticFileSegment::Receipts)?;
 
         {
             let mut storage_writer = StorageWriter::new(Some(provider_rw), Some(receipts_writer));
@@ -215,6 +146,50 @@ impl<DB: Database> PersistenceService<DB> {
             storage_writer.append_receipts_from_blocks(current_block, receipts_iter)?;
         }
 
+        debug!(target: "tree::persistence", block_count = %blocks.len(), "Writing blocks to database");
+        let first_number = first_block.number;
+
+        let last_block_number = last_block.number;
+
+        // TODO: remove all the clones and do performant / batched writes for each type of object
+        // instead of a loop over all blocks,
+        // meaning:
+        // * blocks
+        // * state
+        // * hashed state
+        // * trie updates (cannot naively extend, need helper)
+        // * indices (already done basically)
+        // Insert the blocks
+        for block in blocks {
+            let sealed_block =
+                block.block().clone().try_with_senders_unchecked(block.senders().clone()).unwrap();
+            provider_rw.insert_block(sealed_block)?;
+            self.write_transactions(block.block.clone(), provider_rw)?;
+
+            // Write state and changesets to the database.
+            // Must be written after blocks because of the receipt lookup.
+            let execution_outcome = block.execution_outcome().clone();
+            // TODO: do we provide a static file producer here?
+            let mut storage_writer = StorageWriter::new(Some(provider_rw), None);
+            storage_writer.write_to_storage(execution_outcome, OriginalValuesKnown::No)?;
+
+            // insert hashes and intermediate merkle nodes
+            {
+                let trie_updates = block.trie_updates().clone();
+                let hashed_state = block.hashed_state();
+                provider_rw.write_hashed_state(&hashed_state.clone().into_sorted())?;
+                provider_rw.write_trie_updates(&trie_updates)?;
+            }
+        }
+
+        // update history indices
+        provider_rw.update_history_indices(first_number..=last_block_number)?;
+
+        // Update pipeline progress
+        provider_rw.update_pipeline_stages(last_block_number, false)?;
+
+        debug!(target: "tree::persistence", range = ?first_number..=last_block_number, "Appended block data");
+
         Ok(())
     }
 
@@ -289,19 +264,11 @@ where
                     let last_block_hash = blocks.last().unwrap().block().hash();
                     let provider_rw = self.provider.provider_rw().expect("todo: handle errors");
-                    self.write_execution_data(&blocks, &provider_rw).expect("todo: handle errors");
-                    self.write(&blocks, &provider_rw).expect("todo: handle errors");
-
-                    for block in &blocks {
-                        // first write transactions
-                        let (block_num, td) = self
-                            .write_transactions(block.block.clone(), &provider_rw)
-                            .expect("todo: handle errors");
-                        self.update_transaction_meta(block_num, td, &provider_rw)
-                            .expect("todo: handle errors");
-                    }
+                    let static_file_provider = self.provider.static_file_provider();
+                    self.save_blocks(&blocks, &provider_rw, &static_file_provider)
+                        .expect("todo: handle errors");
 
-                    self.provider.static_file_provider().commit().expect("todo: handle errors");
+                    static_file_provider.commit().expect("todo: handle errors");
                     provider_rw.commit().expect("todo: handle errors");
 
                     // we ignore the error because the caller may or may not care about the result
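
For illustration, here is a minimal, self-contained sketch of the write ordering the combined `save_blocks` enforces. This is not reth's API — every type and name below is a hypothetical stand-in — it only models the sequence this patch establishes: receipts to static files first, then a single pass over the blocks writing block data, transactions, checkpoints, and state/trie updates, then range-wide index and pipeline bookkeeping, and finally the static-file commit ahead of the database commit.

// All names are hypothetical stand-ins, not reth types; the sketch only
// models the ordering that the patched `save_blocks` enforces.

#[derive(Default)]
struct StaticFiles {
    pending: Vec<String>,
    committed: bool,
}

impl StaticFiles {
    fn append(&mut self, entry: String) {
        self.pending.push(entry);
    }

    fn commit(&mut self) {
        self.committed = true;
    }
}

#[derive(Default)]
struct Db {
    rows: Vec<String>,
}

impl Db {
    fn put(&mut self, row: String) {
        self.rows.push(row);
    }
}

struct ExecutedBlock {
    number: u64,
}

/// Mirrors the combined `save_blocks`: receipts go to static files up front,
/// then one loop writes block data, transactions, checkpoints, and state/trie
/// updates per block, then the range-wide indices and pipeline progress.
fn save_blocks(blocks: &[ExecutedBlock], db: &mut Db, static_files: &mut StaticFiles) {
    if blocks.is_empty() {
        return; // nothing to persist for an empty range
    }

    let first = blocks.first().unwrap().number;
    let last = blocks.last().unwrap().number;

    // 1. receipts to static files (previously in `write_execution_data`)
    static_files.append(format!("receipts {first}..={last}"));

    // 2. per-block writes (previously split between `write` and the caller's
    //    per-block `write_transactions` / `update_transaction_meta` loop)
    for block in blocks {
        db.put(format!("block {}", block.number));
        static_files.append(format!("headers+txs {}", block.number));
        db.put(format!("td + stage checkpoints {}", block.number));
        db.put(format!("state, hashed state, trie updates {}", block.number));
    }

    // 3. range-wide bookkeeping
    db.put(format!("history indices {first}..={last}"));
    db.put(format!("pipeline progress {last}"));
}

fn main() {
    let blocks: Vec<ExecutedBlock> = (1..=3).map(|number| ExecutedBlock { number }).collect();
    let (mut db, mut static_files) = (Db::default(), StaticFiles::default());

    save_blocks(&blocks, &mut db, &mut static_files);

    // As in the patched `run` loop: static files commit before the database
    // transaction does.
    static_files.commit();
    assert!(static_files.committed);
    println!("{} db rows, {} static-file entries", db.rows.len(), static_files.pending.len());
}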