Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Force value-only iteration in the public API #192

Merged
merged 7 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ pub fn run() -> Result<(), String> {
SubCommand::Check(check) => {
let db = parity_db::Db::open_read_only(&options)
.map_err(|e| format!("Invalid db: {e:?}"))?;
if !check.index_value {
// Note that we should use enum parameter instead.
return Err("Requires one of the following check flag: --index-value".to_string())
}
let check_param = parity_db::CheckOptions::new(
check.column,
check.range_start,
Expand Down Expand Up @@ -274,11 +270,6 @@ pub struct Check {
#[clap(long)]
pub column: Option<u8>,

/// Parse indexes and
/// lookup values.
#[clap(long)]
pub index_value: bool,

/// Start range for operation.
/// Index start chunk in db.
#[clap(long)]
Expand Down
144 changes: 90 additions & 54 deletions src/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,16 +114,33 @@ pub struct TablesRef<'a> {
pub ref_counted: bool,
}

// Used for value iteration
pub struct ValueIterState {
pub rc: u32,
pub value: Vec<u8>,
}

// Only used for DB validation and migration.
pub struct CorruptedIndexEntryInfo {
pub chunk_index: u64,
pub sub_index: u32,
pub entry: crate::index::Entry,
pub value_entry: Option<Vec<u8>>,
pub error: Option<Error>,
}

// Only used for DB validation and migration.
pub struct IterState {
pub chunk_index: u64,
pub key: Key,
pub rc: u32,
pub value: Vec<u8>,
}

// Only used for DB validation and migration.
enum IterStateOrCorrupted {
Item(IterState),
Corrupted(crate::index::Entry, Option<Error>),
Corrupted(CorruptedIndexEntryInfo),
}

#[inline]
Expand Down Expand Up @@ -724,59 +741,49 @@ impl HashColumn {
tables.index.write_stats(&self.stats)
}

pub fn iter_while(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
pub fn iter_values(&self, log: &Log, mut f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
let tables = self.tables.read();
for table in &tables.value[..tables.value.len()] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: this could be just `for table in &tables.value`. I believe `tables.value` is a `Vec`.

log::debug!( target: "parity-db", "{}: Iterating table {}", tables.index.id, table.id);
table.iter_while(log.overlays(), |_, rc, value, compressed| {
let value = if compressed {
if let Ok(value) = self.compression.decompress(&value) {
value
} else {
return false
}
} else {
value
};
let state = ValueIterState { rc, value };
f(state)
})?;
log::debug!( target: "parity-db", "{}: Done iterating table {}", tables.index.id, table.id);
}
Ok(())
}

pub fn iter_index(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
let action = |state| match state {
IterStateOrCorrupted::Item(item) => Ok(f(item)),
IterStateOrCorrupted::Corrupted(..) =>
Err(Error::Corruption("Missing indexed value".into())),
};
self.iter_while_inner(log, action, 0, true)
self.iter_index_internal(log, action, 0)
}

fn iter_while_inner(
fn iter_index_internal(
&self,
log: &Log,
mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
start_chunk: u64,
skip_preimage_indexes: bool,
) -> Result<()> {
use blake2::{digest::typenum::U32, Blake2b, Digest};

let tables = self.tables.read();
let source = &tables.index;

if skip_preimage_indexes && self.preimage {
// It is much faster to iterate over the value table than index.
// We have to assume hashing scheme however.
for table in &tables.value[..tables.value.len() - 1] {
log::debug!( target: "parity-db", "{}: Iterating table {}", source.id, table.id);
table.iter_while(log.overlays(), |index, rc, value, compressed| {
let value = if compressed {
if let Ok(value) = self.compression.decompress(&value) {
value
} else {
return false
}
} else {
value
};
let key = Blake2b::<U32>::digest(&value);
let key = self.hash_key(&key);
let state = IterStateOrCorrupted::Item(IterState {
chunk_index: index,
key,
rc,
value,
});
f(state).unwrap_or(false)
})?;
log::debug!( target: "parity-db", "{}: Done iterating table {}", source.id, table.id);
}
}

for c in start_chunk..source.id.total_chunks() {
let entries = source.entries(c, log.overlays())?;
for entry in entries.iter() {
for (sub_index, entry) in entries.iter().enumerate() {
if entry.is_empty() {
continue
}
Expand All @@ -785,20 +792,37 @@ impl HashColumn {
(address.size_tier(), address.offset())
};

if skip_preimage_indexes &&
self.preimage && size_tier as usize != tables.value.len() - 1
{
continue
}
let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
let (value, rc, pk, compressed) = match value {
Ok(Some(v)) => v,
Ok(None) => {
f(IterStateOrCorrupted::Corrupted(*entry, None))?;
let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
chunk_index: c,
sub_index: sub_index as u32,
value_entry,
entry: *entry,
error: None,
}))? {
return Ok(())
}
continue
},
Err(e) => {
f(IterStateOrCorrupted::Corrupted(*entry, Some(e)))?;
let value_entry = if let Error::Corruption(_) = &e {
tables.value[size_tier as usize].dump_entry(offset).ok()
} else {
None
};
if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
chunk_index: c,
sub_index: sub_index as u32,
value_entry,
entry: *entry,
error: Some(e),
}))? {
return Ok(())
}
continue
},
};
Expand Down Expand Up @@ -828,19 +852,22 @@ impl HashColumn {
let start_chunk = check_param.from.unwrap_or(0);
let end_chunk = check_param.bound;

let step = 1000;
let step = 10000;
let mut next_info_chunk = step;
let start_time = std::time::Instant::now();
log::info!(target: "parity-db", "Starting full index iteration at {:?}", start_time);
log::info!(target: "parity-db", "for {} chunks of column {}", self.tables.read().index.id.total_chunks(), col);
self.iter_while_inner(
let total_chunks = self.tables.read().index.id.total_chunks();
let index_id = self.tables.read().index.id;
log::info!(target: "parity-db", "Column {} (hash): Starting index validation", col);
self.iter_index_internal(
log,
|state| match state {
IterStateOrCorrupted::Item(IterState { chunk_index, key, rc, value }) => {
if Some(chunk_index) == end_chunk {
return Ok(false)
}
if chunk_index % step == 0 {
log::info!(target: "parity-db", "Chunk iteration at {}", chunk_index);
if chunk_index >= next_info_chunk {
next_info_chunk += step;
log::info!(target: "parity-db", "Validated {} / {} chunks", chunk_index, total_chunks);
}

match check_param.display {
Expand All @@ -865,16 +892,25 @@ impl HashColumn {
}
Ok(true)
},
IterStateOrCorrupted::Corrupted(entry, e) => {
log::info!("Corrupted value for index entry: {}:\n\t{:?}", entry.as_u64(), e);
IterStateOrCorrupted::Corrupted(c) => {
log::error!(
"Corrupted value for index entry: [{}][{}]: {} ({:?}). Error: {:?}",
c.chunk_index,
c.sub_index,
c.entry.address(index_id.index_bits()),
hex(&c.entry.as_u64().to_le_bytes()),
c.error,
);
if let Some(v) = c.value_entry {
log::error!("Value entry: {:?}", hex(v.as_slice()));
}
Ok(true)
},
},
start_chunk,
false,
)?;

log::info!(target: "parity-db", "Ended full index check, elapsed {:?}", start_time.elapsed());
log::info!(target: "parity-db", "Index validation complete successfully, elapsed {:?}", start_time.elapsed());
Ok(())
}

Expand Down
30 changes: 24 additions & 6 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

use crate::{
btree::{commit_overlay::BTreeChangeSet, BTreeIterator, BTreeTable},
column::{hash_key, ColId, Column, IterState, ReindexBatch},
column::{hash_key, ColId, Column, IterState, ReindexBatch, ValueIterState},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we make ValueIterState pub in src/lib.rs to allow its usage from other crates?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, even though public types from private modules are allowed to be leaked: they can be used but can't be named. I've added it to the public exports anyway.

error::{try_io, Error, Result},
hash::IdentityBuildHasher,
index::PlanOutcome,
Expand Down Expand Up @@ -853,9 +853,16 @@ impl DbInner {
}
}

fn iter_column_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
match &self.columns[c as usize] {
Column::Hash(column) => column.iter_while(&self.log, f),
Column::Hash(column) => column.iter_values(&self.log, f),
Column::Tree(_) => unimplemented!(),
}
}

fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
match &self.columns[c as usize] {
Column::Hash(column) => column.iter_index(&self.log, f),
Column::Tree(_) => unimplemented!(),
}
}
Expand Down Expand Up @@ -987,12 +994,23 @@ impl Db {
self.inner.columns.len() as u8
}

/// Iterate a column and call a function for each value. This is only supported for columns with
/// `btree_index` set to `false`. Iteration order is unspecified.
/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
pub fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
self.inner.iter_column_while(c, f)
}

/// Iterate a column and call a function for each value. This is only supported for columns with
/// `btree_index` set to `false`. Iteration order is unspecified. Note that the
/// `key` field in the state is the hash of the original key.
/// Unlinke `get` the iteration may not include changes made in recent `commit` calls.
pub fn iter_column_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
self.inner.iter_column_while(c, f)
/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
pub fn iter_column_index_while(
&self,
c: ColId,
f: impl FnMut(IterState) -> bool,
) -> Result<()> {
self.inner.iter_column_index_while(c, f)
}

fn commit_worker(db: Arc<DbInner>) -> Result<()> {
Expand Down
11 changes: 8 additions & 3 deletions src/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! Utilities for db file.

use crate::{
error::{try_io, Result},
error::{try_io, Error, Result},
parking_lot::{RwLock, RwLockUpgradableReadGuard, RwLockWriteGuard},
table::TableId,
};
Expand Down Expand Up @@ -113,7 +113,12 @@ impl TableFile {
#[cfg(unix)]
pub fn read_at(&self, buf: &mut [u8], offset: u64) -> Result<()> {
use std::os::unix::fs::FileExt;
try_io!(self.file.read().as_ref().unwrap().read_exact_at(buf, offset));
try_io!(self
.file
.read()
.as_ref()
.ok_or_else(|| Error::Corruption("File does not exist.".into()))?
.read_exact_at(buf, offset));
Ok(())
}

Expand All @@ -131,7 +136,7 @@ impl TableFile {
use std::{io, os::windows::fs::FileExt};

let file = self.file.read();
let file = file.as_ref().unwrap();
let file = file.as_ref().ok_or_else(|| Error::Corruption("File does not exist.".into()))?;

while !buf.is_empty() {
match file.seek_read(buf, offset) {
Expand Down
51 changes: 27 additions & 24 deletions src/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,33 +76,36 @@ pub fn migrate(from: &Path, mut to: Options, overwrite: bool, force_migrate: &[u
continue
}
log::info!("Migrating col {}", c);
source.iter_column_while(c, |IterState { chunk_index: index, key, rc, mut value }| {
//TODO: more efficient ref migration
for _ in 0..rc {
let value = std::mem::take(&mut value);
commit
.indexed
.entry(c)
.or_insert_with(|| IndexedChangeSet::new(c))
.changes
.push(Operation::Set(key, value.into()));
nb_commit += 1;
if nb_commit == COMMIT_SIZE {
ncommits += 1;
if let Err(e) = dest.commit_raw(std::mem::take(&mut commit)) {
log::warn!("Migration error: {:?}", e);
return false
}
nb_commit = 0;
source.iter_column_index_while(
c,
|IterState { chunk_index: index, key, rc, mut value }| {
//TODO: more efficient ref migration
for _ in 0..rc {
let value = std::mem::take(&mut value);
commit
.indexed
.entry(c)
.or_insert_with(|| IndexedChangeSet::new(c))
.changes
.push(Operation::Set(key, value.into()));
nb_commit += 1;
if nb_commit == COMMIT_SIZE {
ncommits += 1;
if let Err(e) = dest.commit_raw(std::mem::take(&mut commit)) {
log::warn!("Migration error: {:?}", e);
return false
}
nb_commit = 0;

if last_time.elapsed() > std::time::Duration::from_secs(3) {
last_time = std::time::Instant::now();
log::info!("Migrating {} #{}, commit {}", c, index, ncommits);
if last_time.elapsed() > std::time::Duration::from_secs(3) {
last_time = std::time::Instant::now();
log::info!("Migrating {} #{}, commit {}", c, index, ncommits);
}
}
}
}
true
})?;
true
},
)?;
if overwrite {
dest.commit_raw(commit)?;
commit = Default::default();
Expand Down
Loading