Optimize file descriptor usage across databases.
Many database interfaces were changed to accept a file descriptor limit when the DB is created. This allows the total file descriptor budget to be allocated deliberately across the multiple databases instantiated throughout the system. A utility (the fd_budget module) was also added to the utils crate to manage that allocation. The result is a more stable system under high load and better use of system resources.
biryukovmaxim committed Sep 28, 2023
1 parent 3033a6f commit a0d897d
Showing 23 changed files with 161 additions and 100 deletions.
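
The fd_budget utility referenced by the changes below (kaspa_utils::fd_budget) is not rendered among the hunks shown here. A minimal sketch of how such a process-wide budget guard could work, assuming an atomic counter and an RAII guard; the names acquire_guard, FDGuard and Error mirror the imports in the diff, while the internals and the initial budget value are purely illustrative:

// Illustrative sketch only, not the actual kaspa_utils::fd_budget code.
use std::sync::atomic::{AtomicI32, Ordering};

// Remaining process-wide file descriptor budget (initial value is a placeholder).
static REMAINING: AtomicI32 = AtomicI32::new(1024);

#[derive(Debug)]
pub enum Error {
    // Requested more descriptors than are currently available.
    InsufficientBudget { requested: i32, available: i32 },
}

// RAII guard: returns its descriptors to the budget when dropped.
pub struct FDGuard(i32);

impl Drop for FDGuard {
    fn drop(&mut self) {
        REMAINING.fetch_add(self.0, Ordering::SeqCst);
    }
}

// Reserve `limit` descriptors from the global budget, or fail.
pub fn acquire_guard(limit: i32) -> Result<FDGuard, Error> {
    let mut available = REMAINING.load(Ordering::SeqCst);
    loop {
        if available < limit {
            return Err(Error::InsufficientBudget { requested: limit, available });
        }
        match REMAINING.compare_exchange(available, available - limit, Ordering::SeqCst, Ordering::SeqCst) {
            Ok(_) => return Ok(FDGuard(limit)),
            Err(current) => available = current,
        }
    }
}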
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion components/addressmanager/src/lib.rs
@@ -388,7 +388,7 @@ mod address_store_with_cache {
// Assert that initial distribution is skewed, and hence not uniform from the outset.
assert!(bucket_reduction_ratio >= 1.25);

let db = create_temp_db!(ConnBuilder::default());
let db = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let config = Config::new(SIMNET_PARAMS);
let am = AddressManager::new(Arc::new(config), db.1);

17 changes: 15 additions & 2 deletions consensus/src/consensus/factory.rs
@@ -153,6 +153,7 @@ pub struct Factory {
db_parallelism: usize,
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
pub fd_total_budget: i32,
}

impl Factory {
@@ -163,6 +164,7 @@ impl Factory {
db_parallelism: usize,
notification_root: Arc<ConsensusNotificationRoot>,
counters: Arc<ProcessingCounters>,
fd_total_budget: i32,
) -> Self {
let mut config = config.clone();
#[cfg(feature = "devnet-prealloc")]
@@ -175,6 +177,7 @@
db_parallelism,
notification_root,
counters,
fd_total_budget,
}
}
}
@@ -199,7 +202,12 @@ impl ConsensusFactory for Factory {
};

let dir = self.db_root_dir.join(entry.directory_name.clone());
let db = kaspa_database::prelude::ConnBuilder::default().with_db_path(dir).with_parallelism(self.db_parallelism).build();
let db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(200.max(self.fd_total_budget * 70 / 100))
.build()
.unwrap();

let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
@@ -227,7 +235,12 @@

let entry = self.management_store.write().new_staging_consensus_entry().unwrap();
let dir = self.db_root_dir.join(entry.directory_name);
let db = kaspa_database::prelude::ConnBuilder::default().with_db_path(dir).with_parallelism(self.db_parallelism).build();
let db = kaspa_database::prelude::ConnBuilder::default()
.with_db_path(dir)
.with_parallelism(self.db_parallelism)
.with_files_limit(10.max(self.fd_total_budget * 10 / 100))
.build()
.unwrap();

let session_lock = SessionLock::new();
let consensus = Arc::new(Consensus::new(
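
The factory above splits the node's descriptor budget between databases: the active consensus DB gets 70% of fd_total_budget but never fewer than 200 descriptors, while a staging consensus gets 10% but never fewer than 10. A quick illustration of that arithmetic, using a hypothetical total budget of 2048:

// Illustration of the limits computed above; the budget value 2048 is hypothetical.
fn main() {
    let fd_total_budget: i32 = 2048;
    let active_limit = 200.max(fd_total_budget * 70 / 100); // max(200, 1433) = 1433
    let staging_limit = 10.max(fd_total_budget * 10 / 100); // max(10, 204)   = 204
    println!("active: {active_limit}, staging: {staging_limit}");
}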
4 changes: 2 additions & 2 deletions consensus/src/consensus/test_consensus.rs
@@ -58,7 +58,7 @@ impl TestConsensus {

/// Creates a test consensus instance based on `config` with a temp DB and the provided `notification_sender`
pub fn with_notifier(config: &Config, notification_sender: Sender<Notification>) -> Self {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let notification_root = Arc::new(ConsensusNotificationRoot::new(notification_sender));
let counters = Arc::new(ProcessingCounters::default());
let consensus = Arc::new(Consensus::new(db, Arc::new(config.clone()), Default::default(), notification_root, counters, 0));
@@ -69,7 +69,7 @@

/// Creates a test consensus instance based on `config` with a temp DB and no notifier
pub fn new(config: &Config) -> Self {
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (db_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let (dummy_notification_sender, _) = async_channel::unbounded();
let notification_root = Arc::new(ConsensusNotificationRoot::new(dummy_notification_sender));
let counters = Arc::new(ProcessingCounters::default());
4 changes: 2 additions & 2 deletions consensus/src/model/stores/relations.rs
@@ -291,6 +291,7 @@ impl RelationsStore for MemoryRelationsStore {
mod tests {
use super::*;
use crate::processes::relations::RelationsStoreExtensions;
use kaspa_database::create_temp_db;

#[test]
fn test_memory_relations_store() {
@@ -299,8 +300,7 @@ mod tests {

#[test]
fn test_db_relations_store() {
let db_tempdir = kaspa_database::utils::get_kaspa_tempdir();
let db = Arc::new(DB::open_default(db_tempdir.path().to_owned().to_str().unwrap()).unwrap());
let (_, db) = create_temp_db!(kaspa_database::prelude::ConnBuilder::default().with_files_limit(10));
test_relations_store(DbRelationsStore::new(db, 0, 2));
}

2 changes: 1 addition & 1 deletion consensus/src/processes/pruning_proof/mod.rs
@@ -361,7 +361,7 @@ impl PruningProofManager {
let proof_pp_header = proof[0].last().expect("checked if empty");
let proof_pp = proof_pp_header.hash;
let proof_pp_level = calc_block_level(proof_pp_header, self.max_block_level);
let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default());
let (db_lifetime, db) = kaspa_database::create_temp_db!(ConnBuilder::default().with_files_limit(10));
let headers_store = Arc::new(DbHeadersStore::new(db.clone(), 2 * self.pruning_proof_m)); // TODO: Think about cache size
let ghostdag_stores = (0..=self.max_block_level)
.map(|level| Arc::new(DbGhostdagStore::new(db.clone(), level, 2 * self.pruning_proof_m)))
4 changes: 2 additions & 2 deletions consensus/src/processes/reachability/inquirer.rs
@@ -385,7 +385,7 @@ mod tests {
/// Runs a DAG test-case with full verification using the staging store mechanism.
/// Note: runtime is quadratic in the number of blocks so should be used with mildly small DAGs (~50)
fn run_dag_test_case_with_staging(test: &DagTestCase) {
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let cache_size = test.blocks.len() as u64 / 3;
let reachability = RwLock::new(DbReachabilityStore::new(db.clone(), cache_size));
let relations = RwLock::new(DbRelationsStore::with_prefix(db.clone(), &[], 0));
@@ -533,7 +533,7 @@
run_dag_test_case(&mut relations, &mut reachability, &test);

// Run with direct DB stores
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let cache_size = test.blocks.len() as u64 / 3;
let mut reachability = DbReachabilityStore::new(db.clone(), cache_size);
let mut relations = DbRelationsStore::new(db, 0, cache_size);
2 changes: 1 addition & 1 deletion consensus/src/processes/relations.rs
@@ -170,7 +170,7 @@ mod tests {

#[test]
fn test_delete_level_relations_zero_cache() {
let (_lifetime, db) = create_temp_db!(ConnBuilder::default());
let (_lifetime, db) = create_temp_db!(ConnBuilder::default().with_files_limit(10));
let cache_size = 0;
let mut relations = DbRelationsStore::new(db.clone(), 0, cache_size);
relations.insert(ORIGIN, Default::default()).unwrap();
1 change: 0 additions & 1 deletion database/Cargo.toml
@@ -25,4 +25,3 @@ tempfile.workspace = true

enum-primitive-derive = "0.2.2"
num-traits = "0.2.15"
rlimit = "0.10.1"
29 changes: 27 additions & 2 deletions database/src/db.rs
@@ -1,12 +1,37 @@
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::ops::{Deref, DerefMut};
use std::path::PathBuf;

pub use conn_builder::ConnBuilder;
use kaspa_utils::fd_budget::FDGuard;

mod conn_builder;

/// The DB type used for Kaspad stores
pub type DB = DBWithThreadMode<MultiThreaded>;
pub struct DB {
inner: DBWithThreadMode<MultiThreaded>,
_fd_guard: FDGuard,
}

impl DB {
pub fn new(inner: DBWithThreadMode<MultiThreaded>, fd_guard: FDGuard) -> Self {
Self { inner, _fd_guard: fd_guard }
}
}

impl DerefMut for DB {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}

impl Deref for DB {
type Target = DBWithThreadMode<MultiThreaded>;

fn deref(&self) -> &Self::Target {
&self.inner
}
}

/// Deletes an existing DB if it exists
pub fn delete_db(db_dir: PathBuf) {
@@ -15,5 +40,5 @@ pub fn delete_db(db_dir: PathBuf) {
}
let options = rocksdb::Options::default();
let path = db_dir.to_str().unwrap();
DB::destroy(&options, path).expect("DB is expected to be deletable");
<DBWithThreadMode<MultiThreaded>>::destroy(&options, path).expect("DB is expected to be deletable");
}
93 changes: 48 additions & 45 deletions database/src/db/conn_builder.rs
@@ -1,59 +1,67 @@
use crate::db::DB;
use std::{cmp::min, path::PathBuf, sync::Arc};
use rocksdb::{DBWithThreadMode, MultiThreaded};
use std::{path::PathBuf, sync::Arc};

#[derive(Debug, Copy, Clone)]
#[derive(Debug)]
pub struct Unspecified;

#[derive(Debug, Clone)]
pub struct ConnBuilder<Path: Clone, const STATS_ENABLED: bool, StatsPeriod: Clone> {
#[derive(Debug)]
pub struct ConnBuilder<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> {
db_path: Path,
create_if_missing: bool,
parallelism: usize,
files_limit: i32,
files_limit: FDLimit,
mem_budget: usize,
stats_period: StatsPeriod,
}

impl Default for ConnBuilder<Unspecified, false, Unspecified> {
impl Default for ConnBuilder<Unspecified, false, Unspecified, Unspecified> {
fn default() -> Self {
ConnBuilder {
db_path: Unspecified,
create_if_missing: true,
parallelism: 1,
files_limit: 500,
mem_budget: 64 * 1024 * 1024,
stats_period: Unspecified,
files_limit: Unspecified,
}
}
}

impl<Path: Clone, const STATS_ENABLED: bool, StatsPeriod: Clone> ConnBuilder<Path, STATS_ENABLED, StatsPeriod> {
pub fn with_db_path(self, db_path: PathBuf) -> ConnBuilder<PathBuf, STATS_ENABLED, StatsPeriod> {
impl<Path, const STATS_ENABLED: bool, StatsPeriod, FDLimit> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
pub fn with_db_path(self, db_path: PathBuf) -> ConnBuilder<PathBuf, STATS_ENABLED, StatsPeriod, FDLimit> {
ConnBuilder {
db_path,
files_limit: self.files_limit,
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
files_limit: self.files_limit,
mem_budget: self.mem_budget,
stats_period: self.stats_period,
}
}
pub fn with_create_if_missing(self, create_if_missing: bool) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod> {
pub fn with_create_if_missing(self, create_if_missing: bool) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
ConnBuilder { create_if_missing, ..self }
}
pub fn with_parallelism(self, parallelism: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod> {
pub fn with_parallelism(self, parallelism: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
ConnBuilder { parallelism: parallelism.into(), ..self }
}
pub fn with_files_limit(self, files_limit: impl Into<i32>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod> {
ConnBuilder { files_limit: files_limit.into(), ..self }
}
pub fn with_mem_budget(self, mem_budget: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod> {
pub fn with_mem_budget(self, mem_budget: impl Into<usize>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, FDLimit> {
ConnBuilder { mem_budget: mem_budget.into(), ..self }
}
pub fn with_files_limit(self, files_limit: impl Into<i32>) -> ConnBuilder<Path, STATS_ENABLED, StatsPeriod, i32> {
ConnBuilder {
db_path: self.db_path,
files_limit: files_limit.into(),
create_if_missing: self.create_if_missing,
parallelism: self.parallelism,
mem_budget: self.mem_budget,
stats_period: self.stats_period,
}
}
}

impl<Path: Clone> ConnBuilder<Path, false, Unspecified> {
pub fn enable_stats(self) -> ConnBuilder<Path, true, Unspecified> {
impl<Path, FDLimit> ConnBuilder<Path, false, Unspecified, FDLimit> {
pub fn enable_stats(self) -> ConnBuilder<Path, true, Unspecified, FDLimit> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
@@ -65,8 +73,8 @@ impl<Path: Clone> ConnBuilder<Path, false, Unspecified> {
}
}

impl<Path: Clone, StatsPeriod: Clone> ConnBuilder<Path, true, StatsPeriod> {
pub fn disable_stats(self) -> ConnBuilder<Path, false, Unspecified> {
impl<Path, StatsPeriod, FDLimit> ConnBuilder<Path, true, StatsPeriod, FDLimit> {
pub fn disable_stats(self) -> ConnBuilder<Path, false, Unspecified, FDLimit> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
@@ -76,7 +84,7 @@ impl<Path: Clone, StatsPeriod: Clone> ConnBuilder<Path, true, StatsPeriod> {
stats_period: Unspecified,
}
}
pub fn with_stats_period(self, stats_period: impl Into<u32>) -> ConnBuilder<Path, true, u32> {
pub fn with_stats_period(self, stats_period: impl Into<u32>) -> ConnBuilder<Path, true, u32, FDLimit> {
ConnBuilder {
db_path: self.db_path,
create_if_missing: self.create_if_missing,
@@ -94,44 +102,39 @@ macro_rules! default_opts {
if $self.parallelism > 1 {
opts.increase_parallelism($self.parallelism as i32);
}
opts.optimize_level_style_compaction($self.mem_budget);

#[cfg(target_os = "windows")]
let files_limit = rlimit::getmaxstdio() as i32;
#[cfg(any(target_os = "macos", target_os = "linux"))]
let files_limit = rlimit::getrlimit(rlimit::Resource::NOFILE).unwrap().0 as i32;
// In most linux environments the limit is set to 1024, so we use 500 to give sufficient slack.
// TODO: fine-tune this parameter and additional parameters related to max file size
opts.set_max_open_files(min(files_limit, $self.files_limit));
opts.optimize_level_style_compaction($self.mem_budget);
let guard = kaspa_utils::fd_budget::acquire_guard($self.files_limit)?;
opts.set_max_open_files($self.files_limit);
opts.create_if_missing($self.create_if_missing);
opts
Ok((opts, guard))
}};
}

impl ConnBuilder<PathBuf, false, Unspecified> {
pub fn build(self) -> Arc<DB> {
let opts = default_opts!(self);
let db = Arc::new(DB::open(&opts, self.db_path.to_str().unwrap()).unwrap());
db
impl ConnBuilder<PathBuf, false, Unspecified, i32> {
pub fn build(self) -> Result<Arc<DB>, kaspa_utils::fd_budget::Error> {
let (opts, guard) = default_opts!(self)?;
let db = Arc::new(DB::new(<DBWithThreadMode<MultiThreaded>>::open(&opts, self.db_path.to_str().unwrap()).unwrap(), guard));
Ok(db)
}
}

impl ConnBuilder<PathBuf, true, Unspecified> {
pub fn build(self) -> Arc<DB> {
let mut opts = default_opts!(self);
impl ConnBuilder<PathBuf, true, Unspecified, i32> {
pub fn build(self) -> Result<Arc<DB>, kaspa_utils::fd_budget::Error> {
let (mut opts, guard) = default_opts!(self)?;
opts.enable_statistics();
let db = Arc::new(DB::open(&opts, self.db_path.to_str().unwrap()).unwrap());
db
let db = Arc::new(DB::new(<DBWithThreadMode<MultiThreaded>>::open(&opts, self.db_path.to_str().unwrap()).unwrap(), guard));
Ok(db)
}
}

impl ConnBuilder<PathBuf, true, u32> {
pub fn build(self) -> Arc<DB> {
let mut opts = default_opts!(self);
impl ConnBuilder<PathBuf, true, u32, i32> {
pub fn build(self) -> Result<Arc<DB>, kaspa_utils::fd_budget::Error> {
let (mut opts, guard) = default_opts!(self)?;
opts.enable_statistics();
opts.set_report_bg_io_stats(true);
opts.set_stats_dump_period_sec(self.stats_period);
let db = Arc::new(DB::open(&opts, self.db_path.to_str().unwrap()).unwrap());
db
let db = Arc::new(DB::new(<DBWithThreadMode<MultiThreaded>>::open(&opts, self.db_path.to_str().unwrap()).unwrap(), guard));
Ok(db)
}
}
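
With the reworked builder, build() is only available once a files limit has been chosen, and it now returns a Result because reserving descriptors from the global budget can fail. A usage sketch; the path, parallelism and limit values here are hypothetical:

// Usage sketch of the reworked builder; path and numeric values are illustrative.
use kaspa_database::prelude::ConnBuilder;
use std::path::PathBuf;

fn open_example_db() -> Result<(), kaspa_utils::fd_budget::Error> {
    let db = ConnBuilder::default()
        .with_db_path(PathBuf::from("/tmp/example-db"))
        .with_parallelism(2usize)
        .with_files_limit(64) // reserves 64 descriptors from the process-wide budget
        .build()?; // fails if the remaining budget cannot cover the request
    // `DB` derefs to the inner rocksdb handle, so existing call sites keep working.
    db.put(b"key", b"value").unwrap();
    Ok(())
}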

