Skip to content
This repository has been archived by the owner on Nov 6, 2020. It is now read-only.

Use a lock instead of atomics for snapshot Progress #11197

Merged
merged 20 commits into from
Oct 28, 2019
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions ethcore/snapshot/snapshot-tests/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use ethereum_types::{H256, Address};
use hash_db::{HashDB, EMPTY_PREFIX};
use keccak_hash::{KECCAK_EMPTY, KECCAK_NULL_RLP, keccak};
use kvdb::DBValue;
use parking_lot::RwLock;
use rlp::Rlp;
use snapshot::test_helpers::{ACC_EMPTY, to_fat_rlps, from_fat_rlp};

Expand All @@ -48,7 +49,7 @@ fn encoding_basic() {

let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::new();
let p = RwLock::new(Progress::new());
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
Expand All @@ -69,7 +70,7 @@ fn encoding_version() {

let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);
let p = Progress::new();
let p = RwLock::new(Progress::new());
let fat_rlps = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlps[0]).at(1).unwrap();
assert_eq!(from_fat_rlp(&mut AccountDBMut::from_hash(db.as_hash_db_mut(), keccak(addr)), fat_rlp, H256::zero()).unwrap().0, account);
Expand All @@ -96,7 +97,7 @@ fn encoding_storage() {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);

let p = Progress::new();
let p = RwLock::new(Progress::new());

