Skip to content

Commit

Permalink
Fuzzer: model more carefully what is written to disk
Browse files Browse the repository at this point in the history
Allows better checking for data loss
  • Loading branch information
Tpt committed Mar 27, 2023
1 parent a28a58d commit 70c60f0
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 42 deletions.
2 changes: 1 addition & 1 deletion fuzz/fuzz_targets/refcounted_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ impl DbSimulator for Simulator {
}
}
}
Layer { values: new_state_safe_counts, written: true }
Layer { values: new_state_safe_counts, written: WrittenState::Yes }
}

fn map_operation(operation: &Operation) -> parity_db::Operation<Vec<u8>, Vec<u8>> {
Expand Down
4 changes: 3 additions & 1 deletion fuzz/fuzz_targets/simple_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,9 @@ impl DbSimulator for Simulator {
}

fn build_best_layer_for_recovery(layers: &[&Layer<u8>]) -> Layer<u8> {
layers[0].clone()
let mut layer = layers[0].clone();
layer.written = WrittenState::Yes;
layer
}

fn map_operation(operation: &(u8, Option<u8>)) -> parity_db::Operation<Vec<u8>, Vec<u8>> {
Expand Down
125 changes: 85 additions & 40 deletions fuzz/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use arbitrary::Arbitrary;
use std::{
cmp::{Ordering, PartialOrd},
collections::HashMap,
fmt::Debug,
fmt::{self, Debug, Formatter},
};
use tempfile::tempdir;

Expand Down Expand Up @@ -48,11 +48,31 @@ pub enum Action<O: Debug> {
IterNext,
}

#[derive(Clone, Debug)]
#[derive(Clone)]
pub struct Layer<V> {
// The stored value per possible key (depends if we have ref counting or not)
pub values: [Option<V>; NUMBER_OF_POSSIBLE_KEYS],
pub written: bool,
pub written: WrittenState,
}

/// Compact debug rendering of a [`Layer`]: a map that starts with the
/// `"written"` state and then lists only the keys that currently hold a value,
/// keyed by their index in `values`. Empty slots are omitted.
impl<V: Debug> Debug for Layer<V> {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        let mut map = f.debug_map();
        map.entry(&"written", &self.written);
        // Keep the slot index as the map key and drop the `None` slots.
        map.entries(
            self.values
                .iter()
                .enumerate()
                .filter_map(|(key, slot)| slot.as_ref().map(|value| (key, value))),
        );
        map.finish()
    }
}

/// How far a model layer has progressed towards being written by the database,
/// as tracked by the fuzzer model. Used during recovery to decide which layers
/// are acceptable matches for the state found on disk.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum WrittenState {
/// Set on `Processed` layers once a log flush succeeded. During recovery such
/// a layer is expected to match the disk state unless a more recent candidate
/// layer was already found.
Yes,
/// Set when commits have been processed, and on every layer at restart.
/// The layer might already be flushed to disk, but the disk may also still be
/// in a previous state, so recovery accepts it as one possible candidate.
Processed,
/// Initial state of a layer pushed by a transaction; such layers are ignored
/// when matching the model against the disk state.
No,
}

