Skip to content
This repository has been archived by the owner on Jan 22, 2025. It is now read-only.

Commit

Permalink
flush individual buckets every n ages (#30855)
Browse files Browse the repository at this point in the history
disk index flush individual buckets every n ages
  • Loading branch information
jeffwashington authored Mar 28, 2023
1 parent 42dfb85 commit aaac046
Showing 1 changed file with 166 additions and 55 deletions.
221 changes: 166 additions & 55 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,15 @@ pub struct InMemAccountsIndex<T: IndexValue, U: DiskIndexValue + From<T> + Into<

/// possible evictions for next few slots coming up
possible_evictions: RwLock<PossibleEvictions<T>>,
/// when age % ages_to_stay_in_cache == 'age_to_flush_bin_offset', then calculate the next 'ages_to_stay_in_cache' 'possible_evictions'
/// this causes us to scan the entire in-mem hash map every 1/'ages_to_stay_in_cache' instead of each age
age_to_flush_bin_mod: Age,

/// how many more ages to skip before this bucket is flushed (as opposed to being skipped).
/// When this reaches 0, this bucket is flushed.
remaining_ages_to_skip_flushing: AtomicU8,

/// an individual bucket will evict its entries and write to disk every 1/NUM_AGES_TO_DISTRIBUTE_FLUSHES ages
/// Higher numbers mean we flush less buckets/s
/// Lower numbers mean we flush more buckets/s
num_ages_to_distribute_flushes: Age,
}

impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> Debug for InMemAccountsIndex<T, U> {
Expand Down Expand Up @@ -146,7 +152,7 @@ struct FlushScanResult<T> {

impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T, U> {
pub fn new(storage: &Arc<BucketMapHolder<T, U>>, bin: usize) -> Self {
let ages_to_stay_in_cache = storage.ages_to_stay_in_cache;
let num_ages_to_distribute_flushes = Age::MAX - storage.ages_to_stay_in_cache;
Self {
map_internal: RwLock::default(),
storage: Arc::clone(storage),
Expand All @@ -163,23 +169,13 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
// initialize this to max, to make it clear we have not flushed at age 0, the starting age
last_age_flushed: AtomicU8::new(Age::MAX),
startup_info: Mutex::default(),
possible_evictions: RwLock::new(PossibleEvictions::new(ages_to_stay_in_cache)),
possible_evictions: RwLock::new(PossibleEvictions::new(1)),
// Spread out the scanning across all ages within the window.
// This causes us to scan 1/N of the bins each 'Age'
age_to_flush_bin_mod: thread_rng().gen_range(0, ages_to_stay_in_cache),
}
}

/// # ages to scan ahead
fn ages_to_scan_ahead(&self, current_age: Age) -> Age {
let ages_to_stay_in_cache = self.storage.ages_to_stay_in_cache;
if (self.age_to_flush_bin_mod == current_age % ages_to_stay_in_cache)
&& !self.storage.get_startup()
{
// scan ahead multiple ages
ages_to_stay_in_cache
} else {
1 // just current age
remaining_ages_to_skip_flushing: AtomicU8::new(
thread_rng().gen_range(0, num_ages_to_distribute_flushes),
),
num_ages_to_distribute_flushes,
}
}

Expand Down Expand Up @@ -937,8 +933,9 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
current_age: Age,
entry: &AccountMapEntry<T>,
startup: bool,
ages_flushing_now: Age,
) -> bool {
startup || (current_age == entry.age())
startup || current_age.wrapping_sub(entry.age()) <= ages_flushing_now
}

/// return true if 'entry' should be evicted from the in-mem index
Expand All @@ -949,10 +946,11 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
startup: bool,
update_stats: bool,
exceeds_budget: bool,
ages_flushing_now: Age,
) -> (bool, Option<std::sync::RwLockReadGuard<'a, SlotList<T>>>) {
// this could be tunable dynamically based on memory pressure
// we could look at more ages or we could throw out more items we are choosing to keep in the cache
if Self::should_evict_based_on_age(current_age, entry, startup) {
if Self::should_evict_based_on_age(current_age, entry, startup, ages_flushing_now) {
if exceeds_budget {
// if we are already holding too many items in-mem, then we need to be more aggressive at kicking things out
(true, None)
Expand Down Expand Up @@ -981,45 +979,53 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
}
}

/// fill in `possible_evictions` from `iter` by checking age
fn gather_possible_evictions<'a>(
iter: impl Iterator<Item = (&'a Pubkey, &'a Arc<AccountMapEntryInner<T>>)>,
possible_evictions: &mut PossibleEvictions<T>,
startup: bool,
current_age: Age,
ages_flushing_now: Age,
can_randomly_flush: bool,
) {
for (k, v) in iter {
let mut random = false;
if !startup && current_age.wrapping_sub(v.age()) > ages_flushing_now {
if !can_randomly_flush || !Self::random_chance_of_eviction() {
// not planning to evict this item from memory within 'ages_flushing_now' ages
continue;
}
random = true;
}

possible_evictions.insert(0, *k, Arc::clone(v), random);
}
}

/// scan loop
/// holds read lock
/// identifies items which are dirty and items to evict
/// identifies items which are potential candidates to evict
fn flush_scan(
&self,
current_age: Age,
startup: bool,
_flush_guard: &FlushGuard,
ages_flushing_now: Age,
) -> FlushScanResult<T> {
let mut possible_evictions = self.possible_evictions.write().unwrap();
if let Some(result) = possible_evictions.get_possible_evictions() {
// we have previously calculated the possible evictions for this age
return result;
}
// otherwise, we need to scan some number of ages into the future now
let ages_to_scan = self.ages_to_scan_ahead(current_age);
possible_evictions.reset(ages_to_scan);

possible_evictions.reset(1);
let m;
{
let map = self.map_internal.read().unwrap();
m = Measure::start("flush_scan"); // we don't care about lock time in this metric - bg threads can wait
for (k, v) in map.iter() {
let random = Self::random_chance_of_eviction();
let age_offset = if random {
thread_rng().gen_range(0, ages_to_scan)
} else if startup {
0
} else {
let ages_in_future = v.age().wrapping_sub(current_age);
if ages_in_future >= ages_to_scan {
// not planning to evict this item from memory within the next few ages
continue;
}
ages_in_future
};

possible_evictions.insert(age_offset, *k, Arc::clone(v), random);
}
Self::gather_possible_evictions(
map.iter(),
&mut possible_evictions,
startup,
current_age,
ages_flushing_now,
true,
);
}
Self::update_time_stat(&self.stats().flush_scan_us, m);

Expand Down Expand Up @@ -1109,16 +1115,36 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
return;
}

if startup {
self.write_startup_info_to_disk();
}

let ages_flushing_now = if iterate_for_age && !startup {
let old_value = self
.remaining_ages_to_skip_flushing
.fetch_sub(1, Ordering::AcqRel);
if old_value == 0 {
self.remaining_ages_to_skip_flushing
.store(self.num_ages_to_distribute_flushes, Ordering::Release);
} else {
// skipping iteration of the buckets at the current age, but mark the bucket as having aged
assert_eq!(current_age, self.storage.current_age());
self.set_has_aged(current_age, can_advance_age);
return;
}
self.num_ages_to_distribute_flushes
} else {
// just 1 age to flush. 0 means age == age
0
};

Self::update_stat(&self.stats().buckets_scanned, 1);

// scan in-mem map for items that we may evict
let FlushScanResult {
mut evictions_age_possible,
mut evictions_random,
} = self.flush_scan(current_age, startup, flush_guard);

if startup {
self.write_startup_info_to_disk();
}
} = self.flush_scan(current_age, startup, flush_guard, ages_flushing_now);

// write to disk outside in-mem map read lock
{
Expand Down Expand Up @@ -1146,6 +1172,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
startup,
true,
exceeds_budget,
ages_flushing_now,
);
slot_list = slot_list_temp;
mse.stop();
Expand Down Expand Up @@ -1222,8 +1249,20 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
.collect::<Vec<_>>();

let m = Measure::start("flush_evict");
self.evict_from_cache(evictions_age, current_age, startup, false);
self.evict_from_cache(evictions_random, current_age, startup, true);
self.evict_from_cache(
evictions_age,
current_age,
startup,
false,
ages_flushing_now,
);
self.evict_from_cache(
evictions_random,
current_age,
startup,
true,
ages_flushing_now,
);
Self::update_time_stat(&self.stats().flush_evict_us, m);
}

