Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Pause pruning while snapshotting #11178

Merged
merged 19 commits into from
Oct 24, 2019
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 88 additions & 46 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions ethcore/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ rayon = "1.1"
registrar = { path = "../util/registrar" }
rlp = "0.4.0"
rustc-hex = "2"
scopeguard = "1.0.0"
serde = "1.0"
serde_derive = "1.0"
snapshot = { path = "snapshot" }
Expand Down
2 changes: 0 additions & 2 deletions ethcore/service/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,14 +270,12 @@ where
ClientIoMessage::TakeSnapshot(num) => {
let client = self.client.clone();
let snapshot = self.snapshot.clone();

let res = thread::Builder::new().name("Periodic Snapshot".into()).spawn(move || {
if let Err(e) = snapshot.take_snapshot(&*client, num) {
match e {
EthcoreError::Snapshot(SnapshotError::SnapshotAborted) => info!("Snapshot aborted"),
_ => warn!("Failed to take snapshot at block #{}: {}", num, e),
}

}
});

Expand Down
1 change: 1 addition & 0 deletions ethcore/snapshot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ rand = "0.6"
rand_xorshift = "0.1.1"
rlp = "0.4.2"
rlp_derive = { path = "../../util/rlp-derive" }
scopeguard = "1.0.0"
snappy = { package = "parity-snappy", version ="0.1.0" }
state-db = { path = "../state-db" }
trie-db = "0.15.0"
Expand Down
12 changes: 6 additions & 6 deletions ethcore/snapshot/snapshot-tests/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ fn encoding_basic() {

let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::default();
let p = Progress::new();
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
Expand All @@ -69,7 +69,7 @@ fn encoding_version() {

let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::default();
let p = Progress::new();
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
Expand All @@ -96,7 +96,7 @@ fn encoding_storage() {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);

let p = Progress::default();
let p = Progress::new();

let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap();
Expand Down Expand Up @@ -124,7 +124,7 @@ fn encoding_storage_split() {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);

let p = Progress::default();
let p = Progress::new();
let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), 500, 1000, &p).unwrap();
let mut root = KECCAK_NULL_RLP;
let mut restored_account = None;
Expand Down Expand Up @@ -170,8 +170,8 @@ fn encoding_code() {
};

let mut used_code = HashSet::new();
let p1 = Progress::default();
let p2 = Progress::default();
let p1 = Progress::new();
let p2 = Progress::new();
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::from_hash(db.as_hash_db(), keccak(addr1)), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap();
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::from_hash(db.as_hash_db(), keccak(addr2)), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap();
assert_eq!(used_code.len(), 1);
Expand Down
5 changes: 3 additions & 2 deletions ethcore/snapshot/snapshot-tests/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ use client_traits::ChainInfo;
use common_types::{
ids::BlockId,
basic_account::BasicAccount,
errors::EthcoreError
errors::EthcoreError,
snapshot::Progress,
};
use engine::Engine;
use ethcore::client::Client;
Expand Down Expand Up @@ -145,7 +146,7 @@ pub fn snap(client: &Client) -> (Box<dyn SnapshotReader>, TempDir) {
let tempdir = TempDir::new("").unwrap();
let path = tempdir.path().join("file");
let writer = PackedWriter::new(&path).unwrap();
let progress = Default::default();
let progress = Progress::new();

let hash = client.chain_info().best_block_hash;
client.take_snapshot(writer, BlockId::Hash(hash), &progress).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion ethcore/snapshot/snapshot-tests/src/proof_of_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ fn chunk_and_restore(amount: u64) {
&bc,
best_hash,
&writer,
&Progress::default()
&Progress::new()
).unwrap();

let manifest = ManifestData {
Expand Down
4 changes: 2 additions & 2 deletions ethcore/snapshot/snapshot-tests/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,7 @@ fn keep_ancient_blocks() {
&bc,
best_hash,
&writer,
&Progress::default()
&Progress::new()
).unwrap();
let state_db = client.state_db().journal_db().boxed_clone();
let start_header = bc.block_header_data(&best_hash).unwrap();
Expand All @@ -287,7 +287,7 @@ fn keep_ancient_blocks() {
state_db.as_hash_db(),
&state_root,
&writer,
&Progress::default(),
&Progress::new(),
None,
0
).unwrap();
Expand Down
6 changes: 3 additions & 3 deletions ethcore/snapshot/snapshot-tests/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ fn snap_and_restore() {

let mut state_hashes = Vec::new();
for part in 0..SNAPSHOT_SUBPARTS {
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), Some(part), 0).unwrap();
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new(), Some(part), 0).unwrap();
state_hashes.append(&mut hashes);
}

Expand Down Expand Up @@ -133,7 +133,7 @@ fn get_code_from_prev_chunk() {
let mut make_chunk = |acc, hash| {
let mut db = journaldb::new_memory_db();
AccountDBMut::from_hash(&mut db, hash).insert(EMPTY_PREFIX, &code[..]);
let p = Progress::default();
let p = Progress::new();
let fat_rlp = to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap();
let mut stream = RlpStream::new_list(1);
stream.append_raw(&fat_rlp[0], 1);
Expand Down Expand Up @@ -178,7 +178,7 @@ fn checks_flag() {
let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());

let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::default(), None, 0).unwrap();
let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new(), None, 0).unwrap();

writer.into_inner().finish(ManifestData {
version: 2,
Expand Down
4 changes: 2 additions & 2 deletions ethcore/snapshot/snapshot-tests/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ impl Oracle for TestOracle {

struct TestBroadcast(Option<u64>);
impl Broadcast for TestBroadcast {
fn take_at(&self, num: Option<u64>) {
if num != self.0 {
fn request_snapshot_at(&self, num: u64) {
if Some(num) != self.0 {
panic!("Watcher broadcast wrong number. Expected {:?}, found {:?}", self.0, num);
}
}
Expand Down
2 changes: 1 addition & 1 deletion ethcore/snapshot/src/consensus/work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl<'a> PowWorker<'a> {
let parent_hash = last_header.parent_hash();
let parent_total_difficulty = last_details.total_difficulty - last_header.difficulty();

trace!(target: "snapshot", "parent last written block: {}", parent_hash);
trace!(target: "snapshot", "parent last written block: #{}/{}", parent_number, parent_hash);

let num_entries = self.rlps.len();
let mut rlp_stream = RlpStream::new_list(3 + num_entries);
Expand Down
41 changes: 24 additions & 17 deletions ethcore/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,35 +129,34 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
let state_root = start_header.state_root();
let block_number = start_header.number();

info!("Taking snapshot starting at block {}", block_number);

info!("Taking snapshot starting at block #{}/{:?}", block_number, block_hash);
let version = chunker.current_version();
let writer = Mutex::new(writer);
let (state_hashes, block_hashes) = thread::scope(|scope| -> Result<(Vec<H256>, Vec<H256>), Error> {
let writer = &writer;
let block_guard = scope.spawn(move |_| {
let tb = scope.builder().name("Snapshot Worker - Blocks".to_string());
let block_guard = tb.spawn(move |_| {
chunk_secondary(chunker, chain, block_hash, writer, p)
});
})?;

// The number of threads must be between 1 and SNAPSHOT_SUBPARTS
assert!(processing_threads >= 1, "Cannot use less than 1 threads for creating snapshots");
let num_threads: usize = cmp::min(processing_threads, SNAPSHOT_SUBPARTS);
let num_threads = cmp::min(processing_threads, SNAPSHOT_SUBPARTS);
info!(target: "snapshot", "Using {} threads for Snapshot creation.", num_threads);

let mut state_guards = Vec::with_capacity(num_threads as usize);
let mut state_guards = Vec::with_capacity(num_threads);

for thread_idx in 0..num_threads {
let state_guard = scope.spawn(move |_| -> Result<Vec<H256>, Error> {
let tb = scope.builder().name(format!("Snapshot Worker #{} - State", thread_idx).to_string());
let state_guard = tb.spawn(move |_| -> Result<Vec<H256>, Error> {
let mut chunk_hashes = Vec::new();

for part in (thread_idx..SNAPSHOT_SUBPARTS).step_by(num_threads) {
debug!(target: "snapshot", "Chunking part {} in thread {}", part, thread_idx);
debug!(target: "snapshot", "Chunking part {} of the state at {} in thread {}", part, block_number, thread_idx);
let mut hashes = chunk_state(state_db, &state_root, writer, p, Some(part), thread_idx)?;
chunk_hashes.append(&mut hashes);
}

Ok(chunk_hashes)
});
})?;
state_guards.push(state_guard);
}

Expand All @@ -169,7 +168,8 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
state_hashes.extend(part_state_hashes);
}

debug!(target: "snapshot", "Took a snapshot of {} accounts", p.accounts.load(Ordering::SeqCst));
info!("Took a snapshot at #{} of {} accounts", block_number, p.accounts());

Ok((state_hashes, block_hashes))
}).expect("Sub-thread never panics; qed")?;

Expand Down Expand Up @@ -218,7 +218,7 @@ pub fn chunk_secondary<'a>(
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
hash, size, raw_data.len());

progress.size.fetch_add(size as u64, Ordering::SeqCst);
progress.update(0, size);
chunk_hashes.push(hash);
Ok(())
};
Expand Down Expand Up @@ -275,8 +275,7 @@ impl<'a> StateChunker<'a> {
self.writer.lock().write_state_chunk(hash, compressed)?;
trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len());

self.progress.accounts.fetch_add(num_entries, Ordering::SeqCst);
self.progress.size.fetch_add(compressed_size as u64, Ordering::SeqCst);
self.progress.update(num_entries, compressed_size);

self.hashes.push(hash);
self.cur_size = 0;
Expand Down Expand Up @@ -327,7 +326,7 @@ pub fn chunk_state<'a>(
if let Some(part) = part {
assert!(part < 16, "Wrong chunk state part number (must be <16) in snapshot creation.");

let part_offset = MAX_SNAPSHOT_SUBPARTS / SNAPSHOT_SUBPARTS;
let part_offset = MAX_SNAPSHOT_SUBPARTS / SNAPSHOT_SUBPARTS; // 16
let mut seek_from = vec![0; 32];
seek_from[0] = (part * part_offset) as u8;
account_iter.seek(&seek_from)?;
Expand All @@ -349,7 +348,15 @@ pub fn chunk_state<'a>(
let account = ::rlp::decode(&*account_data)?;
let account_db = AccountDB::from_hash(db, account_key_hash);

let fat_rlps = account::to_fat_rlps(&account_key_hash, &account, &account_db, &mut used_code, PREFERRED_CHUNK_SIZE - chunker.chunk_size(), PREFERRED_CHUNK_SIZE, progress)?;
let fat_rlps = account::to_fat_rlps(
&account_key_hash,
&account,
&account_db,
&mut used_code,
PREFERRED_CHUNK_SIZE - chunker.chunk_size(),
PREFERRED_CHUNK_SIZE,
progress
)?;
for (i, fat_rlp) in fat_rlps.into_iter().enumerate() {
if i > 0 {
chunker.write_chunk()?;
Expand Down
Loading