From beffa709f059108a4e25be3722b45003dbfad63a Mon Sep 17 00:00:00 2001 From: Ward Peeters Date: Thu, 12 Dec 2019 10:47:02 +0100 Subject: [PATCH 1/7] feat(gatsby): enable external jobs with ipc --- .../src/utils/__tests__/jobs-manager.js | 145 +++++++++++++++++- packages/gatsby/src/utils/jobs-manager.js | 85 +++++++++- 2 files changed, 227 insertions(+), 3 deletions(-) diff --git a/packages/gatsby/src/utils/__tests__/jobs-manager.js b/packages/gatsby/src/utils/__tests__/jobs-manager.js index 1462add8c26c7..07256eb1f0694 100644 --- a/packages/gatsby/src/utils/__tests__/jobs-manager.js +++ b/packages/gatsby/src/utils/__tests__/jobs-manager.js @@ -11,6 +11,7 @@ jest.mock(`p-defer`, () => jest.mock(`gatsby-cli/lib/reporter`, () => { return { phantomActivity: jest.fn(), + warn: jest.fn(), } }) @@ -216,7 +217,9 @@ describe(`Jobs manager`, () => { try { await enqueueJob(jobArgs) } catch (err) { - expect(err).toMatchInlineSnapshot(`[WorkerError: An error occured]`) + expect(err).toMatchInlineSnapshot( + `[WorkerError: Error: An error occured]` + ) } try { await enqueueJob(jobArgs2) @@ -334,4 +337,144 @@ describe(`Jobs manager`, () => { expect(isJobStale({ inputPaths })).toBe(false) }) }) + + describe(`IPC jobs`, () => { + let listeners = [] + beforeAll(() => { + jest.useFakeTimers() + }) + + beforeEach(() => { + process.env.ENABLE_GATSBY_EXTERNAL_JOBS = `true` + listeners = [] + process.on = (type, cb) => { + listeners.push(cb) + } + + process.send = jest.fn() + }) + + afterAll(() => { + delete process.env.ENABLE_GATSBY_EXTERNAL_JOBS + jest.useRealTimers() + }) + + it(`should schedule a remote job when ipc and env variable are enabled`, async () => { + const { enqueueJob } = jobManager + const jobArgs = createInternalMockJob() + + enqueueJob(jobArgs) + + jest.runAllTimers() + + expect(process.send).toHaveBeenCalled() + expect(process.send).toHaveBeenCalledWith({ + type: `JOB_CREATED`, + payload: jobArgs, + }) + + expect(listeners.length).toBe(1) + expect(worker.TEST_JOB).not.toHaveBeenCalled() + }) + + it(`should resolve a job when complete message is received`, async () => { + const { enqueueJob } = jobManager + const jobArgs = createInternalMockJob() + + const promise = enqueueJob(jobArgs) + jest.runAllTimers() + + listeners[0]({ + type: `JOB_COMPLETED`, + payload: { + id: jobArgs.id, + result: { + output: `hello`, + }, + }, + }) + + jest.runAllTimers() + + await expect(promise).resolves.toStrictEqual({ + output: `hello`, + }) + expect(worker.TEST_JOB).not.toHaveBeenCalled() + }) + + it(`should reject a job when failed message is received`, async () => { + const { enqueueJob } = jobManager + const jobArgs = createInternalMockJob() + + const promise = enqueueJob(jobArgs) + + jest.runAllTimers() + + listeners[0]({ + type: `JOB_FAILED`, + payload: { + id: jobArgs.id, + error: `JOB failed...`, + }, + }) + + jest.runAllTimers() + + await expect(promise).rejects.toStrictEqual( + new jobManager.WorkerError(`JOB failed...`) + ) + expect(worker.TEST_JOB).not.toHaveBeenCalled() + }) + + it(`should run the worker locally when it's not available externally`, async () => { + worker.TEST_JOB.mockReturnValue({ output: `myresult` }) + const { enqueueJob } = jobManager + const jobArgs = createInternalMockJob() + + const promise = enqueueJob(jobArgs) + + jest.runAllTimers() + + listeners[0]({ + type: `JOB_NOT_WHITELISTED`, + payload: { + id: jobArgs.id, + }, + }) + + jest.runAllTimers() + + await expect(promise).resolves.toStrictEqual({ output: `myresult` }) + expect(worker.TEST_JOB).toHaveBeenCalledTimes(1) + }) + + it(`shouldn't schedule a remote job when ipc is enabled and env variable is false`, async () => { + process.env.ENABLE_GATSBY_EXTERNAL_JOBS = `false` + jest.useRealTimers() + const { enqueueJob } = jobManager + const jobArgs = createInternalMockJob() + + await enqueueJob(jobArgs) + + expect(process.send).not.toHaveBeenCalled() + expect(worker.TEST_JOB).toHaveBeenCalled() + }) + + it(`should warn when external jobs are enabled but ipc isn't used`, async () => { + process.env.ENABLE_GATSBY_EXTERNAL_JOBS = `true` + process.send = null + jest.useRealTimers() + const { enqueueJob } = jobManager + const jobArgs = createInternalMockJob() + const jobArgs2 = createInternalMockJob({ + args: { key: `val` }, + }) + + await enqueueJob(jobArgs) + await enqueueJob(jobArgs2) + + expect(reporter.warn).toHaveBeenCalledTimes(1) + expect(worker.TEST_JOB).toHaveBeenCalled() + }) + }) }) diff --git a/packages/gatsby/src/utils/jobs-manager.js b/packages/gatsby/src/utils/jobs-manager.js index 47bb9b95e9968..475fe46b6127a 100644 --- a/packages/gatsby/src/utils/jobs-manager.js +++ b/packages/gatsby/src/utils/jobs-manager.js @@ -9,10 +9,21 @@ const reporter = require(`gatsby-cli/lib/reporter`) let activityForJobs = null let activeJobs = 0 +let hasShownIPCDisabledWarning = false + +const MESSAGE_TYPES = { + JOB_CREATED: `JOB_CREATED`, + JOB_COMPLETED: `JOB_COMPLETED`, + JOB_FAILED: `JOB_FAILED`, + JOB_NOT_WHITELISTED: `JOB_NOT_WHITELISTED`, +} /** @type {Map}>} */ const jobsInProcess = new Map() +let isListeningForMessages = false +const externalJobsMap = new Map() + /** * We want to use absolute paths to make sure they are on the filesystem * @@ -57,6 +68,10 @@ const createFileHash = path => hasha.fromFileSync(path, { algorithm: `sha1` }) /** @type {pDefer.DeferredPromise|null} */ let hasActiveJobs = null +const hasExternalJobsEnabled = () => + process.env.ENABLE_GATSBY_EXTERNAL_JOBS === `true` || + process.env.ENABLE_GATSBY_EXTERNAL_JOBS === `1` + /** * Get the local worker function and execute it on the user's machine * @@ -81,12 +96,60 @@ const runLocalWorker = async (workerFn, job) => { }) ) } catch (err) { - reject(err) + reject(new WorkerError(err)) } }) }) } +const listenForJobMessages = () => { + process.on(`message`, msg => { + if ( + msg && + msg.type && + msg.payload && + msg.payload.id && + externalJobsMap.has(msg.payload.id) + ) { + const { job, deferred } = externalJobsMap.get(msg.payload.id) + switch (msg.type) { + case MESSAGE_TYPES.JOB_COMPLETED: { + deferred.resolve(msg.payload.result) + break + } + case MESSAGE_TYPES.JOB_FAILED: { + deferred.reject(new WorkerError(msg.payload.error)) + break + } + case MESSAGE_TYPES.JOB_NOT_WHITELISTED: { + deferred.resolve(runJob(job, true)) + break + } + } + + externalJobsMap.delete(msg.payload.id) + } + }) +} + +/** + * @param {InternalJob} job + */ +const runExternalWorker = job => { + const deferred = pDefer() + externalJobsMap.set(job.id, { + job, + deferred, + }) + + process.send({ + type: MESSAGE_TYPES.JOB_CREATED, + payload: job, + }) + + return deferred.promise +} + /** * Make sure we have everything we need to run a job * If we do, run it locally. @@ -95,7 +158,7 @@ const runLocalWorker = async (workerFn, job) => { * @param {InternalJob} job * @return {Promise} */ -const runJob = job => { +const runJob = (job, forceLocal = false) => { const { plugin } = job try { const worker = require(path.posix.join(plugin.resolve, `gatsby-worker.js`)) @@ -103,6 +166,24 @@ const runJob = job => { throw new Error(`No worker function found for ${job.name}`) } + if (!forceLocal && hasExternalJobsEnabled()) { + if (process.send) { + if (!isListeningForMessages) { + isListeningForMessages = true + listenForJobMessages() + } + + return runExternalWorker(job) + } else { + // only show the offloading warning once + if (!hasShownIPCDisabledWarning) { + hasShownIPCDisabledWarning = true + reporter.warn( + `Offloading of a job failed as IPC could not be detected. Running job locally.` + ) + } + } + } return runLocalWorker(worker[job.name], job) } catch (err) { throw new Error( From 91832e1a0bd57845b011062af9b41d5a1096ee09 Mon Sep 17 00:00:00 2001 From: Ward Peeters Date: Fri, 24 Jan 2020 13:48:15 +0100 Subject: [PATCH 2/7] Update jobs-manager.js --- packages/gatsby/src/utils/jobs-manager.js | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/gatsby/src/utils/jobs-manager.js b/packages/gatsby/src/utils/jobs-manager.js index 475fe46b6127a..590332189cdb3 100644 --- a/packages/gatsby/src/utils/jobs-manager.js +++ b/packages/gatsby/src/utils/jobs-manager.js @@ -7,9 +7,6 @@ const _ = require(`lodash`) const { createContentDigest, slash } = require(`gatsby-core-utils`) const reporter = require(`gatsby-cli/lib/reporter`) -let activityForJobs = null -let activeJobs = 0 -let hasShownIPCDisabledWarning = false const MESSAGE_TYPES = { JOB_CREATED: `JOB_CREATED`, @@ -18,12 +15,15 @@ const MESSAGE_TYPES = { JOB_NOT_WHITELISTED: `JOB_NOT_WHITELISTED`, } +const externalJobsMap = new Map() +let activityForJobs = null +let activeJobs = 0 +let isListeningForMessages = false +let hasShownIPCDisabledWarning = false + /** @type {Map}>} */ const jobsInProcess = new Map() -let isListeningForMessages = false -const externalJobsMap = new Map() - /** * We want to use absolute paths to make sure they are on the filesystem * From 71179d6d3ea32c87f1749182a243d8258b525347 Mon Sep 17 00:00:00 2001 From: Ward Peeters Date: Fri, 24 Jan 2020 13:48:34 +0100 Subject: [PATCH 3/7] Update jobs-manager.js --- packages/gatsby/src/utils/jobs-manager.js | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/gatsby/src/utils/jobs-manager.js b/packages/gatsby/src/utils/jobs-manager.js index 590332189cdb3..c0e71bfd588de 100644 --- a/packages/gatsby/src/utils/jobs-manager.js +++ b/packages/gatsby/src/utils/jobs-manager.js @@ -7,7 +7,6 @@ const _ = require(`lodash`) const { createContentDigest, slash } = require(`gatsby-core-utils`) const reporter = require(`gatsby-cli/lib/reporter`) - const MESSAGE_TYPES = { JOB_CREATED: `JOB_CREATED`, JOB_COMPLETED: `JOB_COMPLETED`, From c1b7dde7095b6ad9b77331ad002c0e042a333465 Mon Sep 17 00:00:00 2001 From: Ward Peeters Date: Fri, 24 Jan 2020 13:50:32 +0100 Subject: [PATCH 4/7] add jsdoc --- packages/gatsby/src/utils/jobs-manager.js | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/gatsby/src/utils/jobs-manager.js b/packages/gatsby/src/utils/jobs-manager.js index c0e71bfd588de..957d816385fff 100644 --- a/packages/gatsby/src/utils/jobs-manager.js +++ b/packages/gatsby/src/utils/jobs-manager.js @@ -14,7 +14,6 @@ const MESSAGE_TYPES = { JOB_NOT_WHITELISTED: `JOB_NOT_WHITELISTED`, } -const externalJobsMap = new Map() let activityForJobs = null let activeJobs = 0 let isListeningForMessages = false @@ -22,6 +21,8 @@ let hasShownIPCDisabledWarning = false /** @type {Map}>} */ const jobsInProcess = new Map() +/** @type {Map}>} */ +const externalJobsMap = new Map() /** * We want to use absolute paths to make sure they are on the filesystem From 8c642901fd1e0a776615a1b84aa61edb1b660b9e Mon Sep 17 00:00:00 2001 From: Ward Peeters Date: Fri, 24 Jan 2020 13:54:27 +0100 Subject: [PATCH 5/7] reset process.send/on --- packages/gatsby/src/utils/__tests__/jobs-manager.js | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/gatsby/src/utils/__tests__/jobs-manager.js b/packages/gatsby/src/utils/__tests__/jobs-manager.js index 07256eb1f0694..208c622ec9165 100644 --- a/packages/gatsby/src/utils/__tests__/jobs-manager.js +++ b/packages/gatsby/src/utils/__tests__/jobs-manager.js @@ -344,9 +344,13 @@ describe(`Jobs manager`, () => { jest.useFakeTimers() }) + let originalProcessOn + let originalSend beforeEach(() => { process.env.ENABLE_GATSBY_EXTERNAL_JOBS = `true` listeners = [] + originalProcessOn = process.on + originalSend = process.send process.on = (type, cb) => { listeners.push(cb) } @@ -357,6 +361,8 @@ describe(`Jobs manager`, () => { afterAll(() => { delete process.env.ENABLE_GATSBY_EXTERNAL_JOBS jest.useRealTimers() + process.on = originalProcessOn + process.send = originalSend }) it(`should schedule a remote job when ipc and env variable are enabled`, async () => { From a25faeba48b70f08d01382a09e080e30f0252259 Mon Sep 17 00:00:00 2001 From: Ward Peeters Date: Fri, 24 Jan 2020 14:11:28 +0100 Subject: [PATCH 6/7] bump ipc version --- packages/gatsby/ipc.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/gatsby/ipc.json b/packages/gatsby/ipc.json index 61a2092b1b7fa..85deee8e24190 100644 --- a/packages/gatsby/ipc.json +++ b/packages/gatsby/ipc.json @@ -1,3 +1,3 @@ { - "version": 1 + "version": 2 } From 208a840c0224e2b79360b1fafda501cbd3e8327e Mon Sep 17 00:00:00 2001 From: Ward Peeters Date: Fri, 24 Jan 2020 14:47:15 +0100 Subject: [PATCH 7/7] don't export local jobs --- .../src/utils/__tests__/jobs-manager.js | 25 +++++++++++++++++++ packages/gatsby/src/utils/jobs-manager.js | 2 +- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/packages/gatsby/src/utils/__tests__/jobs-manager.js b/packages/gatsby/src/utils/__tests__/jobs-manager.js index 208c622ec9165..4ead79c281863 100644 --- a/packages/gatsby/src/utils/__tests__/jobs-manager.js +++ b/packages/gatsby/src/utils/__tests__/jobs-manager.js @@ -25,6 +25,16 @@ jest.mock( { virtual: true } ) +jest.mock( + `/gatsby-plugin-local/gatsby-worker.js`, + () => { + return { + TEST_JOB: jest.fn(), + } + }, + { virtual: true } +) + jest.mock(`uuid/v4`, () => jest.fn().mockImplementation(jest.requireActual(`uuid/v4`)) ) @@ -454,6 +464,21 @@ describe(`Jobs manager`, () => { expect(worker.TEST_JOB).toHaveBeenCalledTimes(1) }) + it(`should run the worker locally when it's a local plugin`, async () => { + jest.useRealTimers() + const worker = require(`/gatsby-plugin-local/gatsby-worker.js`) + const { enqueueJob, createInternalJob } = jobManager + const jobArgs = createInternalJob(createMockJob(), { + name: `gatsby-plugin-local`, + version: `1.0.0`, + resolve: `/gatsby-plugin-local`, + }) + + await expect(enqueueJob(jobArgs)).resolves.toBeUndefined() + expect(process.send).not.toHaveBeenCalled() + expect(worker.TEST_JOB).toHaveBeenCalledTimes(1) + }) + it(`shouldn't schedule a remote job when ipc is enabled and env variable is false`, async () => { process.env.ENABLE_GATSBY_EXTERNAL_JOBS = `false` jest.useRealTimers() diff --git a/packages/gatsby/src/utils/jobs-manager.js b/packages/gatsby/src/utils/jobs-manager.js index 957d816385fff..60a6941e5e134 100644 --- a/packages/gatsby/src/utils/jobs-manager.js +++ b/packages/gatsby/src/utils/jobs-manager.js @@ -166,7 +166,7 @@ const runJob = (job, forceLocal = false) => { throw new Error(`No worker function found for ${job.name}`) } - if (!forceLocal && hasExternalJobsEnabled()) { + if (!forceLocal && !job.plugin.isLocal && hasExternalJobsEnabled()) { if (process.send) { if (!isListeningForMessages) { isListeningForMessages = true