Expand Down Expand Up @@ -1267,6 +1306,7 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,
current_age: Age,
startup: bool,
randomly_evicted: bool,
ages_flushing_now: Age,
) {
if evictions.is_empty() {
return;
Expand Down Expand Up @@ -1317,7 +1357,12 @@ impl<T: IndexValue, U: DiskIndexValue + From<T> + Into<T>> InMemAccountsIndex<T,

if v.dirty()
|| (!randomly_evicted
&& !Self::should_evict_based_on_age(current_age, v, startup))
&& !Self::should_evict_based_on_age(
current_age,
v,
startup,
ages_flushing_now,
))
{
// marked dirty or bumped in age after we looked above
// these evictions will be handled in later passes (at later ages)
Expand Down Expand Up @@ -1513,13 +1558,71 @@ mod tests {
startup,
false,
false,
1,
)
.0,
ref_count == 1
);
}
}

#[test]
fn test_gather_possible_evictions() {
solana_logger::setup();
let startup = false;
let ref_count = 1;
let pks = (0..=255)
.map(|i| Pubkey::from([i as u8; 32]))
.collect::<Vec<_>>();
let accounts = (0..=255)
.map(|age| {
let one_element_slot_list = vec![(0, 0)];
let one_element_slot_list_entry = Arc::new(AccountMapEntryInner::new(
one_element_slot_list,
ref_count,
AccountMapEntryMeta::default(),
));
one_element_slot_list_entry.set_age(age);
one_element_slot_list_entry
})
.collect::<Vec<_>>();
let both = pks.iter().zip(accounts.iter()).collect::<Vec<_>>();

for current_age in 0..=255 {
for ages_flushing_now in 0..=255 {
let mut possible_evictions = PossibleEvictions::new(1);
possible_evictions.reset(1);
InMemAccountsIndex::<u64, u64>::gather_possible_evictions(
both.iter().cloned(),
&mut possible_evictions,
startup,
current_age,
ages_flushing_now,
false, // true=can_randomly_flush
);
let evictions = possible_evictions.possible_evictions.pop().unwrap();
assert_eq!(
evictions.evictions_age_possible.len(),
1 + ages_flushing_now as usize
);
evictions.evictions_age_possible.iter().for_each(|(_k, v)| {
assert!(
InMemAccountsIndex::<u64, u64>::should_evict_based_on_age(
current_age,
v,
startup,
ages_flushing_now,
),
"current_age: {}, age: {}, ages_flushing_now: {}",
current_age,
v.age(),
ages_flushing_now
);
});
}
}
}

#[test]
fn test_should_evict_from_mem() {
solana_logger::setup();
Expand Down Expand Up @@ -1547,6 +1650,7 @@ mod tests {
startup,
false,
true,
0,
)
.0
);
Expand All @@ -1563,6 +1667,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1575,6 +1680,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1591,6 +1697,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1610,6 +1717,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1624,6 +1732,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1638,6 +1747,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand All @@ -1652,6 +1762,7 @@ mod tests {
startup,
false,
false,
0,
)
.0
);
Expand Down

0 comments on commit aaac046

Please sign in to comment.