Skip to content

Commit

Permalink
emit warning when worker gracefully exit before finish
Browse files Browse the repository at this point in the history
  • Loading branch information
rluvaton committed Aug 9, 2023
1 parent a22b342 commit 7706910
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 0 deletions.
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ function onWorkerMessage (msg) {
stream.emit(msg.name, msg.args)
}
break
case 'WARNING':
process.emitWarning(msg.err)
break
default:
destroy(stream, new Error('this should not happen: ' + msg.code))
}
Expand Down
16 changes: 16 additions & 0 deletions lib/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,19 @@ process.on('uncaughtException', function (err) {
})
process.exit(1)
})

process.once('exit', exitCode => {
if (exitCode !== 0) {
process.exit(exitCode)
return
}

if (destination?.writableNeedDrain && !destination?.writableEnded) {
parentPort.postMessage({
code: 'WARNING',
err: new Error('ThreadStream: process exited before destination stream was drained. this may indicate that the destination stream try to write to a another missing stream')
})
}

process.exit(0)
})
40 changes: 40 additions & 0 deletions test/never-drain.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
const { test } = require('tap')
const ThreadStream = require('../index')
const { join } = require('path')

function sleep (ms) {
return new Promise((resolve) => {
setTimeout(resolve, ms)
})
}

test('emit warning when the worker gracefully exit without the stream ended', async function (t) {
const expectedWarning = 'ThreadStream: process exited before destination stream was drained. this may indicate that the destination stream try to write to a another missing stream'
const stream = new ThreadStream({
filename: join(__dirname, 'to-next.js')
})

let streamWarning
function saveWarning (e) {
if (e.message === expectedWarning) {
streamWarning = e
}
}
process.on('warning', saveWarning)

for (let i = 0; i < 10_000; i++) {
if (streamWarning?.message === expectedWarning) {
break
}
stream.write('hello')
await new Promise((resolve) => {
setTimeout(resolve, 1)
})
}

process.off('warning', saveWarning)
await sleep(10)

t.equal(stream.destroyed, true)
t.equal(streamWarning?.message, expectedWarning)
})
9 changes: 9 additions & 0 deletions test/to-next.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
'use strict'

const { PassThrough } = require('stream')

async function run (opts) {
return new PassThrough({})
}

module.exports = run

0 comments on commit 7706910

Please sign in to comment.