Remove -Zjobserver-per-rustc
`-Zjobserver-per-rustc` has been broken for a long while. My guess is that we
never landed `-Zjobserver-token-request` in rustc (rust-lang/rust#67398).
To reduce the complexity of the current job queue implementation, I propose
removing this portion. Cargo never actually receives any `jobserver_event`[^1]
from official rustc.

We can always bring this back if needed.

[^1]: https://github.com/rust-lang/cargo/blob/65cab34dc75587eeaff68d3c19358c4d79041452/src/cargo/core/compiler/mod.rs#L1704-L1713
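
For reference, here is a minimal standalone sketch (not part of this change) of the notification shape that the removed handler in `on_stderr_line_inner` deserialized. It assumes only `serde` (with the `derive` feature) and `serde_json` as dependencies; since official rustc never emits such a line, the `NeedsToken`/`ReleaseToken` paths in the job queue never fired in practice.

```rust
use serde::Deserialize;

// Mirrors the types deleted from on_stderr_line_inner below.
#[derive(Deserialize)]
struct JobserverNotification {
    jobserver_event: Event,
}

#[derive(Debug, Deserialize)]
enum Event {
    WillAcquire,
    Release,
}

fn main() {
    // A rustc that implemented `-Z jobserver-token-requests` would have had to
    // print a JSON line shaped like this on stderr for Cargo to react.
    let line = r#"{"jobserver_event":"WillAcquire"}"#;
    let notification: JobserverNotification =
        serde_json::from_str(line).expect("parse jobserver notification");
    match notification.jobserver_event {
        Event::WillAcquire => println!("rustc is about to block on a token"),
        Event::Release => println!("rustc released a token"),
    }
}
```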
weihanglo committed Feb 24, 2023
1 parent 65cab34 commit 7cb13bf
Showing 6 changed files with 6 additions and 209 deletions.
24 changes: 0 additions & 24 deletions src/cargo/core/compiler/context/mod.rs
@@ -71,11 +71,6 @@ pub struct Context<'a, 'cfg> {
/// metadata files in addition to the rlib itself.
rmeta_required: HashSet<Unit>,

/// When we're in jobserver-per-rustc process mode, this keeps those
/// jobserver clients for each Unit (which eventually becomes a rustc
/// process).
pub rustc_clients: HashMap<Unit, Client>,

/// Map of the LTO-status of each unit. This indicates what sort of
/// compilation is happening (only object, only bitcode, both, etc), and is
/// precalculated early on.
@@ -124,7 +119,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
primary_packages: HashSet::new(),
files: None,
rmeta_required: HashSet::new(),
rustc_clients: HashMap::new(),
lto: HashMap::new(),
metadata_for_doc_units: HashMap::new(),
failed_scrape_units: Arc::new(Mutex::new(HashSet::new())),
@@ -614,24 +608,6 @@ impl<'a, 'cfg> Context<'a, 'cfg> {
self.rmeta_required.contains(unit)
}

/// Used by `-Zjobserver-per-rustc`.
pub fn new_jobserver(&mut self) -> CargoResult<Client> {
let tokens = self.bcx.jobs() as usize;
let client = Client::new(tokens).with_context(|| "failed to create jobserver")?;

// Drain the client fully
for i in 0..tokens {
client.acquire_raw().with_context(|| {
format!(
"failed to fully drain {}/{} token from jobserver at startup",
i, tokens,
)
})?;
}

Ok(client)
}

/// Finds metadata for Doc/Docscrape units.
///
/// rustdoc needs a -Cmetadata flag in order to recognize StableCrateIds that refer to
16 changes: 0 additions & 16 deletions src/cargo/core/compiler/job_queue/job_state.rs
@@ -194,20 +194,4 @@ impl<'a, 'cfg> JobState<'a, 'cfg> {
self.messages
.push(Message::FutureIncompatReport(self.id, report));
}

/// The rustc underlying this Job is about to acquire a jobserver token (i.e., block)
/// on the passed client.
///
/// This should arrange for the associated client to eventually get a token via
/// `client.release_raw()`.
pub fn will_acquire(&self) {
self.messages.push(Message::NeedsToken(self.id));
}

/// The rustc underlying this Job is informing us that it is done with a jobserver token.
///
/// Note that it does *not* write that token back anywhere.
pub fn release_token(&self) {
self.messages.push(Message::ReleaseToken(self.id));
}
}
114 changes: 3 additions & 111 deletions src/cargo/core/compiler/job_queue/mod.rs
@@ -123,7 +123,7 @@ mod job;
mod job_state;

use std::cell::RefCell;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::collections::{HashMap, HashSet};
use std::fmt::Write as _;
use std::io;
use std::path::{Path, PathBuf};
@@ -133,7 +133,7 @@ use std::time::Duration;

use anyhow::{format_err, Context as _};
use cargo_util::ProcessBuilder;
use jobserver::{Acquired, Client, HelperThread};
use jobserver::{Acquired, HelperThread};
use log::{debug, trace};
use semver::Version;

@@ -199,13 +192,6 @@ struct DrainState<'cfg> {
/// single rustc process.
tokens: Vec<Acquired>,

/// rustc per-thread tokens, when in jobserver-per-rustc mode.
rustc_tokens: HashMap<JobId, Vec<Acquired>>,

/// This represents the list of rustc jobs (processes) and associated
/// clients that are interested in receiving a token.
to_send_clients: BTreeMap<JobId, Vec<Client>>,

/// The list of jobs that we have not yet started executing, but have
/// retrieved from the `queue`. We eagerly pull jobs off the main queue to
/// allow us to request jobserver tokens pretty early.
@@ -387,12 +380,6 @@ enum Message {
Token(io::Result<Acquired>),
Finish(JobId, Artifact, CargoResult<()>),
FutureIncompatReport(JobId, Vec<FutureBreakageItem>),

// This client should get release_raw called on it with one of our tokens
NeedsToken(JobId),

// A token previously passed to a NeedsToken client is being released.
ReleaseToken(JobId),
}

impl<'cfg> JobQueue<'cfg> {
@@ -507,8 +494,6 @@ impl<'cfg> JobQueue<'cfg> {
next_id: 0,
timings: self.timings,
tokens: Vec::new(),
rustc_tokens: HashMap::new(),
to_send_clients: BTreeMap::new(),
pending_queue: Vec::new(),
print: DiagnosticPrinter::new(cx.bcx.config),
finished: 0,
@@ -600,46 +585,9 @@ impl<'cfg> DrainState<'cfg> {
self.active.len() < self.tokens.len() + 1
}

// The oldest job (i.e., least job ID) is the one we grant tokens to first.
fn pop_waiting_client(&mut self) -> (JobId, Client) {
// FIXME: replace this with BTreeMap::first_entry when that stabilizes.
let key = *self
.to_send_clients
.keys()
.next()
.expect("at least one waiter");
let clients = self.to_send_clients.get_mut(&key).unwrap();
let client = clients.pop().unwrap();
if clients.is_empty() {
self.to_send_clients.remove(&key);
}
(key, client)
}

// If we managed to acquire some extra tokens, send them off to a waiting rustc.
fn grant_rustc_token_requests(&mut self) -> CargoResult<()> {
while !self.to_send_clients.is_empty() && self.has_extra_tokens() {
let (id, client) = self.pop_waiting_client();
// This unwrap is guaranteed to succeed. `active` must be at least
// length 1, as otherwise there can't be a client waiting to be sent
// on, so tokens.len() must also be at least one.
let token = self.tokens.pop().unwrap();
self.rustc_tokens
.entry(id)
.or_insert_with(Vec::new)
.push(token);
client
.release_raw()
.with_context(|| "failed to release jobserver token")?;
}

Ok(())
}

fn handle_event(
&mut self,
cx: &mut Context<'_, '_>,
jobserver_helper: &HelperThread,
plan: &mut BuildPlan,
event: Message,
) -> Result<(), ErrorToHandle> {
@@ -699,19 +647,6 @@ impl<'cfg> DrainState<'cfg> {
Artifact::All => {
trace!("end: {:?}", id);
self.finished += 1;
if let Some(rustc_tokens) = self.rustc_tokens.remove(&id) {
// This puts back the tokens that this rustc
// acquired into our primary token list.
//
// This represents a rustc bug: it did not
// release all of its thread tokens but finished
// completely. But we want to make Cargo resilient
// to such rustc bugs, as they're generally not
// fatal in nature (i.e., Cargo can make progress
// still, and the build might not even fail).
self.tokens.extend(rustc_tokens);
}
self.to_send_clients.remove(&id);
self.report_warning_count(
cx.bcx.config,
id,
@@ -756,31 +691,6 @@ impl<'cfg> DrainState<'cfg> {
let token = acquired_token.with_context(|| "failed to acquire jobserver token")?;
self.tokens.push(token);
}
Message::NeedsToken(id) => {
trace!("queue token request");
jobserver_helper.request_token();
let client = cx.rustc_clients[&self.active[&id]].clone();
self.to_send_clients
.entry(id)
.or_insert_with(Vec::new)
.push(client);
}
Message::ReleaseToken(id) => {
// Note that this pops off potentially a completely
// different token, but all tokens of the same job are
// conceptually the same so that's fine.
//
// self.tokens is a "pool" -- the order doesn't matter -- and
// this transfers ownership of the token into that pool. If we
// end up using it on the next go around, then this token will
// be truncated, same as tokens obtained through Message::Token.
let rustc_tokens = self
.rustc_tokens
.get_mut(&id)
.expect("no tokens associated");
self.tokens
.push(rustc_tokens.pop().expect("rustc releases token it has"));
}
}

Ok(())
@@ -795,19 +705,6 @@ impl<'cfg> DrainState<'cfg> {
// listen for a message with a timeout, and on timeout we run the
// previous parts of the loop again.
let mut events = self.messages.try_pop_all();
trace!(
"tokens in use: {}, rustc_tokens: {:?}, waiting_rustcs: {:?} (events this tick: {})",
self.tokens.len(),
self.rustc_tokens
.iter()
.map(|(k, j)| (k, j.len()))
.collect::<Vec<_>>(),
self.to_send_clients
.iter()
.map(|(k, j)| (k, j.len()))
.collect::<Vec<_>>(),
events.len(),
);
if events.is_empty() {
loop {
self.tick_progress();
@@ -866,17 +763,13 @@ impl<'cfg> DrainState<'cfg> {
break;
}

if let Err(e) = self.grant_rustc_token_requests() {
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, e);
}

// And finally, before we block waiting for the next event, drop any
// excess tokens we may have accidentally acquired. Due to how our
// jobserver interface is architected we may acquire a token that we
// don't actually use, and if this happens just relinquish it back
// to the jobserver itself.
for event in self.wait_for_events() {
if let Err(event_err) = self.handle_event(cx, jobserver_helper, plan, event) {
if let Err(event_err) = self.handle_event(cx, plan, event) {
self.handle_error(&mut cx.bcx.config.shell(), &mut errors, event_err);
}
}
@@ -970,7 +863,6 @@ impl<'cfg> DrainState<'cfg> {
self.active.len(),
self.pending_queue.len(),
self.queue.len(),
self.rustc_tokens.len(),
);
self.timings.record_cpu();

34 changes: 1 addition & 33 deletions src/cargo/core/compiler/mod.rs
@@ -715,14 +715,7 @@ fn prepare_rustc(
base.env("CARGO_TARGET_TMPDIR", tmp.display().to_string());
}

if cx.bcx.config.cli_unstable().jobserver_per_rustc {
let client = cx.new_jobserver()?;
base.inherit_jobserver(&client);
base.arg("-Z").arg("jobserver-token-requests");
assert!(cx.rustc_clients.insert(unit.clone(), client).is_none());
} else {
base.inherit_jobserver(&cx.jobserver);
}
base.inherit_jobserver(&cx.jobserver);
build_base_args(cx, &mut base, unit, crate_types)?;
build_deps_args(&mut base, cx, unit)?;
Ok(base)
@@ -1701,31 +1694,6 @@ fn on_stderr_line_inner(
return Ok(false);
}

#[derive(serde::Deserialize)]
struct JobserverNotification {
jobserver_event: Event,
}

#[derive(Debug, serde::Deserialize)]
enum Event {
WillAcquire,
Release,
}

if let Ok(JobserverNotification { jobserver_event }) =
serde_json::from_str::<JobserverNotification>(compiler_message.get())
{
trace!(
"found jobserver directive from rustc: `{:?}`",
jobserver_event
);
match jobserver_event {
Event::WillAcquire => state.will_acquire(),
Event::Release => state.release_token(),
}
return Ok(false);
}

// And failing all that above we should have a legitimate JSON diagnostic
// from the compiler, so wrap it in an external Cargo JSON message
// indicating which package it came from and then emit it.
25 changes: 2 additions & 23 deletions src/cargo/core/compiler/timings.rs
@@ -92,10 +92,6 @@ struct Concurrency {
/// Number of units that are not yet ready, because they are waiting for
/// dependencies to finish.
inactive: usize,
/// Number of rustc "extra" threads -- i.e., how many tokens have been
/// provided across all current rustc instances that are not the main thread
/// tokens.
rustc_parallelism: usize,
}

impl<'cfg> Timings<'cfg> {
@@ -240,13 +236,7 @@ impl<'cfg> Timings<'cfg> {
}

/// This is called periodically to mark the concurrency of internal structures.
pub fn mark_concurrency(
&mut self,
active: usize,
waiting: usize,
inactive: usize,
rustc_parallelism: usize,
) {
pub fn mark_concurrency(&mut self, active: usize, waiting: usize, inactive: usize) {
if !self.enabled {
return;
}
Expand All @@ -255,7 +245,6 @@ impl<'cfg> Timings<'cfg> {
active,
waiting,
inactive,
rustc_parallelism,
};
self.concurrency.push(c);
}
@@ -307,7 +296,7 @@ impl<'cfg> Timings<'cfg> {
if !self.enabled {
return Ok(());
}
self.mark_concurrency(0, 0, 0, 0);
self.mark_concurrency(0, 0, 0);
self.unit_times
.sort_unstable_by(|a, b| a.start.partial_cmp(&b.start).unwrap());
if self.report_html {
@@ -391,12 +380,6 @@ impl<'cfg> Timings<'cfg> {
let num_cpus = available_parallelism()
.map(|x| x.get().to_string())
.unwrap_or_else(|_| "n/a".into());
let max_rustc_concurrency = self
.concurrency
.iter()
.map(|c| c.rustc_parallelism)
.max()
.unwrap();
let rustc_info = render_rustc_info(bcx);
let error_msg = match error {
Some(e) => format!(
@@ -440,9 +423,6 @@ impl<'cfg> Timings<'cfg> {
<tr>
<td>rustc:</td><td>{}</td>
</tr>
<tr>
<td>Max (global) rustc threads concurrency:</td><td>{}</td>
</tr>
{}
</table>
"#,
@@ -457,7 +437,6 @@ impl<'cfg> Timings<'cfg> {
self.start_str,
total_time,
rustc_info,
max_rustc_concurrency,
error_msg,
)?;
Ok(())