Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

v1.16: Re-enable periodic compaction on several columns (backport of #32548) #32565

Merged
merged 1 commit into from
Jul 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 93 additions & 30 deletions ledger/src/blockstore_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use {
},
solana_storage_proto::convert::generated,
std::{
collections::{HashMap, HashSet},
collections::HashMap,
ffi::{CStr, CString},
fs,
marker::PhantomData,
Expand All @@ -51,6 +51,14 @@ const BLOCKSTORE_METRICS_ERROR: i64 = -1;
const MAX_WRITE_BUFFER_SIZE: u64 = 256 * 1024 * 1024; // 256MB
const FIFO_WRITE_BUFFER_SIZE: u64 = 2 * MAX_WRITE_BUFFER_SIZE;

// Age threshold, in seconds, passed to rocksdb as the
// "periodic_compaction_seconds" option.
//
// SST files older than this value will be picked up for compaction. This value
// was chosen to be one day to strike a balance between storage getting
// reclaimed in a timely manner and the additional I/O that compaction incurs.
// For more details on this property, see
// https://github.com/facebook/rocksdb/blob/749b179c041347d150fa6721992ae8398b7d2b39/
// include/rocksdb/advanced_options.h#L908C30-L908C30
const PERIODIC_COMPACTION_SECONDS: u64 = 60 * 60 * 24;

// Column family for metadata about a leader slot
const META_CF: &str = "meta";
// Column family for slots that have been marked as dead
Expand Down Expand Up @@ -361,9 +369,6 @@ impl Rocks {
fs::create_dir_all(path)?;

// Use default database options
if should_disable_auto_compactions(&access_type) {
info!("Disabling rocksdb's automatic compactions...");
}
let mut db_options = get_db_options(&access_type);
if let Some(recovery_mode) = recovery_mode {
db_options.set_wal_recovery_mode(recovery_mode.into());
Expand Down Expand Up @@ -407,6 +412,7 @@ impl Rocks {
}
}
};
db.configure_compaction();

Ok(db)
}
Expand Down Expand Up @@ -470,6 +476,53 @@ impl Rocks {
]
}

// Configure compaction on a per-column basis.
//
// When automatic compactions are disabled for this access type, this only
// logs that fact and returns. Otherwise, the "periodic_compaction_seconds"
// option is set to PERIODIC_COMPACTION_SECONDS on every column for which
// should_enable_cf_compaction() returns true.
//
// Panics if rocksdb rejects the option update for any such column.
fn configure_compaction(&self) {
    // If compactions are disabled altogether, no need to tune values
    if should_disable_auto_compactions(&self.access_type) {
        info!(
            "Rocks's automatic compactions are disabled due to {:?} access",
            self.access_type
        );
        return;
    }

    // Some columns make use of rocksdb's compaction to help in cleaning
    // the database. See comments in should_enable_cf_compaction() for more
    // details on why some columns need compaction and why others do not.
    //
    // More specifically, periodic (automatic) compaction is used as
    // opposed to manual compaction requests on a range.
    // - Periodic compaction operates on individual files once the file
    //   has reached a certain (configurable) age. See comments at
    //   PERIODIC_COMPACTION_SECONDS for some more detail.
    // - Manual compaction operates on a range and could end up propagating
    //   through several files and/or levels of the db.
    //
    // Given that data is inserted into the db at a somewhat steady rate,
    // the age of the individual files will be fairly evenly distributed
    // over time as well. Thus, the I/O to perform cleanup with periodic
    // compaction is also evenly distributed over time. On the other hand,
    // a manual compaction spanning a large number of files could cause
    // a sudden burst in I/O. Such a burst could potentially cause a write
    // stall in addition to negatively impacting other parts of the system.
    // Thus, the choice to use periodic compactions is fairly easy.

    // The option value is the same for every column, so build it once
    // instead of allocating a new string on each loop iteration.
    let periodic_compaction_seconds = PERIODIC_COMPACTION_SECONDS.to_string();
    let compaction_options = [(
        "periodic_compaction_seconds",
        periodic_compaction_seconds.as_str(),
    )];

    for cf_name in Self::columns() {
        if !should_enable_cf_compaction(cf_name) {
            continue;
        }

        let cf_handle = self.cf_handle(cf_name);
        self.db
            .set_options_cf(&cf_handle, &compaction_options)
            .unwrap_or_else(|err| {
                // Name the column and option so the failure can be diagnosed
                panic!(
                    "failed to set periodic_compaction_seconds on column {}: {:?}",
                    cf_name, err
                )
            });
    }
}

