Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: refactor MessagePort entanglement management #36345

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 28 additions & 30 deletions src/node_messaging.cc
Original file line number Diff line number Diff line change
Expand Up @@ -515,19 +515,8 @@ void Message::MemoryInfo(MemoryTracker* tracker) const {
tracker->TrackField("transferables", transferables_);
}

// TODO(@jasnell): The name here will be an empty string if the
// one-to-one MessageChannel is used. In such cases,
// SiblingGroup::Get() will return nothing and group_ will be
// an empty pointer. @addaleax suggests that the code here
// could be clearer if attaching the SiblingGroup were a
// separate step rather than part of the constructor here.
MessagePortData::MessagePortData(
MessagePort* owner,
const std::string& name)
: owner_(owner),
group_(SiblingGroup::Get(name)) {
if (group_)
group_->Entangle(this);
MessagePortData::MessagePortData(MessagePort* owner)
: owner_(owner) {
}

MessagePortData::~MessagePortData() {
Expand All @@ -552,17 +541,13 @@ void MessagePortData::AddToIncomingQueue(std::shared_ptr<Message> message) {
}

void MessagePortData::Entangle(MessagePortData* a, MessagePortData* b) {
CHECK(!a->group_);
CHECK(!b->group_);
b->group_ = a->group_ = std::make_shared<SiblingGroup>();
a->group_->Entangle(a);
a->group_->Entangle(b);
auto group = std::make_shared<SiblingGroup>();
group->Entangle({a, b});
}

void MessagePortData::Disentangle() {
if (group_) {
group_->Disentangle(this);
group_.reset();
}
}

Expand All @@ -572,13 +557,12 @@ MessagePort::~MessagePort() {

MessagePort::MessagePort(Environment* env,
Local<Context> context,
Local<Object> wrap,
const std::string& name)
Local<Object> wrap)
: HandleWrap(env,
wrap,
reinterpret_cast<uv_handle_t*>(&async_),
AsyncWrap::PROVIDER_MESSAGEPORT),
data_(new MessagePortData(this, name)) {
data_(new MessagePortData(this)) {
auto onmessage = [](uv_async_t* handle) {
// Called when data has been put into the queue.
MessagePort* channel = ContainerOf(&MessagePort::async_, handle);
Expand Down Expand Up @@ -645,7 +629,7 @@ MessagePort* MessagePort::New(
Environment* env,
Local<Context> context,
std::unique_ptr<MessagePortData> data,
const std::string& name) {
std::shared_ptr<SiblingGroup> sibling_group) {
Context::Scope context_scope(context);
Local<FunctionTemplate> ctor_templ = GetMessagePortConstructorTemplate(env);

Expand All @@ -654,14 +638,15 @@ MessagePort* MessagePort::New(
Local<Object> instance;
if (!ctor_templ->InstanceTemplate()->NewInstance(context).ToLocal(&instance))
return nullptr;
MessagePort* port = new MessagePort(env, context, instance, name);
MessagePort* port = new MessagePort(env, context, instance);
CHECK_NOT_NULL(port);
if (port->IsHandleClosing()) {
// Construction failed with an exception.
return nullptr;
}

if (data) {
CHECK(!sibling_group);
port->Detach();
port->data_ = std::move(data);

Expand All @@ -673,6 +658,8 @@ MessagePort* MessagePort::New(
// If the existing MessagePortData object had pending messages, this is
// the easiest way to run that queue.
port->TriggerAsync();
} else if (sibling_group) {
sibling_group->Entangle(port->data_.get());
}
return port;
}
Expand Down Expand Up @@ -1067,7 +1054,7 @@ void MessagePort::MoveToContext(const FunctionCallbackInfo<Value>& args) {
}

void MessagePort::Entangle(MessagePort* a, MessagePort* b) {
Entangle(a, b->data_.get());
MessagePortData::Entangle(a->data_.get(), b->data_.get());
}

void MessagePort::Entangle(MessagePort* a, MessagePortData* b) {
Expand Down Expand Up @@ -1274,7 +1261,6 @@ Maybe<bool> JSTransferable::Data::FinalizeTransferWrite(
}

std::shared_ptr<SiblingGroup> SiblingGroup::Get(const std::string& name) {
if (name.empty()) return {};
Mutex::ScopedLock lock(SiblingGroup::groups_mutex_);
std::shared_ptr<SiblingGroup> group;
auto i = groups_.find(name);
Expand Down Expand Up @@ -1348,14 +1334,24 @@ Maybe<bool> SiblingGroup::Dispatch(
return Just(true);
}

void SiblingGroup::Entangle(MessagePortData* data) {
void SiblingGroup::Entangle(MessagePortData* port) {
Entangle({ port });
}

void SiblingGroup::Entangle(std::initializer_list<MessagePortData*> ports) {
Mutex::ScopedLock lock(group_mutex_);
ports_.insert(data);
for (MessagePortData* data : ports) {
ports_.insert(data);
CHECK(!data->group_);
data->group_ = shared_from_this();
}
}

void SiblingGroup::Disentangle(MessagePortData* data) {
auto self = shared_from_this(); // Keep alive until end of function.
Mutex::ScopedLock lock(group_mutex_);
ports_.erase(data);
data->group_.reset();

data->AddToIncomingQueue(std::make_shared<Message>());
// If this is an anonymous group and there's another port, close it.
Expand Down Expand Up @@ -1407,8 +1403,10 @@ static void BroadcastChannel(const FunctionCallbackInfo<Value>& args) {
Context::Scope context_scope(env->context());
Utf8Value name(env->isolate(), args[0]);
MessagePort* port =
MessagePort::New(env, env->context(), nullptr, std::string(*name));
args.GetReturnValue().Set(port->object());
MessagePort::New(env, env->context(), {}, SiblingGroup::Get(*name));
if (port != nullptr) {
args.GetReturnValue().Set(port->object());
}
}

static void InitMessaging(Local<Object> target,
Expand Down
16 changes: 7 additions & 9 deletions src/node_messaging.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ class Message : public MemoryRetainer {
friend class MessagePort;
};

class SiblingGroup {
class SiblingGroup final : public std::enable_shared_from_this<SiblingGroup> {
public:
// Named SiblingGroup, Used for one-to-many BroadcastChannels.
static std::shared_ptr<SiblingGroup> Get(const std::string& name);
Expand All @@ -134,7 +134,7 @@ class SiblingGroup {
std::string* error = nullptr);

void Entangle(MessagePortData* data);

void Entangle(std::initializer_list<MessagePortData*> data);
void Disentangle(MessagePortData* data);

const std::string& name() const { return name_; }
Expand All @@ -159,9 +159,7 @@ class SiblingGroup {
// a specific Environment/Isolate/event loop, for easier transfer between those.
class MessagePortData : public TransferData {
public:
explicit MessagePortData(
MessagePort* owner,
const std::string& name = std::string());
explicit MessagePortData(MessagePort* owner);
~MessagePortData() override;

MessagePortData(MessagePortData&& other) = delete;
Expand Down Expand Up @@ -203,6 +201,7 @@ class MessagePortData : public TransferData {
MessagePort* owner_ = nullptr;
std::shared_ptr<SiblingGroup> group_;
friend class MessagePort;
friend class SiblingGroup;
};

// A message port that receives messages from other threads, including
Expand All @@ -216,8 +215,7 @@ class MessagePort : public HandleWrap {
// creating MessagePort instances.
MessagePort(Environment* env,
v8::Local<v8::Context> context,
v8::Local<v8::Object> wrap,
const std::string& name = std::string());
v8::Local<v8::Object> wrap);

public:
~MessagePort() override;
Expand All @@ -226,8 +224,8 @@ class MessagePort : public HandleWrap {
// `MessagePortData` object.
static MessagePort* New(Environment* env,
v8::Local<v8::Context> context,
std::unique_ptr<MessagePortData> data = nullptr,
const std::string& name = std::string());
std::unique_ptr<MessagePortData> data = {},
std::shared_ptr<SiblingGroup> sibling_group = {});

// Send a message, i.e. deliver it into the sibling's incoming queue.
// If this port is closed, or if there is no sibling, this message is
Expand Down