diff --git a/packages/jest-worker/src/workers/ChildProcessWorker.ts b/packages/jest-worker/src/workers/ChildProcessWorker.ts index 46061e502c10..2b540c762c7c 100644 --- a/packages/jest-worker/src/workers/ChildProcessWorker.ts +++ b/packages/jest-worker/src/workers/ChildProcessWorker.ts @@ -6,6 +6,7 @@ */ import childProcess, {ChildProcess} from 'child_process'; +import {PassThrough} from 'stream'; import mergeStream from 'merge-stream'; import supportsColor from 'supports-color'; @@ -44,6 +45,7 @@ export default class ChildProcessWorker implements WorkerInterface { private _child!: ChildProcess; private _options: WorkerOptions; private _onProcessEnd!: OnEnd; + private _fakeStream: PassThrough | null; private _request: ChildMessage | null; private _retries!: number; private _stderr: ReturnType | null; @@ -51,6 +53,7 @@ export default class ChildProcessWorker implements WorkerInterface { constructor(options: WorkerOptions) { this._options = options; + this._fakeStream = null; this._request = null; this._stderr = null; this._stdout = null; @@ -75,7 +78,9 @@ export default class ChildProcessWorker implements WorkerInterface { if (child.stdout) { if (!this._stdout) { - this._stdout = mergeStream(); + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stdout = mergeStream(this._getFakeStream()); } this._stdout.add(child.stdout); @@ -83,7 +88,9 @@ export default class ChildProcessWorker implements WorkerInterface { if (child.stderr) { if (!this._stderr) { - this._stderr = mergeStream(); + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stderr = mergeStream(this._getFakeStream()); } this._stderr.add(child.stderr); @@ -119,6 +126,14 @@ export default class ChildProcessWorker implements WorkerInterface { } } + private _shutdown() { + // End the temporary streams so the merged streams end too + if (this._fakeStream) { + this._fakeStream.end(); + this._fakeStream = null; + } + } + onMessage(response: ParentMessage) { let error; @@ -171,6 +186,8 @@ export default class ChildProcessWorker implements WorkerInterface { if (this._request) { this._child.send(this._request); } + } else { + this._shutdown(); } } @@ -199,4 +216,11 @@ export default class ChildProcessWorker implements WorkerInterface { getStderr(): NodeJS.ReadableStream | null { return this._stderr; } + + private _getFakeStream() { + if (!this._fakeStream) { + this._fakeStream = new PassThrough(); + } + return this._fakeStream; + } } diff --git a/packages/jest-worker/src/workers/NodeThreadsWorker.ts b/packages/jest-worker/src/workers/NodeThreadsWorker.ts index d87801a66ee9..45d748e40e8a 100644 --- a/packages/jest-worker/src/workers/NodeThreadsWorker.ts +++ b/packages/jest-worker/src/workers/NodeThreadsWorker.ts @@ -6,6 +6,7 @@ */ import path from 'path'; +import {PassThrough} from 'stream'; // ESLint doesn't know about this experimental module // eslint-disable-next-line import/no-unresolved import {Worker} from 'worker_threads'; @@ -32,12 +33,14 @@ export default class ExperimentalWorker implements WorkerInterface { private _retries!: number; private _stderr: ReturnType | null; private _stdout: ReturnType | null; + private _fakeStream: PassThrough | null; constructor(options: WorkerOptions) { this._options = options; this._request = null; this._stderr = null; this._stdout = null; + this._fakeStream = null; this.initialize(); } @@ -62,7 +65,9 @@ export default class ExperimentalWorker implements WorkerInterface { if (this._worker.stdout) { if (!this._stdout) { - this._stdout = mergeStream(); + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stdout = mergeStream(this._getFakeStream()); } this._stdout.add(this._worker.stdout); @@ -70,7 +75,9 @@ export default class ExperimentalWorker implements WorkerInterface { if (this._worker.stderr) { if (!this._stderr) { - this._stderr = mergeStream(); + // We need to add a permanent stream to the merged stream to prevent it + // from ending when the subprocess stream ends + this._stderr = mergeStream(this._getFakeStream()); } this._stderr.add(this._worker.stderr); @@ -104,6 +111,14 @@ export default class ExperimentalWorker implements WorkerInterface { } } + private _shutdown() { + // End the permanent stream so the merged stream end too + if (this._fakeStream) { + this._fakeStream.end(); + this._fakeStream = null; + } + } + onMessage(response: ParentMessage) { let error; @@ -154,6 +169,8 @@ export default class ExperimentalWorker implements WorkerInterface { if (this._request) { this._worker.postMessage(this._request); } + } else { + this._shutdown(); } } @@ -183,4 +200,11 @@ export default class ExperimentalWorker implements WorkerInterface { getStderr(): NodeJS.ReadableStream | null { return this._stderr; } + + private _getFakeStream() { + if (!this._fakeStream) { + this._fakeStream = new PassThrough(); + } + return this._fakeStream; + } } diff --git a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js index afef3185f7df..8b1e2fad4c35 100644 --- a/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/ChildProcessWorker.test.js @@ -141,6 +141,7 @@ it('provides stdout and stderr from the child processes', async () => { forkInterface.emit('exit'); forkInterface.stdout.end('World!', {encoding: 'utf8'}); forkInterface.stderr.end('Workers!', {encoding: 'utf8'}); + forkInterface.emit('exit', 0); await expect(getStream(stdout)).resolves.toEqual('Hello World!'); await expect(getStream(stderr)).resolves.toEqual('Jest Workers!'); diff --git a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js index fa380bce7541..998f12c1f808 100644 --- a/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js +++ b/packages/jest-worker/src/workers/__tests__/NodeThreadsWorker.test.js @@ -153,6 +153,7 @@ it('provides stdout and stderr from the child processes', async () => { worker._worker.emit('exit'); worker._worker.stdout.end('World!', {encoding: 'utf8'}); worker._worker.stderr.end('Workers!', {encoding: 'utf8'}); + worker._worker.emit('exit', 0); await expect(getStream(stdout)).resolves.toEqual('Hello World!'); await expect(getStream(stderr)).resolves.toEqual('Jest Workers!');