Skip to content

Commit

Permalink
Avoids emitting the same output from multiple processes of the same p…
Browse files Browse the repository at this point in the history
…rocess pool (#3531)

It uses a `shared` set of `(line: [u8], occurences: u32)` to avoid
emitting lines that has been emitted by other processes already.

It still supports emitting the same line multiple times from one
process.

The `shared` data might be later used for showing logging from a worker
pool. It contains the merged output of all processes.

Fixes WEB-489
  • Loading branch information
sokra committed Feb 1, 2023
1 parent bc9e7c6 commit 7177ae4
Showing 1 changed file with 70 additions and 4 deletions.
74 changes: 70 additions & 4 deletions crates/turbopack-node/src/pool.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
use std::{
collections::HashMap,
mem::take,
path::{Path, PathBuf},
process::{ExitStatus, Stdio},
sync::{Arc, Mutex},
time::Duration,
};

use anyhow::{bail, Context, Result};
use indexmap::IndexSet;
use serde::{de::DeserializeOwned, Serialize};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::{
stderr, stdout, AsyncBufReadExt, AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt,
BufReader,
},
net::{TcpListener, TcpStream},
process::{Child, Command},
select,
Expand Down Expand Up @@ -55,11 +60,53 @@ impl Drop for RunningNodeJsPoolProcess {

const CONNECT_TIMEOUT: Duration = Duration::from_secs(30);

type SharedOutputSet = Arc<Mutex<IndexSet<(Arc<[u8]>, u32)>>>;

/// Pipes the `stream` from `final_stream`, but uses `shared` to deduplicate
/// lines that has beem emitted by other `handle_output_stream` instances with
/// the same `shared` before.
async fn handle_output_stream(
stream: impl AsyncRead + Unpin,
shared: SharedOutputSet,
mut final_stream: impl AsyncWrite + Unpin,
) {
let mut buffered = BufReader::new(stream);
let mut own_output = HashMap::<Arc<[u8]>, u32>::new();
let mut buffer = Vec::new();
loop {
match buffered.read_until(b'\n', &mut buffer).await {
Ok(0) => {
break;
}
Err(err) => {
eprintln!("error reading from stream: {}", err);
break;
}
Ok(_) => {}
}
let line = Arc::from(take(&mut buffer).into_boxed_slice());
let occurance_number = *own_output
.entry(Arc::clone(&line))
.and_modify(|c| *c += 1)
.or_insert(0);
let new_line = {
let mut shared = shared.lock().unwrap();
shared.insert((line.clone(), occurance_number))
};
if new_line && final_stream.write(&line).await.is_err() {
// Whatever happened with stdout/stderr, we can't write to it anymore.
break;
}
}
}

impl NodeJsPoolProcess {
async fn new(
cwd: &Path,
env: &HashMap<String, String>,
entrypoint: &Path,
shared_stdout: SharedOutputSet,
shared_stderr: SharedOutputSet,
debug: bool,
) -> Result<Self> {
let listener = TcpListener::bind("127.0.0.1:0")
Expand All @@ -85,10 +132,21 @@ impl NodeJsPoolProcess {
.expect("the SystemRoot environment variable should always be set"),
);
cmd.envs(env);
cmd.stderr(Stdio::inherit());
cmd.stdout(Stdio::inherit());
cmd.stderr(Stdio::piped());
cmd.stdout(Stdio::piped());

let child = cmd.spawn().context("spawning node pooled process")?;
let mut child = cmd.spawn().context("spawning node pooled process")?;

tokio::spawn(handle_output_stream(
child.stdout.take().unwrap(),
shared_stdout,
stdout(),
));
tokio::spawn(handle_output_stream(
child.stderr.take().unwrap(),
shared_stderr,
stderr(),
));

Ok(Self::Spawned(SpawnedNodeJsPoolProcess {
listener,
Expand Down Expand Up @@ -184,6 +242,10 @@ pub struct NodeJsPool {
processes: Arc<Mutex<Vec<NodeJsPoolProcess>>>,
#[turbo_tasks(trace_ignore, debug_ignore)]
semaphore: Arc<Semaphore>,
#[turbo_tasks(trace_ignore, debug_ignore)]
shared_stdout: SharedOutputSet,
#[turbo_tasks(trace_ignore, debug_ignore)]
shared_stderr: SharedOutputSet,
debug: bool,
}

Expand All @@ -203,6 +265,8 @@ impl NodeJsPool {
env,
processes: Arc::new(Mutex::new(Vec::new())),
semaphore: Arc::new(Semaphore::new(if debug { 1 } else { concurrency })),
shared_stdout: Arc::new(Mutex::new(IndexSet::new())),
shared_stderr: Arc::new(Mutex::new(IndexSet::new())),
debug,
}
}
Expand All @@ -220,6 +284,8 @@ impl NodeJsPool {
self.cwd.as_path(),
&self.env,
self.entrypoint.as_path(),
self.shared_stdout.clone(),
self.shared_stderr.clone(),
self.debug,
)
.await
Expand Down

0 comments on commit 7177ae4

Please sign in to comment.