diff --git a/.github/workflows/bench.yml b/.github/workflows/bench.yml index 2dc581b5..b365187a 100644 --- a/.github/workflows/bench.yml +++ b/.github/workflows/bench.yml @@ -1,10 +1,15 @@ +# Do not run this workflow on pull requests, since it has permission to modify repository contents. on: - workflow_dispatch: - inputs: - reason: - description: 'reason to trigger this build' - required: false - + push: + branches: + - master + +permissions: + # deployments permission to deploy GitHub pages website + deployments: write + # contents permission to update benchmark contents in gh-pages branch + contents: write + name: Bench jobs: @@ -26,55 +31,15 @@ jobs: with: toolchain: nightly default: true - - uses: actions-rs/cargo@v1 - name: Benchmark 🚀 - with: - command: bench - args: --all-features --workspace - sanitizer_bench: - name: Bench with Sanitizer - runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v2 - name: Checkout 🛎️ - - uses: actions/cache@v2 - with: - path: | - ~/.cargo/registry - ~/.cargo/git - target - key: ${{ runner.os }}-cargo-sanitizer-bench - - uses: actions-rs/toolchain@v1 - name: Setup Cargo Toolchain 🛎️ - with: - components: rust-src - toolchain: nightly - default: true - - uses: actions-rs/cargo@v1 - name: Bench with Address Sanitizer 🚀 - with: - command: bench - args: --all-features -Zbuild-std --target x86_64-unknown-linux-gnu - env: - RUSTFLAGS: "-Zsanitizer=address" - - uses: actions-rs/cargo@v1 - name: Bench with Leak Sanitizer 🚀 - with: - command: bench - args: --all-features -Zbuild-std --target x86_64-unknown-linux-gnu - env: - RUSTFLAGS: "-Zsanitizer=leak" - - uses: actions-rs/cargo@v1 - name: Bench with Memory Sanitizer 🚀 - with: - command: bench - args: --all-features -Zbuild-std --target x86_64-unknown-linux-gnu - env: - RUSTFLAGS: "-Zsanitizer=memory" - - uses: actions-rs/cargo@v1 - name: Bench with Thread Sanitizer 🚀 - with: - command: bench - args: --all-features -Zbuild-std --target x86_64-unknown-linux-gnu - env: - RUSTFLAGS: "-Zsanitizer=thread" \ No newline at end of file + - name: Benchmark 🚀 + run: cargo +nightly bench --all-features --workspace --bench benches_agate_rocks -- --output-format bencher | tee output.txt + - uses: benchmark-action/github-action-benchmark@v1 + name: Store benchmark result + with: + name: Benchmark with RocksDB + tool: "cargo" + output-file-path: output.txt + # Access token to deploy GitHub Pages branch + github-token: ${{ secrets.GITHUB_TOKEN }} + # Push and deploy GitHub pages branch automatically + auto-push: true diff --git a/Cargo.toml b/Cargo.toml index 89e7d1ca..b147ba90 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -4,6 +4,10 @@ version = "0.1.0" authors = ["Jay Lee "] edition = "2018" +[features] +default = [] +enable-rocksdb = ["rocksdb"] + [dependencies] bytes = "1.0" coarsetime = "0.1.22" @@ -20,6 +24,7 @@ parking_lot = "0.11" prost = "0.8" proto = { path = "proto" } rand = "0.7" +rocksdb = { version = "0.15", optional = true } skiplist = { path = "skiplist" } tempdir = "0.3" thiserror = "1.0" @@ -51,6 +56,10 @@ harness = false name = "bench_iterator" harness = false +[[bench]] +name = "benches_agate_rocks" +harness = false + [profile.bench] opt-level = 3 debug = false diff --git a/agate_bench/Cargo.toml b/agate_bench/Cargo.toml index 4b3faea8..a2f23434 100644 --- a/agate_bench/Cargo.toml +++ b/agate_bench/Cargo.toml @@ -15,6 +15,7 @@ clap = "2.33" indicatif = "0.15" rand = "0.7" rocksdb = { version = "0.15", optional = true } +tempdir = "0.3" threadpool = "1.8" yatp = { git = 
"https://github.com/tikv/yatp.git" } diff --git a/agate_bench/src/main.rs b/agate_bench/src/main.rs index 7853c576..17e3b4d9 100644 --- a/agate_bench/src/main.rs +++ b/agate_bench/src/main.rs @@ -1,35 +1,15 @@ -use agatedb::{AgateOptions, IteratorOptions}; -use bytes::{Bytes, BytesMut}; +use std::{path::PathBuf, sync::Arc, time::Instant}; + +use agatedb::AgateOptions; use clap::clap_app; use indicatif::{ProgressBar, ProgressStyle}; -use rand::Rng; -use std::path::PathBuf; -use std::sync::Arc; -use std::time::UNIX_EPOCH; -use std::{sync::mpsc::channel, time::Duration}; - -#[cfg(not(target_env = "msvc"))] -use tikv_jemallocator::Jemalloc; - -#[cfg(not(target_env = "msvc"))] -#[global_allocator] -static GLOBAL: Jemalloc = Jemalloc; - -fn gen_kv_pair(key: u64, value_size: usize) -> (Bytes, Bytes) { - let key = Bytes::from(format!("vsz={:05}-k={:010}", value_size, key)); - - let mut value = BytesMut::with_capacity(value_size); - value.resize(value_size, 0); - (key, value.freeze()) -} +#[path = "../../benches/common.rs"] +mod common; -pub fn unix_time() -> u64 { - UNIX_EPOCH - .elapsed() - .expect("Time went backwards") - .as_millis() as u64 -} +use common::{ + agate_iterate, agate_populate, agate_randread, rocks_iterate, rocks_populate, rocks_randread, +}; pub struct Rate { pub data: std::sync::Arc, @@ -73,24 +53,24 @@ fn main() { (version: "1.0") (author: "TiKV authors") (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") - (@arg seq: --seq +takes_value default_value("true") "write sequentially") + (@arg batch_size: --batch_size +takes_value default_value("1000") "pairs in one txn") (@arg value_size: --value_size +takes_value default_value("1024") "value size") - (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") + (@arg seq: --seq +takes_value default_value("true") "write sequentially") ) (@subcommand randread => (about: "randomly read from database") (version: "1.0") (author: "TiKV authors") - (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg value_size: --value_size +takes_value default_value("1024") "value size") - (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") ) (@subcommand iterate => (about: "iterate database") (version: "1.0") (author: "TiKV authors") (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg value_size: --value_size +takes_value default_value("1024") "value size") ) (@subcommand rocks_populate => @@ -98,122 +78,70 @@ fn main() { (version: "1.0") (author: "TiKV authors") (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") - (@arg seq: --seq +takes_value default_value("true") "write sequentially") + (@arg batch_size: --batch_size +takes_value default_value("1000") "pairs in one txn") (@arg value_size: --value_size +takes_value default_value("1024") "value size") - (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") + (@arg seq: --seq +takes_value default_value("true") "write sequentially") ) (@subcommand rocks_randread => (about: "randomly read from database") (version: "1.0") (author: "TiKV authors") - (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg times: --times +takes_value default_value("5") 
"read how many times") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg value_size: --value_size +takes_value default_value("1024") "value size") - (@arg chunk_size: --chunk_size +takes_value default_value("1000") "pairs in one txn") ) (@subcommand rocks_iterate => (about: "iterate database") (version: "1.0") (author: "TiKV authors") (@arg times: --times +takes_value default_value("5") "read how many times") + (@arg key_nums: --key_nums +takes_value default_value("1024") "key numbers") (@arg value_size: --value_size +takes_value default_value("1024") "value size") ) ) .get_matches(); let directory = PathBuf::from(matches.value_of("directory").unwrap()); - let threads: usize = matches.value_of("threads").unwrap().parse().unwrap(); - let pool = threadpool::ThreadPool::new(threads); - let (tx, rx) = channel(); + let threads: u64 = matches.value_of("threads").unwrap().parse().unwrap(); + + let mut agate_opts = AgateOptions { + dir: directory.clone(), + value_dir: directory.clone(), + sync_writes: true, + managed_txns: true, + ..Default::default() + }; + + let mut rocks_opts = rocksdb::Options::default(); + rocks_opts.create_if_missing(true); + rocks_opts.set_compression_type(rocksdb::DBCompressionType::None); match matches.subcommand() { ("populate", Some(sub_matches)) => { let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); + let batch_size: u64 = sub_matches.value_of("batch_size").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); - - let mut options = AgateOptions { - create_if_not_exists: true, - dir: directory.clone(), - value_dir: directory, - managed_txns: true, - ..Default::default() - }; - let agate = Arc::new(options.open().unwrap()); - let mut expected = 0; - let pb = ProgressBar::hidden(); - pb.set_style(ProgressStyle::default_bar() - .template( - "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", - ) - .progress_chars("=> ")); + let seq: bool = sub_matches.value_of("seq").unwrap().parse().unwrap(); - let mut write = Rate::new(); - let mut last_report = std::time::Instant::now(); + let agate = Arc::new(agate_opts.open().unwrap()); + let chunk_size = key_nums / threads; - let seq: bool = sub_matches.value_of("seq").unwrap().parse().unwrap(); + let begin = Instant::now(); - if seq { - println!("writing sequentially"); - } + agate_populate(agate, key_nums, chunk_size, batch_size, value_size, seq); - for i in 0..key_nums / chunk_size { - let agate = agate.clone(); - let tx = tx.clone(); - let write = write.data.clone(); - pool.execute(move || { - let range = (i * chunk_size)..((i + 1) * chunk_size); - let mut txn = agate.new_transaction_at(unix_time(), true); - let mut rng = rand::thread_rng(); - for j in range { - let (key, value) = if seq { - gen_kv_pair(j, value_size) - } else { - gen_kv_pair(rng.gen_range(0, key_nums), value_size) - }; - txn.set(key, value).unwrap(); - write.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - txn.commit_at(unix_time()).unwrap(); - tx.send(()).unwrap(); - }); - expected += 1; - } + let cost = begin.elapsed(); - let begin = std::time::Instant::now(); - - for _ in rx.iter().take(expected) { - pb.inc(chunk_size); - let now = std::time::Instant::now(); - let delta = now.duration_since(last_report); - if delta > std::time::Duration::from_secs(1) { - last_report = now; - println!( - "{}, rate: {}, total: 
{}", - now.duration_since(begin).as_secs_f64(), - write.rate() as f64 / delta.as_secs_f64(), - write.now() - ); - } - } - pb.finish_with_message("done"); + println!("populate {} keys in {} ms", key_nums, cost.as_millis()); } ("randread", Some(sub_matches)) => { + let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); - let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); - let mut options = AgateOptions { - create_if_not_exists: true, - sync_writes: true, - dir: directory.clone(), - value_dir: directory, - managed_txns: true, - ..Default::default() - }; - let agate = Arc::new(options.open().unwrap()); - let mut expected = 0; + let agate = Arc::new(agate_opts.open().unwrap()); + let chunk_size = key_nums / threads; + let pb = ProgressBar::new(key_nums * times); pb.set_style(ProgressStyle::default_bar() .template( @@ -221,195 +149,80 @@ fn main() { ) .progress_chars("=> ")); - let mut missing = Rate::new(); - let mut found = Rate::new(); - let mut last_report = std::time::Instant::now(); + let begin = Instant::now(); for _ in 0..times { - for i in 0..key_nums / chunk_size { - let agate = agate.clone(); - let tx = tx.clone(); - let missing = missing.data.clone(); - let found = found.data.clone(); - pool.execute(move || { - let range = (i * chunk_size)..((i + 1) * chunk_size); - let txn = agate.new_transaction_at(unix_time(), false); - let mut rng = rand::thread_rng(); - for _ in range { - let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); - match txn.get(&key) { - Ok(item) => { - assert_eq!(item.value().len(), value_size); - found.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - Err(err) => { - if matches!(err, agatedb::Error::KeyNotFound(_)) { - missing.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - continue; - } else { - panic!("{:?}", err); - } - } - } - } - tx.send(()).unwrap(); - }); - expected += 1; - } + agate_randread(agate.clone(), key_nums, chunk_size, value_size); + pb.inc(key_nums); } - let begin = std::time::Instant::now(); - - for _ in rx.iter().take(expected) { - let now = std::time::Instant::now(); - let delta = now.duration_since(last_report); - last_report = now; - if delta > std::time::Duration::from_secs(1) { - println!( - "{}, rate: {}, found: {}, missing: {}", - now.duration_since(begin).as_secs_f64(), - (found.rate() + missing.rate()) as f64 / delta.as_secs_f64(), - found.now(), - missing.now() - ); - } - } pb.finish_with_message("done"); + + let cost = begin.elapsed(); + println!( + "randread {} keys {} times in {} ms", + key_nums, + times, + cost.as_millis() + ); } ("iterate", Some(sub_matches)) => { let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); + let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let mut options = AgateOptions { - create_if_not_exists: true, - sync_writes: true, - dir: directory.clone(), - value_dir: directory, - managed_txns: true, - ..Default::default() - }; - let agate = Arc::new(options.open().unwrap()); + let agate = Arc::new(agate_opts.open().unwrap()); + let chunk_size = key_nums / threads; + + let pb = ProgressBar::new(key_nums * times); + pb.set_style(ProgressStyle::default_bar() 
+ .template( + "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", + ) + .progress_chars("=> ")); - let begin = std::time::Instant::now(); - let mut lst_report = std::time::Instant::now(); - let mut total = Rate::new(); + let begin = Instant::now(); for _ in 0..times { - let agate = agate.clone(); - let txn = agate.new_transaction_at(unix_time(), false); - let opts = IteratorOptions::default(); - let mut iter = txn.new_iterator(&opts); - iter.rewind(); - while iter.valid() { - let item = iter.item(); - assert_eq!(item.value().len(), value_size); - total - .data - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - iter.next(); - let now = std::time::Instant::now(); - if now.duration_since(lst_report) >= Duration::from_secs(1) { - lst_report = now; - println!( - "{}, rate: {}, total: {}", - now.duration_since(begin).as_secs_f64(), - total.rate(), - total.now() - ); - } - } + agate_iterate(agate.clone(), key_nums, chunk_size, value_size); + pb.inc(key_nums); } + pb.finish_with_message("done"); + + let cost = begin.elapsed(); println!( - "read total {} keys in {}", - total.now(), - begin.elapsed().as_secs_f64() - ) + "iterate {} keys {} times in {} ms", + key_nums, + times, + cost.as_millis() + ); } - #[cfg(feature = "enable-rocksdb")] ("rocks_populate", Some(sub_matches)) => { let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); + let batch_size: u64 = sub_matches.value_of("batch_size").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.set_compression_type(rocksdb::DBCompressionType::None); - - let db = Arc::new(rocksdb::DB::open(&opts, directory).unwrap()); - let mut expected = 0; - let pb = ProgressBar::hidden(); - pb.set_style(ProgressStyle::default_bar() - .template( - "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", - ) - .progress_chars("=> ")); + let seq: bool = sub_matches.value_of("seq").unwrap().parse().unwrap(); - let mut write = Rate::new(); - let mut last_report = std::time::Instant::now(); + let db = Arc::new(rocksdb::DB::open(&rocks_opts, &directory).unwrap()); + let chunk_size = key_nums / threads; - let seq: bool = sub_matches.value_of("seq").unwrap().parse().unwrap(); + let begin = Instant::now(); - if seq { - println!("writing sequentially"); - } + rocks_populate(db, key_nums, chunk_size, batch_size, value_size, seq); - for i in 0..key_nums / chunk_size { - let db = db.clone(); - let tx = tx.clone(); - let write = write.data.clone(); - pool.execute(move || { - let mut write_options = rocksdb::WriteOptions::default(); - write_options.set_sync(true); - write_options.disable_wal(false); - - let range = (i * chunk_size)..((i + 1) * chunk_size); - let mut batch = rocksdb::WriteBatch::default(); - let mut rng = rand::thread_rng(); - for j in range { - let (key, value) = if seq { - gen_kv_pair(j, value_size) - } else { - gen_kv_pair(rng.gen_range(0, key_nums), value_size) - }; - batch.put(key, value); - write.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - db.write_opt(batch, &write_options).unwrap(); - tx.send(()).unwrap(); - }); - expected += 1; - } + let cost = begin.elapsed(); - let begin = std::time::Instant::now(); - - for _ in rx.iter().take(expected) { - pb.inc(chunk_size); - let now = std::time::Instant::now(); - let delta = 
now.duration_since(last_report); - if delta > std::time::Duration::from_secs(1) { - last_report = now; - println!( - "{}, rate: {}, total: {}", - now.duration_since(begin).as_secs_f64(), - write.rate() as f64 / delta.as_secs_f64(), - write.now() - ); - } - } - pb.finish_with_message("done"); + println!("populate {} keys in {} ms", key_nums, cost.as_millis()); } - #[cfg(feature = "enable-rocksdb")] ("rocks_randread", Some(sub_matches)) => { + let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let chunk_size: u64 = sub_matches.value_of("chunk_size").unwrap().parse().unwrap(); - let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.set_compression_type(rocksdb::DBCompressionType::None); - let db = Arc::new(rocksdb::DB::open(&opts, directory).unwrap()); - let mut expected = 0; + let db = Arc::new(rocksdb::DB::open(&rocks_opts, &directory).unwrap()); + let chunk_size = key_nums / threads; + let pb = ProgressBar::new(key_nums * times); pb.set_style(ProgressStyle::default_bar() .template( @@ -417,99 +230,56 @@ fn main() { ) .progress_chars("=> ")); - let mut missing = Rate::new(); - let mut found = Rate::new(); - let mut last_report = std::time::Instant::now(); + let begin = Instant::now(); for _ in 0..times { - for i in 0..key_nums / chunk_size { - let db = db.clone(); - let tx = tx.clone(); - let missing = missing.data.clone(); - let found = found.data.clone(); - pool.execute(move || { - let range = (i * chunk_size)..((i + 1) * chunk_size); - let mut rng = rand::thread_rng(); - for _ in range { - let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); - match db.get(&key) { - Ok(Some(value)) => { - assert_eq!(value.len(), value_size); - found.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - } - Ok(None) => { - missing.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - continue; - } - Err(err) => { - panic!("{:?}", err); - } - } - } - tx.send(()).unwrap(); - }); - expected += 1; - } + rocks_randread(db.clone(), key_nums, chunk_size, value_size); + pb.inc(key_nums); } - let begin = std::time::Instant::now(); - - for _ in rx.iter().take(expected) { - let now = std::time::Instant::now(); - let delta = now.duration_since(last_report); - last_report = now; - if delta > std::time::Duration::from_secs(1) { - println!( - "{}, rate: {}, found: {}, missing: {}", - now.duration_since(begin).as_secs_f64(), - (found.rate() + missing.rate()) as f64 / delta.as_secs_f64(), - found.now(), - missing.now() - ); - } - } pb.finish_with_message("done"); + + let cost = begin.elapsed(); + println!( + "randread {} keys {} times in {} ms", + key_nums, + times, + cost.as_millis() + ); } - #[cfg(feature = "enable-rocksdb")] ("rocks_iterate", Some(sub_matches)) => { let times: u64 = sub_matches.value_of("times").unwrap().parse().unwrap(); + let key_nums: u64 = sub_matches.value_of("key_nums").unwrap().parse().unwrap(); let value_size: usize = sub_matches.value_of("value_size").unwrap().parse().unwrap(); - let mut opts = rocksdb::Options::default(); - opts.create_if_missing(true); - opts.set_compression_type(rocksdb::DBCompressionType::None); - let db = Arc::new(rocksdb::DB::open(&opts, directory).unwrap()); + let db = Arc::new(rocksdb::DB::open(&rocks_opts, &directory).unwrap()); + let chunk_size = key_nums / 
threads; - let begin = std::time::Instant::now(); - let mut lst_report = std::time::Instant::now(); - let mut total = Rate::new(); + let pb = ProgressBar::new(key_nums * times); + pb.set_style(ProgressStyle::default_bar() + .template( + "{prefix:.bold.dim} [{elapsed_precise}] [{bar:40}] [{per_sec}] ({pos}/{len}) {msg}", + ) + .progress_chars("=> ")); + + let begin = Instant::now(); for _ in 0..times { - let iter = db.iterator(rocksdb::IteratorMode::Start); - for (_, value) in iter { - assert_eq!(value.len(), value_size); - total - .data - .fetch_add(1, std::sync::atomic::Ordering::Relaxed); - let now = std::time::Instant::now(); - if now.duration_since(lst_report) >= Duration::from_secs(1) { - lst_report = now; - println!( - "{}, rate: {}, total: {}", - now.duration_since(begin).as_secs_f64(), - total.rate(), - total.now() - ); - } - } + rocks_iterate(db.clone(), key_nums, chunk_size, value_size); + pb.inc(key_nums); } + pb.finish_with_message("done"); + + let cost = begin.elapsed(); println!( - "read total {} keys in {}", - total.now(), - begin.elapsed().as_secs_f64() - ) + "iterate {} keys {} times in {} ms", + key_nums, + times, + cost.as_millis() + ); } + _ => panic!("unsupported command"), } } diff --git a/benches/bench_iterator.rs b/benches/bench_iterator.rs index 692c5483..bdc702e7 100644 --- a/benches/bench_iterator.rs +++ b/benches/bench_iterator.rs @@ -1,12 +1,8 @@ mod common; -use agatedb::util::unix_time; -use agatedb::AgateIterator; -use agatedb::AgateOptions; -use agatedb::ConcatIterator; -use agatedb::Iterators; -use agatedb::MergeIterator; - +use agatedb::{ + util::unix_time, AgateIterator, AgateOptions, ConcatIterator, Iterators, MergeIterator, +}; use bytes::Bytes; use common::get_table_for_benchmark; use criterion::{criterion_group, criterion_main, Criterion}; diff --git a/benches/benches_agate_rocks.rs b/benches/benches_agate_rocks.rs new file mode 100644 index 00000000..4abd3424 --- /dev/null +++ b/benches/benches_agate_rocks.rs @@ -0,0 +1,297 @@ +#![cfg(feature = "enable-rocksdb")] +mod common; + +use std::{ + ops::Add, + sync::Arc, + time::{Duration, Instant}, +}; + +use agatedb::AgateOptions; + use common::{ + agate_iterate, agate_populate, agate_randread, remove_files, rocks_iterate, rocks_populate, + rocks_randread, +}; +use criterion::{criterion_group, criterion_main, Criterion}; +use tempdir::TempDir; + +// Each worker thread processes `CHUNK_SIZE` items, and within one thread, +// each transaction or write batch covers `BATCH_SIZE` items. 
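+// For instance, with the constants below, a populate run spawns 160_000 / 10_000 = 16 +// worker threads, and each thread commits 10_000 / 100 = 100 transactions (or write +// batches) of 100 key-value pairs each.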
+const KEY_NUMS: u64 = 160_000; +const CHUNK_SIZE: u64 = 10_000; +const BATCH_SIZE: u64 = 100; + +const SMALL_VALUE_SIZE: usize = 32; +const LARGE_VALUE_SIZE: usize = 4096; + +fn bench_agate(c: &mut Criterion) { + let dir = TempDir::new("agatedb-bench-small-value").unwrap(); + let dir_path = dir.path(); + let mut opts = AgateOptions { + dir: dir_path.to_path_buf(), + value_dir: dir_path.to_path_buf(), + sync_writes: true, + managed_txns: true, + ..Default::default() + }; + + c.bench_function("agate sequentially populate small value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let agate = Arc::new(opts.open().unwrap()); + + let now = Instant::now(); + agate_populate( + agate, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + SMALL_VALUE_SIZE, + true, + ); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + c.bench_function("agate randomly populate small value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let agate = Arc::new(opts.open().unwrap()); + + let now = Instant::now(); + agate_populate( + agate, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + SMALL_VALUE_SIZE, + false, + ); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + let agate = Arc::new(opts.open().unwrap()); + + c.bench_function("agate randread small value", |b| { + b.iter(|| { + agate_randread(agate.clone(), KEY_NUMS, CHUNK_SIZE, SMALL_VALUE_SIZE); + }); + }); + + c.bench_function("agate iterate small value", |b| { + b.iter(|| { + agate_iterate(agate.clone(), KEY_NUMS, CHUNK_SIZE, SMALL_VALUE_SIZE); + }); + }); + + dir.close().unwrap(); + let dir = TempDir::new("agatedb-bench-large-value").unwrap(); + let dir_path = dir.path(); + opts.dir = dir_path.to_path_buf(); + opts.value_dir = dir_path.to_path_buf(); + + c.bench_function("agate sequentially populate large value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let agate = Arc::new(opts.open().unwrap()); + + let now = Instant::now(); + agate_populate( + agate, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + LARGE_VALUE_SIZE, + true, + ); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + c.bench_function("agate randomly populate large value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let agate = Arc::new(opts.open().unwrap()); + + let now = Instant::now(); + agate_populate( + agate, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + LARGE_VALUE_SIZE, + false, + ); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + let agate = Arc::new(opts.open().unwrap()); + + c.bench_function("agate randread large value", |b| { + b.iter(|| { + agate_randread(agate.clone(), KEY_NUMS, CHUNK_SIZE, LARGE_VALUE_SIZE); + }); + }); + + c.bench_function("agate iterate large value", |b| { + b.iter(|| { + agate_iterate(agate.clone(), KEY_NUMS, CHUNK_SIZE, LARGE_VALUE_SIZE); + }); + }); + + dir.close().unwrap(); +} + +fn bench_rocks(c: &mut Criterion) { + let dir = TempDir::new("rocks-bench-small-value").unwrap(); + let dir_path = dir.path(); + let mut opts = rocksdb::Options::default(); + opts.create_if_missing(true); + opts.set_compression_type(rocksdb::DBCompressionType::None); + + c.bench_function("rocks sequentially populate small value", |b| { + b.iter_custom(|iters| { + 
let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + + let now = Instant::now(); + rocks_populate(db, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, SMALL_VALUE_SIZE, true); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + c.bench_function("rocks randomly populate small value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + + let now = Instant::now(); + rocks_populate( + db, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + SMALL_VALUE_SIZE, + false, + ); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + + c.bench_function("rocks randread small value", |b| { + b.iter(|| { + rocks_randread(db.clone(), KEY_NUMS, CHUNK_SIZE, SMALL_VALUE_SIZE); + }); + }); + + c.bench_function("rocks iterate small value", |b| { + b.iter(|| rocks_iterate(db.clone(), KEY_NUMS, CHUNK_SIZE, SMALL_VALUE_SIZE)); + }); + + dir.close().unwrap(); + let dir = TempDir::new("rocks-bench-large-value").unwrap(); + let dir_path = dir.path(); + + c.bench_function("rocks sequentially populate large value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + + let now = Instant::now(); + rocks_populate(db, KEY_NUMS, CHUNK_SIZE, BATCH_SIZE, LARGE_VALUE_SIZE, true); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + c.bench_function("rocks randomly populate large value", |b| { + b.iter_custom(|iters| { + let mut total = Duration::new(0, 0); + + (0..iters).into_iter().for_each(|_| { + remove_files(dir_path); + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + + let now = Instant::now(); + rocks_populate( + db, + KEY_NUMS, + CHUNK_SIZE, + BATCH_SIZE, + LARGE_VALUE_SIZE, + false, + ); + total = total.add(now.elapsed()); + }); + + total + }); + }); + + let db = Arc::new(rocksdb::DB::open(&opts, &dir).unwrap()); + + c.bench_function("rocks randread large value", |b| { + b.iter(|| { + rocks_randread(db.clone(), KEY_NUMS, CHUNK_SIZE, LARGE_VALUE_SIZE); + }); + }); + + c.bench_function("rocks iterate large value", |b| { + b.iter(|| rocks_iterate(db.clone(), KEY_NUMS, CHUNK_SIZE, LARGE_VALUE_SIZE)); + }); + + dir.close().unwrap(); +} + +criterion_group! 
{ + name = benches_agate_rocks; + config = Criterion::default(); + targets = bench_agate, bench_rocks +} + +criterion_main!(benches_agate_rocks); diff --git a/benches/common.rs b/benches/common.rs index 6742ad2f..44e83e22 100644 --- a/benches/common.rs +++ b/benches/common.rs @@ -1,11 +1,20 @@ #![allow(dead_code)] +use std::{ + fs::{read_dir, remove_file}, + ops::{Deref, DerefMut}, + path::Path, + sync::Arc, + time::UNIX_EPOCH, +}; + use agatedb::{ - opt::build_table_options, AgateOptions, ChecksumVerificationMode::NoVerification, Table, - TableBuilder, Value, + opt::build_table_options, util::sync_dir, Agate, AgateOptions, + ChecksumVerificationMode::NoVerification, IteratorOptions, Table, TableBuilder, Value, }; -use bytes::Bytes; +use bytes::{Bytes, BytesMut}; use rand::{distributions::Alphanumeric, Rng}; -use std::ops::{Deref, DerefMut}; +#[cfg(feature = "enable-rocksdb")] +use rocksdb::DB; use tempdir::TempDir; pub fn rand_value() -> String { @@ -64,3 +73,242 @@ pub fn get_table_for_benchmark(count: usize) -> TableGuard { _tmp_dir: tmp_dir, } } + +pub fn gen_kv_pair(key: u64, value_size: usize) -> (Bytes, Bytes) { + let key = Bytes::from(format!("vsz={:05}-k={:010}", value_size, key)); + + let mut value = BytesMut::with_capacity(value_size); + value.resize(value_size, 0); + + (key, value.freeze()) +} + +pub fn unix_time() -> u64 { + UNIX_EPOCH + .elapsed() + .expect("Time went backwards") + .as_millis() as u64 +} + +pub fn remove_files(path: &Path) { + read_dir(path).unwrap().into_iter().for_each(|entry| { + let entry = entry.unwrap(); + remove_file(entry.path()).unwrap(); + }); + sync_dir(&path).unwrap(); +} + +pub fn agate_populate( + agate: Arc<Agate>, + key_nums: u64, + chunk_size: u64, + batch_size: u64, + value_size: usize, + seq: bool, +) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let agate = agate.clone(); + + handles.push(std::thread::spawn(move || { + let mut rng = rand::thread_rng(); + let range = chunk_start..chunk_start + chunk_size; + + for batch_start in range.step_by(batch_size as usize) { + let mut txn = agate.new_transaction_at(unix_time(), true); + + (batch_start..batch_start + batch_size).for_each(|key| { + let (key, value) = if seq { + gen_kv_pair(key, value_size) + } else { + gen_kv_pair(rng.gen_range(0, key_nums), value_size) + }; + txn.set(key, value).unwrap(); + }); + + txn.commit_at(unix_time()).unwrap(); + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} + +pub fn agate_randread(agate: Arc<Agate>, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let agate = agate.clone(); + + handles.push(std::thread::spawn(move || { + let mut rng = rand::thread_rng(); + let range = chunk_start..chunk_start + chunk_size; + let txn = agate.new_transaction_at(unix_time(), false); + + for _ in range { + let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); + match txn.get(&key) { + Ok(item) => { + assert_eq!(item.value().len(), value_size); + } + Err(err) => { + if matches!(err, agatedb::Error::KeyNotFound(_)) { + continue; + } else { + panic!("{:?}", err); + } + } + } + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} + +pub fn agate_iterate(agate: Arc<Agate>, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let agate = agate.clone(); + let (key, _) 
= gen_kv_pair(chunk_start, value_size); + + handles.push(std::thread::spawn(move || { + let txn = agate.new_transaction_at(unix_time(), false); + let opts = IteratorOptions::default(); + let mut iter = txn.new_iterator(&opts); + iter.seek(&key); + let mut count = 0; + + while iter.valid() { + let item = iter.item(); + assert_eq!(item.value().len(), value_size); + + iter.next(); + + count += 1; + if count > chunk_size { + break; + } + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} + +#[cfg(feature = "enable-rocksdb")] +pub fn rocks_populate( + db: Arc<DB>, + key_nums: u64, + chunk_size: u64, + batch_size: u64, + value_size: usize, + seq: bool, +) { + let mut write_options = rocksdb::WriteOptions::default(); + write_options.set_sync(true); + write_options.disable_wal(false); + let write_options = Arc::new(write_options); + + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let db = db.clone(); + let write_options = write_options.clone(); + + handles.push(std::thread::spawn(move || { + let range = chunk_start..chunk_start + chunk_size; + + for batch_start in range.step_by(batch_size as usize) { + let mut rng = rand::thread_rng(); + let mut batch = rocksdb::WriteBatch::default(); + + (batch_start..batch_start + batch_size).for_each(|key| { + let (key, value) = if seq { + gen_kv_pair(key, value_size) + } else { + gen_kv_pair(rng.gen_range(0, key_nums), value_size) + }; + batch.put(key, value); + }); + + db.write_opt(batch, &write_options).unwrap(); + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} + +#[cfg(feature = "enable-rocksdb")] +pub fn rocks_randread(db: Arc<DB>, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let db = db.clone(); + + handles.push(std::thread::spawn(move || { + let mut rng = rand::thread_rng(); + let range = chunk_start..chunk_start + chunk_size; + + for _ in range { + let (key, _) = gen_kv_pair(rng.gen_range(0, key_nums), value_size); + match db.get(key) { + Ok(item) => { + if item.is_some() { + assert_eq!(item.unwrap().len(), value_size); + } + } + Err(err) => { + panic!("{:?}", err); + } + } + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} + +#[cfg(feature = "enable-rocksdb")] +pub fn rocks_iterate(db: Arc<DB>, key_nums: u64, chunk_size: u64, value_size: usize) { + let mut handles = vec![]; + + for chunk_start in (0..key_nums).step_by(chunk_size as usize) { + let db = db.clone(); + let (key, _) = gen_kv_pair(chunk_start, value_size); + + handles.push(std::thread::spawn(move || { + let mut iter = db.raw_iterator(); + iter.seek(&key); + let mut count = 0; + + while iter.valid() { + assert_eq!(iter.value().unwrap().len(), value_size); + + iter.next(); + + count += 1; + if count > chunk_size { + break; + } + } + })); + } + + handles + .into_iter() + .for_each(|handle| handle.join().unwrap()); +} diff --git a/src/batch.rs b/src/batch.rs new file mode 100644 index 00000000..e5b974c7 --- /dev/null +++ b/src/batch.rs @@ -0,0 +1,203 @@ +use crate::Agate; +use crate::{entry::Entry, ops::transaction::Transaction}; +use crate::{Error, Result}; + +use bytes::Bytes; +use std::sync::Arc; + +/// WriteBatch helps write multiple entries to the database. +pub struct WriteBatch { + txn: Transaction, + core: Arc<crate::db::Core>, + err: Option<Error>, + + is_managed: bool, + commit_ts: u64, + // TODO: Add finished and support concurrency. 
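+ // In managed mode (`is_managed` set), commits reuse the caller-supplied `commit_ts`; + // otherwise `commit_ts` stays 0 and each commit lets the transaction assign its own timestamp.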
+} + +impl Agate { + /// Creates a new [`WriteBatch`]. This provides a way to conveniently do a lot of writes, + /// batching them up as tightly as possible in a single transaction, thus achieving good + /// performance. This API hides away the logic of creating and committing transactions. + /// Due to the nature of SSI guarantees provided by Agate, blind writes can never encounter + /// transaction conflicts. + pub fn new_write_batch(&self) -> WriteBatch { + if self.core.opts.managed_txns { + panic!("Cannot use new_write_batch in managed mode. Use new_write_batch_at instead"); + } + + WriteBatch { + txn: self.core.new_transaction(true, true), + core: self.core.clone(), + err: None, + is_managed: false, + commit_ts: 0, + } + } + + /// Similar to `new_write_batch`, but allows the user to set the commit timestamp. + pub fn new_write_batch_at(&self, commit_ts: u64) -> WriteBatch { + if !self.core.opts.managed_txns { + panic!( + "Cannot use new_write_batch_at in non-managed mode. Use new_write_batch instead." + ); + } + + let mut txn = self.core.new_transaction(true, true); + txn.commit_ts = commit_ts; + WriteBatch { + txn, + core: self.core.clone(), + err: None, + is_managed: true, + commit_ts, + } + } +} + +impl WriteBatch { + pub fn set(&mut self, key: Bytes, value: Bytes) -> Result<()> { + let entry = Entry::new(key, value); + self.set_entry(entry) + } + + pub fn set_entry(&mut self, entry: Entry) -> Result<()> { + self.handle_entry(entry) + } + + fn handle_entry(&mut self, entry: Entry) -> Result<()> { + if let Err(err) = self.txn.set_entry(entry.clone()) { + if !matches!(err, crate::Error::TxnTooBig) { + self.err = Some(err.clone()); + return Err(err); + } + } else { + return Ok(()); + } + + self.commit()?; + + if let Err(err) = self.txn.set_entry(entry) { + self.err = Some(err.clone()); + return Err(err); + } + + Ok(()) + } + + fn commit(&mut self) -> Result<()> { + if let Some(err) = &self.err { + return Err(err.clone()); + } + + let mut new_txn = self.core.new_transaction(true, self.is_managed); + new_txn.commit_ts = self.commit_ts; + let txn = std::mem::replace(&mut self.txn, new_txn); + txn.commit()?; + + Ok(()) + } + + fn delete(&mut self, key: Bytes) -> Result<()> { + if let Err(err) = self.txn.delete(key.clone()) { + if !matches!(err, Error::TxnTooBig) { + self.err = Some(err.clone()); + return Err(err); + } + } else { + return Ok(()); + } + + self.commit()?; + + if let Err(err) = self.txn.delete(key) { + self.err = Some(err.clone()); + return Err(err); + } + + Ok(()) + } + + /// Must be called at the end to ensure that any pending writes get committed to Agate. + /// Returns any error stored by [`WriteBatch`]. + fn flush(mut self) -> Result<()> { + self.commit()?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::{ + db::tests::{generate_test_agate_options, run_agate_test}, + AgateOptions, + }; + use bytes::Bytes; + + fn test_with_options(opts: AgateOptions) { + let key = |i| Bytes::from(format!("{:10}", i)); + let value = |i| Bytes::from(format!("{:128}", i)); + + run_agate_test(Some(opts.clone()), move |agate| { + let mut wb = if !opts.managed_txns { + agate.new_write_batch() + } else { + agate.new_write_batch_at(1) + }; + + // Do not set these too large, to avoid running out of memory. 
+ const N: usize = 100; + const M: usize = 100; + for i in 0..N { + wb.set(key(i), value(i)).unwrap(); + } + for i in 0..M { + wb.delete(key(i)).unwrap(); + } + wb.flush().unwrap(); + }) + } + + #[test] + fn test_on_disk() { + let mut opts = generate_test_agate_options(); + opts.value_threshold = 32; + + test_with_options(opts.clone()); + + opts.managed_txns = true; + test_with_options(opts); + } + + #[test] + fn test_in_memory() { + let mut opts = generate_test_agate_options(); + opts.in_memory = true; + + test_with_options(opts.clone()); + + opts.managed_txns = true; + test_with_options(opts); + } + + #[test] + fn test_empty_write_batch() { + run_agate_test(None, |agate| { + let wb = agate.new_write_batch(); + wb.flush().unwrap(); + }); + + let opts = AgateOptions { + managed_txns: true, + ..Default::default() + }; + + run_agate_test(Some(opts), |agate| { + let wb = agate.new_write_batch_at(2); + wb.flush().unwrap(); + let wb = agate.new_write_batch_at(208); + wb.flush().unwrap(); + let wb = agate.new_write_batch_at(31); + wb.flush().unwrap(); + }) + } +} diff --git a/src/checksum.rs b/src/checksum.rs index b6126b82..98768845 100644 --- a/src/checksum.rs +++ b/src/checksum.rs @@ -1,6 +1,7 @@ -use crate::{Error, Result}; use proto::meta::{checksum::Algorithm as ChecksumAlgorithm, Checksum}; +use crate::{Error, Result}; + pub fn calculate_checksum(data: &[u8], algo: ChecksumAlgorithm) -> u64 { match algo { ChecksumAlgorithm::Crc32c => { diff --git a/src/db.rs b/src/db.rs index 2c5ea8b7..c1c04a65 100644 --- a/src/db.rs +++ b/src/db.rs @@ -18,6 +18,7 @@ pub use opt::AgateOptions; use skiplist::Skiplist; use yatp::task::callback::Handle; +use crate::value::ValuePointer; use crate::{ closer::Closer, entry::Entry, @@ -408,10 +409,14 @@ impl Core { // TODO: reduce encode / decode by using something like flatbuffer let mut vs = Value::default(); vs.decode(iter.value().clone()); - if vs.meta & value::VALUE_POINTER != 0 { - panic!("value pointer not supported"); - } - builder.add(iter.key(), &vs, 0); // TODO: support vlog length + let vlog_len = if vs.meta & value::VALUE_POINTER != 0 { + let mut vp = ValuePointer::default(); + vp.decode(&vs.value); + vp.len + } else { + 0 + }; + builder.add(iter.key(), &vs, vlog_len); iter.next(); } builder diff --git a/src/db/opt.rs b/src/db/opt.rs index c785c3c9..545d9427 100644 --- a/src/db/opt.rs +++ b/src/db/opt.rs @@ -1,10 +1,10 @@ -use skiplist::MAX_NODE_SIZE; +use std::cmp; -use super::*; use getset::Setters; +use skiplist::MAX_NODE_SIZE; +use super::*; use crate::{entry::Entry, opt}; -use std::cmp; #[derive(Clone, Setters)] pub struct AgateOptions { @@ -114,7 +114,7 @@ pub struct AgateOptions { pub num_compactors: usize, /// Indicates when the db should verify checksums for SSTable blocks. /// - /// The default value of `checksum_mode` is [`ChecksumVerificationMode`]. + /// The default value of `checksum_mode` is [`NoVerification`]. pub checksum_mode: opt::ChecksumVerificationMode, /// Determines whether the transactions would be checked for conflicts. 
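The batch.rs tests above double as a usage example for the new API. A minimal sketch in managed mode (the caller picks the commit timestamp), reusing the `key`/`value` closures from `test_with_options`; note that `delete` and `flush` are crate-private as added here, so external callers currently only see `set`/`set_entry`:

    let mut wb = agate.new_write_batch_at(1);
    for i in 0..100 {
        // set() transparently commits and starts a fresh transaction whenever
        // the pending one reports TxnTooBig.
        wb.set(key(i), value(i)).unwrap();
    }
    wb.delete(key(0)).unwrap();
    // flush() commits whatever is still pending and surfaces any stored error.
    wb.flush().unwrap();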
diff --git a/src/db/tests.rs b/src/db/tests.rs index 123acc87..fc09c833 100644 --- a/src/db/tests.rs +++ b/src/db/tests.rs @@ -191,10 +191,16 @@ fn test_flush_memtable() { #[test] fn test_in_memory_agate() { - run_agate_test(None, |agate| { - agate.write_requests(generate_requests(10)).unwrap(); - verify_requests(10, &agate); - }); + run_agate_test( + Some(AgateOptions { + in_memory: true, + ..Default::default() + }), + |agate| { + agate.write_requests(generate_requests(10)).unwrap(); + verify_requests(10, &agate); + }, + ); } #[test] diff --git a/src/lib.rs b/src/lib.rs index 7f01a66e..17bb41ec 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,5 +1,6 @@ #![allow(dead_code)] +mod batch; mod bloom; mod checksum; mod closer; @@ -29,8 +30,8 @@ pub use iterator::IteratorOptions; pub use iterator_trait::AgateIterator; pub use opt::{ChecksumVerificationMode, Options as TableOptions}; pub use skiplist::Skiplist; -pub use table::merge_iterator::Iterators; -pub use table::ConcatIterator; -pub use table::MergeIterator; -pub use table::{builder::Builder as TableBuilder, Table}; +pub use table::{ + builder::Builder as TableBuilder, merge_iterator::Iterators, ConcatIterator, MergeIterator, + Table, +}; pub use value::Value; diff --git a/src/managed_db.rs b/src/managed_db.rs index 8fe4d1ce..b617ba47 100644 --- a/src/managed_db.rs +++ b/src/managed_db.rs @@ -1,9 +1,7 @@ -use crate::db::Agate; -use crate::ops::transaction::Transaction; -use crate::Result; - use std::sync::Arc; +use crate::{db::Agate, ops::transaction::Transaction, Result}; + impl crate::db::Core { /// Follows the same logic as `new_transaction`, but uses the provided read timestamp. pub fn new_transaction_at(self: &Arc<Self>, read_ts: u64, update: bool) -> Transaction { diff --git a/src/ops/transaction_test.rs b/src/ops/transaction_test.rs index 01869769..e5cab415 100644 --- a/src/ops/transaction_test.rs +++ b/src/ops/transaction_test.rs @@ -1,17 +1,15 @@ -use crate::Error; - -use crate::assert_bytes_eq; +use crate::{assert_bytes_eq, Error}; /// Tests in managed mode. mod managed_db { + use bytes::{Bytes, BytesMut}; + + use super::*; use crate::{ db::tests::{generate_test_agate_options, run_agate_test, with_payload}, entry::Entry, AgateOptions, }; - use bytes::{Bytes, BytesMut}; - - use super::*; fn default_test_managed_opts() -> AgateOptions { let mut opts = generate_test_agate_options(); @@ -136,6 +134,12 @@ mod normal_db { Arc, }; + use bytes::{Bytes, BytesMut}; + use crossbeam_channel::select; + use rand::Rng; + use yatp::{task::callback::Handle, Builder}; + + use super::*; use crate::{ closer::Closer, db::tests::{generate_test_agate_options, run_agate_test}, @@ -146,12 +150,6 @@ mod normal_db { value::VALUE_DELETE, Agate, AgateOptions, }; - use bytes::{Bytes, BytesMut}; - use crossbeam_channel::select; - use rand::Rng; - use yatp::{task::callback::Handle, Builder}; - - use super::*; #[test] fn test_txn_simple() { diff --git a/src/table/merge_iterator.rs b/src/table/merge_iterator.rs index 612398f4..dd08460d 100644 --- a/src/table/merge_iterator.rs +++ b/src/table/merge_iterator.rs @@ -235,8 +235,10 @@ impl AgateIterator for MergeIterator { #[cfg(test)] mod tests { use super::*; - use crate::assert_bytes_eq; - use crate::format::{key_with_ts, user_key}; + use crate::{ + assert_bytes_eq, + format::{key_with_ts, user_key}, + }; pub struct VecIterator { vec: Vec<Bytes>,