Skip to content

Commit

Permalink
validator: Add CLI args to control rocksdb threadpool sizes
Browse files Browse the repository at this point in the history
  • Loading branch information
steviez committed Dec 29, 2024
1 parent 34e95be commit 62a73f8
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 19 deletions.
1 change: 1 addition & 0 deletions ledger-tool/src/ledger_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ pub fn open_blockstore(
recovery_mode: wal_recovery_mode.clone(),
enforce_ulimit_nofile,
column_options: LedgerColumnOptions::default(),
..BlockstoreOptions::default()
},
) {
Ok(blockstore) => blockstore,
Expand Down
9 changes: 5 additions & 4 deletions ledger/src/blockstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use {
blockstore_meta::*,
blockstore_metrics::BlockstoreRpcApiMetrics,
blockstore_options::{
AccessType, BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
BlockstoreOptions, LedgerColumnOptions, BLOCKSTORE_DIRECTORY_ROCKS_LEVEL,
},
blockstore_processor::BlockstoreProcessorError,
leader_schedule_cache::LeaderScheduleCache,
Expand Down Expand Up @@ -90,7 +90,9 @@ pub mod blockstore_purge;
use static_assertions::const_assert_eq;
pub use {
crate::{
blockstore_db::BlockstoreError,
blockstore_db::{
default_num_compaction_threads, default_num_flush_threads, BlockstoreError,
},
blockstore_meta::{OptimisticSlotMetaVersioned, SlotMeta},
blockstore_metrics::BlockstoreInsertionMetrics,
},
Expand Down Expand Up @@ -4961,10 +4963,9 @@ pub fn create_new_ledger(
let blockstore = Blockstore::open_with_options(
ledger_path,
BlockstoreOptions {
access_type: AccessType::Primary,
recovery_mode: None,
enforce_ulimit_nofile: false,
column_options: column_options.clone(),
..BlockstoreOptions::default()
},
)?;
let ticks_per_slot = genesis_config.ticks_per_slot;
Expand Down
39 changes: 25 additions & 14 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use {
fs,
marker::PhantomData,
mem,
num::NonZeroUsize,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand Down Expand Up @@ -415,13 +416,13 @@ pub(crate) struct Rocks {

impl Rocks {
pub(crate) fn open(path: PathBuf, options: BlockstoreOptions) -> Result<Rocks> {
let access_type = options.access_type.clone();
// let access_type = options.access_type.clone();
let recovery_mode = options.recovery_mode.clone();

fs::create_dir_all(&path)?;

// Use default database options
let mut db_options = get_db_options(&access_type);
let mut db_options = get_db_options(&options);
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
}
Expand All @@ -430,7 +431,7 @@ impl Rocks {
let column_options = Arc::from(options.column_options);

// Open the database
let db = match access_type {
let db = match options.access_type {
AccessType::Primary | AccessType::PrimaryForMaintenance => {
DB::open_cf_descriptors(&db_options, &path, cf_descriptors)?
}
Expand All @@ -452,7 +453,7 @@ impl Rocks {
let rocks = Rocks {
db,
path,
access_type,
access_type: options.access_type,
oldest_slot,
column_options,
write_batch_perf_status: PerfSamplingStatus::default(),
Expand Down Expand Up @@ -1991,7 +1992,7 @@ fn process_cf_options_advanced<C: 'static + Column + ColumnName>(
}
}

fn get_db_options(access_type: &AccessType) -> Options {
fn get_db_options(blockstore_options: &BlockstoreOptions) -> Options {
let mut options = Options::default();

// Create missing items to support a clean start
Expand All @@ -2002,15 +2003,13 @@ fn get_db_options(access_type: &AccessType) -> Options {
// pool is used for compactions whereas the high priority pool is used for
// memtable flushes. Separate pools are created so that compactions are
// unable to stall memtable flushes (which could stall memtable writes).
//
// We could call options.set_max_background_jobs(n) to automatically size
// the pools to 3n/4 low priority and n/4 high priority threads. Instead,
// set the sizes directly to sizes we want
let num_low_priority_threads = num_cpus::get();
let num_high_priority_threads = (num_cpus::get() / 4).max(1);
let mut env = rocksdb::Env::new().unwrap();
env.set_low_priority_background_threads(num_low_priority_threads as i32);
env.set_high_priority_background_threads(num_high_priority_threads as i32);
env.set_low_priority_background_threads(
blockstore_options.num_rocksdb_compaction_threads.get() as i32,
);
env.set_high_priority_background_threads(
blockstore_options.num_rocksdb_flush_threads.get() as i32
);
options.set_env(&env);
// The value set in max_background_jobs can grow but not shrink threadpool
// sizes. So, set this value to 2 (the default value and 1 low / 1 high) to
Expand All @@ -2020,7 +2019,7 @@ fn get_db_options(access_type: &AccessType) -> Options {
// Set max total wal size to 4G.
options.set_max_total_wal_size(4 * 1024 * 1024 * 1024);

if should_disable_auto_compactions(access_type) {
if should_disable_auto_compactions(&blockstore_options.access_type) {
options.set_disable_auto_compactions(true);
}

Expand All @@ -2032,6 +2031,18 @@ fn get_db_options(access_type: &AccessType) -> Options {
options
}

/// Returns the default size of the rocksdb low priority threadpool, which
/// rocksdb uses for compaction.
///
/// The default is one thread per CPU as reported by `num_cpus::get()`.
///
/// # Panics
///
/// Panics if `num_cpus::get()` reports zero CPUs.
pub fn default_num_compaction_threads() -> NonZeroUsize {
    let cpu_count = num_cpus::get();
    NonZeroUsize::new(cpu_count).expect("thread count is non-zero")
}

/// Returns the default size of the rocksdb high priority threadpool, which
/// rocksdb uses for memtable flushes.
///
/// The default is a quarter of the CPU count reported by `num_cpus::get()`
/// (integer division), but never fewer than one thread.
pub fn default_num_flush_threads() -> NonZeroUsize {
    let quarter_of_cpus = num_cpus::get() / 4;
    // Clamp to at least one thread so that the conversion below cannot fail
    // on machines with fewer than four CPUs.
    let thread_count = quarter_of_cpus.max(1);
    NonZeroUsize::new(thread_count).expect("thread count is non-zero")
}

// Returns whether automatic compactions should be disabled for the entire
// database based upon the given access type.
fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
Expand Down
12 changes: 11 additions & 1 deletion ledger/src/blockstore_options.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode};
use {
crate::blockstore_db::{default_num_compaction_threads, default_num_flush_threads},
rocksdb::{DBCompressionType as RocksCompressionType, DBRecoveryMode},
std::num::NonZeroUsize,
};

/// The subdirectory under ledger directory where the Blockstore lives
pub const BLOCKSTORE_DIRECTORY_ROCKS_LEVEL: &str = "rocksdb";
Expand All @@ -13,6 +17,8 @@ pub struct BlockstoreOptions {
// desired open file descriptor limit cannot be configured. Default: true.
pub enforce_ulimit_nofile: bool,
pub column_options: LedgerColumnOptions,
pub num_rocksdb_compaction_threads: NonZeroUsize,
pub num_rocksdb_flush_threads: NonZeroUsize,
}

impl Default for BlockstoreOptions {
Expand All @@ -25,6 +31,8 @@ impl Default for BlockstoreOptions {
recovery_mode: None,
enforce_ulimit_nofile: true,
column_options: LedgerColumnOptions::default(),
num_rocksdb_compaction_threads: default_num_compaction_threads(),
num_rocksdb_flush_threads: default_num_flush_threads(),
}
}
}
Expand All @@ -36,6 +44,8 @@ impl BlockstoreOptions {
recovery_mode: None,
enforce_ulimit_nofile: false,
column_options: LedgerColumnOptions::default(),
num_rocksdb_compaction_threads: default_num_compaction_threads(),
num_rocksdb_flush_threads: default_num_flush_threads(),
}
}
}
Expand Down
40 changes: 40 additions & 0 deletions validator/src/cli/thread_args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub struct DefaultThreadArgs {
pub rayon_global_threads: String,
pub replay_forks_threads: String,
pub replay_transactions_threads: String,
pub rocksdb_compaction_threads: String,
pub rocksdb_flush_threads: String,
pub tvu_receive_threads: String,
pub tvu_sigverify_threads: String,
}
Expand All @@ -36,6 +38,8 @@ impl Default for DefaultThreadArgs {
replay_forks_threads: ReplayForksThreadsArg::bounded_default().to_string(),
replay_transactions_threads: ReplayTransactionsThreadsArg::bounded_default()
.to_string(),
rocksdb_compaction_threads: RocksdbCompactionThreadsArg::bounded_default().to_string(),
rocksdb_flush_threads: RocksdbFlushThreadsArg::bounded_default().to_string(),
tvu_receive_threads: TvuReceiveThreadsArg::bounded_default().to_string(),
tvu_sigverify_threads: TvuShredSigverifyThreadsArg::bounded_default().to_string(),
}
Expand All @@ -52,6 +56,8 @@ pub fn thread_args<'a>(defaults: &DefaultThreadArgs) -> Vec<Arg<'_, 'a>> {
new_thread_arg::<RayonGlobalThreadsArg>(&defaults.rayon_global_threads),
new_thread_arg::<ReplayForksThreadsArg>(&defaults.replay_forks_threads),
new_thread_arg::<ReplayTransactionsThreadsArg>(&defaults.replay_transactions_threads),
new_thread_arg::<RocksdbCompactionThreadsArg>(&defaults.rocksdb_compaction_threads),
new_thread_arg::<RocksdbFlushThreadsArg>(&defaults.rocksdb_flush_threads),
new_thread_arg::<TvuReceiveThreadsArg>(&defaults.tvu_receive_threads),
new_thread_arg::<TvuShredSigverifyThreadsArg>(&defaults.tvu_sigverify_threads),
]
Expand All @@ -77,6 +83,8 @@ pub struct NumThreadConfig {
pub rayon_global_threads: NonZeroUsize,
pub replay_forks_threads: NonZeroUsize,
pub replay_transactions_threads: NonZeroUsize,
pub rocksdb_compaction_threads: NonZeroUsize,
pub rocksdb_flush_threads: NonZeroUsize,
pub tvu_receive_threads: NonZeroUsize,
pub tvu_sigverify_threads: NonZeroUsize,
}
Expand Down Expand Up @@ -119,6 +127,16 @@ pub fn parse_num_threads_args(matches: &ArgMatches) -> NumThreadConfig {
ReplayTransactionsThreadsArg::NAME,
NonZeroUsize
),
rocksdb_compaction_threads: value_t_or_exit!(
matches,
RocksdbCompactionThreadsArg::NAME,
NonZeroUsize
),
rocksdb_flush_threads: value_t_or_exit!(
matches,
RocksdbFlushThreadsArg::NAME,
NonZeroUsize
),
tvu_receive_threads: value_t_or_exit!(matches, TvuReceiveThreadsArg::NAME, NonZeroUsize),
tvu_sigverify_threads: value_t_or_exit!(
matches,
Expand Down Expand Up @@ -257,6 +275,28 @@ impl ThreadArg for ReplayTransactionsThreadsArg {
}
}

/// Thread argument that sets how many threads rocksdb (Blockstore) uses for
/// compactions.
struct RocksdbCompactionThreadsArg;

impl ThreadArg for RocksdbCompactionThreadsArg {
    const NAME: &'static str = "rocksdb_compaction_threads";
    const LONG_NAME: &'static str = "rocksdb-compaction-threads";
    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) compactions";

    /// Uses the ledger crate's default compaction thread count.
    fn default() -> usize {
        let threads = solana_ledger::blockstore::default_num_compaction_threads();
        usize::from(threads)
    }
}

/// Thread argument that sets how many threads rocksdb (Blockstore) uses for
/// memtable flushes.
struct RocksdbFlushThreadsArg;

impl ThreadArg for RocksdbFlushThreadsArg {
    const NAME: &'static str = "rocksdb_flush_threads";
    const LONG_NAME: &'static str = "rocksdb-flush-threads";
    const HELP: &'static str = "Number of threads to use for rocksdb (Blockstore) memtable flushes";

    /// Uses the ledger crate's default flush thread count.
    fn default() -> usize {
        let threads = solana_ledger::blockstore::default_num_flush_threads();
        usize::from(threads)
    }
}

struct TvuReceiveThreadsArg;
impl ThreadArg for TvuReceiveThreadsArg {
const NAME: &'static str = "tvu_receive_threads";
Expand Down
4 changes: 4 additions & 0 deletions validator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,8 @@ pub fn main() {
rayon_global_threads,
replay_forks_threads,
replay_transactions_threads,
rocksdb_compaction_threads,
rocksdb_flush_threads,
tvu_receive_threads,
tvu_sigverify_threads,
} = cli::thread_args::parse_num_threads_args(&matches);
Expand Down Expand Up @@ -1055,6 +1057,8 @@ pub fn main() {
enforce_ulimit_nofile: true,
// The validator needs primary (read/write)
access_type: AccessType::Primary,
num_rocksdb_compaction_threads: rocksdb_compaction_threads,
num_rocksdb_flush_threads: rocksdb_flush_threads,
};

let accounts_hash_cache_path = matches
Expand Down

0 comments on commit 62a73f8

Please sign in to comment.