diff --git a/packages/jest-worker/package.json b/packages/jest-worker/package.json index 6d741c362541..e5e86a2d0405 100644 --- a/packages/jest-worker/package.json +++ b/packages/jest-worker/package.json @@ -8,6 +8,7 @@ "license": "MIT", "main": "build/index.js", "dependencies": { + "jest-serializer": "^22.3.0", "merge-stream": "^1.0.1" } } diff --git a/packages/jest-worker/src/__tests__/channel.test.js b/packages/jest-worker/src/__tests__/channel.test.js new file mode 100644 index 000000000000..dee9cef5088c --- /dev/null +++ b/packages/jest-worker/src/__tests__/channel.test.js @@ -0,0 +1,95 @@ +/** + * Copyright (c) 2018-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + */ + +'use strict'; + +import EventEmitter from 'events'; +import Channel from '../channel'; +import serializer from 'jest-serializer'; + +let stream; +let channel; + +beforeEach(() => { + stream = Object.assign(new EventEmitter(), { + write: jest.fn(), + }); + + channel = new Channel(stream); +}); + +it('sends data prefixed with length', () => { + const data = { + bar: [0, true, '2'], + foo: 42, + }; + + const buffer = serializer.serialize(data); + + channel.send(data); + + // Verify that the length is sent first. + expect(stream.write.mock.calls[0]).toEqual([ + new Buffer([buffer.length, 0, 0, 0]), + ]); + + // Then the corresponding serialized message as a buffer. + expect(stream.write.mock.calls[1]).toEqual([buffer]); +}); + +it('processes a single message split into different buffers', () => { + const data = { + bar: [0, true, '2'], + foo: 42, + }; + + const message = serializer.serialize(data); + const received = jest.fn(); + + channel.on('message', received); + + // Just received the length. + stream.emit('data', Buffer.from([message.length, 0, 0, 0])); + expect(received).not.toBeCalled(); + + // Now received half of the buffer. + stream.emit('data', message.slice(0, message.length / 2)); + expect(received).not.toBeCalled(); + + // And now the full buffer. + stream.emit('data', message.slice(message.length / 2)); + expect(received).toHaveBeenCalledTimes(1); + + expect(received.mock.calls[0][0]).toEqual(data); +}); + +it('can process multiple messages into a single buffer', () => { + const data1 = { + bar: [0, true, '2'], + foo: 42, + }; + + const data2 = [null, 'red', {}]; + + const message1 = serializer.serialize(data1); + const message2 = serializer.serialize(data2); + const message = Buffer.allocUnsafe(8 + message1.length + message2.length); + const received = jest.fn(); + + message.writeUInt32LE(message1.length, 0); + message1.copy(message, 4); + + message.writeUInt32LE(message2.length, 4 + message1.length); + message2.copy(message, 8 + message1.length); + + channel.on('message', received); + stream.emit('data', message); + + expect(received).toHaveBeenCalledTimes(2); + expect(received.mock.calls[0][0]).toEqual(data1); + expect(received.mock.calls[1][0]).toEqual(data2); +}); diff --git a/packages/jest-worker/src/__tests__/child.test.js b/packages/jest-worker/src/__tests__/child.test.js index 6bd8e6e54af1..16baffbd7c53 100644 --- a/packages/jest-worker/src/__tests__/child.test.js +++ b/packages/jest-worker/src/__tests__/child.test.js @@ -7,10 +7,11 @@ 'use strict'; +import EventEmitter from 'events'; + const mockError = new TypeError('Booo'); const mockExtendedError = new ReferenceError('Booo extended'); const processExit = process.exit; -const processSend = process.send; const sleep = ms => new Promise(resolve => setTimeout(resolve, ms)); import { @@ -22,10 +23,13 @@ import { } from '../types'; let mockCount; +let channel; beforeEach(() => { mockCount = 0; + jest.mock('net').mock('../channel'); + jest.mock( '../my-fancy-worker', () => { @@ -90,26 +94,29 @@ beforeEach(() => { {virtual: true}, ); + require('../channel').default.mockImplementation(() => { + channel = Object.assign(new EventEmitter(), { + send: jest.fn(), + }); + + return channel; + }); + process.exit = jest.fn(); - process.send = jest.fn(); // Require the child! require('../child'); }); afterEach(() => { - jest.resetModules(); - - process.removeAllListeners('message'); - process.exit = processExit; - process.send = processSend; + jest.resetModules(); }); it('lazily requires the file', () => { expect(mockCount).toBe(0); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_INITIALIZE, true, // Not really used here, but for flow type purity. './my-fancy-worker', @@ -117,7 +124,7 @@ it('lazily requires the file', () => { expect(mockCount).toBe(0); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_CALL, true, // Not really used here, but for flow type purity. 'fooWorks', @@ -128,31 +135,31 @@ it('lazily requires the file', () => { }); it('returns results immediately when function is synchronous', () => { - process.send = jest.fn(); + channel.send = jest.fn(); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_INITIALIZE, true, // Not really used here, but for flow type purity. './my-fancy-worker', ]); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_CALL, true, // Not really used here, but for flow type purity. 'fooWorks', [], ]); - expect(process.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 1989]); + expect(channel.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 1989]); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_CALL, true, // Not really used here, but for flow type purity. 'fooThrows', [], ]); - expect(process.send.mock.calls[1][0]).toEqual([ + expect(channel.send.mock.calls[1][0]).toEqual([ PARENT_MESSAGE_ERROR, 'TypeError', 'Booo', @@ -160,14 +167,14 @@ it('returns results immediately when function is synchronous', () => { {}, ]); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_CALL, true, // Not really used here, but for flow type purity. 'fooThrowsANumber', [], ]); - expect(process.send.mock.calls[2][0]).toEqual([ + expect(channel.send.mock.calls[2][0]).toEqual([ PARENT_MESSAGE_ERROR, 'Number', void 0, @@ -175,14 +182,14 @@ it('returns results immediately when function is synchronous', () => { 412, ]); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_CALL, true, // Not really used here, but for flow type purity. 'fooThrowsAnErrorWithExtraProperties', [], ]); - expect(process.send.mock.calls[3][0]).toEqual([ + expect(channel.send.mock.calls[3][0]).toEqual([ PARENT_MESSAGE_ERROR, 'ReferenceError', 'Booo extended', @@ -190,32 +197,32 @@ it('returns results immediately when function is synchronous', () => { {baz: 123, qux: 456}, ]); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_CALL, true, // Not really used here, but for flow type purity. 'fooThrowsNull', [], ]); - expect(process.send.mock.calls[4][0][0]).toBe(PARENT_MESSAGE_ERROR); - expect(process.send.mock.calls[4][0][1]).toBe('Error'); - expect(process.send.mock.calls[4][0][2]).toEqual( + expect(channel.send.mock.calls[4][0][0]).toBe(PARENT_MESSAGE_ERROR); + expect(channel.send.mock.calls[4][0][1]).toBe('Error'); + expect(channel.send.mock.calls[4][0][2]).toEqual( '"null" or "undefined" thrown', ); - expect(process.send).toHaveBeenCalledTimes(5); + expect(channel.send).toHaveBeenCalledTimes(5); }); it('returns results when it gets resolved if function is asynchronous', async () => { jest.useRealTimers(); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_INITIALIZE, true, // Not really used here, but for flow type purity. './my-fancy-worker', ]); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_CALL, true, // Not really used here, but for flow type purity. 'fooPromiseWorks', @@ -224,9 +231,9 @@ it('returns results when it gets resolved if function is asynchronous', async () await sleep(10); - expect(process.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 1989]); + expect(channel.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 1989]); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_CALL, true, // Not really used here, but for flow type purity. 'fooPromiseThrows', @@ -235,7 +242,7 @@ it('returns results when it gets resolved if function is asynchronous', async () await sleep(10); - expect(process.send.mock.calls[1][0]).toEqual([ + expect(channel.send.mock.calls[1][0]).toEqual([ PARENT_MESSAGE_ERROR, 'TypeError', 'Booo', @@ -243,51 +250,51 @@ it('returns results when it gets resolved if function is asynchronous', async () {}, ]); - expect(process.send).toHaveBeenCalledTimes(2); + expect(channel.send).toHaveBeenCalledTimes(2); }); it('calls the main module if the method call is "default"', () => { - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_INITIALIZE, true, // Not really used here, but for flow type purity. './my-fancy-standalone-worker', ]); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_CALL, true, // Not really used here, but for flow type purity. 'default', [], ]); - expect(process.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 12345]); + expect(channel.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 12345]); }); it('calls the main export if the method call is "default" and it is a Babel transpiled one', () => { - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_INITIALIZE, true, // Not really used here, but for flow type purity. './my-fancy-babel-worker', ]); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_CALL, true, // Not really used here, but for flow type purity. 'default', [], ]); - expect(process.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 67890]); + expect(channel.send.mock.calls[0][0]).toEqual([PARENT_MESSAGE_OK, 67890]); }); it('finishes the process with exit code 0 if requested', () => { - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_INITIALIZE, true, // Not really used here, but for flow type purity. './my-fancy-worker', ]); - process.emit('message', [ + channel.emit('message', [ CHILD_MESSAGE_END, true, // Not really used here, but for flow type purity. ]); @@ -298,34 +305,6 @@ it('finishes the process with exit code 0 if requested', () => { it('throws if an invalid message is detected', () => { // Type 27 does not exist. expect(() => { - process.emit('message', [27]); + channel.emit('message', [27]); }).toThrow(TypeError); }); - -it('throws if child is not forked', () => { - delete process.send; - - process.emit('message', [ - CHILD_MESSAGE_INITIALIZE, - true, // Not really used here, but for flow type purity. - './my-fancy-worker', - ]); - - expect(() => { - process.emit('message', [ - CHILD_MESSAGE_CALL, - true, // Not really used here, but for flow type purity. - 'fooWorks', - [], - ]); - }).toThrow(); - - expect(() => { - process.emit('message', [ - CHILD_MESSAGE_CALL, - true, // Not really used here, but for flow type purity. - 'fooThrows', - [], - ]); - }).toThrow(); -}); diff --git a/packages/jest-worker/src/__tests__/worker.test.js b/packages/jest-worker/src/__tests__/worker.test.js index aacba5def4bf..ce2b857a7d27 100644 --- a/packages/jest-worker/src/__tests__/worker.test.js +++ b/packages/jest-worker/src/__tests__/worker.test.js @@ -18,26 +18,39 @@ import { PARENT_MESSAGE_OK, } from '../types'; +let Channel; let Worker; + let forkInterface; let childProcess; let originalExecArgv; +let channel; beforeEach(() => { - jest.mock('child_process'); + jest.mock('child_process').mock('../channel'); + originalExecArgv = process.execArgv; childProcess = require('child_process'); childProcess.fork.mockImplementation(() => { forkInterface = Object.assign(new EventEmitter(), { - send: jest.fn(), stderr: {}, + stdio: [{}, {}, {}, {}, {}], stdout: {}, }); return forkInterface; }); + Channel = require('../channel'); + Channel.default.mockImplementation(() => { + channel = Object.assign(new EventEmitter(), { + send: jest.fn(), + }); + + return channel; + }); + Worker = require('../worker').default; }); @@ -50,14 +63,16 @@ it('passes fork options down to child_process.fork, adding the defaults', () => const child = require.resolve('../child'); process.execArgv = ['--inspect', '-p']; + process.env.JEST_WORKER_ID = '123'; new Worker({ forkOptions: { cwd: '/tmp', execPath: 'hello', + stdio: 'ignore', }, maxRetries: 3, - workerId: process.env.JEST_WORKER_ID, + workerId: '123', workerPath: '/tmp/foo/bar/baz.js', }); @@ -67,7 +82,7 @@ it('passes fork options down to child_process.fork, adding the defaults', () => env: process.env, // Default option. execArgv: ['-p'], // Filtered option. execPath: 'hello', // Added option. - silent: true, // Default option. + stdio: ['ignore', 'ignore', 'ignore', 'ipc', 'pipe'], // Overridden option. }); }); @@ -89,7 +104,7 @@ it('initializes the child process with the given workerPath', () => { workerPath: '/tmp/foo/bar/baz.js', }); - expect(forkInterface.send.mock.calls[0][0]).toEqual([ + expect(channel.send.mock.calls[0][0]).toEqual([ CHILD_MESSAGE_INITIALIZE, false, '/tmp/foo/bar/baz.js', @@ -163,7 +178,7 @@ it('sends the task to the child process', () => { worker.send(request, () => {}); // Skipping call "0" because it corresponds to the "initialize" one. - expect(forkInterface.send.mock.calls[1][0]).toEqual(request); + expect(channel.send.mock.calls[1][0]).toEqual(request); }); it('relates replies to requests, in order', () => { @@ -186,7 +201,7 @@ it('relates replies to requests, in order', () => { expect(request2[1]).toBe(false); // then first call replies... - forkInterface.emit('message', [PARENT_MESSAGE_OK, 44]); + channel.emit('message', [PARENT_MESSAGE_OK, 44]); expect(callback1.mock.calls[0][0]).toBeFalsy(); expect(callback1.mock.calls[0][1]).toBe(44); @@ -196,7 +211,7 @@ it('relates replies to requests, in order', () => { expect(request2[1]).toBe(true); // and then the second call replies... - forkInterface.emit('message', [ + channel.emit('message', [ PARENT_MESSAGE_ERROR, 'TypeError', 'foo', @@ -222,7 +237,7 @@ it('creates error instances for known errors', () => { // Testing a generic ECMAScript error. worker.send([CHILD_MESSAGE_CALL, false, 'method', []], callback1); - forkInterface.emit('message', [ + channel.emit('message', [ PARENT_MESSAGE_ERROR, 'TypeError', 'bar', @@ -238,7 +253,7 @@ it('creates error instances for known errors', () => { // Testing a custom error. worker.send([CHILD_MESSAGE_CALL, false, 'method', []], callback2); - forkInterface.emit('message', [ + channel.emit('message', [ PARENT_MESSAGE_ERROR, 'RandomCustomError', 'bar', @@ -255,13 +270,7 @@ it('creates error instances for known errors', () => { // Testing a non-object throw. worker.send([CHILD_MESSAGE_CALL, false, 'method', []], callback3); - forkInterface.emit('message', [ - PARENT_MESSAGE_ERROR, - 'Number', - null, - null, - 412, - ]); + channel.emit('message', [PARENT_MESSAGE_ERROR, 'Number', null, null, 412]); expect(callback3.mock.calls[0][0]).toBe(412); }); @@ -277,7 +286,7 @@ it('throws when the child process returns a strange message', () => { // Type 27 does not exist. expect(() => { - forkInterface.emit('message', [27]); + channel.emit('message', [27]); }).toThrow(TypeError); }); diff --git a/packages/jest-worker/src/channel.js b/packages/jest-worker/src/channel.js new file mode 100644 index 000000000000..800d4fe5c61e --- /dev/null +++ b/packages/jest-worker/src/channel.js @@ -0,0 +1,92 @@ +/** + * Copyright (c) 2017-present, Facebook, Inc. All rights reserved. + * + * This source code is licensed under the MIT license found in the + * LICENSE file in the root directory of this source tree. + * + * @flow + */ + +'use strict'; + +import EventEmitter from 'events'; +import serializer from 'jest-serializer'; + +import type {Duplex} from 'stream'; + +/** + * This class provides an thin layer to send objects over a socket by using the + * best available serialization mechanism, which is in turn provided by the + * jest-serializer module. + * + * In order to send, the payload is encoded as a buffer, and a 4-byte unsigned + * integer with the size is added at the front. When receiving, buffers are + * saved until there is enough information to parse a payload, then a "message" + * event is emitted. + */ +export default class extends EventEmitter { + _size: Buffer; + _stream: Duplex; + + _receiveBuffers: Array; + _receiveBytes: number; + _receiveSize: number; + + constructor(stream: Duplex) { + super(); + + this._size = new Buffer(4); + this._stream = stream; + this._clearReceive(); + + stream.on('data', this._receive.bind(this)); + } + + send(message: any) { + const buffer = serializer.serialize(message); + const size = this._size; + const stream = this._stream; + + size.writeUInt32LE(buffer.length, 0); + + stream.write(size); + stream.write(buffer); + } + + _receive(buffer: Buffer) { + // First time receiving, so the size is encoded at the four first bytes. + if (!this._receiveBytes) { + this._receiveSize = buffer.readUInt32LE(0); + } + + const receiveBuffers = this._receiveBuffers; + const receiveBytes = (this._receiveBytes += buffer.length); + const receiveSize = this._receiveSize; + + receiveBuffers.push(buffer); + + // Not enough data yet; so just store the buffer in the list and return. + if (receiveBytes < receiveSize) { + return; + } + + const all = Buffer.concat(receiveBuffers, receiveBytes); + const message = all.slice(4, receiveSize + 4); + const rest = all.slice(receiveSize + 4); + + this.emit('message', serializer.deserialize(message)); + + // Clean the queue and call recursively if there is any data left. + this._clearReceive(); + + if (rest.length) { + this._receive(rest); + } + } + + _clearReceive() { + this._receiveBuffers = []; + this._receiveBytes = 0; + this._receiveSize = 0; + } +} diff --git a/packages/jest-worker/src/child.js b/packages/jest-worker/src/child.js index dc372bf9a088..569d55b02ee4 100644 --- a/packages/jest-worker/src/child.js +++ b/packages/jest-worker/src/child.js @@ -9,6 +9,9 @@ 'use strict'; +import Channel from './channel'; +import {Socket} from 'net'; + import { CHILD_MESSAGE_CALL, CHILD_MESSAGE_END, @@ -32,7 +35,9 @@ let file = null; * If an invalid message is detected, the child will exit (by throwing) with a * non-zero exit code. */ -process.on('message', (request: any /* Should be ChildMessage */) => { +const channel = new Channel(new Socket({fd: 4})); + +channel.on('message', (request: any /* Should be ChildMessage */) => { switch (request[0]) { case CHILD_MESSAGE_INITIALIZE: file = request[2]; @@ -54,23 +59,15 @@ process.on('message', (request: any /* Should be ChildMessage */) => { }); function reportSuccess(result: any) { - if (!process || !process.send) { - throw new Error('Child can only be used on a forked process'); - } - - process.send([PARENT_MESSAGE_OK, result]); + channel.send([PARENT_MESSAGE_OK, result]); } function reportError(error: Error) { - if (!process || !process.send) { - throw new Error('Child can only be used on a forked process'); - } - if (error == null) { error = new Error('"null" or "undefined" thrown'); } - process.send([ + channel.send([ PARENT_MESSAGE_ERROR, error.constructor && error.constructor.name, error.message, diff --git a/packages/jest-worker/src/worker.js b/packages/jest-worker/src/worker.js index 80562376828b..a0fa67423e06 100644 --- a/packages/jest-worker/src/worker.js +++ b/packages/jest-worker/src/worker.js @@ -10,6 +10,7 @@ 'use strict'; import childProcess from 'child_process'; +import Channel from './channel'; import { CHILD_MESSAGE_INITIALIZE, @@ -47,6 +48,7 @@ import type { */ export default class { _busy: boolean; + _channel: Channel; _child: ChildProcess; _last: ?QueueChildMessage; _options: WorkerOptions; @@ -82,30 +84,47 @@ export default class { } _initialize() { + const forkOptions = this._options.forkOptions || {}; + const forkStdio = forkOptions.stdio || []; + + const stdio = + typeof forkStdio === 'string' + ? [forkStdio, forkStdio, forkStdio] + : forkStdio.concat('pipe', 'pipe', 'pipe').slice(0, 3); + const child = childProcess.fork( require.resolve('./child'), // $FlowFixMe: Flow does not work well with Object.assign. Object.assign( { + // Node JS implementation defaults cwd to "undefined". cwd: process.cwd(), + + // Add custom id to the worker. env: Object.assign({}, process.env, { JEST_WORKER_ID: this._options.workerId, }), - // Suppress --debug / --inspect flags while preserving others (like --harmony). + + // Suppress --debug / --inspect flags while preserving others. execArgv: process.execArgv.filter(v => !/^--(debug|inspect)/.test(v)), - silent: true, }, - this._options.forkOptions, + forkOptions, + { + // Node's IPC channel is unused, but forced by Node's API. + stdio: stdio.concat(['ipc', 'pipe']), + }, ), ); - child.on('message', this._receive.bind(this)); + const channel = new Channel(child.stdio[4]); + + channel.on('message', this._receive.bind(this)); child.on('exit', this._exit.bind(this)); - // $FlowFixMe: wrong "ChildProcess.send" signature. - child.send([CHILD_MESSAGE_INITIALIZE, false, this._options.workerPath]); + channel.send([CHILD_MESSAGE_INITIALIZE, false, this._options.workerPath]); this._retries++; + this._channel = channel; this._child = child; this._busy = false; @@ -148,8 +167,7 @@ export default class { this._retries = 0; this._busy = true; - // $FlowFixMe: wrong "ChildProcess.send" signature. - this._child.send(item.request); + this._channel.send(item.request); } else { this._last = item; }