Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
Adds LoadedPrograms::next_cooperative_loading_task() and LoadedProgra…
Browse files Browse the repository at this point in the history
…ms::cooperative_loading_task_complete().
  • Loading branch information
Lichtso committed Oct 17, 2023
1 parent 56f542c commit 48bbe03
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 40 deletions.
21 changes: 21 additions & 0 deletions program-runtime/src/loaded_programs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,27 @@ impl LoadedPrograms {
ExtractedPrograms { loaded }
}

/// In cooperative loading a TX batch calls this to load the next task
pub fn next_cooperative_loading_task(&mut self) -> Option<(Pubkey, Arc<LoadedProgram>, bool)> {
self.cooperative_loading_queue.pop_front()
}

/// Upon finishing a task in cooperative loading a TX batch calls this to store the result
pub fn cooperative_loading_task_complete(
&mut self,
finished_task: (Pubkey, Arc<LoadedProgram>, Arc<LoadedProgram>),
) {
let (key, loading, loaded) = finished_task;
self.replace_program(key, &loading, loaded.clone());
if let LoadedProgramType::Loading(waiting_list) = &loading.program {
for waiting in waiting_list.lock().unwrap().iter() {
waiting.lock().unwrap().assign_program(key, loaded.clone());
}
} else {
debug_assert!(false);
}
}

pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {
tx_batch_cache.entries.iter().for_each(|(key, entry)| {
self.assign_program(*key, entry.clone());
Expand Down
68 changes: 28 additions & 40 deletions runtime/src/bank.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ use {
sync::{
atomic::{
AtomicBool, AtomicI64, AtomicU64, AtomicUsize,
Ordering::{self, AcqRel, Acquire, Relaxed},
Ordering::{AcqRel, Acquire, Relaxed},
},
Arc, LockResult, RwLock, RwLockReadGuard, RwLockWriteGuard,
},
Expand Down Expand Up @@ -277,7 +277,6 @@ pub struct BankRc {

#[cfg(RUSTC_WITH_SPECIALIZATION)]
use solana_frozen_abi::abi_example::AbiExample;
use solana_program_runtime::loaded_programs::ExtractedPrograms;

#[cfg(RUSTC_WITH_SPECIALIZATION)]
impl AbiExample for BankRc {
Expand Down Expand Up @@ -5042,50 +5041,39 @@ impl Bank {
.collect()
};

let ExtractedPrograms {
loaded: mut loaded_programs_for_txs,
missing,
unloaded,
} = {
let loaded_programs_for_txs = {
// Lock the global cache to figure out which programs need to be loaded
let loaded_programs_cache = self.loaded_programs_cache.read().unwrap();
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
loaded_programs_cache.extract(self, programs_and_slots.into_iter())
};

// Load missing programs while global cache is unlocked
let missing_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = missing
.iter()
.map(|(key, count)| {
let program = self.load_program(key, false);
program.tx_usage_counter.store(*count, Ordering::Relaxed);
(*key, program)
})
.collect();

// Reload unloaded programs while global cache is unlocked
let unloaded_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = unloaded
.iter()
.map(|(key, count)| {
let program = self.load_program(key, true);
program.tx_usage_counter.store(*count, Ordering::Relaxed);
(*key, program)
})
.collect();

// Lock the global cache again to replenish the missing programs
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
for (key, program) in missing_programs {
let entry = loaded_programs_cache.assign_program(key, program);
// Use the returned entry as that might have been deduplicated globally
loaded_programs_for_txs.assign_program(key, entry);
}
for (key, program) in unloaded_programs {
let entry = loaded_programs_cache.assign_program(key, program);
// Use the returned entry as that might have been deduplicated globally
loaded_programs_for_txs.assign_program(key, entry);
// Cooperative loading phase
let mut finished_task = None;
loop {
// Critical section for global coordination
let (key, loading, reload) = {
let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
if let Some(finished_task) = finished_task.take() {
loaded_programs_cache.cooperative_loading_task_complete(finished_task);
}
if Arc::strong_count(&loaded_programs_for_txs) == 1 {
// All the missing entries for this batch have been loaded
break;
}
if let Some(task) = loaded_programs_cache.next_cooperative_loading_task() {
task
} else {
// Waiting for some other TX batch to complete loading the programs needed by this TX batch
continue;
}
};
// Load, verify and compile the program outside of the critical section
let loaded = self.load_program(&key, reload);
finished_task = Some((key, loading, loaded));
}

loaded_programs_for_txs
// When we get here we should be the only remaining owner
std::sync::Mutex::into_inner(Arc::into_inner(loaded_programs_for_txs).unwrap()).unwrap()
}

#[allow(clippy::type_complexity)]
Expand Down

0 comments on commit 48bbe03

Please sign in to comment.