Switch from parity-rocksdb to upstream rust-rocksdb
bkchr committed Dec 4, 2018
1 parent ac69149 commit 616b401
Showing 2 changed files with 109 additions and 84 deletions.
2 changes: 1 addition & 1 deletion kvdb-rocksdb/Cargo.toml
@@ -15,7 +15,7 @@ log = "0.4"
num_cpus = "1.0"
parking_lot = "0.6"
regex = "1.0"
-parity-rocksdb = "0.5"
+rocksdb = "0.10"

[dev-dependencies]
tempdir = "0.3"
191 changes: 108 additions & 83 deletions kvdb-rocksdb/src/lib.rs
@@ -23,7 +23,7 @@ extern crate interleaved_ordered;
extern crate num_cpus;
extern crate parking_lot;
extern crate regex;
-extern crate parity_rocksdb;
+extern crate rocksdb;

#[cfg(test)]
extern crate ethereum_types;
@@ -36,9 +36,10 @@ use std::{cmp, fs, io, mem, result, error};
use std::path::Path;

use parking_lot::{Mutex, MutexGuard, RwLock};
-use parity_rocksdb::{
-DB, Writable, WriteBatch, WriteOptions, IteratorMode, DBIterator,
-Options, BlockBasedOptions, Direction, Cache, Column, ReadOptions
+use rocksdb::{
+DB, WriteBatch, WriteOptions, IteratorMode, DBIterator,
+Options, BlockBasedOptions, Direction, ReadOptions, ColumnFamily,
+Error
};
use interleaved_ordered::{interleave_ordered, InterleaveOrdered};

@@ -206,7 +207,7 @@ impl Default for DatabaseConfig {
//
pub struct DatabaseIterator<'a> {
iter: InterleaveOrdered<::std::vec::IntoIter<(Box<[u8]>, Box<[u8]>)>, DBIterator>,
-_marker: PhantomData<&'a Database>,
+_marker: PhantomData<&'a ()>,
}
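With upstream rust-rocksdb the iterator no longer has to borrow the whole `Database`, so the marker degrades to a bare lifetime. A std-only sketch of why a `PhantomData<&'a ()>` field is enough to tie an iterator to its parent's lifetime (the names here are illustrative, not from this crate):

```rust
use std::marker::PhantomData;

// Zero-sized marker: borrow checking behaves as if a `&'a ()` were stored,
// so the parent value cannot be dropped while `Iter` is still alive.
struct Iter<'a> {
    pos: usize,
    _marker: PhantomData<&'a ()>,
}

fn iter_of(parent: &String) -> Iter<'_> {
    let _ = parent.len(); // pretend the iterator was derived from `parent`
    Iter { pos: 0, _marker: PhantomData }
}

fn main() {
    let parent = String::from("backing storage");
    let it = iter_of(&parent);
    // drop(parent); // error[E0505]: cannot move out of `parent` while borrowed
    assert_eq!(it.pos, 0);
}
```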

impl<'a> Iterator for DatabaseIterator<'a> {
@@ -219,38 +220,50 @@ impl<'a> Iterator for DatabaseIterator<'a> {

struct DBAndColumns {
db: DB,
-cfs: Vec<Column>,
+cfs: Vec<MakeSendSync<ColumnFamily>>,
}

// get column family configuration from database config.
fn col_config(config: &DatabaseConfig, block_opts: &BlockBasedOptions) -> io::Result<Options> {
-let mut opts = Options::new();
-
-opts.set_parsed_options("level_compaction_dynamic_level_bytes=true").map_err(other_io_err)?;
+let mut opts = Options::default();

opts.set_block_based_table_factory(block_opts);

-opts.set_parsed_options(
-&format!("block_based_table_factory={{{};{}}}",
-"cache_index_and_filter_blocks=true",
-"pin_l0_filter_and_index_blocks_in_cache=true")).map_err(other_io_err)?;
-
-opts.optimize_level_style_compaction(config.memory_budget_per_col() as i32);
+opts.optimize_level_style_compaction(config.memory_budget_per_col());
opts.set_target_file_size_base(config.compaction.initial_file_size);

opts.set_parsed_options("compression_per_level=").map_err(other_io_err)?;

Ok(opts)
}

+/// Utility structure that makes the given type implement `Send + Sync`.
+/// YOU NEED TO BE SURE WHAT YOU ARE DOING!
+struct MakeSendSync<T>(T);
+
+unsafe impl<T> Send for MakeSendSync<T> {}
+unsafe impl<T> Sync for MakeSendSync<T> {}
+
+impl<T> ::std::ops::Deref for MakeSendSync<T> {
+type Target = T;
+
+fn deref(&self) -> &T {
+&self.0
+}
+}
+
+impl<T> From<T> for MakeSendSync<T> {
+fn from(data: T) -> MakeSendSync<T> {
+MakeSendSync(data)
+}
+}
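The wrapper asserts `Send + Sync` for any `T`, so soundness rests entirely on how the wrapped values are used; here the crate is relying on the RocksDB option and handle types being safe to share. A self-contained sketch of the same pattern (the raw-pointer payload and `std::thread::scope`, a Rust 1.63+ API, are illustrative assumptions, not from this diff):

```rust
use std::ops::Deref;
use std::thread;

// Same shape as the diff's wrapper: the unsafe impls promise the compiler
// that `T` may be sent/shared across threads. The proof burden is on the
// author, not the compiler.
struct MakeSendSync<T>(T);

unsafe impl<T> Send for MakeSendSync<T> {}
unsafe impl<T> Sync for MakeSendSync<T> {}

impl<T> Deref for MakeSendSync<T> {
    type Target = T;
    fn deref(&self) -> &T { &self.0 }
}

fn main() {
    // A raw pointer is neither Send nor Sync; the wrapper makes it both.
    // This particular use is sound because the pointee outlives the scope
    // and is only ever read.
    let value = 42u32;
    let ptr = MakeSendSync(&value as *const u32);
    thread::scope(|s| {
        s.spawn(|| {
            let n = unsafe { **ptr }; // Deref to *const u32, then raw deref
            assert_eq!(n, 42);
        });
    });
}
```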

/// Key-Value database.
pub struct Database {
db: RwLock<Option<DBAndColumns>>,
config: DatabaseConfig,
-write_opts: WriteOptions,
-read_opts: ReadOptions,
-block_opts: BlockBasedOptions,
path: String,
+write_opts: MakeSendSync<WriteOptions>,
+read_opts: MakeSendSync<ReadOptions>,
+block_opts: MakeSendSync<BlockBasedOptions>,
// Dirty values added with `write_buffered`. Cleaned on `flush`.
overlay: RwLock<Vec<HashMap<ElasticArray32<u8>, KeyState>>>,
// Values currently being flushed. Cleared when `flush` completes.
@@ -261,9 +274,12 @@ pub struct Database {
}

#[inline]
-fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, String>) -> io::Result<T> {
+fn check_for_corruption<T, P: AsRef<Path>>(
+path: P,
+res: result::Result<T, Error>
+) -> io::Result<T> {
if let Err(ref s) = res {
-if s.starts_with("Corruption:") {
+if is_corrupted(s) {
warn!("DB corrupted: {}. Repair will be triggered on next restart", s);
let _ = fs::File::create(path.as_ref().join(Database::CORRUPTION_FILE_NAME));
}
@@ -272,8 +288,26 @@ fn check_for_corruption<T, P: AsRef<Path>>(path: P, res: result::Result<T, Strin
res.map_err(other_io_err)
}

-fn is_corrupted(s: &str) -> bool {
-s.starts_with("Corruption:") || s.starts_with("Invalid argument: You have to open all column families")
+fn is_corrupted(err: &Error) -> bool {
+err.as_ref().starts_with("Corruption:")
+|| err.as_ref().starts_with("Invalid argument: You have to open all column families")
}

andresilva (Contributor) commented on Dec 4, 2018:
I'm not sure whether these strings were generated by our wrapper, so maybe they won't match the upstream library.

bkchr (Author) replied on Dec 4, 2018; the reply was minimized.
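The prefixes being matched look like RocksDB's own `Status` messages rather than strings minted by the old parity wrapper, which is what the review thread above is probing; treat that as this diff's assumption. A plain-string illustration of the check (the sample messages are invented):

```rust
// Illustration only: the real function takes a rocksdb::Error, whose
// AsRef<str> yields the underlying status message in this same format.
fn is_corrupted_msg(msg: &str) -> bool {
    msg.starts_with("Corruption:")
        || msg.starts_with("Invalid argument: You have to open all column families")
}

fn main() {
    // Hypothetical status strings in RocksDB's "Code: message" format.
    assert!(is_corrupted_msg("Corruption: bad block checksum"));
    assert!(is_corrupted_msg(
        "Invalid argument: You have to open all column families."
    ));
    assert!(!is_corrupted_msg("IO error: No such file or directory"));
}
```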

+/// Generate the options for RocksDB, based on the given `DatabaseConfig`.
+fn generate_options(config: &DatabaseConfig) -> Options {
+let mut opts = Options::default();
+
+//TODO: rate_limiter_bytes_per_sec={} was removed
+
+opts.set_use_fsync(false);
+opts.create_if_missing(true);
+opts.set_max_open_files(config.max_open_files);
+opts.set_bytes_per_sync(1048576);
+//TODO: keep_log_file_num=1 was removed
+opts.set_write_buffer_size(config.memory_budget_per_col() / 2);
+opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2));
+
+opts
+}
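Every setter used in `generate_options` appears elsewhere in this diff, so the following standalone sketch sticks to those calls; the path and the concrete numbers are placeholders standing in for the `DatabaseConfig` values:

```rust
use rocksdb::{DB, Options};

fn main() -> Result<(), rocksdb::Error> {
    // Mirrors generate_options above, with config-derived values inlined.
    let mut opts = Options::default();
    opts.set_use_fsync(false);
    opts.create_if_missing(true);
    opts.set_max_open_files(512);          // config.max_open_files
    opts.set_bytes_per_sync(1_048_576);    // 1 MiB, as above
    opts.set_write_buffer_size(64 << 20);  // memory_budget_per_col() / 2
    opts.increase_parallelism(2);          // half the logical CPUs

    let db = DB::open(&opts, "/tmp/kvdb-example")?;
    db.put(b"key", b"value")?;
    Ok(())
}
```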

impl Database {
@@ -286,33 +320,16 @@ impl Database {

/// Open database file. Creates if it does not exist.
pub fn open(config: &DatabaseConfig, path: &str) -> io::Result<Database> {
-let mut opts = Options::new();
-
-if let Some(rate_limit) = config.compaction.write_rate_limit {
-opts.set_parsed_options(&format!("rate_limiter_bytes_per_sec={}", rate_limit)).map_err(other_io_err)?;
-}
-opts.set_use_fsync(false);
-opts.create_if_missing(true);
-opts.set_max_open_files(config.max_open_files);
-opts.set_parsed_options("keep_log_file_num=1").map_err(other_io_err)?;
-opts.set_parsed_options("bytes_per_sync=1048576").map_err(other_io_err)?;
-opts.set_db_write_buffer_size(config.memory_budget_per_col() / 2);
-opts.increase_parallelism(cmp::max(1, ::num_cpus::get() as i32 / 2));
-
-let mut block_opts = BlockBasedOptions::new();
-
-{
-block_opts.set_block_size(config.compaction.block_size);
-let cache_size = cmp::max(8, config.memory_budget() / 3);
-let cache = Cache::new(cache_size);
-block_opts.set_cache(cache);
-}
+let mut block_opts = BlockBasedOptions::default();
+block_opts.set_block_size(config.compaction.block_size);
+let cache_size = cmp::max(8, config.memory_budget() / 3);
+block_opts.set_lru_cache(cache_size);

// attempt database repair if it has been previously marked as corrupted
let db_corrupted = Path::new(path).join(Database::CORRUPTION_FILE_NAME);
if db_corrupted.exists() {
warn!("DB has been previously marked as corrupted, attempting repair");
-DB::repair(&opts, path).map_err(other_io_err)?;
+DB::repair(generate_options(config), path).map_err(other_io_err)?;
fs::remove_file(db_corrupted)?;
}

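The corruption protocol: a failed write that looks like corruption drops a marker file into the database directory (see `check_for_corruption` above), and the next `open` repairs and clears it. A std-only sketch of that flow, with the repair step stubbed out and the marker name assumed to be `"CORRUPTED"` (the real value lives in `Database::CORRUPTION_FILE_NAME`):

```rust
use std::fs;
use std::io;
use std::path::Path;

const CORRUPTION_FILE_NAME: &str = "CORRUPTED"; // assumed marker name

// Stub standing in for DB::repair(generate_options(config), path).
fn repair(_path: &Path) -> io::Result<()> { Ok(()) }

fn open_checked(dir: &Path) -> io::Result<()> {
    let marker = dir.join(CORRUPTION_FILE_NAME);
    if marker.exists() {
        // Previous run flagged corruption: repair first, then clear the flag.
        repair(dir)?;
        fs::remove_file(&marker)?;
    }
    // ... proceed to open the database ...
    Ok(())
}

fn mark_corrupted(dir: &Path) {
    // Mirrors check_for_corruption: best-effort, errors ignored.
    let _ = fs::File::create(dir.join(CORRUPTION_FILE_NAME));
}

fn main() -> io::Result<()> {
    let dir = std::env::temp_dir().join("kvdb-marker-demo");
    fs::create_dir_all(&dir)?;
    mark_corrupted(&dir);
    open_checked(&dir) // repairs and removes the marker
}
```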
@@ -327,21 +344,22 @@ impl Database {
}

let write_opts = WriteOptions::new();
-let mut read_opts = ReadOptions::new();
-read_opts.set_verify_checksums(false);
+let read_opts = ReadOptions::default();
+//TODO: removed read_opts.set_verify_checksums(false);

-let mut cfs: Vec<Column> = Vec::new();
+let opts = generate_options(config);
+let mut cfs: Vec<ColumnFamily> = Vec::new();
let db = match config.columns {
Some(_) => {
-match DB::open_cf(&opts, path, &cfnames, &cf_options) {
+match DB::open_cf(&opts, path, &cfnames) {
Ok(db) => {
cfs = cfnames.iter().map(|n| db.cf_handle(n)
.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
Ok(db)
}
Err(_) => {
// retry and create CFs
-match DB::open_cf(&opts, path, &[], &[]) {
+match DB::open_cf(&opts, path, &[]) {
Ok(mut db) => {
cfs = cfnames.iter()
.enumerate()
@@ -362,33 +380,32 @@ impl Database {
Ok(db) => db,
Err(ref s) if is_corrupted(s) => {
warn!("DB corrupted: {}, attempting repair", s);
-DB::repair(&opts, path).map_err(other_io_err)?;
-
-match cfnames.is_empty() {
-true => DB::open(&opts, path).map_err(other_io_err)?,
-false => {
-let db = DB::open_cf(&opts, path, &cfnames, &cf_options).map_err(other_io_err)?;
-cfs = cfnames.iter().map(|n| db.cf_handle(n)
-.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
-db
-},
+DB::repair(generate_options(config), path).map_err(other_io_err)?;
+
+if cfnames.is_empty() {
+DB::open(&opts, path).map_err(other_io_err)?
+} else {
+let db = DB::open_cf(&opts, path, &cfnames).map_err(other_io_err)?;
+cfs = cfnames.iter().map(|n| db.cf_handle(n)
+.expect("rocksdb opens a cf_handle for each cfname; qed")).collect();
+db
+}
},
}
Err(s) => {
return Err(other_io_err(s))
}
};
let num_cols = cfs.len();
Ok(Database {
-db: RwLock::new(Some(DBAndColumns{ db: db, cfs: cfs })),
+db: RwLock::new(Some(DBAndColumns{ db, cfs: cfs.into_iter().map(Into::into).collect() })),
config: config.clone(),
-write_opts: write_opts,
overlay: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
flushing: RwLock::new((0..(num_cols + 1)).map(|_| HashMap::new()).collect()),
flushing_lock: Mutex::new(false),
path: path.to_owned(),
-read_opts: read_opts,
-block_opts: block_opts,
+read_opts: read_opts.into(),
+write_opts: write_opts.into(),
+block_opts: block_opts.into(),
})
}

@@ -423,22 +440,22 @@ impl Database {
fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<bool>) -> io::Result<()> {
match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => {
-let batch = WriteBatch::new();
+let mut batch = WriteBatch::default();
mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
{
for (c, column) in self.flushing.read().iter().enumerate() {
for (key, state) in column.iter() {
match *state {
KeyState::Delete => {
if c > 0 {
-batch.delete_cf(cfs[c - 1], key).map_err(other_io_err)?;
+batch.delete_cf(*cfs[c - 1], key).map_err(other_io_err)?;
} else {
batch.delete(key).map_err(other_io_err)?;
}
},
KeyState::Insert(ref value) => {
if c > 0 {
-batch.put_cf(cfs[c - 1], key, value).map_err(other_io_err)?;
+batch.put_cf(*cfs[c - 1], key, value).map_err(other_io_err)?;
} else {
batch.put(key, value).map_err(other_io_err)?;
}
@@ -448,9 +465,7 @@ impl Database {
}
}

-check_for_corruption(
-&self.path,
-db.write_opt(batch, &self.write_opts))?;
+check_for_corruption(&self.path, db.write_opt(batch, &self.write_opts))?;

for column in self.flushing.write().iter_mut() {
column.clear();
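The loop above replays the buffered overlay into one atomic `WriteBatch`. Note the upstream API shift visible in this hunk: the batch needs a `mut` binding (`WriteBatch::default()` instead of parity-rocksdb's interior-mutable `WriteBatch::new()`), and `put`/`delete` return `Result`s. A minimal sketch using only calls shown in this diff (path and keys are placeholders):

```rust
use rocksdb::{DB, Options, WriteBatch, WriteOptions};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/kvdb-batch-demo")?;

    let mut batch = WriteBatch::default();
    batch.put(b"alpha", b"1")?;   // upstream: these return Result
    batch.put(b"beta", b"2")?;
    batch.delete(b"alpha")?;

    // All three operations land atomically, as in write_flushing_with_lock.
    db.write_opt(batch, &WriteOptions::new())?;
    assert!(db.get(b"alpha")?.is_none());
    assert!(db.get(b"beta")?.is_some());
    Ok(())
}
```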
@@ -481,7 +496,7 @@ impl Database {
pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => {
-let batch = WriteBatch::new();
+let mut batch = WriteBatch::default();
let ops = tr.ops;
for op in ops {
// remove any buffered operation for this key
@@ -490,11 +505,11 @@ impl Database {
match op {
DBOp::Insert { col, key, value } => match col {
None => batch.put(&key, &value).map_err(other_io_err)?,
-Some(c) => batch.put_cf(cfs[c as usize], &key, &value).map_err(other_io_err)?,
+Some(c) => batch.put_cf(*cfs[c as usize], &key, &value).map_err(other_io_err)?,
},
DBOp::Delete { col, key } => match col {
None => batch.delete(&key).map_err(other_io_err)?,
-Some(c) => batch.delete_cf(cfs[c as usize], &key).map_err(other_io_err)?,
+Some(c) => batch.delete_cf(*cfs[c as usize], &key).map_err(other_io_err)?,
}
}
}
@@ -520,8 +535,13 @@ impl Database {
Some(&KeyState::Delete) => Ok(None),
None => {
col.map_or_else(
-|| db.get_opt(key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))),
-|c| db.get_cf_opt(cfs[c as usize], key, &self.read_opts).map(|r| r.map(|v| DBValue::from_slice(&v))))
+|| db
+.get_opt(key, &self.read_opts)
+.map(|r| r.map(|v| DBValue::from_slice(&v))),
+|c| db
+.get_cf_opt(*cfs[c as usize], key, &self.read_opts)
+.map(|r| r.map(|v| DBValue::from_slice(&v)))
+)
.map_err(other_io_err)
},
}
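Reads consult the `overlay` and `flushing` maps first and fall through to `get_opt`/`get_cf_opt` only on a miss. The raw lookup, per the calls above (the path is a placeholder; `DBVector` derefs to `&[u8]`):

```rust
use rocksdb::{DB, Options, ReadOptions};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/kvdb-get-demo")?;
    db.put(b"key", b"value")?;

    // On an overlay miss the crate does exactly this, then copies the
    // DBVector into a DBValue.
    let read_opts = ReadOptions::default();
    match db.get_opt(b"key", &read_opts)? {
        Some(v) => assert_eq!(&*v, &b"value"[..]),
        None => unreachable!("key was just written"),
    }
    Ok(())
}
```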
@@ -552,14 +572,19 @@ impl Database {
let mut overlay_data = overlay.iter()
.filter_map(|(k, v)| match *v {
KeyState::Insert(ref value) =>
-Some((k.clone().into_vec().into_boxed_slice(), value.clone().into_vec().into_boxed_slice())),
+Some(
+(
+k.clone().into_vec().into_boxed_slice(),
+value.clone().into_vec().into_boxed_slice()
+)
+),
KeyState::Delete => None,
}).collect::<Vec<_>>();
overlay_data.sort();

let iter = col.map_or_else(
-|| db.iterator_opt(IteratorMode::Start, &self.read_opts),
-|c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::Start, &self.read_opts)
+|| db.iterator(IteratorMode::Start),
+|c| db.iterator_cf(*cfs[c as usize], IteratorMode::Start)
.expect("iterator params are valid; qed")
);

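`iter` merges the sorted overlay entries with a raw `DBIterator` via `interleave_ordered`. One behavioral change visible in this hunk: the upstream `iterator`/`iterator_cf` calls no longer take the custom `ReadOptions`. The underlying seek-and-scan pattern used by `iter_from_prefix` below (path and keys invented; note the caller must stop once keys leave the prefix):

```rust
use rocksdb::{DB, Direction, IteratorMode, Options};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let db = DB::open(&opts, "/tmp/kvdb-iter-demo")?;
    db.put(b"a-1", b"x")?;
    db.put(b"b-1", b"y")?;
    db.put(b"b-2", b"z")?;

    // Same mode as iter_from_prefix: seek to `prefix`, walk forward.
    let prefix = b"b-";
    for (key, _value) in db.iterator(IteratorMode::From(prefix, Direction::Forward)) {
        if !key.starts_with(prefix) { break; } // prefix exhausted
        println!("{}", String::from_utf8_lossy(&key));
    }
    Ok(())
}
```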
@@ -575,8 +600,8 @@ impl Database {
fn iter_from_prefix(&self, col: Option<u32>, prefix: &[u8]) -> Option<DatabaseIterator> {
match *self.db.read() {
Some(DBAndColumns { ref db, ref cfs }) => {
-let iter = col.map_or_else(|| db.iterator_opt(IteratorMode::From(prefix, Direction::Forward), &self.read_opts),
-|c| db.iterator_cf_opt(cfs[c as usize], IteratorMode::From(prefix, Direction::Forward), &self.read_opts)
+let iter = col.map_or_else(|| db.iterator(IteratorMode::From(prefix, Direction::Forward)),
+|c| db.iterator_cf(*cfs[c as usize], IteratorMode::From(prefix, Direction::Forward))
.expect("iterator params are valid; qed"));

Some(DatabaseIterator {
@@ -657,7 +682,7 @@ impl Database {
Some(DBAndColumns { ref mut db, ref mut cfs }) => {
let col = cfs.len() as u32;
let name = format!("col{}", col);
-cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?);
+cfs.push(db.create_cf(&name, &col_config(&self.config, &self.block_opts)?).map_err(other_io_err)?.into());
Ok(())
},
None => Ok(()),
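`add_column` appends the next `"colN"` family via `create_cf`. A minimal sketch of creating and addressing a column family with the 0.10-style API used in this diff (path and data are placeholders; `ColumnFamily` is a `Copy` handle here, which is why the crate can store it and pass it by value as `*cfs[i]`):

```rust
use rocksdb::{DB, Options};

fn main() -> Result<(), rocksdb::Error> {
    let mut opts = Options::default();
    opts.create_if_missing(true);
    let mut db = DB::open(&opts, "/tmp/kvdb-cf-demo")?;

    // As in add_column: families are named "col0", "col1", ...
    let cf = db.create_cf("col0", &Options::default())?;
    db.put_cf(cf, b"key", b"value")?;
    assert!(db.get_cf(cf, b"key")?.is_some());
    Ok(())
}
```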
