Skip to content

Commit

Permalink
Avoids emitting the same output from multiple processes of the same p…
Browse files Browse the repository at this point in the history
…rocess pool (vercel/turborepo#3531)

It uses a `shared` set of `(line: [u8], occurences: u32)` to avoid
emitting lines that has been emitted by other processes already.

It still supports emitting the same line multiple times from one
process.

The `shared` data might be later used for showing logging from a worker
pool. It contains the merged output of all processes.

Fixes WEB-489
  • Loading branch information
sokra committed Feb 1, 2023
1 parent 442aac1 commit 316c73e
Showing 1 changed file with 70 additions and 4 deletions.
74 changes: 70 additions & 4 deletions crates/turbopack-node/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use std::{
collections::HashMap,
mem::take,
path::{Path, PathBuf},
process::{ExitStatus, Stdio},
sync::{Arc, Mutex},
time::Duration,
};

use anyhow::{bail, Context, Result};
use indexmap::IndexSet;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::{
stderr, stdout, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
BufReader,
},
net::{TcpListener, TcpStream},
process::{Child, Command},
select,
Expand Down Expand Up @@ -55,11 +60,53 @@ impl Drop for RunningNodeJsPoolProcess {

const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

type SharedOutputSet = Arc<Mutex<IndexSet<(Arc<[u8]>, u32)>>>;

/// Pipes the `stream` from `final_stream`, but uses `shared` to deduplicate
/// lines that has beem emitted by other `handle_output_stream` instances with
/// the same `shared` before.
async fn handle_output_stream(
stream: impl AsyncRead + Unpin,
shared: SharedOutputSet,
mut final_stream: impl AsyncWrite + Unpin,
) {
let mut buffered = BufReader::new(stream);
let mut own_output = HashMap::<Arc<[u8]>, u32>::new();
let mut buffer = Vec::new();
loop {
match buffered.read_until(b'\n', &mut buffer).await {
Ok(0) => {
break;
}
Err(err) => {
eprintln!("error reading from stream: {}", err);
break;
}
Ok(_) => {}
}
let line = Arc::from(take(&mut buffer).into_boxed_slice());
let occurance_number = *own_output
.entry(Arc::clone(&line))
.and_modify(|c| *c += 1)
.or_insert(0);
let new_line = {
let mut shared = shared.lock().unwrap();
shared.insert((line.clone(), occurance_number))
};
if new_line && final_stream.write(&line).await.is_err() {
// Whatever happened with stdout/stderr, we can't write to it anymore.
break;
}
}
}

impl NodeJsPoolProcess {
async fn new(
cwd: &Path,
env: &HashMap<String, String>,
entrypoint: &Path,
shared_stdout: SharedOutputSet,
shared_stderr: SharedOutputSet,
debug: bool,
) -> Result<Self> {
let listener = TcpListener::bind("127.0.0.1:0")
Expand All @@ -85,10 +132,21 @@ impl NodeJsPoolProcess {
.expect("the SystemRoot environment variable should always be set"),
);
cmd.envs(env);
cmd.stderr(Stdio::inherit());
cmd.stdout(Stdio::inherit());
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());

let child = cmd.spawn().context("spawning node pooled process")?;
let mut child = cmd.spawn().context("spawning node pooled process")?;

tokio::spawn(handle_output_stream(
child.stdout.take().unwrap(),
shared_stdout,
stdout(),
));
tokio::spawn(handle_output_stream(
child.stderr.take().unwrap(),
shared_stderr,
stderr(),
));

Ok(Self::Spawned(SpawnedNodeJsPoolProcess {
listener,
Expand Down Expand Up @@ -184,6 +242,10 @@ pub struct NodeJsPool {
processes: Arc<Mutex<Vec<NodeJsPoolProcess>>>,
#[turbo_tasks(trace_ignore, debug_ignore)]
semaphore: Arc<Semaphore>,
#[turbo_tasks(trace_ignore, debug_ignore)]
shared_stdout: SharedOutputSet,
#[turbo_tasks(trace_ignore, debug_ignore)]
shared_stderr: SharedOutputSet,
debug: bool,
}

Expand All @@ -203,6 +265,8 @@ impl NodeJsPool {
env,
processes: Arc::new(Mutex::new(Vec::new())),
semaphore: Arc::new(Semaphore::new(if debug { 1 } else { concurrency })),
shared_stdout: Arc::new(Mutex::new(IndexSet::new())),
shared_stderr: Arc::new(Mutex::new(IndexSet::new())),
debug,
}
}
Expand All @@ -220,6 +284,8 @@ impl NodeJsPool {
self.cwd.as_path(),
&self.env,
self.entrypoint.as_path(),
self.shared_stdout.clone(),
self.shared_stderr.clone(),
self.debug,
)
.await
Expand Down

0 comments on commit 316c73e

Please sign in to comment.