Skip to content

Commit

Permalink
Auto merge of rust-lang#11764 - weihanglo:jobserver-cleanup, r=epage
Browse files Browse the repository at this point in the history
Jobserver cleanup
  • Loading branch information
bors committed Mar 1, 2023
2 parents e472ccc + d8fb0db commit 9580786
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 245 deletions.
24 changes: 0 additions & 24 deletions src/cargo/core/compiler/context/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ pub struct Context<'a, 'cfg> {
/// metadata files in addition to the rlib itself.
rmeta_required: HashSet<Unit>,

/// When we're in jobserver-per-rustc process mode, this keeps those
/// jobserver clients for each Unit (which eventually becomes a rustc
/// process).
pub rustc_clients: HashMap<Unit, Client>,

/// Map of the LTO-status of each unit. This indicates what sort of
/// compilation is happening (only object, only bitcode, both, etc), and is
/// precalculated early on.
Expand Down Expand Up @@ -124,7 +119,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
primary_packages: HashSet::new(),
files: None,
rmeta_required: HashSet::new(),
rustc_clients: HashMap::new(),
lto: HashMap::new(),
metadata_for_doc_units: HashMap::new(),
failed_scrape_units: Arc::new(Mutex::new(HashSet::new())),
Expand Down Expand Up @@ -614,24 +608,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
self.rmeta_required.contains(unit)
}

/// Used by `-Zjobserver-per-rustc`.
pub fn new_jobserver(&mut self) -> CargoResult<Client> {
let tokens = self.bcx.jobs() as usize;
let client = Client::new(tokens).with_context(|| "failed to create jobserver")?;

// Drain the client fully
for i in 0..tokens {
client.acquire_raw().with_context(|| {
format!(
"failed to fully drain {}/{} token from jobserver at startup",
i, tokens,
)
})?;
}

Ok(client)
}

/// Finds metadata for Doc/Docscrape units.
///
/// rustdoc needs a -Cmetadata flag in order to recognize StableCrateIds that refer to
Expand Down
16 changes: 0 additions & 16 deletions src/cargo/core/compiler/job_queue/job_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,20 +194,4 @@ impl<'a, 'cfg> JobState<'a, 'cfg> {
self.messages
.push(Message::FutureIncompatReport(self.id, report));
}

/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
/// on the passed client.
///
/// This should arrange for the associated client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self) {
self.messages.push(Message::NeedsToken(self.id));
}

