diff --git a/src/unix.rs b/src/unix.rs
index d69ae88..5011caa 100644
--- a/src/unix.rs
+++ b/src/unix.rs
@@ -23,6 +23,7 @@ pub struct Acquired {
 impl Client {
     pub fn new(limit: usize) -> io::Result<Client> {
         let client = unsafe { Client::mk()? };
+        client.configure_capacity(limit)?;
         // I don't think the character written here matters, but I could be
         // wrong!
         for _ in 0..limit {
@@ -63,6 +64,70 @@ impl Client {
         Ok(Client::from_fds(pipes[0], pipes[1]))
     }
 
+    fn configure_capacity(&self, required_capacity: usize) -> io::Result<()> {
+        // On Linux we may need to increase the capacity of the pipe for the
+        // jobserver to work correctly. Linux seems to exhibit behavior where
+        // it implements a ring buffer internally, but the ring-ness of the
+        // ring buffer appears to be tied to *pages* of the ring buffer
+        // rather than actual bytes of the ring buffer. This means that if
+        // the pipe has only one page of capacity we can hit a deadlock
+        // situation where a bunch of threads are writing to the pipe but
+        // they're all blocked, despite the currently used capacity of the
+        // pipe being less than a page.
+        //
+        // This was first discovered in rust-lang/cargo#9739, where a system
+        // with a large amount of concurrency would hang in `cargo build` when
+        // the jobserver pipe only had one page of capacity. This was reduced
+        // to a reproduction program [1] which indeed showed that the system
+        // would deadlock if the capacity of the pipe was just one page.
+        //
+        // To fix this issue, on Linux only, we may increase the capacity of
+        // the pipe. The main thing here is that if the capacity of the pipe
+        // is a single page we increase it to two pages, since otherwise a
+        // deadlock might happen. While we're at it, this also factors the
+        // `required_capacity` requested by the client into the calculation.
+        // If for some reason you want 10_000 units of concurrency in the
+        // pipe, that needs more than 2 pages (typically 8192 bytes), so we
+        // round up to 3 pages in that case.
+        //
+        // Someone with more understanding of Linux pipes and how they buffer
+        // internally should probably review this at some point. The exact
+        // cause of the deadlock is a little uncertain, and it's not clear
+        // why the example program [1] deadlocks or why simply adding another
+        // page fixes things. Is this a kernel bug? Do we need to always
+        // guarantee at least one free page? I'm not sure! Hopefully for now
+        // this is enough to fix the problem until machines start having
+        // more than 4k cores, which seems like it might be a while.
+        //
+        // [1]: https://github.com/rust-lang/cargo/issues/9739#issuecomment-889183009
+        #[cfg(target_os = "linux")]
+        unsafe {
+            let page_size = libc::sysconf(libc::_SC_PAGESIZE);
+            let actual_capacity = cvt(libc::fcntl(self.write.as_raw_fd(), libc::F_GETPIPE_SZ))?;
+
+            if let Some(c) = calculate_capacity(
+                required_capacity,
+                actual_capacity as usize,
+                page_size as usize,
+            ) {
+                cvt(libc::fcntl(self.write.as_raw_fd(), libc::F_SETPIPE_SZ, c)).map_err(|e| {
+                    io::Error::new(
+                        e.kind(),
+                        format!(
+                            "failed to increase jobserver pipe capacity from {} to {}; \
+                             jobserver otherwise might deadlock",
+                            actual_capacity, c,
+                        ),
+                    )
+                })?;
+            }
+        }
+
+        Ok(())
+    }
+
     pub unsafe fn open(s: &str) -> Option<Client> {
         let mut parts = s.splitn(2, ',');
         let read = parts.next().unwrap();
@@ -337,3 +402,44 @@ extern "C" fn sigusr1_handler(
 ) {
     // nothing to do
 }
+
+#[allow(dead_code)]
+fn calculate_capacity(
+    required_capacity: usize,
+    actual_capacity: usize,
+    page_size: usize,
+) -> Option<usize> {
+    if actual_capacity < required_capacity {
+        let mut rounded_capacity = round_up_to(required_capacity, page_size);
+        if rounded_capacity < page_size * 2 {
+            rounded_capacity += page_size;
+        }
+        return Some(rounded_capacity);
+    }
+
+    if actual_capacity <= page_size {
+        return Some(page_size * 2);
+    }
+
+    return None;
+
+    fn round_up_to(a: usize, b: usize) -> usize {
+        assert!(b.is_power_of_two());
+        (a + (b - 1)) & !(b - 1)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::calculate_capacity;
+
+    #[test]
+    fn test_calculate_capacity() {
+        assert_eq!(calculate_capacity(1, 65536, 4096), None);
+        assert_eq!(calculate_capacity(500, 65536, 4096), None);
+        assert_eq!(calculate_capacity(5000, 4096, 4096), Some(8192));
+        assert_eq!(calculate_capacity(1, 4096, 4096), Some(8192));
+        assert_eq!(calculate_capacity(4096, 4096, 4096), Some(8192));
+        assert_eq!(calculate_capacity(8192, 4096, 4096), Some(8192));
+    }
+}
diff --git a/tests/server.rs b/tests/server.rs
index fcdd12c..0194e91 100644
--- a/tests/server.rs
+++ b/tests/server.rs
@@ -156,3 +156,25 @@ fn zero_client() {
         assert!(rx.try_recv().is_err());
     }
 }
+
+#[test]
+fn highly_concurrent() {
+    const N: usize = 10000;
+
+    let client = t!(Client::new(80));
+
+    let threads = (0..80)
+        .map(|_| {
+            let client = client.clone();
+            std::thread::spawn(move || {
+                for _ in 0..N {
+                    drop(client.acquire().unwrap());
+                }
+            })
+        })
+        .collect::<Vec<_>>();
+
+    for t in threads {
+        t.join().unwrap();
+    }
+}
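
For anyone who wants to poke at the underlying mechanism outside the patch, here is a minimal standalone sketch (not part of the diff; Linux only, and it assumes the `libc` crate as a dependency) of the two fcntl calls `configure_capacity` builds on: `F_GETPIPE_SZ` reports a pipe's current capacity in bytes, and `F_SETPIPE_SZ` grows it, with the kernel rounding the request up to a whole number of pages. Error handling is pared down to asserts for brevity.

```rust
// Standalone sketch, not part of the patch: inspect and grow a pipe's
// capacity the same way `configure_capacity` does. Linux only; assumes
// the `libc` crate.
fn main() {
    unsafe {
        // Create a plain anonymous pipe: fds[0] is the read end, fds[1]
        // the write end.
        let mut fds = [0 as libc::c_int; 2];
        assert_eq!(libc::pipe(fds.as_mut_ptr()), 0);

        let page_size = libc::sysconf(libc::_SC_PAGESIZE) as libc::c_int;

        // F_GETPIPE_SZ returns the pipe's current capacity in bytes
        // (commonly 16 pages, but it can be a single page, e.g. under the
        // per-user soft limit on pipe buffer memory).
        let capacity = libc::fcntl(fds[1], libc::F_GETPIPE_SZ);
        assert!(capacity > 0);
        println!("pipe capacity: {} bytes", capacity);

        // Mirror the patch's single-page rule: a one-page pipe gets a
        // second page. F_SETPIPE_SZ returns the capacity actually set,
        // which the kernel rounds up to a whole number of pages.
        if capacity <= page_size {
            let grown = libc::fcntl(fds[1], libc::F_SETPIPE_SZ, page_size * 2);
            assert!(grown >= page_size * 2);
            println!("grew pipe capacity to: {} bytes", grown);
        }

        libc::close(fds[0]);
        libc::close(fds[1]);
    }
}
```

The patched `configure_capacity` does the same thing, except it routes errors through `cvt` into `io::Result` and delegates the sizing decision to `calculate_capacity`, which also accounts for the client's `required_capacity`.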