Skip to content

Commit

Permalink
worker: implement MessagePort and MessageChannel
Browse files Browse the repository at this point in the history
Implement `MessagePort` and `MessageChannel` along the lines of
the DOM classes of the same names. `MessagePort`s initially
support transferring only `ArrayBuffer`s.

Thanks to Stephen Belanger for reviewing this change in its
original form, to Benjamin Gruenbaum for reviewing the
added tests in their original form, and to Olivia Hugger
for reviewing the documentation in its original form.

Refs: ayojs/ayo#98
  • Loading branch information
addaleax committed May 22, 2018
1 parent 6af7927 commit 391fe5f
Show file tree
Hide file tree
Showing 21 changed files with 1,142 additions and 0 deletions.
1 change: 1 addition & 0 deletions doc/api/_toc.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
* [Utilities](util.html)
* [V8](v8.html)
* [VM](vm.html)
* [Worker](worker.html)
* [ZLIB](zlib.html)

<div class="line"></div>
Expand Down
16 changes: 16 additions & 0 deletions doc/api/errors.md
Original file line number Diff line number Diff line change
Expand Up @@ -650,12 +650,23 @@ Used when a child process is being forked without specifying an IPC channel.
Used when the main process is trying to read data from the child process's
STDERR / STDOUT, and the data's length is longer than the `maxBuffer` option.

<a id="ERR_CLOSED_MESSAGE_PORT"></a>
### ERR_CLOSED_MESSAGE_PORT

Used when there was an attempt to use a `MessagePort` instance in a closed
state, usually after `.close()` has been called.

<a id="ERR_CONSOLE_WRITABLE_STREAM"></a>
### ERR_CONSOLE_WRITABLE_STREAM

`Console` was instantiated without `stdout` stream, or `Console` has a
non-writable `stdout` or `stderr` stream.

<a id="ERR_CONSTRUCT_CALL_REQUIRED"></a>
### ERR_CONSTRUCT_CALL_REQUIRED

A constructor for a class was called without `new`.

<a id="ERR_CPU_USAGE"></a>
### ERR_CPU_USAGE

Expand Down Expand Up @@ -1203,6 +1214,11 @@ urlSearchParams.has.call(buf, 'foo');
// Throws a TypeError with code 'ERR_INVALID_THIS'
```

<a id="ERR_INVALID_TRANSFER_OBJECT"></a>
### ERR_INVALID_TRANSFER_OBJECT

An invalid transfer object was passed to `.postMessage()`.

<a id="ERR_INVALID_TUPLE"></a>
### ERR_INVALID_TUPLE

Expand Down
149 changes: 149 additions & 0 deletions doc/api/worker.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Worker

<!--introduced_in=REPLACEME-->

> Stability: 1 - Experimental
## Class: MessageChannel
<!-- YAML
added: REPLACEME
-->

Instances of the `worker.MessageChannel` class represent an asynchronous,
two-way communications channel.
The `MessageChannel` has no methods of its own. `new MessageChannel()`
yields an object with `port1` and `port2` properties, which refer to linked
[`MessagePort`][] instances.

```js
const { MessageChannel } = require('worker');

