This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

(WIP) Prototype of the tiered accounts storage #30626

Closed
wants to merge 1 commit into from
16 changes: 14 additions & 2 deletions ledger-tool/src/main.rs
@@ -46,9 +46,18 @@ use {
},
solana_measure::{measure, measure::Measure},
solana_runtime::{
account_storage::meta::StoredAccountMeta,
accounts::Accounts,
accounts_db::CalcAccountsHashDataSource,
accounts_index::ScanConfig,
accounts_background_service::{
AbsRequestHandlers, AbsRequestSender, AccountsBackgroundService,
PrunedBanksRequestHandler, SnapshotRequestHandler,
},
accounts_db::{
AccountsDb, AccountsDbConfig, CalcAccountsHashDataSource, FillerAccountsConfig,
},
accounts_index::{AccountsIndexConfig, IndexLimitMb, ScanConfig},
accounts_update_notifier_interface::AccountsUpdateNotifier,
append_vec::AppendVec,
bank::{Bank, RewardCalculationEvent, TotalAccountsStats},
bank_forks::BankForks,
hardened_unpack::MAX_GENESIS_ARCHIVE_UNPACKED_SIZE,
@@ -59,6 +68,7 @@ use {
self, ArchiveFormat, SnapshotVersion, DEFAULT_ARCHIVE_COMPRESSION,
SUPPORTED_ARCHIVE_COMPRESSION,
},
tiered_storage::{hot::HOT_FORMAT, TieredStorage},
},
solana_sdk::{
account::{AccountSharedData, ReadableAccount, WritableAccount},
@@ -2643,6 +2653,7 @@ fn main() {
exit_signal.store(true, Ordering::Relaxed);
system_monitor_service.join().unwrap();
}
/*
("graph", Some(arg_matches)) => {
let output_file = value_t_or_exit!(arg_matches, "graph_filename", String);
let graph_config = GraphConfig {
@@ -2705,6 +2716,7 @@ }
}
}
}
*/
("create-snapshot", Some(arg_matches)) => {
let is_incremental = arg_matches.is_present("incremental");
let is_minimized = arg_matches.is_present("minimized");
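The imports added above pull the accounts-db configuration and tiered-storage types into ledger-tool. As a rough sketch of what they expose (a hypothetical helper; only the import path and the TieredStorage::new(&path, Some(&HOT_FORMAT)) call are taken from this diff):

use solana_runtime::tiered_storage::{hot::HOT_FORMAT, TieredStorage};
use std::path::Path;

// Create a tiered-storage file in the hot format, mirroring the constructor
// call used by AccountStorageEntry::new_hot later in this diff.
fn open_hot_storage(path: &Path) -> TieredStorage {
    TieredStorage::new(path, Some(&HOT_FORMAT))
}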
6 changes: 4 additions & 2 deletions runtime/src/account_storage.rs
@@ -270,6 +270,8 @@ pub(crate) mod tests {
use {super::*, std::path::Path};

#[test]
/// The new cold storage will fail this test as it does not have
/// the concept of capacity yet.
fn test_shrink_in_progress() {
// test that we check in order map then shrink_in_progress_map
let storage = AccountStorage::default();
@@ -283,13 +285,13 @@
let store_file_size = 4000;
let store_file_size2 = store_file_size * 2;
// 2 append vecs with same id, but different sizes
let entry = Arc::new(AccountStorageEntry::new(
let entry = Arc::new(AccountStorageEntry::new_av(
common_store_path,
slot,
id,
store_file_size,
));
let entry2 = Arc::new(AccountStorageEntry::new(
let entry2 = Arc::new(AccountStorageEntry::new_av(
common_store_path,
slot,
id,
45 changes: 44 additions & 1 deletion runtime/src/account_storage/meta.rs
@@ -1,5 +1,9 @@
use {
crate::{append_vec::AppendVecStoredAccountMeta, storable_accounts::StorableAccounts},
crate::{
append_vec::AppendVecStoredAccountMeta,
storable_accounts::StorableAccounts,
tiered_storage::{hot::HotAccountMeta, readable::TieredReadableAccount},
},
solana_sdk::{account::ReadableAccount, hash::Hash, pubkey::Pubkey, stake_history::Epoch},
std::{borrow::Borrow, marker::PhantomData},
};
@@ -12,6 +16,13 @@ pub struct StoredAccountInfo {
pub size: usize,
}

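// Fallback values substituted when a tiered (hot) account does not carry its own
// hash or write version; see the unwrap_or calls in StoredAccountMeta below.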
lazy_static! {
pub static ref DEFAULT_ACCOUNT_HASH: Hash = Hash::default();
}

pub const DEFAULT_WRITE_VERSION: StoredMetaWriteVersion = 0;
pub const DEFAULT_RENT_EPOCH: Epoch = Epoch::MAX;

/// Goal is to eliminate copies and data reshaping given various code paths that store accounts.
/// This struct contains what is needed to store accounts to a storage
/// 1. account & pubkey (StorableAccounts)
@@ -100,66 +111,88 @@ impl<'a: 'b, 'b, T: ReadableAccount + Sync + 'b, U: StorableAccounts<'a, T>, V:
#[derive(PartialEq, Eq, Debug)]
pub enum StoredAccountMeta<'storage> {
AppendVec(AppendVecStoredAccountMeta<'storage>),
// Cold(TieredReadableAccount<'storage, ColdAccountMeta>),
Hot(TieredReadableAccount<'storage, HotAccountMeta>),
}

impl<'storage> StoredAccountMeta<'storage> {
pub fn pubkey(&self) -> &'storage Pubkey {
match self {
Self::AppendVec(av) => av.pubkey(),
// Self::Cold(cs) => cs.pubkey(),
Self::Hot(hs) => hs.address(),
}
}

pub fn hash(&self) -> &'storage Hash {
match self {
Self::AppendVec(av) => av.hash(),
// Self::Cold(cs) => cs.hash(),
Self::Hot(hs) => hs.hash().unwrap_or(&DEFAULT_ACCOUNT_HASH),
}
}

pub fn stored_size(&self) -> usize {
match self {
Self::AppendVec(av) => av.stored_size(),
// Self::Cold(cs) => cs.stored_size(),
Self::Hot(hs) => hs.stored_size(),
}
}

pub fn offset(&self) -> usize {
match self {
Self::AppendVec(av) => av.offset(),
// Self::Cold(cs) => cs.offset(),
Self::Hot(hs) => hs.index(),
}
}

pub fn data(&self) -> &'storage [u8] {
match self {
Self::AppendVec(av) => av.data(),
// Self::Cold(cs) => cs.data(),
Self::Hot(hs) => hs.data(),
}
}

pub fn data_len(&self) -> u64 {
match self {
Self::AppendVec(av) => av.data_len(),
// Self::Cold(cs) => cs.data_len(),
Self::Hot(hs) => hs.data().len() as u64,
}
}

pub fn write_version(&self) -> StoredMetaWriteVersion {
match self {
Self::AppendVec(av) => av.write_version(),
// Self::Cold(cs) => cs.write_version(),
Self::Hot(hs) => hs.write_version().unwrap_or(DEFAULT_WRITE_VERSION),
}
}

pub fn meta(&self) -> &StoredMeta {
match self {
Self::AppendVec(av) => av.meta(),
// Self::Cold(_) => unreachable!(),
Self::Hot(_) => unreachable!(),
}
}

pub fn set_meta(&mut self, meta: &'storage StoredMeta) {
match self {
Self::AppendVec(av) => av.set_meta(meta),
// Self::Cold(_) => unreachable!(),
Self::Hot(_) => unreachable!(),
}
}

pub(crate) fn sanitize(&self) -> bool {
match self {
Self::AppendVec(av) => av.sanitize(),
// Self::Cold(_) => unimplemented!(),
Self::Hot(_) => unreachable!(),
}
}
}
@@ -168,26 +201,36 @@ impl<'storage> ReadableAccount for StoredAccountMeta<'storage> {
fn lamports(&self) -> u64 {
match self {
Self::AppendVec(av) => av.lamports(),
// Self::Cold(cs) => cs.lamports(),
Self::Hot(hs) => hs.lamports(),
}
}
fn data(&self) -> &[u8] {
match self {
Self::AppendVec(av) => av.data(),
// Self::Cold(cs) => cs.data(),
Self::Hot(hs) => hs.data(),
}
}
fn owner(&self) -> &Pubkey {
match self {
Self::AppendVec(av) => av.owner(),
// Self::Cold(cs) => cs.owner(),
Self::Hot(hs) => hs.owner(),
}
}
fn executable(&self) -> bool {
match self {
Self::AppendVec(av) => av.executable(),
// Self::Cold(cs) => cs.executable(),
Self::Hot(hs) => hs.executable(),
}
}
fn rent_epoch(&self) -> Epoch {
match self {
Self::AppendVec(av) => av.rent_epoch(),
// Self::Cold(cs) => cs.rent_epoch(),
Self::Hot(hs) => hs.rent_epoch(),
}
}
}
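Since both variants of StoredAccountMeta implement ReadableAccount, callers that only read account fields can stay agnostic of the storage tier. A minimal usage sketch (not part of the diff; the helper and its name are hypothetical, only the ReadableAccount trait and its lamports accessor come from solana_sdk):

use solana_sdk::account::ReadableAccount;

// Sums balances over any slice of readable accounts, whether they are backed
// by an append vec or by a tiered (hot) storage file.
fn sum_lamports<A: ReadableAccount>(accounts: &[A]) -> u64 {
    accounts.iter().map(|a| a.lamports()).sum()
}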
50 changes: 50 additions & 0 deletions runtime/src/accounts_db.rs
@@ -67,6 +67,7 @@ use {
snapshot_utils::create_accounts_run_and_snapshot_dirs,
sorted_storages::SortedStorages,
storable_accounts::StorableAccounts,
tiered_storage::{hot::HOT_FORMAT, TieredStorage},
verify_accounts_hash_in_background::VerifyAccountsHashInBackground,
},
blake3::traits::digest::Digest,
@@ -1048,6 +1049,11 @@ pub struct AccountStorageEntry {

impl AccountStorageEntry {
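// In this prototype the default constructor routes new stores to the tiered hot
// format; new_av below keeps the original append-vec path (used by tests such as
// test_shrink_in_progress that rely on a fixed file size).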
pub fn new(path: &Path, slot: Slot, id: AppendVecId, file_size: u64) -> Self {
// Self::new_cold(path, slot, id, file_size)
Self::new_hot(path, slot, id, file_size)
}

pub fn new_av(path: &Path, slot: Slot, id: AppendVecId, file_size: u64) -> Self {
let tail = AccountsFile::file_name(slot, id);
let path = Path::new(path).join(tail);
let accounts = AccountsFile::AppendVec(AppendVec::new(&path, true, file_size as usize));
@@ -1062,6 +1068,40 @@ impl AccountStorageEntry {
}
}

/*
pub fn new_cold(path: &Path, slot: Slot, id: AppendVecId, _file_size: u64) -> Self {
let tail = AccountsFile::file_name(slot, id);
let path = Path::new(path).join(tail);
let accounts = AccountsFile::Tiered(TieredStorage::new(&path, Some(&COLD_FORMAT)));
info!("[Cold]: Create new cold storage at path {:?}", path);

Self {
id: AtomicAppendVecId::new(id),
slot: AtomicU64::new(slot),
accounts,
count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
approx_store_count: AtomicUsize::new(0),
alive_bytes: AtomicUsize::new(0),
}
}
*/

pub fn new_hot(path: &Path, slot: Slot, id: AppendVecId, _file_size: u64) -> Self {
let tail = AccountsFile::file_name(slot, id);
let path = Path::new(path).join(tail);
let accounts = AccountsFile::Tiered(TieredStorage::new(&path, Some(&HOT_FORMAT)));
info!("[Hot]: Create new hot storage at path {:?}", path);

Self {
id: AtomicAppendVecId::new(id),
slot: AtomicU64::new(slot),
accounts,
count_and_status: RwLock::new((0, AccountStorageStatus::Available)),
approx_store_count: AtomicUsize::new(0),
alive_bytes: AtomicUsize::new(0),
}
}

pub(crate) fn new_existing(
slot: Slot,
id: AppendVecId,
@@ -3967,6 +4007,7 @@ impl AccountsDb {
.fetch_add(time.as_us(), Ordering::Relaxed);
}

/// Perform shrink on the specified slot if the slot is eligible for shrinking.
fn do_shrink_slot_store(&self, slot: Slot, store: &Arc<AccountStorageEntry>) {
if self.accounts_cache.contains(slot) {
// It is not correct to shrink a slot while it is in the write cache until flush is complete and the slot is removed from the write cache.
@@ -3993,6 +4034,7 @@ impl AccountsDb {
self.shrink_stats
.skipped_shrink
.fetch_add(1, Ordering::Relaxed);
// comment(YH): undo the unref applied to the locked entry in the accounts_index.
for pubkey in shrink_collect.unrefed_pubkeys {
if let Some(locked_entry) = self.accounts_index.get_account_read_entry(pubkey) {
// pubkeys in `unrefed_pubkeys` were unref'd in `shrink_collect` above under the assumption that we would shrink everything.
@@ -4151,6 +4193,7 @@ impl AccountsDb {
slot: Slot,
aligned_total: u64,
) -> ShrinkInProgress<'_> {
/*
let shrunken_store = self
.try_recycle_store(slot, aligned_total, aligned_total + 1024)
.unwrap_or_else(|| {
@@ -4161,6 +4204,13 @@
.unwrap_or_else(|| (&self.paths, "shrink"));
self.create_store(slot, aligned_total, from, shrink_paths)
});
*/
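// Store recycling is bypassed for now: always create a fresh store for the
// shrunken slot on the configured shrink paths.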
let maybe_shrink_paths = self.shrink_paths.read().unwrap();
let (shrink_paths, from) = maybe_shrink_paths
.as_ref()
.map(|paths| (paths, "shrink-w-path"))
.unwrap_or_else(|| (&self.paths, "shrink"));
let shrunken_store = self.create_store(slot, aligned_total, from, shrink_paths);
self.storage.shrinking_in_progress(slot, shrunken_store)
}
