Skip to content

Commit

Permalink
worker: refactor MessagePort entanglement management
Browse files Browse the repository at this point in the history
This addresses the `TODO` left on my request in 9e446b3. :)

Refs: nodejs#36271
  • Loading branch information
addaleax committed Dec 1, 2020
1 parent 9e446b3 commit 6334542
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 39 deletions.
58 changes: 28 additions & 30 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,19 +515,8 @@ void Message::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("transferables", transferables_);
}

// TODO(@jasnell): The name here will be an empty string if the
// one-to-one MessageChannel is used. In such cases,
// SiblingGroup::Get() will return nothing and group_ will be
// an empty pointer. @addaleax suggests that the code here
// could be clearer if attaching the SiblingGroup were a
// separate step rather than part of the constructor here.
MessagePortData::MessagePortData(
MessagePort* owner,
const std::string& name)
: owner_(owner),
group_(SiblingGroup::Get(name)) {
if (group_)
group_->Entangle(this);
MessagePortData::MessagePortData(MessagePort* owner)
: owner_(owner) {
}

MessagePortData::~MessagePortData() {
Expand All @@ -552,17 +541,13 @@ void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
}

void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
CHECK(!a->group_);
CHECK(!b->group_);
b->group_ = a->group_ = std::make_shared<SiblingGroup>();
a->group_->Entangle(a);
a->group_->Entangle(b);
auto group = std::make_shared<SiblingGroup>();
group->Entangle({a, b});
}

void MessagePortData::Disentangle() {
if (group_) {
group_->Disentangle(this);
group_.reset();
}
}

Expand All @@ -572,13 +557,12 @@ MessagePort::~MessagePort() {

MessagePort::MessagePort(Environment* env,
Local<Context> context,
Local<Object> wrap,
const std::string& name)
Local<Object> wrap)
: HandleWrap(env,
wrap,
reinterpret_cast<uv_handle_t*>(&async_),
AsyncWrap::PROVIDER_MESSAGEPORT),
data_(new MessagePortData(this, name)) {
data_(new MessagePortData(this)) {
auto onmessage = [](uv_async_t* handle) {
// Called when data has been put into the queue.
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
Expand Down Expand Up @@ -645,7 +629,7 @@ MessagePort* MessagePort::New(
Environment* env,
Local<Context> context,
std::unique_ptr<MessagePortData> data,
const std::string& name) {
std::shared_ptr<SiblingGroup> sibling_group) {
Context::Scope context_scope(context);
Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);

Expand All @@ -654,14 +638,15 @@ MessagePort* MessagePort::New(
Local<Object> instance;
if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
return nullptr;
MessagePort* port = new MessagePort(env, context, instance, name);
MessagePort* port = new MessagePort(env, context, instance);
CHECK_NOT_NULL(port);
if (port->IsHandleClosing()) {
// Construction failed with an exception.
return nullptr;
}

if (data) {
CHECK(!sibling_group);
port->Detach();
port->data_ = std::move(data);

Expand All @@ -673,6 +658,8 @@ MessagePort* MessagePort::New(
// If the existing MessagePortData object had pending messages, this is
// the easiest way to run that queue.
port->TriggerAsync();
} else if (sibling_group) {
sibling_group->Entangle(port->data_.get());
}
return port;
}
Expand Down Expand Up @@ -1067,7 +1054,7 @@ void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
}

void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
Entangle(a, b->data_.get());
MessagePortData::Entangle(a->data_.get(), b->data_.get());
}

void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
Expand Down Expand Up @@ -1274,7 +1261,6 @@ Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
}

std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
if (name.empty()) return {};
Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
std::shared_ptr<SiblingGroup> group;
auto i = groups_.find(name);
Expand Down Expand Up @@ -1348,14 +1334,24 @@ Maybe<bool> SiblingGroup::Dispatch(
return Just(true);
}

void SiblingGroup::Entangle(MessagePortData* data) {
void SiblingGroup::Entangle(MessagePortData* port) {
Entangle({ port });
}

void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
Mutex::ScopedLock lock(group_mutex_);
ports_.insert(data);
for (MessagePortData* data : ports) {
ports_.insert(data);
CHECK(!data->group_);
data->group_ = shared_from_this();
}
}

void SiblingGroup::Disentangle(MessagePortData* data) {
auto self = shared_from_this(); // Keep alive until end of function.
Mutex::ScopedLock lock(group_mutex_);
ports_.erase(data);
data->group_.reset();

data->AddToIncomingQueue(std::make_shared<Message>());
// If this is an anonymous group and there's another port, close it.
Expand Down Expand Up @@ -1407,8 +1403,10 @@ static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
Context::Scope context_scope(env->context());
Utf8Value name(env->isolate(), args[0]);
MessagePort* port =
MessagePort::New(env, env->context(), nullptr, std::string(*name));
args.GetReturnValue().Set(port->object());
MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
if (port != nullptr) {
args.GetReturnValue().Set(port->object());
}
}

static void InitMessaging(Local<Object> target,
Expand Down
16 changes: 7 additions & 9 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class Message : public MemoryRetainer {
friend class MessagePort;
};

class SiblingGroup {
class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> {
public:
// Named SiblingGroup, Used for one-to-many BroadcastChannels.
static std::shared_ptr<SiblingGroup> Get(const std::string& name);
Expand All @@ -134,7 +134,7 @@ class SiblingGroup {
std::string* error = nullptr);

void Entangle(MessagePortData* data);

void Entangle(std::initializer_list<MessagePortData*> data);
void Disentangle(MessagePortData* data);

const std::string& name() const { return name_; }
Expand All @@ -159,9 +159,7 @@ class SiblingGroup {
// a specific Environment/Isolate/event loop, for easier transfer between those.
class MessagePortData : public TransferData {
public:
explicit MessagePortData(
MessagePort* owner,
const std::string& name = std::string());
explicit MessagePortData(MessagePort* owner);
~MessagePortData() override;

MessagePortData(MessagePortData&& other) = delete;
Expand Down Expand Up @@ -203,6 +201,7 @@ class MessagePortData : public TransferData {
MessagePort* owner_ = nullptr;
std::shared_ptr<SiblingGroup> group_;
friend class MessagePort;
friend class SiblingGroup;
};

// A message port that receives messages from other threads, including
Expand All @@ -216,8 +215,7 @@ class MessagePort : public HandleWrap {
// creating MessagePort instances.
MessagePort(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Object> wrap,
const std::string& name = std::string());
v8::Local<v8::Object> wrap);

public:
~MessagePort() override;
Expand All @@ -226,8 +224,8 @@ class MessagePort : public HandleWrap {
// `MessagePortData` object.
static MessagePort* New(Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<MessagePortData> data = nullptr,
const std::string& name = std::string());
std::unique_ptr<MessagePortData> data = {},
std::shared_ptr<SiblingGroup> sibling_group = {});

// Send a message, i.e. deliver it into the sibling's incoming queue.
// If this port is closed, or if there is no sibling, this message is
Expand Down

0 comments on commit 6334542

Please sign in to comment.