Skip to content

Commit

Permalink
worker: fix interaction of terminate() with messaging port
Browse files Browse the repository at this point in the history
When a Worker is terminated, its own handle and the public
`MessagePort` are `.ref()`’ed, so that all relevant events,
including the `'exit'` events, end up being received.

However, this is problematic if messages end up being queued
from the Worker between the beginning of the `.terminate()` call
and its completion, and there are no `'message'` event handlers
present at that time. In that situation, currently the messages
would not end up being processed, and since the MessagePort
is still `.ref()`’ed, it would keep the event loop alive
indefinitely.

To fix this:

- Make sure that all messages end up being received by
  `drainMessagePort()`, including cases in which the port had
  been stopped (i.e. there are no `'message'` listeners) and
  cases in which we exceed the limit for messages being processed
  in one batch.
- Unref the Worker’s internal ports manually after the Worker
  has exited.

Either of these solutions should be solving this on its own,
but I think it makes sense to make sure that both of them
happen during cleanup.

PR-URL: #37319
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Colin Ihrig <cjihrig@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Rich Trott <rtrott@gmail.com>
  • Loading branch information
addaleax authored and targos committed Feb 28, 2021
1 parent ddae112 commit 31f4600
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 10 deletions.
4 changes: 4 additions & 0 deletions lib/internal/worker.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ class Worker extends EventEmitter {
debug(`[${threadId}] hears end event for Worker ${this.threadId}`);
drainMessagePort(this[kPublicPort]);
drainMessagePort(this[kPort]);
this.removeAllListeners('message');
this.removeAllListeners('messageerrors');
this[kPublicPort].unref();
this[kPort].unref();
this[kDispose]();
if (customErr) {
debug(`[${threadId}] failing with custom error ${customErr} \
Expand Down
21 changes: 13 additions & 8 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -565,7 +565,7 @@ MessagePort::MessagePort(Environment* env,
auto onmessage = [](uv_async_t* handle) {
// Called when data has been put into the queue.
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
channel->OnMessage();
channel->OnMessage(MessageProcessingMode::kNormalOperation);
};

CHECK_EQ(uv_async_init(env->event_loop(),
Expand Down Expand Up @@ -664,15 +664,17 @@ MessagePort* MessagePort::New(
}

MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
bool only_if_receiving) {
MessageProcessingMode mode) {
std::shared_ptr<Message> received;
{
// Get the head of the message queue.
Mutex::ScopedLock lock(data_->mutex_);

Debug(this, "MessagePort has message");

bool wants_message = receiving_messages_ || !only_if_receiving;
bool wants_message =
receiving_messages_ ||
mode == MessageProcessingMode::kForceReadMessages;
// We have nothing to do if:
// - There are no pending messages
// - We are not intending to receive messages, and the message we would
Expand All @@ -697,16 +699,18 @@ MaybeLocal<Value> MessagePort::ReceiveMessage(Local<Context> context,
return received->Deserialize(env(), context);
}

void MessagePort::OnMessage() {
void MessagePort::OnMessage(MessageProcessingMode mode) {
Debug(this, "Running MessagePort::OnMessage()");
HandleScope handle_scope(env()->isolate());
Local<Context> context = object(env()->isolate())->CreationContext();

size_t processing_limit;
{
if (mode == MessageProcessingMode::kNormalOperation) {
Mutex::ScopedLock(data_->mutex_);
processing_limit = std::max(data_->incoming_messages_.size(),
static_cast<size_t>(1000));
} else {
processing_limit = std::numeric_limits<size_t>::max();
}

// data_ can only ever be modified by the owner thread, so no need to lock.
Expand Down Expand Up @@ -738,7 +742,7 @@ void MessagePort::OnMessage() {
// Catch any exceptions from parsing the message itself (not from
// emitting it) as 'messageeror' events.
TryCatchScope try_catch(env());
if (!ReceiveMessage(context, true).ToLocal(&payload)) {
if (!ReceiveMessage(context, mode).ToLocal(&payload)) {
if (try_catch.HasCaught() && !try_catch.HasTerminated())
message_error = try_catch.Exception();
goto reschedule;
Expand Down Expand Up @@ -999,7 +1003,7 @@ void MessagePort::CheckType(const FunctionCallbackInfo<Value>& args) {
void MessagePort::Drain(const FunctionCallbackInfo<Value>& args) {
MessagePort* port;
ASSIGN_OR_RETURN_UNWRAP(&port, args[0].As<Object>());
port->OnMessage();
port->OnMessage(MessageProcessingMode::kForceReadMessages);
}

void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
Expand All @@ -1018,7 +1022,8 @@ void MessagePort::ReceiveMessage(const FunctionCallbackInfo<Value>& args) {
}

MaybeLocal<Value> payload =
port->ReceiveMessage(port->object()->CreationContext(), false);
port->ReceiveMessage(port->object()->CreationContext(),
MessageProcessingMode::kForceReadMessages);
if (!payload.IsEmpty())
args.GetReturnValue().Set(payload.ToLocalChecked());
}
Expand Down
9 changes: 7 additions & 2 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,16 @@ class MessagePort : public HandleWrap {
SET_SELF_SIZE(MessagePort)

private:
enum class MessageProcessingMode {
kNormalOperation,
kForceReadMessages
};

void OnClose() override;
void OnMessage();
void OnMessage(MessageProcessingMode mode);
void TriggerAsync();
v8::MaybeLocal<v8::Value> ReceiveMessage(v8::Local<v8::Context> context,
bool only_if_receiving);
MessageProcessingMode mode);

std::unique_ptr<MessagePortData> data_ = nullptr;
bool receiving_messages_ = false;
Expand Down
12 changes: 12 additions & 0 deletions test/parallel/test-worker-terminate-ref-public-port.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
'use strict';
const common = require('../common');
const { Worker } = require('worker_threads');

// The actual test here is that the Worker does not keep the main thread
// running after it has been .terminate()’ed.

const w = new Worker(`
const p = require('worker_threads').parentPort;
while(true) p.postMessage({})`, { eval: true });
w.once('message', () => w.terminate());
w.once('exit', common.mustCall());

0 comments on commit 31f4600

Please sign in to comment.