#[derive(Clone, Copy, Debug)]
Expand Down Expand Up @@ -166,64 +186,78 @@ pub trait DbSimulator {
Action::Transaction(operations) => {
let mut values = model_values(&layers);
Self::apply_operations_on_values(operations, &mut values);
layers.push(Layer { values, written: false });
db.db
.commit_changes(operations.iter().map(|o| (0, Self::map_operation(o))))
.unwrap();
layers.push(Layer { values, written: WrittenState::No });
},
Action::ProcessReindex =>
db = Self::try_or_restart(
Action::ProcessReindex => {
let (new_db, _) = Self::try_or_restart(
|db| db.process_reindex(),
db,
&mut layers,
&old_layers,
&options,
),
);
db = new_db;
},
Action::ProcessCommits => {
for layer in &mut layers {
if !layer.written {
layer.written = true;
if layer.written == WrittenState::No {
layer.written = WrittenState::Processed;
break
}
}
db = Self::try_or_restart(
let (new_db, _) = Self::try_or_restart(
|db| db.process_commits(),
db,
&mut layers,
&old_layers,
&options,
)
);
db = new_db;
},
Action::FlushLog =>
db = Self::try_or_restart(
Action::FlushLog => {
let (new_db, result) = Self::try_or_restart(
|db| db.flush_logs(),
db,
&mut layers,
&old_layers,
&options,
),
Action::EnactLog =>
db = Self::try_or_restart(
);
db = new_db;
if result.is_ok() {
for layer in &mut layers {
if layer.written == WrittenState::Processed {
layer.written = WrittenState::Yes;
}
}
}
},
Action::EnactLog => {
let (new_db, _) = Self::try_or_restart(
|db| db.enact_logs(),
db,
&mut layers,
&old_layers,
&options,
),
Action::CleanLogs =>
db = Self::try_or_restart(
);
db = new_db;
},
Action::CleanLogs => {
let (new_db, _) = Self::try_or_restart(
|db| db.clean_logs(),
db,
&mut layers,
&old_layers,
&options,
),
);
db = new_db;
},
Action::Restart => {
// drop processes commits
// drop might flush commits
for layer in &mut layers {
if !layer.written {
layer.written = true;
}
layer.written = WrittenState::Processed;
}
old_layers.push(layers.clone());
db = {
Expand Down Expand Up @@ -305,16 +339,16 @@ pub trait DbSimulator {
layers: &mut Vec<Layer<Self::ValueType>>,
old_layers: &[Vec<Layer<Self::ValueType>>],
options: &parity_db::Options,
) -> DbWithIter {
) -> (DbWithIter, Result<(), parity_db::Error>) {
match op(&db.db) {
Ok(()) => db,
Ok(()) => (db, Ok(())),
Err(e) if e.to_string().contains("Instrumented failure") => {
log::debug!("Restarting after an instrumented failure");
drop(db);
parity_db::set_number_of_allowed_io_operations(usize::MAX);
db = DbWithIter::open(options).unwrap();
Self::reset_model_from_database(&db.db, layers, old_layers);
db
(db, Err(e))
},
Err(e) => panic!("database error: {}", e),
}
Expand All @@ -333,44 +367,55 @@ pub trait DbSimulator {
}
}

if let Some(layers) = Self::attempt_to_reset_model_to_disk_state(layers, &disk_state) {
if let Some(layers) = Self::attempt_to_reset_model_to_disk_state(layers, &disk_state)? {
return Ok(layers)
}
for layers in old_layers {
if let Some(layers) =
Self::attempt_to_reset_model_to_disk_state(layers, &disk_state)
Self::attempt_to_reset_model_to_disk_state(layers, &disk_state)?
{
return Ok(layers)
}
}
Err(parity_db::Error::Corruption(format!("Not able to recover the database to one of the valid state. The current database state is: {disk_state:?}")))
Err(parity_db::Error::Corruption(format!("Not able to recover the database to one of the valid state. The current database state is: {disk_state:?} and the state stack is is {layers:?}")))
})
}

fn attempt_to_reset_model_to_disk_state(
layers: &[Layer<Self::ValueType>],
state: &[(u8, u8)],
) -> Option<Vec<Layer<Self::ValueType>>> {
) -> Result<Option<Vec<Layer<Self::ValueType>>>, parity_db::Error> {
let mut candidates = Vec::new();
for layer in layers.iter().rev() {
if !layer.written {
continue
}

if Self::is_layer_state_compatible_with_disk_state(&layer.values, state) {
// We found a correct last layer
candidates.push(layer);
match layer.written {
WrittenState::No => (),
WrittenState::Processed => {
// might already be flushed to disk but maybe we are in a previous state
if Self::is_layer_state_compatible_with_disk_state(&layer.values, state) {
candidates.push(layer)
}
},
WrittenState::Yes => {
// must be the state
if Self::is_layer_state_compatible_with_disk_state(&layer.values, state) {
candidates.push(layer);
break
}
if candidates.is_empty() {
return Err(parity_db::Error::Corruption(format!("Not able to recover the database to one of the valid state. The current database state is: {state:?} and the state stack is is {layers:?}")));
}
},
}
}
if candidates.is_empty() {
Ok(if candidates.is_empty() {
if state.is_empty() {
Some(Vec::new())
} else {
None
}
} else {
Some(vec![Self::build_best_layer_for_recovery(&candidates)])
}
})
}

fn check_db_and_model_are_equals(
Expand Down

0 comments on commit 70c60f0

Please sign in to comment.