diff --git a/src/base.rs b/src/base.rs
index 3a4c2b8454958..44c34d6c8cb79 100644
--- a/src/base.rs
+++ b/src/base.rs
@@ -5,21 +5,21 @@ use rustc_index::vec::IndexVec;
 use rustc_middle::ty::adjustment::PointerCast;
 use rustc_middle::ty::layout::FnAbiOf;
 use rustc_middle::ty::print::with_no_trimmed_paths;
-use rustc_middle::ty::SymbolName;
 
 use crate::constant::ConstantCx;
 use crate::debuginfo::FunctionDebugContext;
 use crate::prelude::*;
 use crate::pretty_clif::CommentWriter;
 
-struct CodegenedFunction<'tcx> {
-    symbol_name: SymbolName<'tcx>,
+pub(crate) struct CodegenedFunction {
+    symbol_name: String,
     func_id: FuncId,
     func: Function,
     clif_comments: CommentWriter,
     func_debug_cx: Option<FunctionDebugContext>,
 }
 
+#[cfg_attr(not(feature = "jit"), allow(dead_code))]
 pub(crate) fn codegen_and_compile_fn<'tcx>(
     tcx: TyCtxt<'tcx>,
     cx: &mut crate::CodegenCx,
@@ -36,13 +36,13 @@ pub(crate) fn codegen_and_compile_fn<'tcx>(
     compile_fn(cx, cached_context, module, codegened_func);
 }
 
-fn codegen_fn<'tcx>(
+pub(crate) fn codegen_fn<'tcx>(
     tcx: TyCtxt<'tcx>,
     cx: &mut crate::CodegenCx,
     cached_func: Function,
     module: &mut dyn Module,
     instance: Instance<'tcx>,
-) -> CodegenedFunction<'tcx> {
+) -> CodegenedFunction {
     debug_assert!(!instance.substs.needs_infer());
 
     let mir = tcx.instance_mir(instance.def);
@@ -56,9 +56,9 @@ fn codegen_fn<'tcx>(
     });
 
     // Declare function
-    let symbol_name = tcx.symbol_name(instance);
+    let symbol_name = tcx.symbol_name(instance).name.to_string();
     let sig = get_function_sig(tcx, module.isa().triple(), instance);
-    let func_id = module.declare_function(symbol_name.name, Linkage::Local, &sig).unwrap();
+    let func_id = module.declare_function(&symbol_name, Linkage::Local, &sig).unwrap();
 
     // Make the FunctionBuilder
     let mut func_ctx = FunctionBuilderContext::new();
@@ -81,7 +81,7 @@ fn codegen_fn<'tcx>(
     let clif_comments = crate::pretty_clif::CommentWriter::new(tcx, instance);
 
     let func_debug_cx = if let Some(debug_context) = &mut cx.debug_context {
-        Some(debug_context.define_function(tcx, symbol_name.name, mir.span))
+        Some(debug_context.define_function(tcx, &symbol_name, mir.span))
     } else {
         None
     };
@@ -113,6 +113,7 @@ fn codegen_fn<'tcx>(
     tcx.sess.time("codegen clif ir", || codegen_fn_body(&mut fx, start_block));
 
     // Recover all necessary data from fx, before accessing func will prevent future access to it.
+    let symbol_name = fx.symbol_name;
     let clif_comments = fx.clif_comments;
     let func_debug_cx = fx.func_debug_cx;
 
@@ -121,7 +122,7 @@ fn codegen_fn<'tcx>(
     if cx.should_write_ir {
         crate::pretty_clif::write_clif_file(
             tcx.output_filenames(()),
-            symbol_name.name,
+            &symbol_name,
             "unopt",
             module.isa(),
             &func,
@@ -135,11 +136,11 @@ fn codegen_fn<'tcx>(
     CodegenedFunction { symbol_name, func_id, func, clif_comments, func_debug_cx }
 }
 
-fn compile_fn<'tcx>(
+pub(crate) fn compile_fn(
     cx: &mut crate::CodegenCx,
     cached_context: &mut Context,
     module: &mut dyn Module,
-    codegened_func: CodegenedFunction<'tcx>,
+    codegened_func: CodegenedFunction,
 ) {
     let clif_comments = codegened_func.clif_comments;
 
@@ -195,7 +196,7 @@ fn compile_fn<'tcx>(
     // Write optimized function to file for debugging
     crate::pretty_clif::write_clif_file(
         &cx.output_filenames,
-        codegened_func.symbol_name.name,
+        &codegened_func.symbol_name,
         "opt",
         module.isa(),
         &context.func,
@@ -205,7 +206,7 @@ fn compile_fn<'tcx>(
     if let Some(disasm) = &context.compiled_code().unwrap().disasm {
         crate::pretty_clif::write_ir_file(
             &cx.output_filenames,
-            &format!("{}.vcode", codegened_func.symbol_name.name),
+            &format!("{}.vcode", codegened_func.symbol_name),
             |file| file.write_all(disasm.as_bytes()),
         )
     }
diff --git a/src/common.rs b/src/common.rs
index 4a80b79a9dc1b..589594465783e 100644
--- a/src/common.rs
+++ b/src/common.rs
@@ -6,7 +6,6 @@ use rustc_index::vec::IndexVec;
 use rustc_middle::ty::layout::{
     FnAbiError, FnAbiOfHelpers, FnAbiRequest, LayoutError, LayoutOfHelpers,
 };
-use rustc_middle::ty::SymbolName;
 use rustc_span::SourceFile;
 use rustc_target::abi::call::FnAbi;
 use rustc_target::abi::{Integer, Primitive};
@@ -246,7 +245,7 @@ pub(crate) struct FunctionCx<'m, 'clif, 'tcx: 'm> {
     pub(crate) func_debug_cx: Option<FunctionDebugContext>,
 
     pub(crate) instance: Instance<'tcx>,
-    pub(crate) symbol_name: SymbolName<'tcx>,
+    pub(crate) symbol_name: String,
     pub(crate) mir: &'tcx Body<'tcx>,
 
     pub(crate) fn_abi: Option<&'tcx FnAbi<'tcx, Ty<'tcx>>>,
diff --git a/src/concurrency_limiter.rs b/src/concurrency_limiter.rs
new file mode 100644
index 0000000000000..dfde97920461e
--- /dev/null
+++ b/src/concurrency_limiter.rs
@@ -0,0 +1,168 @@
+use std::sync::{Arc, Condvar, Mutex};
+
+use rustc_session::Session;
+
+use jobserver::HelperThread;
+
+// FIXME don't panic when a worker thread panics
+
+pub(super) struct ConcurrencyLimiter {
+    helper_thread: Option<HelperThread>,
+    state: Arc<Mutex<state::ConcurrencyLimiterState>>,
+    available_token_condvar: Arc<Condvar>,
+}
+
+impl ConcurrencyLimiter {
+    pub(super) fn new(sess: &Session, pending_jobs: usize) -> Self {
+        let state = Arc::new(Mutex::new(state::ConcurrencyLimiterState::new(pending_jobs)));
+        let available_token_condvar = Arc::new(Condvar::new());
+
+        let state_helper = state.clone();
+        let available_token_condvar_helper = available_token_condvar.clone();
+        let helper_thread = sess
+            .jobserver
+            .clone()
+            .into_helper_thread(move |token| {
+                let mut state = state_helper.lock().unwrap();
+                state.add_new_token(token.unwrap());
+                available_token_condvar_helper.notify_one();
+            })
+            .unwrap();
+        ConcurrencyLimiter {
+            helper_thread: Some(helper_thread),
+            state,
+            available_token_condvar: Arc::new(Condvar::new()),
+        }
+    }
+
+    pub(super) fn acquire(&mut self) -> ConcurrencyLimiterToken {
+        let mut state = self.state.lock().unwrap();
+        loop {
+            state.assert_invariants();
+
+            if state.try_start_job() {
+                return ConcurrencyLimiterToken {
+                    state: self.state.clone(),
+                    available_token_condvar: self.available_token_condvar.clone(),
+                };
+            }
+
+            self.helper_thread.as_mut().unwrap().request_token();
+            state = self.available_token_condvar.wait(state).unwrap();
+        }
+    }
+
+    pub(super) fn job_already_done(&mut self) {
+        let mut state = self.state.lock().unwrap();
+        state.job_already_done();
+    }
+}
+
+impl Drop for ConcurrencyLimiter {
+    fn drop(&mut self) {
+        //
+        self.helper_thread.take();
+
+        // Assert that all jobs have finished
+        let state = Mutex::get_mut(Arc::get_mut(&mut self.state).unwrap()).unwrap();
+        state.assert_done();
+    }
+}
+
+#[derive(Debug)]
+pub(super) struct ConcurrencyLimiterToken {
+    state: Arc<Mutex<state::ConcurrencyLimiterState>>,
+    available_token_condvar: Arc<Condvar>,
+}
+
+impl Drop for ConcurrencyLimiterToken {
+    fn drop(&mut self) {
+        let mut state = self.state.lock().unwrap();
+        state.job_finished();
+        self.available_token_condvar.notify_one();
+    }
+}
+
+mod state {
+    use jobserver::Acquired;
+
+    #[derive(Debug)]
+    pub(super) struct ConcurrencyLimiterState {
+        pending_jobs: usize,
+        active_jobs: usize,
+
+        // None is used to represent the implicit token, Some to represent explicit tokens
+        tokens: Vec<Option<Acquired>>,
+    }
+
+    impl ConcurrencyLimiterState {
+        pub(super) fn new(pending_jobs: usize) -> Self {
+            ConcurrencyLimiterState { pending_jobs, active_jobs: 0, tokens: vec![None] }
+        }
+
+        pub(super) fn assert_invariants(&self) {
+            // There must be no excess active jobs
+            assert!(self.active_jobs <= self.pending_jobs);
+
+            // There may not be more active jobs than there are tokens
+            assert!(self.active_jobs <= self.tokens.len());
+        }
+
+        pub(super) fn assert_done(&self) {
+            assert_eq!(self.pending_jobs, 0);
+            assert_eq!(self.active_jobs, 0);
+        }
+
+        pub(super) fn add_new_token(&mut self, token: Acquired) {
+            self.tokens.push(Some(token));
+            self.drop_excess_capacity();
+        }
+
+        pub(super) fn try_start_job(&mut self) -> bool {
+            if self.active_jobs < self.tokens.len() {
+                // Using existing token
+                self.job_started();
+                return true;
+            }
+
+            false
+        }
+
+        pub(super) fn job_started(&mut self) {
+            self.assert_invariants();
+            self.active_jobs += 1;
+            self.drop_excess_capacity();
+            self.assert_invariants();
+        }
+
+        pub(super) fn job_finished(&mut self) {
+            self.assert_invariants();
+            self.pending_jobs -= 1;
+            self.active_jobs -= 1;
+            self.assert_invariants();
+            self.drop_excess_capacity();
+            self.assert_invariants();
+        }
+
+        pub(super) fn job_already_done(&mut self) {
+            self.assert_invariants();
+            self.pending_jobs -= 1;
+            self.assert_invariants();
+            self.drop_excess_capacity();
+            self.assert_invariants();
+        }
+
+        fn drop_excess_capacity(&mut self) {
+            self.assert_invariants();
+
+            // Drop all tokens that can never be used anymore
+            self.tokens.truncate(std::cmp::max(self.pending_jobs, 1));
+
+            // Keep some excess tokens to satisfy requests faster
+            const MAX_EXTRA_CAPACITY: usize = 2;
+            self.tokens.truncate(std::cmp::max(self.active_jobs + MAX_EXTRA_CAPACITY, 1));
+
+            self.assert_invariants();
+        }
+    }
+}
diff --git a/src/driver/aot.rs b/src/driver/aot.rs
index 3220d16f72539..8eabe1cbcb150 100644
--- a/src/driver/aot.rs
+++ b/src/driver/aot.rs
@@ -4,6 +4,7 @@
 use std::fs::File;
 use std::path::PathBuf;
 use std::sync::Arc;
+use std::thread::JoinHandle;
 
 use rustc_codegen_ssa::back::metadata::create_compressed_metadata_file;
 use rustc_codegen_ssa::{CodegenResults, CompiledModule, CrateInfo, ModuleKind};
@@ -18,6 +19,7 @@ use rustc_session::Session;
 
 use cranelift_object::{ObjectBuilder, ObjectModule};
 
+use crate::concurrency_limiter::{ConcurrencyLimiter, ConcurrencyLimiterToken};
 use crate::global_asm::GlobalAsmConfig;
 use crate::{prelude::*, BackendConfig};
 
@@ -27,18 +29,24 @@ struct ModuleCodegenResult {
     existing_work_product: Option<(WorkProductId, WorkProduct)>,
 }
 
-impl<HCX> HashStable<HCX> for ModuleCodegenResult {
+enum OngoingModuleCodegen {
+    Sync(Result<ModuleCodegenResult, String>),
+    Async(JoinHandle<Result<ModuleCodegenResult, String>>),
+}
+
+impl<HCX> HashStable<HCX> for OngoingModuleCodegen {
     fn hash_stable(&self, _: &mut HCX, _: &mut StableHasher) {
         // do nothing
     }
 }
 
 pub(crate) struct OngoingCodegen {
-    modules: Vec<Result<ModuleCodegenResult, String>>,
+    modules: Vec<OngoingModuleCodegen>,
     allocator_module: Option<CompiledModule>,
     metadata_module: Option<CompiledModule>,
     metadata: EncodedMetadata,
     crate_info: CrateInfo,
+    concurrency_limiter: ConcurrencyLimiter,
 }
 
 impl OngoingCodegen {
@@ -50,7 +58,15 @@ impl OngoingCodegen {
         let mut work_products = FxHashMap::default();
         let mut modules = vec![];
 
-        for module_codegen_result in self.modules {
+        for module_codegen in self.modules {
+            let module_codegen_result = match module_codegen {
+                OngoingModuleCodegen::Sync(module_codegen_result) => module_codegen_result,
+                OngoingModuleCodegen::Async(join_handle) => match join_handle.join() {
+                    Ok(module_codegen_result) => module_codegen_result,
+                    Err(panic) => std::panic::resume_unwind(panic),
+                },
+            };
+
             let module_codegen_result = match module_codegen_result {
                 Ok(module_codegen_result) => module_codegen_result,
                 Err(err) => sess.fatal(&err),
@@ -90,6 +106,8 @@ impl OngoingCodegen {
             }
         }
 
+        drop(self.concurrency_limiter);
+
         (
             CodegenResults {
                 modules,
@@ -233,70 +251,90 @@ fn reuse_workproduct_for_cgu(
 
 fn module_codegen(
     tcx: TyCtxt<'_>,
-    (backend_config, global_asm_config, cgu_name): (
+    (backend_config, global_asm_config, cgu_name, token): (
         BackendConfig,
         Arc<GlobalAsmConfig>,
         rustc_span::Symbol,
+        ConcurrencyLimiterToken,
     ),
-) -> Result<ModuleCodegenResult, String> {
-    let cgu = tcx.codegen_unit(cgu_name);
-    let mono_items = cgu.items_in_deterministic_order(tcx);
+) -> OngoingModuleCodegen {
+    let (cgu_name, mut cx, mut module, codegened_functions) = tcx.sess.time("codegen cgu", || {
+        let cgu = tcx.codegen_unit(cgu_name);
+        let mono_items = cgu.items_in_deterministic_order(tcx);
+
+        let mut module = make_module(tcx.sess, &backend_config, cgu_name.as_str().to_string());
+
+        let mut cx = crate::CodegenCx::new(
+            tcx,
+            backend_config.clone(),
+            module.isa(),
+            tcx.sess.opts.debuginfo != DebugInfo::None,
+            cgu_name,
+        );
+        super::predefine_mono_items(tcx, &mut module, &mono_items);
+        let mut codegened_functions = vec![];
+        for (mono_item, _) in mono_items {
+            match mono_item {
+                MonoItem::Fn(inst) => {
+                    tcx.sess.time("codegen fn", || {
+                        let codegened_function = crate::base::codegen_fn(
+                            tcx,
+                            &mut cx,
+                            Function::new(),
+                            &mut module,
+                            inst,
+                        );
+                        codegened_functions.push(codegened_function);
+                    });
+                }
+                MonoItem::Static(def_id) => {
+                    crate::constant::codegen_static(tcx, &mut module, def_id)
+                }
+                MonoItem::GlobalAsm(item_id) => {
+                    crate::global_asm::codegen_global_asm_item(tcx, &mut cx.global_asm, item_id);
+                }
+            }
+        }
+        crate::main_shim::maybe_create_entry_wrapper(
+            tcx,
+            &mut module,
+            &mut cx.unwind_context,
+            false,
+            cgu.is_primary(),
+        );
 
-    let mut module = make_module(tcx.sess, &backend_config, cgu_name.as_str().to_string());
+        let cgu_name = cgu.name().as_str().to_owned();
 
-    let mut cx = crate::CodegenCx::new(
-        tcx,
-        backend_config.clone(),
-        module.isa(),
-        tcx.sess.opts.debuginfo != DebugInfo::None,
-        cgu_name,
-    );
-    super::predefine_mono_items(tcx, &mut module, &mono_items);
-    let mut cached_context = Context::new();
-    for (mono_item, _) in mono_items {
-        match mono_item {
-            MonoItem::Fn(inst) => {
-                tcx.sess.time("codegen fn", || {
-                    crate::base::codegen_and_compile_fn(
-                        tcx,
-                        &mut cx,
-                        &mut cached_context,
-                        &mut module,
-                        inst,
-                    )
-                });
-            }
-            MonoItem::Static(def_id) => crate::constant::codegen_static(tcx, &mut module, def_id),
-            MonoItem::GlobalAsm(item_id) => {
-                crate::global_asm::codegen_global_asm_item(tcx, &mut cx.global_asm, item_id);
+        (cgu_name, cx, module, codegened_functions)
+    });
+
+    OngoingModuleCodegen::Async(std::thread::spawn(move || {
+        cx.profiler.clone().verbose_generic_activity("compile functions").run(|| {
+            let mut cached_context = Context::new();
+            for codegened_func in codegened_functions {
+                crate::base::compile_fn(&mut cx, &mut cached_context, &mut module, codegened_func);
             }
-        }
-    }
-    crate::main_shim::maybe_create_entry_wrapper(
-        tcx,
-        &mut module,
-        &mut cx.unwind_context,
-        false,
-        cgu.is_primary(),
-    );
+        });
 
-    let global_asm_object_file = crate::global_asm::compile_global_asm(
-        &global_asm_config,
-        cgu.name().as_str(),
-        &cx.global_asm,
-    )?;
-
-    tcx.sess.time("write object file", || {
-        emit_cgu(
-            &global_asm_config.output_filenames,
-            &cx.profiler,
-            cgu.name().as_str().to_string(),
-            module,
-            cx.debug_context,
-            cx.unwind_context,
-            global_asm_object_file,
-        )
-    })
+        let global_asm_object_file =
+            cx.profiler.verbose_generic_activity("compile assembly").run(|| {
+                crate::global_asm::compile_global_asm(&global_asm_config, &cgu_name, &cx.global_asm)
+            })?;
+
+        let codegen_result = cx.profiler.verbose_generic_activity("write object file").run(|| {
+            emit_cgu(
+                &global_asm_config.output_filenames,
+                &cx.profiler,
+                cgu_name,
+                module,
+                cx.debug_context,
+                cx.unwind_context,
+                global_asm_object_file,
+            )
+        });
+        std::mem::drop(token);
+        codegen_result
+    }))
 }
 
 pub(crate) fn run_aot(
@@ -321,6 +359,8 @@ pub(crate) fn run_aot(
 
     let global_asm_config = Arc::new(crate::global_asm::GlobalAsmConfig::new(tcx));
 
+    let mut concurrency_limiter = ConcurrencyLimiter::new(tcx.sess, cgus.len());
+
     let modules = super::time(tcx, backend_config.display_cg_time, "codegen mono items", || {
         cgus.iter()
             .map(|cgu| {
@@ -338,14 +378,22 @@
                            .with_task(
                                dep_node,
                                tcx,
-                                (backend_config.clone(), global_asm_config.clone(), cgu.name()),
+                                (
+                                    backend_config.clone(),
+                                    global_asm_config.clone(),
+                                    cgu.name(),
+                                    concurrency_limiter.acquire(),
+                                ),
                                module_codegen,
                                Some(rustc_middle::dep_graph::hash_result),
                            )
                            .0
                    }
-                    CguReuse::PreLto => reuse_workproduct_for_cgu(tcx, &*cgu),
-                    CguReuse::PostLto => unreachable!(),
+                    CguReuse::PreLto => unreachable!(),
+                    CguReuse::PostLto => {
+                        concurrency_limiter.job_already_done();
+                        OngoingModuleCodegen::Sync(reuse_workproduct_for_cgu(tcx, &*cgu))
+                    }
                }
            })
            .collect::<Vec<_>>()
@@ -424,6 +472,7 @@ pub(crate) fn run_aot(
         metadata_module,
         metadata,
         crate_info: CrateInfo::new(tcx, target_cpu),
+        concurrency_limiter,
     })
 }
 
@@ -453,5 +502,5 @@ fn determine_cgu_reuse<'tcx>(tcx: TyCtxt<'tcx>, cgu: &CodegenUnit<'tcx>) -> CguR
         cgu.name()
     );
 
-    if tcx.try_mark_green(&dep_node) { CguReuse::PreLto } else { CguReuse::No }
+    if tcx.try_mark_green(&dep_node) { CguReuse::PostLto } else { CguReuse::No }
 }
diff --git a/src/lib.rs b/src/lib.rs
index 40dab58523c20..913414e761821 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -4,6 +4,7 @@
 #![warn(unused_lifetimes)]
 #![warn(unreachable_pub)]
 
+extern crate jobserver;
 #[macro_use]
 extern crate rustc_middle;
 extern crate rustc_ast;
@@ -53,6 +54,7 @@ mod cast;
 mod codegen_i128;
 mod common;
 mod compiler_builtins;
+mod concurrency_limiter;
 mod config;
 mod constant;
 mod debuginfo;
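The jobserver token accounting introduced by this patch is easiest to see in isolation. Below is a minimal, self-contained sketch of the same idea; it is not code from the patch: `LimiterState` and its `main` driver are hypothetical, and a plain `tokens` counter stands in for the `Vec<Option<jobserver::Acquired>>` kept by `ConcurrencyLimiterState`, but the invariants mirror the ones asserted above (start with the single implicit token, never run more active jobs than tokens, and shrink the token pool as `pending_jobs` drains).

```rust
// Standalone sketch of the bookkeeping in ConcurrencyLimiterState
// (hypothetical simplification: plain counters instead of jobserver::Acquired tokens).
struct LimiterState {
    pending_jobs: usize, // CGUs whose object code still has to be produced
    active_jobs: usize,  // compilation threads currently running
    tokens: usize,       // implicit token plus tokens handed out by the jobserver
}

impl LimiterState {
    fn new(pending_jobs: usize) -> Self {
        // Every process owns one implicit jobserver token.
        LimiterState { pending_jobs, active_jobs: 0, tokens: 1 }
    }

    /// A job may start only if a token is available for it.
    fn try_start_job(&mut self) -> bool {
        if self.active_jobs < self.tokens {
            self.active_jobs += 1;
            return true;
        }
        false
    }

    /// Called when a spawned compilation thread finishes.
    fn job_finished(&mut self) {
        self.pending_jobs -= 1;
        self.active_jobs -= 1;
        self.drop_excess_capacity();
    }

    /// Called for CGUs that are reused from the incremental cache.
    fn job_already_done(&mut self) {
        self.pending_jobs -= 1;
        self.drop_excess_capacity();
    }

    /// Never hold more tokens than remaining jobs (plus a little slack).
    fn drop_excess_capacity(&mut self) {
        const MAX_EXTRA_CAPACITY: usize = 2;
        self.tokens = self.tokens.min(self.pending_jobs.max(1));
        self.tokens = self.tokens.min((self.active_jobs + MAX_EXTRA_CAPACITY).max(1));
    }
}

fn main() {
    let mut state = LimiterState::new(3);
    assert!(state.try_start_job());  // uses the implicit token
    assert!(!state.try_start_job()); // no token yet: the caller would request one and wait
    state.tokens += 1;               // helper thread delivered a jobserver token
    assert!(state.try_start_job());
    state.job_finished();
    state.job_finished();
    state.job_already_done();        // e.g. a PostLto-reused CGU
    assert_eq!((state.pending_jobs, state.active_jobs), (0, 0));
}
```

In the patch itself, `run_aot` calls `acquire()` once per codegen unit, reused (`PostLto`) CGUs report `job_already_done()`, and dropping the `ConcurrencyLimiterToken` at the end of the spawned compilation thread plays the role of `job_finished()`.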