Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MessageChannel and MessagePort #3860

Merged
merged 13 commits into from
Jul 28, 2023
146 changes: 146 additions & 0 deletions packages/bun-types/globals.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,152 @@ declare function structuredClone<T>(
options?: StructuredSerializeOptions,
): T;

/**
* This Channel Messaging API interface allows us to create a new message channel and send data through it via its two MessagePort properties.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessageChannel)
*/
interface MessageChannel {
/**
* Returns the first MessagePort object.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessageChannel/port1)
*/
readonly port1: MessagePort;
/**
* Returns the second MessagePort object.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessageChannel/port2)
*/
readonly port2: MessagePort;
}

declare var MessageChannel: {
prototype: MessageChannel;
new (): MessageChannel;
};

/**
* A message received by a target object.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessageEvent)
*/
interface MessageEvent<T = any> extends Event {
/**
* Returns the data of the message.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessageEvent/data)
*/
readonly data: T;
/**
* Returns the last event ID string, for server-sent events.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessageEvent/lastEventId)
*/
readonly lastEventId: string;
/**
* Returns the origin of the message, for server-sent events and cross-document messaging.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessageEvent/origin)
*/
readonly origin: string;
/**
* Returns the MessagePort array sent with the message, for cross-document messaging and channel messaging.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessageEvent/ports)
*/
readonly ports: ReadonlyArray<MessagePort>;
/**
* Returns the WindowProxy of the source window, for cross-document messaging, and the MessagePort being attached, in the connect event fired at SharedWorkerGlobalScope objects.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessageEvent/source)
*/
readonly source: MessageEventSource | null;
/**
* @deprecated
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessageEvent/initMessageEvent)
*/
initMessageEvent(
type: string,
bubbles?: boolean,
cancelable?: boolean,
data?: any,
origin?: string,
lastEventId?: string,
source?: MessageEventSource | null,
ports?: MessagePort[],
): void;
}

declare var MessageEvent: {
prototype: MessageEvent;
new <T>(type: string, eventInitDict?: MessageEventInit<T>): MessageEvent<T>;
};

interface MessagePortEventMap {
message: MessageEvent;
messageerror: MessageEvent;
}

/**
* This Channel Messaging API interface represents one of the two ports of a MessageChannel, allowing messages to be sent from one port and listening out for them arriving at the other.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessagePort)
*/
interface MessagePort extends EventTarget {
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessagePort/message_event) */
onmessage: ((this: MessagePort, ev: MessageEvent) => any) | null;
/** [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessagePort/messageerror_event) */
onmessageerror: ((this: MessagePort, ev: MessageEvent) => any) | null;
/**
* Disconnects the port, so that it is no longer active.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessagePort/close)
*/
close(): void;
/**
* Posts a message through the channel. Objects listed in transfer are transferred, not just cloned, meaning that they are no longer usable on the sending side.
*
* Throws a "DataCloneError" DOMException if transfer contains duplicate objects or port, or if message could not be cloned.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessagePort/postMessage)
*/
postMessage(message: any, transfer: Transferable[]): void;
postMessage(message: any, options?: StructuredSerializeOptions): void;
/**
* Begins dispatching messages received on the port.
*
* [MDN Reference](https://developer.mozilla.org/docs/Web/API/MessagePort/start)
*/
start(): void;
addEventListener<K extends keyof MessagePortEventMap>(
type: K,
listener: (this: MessagePort, ev: MessagePortEventMap[K]) => any,
options?: boolean | AddEventListenerOptions,
): void;
addEventListener(
type: string,
listener: EventListenerOrEventListenerObject,
options?: boolean | AddEventListenerOptions,
): void;
removeEventListener<K extends keyof MessagePortEventMap>(
type: K,
listener: (this: MessagePort, ev: MessagePortEventMap[K]) => any,
options?: boolean | EventListenerOptions,
): void;
removeEventListener(
type: string,
listener: EventListenerOrEventListenerObject,
options?: boolean | EventListenerOptions,
): void;
}

declare var MessagePort: {
prototype: MessagePort;
new (): MessagePort;
};