fn destroy(path: &Path) -> Result<()> {
DB::destroy(&Options::default(), path)?;

Expand Down Expand Up @@ -1609,7 +1662,9 @@ impl<'a> WriteBatch<'a> {
}
}

/// A CompactionFilter implementation to remove keys older than a given slot.
struct PurgedSlotFilter<C: Column + ColumnName> {
/// The oldest slot to keep; any slot < oldest_slot will be removed
oldest_slot: Slot,
name: CString,
_phantom: PhantomData<C>,
Expand All @@ -1620,8 +1675,6 @@ impl<C: Column + ColumnName> CompactionFilter for PurgedSlotFilter<C> {
use rocksdb::CompactionDecision::*;

let slot_in_key = C::slot(C::index(key));
// Refer to a comment about periodic_compaction_seconds, especially regarding implicit
// periodic execution of compaction_filters
if slot_in_key >= self.oldest_slot {
Keep
} else {
Expand Down Expand Up @@ -1692,7 +1745,7 @@ fn get_cf_options<C: 'static + Column + ColumnName>(
cf_options.set_disable_auto_compactions(true);
}

if !disable_auto_compactions && !should_exclude_from_compaction(C::NAME) {
if !disable_auto_compactions && should_enable_cf_compaction(C::NAME) {
cf_options.set_compaction_filter_factory(PurgedSlotFilterFactory::<C> {
oldest_slot: oldest_slot.clone(),
name: CString::new(format!("purged_slot_filter_factory({})", C::NAME)).unwrap(),
Expand Down Expand Up @@ -1836,25 +1889,36 @@ fn get_db_options(access_type: &AccessType) -> Options {
options
}

// Returns whether automatic compactions should be disabled based upon access type
// Decides, from the access type alone, whether rocksdb's automatic
// compactions must be turned off for the entire database.
//
// Returns false only for AccessType::Primary; every other access type
// returns true so that compaction cannot accidentally clean data.
fn should_disable_auto_compactions(access_type: &AccessType) -> bool {
    let is_primary = matches!(access_type, AccessType::Primary);
    !is_primary
}

// Returns whether the supplied column (name) should be excluded from compaction
fn should_exclude_from_compaction(cf_name: &str) -> bool {
// List of column families to be excluded from compactions
let no_compaction_cfs: HashSet<&'static str> = vec![
columns::TransactionStatusIndex::NAME,
columns::ProgramCosts::NAME,
columns::TransactionMemos::NAME,
]
.into_iter()
.collect();

no_compaction_cfs.get(cf_name).is_some()
// Reports whether compaction should be enabled for the column family with the
// given name. Only the TransactionStatus and AddressSignatures columns
// qualify; any other name yields false.
//
// Background: to keep the ledger storage footprint within a desired size,
// LedgerCleanupService removes data in FIFO order by slot. How a column gets
// cleaned depends on how slot appears in its key:
// - Columns without a slot in their key must be manually managed to avoid
//   unbounded storage growth.
// - Columns where slot is the primary index can be efficiently cleaned via
//   Database::delete_range_cf() && Database::delete_file_in_range_cf().
// - Columns where a slot is part of the key but not the primary index cannot
//   be range deleted like above. Each key/value pair must be visited and
//   individually kept or discarded. That comparison is implemented in
//   PurgedSlotFilter, which is configured to run as part of rocksdb's
//   automatic compactions; storage space is reclaimed on this class of
//   columns once compaction has completed on a given range or file.
fn should_enable_cf_compaction(cf_name: &str) -> bool {
    let is_transaction_status = cf_name == columns::TransactionStatus::NAME;
    let is_address_signatures = cf_name == columns::AddressSignatures::NAME;
    is_transaction_status || is_address_signatures
}

// Returns true if the column family enables compression.
Expand Down Expand Up @@ -1937,15 +2001,14 @@ pub mod tests {
}

#[test]
fn test_should_exclude_from_compaction() {
// currently there are three CFs excluded from compaction:
assert!(should_exclude_from_compaction(
columns::TransactionStatusIndex::NAME
));
assert!(should_exclude_from_compaction(columns::ProgramCosts::NAME));
assert!(should_exclude_from_compaction(
columns::TransactionMemos::NAME
));
assert!(!should_exclude_from_compaction("something else"));
fn test_should_enable_cf_compaction() {
    // Every column expected to rely on compaction must report as enabled
    for cf_name in [
        columns::TransactionStatus::NAME,
        columns::AddressSignatures::NAME,
    ] {
        assert!(should_enable_cf_compaction(cf_name));
    }

    // A name that matches no known column must not be enabled
    assert!(!should_enable_cf_compaction("something else"));
}
}