diff --git a/runtime/src/accounts_db.rs b/runtime/src/accounts_db.rs index 4351c039fd4863..1a327f96110d41 100644 --- a/runtime/src/accounts_db.rs +++ b/runtime/src/accounts_db.rs @@ -560,6 +560,9 @@ impl AccountsDB { false } + // Purge zero lamport accounts for garbage collection purposes + // Only remove those accounts where the entire rooted history of the account + // can be purged because there are no live append vecs in the ancestors pub fn purge_zero_lamport_accounts(&self, ancestors: &HashMap) { self.report_store_stats(); let accounts_index = self.accounts_index.read().unwrap(); @@ -573,12 +576,59 @@ impl AccountsDB { let mut reclaims = Vec::new(); let mut accounts_index = self.accounts_index.write().unwrap(); for purge in &purges { - reclaims.extend(accounts_index.purge(purge)); + reclaims.extend(accounts_index.would_purge(purge)); } let last_root = accounts_index.last_root; + + // Calculate store counts as if everything was purged + // Then purge if we can + let mut store_counts: HashMap = HashMap::new(); + let storage = self.storage.read().unwrap(); + for (slot_id, account_info) in &reclaims { + if let Some(slot_storage) = storage.0.get(&slot_id) { + if let Some(store) = slot_storage.get(&account_info.id) { + if let Some(store_count) = store_counts.get_mut(&account_info.id) { + *store_count -= 1; + } else { + store_counts + .insert(account_info.id, store.count_and_status.read().unwrap().0); + } + } + } + } + + // Only keep purges where the entire history of the account in the root set + // can be purged. All AppendVecs for those updates are dead. + purges.retain(|p| { + let account_infos = accounts_index.would_purge(p); + for (_slot_id, account_info) in account_infos { + if *store_counts.get(&account_info.id).unwrap() != 0 { + return false; + } + } + true + }); + + // Recalculate reclaims with new purge set + let mut reclaims = Vec::new(); + for purge in &purges { + reclaims.extend(accounts_index.purge(purge)); + } + drop(accounts_index); + + self.handle_reclaims(&reclaims, last_root); + } + + fn handle_reclaims(&self, reclaims: &Vec<(Slot, AccountInfo)>, last_root: Slot) { let mut dead_slots = self.remove_dead_accounts(reclaims); self.cleanup_dead_slots(&mut dead_slots, last_root); + + let mut purge_slots = Measure::start("store::purge_slots"); + for slot in dead_slots { + self.purge_slot(slot); + } + purge_slots.stop(); } pub fn scan_accounts(&self, ancestors: &HashMap, scan_func: F) -> A @@ -938,19 +988,19 @@ impl AccountsDB { (reclaims, last_root) } - fn remove_dead_accounts(&self, reclaims: Vec<(Slot, AccountInfo)>) -> HashSet { + fn remove_dead_accounts(&self, reclaims: &Vec<(Slot, AccountInfo)>) -> HashSet { let storage = self.storage.read().unwrap(); let mut dead_slots = HashSet::new(); for (slot_id, account_info) in reclaims { - if let Some(slot_storage) = storage.0.get(&slot_id) { + if let Some(slot_storage) = storage.0.get(slot_id) { if let Some(store) = slot_storage.get(&account_info.id) { assert_eq!( - slot_id, store.slot_id, + *slot_id, store.slot_id, "AccountDB::accounts_index corrupted. Storage should only point to one slot" ); let count = store.remove_account(); if count == 0 { - dead_slots.insert(slot_id); + dead_slots.insert(*slot_id); } } } @@ -1036,21 +1086,7 @@ impl AccountsDB { update_index.stop(); trace!("reclaim: {}", reclaims.len()); - let mut remove_dead_accounts = Measure::start("store::remove_dead"); - let mut dead_slots = self.remove_dead_accounts(reclaims); - remove_dead_accounts.stop(); - trace!("dead_slots: {}", dead_slots.len()); - - let mut cleanup_dead_slots = Measure::start("store::cleanup_dead_slots"); - self.cleanup_dead_slots(&mut dead_slots, last_root); - cleanup_dead_slots.stop(); - trace!("purge_slots: {}", dead_slots.len()); - - let mut purge_slots = Measure::start("store::purge_slots"); - for slot in dead_slots { - self.purge_slot(slot); - } - purge_slots.stop(); + self.handle_reclaims(&reclaims, last_root); } pub fn add_root(&self, slot: Slot) { @@ -1623,12 +1659,33 @@ pub mod tests { assert_eq!(accounts.load_slow(&ancestors, &pubkey), Some((account, 1))); } + fn print_index(label: &'static str, accounts: &AccountsDB) { + info!( + "{}: accounts.accounts_index roots: {:?}", + label, + accounts.accounts_index.read().unwrap().roots + ); + for (pubkey, list) in &accounts.accounts_index.read().unwrap().account_maps { + info!(" key: {}", pubkey); + info!(" slots: {:?}", *list.read().unwrap()); + } + } + fn print_count_and_status(label: &'static str, accounts: &AccountsDB) { - for (_slot, slot_stores) in &accounts.storage.read().unwrap().0 { - for (id, entry) in slot_stores { + let storage = accounts.storage.read().unwrap(); + let mut slots: Vec<_> = storage.0.keys().cloned().collect(); + slots.sort(); + info!("{}: count_and status for {} slots:", label, slots.len()); + for slot in &slots { + let slot_stores = storage.0.get(slot).unwrap(); + + let mut ids: Vec<_> = slot_stores.keys().cloned().collect(); + ids.sort(); + for id in &ids { + let entry = slot_stores.get(id).unwrap(); info!( - "{}: {} count_and_status: {:?}", - label, + " slot: {} id: {} count_and_status: {:?}", + slot, id, *entry.count_and_status.read().unwrap() ); @@ -1735,6 +1792,102 @@ pub mod tests { assert!(check_storage(&daccounts, 2, 31)); } + fn assert_load_account( + accounts: &AccountsDB, + slot: Slot, + pubkey: Pubkey, + expected_lamports: u64, + ) { + let ancestors = vec![(slot, 0)].into_iter().collect(); + let (account, slot) = accounts.load_slow(&ancestors, &pubkey).unwrap(); + assert_eq!((account.lamports, slot), (expected_lamports, slot)); + } + + fn reconstruct_accounts_db_via_serialization(accounts: AccountsDB, slot: Slot) -> AccountsDB { + let mut writer = Cursor::new(vec![]); + serialize_into(&mut writer, &AccountsDBSerialize::new(&accounts, slot)).unwrap(); + + let buf = writer.into_inner(); + let mut reader = BufReader::new(&buf[..]); + let daccounts = AccountsDB::new(None); + + let local_paths = { + let paths = daccounts.paths.read().unwrap(); + AccountsDB::format_paths(paths.to_vec()) + }; + + let copied_accounts = TempDir::new().unwrap(); + // Simulate obtaining a copy of the AppendVecs from a tarball + copy_append_vecs(&accounts, copied_accounts.path()).unwrap(); + daccounts + .accounts_from_stream(&mut reader, local_paths, copied_accounts.path()) + .unwrap(); + + print_count_and_status("daccounts", &daccounts); + + daccounts + } + + fn purge_zero_lamport_accounts(accounts: &AccountsDB, slot: Slot) { + let ancestors = vec![(slot as Slot, 0)].into_iter().collect(); + accounts.purge_zero_lamport_accounts(&ancestors); + } + + #[test] + fn test_accounts_db_serialize_zero_and_free() { + solana_logger::setup(); + + let some_lamport = 223; + let zero_lamport = 0; + let no_data = 0; + let owner = Account::default().owner; + + let account = Account::new(some_lamport, no_data, &owner); + let pubkey = Pubkey::new_rand(); + let zero_lamport_account = Account::new(zero_lamport, no_data, &owner); + + let account2 = Account::new(some_lamport + 1, no_data, &owner); + let pubkey2 = Pubkey::new_rand(); + + let filler_account = Account::new(some_lamport, no_data, &owner); + let filler_account_pubkey = Pubkey::new_rand(); + + let accounts = AccountsDB::new_single(); + + let mut current_slot = 1; + accounts.store(current_slot, &[(&pubkey, &account)]); + accounts.add_root(current_slot); + + current_slot += 1; + accounts.store(current_slot, &[(&pubkey, &zero_lamport_account)]); + accounts.store(current_slot, &[(&pubkey2, &account2)]); + for _ in 0..33000 { + accounts.store(current_slot, &[(&filler_account_pubkey, &filler_account)]); + } + accounts.add_root(current_slot); + + error!("doesn't fail:"); + assert_load_account(&accounts, current_slot, pubkey, zero_lamport); + + //info!("accounts: {:?}", accounts); + print_index("accounts", &accounts); + print_count_and_status("accounts", &accounts); + + purge_zero_lamport_accounts(&accounts, current_slot); + + //info!("accounts after purge: {:?}", accounts); + print_index("accounts_post_purge:", &accounts); + print_count_and_status("accounts_post_purge:", &accounts); + let accounts = reconstruct_accounts_db_via_serialization(accounts, current_slot); + + //info!("accounts reconstructed: {:?}", accounts); + print_index("reconstruct:", &accounts); + print_count_and_status("reconstruct: ", &accounts); + + error!("does fail due to a reconstruction bug:"); + assert_load_account(&accounts, current_slot, pubkey, zero_lamport); + } + #[test] #[ignore] fn test_store_account_stress() { diff --git a/runtime/src/accounts_index.rs b/runtime/src/accounts_index.rs index 0939f3581b057a..bd4f8aec2d9b7e 100644 --- a/runtime/src/accounts_index.rs +++ b/runtime/src/accounts_index.rs @@ -29,13 +29,21 @@ impl AccountsIndex { } } - pub fn purge(&mut self, pubkey: &Pubkey) -> Vec<(Slot, T)> { - let mut list = self.account_maps.get(&pubkey).unwrap().write().unwrap(); - let reclaims = list - .iter() + fn get_rooted_entries(&self, list: &Vec<(Slot, T)>) -> Vec<(Slot, T)> { + list.iter() .filter(|(slot, _)| self.is_root(*slot)) .cloned() - .collect(); + .collect() + } + + pub fn would_purge(&mut self, pubkey: &Pubkey) -> Vec<(Slot, T)> { + let list = self.account_maps.get(&pubkey).unwrap().read().unwrap(); + self.get_rooted_entries(&list) + } + + pub fn purge(&mut self, pubkey: &Pubkey) -> Vec<(Slot, T)> { + let mut list = self.account_maps.get(&pubkey).unwrap().write().unwrap(); + let reclaims = self.get_rooted_entries(&list); list.retain(|(slot, _)| !self.is_root(*slot)); reclaims }