/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
self.messages.push(Message::ReleaseToken(self.id));
}
}
177 changes: 30 additions & 147 deletions src/cargo/core/compiler/job_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,46 +30,30 @@
//!
//! ## Jobserver
//!
//! Cargo and rustc have a somewhat non-trivial jobserver relationship with each
//! other, which is due to scaling issues with sharing a single jobserver
//! amongst what is potentially hundreds of threads of work on many-cored
//! systems on (at least) Linux, and likely other platforms as well.
//! As of Feb. 2023, Cargo and rustc have a relatively simple jobserver
//! relationship with each other. They share a single jobserver amongst what
//! is potentially hundreds of threads of work on many-cored systems.
//! The jobserver could come from either the environment (e.g., from a `make`
//! invocation), or from Cargo creating its own jobserver server if there is no
//! jobserver to inherit from.
//!
//! Cargo wants to complete the build as quickly as possible, fully saturating
//! all cores (as constrained by the -j=N) parameter. Cargo also must not spawn
//! all cores (as constrained by the `-j=N`) parameter. Cargo also must not spawn
//! more than N threads of work: the total amount of tokens we have floating
//! around must always be limited to N.
//!
//! It is not really possible to optimally choose which crate should build first
//! or last; nor is it possible to decide whether to give an additional token to
//! rustc first or rather spawn a new crate of work. For now, the algorithm we
//! implement prioritizes spawning as many crates (i.e., rustc processes) as
//! possible, and then filling each rustc with tokens on demand.
//! It is not really possible to optimally choose which crate should build
//! first or last; nor is it possible to decide whether to give an additional
//! token to rustc first or rather spawn a new crate of work. The algorithm in
//! Cargo prioritizes spawning as many crates (i.e., rustc processes) as
//! possible. In short, the jobserver relationship among Cargo and rustc
//! processes is **1 `cargo` to N `rustc`**. Cargo knows nothing beyond rustc
//! processes in terms of parallelism[^parallel-rustc].
//!
//! We integrate with the [jobserver], originating from GNU make, to make sure
//! that build scripts which use make to build C code can cooperate with us on
//! the number of used tokens and avoid overfilling the system we're on.
//!
//! The jobserver is unfortunately a very simple protocol, so we enhance it a
//! little when we know that there is a rustc on the other end. Via the stderr
//! pipe we have to rustc, we get messages such as `NeedsToken` and
//! `ReleaseToken` from rustc.
//!
//! [`NeedsToken`] indicates that a rustc is interested in acquiring a token,
//! but never that it would be impossible to make progress without one (i.e.,
//! it would be incorrect for rustc to not terminate due to an unfulfilled
//! `NeedsToken` request); we do not usually fulfill all `NeedsToken` requests for a
//! given rustc.
//!
//! [`ReleaseToken`] indicates that a rustc is done with one of its tokens and
//! is ready for us to re-acquire ownership — we will either release that token
//! back into the general pool or reuse it ourselves. Note that rustc will
//! inform us that it is releasing a token even if it itself is also requesting
//! tokens; is up to us whether to return the token to that same rustc.
//!
//! `jobserver` also manages the allocation of tokens to rustc beyond
//! the implicit token each rustc owns (i.e., the ones used for parallel LLVM
//! work and parallel rustc threads).
//! We integrate with the [jobserver] crate, originating from GNU make
//! [POSIX jobserver], to make sure that build scripts which use make to
//! build C code can cooperate with us on the number of used tokens and
//! avoid overfilling the system we're on.
//!
//! ## Scheduling
//!
Expand Down Expand Up @@ -113,17 +97,24 @@
//!
//! See [`Message`] for all available message kinds.
//!
//! [^parallel-rustc]: In fact, `jobserver` that Cargo uses also manages the
//! allocation of tokens to rustc beyond the implicit token each rustc owns
//! (i.e., the ones used for parallel LLVM work and parallel rustc threads).
//! See also ["Rust Compiler Development Guide: Parallel Compilation"]
//! and [this comment][rustc-codegen] in rust-lang/rust.
//!
//! ["Rust Compiler Development Guide: Parallel Compilation"]: https://rustc-dev-guide.rust-lang.org/parallel-rustc.html
//! [rustc-codegen]: https://github.com/rust-lang/rust/blob/5423745db8b434fcde54888b35f518f00cce00e4/compiler/rustc_codegen_ssa/src/back/write.rs#L1204-L1217
//! [jobserver]: https://docs.rs/jobserver
//! [`NeedsToken`]: Message::NeedsToken
//! [`ReleaseToken`]: Message::ReleaseToken
//! [POSIX jobserver]: https://www.gnu.org/software/make/manual/html_node/POSIX-Jobserver.html
//! [`push`]: Queue::push
//! [`push_bounded`]: Queue::push_bounded
mod job;
mod job_state;

use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::fmt::Write as _;
use std::io;
use std::path::{Path, PathBuf};
Expand All @@ -133,7 +124,7 @@ use std::time::Duration;

use anyhow::{format_err, Context as _};
use cargo_util::ProcessBuilder;
use jobserver::{Acquired, Client, HelperThread};
use jobserver::{Acquired, HelperThread};
use log::{debug, trace};
use semver::Version;

Expand Down Expand Up @@ -199,13 +190,6 @@ struct DrainState<'cfg> {
/// single rustc process.
tokens: Vec<Acquired>,

