From d2b05688901c39342aaa419fd2cb277c1c018671 Mon Sep 17 00:00:00 2001 From: Anna Henningsen Date: Sun, 25 Aug 2019 21:48:58 +0200 Subject: [PATCH] worker: make transfer list behave like web MessagePort Allow generic iterables as transfer list arguments, as well as an options object with a `transfer` option, for web compatibility. PR-URL: https://github.com/nodejs/node/pull/29319 Refs: https://github.com/nodejs/node/pull/28033#discussion_r289964991 Reviewed-By: James M Snell --- src/env.h | 3 + src/node_messaging.cc | 217 ++++++++++++------ src/node_messaging.h | 6 +- ...er-message-port-terminate-transfer-list.js | 26 +++ test/parallel/test-worker-message-port.js | 63 ++++- 5 files changed, 238 insertions(+), 77 deletions(-) create mode 100644 test/parallel/test-worker-message-port-terminate-transfer-list.js diff --git a/src/env.h b/src/env.h index 9a0f98e121b380..c3f1c9aea6e985 100644 --- a/src/env.h +++ b/src/env.h @@ -211,6 +211,7 @@ constexpr size_t kFsStatsBufferLength = V(dns_soa_string, "SOA") \ V(dns_srv_string, "SRV") \ V(dns_txt_string, "TXT") \ + V(done_string, "done") \ V(duration_string, "duration") \ V(emit_warning_string, "emitWarning") \ V(encoding_string, "encoding") \ @@ -272,6 +273,7 @@ constexpr size_t kFsStatsBufferLength = V(modulus_string, "modulus") \ V(name_string, "name") \ V(netmask_string, "netmask") \ + V(next_string, "next") \ V(nistcurve_string, "nistCurve") \ V(nsname_string, "nsname") \ V(ocsp_request_string, "OCSPRequest") \ @@ -353,6 +355,7 @@ constexpr size_t kFsStatsBufferLength = V(ticketkeycallback_string, "onticketkeycallback") \ V(timeout_string, "timeout") \ V(tls_ticket_string, "tlsTicket") \ + V(transfer_string, "transfer") \ V(ttl_string, "ttl") \ V(type_string, "type") \ V(uid_string, "uid") \ diff --git a/src/node_messaging.cc b/src/node_messaging.cc index 46f06b747e312f..5aec784f60cfb3 100644 --- a/src/node_messaging.cc +++ b/src/node_messaging.cc @@ -31,6 +31,7 @@ using v8::Object; using v8::ObjectTemplate; using v8::SharedArrayBuffer; using v8::String; +using v8::Symbol; using v8::Value; using v8::ValueDeserializer; using v8::ValueSerializer; @@ -304,7 +305,7 @@ class SerializerDelegate : public ValueSerializer::Delegate { Maybe Message::Serialize(Environment* env, Local context, Local input, - Local transfer_list_v, + const TransferList& transfer_list_v, Local source_port) { HandleScope handle_scope(env->isolate()); Context::Scope context_scope(context); @@ -317,72 +318,66 @@ Maybe Message::Serialize(Environment* env, delegate.serializer = &serializer; std::vector> array_buffers; - if (transfer_list_v->IsArray()) { - Local transfer_list = transfer_list_v.As(); - uint32_t length = transfer_list->Length(); - for (uint32_t i = 0; i < length; ++i) { - Local entry; - if (!transfer_list->Get(context, i).ToLocal(&entry)) - return Nothing(); - // Currently, we support ArrayBuffers and MessagePorts. - if (entry->IsArrayBuffer()) { - Local ab = entry.As(); - // If we cannot render the ArrayBuffer unusable in this Isolate and - // take ownership of its memory, copying the buffer will have to do. - if (!ab->IsDetachable() || ab->IsExternal() || - !env->isolate_data()->uses_node_allocator()) { - continue; - } - if (std::find(array_buffers.begin(), array_buffers.end(), ab) != - array_buffers.end()) { - ThrowDataCloneException( - context, - FIXED_ONE_BYTE_STRING( - env->isolate(), - "Transfer list contains duplicate ArrayBuffer")); - return Nothing(); - } - // We simply use the array index in the `array_buffers` list as the - // ID that we write into the serialized buffer. - uint32_t id = array_buffers.size(); - array_buffers.push_back(ab); - serializer.TransferArrayBuffer(id, ab); - continue; - } else if (env->message_port_constructor_template() - ->HasInstance(entry)) { - // Check if the source MessagePort is being transferred. - if (!source_port.IsEmpty() && entry == source_port) { - ThrowDataCloneException( - context, - FIXED_ONE_BYTE_STRING(env->isolate(), - "Transfer list contains source port")); - return Nothing(); - } - MessagePort* port = Unwrap(entry.As()); - if (port == nullptr || port->IsDetached()) { - ThrowDataCloneException( - context, - FIXED_ONE_BYTE_STRING( - env->isolate(), - "MessagePort in transfer list is already detached")); - return Nothing(); - } - if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) != - delegate.ports_.end()) { - ThrowDataCloneException( - context, - FIXED_ONE_BYTE_STRING( - env->isolate(), - "Transfer list contains duplicate MessagePort")); - return Nothing(); - } - delegate.ports_.push_back(port); + for (uint32_t i = 0; i < transfer_list_v.length(); ++i) { + Local entry = transfer_list_v[i]; + // Currently, we support ArrayBuffers and MessagePorts. + if (entry->IsArrayBuffer()) { + Local ab = entry.As(); + // If we cannot render the ArrayBuffer unusable in this Isolate and + // take ownership of its memory, copying the buffer will have to do. + if (!ab->IsDetachable() || ab->IsExternal() || + !env->isolate_data()->uses_node_allocator()) { continue; } - - THROW_ERR_INVALID_TRANSFER_OBJECT(env); - return Nothing(); + if (std::find(array_buffers.begin(), array_buffers.end(), ab) != + array_buffers.end()) { + ThrowDataCloneException( + context, + FIXED_ONE_BYTE_STRING( + env->isolate(), + "Transfer list contains duplicate ArrayBuffer")); + return Nothing(); + } + // We simply use the array index in the `array_buffers` list as the + // ID that we write into the serialized buffer. + uint32_t id = array_buffers.size(); + array_buffers.push_back(ab); + serializer.TransferArrayBuffer(id, ab); + continue; + } else if (env->message_port_constructor_template() + ->HasInstance(entry)) { + // Check if the source MessagePort is being transferred. + if (!source_port.IsEmpty() && entry == source_port) { + ThrowDataCloneException( + context, + FIXED_ONE_BYTE_STRING(env->isolate(), + "Transfer list contains source port")); + return Nothing(); + } + MessagePort* port = Unwrap(entry.As()); + if (port == nullptr || port->IsDetached()) { + ThrowDataCloneException( + context, + FIXED_ONE_BYTE_STRING( + env->isolate(), + "MessagePort in transfer list is already detached")); + return Nothing(); + } + if (std::find(delegate.ports_.begin(), delegate.ports_.end(), port) != + delegate.ports_.end()) { + ThrowDataCloneException( + context, + FIXED_ONE_BYTE_STRING( + env->isolate(), + "Transfer list contains duplicate MessagePort")); + return Nothing(); + } + delegate.ports_.push_back(port); + continue; } + + THROW_ERR_INVALID_TRANSFER_OBJECT(env); + return Nothing(); } serializer.WriteHeader(); @@ -664,7 +659,7 @@ std::unique_ptr MessagePort::Detach() { Maybe MessagePort::PostMessage(Environment* env, Local message_v, - Local transfer_v) { + const TransferList& transfer_v) { Isolate* isolate = env->isolate(); Local obj = object(isolate); Local context = obj->CreationContext(); @@ -705,20 +700,98 @@ Maybe MessagePort::PostMessage(Environment* env, return Just(true); } +static Maybe ReadIterable(Environment* env, + Local context, + // NOLINTNEXTLINE(runtime/references) + TransferList& transfer_list, + Local object) { + if (!object->IsObject()) return Just(false); + + if (object->IsArray()) { + Local arr = object.As(); + size_t length = arr->Length(); + transfer_list.AllocateSufficientStorage(length); + for (size_t i = 0; i < length; i++) { + if (!arr->Get(context, i).ToLocal(&transfer_list[i])) + return Nothing(); + } + return Just(true); + } + + Isolate* isolate = env->isolate(); + Local iterator_method; + if (!object.As()->Get(context, Symbol::GetIterator(isolate)) + .ToLocal(&iterator_method)) return Nothing(); + if (!iterator_method->IsFunction()) return Just(false); + + Local iterator; + if (!iterator_method.As()->Call(context, object, 0, nullptr) + .ToLocal(&iterator)) return Nothing(); + if (!iterator->IsObject()) return Just(false); + + Local next; + if (!iterator.As()->Get(context, env->next_string()).ToLocal(&next)) + return Nothing(); + if (!next->IsFunction()) return Just(false); + + std::vector> entries; + while (env->can_call_into_js()) { + Local result; + if (!next.As()->Call(context, iterator, 0, nullptr) + .ToLocal(&result)) return Nothing(); + if (!result->IsObject()) return Just(false); + + Local done; + if (!result.As()->Get(context, env->done_string()).ToLocal(&done)) + return Nothing(); + if (done->BooleanValue(isolate)) break; + + Local val; + if (!result.As()->Get(context, env->value_string()).ToLocal(&val)) + return Nothing(); + entries.push_back(val); + } + + transfer_list.AllocateSufficientStorage(entries.size()); + std::copy(entries.begin(), entries.end(), &transfer_list[0]); + return Just(true); +} + void MessagePort::PostMessage(const FunctionCallbackInfo& args) { Environment* env = Environment::GetCurrent(args); + Local obj = args.This(); + Local context = obj->CreationContext(); + if (args.Length() == 0) { return THROW_ERR_MISSING_ARGS(env, "Not enough arguments to " "MessagePort.postMessage"); } + if (!args[1]->IsNullOrUndefined() && !args[1]->IsObject()) { // Browsers ignore null or undefined, and otherwise accept an array or an // options object. - // TODO(addaleax): Add support for an options object and generic sequence - // support. - // Refs: https://github.com/nodejs/node/pull/28033#discussion_r289964991 return THROW_ERR_INVALID_ARG_TYPE(env, - "Optional transferList argument must be an array"); + "Optional transferList argument must be an iterable"); + } + + TransferList transfer_list; + if (args[1]->IsObject()) { + bool was_iterable; + if (!ReadIterable(env, context, transfer_list, args[1]).To(&was_iterable)) + return; + if (!was_iterable) { + Local transfer_option; + if (!args[1].As()->Get(context, env->transfer_string()) + .ToLocal(&transfer_option)) return; + if (!transfer_option->IsUndefined()) { + if (!ReadIterable(env, context, transfer_list, transfer_option) + .To(&was_iterable)) return; + if (!was_iterable) { + return THROW_ERR_INVALID_ARG_TYPE(env, + "Optional options.transfer argument must be an iterable"); + } + } + } } MessagePort* port = Unwrap(args.This()); @@ -727,13 +800,11 @@ void MessagePort::PostMessage(const FunctionCallbackInfo& args) { // transfers. if (port == nullptr) { Message msg; - Local obj = args.This(); - Local context = obj->CreationContext(); - USE(msg.Serialize(env, context, args[0], args[1], obj)); + USE(msg.Serialize(env, context, args[0], transfer_list, obj)); return; } - port->PostMessage(env, args[0], args[1]); + port->PostMessage(env, args[0], transfer_list); } void MessagePort::Start() { diff --git a/src/node_messaging.h b/src/node_messaging.h index d9f25a95d76fc9..054521b0563c42 100644 --- a/src/node_messaging.h +++ b/src/node_messaging.h @@ -14,6 +14,8 @@ namespace worker { class MessagePortData; class MessagePort; +typedef MaybeStackBuffer, 8> TransferList; + // Represents a single communication message. class Message : public MemoryRetainer { public: @@ -44,7 +46,7 @@ class Message : public MemoryRetainer { v8::Maybe Serialize(Environment* env, v8::Local context, v8::Local input, - v8::Local transfer_list, + const TransferList& transfer_list, v8::Local source_port = v8::Local()); @@ -149,7 +151,7 @@ class MessagePort : public HandleWrap { // serialized with transfers, then silently discarded. v8::Maybe PostMessage(Environment* env, v8::Local message, - v8::Local transfer); + const TransferList& transfer); // Start processing messages on this port as a receiving end. void Start(); diff --git a/test/parallel/test-worker-message-port-terminate-transfer-list.js b/test/parallel/test-worker-message-port-terminate-transfer-list.js new file mode 100644 index 00000000000000..a066405d9d14de --- /dev/null +++ b/test/parallel/test-worker-message-port-terminate-transfer-list.js @@ -0,0 +1,26 @@ +'use strict'; +const common = require('../common'); + +const { parentPort, MessageChannel, Worker } = require('worker_threads'); + +// Do not use isMainThread so that this test itself can be run inside a Worker. +if (!process.env.HAS_STARTED_WORKER) { + process.env.HAS_STARTED_WORKER = 1; + const w = new Worker(__filename); + w.once('message', common.mustCall(() => { + w.once('message', common.mustNotCall()); + setTimeout(() => w.terminate(), 100); + })); +} else { + const { port1 } = new MessageChannel(); + + parentPort.postMessage('ready'); + + // Make sure we don’t end up running JS after the infinite loop is broken. + port1.postMessage({}, { + transfer: (function*() { while (true); })() + }); + + parentPort.postMessage('UNREACHABLE'); + process.kill(process.pid, 'SIGINT'); +} diff --git a/test/parallel/test-worker-message-port.js b/test/parallel/test-worker-message-port.js index d1c58216fd9d00..d128dc7edb25fd 100644 --- a/test/parallel/test-worker-message-port.js +++ b/test/parallel/test-worker-message-port.js @@ -72,22 +72,81 @@ const { MessageChannel, MessagePort } = require('worker_threads'); { const { port1, port2 } = new MessageChannel(); - port2.on('message', common.mustCall(4)); + port2.on('message', common.mustCall(6)); port1.postMessage(1, null); port1.postMessage(2, undefined); port1.postMessage(3, []); port1.postMessage(4, {}); + port1.postMessage(5, { transfer: undefined }); + port1.postMessage(6, { transfer: [] }); const err = { constructor: TypeError, code: 'ERR_INVALID_ARG_TYPE', - message: 'Optional transferList argument must be an array' + message: 'Optional transferList argument must be an iterable' }; assert.throws(() => port1.postMessage(5, 0), err); assert.throws(() => port1.postMessage(5, false), err); assert.throws(() => port1.postMessage(5, 'X'), err); assert.throws(() => port1.postMessage(5, Symbol('X')), err); + + const err2 = { + constructor: TypeError, + code: 'ERR_INVALID_ARG_TYPE', + message: 'Optional options.transfer argument must be an iterable' + }; + + assert.throws(() => port1.postMessage(5, { transfer: null }), err2); + assert.throws(() => port1.postMessage(5, { transfer: 0 }), err2); + assert.throws(() => port1.postMessage(5, { transfer: false }), err2); + assert.throws(() => port1.postMessage(5, { transfer: {} }), err2); + assert.throws(() => port1.postMessage(5, { + transfer: { [Symbol.iterator]() { return {}; } } + }), err2); + assert.throws(() => port1.postMessage(5, { + transfer: { [Symbol.iterator]() { return { next: 42 }; } } + }), err2); + assert.throws(() => port1.postMessage(5, { + transfer: { [Symbol.iterator]() { return { next: null }; } } + }), err2); + port1.close(); +} + +{ + // Make sure these ArrayBuffers end up detached, i.e. are actually being + // transferred because the transfer list provides them. + const { port1, port2 } = new MessageChannel(); + port2.on('message', common.mustCall((msg) => { + assert.strictEqual(msg.ab.byteLength, 10); + }, 4)); + + { + const ab = new ArrayBuffer(10); + port1.postMessage({ ab }, [ ab ]); + assert.strictEqual(ab.byteLength, 0); + } + + { + const ab = new ArrayBuffer(10); + port1.postMessage({ ab }, { transfer: [ ab ] }); + assert.strictEqual(ab.byteLength, 0); + } + + { + const ab = new ArrayBuffer(10); + port1.postMessage({ ab }, (function*() { yield ab; })()); + assert.strictEqual(ab.byteLength, 0); + } + + { + const ab = new ArrayBuffer(10); + port1.postMessage({ ab }, { + transfer: (function*() { yield ab; })() + }); + assert.strictEqual(ab.byteLength, 0); + } + port1.close(); }