-
Notifications
You must be signed in to change notification settings - Fork 223
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
kvdb: no overlay #313
kvdb: no overlay #313
Changes from all commits
9f79bf8
7e6f2ad
60329e7
b1defce
9c62d07
544ee1e
65635d1
efb54b8
7100682
b82a5f4
24eb74c
4ae7e3f
2d2d670
4842d63
7fb861a
0784e54
488cb36
20a62a4
2fe0a11
26ef26f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,15 +12,14 @@ mod stats; | |
use std::{cmp, collections::HashMap, convert::identity, error, fs, io, mem, path::Path, result}; | ||
|
||
use parity_util_mem::MallocSizeOf; | ||
use parking_lot::{Mutex, MutexGuard, RwLock}; | ||
use parking_lot::RwLock; | ||
use rocksdb::{ | ||
BlockBasedOptions, ColumnFamily, ColumnFamilyDescriptor, Error, Options, ReadOptions, WriteBatch, WriteOptions, DB, | ||
}; | ||
|
||
use crate::iter::KeyValuePair; | ||
use fs_swap::{swap, swap_nonatomic}; | ||
use interleaved_ordered::interleave_ordered; | ||
use kvdb::{DBKey, DBOp, DBTransaction, DBValue, KeyValueDB}; | ||
use kvdb::{DBOp, DBTransaction, DBValue, KeyValueDB}; | ||
use log::{debug, warn}; | ||
|
||
#[cfg(target_os = "linux")] | ||
|
@@ -51,12 +50,6 @@ pub const DB_DEFAULT_COLUMN_MEMORY_BUDGET_MB: MiB = 128; | |
/// The default memory budget in MiB. | ||
pub const DB_DEFAULT_MEMORY_BUDGET_MB: MiB = 512; | ||
|
||
#[derive(MallocSizeOf)] | ||
enum KeyState { | ||
Insert(DBValue), | ||
Delete, | ||
} | ||
|
||
/// Compaction profile for the database settings | ||
/// Note, that changing these parameters may trigger | ||
/// the compaction process of RocksDB on startup. | ||
|
@@ -281,15 +274,8 @@ pub struct Database { | |
read_opts: ReadOptions, | ||
#[ignore_malloc_size_of = "insignificant"] | ||
block_opts: BlockBasedOptions, | ||
// Dirty values added with `write_buffered`. Cleaned on `flush`. | ||
overlay: RwLock<Vec<HashMap<DBKey, KeyState>>>, | ||
#[ignore_malloc_size_of = "insignificant"] | ||
stats: stats::RunningDbStats, | ||
// Values currently being flushed. Cleared when `flush` completes. | ||
flushing: RwLock<Vec<HashMap<DBKey, KeyState>>>, | ||
// Prevents concurrent flushes. | ||
// Value indicates if a flush is in progress. | ||
flushing_lock: Mutex<bool>, | ||
} | ||
|
||
#[inline] | ||
|
@@ -418,9 +404,6 @@ impl Database { | |
Ok(Database { | ||
db: RwLock::new(Some(DBAndColumns { db, column_names })), | ||
config: config.clone(), | ||
overlay: RwLock::new((0..config.columns).map(|_| HashMap::new()).collect()), | ||
flushing: RwLock::new((0..config.columns).map(|_| HashMap::new()).collect()), | ||
flushing_lock: Mutex::new(false), | ||
path: path.to_owned(), | ||
opts, | ||
read_opts, | ||
|
@@ -435,75 +418,6 @@ impl Database { | |
DBTransaction::new() | ||
} | ||
|
||
/// Commit transaction to database. | ||
pub fn write_buffered(&self, tr: DBTransaction) { | ||
let mut overlay = self.overlay.write(); | ||
let ops = tr.ops; | ||
for op in ops { | ||
match op { | ||
DBOp::Insert { col, key, value } => overlay[col as usize].insert(key, KeyState::Insert(value)), | ||
DBOp::Delete { col, key } => overlay[col as usize].insert(key, KeyState::Delete), | ||
}; | ||
} | ||
} | ||
|
||
/// Commit buffered changes to database. Must be called under `flush_lock` | ||
fn write_flushing_with_lock(&self, _lock: &mut MutexGuard<'_, bool>) -> io::Result<()> { | ||
match *self.db.read() { | ||
Some(ref cfs) => { | ||
let mut batch = WriteBatch::default(); | ||
let mut ops: usize = 0; | ||
let mut bytes: usize = 0; | ||
mem::swap(&mut *self.overlay.write(), &mut *self.flushing.write()); | ||
{ | ||
for (c, column) in self.flushing.read().iter().enumerate() { | ||
ops += column.len(); | ||
for (key, state) in column.iter() { | ||
let cf = cfs.cf(c); | ||
match *state { | ||
KeyState::Delete => { | ||
bytes += key.len(); | ||
batch.delete_cf(cf, key).map_err(other_io_err)? | ||
} | ||
KeyState::Insert(ref value) => { | ||
bytes += key.len() + value.len(); | ||
batch.put_cf(cf, key, value).map_err(other_io_err)? | ||
} | ||
}; | ||
} | ||
} | ||
} | ||
|
||
check_for_corruption(&self.path, cfs.db.write_opt(batch, &self.write_opts))?; | ||
self.stats.tally_transactions(1); | ||
self.stats.tally_writes(ops as u64); | ||
self.stats.tally_bytes_written(bytes as u64); | ||
|
||
for column in self.flushing.write().iter_mut() { | ||
column.clear(); | ||
column.shrink_to_fit(); | ||
} | ||
Ok(()) | ||
} | ||
None => Err(other_io_err("Database is closed")), | ||
} | ||
} | ||
|
||
/// Commit buffered changes to database. | ||
pub fn flush(&self) -> io::Result<()> { | ||
let mut lock = self.flushing_lock.lock(); | ||
// If RocksDB batch allocation fails the thread gets terminated and the lock is released. | ||
// The value inside the lock is used to detect that. | ||
if *lock { | ||
// This can only happen if another flushing thread is terminated unexpectedly. | ||
return Err(other_io_err("Database write failure. Running low on memory perhaps?")); | ||
} | ||
*lock = true; | ||
let result = self.write_flushing_with_lock(&mut lock); | ||
*lock = false; | ||
result | ||
} | ||
|
||
/// Commit transaction to database. | ||
pub fn write(&self, tr: DBTransaction) -> io::Result<()> { | ||
match *self.db.read() { | ||
|
@@ -517,9 +431,6 @@ impl Database { | |
let mut stats_total_bytes = 0; | ||
|
||
for op in ops { | ||
// remove any buffered operation for this key | ||
self.overlay.write()[op.col() as usize].remove(op.key()); | ||
|
||
let cf = cfs.cf(op.col() as usize); | ||
|
||
match op { | ||
|
@@ -546,84 +457,55 @@ impl Database { | |
pub fn get(&self, col: u32, key: &[u8]) -> io::Result<Option<DBValue>> { | ||
match *self.db.read() { | ||
Some(ref cfs) => { | ||
self.stats.tally_reads(1); | ||
let guard = self.overlay.read(); | ||
let overlay = | ||
guard.get(col as usize).ok_or_else(|| other_io_err("kvdb column index is out of bounds"))?; | ||
match overlay.get(key) { | ||
Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), | ||
Some(&KeyState::Delete) => Ok(None), | ||
None => { | ||
let flushing = &self.flushing.read()[col as usize]; | ||
match flushing.get(key) { | ||
Some(&KeyState::Insert(ref value)) => Ok(Some(value.clone())), | ||
Some(&KeyState::Delete) => Ok(None), | ||
None => { | ||
let acquired_val = cfs | ||
.db | ||
.get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts) | ||
.map(|r| r.map(|v| v.to_vec())) | ||
.map_err(other_io_err); | ||
|
||
match acquired_val { | ||
Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64), | ||
Ok(None) => self.stats.tally_bytes_read(key.len() as u64), | ||
_ => {} | ||
}; | ||
|
||
acquired_val | ||
} | ||
} | ||
} | ||
if cfs.column_names.get(col as usize).is_none() { | ||
return Err(other_io_err("column index is out of bounds")); | ||
} | ||
self.stats.tally_reads(1); | ||
let value = cfs | ||
.db | ||
.get_pinned_cf_opt(cfs.cf(col as usize), key, &self.read_opts) | ||
.map(|r| r.map(|v| v.to_vec())) | ||
.map_err(other_io_err); | ||
|
||
match value { | ||
Ok(Some(ref v)) => self.stats.tally_bytes_read((key.len() + v.len()) as u64), | ||
Ok(None) => self.stats.tally_bytes_read(key.len() as u64), | ||
_ => {} | ||
}; | ||
|
||
value | ||
} | ||
None => Ok(None), | ||
} | ||
} | ||
|
||
/// Get value by partial key. Prefix size should match configured prefix size. Only searches flushed values. | ||
// TODO: support prefix seek for unflushed data | ||
/// Get value by partial key. Prefix size should match configured prefix size. | ||
pub fn get_by_prefix(&self, col: u32, prefix: &[u8]) -> Option<Box<[u8]>> { | ||
self.iter_from_prefix(col, prefix).next().map(|(_, v)| v) | ||
} | ||
|
||
/// Get database iterator for flushed data. | ||
/// Iterator over the data in the given database column index. | ||
/// Will hold a lock until the iterator is dropped | ||
/// preventing the database from being closed. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've probably asked this before, apologies, but why is it bad to close the DB while some thread is iterating over data? Is the assumption that threads iterating over some data must be allowed to complete? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is a good question, but I consider it to be part of the #314 |
||
pub fn iter<'a>(&'a self, col: u32) -> impl Iterator<Item = KeyValuePair> + 'a { | ||
let read_lock = self.db.read(); | ||
let optional = if read_lock.is_some() { | ||
let overlay_data = { | ||
let overlay = &self.overlay.read()[col as usize]; | ||
let mut overlay_data = overlay | ||
.iter() | ||
.filter_map(|(k, v)| match *v { | ||
KeyState::Insert(ref value) => { | ||
Some((k.clone().into_vec().into_boxed_slice(), value.clone().into_boxed_slice())) | ||
} | ||
KeyState::Delete => None, | ||
}) | ||
.collect::<Vec<_>>(); | ||
overlay_data.sort(); | ||
overlay_data | ||
}; | ||
|
||
let guarded = iter::ReadGuardedIterator::new(read_lock, col, &self.read_opts); | ||
Some(interleave_ordered(overlay_data, guarded)) | ||
Some(guarded) | ||
} else { | ||
None | ||
}; | ||
optional.into_iter().flat_map(identity) | ||
} | ||
|
||
/// Get database iterator from prefix for flushed data. | ||
/// Iterator over data in the `col` database column index matching the given prefix. | ||
/// Will hold a lock until the iterator is dropped | ||
/// preventing the database from being closed. | ||
fn iter_from_prefix<'a>(&'a self, col: u32, prefix: &'a [u8]) -> impl Iterator<Item = iter::KeyValuePair> + 'a { | ||
let read_lock = self.db.read(); | ||
let optional = if read_lock.is_some() { | ||
let guarded = iter::ReadGuardedIterator::new_from_prefix(read_lock, col, prefix, &self.read_opts); | ||
Some(interleave_ordered(Vec::new(), guarded)) | ||
Some(guarded) | ||
} else { | ||
None | ||
}; | ||
|
@@ -636,8 +518,6 @@ impl Database { | |
/// Close the database | ||
fn close(&self) { | ||
*self.db.write() = None; | ||
self.overlay.write().clear(); | ||
self.flushing.write().clear(); | ||
} | ||
|
||
/// Restore the database from a copy at given path. | ||
|
@@ -671,8 +551,6 @@ impl Database { | |
// reopen the database and steal handles into self | ||
let db = Self::open(&self.config, &self.path)?; | ||
*self.db.write() = mem::replace(&mut *db.db.write(), None); | ||
*self.overlay.write() = mem::replace(&mut *db.overlay.write(), Vec::new()); | ||
*self.flushing.write() = mem::replace(&mut *db.flushing.write(), Vec::new()); | ||
Ok(()) | ||
} | ||
|
||
|
@@ -687,7 +565,6 @@ impl Database { | |
} | ||
|
||
/// The number of keys in a column (estimated). | ||
/// Does not take into account the unflushed data. | ||
pub fn num_keys(&self, col: u32) -> io::Result<u64> { | ||
const ESTIMATE_NUM_KEYS: &str = "rocksdb.estimate-num-keys"; | ||
match *self.db.read() { | ||
|
@@ -751,18 +628,10 @@ impl KeyValueDB for Database { | |
Database::get_by_prefix(self, col, prefix) | ||
} | ||
|
||
fn write_buffered(&self, transaction: DBTransaction) { | ||
Database::write_buffered(self, transaction) | ||
} | ||
|
||
fn write(&self, transaction: DBTransaction) -> io::Result<()> { | ||
Database::write(self, transaction) | ||
} | ||
|
||
fn flush(&self) -> io::Result<()> { | ||
Database::flush(self) | ||
} | ||
|
||
fn iter<'a>(&'a self, col: u32) -> Box<dyn Iterator<Item = KeyValuePair> + 'a> { | ||
let unboxed = Database::iter(self, col); | ||
Box::new(unboxed.into_iter()) | ||
|
@@ -805,13 +674,6 @@ impl KeyValueDB for Database { | |
} | ||
} | ||
|
||
impl Drop for Database { | ||
fn drop(&mut self) { | ||
// write all buffered changes if we can. | ||
let _ = self.flush(); | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
use super::*; | ||
|
@@ -888,8 +750,6 @@ mod tests { | |
} | ||
db.write(batch).unwrap(); | ||
|
||
db.flush().unwrap(); | ||
|
||
{ | ||
let db = db.db.read(); | ||
db.as_ref().map(|db| { | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it was probably lying before?
since it was interleaved with actual data?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partially, I guess
in order to support non-flushed data properly, it would have to take
flushing
into account (see e.g.get
)