From 05898e168330477b8b3de4952855b5f680c9b3da Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Wed, 22 Feb 2023 14:40:47 +0000 Subject: [PATCH 1/4] doc: enhancement of `job_queue` mod-level odc --- src/cargo/core/compiler/job_queue.rs | 138 ++++++++++++++++++--------- 1 file changed, 93 insertions(+), 45 deletions(-) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue.rs index 47b2e769445..d7417e89fab 100644 --- a/src/cargo/core/compiler/job_queue.rs +++ b/src/cargo/core/compiler/job_queue.rs @@ -1,16 +1,39 @@ -//! This module implements the job queue which determines the ordering in which -//! rustc is spawned off. It also manages the allocation of jobserver tokens to -//! rustc beyond the implicit token each rustc owns (i.e., the ones used for -//! parallel LLVM work and parallel rustc threads). +//! Management of the interaction between the main `cargo` and all spawned jobs. +//! +//! ## Overview +//! +//! This module implements a job queue. A job here represents a unit of work, +//! which is roughly a rusc invocation, a build script run, or just a no-op. +//! The job queue primarily handles the following things: +//! +//! * Spawns concurrent jobs. Depending on its [`Freshness`], a job could be +//! either executed on a spawned thread or ran on the same thread to avoid +//! the threading overhead. +//! * Controls the number of concurrency. It allocates and manages [`jobserver`] +//! tokens to each spawned off rustc and build scripts. +//! * Manages the communication between the main `cargo` process and its +//! spawned jobs. Those [`Message`]s are sent over a [`Queue`] shared +//! across threads. +//! * Schedules the execution order of each [`Job`]. Priorities are determined +//! when calling [`JobQueue::enqueue`] to enqueue a job. The scheduling is +//! relatively rudimentary and could likely be improved. +//! +//! A rough outline of building a queue and executing jobs is: +//! +//! 1. [`JobQueue::new`] to simply create one queue. +//! 2. [`JobQueue::enqueue`] to add new jobs onto the queue. +//! 3. Consumes the queue and executes all jobs via [`JobQueue::execute`]. +//! +//! The primary loop happens insides [`JobQueue::execute`], which is effectively +//! [`DrainState::drain_the_queue`]. [`DrainState`] is, as its name tells, +//! the running state of the job queue getting drained. +//! +//! ## Jobserver //! //! Cargo and rustc have a somewhat non-trivial jobserver relationship with each //! other, which is due to scaling issues with sharing a single jobserver //! amongst what is potentially hundreds of threads of work on many-cored -//! systems on (at least) linux, and likely other platforms as well. -//! -//! The details of this algorithm are (also) written out in -//! src/librustc_jobserver/lib.rs. What follows is a description focusing on the -//! Cargo side of things. +//! systems on (at least) Linux, and likely other platforms as well. //! //! Cargo wants to complete the build as quickly as possible, fully saturating //! all cores (as constrained by the -j=N) parameter. Cargo also must not spawn @@ -23,31 +46,78 @@ //! implement prioritizes spawning as many crates (i.e., rustc processes) as //! possible, and then filling each rustc with tokens on demand. //! -//! The primary loop is in `drain_the_queue` below. -//! -//! We integrate with the jobserver, originating from GNU make, to make sure +//! We integrate with the [jobserver], originating from GNU make, to make sure //! that build scripts which use make to build C code can cooperate with us on //! the number of used tokens and avoid overfilling the system we're on. //! //! The jobserver is unfortunately a very simple protocol, so we enhance it a //! little when we know that there is a rustc on the other end. Via the stderr -//! pipe we have to rustc, we get messages such as "NeedsToken" and -//! "ReleaseToken" from rustc. +//! pipe we have to rustc, we get messages such as `NeedsToken` and +//! `ReleaseToken` from rustc. //! -//! "NeedsToken" indicates that a rustc is interested in acquiring a token, but -//! never that it would be impossible to make progress without one (i.e., it -//! would be incorrect for rustc to not terminate due to an unfulfilled -//! NeedsToken request); we do not usually fulfill all NeedsToken requests for a +//! [`NeedsToken`] indicates that a rustc is interested in acquiring a token, +//! but never that it would be impossible to make progress without one (i.e., +//! it would be incorrect for rustc to not terminate due to an unfulfilled +//! `NeedsToken` request); we do not usually fulfill all `NeedsToken` requests for a //! given rustc. //! -//! "ReleaseToken" indicates that a rustc is done with one of its tokens and is -//! ready for us to re-acquire ownership -- we will either release that token +//! [`ReleaseToken`] indicates that a rustc is done with one of its tokens and +//! is ready for us to re-acquire ownership — we will either release that token //! back into the general pool or reuse it ourselves. Note that rustc will //! inform us that it is releasing a token even if it itself is also requesting -//! tokens; is is up to us whether to return the token to that same rustc. +//! tokens; is up to us whether to return the token to that same rustc. +//! +//! `jobserver` also manages the allocation of tokens to rustc beyond +//! the implicit token each rustc owns (i.e., the ones used for parallel LLVM +//! work and parallel rustc threads). +//! +//! ## Scheduling +//! +//! The current scheduling algorithm is not really polished. It is simply based +//! on a dependency graph [`DependencyQueue`]. We continue adding nodes onto +//! the graph until we finalize it. When the graph gets finalized, it finds the +//! sum of the cost of each dependencies of each node, including transitively. +//! The sum of dependency cost turns out to be the cost of each given node. +//! +//! At the time being, the cost is just passed as a fixed placeholder in +//! [`JobQueue::enqueue`]. In the future, we could explore more possibilities +//! around it. For instance, we start persisting timing information for each +//! build somewhere. For a subsequent build, we can look into the historical +//! data and perform a PGO-like optimization to prioritize jobs, making a build +//! fully pipelined. //! -//! The current scheduling algorithm is relatively primitive and could likely be -//! improved. +//! ## Message queue +//! +//! Each spawned thread running a process uses the message queue [`Queue`] to +//! send messages back to the main thread (the one running `cargo`). +//! The main thread coordinates everything, and handles printing output. +//! +//! It is important to be careful which messages use [`push`] vs [`push_bounded`]. +//! `push` is for priority messages (like tokens, or "finished") where the +//! sender shouldn't block. We want to handle those so real work can proceed +//! ASAP. +//! +//! `push_bounded` is only for messages being printed to stdout/stderr. Being +//! bounded prevents a flood of messages causing a large amount of memory +//! being used. +//! +//! `push` also avoids blocking which helps avoid deadlocks. For example, when +//! the diagnostic server thread is dropped, it waits for the thread to exit. +//! But if the thread is blocked on a full queue, and there is a critical +//! error, the drop will deadlock. This should be fixed at some point in the +//! future. The jobserver thread has a similar problem, though it will time +//! out after 1 second. +//! +//! To access the message queue, each running `Job` is given its own [`JobState`], +//! containing everything it needs to communicate with the main thread. +//! +//! See [`Message`] for all available message kinds. +//! +//! [jobserver]: https://docs.rs/jobserver +//! [`NeedsToken`]: Message::NeedsToken +//! [`ReleaseToken`]: Message::ReleaseToken +//! [`push`]: Queue::push +//! [`push_bounded`]: Queue::push_bounded use std::cell::{Cell, RefCell}; use std::collections::{BTreeMap, HashMap, HashSet}; @@ -100,28 +170,6 @@ pub struct JobQueue<'cfg> { /// /// It is created from JobQueue when we have fully assembled the crate graph /// (i.e., all package dependencies are known). -/// -/// # Message queue -/// -/// Each thread running a process uses the message queue to send messages back -/// to the main thread. The main thread coordinates everything, and handles -/// printing output. -/// -/// It is important to be careful which messages use `push` vs `push_bounded`. -/// `push` is for priority messages (like tokens, or "finished") where the -/// sender shouldn't block. We want to handle those so real work can proceed -/// ASAP. -/// -/// `push_bounded` is only for messages being printed to stdout/stderr. Being -/// bounded prevents a flood of messages causing a large amount of memory -/// being used. -/// -/// `push` also avoids blocking which helps avoid deadlocks. For example, when -/// the diagnostic server thread is dropped, it waits for the thread to exit. -/// But if the thread is blocked on a full queue, and there is a critical -/// error, the drop will deadlock. This should be fixed at some point in the -/// future. The jobserver thread has a similar problem, though it will time -/// out after 1 second. struct DrainState<'cfg> { // This is the length of the DependencyQueue when starting out total_units: usize, From 5e2f3ca152c24653931c8bcd1b8d088c59427574 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Wed, 22 Feb 2023 14:52:37 +0000 Subject: [PATCH 2/4] refactor: rename job_queue.rs -> job_queue/mod.rs --- src/cargo/core/compiler/{job_queue.rs => job_queue/mod.rs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename src/cargo/core/compiler/{job_queue.rs => job_queue/mod.rs} (100%) diff --git a/src/cargo/core/compiler/job_queue.rs b/src/cargo/core/compiler/job_queue/mod.rs similarity index 100% rename from src/cargo/core/compiler/job_queue.rs rename to src/cargo/core/compiler/job_queue/mod.rs From 69f4b5c6965c90b2e951190eeba8e1d736e1596b Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Wed, 22 Feb 2023 14:57:41 +0000 Subject: [PATCH 3/4] refactor: rename job.rs -> job_queue/job.rs --- src/cargo/core/compiler/custom_build.rs | 3 +-- src/cargo/core/compiler/fingerprint/mod.rs | 3 +-- src/cargo/core/compiler/{ => job_queue}/job.rs | 4 +++- src/cargo/core/compiler/job_queue/mod.rs | 8 ++++---- src/cargo/core/compiler/mod.rs | 6 ++---- 5 files changed, 11 insertions(+), 13 deletions(-) rename src/cargo/core/compiler/{ => job_queue}/job.rs (98%) diff --git a/src/cargo/core/compiler/custom_build.rs b/src/cargo/core/compiler/custom_build.rs index 11c3f1a24e4..40df22bb7ae 100644 --- a/src/cargo/core/compiler/custom_build.rs +++ b/src/cargo/core/compiler/custom_build.rs @@ -1,5 +1,4 @@ -use super::job::{Job, Work}; -use super::{fingerprint, Context, LinkType, Unit}; +use super::{fingerprint, Context, Job, LinkType, Unit, Work}; use crate::core::compiler::artifact; use crate::core::compiler::context::Metadata; use crate::core::compiler::job_queue::JobState; diff --git a/src/cargo/core/compiler/fingerprint/mod.rs b/src/cargo/core/compiler/fingerprint/mod.rs index e728263a38c..da03d8f787d 100644 --- a/src/cargo/core/compiler/fingerprint/mod.rs +++ b/src/cargo/core/compiler/fingerprint/mod.rs @@ -374,8 +374,7 @@ use crate::util::{internal, path_args, profile, StableHasher}; use crate::CARGO_ENV; use super::custom_build::BuildDeps; -use super::job::{Job, Work}; -use super::{BuildContext, Context, FileFlavor, Unit}; +use super::{BuildContext, Context, FileFlavor, Job, Unit, Work}; pub use dirty_reason::DirtyReason; diff --git a/src/cargo/core/compiler/job.rs b/src/cargo/core/compiler/job_queue/job.rs similarity index 98% rename from src/cargo/core/compiler/job.rs rename to src/cargo/core/compiler/job_queue/job.rs index b0f05070737..ae802d6a03c 100644 --- a/src/cargo/core/compiler/job.rs +++ b/src/cargo/core/compiler/job_queue/job.rs @@ -1,7 +1,9 @@ +//! See [`Job`] and [`Work`]. + use std::fmt; use std::mem; -use super::job_queue::JobState; +use super::JobState; use crate::core::compiler::fingerprint::DirtyReason; use crate::util::CargoResult; diff --git a/src/cargo/core/compiler/job_queue/mod.rs b/src/cargo/core/compiler/job_queue/mod.rs index d7417e89fab..5080560eec4 100644 --- a/src/cargo/core/compiler/job_queue/mod.rs +++ b/src/cargo/core/compiler/job_queue/mod.rs @@ -119,6 +119,8 @@ //! [`push`]: Queue::push //! [`push_bounded`]: Queue::push_bounded +mod job; + use std::cell::{Cell, RefCell}; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Write as _; @@ -135,11 +137,9 @@ use jobserver::{Acquired, Client, HelperThread}; use log::{debug, trace}; use semver::Version; +pub use self::job::Freshness::{self, Dirty, Fresh}; +pub use self::job::{Job, Work}; use super::context::OutputFile; -use super::job::{ - Freshness::{self, Dirty, Fresh}, - Job, -}; use super::timings::Timings; use super::{BuildContext, BuildPlan, CompileMode, Context, Unit}; use crate::core::compiler::future_incompat::{ diff --git a/src/cargo/core/compiler/mod.rs b/src/cargo/core/compiler/mod.rs index 877b7ccd1d0..bcc9a1596f2 100644 --- a/src/cargo/core/compiler/mod.rs +++ b/src/cargo/core/compiler/mod.rs @@ -42,7 +42,6 @@ mod crate_type; mod custom_build; mod fingerprint; pub mod future_incompat; -mod job; mod job_queue; mod layout; mod links; @@ -79,9 +78,8 @@ pub use self::context::{Context, Metadata}; pub use self::crate_type::CrateType; pub use self::custom_build::{BuildOutput, BuildScriptOutputs, BuildScripts}; pub(crate) use self::fingerprint::DirtyReason; -pub use self::job::Freshness; -use self::job::{Job, Work}; -use self::job_queue::{JobQueue, JobState}; +pub use self::job_queue::Freshness; +use self::job_queue::{Job, JobQueue, JobState, Work}; pub(crate) use self::layout::Layout; pub use self::lto::Lto; use self::output_depinfo::output_depinfo; From ff7b54d9c2e58b3416b97b851c212f3cb10457a3 Mon Sep 17 00:00:00 2001 From: Weihang Lo Date: Wed, 22 Feb 2023 16:43:25 +0000 Subject: [PATCH 4/4] refactor: move `JobState` to `job_state.rs` Also add two new methods: - `JobState::new` to create new `JobState` - `JobState::run_to_finish` is extracted from old [`doit`][1]. [1]: https://github.com/rust-lang/cargo/blob/7ddcf0fe3348b7e0f4ad4a730eab60a20638ef28/src/cargo/core/compiler/job_queue.rs#L1122-L1168 --- .../core/compiler/job_queue/job_state.rs | 213 ++++++++++++++++++ src/cargo/core/compiler/job_queue/mod.rs | 209 +---------------- 2 files changed, 221 insertions(+), 201 deletions(-) create mode 100644 src/cargo/core/compiler/job_queue/job_state.rs diff --git a/src/cargo/core/compiler/job_queue/job_state.rs b/src/cargo/core/compiler/job_queue/job_state.rs new file mode 100644 index 00000000000..9bd376aace9 --- /dev/null +++ b/src/cargo/core/compiler/job_queue/job_state.rs @@ -0,0 +1,213 @@ +//! See [`JobState`]. + +use std::{cell::Cell, marker, sync::Arc}; + +use cargo_util::ProcessBuilder; + +use crate::core::compiler::context::OutputFile; +use crate::core::compiler::future_incompat::FutureBreakageItem; +use crate::util::Queue; +use crate::CargoResult; + +use super::{Artifact, DiagDedupe, Job, JobId, Message}; + +/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything +/// necessary to communicate between the main thread and the execution of the job. +/// +/// The job may execute on either a dedicated thread or the main thread. If the job executes on the +/// main thread, the `output` field must be set to prevent a deadlock. +pub struct JobState<'a, 'cfg> { + /// Channel back to the main thread to coordinate messages and such. + /// + /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on + /// the message queue to prevent a deadlock. + messages: Arc>, + + /// Normally output is sent to the job queue with backpressure. When the job is fresh + /// however we need to immediately display the output to prevent a deadlock as the + /// output messages are processed on the same thread as they are sent from. `output` + /// defines where to output in this case. + /// + /// Currently the `Shell` inside `Config` is wrapped in a `RefCell` and thus can't be passed + /// between threads. This means that it isn't possible for multiple output messages to be + /// interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case + /// interleaving is still prevented as the lock would be held for the whole printing of an + /// output message. + output: Option<&'a DiagDedupe<'cfg>>, + + /// The job id that this state is associated with, used when sending + /// messages back to the main thread. + id: JobId, + + /// Whether or not we're expected to have a call to `rmeta_produced`. Once + /// that method is called this is dynamically set to `false` to prevent + /// sending a double message later on. + rmeta_required: Cell, + + // Historical versions of Cargo made use of the `'a` argument here, so to + // leave the door open to future refactorings keep it here. + _marker: marker::PhantomData<&'a ()>, +} + +impl<'a, 'cfg> JobState<'a, 'cfg> { + pub(super) fn new( + id: JobId, + messages: Arc>, + output: Option<&'a DiagDedupe<'cfg>>, + rmeta_required: bool, + ) -> Self { + Self { + id, + messages, + output, + rmeta_required: Cell::new(rmeta_required), + _marker: marker::PhantomData, + } + } + + pub fn running(&self, cmd: &ProcessBuilder) { + self.messages.push(Message::Run(self.id, cmd.to_string())); + } + + pub fn build_plan( + &self, + module_name: String, + cmd: ProcessBuilder, + filenames: Arc>, + ) { + self.messages + .push(Message::BuildPlanMsg(module_name, cmd, filenames)); + } + + pub fn stdout(&self, stdout: String) -> CargoResult<()> { + if let Some(dedupe) = self.output { + writeln!(dedupe.config.shell().out(), "{}", stdout)?; + } else { + self.messages.push_bounded(Message::Stdout(stdout)); + } + Ok(()) + } + + pub fn stderr(&self, stderr: String) -> CargoResult<()> { + if let Some(dedupe) = self.output { + let mut shell = dedupe.config.shell(); + shell.print_ansi_stderr(stderr.as_bytes())?; + shell.err().write_all(b"\n")?; + } else { + self.messages.push_bounded(Message::Stderr(stderr)); + } + Ok(()) + } + + /// See [`Message::Diagnostic`] and [`Message::WarningCount`]. + pub fn emit_diag(&self, level: String, diag: String, fixable: bool) -> CargoResult<()> { + if let Some(dedupe) = self.output { + let emitted = dedupe.emit_diag(&diag)?; + if level == "warning" { + self.messages.push(Message::WarningCount { + id: self.id, + emitted, + fixable, + }); + } + } else { + self.messages.push_bounded(Message::Diagnostic { + id: self.id, + level, + diag, + fixable, + }); + } + Ok(()) + } + + /// See [`Message::Warning`]. + pub fn warning(&self, warning: String) -> CargoResult<()> { + self.messages.push_bounded(Message::Warning { + id: self.id, + warning, + }); + Ok(()) + } + + /// A method used to signal to the coordinator thread that the rmeta file + /// for an rlib has been produced. This is only called for some rmeta + /// builds when required, and can be called at any time before a job ends. + /// This should only be called once because a metadata file can only be + /// produced once! + pub fn rmeta_produced(&self) { + self.rmeta_required.set(false); + self.messages + .push(Message::Finish(self.id, Artifact::Metadata, Ok(()))); + } + + /// Drives a [`Job`] to finish. This ensures that a [`Message::Finish`] is + /// sent even if our job panics. + pub(super) fn run_to_finish(self, job: Job) { + let mut sender = FinishOnDrop { + messages: &self.messages, + id: self.id, + result: None, + }; + sender.result = Some(job.run(&self)); + + // If the `rmeta_required` wasn't consumed but it was set + // previously, then we either have: + // + // 1. The `job` didn't do anything because it was "fresh". + // 2. The `job` returned an error and didn't reach the point where + // it called `rmeta_produced`. + // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo. + // + // Ruling out the third, the other two are pretty common for 2 + // we'll just naturally abort the compilation operation but for 1 + // we need to make sure that the metadata is flagged as produced so + // send a synthetic message here. + if self.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() { + self.messages + .push(Message::Finish(self.id, Artifact::Metadata, Ok(()))); + } + + // Use a helper struct with a `Drop` implementation to guarantee + // that a `Finish` message is sent even if our job panics. We + // shouldn't panic unless there's a bug in Cargo, so we just need + // to make sure nothing hangs by accident. + struct FinishOnDrop<'a> { + messages: &'a Queue, + id: JobId, + result: Option>, + } + + impl Drop for FinishOnDrop<'_> { + fn drop(&mut self) { + let result = self + .result + .take() + .unwrap_or_else(|| Err(anyhow::format_err!("worker panicked"))); + self.messages + .push(Message::Finish(self.id, Artifact::All, result)); + } + } + } + + pub fn future_incompat_report(&self, report: Vec) { + self.messages + .push(Message::FutureIncompatReport(self.id, report)); + } + + /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block) + /// on the passed client. + /// + /// This should arrange for the associated client to eventually get a token via + /// `client.release_raw()`. + pub fn will_acquire(&self) { + self.messages.push(Message::NeedsToken(self.id)); + } + + /// The rustc underlying this Job is informing us that it is done with a jobserver token. + /// + /// Note that it does *not* write that token back anywhere. + pub fn release_token(&self) { + self.messages.push(Message::ReleaseToken(self.id)); + } +} diff --git a/src/cargo/core/compiler/job_queue/mod.rs b/src/cargo/core/compiler/job_queue/mod.rs index 5080560eec4..2c055e42eb5 100644 --- a/src/cargo/core/compiler/job_queue/mod.rs +++ b/src/cargo/core/compiler/job_queue/mod.rs @@ -120,12 +120,12 @@ //! [`push_bounded`]: Queue::push_bounded mod job; +mod job_state; -use std::cell::{Cell, RefCell}; +use std::cell::RefCell; use std::collections::{BTreeMap, HashMap, HashSet}; use std::fmt::Write as _; use std::io; -use std::marker; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::thread::{self, Scope}; @@ -139,6 +139,7 @@ use semver::Version; pub use self::job::Freshness::{self, Dirty, Fresh}; pub use self::job::{Job, Work}; +pub use self::job_state::JobState; use super::context::OutputFile; use super::timings::Timings; use super::{BuildContext, BuildPlan, CompileMode, Context, Unit}; @@ -304,44 +305,6 @@ impl std::fmt::Display for JobId { } } -/// A `JobState` is constructed by `JobQueue::run` and passed to `Job::run`. It includes everything -/// necessary to communicate between the main thread and the execution of the job. -/// -/// The job may execute on either a dedicated thread or the main thread. If the job executes on the -/// main thread, the `output` field must be set to prevent a deadlock. -pub struct JobState<'a, 'cfg> { - /// Channel back to the main thread to coordinate messages and such. - /// - /// When the `output` field is `Some`, care must be taken to avoid calling `push_bounded` on - /// the message queue to prevent a deadlock. - messages: Arc>, - - /// Normally output is sent to the job queue with backpressure. When the job is fresh - /// however we need to immediately display the output to prevent a deadlock as the - /// output messages are processed on the same thread as they are sent from. `output` - /// defines where to output in this case. - /// - /// Currently the `Shell` inside `Config` is wrapped in a `RefCell` and thus can't be passed - /// between threads. This means that it isn't possible for multiple output messages to be - /// interleaved. In the future, it may be wrapped in a `Mutex` instead. In this case - /// interleaving is still prevented as the lock would be held for the whole printing of an - /// output message. - output: Option<&'a DiagDedupe<'cfg>>, - - /// The job id that this state is associated with, used when sending - /// messages back to the main thread. - id: JobId, - - /// Whether or not we're expected to have a call to `rmeta_produced`. Once - /// that method is called this is dynamically set to `false` to prevent - /// sending a double message later on. - rmeta_required: Cell, - - // Historical versions of Cargo made use of the `'a` argument here, so to - // leave the door open to future refactorings keep it here. - _marker: marker::PhantomData<&'a ()>, -} - /// Handler for deduplicating diagnostics. struct DiagDedupe<'cfg> { seen: RefCell>, @@ -432,105 +395,6 @@ enum Message { ReleaseToken(JobId), } -impl<'a, 'cfg> JobState<'a, 'cfg> { - pub fn running(&self, cmd: &ProcessBuilder) { - self.messages.push(Message::Run(self.id, cmd.to_string())); - } - - pub fn build_plan( - &self, - module_name: String, - cmd: ProcessBuilder, - filenames: Arc>, - ) { - self.messages - .push(Message::BuildPlanMsg(module_name, cmd, filenames)); - } - - pub fn stdout(&self, stdout: String) -> CargoResult<()> { - if let Some(dedupe) = self.output { - writeln!(dedupe.config.shell().out(), "{}", stdout)?; - } else { - self.messages.push_bounded(Message::Stdout(stdout)); - } - Ok(()) - } - - pub fn stderr(&self, stderr: String) -> CargoResult<()> { - if let Some(dedupe) = self.output { - let mut shell = dedupe.config.shell(); - shell.print_ansi_stderr(stderr.as_bytes())?; - shell.err().write_all(b"\n")?; - } else { - self.messages.push_bounded(Message::Stderr(stderr)); - } - Ok(()) - } - - /// See [`Message::Diagnostic`] and [`Message::WarningCount`]. - pub fn emit_diag(&self, level: String, diag: String, fixable: bool) -> CargoResult<()> { - if let Some(dedupe) = self.output { - let emitted = dedupe.emit_diag(&diag)?; - if level == "warning" { - self.messages.push(Message::WarningCount { - id: self.id, - emitted, - fixable, - }); - } - } else { - self.messages.push_bounded(Message::Diagnostic { - id: self.id, - level, - diag, - fixable, - }); - } - Ok(()) - } - - /// See [`Message::Warning`]. - pub fn warning(&self, warning: String) -> CargoResult<()> { - self.messages.push_bounded(Message::Warning { - id: self.id, - warning, - }); - Ok(()) - } - - /// A method used to signal to the coordinator thread that the rmeta file - /// for an rlib has been produced. This is only called for some rmeta - /// builds when required, and can be called at any time before a job ends. - /// This should only be called once because a metadata file can only be - /// produced once! - pub fn rmeta_produced(&self) { - self.rmeta_required.set(false); - self.messages - .push(Message::Finish(self.id, Artifact::Metadata, Ok(()))); - } - - pub fn future_incompat_report(&self, report: Vec) { - self.messages - .push(Message::FutureIncompatReport(self.id, report)); - } - - /// The rustc underlying this Job is about to acquire a jobserver token (i.e., block) - /// on the passed client. - /// - /// This should arrange for the associated client to eventually get a token via - /// `client.release_raw()`. - pub fn will_acquire(&self) { - self.messages.push(Message::NeedsToken(self.id)); - } - - /// The rustc underlying this Job is informing us that it is done with a jobserver token. - /// - /// Note that it does *not* write that token back anywhere. - pub fn release_token(&self) { - self.messages.push(Message::ReleaseToken(self.id)); - } -} - impl<'cfg> JobQueue<'cfg> { pub fn new(bcx: &BuildContext<'_, 'cfg>) -> JobQueue<'cfg> { JobQueue { @@ -1167,52 +1031,9 @@ impl<'cfg> DrainState<'cfg> { let is_fresh = job.freshness().is_fresh(); let rmeta_required = cx.rmeta_required(unit); - let doit = move |state: JobState<'_, '_>| { - let mut sender = FinishOnDrop { - messages: &state.messages, - id, - result: None, - }; - sender.result = Some(job.run(&state)); - - // If the `rmeta_required` wasn't consumed but it was set - // previously, then we either have: - // - // 1. The `job` didn't do anything because it was "fresh". - // 2. The `job` returned an error and didn't reach the point where - // it called `rmeta_produced`. - // 3. We forgot to call `rmeta_produced` and there's a bug in Cargo. - // - // Ruling out the third, the other two are pretty common for 2 - // we'll just naturally abort the compilation operation but for 1 - // we need to make sure that the metadata is flagged as produced so - // send a synthetic message here. - if state.rmeta_required.get() && sender.result.as_ref().unwrap().is_ok() { - state - .messages - .push(Message::Finish(state.id, Artifact::Metadata, Ok(()))); - } - - // Use a helper struct with a `Drop` implementation to guarantee - // that a `Finish` message is sent even if our job panics. We - // shouldn't panic unless there's a bug in Cargo, so we just need - // to make sure nothing hangs by accident. - struct FinishOnDrop<'a> { - messages: &'a Queue, - id: JobId, - result: Option>, - } - - impl Drop for FinishOnDrop<'_> { - fn drop(&mut self) { - let result = self - .result - .take() - .unwrap_or_else(|| Err(format_err!("worker panicked"))); - self.messages - .push(Message::Finish(self.id, Artifact::All, result)); - } - } + let doit = move |diag_dedupe| { + let state = JobState::new(id, messages, diag_dedupe, rmeta_required); + state.run_to_finish(job); }; match is_fresh { @@ -1220,25 +1041,11 @@ impl<'cfg> DrainState<'cfg> { self.timings.add_fresh(); // Running a fresh job on the same thread is often much faster than spawning a new // thread to run the job. - doit(JobState { - id, - messages, - output: Some(&self.diag_dedupe), - rmeta_required: Cell::new(rmeta_required), - _marker: marker::PhantomData, - }); + doit(Some(&self.diag_dedupe)); } false => { self.timings.add_dirty(); - scope.spawn(move || { - doit(JobState { - id, - messages: messages.clone(), - output: None, - rmeta_required: Cell::new(rmeta_required), - _marker: marker::PhantomData, - }) - }); + scope.spawn(move || doit(None)); } } }