diff --git a/Cargo.toml b/Cargo.toml index 3fec08a6..89e7d1ca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,6 +8,7 @@ edition = "2018" bytes = "1.0" coarsetime = "0.1.22" crc = "1.8" +crc32fast = "1.2" crossbeam-channel = "0.5" enum_dispatch = "0.3" farmhash = "1.1" @@ -33,8 +34,9 @@ tikv-jemallocator = "0.4.0" [workspace] members = [ + "agate_bench", "proto", - "skiplist", + "skiplist" ] [[bench]] @@ -45,6 +47,10 @@ harness = false name = "bench_table" harness = false +[[bench]] +name = "bench_iterator" +harness = false + [profile.bench] opt-level = 3 debug = false diff --git a/README.md b/README.md index b1d03803..f6e833b9 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ optimizations that have been made in [unistore][4]. [1]: https://github.com/tikv/tikv [2]: https://github.com/tikv/agatedb/projects/1 -[3]: https://github.com/dgraph-io/badger/tree/45bca18f24ef5cc04701a1e17448ddfce9372da0 +[3]: https://github.com/outcaste-io/badger/tree/45bca18f24ef5cc04701a1e17448ddfce9372da0 [4]: https://github.com/ngaut/unistore AgateDB is under active development on [develop](https://github.com/tikv/agatedb/tree/develop) diff --git a/agate_bench/Cargo.toml b/agate_bench/Cargo.toml new file mode 100644 index 00000000..4b3faea8 --- /dev/null +++ b/agate_bench/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "agate_bench" +version = "0.1.0" +authors = ["Alex Chi "] +edition = "2018" + +[features] +default = [] +enable-rocksdb = ["rocksdb"] + +[dependencies] +agatedb = { path = "../" } +bytes = "1.0" +clap = "2.33" +indicatif = "0.15" +rand = "0.7" +rocksdb = { version = "0.15", optional = true } +threadpool = "1.8" +yatp = { git = "https://github.com/tikv/yatp.git" } + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.4.0" diff --git a/agate_bench/src/main.rs b/agate_bench/src/main.rs new file mode 100644 index 00000000..7853c576 --- /dev/null +++ b/agate_bench/src/main.rs @@ -0,0 +1,515 @@ +use agatedb::{AgateOptions, IteratorOptions}; +use 
bytes::{Bytes, BytesMut}; +use clap::clap_app; +use indicatif::{ProgressBar, ProgressStyle}; +use rand::Rng; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::UNIX_EPOCH; +use std::{sync::mpsc::channel, time::Duration}; + +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +fn gen_kv_pair(key: u64, value_size: usize) -> (Bytes, Bytes) { + let key = Bytes::from(format!("vsz={:05}-k={:010}", value_size, key)); + + let mut value = BytesMut::with_capacity(value_size); + value.resize(value_size, 0); + + (key, value.freeze()) +} + +pub fn unix_time() -> u64 { + UNIX_EPOCH + .elapsed() + .expect("Time went backwards") + .as_millis() as u64 +} + +pub struct Rate { + pub data: std::sync::Arc<std::sync::atomic::AtomicU64>, + lst_data: u64, +} + +impl Rate { + pub fn new() -> Self { + Self { + data: std::sync::Arc::new(std::sync::atomic::AtomicU64::new(0)), + lst_data: 0, + } + } + + pub fn now(&self) -> u64 { + self.data.load(std::sync::atomic::Ordering::Relaxed) + } + + pub fn rate(&mut self) -> u64 { + let now = self.now(); + let delta = now - self.lst_data; + self.lst_data = now; + delta + } +} + +impl Default for Rate { + fn default() -> Self { + Self::new() + } +} +fn main() { + let matches = clap_app!(agate_bench => + (version: "1.0") + (author: "TiKV authors") + (about: "Benchmark for AgateDB") + (@arg directory: --directory +takes_value +required "database directory") + (@arg threads: --threads +takes_value default_value("8") "threads") + (@subcommand populate => + (about: "build a database with given keys") + (version: "1.0") + (author: "TiKV authors") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") + (@arg seq: --seq +takes_value default_value("true") "write sequentially") + (@arg value_size: --value_size +takes_value default_value("1024") "value size") + (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") +
) + (@subcommand randread => + (about: "randomly read from database") + (version: "1.0") + (author: "TiKV authors") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") + (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg value_size: --value_size +takes_value default_value("1024") "value size") + (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") + ) + (@subcommand iterate => + (about: "iterate database") + (version: "1.0") + (author: "TiKV authors") + (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg value_size: --value_size +takes_value default_value("1024") "value size") + ) + (@subcommand rocks_populate => + (about: "build a database with given keys") + (version: "1.0") + (author: "TiKV authors") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") + (@arg seq: --seq +takes_value default_value("true") "write sequentially") + (@arg value_size: --value_size +takes_value default_value("1024") "value size") + (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") + ) + (@subcommand rocks_randread => + (about: "randomly read from database") + (version: "1.0") + (author: "TiKV authors") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") + (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg value_size: --value_size +takes_value default_value("1024") "value size") + (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") + ) + (@subcommand rocks_iterate => + (about: "iterate database") + (version: "1.0") + (author: "TiKV authors") + (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg value_size: --value_size +takes_value default_value("1024") "value size") + ) + ) + .get_matches(); + + let directory = PathBuf::from(matches.value_of("directory").unwrap()); + let 
threads: usize = matches.value_of("threads").unwrap().parse().unwrap(); + let pool = threadpool::ThreadPool::new(threads); + let (tx, rx) = channel(); + + match matches.subcommand() { + ("populate", Some(sub_matches)) => { + let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); + let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); + let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); + + let mut options = AgateOptions { + create_if_not_exists: true, + dir: directory.clone(), + value_dir: directory, + managed_txns: true, + ..Default::default() + }; + let agate = Arc::new(options.open().unwrap()); + let mut expected = 0; + let pb = ProgressBar::hidden(); + pb.set_style(ProgressStyle::default_bar() + .template( + "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", + ) + .progress_chars("=> ")); + + let mut write = Rate::new(); + let mut last_report = std::time::Instant::now(); + + let seq: bool = sub_matches.value_of("seq").unwrap().parse().unwrap(); + + if seq { + println!("writing sequentially"); + } + + for i in 0..key_nums / chunk_size { + let agate = agate.clone(); + let tx = tx.clone(); + let write = write.data.clone(); + pool.execute(move || { + let range = (i * chunk_size)..((i + 1) * chunk_size); + let mut txn = agate.new_transaction_at(unix_time(), true); + let mut rng = rand::thread_rng(); + for j in range { + let (key, value) = if seq { + gen_kv_pair(j, value_size) + } else { + gen_kv_pair(rng.gen_range(0, key_nums), value_size) + }; + txn.set(key, value).unwrap(); + write.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + txn.commit_at(unix_time()).unwrap(); + tx.send(()).unwrap(); + }); + expected += 1; + } + + let begin = std::time::Instant::now(); + + for _ in rx.iter().take(expected) { + pb.inc(chunk_size); + let now = std::time::Instant::now(); + let delta = now.duration_since(last_report); + if delta > 
std::time::Duration::from_secs(1) { + last_report = now; + println!( + "{}, rate: {}, total: {}", + now.duration_since(begin).as_secs_f64(), + write.rate() as f64 / delta.as_secs_f64(), + write.now() + ); + } + } + pb.finish_with_message("done"); + } + ("randread", Some(sub_matches)) => { + let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); + let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); + let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); + let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); + + let mut options = AgateOptions { + create_if_not_exists: true, + sync_writes: true, + dir: directory.clone(), + value_dir: directory, + managed_txns: true, + ..Default::default() + }; + let agate = Arc::new(options.open().unwrap()); + let mut expected = 0; + let pb = ProgressBar::new(key_nums * times); + pb.set_style(ProgressStyle::default_bar() + .template( + "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", + ) + .progress_chars("=> ")); + + let mut missing = Rate::new(); + let mut found = Rate::new(); + let mut last_report = std::time::Instant::now(); + + for _ in 0..times { + for i in 0..key_nums / chunk_size { + let agate = agate.clone(); + let tx = tx.clone(); + let missing = missing.data.clone(); + let found = found.data.clone(); + pool.execute(move || { + let range = (i * chunk_size)..((i + 1) * chunk_size); + let txn = agate.new_transaction_at(unix_time(), false); + let mut rng = rand::thread_rng(); + for _ in range { + let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); + match txn.get(&key) { + Ok(item) => { + assert_eq!(item.value().len(), value_size); + found.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + Err(err) => { + if matches!(err, agatedb::Error::KeyNotFound(_)) { + missing.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + continue; + } else { + panic!("{:?}", err); 
+ } + } + } + } + tx.send(()).unwrap(); + }); + expected += 1; + } + } + + let begin = std::time::Instant::now(); + + for _ in rx.iter().take(expected) { + let now = std::time::Instant::now(); + let delta = now.duration_since(last_report); + if delta > std::time::Duration::from_secs(1) { + last_report = now; + println!( + "{}, rate: {}, found: {}, missing: {}", + now.duration_since(begin).as_secs_f64(), + (found.rate() + missing.rate()) as f64 / delta.as_secs_f64(), + found.now(), + missing.now() + ); + } + } + pb.finish_with_message("done"); + } + ("iterate", Some(sub_matches)) => { + let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); + let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); + + let mut options = AgateOptions { + create_if_not_exists: true, + sync_writes: true, + dir: directory.clone(), + value_dir: directory, + managed_txns: true, + ..Default::default() + }; + let agate = Arc::new(options.open().unwrap()); + + let begin = std::time::Instant::now(); + let mut lst_report = std::time::Instant::now(); + let mut total = Rate::new(); + + for _ in 0..times { + let agate = agate.clone(); + let txn = agate.new_transaction_at(unix_time(), false); + let opts = IteratorOptions::default(); + let mut iter = txn.new_iterator(&opts); + iter.rewind(); + while iter.valid() { + let item = iter.item(); + assert_eq!(item.value().len(), value_size); + total + .data + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + iter.next(); + let now = std::time::Instant::now(); + if now.duration_since(lst_report) >= Duration::from_secs(1) { + lst_report = now; + println!( + "{}, rate: {}, total: {}", + now.duration_since(begin).as_secs_f64(), + total.rate(), + total.now() + ); + } + } + } + + println!( + "read total {} keys in {}", + total.now(), + begin.elapsed().as_secs_f64() + ) + } + #[cfg(feature = "enable-rocksdb")] + ("rocks_populate", Some(sub_matches)) => { + let key_nums: u64 =
sub_matches.value_of("key_nums").unwrap().parse().unwrap(); + let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); + let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + opts.set_compression_type(rocksdb::DBCompressionType::None); + + let db = Arc::new(rocksdb::DB::open(&opts, directory).unwrap()); + let mut expected = 0; + let pb = ProgressBar::hidden(); + pb.set_style(ProgressStyle::default_bar() + .template( + "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", + ) + .progress_chars("=> ")); + + let mut write = Rate::new(); + let mut last_report = std::time::Instant::now(); + + let seq: bool = sub_matches.value_of("seq").unwrap().parse().unwrap(); + + if seq { + println!("writing sequentially"); + } + + for i in 0..key_nums / chunk_size { + let db = db.clone(); + let tx = tx.clone(); + let write = write.data.clone(); + pool.execute(move || { + let mut write_options = rocksdb::WriteOptions::default(); + write_options.set_sync(true); + write_options.disable_wal(false); + + let range = (i * chunk_size)..((i + 1) * chunk_size); + let mut batch = rocksdb::WriteBatch::default(); + let mut rng = rand::thread_rng(); + for j in range { + let (key, value) = if seq { + gen_kv_pair(j, value_size) + } else { + gen_kv_pair(rng.gen_range(0, key_nums), value_size) + }; + batch.put(key, value); + write.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + db.write_opt(batch, &write_options).unwrap(); + tx.send(()).unwrap(); + }); + expected += 1; + } + + let begin = std::time::Instant::now(); + + for _ in rx.iter().take(expected) { + pb.inc(chunk_size); + let now = std::time::Instant::now(); + let delta = now.duration_since(last_report); + if delta > std::time::Duration::from_secs(1) { + last_report = now; + println!( + "{}, rate: {}, total: {}", + now.duration_since(begin).as_secs_f64(), + 
write.rate() as f64 / delta.as_secs_f64(), + write.now() + ); + } + } + pb.finish_with_message("done"); + } + #[cfg(feature = "enable-rocksdb")] + ("rocks_randread", Some(sub_matches)) => { + let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); + let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); + let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); + let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + opts.set_compression_type(rocksdb::DBCompressionType::None); + + let db = Arc::new(rocksdb::DB::open(&opts, directory).unwrap()); + let mut expected = 0; + let pb = ProgressBar::new(key_nums * times); + pb.set_style(ProgressStyle::default_bar() + .template( + "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", + ) + .progress_chars("=> ")); + + let mut missing = Rate::new(); + let mut found = Rate::new(); + let mut last_report = std::time::Instant::now(); + + for _ in 0..times { + for i in 0..key_nums / chunk_size { + let db = db.clone(); + let tx = tx.clone(); + let missing = missing.data.clone(); + let found = found.data.clone(); + pool.execute(move || { + let range = (i * chunk_size)..((i + 1) * chunk_size); + let mut rng = rand::thread_rng(); + for _ in range { + let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); + match db.get(&key) { + Ok(Some(value)) => { + assert_eq!(value.len(), value_size); + found.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + } + Ok(None) => { + missing.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + continue; + } + Err(err) => { + panic!("{:?}", err); + } + } + } + tx.send(()).unwrap(); + }); + expected += 1; + } + } + + let begin = std::time::Instant::now(); + + for _ in rx.iter().take(expected) { + let now = std::time::Instant::now(); + let delta = now.duration_since(last_report); 
+ if delta > std::time::Duration::from_secs(1) { + last_report = now; + println!( + "{}, rate: {}, found: {}, missing: {}", + now.duration_since(begin).as_secs_f64(), + (found.rate() + missing.rate()) as f64 / delta.as_secs_f64(), + found.now(), + missing.now() + ); + } + } + pb.finish_with_message("done"); + } + #[cfg(feature = "enable-rocksdb")] + ("rocks_iterate", Some(sub_matches)) => { + let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); + let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); + + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + opts.set_compression_type(rocksdb::DBCompressionType::None); + let db = Arc::new(rocksdb::DB::open(&opts, directory).unwrap()); + + let begin = std::time::Instant::now(); + let mut lst_report = std::time::Instant::now(); + let mut total = Rate::new(); + + for _ in 0..times { + let iter = db.iterator(rocksdb::IteratorMode::Start); + for (_, value) in iter { + assert_eq!(value.len(), value_size); + total + .data + .fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let now = std::time::Instant::now(); + if now.duration_since(lst_report) >= Duration::from_secs(1) { + lst_report = now; + println!( + "{}, rate: {}, total: {}", + now.duration_since(begin).as_secs_f64(), + total.rate(), + total.now() + ); + } + } + } + + println!( + "read total {} keys in {}", + total.now(), + begin.elapsed().as_secs_f64() + ) + } + _ => panic!("unsupported command"), + } +} diff --git a/benches/bench_iterator.rs b/benches/bench_iterator.rs new file mode 100644 index 00000000..692c5483 --- /dev/null +++ b/benches/bench_iterator.rs @@ -0,0 +1,174 @@ +mod common; + +use agatedb::util::unix_time; +use agatedb::AgateIterator; +use agatedb::AgateOptions; +use agatedb::ConcatIterator; +use agatedb::Iterators; +use agatedb::MergeIterator; + +use bytes::Bytes; +use common::get_table_for_benchmark; +use criterion::{criterion_group, criterion_main, Criterion}; +use
rand::{thread_rng, Rng}; +use tempdir::TempDir; + +fn get_test_options() -> AgateOptions { + agatedb::AgateOptions { + mem_table_size: 1 << 15, + base_table_size: 1 << 15, + base_level_size: 4 << 15, + sync_writes: false, + ..Default::default() + } +} + +fn bench_iterator(c: &mut Criterion) { + let dir = TempDir::new("agatedb").unwrap(); + let mut opt = get_test_options(); + opt.dir = dir.path().to_path_buf(); + opt.value_dir = dir.path().to_path_buf(); + opt.managed_txns = true; + let db = opt.open().unwrap(); + const N: usize = 100000; // around 80 SST + + let key = |i| Bytes::from(format!("{:06}", i)); + let val = Bytes::from("ok"); + + println!("generating tables..."); + + for chunk in (0..N).collect::<Vec<_>>().chunks(10) { + let mut txn = db.new_transaction_at(unix_time(), true); + for i in chunk { + txn.set(key(*i), val.clone()).unwrap(); + } + txn.commit_at(unix_time()).unwrap(); + } + + std::thread::sleep(std::time::Duration::from_secs(3)); + + let lsm_files = std::fs::read_dir(dir.path()) + .unwrap() + .filter_map(|x| x.ok()) + .filter_map(|x| x.file_name().into_string().ok()) + .filter(|x| x.ends_with(".sst")) + .count(); + + println!("LSM files: {}", lsm_files); + + c.bench_function("iterate noprefix single key", |b| { + let txn = db.new_transaction_at(unix_time(), false); + let key_id = thread_rng().gen_range(0, N); + let seek_key = key(key_id); + let it_opts = agatedb::IteratorOptions { + all_versions: true, + ..Default::default() + }; + let mut it = txn.new_iterator(&it_opts); + + b.iter(|| { + it.seek(&seek_key); + let mut cnt = 0; + while it.valid_for_prefix(&seek_key) { + let item = it.item(); + assert_eq!(item.value(), val); + it.next(); + cnt += 1; + } + if cnt != 1 { + panic!("count must be one key"); + } + }); + }); + + dir.close().unwrap(); +} + +fn bench_merge_iterator(c: &mut Criterion) { + let m = 2; + let n = 5000000 / m; + + let tables = (0..m) + .map(|_| get_table_for_benchmark(n)) + .collect::<Vec<_>>(); + let its = tables + .iter() + .map(|t|
Iterators::from(t.new_iterator(0))) + .collect::<Vec<_>>(); + let mut it = MergeIterator::from_iterators(its, false); + + c.bench_function("merge iterator read", |b| { + b.iter(|| { + it.rewind(); + while it.valid() { + it.next(); + } + }); + }); + + let mut rng = rand::thread_rng(); + c.bench_function("merge iterator random read", |b| { + b.iter_batched( + || { + let i = rng.gen_range(0, n); + ( + Bytes::from(format!("{:016x}", i)), + Bytes::from(i.to_string()), + ) + }, + |(k, v)| { + it.seek(&k); + assert!(it.valid()); + assert_eq!(it.value().value, v) + }, + criterion::BatchSize::SmallInput, + ); + }); +} + +fn bench_concat_iterator(c: &mut Criterion) { + let m = 2; + let n = 5000000 / m; + + let tables = (0..m) + .map(|_| get_table_for_benchmark(n)) + .collect::<Vec<_>>(); + let tables = tables.iter().map(|t| t.table.clone()).collect::<Vec<_>>(); + let mut it = ConcatIterator::from_tables(tables, 0); + + c.bench_function("concat iterator read", |b| { + b.iter(|| { + it.rewind(); + while it.valid() { + it.next(); + } + }); + }); + + let mut rng = rand::thread_rng(); + c.bench_function("concat iterator random read", |b| { + b.iter_batched( + || { + let i = rng.gen_range(0, n); + ( + Bytes::from(format!("{:016x}", i)), + Bytes::from(i.to_string()), + ) + }, + |(k, v)| { + it.seek(&k); + assert!(it.valid()); + assert_eq!(it.value().value, v) + }, + criterion::BatchSize::SmallInput, + ); + }); +} + +criterion_group!
{ + name = benches_iterator; + config = Criterion::default(); + targets = bench_iterator, bench_merge_iterator, bench_concat_iterator +} + +criterion_main!(benches_iterator); diff --git a/benches/bench_table.rs b/benches/bench_table.rs index aef0a4ca..69e66be6 100644 --- a/benches/bench_table.rs +++ b/benches/bench_table.rs @@ -1,16 +1,13 @@ mod common; -use std::ops::{Deref, DerefMut}; - use agatedb::{ opt::build_table_options, AgateIterator, AgateOptions, - ChecksumVerificationMode::NoVerification, Table, TableBuilder, Value, + ChecksumVerificationMode::NoVerification, TableBuilder, Value, }; use bytes::Bytes; -use common::rand_value; +use common::{get_table_for_benchmark, rand_value}; use criterion::{criterion_group, criterion_main, Criterion}; use rand::Rng; -use tempdir::TempDir; fn bench_table_builder(c: &mut Criterion) { c.bench_function("table builder", |b| { @@ -45,62 +42,13 @@ fn bench_table_builder(c: &mut Criterion) { }); } -/// TableGuard saves Table and TempDir, so as to ensure -/// temporary directory is removed after table is closed. -/// According to Rust RFC, the drop order is first `table` then -/// `tmp_dir`. 
-pub struct TableGuard { - table: Table, - _tmp_dir: TempDir, -} - -impl Deref for TableGuard { - type Target = Table; - - fn deref(&self) -> &Table { - &self.table - } -} - -impl DerefMut for TableGuard { - fn deref_mut(&mut self) -> &mut Table { - &mut self.table - } -} - -fn get_table_for_benchmark(count: usize) -> TableGuard { - let tmp_dir = TempDir::new("agatedb").unwrap(); - - let agate_opts = AgateOptions { - block_size: 4 * 1024, - bloom_false_positive: 0.01, - checksum_mode: NoVerification, - ..Default::default() - }; - - let opts = build_table_options(&agate_opts); - - let mut builder = TableBuilder::new(opts.clone()); - let filename = tmp_dir.path().join("1.sst"); - - for i in 0..count { - let k = Bytes::from(format!("{:016x}", i)); - let v = Bytes::from(i.to_string()); - builder.add(&k, &Value::new(v), 0); - } - - TableGuard { - table: Table::create(&filename, builder.finish(), opts).unwrap(), - _tmp_dir: tmp_dir, - } -} - fn bench_table(c: &mut Criterion) { let n = 5000000; c.bench_function("table read", |b| { let table = get_table_for_benchmark(n); + let mut it = table.new_iterator(0); + b.iter(|| { - let mut it = table.new_iterator(0); it.seek_to_first(); while it.valid() { it.next(); @@ -119,8 +67,9 @@ fn bench_table(c: &mut Criterion) { c.bench_function("table read and build", |b| { let table = get_table_for_benchmark(n); + let mut it = table.new_iterator(0); + b.iter(|| { - let mut it = table.new_iterator(0); let mut builder = TableBuilder::new(builder_opts.clone()); it.seek_to_first(); while it.valid() { @@ -137,6 +86,7 @@ fn bench_table(c: &mut Criterion) { c.bench_function("table random read", |b| { let table = get_table_for_benchmark(n); let mut it = table.new_iterator(0); + b.iter_batched( || { let i = rng.gen_range(0, n); diff --git a/benches/common.rs b/benches/common.rs index c80b05f4..6742ad2f 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -1,9 +1,66 @@ +#![allow(dead_code)] +use agatedb::{ + opt::build_table_options, 
AgateOptions, ChecksumVerificationMode::NoVerification, Table, + TableBuilder, Value, +}; +use bytes::Bytes; use rand::{distributions::Alphanumeric, Rng}; +use std::ops::{Deref, DerefMut}; +use tempdir::TempDir; -#[allow(dead_code)] pub fn rand_value() -> String { rand::thread_rng() .sample_iter(&Alphanumeric) .take(32) .collect::<String>() } + +/// TableGuard saves Table and TempDir, so as to ensure +/// temporary directory is removed after table is closed. +/// According to Rust RFC, the drop order is first `table` then +/// `tmp_dir`. +pub struct TableGuard { + pub table: Table, + _tmp_dir: TempDir, +} + +impl Deref for TableGuard { + type Target = Table; + + fn deref(&self) -> &Table { + &self.table + } +} + +impl DerefMut for TableGuard { + fn deref_mut(&mut self) -> &mut Table { + &mut self.table + } +} + +pub fn get_table_for_benchmark(count: usize) -> TableGuard { + let tmp_dir = TempDir::new("agatedb").unwrap(); + + let agate_opts = AgateOptions { + block_size: 4 * 1024, + bloom_false_positive: 0.01, + checksum_mode: NoVerification, + ..Default::default() + }; + + let opts = build_table_options(&agate_opts); + + let mut builder = TableBuilder::new(opts.clone()); + let filename = tmp_dir.path().join("1.sst"); + + for i in 0..count { + let k = Bytes::from(format!("{:016x}", i)); + let v = Bytes::from(i.to_string()); + builder.add(&k, &Value::new(v), 0); + } + + TableGuard { + table: Table::create(&filename, builder.finish(), opts).unwrap(), + _tmp_dir: tmp_dir, + } +} diff --git a/codecov.yml b/codecov.yml index dd1c4ffa..514c6ca0 100644 --- a/codecov.yml +++ b/codecov.yml @@ -12,3 +12,6 @@ coverage: default: target: auto threshold: 3% + +ignore: + - "agate_bench" # ignore bench script diff --git a/src/checksum.rs b/src/checksum.rs index fcbc3e6a..b6126b82 100644 --- a/src/checksum.rs +++ b/src/checksum.rs @@ -1,11 +1,13 @@ -use crc::crc32; -use proto::meta::{checksum::Algorithm as ChecksumAlgorithm, Checksum}; - use crate::{Error, Result}; +use
proto::meta::{checksum::Algorithm as ChecksumAlgorithm, Checksum}; pub fn calculate_checksum(data: &[u8], algo: ChecksumAlgorithm) -> u64 { match algo { - ChecksumAlgorithm::Crc32c => crc32::checksum_castagnoli(data) as u64, + ChecksumAlgorithm::Crc32c => { + // crc32fast implements CRC-32/IEEE, not CRC-32C; keep the Castagnoli + // polynomial so checksums still match the `Crc32c` algorithm tag. + crc::crc32::checksum_castagnoli(data) as u64 + } ChecksumAlgorithm::XxHash64 => xxhash::checksum(data), } } diff --git a/src/lib.rs b/src/lib.rs index 4b1e148c..7f01a66e 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,7 +16,7 @@ mod memtable; mod ops; pub mod opt; mod table; -mod util; +pub mod util; mod value; mod value_log; mod wal; @@ -25,8 +25,12 @@ mod watermark; pub use db::{Agate, AgateOptions}; pub use error::{Error, Result}; pub use format::{get_ts, key_with_ts}; +pub use iterator::IteratorOptions; pub use iterator_trait::AgateIterator; pub use opt::{ChecksumVerificationMode, Options as TableOptions}; pub use skiplist::Skiplist; +pub use table::merge_iterator::Iterators; +pub use table::ConcatIterator; +pub use table::MergeIterator; pub use table::{builder::Builder as TableBuilder, Table}; pub use value::Value; diff --git a/src/ops/transaction.rs b/src/ops/transaction.rs index 751333bb..2fefaa3a 100644 --- a/src/ops/transaction.rs +++ b/src/ops/transaction.rs @@ -196,7 +196,7 @@ impl Transaction { } /// Looks for key and returns corresponding Item. - pub(crate) fn get(&self, key: &Bytes) -> Result<Item> { + pub fn get(&self, key: &Bytes) -> Result<Item> { if key.is_empty() { return Err(Error::EmptyKey); } else if self.discarded { diff --git a/src/value.rs b/src/value.rs index dd866cbf..bb48d0d2 100644 --- a/src/value.rs +++ b/src/value.rs @@ -29,7 +29,7 @@ pub struct Value { impl From<Value> for Bytes { fn from(value: Value) -> Bytes { // TODO: we can reduce unnecessary copy by re-writing `encode` - let mut buf = BytesMut::new(); + let mut buf = BytesMut::with_capacity(value.encoded_size() as usize); value.encode(&mut buf); buf.freeze() }