Skip to content

Commit 4f9866e

Browse files
authored
Merge pull request #878 from osiewicz/no_drops_on_implicit_job_token
feat: Use channels to maintain job tokens & reuse the implicit token without dropping it first
2 parents 2447a2b + 65ab371 commit 4f9866e

File tree

2 files changed

+164
-69
lines changed

2 files changed

+164
-69
lines changed

src/job_token.rs

+139
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
use jobserver::{Acquired, Client, HelperThread};
2+
use std::{
3+
env,
4+
mem::MaybeUninit,
5+
sync::{
6+
mpsc::{self, Receiver, Sender},
7+
Once,
8+
},
9+
};
10+
11+
pub(crate) struct JobToken {
12+
/// The token can either be a fresh token obtained from the jobserver or - if `token` is None - an implicit token for this process.
13+
/// Both are valid values to put into queue.
14+
token: Option<Acquired>,
15+
/// A pool to which `token` should be returned. `pool` is optional, as one might want to release a token straight away instead
16+
/// of storing it back in the pool - see [`Self::forget()`] function for that.
17+
pool: Option<Sender<Option<Result<Acquired, crate::Error>>>>,
18+
}
19+
20+
impl Drop for JobToken {
21+
fn drop(&mut self) {
22+
if let Some(pool) = &self.pool {
23+
// Always send back an Ok() variant as we know that the acquisition for this token has succeeded.
24+
let _ = pool.send(self.token.take().map(|token| Ok(token)));
25+
}
26+
}
27+
}
28+
29+
impl JobToken {
30+
/// Ensure that this token is not put back into queue once it's dropped.
31+
/// This also leads to releasing it sooner for other processes to use,
32+
/// which is a correct thing to do once it is known that there won't be
33+
/// any more token acquisitions.
34+
pub(crate) fn forget(&mut self) {
35+
self.pool.take();
36+
}
37+
}
38+
39+
/// A thin wrapper around jobserver's Client.
40+
/// It would be perfectly fine to just use jobserver's Client, but we also want to reuse
41+
/// our own implicit token assigned for this build script. This struct manages that and
42+
/// gives out tokens without exposing whether they're implicit tokens or tokens from jobserver.
43+
/// Furthermore, instead of giving up job tokens, it keeps them around
44+
/// for reuse if we know we're going to request another token after freeing the current one.
45+
pub(crate) struct JobTokenServer {
46+
helper: HelperThread,
47+
tx: Sender<Option<Result<Acquired, crate::Error>>>,
48+
rx: Receiver<Option<Result<Acquired, crate::Error>>>,
49+
}
50+
51+
impl JobTokenServer {
52+
pub(crate) fn new() -> &'static Self {
53+
jobserver()
54+
}
55+
fn new_inner(client: Client) -> Result<Self, crate::Error> {
56+
let (tx, rx) = mpsc::channel();
57+
// Push the implicit token. Since JobTokens only give back what they got,
58+
// there should be at most one global implicit token in the wild.
59+
tx.send(None).unwrap();
60+
let pool = tx.clone();
61+
let helper = client.into_helper_thread(move |acq| {
62+
let _ = pool.send(Some(acq.map_err(|e| e.into())));
63+
})?;
64+
Ok(Self { helper, tx, rx })
65+
}
66+
67+
pub(crate) fn acquire(&self) -> Result<JobToken, crate::Error> {
68+
let token = if let Ok(token) = self.rx.try_recv() {
69+
// Opportunistically check if there's a token that can be reused.
70+
token
71+
} else {
72+
// Cold path, request a token and block
73+
self.helper.request_token();
74+
self.rx.recv().unwrap()
75+
};
76+
let token = if let Some(token) = token {
77+
Some(token?)
78+
} else {
79+
None
80+
};
81+
Ok(JobToken {
82+
token,
83+
pool: Some(self.tx.clone()),
84+
})
85+
}
86+
}
87+
88+
/// Returns a suitable `JobTokenServer` used to coordinate
89+
/// parallelism between build scripts. A global `JobTokenServer` is used as this ensures
90+
/// that only one implicit job token is used in the wild.
91+
/// Having multiple separate job token servers would lead to each of them assuming that they have control
92+
/// over the implicit job token.
93+
/// As it stands, each caller of `jobserver` can receive an implicit job token and there will be at most
94+
/// one implicit job token in the wild.
95+
fn jobserver() -> &'static JobTokenServer {
96+
static INIT: Once = Once::new();
97+
static mut JOBSERVER: MaybeUninit<JobTokenServer> = MaybeUninit::uninit();
98+
99+
fn _assert_sync<T: Sync>() {}
100+
_assert_sync::<jobserver::Client>();
101+
102+
unsafe {
103+
INIT.call_once(|| {
104+
let server = default_jobserver();
105+
JOBSERVER = MaybeUninit::new(
106+
JobTokenServer::new_inner(server).expect("Job server initialization failed"),
107+
);
108+
});
109+
// Poor man's assume_init_ref, as that'd require an MSRV of 1.55.
110+
&*JOBSERVER.as_ptr()
111+
}
112+
}
113+
114+
unsafe fn default_jobserver() -> jobserver::Client {
115+
// Try to use the environmental jobserver which Cargo typically
116+
// initializes for us...
117+
if let Some(client) = jobserver::Client::from_env() {
118+
return client;
119+
}
120+
121+
// ... but if that fails for whatever reason select something
122+
// reasonable and create a new jobserver. Use `NUM_JOBS` if set (it's
123+
// configured by Cargo) and otherwise just fall back to a
124+
// semi-reasonable number. Note that we could use `num_cpus` here
125+
// but it's an extra dependency that will almost never be used, so
126+
// it's generally not too worth it.
127+
let mut parallelism = 4;
128+
if let Ok(amt) = env::var("NUM_JOBS") {
129+
if let Ok(amt) = amt.parse() {
130+
parallelism = amt;
131+
}
132+
}
133+
134+
// If we create our own jobserver then be sure to reserve one token
135+
// for ourselves.
136+
let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
137+
client.acquire_raw().expect("failed to acquire initial");
138+
return client;
139+
}

