From ce8b8934b44232f4867a9d172fb7dc7cf49c6ded Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Mon, 22 Aug 2022 19:15:01 +0800 Subject: [PATCH 01/11] feat: ingest external SST files Signed-off-by: wangnengjie <751614701@qq.com> --- proto/src/proto/meta.proto | 1 + src/db.rs | 21 ++- src/format.rs | 14 +- src/ingest.rs | 263 +++++++++++++++++++++++++++++++++++++ src/levels.rs | 122 ++++++++++++++++- src/levels/compaction.rs | 27 +++- src/levels/handler.rs | 4 +- src/levels/tests.rs | 7 +- src/lib.rs | 1 + src/manifest.rs | 22 +++- src/ops/oracle.rs | 64 +++++++-- src/ops/transaction.rs | 30 +++-- src/table.rs | 66 ++++++++-- src/table/iterator.rs | 39 ++++-- 14 files changed, 612 insertions(+), 69 deletions(-) create mode 100644 src/ingest.rs diff --git a/proto/src/proto/meta.proto b/proto/src/proto/meta.proto index 010c0420..00d8ad26 100644 --- a/proto/src/proto/meta.proto +++ b/proto/src/proto/meta.proto @@ -44,6 +44,7 @@ message ManifestChange { uint64 key_id = 4; EncryptionAlgo encryption_algo = 5; uint32 compression = 6; // Only used for CREATE Op. 
+ uint64 global_version = 7; // Only used for file ingest, 0 means no global_version } message BlockOffset { diff --git a/src/db.rs b/src/db.rs index c1c04a65..8a7c42d8 100644 --- a/src/db.rs +++ b/src/db.rs @@ -18,18 +18,18 @@ pub use opt::AgateOptions; use skiplist::Skiplist; use yatp::task::callback::Handle; -use crate::value::ValuePointer; use crate::{ closer::Closer, entry::Entry, get_ts, + ingest::{IngestExternalFileOptions, IngestExternalFileTask}, levels::LevelsController, manifest::ManifestFile, memtable::{MemTable, MemTables}, ops::oracle::Oracle, opt::build_table_options, util::{has_any_prefixes, make_comparator}, - value::{self, Request, Value}, + value::{self, Request, Value, ValuePointer}, value_log::ValueLog, wal::Wal, Error, Result, Table, TableBuilder, TableOptions, @@ -143,6 +143,14 @@ impl Agate { pub fn write_requests(&self, request: Vec) -> Result<()> { self.core.write_requests(request) } + + pub fn ingest_external_files( + &self, + files: &[&str], + opts: &IngestExternalFileOptions, + ) -> Result<()> { + self.core.ingest_external_files(files, opts) + } } impl Drop for Agate { @@ -650,6 +658,15 @@ impl Core { unreachable!() } + + pub fn ingest_external_files( + self: &Arc, + files: &[&str], + opts: &IngestExternalFileOptions, + ) -> Result<()> { + let mut task = IngestExternalFileTask::new(self.clone(), files, opts.clone()); + task.run() + } } #[cfg(test)] diff --git a/src/format.rs b/src/format.rs index 9ef39b89..56a55636 100644 --- a/src/format.rs +++ b/src/format.rs @@ -18,18 +18,8 @@ pub fn key_with_ts_last(key: impl Into) -> Bytes { key_with_ts(key, 0) } -pub fn append_ts(key: &mut BytesMut, ts: u64) { - key.reserve(8); - let res = (u64::MAX - ts).to_be(); - let buf = key.chunk_mut(); - unsafe { - ptr::copy_nonoverlapping( - &res as *const u64 as *const u8, - buf.as_mut_ptr() as *mut _, - 8, - ); - key.advance_mut(8); - } +pub fn append_ts(mut key: impl BufMut, ts: u64) { + key.put_u64(u64::MAX - ts); } pub fn get_ts(key: &[u8]) -> u64 { 
diff --git a/src/ingest.rs b/src/ingest.rs new file mode 100644 index 00000000..fa545df8 --- /dev/null +++ b/src/ingest.rs @@ -0,0 +1,263 @@ +use std::{cmp::Ordering, fs, io, ops::RangeInclusive, path::Path, sync::Arc}; + +use bytes::Bytes; +use log::warn; +use skiplist::KeyComparator; + +use crate::{ + db::Core, error::Result, format::user_key, opt::build_table_options, table, util::COMPARATOR, + ChecksumVerificationMode, Table, +}; + +#[derive(Debug, Clone, Copy)] +pub enum PickLevelStrategy { + BaseLevel, + BottomLevel, +} + +#[derive(Debug)] +pub struct IngestExternalFileOptions { + pub commit_ts: u64, + pub move_files: bool, + pub failed_move_fall_back_to_copy: bool, + pub verify_checksum: bool, + pub pick_level_strategy: PickLevelStrategy, +} + +impl Default for IngestExternalFileOptions { + fn default() -> Self { + Self { + commit_ts: 0, + move_files: false, + failed_move_fall_back_to_copy: true, + verify_checksum: true, + pick_level_strategy: PickLevelStrategy::BaseLevel, + } + } +} + +pub(crate) struct FileContext { + /// file id alloc by [`LevelController`] + pub(crate) id: u64, + pub(crate) input_path: String, + pub(crate) table: Option, + pub(crate) moved_or_copied: bool, + pub(crate) picked_level: usize, +} + +pub(crate) struct IngestExternalFileTask { + opts: IngestExternalFileOptions, + core: Arc, + files: Vec, + version: Option, + // whether files to ingest overlap with each other + overlap: bool, +} + +impl IngestExternalFileTask { + pub(crate) fn new(core: Arc, files: &[&str], opts: IngestExternalFileOptions) -> Self { + IngestExternalFileTask { + opts, + core, + files: files + .iter() + .map(|str| FileContext { + id: 0, + input_path: str.to_string(), + table: None, + moved_or_copied: false, + picked_level: 0, + }) + .collect(), + version: None, + overlap: false, + } + } + + pub(crate) fn run(&mut self) -> Result<()> { + let res = self.run_inner(); + if let Some(version) = self.version { + self.core.orc.done_commit(version); + } + if res.is_err() 
{ + self.cleanup_files(); + } + res + } + + fn run_inner(&mut self) -> Result<()> { + // first check all files are valid + self.check_input_exist()?; + // alloc file id for each file + self.reserve_file_id(); + // move or copy file to db dir + self.ingest_to_dir()?; + // verify file + self.verify()?; + // start to commit + self.assign_version(); + // ingest to LSM-tree + self.ingest_to_lsm()?; + Ok(()) + } + + fn check_input_exist(&self) -> Result<()> { + for file in self.files.iter() { + if !Path::new(&file.input_path).is_file() { + return Err(io::Error::new( + io::ErrorKind::NotFound, + format!("file path {} is invalid", file.input_path), + ) + .into()); + } + } + Ok(()) + } + + fn reserve_file_id(&mut self) { + for file in self.files.iter_mut() { + file.id = self.core.lvctl.reserve_file_id(); + } + } + + fn ingest_to_dir(&mut self) -> Result<()> { + for file in self.files.iter_mut() { + let out_path = table::new_filename(file.id, &self.core.opts.dir); + if self.opts.move_files { + match fs::hard_link(&file.input_path, &out_path) { + Ok(_) => { + file.moved_or_copied = true; + continue; + } + Err(err) => { + warn!( + "[ingest task]: failed to move file {} to db dir. {}", + file.input_path, err + ); + if !self.opts.failed_move_fall_back_to_copy { + return Err(err.into()); + } + } + } + } + // copy file + match fs::copy(&file.input_path, &out_path) { + Ok(_) => { + file.moved_or_copied = true; + } + Err(err) => { + warn!( + "[ingest task]: failed to copy file {} to db dir. 
{}", + file.input_path, err + ); + return Err(err.into()); + } + } + } + Ok(()) + } + + fn verify(&mut self) -> Result<()> { + use ChecksumVerificationMode::*; + + for file in self.files.iter_mut() { + let table = Table::open( + &table::new_filename(file.id, &self.core.opts.dir), + build_table_options(&self.core.opts), + )?; + // checksum has been checked when open in [`OnTableRead`] or [`OnTableAndBlockRead`] mode + // avoid double check + if self.opts.verify_checksum + && !matches!( + self.core.opts.checksum_mode, + OnTableRead | OnTableAndBlockRead + ) + { + table.verify_checksum()?; + } + file.table = Some(table); + } + Ok(()) + } + + fn assign_version(&mut self) { + // collect user key range for oracle to check conflict + let ranges = self + .files + .iter() + .map(|file| { + RangeInclusive::new( + Bytes::copy_from_slice(user_key(file.table.as_ref().unwrap().smallest())), + Bytes::copy_from_slice(user_key(file.table.as_ref().unwrap().biggest())), + ) + }) + .collect::>(); + let version = self.core.orc.new_ingest_commit_ts(ranges, &self.opts); + self.version = Some(version); + self.files.iter_mut().for_each(|file| { + file.table.as_mut().unwrap().set_global_version(version); + }); + + // all tables assigned version and open, sort by range + self.files.sort_unstable_by(|x, y| { + COMPARATOR.compare_key( + x.table.as_ref().unwrap().smallest(), + y.table.as_ref().unwrap().smallest(), + ) + }); + // check overlap + if self.files.len() > 1 { + for i in 0..(self.files.len() - 1) { + if matches!( + COMPARATOR.compare_key( + self.files[i].table.as_ref().unwrap().biggest(), + self.files[i + 1].table.as_ref().unwrap().smallest() + ), + Ordering::Equal | Ordering::Greater + ) { + self.overlap = true; + break; + } + } + } + } + + fn ingest_to_lsm(&mut self) -> Result<()> { + // TODO: will, it's too ugly here + let ccore = self.core.clone(); + ccore.lvctl.ingest_tables(self) + } + + fn cleanup_files(&self) { + // file will be removed when table drop + self.files + .iter() + 
.filter(|file| file.moved_or_copied && file.table.is_none()) + .for_each(|file| { + let out_path = table::new_filename(file.id, &self.core.opts.dir); + if let Err(err) = fs::remove_file(&out_path) { + warn!( + "[ingest tark]: failed to clean file {} when ingest task failed. {}", + out_path.to_string_lossy(), + err + ) + } + }) + } + + pub(crate) fn overlap(&self) -> bool { + self.overlap + } + + pub(crate) fn files(&self) -> &[FileContext] { + &self.files + } + + pub(crate) fn files_mut(&mut self) -> &mut [FileContext] { + &mut self.files + } + + pub(crate) fn opts(&self) -> &IngestExternalFileOptions { + &self.opts + } +} diff --git a/src/levels.rs b/src/levels.rs index f2cb9535..c9315d01 100644 --- a/src/levels.rs +++ b/src/levels.rs @@ -25,6 +25,7 @@ use yatp::task::callback::Handle; use crate::{ closer::Closer, format::{get_ts, key_with_ts, user_key}, + ingest::{FileContext, IngestExternalFileTask, PickLevelStrategy}, iterator::{is_deleted_or_expired, IteratorOptions}, manifest::{new_create_change, new_delete_change, ManifestFile}, ops::oracle::Oracle, @@ -96,7 +97,10 @@ impl LevelsControllerInner { // TODO: Set compression, data_key, cache. let filename = crate::table::new_filename(id, &opts.dir); - let table = Table::open(&filename, table_opts)?; + let mut table = Table::open(&filename, table_opts)?; + if table_manifest.global_version > 0 { + table.set_global_version(table_manifest.global_version); + } // TODO: Allow checksum mismatch tables. tables[table_manifest.level as usize].push(table); @@ -580,7 +584,7 @@ impl LevelsControllerInner { if i % N == N - 1 { let biggest = table.biggest(); // TODO: Check this. 
- let mut buf = BytesMut::with_capacity(biggest.len() + 8); + let mut buf = BytesMut::with_capacity(biggest.len()); buf.put(user_key(biggest)); let right = key_with_ts(buf, std::u64::MAX); add_range(&mut compact_def.splits, right); @@ -690,6 +694,8 @@ impl LevelsControllerInner { compact_def.next_range = compact_def.this_range.clone(); } else { compact_def.next_range = get_key_range(&compact_def.bot); + // make next_range cover the compaction output + compact_def.next_range.extend(&compact_def.this_range); } self.cpt_status @@ -759,6 +765,8 @@ impl LevelsControllerInner { } compact_def.next_range = get_key_range(&compact_def.bot); + // make next_range cover the compaction output + compact_def.next_range.extend(&compact_def.this_range); if cpt_status.overlaps_with(compact_def.next_level_id, &compact_def.next_range) { continue; @@ -814,11 +822,11 @@ impl LevelsControllerInner { let mut this_level = this_level.write().unwrap(); let mut next_level = next_level.write().unwrap(); this_level.delete_tables(&compact_def.top); - next_level.replace_tables(&compact_def.bot, &new_tables)?; + next_level.replace_tables(&compact_def.bot, &new_tables); } else { let mut this_level = this_level.write().unwrap(); this_level.delete_tables(&compact_def.top); - this_level.replace_tables(&compact_def.bot, &new_tables)?; + this_level.replace_tables(&compact_def.bot, &new_tables); } // TODO: Add log. @@ -893,8 +901,12 @@ impl LevelsControllerInner { fn add_l0_table(&self, table: Table) -> Result<()> { if !self.opts.in_memory { // Update the manifest _before_ the table becomes part of a level_handler. 
- self.manifest - .add_changes(vec![new_create_change(table.id(), 0, 0)])?; + self.manifest.add_changes(vec![new_create_change( + table.id(), + 0, + 0, + table.global_version().unwrap_or(0), + )])?; } while !self.levels[0].write()?.try_add_l0_table(table.clone()) { @@ -918,6 +930,93 @@ impl LevelsControllerInner { Ok(()) } + fn ingest_tables(&self, task: &mut IngestExternalFileTask) -> Result<()> { + let mut changes = vec![]; + let files_overlap = task.overlap(); + let pick_level_strategy = task.opts().pick_level_strategy; + let mut krs = vec![]; + let mut lv_tbls = vec![vec![]; self.levels.len()]; + + for file in task.files_mut().iter_mut() { + if files_overlap { + file.picked_level = 0; + } else { + self.assign_ingest_file_level(file, pick_level_strategy); + } + let table = file.table.as_ref().unwrap(); + if file.picked_level > 0 { + let kr = get_key_range_single(table); + krs.push((file.picked_level, kr)); + } + lv_tbls[file.picked_level].push(table.clone()); + changes.push(new_create_change( + file.id, + file.picked_level, + 0, + table.global_version().unwrap_or(0), + )); + } + + // All files picked levels, write changes to manifest before ingest to lsm. + // No lock was hold here. + // If failed, need to remove ranges. + if let Err(err) = self.manifest.add_changes(changes) { + error!( + "[ingest task]: failed to write changes to manifest. 
{}", + err + ); + let mut cpt_status = self.cpt_status.write().unwrap(); + krs.iter() + .for_each(|(lv, kr)| cpt_status.delete_range(*lv, kr)); + return Err(err); + } + // ingest to lsm + for (lv, tbs) in lv_tbls.iter().enumerate() { + self.levels[lv].write().unwrap().replace_tables(&[], tbs); + } + // remove hold key ranges + let mut cpt_status = self.cpt_status.write().unwrap(); + krs.iter() + .for_each(|(lv, kr)| cpt_status.delete_range(*lv, kr)); + Ok(()) + } + + fn assign_ingest_file_level( + &self, + file: &mut FileContext, + pick_level_strategy: PickLevelStrategy, + ) { + let table = file.table.as_ref().unwrap(); + file.picked_level = 0; + let kr = get_key_range_single(table); + match pick_level_strategy { + PickLevelStrategy::BaseLevel | PickLevelStrategy::BottomLevel => { + let base = if matches!(pick_level_strategy, PickLevelStrategy::BaseLevel) { + let target = self.level_targets(); + if target.base_level == 0 { + 1 + } else { + target.base_level + } + } else { + self.levels.len() - 1 + }; + // iter from level base to level 1, if all failed, auto pick level 0 + for target in (1..=base).rev() { + let handle = self.levels[target].read().unwrap(); + let (l, r) = handle.overlapping_tables(&kr); + if r - l > 0 { + continue; + } + if self.cpt_status.write().unwrap().add_range(target, &kr) { + file.picked_level = target; + break; + } + } + } + }; + } + /// Searches for a given key in all the levels of the LSM tree. fn get(&self, key: &Bytes, mut max_value: Value, start_level: usize) -> Result { // TODO: Check if is closed. 
@@ -971,6 +1070,10 @@ impl LevelsController { self.inner.add_l0_table(table) } + pub(crate) fn ingest_tables(&self, task: &mut IngestExternalFileTask) -> Result<()> { + self.inner.ingest_tables(task) + } + pub fn get(&self, key: &Bytes, max_value: Value, start_level: usize) -> Result { self.inner.get(key, max_value, start_level) } @@ -1055,7 +1158,12 @@ fn build_change_set(compact_def: &CompactDef, new_tables: &[Table]) -> ManifestC for table in new_tables { // TODO: Data key id. - changes.push(new_create_change(table.id(), compact_def.next_level_id, 0)); + changes.push(new_create_change( + table.id(), + compact_def.next_level_id, + 0, + table.global_version().unwrap_or(0), + )); } for table in &compact_def.top { if !table.is_in_memory() { diff --git a/src/levels/compaction.rs b/src/levels/compaction.rs index cf446425..735a8f44 100644 --- a/src/levels/compaction.rs +++ b/src/levels/compaction.rs @@ -215,11 +215,9 @@ impl CompactStatus { } if !found { - let this = compact_def.this_range.clone(); - let next = compact_def.next_range.clone(); panic!( "try looking for {:?} in this level and {:?} in next level, but key range not found", - this, next + compact_def.this_range, compact_def.next_range ); } @@ -275,6 +273,25 @@ impl CompactStatus { Ok(()) } + /// return true when success to add range + pub fn add_range(&mut self, level: usize, kr: &KeyRange) -> bool { + if self.levels[level].overlaps_with(kr) { + false + } else { + self.levels[level].ranges.push(kr.clone()); + true + } + } + + pub fn delete_range(&mut self, level: usize, kr: &KeyRange) { + if !self.levels[level].remove(kr) { + panic!( + "try looking for {:?} in level {}, but key range not found", + kr, level + ) + } + } + pub fn overlaps_with(&self, level: usize, this: &KeyRange) -> bool { let this_level = &self.levels[level]; this_level.overlaps_with(this) @@ -323,8 +340,8 @@ pub fn get_key_range(tables: &[Table]) -> KeyRange { biggest = item.biggest(); } } - let mut smallest_buf = 
BytesMut::with_capacity(smallest.len() + 8); - let mut biggest_buf = BytesMut::with_capacity(biggest.len() + 8); + let mut smallest_buf = BytesMut::with_capacity(smallest.len()); + let mut biggest_buf = BytesMut::with_capacity(biggest.len()); smallest_buf.extend_from_slice(user_key(smallest)); biggest_buf.extend_from_slice(user_key(biggest)); KeyRange::new( diff --git a/src/levels/handler.rs b/src/levels/handler.rs index 5681af76..cc602a10 100644 --- a/src/levels/handler.rs +++ b/src/levels/handler.rs @@ -85,7 +85,7 @@ impl LevelHandler { self.tables = new_tables; } - pub fn replace_tables(&mut self, to_del: &[Table], to_add: &[Table]) -> Result<()> { + pub fn replace_tables(&mut self, to_del: &[Table], to_add: &[Table]) { let mut to_del_map = HashSet::new(); for table in to_del { @@ -111,8 +111,6 @@ impl LevelHandler { new_tables.sort_by(|x, y| COMPARATOR.compare_key(x.smallest(), y.smallest())); self.tables = new_tables; - - Ok(()) } /// Returns true if ok and no stalling. diff --git a/src/levels/tests.rs b/src/levels/tests.rs index 515e5a69..7c54774d 100644 --- a/src/levels/tests.rs +++ b/src/levels/tests.rs @@ -115,7 +115,12 @@ fn create_and_open(lvctl: &mut LevelsController, td: Vec, level: lvctl .inner .manifest - .add_changes(vec![new_create_change(table.id(), level, 0)]) + .add_changes(vec![new_create_change( + table.id(), + level, + 0, + table.global_version().unwrap_or(0), + )]) .unwrap(); let mut lv = lvctl.inner.levels[level].write().unwrap(); lv.tables.push(table); diff --git a/src/lib.rs b/src/lib.rs index 17bb41ec..98999664 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -8,6 +8,7 @@ mod db; mod entry; mod error; mod format; +mod ingest; mod iterator; mod iterator_trait; mod levels; diff --git a/src/manifest.rs b/src/manifest.rs index 347489c1..e5c43f43 100644 --- a/src/manifest.rs +++ b/src/manifest.rs @@ -34,6 +34,7 @@ pub struct LevelManifest { pub struct TableManifest { pub level: u8, pub key_id: u64, + pub global_version: u64, // TODO: compression } 
@@ -76,7 +77,12 @@ impl Manifest { fn as_changes(&self) -> Vec { let mut changes = Vec::with_capacity(self.tables.len()); for (id, tm) in &self.tables { - changes.push(new_create_change(*id, tm.level as usize, tm.key_id)); + changes.push(new_create_change( + *id, + tm.level as usize, + tm.key_id, + tm.global_version, + )); } changes } @@ -330,6 +336,7 @@ fn apply_manifest_change(build: &mut Manifest, tc: &ManifestChange) -> Result<() TableManifest { level: tc.level as u8, key_id: tc.key_id, + global_version: tc.global_version, }, ); while build.levels.len() <= tc.level as usize { @@ -354,12 +361,18 @@ fn apply_manifest_change(build: &mut Manifest, tc: &ManifestChange) -> Result<() Ok(()) } -pub fn new_create_change(id: u64, level: usize, key_id: u64) -> ManifestChange { +pub fn new_create_change( + id: u64, + level: usize, + key_id: u64, + global_version: u64, +) -> ManifestChange { ManifestChange { id, op: ManifestChangeOp::Create as i32, level: level as u32, key_id, + global_version, // unused fields encryption_algo: 0, compression: 0, @@ -375,6 +388,7 @@ pub fn new_delete_change(id: u64) -> ManifestChange { key_id: 0, encryption_algo: 0, compression: 0, + global_version: 0, } } @@ -394,8 +408,8 @@ mod tests { let manifestfile = ManifestFile::open_or_create_manifest_file(&opts).unwrap(); - let changes_param1 = vec![new_create_change(1, 1, 1)]; - let changes_param2 = vec![new_create_change(2, 2, 2)]; + let changes_param1 = vec![new_create_change(1, 1, 1, 0)]; + let changes_param2 = vec![new_create_change(2, 2, 2, 0)]; manifestfile.add_changes(changes_param1.clone()).unwrap(); manifestfile.add_changes(changes_param2.clone()).unwrap(); diff --git a/src/ops/oracle.rs b/src/ops/oracle.rs index 98dd0748..f47155f6 100644 --- a/src/ops/oracle.rs +++ b/src/ops/oracle.rs @@ -1,17 +1,23 @@ use std::{ collections::HashSet, + ops::{Deref, RangeInclusive}, sync::{Arc, Mutex}, }; +use bytes::Bytes; use yatp::{task::callback::TaskCell, Builder, ThreadPool}; use 
super::transaction::Transaction; -use crate::{closer::Closer, watermark::WaterMark, AgateOptions}; +use crate::{ + closer::Closer, ingest::IngestExternalFileOptions, watermark::WaterMark, AgateOptions, +}; struct CommittedTxn { ts: u64, // Keeps track of the entries written at timestamp ts. conflict_keys: HashSet, + // Keeps track of ranges of ingested files at timestamp ts. + conflict_ranges: Vec>, } #[derive(Default)] @@ -46,7 +52,7 @@ impl CommitInfo { fn has_conflict(&self, txn: &Transaction) -> bool { let reads = txn.reads.lock().unwrap(); - if reads.is_empty() { + if reads.fingerprints.is_empty() { false } else { // If the committed_txn.ts is less than txn.read_ts that implies that the @@ -57,11 +63,21 @@ impl CommitInfo { // a txn before it. self.committed_txns .iter() - .filter(|committed_txn| committed_txn.ts > txn.read_ts) + .filter(|committed_txn| { + committed_txn.ts > txn.read_ts + && (!committed_txn.conflict_keys.is_empty() + || !committed_txn.conflict_ranges.is_empty()) + }) .any(|committed_txn| { - reads + let ck = reads + .fingerprints .iter() - .any(|read| committed_txn.conflict_keys.contains(read)) + .any(|read| committed_txn.conflict_keys.contains(read)); + let cr = committed_txn.conflict_ranges.iter().any(|range| { + range.contains(reads.smallest.deref()) + || range.contains(reads.biggest.deref()) + }); + ck || cr }) } } @@ -200,6 +216,7 @@ impl Oracle { commit_info.committed_txns.push(CommittedTxn { ts, conflict_keys: txn.conflict_keys.clone(), + conflict_ranges: vec![], }) } @@ -207,6 +224,31 @@ impl Oracle { } } + pub(crate) fn new_ingest_commit_ts( + &self, + ingest_ranges: Vec>, + opts: &IngestExternalFileOptions, + ) -> u64 { + let mut commit_info = self.commit_info.lock().unwrap(); + let commit_ts = if !self.is_managed { + // ingest task does not have a read_ts, no need to done and cleanup + let ts = commit_info.next_txn_ts; + commit_info.next_txn_ts += 1; + self.txn_mark.begin(ts); + ts + } else { + opts.commit_ts + }; + if 
self.detect_conflicts { + commit_info.committed_txns.push(CommittedTxn { + ts: commit_ts, + conflict_keys: HashSet::default(), + conflict_ranges: ingest_ranges, + }) + } + commit_ts + } + pub(crate) fn done_read(&self, txn: &mut Transaction) { if !txn.done_read { txn.done_read = true; @@ -230,7 +272,7 @@ impl Drop for Oracle { #[cfg(test)] mod tests { use super::*; - use crate::db::Core; + use crate::{db::Core, ops::transaction::ReadsTrace}; #[test] fn test_basic() { @@ -277,12 +319,18 @@ mod tests { assert_eq!(core.orc.new_commit_ts(&mut txn), (1, false)); txn.read_ts = 0; - txn.reads = Mutex::new(vec![11, 23]); + txn.reads = Mutex::new(ReadsTrace { + fingerprints: vec![11, 23], + ..Default::default() + }); // Has conflict. assert_eq!(core.orc.new_commit_ts(&mut txn), (0, true)); - txn.reads = Mutex::new(vec![23]); + txn.reads = Mutex::new(ReadsTrace { + fingerprints: vec![23], + ..Default::default() + }); // No conflict. assert_eq!(core.orc.new_commit_ts(&mut txn), (2, false)); diff --git a/src/ops/transaction.rs b/src/ops/transaction.rs index 2fefaa3a..3ad699fb 100644 --- a/src/ops/transaction.rs +++ b/src/ops/transaction.rs @@ -1,5 +1,7 @@ use std::{ + cmp, collections::{HashMap, HashSet}, + ops::Deref, sync::{ atomic::{AtomicUsize, Ordering}, Arc, Mutex, @@ -30,7 +32,7 @@ pub struct Transaction { pub(crate) count: usize, /// Contains fingerprints of keys read. - pub(crate) reads: Mutex>, + pub(crate) reads: Mutex, /// Contains fingerprints of keys written. This is used for conflict detection. 
pub(crate) conflict_keys: HashSet, @@ -48,6 +50,13 @@ pub struct Transaction { pub(crate) core: Arc, } +#[derive(Default)] +pub(crate) struct ReadsTrace { + pub(crate) fingerprints: Vec, + pub(crate) smallest: BytesMut, + pub(crate) biggest: BytesMut, +} + pub struct PendingWritesIterator { entries: Vec, next_idx: usize, @@ -63,7 +72,7 @@ impl Transaction { commit_ts: 0, size: 0, count: 0, - reads: Mutex::new(vec![]), + reads: Mutex::new(ReadsTrace::default()), conflict_keys: HashSet::new(), pending_writes: HashMap::new(), duplicate_writes: vec![], @@ -87,11 +96,7 @@ impl Transaction { let mut entries: Vec<_> = self.pending_writes.values().cloned().collect(); entries.sort_by(|x, y| { let cmp = COMPARATOR.compare_key(&x.key, &y.key); - if reversed { - cmp.reverse() - } else { - cmp - } + if reversed { cmp.reverse() } else { cmp } }); Some(PendingWritesIterator::new(self.read_ts, reversed, entries)) @@ -254,8 +259,17 @@ impl Transaction { } pub(crate) fn add_read_key(&self, key: &Bytes) { + // key here is user key if self.update { - self.reads.lock().unwrap().push(default_hash(key)); + let mut reads = self.reads.lock().unwrap(); + reads.fingerprints.push(default_hash(key)); + if matches!(reads.smallest.deref().cmp(key), cmp::Ordering::Greater) { + reads.smallest.clear(); + reads.smallest.extend_from_slice(key); + } else if matches!(reads.biggest.deref().cmp(key), cmp::Ordering::Less) { + reads.biggest.clear(); + reads.biggest.extend_from_slice(key); + } } } diff --git a/src/table.rs b/src/table.rs index 67be25f9..175a4b16 100644 --- a/src/table.rs +++ b/src/table.rs @@ -14,7 +14,7 @@ use std::{ sync::{atomic::AtomicBool, Arc}, }; -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; use iterator::TableRefIterator; pub use iterator::{ITERATOR_NOCACHE, ITERATOR_REVERSED}; use memmap2::{Mmap, MmapOptions}; @@ -24,6 +24,7 @@ use proto::meta::{BlockOffset, Checksum, TableIndex}; use crate::{ bloom::Bloom, checksum, + format::{append_ts, user_key}, 
iterator_trait::AgateIterator, opt::{ChecksumVerificationMode, Options}, Error, Result, @@ -97,6 +98,9 @@ pub struct TableInner { /// by default, when `TableInner` is dropped, the SST file will be /// deleted. By setting this to true, it won't be deleted. save_after_close: AtomicBool, + /// Global version for table. Currently only ingested files has a global version + /// for all keys in table + global_version: Option, } /// Table is simply an Arc to its internal TableInner structure. @@ -155,6 +159,7 @@ impl TableInner { opts, has_bloom_filter: false, save_after_close: AtomicBool::new(false), + global_version: None, }; inner.init_biggest_and_smallest()?; @@ -184,6 +189,7 @@ impl TableInner { index_len: 0, has_bloom_filter: false, save_after_close: AtomicBool::new(false), + global_version: None, }; inner.init_biggest_and_smallest()?; @@ -265,6 +271,33 @@ impl TableInner { result } + fn global_version(&self) -> Option { + self.global_version + } + + fn set_global_version(&mut self, version: u64) { + assert!(self.global_version.is_none()); + + self.global_version = Some(version); + // reset smallest and biggest. Capacity eq to previous is enough. + let mut buf = BytesMut::with_capacity(self.smallest.len()); + buf.extend_from_slice(user_key(&self.smallest)); + append_ts(&mut buf, version); + self.smallest = buf.freeze(); + let mut buf = BytesMut::with_capacity(self.biggest.len()); + buf.extend_from_slice(user_key(&self.biggest)); + append_ts(&mut buf, version); + self.biggest = buf.freeze(); + // TODO: Maybe we should rebuild keys in index when iter. + // But we don't have a index iterator like block, it's a little bit hard and + // dispersive to make this change. + // So I just change key's version in [`BlockOffset`] here. 
+ self.index.offsets.iter_mut().for_each(|bo| { + bo.key.truncate(bo.key.len() - 8); + append_ts(&mut bo.key, version); + }) + } + fn fetch_index(&self) -> &TableIndex { &self.index // TODO: encryption @@ -405,17 +438,12 @@ impl TableInner { Ok(result) } + /// checksum mode should be check by caller, this func is pure fn verify_checksum(&self) -> Result<()> { - use ChecksumVerificationMode::*; - let table_index = self.fetch_index(); for i in 0..table_index.offsets.len() { - // When using OnBlockRead or OnTableAndBlockRead, we do not need to verify block - // checksum now. But we still need to check if there is an encoding error in block. let block = self.block(i, true)?; - if !matches!(self.opts.checksum_mode, OnBlockRead | OnTableAndBlockRead) { - block.verify_checksum()?; - } + block.verify_checksum()?; } Ok(()) } @@ -610,6 +638,28 @@ impl Table { .save_after_close .store(true, std::sync::atomic::Ordering::SeqCst); } + + /// checksum mode should be check by caller, this func is pure + pub fn verify_checksum(&self) -> Result<()> { + self.inner.verify_checksum() + } + + /// For ingested files to set global version. + /// This will change the smallest, biggest key and TableIndex in struct. 
+ /// + /// ### Panics + /// + /// panic when table has more than one ref + pub(crate) fn set_global_version(&mut self, version: u64) { + match Arc::get_mut(&mut self.inner) { + Some(inner) => inner.set_global_version(version), + None => unreachable!(), + } + } + + pub(crate) fn global_version(&self) -> Option { + self.inner.global_version() + } } fn id_to_filename(id: u64) -> String { diff --git a/src/table/iterator.rs b/src/table/iterator.rs index 57b4851a..63006d6e 100644 --- a/src/table/iterator.rs +++ b/src/table/iterator.rs @@ -10,7 +10,7 @@ use crate::{ iterator_trait::AgateIterator, util::{self, KeyComparator, COMPARATOR}, value::Value, - Error, + Error, format::append_ts, }; /// Errors that may encounter during iterator operation @@ -67,12 +67,14 @@ struct BlockIterator { /// to avoid unnecessary copy of base key when the overlap is /// same for multiple keys. perv_overlap: u16, + /// Version for every key in this block. Used for ingested files. + global_version: Option, /// iterator error in last operation err: Option, } impl BlockIterator { - pub fn new(block: Arc) -> Self { + pub fn new(block: Arc, global_version: Option) -> Self { let data = block.data.slice(..block.entries_index_start); Self { block, @@ -82,12 +84,13 @@ impl BlockIterator { val: Bytes::new(), data, perv_overlap: 0, + global_version, idx: 0, } } /// Replace block inside iterator and reset the iterator - pub fn set_block(&mut self, block: Arc) { + pub fn set_block(&mut self, block: Arc, global_version: Option) { self.err = None; self.idx = 0; self.base_key.clear(); @@ -96,6 +99,7 @@ impl BlockIterator { self.val.clear(); self.data = block.data.slice(..block.entries_index_start); self.block = block; + self.global_version = global_version; } #[inline] @@ -145,6 +149,10 @@ impl BlockIterator { let diff_key = &entry_data[..header.diff as usize]; self.key.extend_from_slice(diff_key); + if let Some(version) = self.global_version { + self.key.truncate(self.key.len() - 8); + append_ts(&mut 
self.key, version); + } self.val = entry_data.slice(header.diff as usize..); } @@ -260,12 +268,16 @@ impl> TableRefIterator { self.opt & ITERATOR_NOCACHE == 0 } - fn get_block_iterator(&mut self, block: Arc) -> &mut BlockIterator { + fn get_block_iterator( + &mut self, + block: Arc, + global_version: Option, + ) -> &mut BlockIterator { if let Some(ref mut iter) = self.block_iterator { - iter.set_block(block); + iter.set_block(block, global_version); return iter; } - self.block_iterator = Some(BlockIterator::new(block)); + self.block_iterator = Some(BlockIterator::new(block, global_version)); self.block_iterator.as_mut().unwrap() } @@ -278,7 +290,8 @@ impl> TableRefIterator { self.bpos = 0; match self.table.as_ref().block(self.bpos, self.use_cache()) { Ok(block) => { - let block_iterator = self.get_block_iterator(block); + let block_iterator = + self.get_block_iterator(block, self.table.as_ref().global_version()); block_iterator.seek_to_first(); self.err = block_iterator.err.clone(); } @@ -295,7 +308,8 @@ impl> TableRefIterator { self.bpos = num_blocks - 1; match self.table.as_ref().block(self.bpos, self.use_cache()) { Ok(block) => { - let block_iterator = self.get_block_iterator(block); + let block_iterator = + self.get_block_iterator(block, self.table.as_ref().global_version()); block_iterator.seek_to_last(); self.err = block_iterator.err.clone(); } @@ -307,7 +321,8 @@ impl> TableRefIterator { self.bpos = block_idx; match self.table.as_ref().block(self.bpos, self.use_cache()) { Ok(block) => { - let block_iterator = self.get_block_iterator(block); + let block_iterator = + self.get_block_iterator(block, self.table.as_ref().global_version()); block_iterator.seek(key, SeekPos::Origin); self.err = block_iterator.err.clone(); } @@ -372,7 +387,8 @@ impl> TableRefIterator { if BlockIterator::is_ready(&self.block_iterator) { match self.table.as_ref().block(self.bpos, self.use_cache()) { Ok(block) => { - let block_iterator = self.get_block_iterator(block); + let 
block_iterator = + self.get_block_iterator(block, self.table.as_ref().global_version()); block_iterator.seek_to_first(); self.err = block_iterator.err.clone(); } @@ -403,7 +419,8 @@ impl> TableRefIterator { if BlockIterator::is_ready(&self.block_iterator) { match self.table.as_ref().block(self.bpos, self.use_cache()) { Ok(block) => { - let block_iterator = self.get_block_iterator(block); + let block_iterator = + self.get_block_iterator(block, self.table.as_ref().global_version()); block_iterator.seek_to_last(); self.err = block_iterator.err.clone(); } From fce8f505e05d17c6b7757bb20f36abd90ebedbbf Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Tue, 23 Aug 2022 14:28:33 +0800 Subject: [PATCH 02/11] test(ingest): add basic tests Signed-off-by: wangnengjie <751614701@qq.com> --- src/ingest.rs | 224 ++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 209 insertions(+), 15 deletions(-) diff --git a/src/ingest.rs b/src/ingest.rs index fa545df8..c770f197 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -15,7 +15,7 @@ pub enum PickLevelStrategy { BottomLevel, } -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct IngestExternalFileOptions { pub commit_ts: u64, pub move_files: bool, @@ -52,6 +52,7 @@ pub(crate) struct IngestExternalFileTask { version: Option, // whether files to ingest overlap with each other overlap: bool, + success: bool, } impl IngestExternalFileTask { @@ -71,6 +72,7 @@ impl IngestExternalFileTask { .collect(), version: None, overlap: false, + success: false, } } @@ -79,9 +81,8 @@ impl IngestExternalFileTask { if let Some(version) = self.version { self.core.orc.done_commit(version); } - if res.is_err() { - self.cleanup_files(); - } + self.success = res.is_ok(); + self.cleanup_files(); res } @@ -229,20 +230,32 @@ impl IngestExternalFileTask { } fn cleanup_files(&self) { - // file will be removed when table drop - self.files - .iter() - .filter(|file| file.moved_or_copied && file.table.is_none()) - 
.for_each(|file| { - let out_path = table::new_filename(file.id, &self.core.opts.dir); - if let Err(err) = fs::remove_file(&out_path) { + if !self.success { + // file will be removed when table drop + self.files + .iter() + .filter(|file| file.moved_or_copied && file.table.is_none()) + .for_each(|file| { + let out_path = table::new_filename(file.id, &self.core.opts.dir); + if let Err(err) = fs::remove_file(&out_path) { + warn!( + "[ingest tark]: failed to clean file {} when ingest task failed. {}", + out_path.to_string_lossy(), + err + ) + } + }) + } else if self.opts.move_files { + // success move and ingest, remove old files + self.files.iter().for_each(|file| { + if let Err(err) = fs::remove_file(&file.input_path) { warn!( - "[ingest tark]: failed to clean file {} when ingest task failed. {}", - out_path.to_string_lossy(), - err - ) + "failed to remove input file {} after ingest. {}", + file.input_path, err + ); } }) + } } pub(crate) fn overlap(&self) -> bool { @@ -261,3 +274,184 @@ impl IngestExternalFileTask { &self.opts } } + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, fs, path::Path}; + + use bytes::BytesMut; + + use super::IngestExternalFileOptions; + use crate::{ + db::tests::run_agate_test, error::Result, key_with_ts, opt::build_table_options, Agate, + AgateOptions, Table, TableBuilder, Value, + }; + + const BUILD_TABLE_VERSION: u64 = 10; + + fn build_key(i: usize) -> BytesMut { + BytesMut::from(format!("key_{:012x}", i).as_bytes()) + } + + fn build_value(i: usize) -> BytesMut { + BytesMut::from(build_key(i).repeat(4).as_slice()) + } + + fn build_table>( + path: P, + opts: &AgateOptions, + f: impl FnOnce(&mut TableBuilder), + ) -> Result<()> { + let mut builder = TableBuilder::new(build_table_options(opts)); + f(&mut builder); + let table = Table::create(path.as_ref(), builder.finish(), build_table_options(opts))?; + table.mark_save(); + Ok(()) + } + + fn create_external_files_dir>(path: P) { + let _ = fs::remove_dir_all(&path); + 
assert!(fs::create_dir_all(&path).is_ok()); + } + + fn ingest(db: &Agate, files: &[&str], move_files: bool, commit_ts: u64) -> Result<()> { + let mut opts = IngestExternalFileOptions::default(); + opts.move_files = move_files; + opts.commit_ts = commit_ts; + db.ingest_external_files(files, &opts) + } + + #[test] + fn basic() { + run_agate_test(None, |db| { + let external_dir = db.core.opts.dir.join("external_files"); + create_external_files_dir(&external_dir); + + let file1 = external_dir.join("1.sst"); + + let res = build_table(&file1, &db.core.opts, |builder| { + for i in 0..100 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + assert!(res.is_ok()); + + let file2 = external_dir.join("2.sst"); + + let res = build_table(&file2, &db.core.opts, |builder| { + for i in 100..200 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + assert!(res.is_ok()); + + let res = ingest(&db, &[&file1.to_string_lossy()], false, 0); + assert!(res.is_ok()); + let res = ingest(&db, &[&file2.to_string_lossy()], true, 0); + assert!(res.is_ok()); + + assert!(Path::exists(&file1)); + assert!(!Path::exists(&file2)); + + db.view(|txn| { + for i in 0..200 { + let item = txn.get(&build_key(i).freeze()).unwrap(); + assert_eq!(item.value(), build_value(i)); + } + Ok(()) + }) + .unwrap(); + }) + } + + #[test] + fn overlap() { + let mut db_opts = AgateOptions::default(); + db_opts.managed_txns = true; + + run_agate_test(Some(db_opts), |db| { + let external_dir = db.core.opts.dir.join("external_files"); + create_external_files_dir(&external_dir); + + let mut data = HashMap::new(); + + let file1 = external_dir.join("1.sst"); + + let res = build_table(&file1, &db.core.opts, |builder| { + for i in 0..1000 { + let k = build_key(i); + let v = build_value(i); + builder.add( + &key_with_ts(k.clone(), BUILD_TABLE_VERSION), + 
&Value::new(v.clone().freeze()), + 0, + ); + data.insert(k, v); + } + }); + assert!(res.is_ok()); + let res = ingest(&db, &[&file1.to_string_lossy()], true, 1); + assert!(res.is_ok()); + + { + let mut txn = db.new_transaction_at(1, true); + for i in 500..1500 { + let k = build_key(i); + let v = BytesMut::from(&b"in memtable"[..]); + assert!(txn.set(k.clone().freeze(), v.clone().freeze()).is_ok()); + data.insert(k, v); + } + assert!(txn.commit_at(2).is_ok()); + } + + let file2 = external_dir.join("2.sst"); + + let res = build_table(&file2, &db.core.opts, |builder| { + for i in 1000..2000 { + let k = build_key(i); + let v = build_value(i); + builder.add( + &key_with_ts(k.clone(), BUILD_TABLE_VERSION), + &Value::new(v.clone().freeze()), + 0, + ); + data.insert(k, v); + } + }); + assert!(res.is_ok()); + + let res = ingest(&db, &[&file2.to_string_lossy()], true, 3); + assert!(res.is_ok()); + + assert!(!Path::exists(&file1)); + assert!(!Path::exists(&file2)); + + db.view(|txn| { + for (k, v) in data.iter() { + let item = txn.get(&k.clone().freeze()).unwrap(); + assert_eq!(item.value(), v) + } + Ok(()) + }) + .unwrap(); + + { + let txn = db.new_transaction_at(2, false); + for i in 500..1500 { + let k = build_key(i); + let v = BytesMut::from(&b"in memtable"[..]); + let item = txn.get(&k.freeze()).unwrap(); + assert_eq!(item.value(), &v); + } + } + }) + } +} From ca86239568b808d0da02f5fa6eb2752d8f55b335 Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Tue, 23 Aug 2022 15:28:04 +0800 Subject: [PATCH 03/11] fix: fix ReadsTrace update for conflict check & add ingest conflict check test Signed-off-by: wangnengjie <751614701@qq.com> --- src/ingest.rs | 103 ++++++++++++++++++++++++++++++++++++++++- src/ops/transaction.rs | 5 +- 2 files changed, 105 insertions(+), 3 deletions(-) diff --git a/src/ingest.rs b/src/ingest.rs index c770f197..817084b4 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -284,7 +284,7 @@ mod tests { use super::IngestExternalFileOptions; 
use crate::{ db::tests::run_agate_test, error::Result, key_with_ts, opt::build_table_options, Agate, - AgateOptions, Table, TableBuilder, Value, + AgateOptions, Table, TableBuilder, Value, IteratorOptions, }; const BUILD_TABLE_VERSION: u64 = 10; @@ -454,4 +454,105 @@ mod tests { } }) } + + #[test] + fn conflict_check() { + run_agate_test(None, |db| { + assert!(db.update(|txn| { + for i in 0..500 { + txn.set(build_key(i).freeze(), build_value(i).freeze())?; + } + Ok(()) + }).is_ok()); + + // use update mode to trigger conflict check + let mut txn1 = db.new_transaction(true); + let mut txn2 = db.new_transaction(true); + let mut txn3 = db.new_transaction(true); + let mut txn4 = db.new_transaction(true); + assert!(txn1.set(build_key(501).freeze(), build_value(501).freeze()).is_ok()); + assert!(txn2.set(build_key(502).freeze(), build_value(502).freeze()).is_ok()); + assert!(txn3.set(build_key(503).freeze(), build_value(503).freeze()).is_ok()); + assert!(txn4.set(build_key(504).freeze(), build_value(504).freeze()).is_ok()); + + + + let external_dir = db.core.opts.dir.join("external_files"); + create_external_files_dir(&external_dir); + let file1 = external_dir.join("1.sst"); + let res = build_table(&file1, &db.core.opts, |builder| { + for i in 200..300 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + assert!(res.is_ok()); + let res = ingest(&db, &[&file1.to_string_lossy()], true, 0); + assert!(res.is_ok()); + + { + // [0, 200], should conflict + let mut iter = txn1.new_iterator(&IteratorOptions::default()); + iter.rewind(); + loop { + assert!(iter.valid()); + let item = iter.item(); + if &item.key == &build_key(200) { + break; + } + iter.next(); + } + } + + { + // [300, 400], not conflict + let mut iter = txn2.new_iterator(&IteratorOptions::default()); + iter.seek(&build_key(300).freeze()); + loop { + assert!(iter.valid()); + let item = iter.item(); + if &item.key == &build_key(400) { + 
break; + } + iter.next(); + } + } + + { + // [250, 350], should conflict + let mut iter = txn3.new_iterator(&IteratorOptions::default()); + iter.seek(&build_key(250).freeze()); + loop { + assert!(iter.valid()); + let item = iter.item(); + if &item.key == &build_key(350) { + break; + } + iter.next(); + } + } + + { + // [50, 60], not conflict + let mut iter = txn4.new_iterator(&IteratorOptions::default()); + iter.seek(&build_key(50).freeze()); + loop { + assert!(iter.valid()); + let item = iter.item(); + if &item.key == &build_key(60) { + break; + } + iter.next(); + } + } + + assert!(txn1.commit().is_err()); + assert!(txn2.commit().is_ok()); + assert!(txn3.commit().is_err()); + assert!(txn4.commit().is_ok()); + }) + } } diff --git a/src/ops/transaction.rs b/src/ops/transaction.rs index 3ad699fb..0c57b47a 100644 --- a/src/ops/transaction.rs +++ b/src/ops/transaction.rs @@ -263,10 +263,11 @@ impl Transaction { if self.update { let mut reads = self.reads.lock().unwrap(); reads.fingerprints.push(default_hash(key)); - if matches!(reads.smallest.deref().cmp(key), cmp::Ordering::Greater) { + if reads.smallest.is_empty() || matches!(reads.smallest.deref().cmp(key), cmp::Ordering::Greater) { reads.smallest.clear(); reads.smallest.extend_from_slice(key); - } else if matches!(reads.biggest.deref().cmp(key), cmp::Ordering::Less) { + } + if reads.biggest.is_empty() || matches!(reads.biggest.deref().cmp(key), cmp::Ordering::Less) { reads.biggest.clear(); reads.biggest.extend_from_slice(key); } From 15433a1233c445b4ff31745fbfc4b609ef12ac06 Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Wed, 24 Aug 2022 12:57:07 +0800 Subject: [PATCH 04/11] style: use unwrap,unwrap_err Signed-off-by: wangnengjie <751614701@qq.com> --- src/ingest.rs | 81 ++++++++++++++++++++++++--------------------------- 1 file changed, 38 insertions(+), 43 deletions(-) diff --git a/src/ingest.rs b/src/ingest.rs index 817084b4..24460c3e 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -284,7 
+284,7 @@ mod tests { use super::IngestExternalFileOptions; use crate::{ db::tests::run_agate_test, error::Result, key_with_ts, opt::build_table_options, Agate, - AgateOptions, Table, TableBuilder, Value, IteratorOptions, + AgateOptions, IteratorOptions, Table, TableBuilder, Value, }; const BUILD_TABLE_VERSION: u64 = 10; @@ -329,7 +329,7 @@ mod tests { let file1 = external_dir.join("1.sst"); - let res = build_table(&file1, &db.core.opts, |builder| { + build_table(&file1, &db.core.opts, |builder| { for i in 0..100 { builder.add( &key_with_ts(build_key(i), BUILD_TABLE_VERSION), @@ -337,12 +337,12 @@ mod tests { 0, ); } - }); - assert!(res.is_ok()); + }) + .unwrap(); let file2 = external_dir.join("2.sst"); - let res = build_table(&file2, &db.core.opts, |builder| { + build_table(&file2, &db.core.opts, |builder| { for i in 100..200 { builder.add( &key_with_ts(build_key(i), BUILD_TABLE_VERSION), @@ -350,13 +350,11 @@ mod tests { 0, ); } - }); - assert!(res.is_ok()); + }) + .unwrap(); - let res = ingest(&db, &[&file1.to_string_lossy()], false, 0); - assert!(res.is_ok()); - let res = ingest(&db, &[&file2.to_string_lossy()], true, 0); - assert!(res.is_ok()); + ingest(&db, &[&file1.to_string_lossy()], false, 0).unwrap(); + ingest(&db, &[&file2.to_string_lossy()], true, 0).unwrap(); assert!(Path::exists(&file1)); assert!(!Path::exists(&file2)); @@ -384,8 +382,7 @@ mod tests { let mut data = HashMap::new(); let file1 = external_dir.join("1.sst"); - - let res = build_table(&file1, &db.core.opts, |builder| { + build_table(&file1, &db.core.opts, |builder| { for i in 0..1000 { let k = build_key(i); let v = build_value(i); @@ -396,25 +393,23 @@ mod tests { ); data.insert(k, v); } - }); - assert!(res.is_ok()); - let res = ingest(&db, &[&file1.to_string_lossy()], true, 1); - assert!(res.is_ok()); + }) + .unwrap(); + ingest(&db, &[&file1.to_string_lossy()], true, 1).unwrap(); { let mut txn = db.new_transaction_at(1, true); for i in 500..1500 { let k = build_key(i); let v = 
BytesMut::from(&b"in memtable"[..]); - assert!(txn.set(k.clone().freeze(), v.clone().freeze()).is_ok()); + txn.set(k.clone().freeze(), v.clone().freeze()).unwrap(); data.insert(k, v); } - assert!(txn.commit_at(2).is_ok()); + txn.commit_at(2).unwrap(); } let file2 = external_dir.join("2.sst"); - - let res = build_table(&file2, &db.core.opts, |builder| { + build_table(&file2, &db.core.opts, |builder| { for i in 1000..2000 { let k = build_key(i); let v = build_value(i); @@ -425,11 +420,9 @@ mod tests { ); data.insert(k, v); } - }); - assert!(res.is_ok()); - - let res = ingest(&db, &[&file2.to_string_lossy()], true, 3); - assert!(res.is_ok()); + }) + .unwrap(); + ingest(&db, &[&file2.to_string_lossy()], true, 3).unwrap(); assert!(!Path::exists(&file1)); assert!(!Path::exists(&file2)); @@ -458,29 +451,32 @@ mod tests { #[test] fn conflict_check() { run_agate_test(None, |db| { - assert!(db.update(|txn| { + db.update(|txn| { for i in 0..500 { txn.set(build_key(i).freeze(), build_value(i).freeze())?; } Ok(()) - }).is_ok()); + }) + .unwrap(); // use update mode to trigger conflict check let mut txn1 = db.new_transaction(true); let mut txn2 = db.new_transaction(true); let mut txn3 = db.new_transaction(true); let mut txn4 = db.new_transaction(true); - assert!(txn1.set(build_key(501).freeze(), build_value(501).freeze()).is_ok()); - assert!(txn2.set(build_key(502).freeze(), build_value(502).freeze()).is_ok()); - assert!(txn3.set(build_key(503).freeze(), build_value(503).freeze()).is_ok()); - assert!(txn4.set(build_key(504).freeze(), build_value(504).freeze()).is_ok()); - - + txn1.set(build_key(501).freeze(), build_value(501).freeze()) + .unwrap(); + txn2.set(build_key(502).freeze(), build_value(502).freeze()) + .unwrap(); + txn3.set(build_key(503).freeze(), build_value(503).freeze()) + .unwrap(); + txn4.set(build_key(504).freeze(), build_value(504).freeze()) + .unwrap(); let external_dir = db.core.opts.dir.join("external_files"); create_external_files_dir(&external_dir); let 
file1 = external_dir.join("1.sst"); - let res = build_table(&file1, &db.core.opts, |builder| { + build_table(&file1, &db.core.opts, |builder| { for i in 200..300 { builder.add( &key_with_ts(build_key(i), BUILD_TABLE_VERSION), @@ -488,10 +484,9 @@ mod tests { 0, ); } - }); - assert!(res.is_ok()); - let res = ingest(&db, &[&file1.to_string_lossy()], true, 0); - assert!(res.is_ok()); + }) + .unwrap(); + ingest(&db, &[&file1.to_string_lossy()], true, 0).unwrap(); { // [0, 200], should conflict @@ -549,10 +544,10 @@ mod tests { } } - assert!(txn1.commit().is_err()); - assert!(txn2.commit().is_ok()); - assert!(txn3.commit().is_err()); - assert!(txn4.commit().is_ok()); + txn1.commit().unwrap_err(); + txn2.commit().unwrap(); + txn3.commit().unwrap_err(); + txn4.commit().unwrap(); }) } } From 54c51723d7fe61027eab9ad08126e285a6d67813 Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Wed, 24 Aug 2022 14:17:14 +0800 Subject: [PATCH 05/11] test: use DBTestWrapper & add reopen test (ignore currently) Signed-off-by: wangnengjie <751614701@qq.com> --- src/ingest.rs | 533 +++++++++++++++++++++++++++++++------------------- 1 file changed, 336 insertions(+), 197 deletions(-) diff --git a/src/ingest.rs b/src/ingest.rs index 24460c3e..4c6e7d9f 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -277,14 +277,20 @@ impl IngestExternalFileTask { #[cfg(test)] mod tests { - use std::{collections::HashMap, fs, path::Path}; + use std::{ + collections::HashMap, + fs, + ops::{Deref, DerefMut}, + path::Path, + }; use bytes::BytesMut; + use tempdir::TempDir; use super::IngestExternalFileOptions; use crate::{ - db::tests::run_agate_test, error::Result, key_with_ts, opt::build_table_options, Agate, - AgateOptions, IteratorOptions, Table, TableBuilder, Value, + key_with_ts, opt::build_table_options, Agate, AgateOptions, IteratorOptions, Table, + TableBuilder, Value, }; const BUILD_TABLE_VERSION: u64 = 10; @@ -301,12 +307,12 @@ mod tests { path: P, opts: &AgateOptions, f: impl 
FnOnce(&mut TableBuilder), - ) -> Result<()> { + ) { let mut builder = TableBuilder::new(build_table_options(opts)); f(&mut builder); - let table = Table::create(path.as_ref(), builder.finish(), build_table_options(opts))?; + let table = + Table::create(path.as_ref(), builder.finish(), build_table_options(opts)).unwrap(); table.mark_save(); - Ok(()) } fn create_external_files_dir>(path: P) { @@ -314,240 +320,373 @@ mod tests { assert!(fs::create_dir_all(&path).is_ok()); } - fn ingest(db: &Agate, files: &[&str], move_files: bool, commit_ts: u64) -> Result<()> { + fn ingest(db: &Agate, files: &[&str], move_files: bool, commit_ts: u64) { let mut opts = IngestExternalFileOptions::default(); opts.move_files = move_files; opts.commit_ts = commit_ts; - db.ingest_external_files(files, &opts) + db.ingest_external_files(files, &opts).unwrap(); + } + + struct DBTestWrapper { + db: Agate, + opts: AgateOptions, + tmp_dir: TempDir, + } + + impl DBTestWrapper { + fn new(opts: Option) -> Self { + let tmp_dir = TempDir::new("agatedb").unwrap(); + let mut opts = opts.unwrap_or(AgateOptions::default()); + if !opts.in_memory { + opts.dir = tmp_dir.path().to_path_buf(); + opts.value_dir = tmp_dir.path().to_path_buf(); + } + DBTestWrapper { + tmp_dir, + db: opts.open().unwrap(), + opts, + } + } + + fn reopen(self) -> Self { + let Self { + db, + mut opts, + tmp_dir, + } = self; + drop(db); + Self { + db: opts.open().unwrap(), + opts, + tmp_dir, + } + } + } + + impl Deref for DBTestWrapper { + type Target = Agate; + + fn deref(&self) -> &Self::Target { + &self.db + } + } + + impl DerefMut for DBTestWrapper { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.db + } } #[test] fn basic() { - run_agate_test(None, |db| { - let external_dir = db.core.opts.dir.join("external_files"); - create_external_files_dir(&external_dir); - - let file1 = external_dir.join("1.sst"); - - build_table(&file1, &db.core.opts, |builder| { - for i in 0..100 { - builder.add( - &key_with_ts(build_key(i), 
BUILD_TABLE_VERSION), - &Value::new(build_value(i).freeze()), - 0, - ); - } - }) - .unwrap(); - - let file2 = external_dir.join("2.sst"); + let db = DBTestWrapper::new(None); + let external_dir = db.core.opts.dir.join("external_files"); + create_external_files_dir(&external_dir); + + let file1 = external_dir.join("1.sst"); + build_table(&file1, &db.core.opts, |builder| { + for i in 0..100 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); - build_table(&file2, &db.core.opts, |builder| { - for i in 100..200 { - builder.add( - &key_with_ts(build_key(i), BUILD_TABLE_VERSION), - &Value::new(build_value(i).freeze()), - 0, - ); - } - }) - .unwrap(); + let file2 = external_dir.join("2.sst"); + build_table(&file2, &db.core.opts, |builder| { + for i in 100..200 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); - ingest(&db, &[&file1.to_string_lossy()], false, 0).unwrap(); - ingest(&db, &[&file2.to_string_lossy()], true, 0).unwrap(); + ingest(&db, &[&file1.to_string_lossy()], false, 0); + ingest(&db, &[&file2.to_string_lossy()], true, 0); - assert!(Path::exists(&file1)); - assert!(!Path::exists(&file2)); + assert!(Path::exists(&file1)); + assert!(!Path::exists(&file2)); - db.view(|txn| { - for i in 0..200 { - let item = txn.get(&build_key(i).freeze()).unwrap(); - assert_eq!(item.value(), build_value(i)); - } - Ok(()) - }) - .unwrap(); + db.view(|txn| { + for i in 0..200 { + let item = txn.get(&build_key(i).freeze()).unwrap(); + assert_eq!(item.value(), build_value(i)); + } + Ok(()) }) + .unwrap(); } #[test] fn overlap() { let mut db_opts = AgateOptions::default(); db_opts.managed_txns = true; - - run_agate_test(Some(db_opts), |db| { - let external_dir = db.core.opts.dir.join("external_files"); - create_external_files_dir(&external_dir); - - let mut data = HashMap::new(); - - let file1 = external_dir.join("1.sst"); - 
build_table(&file1, &db.core.opts, |builder| { - for i in 0..1000 { - let k = build_key(i); - let v = build_value(i); - builder.add( - &key_with_ts(k.clone(), BUILD_TABLE_VERSION), - &Value::new(v.clone().freeze()), - 0, - ); - data.insert(k, v); - } - }) - .unwrap(); - ingest(&db, &[&file1.to_string_lossy()], true, 1).unwrap(); - - { - let mut txn = db.new_transaction_at(1, true); - for i in 500..1500 { - let k = build_key(i); - let v = BytesMut::from(&b"in memtable"[..]); - txn.set(k.clone().freeze(), v.clone().freeze()).unwrap(); - data.insert(k, v); - } - txn.commit_at(2).unwrap(); + let db = DBTestWrapper::new(Some(db_opts)); + let external_dir = db.core.opts.dir.join("external_files"); + create_external_files_dir(&external_dir); + + let mut data = HashMap::new(); + + let file1 = external_dir.join("1.sst"); + build_table(&file1, &db.core.opts, |builder| { + for i in 0..1000 { + let k = build_key(i); + let v = build_value(i); + builder.add( + &key_with_ts(k.clone(), BUILD_TABLE_VERSION), + &Value::new(v.clone().freeze()), + 0, + ); + data.insert(k, v); } + }); + ingest(&db, &[&file1.to_string_lossy()], true, 1); + + { + let mut txn = db.new_transaction_at(1, true); + for i in 500..1500 { + let k = build_key(i); + let v = BytesMut::from(&b"in memtable"[..]); + txn.set(k.clone().freeze(), v.clone().freeze()).unwrap(); + data.insert(k, v); + } + txn.commit_at(2).unwrap(); + } - let file2 = external_dir.join("2.sst"); - build_table(&file2, &db.core.opts, |builder| { - for i in 1000..2000 { - let k = build_key(i); - let v = build_value(i); - builder.add( - &key_with_ts(k.clone(), BUILD_TABLE_VERSION), - &Value::new(v.clone().freeze()), - 0, - ); - data.insert(k, v); - } - }) - .unwrap(); - ingest(&db, &[&file2.to_string_lossy()], true, 3).unwrap(); - - assert!(!Path::exists(&file1)); - assert!(!Path::exists(&file2)); + let file2 = external_dir.join("2.sst"); + build_table(&file2, &db.core.opts, |builder| { + for i in 1000..2000 { + let k = build_key(i); + let v = 
build_value(i); + builder.add( + &key_with_ts(k.clone(), BUILD_TABLE_VERSION), + &Value::new(v.clone().freeze()), + 0, + ); + data.insert(k, v); + } + }); + ingest(&db, &[&file2.to_string_lossy()], true, 3); - db.view(|txn| { - for (k, v) in data.iter() { - let item = txn.get(&k.clone().freeze()).unwrap(); - assert_eq!(item.value(), v) - } - Ok(()) - }) - .unwrap(); + assert!(!Path::exists(&file1)); + assert!(!Path::exists(&file2)); - { - let txn = db.new_transaction_at(2, false); - for i in 500..1500 { - let k = build_key(i); - let v = BytesMut::from(&b"in memtable"[..]); - let item = txn.get(&k.freeze()).unwrap(); - assert_eq!(item.value(), &v); - } + db.view(|txn| { + for (k, v) in data.iter() { + let item = txn.get(&k.clone().freeze()).unwrap(); + assert_eq!(item.value(), v) } + Ok(()) }) + .unwrap(); + + { + let txn = db.new_transaction_at(2, false); + for i in 500..1500 { + let k = build_key(i); + let v = BytesMut::from(&b"in memtable"[..]); + let item = txn.get(&k.freeze()).unwrap(); + assert_eq!(item.value(), &v); + } + } } #[test] fn conflict_check() { - run_agate_test(None, |db| { - db.update(|txn| { - for i in 0..500 { - txn.set(build_key(i).freeze(), build_value(i).freeze())?; - } - Ok(()) - }) + let db = DBTestWrapper::new(None); + db.update(|txn| { + for i in 0..500 { + txn.set(build_key(i).freeze(), build_value(i).freeze())?; + } + Ok(()) + }) + .unwrap(); + + // use update mode to trigger conflict check + let mut txn1 = db.new_transaction(true); + let mut txn2 = db.new_transaction(true); + let mut txn3 = db.new_transaction(true); + let mut txn4 = db.new_transaction(true); + txn1.set(build_key(501).freeze(), build_value(501).freeze()) .unwrap(); - - // use update mode to trigger conflict check - let mut txn1 = db.new_transaction(true); - let mut txn2 = db.new_transaction(true); - let mut txn3 = db.new_transaction(true); - let mut txn4 = db.new_transaction(true); - txn1.set(build_key(501).freeze(), build_value(501).freeze()) - .unwrap(); - 
txn2.set(build_key(502).freeze(), build_value(502).freeze()) - .unwrap(); - txn3.set(build_key(503).freeze(), build_value(503).freeze()) - .unwrap(); - txn4.set(build_key(504).freeze(), build_value(504).freeze()) - .unwrap(); - - let external_dir = db.core.opts.dir.join("external_files"); - create_external_files_dir(&external_dir); - let file1 = external_dir.join("1.sst"); - build_table(&file1, &db.core.opts, |builder| { - for i in 200..300 { - builder.add( - &key_with_ts(build_key(i), BUILD_TABLE_VERSION), - &Value::new(build_value(i).freeze()), - 0, - ); - } - }) + txn2.set(build_key(502).freeze(), build_value(502).freeze()) + .unwrap(); + txn3.set(build_key(503).freeze(), build_value(503).freeze()) + .unwrap(); + txn4.set(build_key(504).freeze(), build_value(504).freeze()) .unwrap(); - ingest(&db, &[&file1.to_string_lossy()], true, 0).unwrap(); - { - // [0, 200], should conflict - let mut iter = txn1.new_iterator(&IteratorOptions::default()); - iter.rewind(); - loop { - assert!(iter.valid()); - let item = iter.item(); - if &item.key == &build_key(200) { - break; - } - iter.next(); + let external_dir = db.core.opts.dir.join("external_files"); + create_external_files_dir(&external_dir); + let file1 = external_dir.join("1.sst"); + build_table(&file1, &db.core.opts, |builder| { + for i in 200..300 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + ingest(&db, &[&file1.to_string_lossy()], true, 0); + + { + // [0, 200], should conflict + let mut iter = txn1.new_iterator(&IteratorOptions::default()); + iter.rewind(); + loop { + assert!(iter.valid()); + let item = iter.item(); + if &item.key == &build_key(200) { + break; } + iter.next(); } + } - { - // [300, 400], not conflict - let mut iter = txn2.new_iterator(&IteratorOptions::default()); - iter.seek(&build_key(300).freeze()); - loop { - assert!(iter.valid()); - let item = iter.item(); - if &item.key == &build_key(400) { - break; - } - 
iter.next(); + { + // [300, 400], not conflict + let mut iter = txn2.new_iterator(&IteratorOptions::default()); + iter.seek(&build_key(300).freeze()); + loop { + assert!(iter.valid()); + let item = iter.item(); + if &item.key == &build_key(400) { + break; } + iter.next(); } + } - { - // [250, 350], should conflict - let mut iter = txn3.new_iterator(&IteratorOptions::default()); - iter.seek(&build_key(250).freeze()); - loop { - assert!(iter.valid()); - let item = iter.item(); - if &item.key == &build_key(350) { - break; - } - iter.next(); + { + // [250, 350], should conflict + let mut iter = txn3.new_iterator(&IteratorOptions::default()); + iter.seek(&build_key(250).freeze()); + loop { + assert!(iter.valid()); + let item = iter.item(); + if &item.key == &build_key(350) { + break; } + iter.next(); } + } - { - // [50, 60], not conflict - let mut iter = txn4.new_iterator(&IteratorOptions::default()); - iter.seek(&build_key(50).freeze()); - loop { - assert!(iter.valid()); - let item = iter.item(); - if &item.key == &build_key(60) { - break; - } - iter.next(); + { + // [50, 60], not conflict + let mut iter = txn4.new_iterator(&IteratorOptions::default()); + iter.seek(&build_key(50).freeze()); + loop { + assert!(iter.valid()); + let item = iter.item(); + if &item.key == &build_key(60) { + break; } + iter.next(); } + } + + txn1.commit().unwrap_err(); + txn2.commit().unwrap(); + txn3.commit().unwrap_err(); + txn4.commit().unwrap(); + } - txn1.commit().unwrap_err(); - txn2.commit().unwrap(); - txn3.commit().unwrap_err(); - txn4.commit().unwrap(); + #[test] + fn files_overlap() { + // if ingested files overlap with each other, all should be put in level 0 + let db = DBTestWrapper::new(None); + let external_dir = db.core.opts.dir.join("external_files"); + create_external_files_dir(&external_dir); + let file1 = external_dir.join("1.sst"); + build_table(&file1, &db.core.opts, |builder| { + // 0, 2, ... 
, 998 + for i in (0..1000).step_by(2) { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + let file2 = external_dir.join("2.sst"); + build_table(&file2, &db.core.opts, |builder| { + // 501, 503, ..., 1499 + for i in (501..1500).step_by(2) { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + ingest( + &db, + &[&file1.to_string_lossy(), &file2.to_string_lossy()], + true, + 0, + ); + assert_eq!( + 2, + db.core.lvctl.inner.levels[0].read().unwrap().num_tables() + ); + } + + #[ignore = "wait for `next_txn_ts` update feature"] + #[test] + fn reopen() { + let db = DBTestWrapper::new(None); + let external_dir = db.core.opts.dir.join("external_files"); + create_external_files_dir(&external_dir); + let file1 = external_dir.join("1.sst"); + build_table(&file1, &db.core.opts, |builder| { + for i in 0..500 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + let file2 = external_dir.join("2.sst"); + build_table(&file2, &db.core.opts, |builder| { + for i in 500..1000 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + ingest( + &db, + &[&file1.to_string_lossy(), &file2.to_string_lossy()], + true, + 0, + ); + + db.view(|txn| { + for i in 0..1000 { + let item = txn.get(&build_key(i).freeze()).unwrap(); + assert_eq!(item.value(), build_value(i)); + } + Ok(()) + }) + .unwrap(); + + let db = db.reopen(); + + db.view(|txn| { + for i in 0..1000 { + let item = txn.get(&build_key(i).freeze()).unwrap(); + assert_eq!(item.value(), build_value(i)); + } + Ok(()) }) + .unwrap(); } } From 60a7a399166c6a1b8d8fa4712bc2c16660ecb725 Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Wed, 24 Aug 2022 14:49:23 +0800 Subject: [PATCH 06/11] fix(ingest): alloc version for each file 
when overlap Signed-off-by: wangnengjie <751614701@qq.com> --- src/ingest.rs | 83 ++++++++++++++++++++++++++++++++------------------- 1 file changed, 52 insertions(+), 31 deletions(-) diff --git a/src/ingest.rs b/src/ingest.rs index 4c6e7d9f..7d17c8e5 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -2,10 +2,10 @@ use std::{cmp::Ordering, fs, io, ops::RangeInclusive, path::Path, sync::Arc}; use bytes::Bytes; use log::warn; -use skiplist::KeyComparator; +use skiplist::{FixedLengthSuffixComparator, KeyComparator}; use crate::{ - db::Core, error::Result, format::user_key, opt::build_table_options, table, util::COMPARATOR, + db::Core, error::Result, format::user_key, opt::build_table_options, table, ChecksumVerificationMode, Table, }; @@ -49,7 +49,7 @@ pub(crate) struct IngestExternalFileTask { opts: IngestExternalFileOptions, core: Arc, files: Vec, - version: Option, + versions: Vec, // whether files to ingest overlap with each other overlap: bool, success: bool, @@ -70,7 +70,7 @@ impl IngestExternalFileTask { picked_level: 0, }) .collect(), - version: None, + versions: vec![], overlap: false, success: false, } @@ -78,9 +78,9 @@ impl IngestExternalFileTask { pub(crate) fn run(&mut self) -> Result<()> { let res = self.run_inner(); - if let Some(version) = self.version { - self.core.orc.done_commit(version); - } + self.versions.iter().for_each(|version| { + self.core.orc.done_commit(*version); + }); self.success = res.is_ok(); self.cleanup_files(); res @@ -182,37 +182,21 @@ impl IngestExternalFileTask { } fn assign_version(&mut self) { - // collect user key range for oracle to check conflict - let ranges = self - .files - .iter() - .map(|file| { - RangeInclusive::new( - Bytes::copy_from_slice(user_key(file.table.as_ref().unwrap().smallest())), - Bytes::copy_from_slice(user_key(file.table.as_ref().unwrap().biggest())), - ) - }) - .collect::>(); - let version = self.core.orc.new_ingest_commit_ts(ranges, &self.opts); - self.version = Some(version); - 
self.files.iter_mut().for_each(|file| { - file.table.as_mut().unwrap().set_global_version(version); - }); - - // all tables assigned version and open, sort by range + let user_key_cmp = FixedLengthSuffixComparator::new(0); + // all tables opened, sort by range self.files.sort_unstable_by(|x, y| { - COMPARATOR.compare_key( - x.table.as_ref().unwrap().smallest(), - y.table.as_ref().unwrap().smallest(), + user_key_cmp.compare_key( + user_key(x.table.as_ref().unwrap().smallest()), + user_key(y.table.as_ref().unwrap().smallest()), ) }); // check overlap if self.files.len() > 1 { for i in 0..(self.files.len() - 1) { if matches!( - COMPARATOR.compare_key( - self.files[i].table.as_ref().unwrap().biggest(), - self.files[i + 1].table.as_ref().unwrap().smallest() + user_key_cmp.compare_key( + user_key(self.files[i].table.as_ref().unwrap().biggest()), + user_key(self.files[i + 1].table.as_ref().unwrap().smallest()), ), Ordering::Equal | Ordering::Greater ) { @@ -221,6 +205,35 @@ impl IngestExternalFileTask { } } } + if self.overlap { + // files overlap with each other, assign commit_ts for each file + for file in self.files.iter_mut() { + let range = RangeInclusive::new( + Bytes::copy_from_slice(user_key(file.table.as_ref().unwrap().smallest())), + Bytes::copy_from_slice(user_key(file.table.as_ref().unwrap().biggest())), + ); + let version = self.core.orc.new_ingest_commit_ts(vec![range], &self.opts); + self.versions.push(version); + file.table.as_mut().unwrap().set_global_version(version); + } + } else { + // collect user key range for oracle to check conflict + let ranges = self + .files + .iter() + .map(|file| { + RangeInclusive::new( + Bytes::copy_from_slice(user_key(file.table.as_ref().unwrap().smallest())), + Bytes::copy_from_slice(user_key(file.table.as_ref().unwrap().biggest())), + ) + }) + .collect::>(); + let version = self.core.orc.new_ingest_commit_ts(ranges, &self.opts); + self.versions.push(version); + self.files.iter_mut().for_each(|file| { + 
file.table.as_mut().unwrap().set_global_version(version); + }); + } } fn ingest_to_lsm(&mut self) -> Result<()> { @@ -634,6 +647,14 @@ mod tests { 2, db.core.lvctl.inner.levels[0].read().unwrap().num_tables() ); + let versions = db.core.lvctl.inner.levels[0] + .read() + .unwrap() + .tables + .iter() + .map(|t| t.global_version().unwrap()) + .collect::>(); + assert_ne!(versions[0], versions[1]); } #[ignore = "wait for `next_txn_ts` update feature"] From ad407bb1d0d3af06f03433a8da2b41bed2fff60b Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Tue, 30 Aug 2022 15:50:18 +0800 Subject: [PATCH 07/11] test(ingest): add some tests Signed-off-by: wangnengjie <751614701@qq.com> --- src/ingest.rs | 116 +++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 105 insertions(+), 11 deletions(-) diff --git a/src/ingest.rs b/src/ingest.rs index 7d17c8e5..5de75532 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -293,6 +293,7 @@ mod tests { use std::{ collections::HashMap, fs, + io::Write, ops::{Deref, DerefMut}, path::Path, }; @@ -302,8 +303,8 @@ mod tests { use super::IngestExternalFileOptions; use crate::{ - key_with_ts, opt::build_table_options, Agate, AgateOptions, IteratorOptions, Table, - TableBuilder, Value, + error::Result, key_with_ts, opt::build_table_options, value::VALUE_DELETE, Agate, + AgateOptions, Error, IteratorOptions, Table, TableBuilder, Value, }; const BUILD_TABLE_VERSION: u64 = 10; @@ -333,11 +334,11 @@ mod tests { assert!(fs::create_dir_all(&path).is_ok()); } - fn ingest(db: &Agate, files: &[&str], move_files: bool, commit_ts: u64) { + fn ingest(db: &Agate, files: &[&str], move_files: bool, commit_ts: u64) -> Result<()> { let mut opts = IngestExternalFileOptions::default(); opts.move_files = move_files; opts.commit_ts = commit_ts; - db.ingest_external_files(files, &opts).unwrap(); + db.ingest_external_files(files, &opts) } struct DBTestWrapper { @@ -418,8 +419,8 @@ mod tests { } }); - ingest(&db, &[&file1.to_string_lossy()], 
false, 0); - ingest(&db, &[&file2.to_string_lossy()], true, 0); + ingest(&db, &[&file1.to_string_lossy()], false, 0).unwrap(); + ingest(&db, &[&file2.to_string_lossy()], true, 0).unwrap(); assert!(Path::exists(&file1)); assert!(!Path::exists(&file2)); @@ -457,7 +458,7 @@ mod tests { data.insert(k, v); } }); - ingest(&db, &[&file1.to_string_lossy()], true, 1); + ingest(&db, &[&file1.to_string_lossy()], true, 1).unwrap(); { let mut txn = db.new_transaction_at(1, true); @@ -483,7 +484,7 @@ mod tests { data.insert(k, v); } }); - ingest(&db, &[&file2.to_string_lossy()], true, 3); + ingest(&db, &[&file2.to_string_lossy()], true, 3).unwrap(); assert!(!Path::exists(&file1)); assert!(!Path::exists(&file2)); @@ -545,7 +546,7 @@ mod tests { ); } }); - ingest(&db, &[&file1.to_string_lossy()], true, 0); + ingest(&db, &[&file1.to_string_lossy()], true, 0).unwrap(); { // [0, 200], should conflict @@ -642,7 +643,8 @@ mod tests { &[&file1.to_string_lossy(), &file2.to_string_lossy()], true, 0, - ); + ) + .unwrap(); assert_eq!( 2, db.core.lvctl.inner.levels[0].read().unwrap().num_tables() @@ -688,7 +690,8 @@ mod tests { &[&file1.to_string_lossy(), &file2.to_string_lossy()], true, 0, - ); + ) + .unwrap(); db.view(|txn| { for i in 0..1000 { @@ -710,4 +713,95 @@ mod tests { }) .unwrap(); } + + #[test] + fn files_not_exist() { + let db = DBTestWrapper::new(None); + let external_dir = db.core.opts.dir.join("external_files"); + create_external_files_dir(&external_dir); + let file1 = external_dir.join("1.sst"); + build_table(&file1, &db.core.opts, |builder| { + for i in 0..500 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + let file2 = external_dir.join("2.sst"); + ingest( + &db, + &[&file1.to_string_lossy(), &file2.to_string_lossy()], + true, + 0, + ) + .unwrap_err(); + } + + #[test] + fn bad_checksum() { + let db = DBTestWrapper::new(None); + let external_dir = db.core.opts.dir.join("external_files"); + 
create_external_files_dir(&external_dir); + let file1 = external_dir.join("1.sst"); + build_table(&file1, &db.core.opts, |builder| { + for i in 0..500 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + { + let mut f = fs::File::options().write(true).open(&file1).unwrap(); + write!(&mut f, "junk data").unwrap(); + f.sync_all().unwrap(); + } + ingest(&db, &[&file1.to_string_lossy()], true, 0).unwrap_err(); + } + + #[test] + fn put_then_delete() { + let db = DBTestWrapper::new(None); + let external_dir = db.core.opts.dir.join("external_files"); + create_external_files_dir(&external_dir); + let file1 = external_dir.join("1.sst"); + build_table(&file1, &db.core.opts, |builder| { + for i in 0..500 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new(build_value(i).freeze()), + 0, + ); + } + }); + ingest(&db, &[&file1.to_string_lossy()], true, 0).unwrap(); + let file2 = external_dir.join("2.sst"); + build_table(&file2, &db.core.opts, |builder| { + for i in 400..500 { + builder.add( + &key_with_ts(build_key(i), BUILD_TABLE_VERSION), + &Value::new_with_meta(build_value(i).freeze(), VALUE_DELETE, 0), + 0, + ); + } + }); + ingest(&db, &[&file2.to_string_lossy()], true, 0).unwrap(); + db.view(|txn| { + for i in 0..400 { + let item = txn.get(&build_key(i).freeze()).unwrap(); + assert_eq!(item.value(), build_value(i)); + } + for i in 400..500 { + assert!(matches!( + txn.get(&build_key(i).freeze()).err().unwrap(), + Error::KeyNotFound(_) + )); + } + Ok(()) + }) + .unwrap(); + } } From ea205adc9d06856b0282e4e60da31187ba6a730c Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Thu, 1 Sep 2022 15:13:31 +0800 Subject: [PATCH 08/11] refactor: fit max_version Signed-off-by: wangnengjie <751614701@qq.com> --- src/ingest.rs | 1 - src/table.rs | 3 ++- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ingest.rs b/src/ingest.rs index 5de75532..5a312e05 
100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -659,7 +659,6 @@ mod tests { assert_ne!(versions[0], versions[1]); } - #[ignore = "wait for `next_txn_ts` update feature"] #[test] fn reopen() { let db = DBTestWrapper::new(None); diff --git a/src/table.rs b/src/table.rs index 74a9cfa0..2aad35b8 100644 --- a/src/table.rs +++ b/src/table.rs @@ -487,7 +487,8 @@ impl TableInner { } fn max_version(&self) -> u64 { - self.fetch_index().max_version + self.global_version() + .unwrap_or(self.fetch_index().max_version) } fn drop_no_fail(&mut self) -> Result<()> { From 11161387d17c70bbe595afab67553ea1c9eef28e Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Thu, 1 Sep 2022 15:29:31 +0800 Subject: [PATCH 09/11] style: run make format Signed-off-by: wangnengjie <751614701@qq.com> --- src/batch.rs | 10 +++++----- src/ops/transaction.rs | 8 ++++++-- src/table/iterator.rs | 3 ++- 3 files changed, 13 insertions(+), 8 deletions(-) diff --git a/src/batch.rs b/src/batch.rs index e5b974c7..36a1c49c 100644 --- a/src/batch.rs +++ b/src/batch.rs @@ -1,9 +1,8 @@ -use crate::Agate; -use crate::{entry::Entry, ops::transaction::Transaction}; -use crate::{Error, Result}; +use std::sync::Arc; use bytes::Bytes; -use std::sync::Arc; + +use crate::{entry::Entry, ops::transaction::Transaction, Agate, Error, Result}; /// WriteBatch helps write multiple entries to database pub struct WriteBatch { @@ -127,11 +126,12 @@ impl WriteBatch { #[cfg(test)] mod tests { + use bytes::Bytes; + use crate::{ db::tests::{generate_test_agate_options, run_agate_test}, AgateOptions, }; - use bytes::Bytes; fn test_with_options(opts: AgateOptions) { let key = |i| Bytes::from(format!("{:10}", i)); diff --git a/src/ops/transaction.rs b/src/ops/transaction.rs index 0c57b47a..51e693a4 100644 --- a/src/ops/transaction.rs +++ b/src/ops/transaction.rs @@ -263,11 +263,15 @@ impl Transaction { if self.update { let mut reads = self.reads.lock().unwrap(); reads.fingerprints.push(default_hash(key)); - if 
reads.smallest.is_empty() || matches!(reads.smallest.deref().cmp(key), cmp::Ordering::Greater) { + if reads.smallest.is_empty() + || matches!(reads.smallest.deref().cmp(key), cmp::Ordering::Greater) + { reads.smallest.clear(); reads.smallest.extend_from_slice(key); } - if reads.biggest.is_empty() || matches!(reads.biggest.deref().cmp(key), cmp::Ordering::Less) { + if reads.biggest.is_empty() + || matches!(reads.biggest.deref().cmp(key), cmp::Ordering::Less) + { reads.biggest.clear(); reads.biggest.extend_from_slice(key); } diff --git a/src/table/iterator.rs b/src/table/iterator.rs index 63006d6e..948df9ae 100644 --- a/src/table/iterator.rs +++ b/src/table/iterator.rs @@ -7,10 +7,11 @@ use super::{ Block, TableInner, }; use crate::{ + format::append_ts, iterator_trait::AgateIterator, util::{self, KeyComparator, COMPARATOR}, value::Value, - Error, format::append_ts, + Error, }; /// Errors that may encounter during iterator operation From a3e52647c1e4a4723b00690047e0ef41c55502f4 Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Wed, 7 Sep 2022 08:47:00 +0800 Subject: [PATCH 10/11] docs: add some comments Signed-off-by: wangnengjie <751614701@qq.com> --- src/ingest.rs | 21 +++++++++++++++++++++ src/ops/oracle.rs | 3 +++ 2 files changed, 24 insertions(+) diff --git a/src/ingest.rs b/src/ingest.rs index 5a312e05..5a586998 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -11,16 +11,37 @@ use crate::{ #[derive(Debug, Clone, Copy)] pub enum PickLevelStrategy { + /// Pick from base level to level 0. Base is calculated by [`LevelsControllerInner.target_level()`]. BaseLevel, + /// Pick from bottom level to level 0 BottomLevel, } #[derive(Debug, Clone, Copy)] pub struct IngestExternalFileOptions { + /// When the db is created or opened in managed mode, this field is used as the version of + /// all ingested files. If files overlap with each other, it's better to + /// split into several tasks.
Otherwise the result is undefined since there might + /// be the same key with the same version. pub commit_ts: u64, + /// Can be set to true to move the files instead of copying them. + /// + /// Default: false pub move_files: bool, + /// If set to true, ingestion falls back to copy when move fails. + /// + /// Default: true pub failed_move_fall_back_to_copy: bool, + /// If set to true, verify checksum of ingested files before ingest to LSM. + /// Note that if [`ChecksumVerificationMode`] is set to [`OnTableRead`][ChecksumVerificationMode::OnTableRead] + /// or [`OnTableAndBlockRead`][ChecksumVerificationMode::OnTableAndBlockRead], + /// checksum will be checked even if verify_checksum is set to false. + /// + /// Default: true pub verify_checksum: bool, + /// Strategy to pick which level to ingest to. + /// + /// Default: [`BaseLevel`](PickLevelStrategy::BaseLevel) pub pick_level_strategy: PickLevelStrategy, } diff --git a/src/ops/oracle.rs b/src/ops/oracle.rs index 5db6c7f0..23c2e7fe 100644 --- a/src/ops/oracle.rs +++ b/src/ops/oracle.rs @@ -232,6 +232,9 @@ impl Oracle { } } + /// Oracle's conflict check is only for read keys of update transactions. + /// Since IngestTask doesn't have read keys, we don't need to check conflict. + /// Just mark commit info.
pub(crate) fn new_ingest_commit_ts( &self, ingest_ranges: Vec>, From f241a88bd567997b5c199dd8a1aff9c8985428ec Mon Sep 17 00:00:00 2001 From: wangnengjie <751614701@qq.com> Date: Wed, 7 Sep 2022 08:47:38 +0800 Subject: [PATCH 11/11] style: format & clippy Signed-off-by: wangnengjie <751614701@qq.com> --- src/db.rs | 2 +- src/ingest.rs | 24 ++++++++++++++---------- src/ops/transaction.rs | 6 +++++- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/src/db.rs b/src/db.rs index ac8aa0b0..8161f220 100644 --- a/src/db.rs +++ b/src/db.rs @@ -670,7 +670,7 @@ impl Core { files: &[&str], opts: &IngestExternalFileOptions, ) -> Result<()> { - let mut task = IngestExternalFileTask::new(self.clone(), files, opts.clone()); + let mut task = IngestExternalFileTask::new(self.clone(), files, *opts); task.run() } } diff --git a/src/ingest.rs b/src/ingest.rs index 5a586998..5a2c979e 100644 --- a/src/ingest.rs +++ b/src/ingest.rs @@ -356,9 +356,11 @@ mod tests { } fn ingest(db: &Agate, files: &[&str], move_files: bool, commit_ts: u64) -> Result<()> { - let mut opts = IngestExternalFileOptions::default(); - opts.move_files = move_files; - opts.commit_ts = commit_ts; + let opts = IngestExternalFileOptions { + move_files, + commit_ts, + ..Default::default() + }; db.ingest_external_files(files, &opts) } @@ -371,7 +373,7 @@ mod tests { impl DBTestWrapper { fn new(opts: Option) -> Self { let tmp_dir = TempDir::new("agatedb").unwrap(); - let mut opts = opts.unwrap_or(AgateOptions::default()); + let mut opts = opts.unwrap_or_default(); if !opts.in_memory { opts.dir = tmp_dir.path().to_path_buf(); opts.value_dir = tmp_dir.path().to_path_buf(); @@ -458,8 +460,10 @@ mod tests { #[test] fn overlap() { - let mut db_opts = AgateOptions::default(); - db_opts.managed_txns = true; + let db_opts = AgateOptions { + managed_txns: true, + ..Default::default() + }; let db = DBTestWrapper::new(Some(db_opts)); let external_dir = db.core.opts.dir.join("external_files"); 
create_external_files_dir(&external_dir); @@ -576,7 +580,7 @@ mod tests { loop { assert!(iter.valid()); let item = iter.item(); - if &item.key == &build_key(200) { + if item.key == build_key(200) { break; } iter.next(); @@ -590,7 +594,7 @@ mod tests { loop { assert!(iter.valid()); let item = iter.item(); - if &item.key == &build_key(400) { + if item.key == build_key(400) { break; } iter.next(); @@ -604,7 +608,7 @@ mod tests { loop { assert!(iter.valid()); let item = iter.item(); - if &item.key == &build_key(350) { + if item.key == build_key(350) { break; } iter.next(); @@ -618,7 +622,7 @@ mod tests { loop { assert!(iter.valid()); let item = iter.item(); - if &item.key == &build_key(60) { + if item.key == build_key(60) { break; } iter.next(); diff --git a/src/ops/transaction.rs b/src/ops/transaction.rs index 51e693a4..559a140a 100644 --- a/src/ops/transaction.rs +++ b/src/ops/transaction.rs @@ -96,7 +96,11 @@ impl Transaction { let mut entries: Vec<_> = self.pending_writes.values().cloned().collect(); entries.sort_by(|x, y| { let cmp = COMPARATOR.compare_key(&x.key, &y.key); - if reversed { cmp.reverse() } else { cmp } + if reversed { + cmp.reverse() + } else { + cmp + } }); Some(PendingWritesIterator::new(self.read_ts, reversed, entries))