diff --git a/lib/internal/per_context/messageport.js b/lib/internal/per_context/messageport.js index 43587319b040de..e5df08e6b22368 100644 --- a/lib/internal/per_context/messageport.js +++ b/lib/internal/per_context/messageport.js @@ -4,22 +4,30 @@ const { } = primordials; class MessageEvent { - constructor(data, target, type) { + constructor(data, target, type, ports) { this.data = data; this.target = target; this.type = type; + this.ports = ports ?? []; } } const kHybridDispatch = SymbolFor('nodejs.internal.kHybridDispatch'); +const kCurrentlyReceivingPorts = + SymbolFor('nodejs.internal.kCurrentlyReceivingPorts'); -exports.emitMessage = function(data, type) { +exports.emitMessage = function(data, ports, type) { if (typeof this[kHybridDispatch] === 'function') { - this[kHybridDispatch](data, type, undefined); + this[kCurrentlyReceivingPorts] = ports; + try { + this[kHybridDispatch](data, type, undefined); + } finally { + this[kCurrentlyReceivingPorts] = undefined; + } return; } - const event = new MessageEvent(data, this, type); + const event = new MessageEvent(data, this, type, ports); if (type === 'message') { if (typeof this.onmessage === 'function') this.onmessage(event); diff --git a/lib/internal/worker/io.js b/lib/internal/worker/io.js index 7a9eb2bdbe1509..fdfbe10b5fefcf 100644 --- a/lib/internal/worker/io.js +++ b/lib/internal/worker/io.js @@ -15,6 +15,7 @@ const { ObjectSetPrototypeOf, ReflectApply, Symbol, + SymbolFor, } = primordials; const { @@ -70,6 +71,8 @@ const kWritableCallbacks = Symbol('kWritableCallbacks'); const kSource = Symbol('kSource'); const kStartedReading = Symbol('kStartedReading'); const kStdioWantsMoreDataCallback = Symbol('kStdioWantsMoreDataCallback'); +const kCurrentlyReceivingPorts = + SymbolFor('nodejs.internal.kCurrentlyReceivingPorts'); const messageTypes = { UP_AND_RUNNING: 'upAndRunning', @@ -150,7 +153,9 @@ ObjectDefineProperty( if (type !== 'message' && type !== 'messageerror') { return ReflectApply(originalCreateEvent, this, arguments); } - return new MessageEvent(type, { data }); + const ports = this[kCurrentlyReceivingPorts]; + this[kCurrentlyReceivingPorts] = undefined; + return new MessageEvent(type, { data, ports }); }, configurable: false, writable: false, @@ -161,6 +166,7 @@ ObjectDefineProperty( function oninit() { initNodeEventTarget(this); setupPortReferencing(this, this, 'message'); + this[kCurrentlyReceivingPorts] = undefined; } defineEventHandler(MessagePort.prototype, 'message'); diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 2699bd2792e544..595f793a6241d4 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -126,11 +126,18 @@ class DeserializerDelegate : public ValueDeserializer::Delegate { } // anonymous namespace MaybeLocal Message::Deserialize(Environment* env, - Local context) { + Local context, + Local* port_list) { + Context::Scope context_scope(context); + CHECK(!IsCloseMessage()); + if (port_list != nullptr && !transferables_.empty()) { + // Need to create this outside of the EscapableHandleScope, but inside + // the Context::Scope. + *port_list = Array::New(env->isolate()); + } EscapableHandleScope handle_scope(env->isolate()); - Context::Scope context_scope(context); // Create all necessary objects for transferables, e.g. MessagePort handles. std::vector> host_objects(transferables_.size()); @@ -146,10 +153,27 @@ MaybeLocal Message::Deserialize(Environment* env, }); for (uint32_t i = 0; i < transferables_.size(); ++i) { + HandleScope handle_scope(env->isolate()); TransferData* data = transferables_[i].get(); host_objects[i] = data->Deserialize( env, context, std::move(transferables_[i])); if (!host_objects[i]) return {}; + if (port_list != nullptr) { + // If we gather a list of all message ports, and this transferred object + // is a message port, add it to that list. This is a bit of an odd case + // of special handling for MessagePorts (as opposed to applying to all + // transferables), but it's required for spec compliancy. + DCHECK((*port_list)->IsArray()); + Local port_list_array = port_list->As(); + Local obj = host_objects[i]->object(); + if (env->message_port_constructor_template()->HasInstance(obj)) { + if (port_list_array->Set(context, + port_list_array->Length(), + obj).IsNothing()) { + return {}; + } + } + } } transferables_.clear(); @@ -664,7 +688,8 @@ MessagePort* MessagePort::New( } MaybeLocal MessagePort::ReceiveMessage(Local context, - MessageProcessingMode mode) { + MessageProcessingMode mode, + Local* port_list) { std::shared_ptr received; { // Get the head of the message queue. @@ -696,7 +721,7 @@ MaybeLocal MessagePort::ReceiveMessage(Local context, if (!env()->can_call_into_js()) return MaybeLocal(); - return received->Deserialize(env(), context); + return received->Deserialize(env(), context, port_list); } void MessagePort::OnMessage(MessageProcessingMode mode) { @@ -735,14 +760,15 @@ void MessagePort::OnMessage(MessageProcessingMode mode) { Local emit_message = PersistentToLocal::Strong(emit_message_fn_); Local payload; + Local port_list = Undefined(env()->isolate()); Local message_error; - Local argv[2]; + Local argv[3]; { // Catch any exceptions from parsing the message itself (not from // emitting it) as 'messageeror' events. TryCatchScope try_catch(env()); - if (!ReceiveMessage(context, mode).ToLocal(&payload)) { + if (!ReceiveMessage(context, mode, &port_list).ToLocal(&payload)) { if (try_catch.HasCaught() && !try_catch.HasTerminated()) message_error = try_catch.Exception(); goto reschedule; @@ -757,13 +783,15 @@ void MessagePort::OnMessage(MessageProcessingMode mode) { } argv[0] = payload; - argv[1] = env()->message_string(); + argv[1] = port_list; + argv[2] = env()->message_string(); if (MakeCallback(emit_message, arraysize(argv), argv).IsEmpty()) { reschedule: if (!message_error.IsEmpty()) { argv[0] = message_error; - argv[1] = env()->messageerror_string(); + argv[1] = Undefined(env()->isolate()); + argv[2] = env()->messageerror_string(); USE(MakeCallback(emit_message, arraysize(argv), argv)); } diff --git a/src/node_messaging.h b/src/node_messaging.h index 2e63b22e4ceced..429a5646b8ada4 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -62,8 +62,10 @@ class Message : public MemoryRetainer { // Deserialize the contained JS value. May only be called once, and only // after Serialize() has been called (e.g. by another thread). - v8::MaybeLocal Deserialize(Environment* env, - v8::Local context); + v8::MaybeLocal Deserialize( + Environment* env, + v8::Local context, + v8::Local* port_list = nullptr); // Serialize a JS value, and optionally transfer objects, into this message. // The Message object retains ownership of all transferred objects until @@ -293,8 +295,10 @@ class MessagePort : public HandleWrap { void OnClose() override; void OnMessage(MessageProcessingMode mode); void TriggerAsync(); - v8::MaybeLocal ReceiveMessage(v8::Local context, - MessageProcessingMode mode); + v8::MaybeLocal ReceiveMessage( + v8::Local context, + MessageProcessingMode mode, + v8::Local* port_list = nullptr); std::unique_ptr data_ = nullptr; bool receiving_messages_ = false; diff --git a/test/parallel/test-worker-message-port-move.js b/test/parallel/test-worker-message-port-move.js index 7e5d0243fa8b96..44efd2e6a6b94f 100644 --- a/test/parallel/test-worker-message-port-move.js +++ b/test/parallel/test-worker-message-port-move.js @@ -34,9 +34,13 @@ vm.runInContext('(' + function() { assert(!(port instanceof MessagePort)); assert.strictEqual(port.onmessage, undefined); - port.onmessage = function({ data }) { + port.onmessage = function({ data, ports }) { assert(data instanceof Object); - port.postMessage(data); + assert(ports instanceof Array); + assert.strictEqual(ports.length, 1); + assert.strictEqual(ports[0], data.p); + assert(!(data.p instanceof MessagePort)); + port.postMessage({}); }; port.start(); } @@ -55,8 +59,10 @@ vm.runInContext('(' + function() { } } + ')()', context); +const otherChannel = new MessageChannel(); port2.on('message', common.mustCall((msg) => { assert(msg instanceof Object); port2.close(); + otherChannel.port2.close(); })); -port2.postMessage({}); +port2.postMessage({ p: otherChannel.port1 }, [ otherChannel.port1 ]); diff --git a/test/parallel/test-worker-message-port.js b/test/parallel/test-worker-message-port.js index 51618e4fab1850..b5810e726d856c 100644 --- a/test/parallel/test-worker-message-port.js +++ b/test/parallel/test-worker-message-port.js @@ -34,6 +34,7 @@ const { MessageChannel, MessagePort } = require('worker_threads'); port1.onmessage = common.mustCall((message) => { assert.strictEqual(message.data, 4); assert.strictEqual(message.target, port1); + assert.deepStrictEqual(message.ports, []); port2.close(common.mustCall()); }); @@ -161,6 +162,19 @@ const { MessageChannel, MessagePort } = require('worker_threads'); port1.close(); } +{ + // Test MessageEvent#ports + const c1 = new MessageChannel(); + const c2 = new MessageChannel(); + c1.port1.postMessage({ port: c2.port2 }, [ c2.port2 ]); + c1.port2.addEventListener('message', common.mustCall((ev) => { + assert.strictEqual(ev.ports.length, 1); + assert.strictEqual(ev.ports[0].constructor, MessagePort); + c1.port1.close(); + c2.port1.close(); + })); +} + { assert.deepStrictEqual( Object.getOwnPropertyNames(MessagePort.prototype).sort(),