diff --git a/index.js b/index.js index 3b16f7e..f09a62a 100644 --- a/index.js +++ b/index.js @@ -178,6 +178,9 @@ function onWorkerMessage (msg) { stream.emit(msg.name, msg.args) } break + case 'WARNING': + process.emitWarning(msg.err) + break default: destroy(stream, new Error('this should not happen: ' + msg.code)) } diff --git a/lib/worker.js b/lib/worker.js index 4421175..03d02d1 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -151,3 +151,19 @@ process.on('uncaughtException', function (err) { }) process.exit(1) }) + +process.once('exit', exitCode => { + if (exitCode !== 0) { + process.exit(exitCode) + return + } + + if (destination?.writableNeedDrain && !destination?.writableEnded) { + parentPort.postMessage({ + code: 'WARNING', + err: new Error('ThreadStream: process exited before destination stream was drained. this may indicate that the destination stream try to write to a another missing stream') + }) + } + + process.exit(0) +}) diff --git a/test/never-drain.test.js b/test/never-drain.test.js new file mode 100644 index 0000000..eab0d49 --- /dev/null +++ b/test/never-drain.test.js @@ -0,0 +1,55 @@ +const { test } = require('tap') +const ThreadStream = require('../index') +const { join } = require('path') + +function retryUntilTimeout (fn, timeout) { + const start = Date.now() + return new Promise((resolve, reject) => { + async function run () { + if (fn()) { + resolve() + return + } + + if (Date.now() - start >= timeout) { + reject(new Error('timeout')) + return + } + setTimeout(run, 10) + } + + run() + }) +} + +test('emit warning when the worker gracefully exit without the stream ended', async function (t) { + const expectedWarning = 'ThreadStream: process exited before destination stream was drained. this may indicate that the destination stream try to write to a another missing stream' + const stream = new ThreadStream({ + filename: join(__dirname, 'to-next.js') + }) + stream.unref() + + let streamWarning + function saveWarning (e) { + if (e.message === expectedWarning) { + streamWarning = e + } + } + process.on('warning', saveWarning) + + const data = 'hello'.repeat(10) + for (let i = 0; i < 1000; i++) { + if (streamWarning?.message === expectedWarning) { + break + } + stream.write(data) + await new Promise((resolve) => { + setTimeout(resolve, 1) + }) + } + + process.off('warning', saveWarning) + t.equal(streamWarning?.message, expectedWarning) + + await retryUntilTimeout(() => stream.worker.exited === true, 3000) +}) diff --git a/test/to-next.js b/test/to-next.js new file mode 100644 index 0000000..8148f45 --- /dev/null +++ b/test/to-next.js @@ -0,0 +1,9 @@ +'use strict' + +const { PassThrough } = require('stream') + +async function run (opts) { + return new PassThrough({}) +} + +module.exports = run