diff --git a/Cargo.toml b/Cargo.toml index 0d4d3f5f..15f24904 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,7 +19,7 @@ proto = { path = "proto" } skiplist = { path = "skiplist" } memmap = "0.7" farmhash = "1.1" -prost = "0.7" +prost = "0.8" enum_dispatch = "0.3" crossbeam-channel = "0.5" crc32fast = "1.2" diff --git a/proto/Cargo.toml b/proto/Cargo.toml index ccfee994..1f55de7e 100644 --- a/proto/Cargo.toml +++ b/proto/Cargo.toml @@ -6,7 +6,7 @@ edition = "2018" [dependencies] bytes = "1.0" -prost = "0.7" +prost = "0.8" [build-dependencies] -prost-build = { version = "0.6" } +prost-build = { version = "0.8" } diff --git a/src/db.rs b/src/db.rs index 7e3ec8c7..1338747e 100644 --- a/src/db.rs +++ b/src/db.rs @@ -626,14 +626,11 @@ impl Core { vlog.write(&mut requests)?; } - let mut cnt = 0; - // writing to LSM for req in requests { if req.entries.is_empty() { continue; } - cnt += req.entries.len(); while let Err(_) = self.ensure_room_for_write() { std::thread::sleep(std::time::Duration::from_millis(10)); @@ -643,8 +640,6 @@ impl Core { self.write_to_lsm(req)?; } - // eprintln!("{} entries written", cnt); - Ok(()) }; diff --git a/src/levels.rs b/src/levels.rs index 250e73fd..568c4cb4 100644 --- a/src/levels.rs +++ b/src/levels.rs @@ -4,10 +4,11 @@ mod handler; #[cfg(test)] pub(crate) mod tests; +use crate::table::VecTableAccessor; use compaction::{ - get_key_range, get_key_range_single, CompactDef, CompactStatus, CompactionPriority, KeyRange, - LevelCompactStatus, Targets, + get_key_range, CompactDef, CompactStatus, CompactionPriority, LevelCompactStatus, Targets, }; +use handler::HandlerBaseLevel; use handler::LevelHandler; use proto::meta::ManifestChangeSet; @@ -16,7 +17,7 @@ use crate::manifest::{new_create_change, new_delete_change, ManifestFile}; use crate::ops::oracle::Oracle; use crate::opt::build_table_options; use crate::table::{MergeIterator, TableIterators}; -use crate::util::{has_any_prefixes, same_key, KeyComparator, COMPARATOR}; +use 
crate::util::{has_any_prefixes, same_key, KeyComparator, KeyRange, COMPARATOR}; use crate::value::{Value, ValuePointer}; use crate::AgateIterator; use crate::TableBuilder; @@ -33,6 +34,7 @@ use std::sync::atomic::AtomicU64; use std::sync::{Arc, RwLock}; use std::time::Duration; +use crate::levels::handler::HandlerLevel0; use bytes::{BufMut, Bytes, BytesMut}; use crossbeam_channel::{select, tick, unbounded}; use yatp::task::callback::Handle; @@ -40,7 +42,7 @@ use yatp::task::callback::Handle; pub(crate) struct Core { next_file_id: AtomicU64, // `levels[i].level == i` should be ensured - pub(crate) levels: Vec>>, + pub(crate) levels: Vec>>>, opts: AgateOptions, // TODO: agate oracle, manifest should be added here cpt_status: RwLock, @@ -85,8 +87,15 @@ impl Core { eprintln!("{} tables opened", num_opened); for (i, tables) in tables.into_iter().enumerate() { - let mut level = LevelHandler::new(opts.clone(), i); - level.init_tables(tables); + let level: Box = if i == 0 { + Box::new(HandlerLevel0::new(tables, opts.clone(), i)) + } else { + Box::new(HandlerBaseLevel::::new( + tables, + opts.clone(), + i, + )) + }; levels.push(Arc::new(RwLock::new(level))); cpt_status_levels.push(LevelCompactStatus::default()); @@ -122,7 +131,10 @@ impl Core { let start = std::time::Instant::now(); let mut last_log = std::time::Instant::now(); - while !self.levels[0].write()?.try_add_l0_table(table.clone()) { + while !self.levels[0] + .write()? 
+ .replace_tables(&[], &[table.clone()]) + { let current = std::time::Instant::now(); let duration = current.duration_since(start); if duration.as_millis() > 1000 { @@ -184,30 +196,17 @@ impl Core { )); } - if this_level.tables.is_empty() { + if this_level.num_tables() == 0 { return Err(Error::CustomError("not table in this level".to_string())); } if compact_def.drop_prefixes.is_empty() { - let mut out = vec![]; - let mut kr = KeyRange::default(); - for table in this_level.tables.iter() { - let dkr = get_key_range_single(table); - if kr.overlaps_with(&dkr) { - out.push(table.clone()); - kr.extend(&dkr); - } else { - break; - } - } - compact_def.top = out; + compact_def.top = this_level.overlapping_tables(&KeyRange::default()); } compact_def.this_range = get_key_range(&compact_def.top); - let (left, right) = next_level.overlapping_tables(&compact_def.this_range); - - compact_def.bot = next_level.tables[left..right].to_vec(); + compact_def.bot = next_level.overlapping_tables(&compact_def.this_range); if compact_def.bot.is_empty() { compact_def.next_range = compact_def.this_range.clone(); @@ -240,20 +239,8 @@ impl Core { // TODO: don't hold cpt_status through this function let mut cpt_status = self.cpt_status.write().unwrap(); - let mut out = vec![]; - // let now = std::time::Instant::now(); - - for table in this_level.tables.iter() { - if table.size() > 2 * compact_def.targets.file_size[0] { - // file already big, don't include it - continue; - } - // TODO: created at logic - if cpt_status.tables.contains(&table.id()) { - continue; - } - out.push(table.clone()); - } + let out = + this_level.pick_all_tables(2 * compact_def.targets.file_size[0], &cpt_status.tables); if out.len() < 4 { return Err(Error::CustomError("not enough table to merge".to_string())); @@ -262,12 +249,16 @@ impl Core { compact_def.this_range = KeyRange::inf(); compact_def.top = out; - cpt_status.levels[this_level.level] + cpt_status.levels[this_level.level()] .ranges .push(KeyRange::inf()); for 
table in compact_def.top.iter() { - assert!(cpt_status.tables.insert(table.id()), false); + assert!( + cpt_status.tables.insert(table.id()), + "insert to compaction table must success, but get {}", + false + ); } compact_def.targets.file_size[0] = std::u32::MAX as u64; @@ -283,12 +274,12 @@ impl Core { } fn fill_tables(&self, compact_def: &mut CompactDef) -> Result<()> { - let this_level = compact_def.this_level.read().unwrap(); - let next_level = compact_def.next_level.read().unwrap(); - - let tables = &this_level.tables; + let this_level = compact_def.this_level.clone(); + let next_level = compact_def.next_level.clone(); + let this_guard = this_level.read().unwrap(); + let next_guard = next_level.read().unwrap(); - if tables.is_empty() { + if this_guard.num_tables() == 0 { return Err(Error::CustomError("no tables to compact".to_string())); } @@ -296,49 +287,7 @@ impl Core { // TODO: don't hold cpt_status write lock for long time let mut cpt_status = self.cpt_status.write().unwrap(); - - for table in tables { - compact_def.this_size = table.size(); - compact_def.this_range = get_key_range_single(table); - // if we're already compacting this range, don't do anything - if cpt_status.overlaps_with(compact_def.this_level_id, &compact_def.this_range) { - continue; - } - compact_def.top = vec![table.clone()]; - let (left, right) = next_level.overlapping_tables(&compact_def.this_range); - - if right < left { - eprintln!("right {} is less than left {} in overlapping_tables for current level {}, next level {}, key_range {:?}", - right, left, compact_def.this_level_id, - compact_def.next_level_id, compact_def.this_range); - continue; - } - - compact_def.bot = next_level.tables[left..right].to_vec(); - - if compact_def.bot.is_empty() { - compact_def.bot = vec![]; - compact_def.next_range = compact_def.this_range.clone(); - if let Err(_) = cpt_status.compare_and_add(compact_def) { - continue; - } - return Ok(()); - } - - compact_def.next_range = 
get_key_range(&compact_def.bot); - - if cpt_status.overlaps_with(compact_def.next_level_id, &compact_def.next_range) { - continue; - } - - if let Err(_) = cpt_status.compare_and_add(compact_def) { - continue; - } - - return Ok(()); - } - - Err(Error::CustomError("no table to fill".to_string())) + this_guard.select_table_range(next_guard.as_ref(), compact_def, &mut *cpt_status) } fn run_compact_def( @@ -377,11 +326,11 @@ impl Core { let mut this_level = this_level.write().unwrap(); let mut next_level = next_level.write().unwrap(); this_level.delete_tables(&compact_def.top)?; - next_level.replace_tables(&compact_def.bot, &new_tables)?; + next_level.replace_tables(&compact_def.bot, &new_tables); } else { let mut this_level = this_level.write().unwrap(); this_level.delete_tables(&compact_def.top)?; - this_level.replace_tables(&compact_def.bot, &new_tables)?; + this_level.replace_tables(&compact_def.bot, &new_tables); } // TODO: logging @@ -463,7 +412,7 @@ impl Core { } // TODO: sync dir - new_tables.sort_by(|x, y| COMPARATOR.compare_key(x.biggest(), y.biggest())); + new_tables.sort_by(|x, y| COMPARATOR.compare_key(x.largest(), y.largest())); Ok(new_tables) } @@ -648,7 +597,7 @@ impl Core { } if i % N == N - 1 { - let biggest = table.biggest(); + let biggest = table.largest(); let mut buf = BytesMut::with_capacity(biggest.len() + 8); buf.put(user_key(&biggest)); let right = key_with_ts(buf, std::u64::MAX); @@ -680,7 +629,7 @@ impl Core { let next_level = self.levels[base_level].clone(); compact_def = CompactDef::new( idx, - self.levels[level].clone(), + self.levels[0].clone(), level, next_level, base_level, @@ -733,7 +682,7 @@ impl Core { file_size: vec![0; self.levels.len()], }; - let mut db_size = self.last_level().read().unwrap().total_size; + let mut db_size = self.last_level().read().unwrap().total_size(); for i in (1..self.levels.len()).rev() { let ltarget = adjust(db_size); @@ -760,7 +709,7 @@ impl Core { targets } - fn last_level(&self) -> &Arc> { + fn 
last_level(&self) -> &Arc>> { self.levels.last().unwrap() } @@ -796,8 +745,8 @@ impl Core { // We may safely ignore this situation. // TODO: check if we could make it more stable let size; - if del_size <= level.total_size { - size = level.total_size - del_size; + if del_size <= level.total_size() { + size = level.total_size() - del_size; } else { size = 0; } @@ -822,9 +771,9 @@ impl Core { continue; } let lvl = lh.read().unwrap(); - let (left, right) = lvl.overlapping_tables(&kr); + let ret = lvl.overlapping_tables(&kr); drop(lvl); - if right - left > 0 { + if !ret.is_empty() { return true; } } diff --git a/src/levels/compaction.rs b/src/levels/compaction.rs index 7c20b538..43e573ba 100644 --- a/src/levels/compaction.rs +++ b/src/levels/compaction.rs @@ -4,93 +4,11 @@ use std::sync::{Arc, RwLock}; use bytes::{Bytes, BytesMut}; -use super::LevelHandler; use crate::format::{key_with_ts, user_key}; -use crate::util::{KeyComparator, COMPARATOR}; +use crate::levels::handler::LevelHandler; +use crate::util::{KeyComparator, KeyRange, COMPARATOR}; use crate::{Error, Result, Table}; -// TODO: use enum for this struct -#[derive(PartialEq, Eq, Clone, Debug)] -pub struct KeyRange { - pub left: Bytes, - pub right: Bytes, - pub inf: bool, -} - -impl KeyRange { - pub fn is_empty(&self) -> bool { - if self.inf { - false - } else { - self.left.is_empty() && self.right.is_empty() - } - } - - pub fn inf() -> Self { - Self { - left: Bytes::new(), - right: Bytes::new(), - inf: true, - } - } - - pub fn new(left: Bytes, right: Bytes) -> Self { - Self { - left, - right, - inf: false, - } - } - - pub fn extend(&mut self, range: &Self) { - if self.is_empty() { - *self = range.clone(); - return; - } - if range.left.len() == 0 - || COMPARATOR.compare_key(&range.left, &self.left) == std::cmp::Ordering::Less - { - self.left = range.left.clone(); - } - if range.right.len() == 0 - || COMPARATOR.compare_key(&range.left, &self.left) == std::cmp::Ordering::Greater - { - self.right = 
range.right.clone(); - } - if range.inf { - self.inf = true; - self.left = Bytes::new(); - self.right = Bytes::new(); - } - } - - pub fn overlaps_with(&self, dst: &Self) -> bool { - if self.is_empty() { - return true; - } - if self.inf || dst.inf { - return true; - } - if COMPARATOR.compare_key(&self.left, &dst.right) == std::cmp::Ordering::Greater { - return false; - } - if COMPARATOR.compare_key(&self.right, &dst.left) == std::cmp::Ordering::Less { - return false; - } - true - } -} - -impl Default for KeyRange { - fn default() -> Self { - Self { - left: Bytes::new(), - right: Bytes::new(), - inf: false, - } - } -} - #[derive(Default)] pub struct LevelCompactStatus { pub ranges: Vec, @@ -124,8 +42,8 @@ pub struct CompactStatus { #[derive(Clone)] pub struct CompactDef { pub compactor_id: usize, - pub this_level: Arc>, - pub next_level: Arc>, + pub this_level: Arc>>, + pub next_level: Arc>>, pub this_level_id: usize, pub next_level_id: usize, pub targets: Targets, @@ -146,9 +64,9 @@ pub struct CompactDef { impl CompactDef { pub fn new( compactor_id: usize, - this_level: Arc>, + this_level: Arc>>, this_level_id: usize, - next_level: Arc>, + next_level: Arc>>, next_level_id: usize, prios: CompactionPriority, targets: Targets, @@ -285,14 +203,14 @@ pub fn get_key_range(tables: &[Table]) -> KeyRange { } let mut smallest = tables[0].smallest().clone(); - let mut biggest = tables[0].biggest().clone(); + let mut biggest = tables[0].largest().clone(); for i in 1..tables.len() { if COMPARATOR.compare_key(tables[i].smallest(), &smallest) == std::cmp::Ordering::Less { smallest = tables[i].smallest().clone(); } - if COMPARATOR.compare_key(tables[i].biggest(), &biggest) == std::cmp::Ordering::Greater { - biggest = tables[i].biggest().clone(); + if COMPARATOR.compare_key(tables[i].largest(), &biggest) == std::cmp::Ordering::Greater { + biggest = tables[i].largest().clone(); } } let mut smallest_buf = BytesMut::with_capacity(smallest.len() + 8); @@ -304,17 +222,3 @@ pub fn 
get_key_range(tables: &[Table]) -> KeyRange { key_with_ts(biggest_buf, 0), ); } - -pub fn get_key_range_single(table: &Table) -> KeyRange { - let smallest = table.smallest().clone(); - let biggest = table.biggest().clone(); - - let mut smallest_buf = BytesMut::with_capacity(smallest.len() + 8); - let mut biggest_buf = BytesMut::with_capacity(biggest.len() + 8); - smallest_buf.extend_from_slice(user_key(&smallest)); - biggest_buf.extend_from_slice(user_key(&biggest)); - return KeyRange::new( - key_with_ts(smallest_buf, std::u64::MAX), - key_with_ts(biggest_buf, 0), - ); -} diff --git a/src/levels/handler.rs b/src/levels/handler.rs index 2fafb3f4..d42339f9 100644 --- a/src/levels/handler.rs +++ b/src/levels/handler.rs @@ -1,5 +1,8 @@ +use std::collections::HashSet; +use std::sync::Arc; + +use crate::table::{get_key_range_single, TableAccessor, TableAccessorIterator}; use crate::value::Value; -use crate::Result; use crate::{format::user_key, get_ts, iterator_trait::AgateIterator}; use crate::{iterator::IteratorOptions, table::TableIterators}; use crate::{ @@ -7,84 +10,209 @@ use crate::{ util::{KeyComparator, COMPARATOR}, }; use crate::{AgateOptions, Table}; +use crate::{Error, Result}; use super::KeyRange; +use super::compaction::{get_key_range, CompactDef, CompactStatus}; +use crate::table::merge_iterator::Iterators; use bytes::Bytes; -use std::collections::HashSet; +pub trait LevelHandler: Send + Sync { + fn level(&self) -> usize; + fn num_tables(&self) -> usize; + fn total_size(&self) -> u64; + + fn get(&self, key: &Bytes) -> Result; + + fn append_iterators(&self, iters: &mut Vec, opts: &IteratorOptions); + fn delete_tables(&mut self, to_del: &[Table]) -> Result<()>; + fn overlapping_tables(&self, kr: &KeyRange) -> Vec; + fn replace_tables(&mut self, to_del: &[Table], to_add: &[Table]) -> bool; + fn select_table_range( + &self, + other: &dyn LevelHandler, + compact_def: &mut CompactDef, + status: &mut CompactStatus, + ) -> Result<()>; + + fn pick_all_tables(&self, 
max_file_size: u64, exists: &HashSet) -> Vec
; +} -pub struct LevelHandler { +pub struct HandlerBaseLevel { opts: AgateOptions, - pub level: usize, - pub tables: Vec
, - pub total_size: u64, + level: usize, + table_acessor: Arc, } -impl Drop for LevelHandler { +impl Drop for HandlerBaseLevel { fn drop(&mut self) { - for table in self.tables.drain(..) { - // TODO: simply forget table instance would cause memory leak. Should find - // a better way to handle this. For example, `table.close_and_save()`, which - // consumes table instance without deleting the files. - table.mark_save(); + let mut iter = T::new_iterator(self.table_acessor.clone()); + iter.seek_first(); + while iter.valid() { + iter.table().unwrap().mark_save(); + iter.next(); } } } -impl LevelHandler { - pub fn new(opts: AgateOptions, level: usize) -> Self { +impl HandlerBaseLevel { + pub fn new(tables: Vec
, opts: AgateOptions, level: usize) -> Self { + let table_acessor = T::create(tables); Self { opts, level, - tables: vec![], - total_size: 0, + table_acessor, } } +} - pub fn try_add_l0_table(&mut self, table: Table) -> bool { - assert_eq!(self.level, 0); - if self.tables.len() >= self.opts.num_level_zero_tables_stall { - return false; +impl LevelHandler for HandlerBaseLevel { + fn level(&self) -> usize { + self.level + } + + fn num_tables(&self) -> usize { + self.table_acessor.len() + } + + fn total_size(&self) -> u64 { + self.table_acessor.total_size() + } + + fn get(&self, key: &Bytes) -> Result { + let key_no_ts = user_key(key); + let hash = farmhash::fingerprint32(key_no_ts); + let mut max_vs = Value::default(); + + if let Some(table) = self.table_acessor.get(key) { + if !table.does_not_have(hash) { + let mut it = table.new_iterator(0); + it.seek(key); + if it.valid() { + if crate::util::same_key(key, it.key()) { + let version = get_ts(it.key()); + max_vs = it.value(); + max_vs.version = version; + } + } + } } + Ok(max_vs) + } - self.total_size += table.size(); - self.tables.push(table); + fn append_iterators(&self, iters: &mut Vec, _: &IteratorOptions) { + if !self.table_acessor.is_empty() { + let acessor = self.table_acessor.clone(); + let iter = ConcatIterator::from_tables(Box::new(T::new_iterator(acessor)), 0); + iters.push(TableIterators::from(iter)); + } + } - true + fn delete_tables(&mut self, to_del: &[Table]) -> Result<()> { + let acessor = self.table_acessor.delete_tables(to_del); + self.table_acessor = acessor; + Ok(()) } - pub fn num_tables(&self) -> usize { - self.tables.len() + fn overlapping_tables(&self, kr: &KeyRange) -> Vec
{ + use std::cmp::Ordering::*; + + if kr.left.is_empty() || kr.right.is_empty() { + return vec![]; + } + let mut ret = vec![]; + let mut iter = T::new_iterator(self.table_acessor.clone()); + iter.seek(&kr.left); + while let Some(table) = iter.table() { + if COMPARATOR.compare_key(&kr.right, table.smallest()) == Less { + break; + } + ret.push(table); + iter.next(); + } + ret } - pub fn get_table_for_key(&self, key: &Bytes) -> Vec
{ - if self.level == 0 { - // for level 0, we need to iterate every table. + fn replace_tables(&mut self, to_del: &[Table], to_add: &[Table]) -> bool { + let acessor = self.table_acessor.replace_tables(to_del, to_add); + self.table_acessor = acessor; + true + } - let mut out = self.tables.clone(); - out.reverse(); + fn select_table_range( + &self, + next_level: &dyn LevelHandler, + compact_def: &mut CompactDef, + status: &mut CompactStatus, + ) -> Result<()> { + let mut it = T::new_iterator(self.table_acessor.clone()); + it.seek_first(); + while let Some(table) = it.table() { + if select_table_range(&table, next_level, compact_def, status) { + return Ok(()); + } + it.next(); + } + Err(Error::CustomError("no table to fill".to_string())) + } - out - } else { - let idx = crate::util::search(self.tables.len(), |idx| { - COMPARATOR.compare_key(self.tables[idx].biggest(), key) != std::cmp::Ordering::Less - }); - if idx >= self.tables.len() { - vec![] - } else { - vec![self.tables[idx].clone()] + fn pick_all_tables(&self, max_file_size: u64, tables: &HashSet) -> Vec
{ + let mut ret = vec![]; + let mut it = T::new_iterator(self.table_acessor.clone()); + it.seek_first(); + while let Some(table) = it.table() { + if table.size() <= max_file_size && !tables.contains(&table.id()) { + ret.push(table); } + it.next(); + } + ret + } +} + +pub struct HandlerLevel0 { + tables: Vec
, + opts: AgateOptions, + level: usize, + total_size: u64, +} + +impl HandlerLevel0 { + pub fn new(mut tables: Vec
, opts: AgateOptions, level: usize) -> Self { + tables.sort_by(|x, y| x.id().cmp(&y.id())); + let mut total_size = 0; + for table in &tables { + total_size += table.size(); + } + Self { + opts, + level, + tables, + total_size, } } +} - pub fn get(&self, key: &Bytes) -> Result { - let tables = self.get_table_for_key(key); +impl LevelHandler for HandlerLevel0 { + fn level(&self) -> usize { + self.level + } + + fn num_tables(&self) -> usize { + self.tables.len() + } + + fn total_size(&self) -> u64 { + self.total_size + } + + fn get(&self, key: &Bytes) -> Result { let key_no_ts = user_key(key); let hash = farmhash::fingerprint32(key_no_ts); let mut max_vs = Value::default(); - for table in tables { + for table in self.tables.iter() { if table.does_not_have(hash) { continue; } @@ -104,33 +232,18 @@ impl LevelHandler { } } } - Ok(max_vs) } - pub fn overlapping_tables(&self, kr: &KeyRange) -> (usize, usize) { - use std::cmp::Ordering::*; - - if kr.left.is_empty() || kr.right.is_empty() { - return (0, 0); - } - let left = crate::util::search(self.tables.len(), |i| { - match COMPARATOR.compare_key(&kr.left, self.tables[i].biggest()) { - Less | Equal => true, - _ => false, - } - }); - let right = crate::util::search(self.tables.len(), |i| { - match COMPARATOR.compare_key(&kr.right, self.tables[i].smallest()) { - Less => true, - _ => false, + fn append_iterators(&self, iters: &mut Vec, opts: &IteratorOptions) { + for table in self.tables.iter().rev() { + if opts.pick_table(table) { + iters.push(TableIterators::from(table.new_iterator(0))); } - }); - (left, right) + } } - pub fn replace_tables(&mut self, to_del: &[Table], to_add: &[Table]) -> Result<()> { - // TODO: handle deletion + fn delete_tables(&mut self, to_del: &[Table]) -> Result<()> { let mut to_del_map = HashSet::new(); for table in to_del { @@ -146,72 +259,101 @@ impl LevelHandler { } self.total_size = self.total_size.saturating_sub(table.size()); } - - for table in to_add { - self.total_size += table.size(); - 
new_tables.push(table.clone()); - } - - new_tables.sort_by(|x, y| COMPARATOR.compare_key(x.smallest(), y.smallest())); - self.tables = new_tables; - Ok(()) } - pub fn delete_tables(&mut self, to_del: &[Table]) -> Result<()> { - let mut to_del_map = HashSet::new(); + fn overlapping_tables(&self, _: &KeyRange) -> Vec
{ + let mut out = vec![]; + let mut kr = KeyRange::default(); + for table in self.tables.iter() { + let dkr = get_key_range_single(table); + if kr.overlaps_with(&dkr) { + out.push(table.clone()); + kr.extend(&dkr); + } else { + break; + } + } + out + } - for table in to_del { - to_del_map.insert(table.id()); + fn replace_tables(&mut self, _: &[Table], to_add: &[Table]) -> bool { + assert_eq!(self.level, 0); + if self.tables.len() >= self.opts.num_level_zero_tables_stall { + return false; } - let mut new_tables = vec![]; + for t in to_add { + self.total_size += t.size(); + self.tables.push(t.clone()); + } + true + } - for table in &self.tables { - if !to_del_map.contains(&table.id()) { - new_tables.push(table.clone()); - continue; + fn select_table_range( + &self, + next_level: &dyn LevelHandler, + compact_def: &mut CompactDef, + status: &mut CompactStatus, + ) -> Result<()> { + for table in self.tables.iter() { + if select_table_range(&table, next_level, compact_def, status) { + return Ok(()); } - self.total_size = self.total_size.saturating_sub(table.size()); } - - self.tables = new_tables; - - Ok(()) + Err(Error::CustomError("no table to fill".to_string())) } - pub fn init_tables(&mut self, tables: Vec
) { - self.tables = tables; - self.total_size = 0; - for table in &self.tables { - self.total_size += table.size(); + fn pick_all_tables(&self, max_file_size: u64, tables: &HashSet) -> Vec
{ + let mut out = vec![]; + for table in self.tables.iter() { + if table.size() > max_file_size { + // file already big, don't include it + continue; + } + // TODO: created at logic + if tables.contains(&table.id()) { + continue; + } + out.push(table.clone()); } + out + } +} - if self.level == 0 { - self.tables.sort_by(|x, y| x.id().cmp(&y.id())); - } else { - self.tables - .sort_by(|x, y| COMPARATOR.compare_key(x.smallest(), y.smallest())); - } +fn select_table_range( + table: &Table, + next_level: &dyn LevelHandler, + compact_def: &mut CompactDef, + cpt_status: &mut CompactStatus, +) -> bool { + compact_def.this_size = table.size(); + compact_def.this_range = get_key_range_single(table); + // if we're already compacting this range, don't do anything + if cpt_status.overlaps_with(compact_def.this_level_id, &compact_def.this_range) { + return false; } + compact_def.top = vec![table.clone()]; + compact_def.bot = next_level.overlapping_tables(&compact_def.this_range); - pub(crate) fn append_iterators(&self, iters: &mut Vec, opts: &IteratorOptions) { - if self.level == 0 { - for table in self.tables.iter().rev() { - if opts.pick_table(table) { - iters.push(TableIterators::from(table.new_iterator(0))); - } - } - return; + if compact_def.bot.is_empty() { + compact_def.bot = vec![]; + compact_def.next_range = compact_def.this_range.clone(); + if let Err(_) = cpt_status.compare_and_add(compact_def) { + return false; } + return true; + } - let mut tables = self.tables.clone(); - opts.pick_tables(&mut tables); - if !tables.is_empty() { - let iter = ConcatIterator::from_tables(tables, 0); - iters.push(TableIterators::from(iter)); - } - return; + compact_def.next_range = get_key_range(&compact_def.bot); + + if cpt_status.overlaps_with(compact_def.next_level_id, &compact_def.next_range) { + return false; + } + + if let Err(_) = cpt_status.compare_and_add(compact_def) { + return false; } + return true; } diff --git a/src/levels/tests.rs b/src/levels/tests.rs index 
9a543b27..9f927b36 100644 --- a/src/levels/tests.rs +++ b/src/levels/tests.rs @@ -1,16 +1,19 @@ use super::*; use crate::{db::tests::with_agate_test, table::new_filename, Agate, TableOptions}; +#[cfg(test)] pub fn helper_dump_levels(lvctl: &LevelsController) { for level in &lvctl.core.levels { let level = level.read().unwrap(); - eprintln!("--- Level {} ---", level.level); - for table in &level.tables { + eprintln!("--- Level {} ---", level.level()); + let exists = HashSet::new(); + let tables = level.pick_all_tables(u64::MAX, &exists); + for table in tables { eprintln!( "#{} ({:?} - {:?}, {})", table.id(), table.smallest(), - table.biggest(), + table.largest(), table.size() ); } @@ -72,7 +75,7 @@ fn create_and_open(agate: &mut Agate, td: Vec, level: usize) { .add_changes(vec![new_create_change(table.id(), level, 0)]) .unwrap(); let mut lv = agate.core.lvctl.core.levels[level].write().unwrap(); - lv.tables.push(table); + lv.replace_tables(&[], &[table]); } mod overlap { @@ -89,13 +92,11 @@ mod overlap { let l0_tables = agate.core.lvctl.core.levels[0] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &HashSet::default()); let l1_tables = agate.core.lvctl.core.levels[1] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &HashSet::default()); // lv0 should overlap with lv0 tables assert!(agate.core.lvctl.core.check_overlap(&l0_tables, 0)); @@ -124,13 +125,11 @@ mod overlap { let l0_tables = agate.core.lvctl.core.levels[0] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &HashSet::default()); let l1_tables = agate.core.lvctl.core.levels[1] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &HashSet::default()); // lv0 should overlap with lv0 tables assert!(agate.core.lvctl.core.check_overlap(&l0_tables, 0)); @@ -158,8 +157,7 @@ mod overlap { let l0_tables = agate.core.lvctl.core.levels[0] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &HashSet::default()); // lv1 should not 
overlap with lv0 tables assert!(!agate.core.lvctl.core.check_overlap(&l0_tables, 1)); @@ -222,6 +220,7 @@ fn generate_test_compect_def( adjusted: 0.0, drop_prefixes: vec![], }; + let exists = HashSet::new(); let mut compact_def = CompactDef::new( 0, agate.core.lvctl.core.levels[this_level_id].clone(), @@ -234,13 +233,11 @@ fn generate_test_compect_def( compact_def.top = agate.core.lvctl.core.levels[this_level_id] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &exists); compact_def.bot = agate.core.lvctl.core.levels[next_level_id] .read() .unwrap() - .tables - .clone(); + .pick_all_tables(u64::MAX, &exists); compact_def.targets.base_level = next_level_id; compact_def } diff --git a/src/table.rs b/src/table.rs index f669d121..761bbc06 100644 --- a/src/table.rs +++ b/src/table.rs @@ -5,12 +5,14 @@ pub mod merge_iterator; use crate::bloom::Bloom; use crate::checksum; +use crate::format::{key_with_ts, user_key}; use crate::iterator_trait::AgateIterator; use crate::opt::{ChecksumVerificationMode, Options}; +use crate::util::KeyRange; use crate::Error; use crate::Result; -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; use iterator::TableRefIterator; use memmap::{Mmap, MmapOptions}; use prost::Message; @@ -25,8 +27,12 @@ pub use iterator::{ITERATOR_NOCACHE, ITERATOR_REVERSED}; pub use merge_iterator::{Iterators as TableIterators, MergeIterator}; pub type TableIterator = TableRefIterator>; +mod table_accessor; #[cfg(test)] mod tests; +mod vec_table_acessor; +pub use table_accessor::{TableAccessor, TableAccessorIterator}; +pub use vec_table_acessor::{VecTableAccessor, VecTableAccessorIterator}; /// MmapFile stores SST data. `File` refers to a file on disk, /// and `Memory` refers to data in memory. 
@@ -587,8 +593,7 @@ impl Table { pub fn id(&self) -> u64 { self.inner.id() } - - pub fn biggest(&self) -> &Bytes { + pub fn largest(&self) -> &Bytes { self.inner.biggest() } @@ -614,3 +619,17 @@ fn id_to_filename(id: u64) -> String { pub fn new_filename>(id: u64, dir: P) -> PathBuf { dir.as_ref().join(id_to_filename(id)) } + +pub fn get_key_range_single(table: &Table) -> KeyRange { + let smallest = table.smallest().clone(); + let biggest = table.largest().clone(); + + let mut smallest_buf = BytesMut::with_capacity(smallest.len() + 8); + let mut biggest_buf = BytesMut::with_capacity(biggest.len() + 8); + smallest_buf.extend_from_slice(user_key(&smallest)); + biggest_buf.extend_from_slice(user_key(&biggest)); + return KeyRange::new( + key_with_ts(smallest_buf, std::u64::MAX), + key_with_ts(biggest_buf, 0), + ); +} diff --git a/src/table/concat_iterator.rs b/src/table/concat_iterator.rs index 309e6da4..4fdd7ec0 100644 --- a/src/table/concat_iterator.rs +++ b/src/table/concat_iterator.rs @@ -1,119 +1,92 @@ use super::iterator::ITERATOR_REVERSED; -use super::{AgateIterator, Table, TableIterator}; -use crate::util::{KeyComparator, COMPARATOR}; +use super::{AgateIterator, TableIterator}; +use crate::table::table_accessor::TableAccessorIterator; use crate::value::Value; use bytes::Bytes; /// ConcatIterator iterates on SSTs with no overlap keys. pub struct ConcatIterator { - cur: Option, - iters: Vec>, - tables: Vec
, + cur_iter: Option, + accessor: Box, opt: usize, } impl ConcatIterator { /// Create `ConcatIterator` from a list of tables. Tables must have been sorted /// and have no overlap keys. - pub fn from_tables(tables: Vec
, opt: usize) -> Self { - let iters = tables.iter().map(|_| None).collect(); - + pub fn from_tables(accessor: Box, opt: usize) -> Self { ConcatIterator { - cur: None, - iters, - tables, + cur_iter: None, + accessor, opt, } } - fn set_idx(&mut self, idx: usize) { - if idx >= self.iters.len() { - self.cur = None; - return; - } - if self.iters[idx].is_none() { - self.iters[idx] = Some(self.tables[idx].new_iterator(self.opt)); - } - self.cur = Some(idx); - } - fn iter_mut(&mut self) -> &mut TableIterator { - self.iters[self.cur.unwrap()].as_mut().unwrap() + self.cur_iter.as_mut().unwrap() } fn iter_ref(&self) -> &TableIterator { - self.iters[self.cur.unwrap()].as_ref().unwrap() + self.cur_iter.as_ref().unwrap() } } impl AgateIterator for ConcatIterator { fn next(&mut self) { - let cur = self.cur.unwrap(); let cur_iter = self.iter_mut(); cur_iter.next(); if cur_iter.valid() { return; } drop(cur_iter); + self.cur_iter = None; loop { if self.opt & ITERATOR_REVERSED == 0 { - self.set_idx(cur + 1); + self.accessor.next(); } else { - if cur == 0 { - self.cur = None; - } else { - self.set_idx(cur - 1); - } + self.accessor.prev(); } - if self.cur.is_some() { - self.iter_mut().rewind(); - if self.iter_ref().valid() { - return; - } - } else { + if !self.accessor.valid() { return; } + self.cur_iter = self.accessor.table().map(|t| t.new_iterator(self.opt)); + self.iter_mut().rewind(); + if self.iter_ref().valid() { + return; + } + self.cur_iter = None; } } fn rewind(&mut self) { - if self.iters.is_empty() { - return; - } if self.opt & ITERATOR_REVERSED == 0 { - self.set_idx(0); + self.accessor.seek_first(); } else { - self.set_idx(self.iters.len() - 1); + self.accessor.seek_last(); + } + self.cur_iter = self.accessor.table().map(|t| t.new_iterator(self.opt)); + if self.cur_iter.is_none() { + return; } - self.iter_mut().rewind(); } fn seek(&mut self, key: &Bytes) { - use std::cmp::Ordering::*; - let idx; + self.cur_iter = None; if self.opt & ITERATOR_REVERSED == 0 { - idx = 
crate::util::search(self.tables.len(), |idx| { - COMPARATOR.compare_key(self.tables[idx].biggest(), key) != Less - }); - if idx >= self.tables.len() { - self.cur = None; + self.accessor.seek(key); + if !self.accessor.valid() { return; } + self.cur_iter = self.accessor.table().map(|t| t.new_iterator(self.opt)); } else { - let n = self.tables.len(); - let ridx = crate::util::search(self.tables.len(), |idx| { - COMPARATOR.compare_key(self.tables[n - 1 - idx].smallest(), key) != Greater - }); - if ridx >= self.tables.len() { - self.cur = None; + self.accessor.seek_for_previous(key); + if !self.accessor.valid() { return; } - idx = n - 1 - ridx; + self.cur_iter = self.accessor.table().map(|t| t.new_iterator(self.opt)); } - - self.set_idx(idx); self.iter_mut().seek(key); } @@ -126,7 +99,7 @@ impl AgateIterator for ConcatIterator { } fn valid(&self) -> bool { - if self.cur.is_some() { + if self.cur_iter.is_some() { self.iter_ref().valid() } else { false @@ -138,7 +111,7 @@ impl AgateIterator for ConcatIterator { mod tests { use super::*; - use crate::table::Table; + use crate::table::{Table, TableAccessor, VecTableAccessor}; use crate::{ format::{key_with_ts, user_key}, table::tests::{build_table_data, get_test_table_options}, @@ -173,7 +146,9 @@ mod tests { #[test] fn test_concat_iterator() { let (tables, cnt) = build_test_tables(); - let mut iter = ConcatIterator::from_tables(tables, 0); + let accessor = VecTableAccessor::create(tables); + let mut iter = + ConcatIterator::from_tables(Box::new(VecTableAccessor::new_iterator(accessor)), 0); iter.rewind(); diff --git a/src/table/table_accessor.rs b/src/table/table_accessor.rs new file mode 100644 index 00000000..4647011e --- /dev/null +++ b/src/table/table_accessor.rs @@ -0,0 +1,27 @@ +use super::Table; +use bytes::Bytes; +use std::sync::Arc; + +pub trait TableAccessorIterator { + fn seek(&mut self, key: &Bytes); + fn seek_for_previous(&mut self, key: &Bytes); + fn seek_first(&mut self); + fn seek_last(&mut self); + fn 
prev(&mut self); + fn next(&mut self); + fn table(&self) -> Option
; + fn valid(&self) -> bool; +} + +pub trait TableAccessor: Send + Sync { + type Iter: TableAccessorIterator; + + fn create(tables: Vec
) -> Arc; + fn get(&self, key: &Bytes) -> Option
; + fn is_empty(&self) -> bool; + fn len(&self) -> usize; + fn total_size(&self) -> u64; + fn new_iterator(acessor: Arc) -> Self::Iter; + fn replace_tables(&self, to_del: &[Table], to_add: &[Table]) -> Arc; + fn delete_tables(&self, to_del: &[Table]) -> Arc; +} diff --git a/src/table/vec_table_acessor.rs b/src/table/vec_table_acessor.rs new file mode 100644 index 00000000..969b1cef --- /dev/null +++ b/src/table/vec_table_acessor.rs @@ -0,0 +1,179 @@ +use bytes::Bytes; +use std::cmp; +use std::collections::HashSet; +use std::sync::Arc; + +use super::table_accessor::{TableAccessor, TableAccessorIterator}; +use super::Table; + +use crate::util::{KeyComparator, COMPARATOR}; + +pub struct VecTableAccessor { + tables: Vec
, + total_size: u64, +} + +pub struct VecTableAccessorIterator { + inner: Arc, + cursor: Option, +} + +impl TableAccessorIterator for VecTableAccessorIterator { + fn seek(&mut self, key: &Bytes) { + let idx = crate::util::search(self.inner.tables.len(), |idx| { + COMPARATOR.compare_key(self.inner.tables[idx].largest(), key) != cmp::Ordering::Less + }); + if idx >= self.inner.tables.len() { + self.cursor = None; + return; + } + self.cursor = Some(idx); + } + + fn seek_for_previous(&mut self, key: &Bytes) { + let n = self.inner.tables.len(); + let ridx = crate::util::search(self.inner.tables.len(), |idx| { + COMPARATOR.compare_key(self.inner.tables[n - 1 - idx].smallest(), key) + != cmp::Ordering::Greater + }); + if ridx >= self.inner.tables.len() { + self.cursor = None; + return; + } + self.cursor = Some(n - 1 - ridx); + } + + fn seek_first(&mut self) { + if self.inner.tables.len() > 0 { + self.cursor = Some(0); + } else { + self.cursor = None; + } + } + + fn seek_last(&mut self) { + if self.inner.tables.len() > 0 { + self.cursor = Some(self.inner.tables.len() - 1); + } else { + self.cursor = None; + } + } + + fn prev(&mut self) { + if let Some(cursor) = self.cursor.take() { + if cursor > 0 { + self.cursor = Some(cursor - 1); + } + } + } + + fn next(&mut self) { + if let Some(cursor) = self.cursor.take() { + if cursor + 1 < self.inner.tables.len() { + self.cursor = Some(cursor + 1); + } + } + } + + fn table(&self) -> Option
{ + self.cursor + .as_ref() + .map(|cursor| self.inner.tables[*cursor].clone()) + } + + fn valid(&self) -> bool { + self.cursor.is_some() + } +} + +impl TableAccessor for VecTableAccessor { + type Iter = VecTableAccessorIterator; + + fn create(mut tables: Vec
) -> Arc { + let mut total_size = 0; + for table in &tables { + total_size += table.size(); + } + + tables.sort_by(|x, y| COMPARATOR.compare_key(x.smallest(), y.smallest())); + Arc::new(VecTableAccessor { tables, total_size }) + } + + fn get(&self, key: &Bytes) -> Option
{ + let idx = crate::util::search(self.tables.len(), |idx| { + COMPARATOR.compare_key(self.tables[idx].largest(), key) != cmp::Ordering::Less + }); + if idx >= self.tables.len() { + None + } else { + Some(self.tables[idx].clone()) + } + } + + fn is_empty(&self) -> bool { + self.tables.is_empty() + } + + fn len(&self) -> usize { + self.tables.len() + } + + fn total_size(&self) -> u64 { + self.total_size + } + + fn new_iterator(inner: Arc) -> Self::Iter { + VecTableAccessorIterator { + inner, + cursor: None, + } + } + + fn replace_tables(&self, to_del: &[Table], to_add: &[Table]) -> Arc { + // TODO: handle deletion + let mut to_del_map = HashSet::new(); + for table in to_del { + to_del_map.insert(table.id()); + } + + let mut tables = Vec::with_capacity(self.tables.len() + to_add.len()); + let mut total_size = self.total_size; + + for table in &self.tables { + if !to_del_map.contains(&table.id()) { + tables.push(table.clone()); + continue; + } + total_size = total_size.saturating_sub(table.size()); + } + + for table in to_add { + total_size += table.size(); + tables.push(table.clone()); + } + + tables.sort_by(|x, y| COMPARATOR.compare_key(x.smallest(), y.smallest())); + Arc::new(VecTableAccessor { tables, total_size }) + } + + fn delete_tables(&self, to_del: &[Table]) -> Arc { + let mut to_del_map = HashSet::new(); + + for table in to_del { + to_del_map.insert(table.id()); + } + + let mut tables = Vec::with_capacity(self.tables.len()); + let mut total_size = self.total_size; + + for table in &self.tables { + if !to_del_map.contains(&table.id()) { + tables.push(table.clone()); + continue; + } + total_size = total_size.saturating_sub(table.size()); + } + + Arc::new(VecTableAccessor { tables, total_size }) + } +} diff --git a/src/txn/watermark.rs b/src/txn/watermark.rs index 24505535..34b740a6 100644 --- a/src/txn/watermark.rs +++ b/src/txn/watermark.rs @@ -74,11 +74,11 @@ impl Core { } if until != done_until { - assert_eq!( - self.done_until - 
.compare_and_swap(done_until, until, Ordering::SeqCst), done_until ); + let ret = self + .done_until + .compare_exchange(done_until, until, Ordering::SeqCst, Ordering::SeqCst) + .unwrap_or_else(|x| x); + assert_eq!(ret, done_until); } if until - done_until <= waiters.len() as u64 { diff --git a/src/util.rs b/src/util.rs index 017a49c6..d8f4ae40 100644 --- a/src/util.rs +++ b/src/util.rs @@ -53,7 +53,8 @@ pub fn bytes_diff<'a, 'b>(base: &'a [u8], target: &'b [u8]) -> &'b [u8] { } } -/// simple rewrite of golang sort.Search +/// simple rewrite of golang sort.Search. It will return the first index matching `f`. If this +/// does not exist, it will return the size of the array. pub fn search(n: usize, mut f: F) -> usize where F: FnMut(usize) -> bool, @@ -115,3 +116,85 @@ pub fn panic_if_fail() { panic!("failed"); } } + +// TODO: use enum for this struct +#[derive(PartialEq, Eq, Clone, Debug)] +pub struct KeyRange { + pub left: Bytes, + pub right: Bytes, + pub inf: bool, +} + +impl KeyRange { + pub fn is_empty(&self) -> bool { + if self.inf { + false + } else { + self.left.is_empty() && self.right.is_empty() + } + } + + pub fn inf() -> Self { + Self { + left: Bytes::new(), + right: Bytes::new(), + inf: true, + } + } + + pub fn new(left: Bytes, right: Bytes) -> Self { + Self { + left, + right, + inf: false, + } + } + + pub fn extend(&mut self, range: &Self) { + if self.is_empty() { + *self = range.clone(); + return; + } + if range.left.len() == 0 + || COMPARATOR.compare_key(&range.left, &self.left) == std::cmp::Ordering::Less + { + self.left = range.left.clone(); + } + if range.right.len() == 0 + || COMPARATOR.compare_key(&range.right, &self.right) == std::cmp::Ordering::Greater + { + self.right = range.right.clone(); + } + if range.inf { + self.inf = true; + self.left = Bytes::new(); + self.right = Bytes::new(); + } + } + + pub fn overlaps_with(&self, dst: &Self) -> bool { + if self.is_empty() { + return true; + } + if self.inf || dst.inf { + return true; + } + if 
COMPARATOR.compare_key(&self.left, &dst.right) == std::cmp::Ordering::Greater { + return false; + } + if COMPARATOR.compare_key(&self.right, &dst.left) == std::cmp::Ordering::Less { + return false; + } + true + } +} + +impl Default for KeyRange { + fn default() -> Self { + Self { + left: Bytes::new(), + right: Bytes::new(), + inf: false, + } + } +}