Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
More conservative purge_zero_lamport_accounts purge logic
Browse files Browse the repository at this point in the history
  • Loading branch information
sakridge committed Nov 27, 2019
1 parent d7a8278 commit bd56b48
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 29 deletions.
201 changes: 177 additions & 24 deletions runtime/src/accounts_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,9 @@ impl AccountsDB {
false
}

// Purge zero lamport accounts for garbage collection purposes
// Only remove those accounts where the entire rooted history of the account
// can be purged because there are no live append vecs in the ancestors
pub fn purge_zero_lamport_accounts(&self, ancestors: &HashMap<u64, usize>) {
self.report_store_stats();
let accounts_index = self.accounts_index.read().unwrap();
Expand All @@ -573,12 +576,59 @@ impl AccountsDB {
let mut reclaims = Vec::new();
let mut accounts_index = self.accounts_index.write().unwrap();
for purge in &purges {
reclaims.extend(accounts_index.purge(purge));
reclaims.extend(accounts_index.would_purge(purge));
}
let last_root = accounts_index.last_root;

// Calculate store counts as if everything was purged
// Then purge if we can
let mut store_counts: HashMap<AppendVecId, usize> = HashMap::new();
let storage = self.storage.read().unwrap();
for (slot_id, account_info) in &reclaims {
if let Some(slot_storage) = storage.0.get(&slot_id) {
if let Some(store) = slot_storage.get(&account_info.id) {
if let Some(store_count) = store_counts.get_mut(&account_info.id) {
*store_count -= 1;
} else {
store_counts
.insert(account_info.id, store.count_and_status.read().unwrap().0);
}
}
}
}

// Only keep purges where the entire history of the account in the root set
// can be purged. All AppendVecs for those updates are dead.
purges.retain(|p| {
let account_infos = accounts_index.would_purge(p);
for (_slot_id, account_info) in account_infos {
if *store_counts.get(&account_info.id).unwrap() != 0 {
return false;
}
}
true
});

// Recalculate reclaims with new purge set
let mut reclaims = Vec::new();
for purge in &purges {
reclaims.extend(accounts_index.purge(purge));
}

drop(accounts_index);

self.handle_reclaims(&reclaims, last_root);
}

fn handle_reclaims(&self, reclaims: &Vec<(Slot, AccountInfo)>, last_root: Slot) {
let mut dead_slots = self.remove_dead_accounts(reclaims);
self.cleanup_dead_slots(&mut dead_slots, last_root);

let mut purge_slots = Measure::start("store::purge_slots");
for slot in dead_slots {
self.purge_slot(slot);
}
purge_slots.stop();
}

pub fn scan_accounts<F, A>(&self, ancestors: &HashMap<Slot, usize>, scan_func: F) -> A
Expand Down Expand Up @@ -938,19 +988,19 @@ impl AccountsDB {
(reclaims, last_root)
}

fn remove_dead_accounts(&self, reclaims: Vec<(Slot, AccountInfo)>) -> HashSet<Slot> {
fn remove_dead_accounts(&self, reclaims: &Vec<(Slot, AccountInfo)>) -> HashSet<Slot> {
let storage = self.storage.read().unwrap();
let mut dead_slots = HashSet::new();
for (slot_id, account_info) in reclaims {
if let Some(slot_storage) = storage.0.get(&slot_id) {
if let Some(slot_storage) = storage.0.get(slot_id) {
if let Some(store) = slot_storage.get(&account_info.id) {
assert_eq!(
slot_id, store.slot_id,
*slot_id, store.slot_id,
"AccountDB::accounts_index corrupted. Storage should only point to one slot"
);
let count = store.remove_account();
if count == 0 {
dead_slots.insert(slot_id);
dead_slots.insert(*slot_id);
}
}
}
Expand Down Expand Up @@ -1036,21 +1086,7 @@ impl AccountsDB {
update_index.stop();
trace!("reclaim: {}", reclaims.len());

let mut remove_dead_accounts = Measure::start("store::remove_dead");
let mut dead_slots = self.remove_dead_accounts(reclaims);
remove_dead_accounts.stop();
trace!("dead_slots: {}", dead_slots.len());

let mut cleanup_dead_slots = Measure::start("store::cleanup_dead_slots");
self.cleanup_dead_slots(&mut dead_slots, last_root);
cleanup_dead_slots.stop();
trace!("purge_slots: {}", dead_slots.len());

let mut purge_slots = Measure::start("store::purge_slots");
for slot in dead_slots {
self.purge_slot(slot);
}
purge_slots.stop();
self.handle_reclaims(&reclaims, last_root);
}

pub fn add_root(&self, slot: Slot) {
Expand Down Expand Up @@ -1623,12 +1659,33 @@ pub mod tests {
assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1)));
}

fn print_index(label: &'static str, accounts: &AccountsDB) {
info!(
"{}: accounts.accounts_index roots: {:?}",
label,
accounts.accounts_index.read().unwrap().roots
);
for (pubkey, list) in &accounts.accounts_index.read().unwrap().account_maps {
info!(" key: {}", pubkey);
info!(" slots: {:?}", *list.read().unwrap());
}
}

fn print_count_and_status(label: &'static str, accounts: &AccountsDB) {
for (_slot, slot_stores) in &accounts.storage.read().unwrap().0 {
for (id, entry) in slot_stores {
let storage = accounts.storage.read().unwrap();
let mut slots: Vec<_> = storage.0.keys().cloned().collect();
slots.sort();
info!("{}: count_and status for {} slots:", label, slots.len());
for slot in &slots {
let slot_stores = storage.0.get(slot).unwrap();

let mut ids: Vec<_> = slot_stores.keys().cloned().collect();
ids.sort();
for id in &ids {
let entry = slot_stores.get(id).unwrap();
info!(
"{}: {} count_and_status: {:?}",
label,
" slot: {} id: {} count_and_status: {:?}",
slot,
id,
*entry.count_and_status.read().unwrap()
);
Expand Down Expand Up @@ -1735,6 +1792,102 @@ pub mod tests {
assert!(check_storage(&daccounts, 2, 31));
}

fn assert_load_account(
accounts: &AccountsDB,
slot: Slot,
pubkey: Pubkey,
expected_lamports: u64,
) {
let ancestors = vec![(slot, 0)].into_iter().collect();
let (account, slot) = accounts.load_slow(&ancestors, &pubkey).unwrap();
assert_eq!((account.lamports, slot), (expected_lamports, slot));
}

fn reconstruct_accounts_db_via_serialization(accounts: AccountsDB, slot: Slot) -> AccountsDB {
let mut writer = Cursor::new(vec![]);
serialize_into(&mut writer, &AccountsDBSerialize::new(&accounts, slot)).unwrap();

let buf = writer.into_inner();
let mut reader = BufReader::new(&buf[..]);
let daccounts = AccountsDB::new(None);

let local_paths = {
let paths = daccounts.paths.read().unwrap();
AccountsDB::format_paths(paths.to_vec())
};

let copied_accounts = TempDir::new().unwrap();
// Simulate obtaining a copy of the AppendVecs from a tarball
copy_append_vecs(&accounts, copied_accounts.path()).unwrap();
daccounts
.accounts_from_stream(&mut reader, local_paths, copied_accounts.path())
.unwrap();

print_count_and_status("daccounts", &daccounts);

daccounts
}

fn purge_zero_lamport_accounts(accounts: &AccountsDB, slot: Slot) {
let ancestors = vec![(slot as Slot, 0)].into_iter().collect();
accounts.purge_zero_lamport_accounts(&ancestors);
}

#[test]
fn test_accounts_db_serialize_zero_and_free() {
solana_logger::setup();

let some_lamport = 223;
let zero_lamport = 0;
let no_data = 0;
let owner = Account::default().owner;

let account = Account::new(some_lamport, no_data, &owner);
let pubkey = Pubkey::new_rand();
let zero_lamport_account = Account::new(zero_lamport, no_data, &owner);

let account2 = Account::new(some_lamport + 1, no_data, &owner);
let pubkey2 = Pubkey::new_rand();

let filler_account = Account::new(some_lamport, no_data, &owner);
let filler_account_pubkey = Pubkey::new_rand();

let accounts = AccountsDB::new_single();

let mut current_slot = 1;
accounts.store(current_slot, &[(&pubkey, &account)]);
accounts.add_root(current_slot);

current_slot += 1;
accounts.store(current_slot, &[(&pubkey, &zero_lamport_account)]);
accounts.store(current_slot, &[(&pubkey2, &account2)]);
for _ in 0..33000 {
accounts.store(current_slot, &[(&filler_account_pubkey, &filler_account)]);
}
accounts.add_root(current_slot);

error!("doesn't fail:");
assert_load_account(&accounts, current_slot, pubkey, zero_lamport);

//info!("accounts: {:?}", accounts);
print_index("accounts", &accounts);
print_count_and_status("accounts", &accounts);

purge_zero_lamport_accounts(&accounts, current_slot);

//info!("accounts after purge: {:?}", accounts);
print_index("accounts_post_purge:", &accounts);
print_count_and_status("accounts_post_purge:", &accounts);
let accounts = reconstruct_accounts_db_via_serialization(accounts, current_slot);

//info!("accounts reconstructed: {:?}", accounts);
print_index("reconstruct:", &accounts);
print_count_and_status("reconstruct: ", &accounts);

error!("does fail due to a reconstruction bug:");
assert_load_account(&accounts, current_slot, pubkey, zero_lamport);
}

#[test]
#[ignore]
fn test_store_account_stress() {
Expand Down
18 changes: 13 additions & 5 deletions runtime/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,21 @@ impl<T: Clone> AccountsIndex<T> {
}
}

pub fn purge(&mut self, pubkey: &Pubkey) -> Vec<(Slot, T)> {
let mut list = self.account_maps.get(&pubkey).unwrap().write().unwrap();
let reclaims = list
.iter()
fn get_rooted_entries(&self, list: &Vec<(Slot, T)>) -> Vec<(Slot, T)> {
list.iter()
.filter(|(slot, _)| self.is_root(*slot))
.cloned()
.collect();
.collect()
}

pub fn would_purge(&mut self, pubkey: &Pubkey) -> Vec<(Slot, T)> {
let list = self.account_maps.get(&pubkey).unwrap().read().unwrap();
self.get_rooted_entries(&list)
}

pub fn purge(&mut self, pubkey: &Pubkey) -> Vec<(Slot, T)> {
let mut list = self.account_maps.get(&pubkey).unwrap().write().unwrap();
let reclaims = self.get_rooted_entries(&list);
list.retain(|(slot, _)| !self.is_root(*slot));
reclaims
}
Expand Down

0 comments on commit bd56b48

Please sign in to comment.