Skip to content

Commit

Permalink
refactor & simplify /platform backing workers (#3255)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Jul 15, 2024
1 parent 1b45236 commit ada68b3
Show file tree
Hide file tree
Showing 11 changed files with 661 additions and 610 deletions.
10 changes: 10 additions & 0 deletions .changeset/happy-llamas-watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
"@effect/platform-browser": minor
"@effect/platform-node": minor
"@effect/platform-bun": minor
"@effect/platform": minor
---

refactor & simplify /platform backing workers

Improves worker performance by 2x
87 changes: 36 additions & 51 deletions packages/platform-browser/src/internal/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,58 +3,43 @@ import { WorkerError } from "@effect/platform/WorkerError"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Queue from "effect/Queue"

const platformWorkerImpl = Worker.PlatformWorker.of({
[Worker.PlatformWorkerTypeId]: Worker.PlatformWorkerTypeId,
spawn<I, O>(worker_: unknown) {
return Effect.gen(function*(_) {
const worker = worker_ as globalThis.SharedWorker | globalThis.Worker | MessagePort
let port: globalThis.Worker | MessagePort
if ("port" in worker) {
port = worker.port
} else {
port = worker
}

yield* _(Effect.addFinalizer(() => Effect.sync(() => port.postMessage([1]))))

const queue = yield* _(Queue.unbounded<Worker.BackingWorker.Message<O>>())
const latch = yield* Deferred.make<void>()

const fiber = yield* _(
Effect.async<never, WorkerError, never>((resume) => {
function onMessage(event: MessageEvent) {
queue.unsafeOffer((event as MessageEvent).data)
}
function onError(event: ErrorEvent) {
resume(new WorkerError({ reason: "unknown", error: event.error ?? event.message }))
}
port.addEventListener("message", onMessage as any)
port.addEventListener("error", onError as any)
Deferred.unsafeDone(latch, Effect.void)
return Effect.sync(() => {
port.removeEventListener("message", onMessage as any)
port.removeEventListener("error", onError as any)
})
}),
Effect.interruptible,
Effect.forkScoped
)
yield* Deferred.await(latch)

if ("start" in port) {
port.start()
}

const send = (message: I, transfers?: ReadonlyArray<unknown>) =>
Effect.try({
try: () => port.postMessage([0, message], transfers as any),
catch: (error) => new WorkerError({ reason: "send", error })
import * as Scope from "effect/Scope"

const platformWorkerImpl = Worker.makePlatform<globalThis.SharedWorker | globalThis.Worker | MessagePort>()({
setup({ scope, worker }) {
const port = "port" in worker ? worker.port : worker
return Effect.as(
Scope.addFinalizer(
scope,
Effect.sync(() => {
port.postMessage([1])
})

return { fiber, queue, send }
})
),
port
)
},
listen({ deferred, emit, port, scope }) {
function onMessage(event: MessageEvent) {
emit(event.data)
}
function onError(event: ErrorEvent) {
Deferred.unsafeDone(
deferred,
new WorkerError({ reason: "unknown", error: event.error ?? event.message })
)
}
port.addEventListener("message", onMessage as any)
port.addEventListener("error", onError as any)
if ("start" in port) {
port.start()
}
return Scope.addFinalizer(
scope,
Effect.sync(() => {
port.removeEventListener("message", onMessage as any)
port.removeEventListener("error", onError as any)
})
)
}
})

Expand Down
178 changes: 99 additions & 79 deletions packages/platform-browser/src/internal/workerRunner.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import { WorkerError } from "@effect/platform/WorkerError"
import * as Runner from "@effect/platform/WorkerRunner"
import * as Cause from "effect/Cause"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
import * as ExecStrategy from "effect/ExecutionStrategy"
import * as Exit from "effect/Exit"
import * as FiberId from "effect/FiberId"
import * as FiberSet from "effect/FiberSet"
import { globalValue } from "effect/GlobalValue"
import * as Layer from "effect/Layer"
import * as Queue from "effect/Queue"
import * as Schedule from "effect/Schedule"
import * as Scope from "effect/Scope"

const cachedPorts = globalValue("@effect/platform-browser/Worker/cachedPorts", () => new Set<MessagePort>())
function globalHandleConnect(event: MessageEvent) {
Expand All @@ -18,92 +20,110 @@ if (typeof self !== "undefined" && "onconnect" in self) {

const platformRunnerImpl = Runner.PlatformRunner.of({
[Runner.PlatformRunnerTypeId]: Runner.PlatformRunnerTypeId,
start<I, O>(shutdown: Effect.Effect<void>) {
return Effect.gen(function*() {
start<I, O>() {
return Effect.sync(() => {
let currentPortId = 0

yield* Effect.addFinalizer(() => Effect.sync(() => self.close()))

const queue = yield* Queue.unbounded<readonly [portId: number, message: I]>()
const runFork = yield* FiberSet.makeRuntime<never>()
const ports = new Map<number, MessagePort>()
const ports = new Map<number, readonly [MessagePort, Scope.CloseableScope]>()
const send = (portId: number, message: O, transfer?: ReadonlyArray<unknown>) =>
Effect.sync(() => {
ports.get(portId)?.postMessage([1, message], {
;(ports.get(portId)?.[0] ?? self).postMessage([1, message], {
transfer: transfer as any
})
})

function handlePort(port: MessagePort, sharedWorker: boolean) {
const portId = currentPortId++
ports.set(portId, port)

Effect.async<never, WorkerError, never>((resume) => {
function onMessage(event: MessageEvent) {
const message = (event as MessageEvent).data as Runner.BackingRunner.Message<I>
if (message[0] === 0) {
queue.unsafeOffer([portId, message[1]])
} else if (sharedWorker && ports.size > 1) {
resume(Effect.interrupt)
} else {
Effect.runFork(shutdown)
}
}
function onMessageError(error: ErrorEvent) {
resume(new WorkerError({ reason: "decode", error: error.error ?? error.message }))
}
function onError(error: ErrorEvent) {
resume(new WorkerError({ reason: "unknown", error: error.error ?? error.message }))
}
port.addEventListener("message", onMessage as any)
port.addEventListener("messageerror", onMessageError as any)
port.addEventListener("error", onError as any)

// ready
if ("start" in port) {
port.start()
}
port.postMessage([0])

return Effect.sync(() => {
port.removeEventListener("message", onMessage as any)
port.removeEventListener("messageerror", onMessageError as any)
port.removeEventListener("error", onError as any)
})
}).pipe(
Effect.tapErrorCause((cause) => Cause.isInterruptedOnly(cause) ? Effect.void : Effect.logDebug(cause)),
Effect.retry(Schedule.forever),
Effect.annotateLogs({
package: "@effect/platform-browser",
module: "WorkerRunner"
}),
Effect.ensuring(Effect.sync(() => {
ports.delete(portId)
})),
Effect.interruptible,
runFork
)
}

if ("onconnect" in self) {
self.onconnect = function(event: MessageEvent) {
const port = (event as MessageEvent).ports[0]
handlePort(port, true)
}
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
;(self as any).onconnect = globalHandleConnect
})
const run = <A, E, R>(handler: (portId: number, message: I) => Effect.Effect<A, E, R>) =>
Effect.uninterruptibleMask((restore) =>
Scope.make().pipe(
Effect.bindTo("scope"),
Effect.bind("fiberSet", ({ scope }) => FiberSet.make<any, WorkerError | E>().pipe(Scope.extend(scope))),
Effect.bind("runFork", ({ fiberSet }) => FiberSet.runtime(fiberSet)<R>()),
Effect.tap(({ fiberSet, runFork, scope }) => {
function onMessage(portId: number) {
return function(event: MessageEvent) {
const message = (event as MessageEvent).data as Runner.BackingRunner.Message<I>
if (message[0] === 0) {
runFork(restore(handler(portId, message[1])))
} else {
const port = ports.get(portId)
if (port) {
Effect.runFork(Scope.close(port[1], Exit.void))
}
ports.delete(portId)
if (ports.size === 0) {
Deferred.unsafeDone(fiberSet.deferred, Exit.interrupt(FiberId.none))
}
}
}
}
function onMessageError(error: MessageEvent) {
Deferred.unsafeDone(
fiberSet.deferred,
new WorkerError({ reason: "decode", error: error.data })
)
}
function onError(error: any) {
Deferred.unsafeDone(
fiberSet.deferred,
new WorkerError({ reason: "unknown", error: error.data })
)
}
function handlePort(port: MessagePort) {
return Scope.fork(scope, ExecStrategy.sequential).pipe(
Effect.flatMap((scope) => {
const portId = currentPortId++
ports.set(portId, [port, scope])
const onMsg = onMessage(portId)
port.addEventListener("message", onMsg)
port.addEventListener("messageerror", onMessageError)
if ("start" in port) {
port.start()
}
port.postMessage([0])
return Scope.addFinalizer(
scope,
Effect.sync(() => {
port.removeEventListener("message", onMsg)
port.removeEventListener("messageerror", onError)
})
)
}),
runFork
)
}
self.addEventListener("error", onError)
if ("onconnect" in self) {
self.onconnect = function(event: MessageEvent) {
const port = (event as MessageEvent).ports[0]
handlePort(port)
}
for (const port of cachedPorts) {
handlePort(port)
}
cachedPorts.clear()
} else {
handlePort(self as any)
}
return Scope.addFinalizer(
scope,
Effect.sync(() => {
self.removeEventListener("error", onError)
if ("onconnect" in self) {
self.onconnect = globalHandleConnect
}
self.close()
})
)
}),
Effect.flatMap(({ fiberSet, scope }) =>
restore(FiberSet.join(fiberSet) as Effect.Effect<never, E | WorkerError>).pipe(
Effect.ensuring(Scope.close(scope, Exit.void))
)
)
)
)
for (const port of cachedPorts) {
handlePort(port, true)
}
cachedPorts.clear()
} else {
handlePort(self as any, false)
}

return { queue, send }
return { run, send }
})
}
})
Expand Down
Loading

0 comments on commit ada68b3

Please sign in to comment.