interface EncodeIntoResult {
/**
* The read Unicode code units of input.
Expand Down
5 changes: 3 additions & 2 deletions src/bun.js/bindings/BunJSCModule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "JavaScriptCore/VMTrapsInlines.h"
#include "SerializedScriptValue.h"
#include "ExceptionOr.h"
#include "MessagePort.h"

#if ENABLE(REMOTE_INSPECTOR)
#include "JavaScriptCore/RemoteInspectorServer.h"
Expand Down Expand Up @@ -552,8 +553,8 @@ JSC_DEFINE_HOST_FUNCTION(functionSerialize, (JSGlobalObject * lexicalGlobalObjec
}

Vector<JSC::Strong<JSC::JSObject>> transferList;

ExceptionOr<Ref<SerializedScriptValue>> serialized = SerializedScriptValue::create(*globalObject, value, WTFMove(transferList));
Vector<RefPtr<MessagePort>> dummyPorts;
ExceptionOr<Ref<SerializedScriptValue>> serialized = SerializedScriptValue::create(*globalObject, value, WTFMove(transferList), dummyPorts);

if (serialized.hasException()) {
WebCore::propagateException(*globalObject, throwScope, serialized.releaseException());
Expand Down
8 changes: 6 additions & 2 deletions src/bun.js/bindings/BunWorkerGlobalScope.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
#include "config.h"

#include "BunWorkerGlobalScope.h"
#include "MessagePortChannelProviderImpl.h"

namespace Bun {
using namespace WebCore;
namespace WebCore {

WTF_MAKE_ISO_ALLOCATED_IMPL(GlobalScope);

MessagePortChannelProvider& GlobalScope::messagePortChannelProvider()
{
return *reinterpret_cast<MessagePortChannelProvider*>(&MessagePortChannelProviderImpl::singleton());
}
}
15 changes: 13 additions & 2 deletions src/bun.js/bindings/BunWorkerGlobalScope.h
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
#pragma once

#include "root.h"

#include "EventTarget.h"
Expand All @@ -7,8 +9,12 @@
#include <wtf/HashSet.h>
#include <wtf/Lock.h>

namespace Bun {
class GlobalScope final : public RefCounted<GlobalScope>, public EventTargetWithInlineData {
namespace WebCore {

class MessagePortChannelProvider;
class MessagePortChannelProviderImpl;

class GlobalScope : public RefCounted<GlobalScope>, public EventTargetWithInlineData {
WTF_MAKE_ISO_ALLOCATED(GlobalScope);

public:
Expand All @@ -33,6 +39,11 @@ class GlobalScope final : public RefCounted<GlobalScope>, public EventTargetWith
void derefEventTarget() final { deref(); }
void eventListenersDidChange() final {}

MessagePortChannelProvider& messagePortChannelProvider();

ScriptExecutionContext* m_context;

private:
MessagePortChannelProviderImpl* m_messagePortChannelProvider;
};
}
137 changes: 137 additions & 0 deletions src/bun.js/bindings/ScriptExecutionContext.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "root.h"
#include "headers.h"
#include "ScriptExecutionContext.h"
#include "MessagePort.h"

#include "webcore/WebSocket.h"
#include "libusockets.h"
Expand Down Expand Up @@ -70,6 +71,23 @@ void ScriptExecutionContext::unrefEventLoop()
Bun__eventLoop__incrementRefConcurrently(WebCore::clientData(vm())->bunVM, -1);
}

ScriptExecutionContext::~ScriptExecutionContext()
{
checkConsistency();

{
Locker locker { allScriptExecutionContextsMapLock };
ASSERT_WITH_MESSAGE(!allScriptExecutionContextsMap().contains(m_identifier), "A ScriptExecutionContext subclass instance implementing postTask should have already removed itself from the map");
}

auto postMessageCompletionHandlers = WTFMove(m_processMessageWithMessagePortsSoonHandlers);
for (auto& completionHandler : postMessageCompletionHandlers)
completionHandler();

while (auto* destructionObserver = m_destructionObservers.takeAny())
destructionObserver->contextDestroyed();
}

bool ScriptExecutionContext::postTaskTo(ScriptExecutionContextIdentifier identifier, Function<void(ScriptExecutionContext&)>&& task)
{
Locker locker { allScriptExecutionContextsMapLock };
Expand All @@ -82,6 +100,125 @@ bool ScriptExecutionContext::postTaskTo(ScriptExecutionContextIdentifier identif
return true;
}

void ScriptExecutionContext::didCreateDestructionObserver(ContextDestructionObserver& observer)
{
ASSERT(!m_inScriptExecutionContextDestructor);
m_destructionObservers.add(&observer);
}

void ScriptExecutionContext::willDestroyDestructionObserver(ContextDestructionObserver& observer)
{
m_destructionObservers.remove(&observer);
}

extern "C" void* Bun__getVM();

bool ScriptExecutionContext::isContextThread()
{
auto clientData = WebCore::clientData(vm());
return clientData->bunVM == Bun__getVM();
}

bool ScriptExecutionContext::ensureOnContextThread(ScriptExecutionContextIdentifier identifier, Function<void(ScriptExecutionContext&)>&& task)
{
ScriptExecutionContext* context = nullptr;
{
Locker locker { allScriptExecutionContextsMapLock };
context = allScriptExecutionContextsMap().get(identifier);

if (!context)
return false;

if (!context->isContextThread()) {
context->postTaskConcurrently(WTFMove(task));
return true;
}
}

task(*context);
return true;
}

bool ScriptExecutionContext::ensureOnMainThread(Function<void(ScriptExecutionContext&)>&& task)
{
Locker locker { allScriptExecutionContextsMapLock };
auto* context = allScriptExecutionContextsMap().get(1);

if (!context) {
return false;
}

context->postTaskConcurrently(WTFMove(task));
return true;
}

void ScriptExecutionContext::processMessageWithMessagePortsSoon(CompletionHandler<void()>&& completionHandler)
{
ASSERT(isContextThread());
m_processMessageWithMessagePortsSoonHandlers.append(WTFMove(completionHandler));

if (m_willProcessMessageWithMessagePortsSoon) {
return;
}

m_willProcessMessageWithMessagePortsSoon = true;

postTask([](ScriptExecutionContext& context) {
context.dispatchMessagePortEvents();
});
}

void ScriptExecutionContext::dispatchMessagePortEvents()
{
ASSERT(isContextThread());
checkConsistency();

ASSERT(m_willprocessMessageWithMessagePortsSoon);
m_willProcessMessageWithMessagePortsSoon = false;

auto completionHandlers = std::exchange(m_processMessageWithMessagePortsSoonHandlers, Vector<CompletionHandler<void()>> {});

// Make a frozen copy of the ports so we can iterate while new ones might be added or destroyed.
for (auto* messagePort : copyToVector(m_messagePorts)) {
// The port may be destroyed, and another one created at the same address,
// but this is harmless. The worst that can happen as a result is that
// dispatchMessages() will be called needlessly.
if (m_messagePorts.contains(messagePort) && messagePort->started())
messagePort->dispatchMessages();
}

for (auto& completionHandler : completionHandlers)
completionHandler();
}

void ScriptExecutionContext::checkConsistency() const
{
for (auto* messagePort : m_messagePorts)
ASSERT(messagePort->scriptExecutionContext() == this);

for (auto* destructionObserver : m_destructionObservers)
ASSERT(destructionObserver->scriptExecutionContext() == this);

// for (auto* activeDOMObject : m_activeDOMObjects) {
// ASSERT(activeDOMObject->scriptExecutionContext() == this);
// activeDOMObject->assertSuspendIfNeededWasCalled();
// }
}

void ScriptExecutionContext::createdMessagePort(MessagePort& messagePort)
{
ASSERT(isContextThread());

m_messagePorts.add(&messagePort);
}

void ScriptExecutionContext::destroyedMessagePort(MessagePort& messagePort)
{
ASSERT(isContextThread());

m_messagePorts.remove(&messagePort);
}

us_socket_context_t* ScriptExecutionContext::webSocketContextNoSSL()
{
if (!m_client_websockets_ctx) {
Expand Down
Loading