/// rustc per-thread tokens, when in jobserver-per-rustc mode.
rustc_tokens: HashMap<JobId, Vec<Acquired>>,

/// This represents the list of rustc jobs (processes) and associated
/// clients that are interested in receiving a token.
to_send_clients: BTreeMap<JobId, Vec<Client>>,

/// The list of jobs that we have not yet started executing, but have
/// retrieved from the `queue`. We eagerly pull jobs off the main queue to
/// allow us to request jobserver tokens pretty early.
Expand Down Expand Up @@ -387,12 +371,6 @@ enum Message {
Token(io::Result<Acquired>),
Finish(JobId, Artifact, CargoResult<()>),
FutureIncompatReport(JobId, Vec<FutureBreakageItem>),

// This client should get release_raw called on it with one of our tokens
NeedsToken(JobId),

// A token previously passed to a NeedsToken client is being released.
ReleaseToken(JobId),
}

impl<'cfg> JobQueue<'cfg> {
Expand Down Expand Up @@ -507,8 +485,6 @@ impl<'cfg> JobQueue<'cfg> {
next_id: 0,
timings: self.timings,
tokens: Vec::new(),
rustc_tokens: HashMap::new(),
to_send_clients: BTreeMap::new(),
pending_queue: Vec::new(),
print: DiagnosticPrinter::new(cx.bcx.config),
finished: 0,
Expand Down Expand Up @@ -600,46 +576,9 @@ impl<'cfg> DrainState<'cfg> {
self.active.len() < self.tokens.len() + 1
}

// The oldest job (i.e., least job ID) is the one we grant tokens to first.
fn pop_waiting_client(&mut self) -> (JobId, Client) {
// FIXME: replace this with BTreeMap::first_entry when that stabilizes.
let key = *self
.to_send_clients
.keys()
.next()
.expect("at least one waiter");
let clients = self.to_send_clients.get_mut(&key).unwrap();
let client = clients.pop().unwrap();
if clients.is_empty() {
self.to_send_clients.remove(&key);
}
(key, client)
}

// If we managed to acquire some extra tokens, send them off to a waiting rustc.
fn grant_rustc_token_requests(&mut self) -> CargoResult<()> {
while !self.to_send_clients.is_empty() && self.has_extra_tokens() {
let (id, client) = self.pop_waiting_client();
// This unwrap is guaranteed to succeed. `active` must be at least
// length 1, as otherwise there can't be a client waiting to be sent
// on, so tokens.len() must also be at least one.
let token = self.tokens.pop().unwrap();
self.rustc_tokens
.entry(id)
.or_insert_with(Vec::new)
.push(token);
client
.release_raw()
.with_context(|| "failed to release jobserver token")?;
}

Ok(())
}

fn handle_event(
&mut self,
cx: &mut Context<'_, '_>,
jobserver_helper: &HelperThread,
plan: &mut BuildPlan,
event: Message,
) -> Result<(), ErrorToHandle> {
Expand Down Expand Up @@ -699,19 +638,6 @@ impl<'cfg> DrainState<'cfg> {
Artifact::All => {
trace!("end: {:?}", id);
self.finished += 1;
if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) {
// This puts back the tokens that this rustc
// acquired into our primary token list.
//
// This represents a rustc bug: it did not
// release all of its thread tokens but finished
// completely. But we want to make Cargo resilient
// to such rustc bugs, as they're generally not
// fatal in nature (i.e., Cargo can make progress
// still, and the build might not even fail).
self.tokens.extend(rustc_tokens);
}
self.to_send_clients.remove(&id);
self.report_warning_count(
cx.bcx.config,
id,
Expand Down Expand Up @@ -756,31 +682,6 @@ impl<'cfg> DrainState<'cfg> {
let token = acquired_token.with_context(|| "failed to acquire jobserver token")?;
self.tokens.push(token);
}
Message::NeedsToken(id) => {
trace!("queue token request");
jobserver_helper.request_token();
let client = cx.rustc_clients[&self.active[&id]].clone();
self.to_send_clients
.entry(id)
.or_insert_with(Vec::new)
.push(client);
}
Message::ReleaseToken(id) => {
// Note that this pops off potentially a completely
// different token, but all tokens of the same job are
// conceptually the same so that's fine.
//
// self.tokens is a "pool" -- the order doesn't matter -- and
// this transfers ownership of the token into that pool. If we
// end up using it on the next go around, then this token will
// be truncated, same as tokens obtained through Message::Token.
let rustc_tokens = self
.rustc_tokens
.get_mut(&id)
.expect("no tokens associated");
self.tokens
.push(rustc_tokens.pop().expect("rustc releases token it has"));
}
}

Ok(())
Expand All @@ -795,19 +696,6 @@ impl<'cfg> DrainState<'cfg> {
// listen for a message with a timeout, and on timeout we run the
// previous parts of the loop again.
let mut events = self.messages.try_pop_all();
trace!(
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
self.tokens.len(),
self.rustc_tokens
.iter()
.map(|(k, j)| (k, j.len()))
.collect::<Vec<_>>(),
self.to_send_clients
.iter()
.map(|(k, j)| (k, j.len()))
.collect::<Vec<_>>(),
events.len(),
);
if events.is_empty() {
loop {
self.tick_progress();
Expand Down Expand Up @@ -866,17 +754,13 @@ impl<'cfg> DrainState<'cfg> {
break;
}

if let Err(e) = self.grant_rustc_token_requests() {
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
}

// And finally, before we block waiting for the next event, drop any
// excess tokens we may have accidentally acquired. Due to how our
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
for event in self.wait_for_events() {
if let Err(event_err) = self.handle_event(cx, jobserver_helper, plan, event) {
if let Err(event_err) = self.handle_event(cx, plan, event) {
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, event_err);
}
}
Expand Down Expand Up @@ -970,7 +854,6 @@ impl<'cfg> DrainState<'cfg> {
self.active.len(),
self.pending_queue.len(),
self.queue.len(),
self.rustc_tokens.len(),
);
self.timings.record_cpu();

Expand Down
34 changes: 1 addition & 33 deletions src/cargo/core/compiler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -715,14 +715,7 @@ fn prepare_rustc(
base.env("CARGO_TARGET_TMPDIR", tmp.display().to_string());
}

if cx.bcx.config.cli_unstable().jobserver_per_rustc {
let client = cx.new_jobserver()?;
base.inherit_jobserver(&client);
base.arg("-Z").arg("jobserver-token-requests");
assert!(cx.rustc_clients.insert(unit.clone(), client).is_none());
} else {
base.inherit_jobserver(&cx.jobserver);
}
base.inherit_jobserver(&cx.jobserver);
build_base_args(cx, &mut base, unit, crate_types)?;
build_deps_args(&mut base, cx, unit)?;
Ok(base)
Expand Down Expand Up @@ -1701,31 +1694,6 @@ fn on_stderr_line_inner(
return Ok(false);
}

#[derive(serde::Deserialize)]
struct JobserverNotification {
jobserver_event: Event,
}

#[derive(Debug, serde::Deserialize)]
enum Event {
WillAcquire,
Release,
}

if let Ok(JobserverNotification { jobserver_event }) =
serde_json::from_str::<JobserverNotification>(compiler_message.get())
{
trace!(
"found jobserver directive from rustc: `{:?}`",
jobserver_event
);
match jobserver_event {
Event::WillAcquire => state.will_acquire(),
Event::Release => state.release_token(),
}
return Ok(false);
}

// And failing all that above we should have a legitimate JSON diagnostic
// from the compiler, so wrap it in an external Cargo JSON message
// indicating which package it came from and then emit it.
Expand Down
Loading

0 comments on commit 9580786

Please sign in to comment.