Skip to content

Commit

Permalink
kvdb: no overlay (#313)
Browse files Browse the repository at this point in the history
* kvdb-rocksdb: no overlay experiment

* kvdb-rocksdb: remove unused dep

* kvdb-rocksdb: panic on write failure

* kvdb: remove write_buffered and flush

* kvdb: update changelog

* kvdb-rocksdb: update changelog

* kvdb-web: update changelog

* kvdb-memory: update changelog

* kvdb-rocksdb: fix the bench

* kvdb-rocksdb: fix the bench #2

* kvdb: cargo fmt

* Apply suggestions from code review

Co-Authored-By: David <dvdplm@gmail.com>

* kvdb-rocksdb: s/acquired_val/value

* kvdb-rocksdb: update the changelog

Co-authored-by: David <dvdplm@gmail.com>
  • Loading branch information
ordian and dvdplm authored Mar 26, 2020
1 parent 8a29232 commit 7093b7e
Show file tree
Hide file tree
Showing 12 changed files with 48 additions and 209 deletions.
2 changes: 2 additions & 0 deletions kvdb-memorydb/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog].
[Keep a Changelog]: http://keepachangelog.com/en/1.0.0/

## [Unreleased]
### Breaking
- Updated to the new `kvdb` interface. [#313](https://github.com/paritytech/parity-common/pull/313)

## [0.5.0] - 2020-03-16
- License changed from GPL3 to dual MIT/Apache2. [#342](https://github.com/paritytech/parity-common/pull/342)
Expand Down
5 changes: 1 addition & 4 deletions kvdb-memorydb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl KeyValueDB for InMemory {
}
}

fn write_buffered(&self, transaction: DBTransaction) {
fn write(&self, transaction: DBTransaction) -> io::Result<()> {
let mut columns = self.columns.write();
let ops = transaction.ops;
for op in ops {
Expand All @@ -69,9 +69,6 @@ impl KeyValueDB for InMemory {
}
}
}
}

fn flush(&self) -> io::Result<()> {
Ok(())
}

Expand Down
2 changes: 2 additions & 0 deletions kvdb-rocksdb/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ The format is based on [Keep a Changelog].
[Keep a Changelog]: http://keepachangelog.com/en/1.0.0/

## [Unreleased]
### Breaking
- Updated to the new `kvdb` interface. [#313](https://github.com/paritytech/parity-common/pull/313)

## [0.7.0] - 2020-03-16
- Updated dependencies. [#361](https://github.com/paritytech/parity-common/pull/361)
Expand Down
1 change: 0 additions & 1 deletion kvdb-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ harness = false
[dependencies]
smallvec = "1.0.0"
fs-swap = "0.2.4"
interleaved-ordered = "0.1.1"
kvdb = { path = "../kvdb", version = "0.5" }
log = "0.4.8"
num_cpus = "1.10.1"
Expand Down
2 changes: 0 additions & 2 deletions kvdb-rocksdb/benches/bench_read_perf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,6 @@ fn populate(db: &Database) -> io::Result<Vec<H256>> {
batch.put(0, &key.as_bytes(), &n_random_bytes(140));
}
db.write(batch)?;
// Clear the overlay
db.flush()?;
Ok(needles)
}

Expand Down
186 changes: 23 additions & 163 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,14 @@ mod stats;
use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result};

use parity_util_mem::MallocSizeOf;
use parking_lot::{Mutex, MutexGuard, RwLock};
use parking_lot::RwLock;
use rocksdb::{
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB,
};

use crate::iter::KeyValuePair;
use fs_swap::{swap, swap_nonatomic};
use interleaved_ordered::interleave_ordered;
use kvdb::{DBKey, DBOp, DBTransaction, DBValue, KeyValueDB};
use kvdb::{DBOp, DBTransaction, DBValue, KeyValueDB};
use log::{debug, warn};

#[cfg(target_os = "linux")]
Expand Down Expand Up @@ -51,12 +50,6 @@ pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128;
/// The default memory budget in MiB.
pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512;

#[derive(MallocSizeOf)]
enum KeyState {
Insert(DBValue),
Delete,
}

/// Compaction profile for the database settings
/// Note, that changing these parameters may trigger
/// the compaction process of RocksDB on startup.
Expand Down Expand Up @@ -281,15 +274,8 @@ pub struct Database {
read_opts: ReadOptions,
#[ignore_malloc_size_of = "insignificant"]
block_opts: BlockBasedOptions,
// Dirty values added with `write_buffered`. Cleaned on `flush`.
overlay: RwLock<Vec<HashMap<DBKey, KeyState>>>,
#[ignore_malloc_size_of = "insignificant"]
stats: stats::RunningDbStats,
// Values currently being flushed. Cleared when `flush` completes.
flushing: RwLock<Vec<HashMap<DBKey, KeyState>>>,
// Prevents concurrent flushes.
// Value indicates if a flush is in progress.
flushing_lock: Mutex<bool>,
}

#[inline]
Expand Down Expand Up @@ -418,9 +404,6 @@ impl Database {
Ok(Database {
db: RwLock::new(Some(DBAndColumns { db, column_names })),
config: config.clone(),
overlay: RwLock::new((0..config.columns).map(|_| HashMap::new()).collect()),
flushing: RwLock::new((0..config.columns).map(|_| HashMap::new()).collect()),
flushing_lock: Mutex::new(false),
path: path.to_owned(),
opts,
read_opts,
Expand All @@ -435,75 +418,6 @@ impl Database {
DBTransaction::new()
}

/// Commit transaction to database.
pub fn write_buffered(&self, tr: DBTransaction) {
let mut overlay = self.overlay.write();
let ops = tr.ops;
for op in ops {
match op {
DBOp::Insert { col, key, value } => overlay[col as usize].insert(key, KeyState::Insert(value)),
DBOp::Delete { col, key } => overlay[col as usize].insert(key, KeyState::Delete),
};
}
}

/// Commit buffered changes to database. Must be called under `flush_lock`
fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<'_, bool>) -> io::Result<()> {
match *self.db.read() {
Some(ref cfs) => {
let mut batch = WriteBatch::default();
let mut ops: usize = 0;
let mut bytes: usize = 0;
mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write());
{
for (c, column) in self.flushing.read().iter().enumerate() {
ops += column.len();
for (key, state) in column.iter() {
let cf = cfs.cf(c);
match *state {
KeyState::Delete => {
bytes += key.len();
batch.delete_cf(cf, key).map_err(other_io_err)?
}
KeyState::Insert(ref value) => {
bytes += key.len() + value.len();
batch.put_cf(cf, key, value).map_err(other_io_err)?
}
};
}
}
}

check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))?;
self.stats.tally_transactions(1);
self.stats.tally_writes(ops as u64);
self.stats.tally_bytes_written(bytes as u64);

for column in self.flushing.write().iter_mut() {
column.clear();
column.shrink_to_fit();
}
Ok(())
}
None => Err(other_io_err("Database is closed")),
}
}

