Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
delay trying to flush cached upserts until far future (#26908)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffwashington authored Aug 31, 2022
1 parent 8bb039d commit 8c1e193
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 29 deletions.
8 changes: 4 additions & 4 deletions runtime/src/accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,16 +232,16 @@ pub struct AccountMapEntryMeta {
}

impl AccountMapEntryMeta {
pub fn new_dirty<T: IndexValue>(storage: &Arc<BucketMapHolder<T>>) -> Self {
pub fn new_dirty<T: IndexValue>(storage: &Arc<BucketMapHolder<T>>, is_cached: bool) -> Self {
AccountMapEntryMeta {
dirty: AtomicBool::new(true),
age: AtomicU8::new(storage.future_age_to_flush()),
age: AtomicU8::new(storage.future_age_to_flush(is_cached)),
}
}
pub fn new_clean<T: IndexValue>(storage: &Arc<BucketMapHolder<T>>) -> Self {
AccountMapEntryMeta {
dirty: AtomicBool::new(false),
age: AtomicU8::new(storage.future_age_to_flush()),
age: AtomicU8::new(storage.future_age_to_flush(false)),
}
}
}
Expand Down Expand Up @@ -414,7 +414,7 @@ impl<T: IndexValue> PreAllocatedAccountMapEntry<T> {
) -> AccountMapEntry<T> {
let is_cached = account_info.is_cached();
let ref_count = if is_cached { 0 } else { 1 };
let meta = AccountMapEntryMeta::new_dirty(storage);
let meta = AccountMapEntryMeta::new_dirty(storage, is_cached);
Arc::new(AccountMapEntryInner::new(
vec![(slot, account_info)],
ref_count,
Expand Down
47 changes: 45 additions & 2 deletions runtime/src/bucket_map_holder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,18 @@ pub struct BucketMapHolder<T: IndexValue> {
pub disk: Option<BucketMap<(Slot, T)>>,

pub count_buckets_flushed: AtomicUsize,

/// These three ages are individual atomics because their values are read many times from code during runtime.
/// Instead of accessing the single age and doing math each time, each value is incremented each time the age occurs, which is ~400ms.
/// Callers can ask for the precomputed value they already want.
/// rolling 'current' age
pub age: AtomicU8,
/// rolling age that is 'ages_to_stay_in_cache' + 'age'
pub future_age_to_flush: AtomicU8,
/// rolling age that is effectively 'age' - 1
/// these items are expected to be flushed from the accounts write cache or otherwise modified before this age occurs
pub future_age_to_flush_cached: AtomicU8,

pub stats: BucketMapHolderStats,

age_timer: AtomicInterval,
Expand Down Expand Up @@ -79,6 +89,9 @@ impl<T: IndexValue> BucketMapHolder<T> {
// fetch_add is defined to wrap.
// That's what we want. 0..255, then back to 0.
self.age.fetch_add(1, Ordering::Release);
self.future_age_to_flush.fetch_add(1, Ordering::Release);
self.future_age_to_flush_cached
.fetch_add(1, Ordering::Release);
assert!(
previous >= self.bins,
"previous: {}, bins: {}",
Expand All @@ -88,8 +101,13 @@ impl<T: IndexValue> BucketMapHolder<T> {
self.wait_dirty_or_aged.notify_all(); // notify all because we can age scan in parallel
}

pub fn future_age_to_flush(&self) -> Age {
self.current_age().wrapping_add(self.ages_to_stay_in_cache)
pub fn future_age_to_flush(&self, is_cached: bool) -> Age {
if is_cached {
&self.future_age_to_flush_cached
} else {
&self.future_age_to_flush
}
.load(Ordering::Acquire)
}

fn has_age_interval_elapsed(&self) -> bool {
Expand Down Expand Up @@ -224,7 +242,12 @@ impl<T: IndexValue> BucketMapHolder<T> {
disk,
ages_to_stay_in_cache,
count_buckets_flushed: AtomicUsize::default(),
// age = 0
age: AtomicU8::default(),
// future age = age (=0) + ages_to_stay_in_cache
future_age_to_flush: AtomicU8::new(ages_to_stay_in_cache),
// effectively age (0) - 1. So, the oldest possible age from 'now'
future_age_to_flush_cached: AtomicU8::new(0_u8.wrapping_sub(1)),
stats: BucketMapHolderStats::new(bins),
wait_dirty_or_aged: Arc::default(),
next_bucket_to_flush: AtomicUsize::new(0),
Expand Down Expand Up @@ -399,6 +422,26 @@ pub mod tests {
});
}

#[test]
fn test_ages() {
solana_logger::setup();
let bins = 4;
let test = BucketMapHolder::<u64>::new(bins, &Some(AccountsIndexConfig::default()), 1);
assert_eq!(0, test.current_age());
assert_eq!(test.ages_to_stay_in_cache, test.future_age_to_flush(false));
assert_eq!(u8::MAX, test.future_age_to_flush(true));
(0..bins).for_each(|_| {
test.bucket_flushed_at_current_age(false);
});
test.increment_age();
assert_eq!(1, test.current_age());
assert_eq!(
test.ages_to_stay_in_cache + 1,
test.future_age_to_flush(false)
);
assert_eq!(0, test.future_age_to_flush(true));
}

#[test]
fn test_age_increment() {
solana_logger::setup();
Expand Down
48 changes: 25 additions & 23 deletions runtime/src/in_mem_accounts_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
fn get_only_in_mem<RT>(
&self,
pubkey: &K,
update_age: bool,
callback: impl for<'a> FnOnce(Option<&'a AccountMapEntry<T>>) -> RT,
) -> RT {
let mut found = true;
Expand All @@ -276,7 +277,9 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
m.stop();

callback(if let Some(entry) = result {
self.set_age_to_future(entry);
if update_age {
self.set_age_to_future(entry, false);
}
Some(entry)
} else {
drop(map);
Expand All @@ -302,8 +305,10 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
self.get_internal(pubkey, |entry| (true, entry.map(Arc::clone)))
}

fn set_age_to_future(&self, entry: &AccountMapEntry<T>) {
entry.set_age(self.storage.future_age_to_flush());
/// set age of 'entry' to the future
/// if 'is_cached', age will be set farther
fn set_age_to_future(&self, entry: &AccountMapEntry<T>, is_cached: bool) {
entry.set_age(self.storage.future_age_to_flush(is_cached));
}

/// lookup 'pubkey' in index (in_mem or disk).
Expand All @@ -314,7 +319,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
// return true if item should be added to in_mem cache
callback: impl for<'a> FnOnce(Option<&AccountMapEntry<T>>) -> (bool, RT),
) -> RT {
self.get_only_in_mem(pubkey, |entry| {
self.get_only_in_mem(pubkey, true, |entry| {
if let Some(entry) = entry {
callback(Some(entry)).1
} else {
Expand Down Expand Up @@ -448,16 +453,12 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
) {
let mut updated_in_mem = true;
// try to get it just from memory first using only a read lock
self.get_only_in_mem(pubkey, |entry| {
self.get_only_in_mem(pubkey, false, |entry| {
if let Some(entry) = entry {
Self::lock_and_update_slot_list(
entry,
new_value.into(),
other_slot,
reclaims,
reclaim,
);
// age is incremented by caller
let new_value: (Slot, T) = new_value.into();
let upsert_cached = new_value.1.is_cached();
Self::lock_and_update_slot_list(entry, new_value, other_slot, reclaims, reclaim);
self.set_age_to_future(entry, upsert_cached);
} else {
let mut m = Measure::start("entry");
let mut map = self.map_internal.write().unwrap();
Expand All @@ -466,15 +467,13 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
let found = matches!(entry, Entry::Occupied(_));
match entry {
Entry::Occupied(mut occupied) => {
let new_value: (Slot, T) = new_value.into();
let upsert_cached = new_value.1.is_cached();
let current = occupied.get_mut();
Self::lock_and_update_slot_list(
current,
new_value.into(),
other_slot,
reclaims,
reclaim,
current, new_value, other_slot, reclaims, reclaim,
);
self.set_age_to_future(current);
self.set_age_to_future(current, upsert_cached);
}
Entry::Vacant(vacant) => {
// not in cache, look on disk
Expand All @@ -483,14 +482,17 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
// go to in-mem cache first
let disk_entry = self.load_account_entry_from_disk(vacant.key());
let new_value = if let Some(disk_entry) = disk_entry {
let new_value: (Slot, T) = new_value.into();
let upsert_cached = new_value.1.is_cached();
// on disk, so merge new_value with what was on disk
Self::lock_and_update_slot_list(
&disk_entry,
new_value.into(),
new_value,
other_slot,
reclaims,
reclaim,
);
self.set_age_to_future(&disk_entry, upsert_cached);
disk_entry
} else {
// not on disk, so insert new thing
Expand All @@ -501,7 +503,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
vacant.insert(new_value);
self.stats().inc_mem_count(self.bin);
}
}
};

drop(map);
self.update_entry_stats(m, found);
Expand Down Expand Up @@ -872,7 +874,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
if let Some(disk) = self.bucket.as_ref() {
let mut map = self.map_internal.write().unwrap();
let items = disk.items_in_range(range); // map's lock has to be held while we are getting items from disk
let future_age = self.storage.future_age_to_flush();
let future_age = self.storage.future_age_to_flush(false);
for item in items {
let entry = map.entry(item.pubkey);
match entry {
Expand Down Expand Up @@ -1235,7 +1237,7 @@ impl<T: IndexValue> InMemAccountsIndex<T> {
}

let stop_evictions_changes_at_start = self.get_stop_evictions_changes();
let next_age_on_failure = self.storage.future_age_to_flush();
let next_age_on_failure = self.storage.future_age_to_flush(false);
if self.get_stop_evictions() {
// ranges were changed
self.move_ages_to_future(next_age_on_failure, current_age, &evictions);
Expand Down

0 comments on commit 8c1e193

Please sign in to comment.