Skip to content

Commit

Permalink
feat: post message to worker when message event is emitted (#145)
Browse files Browse the repository at this point in the history
* feat: post message to worker when message event is emitted

* test: added missing MessageChannel import
  • Loading branch information
10xLaCroixDrinker authored Apr 22, 2024
1 parent 8f7930d commit f7516e1
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 0 deletions.
21 changes: 21 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,27 @@ stream.on('eventName', function (a, b, c, n, err) {
})
```

### Post Messages

You can post messages to the worker by emitting a `message` event on the ThreadStream.

```js
const stream = new ThreadStream({
filename: join(__dirname, 'worker.js'),
workerData: {},
})
stream.emit('message', message)
```

On your worker, you can listen for this message using [`worker.parentPort.on('message', cb)`](https://nodejs.org/api/worker_threads.html#event-message).

```js
const { parentPort } = require('worker_threads')
parentPort.on('message', function (message) {
console.log('received:', message)
})
```

## License

MIT
20 changes: 20 additions & 0 deletions index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,26 @@ declare class ThreadStream extends EventEmitter {
* @throws {Error} if the stream is already flushing, if it fails to flush or if it takes more than 10 seconds to flush.
*/
flushSync(): void
/**
* Synchronously calls each of the listeners registered for the event named`eventName`, in the order they were registered, passing the supplied arguments
* to each.
*
* @param eventName the name of the event.
* @param args the arguments to be passed to the event handlers.
* @returns {boolean} `true` if the event had listeners, `false` otherwise.
*/
emit(eventName: string | symbol, ...args: any[]): boolean {
return super.emit(eventName, ...args);
}
/**
* Post a message to the Worker with specified data and an optional list of transferable objects.
*
* @param eventName the name of the event, specifically 'message'.
* @param message message data to be sent to the Worker.
* @param transferList an optional list of transferable objects to be transferred to the Worker context.
* @returns {boolean} true if the event had listeners, false otherwise.
*/
emit(eventName: 'message', message: any, transferList?: Transferable[]): boolean
}

export = ThreadStream;
3 changes: 3 additions & 0 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,9 @@ class ThreadStream extends EventEmitter {

// TODO (fix): Make private?
this.worker = createWorker(this, opts) // TODO (fix): make private
this.on('message', (message, transferList) => {
this.worker.postMessage(message, transferList)
})
}

write (data) {
Expand Down
18 changes: 18 additions & 0 deletions test/on-message.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
'use strict'

const { parentPort } = require('worker_threads')
const { Writable } = require('stream')

function run () {
parentPort.once('message', function ({ text, takeThisPortPlease }) {
takeThisPortPlease.postMessage(`received: ${text}`)
})
return new Writable({
autoDestroy: true,
write (chunk, enc, cb) {
cb()
}
})
}

module.exports = run
24 changes: 24 additions & 0 deletions test/post-message.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
'use strict'

const { test } = require('tap')
const { join } = require('path')
const { once } = require('events')
const { MessageChannel } = require('worker_threads')
const ThreadStream = require('..')

test('message events emitted on the stream are posted to the worker', async function (t) {
t.plan(1)

const { port1, port2 } = new MessageChannel()
const stream = new ThreadStream({
filename: join(__dirname, 'on-message.js'),
sync: false
})
t.teardown(() => {
stream.end()
})

stream.emit('message', { text: 'hello', takeThisPortPlease: port1 }, [port1])
const [confirmation] = await once(port2, 'message')
t.equal(confirmation, 'received: hello')
})

0 comments on commit f7516e1

Please sign in to comment.