Skip to content

Commit

Permalink
validator: Add CLI args to control rocksdb threadpool sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
steviez committed Dec 29, 2024
1 parent 34e95be commit 36f0b91
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 5 deletions.
9 changes: 5 additions & 4 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
blockstore_meta::*,
blockstore_metrics::BlockstoreRpcApiMetrics,
blockstore_options::{
AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
},
blockstore_processor::BlockstoreProcessorError,
leader_schedule_cache::LeaderScheduleCache,
Expand Down Expand Up @@ -90,7 +90,9 @@ pub mod blockstore_purge;
use static_assertions::const_assert_eq;
pub use {
crate::{
blockstore_db::BlockstoreError,
blockstore_db::{
default_num_compaction_threads, default_num_flush_threads, BlockstoreError,
},
blockstore_meta::{OptimisticSlotMetaVersioned, SlotMeta},
blockstore_metrics::BlockstoreInsertionMetrics,
},
Expand Down Expand Up @@ -4961,10 +4963,9 @@ pub fn create_new_ledger(
let blockstore = Blockstore::open_with_options(
ledger_path,
BlockstoreOptions {
access_type: AccessType::Primary,
recovery_mode: None,
enforce_ulimit_nofile: false,
column_options: column_options.clone(),
..BlockstoreOptions::default()
},
)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
Expand Down
13 changes: 13 additions & 0 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use {
fs,
marker::PhantomData,
mem,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand Down Expand Up @@ -2032,6 +2033,18 @@ fn get_db_options(access_type: &AccessType) -> Options {
options
}

/// The default number of threads to use for rocksdb compaction in the rocksdb
/// low priority threadpool
pub fn default_num_compaction_threads() -> NonZeroUsize {
NonZeroUsize::new(num_cpus::get()).expect("thread count is non-zero")
}

/// The default number of threads to use for rocksdb memtable flushes in the
/// rocksdb high priority threadpool
pub fn default_num_flush_threads() -> NonZeroUsize {
NonZeroUsize::new((num_cpus::get() / 4).max(1)).expect("thread count is non-zero")
}

// Returns whether automatic compactions should be disabled for the entire
// database based upon the given access type.
fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
Expand Down
12 changes: 11 additions & 1 deletion ledger/src/blockstore_options.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode};
use {
crate::blockstore_db::{default_num_compaction_threads, default_num_flush_threads},
rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode},
std::num::NonZeroUsize,
};

/// The subdirectory under ledger directory where the Blockstore lives
pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb";
Expand All @@ -13,6 +17,8 @@ pub struct BlockstoreOptions {
// desired open file descriptor limit cannot be configured. Default: true.
pub enforce_ulimit_nofile: bool,
pub column_options: LedgerColumnOptions,
pub num_rocksdb_compaction_threads: NonZeroUsize,
pub num_rocksdb_flush_threads: NonZeroUsize,
}

impl Default for BlockstoreOptions {
Expand All @@ -25,6 +31,8 @@ impl Default for BlockstoreOptions {
recovery_mode: None,
enforce_ulimit_nofile: true,
column_options: LedgerColumnOptions::default(),
num_rocksdb_compaction_threads: default_num_compaction_threads(),
num_rocksdb_flush_threads: default_num_flush_threads(),
}
}
}
Expand All @@ -36,6 +44,8 @@ impl BlockstoreOptions {
recovery_mode: None,
enforce_ulimit_nofile: false,
column_options: LedgerColumnOptions::default(),
num_rocksdb_compaction_threads: default_num_compaction_threads(),
num_rocksdb_flush_threads: default_num_flush_threads(),
}
}
}
Expand Down
40 changes: 40 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub struct DefaultThreadArgs {
pub rayon_global_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub rocksdb_compaction_threads: String,
pub rocksdb_flush_threads: String,
pub tvu_receive_threads: String,
pub tvu_sigverify_threads: String,
}
Expand All @@ -36,6 +38,8 @@ impl Default for DefaultThreadArgs {
replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
.to_string(),
rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
}
Expand All @@ -52,6 +56,8 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
]
Expand All @@ -77,6 +83,8 @@ pub struct NumThreadConfig {
pub rayon_global_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub rocksdb_compaction_threads: NonZeroUsize,
pub rocksdb_flush_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
pub tvu_sigverify_threads: NonZeroUsize,
}
Expand Down Expand Up @@ -119,6 +127,16 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
ReplayTransactionsThreadsArg::NAME,
NonZeroUsize
),
rocksdb_compaction_threads: value_t_or_exit!(
matches,
RocksdbCompactionThreadsArg::NAME,
NonZeroUsize
),
rocksdb_flush_threads: value_t_or_exit!(
matches,
RocksdbFlushThreadsArg::NAME,
NonZeroUsize
),
tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
tvu_sigverify_threads: value_t_or_exit!(
matches,
Expand Down Expand Up @@ -257,6 +275,28 @@ impl ThreadArg for ReplayTransactionsThreadsArg {
}
}

struct RocksdbCompactionThreadsArg;
impl ThreadArg for RocksdbCompactionThreadsArg {
const NAME: &'static str = "rocksdb_compaction_threads";
const LONG_NAME: &'static str = "rocksdb-compaction-threads";
const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";

fn default() -> usize {
solana_ledger::blockstore::default_num_compaction_threads().get()
}
}

struct RocksdbFlushThreadsArg;
impl ThreadArg for RocksdbFlushThreadsArg {
const NAME: &'static str = "rocksdb_flush_threads";
const LONG_NAME: &'static str = "rocksdb-flush-threads";
const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";

fn default() -> usize {
solana_ledger::blockstore::default_num_flush_threads().get()
}
}

struct TvuReceiveThreadsArg;
impl ThreadArg for TvuReceiveThreadsArg {
const NAME: &'static str = "tvu_receive_threads";
Expand Down
4 changes: 4 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,8 @@ pub fn main() {
rayon_global_threads,
replay_forks_threads,
replay_transactions_threads,
rocksdb_compaction_threads,
rocksdb_flush_threads,
tvu_receive_threads,
tvu_sigverify_threads,
} = cli::thread_args::parse_num_threads_args(&matches);
Expand Down Expand Up @@ -1055,6 +1057,8 @@ pub fn main() {
enforce_ulimit_nofile: true,
// The validator needs primary (read/write)
access_type: AccessType::Primary,
num_rocksdb_compaction_threads: rocksdb_compaction_threads,
num_rocksdb_flush_threads: rocksdb_flush_threads,
};

let accounts_hash_cache_path = matches
Expand Down

0 comments on commit 36f0b91

Please sign in to comment.