diff --git a/.vscode/settings.json b/.vscode/settings.json index 0e9b52a..48e5d70 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,6 +1,12 @@ { "editor.rulers": [85], - "rust-analyzer.cargo.features": ["hashlink", "mini-moka", "quick_cache", "stretto"], + "rust-analyzer.cargo.features": [ + "metrics", + "hashlink", + "mini-moka", + "quick_cache", + "stretto" + ], "rust-analyzer.server.extraEnv": { "CARGO_TARGET_DIR": "target/ra" }, @@ -15,6 +21,7 @@ "moka", "mokabench", "oltp", + "statsd", "thiserror", "Toolchain", "unsync" diff --git a/Cargo.lock b/Cargo.lock index 7239d85..2f3ffed 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -41,6 +41,12 @@ version = "1.0.75" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6" +[[package]] +name = "arc-swap" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bddcadddf5e9015d310179a59bb28c4d4b9920ad0f11e8e14dbadf654890c9a6" + [[package]] name = "async-attributes" version = "1.1.2" @@ -107,7 +113,7 @@ dependencies = [ "polling", "rustix 0.37.24", "slab", - "socket2", + "socket2 0.4.9", "waker-fn", ] @@ -571,9 +577,12 @@ checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" [[package]] name = "hashbrown" -version = "0.13.2" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "43a3c133739dddd0d2990f9a4bdf8eb4b21ef50e4851ca85ab661199821d510e" +checksum = "33ff8ae62cd3a9102e5637afc8452c55acf3844001bd5374e0b0bd7b6616c038" +dependencies = [ + "ahash", +] [[package]] name = "hashbrown" @@ -654,6 +663,37 @@ version = "1.0.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "af150ab688ff2122fcef229be89cb50dd66af9e01a4ff320cc137eecc9bacc38" +[[package]] +name = "jemalloc-ctl" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cffc705424a344c054e135d12ee591402f4539245e8bbd64e6c9eaa9458b63c" +dependencies = [ + "jemalloc-sys", + "libc", + "paste", +] + +[[package]] +name = "jemalloc-sys" +version = "0.5.4+5.3.0-patched" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac6c1946e1cea1788cbfde01c993b52a10e2da07f4bac608228d1bed20bfebf2" +dependencies = [ + "cc", + "libc", +] + +[[package]] +name = "jemallocator" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0de374a9f8e63150e6f5e8a60cc14c668226d7a347d8aee1a45766e3c4dd3bc" +dependencies = [ + "jemalloc-sys", + "libc", +] + [[package]] name = "js-sys" version = "0.3.64" @@ -763,6 +803,58 @@ dependencies = [ "autocfg", ] +[[package]] +name = "metrics" +version = "0.21.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fde3af1a009ed76a778cb84fdef9e7dbbdf5775ae3e4cc1f434a6a307f6f76c5" +dependencies = [ + "ahash", + "metrics-macros", + "portable-atomic", +] + +[[package]] +name = "metrics-exporter-dogstatsd" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "536c0a5509d2574fac7c3338a6f779ed35774edb60ecb23c7972f7b51f37814f" +dependencies = [ + "indexmap", + "metrics", + "metrics-util", + "quanta 0.11.1", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "metrics-macros" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddece26afd34c31585c74a4db0630c376df271c285d682d1e55012197830b6df" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + +[[package]] +name = "metrics-util" +version = "0.15.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4de2ed6e491ed114b40b732e4d1659a9d53992ebd87490c44a6ffe23739d973e" +dependencies = [ + "crossbeam-epoch 0.9.15", + "crossbeam-utils 0.8.16", + "hashbrown 0.13.1", + "metrics", + "num_cpus", + "quanta 0.11.1", + "sketches-ddsketch", +] + [[package]] name = "mini-moka" version = "0.10.2" @@ -787,6 +879,17 @@ dependencies = [ "adler", ] +[[package]] +name = "mio" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" +dependencies = [ + "libc", + "wasi 0.11.0+wasi-snapshot-preview1", + "windows-sys", +] + [[package]] name = "moka" version = "0.8.6" @@ -920,28 +1023,47 @@ name = "mokabench" version = "0.10.0" dependencies = [ "anyhow", + "arc-swap", "async-io", "async-std", "async-trait", "clap", "crossbeam-channel", + "crossbeam-epoch 0.9.15", "futures-util", "hashlink", "itertools", + "jemalloc-ctl", + "jemallocator", + "metrics", + "metrics-exporter-dogstatsd", "mini-moka", "moka 0.10.4", "moka 0.11.3", "moka 0.12.0", "moka 0.8.6", "moka 0.9.9", + "once_cell", "parking_lot", "quick_cache", "stretto", "thiserror", "tokio", + "tracing", + "tracing-subscriber", "xxhash-rust", ] +[[package]] +name = "nu-ansi-term" +version = "0.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +dependencies = [ + "overload", + "winapi", +] + [[package]] name = "num_cpus" version = "1.16.0" @@ -973,6 +1095,12 @@ version = "6.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d5d9eb14b174ee9aa2ef96dc2b94637a2d4b6e7cb873c7e171f0c20c6cf3eac" +[[package]] +name = "overload" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" + [[package]] name = "parking" version = "2.1.1" @@ -1002,6 +1130,12 @@ dependencies = [ "windows-targets", ] +[[package]] +name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "pin-project-lite" version = "0.2.13" @@ -1041,6 +1175,12 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "portable-atomic" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31114a898e107c51bb1609ffaf55a0e011cf6a4d7f1170d0015a165082c0338b" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1106,7 +1246,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "539169dc3bed0d6c3982dddb746a3d8907ec50863d9d1bbe3a5bcd413e53e805" dependencies = [ "ahash", - "hashbrown 0.13.2", + "hashbrown 0.13.1", "parking_lot", ] @@ -1285,6 +1425,15 @@ dependencies = [ "serde", ] +[[package]] +name = "sharded-slab" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b21f559e07218024e7e9f90f96f601825397de0e25420135f7f952453fed0b" +dependencies = [ + "lazy_static", +] + [[package]] name = "skeptic" version = "0.13.7" @@ -1300,6 +1449,12 @@ dependencies = [ "walkdir", ] +[[package]] +name = "sketches-ddsketch" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68a406c1882ed7f29cd5e248c9848a80e7cb6ae0fea82346d2746f2f941c07e1" + [[package]] name = "slab" version = "0.4.9" @@ -1325,6 +1480,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "socket2" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" +dependencies = [ + "libc", + "windows-sys", +] + [[package]] name = "stable_deref_trait" version = "1.2.0" @@ -1429,6 +1594,16 @@ dependencies = [ "syn 2.0.37", ] +[[package]] +name = "thread_local" +version = "1.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3fdd6f064ccff2d6567adcb3873ca630700f00b5ad3f060c25b5dcfd9a4ce152" +dependencies = [ + "cfg-if 1.0.0", + "once_cell", +] + [[package]] name = "tokio" version = "1.32.0" @@ -1436,9 +1611,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" dependencies = [ "backtrace", + "libc", + "mio", "num_cpus", "pin-project-lite", + "socket2 0.5.4", "tokio-macros", + "windows-sys", ] [[package]] @@ -1460,14 +1639,55 @@ checksum = "8ce8c33a8d48bd45d624a6e523445fd21ec13d3653cd51f681abf67418f54eb8" dependencies = [ "cfg-if 1.0.0", "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.37", +] + [[package]] name = "tracing-core" version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0955b8137a1df6f1a2e9a37d8a6656291ff0297c1a97c24e0d8425fe2312f79a" +dependencies = [ + "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ddad33d2d10b1ed7eb9d1f518a5674713876e97e5bb9b7345a7984fbb4f922" +dependencies = [ + "lazy_static", + "log", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30a651bc37f915e81f087d86e62a18eec5f79550c7faff886f7090b4ea757c77" +dependencies = [ + "nu-ansi-term", + "sharded-slab", + "smallvec", + "thread_local", + "tracing-core", + "tracing-log", +] [[package]] name = "triomphe" @@ -1503,6 +1723,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "valuable" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" + [[package]] name = "value-bag" version = "1.4.1" diff --git a/Cargo.toml b/Cargo.toml index 56b7de1..747dc5d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ name = "mokabench" version = "0.10.0" edition = "2021" +rust-version = "1.66" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -21,6 +22,9 @@ stretto = ["dep:stretto"] rt-tokio = ["dep:tokio"] rt-async-std = ["dep:async-std"] +# Metrics support +metrics = ["dep:crossbeam-epoch", "dep:metrics", "dep:metrics-exporter-dogstatsd", "dep:once_cell", "dep:tracing", "dep:tracing-subscriber"] + [dependencies] anyhow = "1.0.56" async-io = "1.12.0" @@ -43,6 +47,15 @@ mini-moka = { optional = true, version = "0.10.0" } quick_cache = { optional = true, version = "0.2.1" } stretto = { optional = true, version = "0.7.1" } +# Metrics support +crossbeam-epoch = { optional = true, version = "0.9.15" } +metrics = { optional = true, version = "0.21.1" } +metrics-exporter-dogstatsd = { optional = true, version = "0.8.0" } +once_cell = { optional = true, version = "1.18.0" } +tracing = { optional = true, version = "0.1.37" } +tracing-subscriber = { optional = true, version = "0.3.17" } +arc-swap = "1.6.0" + [dependencies.moka012] package = "moka" optional = true @@ -76,6 +89,10 @@ optional = true version = "0.8.6" features = ["future", "dash"] +[target.'cfg(not(target_env = "msvc"))'.dependencies] +jemallocator = "0.5.4" +jemalloc-ctl = "0.5.4" + # [profile.release] # debug=true # debug-assertions=true diff --git a/src/lib.rs b/src/lib.rs index d3e67ea..4e9cfbb 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -12,34 +12,24 @@ compile_error!( You might need `--no-default-features`." ); -use std::io::prelude::*; -use std::sync::Arc; -use std::{fs::File, io::BufReader, time::Instant}; - -#[cfg(feature = "moka-v012")] -pub(crate) use moka012 as moka; - -#[cfg(feature = "moka-v011")] -pub(crate) use moka011 as moka; - -#[cfg(feature = "moka-v010")] -pub(crate) use moka010 as moka; - -#[cfg(feature = "moka-v09")] -pub(crate) use moka09 as moka; - -#[cfg(feature = "moka-v08")] -pub(crate) use moka08 as moka; - mod async_rt_helper; mod cache; pub mod config; mod eviction_counters; mod load_gen; +mod metrics_exporter; mod parser; mod report; mod trace_file; +use std::io::prelude::*; +use std::{ + fs::File, + io::BufReader, + sync::Arc, + time::{Duration, Instant}, +}; + pub(crate) use eviction_counters::EvictionCounters; pub use report::Report; pub use trace_file::TraceFile; @@ -51,11 +41,27 @@ use cache::{ }, AsyncCacheDriver, CacheDriver, }; +use crossbeam_channel::Sender; use config::Config; use itertools::Itertools; use parser::TraceEntry; use report::ReportBuilder; +#[cfg(feature = "moka-v012")] +pub(crate) use moka012 as moka; + +#[cfg(feature = "moka-v011")] +pub(crate) use moka011 as moka; + +#[cfg(feature = "moka-v010")] +pub(crate) use moka010 as moka; + +#[cfg(feature = "moka-v09")] +pub(crate) use moka09 as moka; + +#[cfg(feature = "moka-v08")] +pub(crate) use moka08 as moka; + #[cfg(feature = "hashlink")] use crate::cache::hashlink::HashLink; #[cfg(any(feature = "mini-moka", feature = "moka-v08", feature = "moka-v09"))] @@ -79,34 +85,49 @@ pub(crate) enum Command { Iterate, } -pub fn run_multi_threads_moka_sync( +pub async fn run_metrics_exporter(duration: Duration) { + metrics_exporter::init("::1:8125").await; + std::thread::sleep(duration); + metrics_exporter::shutdown(); +} + +pub async fn run_multi_threads_moka_sync( config: &Config, capacity: usize, num_clients: u16, ) -> anyhow::Result { + metrics_exporter::init("::1:8125").await; + let max_cap = if config.size_aware { capacity as u64 * 2u64.pow(15) } else { capacity as u64 }; let report_builder = ReportBuilder::new("Moka Sync Cache", max_cap, Some(num_clients)); + let pp = should_pre_process_all_commands(); #[cfg(not(any(feature = "moka-v08", feature = "moka-v09")))] if config.entry_api { let cache_driver = MokaSyncCache::with_entry_api(config, max_cap, capacity); - return run_multi_threads(config, num_clients, cache_driver, report_builder); + let result = run_multi_threads(config, num_clients, cache_driver, report_builder, pp); + metrics_exporter::shutdown(); + return result; } let cache_driver = MokaSyncCache::new(config, max_cap, capacity); - run_multi_threads(config, num_clients, cache_driver, report_builder) + let result = run_multi_threads(config, num_clients, cache_driver, report_builder, pp); + metrics_exporter::shutdown(); + result } -pub fn run_multi_threads_moka_segment( +pub async fn run_multi_threads_moka_segment( config: &Config, capacity: usize, num_clients: u16, num_segments: usize, ) -> anyhow::Result { + metrics_exporter::init("::1:8125").await; + let max_cap = if config.size_aware { capacity as u64 * 2u64.pow(15) } else { @@ -114,16 +135,21 @@ pub fn run_multi_threads_moka_segment( }; let report_name = format!("Moka SegmentedCache({num_segments})"); let report_builder = ReportBuilder::new(&report_name, max_cap, Some(num_clients)); + let pp = should_pre_process_all_commands(); #[cfg(not(any(feature = "moka-v08", feature = "moka-v09")))] if config.entry_api { let cache_driver = MokaSegmentedCache::with_entry_api(config, max_cap, capacity, num_segments); - return run_multi_threads(config, num_clients, cache_driver, report_builder); + let result = run_multi_threads(config, num_clients, cache_driver, report_builder, pp); + metrics_exporter::shutdown(); + return result; } let cache_driver = MokaSegmentedCache::new(config, max_cap, capacity, num_segments); - run_multi_threads(config, num_clients, cache_driver, report_builder) + let result = run_multi_threads(config, num_clients, cache_driver, report_builder, pp); + metrics_exporter::shutdown(); + result } pub async fn run_multi_tasks_moka_async( @@ -131,21 +157,28 @@ pub async fn run_multi_tasks_moka_async( capacity: usize, num_clients: u16, ) -> anyhow::Result { + metrics_exporter::init("::1:8125").await; + let max_cap = if config.size_aware { capacity as u64 * 2u64.pow(15) } else { capacity as u64 }; let report_builder = ReportBuilder::new("Moka Async Cache", max_cap, Some(num_clients)); + let pp = should_pre_process_all_commands(); #[cfg(not(any(feature = "moka-v08", feature = "moka-v09")))] if config.entry_api { let cache_driver = MokaAsyncCache::with_entry_api(config, max_cap, capacity); - return run_multi_tasks(config, num_clients, cache_driver, report_builder).await; + let result = run_multi_tasks(config, num_clients, cache_driver, report_builder, pp).await; + metrics_exporter::shutdown(); + return result; } let cache_driver = MokaAsyncCache::new(config, max_cap, capacity); - run_multi_tasks(config, num_clients, cache_driver, report_builder).await + let result = run_multi_tasks(config, num_clients, cache_driver, report_builder, pp).await; + metrics_exporter::shutdown(); + result } #[cfg(any(feature = "mini-moka", feature = "moka-v08", feature = "moka-v09"))] @@ -166,7 +199,8 @@ pub fn run_multi_threads_moka_dash( "Moka Dash Cache" }; let report_builder = ReportBuilder::new(report_name, max_cap, Some(num_clients)); - run_multi_threads(config, num_clients, cache_driver, report_builder) + let pp = should_pre_process_all_commands(); + run_multi_threads(config, num_clients, cache_driver, report_builder, pp) } #[cfg(feature = "hashlink")] @@ -178,7 +212,8 @@ pub fn run_multi_threads_hashlink( let cache_driver = HashLink::new(config, capacity); let report_builder = ReportBuilder::new("HashLink (LRU w/ Mutex)", capacity as _, Some(num_clients)); - run_multi_threads(config, num_clients, cache_driver, report_builder) + let pp = should_pre_process_all_commands(); + run_multi_threads(config, num_clients, cache_driver, report_builder, pp) } #[cfg(feature = "quick_cache")] @@ -195,7 +230,8 @@ pub fn run_multi_threads_quick_cache( let cache_driver = QuickCache::new(config, capacity, max_cap); let report_builder = ReportBuilder::new("QuickCache Sync Cache", capacity as _, Some(num_clients)); - run_multi_threads(config, num_clients, cache_driver, report_builder) + let pp = should_pre_process_all_commands(); + run_multi_threads(config, num_clients, cache_driver, report_builder, pp) } #[cfg(feature = "stretto")] @@ -206,7 +242,8 @@ pub fn run_multi_threads_stretto( ) -> anyhow::Result { let cache_driver = StrettoCache::new(config, capacity); let report_builder = ReportBuilder::new("Stretto", capacity as _, Some(num_clients)); - run_multi_threads(config, num_clients, cache_driver, report_builder) + let pp = should_pre_process_all_commands(); + run_multi_threads(config, num_clients, cache_driver, report_builder, pp) } #[cfg(any(feature = "mini-moka", feature = "moka-v08", feature = "moka-v09"))] @@ -253,26 +290,25 @@ fn run_multi_threads( num_clients: u16, cache_driver: impl CacheDriver + Clone + Send + 'static, report_builder: ReportBuilder, + pre_process_all_commands: bool, ) -> anyhow::Result { let report_builder = Arc::new(report_builder); - let (send, receive) = crossbeam_channel::unbounded::>(); + let (send, receive) = if pre_process_all_commands { + crossbeam_channel::unbounded::>() + } else { + crossbeam_channel::bounded(100) + }; - // In order to have the minimum harness overhead and not have many consumers - // waiting for the single producer, we buffer all operations in a channel. - let mut counter = 0; - for _ in 0..(config.repeat.unwrap_or(1)) { - let f = File::open(config.trace_file.path())?; - let reader = BufReader::new(f); - for chunk in reader.lines().enumerate().chunks(BATCH_SIZE).into_iter() { - let chunk = chunk.map(|(i, r)| r.map(|s| (i, s))); - let commands = load_gen::generate_commands(config, BATCH_SIZE, &mut counter, chunk)?; - send.send(commands)?; - } + if pre_process_all_commands { + // In order to have the minimum harness overhead and not have many consumers + // waiting for the single producer, we buffer all operations in a channel. + // https://github.com/moka-rs/mokabench/pull/6 + generate_and_send_commands(config, &send)?; + } else { + // Read the whole trace file to prime the disk cache of the filesystem. + read_trace_file(config)?; } - // Drop the sender channel to notify the workers that we are finished. - std::mem::drop(send); - let instant = Instant::now(); let handles = (0..num_clients) .map(|_| { @@ -290,6 +326,13 @@ fn run_multi_threads( }) .collect::>(); + if !pre_process_all_commands { + generate_and_send_commands(config, &send)?; + } + + // Drop the sender channel to notify the workers that we are finished. + std::mem::drop(send); + // Wait for the workers to finish and collect their reports. let reports = handles .into_iter() @@ -314,26 +357,25 @@ async fn run_multi_tasks( num_clients: u16, cache_driver: impl AsyncCacheDriver + Clone + Send + 'static, report_builder: ReportBuilder, + pre_process_all_commands: bool, ) -> anyhow::Result { let report_builder = Arc::new(report_builder); - let (send, receive) = crossbeam_channel::unbounded::>(); + let (send, receive) = if pre_process_all_commands { + crossbeam_channel::unbounded::>() + } else { + crossbeam_channel::bounded(100) + }; - // In order to have the minimum harness overhead and not have many consumers - // waiting for the single producer, we buffer all operations in a channel. - let mut counter = 0; - for _ in 0..(config.repeat.unwrap_or(1)) { - let f = File::open(config.trace_file.path())?; - let reader = BufReader::new(f); - for chunk in reader.lines().enumerate().chunks(BATCH_SIZE).into_iter() { - let chunk = chunk.map(|(i, r)| r.map(|s| (i, s))); - let commands = load_gen::generate_commands(config, BATCH_SIZE, &mut counter, chunk)?; - send.send(commands)?; - } + if pre_process_all_commands { + // In order to have the minimum harness overhead and not have many consumers + // waiting for the single producer, we buffer all operations in a channel. + // https://github.com/moka-rs/mokabench/pull/6 + generate_and_send_commands(config, &send)?; + } else { + // Read the whole trace file to prime the disk cache of the filesystem. + read_trace_file(config)?; } - // Drop the sender channel to notify the workers that we are finished. - std::mem::drop(send); - let instant = Instant::now(); let handles = (0..num_clients) .map(|_| { @@ -356,6 +398,13 @@ async fn run_multi_tasks( }) .collect::>(); + if !pre_process_all_commands { + generate_and_send_commands(config, &send)?; + } + + // Drop the sender channel to notify the workers that we are finished. + std::mem::drop(send); + // Wait for the workers to finish and collect their reports. let reports = futures_util::future::join_all(handles).await; let elapsed = instant.elapsed(); @@ -378,3 +427,34 @@ async fn run_multi_tasks( Ok(report) } + +fn should_pre_process_all_commands() -> bool { + cfg!(not(feature = "metrics")) +} + +fn generate_and_send_commands( + config: &Config, + send_channel: &Sender>, +) -> anyhow::Result<()> { + let mut counter = 0; + for _ in 0..(config.repeat.unwrap_or(1)) { + let f = File::open(config.trace_file.path())?; + let reader = BufReader::new(f); + for chunk in reader.lines().enumerate().chunks(BATCH_SIZE).into_iter() { + let chunk = chunk.map(|(i, r)| r.map(|s| (i, s))); + let commands = load_gen::generate_commands(config, BATCH_SIZE, &mut counter, chunk)?; + send_channel.send(commands)?; + } + } + + Ok(()) +} + +/// Read the whole trace file to prime the disk cache of the filesystem. +fn read_trace_file(config: &Config) -> anyhow::Result<()> { + let f = File::open(config.trace_file.path())?; + let reader = BufReader::new(f); + std::hint::black_box(reader.lines().count()); + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index ad845b2..edbfe32 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use anyhow::Context; use mokabench::{ self, @@ -7,6 +9,16 @@ use mokabench::{ use clap::{Arg, Command}; +#[cfg(not(target_env = "msvc"))] +use jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +#[cfg(target_env = "msvc")] +compile_error!("Sorry, Windows MSVC target is not supported because we cannot use jemalloc there"); + #[cfg(feature = "rt-tokio")] #[tokio::main] async fn main() -> anyhow::Result<()> { @@ -43,6 +55,8 @@ async fn run(async_rt_name: &str) -> anyhow::Result<()> { } async fn run_with_capacity(config: &Config, capacity: usize) -> anyhow::Result<()> { + mokabench::run_metrics_exporter(Duration::from_secs(10)).await; + const DEFAULT_NUM_CLIENTS_ARRAY: &[u16] = &[16, 24, 32, 40, 48]; let num_clients_slice: &[u16] = if let Some(n) = &config.num_clients { @@ -108,7 +122,7 @@ async fn run_with_capacity(config: &Config, capacity: usize) -> anyhow::Result<( } for num_clients in num_clients_slice { - let report = mokabench::run_multi_threads_moka_sync(config, capacity, *num_clients)?; + let report = mokabench::run_multi_threads_moka_sync(config, capacity, *num_clients).await?; println!("{}", report.to_csv_record()); } @@ -120,15 +134,14 @@ async fn run_with_capacity(config: &Config, capacity: usize) -> anyhow::Result<( let num_segments = 8; for num_clients in num_clients_slice { - let report = mokabench::run_multi_threads_moka_segment( - config, - capacity, - *num_clients, - num_segments, - )?; + let report = + mokabench::run_multi_threads_moka_segment(config, capacity, *num_clients, num_segments) + .await?; println!("{}", report.to_csv_record()); } + mokabench::run_metrics_exporter(Duration::from_secs(10)).await; + Ok(()) } diff --git a/src/metrics_exporter.rs b/src/metrics_exporter.rs new file mode 100644 index 0000000..f8cea89 --- /dev/null +++ b/src/metrics_exporter.rs @@ -0,0 +1,8 @@ +#[cfg_attr(feature = "metrics", path = "metrics_exporter/statsd_exporter.rs")] +#[cfg_attr( + not(feature = "metrics"), + path = "metrics_exporter/disabled_exporter.rs" +)] +pub(crate) mod exporter; + +pub(crate) use exporter::{init, shutdown}; diff --git a/src/metrics_exporter/disabled_exporter.rs b/src/metrics_exporter/disabled_exporter.rs new file mode 100644 index 0000000..70120f5 --- /dev/null +++ b/src/metrics_exporter/disabled_exporter.rs @@ -0,0 +1,3 @@ +pub(crate) async fn init(_endpoint: &str) {} + +pub(crate) fn shutdown() {} diff --git a/src/metrics_exporter/statsd_exporter.rs b/src/metrics_exporter/statsd_exporter.rs new file mode 100644 index 0000000..25f2852 --- /dev/null +++ b/src/metrics_exporter/statsd_exporter.rs @@ -0,0 +1,166 @@ +use std::{ + fmt, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, Mutex, + }, + thread::JoinHandle, + time::Duration, +}; + +use metrics::{register_gauge, Gauge}; +use metrics_exporter_dogstatsd::StatsdBuilder; +use once_cell::sync::OnceCell; + +static EPOCH: AtomicU64 = AtomicU64::new(0); + +pub(crate) async fn init(endpoint: &str) { + if SHARED.get().is_none() { + let mut exporter = MetricsExporter::default(); + exporter.init_metrics_exporter(endpoint).await; + + SHARED + .set(exporter) + .expect("Failed to initialize MetricsExporter. {e}"); + } + + SHARED.get().unwrap().init(); +} + +pub(crate) fn shutdown() { + MetricsExporter::shared().shutdown(); +} + +static SHARED: OnceCell = OnceCell::new(); + +#[derive(Default)] +struct MetricsExporter { + reporter: Mutex, JoinHandle<()>)>>, +} + +impl fmt::Debug for MetricsExporter { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("MetricsExporter").finish() + } +} + +impl MetricsExporter { + fn shared() -> &'static MetricsExporter { + SHARED.get().expect("MetricsExporter is not initialized") + } +} + +#[cfg(feature = "metrics")] +impl MetricsExporter { + async fn init_metrics_exporter(&mut self, endpoint: &str) { + StatsdBuilder::new() + // Cannot send to IPV4 address on macOS if it is bound to an IPV6 address. + // https://users.rust-lang.org/t/udpsocket-connect-fails-on-unspecified-host/69100 + // https://github.com/dialtone/metrics-exporter-dogstatsd/pull/3 + .with_push_gateway(endpoint, Duration::from_millis(498)) + .expect("Failed to set the push gateway to statsd exporter") + .set_global_prefix("mokabench") + .install() + .expect("Failed to install statsd exporter"); + } + + fn init(&self) { + let mut reporter = self.reporter.lock().unwrap(); + if reporter.is_some() { + return; + } + + let ar = Arc::new(AllocationReporter::default()); + + // TODO: Give a name to the thread. + let t = { + let ar2 = Arc::clone(&ar); + std::thread::spawn(move || ar2.run()) + }; + + // Wait for a few seconds to let the reporter to send some data to the statsd + // server. + std::thread::sleep(Duration::from_millis(2500)); + + *reporter = Some((ar, t)); + } + + fn shutdown(&self) { + if let Some((r, h)) = self.reporter.lock().unwrap().take() { + r.shutdown(); + h.join().unwrap(); + } + } +} + +// #[cfg()] +#[derive(Default)] +struct AllocationReporter { + is_shutting_down: AtomicBool, +} + +impl AllocationReporter { + fn run(&self) { + let resident_gauge = register_gauge!("memory.resident_mb"); + let allocated_gauge = register_gauge!("memory.allocated_mb"); + + loop { + if self.is_shutting_down.load(Ordering::Acquire) { + break; + } + EPOCH.fetch_add(1, Ordering::AcqRel); + Self::report_allocation_info(&resident_gauge, &allocated_gauge); + std::thread::sleep(Duration::from_millis(98)); + } + + Self::run_deferred(); + Self::report_allocation_info(&resident_gauge, &allocated_gauge); + } + + fn shutdown(&self) { + self.is_shutting_down.store(true, Ordering::Release); + } + + fn report_allocation_info(resident_gauge: &Gauge, allocated_gauge: &Gauge) { + use jemalloc_ctl::{epoch, stats}; + + let e = epoch::mib().unwrap(); + e.advance().unwrap(); + let resident = stats::resident::read().unwrap(); + let allocated = stats::allocated::read().unwrap(); + let resident_mb = resident as f64 / 1024.0 / 1024.0; + let allocated_mb = allocated as f64 / 1024.0 / 1024.0; + + // println!("allocation,{:.4},{:.4}", resident_mb, allocated_mb); + resident_gauge.set(resident_mb); + allocated_gauge.set(allocated_mb); + } + + /// Runs deferred destructors in crossbeam-epoch and prints the current allocation + /// info. + fn run_deferred() { + use jemalloc_ctl::{epoch, stats}; + + let mut allocated = std::usize::MAX; + let mut unchanged_count = 0usize; + loop { + crossbeam_epoch::pin().flush(); + + let e = epoch::mib().unwrap(); + e.advance().unwrap(); + let new_allocated = stats::allocated::read().unwrap(); + + if new_allocated == allocated { + unchanged_count += 1; + if unchanged_count > 50 { + break; + } + } else { + allocated = new_allocated; + unchanged_count = 0; + } + + std::thread::sleep(std::time::Duration::from_millis(2)); + } + } +}