This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Pause pruning while snapshotting #11178

Merged 19 commits on Oct 24, 2019
Changes from 12 commits
133 changes: 87 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ethcore/Cargo.toml
@@ -50,6 +50,7 @@ rayon = "1.1"
registrar = { path = "../util/registrar" }
rlp = "0.4.0"
rustc-hex = "2"
scopeguard = "1.0.0"
serde = "1.0"
serde_derive = "1.0"
snapshot = { path = "snapshot" }
2 changes: 0 additions & 2 deletions ethcore/service/src/service.rs
@@ -270,14 +270,12 @@ where
ClientIoMessage::TakeSnapshot(num) => {
let client = self.client.clone();
let snapshot = self.snapshot.clone();

let res = thread::Builder::new().name("Periodic Snapshot".into()).spawn(move || {
if let Err(e) = snapshot.take_snapshot(&*client, num) {
match e {
EthcoreError::Snapshot(SnapshotError::SnapshotAborted) => info!("Snapshot aborted"),
_ => warn!("Failed to take snapshot at block #{}: {}", num, e),
}

}
});

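The change above is the consumer side of the abort behaviour: the periodic snapshot thread now logs `SnapshotError::SnapshotAborted` at `info` level as an expected outcome and keeps `warn` for genuine failures. A minimal sketch of that classification with stand-in enums (`SnapshotError` and `EthcoreError` here are simplified stubs, not the real ethcore error types):

```rust
// Simplified stand-ins for the real ethcore error types.
#[derive(Debug)]
enum SnapshotError { SnapshotAborted }

#[derive(Debug)]
enum EthcoreError { Snapshot(SnapshotError), Other(String) }

fn report_snapshot_result(num: u64, res: Result<(), EthcoreError>) {
    if let Err(e) = res {
        match e {
            // An aborted snapshot is expected (e.g. on shutdown), so no warning.
            EthcoreError::Snapshot(SnapshotError::SnapshotAborted) =>
                println!("INFO: Snapshot aborted"),
            // Anything else is a real failure worth surfacing.
            _ => println!("WARN: Failed to take snapshot at block #{}: {:?}", num, e),
        }
    }
}

fn main() {
    report_snapshot_result(100, Err(EthcoreError::Snapshot(SnapshotError::SnapshotAborted)));
    report_snapshot_result(100, Err(EthcoreError::Other("disk full".into())));
}
```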
1 change: 1 addition & 0 deletions ethcore/snapshot/Cargo.toml
@@ -33,6 +33,7 @@ rand = "0.6"
rand_xorshift = "0.1.1"
rlp = "0.4.2"
rlp_derive = { path = "../../util/rlp-derive" }
scopeguard = "1.0.0"
snappy = { package = "parity-snappy", version ="0.1.0" }
state-db = { path = "../state-db" }
trie-db = "0.15.0"
4 changes: 2 additions & 2 deletions ethcore/snapshot/snapshot-tests/src/watcher.rs
@@ -41,8 +41,8 @@ impl Oracle for TestOracle

struct TestBroadcast(Option<u64>);
impl Broadcast for TestBroadcast {
fn take_at(&self, num: Option<u64>) {
if num != self.0 {
fn request_snapshot_at(&self, num: u64) {
if Some(num) != self.0 {
panic!("Watcher broadcast wrong number. Expected {:?}, found {:?}", self.0, num);
}
}
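The test double changes because `Broadcast::take_at(&self, Option<u64>)` became `request_snapshot_at(&self, u64)` (see the `traits.rs` hunk at the end of this diff): the watcher is now expected to call the broadcaster only when it actually has a block number, so the `Option` no longer leaks through the trait. A rough sketch of how a caller and the test double line up under the new signature; the trait and `TestBroadcast` follow the diff, while `maybe_request` is a hypothetical caller added for illustration:

```rust
/// The trait as changed in this PR.
trait Broadcast: Send + Sync {
    /// Start a snapshot from the given block number.
    fn request_snapshot_at(&self, num: u64);
}

/// Test double: panics unless asked for the expected block number.
struct TestBroadcast(Option<u64>);

impl Broadcast for TestBroadcast {
    fn request_snapshot_at(&self, num: u64) {
        if Some(num) != self.0 {
            panic!("Watcher broadcast wrong number. Expected {:?}, found {}", self.0, num);
        }
    }
}

/// Hypothetical caller: broadcasts only when a snapshot block was actually chosen.
fn maybe_request(broadcast: &dyn Broadcast, chosen: Option<u64>) {
    if let Some(num) = chosen {
        broadcast.request_snapshot_at(num);
    }
}

fn main() {
    let b = TestBroadcast(Some(42));
    maybe_request(&b, Some(42)); // matches the expectation
    maybe_request(&b, None);     // nothing broadcast, nothing to check
}
```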
2 changes: 1 addition & 1 deletion ethcore/snapshot/src/consensus/work.rs
@@ -178,7 +178,7 @@ impl<'a> PowWorker<'a> {
let parent_hash = last_header.parent_hash();
let parent_total_difficulty = last_details.total_difficulty - last_header.difficulty();

trace!(target: "snapshot", "parent last written block: {}", parent_hash);
trace!(target: "snapshot", "parent last written block: #{}/{}", parent_number, parent_hash);

let num_entries = self.rlps.len();
let mut rlp_stream = RlpStream::new_list(3 + num_entries);
41 changes: 24 additions & 17 deletions ethcore/snapshot/src/lib.rs
@@ -129,35 +129,34 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
let state_root = start_header.state_root();
let block_number = start_header.number();

info!("Taking snapshot starting at block {}", block_number);

info!("Taking snapshot starting at block #{}/{:?}", block_number, block_hash);
let version = chunker.current_version();
let writer = Mutex::new(writer);
let (state_hashes, block_hashes) = thread::scope(|scope| -> Result<(Vec<H256>, Vec<H256>), Error> {
let writer = &writer;
let block_guard = scope.spawn(move |_| {
let tb = scope.builder().name("Snapshot Worker - Blocks".to_string());
let block_guard = tb.spawn(move |_| {
chunk_secondary(chunker, chain, block_hash, writer, p)
});
})?;

// The number of threads must be between 1 and SNAPSHOT_SUBPARTS
assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots");
let num_threads: usize = cmp::min(processing_threads, SNAPSHOT_SUBPARTS);
let num_threads = cmp::min(processing_threads, SNAPSHOT_SUBPARTS);
info!(target: "snapshot", "Using {} threads for Snapshot creation.", num_threads);

let mut state_guards = Vec::with_capacity(num_threads as usize);
let mut state_guards = Vec::with_capacity(num_threads);

for thread_idx in 0..num_threads {
let state_guard = scope.spawn(move |_| -> Result<Vec<H256>, Error> {
let tb = scope.builder().name(format!("Snapshot Worker #{} - State", thread_idx).to_string());
let state_guard = tb.spawn(move |_| -> Result<Vec<H256>, Error> {
let mut chunk_hashes = Vec::new();

for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) {
debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx);
debug!(target: "snapshot", "Chunking part {} of the state at {} in thread {}", part, block_number, thread_idx);
let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part), thread_idx)?;
chunk_hashes.append(&mut hashes);
}

Ok(chunk_hashes)
});
})?;
state_guards.push(state_guard);
}

@@ -169,7 +168,8 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
state_hashes.extend(part_state_hashes);
}

debug!(target: "snapshot", "Took a snapshot of {} accounts", p.accounts.load(Ordering::SeqCst));
info!("Took a snapshot at #{} of {} accounts", block_number, p.accounts());

Ok((state_hashes, block_hashes))
}).expect("Sub-thread never panics; qed")?;

@@ -218,7 +218,7 @@ pub fn chunk_secondary<'a>(
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
hash, size, raw_data.len());

progress.size.fetch_add(size as u64, Ordering::SeqCst);
progress.update(0, size);
chunk_hashes.push(hash);
Ok(())
};
@@ -275,8 +275,7 @@ impl<'a> StateChunker<'a> {
self.writer.lock().write_state_chunk(hash, compressed)?;
trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len());

self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst);
self.progress.size.fetch_add(compressed_size as u64, Ordering::SeqCst);
self.progress.update(num_entries, compressed_size);

self.hashes.push(hash);
self.cur_size = 0;
@@ -327,7 +326,7 @@ pub fn chunk_state<'a>(
if let Some(part) = part {
assert!(part < 16, "Wrong chunk state part number (must be <16) in snapshot creation.");

let part_offset = MAX_SNAPSHOT_SUBPARTS / SNAPSHOT_SUBPARTS;
let part_offset = MAX_SNAPSHOT_SUBPARTS / SNAPSHOT_SUBPARTS; // 16
let mut seek_from = vec![0; 32];
seek_from[0] = (part * part_offset) as u8;
account_iter.seek(&seek_from)?;
@@ -349,7 +348,15 @@
let account = ::rlp::decode(&*account_data)?;
let account_db = AccountDB::from_hash(db, account_key_hash);

let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE, progress)?;
let fat_rlps = account::to_fat_rlps(
&account_key_hash,
&account,
&account_db,
&mut used_code,
PREFERRED_CHUNK_SIZE - chunker.chunk_size(),
PREFERRED_CHUNK_SIZE,
progress
)?;
for (i, fat_rlp) in fat_rlps.into_iter().enumerate() {
if i > 0 {
chunker.write_chunk()?;
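Worth spelling out how the worker loop above divides the state: each worker thread takes every `num_threads`-th subpart starting at its own index, so the subparts are interleaved across threads rather than split into contiguous ranges. A small sketch of that mapping (the value `SNAPSHOT_SUBPARTS = 16` is inferred from the `part < 16` assertion and the `// 16` comment in the hunk, not stated elsewhere in this diff):

```rust
// Mirrors the `(thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads)` loop above.
const SNAPSHOT_SUBPARTS: usize = 16; // inferred from the `part < 16` assertion

fn parts_for_thread(thread_idx: usize, num_threads: usize) -> Vec<usize> {
    (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads).collect()
}

fn main() {
    let num_threads = 4; // e.g. min(processing_threads, SNAPSHOT_SUBPARTS)
    for thread_idx in 0..num_threads {
        // thread 0 -> [0, 4, 8, 12], thread 1 -> [1, 5, 9, 13], ...
        println!("thread {} chunks parts {:?}", thread_idx, parts_for_thread(thread_idx, num_threads));
    }
}
```

Each part then seeks the account iterator to the prefix `part * part_offset`, where `part_offset = MAX_SNAPSHOT_SUBPARTS / SNAPSHOT_SUBPARTS` (16, per the added comment), so part 3, for example, starts at account hashes whose first byte is 0x30.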
106 changes: 56 additions & 50 deletions ethcore/snapshot/src/service.rs
@@ -39,7 +39,7 @@ use ethcore_io::IoChannel;
use journaldb::Algorithm;
use keccak_hash::keccak;
use kvdb::DBTransaction;
use log::{error, info, trace, warn};
use log::{debug, error, info, trace, warn};
use parking_lot::{Mutex, RwLock, RwLockReadGuard};
use snappy;
use trie_db::TrieError;
@@ -212,7 +212,7 @@ impl Restoration
}

self.guard.disarm();
trace!(target: "snapshot", "restoration finalised correctly");
trace!(target: "snapshot", "Restoration finalised correctly");
Ok(())
}

@@ -280,7 +280,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
state_chunks: AtomicUsize::new(0),
block_chunks: AtomicUsize::new(0),
client: params.client,
progress: Default::default(),
progress: Progress::new(),
taking_snapshot: AtomicBool::new(false),
restoring_snapshot: AtomicBool::new(false),
};
@@ -413,7 +413,10 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
Some(x) => x,
None => return Ok(0),
};

info!(target: "snapshot", "Migrating blocks from old db to new. Start: #{}/{:?}, Target: #{}/{:?}",
self.client.block_number(BlockId::Hash(start_hash)).unwrap_or_default(), start_hash,
self.client.block_number(BlockId::Hash(target_hash)).unwrap_or_default(), target_hash,
);
let mut batch = DBTransaction::new();
let mut parent_hash = start_hash;
while parent_hash != target_hash {
@@ -455,10 +458,10 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
next_chain.commit();
next_db.key_value().flush().expect("DB flush failed.");
batch = DBTransaction::new();
}

if block_number % 10_000 == 0 {
info!(target: "snapshot", "Block restoration at #{}", block_number);
if block_number % 10_000 == 0 {
info!(target: "snapshot", "Block restoration at #{}", block_number);
}
}
}

@@ -483,11 +486,13 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
if self.progress.done() || !self.taking_snapshot.load(Ordering::SeqCst) { return }

let p = &self.progress;
info!("Snapshot: {} accounts {} blocks {} bytes", p.accounts(), p.blocks(), p.size());
info!("Snapshot: {} accounts, {} blocks, {} bytes", p.accounts(), p.blocks(), p.bytes());
let rate = p.rate();
debug!(target: "snapshot", "Current progress rate: {:.0} acc/s, {:.0} bytes/s (compressed)", rate.0, rate.1);
}

/// Take a snapshot at the block with the given number.
/// calling this while a restoration is in progress or vice versa
/// Calling this while a restoration is in progress or vice versa
/// will lead to a race condition where the first one to finish will
/// have their produced snapshot overwritten.
pub fn take_snapshot(&self, client: &C, num: u64) -> Result<(), Error> {
@@ -497,45 +502,40 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
}

info!("Taking snapshot at #{}", num);
self.progress.reset();

let temp_dir = self.temp_snapshot_dir();
let snapshot_dir = self.snapshot_dir();
{
scopeguard::defer! {{
self.taking_snapshot.store(false, Ordering::SeqCst);
}}
let start_time = std::time::Instant::now();
self.progress.reset();

let temp_dir = self.temp_snapshot_dir();
let snapshot_dir = self.snapshot_dir();

let _ = fs::remove_dir_all(&temp_dir);
let _ = fs::remove_dir_all(&temp_dir); // expected to fail

let writer = LooseWriter::new(temp_dir.clone())?;
let writer = LooseWriter::new(temp_dir.clone())?;

let guard = Guard::new(temp_dir.clone());
let res = client.take_snapshot(writer, BlockId::Number(num), &self.progress);
self.taking_snapshot.store(false, Ordering::SeqCst);
if let Err(e) = res {
if client.chain_info().best_block_number >= num + client.pruning_history() {
// The state we were snapshotting was pruned before we could finish.
info!("Periodic snapshot failed: block state pruned. Run with a longer `--pruning-history` or with `--no-periodic-snapshot`");
return Err(e);
} else {
return Err(e);
}
}
let guard = Guard::new(temp_dir.clone());
let _ = client.take_snapshot(writer, BlockId::Number(num), &self.progress)?;
info!("Finished taking snapshot at #{}, in {:.0?}", num, start_time.elapsed());

info!("Finished taking snapshot at #{}", num);
// destroy the old snapshot reader.
let mut reader = self.reader.write();
*reader = None;

let mut reader = self.reader.write();
if snapshot_dir.exists() {
trace!(target: "snapshot", "Removing previous snapshot at {:?}", &snapshot_dir);
fs::remove_dir_all(&snapshot_dir)?;
}

// destroy the old snapshot reader.
*reader = None;
fs::rename(temp_dir, &snapshot_dir)?;
trace!(target: "snapshot", "Moved new snapshot into place at {:?}", &snapshot_dir);
*reader = Some(LooseReader::new(snapshot_dir)?);

if snapshot_dir.exists() {
fs::remove_dir_all(&snapshot_dir)?;
guard.disarm();
Ok(())
}

fs::rename(temp_dir, &snapshot_dir)?;

*reader = Some(LooseReader::new(snapshot_dir)?);

guard.disarm();
Ok(())
}

/// Initialize the restoration synchronously.
@@ -654,13 +654,19 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
Ok(())
}

/// Import a previous chunk at the given path. Returns whether the block was imported or not
fn import_prev_chunk(&self, restoration: &mut Option<Restoration>, manifest: &ManifestData, file: io::Result<fs::DirEntry>) -> Result<bool, Error> {
/// Import a previous chunk at the given path. Returns whether the chunk was imported or not
fn import_prev_chunk(
&self,
restoration: &mut Option<Restoration>,
manifest: &ManifestData,
file: io::Result<fs::DirEntry>
) -> Result<bool, Error> {
let file = file?;
let path = file.path();

let mut file = File::open(path.clone())?;
let mut buffer = Vec::new();
let filesize = file.metadata()?.len();
let mut buffer = Vec::with_capacity(filesize as usize + 1); // +1 for EOF
file.read_to_end(&mut buffer)?;

let hash = keccak(&buffer);
Expand All @@ -670,6 +676,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
} else if manifest.state_hashes.contains(&hash) {
true
} else {
warn!(target: "snapshot", "Hash of the content of {:?} not present in the manifest block/state hashes.", path);
return Ok(false);
};

@@ -680,11 +687,10 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
Ok(true)
}

// finalize the restoration. this accepts an already-locked
// restoration as an argument -- so acquiring it again _will_
// lead to deadlock.
// Finalize the restoration. This accepts an already-locked restoration as an argument -- so
// acquiring it again _will_ lead to deadlock.
fn finalize_restoration(&self, rest: &mut Option<Restoration>) -> Result<(), Error> {
trace!(target: "snapshot", "finalizing restoration");
trace!(target: "snapshot", "Finalizing restoration");
*self.status.lock() = RestorationStatus::Finalizing;

let recover = rest.as_ref().map_or(false, |rest| rest.writer.is_some());
@@ -695,7 +701,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
.unwrap_or(Ok(()))?;

let migrated_blocks = self.migrate_blocks()?;
info!(target: "snapshot", "Migrated {} ancient blocks", migrated_blocks);
info!(target: "snapshot", "Migrated {} ancient blocks from the old DB", migrated_blocks);

// replace the Client's database with the new one (restart the Client).
self.client.restore_db(&*self.restoration_db().to_string_lossy())?;
@@ -707,11 +713,11 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
let snapshot_dir = self.snapshot_dir();

if snapshot_dir.exists() {
trace!(target: "snapshot", "removing old snapshot dir at {}", snapshot_dir.to_string_lossy());
trace!(target: "snapshot", "Removing old snapshot dir at {}", snapshot_dir.to_string_lossy());
fs::remove_dir_all(&snapshot_dir)?;
}

trace!(target: "snapshot", "copying restored snapshot files over");
trace!(target: "snapshot", "Copying restored snapshot files over");
fs::rename(self.temp_recovery_dir(), &snapshot_dir)?;

*reader = Some(LooseReader::new(snapshot_dir)?);
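The new `scopeguard` dependency is what lets `take_snapshot` return early with `?` without leaving the `taking_snapshot` flag set: `defer!` runs its body when the enclosing scope exits, whether by normal return, an early `?`, or a panic. A compressed sketch of the pattern; the flag, `fallible_snapshot_work`, and the `String` error type are stand-ins, and only the `scopeguard::defer!` usage mirrors the diff (assuming `scopeguard = "1.0.0"` as added to Cargo.toml in this PR):

```rust
use std::sync::atomic::{AtomicBool, Ordering};

static TAKING_SNAPSHOT: AtomicBool = AtomicBool::new(false);

// Stand-in for the real snapshot work, which can fail part-way through.
fn fallible_snapshot_work() -> Result<(), String> {
    Err("state pruned before the snapshot finished".to_string())
}

fn take_snapshot() -> Result<(), String> {
    if TAKING_SNAPSHOT.swap(true, Ordering::SeqCst) {
        return Ok(()); // another snapshot is already in progress
    }
    // Reset the flag on *every* exit path: normal return, early `?`, or panic.
    scopeguard::defer! {
        TAKING_SNAPSHOT.store(false, Ordering::SeqCst);
    }
    fallible_snapshot_work()?; // an error here no longer leaves the flag stuck at `true`
    Ok(())
}

fn main() {
    let _ = take_snapshot();
    assert!(!TAKING_SNAPSHOT.load(Ordering::SeqCst));
}
```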
4 changes: 2 additions & 2 deletions ethcore/snapshot/src/traits.rs
@@ -136,7 +136,7 @@ pub trait SnapshotComponents: Send {
/// Snapshot related functionality
pub trait SnapshotClient: BlockChainClient + BlockInfo + DatabaseRestore + BlockChainReset {
/// Take a snapshot at the given block.
/// If the ID given is "latest", this will default to 1000 blocks behind.
/// If the BlockId is 'Latest', this will default to 1000 blocks behind.
fn take_snapshot<W: SnapshotWriter + Send>(
&self,
writer: W,
@@ -148,7 +148,7 @@ pub trait SnapshotClient: BlockChainClient + BlockInfo + DatabaseRestore + Block
/// Helper trait for broadcasting a block to take a snapshot at.
pub trait Broadcast: Send + Sync {
/// Start a snapshot from the given block number.
fn take_at(&self, num: Option<u64>);
fn request_snapshot_at(&self, num: u64);
}


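Several hunks above replace direct `fetch_add` calls on `Progress` fields with a single `progress.update(..)` call plus accessors (`accounts()`, `blocks()`, `bytes()`, `rate()`). The `Progress` struct itself is not part of this diff, so the following is only a guessed shape to make those call sites easier to read; the field names, the `update` signature, and the rate calculation are assumptions, not the real implementation:

```rust
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::time::Instant;

/// Rough stand-in for the snapshot `Progress` type referenced in the hunks above.
struct Progress {
    accounts: AtomicUsize,
    blocks: AtomicUsize,
    bytes: AtomicU64,
    started_at: Instant,
}

impl Progress {
    fn new() -> Self {
        Progress {
            accounts: AtomicUsize::new(0),
            blocks: AtomicUsize::new(0),
            bytes: AtomicU64::new(0),
            started_at: Instant::now(),
        }
    }

    /// One call per written chunk instead of two separate `fetch_add`s.
    fn update(&self, accounts_delta: usize, bytes_delta: u64) {
        self.accounts.fetch_add(accounts_delta, Ordering::SeqCst);
        self.bytes.fetch_add(bytes_delta, Ordering::SeqCst);
    }

    fn accounts(&self) -> usize { self.accounts.load(Ordering::SeqCst) }
    fn blocks(&self) -> usize { self.blocks.load(Ordering::SeqCst) }
    fn bytes(&self) -> u64 { self.bytes.load(Ordering::SeqCst) }

    /// (accounts per second, bytes per second) since the snapshot started.
    fn rate(&self) -> (f64, f64) {
        let secs = self.started_at.elapsed().as_secs_f64().max(1e-9);
        (self.accounts() as f64 / secs, self.bytes() as f64 / secs)
    }
}

fn main() {
    let p = Progress::new();
    p.update(1_000, 250_000);
    let (acc_rate, byte_rate) = p.rate();
    println!("{} accounts, {} blocks, {} bytes ({:.0} acc/s, {:.0} bytes/s)",
        p.accounts(), p.blocks(), p.bytes(), acc_rate, byte_rate);
}
```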