src/lib.rs

+25-69
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,9 @@ use std::process::{Child, Command, Stdio};
6666
use std::sync::{Arc, Mutex};
6767
use std::thread::{self, JoinHandle};
6868

69+
#[cfg(feature = "parallel")]
70+
mod job_token;
6971
mod os_pipe;
70-
7172
// These modules are all glue to support reading the MSVC version from
7273
// the registry and from COM interfaces
7374
#[cfg(windows)]
@@ -1294,7 +1295,7 @@ impl Build {
12941295

12951296
#[cfg(feature = "parallel")]
12961297
fn compile_objects(&self, objs: &[Object], print: &PrintThread) -> Result<(), Error> {
1297-
use std::sync::{mpsc, Once};
1298+
use std::sync::mpsc;
12981299

12991300
if objs.len() <= 1 {
13001301
for obj in objs {
@@ -1305,14 +1306,8 @@ impl Build {
13051306
return Ok(());
13061307
}
13071308

1308-
// Limit our parallelism globally with a jobserver. Start off by
1309-
// releasing our own token for this process so we can have a bit of an
1310-
// easier to write loop below. If this fails, though, then we're likely
1311-
// on Windows with the main implicit token, so we just have a bit extra
1312-
// parallelism for a bit and don't reacquire later.
1313-
let server = jobserver();
1314-
// Reacquire our process's token on drop
1315-
let _reacquire = server.release_raw().ok().map(|_| JobserverToken(server));
1309+
// Limit our parallelism globally with a jobserver.
1310+
let tokens = crate::job_token::JobTokenServer::new();
13161311

13171312
// When compiling objects in parallel we do a few dirty tricks to speed
13181313
// things up:
@@ -1333,7 +1328,7 @@ impl Build {
13331328
// acquire the appropriate tokens, Once all objects have been compiled
13341329
// we wait on all the processes and propagate the results of compilation.
13351330

1336-
let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, _)>();
1331+
let (tx, rx) = mpsc::channel::<(_, String, KillOnDrop, crate::job_token::JobToken)>();
13371332

13381333
// Since jobserver::Client::acquire can block, waiting
13391334
// must be done in parallel so that acquire won't block forever.
@@ -1346,7 +1341,13 @@ impl Build {
13461341

13471342
loop {
13481343
let mut has_made_progress = false;
1349-
1344+
// If the other end of the pipe is already disconnected, then we're not gonna get any new jobs,
1345+
// so it doesn't make sense to reuse the tokens; in fact,
1346+
// releasing them as soon as possible (once we know that the other end is disconnected) is beneficial.
1347+
// Imagine that the last file built takes an hour to finish; in this scenario,
1348+
// by not releasing the tokens before that last file is done we would effectively block other processes from
1349+
// starting sooner - even though we only need one token for that last file, not N others that were acquired.
1350+
let mut is_disconnected = false;
13501351
// Reading new pending tasks
13511352
loop {
13521353
match rx.try_recv() {
@@ -1362,23 +1363,32 @@ impl Build {
13621363
Ok(())
13631364
};
13641365
}
1366+
Err(mpsc::TryRecvError::Disconnected) => {
1367+
is_disconnected = true;
1368+
break;
1369+
}
13651370
_ => break,
13661371
}
13671372
}
13681373

13691374
// Try waiting on them.
1370-
pendings.retain_mut(|(cmd, program, child, _)| {
1375+
pendings.retain_mut(|(cmd, program, child, token)| {
13711376
match try_wait_on_child(cmd, program, &mut child.0, &mut stdout) {
13721377
Ok(Some(())) => {
13731378
// Task done, remove the entry
1379+
if is_disconnected {
1380+
token.forget();
1381+
}
13741382
has_made_progress = true;
13751383
false
13761384
}
13771385
Ok(None) => true, // Task still not finished, keep the entry
13781386
Err(err) => {
13791387
// Task fail, remove the entry.
13801388
has_made_progress = true;
1381-
1389+
if is_disconnected {
1390+
token.forget();
1391+
}
13821392
// Since we can only return one error, log the error to make
13831393
// sure users always see all the compilation failures.
13841394
let _ = writeln!(stdout, "cargo:warning={}", err);
@@ -1416,11 +1426,9 @@ impl Build {
14161426
};
14171427
}
14181428
})?;
1419-
14201429
for obj in objs {
14211430
let (mut cmd, program) = self.create_compile_object_cmd(obj)?;
1422-
let token = server.acquire()?;
1423-
1431+
let token = tokens.acquire()?;
14241432
let child = spawn(&mut cmd, &program, print.pipe_writer_cloned()?.unwrap())?;
14251433

14261434
tx.send((cmd, program, KillOnDrop(child), token))
@@ -1431,51 +1439,6 @@ impl Build {
14311439

14321440
return wait_thread.join().expect("wait_thread panics");
14331441

1434-
/// Returns a suitable `jobserver::Client` used to coordinate
1435-
/// parallelism between build scripts.
1436-
fn jobserver() -> &'static jobserver::Client {
1437-
static INIT: Once = Once::new();
1438-
static mut JOBSERVER: Option<jobserver::Client> = None;
1439-
1440-
fn _assert_sync<T: Sync>() {}
1441-
_assert_sync::<jobserver::Client>();
1442-
1443-
unsafe {
1444-
INIT.call_once(|| {
1445-
let server = default_jobserver();
1446-
JOBSERVER = Some(server);
1447-
});
1448-
JOBSERVER.as_ref().unwrap()
1449-
}
1450-
}
1451-
1452-
unsafe fn default_jobserver() -> jobserver::Client {
1453-
// Try to use the environmental jobserver which Cargo typically
1454-
// initializes for us...
1455-
if let Some(client) = jobserver::Client::from_env() {
1456-
return client;
1457-
}
1458-
1459-
// ... but if that fails for whatever reason select something
1460-
// reasonable and crate a new jobserver. Use `NUM_JOBS` if set (it's
1461-
// configured by Cargo) and otherwise just fall back to a
1462-
// semi-reasonable number. Note that we could use `num_cpus` here
1463-
// but it's an extra dependency that will almost never be used, so
1464-
// it's generally not too worth it.
1465-
let mut parallelism = 4;
1466-
if let Ok(amt) = env::var("NUM_JOBS") {
1467-
if let Ok(amt) = amt.parse() {
1468-
parallelism = amt;
1469-
}
1470-
}
1471-
1472-
// If we create our own jobserver then be sure to reserve one token
1473-
// for ourselves.
1474-
let client = jobserver::Client::new(parallelism).expect("failed to create jobserver");
1475-
client.acquire_raw().expect("failed to acquire initial");
1476-
return client;
1477-
}
1478-
14791442
struct KillOnDrop(Child);
14801443

14811444
impl Drop for KillOnDrop {
@@ -1485,13 +1448,6 @@ impl Build {
14851448
child.kill().ok();
14861449
}
14871450
}
1488-
1489-
struct JobserverToken(&'static jobserver::Client);
1490-
impl Drop for JobserverToken {
1491-
fn drop(&mut self) {
1492-
let _ = self.0.acquire_raw();
1493-
}
1494-
}
14951451
}
14961452

14971453
#[cfg(not(feature = "parallel"))]

0 commit comments

Comments
 (0)