Skip to content

Commit

Permalink
Delete by prefix operator in kvdb (#360)
Browse files Browse the repository at this point in the history
* Switch from `parity-rocksdb` to upstream `rust-rocksdb`

* Update to latest rocksdb

* Delete prefix as range exept when start is full of 255 (corner case).

* forcing ignore lifetime (not more unsafe than before, columnfamily is
probably not an issue, dbiterator is one as it will fail on close db).

* remove temporarilly path deps for easier patch usage.

* being a bit more exhaustive

* Tests for delete prefix, fix rocksdb full column delete.

* revert util-mem

* update versionning.

* better end prefix from review

* revert test error check

* Update kvdb-memorydb/src/lib.rs

Co-Authored-By: Andronik Ordian <write@reusable.software>

* Update kvdb-shared-tests/src/lib.rs

Co-Authored-By: Andronik Ordian <write@reusable.software>

* Update kvdb/src/lib.rs

Co-Authored-By: Andronik Ordian <write@reusable.software>

* Update kvdb/src/lib.rs

Co-Authored-By: Andronik Ordian <write@reusable.software>

* applying suggestions, and remove delete by prefix method.

* io stats num column

* end_prefix test

* format

* Redundant delete.

* Update kvdb/src/lib.rs

Co-Authored-By: David <dvdplm@gmail.com>

* Update kvdb/src/lib.rs

Co-Authored-By: David <dvdplm@gmail.com>

* Documentation fix and additional test case in end_prefix_test

* Doc.

* doc

Co-authored-by: Bastian Köcher <git@kchr.de>
Co-authored-by: Benjamin Kampmann <ben@gnunicorn.org>
Co-authored-by: Andronik Ordian <write@reusable.software>
Co-authored-by: David <dvdplm@gmail.com>
  • Loading branch information
5 people authored Mar 27, 2020
1 parent 7093b7e commit dd89c9a
Show file tree
Hide file tree
Showing 10 changed files with 168 additions and 6 deletions.
2 changes: 1 addition & 1 deletion kvdb-memorydb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ parking_lot = "0.10.0"
kvdb = { version = "0.5", path = "../kvdb" }

[dev-dependencies]
kvdb-shared-tests = { path = "../kvdb-shared-tests", version = "0.2" }
kvdb-shared-tests = { path = "../kvdb-shared-tests", version = "0.3" }
21 changes: 21 additions & 0 deletions kvdb-memorydb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,21 @@ impl KeyValueDB for InMemory {
col.remove(&*key);
}
}
DBOp::DeletePrefix { col, prefix } => {
if let Some(col) = columns.get_mut(&col) {
use std::ops::Bound;
if prefix.is_empty() {
col.clear();
} else {
let start_range = Bound::Included(prefix.to_vec());
let end_range = Bound::Excluded(kvdb::end_prefix(&prefix[..]));
let keys: Vec<_> = col.range((start_range, end_range)).map(|(k, _)| k.clone()).collect();
for key in keys.into_iter() {
col.remove(&key[..]);
}
}
}
}
}
}
Ok(())
Expand Down Expand Up @@ -127,6 +142,12 @@ mod tests {
st::test_delete_and_get(&db)
}

#[test]
fn delete_prefix() -> io::Result<()> {
let db = create(st::DELETE_PREFIX_NUM_COLUMNS);
st::test_delete_prefix(&db)
}

#[test]
fn iter() -> io::Result<()> {
let db = create(1);
Expand Down
2 changes: 1 addition & 1 deletion kvdb-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ parity-util-mem = { path = "../parity-util-mem", version = "0.6", default-featur
alloc_counter = "0.0.4"
criterion = "0.3"
ethereum-types = { path = "../ethereum-types" }
kvdb-shared-tests = { path = "../kvdb-shared-tests", version = "0.2" }
kvdb-shared-tests = { path = "../kvdb-shared-tests", version = "0.3" }
rand = "0.7.2"
tempdir = "0.3.7"
keccak-hash = { path = "../keccak-hash" }
Expand Down
19 changes: 18 additions & 1 deletion kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,17 @@ impl Database {
stats_total_bytes += key.len();
batch.delete_cf(cf, &key).map_err(other_io_err)?
}
DBOp::DeletePrefix { col: _, prefix } => {
if prefix.len() > 0 {
let end_range = kvdb::end_prefix(&prefix[..]);
batch.delete_range_cf(cf, &prefix[..], &end_range[..]).map_err(other_io_err)?;
} else {
// Deletes all values in the column.
let end_range = &[u8::max_value()];
batch.delete_range_cf(cf, &prefix[..], &end_range[..]).map_err(other_io_err)?;
batch.delete_cf(cf, &end_range[..]).map_err(other_io_err)?;
}
}
};
}
self.stats.tally_bytes_written(stats_total_bytes as u64);
Expand Down Expand Up @@ -705,6 +716,12 @@ mod tests {
st::test_delete_and_get(&db)
}

