diff --git a/doc/api/_toc.md b/doc/api/_toc.md index 9b487b50a55031..1b2fdea26e46eb 100644 --- a/doc/api/_toc.md +++ b/doc/api/_toc.md @@ -53,6 +53,7 @@ * [Utilities](util.html) * [V8](v8.html) * [VM](vm.html) +* [Worker](worker.html) * [ZLIB](zlib.html)
diff --git a/doc/api/all.md b/doc/api/all.md index d013f07bd328fc..6f0a21dd092105 100644 --- a/doc/api/all.md +++ b/doc/api/all.md @@ -46,4 +46,5 @@ @include util @include v8 @include vm +@include worker @include zlib diff --git a/doc/api/errors.md b/doc/api/errors.md index fc1bcd3e6e994b..f3e5939f258576 100644 --- a/doc/api/errors.md +++ b/doc/api/errors.md @@ -650,12 +650,23 @@ Used when a child process is being forked without specifying an IPC channel. Used when the main process is trying to read data from the child process's STDERR / STDOUT, and the data's length is longer than the `maxBuffer` option. + +### ERR_CLOSED_MESSAGE_PORT + +There was an attempt to use a `MessagePort` instance in a closed +state, usually after `.close()` has been called. + ### ERR_CONSOLE_WRITABLE_STREAM `Console` was instantiated without `stdout` stream, or `Console` has a non-writable `stdout` or `stderr` stream. + +### ERR_CONSTRUCT_CALL_REQUIRED + +A constructor for a class was called without `new`. + ### ERR_CPU_USAGE @@ -1203,6 +1214,11 @@ urlSearchParams.has.call(buf, 'foo'); // Throws a TypeError with code 'ERR_INVALID_THIS' ``` + +### ERR_INVALID_TRANSFER_OBJECT + +An invalid transfer object was passed to `postMessage()`. + ### ERR_INVALID_TUPLE diff --git a/doc/api/worker.md b/doc/api/worker.md new file mode 100644 index 00000000000000..4724714cd62f26 --- /dev/null +++ b/doc/api/worker.md @@ -0,0 +1,146 @@ +# Worker + + + +> Stability: 1 - Experimental + +## Class: MessageChannel + + +Instances of the `worker.MessageChannel` class represent an asynchronous, +two-way communications channel. +The `MessageChannel` has no methods of its own. `new MessageChannel()` +yields an object with `port1` and `port2` properties, which refer to linked +[`MessagePort`][] instances. + +```js +const { MessageChannel } = require('worker'); + +const { port1, port2 } = new MessageChannel(); +port1.on('message', (message) => console.log('received', message)); +port2.postMessage({ foo: 'bar' }); +// prints: received { foo: 'bar' } +``` + +## Class: MessagePort + + +* Extends: {EventEmitter} + +Instances of the `worker.MessagePort` class represent one end of an +asynchronous, two-way communications channel. It can be used to transfer +structured data, memory regions and other `MessagePort`s between different +[`Worker`][]s. + +With the exception of `MessagePort`s being [`EventEmitter`][]s rather +than `EventTarget`s, this implementation matches [browser `MessagePort`][]s. + +### Event: 'close' + + +The `'close'` event is emitted once either side of the channel has been +disconnected. + +### Event: 'message' + + +* `value` {any} The transmitted value + +The `'message'` event is emitted for any incoming message, containing the cloned +input of [`port.postMessage()`][]. + +Listeners on this event will receive a clone of the `value` parameter as passed +to `postMessage()` and no further arguments. + +### port.close() + + +Disables further sending of messages on either side of the connection. +This method can be called once you know that no further communication +will happen over this `MessagePort`. + +### port.postMessage(value[, transferList]) + + +* `value` {any} +* `transferList` {Object[]} + +Sends a JavaScript value to the receiving side of this channel. +`value` will be transferred in a way which is compatible with +the [HTML structured clone algorithm][]. In particular, it may contain circular +references and objects like typed arrays that the `JSON` API is not able +to stringify. + +`transferList` may be a list of `ArrayBuffer` objects. +After transferring, they will not be usable on the sending side of the channel +anymore (even if they are not contained in `value`). + +`value` may still contain `ArrayBuffer` instances that are not in +`transferList`; in that case, the underlying memory is copied rather than moved. + +For more information on the serialization and deserialization mechanisms +behind this API, see the [serialization API of the `v8` module][v8.serdes]. + +Because the object cloning uses the structured clone algorithm, +non-enumerable properties, property accessors, and object prototypes are +not preserved. In particular, [`Buffer`][] objects will be read as +plain [`Uint8Array`][]s on the receiving side. + +The message object will be cloned immediately, and can be modified after +posting without having side effects. + +### port.ref() + + +Opposite of `unref()`. Calling `ref()` on a previously `unref()`ed port will +*not* let the program exit if it's the only active handle left (the default +behavior). If the port is `ref()`ed, calling `ref()` again will have no effect. + +If listeners are attached or removed using `.on('message')`, the port will +be `ref()`ed and `unref()`ed automatically depending on whether +listeners for the event exist. + +### port.start() + + +Starts receiving messages on this `MessagePort`. When using this port +as an event emitter, this will be called automatically once `'message'` +listeners are attached. + +### port.unref() + + +Calling `unref()` on a port will allow the thread to exit if this is the only +active handle in the event system. If the port is already `unref()`ed calling +`unref()` again will have no effect. + +If listeners are attached or removed using `.on('message')`, the port will +be `ref()`ed and `unref()`ed automatically depending on whether +listeners for the event exist. + +[`Buffer`]: buffer.html +[`EventEmitter`]: events.html +[`MessagePort`]: #worker_class_messageport +[`port.postMessage()`]: #worker_port_postmessage_value_transferlist +[v8.serdes]: v8.html#v8_serialization_api +[`Uint8Array`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Uint8Array +[browser `MessagePort`]: https://developer.mozilla.org/en-US/docs/Web/API/MessagePort +[HTML structured clone algorithm]: https://developer.mozilla.org/en-US/docs/Web/API/Web_Workers_API/Structured_clone_algorithm diff --git a/lib/internal/bootstrap/loaders.js b/lib/internal/bootstrap/loaders.js index ff809a91291bee..417e8594e14aab 100644 --- a/lib/internal/bootstrap/loaders.js +++ b/lib/internal/bootstrap/loaders.js @@ -194,7 +194,8 @@ }; NativeModule.isInternal = function(id) { - return id.startsWith('internal/'); + return id.startsWith('internal/') || + (id === 'worker' && !process.binding('config').experimentalWorker); }; } diff --git a/lib/internal/modules/cjs/helpers.js b/lib/internal/modules/cjs/helpers.js index 60346c5841c7df..55eaed7d376506 100644 --- a/lib/internal/modules/cjs/helpers.js +++ b/lib/internal/modules/cjs/helpers.js @@ -105,6 +105,11 @@ const builtinLibs = [ 'v8', 'vm', 'zlib' ]; +if (process.binding('config').experimentalWorker) { + builtinLibs.push('worker'); + builtinLibs.sort(); +} + if (typeof process.binding('inspector').open === 'function') { builtinLibs.push('inspector'); builtinLibs.sort(); diff --git a/lib/internal/worker.js b/lib/internal/worker.js new file mode 100644 index 00000000000000..03caa07a4b1eeb --- /dev/null +++ b/lib/internal/worker.js @@ -0,0 +1,103 @@ +'use strict'; + +const EventEmitter = require('events'); +const util = require('util'); + +const { internalBinding } = require('internal/bootstrap/loaders'); +const { MessagePort, MessageChannel } = internalBinding('messaging'); +util.inherits(MessagePort, EventEmitter); + +const kOnMessageListener = Symbol('kOnMessageListener'); + +const debug = util.debuglog('worker'); + +// A MessagePort consists of a handle (that wraps around an +// uv_async_t) which can receive information from other threads and emits +// .onmessage events, and a function used for sending data to a MessagePort +// in some other thread. +MessagePort.prototype[kOnMessageListener] = function onmessage(payload) { + debug('received message', payload); + // Emit the deserialized object to userland. + this.emit('message', payload); +}; + +// This is for compatibility with the Web's MessagePort API. It makes sense to +// provide it as an `EventEmitter` in Node.js, but if somebody overrides +// `onmessage`, we'll switch over to the Web API model. +Object.defineProperty(MessagePort.prototype, 'onmessage', { + enumerable: true, + configurable: true, + get() { + return this[kOnMessageListener]; + }, + set(value) { + this[kOnMessageListener] = value; + if (typeof value === 'function') { + this.ref(); + this.start(); + } else { + this.unref(); + this.stop(); + } + } +}); + +// This is called from inside the `MessagePort` constructor. +function oninit() { + setupPortReferencing(this, this, 'message'); +} + +Object.defineProperty(MessagePort.prototype, 'oninit', { + enumerable: true, + writable: false, + value: oninit +}); + +// This is called after the underlying `uv_async_t` has been closed. +function onclose() { + if (typeof this.onclose === 'function') { + // Not part of the Web standard yet, but there aren't many reasonable + // alternatives in a non-EventEmitter usage setting. + // Refs: https://github.com/whatwg/html/issues/1766 + this.onclose(); + } + this.emit('close'); +} + +Object.defineProperty(MessagePort.prototype, '_onclose', { + enumerable: true, + writable: false, + value: onclose +}); + +const originalClose = MessagePort.prototype.close; +MessagePort.prototype.close = function(cb) { + if (typeof cb === 'function') + this.once('close', cb); + originalClose.call(this); +}; + +function setupPortReferencing(port, eventEmitter, eventName) { + // Keep track of whether there are any workerMessage listeners: + // If there are some, ref() the channel so it keeps the event loop alive. + // If there are none or all are removed, unref() the channel so the worker + // can shutdown gracefully. + port.unref(); + eventEmitter.on('newListener', (name) => { + if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { + port.ref(); + port.start(); + } + }); + eventEmitter.on('removeListener', (name) => { + if (name === eventName && eventEmitter.listenerCount(eventName) === 0) { + port.stop(); + port.unref(); + } + }); +} + +module.exports = { + MessagePort, + MessageChannel +}; diff --git a/lib/worker.js b/lib/worker.js new file mode 100644 index 00000000000000..d67fb4efe40a33 --- /dev/null +++ b/lib/worker.js @@ -0,0 +1,5 @@ +'use strict'; + +const { MessagePort, MessageChannel } = require('internal/worker'); + +module.exports = { MessagePort, MessageChannel }; diff --git a/node.gyp b/node.gyp index 4b94c1dd6b2ad9..9a8dbf00cd9f15 100644 --- a/node.gyp +++ b/node.gyp @@ -78,6 +78,7 @@ 'lib/util.js', 'lib/v8.js', 'lib/vm.js', + 'lib/worker.js', 'lib/zlib.js', 'lib/internal/assert.js', 'lib/internal/async_hooks.js', @@ -156,6 +157,7 @@ 'lib/internal/validators.js', 'lib/internal/stream_base_commons.js', 'lib/internal/vm/module.js', + 'lib/internal/worker.js', 'lib/internal/streams/lazy_transform.js', 'lib/internal/streams/async_iterator.js', 'lib/internal/streams/buffer_list.js', @@ -334,6 +336,7 @@ 'src/node_file.cc', 'src/node_http2.cc', 'src/node_http_parser.cc', + 'src/node_messaging.cc', 'src/node_os.cc', 'src/node_platform.cc', 'src/node_perf.cc', @@ -391,6 +394,7 @@ 'src/node_http2_state.h', 'src/node_internals.h', 'src/node_javascript.h', + 'src/node_messaging.h', 'src/node_mutex.h', 'src/node_perf.h', 'src/node_perf_common.h', diff --git a/src/async_wrap.h b/src/async_wrap.h index 377702a8d6ef9c..cf269a4c1f5e1e 100644 --- a/src/async_wrap.h +++ b/src/async_wrap.h @@ -49,6 +49,7 @@ namespace node { V(HTTP2SETTINGS) \ V(HTTPPARSER) \ V(JSSTREAM) \ + V(MESSAGEPORT) \ V(PIPECONNECTWRAP) \ V(PIPESERVERWRAP) \ V(PIPEWRAP) \ diff --git a/src/env.h b/src/env.h index 7a432eaa3d4ff0..d87c39c5186bd7 100644 --- a/src/env.h +++ b/src/env.h @@ -193,6 +193,7 @@ struct PackageConfig { V(main_string, "main") \ V(max_buffer_string, "maxBuffer") \ V(message_string, "message") \ + V(message_port_constructor_string, "MessagePort") \ V(minttl_string, "minttl") \ V(modulus_string, "modulus") \ V(name_string, "name") \ @@ -212,6 +213,7 @@ struct PackageConfig { V(onhandshakedone_string, "onhandshakedone") \ V(onhandshakestart_string, "onhandshakestart") \ V(onheaders_string, "onheaders") \ + V(oninit_string, "oninit") \ V(onmessage_string, "onmessage") \ V(onnewsession_string, "onnewsession") \ V(onocspresponse_string, "onocspresponse") \ @@ -242,6 +244,8 @@ struct PackageConfig { V(pipe_target_string, "pipeTarget") \ V(pipe_source_string, "pipeSource") \ V(port_string, "port") \ + V(port1_string, "port1") \ + V(port2_string, "port2") \ V(preference_string, "preference") \ V(priority_string, "priority") \ V(promise_string, "promise") \ @@ -323,6 +327,7 @@ struct PackageConfig { V(http2stream_constructor_template, v8::ObjectTemplate) \ V(immediate_callback_function, v8::Function) \ V(inspector_console_api_object, v8::Object) \ + V(message_port_constructor_template, v8::FunctionTemplate) \ V(pbkdf2_constructor_template, v8::ObjectTemplate) \ V(pipe_constructor_template, v8::FunctionTemplate) \ V(performance_entry_callback, v8::Function) \ diff --git a/src/node.cc b/src/node.cc index bf3aae2d35f773..baa97281b064a7 100644 --- a/src/node.cc +++ b/src/node.cc @@ -253,6 +253,11 @@ bool config_experimental_modules = false; // that is used by lib/vm.js bool config_experimental_vm_modules = false; +// Set in node.cc by ParseArgs when --experimental-worker is used. +// Used in node_config.cc to set a constant on process.binding('config') +// that is used by lib/worker.js +bool config_experimental_worker = false; + // Set in node.cc by ParseArgs when --experimental-repl-await is used. // Used in node_config.cc to set a constant on process.binding('config') // that is used by lib/repl.js. @@ -3094,6 +3099,7 @@ static void PrintHelp() { " --experimental-vm-modules experimental ES Module support\n" " in vm module\n" #endif // defined(NODE_HAVE_I18N_SUPPORT) + " --experimental-worker experimental threaded Worker support\n" #if HAVE_OPENSSL && NODE_FIPS_MODE " --force-fips force FIPS crypto (cannot be disabled)\n" #endif // HAVE_OPENSSL && NODE_FIPS_MODE @@ -3257,6 +3263,7 @@ static void CheckIfAllowedInEnv(const char* exe, bool is_env, "--experimental-modules", "--experimental-repl-await", "--experimental-vm-modules", + "--experimental-worker", "--force-fips", "--icu-data-dir", "--inspect", @@ -3454,6 +3461,8 @@ static void ParseArgs(int* argc, new_v8_argc += 1; } else if (strcmp(arg, "--experimental-vm-modules") == 0) { config_experimental_vm_modules = true; + } else if (strcmp(arg, "--experimental-worker") == 0) { + config_experimental_worker = true; } else if (strcmp(arg, "--experimental-repl-await") == 0) { config_experimental_repl_await = true; } else if (strcmp(arg, "--loader") == 0) { diff --git a/src/node_config.cc b/src/node_config.cc index 603d55491a259b..dd5ee666486874 100644 --- a/src/node_config.cc +++ b/src/node_config.cc @@ -91,6 +91,9 @@ static void Initialize(Local target, if (config_experimental_vm_modules) READONLY_BOOLEAN_PROPERTY("experimentalVMModules"); + if (config_experimental_worker) + READONLY_BOOLEAN_PROPERTY("experimentalWorker"); + if (config_experimental_repl_await) READONLY_BOOLEAN_PROPERTY("experimentalREPLAwait"); diff --git a/src/node_errors.h b/src/node_errors.h index b2f2b256c4c120..81169d241bc226 100644 --- a/src/node_errors.h +++ b/src/node_errors.h @@ -23,9 +23,12 @@ namespace node { #define ERRORS_WITH_CODE(V) \ V(ERR_BUFFER_OUT_OF_BOUNDS, RangeError) \ V(ERR_BUFFER_TOO_LARGE, Error) \ + V(ERR_CLOSED_MESSAGE_PORT, Error) \ + V(ERR_CONSTRUCT_CALL_REQUIRED, Error) \ V(ERR_INDEX_OUT_OF_RANGE, RangeError) \ V(ERR_INVALID_ARG_VALUE, TypeError) \ V(ERR_INVALID_ARG_TYPE, TypeError) \ + V(ERR_INVALID_TRANSFER_OBJECT, TypeError) \ V(ERR_MEMORY_ALLOCATION_FAILED, Error) \ V(ERR_MISSING_ARGS, TypeError) \ V(ERR_MISSING_MODULE, Error) \ @@ -54,7 +57,10 @@ namespace node { // Errors with predefined static messages #define PREDEFINED_ERROR_MESSAGES(V) \ + V(ERR_CLOSED_MESSAGE_PORT, "Cannot send data on closed MessagePort") \ + V(ERR_CONSTRUCT_CALL_REQUIRED, "Cannot call constructor without `new`") \ V(ERR_INDEX_OUT_OF_RANGE, "Index out of range") \ + V(ERR_INVALID_TRANSFER_OBJECT, "Found invalid object in transferList") \ V(ERR_MEMORY_ALLOCATION_FAILED, "Failed to allocate memory") \ V(ERR_SCRIPT_EXECUTION_INTERRUPTED, \ "Script execution was interrupted by `SIGINT`") diff --git a/src/node_internals.h b/src/node_internals.h index 7bf4eaf1ecf86a..a5d8ed0e5d3ad7 100644 --- a/src/node_internals.h +++ b/src/node_internals.h @@ -114,6 +114,7 @@ struct sockaddr; V(http_parser) \ V(inspector) \ V(js_stream) \ + V(messaging) \ V(module_wrap) \ V(os) \ V(performance) \ @@ -189,6 +190,11 @@ extern bool config_experimental_modules; // that is used by lib/vm.js extern bool config_experimental_vm_modules; +// Set in node.cc by ParseArgs when --experimental-vm-modules is used. +// Used in node_config.cc to set a constant on process.binding('config') +// that is used by lib/vm.js +extern bool config_experimental_worker; + // Set in node.cc by ParseArgs when --experimental-repl-await is used. // Used in node_config.cc to set a constant on process.binding('config') // that is used by lib/repl.js. diff --git a/src/node_messaging.cc b/src/node_messaging.cc new file mode 100644 index 00000000000000..c6e701c7d94426 --- /dev/null +++ b/src/node_messaging.cc @@ -0,0 +1,548 @@ +#include "node_messaging.h" +#include "node_internals.h" +#include "node_buffer.h" +#include "node_errors.h" +#include "util.h" +#include "util-inl.h" +#include "async_wrap.h" +#include "async_wrap-inl.h" + +using v8::Array; +using v8::ArrayBuffer; +using v8::ArrayBufferCreationMode; +using v8::Context; +using v8::EscapableHandleScope; +using v8::Exception; +using v8::Function; +using v8::FunctionCallbackInfo; +using v8::FunctionTemplate; +using v8::HandleScope; +using v8::Isolate; +using v8::Just; +using v8::Local; +using v8::Maybe; +using v8::MaybeLocal; +using v8::Nothing; +using v8::Object; +using v8::String; +using v8::Value; +using v8::ValueDeserializer; +using v8::ValueSerializer; + +namespace node { +namespace worker { + +Message::Message(MallocedBuffer&& buffer) + : main_message_buf_(std::move(buffer)) {} + +namespace { + +// This is used to tell V8 how to read transferred host objects, like other +// `MessagePort`s and `SharedArrayBuffer`s, and make new JS objects out of them. +class DeserializerDelegate : public ValueDeserializer::Delegate { + public: + DeserializerDelegate(Message* m, Environment* env) + : env_(env), msg_(m) {} + + ValueDeserializer* deserializer = nullptr; + + private: + Environment* env_; + Message* msg_; +}; + +} // anonymous namespace + +MaybeLocal Message::Deserialize(Environment* env, + Local context) { + EscapableHandleScope handle_scope(env->isolate()); + Context::Scope context_scope(context); + + DeserializerDelegate delegate(this, env); + ValueDeserializer deserializer( + env->isolate(), + reinterpret_cast(main_message_buf_.data), + main_message_buf_.size, + &delegate); + delegate.deserializer = &deserializer; + + // Attach all transfered ArrayBuffers to their new Isolate. + for (uint32_t i = 0; i < array_buffer_contents_.size(); ++i) { + Local ab = + ArrayBuffer::New(env->isolate(), + array_buffer_contents_[i].release(), + array_buffer_contents_[i].size, + ArrayBufferCreationMode::kInternalized); + deserializer.TransferArrayBuffer(i, ab); + } + array_buffer_contents_.clear(); + + if (deserializer.ReadHeader(context).IsNothing()) + return MaybeLocal(); + return handle_scope.Escape( + deserializer.ReadValue(context).FromMaybe(Local())); +} + +namespace { + +// This tells V8 how to serialize objects that it does not understand +// (e.g. C++ objects) into the output buffer, in a way that our own +// DeserializerDelegate understands how to unpack. +class SerializerDelegate : public ValueSerializer::Delegate { + public: + SerializerDelegate(Environment* env, Local context, Message* m) + : env_(env), context_(context), msg_(m) {} + + void ThrowDataCloneError(Local message) override { + env_->isolate()->ThrowException(Exception::Error(message)); + } + + ValueSerializer* serializer = nullptr; + + private: + Environment* env_; + Local context_; + Message* msg_; + + friend class worker::Message; +}; + +} // anynomous namespace + +Maybe Message::Serialize(Environment* env, + Local context, + Local input, + Local transfer_list_v) { + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(context); + + // Verify that we're not silently overwriting an existing message. + CHECK(main_message_buf_.is_empty()); + + SerializerDelegate delegate(env, context, this); + ValueSerializer serializer(env->isolate(), &delegate); + delegate.serializer = &serializer; + + std::vector> array_buffers; + if (transfer_list_v->IsArray()) { + Local transfer_list = transfer_list_v.As(); + uint32_t length = transfer_list->Length(); + for (uint32_t i = 0; i < length; ++i) { + Local entry; + if (!transfer_list->Get(context, i).ToLocal(&entry)) + return Nothing(); + // Currently, we support ArrayBuffers. + if (entry->IsArrayBuffer()) { + Local ab = entry.As(); + // If we cannot render the ArrayBuffer unusable in this Isolate and + // take ownership of its memory, copying the buffer will have to do. + if (!ab->IsNeuterable() || ab->IsExternal()) + continue; + // We simply use the array index in the `array_buffers` list as the + // ID that we write into the serialized buffer. + uint32_t id = array_buffers.size(); + array_buffers.push_back(ab); + serializer.TransferArrayBuffer(id, ab); + continue; + } + + THROW_ERR_INVALID_TRANSFER_OBJECT(env); + return Nothing(); + } + } + + serializer.WriteHeader(); + if (serializer.WriteValue(context, input).IsNothing()) { + return Nothing(); + } + + for (Local ab : array_buffers) { + // If serialization succeeded, we want to take ownership of + // (a.k.a. externalize) the underlying memory region and render + // it inaccessible in this Isolate. + ArrayBuffer::Contents contents = ab->Externalize(); + ab->Neuter(); + array_buffer_contents_.push_back( + MallocedBuffer { static_cast(contents.Data()), + contents.ByteLength() }); + } + + // The serializer gave us a buffer allocated using `malloc()`. + std::pair data = serializer.Release(); + main_message_buf_ = + MallocedBuffer(reinterpret_cast(data.first), data.second); + return Just(true); +} + +MessagePortData::MessagePortData(MessagePort* owner) : owner_(owner) { } + +MessagePortData::~MessagePortData() { + CHECK_EQ(owner_, nullptr); + Disentangle(); +} + +void MessagePortData::AddToIncomingQueue(Message&& message) { + // This function will be called by other threads. + Mutex::ScopedLock lock(mutex_); + incoming_messages_.emplace_back(std::move(message)); + + if (owner_ != nullptr) + owner_->TriggerAsync(); +} + +bool MessagePortData::IsSiblingClosed() const { + Mutex::ScopedLock lock(*sibling_mutex_); + return sibling_ == nullptr; +} + +void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) { + CHECK_EQ(a->sibling_, nullptr); + CHECK_EQ(b->sibling_, nullptr); + a->sibling_ = b; + b->sibling_ = a; + a->sibling_mutex_ = b->sibling_mutex_; +} + +void MessagePortData::PingOwnerAfterDisentanglement() { + Mutex::ScopedLock lock(mutex_); + if (owner_ != nullptr) + owner_->TriggerAsync(); +} + +void MessagePortData::Disentangle() { + // Grab a copy of the sibling mutex, then replace it so that each sibling + // has its own sibling_mutex_ now. + std::shared_ptr sibling_mutex = sibling_mutex_; + Mutex::ScopedLock sibling_lock(*sibling_mutex); + sibling_mutex_ = std::make_shared(); + + MessagePortData* sibling = sibling_; + if (sibling_ != nullptr) { + sibling_->sibling_ = nullptr; + sibling_ = nullptr; + } + + // We close MessagePorts after disentanglement, so we trigger the + // corresponding uv_async_t to let them know that this happened. + PingOwnerAfterDisentanglement(); + if (sibling != nullptr) { + sibling->PingOwnerAfterDisentanglement(); + } +} + +MessagePort::~MessagePort() { + if (data_) + data_->owner_ = nullptr; +} + +MessagePort::MessagePort(Environment* env, + Local context, + Local wrap) + : HandleWrap(env, + wrap, + reinterpret_cast(new uv_async_t()), + AsyncWrap::PROVIDER_MESSAGEPORT), + data_(new MessagePortData(this)) { + auto onmessage = [](uv_async_t* handle) { + // Called when data has been put into the queue. + MessagePort* channel = static_cast(handle->data); + channel->OnMessage(); + }; + CHECK_EQ(uv_async_init(env->event_loop(), + async(), + onmessage), 0); + async()->data = static_cast(this); + + Local fn; + if (!wrap->Get(context, env->oninit_string()).ToLocal(&fn)) + return; + + if (fn->IsFunction()) { + Local init = fn.As(); + USE(init->Call(context, wrap, 0, nullptr)); + } +} + +void MessagePort::AddToIncomingQueue(Message&& message) { + data_->AddToIncomingQueue(std::move(message)); +} + +uv_async_t* MessagePort::async() { + return reinterpret_cast(GetHandle()); +} + +void MessagePort::TriggerAsync() { + CHECK_EQ(uv_async_send(async()), 0); +} + +void MessagePort::New(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + if (!args.IsConstructCall()) { + THROW_ERR_CONSTRUCT_CALL_REQUIRED(env); + return; + } + + Local context = args.This()->CreationContext(); + Context::Scope context_scope(context); + + new MessagePort(env, context, args.This()); +} + +MessagePort* MessagePort::New( + Environment* env, + Local context, + std::unique_ptr data) { + Context::Scope context_scope(context); + Local ctor; + if (!GetMessagePortConstructor(env, context).ToLocal(&ctor)) + return nullptr; + MessagePort* port = nullptr; + + // Construct a new instance, then assign the listener instance and possibly + // the MessagePortData to it. + Local instance; + if (!ctor->NewInstance(context).ToLocal(&instance)) + return nullptr; + ASSIGN_OR_RETURN_UNWRAP(&port, instance, nullptr); + if (data) { + port->Detach(); + port->data_ = std::move(data); + port->data_->owner_ = port; + // If the existing MessagePortData object had pending messages, this is + // the easiest way to run that queue. + port->TriggerAsync(); + } + return port; +} + +void MessagePort::OnMessage() { + HandleScope handle_scope(env()->isolate()); + Local context = object()->CreationContext(); + + // data_ can only ever be modified by the owner thread, so no need to lock. + // However, the message port may be transferred while it is processing + // messages, so we need to check that this handle still owns its `data_` field + // on every iteration. + while (data_) { + Message received; + { + // Get the head of the message queue. + Mutex::ScopedLock lock(data_->mutex_); + if (!data_->receiving_messages_) + break; + if (data_->incoming_messages_.empty()) + break; + received = std::move(data_->incoming_messages_.front()); + data_->incoming_messages_.pop_front(); + } + + if (!env()->can_call_into_js()) { + // In this case there is nothing to do but to drain the current queue. + continue; + } + + { + // Call the JS .onmessage() callback. + HandleScope handle_scope(env()->isolate()); + Context::Scope context_scope(context); + Local args[] = { + received.Deserialize(env(), context).FromMaybe(Local()) + }; + + if (args[0].IsEmpty() || + !object()->Has(context, env()->onmessage_string()).FromMaybe(false) || + MakeCallback(env()->onmessage_string(), 1, args).IsEmpty()) { + // Re-schedule OnMessage() execution in case of failure. + if (data_) + TriggerAsync(); + return; + } + } + } + + if (data_ && data_->IsSiblingClosed()) { + Close(); + } +} + +bool MessagePort::IsSiblingClosed() const { + CHECK(data_); + return data_->IsSiblingClosed(); +} + +void MessagePort::OnClose() { + if (data_) { + data_->owner_ = nullptr; + data_->Disentangle(); + } + data_.reset(); + delete async(); +} + +std::unique_ptr MessagePort::Detach() { + Mutex::ScopedLock lock(data_->mutex_); + data_->owner_ = nullptr; + return std::move(data_); +} + + +void MessagePort::Send(Message&& message) { + Mutex::ScopedLock lock(*data_->sibling_mutex_); + if (data_->sibling_ == nullptr) + return; + data_->sibling_->AddToIncomingQueue(std::move(message)); +} + +void MessagePort::Send(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + Message msg; + if (msg.Serialize(env, object()->CreationContext(), args[0], args[1]) + .IsNothing()) { + return; + } + Send(std::move(msg)); +} + +void MessagePort::PostMessage(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + MessagePort* port; + ASSIGN_OR_RETURN_UNWRAP(&port, args.This()); + if (!port->data_) { + return THROW_ERR_CLOSED_MESSAGE_PORT(env); + } + if (args.Length() == 0) { + return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to " + "MessagePort.postMessage"); + } + port->Send(args); +} + +void MessagePort::Start() { + Mutex::ScopedLock lock(data_->mutex_); + data_->receiving_messages_ = true; + if (!data_->incoming_messages_.empty()) + TriggerAsync(); +} + +void MessagePort::Stop() { + Mutex::ScopedLock lock(data_->mutex_); + data_->receiving_messages_ = false; +} + +void MessagePort::Start(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + MessagePort* port; + ASSIGN_OR_RETURN_UNWRAP(&port, args.This()); + if (!port->data_) { + THROW_ERR_CLOSED_MESSAGE_PORT(env); + return; + } + port->Start(); +} + +void MessagePort::Stop(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + MessagePort* port; + ASSIGN_OR_RETURN_UNWRAP(&port, args.This()); + if (!port->data_) { + THROW_ERR_CLOSED_MESSAGE_PORT(env); + return; + } + port->Stop(); +} + +size_t MessagePort::self_size() const { + Mutex::ScopedLock lock(data_->mutex_); + size_t sz = sizeof(*this) + sizeof(*data_); + for (const Message& msg : data_->incoming_messages_) + sz += sizeof(msg) + msg.main_message_buf_.size; + return sz; +} + +void MessagePort::Entangle(MessagePort* a, MessagePort* b) { + Entangle(a, b->data_.get()); +} + +void MessagePort::Entangle(MessagePort* a, MessagePortData* b) { + MessagePortData::Entangle(a->data_.get(), b); +} + +MaybeLocal GetMessagePortConstructor( + Environment* env, Local context) { + // Factor generating the MessagePort JS constructor into its own piece + // of code, because it is needed early on in the child environment setup. + Local templ = env->message_port_constructor_template(); + if (!templ.IsEmpty()) + return templ->GetFunction(context); + + { + Local m = env->NewFunctionTemplate(MessagePort::New); + m->SetClassName(env->message_port_constructor_string()); + m->InstanceTemplate()->SetInternalFieldCount(1); + + AsyncWrap::AddWrapMethods(env, m); + + env->SetProtoMethod(m, "postMessage", MessagePort::PostMessage); + env->SetProtoMethod(m, "start", MessagePort::Start); + env->SetProtoMethod(m, "stop", MessagePort::Stop); + env->SetProtoMethod(m, "close", HandleWrap::Close); + env->SetProtoMethod(m, "unref", HandleWrap::Unref); + env->SetProtoMethod(m, "ref", HandleWrap::Ref); + env->SetProtoMethod(m, "hasRef", HandleWrap::HasRef); + + env->set_message_port_constructor_template(m); + } + + return GetMessagePortConstructor(env, context); +} + +namespace { + +static void MessageChannel(const FunctionCallbackInfo& args) { + Environment* env = Environment::GetCurrent(args); + if (!args.IsConstructCall()) { + THROW_ERR_CONSTRUCT_CALL_REQUIRED(env); + return; + } + + Local context = args.This()->CreationContext(); + Context::Scope context_scope(context); + + MessagePort* port1 = MessagePort::New(env, context); + MessagePort* port2 = MessagePort::New(env, context); + MessagePort::Entangle(port1, port2); + + args.This()->Set(env->context(), env->port1_string(), port1->object()) + .FromJust(); + args.This()->Set(env->context(), env->port2_string(), port2->object()) + .FromJust(); +} + +static void InitMessaging(Local target, + Local unused, + Local context, + void* priv) { + Environment* env = Environment::GetCurrent(context); + + { + Local message_channel_string = + FIXED_ONE_BYTE_STRING(env->isolate(), "MessageChannel"); + Local templ = env->NewFunctionTemplate(MessageChannel); + templ->SetClassName(message_channel_string); + target->Set(env->context(), + message_channel_string, + templ->GetFunction(context).ToLocalChecked()).FromJust(); + } + + target->Set(context, + env->message_port_constructor_string(), + GetMessagePortConstructor(env, context).ToLocalChecked()) + .FromJust(); +} + +} // anonymous namespace + +} // namespace worker +} // namespace node + +NODE_MODULE_CONTEXT_AWARE_INTERNAL(messaging, node::worker::InitMessaging) diff --git a/src/node_messaging.h b/src/node_messaging.h new file mode 100644 index 00000000000000..7bd60163ea167c --- /dev/null +++ b/src/node_messaging.h @@ -0,0 +1,167 @@ +#ifndef SRC_NODE_MESSAGING_H_ +#define SRC_NODE_MESSAGING_H_ + +#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + +#include "env.h" +#include "node_mutex.h" +#include +#include + +namespace node { +namespace worker { + +class MessagePortData; +class MessagePort; + +// Represents a single communication message. +class Message { + public: + explicit Message(MallocedBuffer&& payload = MallocedBuffer()); + + Message(Message&& other) = default; + Message& operator=(Message&& other) = default; + Message& operator=(const Message&) = delete; + Message(const Message&) = delete; + + // Deserialize the contained JS value. May only be called once, and only + // after Serialize() has been called (e.g. by another thread). + v8::MaybeLocal Deserialize(Environment* env, + v8::Local context); + + // Serialize a JS value, and optionally transfer objects, into this message. + // The Message object retains ownership of all transferred objects until + // deserialization. + v8::Maybe Serialize(Environment* env, + v8::Local context, + v8::Local input, + v8::Local transfer_list); + + private: + MallocedBuffer main_message_buf_; + std::vector> array_buffer_contents_; + + friend class MessagePort; +}; + +// This contains all data for a `MessagePort` instance that is not tied to +// a specific Environment/Isolate/event loop, for easier transfer between those. +class MessagePortData { + public: + explicit MessagePortData(MessagePort* owner); + ~MessagePortData(); + + MessagePortData(MessagePortData&& other) = delete; + MessagePortData& operator=(MessagePortData&& other) = delete; + MessagePortData(const MessagePortData& other) = delete; + MessagePortData& operator=(const MessagePortData& other) = delete; + + // Add a message to the incoming queue and notify the receiver. + // This may be called from any thread. + void AddToIncomingQueue(Message&& message); + + // Returns true if and only this MessagePort is currently not entangled + // with another message port. + bool IsSiblingClosed() const; + + // Turns `a` and `b` into siblings, i.e. connects the sending side of one + // to the receiving side of the other. This is not thread-safe. + static void Entangle(MessagePortData* a, MessagePortData* b); + + // Removes any possible sibling. This is thread-safe (it acquires both + // `sibling_mutex_` and `mutex_`), and has to be because it is called once + // the corresponding JS handle handle wants to close + // which can happen on either side of a worker. + void Disentangle(); + + private: + // After disentangling this message port, the owner handle (if any) + // is asynchronously triggered, so that it can close down naturally. + void PingOwnerAfterDisentanglement(); + + // This mutex protects all fields below it, with the exception of + // sibling_. + mutable Mutex mutex_; + bool receiving_messages_ = false; + std::list incoming_messages_; + MessagePort* owner_ = nullptr; + // This mutex protects the sibling_ field and is shared between two entangled + // MessagePorts. If both mutexes are acquired, this one needs to be + // acquired first. + std::shared_ptr sibling_mutex_ = std::make_shared(); + MessagePortData* sibling_ = nullptr; + + friend class MessagePort; +}; + +// A message port that receives messages from other threads, including +// the uv_async_t handle that is used to notify the current event loop of +// new incoming messages. +class MessagePort : public HandleWrap { + public: + // Create a new MessagePort. The `context` argument specifies the Context + // instance that is used for creating the values emitted from this port. + MessagePort(Environment* env, + v8::Local context, + v8::Local wrap); + ~MessagePort(); + + // Create a new message port instance, optionally over an existing + // `MessagePortData` object. + static MessagePort* New(Environment* env, + v8::Local context, + std::unique_ptr data = nullptr); + + // Send a message, i.e. deliver it into the sibling's incoming queue. + // If there is no sibling, i.e. this port is closed, + // this message is silently discarded. + void Send(Message&& message); + void Send(const v8::FunctionCallbackInfo& args); + // Deliver a single message into this port's incoming queue. + void AddToIncomingQueue(Message&& message); + + // Start processing messages on this port as a receiving end. + void Start(); + // Stop processing messages on this port as a receiving end. + void Stop(); + + static void New(const v8::FunctionCallbackInfo& args); + static void PostMessage(const v8::FunctionCallbackInfo& args); + static void Start(const v8::FunctionCallbackInfo& args); + static void Stop(const v8::FunctionCallbackInfo& args); + + // Turns `a` and `b` into siblings, i.e. connects the sending side of one + // to the receiving side of the other. This is not thread-safe. + static void Entangle(MessagePort* a, MessagePort* b); + static void Entangle(MessagePort* a, MessagePortData* b); + + // Detach this port's data for transferring. After this, the MessagePortData + // is no longer associated with this handle, although it can still receive + // messages. + std::unique_ptr Detach(); + + bool IsSiblingClosed() const; + + size_t self_size() const override; + + private: + void OnClose() override; + void OnMessage(); + void TriggerAsync(); + inline uv_async_t* async(); + + std::unique_ptr data_ = nullptr; + + friend class MessagePortData; +}; + +v8::MaybeLocal GetMessagePortConstructor( + Environment* env, v8::Local context); + +} // namespace worker +} // namespace node + +#endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS + + +#endif // SRC_NODE_MESSAGING_H_ diff --git a/src/util.h b/src/util.h index e272286d3e4b96..fade27458f3e16 100644 --- a/src/util.h +++ b/src/util.h @@ -436,8 +436,11 @@ struct MallocedBuffer { return ret; } + inline bool is_empty() const { return data == nullptr; } + MallocedBuffer() : data(nullptr) {} explicit MallocedBuffer(size_t size) : data(Malloc(size)), size(size) {} + MallocedBuffer(char* data, size_t size) : data(data), size(size) {} MallocedBuffer(MallocedBuffer&& other) : data(other.data), size(other.size) { other.data = nullptr; } diff --git a/test/parallel/test-message-channel.js b/test/parallel/test-message-channel.js new file mode 100644 index 00000000000000..0facaa1d835ea8 --- /dev/null +++ b/test/parallel/test-message-channel.js @@ -0,0 +1,26 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); +const { MessageChannel } = require('worker'); + +{ + const channel = new MessageChannel(); + + channel.port1.on('message', common.mustCall(({ typedArray }) => { + assert.deepStrictEqual(typedArray, new Uint8Array([0, 1, 2, 3, 4])); + })); + + const typedArray = new Uint8Array([0, 1, 2, 3, 4]); + channel.port2.postMessage({ typedArray }, [ typedArray.buffer ]); + assert.strictEqual(typedArray.buffer.byteLength, 0); + channel.port2.close(); +} + +{ + const channel = new MessageChannel(); + + channel.port1.on('close', common.mustCall()); + channel.port2.on('close', common.mustCall()); + channel.port2.close(); +} diff --git a/test/parallel/test-message-port-arraybuffer.js b/test/parallel/test-message-port-arraybuffer.js new file mode 100644 index 00000000000000..4abeb585b4fb15 --- /dev/null +++ b/test/parallel/test-message-port-arraybuffer.js @@ -0,0 +1,20 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { MessageChannel } = require('worker'); + +{ + const { port1, port2 } = new MessageChannel(); + + const arrayBuffer = new ArrayBuffer(40); + const typedArray = new Uint32Array(arrayBuffer); + typedArray[0] = 0x12345678; + + port1.postMessage(typedArray, [ arrayBuffer ]); + port2.on('message', common.mustCall((received) => { + assert.strictEqual(received[0], 0x12345678); + port2.close(common.mustCall()); + })); +} diff --git a/test/parallel/test-message-port.js b/test/parallel/test-message-port.js new file mode 100644 index 00000000000000..8a7f3805200fa3 --- /dev/null +++ b/test/parallel/test-message-port.js @@ -0,0 +1,56 @@ +// Flags: --experimental-worker +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const { MessageChannel, MessagePort } = require('worker'); + +{ + const { port1, port2 } = new MessageChannel(); + assert(port1 instanceof MessagePort); + assert(port2 instanceof MessagePort); + + const input = { a: 1 }; + port1.postMessage(input); + port2.on('message', common.mustCall((received) => { + assert.deepStrictEqual(received, input); + port2.close(common.mustCall()); + })); +} + +{ + const { port1, port2 } = new MessageChannel(); + + const input = { a: 1 }; + port1.postMessage(input); + // Check that the message still gets delivered if `port2` has its + // `on('message')` handler attached at a later point in time. + setImmediate(() => { + port2.on('message', common.mustCall((received) => { + assert.deepStrictEqual(received, input); + port2.close(common.mustCall()); + })); + }); +} + +{ + const { port1, port2 } = new MessageChannel(); + + const input = { a: 1 }; + + const dummy = common.mustNotCall(); + // Check that the message still gets delivered if `port2` has its + // `on('message')` handler attached at a later point in time, even if a + // listener was removed previously. + port2.addListener('message', dummy); + setImmediate(() => { + port2.removeListener('message', dummy); + port1.postMessage(input); + setImmediate(() => { + port2.on('message', common.mustCall((received) => { + assert.deepStrictEqual(received, input); + port2.close(common.mustCall()); + })); + }); + }); +} diff --git a/test/sequential/test-async-wrap-getasyncid.js b/test/sequential/test-async-wrap-getasyncid.js index 971296915ceecb..84a3e3b1f4dc05 100644 --- a/test/sequential/test-async-wrap-getasyncid.js +++ b/test/sequential/test-async-wrap-getasyncid.js @@ -35,7 +35,9 @@ common.crashOnUnhandledRejection(); delete providers.HTTP2STREAM; delete providers.HTTP2PING; delete providers.HTTP2SETTINGS; + // TODO(addaleax): Test for these delete providers.STREAMPIPE; + delete providers.MESSAGEPORT; const objKeys = Object.keys(providers); if (objKeys.length > 0) diff --git a/tools/doc/type-parser.js b/tools/doc/type-parser.js index e7c7aa69da5e95..be72893832373a 100644 --- a/tools/doc/type-parser.js +++ b/tools/doc/type-parser.js @@ -117,7 +117,9 @@ const customTypesMap = { 'Tracing': 'tracing.html#tracing_tracing_object', 'URL': 'url.html#url_the_whatwg_url_api', - 'URLSearchParams': 'url.html#url_class_urlsearchparams' + 'URLSearchParams': 'url.html#url_class_urlsearchparams', + + 'MessagePort': 'worker.html#worker_class_messageport' }; const arrayPart = /(?:\[])+$/;