
Pause pruning while snapshotting #11178

Merged · 19 commits · Oct 24, 2019

Changes from 2 commits
7 changes: 3 additions & 4 deletions ethcore/snapshot/src/lib.rs
@@ -168,7 +168,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
state_hashes.extend(part_state_hashes);
}

- debug!(target: "snapshot", "Took a snapshot of {} accounts", p.accounts.load(Ordering::SeqCst));
+ info!("Took a snapshot at #{} of {} accounts", block_number, p.accounts());

Ok((state_hashes, block_hashes))
}).expect("Sub-thread never panics; qed")?;
@@ -218,7 +218,7 @@ pub fn chunk_secondary<'a>(
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
hash, size, raw_data.len());

- progress.size.fetch_add(size as u64, Ordering::SeqCst);
+ progress.update(0, size);
chunk_hashes.push(hash);
Ok(())
};
@@ -275,8 +275,7 @@ impl<'a> StateChunker<'a> {
self.writer.lock().write_state_chunk(hash, compressed)?;
trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len());

- self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst);
- self.progress.size.fetch_add(compressed_size as u64, Ordering::SeqCst);
+ self.progress.update(num_entries, compressed_size);

self.hashes.push(hash);
self.cur_size = 0;
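The two fetch_add calls above collapse into one Progress::update call. For context, the state chunkers run on several threads and share one progress tracker; below is a minimal sketch of that reporting pattern, assuming the reworked Progress from ethcore/types/src/snapshot.rs further down is in scope (the worker count and deltas are made up):

use std::sync::Arc;
use std::thread;

fn report_from_workers(progress: Arc<Progress>) {
    let workers: Vec<_> = (0..4).map(|_| {
        let p = Arc::clone(&progress);
        thread::spawn(move || {
            // Each worker bumps the account and compressed-byte counters in a
            // single call, which also advances the rate-tracking tick.
            p.update(1_000, 64 * 1024);
        })
    }).collect();
    for worker in workers {
        worker.join().expect("worker never panics; qed");
    }
}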
10 changes: 6 additions & 4 deletions ethcore/snapshot/src/service.rs
@@ -39,7 +39,7 @@ use ethcore_io::IoChannel;
use journaldb::Algorithm;
use keccak_hash::keccak;
use kvdb::DBTransaction;
- use log::{error, info, trace, warn};
+ use log::{debug, error, info, trace, warn};
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
use snappy;
use trie_db::TrieError;
@@ -280,7 +280,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
state_chunks: AtomicUsize::new(0),
block_chunks: AtomicUsize::new(0),
client: params.client,
- progress: Default::default(),
+ progress: Progress::new(),
taking_snapshot: AtomicBool::new(false),
restoring_snapshot: AtomicBool::new(false),
};
@@ -486,7 +486,9 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
if self.progress.done() || !self.taking_snapshot.load(Ordering::SeqCst) { return }

let p = &self.progress;
info!("Snapshot: {} accounts, {} blocks, {} bytes", p.accounts(), p.blocks(), p.size());
info!("Snapshot: {} accounts, {} blocks, {} bytes", p.accounts(), p.blocks(), p.bytes());
let rate = p.rate();
debug!(target: "snapshot", "Current progress rate: {:.0} acc/s, {:.0} bytes/s (compressed)", rate.0, rate.1);
}

/// Take a snapshot at the block with the given number.
@@ -516,7 +518,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {

let guard = Guard::new(temp_dir.clone());
let _ = client.take_snapshot(writer, BlockId::Number(num), &self.progress)?;
info!("Finished taking snapshot at #{}, in {:#?}", num, start_time.elapsed());
info!("Finished taking snapshot at #{}, in {:.0?}", num, start_time.elapsed());

// destroy the old snapshot reader.
let mut reader = self.reader.write();
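On the {:#?} to {:.0?} change above: Duration's Debug output honors the precision flag, so {:.0?} trims the fractional digits while keeping the human-readable unit. A standalone illustration, not from the PR:

use std::time::Duration;

fn main() {
    let elapsed = Duration::from_millis(95_123);
    println!("{:?}", elapsed);   // prints "95.123s"
    println!("{:.0?}", elapsed); // prints "95s", tidier in an info! line
}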
13 changes: 3 additions & 10 deletions ethcore/snapshot/src/watcher.rs
@@ -113,16 +113,9 @@ impl ChainNotify for Watcher {
let highest = new_blocks.imported.into_iter()
// Convert block hashes to block numbers for all newly imported blocks
.filter_map(|h| self.oracle.to_number(h))
- // …only keep the new blocks that have numbers bigger than period + history
- // todo: this seems nonsensical: period is always 5000 and history is always 100, so
- // this filters out blocknumbers that are lower than 5100; what's the point of that?
- .filter(|&num| num >= self.period + self.history)
- // Subtract `history` (i.e. `SNAPSHOT_HISTORY`, i.e. 100) from the block numbers.
- // todo: why? Here we back off 100 blocks such that the final block
- // number from which we start the snapshot is "(new) highest block" - 100)
- // Maybe we do this to avoid IO contention on the tip? Or for reorgs? If the
- // latter, it should be maybe be higher?
- .map(|num| num - self.history)
+ // Subtract `history` (i.e. `SNAPSHOT_HISTORY`, i.e. 100) from the block numbers to stay
+ // clear of reorgs.
+ .map(|num| num.saturating_sub(self.history) )
// …filter out blocks that do not fall on a multiple of `period`. This regulates the
// frequency of snapshots and ensures more snapshots are produced from similar points in
// the chain.
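A minimal sketch of the selection pipeline after this change, assuming block hashes are already resolved to numbers (the real code goes through self.oracle.to_number); the num > 0 guard against the genesis edge case is an addition for the sketch, not in the diff:

fn snapshot_trigger(imported: &[u64], period: u64, history: u64) -> Option<u64> {
    imported.iter()
        // Back off `history` blocks from the tip to stay clear of reorgs.
        .map(|&num| num.saturating_sub(history))
        // Keep only numbers on a multiple of `period`, which regulates snapshot
        // frequency and pins snapshots to similar points in the chain.
        .filter(|&num| num > 0 && num % period == 0)
        .max()
}

With period = 5000 and history = 100, importing block #5100 gives 5100 - 100 = 5000, a multiple of 5000, so a snapshot is triggered at #5000.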
12 changes: 7 additions & 5 deletions ethcore/src/client/client.rs
@@ -973,8 +973,10 @@ impl Client {
None => return Ok(()),
};

- // prune all ancient eras until we're below the memory target,
- // but have at least the minimum number of states, i.e. `history` .
+ // Prune all ancient eras until we're below the memory target (default: 32Mb),
+ // but have at least the minimum number of states, i.e. `history`.
+ // If a snapshot is under way, no pruning happens and memory consumption is allowed to
+ // increase above the memory target until the snapshot has finished.
loop {
let needs_pruning = state_db.journal_db().journal_size() >= self.config.history_mem;

@@ -984,16 +986,16 @@

match state_db.journal_db().earliest_era() {
Some(earliest_era) if earliest_era + self.history <= latest_era => {
- let freeze_at = self.snapshotting_at.load(Ordering::SeqCst); // todo[dvdplm]: can be `Acquire` I think?
+ let freeze_at = self.snapshotting_at.load(Ordering::SeqCst);
if freeze_at > 0 && freeze_at == earliest_era {
trace!(target: "pruning", "Pruning is freezed at era {}; earliest era={}, latest era={}, journal_size={}, mem_used={}. Not pruning.",
freeze_at, earliest_era, latest_era, state_db.journal_db().journal_size(), state_db.journal_db().mem_used());
break;
}
trace!(target: "pruning", "Pruning state for ancient era #{}; latest era={}, journal_size={}, mem_used={}.",
earliest_era, latest_era, state_db.journal_db().journal_size(), state_db.journal_db().mem_used());
+ // todo[dvdplm] reinstate this before merge, logging mem is expensive:
+ // trace!(target: "pruning", "Pruning state for ancient era #{}", earliest_era);
match chain.block_hash(earliest_era) {
Some(ancient_hash) => {
let mut batch = DBTransaction::new();
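This is the crux of the PR: a snapshot walks state at a fixed era, and pruning that era away underneath it would corrupt the snapshot. A reduced sketch of the gate, assuming snapshotting_at holds the block number currently being snapshotted (zero when idle); may_prune is a hypothetical helper for illustration, not in the codebase:

fn may_prune(earliest_era: u64, snapshotting_at: u64) -> bool {
    // Pruning would delete `earliest_era` next; if a snapshot is being taken
    // at exactly that era, hold off and let memory grow past the target
    // until the snapshot finishes.
    !(snapshotting_at > 0 && snapshotting_at == earliest_era)
}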
2 changes: 1 addition & 1 deletion ethcore/src/client/config.rs
@@ -90,7 +90,7 @@ pub struct ClientConfig {
pub history: u64,
/// Ideal memory usage for state pruning history.
pub history_mem: usize,
- /// Check seal valididity on block import
+ /// Check seal validity on block import
pub check_seal: bool,
/// Maximal number of transactions queued for verification in a separate thread.
pub transaction_verification_queue_size: usize,
3 changes: 2 additions & 1 deletion ethcore/types/Cargo.toml
@@ -13,8 +13,9 @@ ethjson = { path = "../../json" }
ethkey = { path = "../../accounts/ethkey" }
keccak-hash = "0.4.0"
parity-bytes = "0.1"
- parity-util-mem = "0.2.0"
parity-snappy = "0.1"
+ parity-util-mem = "0.2.0"
+ parking_lot = "0.9.0"
patricia-trie-ethereum = { path = "../../util/patricia-trie-ethereum" }
rlp = "0.4.0"
rlp_derive = { path = "../../util/rlp-derive" }
1 change: 1 addition & 0 deletions ethcore/types/src/lib.rs
@@ -40,6 +40,7 @@ extern crate ethkey;
#[macro_use]
extern crate derive_more;
extern crate keccak_hash as hash;
+ extern crate parking_lot;
extern crate parity_bytes as bytes;
extern crate patricia_trie_ethereum as ethtrie;
extern crate rlp;
65 changes: 58 additions & 7 deletions ethcore/types/src/snapshot.rs
@@ -16,11 +16,13 @@

//! Snapshot type definitions

- use std::sync::atomic::{AtomicBool, AtomicUsize, AtomicU64, Ordering};
+ use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
+ use std::time::Instant;

- use bytes::Bytes;
use ethereum_types::H256;
+ use parking_lot::RwLock;
use rlp::{Rlp, RlpStream, DecoderError};
+ use bytes::Bytes;

/// Modes of snapshotting
pub enum Snapshotting {
@@ -39,31 +41,53 @@ pub enum Snapshotting {
}

/// A progress indicator for snapshots.
- #[derive(Debug, Default)]
+ #[derive(Debug)]
pub struct Progress {
/// Number of accounts processed so far
- pub accounts: AtomicUsize,
+ accounts: AtomicUsize,
Review comment (Contributor): with this many fields, it's almost certainly better to use a single parking lot mutex wrapping a single struct containing all this data. maybe beyond the scope

Reply (Collaborator, author): Good idea, I'll make the change in a different PR as it gets a bit noisy.
+ // Number of accounts processed at last tick.
+ prev_accounts: AtomicUsize,
/// Number of blocks processed so far
pub blocks: AtomicUsize,
/// Size in bytes of all compressed chunks processed so far
- pub size: AtomicU64,
+ bytes: AtomicUsize,
+ // Number of bytes processed at last tick.
+ prev_bytes: AtomicUsize,
/// Signals that the snapshotting process is completed
pub done: AtomicBool,
/// Signal snapshotting process to abort
pub abort: AtomicBool,

+ last_tick: RwLock<Instant>,
}

impl Progress {
+ /// Create a new progress tracker.
+ pub fn new() -> Progress {
+ Progress {
+ accounts: AtomicUsize::new(0),
+ prev_accounts: AtomicUsize::new(0),
+ blocks: AtomicUsize::new(0),
+ bytes: AtomicUsize::new(0),
+ prev_bytes: AtomicUsize::new(0),
+ abort: AtomicBool::new(false),
+ done: AtomicBool::new(false),
+ last_tick: RwLock::new(Instant::now()),
+ }
+ }

/// Reset the progress.
pub fn reset(&self) {
self.accounts.store(0, Ordering::Release);
self.blocks.store(0, Ordering::Release);
- self.size.store(0, Ordering::Release);
+ self.bytes.store(0, Ordering::Release);
self.abort.store(false, Ordering::Release);

// atomic fence here to ensure the others are written first?
// logs might very rarely get polluted if not.
self.done.store(false, Ordering::Release);

+ *self.last_tick.write() = Instant::now();
}

/// Get the number of accounts snapshotted thus far.
@@ -73,10 +97,37 @@ impl Progress {
pub fn blocks(&self) -> usize { self.blocks.load(Ordering::Acquire) }

/// Get the written size of the snapshot in bytes.
- pub fn size(&self) -> u64 { self.size.load(Ordering::Acquire) }
+ pub fn bytes(&self) -> usize { self.bytes.load(Ordering::Acquire) }

/// Whether the snapshot is complete.
pub fn done(&self) -> bool { self.done.load(Ordering::Acquire) }

+ /// Return the progress rate over the last tick (i.e. since last update).
+ pub fn rate(&self) -> (f64, f64) {
+ let last_tick = *self.last_tick.read();
+ let dt = last_tick.elapsed().as_secs_f64();
+ if dt < 1.0 {
+ return (0f64, 0f64);
+ }
+ let delta_acc = self.accounts.load(Ordering::Relaxed)
+ .saturating_sub(self.prev_accounts.load(Ordering::Relaxed));
+ let delta_bytes = self.bytes.load(Ordering::Relaxed)
+ .saturating_sub(self.prev_bytes.load(Ordering::Relaxed));
+ (delta_acc as f64 / dt, delta_bytes as f64 / dt)
+ }

+ /// Update state progress counters and set the last tick.
+ pub fn update(&self, accounts_delta: usize, bytes_delta: usize) {
+ *self.last_tick.write() = Instant::now();
+ self.prev_accounts.store(
+ self.accounts.fetch_add(accounts_delta, Ordering::SeqCst),
+ Ordering::SeqCst
+ );
+ self.prev_bytes.store(
+ self.bytes.fetch_add(bytes_delta, Ordering::SeqCst),
+ Ordering::SeqCst
+ );
+ }
}

/// Manifest data.
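A usage sketch of the reworked Progress, assuming the type above is in scope (illustrative numbers; note that rate() reports zeros until at least a second has elapsed since the last tick):

use std::thread;
use std::time::Duration;

fn main() {
    let progress = Progress::new();
    progress.update(10_000, 5 * 1024 * 1024); // 10k accounts, 5 MiB compressed

    thread::sleep(Duration::from_secs(2));
    let (acc_rate, byte_rate) = progress.rate();
    println!("Snapshot: {} accounts, {} blocks, {} bytes ({:.0} acc/s, {:.0} bytes/s)",
        progress.accounts(), progress.blocks(), progress.bytes(), acc_rate, byte_rate);
}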
7 changes: 5 additions & 2 deletions parity/run.rs
@@ -69,11 +69,14 @@ use db;
use registrar::RegistrarClient;

// How often we attempt to take a snapshot: only snapshot on blocknumbers that are multiples of this.
+ // todo[dvdplm] reinstate before merging
+ // const SNAPSHOT_PERIOD: u64 = 5000;
+ const SNAPSHOT_PERIOD: u64 = 50;

- // Start snapshots `history` blocks from the tip.
- const SNAPSHOT_HISTORY: u64 = 100;
+ // Start snapshots `history` blocks from the tip. Should be smaller than `SNAPSHOT_PERIOD`.
+ // todo[dvdplm] reinstate before merging
+ //const SNAPSHOT_HISTORY: u64 = 100;
+ const SNAPSHOT_HISTORY: u64 = 10;

// Number of minutes before a given gas price corpus should expire.
// Light client only.
4 changes: 2 additions & 2 deletions parity/snapshot_cmd.rs
@@ -257,14 +257,14 @@ impl SnapshotCommand {
let writer = PackedWriter::new(&file_path)
.map_err(|e| format!("Failed to open snapshot writer: {}", e))?;

- let progress = Arc::new(Progress::default());
+ let progress = Arc::new(Progress::new());
let p = progress.clone();
let informant_handle = ::std::thread::spawn(move || {
::std::thread::sleep(Duration::from_secs(5));

let mut last_size = 0;
while !p.done() {
- let cur_size = p.size();
+ let cur_size = p.bytes();
if cur_size != last_size {
last_size = cur_size;
let bytes = ::informant::format_bytes(cur_size as usize);
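For completeness, a self-contained version of the informant thread above, assuming a shared Arc<Progress> is available; the println! stands in for the real ::informant-formatted output:

use std::sync::Arc;
use std::thread;
use std::time::Duration;

fn spawn_informant(p: Arc<Progress>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        let mut last_size = 0;
        while !p.done() {
            let cur_size = p.bytes();
            // Only report when the written size actually moved.
            if cur_size != last_size {
                last_size = cur_size;
                println!("Snapshot: {} accounts, {} bytes written", p.accounts(), cur_size);
            }
            thread::sleep(Duration::from_secs(5));
        }
    })
}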