#[test]
fn delete_prefix() -> io::Result<()> {
let db = create(st::DELETE_PREFIX_NUM_COLUMNS)?;
st::test_delete_prefix(&db)
}

#[test]
fn iter() -> io::Result<()> {
let db = create(1)?;
Expand All @@ -725,7 +742,7 @@ mod tests {

#[test]
fn stats() -> io::Result<()> {
let db = create(3)?;
let db = create(st::IOSTATS_NUM_COLUMNS)?;
st::test_io_stats(&db)
}

Expand Down
2 changes: 1 addition & 1 deletion kvdb-shared-tests/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kvdb-shared-tests"
version = "0.2.0"
version = "0.3.0"
authors = ["Parity Technologies <admin@parity.io>"]
edition = "2018"
description = "Shared tests for kvdb functionality, to be executed against actual implementations"
Expand Down
60 changes: 60 additions & 0 deletions kvdb-shared-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ pub fn test_iter_from_prefix(db: &dyn KeyValueDB) -> io::Result<()> {
Ok(())
}

/// The number of columns required to run `test_io_stats`.
pub const IOSTATS_NUM_COLUMNS: u32 = 3;

/// A test for `KeyValueDB::io_stats`.
/// Assumes that the `db` has at least 3 columns.
pub fn test_io_stats(db: &dyn KeyValueDB) -> io::Result<()> {
Expand Down Expand Up @@ -171,6 +174,63 @@ pub fn test_io_stats(db: &dyn KeyValueDB) -> io::Result<()> {
Ok(())
}

/// The number of columns required to run `test_delete_prefix`.
pub const DELETE_PREFIX_NUM_COLUMNS: u32 = 5;

/// A test for `KeyValueDB::delete_prefix`.
pub fn test_delete_prefix(db: &dyn KeyValueDB) -> io::Result<()> {
let keys = [
&[][..],
&[0u8][..],
&[0, 1][..],
&[1][..],
&[1, 0][..],
&[1, 255][..],
&[1, 255, 255][..],
&[2][..],
&[2, 0][..],
&[2, 255][..],
];
let init_db = |ix: u32| -> io::Result<()> {
let mut batch = db.transaction();
for (i, key) in keys.iter().enumerate() {
batch.put(ix, key, &[i as u8]);
}
db.write(batch)?;
Ok(())
};
let check_db = |ix: u32, content: [bool; 10]| -> io::Result<()> {
let mut state = [true; 10];
for (c, key) in keys.iter().enumerate() {
state[c] = db.get(ix, key)?.is_some();
}
assert_eq!(state, content, "at {}", ix);
Ok(())
};
let tests: [_; DELETE_PREFIX_NUM_COLUMNS as usize] = [
// standard
(&[1u8][..], [true, true, true, false, false, false, false, true, true, true]),
// edge
(&[1u8, 255, 255][..], [true, true, true, true, true, true, false, true, true, true]),
// none 1
(&[1, 2][..], [true, true, true, true, true, true, true, true, true, true]),
// none 2
(&[8][..], [true, true, true, true, true, true, true, true, true, true]),
// all
(&[][..], [false, false, false, false, false, false, false, false, false, false]),
];
for (ix, test) in tests.iter().enumerate() {
let ix = ix as u32;
init_db(ix)?;
let mut batch = db.transaction();
batch.delete_prefix(ix, test.0);
db.write(batch)?;
check_db(ix, test.1)?;
}

Ok(())
}

/// A complex test.
pub fn test_complex(db: &dyn KeyValueDB) -> io::Result<()> {
let key1 = b"02c69be41d0b7e40352fc85be1cd65eb03d40ef8427a0ca4596b1ead9a00e9fc";
Expand Down
3 changes: 2 additions & 1 deletion kvdb-web/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ features = [
'EventTarget',
'IdbCursor',
'IdbCursorWithValue',
'IdbKeyRange',
'DomStringList',
]

[dev-dependencies]
console_log = "0.1.2"
kvdb-shared-tests = { path = "../kvdb-shared-tests", version = "0.2" }
kvdb-shared-tests = { path = "../kvdb-shared-tests", version = "0.3" }
wasm-bindgen-test = "0.3.4"
wasm-bindgen-futures = "0.4.4"
15 changes: 14 additions & 1 deletion kvdb-web/src/indexed_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

use js_sys::{Array, ArrayBuffer, Uint8Array};
use wasm_bindgen::{closure::Closure, JsCast, JsValue};
use web_sys::{Event, IdbCursorWithValue, IdbDatabase, IdbOpenDbRequest, IdbRequest, IdbTransactionMode};
use web_sys::{Event, IdbCursorWithValue, IdbDatabase, IdbKeyRange, IdbOpenDbRequest, IdbRequest, IdbTransactionMode};

use futures::channel;
use futures::prelude::*;
Expand Down Expand Up @@ -157,6 +157,19 @@ pub fn idb_commit_transaction(idb: &IdbDatabase, txn: &DBTransaction, columns: u
warn!("error deleting key from col_{}: {:?}", column, err);
}
}
DBOp::DeletePrefix { col, prefix } => {
let column = *col as usize;
// Convert rust bytes to js arrays
let prefix_js_start = Uint8Array::from(prefix.as_ref());
let prefix_js_end = Uint8Array::from(prefix.as_ref());

let range = IdbKeyRange::bound(prefix_js_start.as_ref(), prefix_js_end.as_ref())
.expect("Starting and ending at same value is valid bound; qed");
let res = object_stores[column].delete(range.as_ref());
if let Err(err) = res {
warn!("error deleting prefix from col_{}: {:?}", column, err);
}
}
}
}

Expand Down
6 changes: 6 additions & 0 deletions kvdb-web/tests/indexed_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ async fn delete_and_get() {
st::test_delete_and_get(&db).unwrap()
}

#[wasm_bindgen_test]
async fn delete_prefix() {
let db = open_db(st::DELETE_PREFIX_NUM_COLUMNS, "delete_prefix").await;
st::test_delete_prefix(&db).unwrap()
}

#[wasm_bindgen_test]
async fn iter() {
let db = open_db(1, "iter").await;
Expand Down
44 changes: 44 additions & 0 deletions kvdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub struct DBTransaction {
pub enum DBOp {
Insert { col: u32, key: DBKey, value: DBValue },
Delete { col: u32, key: DBKey },
DeletePrefix { col: u32, prefix: DBKey },
}

impl DBOp {
Expand All @@ -43,6 +44,7 @@ impl DBOp {
match *self {
DBOp::Insert { ref key, .. } => key,
DBOp::Delete { ref key, .. } => key,
DBOp::DeletePrefix { ref prefix, .. } => prefix,
}
}

Expand All @@ -51,6 +53,7 @@ impl DBOp {
match *self {
DBOp::Insert { col, .. } => col,
DBOp::Delete { col, .. } => col,
DBOp::DeletePrefix { col, .. } => col,
}
}
}
Expand Down Expand Up @@ -80,6 +83,13 @@ impl DBTransaction {
pub fn delete(&mut self, col: u32, key: &[u8]) {
self.ops.push(DBOp::Delete { col, key: DBKey::from_slice(key) });
}

/// Delete all values with the given key prefix.
/// Using an empty prefix here will remove all keys
/// (all keys starts with the empty prefix).
pub fn delete_prefix(&mut self, col: u32, prefix: &[u8]) {
self.ops.push(DBOp::DeletePrefix { col, prefix: DBKey::from_slice(prefix) });
}
}

/// Generic key-value database.
Expand Down Expand Up @@ -129,3 +139,37 @@ pub trait KeyValueDB: Sync + Send + parity_util_mem::MallocSizeOf {
IoStats::empty()
}
}

/// For a given start prefix (inclusive), returns the correct end prefix (non-inclusive).
/// This assumes the key bytes are ordered in lexicographical order.
pub fn end_prefix(prefix: &[u8]) -> Vec<u8> {
let mut end_range = prefix.to_vec();
while let Some(0xff) = end_range.last() {
end_range.pop();
}
if let Some(byte) = end_range.last_mut() {
*byte += 1;
}
end_range
}

#[cfg(test)]
mod test {
use super::end_prefix;

#[test]
fn end_prefix_test() {
assert_eq!(end_prefix(&[5, 6, 7]), vec![5, 6, 8]);
assert_eq!(end_prefix(&[5, 6, 255]), vec![5, 7]);
// This is not equal as the result is before start.
assert_ne!(end_prefix(&[5, 255, 255]), vec![5, 255]);
// This is equal ([5, 255] will not be deleted because
// it is before start).
assert_eq!(end_prefix(&[5, 255, 255]), vec![6]);
assert_eq!(end_prefix(&[255, 255, 255]), vec![]);

assert_eq!(end_prefix(&[0x00, 0xff]), vec![0x01]);
assert_eq!(end_prefix(&[0xff]), vec![]);
assert_eq!(end_prefix(&[]), vec![]);
}
}

0 comments on commit dd89c9a

Please sign in to comment.