const { port1, port2 } = new MessageChannel();
port1.on('message', (message) => console.log('received', message));
port2.postMessage({ foo: 'bar' });
// prints: received { foo: 'bar' }
```

## Class: MessagePort
<!-- YAML
added: REPLACEME
-->

* Extends: {EventEmitter}

Instances of the `worker.MessagePort` class represent one end of an
asynchronous, two-way communications channel. It can be used to transfer
structured data, memory regions and other `MessagePort`s between different
[`Worker`][]s.

*Note*: With the exception of `MessagePort`s being [`EventEmitter`][]s rather
than `EventTarget`s, this implementation matches [browser `MessagePort`][]s.

### Event: 'close'
<!-- YAML
added: REPLACEME
-->

The `'close'` event is emitted once either side of the channel has been
disconnected.

### Event: 'message'
<!-- YAML
added: REPLACEME
-->

* `value` {any} The transmitted value

The `'message'` event is emitted for any incoming message, containing the cloned
input of [`port.postMessage()`][].

Listeners on this event will receive a clone of the `value` parameter as passed
to `postMessage()` and no further arguments.

### port.close()
<!-- YAML
added: REPLACEME
-->

* Returns: {undefined}

Disables further sending of messages on either side of the connection.
This this method can be called once you know that no further communication
will happen over this `MessagePort`.

### port.postMessage(value[, transferList])
<!-- YAML
added: REPLACEME
-->

* `value` {any}
* `transferList` {Object[]}

* Returns: {undefined}

Sends a JavaScript value to the receiving side of this channel.
`value` will be transferred in a way which is compatible with
the [HTML structured clone algorithm][]. In particular, it may contain circular
references and objects like typed arrays that the `JSON` API is not able
to stringify.

`transferList` may be a list of `ArrayBuffer` objects.
After transferring, they will not be usable on the sending side of the channel
anymore (even if they are not contained in `value`).

`value` may still contain `ArrayBuffer` instances that are not in
`transferList`; in that case, the underlying memory is copied rather than moved.

For more information on the serialization and deserialization mechanisms
behind this API, see the [serialization API of the `v8` module][v8.serdes].

*Note*: Because the object cloning uses the structured clone algorithm,
non-enumberable properties, property accessors, and object prototypes are
not preserved. In particular, [`Buffer`][] objects will be read as
plain [`Uint8Array`][]s on the receiving side.

### port.ref()
<!-- YAML
added: REPLACEME
-->

Opposite of `unref`, calling `ref` on a previously `unref`d port will *not*
let the program exit if it's the only active handle left (the default behavior).
If the port is `ref`d calling `ref` again will have no effect.

If listeners are attached or removed using `.on('message')`, the port will
be `ref()`ed and `unref()`ed automatically depending on whether
listeners for the event exist.

### port.unref()
<!-- YAML
added: REPLACEME
-->

Calling `unref` on a port will allow the thread to exit if this is the only
active handle in the event system. If the port is already `unref`d calling
`unref` again will have no effect.

If listeners are attached or removed using `.on('message')`, the port will
be `ref()`ed and `unref()`ed automatically depending on whether
listeners for the event exist.

### port.start()
<!-- YAML
added: REPLACEME
-->

* Returns: {undefined}

Starts receiving messages on this `MessagePort`. When using this port
as an event emitter, this will be called automatically once `'message'`
listeners are attached.

[`Buffer`]: buffer.html
[`EventEmitter`]: events.html
[`MessagePort`]: #worker_class_messageport
[`port.postMessage()`]: #worker_port_postmessage_value_transferlist
[v8.serdes]: v8.html#v8_serialization_api
[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array
[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort
[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm
5 changes: 5 additions & 0 deletions lib/internal/modules/cjs/helpers.js
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ const builtinLibs = [
'v8', 'vm', 'zlib'
];

if (process.binding('config').experimentalWorker) {
builtinLibs.push('worker');
builtinLibs.sort();
}

if (typeof process.binding('inspector').open === 'function') {
builtinLibs.push('inspector');
builtinLibs.sort();
Expand Down
103 changes: 103 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
'use strict';

const EventEmitter = require('events');
const util = require('util');

const { internalBinding } = require('internal/bootstrap/loaders');
const { MessagePort, MessageChannel } = internalBinding('messaging');
util.inherits(MessagePort, EventEmitter);

const kOnMessageListener = Symbol('kOnMessageListener');

const debug = util.debuglog('worker');

// A MessagePort consists of a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
MessagePort.prototype[kOnMessageListener] = function onmessage(payload) {
debug('received message', payload);
// Emit the deserialized object to userland.
this.emit('message', payload);
};

// This is for compatibility with the Web's MessagePort API. It makes sense to
// provide it as an `EventEmitter` in Node.js, but if somebody overrides
// `onmessage`, we'll switch over to the Web API model.
Object.defineProperty(MessagePort.prototype, 'onmessage', {
enumerable: true,
configurable: true,
get() {
return this[kOnMessageListener];
},
set(value) {
this[kOnMessageListener] = value;
if (typeof value === 'function') {
this.ref();
this.start();
} else {
this.unref();
this.stop();
}
}
});

// This is called from inside the `MessagePort` constructor.
function oninit() {
setupPortReferencing(this, this, 'message');
}

Object.defineProperty(MessagePort.prototype, 'oninit', {
enumerable: true,
writable: false,
value: oninit
});

// This is called after the underlying `uv_async_t` has been closed.
function onclose() {
if (typeof this.onclose === 'function') {
// Not part of the Web standard yet, but there aren't many reasonable
// alternatives in a non-EventEmitter usage setting.
// Refs: https://github.com/whatwg/html/issues/1766
this.onclose();
}
this.emit('close');
}

Object.defineProperty(MessagePort.prototype, '_onclose', {
enumerable: true,
writable: false,
value: onclose
});

const originalClose = MessagePort.prototype.close;
MessagePort.prototype.close = function(cb) {
if (typeof cb === 'function')
this.once('close', cb);
originalClose.call(this);
};

function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
// If there are none or all are removed, unref() the channel so the worker
// can shutdown gracefully.
port.unref();
eventEmitter.on('newListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.ref();
port.start();
}
});
eventEmitter.on('removeListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.stop();
port.unref();
}
});
}

module.exports = {
MessagePort,
MessageChannel
};
12 changes: 12 additions & 0 deletions lib/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict';

if (!process.binding('config').experimentalWorker) {
// TODO(addaleax): Is this the right way to do this?
// eslint-disable-next-line no-restricted-syntax
throw new Error('The `worker` module is experimental and may change at ' +
'any time. Pass --experimental-worker to Node.js in order to enable it.');
}

const { MessagePort, MessageChannel } = require('internal/worker');

module.exports = { MessagePort, MessageChannel };
4 changes: 4 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
'lib/util.js',
'lib/v8.js',
'lib/vm.js',
'lib/worker.js',
'lib/zlib.js',
'lib/internal/assert.js',
'lib/internal/async_hooks.js',
Expand Down Expand Up @@ -156,6 +157,7 @@
'lib/internal/validators.js',
'lib/internal/stream_base_commons.js',
'lib/internal/vm/module.js',
'lib/internal/worker.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/async_iterator.js',
'lib/internal/streams/buffer_list.js',
Expand Down Expand Up @@ -333,6 +335,7 @@
'src/node_file.cc',
'src/node_http2.cc',
'src/node_http_parser.cc',
'src/node_messaging.cc',
'src/node_os.cc',
'src/node_platform.cc',
'src/node_perf.cc',
Expand Down Expand Up @@ -391,6 +394,7 @@
'src/node_http2_state.h',
'src/node_internals.h',
'src/node_javascript.h',
'src/node_messaging.h',
'src/node_mutex.h',
'src/node_perf.h',
'src/node_perf_common.h',
Expand Down
1 change: 1 addition & 0 deletions src/async_wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ namespace node {
V(HTTP2SETTINGS) \
V(HTTPPARSER) \
V(JSSTREAM) \
V(MESSAGEPORT) \
V(PIPECONNECTWRAP) \
V(PIPESERVERWRAP) \
V(PIPEWRAP) \
Expand Down
5 changes: 5 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,7 @@ struct PackageConfig {
V(main_string, "main") \
V(max_buffer_string, "maxBuffer") \
V(message_string, "message") \
V(message_port_constructor_string, "MessagePort") \
V(minttl_string, "minttl") \
V(modulus_string, "modulus") \
V(name_string, "name") \
Expand All @@ -209,6 +210,7 @@ struct PackageConfig {
V(onhandshakedone_string, "onhandshakedone") \
V(onhandshakestart_string, "onhandshakestart") \
V(onheaders_string, "onheaders") \
V(oninit_string, "oninit") \
V(onmessage_string, "onmessage") \
V(onnewsession_string, "onnewsession") \
V(onocspresponse_string, "onocspresponse") \
Expand Down Expand Up @@ -239,6 +241,8 @@ struct PackageConfig {
V(pipe_target_string, "pipeTarget") \
V(pipe_source_string, "pipeSource") \
V(port_string, "port") \
V(port1_string, "port1") \
V(port2_string, "port2") \
V(preference_string, "preference") \
V(priority_string, "priority") \
V(promise_string, "promise") \
Expand Down Expand Up @@ -320,6 +324,7 @@ struct PackageConfig {
V(http2stream_constructor_template, v8::ObjectTemplate) \
V(immediate_callback_function, v8::Function) \
V(inspector_console_api_object, v8::Object) \
V(message_port_constructor_template, v8::FunctionTemplate) \
V(pbkdf2_constructor_template, v8::ObjectTemplate) \
V(pipe_constructor_template, v8::FunctionTemplate) \
V(performance_entry_callback, v8::Function) \
Expand Down
Loading

0 comments on commit 391fe5f

Please sign in to comment.