diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 55e7648c..43174765 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -174,9 +174,18 @@ jobs:
         working-directory: integration

       # Run with valgrind
-      - name: Run valgrind test-fixed
-        run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/test-fixed
+      - name: Run valgrind test-vec
+        run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/test-vec
         working-directory: integration
+
+      - name: Run valgrind test-mmap
+        run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/test-mmap
+        working-directory: integration
+
+      - name: Run valgrind test-mmap-anon
+        run: valgrind --error-exitcode=1 --leak-check=full --show-leak-kinds=all ./target/debug/test-mmap-anon
+        working-directory: integration
+

   miri:
     name: miri
diff --git a/Cargo.toml b/Cargo.toml
index f318fb52..56478f4f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "skl"
-version = "0.3.1"
+version = "0.4.0"
 edition = "2021"
 repository = "https://github.com/al8n/skl-rs"
 description = "A lock-free thread-safe concurrent ARENA based skiplist implementation which helps develop MVCC memtable for LSM-Tree. Inspired by Dgraph's badger https://github.com/dgraph-io/badger/tree/main/skl."
@@ -22,6 +22,7 @@ name = "loom"
 [features]
 default = ["std"]
 alloc = []
+mmap = ["memmapix", "fs4", "std"]
 std = ["bytes/std", "rand/std", "rand/std_rng", "inline_more"]
 inline_more = []
 js = ["getrandom/js"]
@@ -30,9 +31,11 @@ js = ["getrandom/js"]
 loom = "0.7"

 [dependencies]
-bytes = { version = "1.4", default-features = false }
+bytes = { version = "1.5", default-features = false }
 crossbeam-utils = { version = "0.8", default-features = false }
+fs4 = { version = "0.6", optional = true }
 getrandom = { version = "0.2", optional = true }
+memmapix = { version = "0.7", optional = true }
 rand = { version = "0.8", default-features = false, features = ["getrandom"] }
 viewit = "0.1.5"
@@ -40,6 +43,7 @@ viewit = "0.1.5"
 criterion = "0.5"
 tempfile = "3"
 parking_lot = "0.12"
+paste = "1"

 [profile.bench]
 opt-level = 3
diff --git a/README.md b/README.md
index 28c772aa..97880ab3 100644
--- a/README.md
+++ b/README.md
@@ -29,12 +29,12 @@ skl = "0.3"
 ## Example

 ```rust
-use skl::Skiplist;
+use skl::SkipMap;
 use std::sync::Arc;

 fn main() {
   const N: usize = 1000;
-  let l = Arc::new(Skiplist::new(1 << 20));
+  let l = Arc::new(SkipMap::new(1 << 20));
   let wg = Arc::new(());
   for i in 0..N {
     let w = wg.clone();
@@ -97,6 +97,13 @@ fn main() {
 - [x] make the crate test cases pass `cargo miri`
 - [ ] make the crate test cases pass `cargo loom`

+- [ ] Implement
+  - [ ] `std::iter::Iterator`
+  - [ ] `get_or_insert`
+  - [ ] `remove`
+  - [ ] `contains`
+  - [ ] change signature from `insert(k, v)` => `insert(k, v) -> Option`
+  - [ ] mmap backend (currently only a vector backend)

 #### License
diff --git a/benches/bench.rs b/benches/bench.rs
index 5bf1dbae..bafcb576 100644
--- a/benches/bench.rs
+++ b/benches/bench.rs
@@ -21,7 +21,7 @@ fn fixed_map_round(l: Arc>>, case: &(Key, bool), exp:
   }
 }

-fn fixed_skiplist_round(l: &Skiplist, case: &(Key, bool), exp: &Value) {
+fn fixed_skiplist_round(l: &SkipMap, case: &(Key, bool), exp: &Value) {
   if case.1 {
     if let Some(v) = l.get(case.0.as_key_ref()) {
       assert_eq!(v.value(), exp.value());
@@ -48,7 +48,7 @@ fn random_key(rng: &mut ThreadRng) -> Key {

 fn bench_read_write_fixed_skiplist_frac(b: &mut Bencher<'_>, frac: &usize) {
   let frac = *frac;
   let value = Value::from(Bytes::from_static(b"00123"));
-  let list = Arc::new(Skiplist::new(512 << 20));
+  let list = Arc::new(SkipMap::new(512 << 20));
   let l = list.clone();
   let stop = Arc::new(AtomicBool::new(false));
   let s = stop.clone();
@@ -170,7 +170,7 @@ fn bench_write_fixed_map(c: &mut Criterion) {
 }

 fn bench_write_fixed_skiplist(c: &mut Criterion) {
-  let list = Arc::new(Skiplist::new(512 << 21));
+  let list = Arc::new(SkipMap::new(512 << 21));
   let value = Value::from(Bytes::from_static(b"00123"));
   let l = list.clone();
   let stop = Arc::new(AtomicBool::new(false));
diff --git a/ci/miri.sh b/ci/miri.sh
index 321c9a0a..d70ad845 100755
--- a/ci/miri.sh
+++ b/ci/miri.sh
@@ -5,7 +5,7 @@ rustup toolchain install nightly --component miri
 rustup override set nightly
 cargo miri setup

-export MIRIFLAGS="-Zmiri-strict-provenance -Zmiri-disable-isolation"
+export MIRIFLAGS="-Zmiri-strict-provenance -Zmiri-disable-isolation -Zmiri-symbolic-alignment-check"

 cargo miri test --all-features --target x86_64-unknown-linux-gnu
 cargo miri test --all-features --target aarch64-unknown-linux-gnu
diff --git a/integration/Cargo.toml b/integration/Cargo.toml
index 685bc76c..b3774dea 100644
--- a/integration/Cargo.toml
+++ b/integration/Cargo.toml
@@ -8,4 +8,5 @@ edition = "2021"
 # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

 [dependencies]
-skl = { path = "../"}
+skl = { path = "../", features = ["mmap"] }
+tempfile = "3"
\ No newline at end of file
diff --git a/integration/src/bin/test-fixed.rs b/integration/src/bin/test-fixed.rs
deleted file mode 100644
index 7da91909..00000000
--- a/integration/src/bin/test-fixed.rs
+++ /dev/null
@@ -1,54 +0,0 @@
-use integration::{big_value, key, new_value};
-use skl::*;
-use std::sync::Arc;
-
-fn main() {
-  {
-    const N: usize = 1000;
-    let l = Arc::new(Skiplist::new(1 << 20));
-    for i in 0..N {
-      let l = l.clone();
-      std::thread::spawn(move || {
-        l.insert(key(i), new_value(i));
-        drop(l);
-      });
-    }
-    while Arc::strong_count(&l) > 1 {}
-    for i in 0..N {
-      let l = l.clone();
-      std::thread::spawn(move || {
-        assert_eq!(
-          l.get(key(i).as_key_ref()).unwrap(),
-          new_value(i).as_value_ref(),
-          "broken: {i}"
-        );
-        drop(l);
-      });
-    }
-    while Arc::strong_count(&l) > 1 {}
-  }
-
-  {
-    const N2: usize = 100;
-    let l = Arc::new(Skiplist::new(120 << 20));
-    for i in 0..N2 {
-      let l = l.clone();
-      std::thread::spawn(move || {
-        l.insert(key(i), big_value(i));
-      });
-    }
-    while Arc::strong_count(&l) > 1 {}
-    assert_eq!(N2, l.len());
-    for i in 0..N2 {
-      let l = l.clone();
-      std::thread::spawn(move || {
-        assert_eq!(
-          l.get(key(i).as_key_ref()).unwrap(),
-          big_value(i).as_value_ref(),
-          "broken: {i}"
-        );
-      });
-    }
-    while Arc::strong_count(&l) > 1 {}
-  }
-}
diff --git a/integration/src/bin/test-mmap-anon.rs b/integration/src/bin/test-mmap-anon.rs
new file mode 100644
index 00000000..abe7ea61
--- /dev/null
+++ b/integration/src/bin/test-mmap-anon.rs
@@ -0,0 +1,54 @@
+use integration::{big_value, key, new_value};
+use skl::*;
+use std::sync::Arc;
+
+fn main() {
+  {
+    const N: usize = 1000;
+    let l = Arc::new(SkipMap::mmap_anon(1 << 20).unwrap());
+    for i in 0..N {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        l.insert(key(i), new_value(i));
+        drop(l);
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+    for i in 0..N {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        assert_eq!(
+          l.get(key(i).as_key_ref()).unwrap(),
+          new_value(i).as_value_ref(),
+          "broken: {i}"
+        );
+        drop(l);
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+  }
+
+  {
+    const N2: usize = 100;
+    let l = Arc::new(SkipMap::mmap_anon(120 << 20).unwrap());
+    for i in 0..N2 {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        l.insert(key(i), big_value(i));
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+    assert_eq!(N2, l.len());
+    for i in 0..N2 {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        assert_eq!(
+          l.get(key(i).as_key_ref()).unwrap(),
+          big_value(i).as_value_ref(),
+          "broken: {i}"
+        );
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+  }
+}
diff --git a/integration/src/bin/test-mmap.rs b/integration/src/bin/test-mmap.rs
new file mode 100644
index 00000000..f7c2ae67
--- /dev/null
+++ b/integration/src/bin/test-mmap.rs
@@ -0,0 +1,54 @@
+use integration::{big_value, key, new_value};
+use skl::*;
+use std::sync::Arc;
+
+fn main() {
+  {
+    const N: usize = 1000;
+    let l = Arc::new(SkipMap::mmap(1 << 20, tempfile::tempfile().unwrap(), true).unwrap());
+    for i in 0..N {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        l.insert(key(i), new_value(i));
+        drop(l);
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+    for i in 0..N {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        assert_eq!(
+          l.get(key(i).as_key_ref()).unwrap(),
+          new_value(i).as_value_ref(),
+          "broken: {i}"
+        );
+        drop(l);
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+  }
+
+  {
+    const N2: usize = 100;
+    let l = Arc::new(SkipMap::mmap(120 << 20, tempfile::tempfile().unwrap(), true).unwrap());
+    for i in 0..N2 {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        l.insert(key(i), big_value(i));
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+    assert_eq!(N2, l.len());
+    for i in 0..N2 {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        assert_eq!(
+          l.get(key(i).as_key_ref()).unwrap(),
+          big_value(i).as_value_ref(),
+          "broken: {i}"
+        );
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+  }
+}
diff --git a/integration/src/bin/test-vec.rs b/integration/src/bin/test-vec.rs
new file mode 100644
index 00000000..6998cce5
--- /dev/null
+++ b/integration/src/bin/test-vec.rs
@@ -0,0 +1,54 @@
+use integration::{big_value, key, new_value};
+use skl::*;
+use std::sync::Arc;
+
+fn main() {
+  {
+    const N: usize = 1000;
+    let l = Arc::new(SkipMap::new(1 << 20));
+    for i in 0..N {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        l.insert(key(i), new_value(i));
+        drop(l);
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+    for i in 0..N {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        assert_eq!(
+          l.get(key(i).as_key_ref()).unwrap(),
+          new_value(i).as_value_ref(),
+          "broken: {i}"
+        );
+        drop(l);
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+  }
+
+  {
+    const N2: usize = 100;
+    let l = Arc::new(SkipMap::new(120 << 20));
+    for i in 0..N2 {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        l.insert(key(i), big_value(i));
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+    assert_eq!(N2, l.len());
+    for i in 0..N2 {
+      let l = l.clone();
+      std::thread::spawn(move || {
+        assert_eq!(
+          l.get(key(i).as_key_ref()).unwrap(),
+          big_value(i).as_value_ref(),
+          "broken: {i}"
+        );
+      });
+    }
+    while Arc::strong_count(&l) > 1 {}
+  }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 62f02a5c..1a663b95 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -12,7 +12,7 @@ extern crate alloc;
 extern crate std;

 mod skl;
-pub use crate::skl::{Skiplist, SkiplistIterator, UniSkiplistIterator};
+pub use crate::skl::{SkipMap, SkipMapIterator, UniSkipMapIterator};

 mod value;
 pub use value::*;
diff --git a/src/skl.rs b/src/skl.rs
index 22532237..40778944 100644
--- a/src/skl.rs
+++ b/src/skl.rs
@@ -99,14 +99,14 @@ impl Node {

 /// Fixed size lock-free ARENA based skiplist.
 #[derive(Debug)]
-pub struct Skiplist {
+pub struct SkipMap {
   // Current height. 1 <= height <= kMaxHeight. CAS.
   height: CachePadded<AtomicU32>,
   head_offset: u32,
   arena: Arena,
 }

-impl Skiplist {
+impl SkipMap {
   #[inline]
   fn get_head(&self) -> (*const Node, u32) {
     let (ptr, offset) = self.arena.get_node(self.head_offset);
@@ -276,13 +276,29 @@ impl Skiplist {
   }
 }

-impl Skiplist {
+impl SkipMap {
   /// Create a new skiplist according to the given capacity
   ///
   /// **Note:** The capacity stands for how many memory allocated,
   /// it does not mean the skiplist can store `cap` entries.
+  ///
+  ///
+  ///
+  /// **What is the difference between this method and [`SkipMap::mmap_anon`]?**
+  ///
+  /// 1. This method uses an `AlignedVec`, which keeps us within Rust's memory safety guarantees.
+  /// Even though we are working with raw pointers via `Box::into_raw`,
+  /// the backend ARENA reclaims ownership of this memory by converting it back into a `Box`
+  /// when the backend ARENA is dropped. Since `AlignedVec` uses heap memory, the data might be more cache-friendly,
+  /// especially if you're frequently accessing or modifying it.
+  ///
+  /// 2. Whereas [`SkipMap::mmap_anon`] uses an anonymous mmap to request memory from the OS.
+  /// If you require very large contiguous memory regions, `mmap` might be more suitable because
+  /// it's more direct in requesting large chunks of memory from the OS.
+  ///
+  /// [`SkipMap::mmap_anon`]: #method.mmap_anon
   pub fn new(cap: usize) -> Self {
-    let arena = Arena::new(cap);
+    let arena = Arena::new_vec(cap);
     let (head, _) = arena.new_node(
       Key::new().as_key_ref(),
       Value::new().as_value_ref(),
@@ -296,6 +312,61 @@ impl Skiplist {
     }
   }

+  /// Create a new skipmap according to the given capacity, mmapped to a file.
+  ///
+  /// **Note:** The capacity stands for how much memory is mmapped,
+  /// it does not mean the skipmap can store `cap` entries.
+  ///
+  /// `lock`: whether to lock the underlying file or not
+  #[cfg(feature = "mmap")]
+  #[cfg_attr(docsrs, doc(cfg(feature = "mmap")))]
+  pub fn mmap(cap: usize, file: std::fs::File, lock: bool) -> std::io::Result<Self> {
+    let arena = Arena::new_mmap(cap, file, lock)?;
+    let (head, _) = arena.new_node(
+      Key::new().as_key_ref(),
+      Value::new().as_value_ref(),
+      Node::MAX_HEIGHT,
+    );
+    let ho = arena.get_node_offset(head);
+    Ok(Self {
+      height: CachePadded::new(AtomicU32::new(1)),
+      arena,
+      head_offset: ho,
+    })
+  }
+
+  /// Create a new skipmap according to the given capacity, backed by an anonymous mmap.
+  ///
+  /// **What is the difference between this method and [`SkipMap::new`]?**
+  ///
+  /// 1. This method uses an anonymous mmap to request memory directly from the OS.
+  /// If you require very large contiguous memory regions, this method might be more suitable because
+  /// it's more direct in requesting large chunks of memory from the OS.
+  ///
+  /// 2. Whereas [`SkipMap::new`] uses an `AlignedVec`, which keeps us within Rust's memory safety guarantees.
+  /// Even though we are working with raw pointers via `Box::into_raw`,
+  /// the backend ARENA reclaims ownership of this memory by converting it back into a `Box`
+  /// when the backend ARENA is dropped. Since `AlignedVec` uses heap memory, the data might be more cache-friendly,
+  /// especially if you're frequently accessing or modifying it.
+  ///
+  /// [`SkipMap::new`]: #method.new
+  #[cfg(feature = "mmap")]
+  #[cfg_attr(docsrs, doc(cfg(feature = "mmap")))]
+  pub fn mmap_anon(cap: usize) -> std::io::Result<Self> {
+    let arena = Arena::new_anonymous_mmap(cap)?;
+    let (head, _) = arena.new_node(
+      Key::new().as_key_ref(),
+      Value::new().as_value_ref(),
+      Node::MAX_HEIGHT,
+    );
+    let ho = arena.get_node_offset(head);
+    Ok(Self {
+      height: CachePadded::new(AtomicU32::new(1)),
+      arena,
+      head_offset: ho,
+    })
+  }
+
   /// Inserts the key-value pair.
   pub fn insert(&self, key: Key, val: Value) {
     let key_ref = key.as_key_ref();
@@ -426,15 +497,15 @@
   /// Returns a skiplist iterator.
   #[inline]
-  pub fn iter(&self) -> SkiplistIterator<'_> {
-    SkiplistIterator {
+  pub fn iter(&self) -> SkipMapIterator<'_> {
+    SkipMapIterator {
       skl: self,
       curr: ptr::null(),
       curr_tower_offset: 0,
     }
   }

-  /// Returns if the Skiplist is empty
+  /// Returns whether the SkipMap is empty
   #[inline]
   pub fn is_empty(&self) -> bool {
     self.find_last().is_null()
@@ -460,16 +531,16 @@ impl Skiplist {
   }
 }

-/// SkiplistIterator is an iterator over skiplist object. For new objects, you just
-/// need to initialize SkiplistIterator.list.
+/// SkipMapIterator is an iterator over a skiplist object. For new objects, you just
+/// need to initialize SkipMapIterator.list.
 #[derive(Copy, Clone, Debug)]
-pub struct SkiplistIterator<'a> {
-  skl: &'a Skiplist,
+pub struct SkipMapIterator<'a> {
+  skl: &'a SkipMap,
   curr: *const Node,
   curr_tower_offset: u32,
 }

-impl<'a> SkiplistIterator<'a> {
+impl<'a> SkipMapIterator<'a> {
   /// Key returns the key at the current position.
   #[inline]
   pub fn key<'b: 'a>(&'a self) -> KeyRef<'b> {
@@ -550,12 +621,12 @@ impl<'a> SkiplistIterator<'a> {
 /// Iterator. We like to keep Iterator as before, because it is more powerful and
 /// we might support bidirectional iterators in the future.
#[derive(Copy, Clone, Debug)] -pub struct UniSkiplistIterator<'a> { - iter: SkiplistIterator<'a>, +pub struct UniSkipMapIterator<'a> { + iter: SkipMapIterator<'a>, reversed: bool, } -impl<'a> UniSkiplistIterator<'a> { +impl<'a> UniSkipMapIterator<'a> { #[inline] pub fn next(&mut self) { if !self.reversed { diff --git a/src/skl/arena.rs b/src/skl/arena.rs index 1c78ba52..6dc959df 100644 --- a/src/skl/arena.rs +++ b/src/skl/arena.rs @@ -1,126 +1,18 @@ use super::Node; use crate::{ key::{KeyRef, TIMESTAMP_SIZE}, - sync::{AtomicMut, AtomicPtr, AtomicU32, AtomicU64, AtomicUsize, Ordering}, + sync::{AtomicMut, AtomicPtr, AtomicU64, Ordering}, value::ValueRef, }; -use ::alloc::{alloc, boxed::Box}; +use ::alloc::boxed::Box; use core::{ mem, - ops::{Index, IndexMut}, ptr::{self, NonNull}, slice, }; -#[derive(Debug)] -struct AlignedVec { - ptr: ptr::NonNull, - cap: usize, - len: usize, -} - -impl Drop for AlignedVec { - #[inline] - fn drop(&mut self) { - if self.cap != 0 { - unsafe { - alloc::dealloc(self.ptr.as_ptr(), self.layout()); - } - } - } -} - -impl AlignedVec { - const ALIGNMENT: usize = core::mem::align_of::(); - - const MAX_CAPACITY: usize = isize::MAX as usize - (Self::ALIGNMENT - 1); - - #[inline] - fn new(capacity: usize) -> Self { - assert!( - capacity <= Self::MAX_CAPACITY, - "`capacity` cannot exceed isize::MAX - {}", - Self::ALIGNMENT - 1 - ); - let ptr = unsafe { - let layout = alloc::Layout::from_size_align_unchecked(capacity, Self::ALIGNMENT); - let ptr = alloc::alloc(layout); - if ptr.is_null() { - alloc::handle_alloc_error(layout); - } - ptr::NonNull::new_unchecked(ptr) - }; - - unsafe { - core::ptr::write_bytes(ptr.as_ptr(), 0, capacity); - } - Self { - ptr, - cap: capacity, - len: capacity, - } - } - - #[inline] - fn layout(&self) -> alloc::Layout { - unsafe { alloc::Layout::from_size_align_unchecked(self.cap, Self::ALIGNMENT) } - } - - #[inline] - fn as_mut_ptr(&mut self) -> *mut u8 { - self.ptr.as_ptr() - } - - #[inline] - const fn as_slice(&self) -> &[u8] { - unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) } - } - - #[inline] - fn as_mut_slice(&mut self) -> &mut [u8] { - unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) } - } -} - -impl> Index for AlignedVec { - type Output = >::Output; - - #[inline] - fn index(&self, index: I) -> &Self::Output { - &self.as_slice()[index] - } -} - -impl> IndexMut for AlignedVec { - #[inline] - fn index_mut(&mut self, index: I) -> &mut Self::Output { - &mut self.as_mut_slice()[index] - } -} - -#[derive(Debug)] -#[repr(C)] -struct Shared { - n: AtomicU32, - vec: AlignedVec, - refs: AtomicUsize, -} - -impl Shared { - fn new(cap: usize) -> Self { - let vec = AlignedVec::new(cap); - Self { - vec, - refs: AtomicUsize::new(1), - // Don't store data at position 0 in order to reserve offset=0 as a kind - // of nil pointer. 
- n: AtomicU32::new(1), - } - } -} - -unsafe impl Send for Shared {} -unsafe impl Sync for Shared {} +mod shared; +use shared::Shared; /// Arena should be lock-free pub(super) struct Arena { @@ -132,22 +24,46 @@ pub(super) struct Arena { impl core::fmt::Debug for Arena { fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result { let inner = self.inner(); - inner.vec.as_slice()[..inner.n.load(Ordering::Acquire) as usize].fmt(f) + inner.as_slice().fmt(f) } } impl Arena { #[inline] - pub(super) fn new(n: usize) -> Self { - let mut inner = Shared::new(n.max(Node::MAX_NODE_SIZE)); - let data_ptr = unsafe { NonNull::new_unchecked(inner.vec.as_mut_ptr()) }; + pub(super) fn new_vec(n: usize) -> Self { + let mut inner = Shared::new_vec(n.max(Node::MAX_NODE_SIZE)); + let data_ptr = unsafe { NonNull::new_unchecked(inner.as_mut_ptr()) }; Self { - cap: inner.vec.cap, + cap: inner.cap(), inner: AtomicPtr::new(Box::into_raw(Box::new(inner)) as _), data_ptr, } } + #[cfg(feature = "mmap")] + #[inline] + pub(super) fn new_mmap(n: usize, file: std::fs::File, lock: bool) -> std::io::Result { + let mut inner = Shared::new_mmaped(n.max(Node::MAX_NODE_SIZE), file, lock)?; + let data_ptr = unsafe { NonNull::new_unchecked(inner.as_mut_ptr()) }; + Ok(Self { + cap: inner.cap(), + inner: AtomicPtr::new(Box::into_raw(Box::new(inner)) as _), + data_ptr, + }) + } + + #[cfg(feature = "mmap")] + #[inline] + pub(super) fn new_anonymous_mmap(n: usize) -> std::io::Result { + let mut inner = Shared::new_mmaped_anon(n.max(Node::MAX_NODE_SIZE))?; + let data_ptr = unsafe { NonNull::new_unchecked(inner.as_mut_ptr()) }; + Ok(Self { + cap: inner.cap(), + inner: AtomicPtr::new(Box::into_raw(Box::new(inner)) as _), + data_ptr, + }) + } + pub(super) fn put_key(&self, key: KeyRef<'_>) -> (u32, bool) { let ttl = key.ttl(); if ttl == 0 { diff --git a/src/skl/arena/shared.rs b/src/skl/arena/shared.rs new file mode 100644 index 00000000..c4155c83 --- /dev/null +++ b/src/skl/arena/shared.rs @@ -0,0 +1,233 @@ +use ::alloc::alloc; + +use core::{ + ops::{Index, IndexMut}, + ptr, slice, +}; + +use crate::{ + skl::Node, + sync::{AtomicU32, AtomicUsize, Ordering}, +}; + +#[derive(Debug)] +struct AlignedVec { + ptr: ptr::NonNull, + cap: usize, + len: usize, +} + +impl Drop for AlignedVec { + #[inline] + fn drop(&mut self) { + if self.cap != 0 { + unsafe { + alloc::dealloc(self.ptr.as_ptr(), self.layout()); + } + } + } +} + +impl AlignedVec { + const ALIGNMENT: usize = core::mem::align_of::(); + + const MAX_CAPACITY: usize = isize::MAX as usize - (Self::ALIGNMENT - 1); + + #[inline] + fn new(capacity: usize) -> Self { + assert!( + capacity <= Self::MAX_CAPACITY, + "`capacity` cannot exceed isize::MAX - {}", + Self::ALIGNMENT - 1 + ); + let ptr = unsafe { + let layout = alloc::Layout::from_size_align_unchecked(capacity, Self::ALIGNMENT); + let ptr = alloc::alloc(layout); + if ptr.is_null() { + alloc::handle_alloc_error(layout); + } + ptr::NonNull::new_unchecked(ptr) + }; + + unsafe { + core::ptr::write_bytes(ptr.as_ptr(), 0, capacity); + } + Self { + ptr, + cap: capacity, + len: capacity, + } + } + + #[inline] + fn layout(&self) -> alloc::Layout { + unsafe { alloc::Layout::from_size_align_unchecked(self.cap, Self::ALIGNMENT) } + } + + #[inline] + fn as_mut_ptr(&mut self) -> *mut u8 { + self.ptr.as_ptr() + } + + #[inline] + const fn as_slice(&self) -> &[u8] { + unsafe { slice::from_raw_parts(self.ptr.as_ptr(), self.len) } + } + + #[inline] + fn as_mut_slice(&mut self) -> &mut [u8] { + unsafe { slice::from_raw_parts_mut(self.ptr.as_ptr(), 
self.len) } + } +} + +impl> Index for AlignedVec { + type Output = >::Output; + + #[inline] + fn index(&self, index: I) -> &Self::Output { + &self.as_slice()[index] + } +} + +impl> IndexMut for AlignedVec { + #[inline] + fn index_mut(&mut self, index: I) -> &mut Self::Output { + &mut self.as_mut_slice()[index] + } +} + +enum SharedBackend { + Vec(AlignedVec), + #[cfg(feature = "mmap")] + Mmap { + buf: *mut memmapix::MmapMut, + file: std::fs::File, + lock: bool, + }, + #[cfg(feature = "mmap")] + AnonymousMmap(memmapix::MmapMut), +} + +pub(super) struct Shared { + pub(super) n: AtomicU32, + pub(super) refs: AtomicUsize, + cap: usize, + backend: SharedBackend, +} + +impl Shared { + pub(super) fn new_vec(cap: usize) -> Self { + let vec = AlignedVec::new(cap); + Self { + cap: vec.cap, + backend: SharedBackend::Vec(vec), + refs: AtomicUsize::new(1), + // Don't store data at position 0 in order to reserve offset=0 as a kind + // of nil pointer. + n: AtomicU32::new(1), + } + } + + #[cfg(feature = "mmap")] + pub(super) fn new_mmaped(cap: usize, file: std::fs::File, lock: bool) -> std::io::Result { + use fs4::FileExt; + + unsafe { + if lock { + file.lock_exclusive()?; + } + + file.set_len(cap as u64).and_then(|_| { + memmapix::MmapOptions::new() + .len(cap) + .map_mut(&file) + .map(|mmap| { + Self { + cap, + backend: SharedBackend::Mmap { + buf: Box::into_raw(Box::new(mmap)), + file, + lock, + }, + refs: AtomicUsize::new(1), + // Don't store data at position 0 in order to reserve offset=0 as a kind + // of nil pointer. + n: AtomicU32::new(1), + } + }) + }) + } + } + + #[cfg(feature = "mmap")] + pub(super) fn new_mmaped_anon(cap: usize) -> std::io::Result { + memmapix::MmapOptions::new() + .len(cap) + .map_anon() + .map(|mmap| { + Self { + cap, + backend: SharedBackend::AnonymousMmap(mmap), + refs: AtomicUsize::new(1), + // Don't store data at position 0 in order to reserve offset=0 as a kind + // of nil pointer. + n: AtomicU32::new(1), + } + }) + } + + pub(super) fn as_slice(&self) -> &[u8] { + let end = self.n.load(Ordering::Acquire) as usize; + match &self.backend { + SharedBackend::Vec(vec) => &vec.as_slice()[..end], + #[cfg(feature = "mmap")] + SharedBackend::Mmap { buf: mmap, .. } => unsafe { &(&**mmap)[..end] }, + SharedBackend::AnonymousMmap(mmap) => &mmap[..end], + } + } + + pub(super) fn as_mut_ptr(&mut self) -> *mut u8 { + match &mut self.backend { + SharedBackend::Vec(vec) => vec.as_mut_ptr(), + #[cfg(feature = "mmap")] + SharedBackend::Mmap { buf: mmap, .. } => unsafe { (**mmap).as_mut_ptr() }, + SharedBackend::AnonymousMmap(mmap) => mmap.as_mut_ptr(), + } + } + + #[inline] + pub(super) const fn cap(&self) -> usize { + self.cap + } +} + +impl Drop for Shared { + fn drop(&mut self) { + #[cfg(feature = "mmap")] + if let SharedBackend::Mmap { buf, file, lock } = &self.backend { + use fs4::FileExt; + + // Any errors during unmapping/closing are ignored as the only way + // to report them would be through panicking which is highly discouraged + // in Drop impls, c.f. https://github.com/rust-lang/lang-team/issues/97 + + unsafe { + { + let mmap = &**buf; + let _ = mmap.flush(); + } + + // we must trigger the drop of the mmap before truncating the file + drop(Box::from_raw(*buf)); + + // relaxed ordering is enough here as we're in a drop, no one else can + // access this memory anymore. 
+ let size = self.n.load(Ordering::Relaxed); + let _ = file.set_len(size as u64); + if *lock { + let _ = file.unlock(); + } + } + } + } +} diff --git a/src/skl/tests.rs b/src/skl/tests.rs index b9326ed2..fcf29ebc 100644 --- a/src/skl/tests.rs +++ b/src/skl/tests.rs @@ -5,7 +5,7 @@ use bytes::{BufMut, Bytes, BytesMut}; const ARENA_SIZE: usize = 1 << 20; -fn length(s: Arc) -> usize { +fn length(s: Arc) -> usize { let head = s.get_head(); let mut x = s.get_next(head.0, head.1, 0); let mut ctr = 0; @@ -16,9 +16,7 @@ fn length(s: Arc) -> usize { ctr } -#[test] -fn test_basic() { - let l = Arc::new(Skiplist::new(ARENA_SIZE)); +fn test_basic_runner(l: Arc) { let mut v1 = new_value(42); let mut v2 = new_value(52); let mut v3 = new_value(62); @@ -65,7 +63,25 @@ fn test_basic() { assert_eq!(v.meta(), 60); } -fn test_basic_large_testcases_in(l: Arc) { +#[test] +fn test_basic() { + let l = Arc::new(SkipMap::new(ARENA_SIZE)); + test_basic_runner(l); +} + +#[test] +fn test_basic_mmap() { + let l = Arc::new(SkipMap::mmap(ARENA_SIZE, tempfile::tempfile().unwrap(), true).unwrap()); + test_basic_runner(l); +} + +#[test] +fn test_basic_mmap_anon() { + let l = Arc::new(SkipMap::mmap_anon(ARENA_SIZE).unwrap()); + test_basic_runner(l); +} + +fn test_basic_large_testcases_in(l: Arc) { let n = 1000; for i in 0..n { @@ -82,18 +98,45 @@ fn test_basic_large_testcases_in(l: Arc) { #[test] fn test_basic_large_testcases() { - let l = Arc::new(Skiplist::new(ARENA_SIZE)); + let l = Arc::new(SkipMap::new(ARENA_SIZE)); + test_basic_large_testcases_in(l); +} + +#[test] +fn test_basic_large_testcases_mmap() { + let l = Arc::new(SkipMap::mmap(ARENA_SIZE, tempfile::tempfile().unwrap(), true).unwrap()); + test_basic_large_testcases_in(l); +} + +#[test] +fn test_basic_large_testcases_mmap_anon() { + let l = Arc::new(SkipMap::mmap_anon(ARENA_SIZE).unwrap()); test_basic_large_testcases_in(l); } #[test] fn test_concurrent_basic() { + test_concurrent_basic_runner(Arc::new(SkipMap::new(ARENA_SIZE))); +} + +#[test] +fn test_concurrent_basic_mmap() { + test_concurrent_basic_runner(Arc::new( + SkipMap::mmap(ARENA_SIZE, tempfile::tempfile().unwrap(), true).unwrap(), + )); +} + +#[test] +fn test_concurrent_basic_mmap_anon() { + test_concurrent_basic_runner(Arc::new(SkipMap::mmap_anon(ARENA_SIZE).unwrap())); +} + +fn test_concurrent_basic_runner(l: Arc) { #[cfg(miri)] const N: usize = 100; #[cfg(not(miri))] const N: usize = 1000; - let l = Arc::new(Skiplist::new(ARENA_SIZE)); let wg = Arc::new(()); for i in 0..N { let w = wg.clone(); @@ -121,11 +164,29 @@ fn test_concurrent_basic() { #[test] #[cfg_attr(miri, ignore)] fn test_concurrent_basic_big_values() { + test_concurrent_basic_big_values_runner(Arc::new(SkipMap::new(120 << 20))); +} + +#[test] +#[cfg_attr(miri, ignore)] +fn test_concurrent_basic_big_values_mmap() { + test_concurrent_basic_big_values_runner(Arc::new( + SkipMap::mmap(120 << 20, tempfile::tempfile().unwrap(), true).unwrap(), + )); +} + +#[test] +#[cfg_attr(miri, ignore)] +fn test_concurrent_basic_big_values_mmap_anon() { + test_concurrent_basic_big_values_runner(Arc::new(SkipMap::mmap_anon(120 << 20).unwrap())); +} + +fn test_concurrent_basic_big_values_runner(l: Arc) { #[cfg(miri)] const N: usize = 10; #[cfg(not(miri))] const N: usize = 100; - let l = Arc::new(Skiplist::new(120 << 20)); + for i in 0..N { let l = l.clone(); std::thread::spawn(move || { @@ -148,7 +209,7 @@ fn test_concurrent_basic_big_values() { } fn assert_find_near_not_null( - l: Arc, + l: Arc, less: bool, allow_equal: bool, fk: Key, @@ -164,7 +225,7 @@ 
fn assert_find_near_not_null( assert_eq!(is_eq, eq); } -fn assert_find_near_null(l: Arc, less: bool, allow_equal: bool, fk: Key) { +fn assert_find_near_null(l: Arc, less: bool, allow_equal: bool, fk: Key) { let (n, _, eq) = l.find_near(fk.as_key_ref(), less, allow_equal); assert!(n.is_null()); assert!(!eq); @@ -172,7 +233,7 @@ fn assert_find_near_null(l: Arc, less: bool, allow_equal: bool, fk: Ke #[test] fn test_find_near() { - let l = Arc::new(Skiplist::new(ARENA_SIZE)); + let l = Arc::new(SkipMap::new(ARENA_SIZE)); for i in 0..1000 { let k = Key::from(format!("{:05}", i * 10 + 5)); l.insert(k, new_value(i)); @@ -357,7 +418,7 @@ fn test_find_near() { #[test] fn test_iter_next() { let n = 100; - let l = Arc::new(Skiplist::new(ARENA_SIZE)); + let l = Arc::new(SkipMap::new(ARENA_SIZE)); let mut iter = l.iter(); assert!(!iter.valid()); iter.seek_to_first(); @@ -380,7 +441,7 @@ fn test_iter_next() { #[test] fn test_iter_prev() { let n = 100; - let l = Arc::new(Skiplist::new(ARENA_SIZE)); + let l = Arc::new(SkipMap::new(ARENA_SIZE)); let mut iter = l.iter(); assert!(!iter.valid()); iter.seek_to_first(); @@ -400,13 +461,13 @@ fn test_iter_prev() { assert!(!iter.valid()); } -fn assert_seek(iter: &mut SkiplistIterator, seek_to: &'static str) { +fn assert_seek(iter: &mut SkipMapIterator, seek_to: &'static str) { iter.seek(KeyRef::from(seek_to)); assert!(iter.valid()); assert_eq!(iter.value().value(), Bytes::from(seek_to)); } -fn assert_seek_null(iter: &mut SkiplistIterator, seek_to: &'static str) { +fn assert_seek_null(iter: &mut SkipMapIterator, seek_to: &'static str) { iter.seek(KeyRef::from(seek_to)); assert!(!iter.valid()); } @@ -414,7 +475,7 @@ fn assert_seek_null(iter: &mut SkiplistIterator, seek_to: &'static str) { #[test] fn test_iter_seek() { let n = 100; - let l = Arc::new(Skiplist::new(ARENA_SIZE)); + let l = Arc::new(SkipMap::new(ARENA_SIZE)); let mut iter = l.iter(); assert!(!iter.valid()); iter.seek_to_first(); diff --git a/tests/loom.rs b/tests/loom.rs index a4de4cd6..5e93d426 100644 --- a/tests/loom.rs +++ b/tests/loom.rs @@ -29,7 +29,7 @@ fn new_value(i: usize) -> Value { fn concurrent_write() { loom::model(|| { const N: usize = 2; - let l = Arc::new(Skiplist::new(ARENA_SIZE)); + let l = Arc::new(SkipMap::new(ARENA_SIZE)); let wg = Arc::new(()); for i in 0..N { let w = wg.clone(); @@ -56,7 +56,7 @@ fn concurrent_write() { fn concurrent_read() { loom::model(|| { const N: usize = 2; - let l = Arc::new(Skiplist::new(ARENA_SIZE)); + let l = Arc::new(SkipMap::new(ARENA_SIZE)); let wg = Arc::new(()); for i in 0..N { l.insert(key(i), new_value(i));
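
Taken together, this change leaves `SkipMap` with three interchangeable ARENA backends: the heap-backed `AlignedVec`, a file-backed mmap, and an anonymous mmap. The sketch below shows how they might sit side by side behind the same API; it is illustrative rather than part of the patch, and it assumes the `mmap` feature is enabled, the `tempfile` and `bytes` crates are available, and `Key`/`Value` are the re-exports already used by the benches and integration binaries above.

```rust
use bytes::Bytes;
use skl::*;

fn main() -> std::io::Result<()> {
  // Heap-backed ARENA (AlignedVec), as in the README example.
  let vec_backed = SkipMap::new(1 << 20);

  // File-backed ARENA: mmap a temporary file and lock it exclusively (`lock = true`).
  let file_backed = SkipMap::mmap(1 << 20, tempfile::tempfile()?, true)?;

  // Anonymous-mmap ARENA: memory is requested directly from the OS.
  let anon_backed = SkipMap::mmap_anon(1 << 20)?;

  for (i, l) in [vec_backed, file_backed, anon_backed].iter().enumerate() {
    // The key/value API is identical regardless of the backend.
    l.insert(Key::from(format!("{:05}", i)), Value::from(Bytes::from_static(b"00123")));
    assert!(l.get(Key::from(format!("{:05}", i)).as_key_ref()).is_some());
  }
  Ok(())
}
```

For the file-backed variant, `lock = true` corresponds to an exclusive `fs4` lock on the underlying file, which the `Shared` drop code above releases after flushing the map and truncating the file to the allocated length.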