Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(prune, stages): prune pipeline stage #9419

Merged
merged 14 commits into from
Jul 11, 2024
Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions crates/cli/commands/src/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,9 @@ impl PruneCommand {
info!(target: "reth::cli", ?prune_tip, ?prune_config, "Pruning data from database...");
// Run the pruner according to the configuration, and don't enforce any limits on it
let mut pruner = PrunerBuilder::new(prune_config)
.prune_delete_limit(usize::MAX)
.build(provider_factory);
.delete_limit_per_block(usize::MAX)
.build()
.with_provider(provider_factory);

pruner.run(prune_tip)?;
info!(target: "reth::cli", "Pruned data from database");
Expand Down
16 changes: 16 additions & 0 deletions crates/config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ pub struct StageConfig {
pub sender_recovery: SenderRecoveryConfig,
/// Execution stage configuration.
pub execution: ExecutionConfig,
/// Prune stage configuration.
pub prune: PruneStageConfig,
/// Account Hashing stage configuration.
pub account_hashing: HashingConfig,
/// Storage Hashing stage configuration.
Expand Down Expand Up @@ -228,6 +230,20 @@ impl From<ExecutionConfig> for ExecutionStageThresholds {
}
}

/// Prune stage configuration.
///
/// The container-level `#[serde(default)]` attribute means that fields missing from the
/// serialized input are filled in from this struct's [`Default`] implementation rather than
/// causing a deserialization error.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default)]
pub struct PruneStageConfig {
/// The maximum number of entries to prune before committing progress to the database.
pub commit_threshold: usize,
}

impl Default for PruneStageConfig {
    /// Builds the default prune stage configuration, which commits progress after at most
    /// one million pruned entries.
    fn default() -> Self {
        let commit_threshold = 1_000_000;
        Self { commit_threshold }
    }
}

/// Hashing stage configuration.
#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Eq, Serialize)]
#[serde(default)]
Expand Down
11 changes: 7 additions & 4 deletions crates/consensus/beacon/src/engine/hooks/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use metrics::Counter;
use reth_db_api::database::Database;
use reth_errors::{RethError, RethResult};
use reth_primitives::BlockNumber;
use reth_prune::{Pruner, PrunerError, PrunerWithResult};
use reth_prune::{Pruner, PrunerError, PrunerWithResult, WithProviderFactory};
use reth_tasks::TaskSpawner;
use std::{
fmt,
Expand Down Expand Up @@ -39,7 +39,10 @@ impl<DB: fmt::Debug> fmt::Debug for PruneHook<DB> {

impl<DB: Database + 'static> PruneHook<DB> {
/// Create a new instance
pub fn new(pruner: Pruner<DB>, pruner_task_spawner: Box<dyn TaskSpawner>) -> Self {
pub fn new(
pruner: Pruner<WithProviderFactory<DB>, DB>,
pruner_task_spawner: Box<dyn TaskSpawner>,
) -> Self {
Self {
pruner_state: PrunerState::Idle(Some(pruner)),
pruner_task_spawner,
Expand Down Expand Up @@ -151,9 +154,9 @@ impl<DB: Database + 'static> EngineHook for PruneHook<DB> {
#[derive(Debug)]
enum PrunerState<DB> {
/// Pruner is idle.
Idle(Option<Pruner<DB>>),
Idle(Option<Pruner<WithProviderFactory<DB>, DB>>),
/// Pruner is running and waiting for a response
Running(oneshot::Receiver<PrunerWithResult<DB>>),
Running(oneshot::Receiver<PrunerWithResult<WithProviderFactory<DB>, DB>>),
}

#[derive(reth_metrics::Metrics)]
Expand Down
4 changes: 2 additions & 2 deletions crates/consensus/beacon/src/engine/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,14 +401,14 @@ where
BlockchainProvider::with_latest(provider_factory.clone(), tree, latest);

let pruner = Pruner::new(
provider_factory.clone(),
vec![],
5,
self.base_config.chain_spec.prune_delete_limit,
config.max_reorg_depth() as usize,
None,
watch::channel(FinishedExExHeight::NoExExs).1,
);
)
.with_provider(provider_factory.clone());

let mut hooks = EngineHooks::new();
hooks.add(PruneHook::new(pruner, Box::<TokioTaskExecutor>::default()));
Expand Down
11 changes: 7 additions & 4 deletions crates/engine/tree/src/persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use reth_provider::{
bundle_state::HashedStateChanges, BlockWriter, HistoryWriter, OriginalValuesKnown,
ProviderFactory, StageCheckpointWriter, StateWriter,
};
use reth_prune::{PruneProgress, Pruner};
use reth_prune::{PruneProgress, Pruner, WithProviderFactory};
use std::sync::mpsc::{Receiver, SendError, Sender};
use tokio::sync::oneshot;
use tracing::debug;
Expand All @@ -31,15 +31,15 @@ pub struct Persistence<DB> {
/// Incoming requests to persist stuff
incoming: Receiver<PersistenceAction>,
/// The pruner
pruner: Pruner<DB>,
pruner: Pruner<WithProviderFactory<DB>, DB>,
}

impl<DB: Database> Persistence<DB> {
/// Create a new persistence task
const fn new(
provider: ProviderFactory<DB>,
incoming: Receiver<PersistenceAction>,
pruner: Pruner<DB>,
pruner: Pruner<WithProviderFactory<DB>, DB>,
) -> Self {
Self { provider, incoming, pruner }
}
Expand Down Expand Up @@ -125,7 +125,10 @@ where
DB: Database + 'static,
{
/// Create a new persistence task, spawning it, and returning a [`PersistenceHandle`].
fn spawn_new(provider: ProviderFactory<DB>, pruner: Pruner<DB>) -> PersistenceHandle {
fn spawn_new(
provider: ProviderFactory<DB>,
pruner: Pruner<WithProviderFactory<DB>, DB>,
) -> PersistenceHandle {
let (tx, rx) = std::sync::mpsc::channel();
let task = Self::new(provider, rx, pruner);
std::thread::Builder::new()
Expand Down
2 changes: 1 addition & 1 deletion crates/node/builder/src/launch/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ impl<R> LaunchContextWith<Attached<WithConfigs, R>> {
/// Returns an initialized [`PrunerBuilder`] based on the configured [`PruneConfig`]
pub fn pruner_builder(&self) -> PrunerBuilder {
PrunerBuilder::new(self.prune_config().unwrap_or_default())
.prune_delete_limit(self.chain_spec().prune_delete_limit)
.delete_limit_per_block(self.chain_spec().prune_delete_limit)
.timeout(PrunerBuilder::DEFAULT_TIMEOUT)
}

Expand Down
7 changes: 4 additions & 3 deletions crates/node/builder/src/launch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,14 +231,15 @@ where

let initial_target = ctx.node_config().debug.tip;

let mut pruner_builder =
ctx.pruner_builder().max_reorg_depth(ctx.tree_config().max_reorg_depth() as usize);
let mut pruner_builder = ctx
.pruner_builder()
.prune_max_blocks_per_run(ctx.tree_config().max_reorg_depth() as usize);
if let Some(exex_manager_handle) = &exex_manager_handle {
pruner_builder =
pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
}

let pruner = pruner_builder.build(ctx.provider_factory().clone());
let pruner = pruner_builder.build().with_provider(ctx.provider_factory().clone());

let pruner_events = pruner.events();
info!(target: "reth::cli", prune_config=?ctx.prune_config().unwrap_or_default(), "Pruner initialized");
Expand Down
28 changes: 13 additions & 15 deletions crates/prune/prune/src/builder.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
use crate::{segments::SegmentSet, Pruner};
use crate::{segments::SegmentSet, Pruner, WithoutProviderFactory};
use reth_chainspec::MAINNET;
use reth_config::PruneConfig;
use reth_db_api::database::Database;
use reth_exex_types::FinishedExExHeight;
use reth_provider::ProviderFactory;
use reth_prune_types::PruneModes;
use std::time::Duration;
use tokio::sync::watch;
Expand All @@ -15,12 +14,12 @@ pub struct PrunerBuilder {
block_interval: usize,
/// Pruning configuration for every part of the data that can be pruned.
segments: PruneModes,
/// The number of blocks that can be re-orged.
max_reorg_depth: usize,
/// The maximum number of blocks that can be pruned per run.
prune_max_blocks_per_run: usize,
/// The delete limit for pruner, per block. In the actual pruner run it will be multiplied by
/// the amount of blocks between pruner runs to account for the difference in amount of new
/// data coming in.
prune_delete_limit: usize,
delete_limit_per_block: usize,
/// Time a pruner job can run before timing out.
timeout: Option<Duration>,
/// The finished height of all `ExEx`'s.
Expand Down Expand Up @@ -51,14 +50,14 @@ impl PrunerBuilder {
}

/// Sets the maximum number of blocks that can be pruned per run.
pub const fn max_reorg_depth(mut self, max_reorg_depth: usize) -> Self {
self.max_reorg_depth = max_reorg_depth;
pub const fn prune_max_blocks_per_run(mut self, prune_max_blocks_per_run: usize) -> Self {
self.prune_max_blocks_per_run = prune_max_blocks_per_run;
self
}

/// Sets the delete limit for pruner, per block.
pub const fn prune_delete_limit(mut self, prune_delete_limit: usize) -> Self {
self.prune_delete_limit = prune_delete_limit;
pub const fn delete_limit_per_block(mut self, delete_limit_per_block: usize) -> Self {
self.delete_limit_per_block = delete_limit_per_block;
self
}

Expand All @@ -81,15 +80,14 @@ impl PrunerBuilder {
}

/// Builds a [Pruner] from the current configuration.
pub fn build<DB: Database>(self, provider_factory: ProviderFactory<DB>) -> Pruner<DB> {
pub fn build<DB: Database>(self) -> Pruner<WithoutProviderFactory, DB> {
let segments = SegmentSet::<DB>::from_prune_modes(self.segments);

Pruner::new(
provider_factory,
segments.into_vec(),
self.block_interval,
self.prune_delete_limit,
self.max_reorg_depth,
self.delete_limit_per_block,
self.prune_max_blocks_per_run,
self.timeout,
emhane marked this conversation as resolved.
Show resolved Hide resolved
self.finished_exex_height,
)
Expand All @@ -101,8 +99,8 @@ impl Default for PrunerBuilder {
Self {
block_interval: 5,
segments: PruneModes::none(),
max_reorg_depth: 64,
prune_delete_limit: MAINNET.prune_delete_limit,
prune_max_blocks_per_run: 64,
delete_limit_per_block: MAINNET.prune_delete_limit,
timeout: None,
emhane marked this conversation as resolved.
Show resolved Hide resolved
finished_exex_height: watch::channel(FinishedExExHeight::NoExExs).1,
}
Expand Down
4 changes: 3 additions & 1 deletion crates/prune/prune/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ use crate::metrics::Metrics;
pub use builder::PrunerBuilder;
pub use error::PrunerError;
pub use event::PrunerEvent;
pub use pruner::{Pruner, PrunerResult, PrunerWithResult};
pub use pruner::{
Pruner, PrunerResult, PrunerWithResult, WithProviderFactory, WithoutProviderFactory,
};

// Re-export prune types
#[doc(inline)]
Expand Down
Loading
Loading