let fat_rlp = to_fat_rlps(&keccak(&addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), usize::max_value(), usize::max_value(), &p).unwrap();
let fat_rlp = Rlp::new(&fat_rlp[0]).at(1).unwrap();
Expand Down Expand Up @@ -124,7 +125,7 @@ fn encoding_storage_split() {
let thin_rlp = ::rlp::encode(&account);
assert_eq!(::rlp::decode::<BasicAccount>(&thin_rlp).unwrap(), account);

let p = Progress::new();
let p = RwLock::new(Progress::new());
let fat_rlps = to_fat_rlps(&keccak(addr), &account, &AccountDB::from_hash(db.as_hash_db(), keccak(addr)), &mut Default::default(), 500, 1000, &p).unwrap();
let mut root = KECCAK_NULL_RLP;
let mut restored_account = None;
Expand Down Expand Up @@ -170,8 +171,8 @@ fn encoding_code() {
};

let mut used_code = HashSet::new();
let p1 = Progress::new();
let p2 = Progress::new();
let p1 = RwLock::new(Progress::new());
let p2 = RwLock::new(Progress::new());
let fat_rlp1 = to_fat_rlps(&keccak(&addr1), &account1, &AccountDB::from_hash(db.as_hash_db(), keccak(addr1)), &mut used_code, usize::max_value(), usize::max_value(), &p1).unwrap();
let fat_rlp2 = to_fat_rlps(&keccak(&addr2), &account2, &AccountDB::from_hash(db.as_hash_db(), keccak(addr2)), &mut used_code, usize::max_value(), usize::max_value(), &p2).unwrap();
assert_eq!(used_code.len(), 1);
Expand Down
3 changes: 2 additions & 1 deletion ethcore/snapshot/snapshot-tests/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use keccak_hash::{KECCAK_NULL_RLP};
use keccak_hasher::KeccakHasher;
use kvdb::DBValue;
use log::trace;
use parking_lot::RwLock;
use rand::Rng;
use rlp;
use snapshot::{
Expand Down Expand Up @@ -146,7 +147,7 @@ pub fn snap(client: &Client) -> (Box<dyn SnapshotReader>, TempDir) {
let tempdir = TempDir::new("").unwrap();
let path = tempdir.path().join("file");
let writer = PackedWriter::new(&path).unwrap();
let progress = Progress::new();
let progress = RwLock::new(Progress::new());

let hash = client.chain_info().best_block_hash;
client.take_snapshot(writer, BlockId::Hash(hash), &progress).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions ethcore/snapshot/snapshot-tests/src/proof_of_work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use snapshot::{
io::{PackedReader, PackedWriter, SnapshotReader, SnapshotWriter},
PowSnapshot,
};
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use snappy;
use keccak_hash::KECCAK_NULL_RLP;
use kvdb::DBTransaction;
Expand Down Expand Up @@ -74,7 +74,7 @@ fn chunk_and_restore(amount: u64) {
&bc,
best_hash,
&writer,
&Progress::new()
&RwLock::new(Progress::new())
).unwrap();

let manifest = ManifestData {
Expand Down
6 changes: 3 additions & 3 deletions ethcore/snapshot/snapshot-tests/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ use ethcore::{
test_helpers::{new_db, new_temp_db, generate_dummy_client_with_spec_and_data, restoration_db_handler}
};

use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use ethcore_io::{IoChannel, IoService};
use kvdb_rocksdb::DatabaseConfig;
use journaldb::Algorithm;
Expand Down Expand Up @@ -278,7 +278,7 @@ fn keep_ancient_blocks() {
&bc,
best_hash,
&writer,
&Progress::new()
&RwLock::new(Progress::new())
).unwrap();
let state_db = client.state_db().journal_db().boxed_clone();
let start_header = bc.block_header_data(&best_hash).unwrap();
Expand All @@ -287,7 +287,7 @@ fn keep_ancient_blocks() {
state_db.as_hash_db(),
&state_root,
&writer,
&Progress::new(),
&RwLock::new(Progress::new()),
None,
0
).unwrap();
Expand Down
20 changes: 15 additions & 5 deletions ethcore/snapshot/snapshot-tests/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ use rand_xorshift::XorShiftRng;
use ethereum_types::H256;
use journaldb::{self, Algorithm};
use kvdb_rocksdb::{Database, DatabaseConfig};
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use tempdir::TempDir;

use crate::helpers::StateProducer;
Expand All @@ -61,8 +61,9 @@ fn snap_and_restore() {
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());

let mut state_hashes = Vec::new();
let progress = RwLock::new(Progress::new());
for part in 0..SNAPSHOT_SUBPARTS {
let mut hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new(), Some(part), 0).unwrap();
let mut hashes = chunk_state(&old_db, &state_root, &writer, &progress, Some(part), 0).unwrap();
state_hashes.append(&mut hashes);
}

Expand Down Expand Up @@ -133,8 +134,16 @@ fn get_code_from_prev_chunk() {
let mut make_chunk = |acc, hash| {
let mut db = journaldb::new_memory_db();
AccountDBMut::from_hash(&mut db, hash).insert(EMPTY_PREFIX, &code[..]);
let p = Progress::new();
let fat_rlp = to_fat_rlps(&hash, &acc, &AccountDB::from_hash(&db, hash), &mut used_code, usize::max_value(), usize::max_value(), &p).unwrap();
let p = RwLock::new(Progress::new());
let fat_rlp = to_fat_rlps(
&hash,
&acc,
&AccountDB::from_hash(&db, hash),
&mut used_code,
usize::max_value(),
usize::max_value(),
&p
).unwrap();
let mut stream = RlpStream::new_list(1);
stream.append_raw(&fat_rlp[0], 1);
stream.out()
Expand Down Expand Up @@ -177,8 +186,9 @@ fn checks_flag() {

let state_root = producer.state_root();
let writer = Mutex::new(PackedWriter::new(&snap_file).unwrap());
let progress = RwLock::new(Progress::new());

let state_hashes = chunk_state(&old_db, &state_root, &writer, &Progress::new(), None, 0).unwrap();
let state_hashes = chunk_state(&old_db, &state_root, &writer, &progress, None, 0).unwrap();

writer.into_inner().finish(ManifestData {
version: 2,
Expand Down
5 changes: 3 additions & 2 deletions ethcore/snapshot/src/account.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use ethtrie::{TrieDB, TrieDBMut};
use hash_db::HashDB;
use keccak_hash::{KECCAK_EMPTY, KECCAK_NULL_RLP};
use log::{trace, warn};
use parking_lot::RwLock;
use rlp::{RlpStream, Rlp};
use trie_db::{Trie, TrieMut};

Expand Down Expand Up @@ -79,7 +80,7 @@ pub fn to_fat_rlps(
used_code: &mut HashSet<H256>,
first_chunk_size: usize,
max_chunk_size: usize,
p: &Progress,
p: &RwLock<Progress>,
) -> Result<Vec<Bytes>, Error> {
let db = &(acct_db as &dyn HashDB<_,_>);
let db = TrieDB::new(db, &acc.storage_root)?;
Expand Down Expand Up @@ -135,7 +136,7 @@ pub fn to_fat_rlps(
}

loop {
if p.abort.load(Ordering::SeqCst) {
if p.read().abort {
trace!(target: "snapshot", "to_fat_rlps: aborting snapshot");
return Err(Error::SnapshotAborted);
}
Expand Down
3 changes: 2 additions & 1 deletion ethcore/snapshot/src/consensus/authority.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use ethereum_types::{H256, U256};
use itertools::{Position, Itertools};
use kvdb::KeyValueDB;
use log::trace;
use parking_lot::RwLock;
use rlp::{RlpStream, Rlp};

use crate::{SnapshotComponents, Rebuilder};
Expand All @@ -62,7 +63,7 @@ impl SnapshotComponents for PoaSnapshot {
chain: &BlockChain,
block_at: H256,
sink: &mut ChunkSink,
_progress: &Progress,
_progress: &RwLock<Progress>,
preferred_size: usize,
) -> Result<(), SnapshotError> {
let number = chain.block_number(&block_at)
Expand Down
7 changes: 4 additions & 3 deletions ethcore/snapshot/src/consensus/work.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use engine::Engine;
use ethereum_types::{H256, U256};
use kvdb::KeyValueDB;
use log::trace;
use parking_lot::RwLock;
use rand::rngs::OsRng;
use rlp::{RlpStream, Rlp};
use triehash::ordered_trie_root;
Expand Down Expand Up @@ -72,7 +73,7 @@ impl SnapshotComponents for PowSnapshot {
chain: &BlockChain,
block_at: H256,
chunk_sink: &mut ChunkSink,
progress: &Progress,
progress: &RwLock<Progress>,
preferred_size: usize,
) -> Result<(), SnapshotError> {
PowWorker {
Expand Down Expand Up @@ -110,7 +111,7 @@ struct PowWorker<'a> {
rlps: VecDeque<Bytes>,
current_hash: H256,
writer: &'a mut ChunkSink<'a>,
progress: &'a Progress,
progress: &'a RwLock<Progress>,
preferred_size: usize,
}

Expand Down Expand Up @@ -153,7 +154,7 @@ impl<'a> PowWorker<'a> {

last = self.current_hash;
self.current_hash = block.header_view().parent_hash();
self.progress.blocks.fetch_add(1, Ordering::SeqCst);
self.progress.write().blocks += 1;
}

if loaded_size != 0 {
Expand Down
18 changes: 9 additions & 9 deletions ethcore/snapshot/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use ethtrie::{TrieDB, TrieDBMut};
use hash_db::HashDB;
use journaldb::{self, Algorithm, JournalDB};
use keccak_hasher::KeccakHasher;
use parking_lot::Mutex;
use parking_lot::{Mutex, RwLock};
use kvdb::{KeyValueDB, DBValue};
use log::{debug, info, trace};
use num_cpus;
Expand Down Expand Up @@ -121,7 +121,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
block_hash: H256,
state_db: &dyn HashDB<KeccakHasher, DBValue>,
writer: W,
p: &Progress,
p: &RwLock<Progress>,
processing_threads: usize,
) -> Result<(), Error> {
let start_header = chain.block_header_data(&block_hash)
Expand Down Expand Up @@ -168,7 +168,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(
state_hashes.extend(part_state_hashes);
}

info!("Took a snapshot at #{} of {} accounts", block_number, p.accounts());
info!("Took a snapshot at #{} of {} accounts", block_number, p.read().accounts());

Ok((state_hashes, block_hashes))
}).expect("Sub-thread never panics; qed")?;
Expand All @@ -186,7 +186,7 @@ pub fn take_snapshot<W: SnapshotWriter + Send>(

writer.into_inner().finish(manifest_data)?;

p.done.store(true, Ordering::SeqCst);
p.write().done = true;

Ok(())
}
Expand All @@ -202,7 +202,7 @@ pub fn chunk_secondary<'a>(
chain: &'a BlockChain,
start_hash: H256,
writer: &Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress
progress: &'a RwLock<Progress>
) -> Result<Vec<H256>, Error> {
let mut chunk_hashes = Vec::new();
let mut snappy_buffer = vec![0; snappy::max_compressed_len(PREFERRED_CHUNK_SIZE)];
Expand All @@ -218,7 +218,7 @@ pub fn chunk_secondary<'a>(
trace!(target: "snapshot", "wrote secondary chunk. hash: {:x}, size: {}, uncompressed size: {}",
hash, size, raw_data.len());

progress.update(0, size);
progress.write().update(0, size as u64);
chunk_hashes.push(hash);
Ok(())
};
Expand All @@ -242,7 +242,7 @@ struct StateChunker<'a> {
cur_size: usize,
snappy_buffer: Vec<u8>,
writer: &'a Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress,
progress: &'a RwLock<Progress>,
thread_idx: usize,
}

Expand Down Expand Up @@ -275,7 +275,7 @@ impl<'a> StateChunker<'a> {
self.writer.lock().write_state_chunk(hash, compressed)?;
trace!(target: "snapshot", "Thread {} wrote state chunk. size: {}, uncompressed size: {}", self.thread_idx, compressed_size, raw_data.len());

self.progress.update(num_entries, compressed_size);
self.progress.write().update(num_entries as u64, compressed_size as u64);

self.hashes.push(hash);
self.cur_size = 0;
Expand All @@ -300,7 +300,7 @@ pub fn chunk_state<'a>(
db: &dyn HashDB<KeccakHasher, DBValue>,
root: &H256,
writer: &Mutex<dyn SnapshotWriter + 'a>,
progress: &'a Progress,
progress: &'a RwLock<Progress>,
part: Option<usize>,
thread_idx: usize,
) -> Result<Vec<H256>, Error> {
Expand Down
12 changes: 6 additions & 6 deletions ethcore/snapshot/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ pub struct Service<C: Send + Sync + 'static> {
state_chunks: AtomicUsize,
block_chunks: AtomicUsize,
client: Arc<C>,
progress: Progress,
progress: RwLock<Progress>,
taking_snapshot: AtomicBool,
restoring_snapshot: AtomicBool,
}
Expand All @@ -280,7 +280,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
state_chunks: AtomicUsize::new(0),
block_chunks: AtomicUsize::new(0),
client: params.client,
progress: Progress::new(),
progress: RwLock::new(Progress::new()),
taking_snapshot: AtomicBool::new(false),
restoring_snapshot: AtomicBool::new(false),
};
Expand Down Expand Up @@ -483,9 +483,9 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
/// Tick the snapshot service. This will log any active snapshot
/// being taken.
pub fn tick(&self) {
if self.progress.done() || !self.taking_snapshot.load(Ordering::SeqCst) { return }
if self.progress.read().done() || !self.taking_snapshot.load(Ordering::SeqCst) { return }

let p = &self.progress;
let p = &self.progress.read();
info!("Snapshot: {} accounts, {} blocks, {} bytes", p.accounts(), p.blocks(), p.bytes());
let rate = p.rate();
debug!(target: "snapshot", "Current progress rate: {:.0} acc/s, {:.0} bytes/s (compressed)", rate.0, rate.1);
Expand All @@ -507,7 +507,7 @@ impl<C> Service<C> where C: SnapshotClient + ChainInfo {
self.taking_snapshot.store(false, Ordering::SeqCst);
}}
let start_time = std::time::Instant::now();
self.progress.reset();
*self.progress.write() = Progress::new();

let temp_dir = self.temp_snapshot_dir();
let snapshot_dir = self.snapshot_dir();
Expand Down Expand Up @@ -893,7 +893,7 @@ impl<C: Send + Sync> SnapshotService for Service<C> {
fn abort_snapshot(&self) {
if self.taking_snapshot.load(Ordering::SeqCst) {
trace!(target: "snapshot", "Aborting snapshot – Snapshot under way");
self.progress.abort.store(true, Ordering::SeqCst);
self.progress.write().abort = true;
}
}

Expand Down
5 changes: 3 additions & 2 deletions ethcore/snapshot/src/traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use common_types::{
};
use engine::Engine;
use ethereum_types::H256;
use parking_lot::RwLock;

use crate::io::SnapshotWriter;

Expand Down Expand Up @@ -108,7 +109,7 @@ pub trait SnapshotComponents: Send {
chain: &BlockChain,
block_at: H256,
chunk_sink: &mut ChunkSink,
progress: &Progress,
progress: &RwLock<Progress>,
preferred_size: usize,
) -> Result<(), SnapshotError>;

Expand Down Expand Up @@ -141,7 +142,7 @@ pub trait SnapshotClient: BlockChainClient + BlockInfo + DatabaseRestore + Block
&self,
writer: W,
at: BlockId,
p: &Progress,
p: &RwLock<Progress>,
) -> Result<(), Error>;
}

Expand Down
Loading