Skip to content
This repository has been archived by the owner on Aug 31, 2018. It is now read-only.

Commit

Permalink
worker: implement MessagePort and MessageChannel
Browse files Browse the repository at this point in the history
Implement `MessagePort` and `MessageChannel` along the lines of
the DOM classes of the same names. `MessagePort`s initially
support transferring only `ArrayBuffer`s.
  • Loading branch information
addaleax committed Oct 12, 2017
1 parent ea80331 commit d9b49b7
Show file tree
Hide file tree
Showing 11 changed files with 978 additions and 1 deletion.
3 changes: 2 additions & 1 deletion lib/internal/module.js
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ const builtinLibs = [
'assert', 'async_hooks', 'buffer', 'child_process', 'cluster', 'crypto',
'dgram', 'dns', 'domain', 'events', 'fs', 'http', 'http2', 'https', 'net',
'os', 'path', 'perf_hooks', 'punycode', 'querystring', 'readline', 'repl',
'stream', 'string_decoder', 'tls', 'tty', 'url', 'util', 'v8', 'vm', 'zlib'
'stream', 'string_decoder', 'tls', 'tty', 'url', 'util', 'v8', 'vm', 'worker',
'zlib'
];

if (typeof process.binding('inspector').connect === 'function') {
Expand Down
83 changes: 83 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
'use strict';

const EventEmitter = require('events');
const util = require('util');

const { MessagePort, MessageChannel } = process.binding('messaging');
util.inherits(MessagePort, EventEmitter);

const debug = util.debuglog('worker');

// A MessagePort consists a handle (that wraps around an
// uv_async_t) which can receive information from other threads and emits
// .onmessage events, and a function used for sending data to a MessagePort
// in some other thread.
function onmessage(payload, flag) {
debug(`[${process.threadId}] received message`, flag, payload);
// Emit the flag and deserialized object to userland.
if (flag === 0 || flag === undefined)
this.emit('message', payload);
else
this.emit('flaggedMessage', flag, payload);
}

Object.defineProperty(MessagePort.prototype, 'onmessage', {
enumerable: false,
configurable: true,
get() { return onmessage; },
set(value) {
Object.defineProperty(this, {
writable: true,
enumerable: true,
configurable: true,
value
});
this.ref();
this.start();
}
});

function oninit() {
setupPortReferencing(this, this, 'message');
}

Object.defineProperty(MessagePort.prototype, 'oninit', {
enumerable: false,
writable: false,
value: oninit
});

function onclose() {
this.emit('close');
}

Object.defineProperty(MessagePort.prototype, 'onclose', {
enumerable: false,
writable: false,
value: onclose
});

function setupPortReferencing(port, eventEmitter, eventName) {
// Keep track of whether there are any workerMessage listeners:
// If there are some, ref() the channel so it keeps the event loop alive.
// If there are none or all are removed, unref() the channel so the worker
// can shutdown gracefully.
port.unref();
eventEmitter.on('newListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.ref();
port.start();
}
});
eventEmitter.on('removeListener', (name) => {
if (name === eventName && eventEmitter.listenerCount(eventName) === 0) {
port.stop();
port.unref();
}
});
}

module.exports = {
MessagePort,
MessageChannel
};
5 changes: 5 additions & 0 deletions lib/worker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
'use strict';

const { MessagePort, MessageChannel } = require('internal/worker');

module.exports = { MessagePort, MessageChannel };
4 changes: 4 additions & 0 deletions node.gyp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
'lib/util.js',
'lib/v8.js',
'lib/vm.js',
'lib/worker.js',
'lib/zlib.js',
'lib/internal/buffer.js',
'lib/internal/child_process.js',
Expand Down Expand Up @@ -129,6 +130,7 @@
'lib/internal/http2/util.js',
'lib/internal/v8_prof_polyfill.js',
'lib/internal/v8_prof_processor.js',
'lib/internal/worker.js',
'lib/internal/streams/lazy_transform.js',
'lib/internal/streams/BufferList.js',
'lib/internal/streams/legacy.js',
Expand Down Expand Up @@ -208,6 +210,7 @@
'src/node_http2.cc',
'src/node_http_parser.cc',
'src/node_main.cc',
'src/node_messaging.cc',
'src/node_os.cc',
'src/node_platform.cc',
'src/node_perf.cc',
Expand Down Expand Up @@ -259,6 +262,7 @@
'src/node_http2_state.h',
'src/node_internals.h',
'src/node_javascript.h',
'src/node_messaging.h',
'src/node_mutex.h',
'src/node_platform.h',
'src/node_perf.h',
Expand Down
1 change: 1 addition & 0 deletions src/async-wrap.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ namespace node {
V(HTTP2SESSIONSHUTDOWNWRAP) \
V(HTTPPARSER) \
V(JSSTREAM) \
V(MESSAGEPORT) \
V(PIPECONNECTWRAP) \
V(PIPEWRAP) \
V(PROCESSWRAP) \
Expand Down
6 changes: 6 additions & 0 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ class ModuleWrap;
V(mac_string, "mac") \
V(max_buffer_string, "maxBuffer") \
V(message_string, "message") \
V(message_port_constructor_string, "MessagePort") \
V(minttl_string, "minttl") \
V(model_string, "model") \
V(modulus_string, "modulus") \
Expand All @@ -208,6 +209,7 @@ class ModuleWrap;
V(onhandshakedone_string, "onhandshakedone") \
V(onhandshakestart_string, "onhandshakestart") \
V(onheaders_string, "onheaders") \
V(oninit_string, "oninit") \
V(onmessage_string, "onmessage") \
V(onnewsession_string, "onnewsession") \
V(onnewsessiondone_string, "onnewsessiondone") \
Expand Down Expand Up @@ -235,6 +237,8 @@ class ModuleWrap;
V(pid_string, "pid") \
V(pipe_string, "pipe") \
V(port_string, "port") \
V(port1_string, "port1") \
V(port2_string, "port2") \
V(preference_string, "preference") \
V(priority_string, "priority") \
V(produce_cached_data_string, "produceCachedData") \
Expand Down Expand Up @@ -308,6 +312,7 @@ class ModuleWrap;
V(domain_array, v8::Array) \
V(domains_stack_array, v8::Array) \
V(inspector_console_api_object, v8::Object) \
V(message_port_constructor_template, v8::FunctionTemplate) \
V(module_load_list_array, v8::Array) \
V(pbkdf2_constructor_template, v8::ObjectTemplate) \
V(pipe_constructor_template, v8::FunctionTemplate) \
Expand All @@ -318,6 +323,7 @@ class ModuleWrap;
V(promise_wrap_template, v8::ObjectTemplate) \
V(push_values_to_array_function, v8::Function) \
V(randombytes_constructor_template, v8::ObjectTemplate) \
V(sab_lifetimepartner_constructor_template, v8::FunctionTemplate) \
V(script_context_constructor_template, v8::FunctionTemplate) \
V(script_data_constructor_function, v8::Function) \
V(secure_context_constructor_template, v8::FunctionTemplate) \
Expand Down
Loading

0 comments on commit d9b49b7

Please sign in to comment.