Skip to content

Commit

Permalink
Merge branch 'master' into ao-no-overlay
Browse files Browse the repository at this point in the history
* master:
  kvdb-rocksdb: bump version (#348)
  kvdb-rocksdb: expose RocksDB stats (#347)
  Implement Error for FromDecStrErr (#346)
  Fix clippy lints for rlp-derive (#345)
  • Loading branch information
ordian committed Mar 2, 2020
2 parents 488cb36 + 01ccef7 commit 20a62a4
Show file tree
Hide file tree
Showing 9 changed files with 232 additions and 100 deletions.
5 changes: 4 additions & 1 deletion kvdb-rocksdb/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ The format is based on [Keep a Changelog].
[Keep a Changelog]: http://keepachangelog.com/en/1.0.0/

## [Unreleased]
- License changed from GPL3 to dual MIT/Apache2. [#342](https://github.com/paritytech/parity-common/pull/342)
### Breaking
- Updated to the new `kvdb` interface. [#313](https://github.com/paritytech/parity-common/pull/313)

## [0.6.0] - 2019-02-28
- License changed from GPL3 to dual MIT/Apache2. [#342](https://github.com/paritytech/parity-common/pull/342)
- Added `get_statistics` method and `enable_statistics` config parameter. [#347](https://github.com/paritytech/parity-common/pull/347)

## [0.5.0] - 2019-02-05
- Bump parking_lot to 0.10. [#332](https://github.com/paritytech/parity-common/pull/332)

Expand Down
2 changes: 1 addition & 1 deletion kvdb-rocksdb/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "kvdb-rocksdb"
version = "0.5.0"
version = "0.6.0"
authors = ["Parity Technologies <admin@parity.io>"]
repository = "https://github.com/paritytech/parity-common"
description = "kvdb implementation backed by RocksDB"
Expand Down
57 changes: 54 additions & 3 deletions kvdb-rocksdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ pub struct DatabaseConfig {
pub columns: u32,
/// Specify the maximum number of info/debug log files to be kept.
pub keep_log_file_num: i32,
/// Enable native RocksDB statistics.
/// Disabled by default.
///
/// It can have a negative performance impact up to 10% according to
/// https://github.com/facebook/rocksdb/wiki/Statistics.
pub enable_statistics: bool,
}

impl DatabaseConfig {
Expand Down Expand Up @@ -208,6 +214,7 @@ impl Default for DatabaseConfig {
compaction: CompactionProfile::default(),
columns: 1,
keep_log_file_num: 1,
enable_statistics: false,
}
}
}
Expand Down Expand Up @@ -260,6 +267,8 @@ pub struct Database {
config: DatabaseConfig,
path: String,
#[ignore_malloc_size_of = "insignificant"]
opts: Options,
#[ignore_malloc_size_of = "insignificant"]
write_opts: WriteOptions,
#[ignore_malloc_size_of = "insignificant"]
read_opts: ReadOptions,
Expand Down Expand Up @@ -290,6 +299,10 @@ fn is_corrupted(err: &Error) -> bool {
fn generate_options(config: &DatabaseConfig) -> Options {
let mut opts = Options::default();

opts.set_report_bg_io_stats(true);
if config.enable_statistics {
opts.enable_statistics();
}
opts.set_use_fsync(false);
opts.create_if_missing(true);
opts.set_max_open_files(config.max_open_files);
Expand Down Expand Up @@ -392,6 +405,7 @@ impl Database {
db: RwLock::new(Some(DBAndColumns { db, column_names })),
config: config.clone(),
path: path.to_owned(),
opts,
read_opts,
write_opts,
block_opts,
Expand Down Expand Up @@ -592,6 +606,15 @@ impl Database {
None => Ok(()),
}
}

/// Get RocksDB statistics.
pub fn get_statistics(&self) -> HashMap<String, stats::RocksDbStatsValue> {
if let Some(stats) = self.opts.get_statistics() {
stats::parse_rocksdb_stats(&stats)
} else {
HashMap::new()
}
}
}

// duplicate declaration of methods here to avoid trait import in certain existing cases
Expand Down Expand Up @@ -624,6 +647,13 @@ impl KeyValueDB for Database {
}

fn io_stats(&self, kind: kvdb::IoStatsKind) -> kvdb::IoStats {
let rocksdb_stats = self.get_statistics();
let cache_hit_count = rocksdb_stats.get("block.cache.hit").map(|s| s.count).unwrap_or(0u64);
let overall_stats = self.stats.overall();
let old_cache_hit_count = overall_stats.raw.cache_hit_count;

self.stats.tally_cache_hit_count(cache_hit_count - old_cache_hit_count);

let taken_stats = match kind {
kvdb::IoStatsKind::Overall => self.stats.overall(),
kvdb::IoStatsKind::SincePrevious => self.stats.since_previous(),
Expand All @@ -636,7 +666,7 @@ impl KeyValueDB for Database {
stats.transactions = taken_stats.raw.transactions;
stats.bytes_written = taken_stats.raw.bytes_written;
stats.bytes_read = taken_stats.raw.bytes_read;

stats.cache_reads = taken_stats.raw.cache_hit_count;
stats.started = taken_stats.started;
stats.span = taken_stats.started.elapsed();

Expand Down Expand Up @@ -709,6 +739,7 @@ mod tests {
compaction: CompactionProfile::default(),
columns: 11,
keep_log_file_num: 1,
enable_statistics: false,
};

let db = Database::open(&config, tempdir.path().to_str().unwrap()).unwrap();
Expand Down Expand Up @@ -844,20 +875,40 @@ mod tests {
assert_eq!(c.memory_budget(), 45 * MB, "total budget is the sum of the column budget");
}

#[test]
fn test_stats_parser() {
let raw = r#"rocksdb.row.cache.hit COUNT : 1
rocksdb.db.get.micros P50 : 2.000000 P95 : 3.000000 P99 : 4.000000 P100 : 5.000000 COUNT : 0 SUM : 15
"#;
let stats = stats::parse_rocksdb_stats(raw);
assert_eq!(stats["row.cache.hit"].count, 1);
assert!(stats["row.cache.hit"].times.is_none());
assert_eq!(stats["db.get.micros"].count, 0);
let get_times = stats["db.get.micros"].times.unwrap();
assert_eq!(get_times.sum, 15);
assert_eq!(get_times.p50, 2.0);
assert_eq!(get_times.p95, 3.0);
assert_eq!(get_times.p99, 4.0);
assert_eq!(get_times.p100, 5.0);
}

#[test]
fn rocksdb_settings() {
const NUM_COLS: usize = 2;
let mut cfg = DatabaseConfig::with_columns(NUM_COLS as u32);
let mut cfg = DatabaseConfig { enable_statistics: true, ..DatabaseConfig::with_columns(NUM_COLS as u32) };
cfg.max_open_files = 123; // is capped by the OS fd limit (typically 1024)
cfg.compaction.block_size = 323232;
cfg.compaction.initial_file_size = 102030;
cfg.memory_budget = [(0, 30), (1, 300)].iter().cloned().collect();

let db_path = TempDir::new("config_test").expect("the OS can create tmp dirs");
let _db = Database::open(&cfg, db_path.path().to_str().unwrap()).expect("can open a db");
let db = Database::open(&cfg, db_path.path().to_str().unwrap()).expect("can open a db");
let mut rocksdb_log = std::fs::File::open(format!("{}/LOG", db_path.path().to_str().unwrap()))
.expect("rocksdb creates a LOG file");
let mut settings = String::new();
let statistics = db.get_statistics();
assert!(statistics.contains_key("block.cache.hit"));

rocksdb_log.read_to_string(&mut settings).unwrap();
// Check column count
assert!(settings.contains("Options for column family [default]"), "no default col");
Expand Down
68 changes: 63 additions & 5 deletions kvdb-rocksdb/src/stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,68 @@
// except according to those terms.

use parking_lot::RwLock;
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::atomic::{AtomicU64, Ordering as AtomicOrdering};
use std::time::Instant;

#[derive(Default, Clone, Copy)]
pub struct RawDbStats {
pub reads: u64,
pub writes: u64,
pub bytes_written: u64,
pub bytes_read: u64,
pub transactions: u64,
pub cache_hit_count: u64,
}

#[derive(Default, Debug, Clone, Copy)]
pub struct RocksDbStatsTimeValue {
/// 50% percentile
pub p50: f64,
/// 95% percentile
pub p95: f64,
/// 99% percentile
pub p99: f64,
/// 100% percentile
pub p100: f64,
pub sum: u64,
}

#[derive(Default, Debug, Clone, Copy)]
pub struct RocksDbStatsValue {
pub count: u64,
pub times: Option<RocksDbStatsTimeValue>,
}

pub fn parse_rocksdb_stats(stats: &str) -> HashMap<String, RocksDbStatsValue> {
stats.lines().map(|line| parse_rocksdb_stats_row(line.splitn(2, ' '))).collect()
}

fn parse_rocksdb_stats_row<'a>(mut iter: impl Iterator<Item = &'a str>) -> (String, RocksDbStatsValue) {
const PROOF: &str = "rocksdb statistics format is valid and hasn't changed";
const SEPARATOR: &str = " : ";
let key = iter.next().expect(PROOF).trim_start_matches("rocksdb.").to_owned();
let values = iter.next().expect(PROOF);
let value = if values.starts_with("COUNT") {
// rocksdb.row.cache.hit COUNT : 0
RocksDbStatsValue {
count: u64::from_str(values.rsplit(SEPARATOR).next().expect(PROOF)).expect(PROOF),
times: None,
}
} else {
// rocksdb.db.get.micros P50 : 0.000000 P95 : 0.000000 P99 : 0.000000 P100 : 0.000000 COUNT : 0 SUM : 0
let values: Vec<&str> = values.split_whitespace().filter(|s| *s != ":").collect();
let times = RocksDbStatsTimeValue {
p50: f64::from_str(values.get(1).expect(PROOF)).expect(PROOF),
p95: f64::from_str(values.get(3).expect(PROOF)).expect(PROOF),
p99: f64::from_str(values.get(5).expect(PROOF)).expect(PROOF),
p100: f64::from_str(values.get(7).expect(PROOF)).expect(PROOF),
sum: u64::from_str(values.get(11).expect(PROOF)).expect(PROOF),
};
RocksDbStatsValue { count: u64::from_str(values.get(9).expect(PROOF)).expect(PROOF), times: Some(times) }
};
(key, value)
}

impl RawDbStats {
Expand All @@ -26,6 +79,7 @@ impl RawDbStats {
bytes_written: self.bytes_written + other.bytes_written,
bytes_read: self.bytes_read + other.bytes_written,
transactions: self.transactions + other.transactions,
cache_hit_count: self.cache_hit_count + other.cache_hit_count,
}
}
}
Expand All @@ -38,11 +92,7 @@ struct OverallDbStats {

impl OverallDbStats {
fn new() -> Self {
OverallDbStats {
stats: RawDbStats { reads: 0, writes: 0, bytes_written: 0, bytes_read: 0, transactions: 0 },
last_taken: Instant::now(),
started: Instant::now(),
}
OverallDbStats { stats: RawDbStats::default(), last_taken: Instant::now(), started: Instant::now() }
}
}

Expand All @@ -52,6 +102,7 @@ pub struct RunningDbStats {
bytes_written: AtomicU64,
bytes_read: AtomicU64,
transactions: AtomicU64,
cache_hit_count: AtomicU64,
overall: RwLock<OverallDbStats>,
}

Expand All @@ -68,6 +119,7 @@ impl RunningDbStats {
writes: 0.into(),
bytes_written: 0.into(),
transactions: 0.into(),
cache_hit_count: 0.into(),
overall: OverallDbStats::new().into(),
}
}
Expand All @@ -92,13 +144,18 @@ impl RunningDbStats {
self.transactions.fetch_add(val, AtomicOrdering::Relaxed);
}

pub fn tally_cache_hit_count(&self, val: u64) {
self.cache_hit_count.fetch_add(val, AtomicOrdering::Relaxed);
}

fn take_current(&self) -> RawDbStats {
RawDbStats {
reads: self.reads.swap(0, AtomicOrdering::Relaxed),
writes: self.writes.swap(0, AtomicOrdering::Relaxed),
bytes_written: self.bytes_written.swap(0, AtomicOrdering::Relaxed),
bytes_read: self.bytes_read.swap(0, AtomicOrdering::Relaxed),
transactions: self.transactions.swap(0, AtomicOrdering::Relaxed),
cache_hit_count: self.cache_hit_count.swap(0, AtomicOrdering::Relaxed),
}
}

Expand All @@ -109,6 +166,7 @@ impl RunningDbStats {
bytes_written: self.bytes_written.load(AtomicOrdering::Relaxed),
bytes_read: self.bytes_read.load(AtomicOrdering::Relaxed),
transactions: self.transactions.load(AtomicOrdering::Relaxed),
cache_hit_count: self.cache_hit_count.load(AtomicOrdering::Relaxed),
}
}

Expand Down
Loading

0 comments on commit 20a62a4

Please sign in to comment.