/// Commit buffered changes to database.
pub fn flush(&self) -> io::Result<()> {
let mut lock = self.flushing_lock.lock();
// If RocksDB batch allocation fails the thread gets terminated and the lock is released.
// The value inside the lock is used to detect that.
if *lock {
// This can only happen if another flushing thread is terminated unexpectedly.
return Err(other_io_err("Database write failure. Running low on memory perhaps?"));
}
*lock = true;
let result = self.write_flushing_with_lock(&mut lock);
*lock = false;
result
}

/// Commit transaction to database.
pub fn write(&self, tr: DBTransaction) -> io::Result<()> {
match *self.db.read() {
Expand All @@ -517,9 +431,6 @@ impl Database {
let mut stats_total_bytes = 0;

for op in ops {
// remove any buffered operation for this key
self.overlay.write()[op.col() as usize].remove(op.key());

let cf = cfs.cf(op.col() as usize);

match op {
Expand All @@ -546,84 +457,55 @@ impl Database {
pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> {
match *self.db.read() {
Some(ref cfs) => {
self.stats.tally_reads(1);
let guard = self.overlay.read();
let overlay =
guard.get(col as usize).ok_or_else(|| other_io_err("kvdb column index is out of bounds"))?;
match overlay.get(key) {
Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())),
Some(&KeyState::Delete) => Ok(None),
None => {
let flushing = &self.flushing.read()[col as usize];
match flushing.get(key) {
Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())),
Some(&KeyState::Delete) => Ok(None),
None => {
let acquired_val = cfs
.db
.get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts)
.map(|r| r.map(|v| v.to_vec()))
.map_err(other_io_err);

match acquired_val {
Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
_ => {}
};

acquired_val
}
}
}
if cfs.column_names.get(col as usize).is_none() {
return Err(other_io_err("column index is out of bounds"));
}
self.stats.tally_reads(1);
let value = cfs
.db
.get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts)
.map(|r| r.map(|v| v.to_vec()))
.map_err(other_io_err);

match value {
Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64),
Ok(None) => self.stats.tally_bytes_read(key.len() as u64),
_ => {}
};

value
}
None => Ok(None),
}
}

/// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values.
// TODO: support prefix seek for unflushed data
/// Get value by partial key. Prefix size should match configured prefix size.
pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> {
self.iter_from_prefix(col, prefix).next().map(|(_, v)| v)
}

/// Get database iterator for flushed data.
/// Iterator over the data in the given database column index.
/// Will hold a lock until the iterator is dropped
/// preventing the database from being closed.
pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = KeyValuePair> + 'a {
let read_lock = self.db.read();
let optional = if read_lock.is_some() {
let overlay_data = {
let overlay = &self.overlay.read()[col as usize];
let mut overlay_data = overlay
.iter()
.filter_map(|(k, v)| match *v {
KeyState::Insert(ref value) => {
Some((k.clone().into_vec().into_boxed_slice(), value.clone().into_boxed_slice()))
}
KeyState::Delete => None,
})
.collect::<Vec<_>>();
overlay_data.sort();
overlay_data
};

let guarded = iter::ReadGuardedIterator::new(read_lock, col, &self.read_opts);
Some(interleave_ordered(overlay_data, guarded))
Some(guarded)
} else {
None
};
optional.into_iter().flat_map(identity)
}

/// Get database iterator from prefix for flushed data.
/// Iterator over data in the `col` database column index matching the given prefix.
/// Will hold a lock until the iterator is dropped
/// preventing the database from being closed.
fn iter_from_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = iter::KeyValuePair> + 'a {
let read_lock = self.db.read();
let optional = if read_lock.is_some() {
let guarded = iter::ReadGuardedIterator::new_from_prefix(read_lock, col, prefix, &self.read_opts);
Some(interleave_ordered(Vec::new(), guarded))
Some(guarded)
} else {
None
};
Expand All @@ -636,8 +518,6 @@ impl Database {
/// Close the database
fn close(&self) {
*self.db.write() = None;
self.overlay.write().clear();
self.flushing.write().clear();
}

/// Restore the database from a copy at given path.
Expand Down Expand Up @@ -671,8 +551,6 @@ impl Database {
// reopen the database and steal handles into self
let db = Self::open(&self.config, &self.path)?;
*self.db.write() = mem::replace(&mut *db.db.write(), None);
*self.overlay.write() = mem::replace(&mut *db.overlay.write(), Vec::new());
*self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new());
Ok(())
}

Expand All @@ -687,7 +565,6 @@ impl Database {
}

/// The number of keys in a column (estimated).
/// Does not take into account the unflushed data.
pub fn num_keys(&self, col: u32) -> io::Result<u64> {
const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys";
match *self.db.read() {
Expand Down Expand Up @@ -751,18 +628,10 @@ impl KeyValueDB for Database {
Database::get_by_prefix(self, col, prefix)
}

fn write_buffered(&self, transaction: DBTransaction) {
Database::write_buffered(self, transaction)
}

fn write(&self, transaction: DBTransaction) -> io::Result<()> {
Database::write(self, transaction)
}

fn flush(&self) -> io::Result<()> {
Database::flush(self)
}

fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> {
let unboxed = Database::iter(self, col);
Box::new(unboxed.into_iter())
Expand Down Expand Up @@ -805,13 +674,6 @@ impl KeyValueDB for Database {
}
}

impl Drop for Database {
fn drop(&mut self) {
// write all buffered changes if we can.
let _ = self.flush();
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down Expand Up @@ -888,8 +750,6 @@ mod tests {
}
db.write(batch).unwrap();

db.flush().unwrap();

{
let db = db.db.read();
db.as_ref().map(|db| {
Expand Down
Loading

0 comments on commit 7093b7e

Please sign in to comment.