From c16fc93a98aa1d432baeb3351092ed32e2ae5bd6 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Wed, 9 Aug 2023 21:46:24 +0300 Subject: [PATCH 1/5] emit warning when worker gracefully exit before finish --- index.js | 3 +++ lib/worker.js | 16 ++++++++++++++++ test/never-drain.test.js | 40 ++++++++++++++++++++++++++++++++++++++++ test/to-next.js | 9 +++++++++ 4 files changed, 68 insertions(+) create mode 100644 test/never-drain.test.js create mode 100644 test/to-next.js diff --git a/index.js b/index.js index 3b16f7e..f09a62a 100644 --- a/index.js +++ b/index.js @@ -178,6 +178,9 @@ function onWorkerMessage (msg) { stream.emit(msg.name, msg.args) } break + case 'WARNING': + process.emitWarning(msg.err) + break default: destroy(stream, new Error('this should not happen: ' + msg.code)) } diff --git a/lib/worker.js b/lib/worker.js index 4421175..03d02d1 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -151,3 +151,19 @@ process.on('uncaughtException', function (err) { }) process.exit(1) }) + +process.once('exit', exitCode => { + if (exitCode !== 0) { + process.exit(exitCode) + return + } + + if (destination?.writableNeedDrain && !destination?.writableEnded) { + parentPort.postMessage({ + code: 'WARNING', + err: new Error('ThreadStream: process exited before destination stream was drained. this may indicate that the destination stream try to write to a another missing stream') + }) + } + + process.exit(0) +}) diff --git a/test/never-drain.test.js b/test/never-drain.test.js new file mode 100644 index 0000000..9531664 --- /dev/null +++ b/test/never-drain.test.js @@ -0,0 +1,40 @@ +const { test } = require('tap') +const ThreadStream = require('../index') +const { join } = require('path') + +function sleep (ms) { + return new Promise((resolve) => { + setTimeout(resolve, ms) + }) +} + +test('emit warning when the worker gracefully exit without the stream ended', async function (t) { + const expectedWarning = 'ThreadStream: process exited before destination stream was drained. this may indicate that the destination stream try to write to a another missing stream' + const stream = new ThreadStream({ + filename: join(__dirname, 'to-next.js') + }) + + let streamWarning + function saveWarning (e) { + if (e.message === expectedWarning) { + streamWarning = e + } + } + process.on('warning', saveWarning) + + for (let i = 0; i < 10_000; i++) { + if (streamWarning?.message === expectedWarning) { + break + } + stream.write('hello') + await new Promise((resolve) => { + setTimeout(resolve, 1) + }) + } + + process.off('warning', saveWarning) + await sleep(10) + + t.equal(stream.destroyed, true) + t.equal(streamWarning?.message, expectedWarning) +}) diff --git a/test/to-next.js b/test/to-next.js new file mode 100644 index 0000000..8148f45 --- /dev/null +++ b/test/to-next.js @@ -0,0 +1,9 @@ +'use strict' + +const { PassThrough } = require('stream') + +async function run (opts) { + return new PassThrough({}) +} + +module.exports = run From 26f2291e16f50a9a8b826293b13ca7d918ceee5d Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 10 Aug 2023 17:20:47 +0300 Subject: [PATCH 2/5] try making tests less flaky --- test/never-drain.test.js | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/test/never-drain.test.js b/test/never-drain.test.js index 9531664..fbf809a 100644 --- a/test/never-drain.test.js +++ b/test/never-drain.test.js @@ -2,9 +2,23 @@ const { test } = require('tap') const ThreadStream = require('../index') const { join } = require('path') -function sleep (ms) { - return new Promise((resolve) => { - setTimeout(resolve, ms) +function retryUntilTimeout (fn, timeout) { + const start = Date.now() + return new Promise((resolve, reject) => { + async function run () { + if (fn()) { + resolve() + return + } + + if (Date.now() - start >= timeout) { + reject(new Error('timeout')) + return + } + setTimeout(run, 10) + } + + run() }) } @@ -33,8 +47,7 @@ test('emit warning when the worker gracefully exit without the stream ended', as } process.off('warning', saveWarning) - await sleep(10) - - t.equal(stream.destroyed, true) t.equal(streamWarning?.message, expectedWarning) + + await retryUntilTimeout(() => stream.destroyed === true, 3000) }) From d4b2f9a0f9710096f921d520146c937d6c39d72c Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 10 Aug 2023 17:31:43 +0300 Subject: [PATCH 3/5] try making tests less flaky --- test/never-drain.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/never-drain.test.js b/test/never-drain.test.js index fbf809a..1b31b04 100644 --- a/test/never-drain.test.js +++ b/test/never-drain.test.js @@ -49,5 +49,5 @@ test('emit warning when the worker gracefully exit without the stream ended', as process.off('warning', saveWarning) t.equal(streamWarning?.message, expectedWarning) - await retryUntilTimeout(() => stream.destroyed === true, 3000) + await retryUntilTimeout(() => stream.worker.exited === true, 3000) }) From 1bcc1e07db1d5d602609941dc8c21fbca62b0e25 Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 10 Aug 2023 19:58:02 +0300 Subject: [PATCH 4/5] try making tests less flaky --- test/never-drain.test.js | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/never-drain.test.js b/test/never-drain.test.js index 1b31b04..eab0d49 100644 --- a/test/never-drain.test.js +++ b/test/never-drain.test.js @@ -27,6 +27,7 @@ test('emit warning when the worker gracefully exit without the stream ended', as const stream = new ThreadStream({ filename: join(__dirname, 'to-next.js') }) + stream.unref() let streamWarning function saveWarning (e) { @@ -36,11 +37,12 @@ test('emit warning when the worker gracefully exit without the stream ended', as } process.on('warning', saveWarning) - for (let i = 0; i < 10_000; i++) { + const data = 'hello'.repeat(10) + for (let i = 0; i < 1000; i++) { if (streamWarning?.message === expectedWarning) { break } - stream.write('hello') + stream.write(data) await new Promise((resolve) => { setTimeout(resolve, 1) }) From a8f3f8d510026bb08fd737715d1433841f3b0cbc Mon Sep 17 00:00:00 2001 From: Raz Luvaton <16746759+rluvaton@users.noreply.github.com> Date: Thu, 10 Aug 2023 20:04:32 +0300 Subject: [PATCH 5/5] trigger GitHub Actions CI