From 305dcd28966fc1e205a76fc624175db08f816fd6 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alexander=20Mei=C3=9Fner?=
Date: Mon, 27 Nov 2023 16:42:19 +0100
Subject: [PATCH] Adds LoadedPrograms::next_cooperative_loading_task() and
 LoadedPrograms::cooperative_loading_task_complete().

---
Reviewer note: this patch was recovered from a copy whose line breaks had been
collapsed and whose `<...>` spans had been stripped. Generic arguments were
restored from how the values are used in the hunks below. Leading whitespace
follows rustfmt defaults and has not been checked against the tree; apply with
`--ignore-whitespace` if context lines fail to match. The author's e-mail
address was absent from the recovered copy and has not been filled in.

 program-runtime/src/loaded_programs.rs | 79 ++++++++++++++++++++++++++
 runtime/src/bank.rs                    | 67 +++++++++++-----------
 2 files changed, 113 insertions(+), 33 deletions(-)

diff --git a/program-runtime/src/loaded_programs.rs b/program-runtime/src/loaded_programs.rs
index 125434e40ca0f8..fc18307ded938c 100644
--- a/program-runtime/src/loaded_programs.rs
+++ b/program-runtime/src/loaded_programs.rs
@@ -898,6 +898,85 @@ impl LoadedPrograms {
         extracted
     }
 
