From c230cd01ce5fe956d67a184339672b9bb09d4304 Mon Sep 17 00:00:00 2001 From: Tim Date: Fri, 5 Apr 2024 09:12:22 +1300 Subject: [PATCH] properly handle multiple ports in SharedWorker (#2468) --- .changeset/eight-birds-allow.md | 8 ++ .../src/internal/workerRunner.ts | 97 +++++++++++++------ .../platform-bun/src/internal/workerRunner.ts | 6 +- .../src/internal/workerRunner.ts | 6 +- packages/platform/src/WorkerRunner.ts | 3 +- .../platform/src/internal/workerRunner.ts | 28 +++--- 6 files changed, 100 insertions(+), 48 deletions(-) create mode 100644 .changeset/eight-birds-allow.md diff --git a/.changeset/eight-birds-allow.md b/.changeset/eight-birds-allow.md new file mode 100644 index 00000000000..139255acbbc --- /dev/null +++ b/.changeset/eight-birds-allow.md @@ -0,0 +1,8 @@ +--- +"@effect/platform-browser": patch +"@effect/platform-node": patch +"@effect/platform-bun": patch +"@effect/platform": patch +--- + +properly handle multiple ports in SharedWorker diff --git a/packages/platform-browser/src/internal/workerRunner.ts b/packages/platform-browser/src/internal/workerRunner.ts index e5b3e083e48..a56faf460a6 100644 --- a/packages/platform-browser/src/internal/workerRunner.ts +++ b/packages/platform-browser/src/internal/workerRunner.ts @@ -2,30 +2,47 @@ import { WorkerError } from "@effect/platform/WorkerError" import * as Runner from "@effect/platform/WorkerRunner" import * as Cause from "effect/Cause" import * as Effect from "effect/Effect" +import * as FiberSet from "effect/FiberSet" +import { globalValue } from "effect/GlobalValue" import * as Layer from "effect/Layer" import * as Queue from "effect/Queue" import * as Schedule from "effect/Schedule" +const cachedPorts = globalValue("@effect/platform-browser/Worker/cachedPorts", () => new Set()) +function globalHandleConnect(event: MessageEvent) { + cachedPorts.add((event as MessageEvent).ports[0]) +} +if ("onconnect" in self) { + self.onconnect = globalHandleConnect +} + const platformRunnerImpl = Runner.PlatformRunner.of({ [Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId, start(shutdown: Effect.Effect) { return Effect.gen(function*(_) { - const port = "postMessage" in self ? - self : - (yield* _(Effect.async((resume, signal) => { - self.addEventListener("connect", function(event) { - const port = (event as MessageEvent).ports[0] - port.start() - resume(Effect.succeed(port)) - }, { once: true, signal }) - }))) - const queue = yield* _(Queue.unbounded()) - yield* _( + let currentPortId = 0 + + const queue = yield* _(Queue.unbounded()) + const runFork = yield* _(FiberSet.makeRuntime()) + const ports = new Map() + const send = (portId: number, message: O, transfer?: ReadonlyArray) => + Effect.sync(() => { + ports.get(portId)?.postMessage([1, message], { + transfer: transfer as any + }) + }) + + function handlePort(port: MessagePort, sharedWorker: boolean) { + const portId = currentPortId++ + ports.set(portId, port) + Effect.async((resume) => { function onMessage(event: MessageEvent) { const message = (event as MessageEvent).data as Runner.BackingRunner.Message if (message[0] === 0) { - queue.unsafeOffer(message[1]) + queue.unsafeOffer([portId, message[1]]) + } else if (sharedWorker) { + resume(Effect.interrupt) } else { Effect.runFork(shutdown) } @@ -39,29 +56,51 @@ const platformRunnerImpl = Runner.PlatformRunner.of({ port.addEventListener("message", onMessage as any) port.addEventListener("messageerror", onMessageError as any) port.addEventListener("error", onError as any) + + // ready + if ("start" in port) { + port.start() + } + port.postMessage([0]) + return Effect.sync(() => { port.removeEventListener("message", onMessage as any) port.removeEventListener("messageerror", onMessageError as any) port.removeEventListener("error", onError as any) }) - }), - Effect.tapErrorCause((cause) => Cause.isInterruptedOnly(cause) ? Effect.unit : Effect.logDebug(cause)), - Effect.retry(Schedule.forever), - Effect.annotateLogs({ - package: "@effect/platform-browser", - module: "WorkerRunner" - }), - Effect.interruptible, - Effect.forkScoped - ) - const send = (message: O, transfer?: ReadonlyArray) => - Effect.sync(() => - port.postMessage([1, message], { - transfer: transfer as any - }) + }).pipe( + Effect.tapErrorCause((cause) => Cause.isInterruptedOnly(cause) ? Effect.unit : Effect.logDebug(cause)), + Effect.retry(Schedule.forever), + Effect.annotateLogs({ + package: "@effect/platform-browser", + module: "WorkerRunner" + }), + Effect.ensuring(Effect.sync(() => { + ports.delete(portId) + })), + Effect.interruptible, + runFork ) - // ready - port.postMessage([0]) + } + + if ("onconnect" in self) { + self.onconnect = function(event: MessageEvent) { + const port = (event as MessageEvent).ports[0] + handlePort(port, true) + } + yield* _(Effect.addFinalizer(() => + Effect.sync(() => { + ;(self as any).onconnect = globalHandleConnect + }) + )) + for (const port of cachedPorts) { + handlePort(port, true) + } + cachedPorts.clear() + } else { + handlePort(self as any, false) + } + return { queue, send } }) } diff --git a/packages/platform-bun/src/internal/workerRunner.ts b/packages/platform-bun/src/internal/workerRunner.ts index 36f529f0171..73985b90479 100644 --- a/packages/platform-bun/src/internal/workerRunner.ts +++ b/packages/platform-bun/src/internal/workerRunner.ts @@ -16,13 +16,13 @@ const platformRunnerImpl = Runner.PlatformRunner.of({ return yield* _(Effect.die("not in a worker")) } const port = self - const queue = yield* _(Queue.unbounded()) + const queue = yield* _(Queue.unbounded()) yield* _( Effect.async((resume) => { function onMessage(event: MessageEvent) { const message = (event as MessageEvent).data as Runner.BackingRunner.Message if (message[0] === 0) { - queue.unsafeOffer(message[1]) + queue.unsafeOffer([0, message[1]]) } else { Effect.runFork(shutdown) } @@ -46,7 +46,7 @@ const platformRunnerImpl = Runner.PlatformRunner.of({ Effect.interruptible, Effect.forkScoped ) - const send = (message: O, transfer?: ReadonlyArray) => + const send = (_portId: number, message: O, transfer?: ReadonlyArray) => Effect.sync(() => port.postMessage([1, message], { transfer: transfer as any diff --git a/packages/platform-node/src/internal/workerRunner.ts b/packages/platform-node/src/internal/workerRunner.ts index a271ae36c98..87f91f2aa54 100644 --- a/packages/platform-node/src/internal/workerRunner.ts +++ b/packages/platform-node/src/internal/workerRunner.ts @@ -15,12 +15,12 @@ const platformRunnerImpl = Runner.PlatformRunner.of({ return yield* _(new WorkerError({ reason: "spawn", error: new Error("not in worker") })) } const port = WorkerThreads.parentPort - const queue = yield* _(Queue.unbounded()) + const queue = yield* _(Queue.unbounded()) yield* _( Effect.async((resume) => { port.on("message", (message: Runner.BackingRunner.Message) => { if (message[0] === 0) { - queue.unsafeOffer(message[1]) + queue.unsafeOffer([0, message[1]]) } else { Effect.runFork(shutdown) } @@ -41,7 +41,7 @@ const platformRunnerImpl = Runner.PlatformRunner.of({ Effect.interruptible, Effect.forkScoped ) - const send = (message: O, transfers?: ReadonlyArray) => + const send = (_portId: number, message: O, transfers?: ReadonlyArray) => Effect.sync(() => port.postMessage([1, message], transfers as any)) // ready port.postMessage([0]) diff --git a/packages/platform/src/WorkerRunner.ts b/packages/platform/src/WorkerRunner.ts index d8d7e559f18..27dee428e5d 100644 --- a/packages/platform/src/WorkerRunner.ts +++ b/packages/platform/src/WorkerRunner.ts @@ -17,8 +17,9 @@ import type { WorkerError } from "./WorkerError.js" * @category models */ export interface BackingRunner { - readonly queue: Queue.Dequeue + readonly queue: Queue.Dequeue readonly send: ( + portId: number, message: O, transfers?: ReadonlyArray ) => Effect.Effect diff --git a/packages/platform/src/internal/workerRunner.ts b/packages/platform/src/internal/workerRunner.ts index e6c898d1d27..10be3cf8ce3 100644 --- a/packages/platform/src/internal/workerRunner.ts +++ b/packages/platform/src/internal/workerRunner.ts @@ -51,15 +51,16 @@ export const make = ( yield* _( Queue.take(backing.queue), options?.decode ? - Effect.flatMap((req): Effect.Effect, WorkerError> => { + Effect.flatMap((msg): Effect.Effect], WorkerError> => { + const req = msg[1] if (req[1] === 1) { - return Effect.succeed(req) + return Effect.succeed(msg) } - return Effect.map(options.decode!(req[2]), (data) => [req[0], req[1], data, req[3]]) + return Effect.map(options.decode!(req[2]), (data) => [msg[0], [req[0], req[1], data, req[3]]]) }) : identity, - Effect.tap((req) => { + Effect.tap(([portId, req]) => { const id = req[0] if (req[1] === 1) { const fiber = fiberMap.get(id) @@ -79,7 +80,7 @@ export const make = ( ? Effect.provideService(options.encodeOutput(req[2], data), Transferable.Collector, collector) : Effect.succeed(data), Effect.flatMap((payload) => - backing.send([id, 0, [payload]], [ + backing.send(portId, [id, 0, [payload]], [ ...transfers, ...collector.unsafeRead() ]) @@ -93,7 +94,7 @@ export const make = ( if (options?.encodeOutput === undefined) { const payload = Chunk.toReadonlyArray(data) const transfers = options?.transfers ? payload.flatMap(options.transfers) : undefined - return backing.send([id, 0, payload], transfers) + return backing.send(portId, [id, 0, payload], transfers) } const transfers: Array = [] @@ -110,12 +111,12 @@ export const make = ( Effect.provideService(Transferable.Collector, collector), Effect.flatMap((payload) => { collector.unsafeRead().forEach((transfer) => transfers.push(transfer)) - return backing.send([id, 0, payload], transfers) + return backing.send(portId, [id, 0, payload], transfers) }) ) }), Stream.runDrain, - Effect.andThen(backing.send([id, 1])) + Effect.andThen(backing.send(portId, [id, 1])) ) if (req[3]) { @@ -131,7 +132,8 @@ export const make = ( return effect }), - Effect.catchIf(isWorkerError, (error) => backing.send([id, 3, WorkerError.encodeCause(Cause.fail(error))])), + Effect.catchIf(isWorkerError, (error) => + backing.send(portId, [id, 3, WorkerError.encodeCause(Cause.fail(error))])), Effect.catchAllCause((cause) => Either.match(Cause.failureOrCause(cause), { onLeft: (error) => { @@ -146,15 +148,17 @@ export const make = ( ) : Effect.succeed(error), Effect.flatMap((payload) => - backing.send([id, 2, payload as any], [ + backing.send(portId, [id, 2, payload as any], [ ...transfers, ...collector.unsafeRead() ]) ), - Effect.catchAllCause((cause) => backing.send([id, 3, WorkerError.encodeCause(cause)])) + Effect.catchAllCause((cause) => + backing.send(portId, [id, 3, WorkerError.encodeCause(cause)]) + ) ) }, - onRight: (cause) => backing.send([id, 3, WorkerError.encodeCause(cause)]) + onRight: (cause) => backing.send(portId, [id, 3, WorkerError.encodeCause(cause)]) }) ), Effect.ensuring(Effect.sync(() => fiberMap.delete(id))),