Skip to content

Commit

Permalink
skiplist: doubly list [20x faster reverse scan] (#163)
Browse files Browse the repository at this point in the history
* skiplist: doubly list

Signed-off-by: zhangjinpeng1987 <zhangjinpeng@pingcap.com>
  • Loading branch information
zhangjinpeng87 authored Jun 24, 2022
1 parent f7cd04a commit 94fd3a8
Show file tree
Hide file tree
Showing 3 changed files with 108 additions and 28 deletions.
42 changes: 41 additions & 1 deletion skiplist/benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,10 +163,50 @@ fn bench_write_skiplist(c: &mut Criterion) {
j.join().unwrap();
}

fn bench_reverse_scan_skiplist(c: &mut Criterion) {
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, 16 << 20, true);
let value = Bytes::from_static(b"0123456789abcdefghijklmkopqrstuvwxyz");
let mut rng = rand::thread_rng();
for _ in 0..10000 {
list.put(random_key(&mut rng), value.clone());
}
c.bench_function("skiplist_reverse_scan", |b| {
b.iter(|| {
let mut iter = list.iter_ref();
iter.seek_to_last();
while iter.valid() {
iter.prev();
}
})
});
}

fn bench_reverse_scan_doubly_skiplist(c: &mut Criterion) {
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, 16 << 20, false);
let value = Bytes::from_static(b"0123456789abcdefghijklmkopqrstuvwxyz");
let mut rng = rand::thread_rng();
for _ in 0..10000 {
list.put(random_key(&mut rng), value.clone());
}
c.bench_function("doubly_skiplist_reverse_scan", |b| {
b.iter(|| {
let mut iter = list.iter_ref();
iter.seek_to_last();
while iter.valid() {
iter.prev();
}
})
});
}

criterion_group!(
benches,
bench_reverse_scan_skiplist,
bench_reverse_scan_doubly_skiplist,
bench_read_write_skiplist,
bench_read_write_map,
bench_write_skiplist
bench_write_skiplist,
);
criterion_main!(benches);
44 changes: 35 additions & 9 deletions skiplist/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ pub struct Node {
key: Bytes,
value: Bytes,
height: usize,
// PrevList for fast reverse scan.
prev: AtomicUsize,
tower: [AtomicUsize; MAX_HEIGHT],
}

Expand Down Expand Up @@ -283,6 +285,17 @@ impl<C: KeyComparator> Skiplist<C> {
next[i] = n;
assert_ne!(p, n);
}
// Construct the PrevList for level 0.
if i == 0 {
let prev_offset = self.inner.arena.offset(prev[0]);
x.prev.store(prev_offset, Ordering::Relaxed);
if !next[i].is_null() {
unsafe { &*next[i] }
.prev
.store(node_offset, Ordering::Release);
}
}
// Construct the NextList for level i.
let next_offset = self.inner.arena.offset(next[i]);
x.tower[i].store(next_offset, Ordering::Relaxed);
unsafe { &*prev[i] }.tower[i].store(node_offset, Ordering::Release);
Expand Down Expand Up @@ -432,8 +445,20 @@ impl<T: AsRef<Skiplist<C>>, C: KeyComparator> IterRef<T, C> {

pub fn prev(&mut self) {
assert!(self.valid());
unsafe {
self.cursor = self.list.as_ref().find_near(self.key(), true, false);
if self.list.as_ref().allow_concurrent_write {
unsafe {
self.cursor = self.list.as_ref().find_near(self.key(), true, false);
}
} else {
unsafe {
let prev_offset = (*self.cursor).prev.load(Ordering::Acquire);
let node = self.list.as_ref().inner.arena.get_mut(prev_offset);
if node != self.list.as_ref().inner.head.as_ptr() {
self.cursor = node;
} else {
self.cursor = ptr::null();
}
}
}
}

Expand Down Expand Up @@ -466,18 +491,19 @@ mod tests {
use super::*;
use crate::FixedLengthSuffixComparator;

const ARENA_SIZE: usize = 1 << 20;

fn with_skl_test(
allow_concurrent_write: bool,
capacity: usize,
f: impl FnOnce(Skiplist<FixedLengthSuffixComparator>),
) {
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, capacity, allow_concurrent_write);
let list = Skiplist::with_capacity(comp, ARENA_SIZE, allow_concurrent_write);
f(list);
}

fn test_find_near_imp(allow_concurrent_write: bool, capacity: usize) {
with_skl_test(allow_concurrent_write, capacity, |list| {
fn test_find_near_imp(allow_concurrent_write: bool) {
with_skl_test(allow_concurrent_write, |list| {
for i in 0..1000 {
let key = Bytes::from(format!("{:05}{:08}", i * 10 + 5, 0));
let value = Bytes::from(format!("{:05}", i));
Expand Down Expand Up @@ -523,8 +549,8 @@ mod tests {
}

#[test]
fn test_skl_basic() {
test_find_near_imp(true, 1 << 20);
test_find_near_imp(false, 1 << 20);
fn test_skl_find_near() {
test_find_near_imp(true);
test_find_near_imp(false);
}
}
50 changes: 32 additions & 18 deletions skiplist/tests/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,25 +191,39 @@ fn test_iterator_next() {

#[test]
fn test_iterator_prev() {
let n = 100;
test_iterator_prev_imp(true);
test_iterator_prev_imp(false);
}

fn with_skl_test(
allow_concurrent_write: bool,
f: impl FnOnce(Skiplist<FixedLengthSuffixComparator>),
) {
let comp = FixedLengthSuffixComparator::new(8);
let list = Skiplist::with_capacity(comp, ARENA_SIZE, true);
let mut iter_ref = list.iter_ref();
assert!(!iter_ref.valid());
iter_ref.seek_to_last();
assert!(!iter_ref.valid());
for i in (0..n).rev() {
let key = key_with_ts(format!("{:05}", i).as_str(), 0);
list.put(key, new_value(i));
}
iter_ref.seek_to_last();
for i in (0..n).rev() {
assert!(iter_ref.valid());
let v = iter_ref.value();
assert_eq!(*v, new_value(i));
iter_ref.prev();
}
assert!(!iter_ref.valid());
let list = Skiplist::with_capacity(comp, ARENA_SIZE, allow_concurrent_write);
f(list);
}

fn test_iterator_prev_imp(allow_concurrent_write: bool) {
with_skl_test(allow_concurrent_write, |list| {
let n = 100;
let mut iter_ref = list.iter_ref();
assert!(!iter_ref.valid());
iter_ref.seek_to_last();
assert!(!iter_ref.valid());
for i in (0..n).rev() {
let key = key_with_ts(format!("{:05}", i).as_str(), 0);
list.put(key, new_value(i));
}
iter_ref.seek_to_last();
for i in (0..n).rev() {
assert!(iter_ref.valid());
let v = iter_ref.value();
assert_eq!(*v, new_value(i));
iter_ref.prev();
}
assert!(!iter_ref.valid());
});
}

#[test]
Expand Down

0 comments on commit 94fd3a8

Please sign in to comment.