+    /// In cooperative loading a TX batch calls this to receive the next task
+    pub fn next_cooperative_loading_task(
+        &mut self,
+        extracted: &Arc<Mutex<ExtractedPrograms>>,
+    ) -> Option<(Pubkey, Arc<LoadedProgram>, bool)> {
+        // The Mutexes are strictly speaking unnecessary
+        // because the entire `LoadedPrograms` cache is already locked here.
+        let extracted = extracted.lock().unwrap();
+        let (key, (entry, reload)) =
+            extracted.missing.iter().find(|(_key, (entry, _reload))| {
+                let LoadedProgramType::Loading(mutex) = &entry.program else {
+                    debug_assert!(false);
+                    return false;
+                };
+                let processing = mutex.lock().unwrap().0;
+                !processing
+            })?;
+        let (key, entry, reload) = (*key, entry.clone(), *reload);
+        drop(extracted);
+        {
+            let LoadedProgramType::Loading(mutex) = &entry.program else {
+                debug_assert!(false);
+                return None;
+            };
+            let processing = &mut mutex.lock().unwrap().0;
+            *processing = true;
+        }
+        Some((key, entry, reload))
+    }
+
+    /// Upon completing a task in cooperative loading a TX batch calls this to submit the result
+    pub fn cooperative_loading_task_complete(
+        &mut self,
+        key: Pubkey,
+        loading: Arc<LoadedProgram>,
+        loaded: Arc<LoadedProgram>,
+    ) {
+        let LoadedProgramType::Loading(mutex) = &loading.program else {
+            debug_assert!(false);
+            return;
+        };
+        loaded.tx_usage_counter.store(
+            loading.tx_usage_counter.load(Ordering::Relaxed),
+            Ordering::Relaxed,
+        );
+        let mut mutex = mutex.lock().unwrap();
+        let processing = &mut mutex.0;
+        *processing = false;
+        let waiting_list_is_empty = {
+            let fork_graph = self
+                .fork_graph
+                .as_ref()
+                .expect("Program cache doesn't have fork graph.");
+            let fork_graph = fork_graph
+                .read()
+                .expect("Failed to lock fork graph for reading.");
+            let waiting_list = &mut mutex.1;
+            waiting_list.retain(|waiting| {
+                // The Mutex around `waiting` is strictly speaking unnecessary
+                // because the entire `LoadedPrograms` cache is already locked here.
+                let mut waiting = waiting.lock().unwrap();
+                let relation = fork_graph.relationship(loaded.deployment_slot, waiting.loaded.slot);
+                if loaded.deployment_slot <= self.latest_root_slot
+                    || matches!(relation, BlockRelation::Equal | BlockRelation::Descendant)
+                {
+                    waiting.missing.remove(&key);
+                    waiting.loaded.assign_program(key, loaded.clone());
+                    return false;
+                }
+                true
+            });
+            waiting_list.is_empty()
+        };
+        if waiting_list_is_empty {
+            self.remove_program(key, &loading);
+        }
+        self.assign_program(key, loaded);
+    }
+
     pub fn merge(&mut self, tx_batch_cache: &LoadedProgramsForTxBatch) {
         tx_batch_cache.entries.iter().for_each(|(key, entry)| {
             self.assign_program(*key, entry.clone());
diff --git a/runtime/src/bank.rs b/runtime/src/bank.rs
index 4be3ee97bcc5a3..0c1d75914577b6 100644
--- a/runtime/src/bank.rs
+++ b/runtime/src/bank.rs
@@ -112,8 +112,8 @@ use {
         compute_budget_processor::process_compute_budget_instructions,
         invoke_context::BuiltinFunctionWithContext,
         loaded_programs::{
-            ExtractedPrograms, LoadProgramMetrics, LoadedProgram, LoadedProgramMatchCriteria,
-            LoadedProgramType, LoadedPrograms, LoadedProgramsForTxBatch, ProgramRuntimeEnvironment,
+            LoadProgramMetrics, LoadedProgram, LoadedProgramMatchCriteria, LoadedProgramType,
+            LoadedPrograms, LoadedProgramsForTxBatch, ProgramRuntimeEnvironment,
             ProgramRuntimeEnvironments, WorkingSlot, DELAY_VISIBILITY_SLOT_OFFSET,
         },
         log_collector::LogCollector,
@@ -5016,42 +5016,43 @@ impl Bank {
                     .collect()
             };
 
-        let ExtractedPrograms {
-            loaded: mut loaded_programs_for_txs,
-            missing,
-        } = {
+        let extracted = {
             // Lock the global cache to figure out which programs need to be loaded
             let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
-            Mutex::into_inner(
-                Arc::into_inner(
-                    loaded_programs_cache.extract(self, programs_and_slots.into_iter()),
-                )
-                .unwrap(),
-            )
-            .unwrap()
+            loaded_programs_cache.extract(self, programs_and_slots.into_iter())
         };
 
-        // Load missing programs while global cache is unlocked
-        let missing_programs: Vec<(Pubkey, Arc<LoadedProgram>)> = missing
-            .iter()
-            .map(|(key, (entry, reloading))| {
-                let program = self.load_program(key, *reloading, None);
-                program.tx_usage_counter.store(
-                    entry.tx_usage_counter.load(Ordering::Relaxed),
-                    Ordering::Relaxed,
-                );
-                (*key, program)
-            })
-            .collect();
-
-        // Lock the global cache again to replenish the missing programs
-        let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
-        for (key, program) in missing_programs {
-            let entry = loaded_programs_cache.assign_program(key, program);
-            // Use the returned entry as that might have been deduplicated globally
-            loaded_programs_for_txs.assign_program(key, entry);
+        // Cooperative loading phase
+        let mut finished_task = None;
+        loop {
+            // Critical section for global coordination
+            let (key, loading, reload) = {
+                let mut loaded_programs_cache = self.loaded_programs_cache.write().unwrap();
+                if let Some((key, loading, loaded)) = finished_task.take() {
+                    loaded_programs_cache.cooperative_loading_task_complete(key, loading, loaded);
+                }
+                if Arc::strong_count(&extracted) == 1 {
+                    // All the missing entries for this batch have been loaded
+                    break;
+                }
+                if let Some(task) = loaded_programs_cache.next_cooperative_loading_task(&extracted)
+                {
+                    task
+                } else {
+                    // Waiting for some other TX batch to complete loading the programs needed by this TX batch
+                    // TODO: Use a Condvar here
+                    continue;
+                }
+            };
+            // Load, verify and compile the program outside of the critical section
+            let loaded = self.load_program(&key, reload, None);
+            finished_task = Some((key, loading, loaded));
         }
-        loaded_programs_for_txs
+
+        // When we get here we should be the only remaining owner
+        std::sync::Mutex::into_inner(Arc::into_inner(extracted).unwrap())
+            .unwrap()
+            .loaded
     }
 
     /// Returns a hash map of executable program accounts (program accounts that are not writable