diff --git a/Cargo.toml b/Cargo.toml index 0d4d3f5f..2fff5b20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ proto = { path = "proto" } skiplist = { path = "skiplist" } memmap = "0.7" farmhash = "1.1" -prost = "0.7" +prost = "0.8" enum_dispatch = "0.3" crossbeam-channel = "0.5" crc32fast = "1.2" @@ -51,6 +51,10 @@ harness = false name = "bench_iterator" harness = false +[[bench]] +name = "bench_manifest" +harness = false + [profile.bench] opt-level = 3 debug = false diff --git a/benches/bench_manifest.rs b/benches/bench_manifest.rs new file mode 100644 index 00000000..30ff4243 --- /dev/null +++ b/benches/bench_manifest.rs @@ -0,0 +1,65 @@ +mod common; + +use agatedb::util::{BTree, ComparableRecord}; +use rand::Rng; +use std::sync::Arc; + +use bytes::Bytes; +use criterion::{criterion_group, criterion_main, Criterion}; +use std::time::Instant; + +struct FakeTableInner { + id: u64, + smallest: Bytes, + largest: Bytes, +} + +#[derive(Clone)] +struct FakeTable { + inner: Arc, +} + +impl ComparableRecord for FakeTable { + fn smallest(&self) -> &Bytes { + &self.inner.smallest + } + + fn largest(&self) -> &Bytes { + &self.inner.largest + } + + fn id(&self) -> u64 { + self.inner.id + } +} + +fn benche_manifest(c: &mut Criterion) { + const KEY_RANGE_COUNT: u64 = 10000; // about 64MB + const KEY_BASE: u64 = 1000_1000; // about 64MB + + // let mut rng = rand::thread_rng(); + // let mut tree = LevelTree::::new(64, 128); + + let mut test_count = 0; + println!("start bench"); + c.bench_function("table builder", |b| { + b.iter(|| { + // let j = rng.gen_range(0, KEY_RANGE_COUNT - 1); + // let left = KEY_BASE + j as usize * 100; + // let left = left + 99; + // let smallest = Bytes::from(left.to_string()); + // let largest = Bytes::from(right.to_string()); + // TODO: check whether key existed and decide delete or insert. + test_count += 1; + }); + }); + println!("end bench, {}", test_count); +} + +criterion_group! 
{ + name = benches_manifest; + config = Criterion::default(); + targets = benche_manifest +} + +criterion_main!(benches_manifest); diff --git a/proto/Cargo.toml b/proto/Cargo.toml index ccfee994..1f55de7e 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] bytes = "1.0" -prost = "0.7" +prost = "0.8" [build-dependencies] -prost-build = { version = "0.6" } +prost-build = { version = "0.8" } diff --git a/src/db.rs b/src/db.rs index 7e3ec8c7..1338747e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -626,14 +626,11 @@ impl Core { vlog.write(&mut requests)?; } - let mut cnt = 0; - // writing to LSM for req in requests { if req.entries.is_empty() { continue; } - cnt += req.entries.len(); while let Err(_) = self.ensure_room_for_write() { std::thread::sleep(std::time::Duration::from_millis(10)); @@ -643,8 +640,6 @@ impl Core { self.write_to_lsm(req)?; } - // eprintln!("{} entries written", cnt); - Ok(()) }; diff --git a/src/levels.rs b/src/levels.rs index 250e73fd..b71da3cd 100644 --- a/src/levels.rs +++ b/src/levels.rs @@ -4,10 +4,11 @@ mod handler; #[cfg(test)] pub(crate) mod tests; +use crate::table::BTreeTableAccessor; use compaction::{ - get_key_range, get_key_range_single, CompactDef, CompactStatus, CompactionPriority, KeyRange, - LevelCompactStatus, Targets, + get_key_range, CompactDef, CompactStatus, CompactionPriority, LevelCompactStatus, Targets, }; +use handler::HandlerBaseLevel; use handler::LevelHandler; use proto::meta::ManifestChangeSet; @@ -16,7 +17,9 @@ use crate::manifest::{new_create_change, new_delete_change, ManifestFile}; use crate::ops::oracle::Oracle; use crate::opt::build_table_options; use crate::table::{MergeIterator, TableIterators}; -use crate::util::{has_any_prefixes, same_key, KeyComparator, COMPARATOR}; +use crate::util::{ + has_any_prefixes, same_key, ComparableRecord, KeyComparator, KeyRange, COMPARATOR, +}; use crate::value::{Value, ValuePointer}; use crate::AgateIterator; use crate::TableBuilder; @@ -33,6 +36,7 @@ use std::sync::atomic::AtomicU64; use std::sync::{Arc, RwLock}; use std::time::Duration; +use crate::levels::handler::HandlerLevel0; use bytes::{BufMut, Bytes, BytesMut}; use crossbeam_channel::{select, tick, unbounded}; use yatp::task::callback::Handle; @@ -40,7 +44,7 @@ use yatp::task::callback::Handle; pub(crate) struct Core { next_file_id: AtomicU64, // `levels[i].level == i` should be ensured - pub(crate) levels: Vec>>, + pub(crate) levels: Vec>>>, opts: AgateOptions, // TODO: agate oracle, manifest should be added here cpt_status: RwLock, @@ -85,8 +89,15 @@ impl Core { eprintln!("{} tables opened", num_opened); for (i, tables) in tables.into_iter().enumerate() { - let mut level = LevelHandler::new(opts.clone(), i); - level.init_tables(tables); + let level: Box = if i == 0 { + Box::new(HandlerLevel0::new(tables, opts.clone(), i)) + } else { + Box::new(HandlerBaseLevel::::new( + tables, + opts.clone(), + i, + )) + }; levels.push(Arc::new(RwLock::new(level))); cpt_status_levels.push(LevelCompactStatus::default()); @@ -122,7 +133,10 @@ impl Core { let start = std::time::Instant::now(); let mut last_log = std::time::Instant::now(); - while !self.levels[0].write()?.try_add_l0_table(table.clone()) { + while !self.levels[0] + .write()? 
+ .replace_tables(&[], &[table.clone()]) + { let current = std::time::Instant::now(); let duration = current.duration_since(start); if duration.as_millis() > 1000 { @@ -184,30 +198,17 @@ impl Core { )); } - if this_level.tables.is_empty() { + if this_level.num_tables() == 0 { return Err(Error::CustomError("not table in this level".to_string())); } if compact_def.drop_prefixes.is_empty() { - let mut out = vec![]; - let mut kr = KeyRange::default(); - for table in this_level.tables.iter() { - let dkr = get_key_range_single(table); - if kr.overlaps_with(&dkr) { - out.push(table.clone()); - kr.extend(&dkr); - } else { - break; - } - } - compact_def.top = out; + compact_def.top = this_level.overlapping_tables(&KeyRange::default()); } compact_def.this_range = get_key_range(&compact_def.top); - let (left, right) = next_level.overlapping_tables(&compact_def.this_range); - - compact_def.bot = next_level.tables[left..right].to_vec(); + compact_def.bot = next_level.overlapping_tables(&compact_def.this_range); if compact_def.bot.is_empty() { compact_def.next_range = compact_def.this_range.clone(); @@ -240,20 +241,8 @@ impl Core { // TODO: don't hold cpt_status through this function let mut cpt_status = self.cpt_status.write().unwrap(); - let mut out = vec![]; - // let now = std::time::Instant::now(); - - for table in this_level.tables.iter() { - if table.size() > 2 * compact_def.targets.file_size[0] { - // file already big, don't include it - continue; - } - // TODO: created at logic - if cpt_status.tables.contains(&table.id()) { - continue; - } - out.push(table.clone()); - } + let out = + this_level.pick_all_tables(2 * compact_def.targets.file_size[0], &cpt_status.tables); if out.len() < 4 { return Err(Error::CustomError("not enough table to merge".to_string())); @@ -262,12 +251,16 @@ impl Core { compact_def.this_range = KeyRange::inf(); compact_def.top = out; - cpt_status.levels[this_level.level] + cpt_status.levels[this_level.level()] .ranges .push(KeyRange::inf()); for table in compact_def.top.iter() { - assert!(cpt_status.tables.insert(table.id()), false); + assert!( + cpt_status.tables.insert(table.id()), + "insert to compaction table must success, but get {}", + false + ); } compact_def.targets.file_size[0] = std::u32::MAX as u64; @@ -283,12 +276,12 @@ impl Core { } fn fill_tables(&self, compact_def: &mut CompactDef) -> Result<()> { - let this_level = compact_def.this_level.read().unwrap(); - let next_level = compact_def.next_level.read().unwrap(); - - let tables = &this_level.tables; + let this_level = compact_def.this_level.clone(); + let next_level = compact_def.next_level.clone(); + let this_guard = this_level.read().unwrap(); + let next_guard = next_level.read().unwrap(); - if tables.is_empty() { + if this_guard.num_tables() == 0 { return Err(Error::CustomError("no tables to compact".to_string())); } @@ -296,49 +289,7 @@ impl Core { // TODO: don't hold cpt_status write lock for long time let mut cpt_status = self.cpt_status.write().unwrap(); - - for table in tables { - compact_def.this_size = table.size(); - compact_def.this_range = get_key_range_single(table); - // if we're already compacting this range, don't do anything - if cpt_status.overlaps_with(compact_def.this_level_id, &compact_def.this_range) { - continue; - } - compact_def.top = vec![table.clone()]; - let (left, right) = next_level.overlapping_tables(&compact_def.this_range); - - if right < left { - eprintln!("right {} is less than left {} in overlapping_tables for current level {}, next level {}, key_range {:?}", - right, 
left, compact_def.this_level_id, - compact_def.next_level_id, compact_def.this_range); - continue; - } - - compact_def.bot = next_level.tables[left..right].to_vec(); - - if compact_def.bot.is_empty() { - compact_def.bot = vec![]; - compact_def.next_range = compact_def.this_range.clone(); - if let Err(_) = cpt_status.compare_and_add(compact_def) { - continue; - } - return Ok(()); - } - - compact_def.next_range = get_key_range(&compact_def.bot); - - if cpt_status.overlaps_with(compact_def.next_level_id, &compact_def.next_range) { - continue; - } - - if let Err(_) = cpt_status.compare_and_add(compact_def) { - continue; - } - - return Ok(()); - } - - Err(Error::CustomError("no table to fill".to_string())) + this_guard.select_table_range(next_guard.as_ref(), compact_def, &mut *cpt_status) } fn run_compact_def( @@ -376,12 +327,12 @@ impl Core { if this_level_id != next_level_id { let mut this_level = this_level.write().unwrap(); let mut next_level = next_level.write().unwrap(); - this_level.delete_tables(&compact_def.top)?; - next_level.replace_tables(&compact_def.bot, &new_tables)?; + this_level.replace_tables(&compact_def.top, &[]); + next_level.replace_tables(&compact_def.bot, &new_tables); } else { let mut this_level = this_level.write().unwrap(); - this_level.delete_tables(&compact_def.top)?; - this_level.replace_tables(&compact_def.bot, &new_tables)?; + this_level.replace_tables(&compact_def.top, &[]); + this_level.replace_tables(&compact_def.bot, &new_tables); } // TODO: logging @@ -463,7 +414,7 @@ impl Core { } // TODO: sync dir - new_tables.sort_by(|x, y| COMPARATOR.compare_key(x.biggest(), y.biggest())); + new_tables.sort_by(|x, y| COMPARATOR.compare_key(x.largest(), y.largest())); Ok(new_tables) } @@ -648,7 +599,7 @@ impl Core { } if i % N == N - 1 { - let biggest = table.biggest(); + let biggest = table.largest(); let mut buf = BytesMut::with_capacity(biggest.len() + 8); buf.put(user_key(&biggest)); let right = key_with_ts(buf, std::u64::MAX); @@ -680,7 +631,7 @@ impl Core { let next_level = self.levels[base_level].clone(); compact_def = CompactDef::new( idx, - self.levels[level].clone(), + self.levels[0].clone(), level, next_level, base_level, @@ -733,7 +684,7 @@ impl Core { file_size: vec![0; self.levels.len()], }; - let mut db_size = self.last_level().read().unwrap().total_size; + let mut db_size = self.last_level().read().unwrap().total_size(); for i in (1..self.levels.len()).rev() { let ltarget = adjust(db_size); @@ -760,7 +711,7 @@ impl Core { targets } - fn last_level(&self) -> &Arc> { + fn last_level(&self) -> &Arc>> { self.levels.last().unwrap() } @@ -796,8 +747,8 @@ impl Core { // We may safely ignore this situation. 
// TODO: check if we could make it more stable let size; - if del_size <= level.total_size { - size = level.total_size - del_size; + if del_size <= level.total_size() { + size = level.total_size() - del_size; } else { size = 0; } @@ -822,9 +773,9 @@ impl Core { continue; } let lvl = lh.read().unwrap(); - let (left, right) = lvl.overlapping_tables(&kr); + let ret = lvl.overlapping_tables(&kr); drop(lvl); - if right - left > 0 { + if !ret.is_empty() { return true; } } diff --git a/src/levels/compaction.rs b/src/levels/compaction.rs index 7c20b538..e11e76ed 100644 --- a/src/levels/compaction.rs +++ b/src/levels/compaction.rs @@ -4,93 +4,11 @@ use std::sync::{Arc, RwLock}; use bytes::{Bytes, BytesMut}; -use super::LevelHandler; use crate::format::{key_with_ts, user_key}; -use crate::util::{KeyComparator, COMPARATOR}; +use crate::levels::handler::LevelHandler; +use crate::util::{ComparableRecord, KeyComparator, KeyRange, COMPARATOR}; use crate::{Error, Result, Table}; -// TODO: use enum for this struct -#[derive(PartialEq, Eq, Clone, Debug)] -pub struct KeyRange { - pub left: Bytes, - pub right: Bytes, - pub inf: bool, -} - -impl KeyRange { - pub fn is_empty(&self) -> bool { - if self.inf { - false - } else { - self.left.is_empty() && self.right.is_empty() - } - } - - pub fn inf() -> Self { - Self { - left: Bytes::new(), - right: Bytes::new(), - inf: true, - } - } - - pub fn new(left: Bytes, right: Bytes) -> Self { - Self { - left, - right, - inf: false, - } - } - - pub fn extend(&mut self, range: &Self) { - if self.is_empty() { - *self = range.clone(); - return; - } - if range.left.len() == 0 - || COMPARATOR.compare_key(&range.left, &self.left) == std::cmp::Ordering::Less - { - self.left = range.left.clone(); - } - if range.right.len() == 0 - || COMPARATOR.compare_key(&range.left, &self.left) == std::cmp::Ordering::Greater - { - self.right = range.right.clone(); - } - if range.inf { - self.inf = true; - self.left = Bytes::new(); - self.right = Bytes::new(); - } - } - - pub fn overlaps_with(&self, dst: &Self) -> bool { - if self.is_empty() { - return true; - } - if self.inf || dst.inf { - return true; - } - if COMPARATOR.compare_key(&self.left, &dst.right) == std::cmp::Ordering::Greater { - return false; - } - if COMPARATOR.compare_key(&self.right, &dst.left) == std::cmp::Ordering::Less { - return false; - } - true - } -} - -impl Default for KeyRange { - fn default() -> Self { - Self { - left: Bytes::new(), - right: Bytes::new(), - inf: false, - } - } -} - #[derive(Default)] pub struct LevelCompactStatus { pub ranges: Vec, @@ -124,8 +42,8 @@ pub struct CompactStatus { #[derive(Clone)] pub struct CompactDef { pub compactor_id: usize, - pub this_level: Arc>, - pub next_level: Arc>, + pub this_level: Arc>>, + pub next_level: Arc>>, pub this_level_id: usize, pub next_level_id: usize, pub targets: Targets, @@ -146,9 +64,9 @@ pub struct CompactDef { impl CompactDef { pub fn new( compactor_id: usize, - this_level: Arc>, + this_level: Arc>>, this_level_id: usize, - next_level: Arc>, + next_level: Arc>>, next_level_id: usize, prios: CompactionPriority, targets: Targets, @@ -285,14 +203,14 @@ pub fn get_key_range(tables: &[Table]) -> KeyRange { } let mut smallest = tables[0].smallest().clone(); - let mut biggest = tables[0].biggest().clone(); + let mut biggest = tables[0].largest().clone(); for i in 1..tables.len() { if COMPARATOR.compare_key(tables[i].smallest(), &smallest) == std::cmp::Ordering::Less { smallest = tables[i].smallest().clone(); } - if COMPARATOR.compare_key(tables[i].biggest(), &biggest) 
== std::cmp::Ordering::Greater { - biggest = tables[i].biggest().clone(); + if COMPARATOR.compare_key(tables[i].largest(), &biggest) == std::cmp::Ordering::Greater { + biggest = tables[i].largest().clone(); } } let mut smallest_buf = BytesMut::with_capacity(smallest.len() + 8); @@ -304,17 +222,3 @@ pub fn get_key_range(tables: &[Table]) -> KeyRange { key_with_ts(biggest_buf, 0), ); } - -pub fn get_key_range_single(table: &Table) -> KeyRange { - let smallest = table.smallest().clone(); - let biggest = table.biggest().clone(); - - let mut smallest_buf = BytesMut::with_capacity(smallest.len() + 8); - let mut biggest_buf = BytesMut::with_capacity(biggest.len() + 8); - smallest_buf.extend_from_slice(user_key(&smallest)); - biggest_buf.extend_from_slice(user_key(&biggest)); - return KeyRange::new( - key_with_ts(smallest_buf, std::u64::MAX), - key_with_ts(biggest_buf, 0), - ); -} diff --git a/src/levels/handler.rs b/src/levels/handler.rs index 2fafb3f4..c8ae2991 100644 --- a/src/levels/handler.rs +++ b/src/levels/handler.rs @@ -1,5 +1,7 @@ +use std::collections::HashSet; + +use crate::table::{get_key_range_single, TableAccessor, TableAccessorIterator}; use crate::value::Value; -use crate::Result; use crate::{format::user_key, get_ts, iterator_trait::AgateIterator}; use crate::{iterator::IteratorOptions, table::TableIterators}; use crate::{ @@ -7,84 +9,202 @@ use crate::{ util::{KeyComparator, COMPARATOR}, }; use crate::{AgateOptions, Table}; +use crate::{Error, Result}; use super::KeyRange; +use super::compaction::{get_key_range, CompactDef, CompactStatus}; +use crate::table::merge_iterator::Iterators; +use crate::util::ComparableRecord; use bytes::Bytes; -use std::collections::HashSet; +pub trait LevelHandler: Send + Sync { + fn level(&self) -> usize; + fn num_tables(&self) -> usize; + fn total_size(&self) -> u64; + + fn get(&self, key: &Bytes) -> Result; + + fn append_iterators(&self, iters: &mut Vec, opts: &IteratorOptions); + fn overlapping_tables(&self, kr: &KeyRange) -> Vec; + fn replace_tables(&mut self, to_del: &[Table], to_add: &[Table]) -> bool; + fn select_table_range( + &self, + other: &dyn LevelHandler, + compact_def: &mut CompactDef, + status: &mut CompactStatus, + ) -> Result<()>; + + fn pick_all_tables(&self, max_file_size: u64, exists: &HashSet) -> Vec
;
+}
 
-pub struct LevelHandler {
+pub struct HandlerBaseLevel<T: TableAccessor> {
     opts: AgateOptions,
-    pub level: usize,
-    pub tables: Vec<Table>,
-    pub total_size: u64,
+    level: usize,
+    table_acessor: T,
 }
 
-impl Drop for LevelHandler {
+impl<T: TableAccessor> Drop for HandlerBaseLevel<T> {
     fn drop(&mut self) {
-        for table in self.tables.drain(..) {
-            // TODO: simply forget table instance would cause memory leak. Should find
-            // a better way to handle this. For example, `table.close_and_save()`, which
-            // consumes table instance without deleting the files.
-            table.mark_save();
+        let mut iter = self.table_acessor.new_iterator();
+        iter.seek_first();
+        while iter.valid() {
+            iter.table().unwrap().mark_save();
+            iter.next();
         }
     }
 }
 
-impl LevelHandler {
-    pub fn new(opts: AgateOptions, level: usize) -> Self {
+impl<T: TableAccessor> HandlerBaseLevel<T> {
+    pub fn new(tables: Vec<Table>
, opts: AgateOptions, level: usize) -> Self { + let table_acessor = T::create(tables); Self { opts, level, - tables: vec![], - total_size: 0, + table_acessor, } } +} - pub fn try_add_l0_table(&mut self, table: Table) -> bool { - assert_eq!(self.level, 0); - if self.tables.len() >= self.opts.num_level_zero_tables_stall { - return false; +impl LevelHandler for HandlerBaseLevel { + fn level(&self) -> usize { + self.level + } + + fn num_tables(&self) -> usize { + self.table_acessor.len() + } + + fn total_size(&self) -> u64 { + self.table_acessor.total_size() + } + + fn get(&self, key: &Bytes) -> Result { + let key_no_ts = user_key(key); + let hash = farmhash::fingerprint32(key_no_ts); + let mut max_vs = Value::default(); + + if let Some(table) = self.table_acessor.get(key) { + if !table.does_not_have(hash) { + let mut it = table.new_iterator(0); + it.seek(key); + if it.valid() { + if crate::util::same_key(key, it.key()) { + let version = get_ts(it.key()); + max_vs = it.value(); + max_vs.version = version; + } + } + } + } + Ok(max_vs) + } + + fn append_iterators(&self, iters: &mut Vec, _: &IteratorOptions) { + if !self.table_acessor.is_empty() { + let iter = ConcatIterator::from_tables(Box::new(self.table_acessor.new_iterator()), 0); + iters.push(TableIterators::from(iter)); } + } + + fn overlapping_tables(&self, kr: &KeyRange) -> Vec
{ + use std::cmp::Ordering::*; - self.total_size += table.size(); - self.tables.push(table); + if kr.left.is_empty() || kr.right.is_empty() { + return vec![]; + } + let mut ret = vec![]; + let mut iter = self.table_acessor.new_iterator(); + iter.seek(&kr.left); + while let Some(table) = iter.table() { + if COMPARATOR.compare_key(&kr.right, table.smallest()) == Less { + break; + } + ret.push(table); + iter.next(); + } + ret + } + fn replace_tables(&mut self, to_del: &[Table], to_add: &[Table]) -> bool { + let acessor = self.table_acessor.replace_tables(to_del, to_add); + self.table_acessor = acessor; true } - pub fn num_tables(&self) -> usize { - self.tables.len() + fn select_table_range( + &self, + next_level: &dyn LevelHandler, + compact_def: &mut CompactDef, + status: &mut CompactStatus, + ) -> Result<()> { + let mut it = self.table_acessor.new_iterator(); + it.seek_first(); + while let Some(table) = it.table() { + if select_table_range(&table, next_level, compact_def, status) { + return Ok(()); + } + it.next(); + } + Err(Error::CustomError("no table to fill".to_string())) } - pub fn get_table_for_key(&self, key: &Bytes) -> Vec
{ - if self.level == 0 { - // for level 0, we need to iterate every table. + fn pick_all_tables(&self, max_file_size: u64, tables: &HashSet) -> Vec
{ + let mut ret = vec![]; + let mut it = self.table_acessor.new_iterator(); + it.seek_first(); + while let Some(table) = it.table() { + if table.size() <= max_file_size && !tables.contains(&table.id()) { + ret.push(table); + } + it.next(); + } + ret + } +} - let mut out = self.tables.clone(); - out.reverse(); +pub struct HandlerLevel0 { + tables: Vec
, + opts: AgateOptions, + level: usize, + total_size: u64, +} - out - } else { - let idx = crate::util::search(self.tables.len(), |idx| { - COMPARATOR.compare_key(self.tables[idx].biggest(), key) != std::cmp::Ordering::Less - }); - if idx >= self.tables.len() { - vec![] - } else { - vec![self.tables[idx].clone()] - } +impl HandlerLevel0 { + pub fn new(mut tables: Vec
, opts: AgateOptions, level: usize) -> Self { + tables.sort_by(|x, y| x.id().cmp(&y.id())); + let mut total_size = 0; + for table in &tables { + total_size += table.size(); } + Self { + opts, + level, + tables, + total_size, + } + } +} + +impl LevelHandler for HandlerLevel0 { + fn level(&self) -> usize { + self.level } - pub fn get(&self, key: &Bytes) -> Result { - let tables = self.get_table_for_key(key); + fn num_tables(&self) -> usize { + self.tables.len() + } + + fn total_size(&self) -> u64 { + self.total_size + } + + fn get(&self, key: &Bytes) -> Result { let key_no_ts = user_key(key); let hash = farmhash::fingerprint32(key_no_ts); let mut max_vs = Value::default(); - for table in tables { + for table in self.tables.iter() { if table.does_not_have(hash) { continue; } @@ -104,33 +224,37 @@ impl LevelHandler { } } } - Ok(max_vs) } - pub fn overlapping_tables(&self, kr: &KeyRange) -> (usize, usize) { - use std::cmp::Ordering::*; - - if kr.left.is_empty() || kr.right.is_empty() { - return (0, 0); - } - let left = crate::util::search(self.tables.len(), |i| { - match COMPARATOR.compare_key(&kr.left, self.tables[i].biggest()) { - Less | Equal => true, - _ => false, + fn append_iterators(&self, iters: &mut Vec, opts: &IteratorOptions) { + for table in self.tables.iter().rev() { + if opts.pick_table(table) { + iters.push(TableIterators::from(table.new_iterator(0))); } - }); - let right = crate::util::search(self.tables.len(), |i| { - match COMPARATOR.compare_key(&kr.right, self.tables[i].smallest()) { - Less => true, - _ => false, + } + } + + fn overlapping_tables(&self, _: &KeyRange) -> Vec
{ + let mut out = vec![]; + let mut kr = KeyRange::default(); + for table in self.tables.iter() { + let dkr = get_key_range_single(table); + if kr.overlaps_with(&dkr) { + out.push(table.clone()); + kr.extend(&dkr); + } else { + break; } - }); - (left, right) + } + out } - pub fn replace_tables(&mut self, to_del: &[Table], to_add: &[Table]) -> Result<()> { - // TODO: handle deletion + fn replace_tables(&mut self, to_del: &[Table], to_add: &[Table]) -> bool { + assert_eq!(self.level, 0); + if self.tables.len() >= self.opts.num_level_zero_tables_stall { + return false; + } let mut to_del_map = HashSet::new(); for table in to_del { @@ -146,72 +270,78 @@ impl LevelHandler { } self.total_size = self.total_size.saturating_sub(table.size()); } - - for table in to_add { - self.total_size += table.size(); - new_tables.push(table.clone()); + for t in to_add { + self.total_size += t.size(); + new_tables.push(t.clone()); } - - new_tables.sort_by(|x, y| COMPARATOR.compare_key(x.smallest(), y.smallest())); - + new_tables.sort_by(|a, b| a.id().cmp(&b.id())); self.tables = new_tables; - - Ok(()) + true } - pub fn delete_tables(&mut self, to_del: &[Table]) -> Result<()> { - let mut to_del_map = HashSet::new(); - - for table in to_del { - to_del_map.insert(table.id()); + fn select_table_range( + &self, + next_level: &dyn LevelHandler, + compact_def: &mut CompactDef, + status: &mut CompactStatus, + ) -> Result<()> { + for table in self.tables.iter() { + if select_table_range(&table, next_level, compact_def, status) { + return Ok(()); + } } + Err(Error::CustomError("no table to fill".to_string())) + } - let mut new_tables = vec![]; - - for table in &self.tables { - if !to_del_map.contains(&table.id()) { - new_tables.push(table.clone()); + fn pick_all_tables(&self, max_file_size: u64, tables: &HashSet) -> Vec
{ + let mut out = vec![]; + for table in self.tables.iter() { + if table.size() > max_file_size { + // file already big, don't include it continue; } - self.total_size = self.total_size.saturating_sub(table.size()); + // TODO: created at logic + if tables.contains(&table.id()) { + continue; + } + out.push(table.clone()); } - - self.tables = new_tables; - - Ok(()) + out } +} - pub fn init_tables(&mut self, tables: Vec
) { - self.tables = tables; - self.total_size = 0; - for table in &self.tables { - self.total_size += table.size(); - } +fn select_table_range( + table: &Table, + next_level: &dyn LevelHandler, + compact_def: &mut CompactDef, + cpt_status: &mut CompactStatus, +) -> bool { + compact_def.this_size = table.size(); + compact_def.this_range = get_key_range_single(table); + // if we're already compacting this range, don't do anything + if cpt_status.overlaps_with(compact_def.this_level_id, &compact_def.this_range) { + return false; + } + compact_def.top = vec![table.clone()]; + compact_def.bot = next_level.overlapping_tables(&compact_def.this_range); - if self.level == 0 { - self.tables.sort_by(|x, y| x.id().cmp(&y.id())); - } else { - self.tables - .sort_by(|x, y| COMPARATOR.compare_key(x.smallest(), y.smallest())); + if compact_def.bot.is_empty() { + compact_def.bot = vec![]; + compact_def.next_range = compact_def.this_range.clone(); + if let Err(_) = cpt_status.compare_and_add(compact_def) { + return false; } + return true; } - pub(crate) fn append_iterators(&self, iters: &mut Vec, opts: &IteratorOptions) { - if self.level == 0 { - for table in self.tables.iter().rev() { - if opts.pick_table(table) { - iters.push(TableIterators::from(table.new_iterator(0))); - } - } - return; - } + compact_def.next_range = get_key_range(&compact_def.bot); - let mut tables = self.tables.clone(); - opts.pick_tables(&mut tables); - if !tables.is_empty() { - let iter = ConcatIterator::from_tables(tables, 0); - iters.push(TableIterators::from(iter)); - } - return; + if cpt_status.overlaps_with(compact_def.next_level_id, &compact_def.next_range) { + return false; + } + + if let Err(_) = cpt_status.compare_and_add(compact_def) { + return false; } + return true; } diff --git a/src/levels/tests.rs b/src/levels/tests.rs index 9a543b27..9f927b36 100644 --- a/src/levels/tests.rs +++ b/src/levels/tests.rs @@ -1,16 +1,19 @@ use super::*; use crate::{db::tests::with_agate_test, table::new_filename, Agate, TableOptions}; +#[cfg(test)] pub fn helper_dump_levels(lvctl: &LevelsController) { for level in &lvctl.core.levels { let level = level.read().unwrap(); - eprintln!("--- Level {} ---", level.level); - for table in &level.tables { + eprintln!("--- Level {} ---", level.level()); + let exists = HashSet::new(); + let tables = level.pick_all_tables(u64::MAX, &exists); + for table in tables { eprintln!( "#{} ({:?} - {:?}, {})", table.id(), table.smallest(), - table.biggest(), + table.largest(), table.size() ); } @@ -72,7 +75,7 @@ fn create_and_open(agate: &mut Agate, td: Vec, level: usize) { .add_changes(vec![new_create_change(table.id(), level, 0)]) .unwrap(); let mut lv = agate.core.lvctl.core.levels[level].write().unwrap(); - lv.tables.push(table); + lv.replace_tables(&[], &[table]); } mod overlap { @@ -89,13 +92,11 @@ mod overlap { let l0_tables = agate.core.lvctl.core.levels[0] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &HashSet::default()); let l1_tables = agate.core.lvctl.core.levels[1] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &HashSet::default()); // lv0 should overlap with lv0 tables assert!(agate.core.lvctl.core.check_overlap(&l0_tables, 0)); @@ -124,13 +125,11 @@ mod overlap { let l0_tables = agate.core.lvctl.core.levels[0] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &HashSet::default()); let l1_tables = agate.core.lvctl.core.levels[1] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &HashSet::default()); // lv0 should 
overlap with lv0 tables assert!(agate.core.lvctl.core.check_overlap(&l0_tables, 0)); @@ -158,8 +157,7 @@ mod overlap { let l0_tables = agate.core.lvctl.core.levels[0] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &HashSet::default()); // lv1 should not overlap with lv0 tables assert!(!agate.core.lvctl.core.check_overlap(&l0_tables, 1)); @@ -222,6 +220,7 @@ fn generate_test_compect_def( adjusted: 0.0, drop_prefixes: vec![], }; + let exists = HashSet::new(); let mut compact_def = CompactDef::new( 0, agate.core.lvctl.core.levels[this_level_id].clone(), @@ -234,13 +233,11 @@ fn generate_test_compect_def( compact_def.top = agate.core.lvctl.core.levels[this_level_id] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &exists); compact_def.bot = agate.core.lvctl.core.levels[next_level_id] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &exists); compact_def.targets.base_level = next_level_id; compact_def } diff --git a/src/table.rs b/src/table.rs index f669d121..7ffffd79 100644 --- a/src/table.rs +++ b/src/table.rs @@ -5,12 +5,14 @@ pub mod merge_iterator; use crate::bloom::Bloom; use crate::checksum; +use crate::format::{key_with_ts, user_key}; use crate::iterator_trait::AgateIterator; use crate::opt::{ChecksumVerificationMode, Options}; +use crate::util::{ComparableRecord, KeyRange}; use crate::Error; use crate::Result; -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; use iterator::TableRefIterator; use memmap::{Mmap, MmapOptions}; use prost::Message; @@ -25,8 +27,15 @@ pub use iterator::{ITERATOR_NOCACHE, ITERATOR_REVERSED}; pub use merge_iterator::{Iterators as TableIterators, MergeIterator}; pub type TableIterator = TableRefIterator>; +mod btree_table_accessor; +mod table_accessor; #[cfg(test)] mod tests; +mod vec_table_acessor; + +pub use btree_table_accessor::{BTreeTableAccessor, BTreeTableAccessorIterator}; +pub use table_accessor::{TableAccessor, TableAccessorIterator}; +pub use vec_table_acessor::{VecTableAccessor, VecTableAccessorIterator}; /// MmapFile stores SST data. `File` refers to a file on disk, /// and `Memory` refers to data in memory. 
@@ -583,19 +592,6 @@ impl Table { self.inner.size() } - /// Get ID of SST - pub fn id(&self) -> u64 { - self.inner.id() - } - - pub fn biggest(&self) -> &Bytes { - self.inner.biggest() - } - - pub fn smallest(&self) -> &Bytes { - self.inner.smallest() - } - pub fn is_in_memory(&self) -> bool { self.inner.is_in_memory() } @@ -607,6 +603,20 @@ impl Table { } } +impl ComparableRecord for Table { + fn smallest(&self) -> &Bytes { + self.inner.smallest() + } + fn largest(&self) -> &Bytes { + self.inner.biggest() + } + + /// Get ID of SST + fn id(&self) -> u64 { + self.inner.id() + } +} + fn id_to_filename(id: u64) -> String { format!("{:06}.sst", id) } @@ -614,3 +624,17 @@ fn id_to_filename(id: u64) -> String { pub fn new_filename>(id: u64, dir: P) -> PathBuf { dir.as_ref().join(id_to_filename(id)) } + +pub fn get_key_range_single(table: &Table) -> KeyRange { + let smallest = table.smallest().clone(); + let biggest = table.largest().clone(); + + let mut smallest_buf = BytesMut::with_capacity(smallest.len() + 8); + let mut biggest_buf = BytesMut::with_capacity(biggest.len() + 8); + smallest_buf.extend_from_slice(user_key(&smallest)); + biggest_buf.extend_from_slice(user_key(&biggest)); + return KeyRange::new( + key_with_ts(smallest_buf, std::u64::MAX), + key_with_ts(biggest_buf, 0), + ); +} diff --git a/src/table/btree_table_accessor.rs b/src/table/btree_table_accessor.rs new file mode 100644 index 00000000..632b3fda --- /dev/null +++ b/src/table/btree_table_accessor.rs @@ -0,0 +1,97 @@ +use super::Table; +use crate::table::{TableAccessor, TableAccessorIterator}; +use crate::util::{BTree, BTreeIterator, PageIterator}; +use bytes::Bytes; + +const MAX_LEAF_PAGE_SIZE: usize = 128; +const MAX_TREE_PAGE_SIZE: usize = 64; + +pub struct BTreeTableAccessor { + inner: BTree
,
+    total_size: u64,
+}
+pub struct BTreeTableAccessorIterator {
+    inner: BTreeIterator<Table>,
+}
+
+impl TableAccessorIterator for BTreeTableAccessorIterator {
+    fn seek(&mut self, key: &Bytes) {
+        self.inner.seek(key);
+    }
+
+    fn seek_for_previous(&mut self, key: &Bytes) {
+        self.inner.seek_for_previous(key);
+    }
+
+    fn seek_first(&mut self) {
+        self.inner.seek_to_first();
+    }
+
+    fn seek_last(&mut self) {
+        self.inner.seek_to_last();
+    }
+
+    fn prev(&mut self) {
+        self.inner.prev();
+    }
+
+    fn next(&mut self) {
+        self.inner.next();
+    }
+
+    fn table(&self) -> Option<Table> {
+        self.inner.record()
+    }
+
+    fn valid(&self) -> bool {
+        self.inner.valid()
+    }
+}
+
+impl TableAccessor for BTreeTableAccessor {
+    type Iter = BTreeTableAccessorIterator;
+
+    fn create(tables: Vec<Table>) -> Self {
+        let mut total_size = 0;
+        let tree = BTree::new(MAX_TREE_PAGE_SIZE, MAX_LEAF_PAGE_SIZE);
+        for t in &tables {
+            total_size += t.size();
+        }
+        let inner = tree.replace(vec![], tables);
+        BTreeTableAccessor { inner, total_size }
+    }
+
+    fn get(&self, key: &Bytes) -> Option<Table>
{ + self.inner.get(key) + } + + fn is_empty(&self) -> bool { + self.inner.size() > 0 + } + + fn len(&self) -> usize { + self.inner.size() + } + + fn total_size(&self) -> u64 { + self.total_size + } + + fn new_iterator(&self) -> Self::Iter { + BTreeTableAccessorIterator { + inner: self.inner.new_iterator(), + } + } + + fn replace_tables(&self, to_del: &[Table], to_add: &[Table]) -> Self { + let mut total_size = self.total_size; + for t in to_add { + total_size += t.size(); + } + for t in to_del { + total_size -= t.size(); + } + let inner = self.inner.replace(to_del.to_vec(), to_add.to_vec()); + BTreeTableAccessor { inner, total_size } + } +} diff --git a/src/table/concat_iterator.rs b/src/table/concat_iterator.rs index 309e6da4..e87d35f8 100644 --- a/src/table/concat_iterator.rs +++ b/src/table/concat_iterator.rs @@ -1,119 +1,92 @@ use super::iterator::ITERATOR_REVERSED; -use super::{AgateIterator, Table, TableIterator}; -use crate::util::{KeyComparator, COMPARATOR}; +use super::{AgateIterator, TableIterator}; +use crate::table::table_accessor::TableAccessorIterator; use crate::value::Value; use bytes::Bytes; /// ConcatIterator iterates on SSTs with no overlap keys. pub struct ConcatIterator { - cur: Option, - iters: Vec>, - tables: Vec
, + cur_iter: Option, + accessor: Box, opt: usize, } impl ConcatIterator { /// Create `ConcatIterator` from a list of tables. Tables must have been sorted /// and have no overlap keys. - pub fn from_tables(tables: Vec
, opt: usize) -> Self { - let iters = tables.iter().map(|_| None).collect(); - + pub fn from_tables(accessor: Box, opt: usize) -> Self { ConcatIterator { - cur: None, - iters, - tables, + cur_iter: None, + accessor, opt, } } - fn set_idx(&mut self, idx: usize) { - if idx >= self.iters.len() { - self.cur = None; - return; - } - if self.iters[idx].is_none() { - self.iters[idx] = Some(self.tables[idx].new_iterator(self.opt)); - } - self.cur = Some(idx); - } - fn iter_mut(&mut self) -> &mut TableIterator { - self.iters[self.cur.unwrap()].as_mut().unwrap() + self.cur_iter.as_mut().unwrap() } fn iter_ref(&self) -> &TableIterator { - self.iters[self.cur.unwrap()].as_ref().unwrap() + self.cur_iter.as_ref().unwrap() } } impl AgateIterator for ConcatIterator { fn next(&mut self) { - let cur = self.cur.unwrap(); let cur_iter = self.iter_mut(); cur_iter.next(); if cur_iter.valid() { return; } drop(cur_iter); + self.cur_iter = None; loop { if self.opt & ITERATOR_REVERSED == 0 { - self.set_idx(cur + 1); + self.accessor.next(); } else { - if cur == 0 { - self.cur = None; - } else { - self.set_idx(cur - 1); - } + self.accessor.prev(); } - if self.cur.is_some() { - self.iter_mut().rewind(); - if self.iter_ref().valid() { - return; - } - } else { + if !self.accessor.valid() { return; } + self.cur_iter = self.accessor.table().map(|t| t.new_iterator(self.opt)); + self.iter_mut().rewind(); + if self.iter_ref().valid() { + return; + } + self.cur_iter = None; } } fn rewind(&mut self) { - if self.iters.is_empty() { - return; - } if self.opt & ITERATOR_REVERSED == 0 { - self.set_idx(0); + self.accessor.seek_first(); } else { - self.set_idx(self.iters.len() - 1); + self.accessor.seek_last(); + } + self.cur_iter = self.accessor.table().map(|t| t.new_iterator(self.opt)); + if self.cur_iter.is_none() { + return; } - self.iter_mut().rewind(); } fn seek(&mut self, key: &Bytes) { - use std::cmp::Ordering::*; - let idx; + self.cur_iter = None; if self.opt & ITERATOR_REVERSED == 0 { - idx = crate::util::search(self.tables.len(), |idx| { - COMPARATOR.compare_key(self.tables[idx].biggest(), key) != Less - }); - if idx >= self.tables.len() { - self.cur = None; + self.accessor.seek(key); + if !self.accessor.valid() { return; } + self.cur_iter = self.accessor.table().map(|t| t.new_iterator(self.opt)); } else { - let n = self.tables.len(); - let ridx = crate::util::search(self.tables.len(), |idx| { - COMPARATOR.compare_key(self.tables[n - 1 - idx].smallest(), key) != Greater - }); - if ridx >= self.tables.len() { - self.cur = None; + self.accessor.seek_for_previous(key); + if !self.accessor.valid() { return; } - idx = n - 1 - ridx; + self.cur_iter = self.accessor.table().map(|t| t.new_iterator(self.opt)); } - - self.set_idx(idx); self.iter_mut().seek(key); } @@ -126,7 +99,7 @@ impl AgateIterator for ConcatIterator { } fn valid(&self) -> bool { - if self.cur.is_some() { + if self.cur_iter.is_some() { self.iter_ref().valid() } else { false @@ -138,7 +111,7 @@ impl AgateIterator for ConcatIterator { mod tests { use super::*; - use crate::table::Table; + use crate::table::{Table, TableAccessor, VecTableAccessor}; use crate::{ format::{key_with_ts, user_key}, table::tests::{build_table_data, get_test_table_options}, @@ -173,7 +146,8 @@ mod tests { #[test] fn test_concat_iterator() { let (tables, cnt) = build_test_tables(); - let mut iter = ConcatIterator::from_tables(tables, 0); + let accessor = VecTableAccessor::create(tables); + let mut iter = ConcatIterator::from_tables(Box::new(accessor.new_iterator()), 0); iter.rewind(); diff 
--git a/src/table/table_accessor.rs b/src/table/table_accessor.rs new file mode 100644 index 00000000..1d8eca21 --- /dev/null +++ b/src/table/table_accessor.rs @@ -0,0 +1,87 @@ +use super::Table; +use bytes::Bytes; + +pub trait TableAccessorIterator { + fn seek(&mut self, key: &Bytes); + fn seek_for_previous(&mut self, key: &Bytes); + fn seek_first(&mut self); + fn seek_last(&mut self); + fn prev(&mut self); + fn next(&mut self); + fn table(&self) -> Option
;
+    fn valid(&self) -> bool;
+}
+
+pub trait TableAccessor: Send + Sync {
+    type Iter: TableAccessorIterator;
+
+    fn create(tables: Vec<Table>) -> Self;
+    fn get(&self, key: &Bytes) -> Option<Table>
; + fn is_empty(&self) -> bool; + fn len(&self) -> usize; + fn total_size(&self) -> u64; + fn new_iterator(&self) -> Self::Iter; + fn replace_tables(&self, to_del: &[Table], to_add: &[Table]) -> Self; +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::table::tests::build_table_data; + use crate::table::{BTreeTableAccessor, Table, VecTableAccessor}; + use crate::util::ComparableRecord; + use crate::TableOptions; + + fn test_table_accessor(accessor: A) { + let gap = 10; + let mut tables = vec![]; + for i in 1000..3000 { + let smallest = i * gap; + let largest = (i + 1) * gap; + let mut bopts = TableOptions::default(); + bopts.block_size = 1024; + bopts.table_capacity = 2048; + bopts.table_size = 4096; + let mut kvs = vec![]; + for j in smallest..largest { + if j % 2 == 0 { + kvs.push((Bytes::from(j.to_string()), Bytes::from(j.to_string()))); + } + } + let data = build_table_data(kvs, bopts.clone()); + let table = Table::open_in_memory(data, i, bopts).unwrap(); + tables.push(table); + } + let accessor = accessor.replace_tables(&[], &tables); + let mut iter = accessor.new_iterator(); + iter.seek(&Bytes::from(20009.to_string())); + let mut expected = 2001; + while iter.valid() { + let table = iter.table().unwrap(); + assert_eq!(expected, table.id()); + expected += 1; + iter.next(); + } + let mut iter = accessor.new_iterator(); + iter.seek_for_previous(&Bytes::from(20009.to_string())); + let mut expected = 2000; + while iter.valid() { + let table = iter.table().unwrap(); + assert_eq!(expected, table.id()); + expected -= 1; + iter.prev(); + } + } + + #[test] + fn test_vec_accessor() { + let accessor = VecTableAccessor::create(vec![]); + test_table_accessor(accessor); + } + + #[test] + fn test_btree_accessor() { + let accessor = BTreeTableAccessor::create(vec![]); + test_table_accessor(accessor); + } +} diff --git a/src/table/vec_table_acessor.rs b/src/table/vec_table_acessor.rs new file mode 100644 index 00000000..42ef8d55 --- /dev/null +++ b/src/table/vec_table_acessor.rs @@ -0,0 +1,164 @@ +use bytes::Bytes; +use std::cmp; +use std::collections::HashSet; +use std::sync::Arc; + +use super::table_accessor::{TableAccessor, TableAccessorIterator}; +use super::Table; + +use crate::util::ComparableRecord; + +pub struct VecTableAccessorInner { + tables: Vec
, + total_size: u64, +} + +pub struct VecTableAccessor { + inner: Arc, +} + +pub struct VecTableAccessorIterator { + inner: Arc, + cursor: Option, +} + +impl TableAccessorIterator for VecTableAccessorIterator { + fn seek(&mut self, key: &Bytes) { + let idx = crate::util::search(self.inner.tables.len(), |idx| { + self.inner.tables[idx].largest().cmp(&key) != cmp::Ordering::Less + }); + if idx >= self.inner.tables.len() { + self.cursor = None; + return; + } + self.cursor = Some(idx); + } + + fn seek_for_previous(&mut self, key: &Bytes) { + let n = self.inner.tables.len(); + let ridx = crate::util::search(self.inner.tables.len(), |idx| { + self.inner.tables[n - 1 - idx].smallest().cmp(&key) != cmp::Ordering::Greater + }); + if ridx >= self.inner.tables.len() { + self.cursor = None; + return; + } + self.cursor = Some(n - 1 - ridx); + } + + fn seek_first(&mut self) { + if self.inner.tables.len() > 0 { + self.cursor = Some(0); + } else { + self.cursor = None; + } + } + + fn seek_last(&mut self) { + if self.inner.tables.len() > 0 { + self.cursor = Some(self.inner.tables.len() - 1); + } else { + self.cursor = None; + } + } + + fn prev(&mut self) { + if let Some(cursor) = self.cursor.take() { + if cursor > 0 { + self.cursor = Some(cursor - 1); + } + } + } + + fn next(&mut self) { + if let Some(cursor) = self.cursor.take() { + if cursor + 1 < self.inner.tables.len() { + self.cursor = Some(cursor + 1); + } + } + } + + fn table(&self) -> Option
{
+        self.cursor
+            .as_ref()
+            .map(|cursor| self.inner.tables[*cursor].clone())
+    }
+
+    fn valid(&self) -> bool {
+        self.cursor.is_some()
+    }
+}
+
+impl TableAccessor for VecTableAccessor {
+    type Iter = VecTableAccessorIterator;
+
+    fn create(mut tables: Vec<Table>) -> Self {
+        let mut total_size = 0;
+        for table in &tables {
+            total_size += table.size();
+        }
+
+        tables.sort_by(|x, y| x.smallest().cmp(y.smallest()));
+        VecTableAccessor {
+            inner: Arc::new(VecTableAccessorInner { tables, total_size }),
+        }
+    }
+
+    fn get(&self, key: &Bytes) -> Option<Table>
{ + let idx = crate::util::search(self.inner.tables.len(), |idx| { + self.inner.tables[idx].largest().cmp(&key) != cmp::Ordering::Less + }); + if idx >= self.inner.tables.len() { + None + } else { + Some(self.inner.tables[idx].clone()) + } + } + + fn is_empty(&self) -> bool { + self.inner.tables.is_empty() + } + + fn len(&self) -> usize { + self.inner.tables.len() + } + + fn total_size(&self) -> u64 { + self.inner.total_size + } + + fn new_iterator(&self) -> Self::Iter { + VecTableAccessorIterator { + inner: self.inner.clone(), + cursor: None, + } + } + + fn replace_tables(&self, to_del: &[Table], to_add: &[Table]) -> Self { + // TODO: handle deletion + let mut to_del_map = HashSet::new(); + for table in to_del { + to_del_map.insert(table.id()); + } + + let mut tables = Vec::with_capacity(self.inner.tables.len() + to_add.len()); + let mut total_size = self.inner.total_size; + + for table in &self.inner.tables { + if !to_del_map.contains(&table.id()) { + tables.push(table.clone()); + continue; + } + total_size = total_size.saturating_sub(table.size()); + } + + for table in to_add { + total_size += table.size(); + tables.push(table.clone()); + } + + tables.sort_by(|x, y| x.smallest().cmp(y.smallest())); + let inner = Arc::new(VecTableAccessorInner { tables, total_size }); + VecTableAccessor { inner } + } +} diff --git a/src/txn/watermark.rs b/src/txn/watermark.rs index 24505535..34b740a6 100644 --- a/src/txn/watermark.rs +++ b/src/txn/watermark.rs @@ -74,11 +74,11 @@ impl Core { } if until != done_until { - assert_eq!( - self.done_until - .compare_and_swap(done_until, until, Ordering::SeqCst), - done_until - ); + let ret = self + .done_until + .compare_exchange(done_until, until, Ordering::SeqCst, Ordering::SeqCst) + .unwrap_or_else(|x| x); + assert_eq!(ret, done_until); } if until - done_until <= waiters.len() as u64 { diff --git a/src/util/btree.rs b/src/util/btree.rs new file mode 100644 index 00000000..8b20f426 --- /dev/null +++ b/src/util/btree.rs @@ -0,0 +1,895 @@ +use bytes::Bytes; +use std::collections::HashSet; +use std::marker::PhantomData; +use std::sync::Arc; + +pub trait ComparableRecord: Clone { + fn smallest(&self) -> &Bytes; + fn largest(&self) -> &Bytes; + fn id(&self) -> u64; +} + +pub trait Page: Clone { + type Iter: PageIterator; + + fn new_iterator(self: &Arc) -> Self::Iter; + fn seek(&self, key: &Bytes) -> Option; + fn smallest(&self) -> &Bytes; + fn largest(&self) -> &Bytes; + fn split(&self) -> Vec>; + fn merge(&self, other: &Self) -> Arc; + fn size(&self) -> usize; + fn record_number(&self) -> usize; + fn insert(&mut self, records: Vec); + fn delete(&mut self, records: Vec); + fn max_page_size(&self) -> usize; + fn min_merge_size(&self) -> usize { + self.max_page_size() / 4 + } + fn split_page_size(&self) -> usize { + self.max_page_size() / 2 + } +} + +pub trait PageIterator: Clone { + fn seek(&mut self, key: &Bytes); + fn seek_for_previous(&mut self, key: &Bytes); + fn seek_to_first(&mut self); + fn seek_to_last(&mut self); + fn next(&mut self); + fn prev(&mut self); + fn idx(&self) -> usize; + fn valid(&self) -> bool; + fn size(&self) -> usize; + fn record(&self) -> Option; +} + +#[derive(Clone, Default)] +pub struct LeafNodeIterator { + page: Arc>, + cursor: usize, +} + +impl PageIterator for LeafNodeIterator { + fn seek(&mut self, key: &Bytes) { + if self.page.data.is_empty() { + self.cursor = 0; + return; + } + self.cursor = match self + .page + .data + .binary_search_by(|node| node.largest().cmp(key)) + { + Ok(idx) => idx, + Err(upper) => upper, + }; + } + + fn 
seek_for_previous(&mut self, key: &Bytes) { + if self.page.data.is_empty() { + self.cursor = 0; + return; + } + self.cursor = match self + .page + .data + .binary_search_by(|node| node.smallest().cmp(key)) + { + Ok(idx) => idx, + Err(upper) => { + if upper == 0 { + self.cursor = self.page.data.len(); + return; + } else { + upper - 1 + } + } + }; + } + + fn seek_to_first(&mut self) { + self.cursor = 0; + } + + fn seek_to_last(&mut self) { + self.cursor = self.page.data.len(); + if self.page.data.len() > 0 { + self.cursor -= 1; + } + } + + fn next(&mut self) { + self.cursor += 1; + } + + fn prev(&mut self) { + if self.cursor > 0 { + self.cursor -= 1; + } else { + self.cursor = self.page.data.len(); + } + } + + fn idx(&self) -> usize { + self.cursor + } + + fn valid(&self) -> bool { + self.cursor < self.page.data.len() + } + + fn size(&self) -> usize { + self.page.data.len() + } + + fn record(&self) -> Option { + if self.valid() { + Some(self.page.data[self.cursor].clone()) + } else { + None + } + } +} + +#[derive(Clone, Default)] +pub struct LeafPage { + data: Vec, + smallest: Bytes, + largest: Bytes, + max_page_size: usize, +} + +impl Page for LeafPage { + type Iter = LeafNodeIterator; + + fn new_iterator(self: &Arc) -> Self::Iter { + LeafNodeIterator:: { + page: self.clone(), + cursor: 0, + } + } + + fn seek(&self, key: &Bytes) -> Option { + if self.data.is_empty() { + return None; + } + + let idx = match self.data.binary_search_by(|node| node.largest().cmp(key)) { + Ok(idx) => idx, + Err(upper) => upper, + }; + if idx >= self.data.len() { + None + } else { + Some(self.data[idx].clone()) + } + } + + fn smallest(&self) -> &Bytes { + &self.smallest + } + + fn largest(&self) -> &Bytes { + &self.largest + } + + fn split(&self) -> Vec>> { + let split_count = (self.data.len() + self.split_page_size() - 1) / self.split_page_size(); + let split_size = self.data.len() / split_count; + let mut start_idx = 0; + let mut end_idx = split_size; + let mut nodes = vec![]; + while start_idx < self.data.len() { + let new_data = self.data[start_idx..end_idx].to_vec(); + let key = if start_idx == 0 { + self.smallest.clone() + } else { + self.data[start_idx].smallest().clone() + }; + nodes.push(Arc::new(Self { + data: new_data, + smallest: key, + largest: self.data[end_idx - 1].largest().clone(), + max_page_size: self.max_page_size, + })); + start_idx += split_size; + end_idx += split_size; + if end_idx > self.data.len() { + end_idx = self.data.len(); + } + } + nodes + } + + fn merge(&self, other: &LeafPage) -> Arc> { + let mut data = self.data.clone(); + for d in other.data.iter() { + data.push(d.clone()); + } + Arc::new(LeafPage { + data, + smallest: self.smallest.clone(), + largest: other.largest.clone(), + max_page_size: self.max_page_size, + }) + } + + fn size(&self) -> usize { + self.data.len() + } + + fn record_number(&self) -> usize { + self.data.len() + } + + fn insert(&mut self, mut tables: Vec) { + self.data.append(&mut tables); + self.data.sort_by(|a, b| a.smallest().cmp(b.smallest())); + self.largest = self.data.last().unwrap().largest().clone(); + } + + fn delete(&mut self, tables: Vec) { + let mut del_map = HashSet::with_capacity(tables.len()); + for t in tables { + del_map.insert(t.id()); + } + let mut new_idx = 0; + for cur in 0..self.data.len() { + if del_map.contains(&self.data[cur].id()) { + continue; + } + self.data[new_idx] = self.data[cur].clone(); + new_idx += 1; + } + self.data.truncate(new_idx); + } + + fn max_page_size(&self) -> usize { + self.max_page_size + } +} + +// We decide each 
range of sons by only `smallest`. It means that if the sons of one page is as +// following: +// son: [a, b], [d, f], [i, k], +// A new table [c,c] will be insert to the first page rather than the second page. +#[derive(Clone)] +pub struct BTreePage> { + son: Vec>, + smallest: Bytes, + largest: Bytes, + record_number: usize, + max_page_size: usize, + _phantom: PhantomData, +} + +#[derive(Clone)] +pub struct BTreePageIterator> { + page: Arc>, + cursor: usize, + iter: Option, +} + +impl PageIterator for BTreePageIterator +where + R: ComparableRecord, + P: Page, +{ + fn seek(&mut self, key: &Bytes) { + if self.page.son.is_empty() { + self.cursor = 0; + return; + } + self.cursor = match self + .page + .son + .binary_search_by(|node| node.smallest().cmp(key)) + { + Ok(idx) => idx, + Err(upper) => { + if upper > 0 { + if self.page.son[upper - 1].largest().ge(key) { + upper - 1 + } else { + upper + } + } else { + upper + } + } + }; + if self.cursor >= self.page.son.len() { + self.iter = None; + return; + } + let mut iter = self.page.son[self.cursor].new_iterator(); + iter.seek(key); + while !iter.valid() && self.cursor < self.page.son.len() { + self.cursor += 1; + iter = self.page.son[self.cursor].new_iterator(); + iter.seek(key); + } + if iter.valid() { + self.iter = Some(iter); + } else { + self.iter = None; + } + } + + fn seek_for_previous(&mut self, key: &Bytes) { + if self.page.son.is_empty() { + self.cursor = 0; + return; + } + self.cursor = match self + .page + .son + .binary_search_by(|node| node.smallest().cmp(key)) + { + Ok(idx) => idx, + Err(upper) => { + if upper > 0 { + upper - 1 + } else { + self.iter = None; + return; + } + } + }; + let mut iter = self.page.son[self.cursor].new_iterator(); + iter.seek_for_previous(key); + while !iter.valid() && self.cursor > 0 { + self.cursor -= 1; + iter = self.page.son[self.cursor].new_iterator(); + iter.seek_for_previous(key); + } + if iter.valid() { + self.iter = Some(iter); + } else { + self.iter = None; + } + } + + fn seek_to_first(&mut self) { + self.cursor = 0; + self.iter = None; + if !self.page.son.is_empty() { + let mut iter = self.page.son[self.cursor].new_iterator(); + iter.seek_to_first(); + while !iter.valid() && self.cursor + 1 < self.page.son.len() { + self.cursor += 1; + iter = self.page.son[self.cursor].new_iterator(); + iter.seek_to_first(); + } + if iter.valid() { + self.iter = Some(iter); + } + } + } + + fn seek_to_last(&mut self) { + self.cursor = self.page.son.len(); + self.iter = None; + while self.cursor > 0 { + self.cursor -= 1; + let mut iter = self.page.son[self.cursor].new_iterator(); + iter.seek_to_last(); + if iter.valid() { + self.iter = Some(iter); + return; + } + } + } + + fn next(&mut self) { + if let Some(iter) = self.iter.as_mut() { + iter.next(); + if iter.valid() { + return; + } + } + self.iter = None; + while self.cursor + 1 < self.page.son.len() { + self.cursor += 1; + let mut iter = self.page.son[self.cursor].new_iterator(); + iter.seek_to_first(); + if iter.valid() { + self.iter = Some(iter); + break; + } + } + } + + fn prev(&mut self) { + if let Some(iter) = self.iter.as_mut() { + iter.prev(); + if iter.valid() { + return; + } + } + self.iter = None; + while self.cursor > 0 { + self.cursor -= 1; + let mut iter = self.page.son[self.cursor].new_iterator(); + iter.seek_to_last(); + if iter.valid() { + self.iter = Some(iter); + break; + } + } + } + + fn idx(&self) -> usize { + self.cursor + } + + fn valid(&self) -> bool { + self.iter.as_ref().map_or(false, |iter| iter.valid()) + } + + fn size(&self) -> usize 
{ + self.page.son.len() + } + + fn record(&self) -> Option { + if let Some(iter) = self.iter.as_ref() { + return iter.record(); + } + None + } +} + +impl Page for BTreePage +where + R: ComparableRecord, + P: Page, +{ + type Iter = BTreePageIterator; + + fn new_iterator(self: &Arc) -> Self::Iter { + BTreePageIterator:: { + page: self.clone(), + cursor: 0, + iter: None, + } + } + + fn seek(&self, key: &Bytes) -> Option { + if self.son.is_empty() { + return None; + } + match self.son.binary_search_by(|node| node.smallest().cmp(key)) { + Ok(idx) => self.son[idx].seek(key), + Err(upper) => { + if upper > 0 { + if self.son[upper - 1].largest().ge(key) { + self.son[upper - 1].seek(key) + } else if upper < self.son.len() { + self.son[upper].seek(key) + } else { + None + } + } else { + self.son[upper].seek(key) + } + } + } + } + + fn smallest(&self) -> &Bytes { + &self.smallest + } + fn largest(&self) -> &Bytes { + &self.largest + } + + fn split(&self) -> Vec> { + let split_count = (self.son.len() + self.split_page_size() - 1) / self.split_page_size(); + let split_size = self.son.len() / split_count; + let mut start_idx = 0; + let mut end_idx = split_size; + let mut nodes = vec![]; + while start_idx < self.son.len() { + let new_data = self.son[start_idx..end_idx].to_vec(); + let mut record_number = 0; + for page in &new_data { + record_number += page.record_number(); + } + let key = if start_idx == 0 { + self.smallest.clone() + } else { + self.son[start_idx].smallest().clone() + }; + nodes.push(Arc::new(BTreePage { + son: new_data, + smallest: key, + largest: self.son[end_idx - 1].largest().clone(), + max_page_size: self.max_page_size, + record_number, + _phantom: Default::default(), + })); + start_idx += split_size; + end_idx += split_size; + if end_idx > self.son.len() { + end_idx = self.son.len(); + } + } + nodes + } + + fn merge(&self, other: &Self) -> Arc { + let mut son = self.son.clone(); + for d in other.son.iter() { + son.push(d.clone()); + } + Arc::new(Self { + son, + smallest: self.smallest.clone(), + largest: other.largest.clone(), + record_number: self.record_number + other.record_number, + max_page_size: self.max_page_size, + _phantom: Default::default(), + }) + } + + fn size(&self) -> usize { + self.son.len() + } + + fn record_number(&self) -> usize { + self.record_number + } + + fn insert(&mut self, records: Vec) { + if records.is_empty() { + return; + } + let key = records.first().unwrap().smallest(); + let mut idx = match self.son.binary_search_by(|node| node.smallest().cmp(key)) { + Ok(idx) => idx, + Err(upper) => upper - 1, + }; + let mut cur_page = self.son[idx].as_ref().clone(); + let mut cur_records = Vec::with_capacity(records.len()); + let mut processed_count = records.len(); + for r in records { + if idx + 1 < self.son.len() && r.smallest().ge(self.son[idx + 1].smallest()) { + if !cur_records.is_empty() { + self.record_number -= cur_page.record_number(); + cur_page.insert(cur_records); + self.record_number += cur_page.record_number(); + cur_records = Vec::with_capacity(processed_count); + self.son[idx] = Arc::new(cur_page); + while idx + 1 < self.son.len() && r.smallest().ge(self.son[idx + 1].smallest()) + { + idx += 1; + } + cur_page = self.son[idx].as_ref().clone(); + } + } + cur_records.push(r); + processed_count -= 1; + } + if !cur_records.is_empty() { + self.record_number -= cur_page.record_number(); + cur_page.insert(cur_records); + self.record_number += cur_page.record_number(); + self.son[idx] = Arc::new(cur_page); + } + let mut idx = 0; + let mut unsorted = false; 
+ let size = self.son.len(); + while idx < size { + if self.son[idx].size() > self.son[idx].max_page_size() { + let mut new_pages = self.son[idx].split(); + assert!(new_pages.len() > 1); + self.son.append(&mut new_pages); + let p = self.son.pop().unwrap(); + self.son[idx] = p; + unsorted = true; + } + idx += 1; + } + if unsorted { + self.son.sort_by(|a, b| a.smallest().cmp(b.smallest())); + if self.son.first().unwrap().smallest().cmp(self.smallest()) == std::cmp::Ordering::Less + { + self.smallest = self.son.first().unwrap().smallest().clone(); + } + self.largest = self.son.last().unwrap().largest().clone(); + } + } + + fn delete(&mut self, records: Vec) { + if records.is_empty() { + return; + } + let key = records.first().unwrap().smallest(); + let mut idx = match self.son.binary_search_by(|node| node.smallest().cmp(key)) { + Ok(idx) => idx, + Err(upper) => upper - 1, + }; + let mut cur_page = self.son[idx].as_ref().clone(); + let mut cur_records = Vec::with_capacity(records.len()); + let mut processed_count = records.len(); + for r in records { + if idx + 1 < self.son.len() && r.smallest().ge(self.son[idx + 1].smallest()) { + if !cur_records.is_empty() { + self.record_number -= cur_page.record_number(); + cur_page.delete(cur_records); + self.record_number += cur_page.record_number(); + cur_records = Vec::with_capacity(processed_count); + self.son[idx] = Arc::new(cur_page); + while idx + 1 < self.son.len() && r.smallest().ge(self.son[idx + 1].smallest()) + { + idx += 1; + } + cur_page = self.son[idx].as_ref().clone(); + } + } + cur_records.push(r); + processed_count -= 1; + } + if !cur_records.is_empty() { + self.record_number -= cur_page.record_number(); + cur_page.delete(cur_records); + self.record_number += cur_page.record_number(); + self.son[idx] = Arc::new(cur_page); + } + let mut new_idx = 1; + let mut cur_idx = 1; + let size = self.son.len(); + while cur_idx < size { + if self.son[new_idx - 1].size() + self.son[cur_idx].size() + < self.son[cur_idx].min_merge_size() + || self.son[new_idx - 1].record_number() == 0 + || self.son[cur_idx].record_number() == 0 + { + self.son[new_idx - 1] = self.son[new_idx - 1].merge(self.son[cur_idx].as_ref()); + cur_idx += 1; + } else { + self.son[new_idx] = self.son[cur_idx].clone(); + new_idx += 1; + cur_idx += 1; + } + } + if new_idx < self.son.len() { + self.son.truncate(new_idx); + } + self.largest = self.son.last().unwrap().largest().clone(); + } + + fn max_page_size(&self) -> usize { + self.max_page_size + } +} + +#[derive(Clone)] +pub struct BTree { + node: Arc>>>, +} + +impl BTree { + pub fn new(max_page_size: usize, leaf_max_page_size: usize) -> Self { + Self { + node: Arc::new(BTreePage::>> { + son: vec![Arc::new(BTreePage::> { + son: vec![Arc::new(LeafPage:: { + data: vec![], + smallest: Bytes::new(), + largest: Bytes::new(), + max_page_size: leaf_max_page_size, + })], + smallest: Bytes::new(), + largest: Bytes::new(), + record_number: 0, + max_page_size, + _phantom: Default::default(), + })], + largest: Bytes::new(), + smallest: Bytes::new(), + max_page_size: 32, + record_number: 0, + _phantom: Default::default(), + }), + } + } + + pub fn size(&self) -> usize { + self.node.record_number() + } + + pub fn get(&self, key: &Bytes) -> Option { + self.node.seek(key) + } + + pub fn replace(&self, mut to_del: Vec, mut to_add: Vec) -> Self { + let mut node = self.node.as_ref().clone(); + if !to_del.is_empty() { + to_del.sort_by(|a, b| a.smallest().cmp(b.smallest())); + node.delete(to_del); + } + if !to_add.is_empty() { + to_add.sort_by(|a, b| 
a.smallest().cmp(b.smallest())); + node.insert(to_add); + } + BTree { + node: Arc::new(node), + } + } + + pub fn new_iterator(&self) -> BTreePageIterator>> { + self.node.new_iterator() + } +} + +pub type BTreeIterator = BTreePageIterator>>; + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Clone)] + struct FakeTable { + id: u64, + smallest: Bytes, + largest: Bytes, + } + + impl ComparableRecord for FakeTable { + fn smallest(&self) -> &Bytes { + &self.smallest + } + + fn largest(&self) -> &Bytes { + &self.largest + } + + fn id(&self) -> u64 { + self.id + } + } + + fn update_page>( + page: &mut P, + left: u64, + right: u64, + gap: u64, + is_insert: bool, + ) { + let mut ops = vec![]; + for i in left..right { + let smallest = i * gap; + let largest = (i + 1) * gap - 1; + ops.push(FakeTable { + id: i, + smallest: Bytes::from(smallest.to_string()), + largest: Bytes::from(largest.to_string()), + }); + } + if is_insert { + page.insert(ops); + } else { + page.delete(ops); + } + } + + #[test] + fn test_leaf_page() { + let mut page = LeafPage { + data: vec![], + smallest: Default::default(), + largest: Default::default(), + max_page_size: 120, + }; + update_page(&mut page, 200, 300, 100, true); + let p = page.seek(&Bytes::from("0".to_string())); + assert_eq!(p.unwrap().id, 200); + assert_eq!(page.record_number(), 100); + assert_eq!(page.size(), 100); + update_page(&mut page, 100, 200, 100, true); + let p = page.seek(&Bytes::from("0".to_string())); + assert_eq!(p.unwrap().id, 100); + let p = page.seek(&Bytes::from("10099".to_string())); + assert_eq!(p.unwrap().id, 100); + let p = page.seek(&Bytes::from("29999".to_string())); + assert_eq!(p.unwrap().id, 299); + let p = page.seek(&Bytes::from("30000".to_string())); + assert!(p.is_none()); + + assert_eq!(page.record_number(), 200); + assert_eq!(page.size(), 200); + let pages = page.split(); + assert_eq!(pages.len(), 4); + assert_eq!(pages[0].size(), 50); + assert_eq!(pages[1].size(), 50); + assert_eq!(pages[2].size(), 50); + assert_eq!(pages[3].size(), 50); + let mut page2 = pages[2].as_ref().clone(); + let mut page3 = pages[3].as_ref().clone(); + update_page(&mut page2, 215, 250, 100, false); + update_page(&mut page3, 250, 290, 100, false); + let page = page2.merge(&page3); + assert_eq!(page.size(), 25); + let mut it = page.new_iterator(); + it.seek(&Bytes::from("250".to_string())); + assert_eq!(it.record().unwrap().id, 290); + let mut it = page.new_iterator(); + it.seek_for_previous(&Bytes::from("250".to_string())); + assert_eq!(it.record().unwrap().id, 214); + } + + fn insert_to_tree(tree: BTree, left: u64, right: u64, gap: u64) -> BTree { + let mut ops = vec![]; + for i in left..right { + let smallest = i * gap; + let largest = (i + 1) * gap - 1; + ops.push(FakeTable { + id: i, + smallest: Bytes::from(smallest.to_string()), + largest: Bytes::from(largest.to_string()), + }); + } + tree.replace(vec![], ops) + } + + fn delete_from_tree( + tree: BTree, + left: u64, + right: u64, + gap: u64, + ) -> BTree { + let mut ops = vec![]; + for i in left..right { + let smallest = i * gap; + let largest = (i + 1) * gap - 1; + ops.push(FakeTable { + id: i, + smallest: Bytes::from(smallest.to_string()), + largest: Bytes::from(largest.to_string()), + }); + } + tree.replace(ops, vec![]) + } + + #[test] + fn test_leveltree() { + let tree = BTree::::new(32, 64); + let tree = insert_to_tree(tree, 100, 228, 100); + let t = tree.get(&Bytes::from("20000")); + assert!(t.is_some()); + assert_eq!(t.unwrap().id, 200); + let t = tree.get(&Bytes::from("20099")); + 
        assert!(t.is_some());
+        assert_eq!(t.unwrap().id, 200);
+
+        let tree = insert_to_tree(tree, 228, 100 + 640, 100);
+        assert_eq!(tree.node.son[0].record_number(), 640);
+        assert_eq!(tree.node.son[0].size(), 20);
+        let t = tree.get(&Bytes::from("69999"));
+        assert!(t.is_some());
+        assert_eq!(t.unwrap().id, 699);
+
+        let mut tree = delete_from_tree(tree, 100, 400, 100);
+        let t = tree.get(&Bytes::from("20000"));
+        assert!(t.is_some());
+        assert_eq!(tree.node.son[0].size(), 11);
+        // 640 - 300
+        assert_eq!(tree.node.son[0].record_number(), 340);
+        assert_eq!(t.unwrap().id, 400);
+
+        let mut start = 1000;
+        while start < 3000 {
+            tree = insert_to_tree(tree, start, start + 400, 10);
+            start += 400;
+        }
+
+        // 640 + 2000 - 300
+        assert_eq!(tree.node.size(), 7);
+        assert_eq!(tree.node.record_number(), 2340);
+        assert_eq!(tree.node.son[0].size(), 12);
+        let t = tree.get(&Bytes::from("20000"));
+        assert!(t.is_some());
+    }
+}
diff --git a/src/util.rs b/src/util/mod.rs
similarity index 56%
rename from src/util.rs
rename to src/util/mod.rs
index 017a49c6..8117b090 100644
--- a/src/util.rs
+++ b/src/util/mod.rs
@@ -1,4 +1,5 @@
-use crate::format::user_key;
+mod btree;
+pub use btree::{BTree, BTreeIterator, BTreePageIterator, ComparableRecord, Page, PageIterator};
 use bytes::Bytes;
 
 pub use skiplist::FixedLengthSuffixComparator as Comparator;
@@ -11,6 +12,7 @@
 use std::time::{SystemTime, UNIX_EPOCH};
 use std::{cmp, ptr};
 use std::{collections::hash_map::DefaultHasher, sync::atomic::AtomicBool};
+use crate::format::user_key;
 use crate::Result;
 
 pub static COMPARATOR: FixedLengthSuffixComparator = make_comparator();
@@ -53,7 +55,8 @@ pub fn bytes_diff<'a, 'b>(base: &'a [u8], target: &'b [u8]) -> &'b [u8] {
     }
 }
 
-/// simple rewrite of golang sort.Search
+/// A simple rewrite of Go's sort.Search. It returns the first index for which `f` returns
+/// true; if no such index exists, it returns the size of the array (`n`).
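+/// For example, with a slice `a` sorted ascending, `search(a.len(), |i| a[i] >= target)`
+/// returns the index of the first element that is `>= target`, or `a.len()` if every
+/// element is smaller than `target`.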
 pub fn search<F>(n: usize, mut f: F) -> usize
 where
     F: FnMut(usize) -> bool,
@@ -115,3 +118,85 @@ pub fn panic_if_fail() {
         panic!("failed");
     }
 }
+
+// TODO: use enum for this struct
+#[derive(PartialEq, Eq, Clone, Debug)]
+pub struct KeyRange {
+    pub left: Bytes,
+    pub right: Bytes,
+    pub inf: bool,
+}
+
+impl KeyRange {
+    pub fn is_empty(&self) -> bool {
+        if self.inf {
+            false
+        } else {
+            self.left.is_empty() && self.right.is_empty()
+        }
+    }
+
+    pub fn inf() -> Self {
+        Self {
+            left: Bytes::new(),
+            right: Bytes::new(),
+            inf: true,
+        }
+    }
+
+    pub fn new(left: Bytes, right: Bytes) -> Self {
+        Self {
+            left,
+            right,
+            inf: false,
+        }
+    }
+
+    pub fn extend(&mut self, range: &Self) {
+        if self.is_empty() {
+            *self = range.clone();
+            return;
+        }
+        if range.left.len() == 0
+            || COMPARATOR.compare_key(&range.left, &self.left) == std::cmp::Ordering::Less
+        {
+            self.left = range.left.clone();
+        }
+        if range.right.len() == 0
+            || COMPARATOR.compare_key(&range.right, &self.right) == std::cmp::Ordering::Greater
+        {
+            self.right = range.right.clone();
+        }
+        if range.inf {
+            self.inf = true;
+            self.left = Bytes::new();
+            self.right = Bytes::new();
+        }
+    }
+
+    pub fn overlaps_with(&self, dst: &Self) -> bool {
+        if self.is_empty() {
+            return true;
+        }
+        if self.inf || dst.inf {
+            return true;
+        }
+        if COMPARATOR.compare_key(&self.left, &dst.right) == std::cmp::Ordering::Greater {
+            return false;
+        }
+        if COMPARATOR.compare_key(&self.right, &dst.left) == std::cmp::Ordering::Less {
+            return false;
+        }
+        true
+    }
+}
+
+impl Default for KeyRange {
+    fn default() -> Self {
+        Self {
+            left: Bytes::new(),
+            right: Bytes::new(),
+            inf: false,
+        }
+    }
+}
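A minimal sketch of how the relocated KeyRange is meant to be used (illustrative only; it
assumes keys carry the fixed-length timestamp suffix that COMPARATOR expects, built here with
a hypothetical ts_key helper rather than the crate's own key-formatting functions):

    use bytes::Bytes;

    // Hypothetical helper for this sketch: append an 8-byte suffix so the key matches the
    // fixed-length-suffix layout that COMPARATOR assumes.
    fn ts_key(user_key: &str, ts: u64) -> Bytes {
        let mut buf = Vec::with_capacity(user_key.len() + 8);
        buf.extend_from_slice(user_key.as_bytes());
        buf.extend_from_slice(&ts.to_be_bytes());
        Bytes::from(buf)
    }

    // [a, f] and [d, k] share keys, so they overlap.
    let a = KeyRange::new(ts_key("a", 0), ts_key("f", 0));
    let b = KeyRange::new(ts_key("d", 0), ts_key("k", 0));
    assert!(a.overlaps_with(&b));

    // extend widens a range in place; extending [a, f] by [d, k] yields [a, k].
    let mut merged = a.clone();
    merged.extend(&b);
    assert!(merged.overlaps_with(&KeyRange::new(ts_key("j", 0), ts_key("j", 0))));

    // The infinite range overlaps with everything.
    assert!(KeyRange::inf().overlaps_with(&merged));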