Force value-only iteration in the public API (#192)
* Force value-only iteration in the public API

* Improved test

* Cleaned up API

* Simplified iteration over value tables

* Fmt

* Fixed tests
arkpar authored Mar 22, 2023
1 parent ccdbe15 commit 443dff5
Showing 8 changed files with 252 additions and 110 deletions.
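With this change the public `Db::iter_column_while` yields only the value and its reference count through the new `ValueIterState`, while key-aware iteration over the index (`IterState`) becomes a crate-internal `iter_column_index_while` used for validation and migration. A minimal sketch of how a caller might use the reworked public API, assuming `Db`, `Options`, and `Error` are re-exported at the crate root as the admin tool uses them; the path, column count, and printed output are illustrative:

```rust
use parity_db::{Db, Options};

// Iterate all values of a non-btree column after this change: the closure
// receives a `ValueIterState` (reference count + value) and returns `true`
// to continue or `false` to stop early.
fn dump_column_values(path: &std::path::Path) -> Result<(), parity_db::Error> {
	let options = Options::with_columns(path, 1);
	let db = Db::open_or_create(&options)?;
	db.iter_column_while(0, |state| {
		println!("rc={} value_len={}", state.rc, state.value.len());
		true
	})?;
	Ok(())
}
```

Note that, per the new doc comment, the iteration order is unspecified and recent `commit` calls may not yet be reflected.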
9 changes: 0 additions & 9 deletions admin/src/lib.rs
@@ -89,10 +89,6 @@ pub fn run() -> Result<(), String> {
SubCommand::Check(check) => {
let db = parity_db::Db::open_read_only(&options)
.map_err(|e| format!("Invalid db: {e:?}"))?;
if !check.index_value {
// Note that we should use enum parameter instead.
return Err("Requires one of the following check flag: --index-value".to_string())
}
let check_param = parity_db::CheckOptions::new(
check.column,
check.range_start,
@@ -274,11 +270,6 @@ pub struct Check {
#[clap(long)]
pub column: Option<u8>,

/// Parse indexes and
/// lookup values.
#[clap(long)]
pub index_value: bool,

/// Start range for operation.
/// Index start chunk in db.
#[clap(long)]
146 changes: 92 additions & 54 deletions src/column.rs
@@ -114,16 +114,35 @@ pub struct TablesRef<'a> {
pub ref_counted: bool,
}

/// Value iteration state
pub struct ValueIterState {
/// Reference counter.
pub rc: u32,
/// Value.
pub value: Vec<u8>,
}

// Only used for DB validation and migration.
pub struct CorruptedIndexEntryInfo {
pub chunk_index: u64,
pub sub_index: u32,
pub entry: crate::index::Entry,
pub value_entry: Option<Vec<u8>>,
pub error: Option<Error>,
}

// Only used for DB validation and migration.
pub struct IterState {
pub chunk_index: u64,
pub key: Key,
pub rc: u32,
pub value: Vec<u8>,
}

// Only used for DB validation and migration.
enum IterStateOrCorrupted {
Item(IterState),
Corrupted(crate::index::Entry, Option<Error>),
Corrupted(CorruptedIndexEntryInfo),
}

#[inline]
@@ -724,59 +743,49 @@ impl HashColumn {
tables.index.write_stats(&self.stats)
}

pub fn iter_while(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
pub fn iter_values(&self, log: &Log, mut f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
let tables = self.tables.read();
for table in &tables.value {
log::debug!( target: "parity-db", "{}: Iterating table {}", tables.index.id, table.id);
table.iter_while(log.overlays(), |_, rc, value, compressed| {
let value = if compressed {
if let Ok(value) = self.compression.decompress(&value) {
value
} else {
return false
}
} else {
value
};
let state = ValueIterState { rc, value };
f(state)
})?;
log::debug!( target: "parity-db", "{}: Done iterating table {}", tables.index.id, table.id);
}
Ok(())
}

pub fn iter_index(&self, log: &Log, mut f: impl FnMut(IterState) -> bool) -> Result<()> {
let action = |state| match state {
IterStateOrCorrupted::Item(item) => Ok(f(item)),
IterStateOrCorrupted::Corrupted(..) =>
Err(Error::Corruption("Missing indexed value".into())),
};
self.iter_while_inner(log, action, 0, true)
self.iter_index_internal(log, action, 0)
}

fn iter_while_inner(
fn iter_index_internal(
&self,
log: &Log,
mut f: impl FnMut(IterStateOrCorrupted) -> Result<bool>,
start_chunk: u64,
skip_preimage_indexes: bool,
) -> Result<()> {
use blake2::{digest::typenum::U32, Blake2b, Digest};

let tables = self.tables.read();
let source = &tables.index;

if skip_preimage_indexes && self.preimage {
// It is much faster to iterate over the value table than index.
// We have to assume hashing scheme however.
for table in &tables.value[..tables.value.len() - 1] {
log::debug!( target: "parity-db", "{}: Iterating table {}", source.id, table.id);
table.iter_while(log.overlays(), |index, rc, value, compressed| {
let value = if compressed {
if let Ok(value) = self.compression.decompress(&value) {
value
} else {
return false
}
} else {
value
};
let key = Blake2b::<U32>::digest(&value);
let key = self.hash_key(&key);
let state = IterStateOrCorrupted::Item(IterState {
chunk_index: index,
key,
rc,
value,
});
f(state).unwrap_or(false)
})?;
log::debug!( target: "parity-db", "{}: Done iterating table {}", source.id, table.id);
}
}

for c in start_chunk..source.id.total_chunks() {
let entries = source.entries(c, log.overlays())?;
for entry in entries.iter() {
for (sub_index, entry) in entries.iter().enumerate() {
if entry.is_empty() {
continue
}
@@ -785,20 +794,37 @@ impl HashColumn {
(address.size_tier(), address.offset())
};

if skip_preimage_indexes &&
self.preimage && size_tier as usize != tables.value.len() - 1
{
continue
}
let value = tables.value[size_tier as usize].get_with_meta(offset, log.overlays());
let (value, rc, pk, compressed) = match value {
Ok(Some(v)) => v,
Ok(None) => {
f(IterStateOrCorrupted::Corrupted(*entry, None))?;
let value_entry = tables.value[size_tier as usize].dump_entry(offset).ok();
if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
chunk_index: c,
sub_index: sub_index as u32,
value_entry,
entry: *entry,
error: None,
}))? {
return Ok(())
}
continue
},
Err(e) => {
f(IterStateOrCorrupted::Corrupted(*entry, Some(e)))?;
let value_entry = if let Error::Corruption(_) = &e {
tables.value[size_tier as usize].dump_entry(offset).ok()
} else {
None
};
if !f(IterStateOrCorrupted::Corrupted(CorruptedIndexEntryInfo {
chunk_index: c,
sub_index: sub_index as u32,
value_entry,
entry: *entry,
error: Some(e),
}))? {
return Ok(())
}
continue
},
};
@@ -828,19 +854,22 @@ impl HashColumn {
let start_chunk = check_param.from.unwrap_or(0);
let end_chunk = check_param.bound;

let step = 1000;
let step = 10000;
let mut next_info_chunk = step;
let start_time = std::time::Instant::now();
log::info!(target: "parity-db", "Starting full index iteration at {:?}", start_time);
log::info!(target: "parity-db", "for {} chunks of column {}", self.tables.read().index.id.total_chunks(), col);
self.iter_while_inner(
let total_chunks = self.tables.read().index.id.total_chunks();
let index_id = self.tables.read().index.id;
log::info!(target: "parity-db", "Column {} (hash): Starting index validation", col);
self.iter_index_internal(
log,
|state| match state {
IterStateOrCorrupted::Item(IterState { chunk_index, key, rc, value }) => {
if Some(chunk_index) == end_chunk {
return Ok(false)
}
if chunk_index % step == 0 {
log::info!(target: "parity-db", "Chunk iteration at {}", chunk_index);
if chunk_index >= next_info_chunk {
next_info_chunk += step;
log::info!(target: "parity-db", "Validated {} / {} chunks", chunk_index, total_chunks);
}

match check_param.display {
Expand All @@ -865,16 +894,25 @@ impl HashColumn {
}
Ok(true)
},
IterStateOrCorrupted::Corrupted(entry, e) => {
log::info!("Corrupted value for index entry: {}:\n\t{:?}", entry.as_u64(), e);
IterStateOrCorrupted::Corrupted(c) => {
log::error!(
"Corrupted value for index entry: [{}][{}]: {} ({:?}). Error: {:?}",
c.chunk_index,
c.sub_index,
c.entry.address(index_id.index_bits()),
hex(&c.entry.as_u64().to_le_bytes()),
c.error,
);
if let Some(v) = c.value_entry {
log::error!("Value entry: {:?}", hex(v.as_slice()));
}
Ok(true)
},
},
start_chunk,
false,
)?;

log::info!(target: "parity-db", "Ended full index check, elapsed {:?}", start_time.elapsed());
log::info!(target: "parity-db", "Index validation completed successfully, elapsed {:?}", start_time.elapsed());
Ok(())
}

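One reason the public API can drop the key from the iteration state: for preimage columns the index key is derived from the value itself, as the removed fast path in `iter_while_inner` above shows. A rough sketch of that derivation, assuming the `blake2` crate as used in this diff (the column additionally passes the digest through its `hash_key` salting step, and non-preimage columns store arbitrary keys):

```rust
use blake2::{digest::typenum::U32, Blake2b, Digest};

// For a preimage column, the 32-byte key material is the Blake2b-256 digest
// of the stored value, so a value-only iterator loses no information there.
fn preimage_key_material(value: &[u8]) -> [u8; 32] {
	Blake2b::<U32>::digest(value).into()
}
```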
54 changes: 44 additions & 10 deletions src/db.rs
@@ -20,7 +20,7 @@
use crate::{
btree::{commit_overlay::BTreeChangeSet, BTreeIterator, BTreeTable},
column::{hash_key, ColId, Column, IterState, ReindexBatch},
column::{hash_key, ColId, Column, IterState, ReindexBatch, ValueIterState},
error::{try_io, Error, Result},
hash::IdentityBuildHasher,
index::PlanOutcome,
@@ -853,14 +853,22 @@ impl DbInner {
}
}

fn iter_column_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
match &self.columns[c as usize] {
Column::Hash(column) => column.iter_while(&self.log, f),
Column::Hash(column) => column.iter_values(&self.log, f),
Column::Tree(_) => unimplemented!(),
}
}

fn iter_column_index_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
match &self.columns[c as usize] {
Column::Hash(column) => column.iter_index(&self.log, f),
Column::Tree(_) => unimplemented!(),
}
}
}

/// Database instance.
pub struct Db {
inner: Arc<DbInner>,
commit_thread: Option<thread::JoinHandle<()>>,
@@ -870,22 +878,25 @@ pub struct Db {
}

impl Db {
pub fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
#[cfg(test)]
pub(crate) fn with_columns(path: &std::path::Path, num_columns: u8) -> Result<Db> {
let options = Options::with_columns(path, num_columns);
Self::open_inner(&options, OpeningMode::Create)
}

/// Open the database with given options.
/// Open the database with given options. An error will be returned if the database does not
/// exist.
pub fn open(options: &Options) -> Result<Db> {
Self::open_inner(options, OpeningMode::Write)
}

/// Create the database using given options.
/// Open the database using given options. If the database does not exist it will be created
/// empty.
pub fn open_or_create(options: &Options) -> Result<Db> {
Self::open_inner(options, OpeningMode::Create)
}

/// Read the database using given options
/// Open an existing database in read-only mode.
pub fn open_read_only(options: &Options) -> Result<Db> {
Self::open_inner(options, OpeningMode::ReadOnly)
}
@@ -987,12 +998,23 @@ impl Db {
self.inner.columns.len() as u8
}

/// Iterate a column and call a function for each value. This is only supported for columns with
/// `btree_index` set to `false`. Iteration order is unspecified.
/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
pub fn iter_column_while(&self, c: ColId, f: impl FnMut(ValueIterState) -> bool) -> Result<()> {
self.inner.iter_column_while(c, f)
}

/// Iterate a column and call a function for each value. This is only supported for columns with
/// `btree_index` set to `false`. Iteration order is unspecified. Note that the
/// `key` field in the state is the hash of the original key.
/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
pub fn iter_column_while(&self, c: ColId, f: impl FnMut(IterState) -> bool) -> Result<()> {
self.inner.iter_column_while(c, f)
/// Unlike `get` the iteration may not include changes made in recent `commit` calls.
pub(crate) fn iter_column_index_while(
&self,
c: ColId,
f: impl FnMut(IterState) -> bool,
) -> Result<()> {
self.inner.iter_column_index_while(c, f)
}

fn commit_worker(db: Arc<DbInner>) -> Result<()> {
@@ -1052,6 +1074,7 @@ impl Db {
Ok(())
}

/// Dump full database stats to the text output.
pub fn write_stats_text(
&self,
writer: &mut impl std::io::Write,
@@ -1060,6 +1083,7 @@
self.inner.write_stats_text(writer, column)
}

/// Reset internal database statistics for the database or specified column.
pub fn clear_stats(&self, column: Option<u8>) -> Result<()> {
self.inner.clear_stats(column)
}
@@ -1418,20 +1442,30 @@ impl IndexedChangeSet {

/// Verification operation utilities.
pub mod check {
/// Database dump verbosity.
pub enum CheckDisplay {
/// Don't output any data.
None,
/// Output full data.
Full,
/// Limit value output to the specified size.
Short(u64),
}

/// Options for producing a database dump.
pub struct CheckOptions {
/// Only process this column. If this is `None` all columns will be processed.
pub column: Option<u8>,
/// Start with this index.
pub from: Option<u64>,
/// End with this index.
pub bound: Option<u64>,
/// Verbosity.
pub display: CheckDisplay,
}

impl CheckOptions {
/// Create a new instance.
pub fn new(
column: Option<u8>,
from: Option<u64>,
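The newly documented `check` module types can be combined roughly as below. This is a sketch only: it assumes `CheckDisplay` is re-exported at the crate root alongside `CheckOptions` (as the admin tool's use of `parity_db::CheckOptions` suggests), and the entry point that consumes the options lies outside this hunk.

```rust
use parity_db::{CheckDisplay, CheckOptions};

// Validate column 0 over the whole index, printing at most 64 bytes per value.
fn column_0_check_options() -> CheckOptions {
	CheckOptions {
		column: Some(0),
		from: None,   // start at the first index chunk
		bound: None,  // iterate to the end of the index
		display: CheckDisplay::Short(64),
	}
}
```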