Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve worker pool error reporting and finalization #2819

Merged
merged 2 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/dry-waves-begin.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

ensure pool calls finalizer for failed acquisitions
9 changes: 9 additions & 0 deletions .changeset/eighty-mugs-sneeze.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
---
"@effect/platform-browser": minor
"@effect/platform": minor
"@effect/platform-bun": minor
"@effect/platform-node": minor
"@effect/platform-node-shared": minor
---

remove `permits` from workers, to prevent issues with pool resizing
5 changes: 5 additions & 0 deletions .changeset/stale-beers-promise.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform": patch
---

ensure worker pool construction errors are reported during creation
15 changes: 9 additions & 6 deletions packages/effect/src/internal/pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,12 +218,15 @@ class PoolImpl<in out A, in out E> implements Pool.Pool<A, E> {
const release = (attempted: Attempted<A, E>): Effect.Effect<unknown> =>
core.exitMatch(attempted.result, {
onFailure: () =>
core.flatten(ref.modify(this.state, (state) => {
if (state.size <= this.min) {
return [allocateUinterruptible(this), { ...state, free: state.free + 1 }] as const
}
return [core.void, { ...state, size: state.size - 1 }] as const
})),
core.zipRight(
attempted.finalizer,
core.flatten(ref.modify(this.state, (state) => {
if (state.size <= this.min) {
return [allocateUinterruptible(this), { ...state, free: state.free + 1 }] as const
}
return [core.void, { ...state, size: state.size - 1 }] as const
}))
),
onSuccess: (item) =>
core.flatMap(ref.get(this.invalidated), (set) => {
if (pipe(set, HashSet.has(item))) {
Expand Down
21 changes: 21 additions & 0 deletions packages/effect/test/Pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -246,4 +246,25 @@ describe("Pool", () => {
const result = yield* $(Fiber.interrupt(fiber))
expect(result).toEqual(Exit.interrupt(fiberId))
}))

it.scoped("finalizer is called for failed allocations", () =>
Effect.gen(function*() {
const scope = yield* Scope.make()
const count = yield* Ref.make(0)
const get = Effect.acquireRelease(
Ref.updateAndGet(count, (n) => n + 1),
() => Ref.update(count, (n) => n - 1)
).pipe(
Effect.andThen(Effect.fail("boom"))
)
const pool = yield* Pool.make({ acquire: get, size: 10 }).pipe(
Scope.extend(scope)
)
yield* Effect.scoped(pool.get).pipe(
Effect.ignore
)
expect(yield* Ref.get(count)).toBe(10)
yield* Scope.close(scope, Exit.void)
expect(yield* Ref.get(count)).toBe(0)
}))
})
5 changes: 4 additions & 1 deletion packages/platform-browser/src/internal/worker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as Worker from "@effect/platform/Worker"
import { WorkerError } from "@effect/platform/WorkerError"
import * as Deferred from "effect/Deferred"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
import * as Queue from "effect/Queue"
Expand All @@ -19,6 +20,7 @@ const platformWorkerImpl = Worker.PlatformWorker.of({
yield* _(Effect.addFinalizer(() => Effect.sync(() => port.postMessage([1]))))

const queue = yield* _(Queue.unbounded<Worker.BackingWorker.Message<O>>())
const latch = yield* Deferred.make<void>()

const fiber = yield* _(
Effect.async<never, WorkerError, never>((resume) => {
Expand All @@ -30,6 +32,7 @@ const platformWorkerImpl = Worker.PlatformWorker.of({
}
port.addEventListener("message", onMessage as any)
port.addEventListener("error", onError as any)
Deferred.unsafeDone(latch, Effect.void)
return Effect.sync(() => {
port.removeEventListener("message", onMessage as any)
port.removeEventListener("error", onError as any)
Expand All @@ -38,7 +41,7 @@ const platformWorkerImpl = Worker.PlatformWorker.of({
Effect.interruptible,
Effect.forkScoped
)
yield* _(Effect.yieldNow())
yield* Deferred.await(latch)

if ("start" in port) {
port.start()
Expand Down
11 changes: 5 additions & 6 deletions packages/platform/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,6 @@ export declare namespace Worker {
export interface Options<I> {
readonly encode?: (message: I) => Effect.Effect<unknown, WorkerError>
readonly transfers?: (message: I) => ReadonlyArray<unknown>
readonly permits?: number
readonly queue?: WorkerQueue<I>
readonly initialMessage?: LazyArg<I>
}
Expand Down Expand Up @@ -227,7 +226,7 @@ export const layerManager: Layer.Layer<WorkerManager, never, PlatformWorker> = i
*/
export const makePool: <I, O, E>(
options: WorkerPool.Options<I>
) => Effect.Effect<WorkerPool<I, O, E>, never, WorkerManager | Spawner | Scope.Scope> = internal.makePool
) => Effect.Effect<WorkerPool<I, O, E>, WorkerError, WorkerManager | Spawner | Scope.Scope> = internal.makePool

/**
* @since 1.0.0
Expand All @@ -236,7 +235,7 @@ export const makePool: <I, O, E>(
export const makePoolLayer: <Tag, I, O, E>(
tag: Context.Tag<Tag, WorkerPool<I, O, E>>,
options: WorkerPool.Options<I>
) => Layer.Layer<Tag, never, WorkerManager | Spawner> = internal.makePoolLayer
) => Layer.Layer<Tag, WorkerError, WorkerManager | Spawner> = internal.makePoolLayer

/**
* @since 1.0.0
Expand Down Expand Up @@ -277,7 +276,6 @@ export declare namespace SerializedWorker {
* @category models
*/
export interface BaseOptions<I> {
readonly permits?: number
readonly queue?: WorkerQueue<I>
}
}
Expand Down Expand Up @@ -341,7 +339,8 @@ export const makeSerialized: <I extends Schema.TaggedRequest.Any>(
*/
export const makePoolSerialized: <I extends Schema.TaggedRequest.Any>(
options: SerializedWorkerPool.Options<I>
) => Effect.Effect<SerializedWorkerPool<I>, never, WorkerManager | Spawner | Scope.Scope> = internal.makePoolSerialized
) => Effect.Effect<SerializedWorkerPool<I>, WorkerError, WorkerManager | Spawner | Scope.Scope> =
internal.makePoolSerialized

/**
* @since 1.0.0
Expand All @@ -350,7 +349,7 @@ export const makePoolSerialized: <I extends Schema.TaggedRequest.Any>(
export const makePoolSerializedLayer: <Tag, I extends Schema.TaggedRequest.Any>(
tag: Context.Tag<Tag, SerializedWorkerPool<I>>,
options: SerializedWorkerPool.Options<I>
) => Layer.Layer<Tag, never, WorkerManager | Spawner> = internal.makePoolSerializedLayer
) => Layer.Layer<Tag, WorkerError, WorkerManager | Spawner> = internal.makePoolSerializedLayer

/**
* @since 1.0.0
Expand Down
21 changes: 3 additions & 18 deletions packages/platform/src/WorkerError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
* @since 1.0.0
*/
import * as Schema from "@effect/schema/Schema"
import * as Cause from "effect/Cause"
import { identity } from "effect/Function"
import type * as Cause from "effect/Cause"
import * as Predicate from "effect/Predicate"
import * as internal from "./internal/workerError.js"

Expand All @@ -25,27 +24,13 @@ export type WorkerErrorTypeId = typeof WorkerErrorTypeId
*/
export const isWorkerError = (u: unknown): u is WorkerError => Predicate.hasProperty(u, WorkerErrorTypeId)

const causeDefectPretty: Schema.Schema<unknown> = Schema.transform(
Schema.Unknown,
Schema.Unknown,
{
decode: identity,
encode: (defect) => {
if (Predicate.isObject(defect)) {
return Cause.pretty(Cause.die(defect))
}
return String(defect)
}
}
)

/**
* @since 1.0.0
* @category errors
*/
export class WorkerError extends Schema.TaggedError<WorkerError>()("WorkerError", {
reason: Schema.Literal("spawn", "decode", "send", "unknown", "encode"),
error: causeDefectPretty
error: Schema.CauseDefectUnknown
}) {
/**
* @since 1.0.0
Expand All @@ -58,7 +43,7 @@ export class WorkerError extends Schema.TaggedError<WorkerError>()("WorkerError"
static readonly Cause: Schema.Schema<
Cause.Cause<WorkerError>,
Schema.CauseEncoded<WorkerErrorFrom>
> = Schema.Cause({ defect: causeDefectPretty, error: this })
> = Schema.Cause({ error: this })

/**
* @since 1.0.0
Expand Down
30 changes: 15 additions & 15 deletions packages/platform/src/internal/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,13 @@ export const makeManager = Effect.gen(function*() {
spawn<I, O, E>({
encode,
initialMessage,
permits = 1,
queue,
transfers = (_) => []
}: Worker.Worker.Options<I>) {
return Effect.gen(function*(_) {
const spawn = yield* _(Spawner)
const id = idCounter++
let requestIdCounter = 0
const semaphore = yield* Effect.makeSemaphore(permits)
const requestMap = new Map<
number,
readonly [Queue.Queue<Exit.Exit<ReadonlyArray<O>, E | WorkerError>>, Deferred.Deferred<void>]
Expand Down Expand Up @@ -237,8 +235,7 @@ export const makeManager = Effect.gen(function*() {
executeRelease
)

yield* semaphore.take(1).pipe(
Effect.zipRight(outbound.take),
yield* outbound.take.pipe(
Effect.flatMap(([id, request, span]) =>
pipe(
Effect.suspend(() => {
Expand All @@ -261,7 +258,6 @@ export const makeManager = Effect.gen(function*() {
Effect.zipRight(Deferred.await(result[1]))
)
}),
Effect.ensuring(semaphore.release(1)),
Effect.fork
)
),
Expand Down Expand Up @@ -318,19 +314,21 @@ export const makePool = <I, O, E>(
discard: true
}),
execute: (message: I) =>
Stream.unwrap(
Stream.unwrapScoped(
Effect.map(
Effect.scoped(backing.get),
backing.get,
(worker) => worker.execute(message)
)
),
executeEffect: (message: I) =>
Effect.flatMap(
Effect.scoped(backing.get),
(worker) => worker.executeEffect(message)
Effect.scoped(
Effect.flatMap(backing.get, (worker) => worker.executeEffect(message))
)
}

// report any spawn errors
yield* Effect.scoped(backing.get)

return pool
})

Expand Down Expand Up @@ -418,19 +416,21 @@ export const makePoolSerialized = <I extends Schema.TaggedRequest.Any>(
discard: true
}) as any,
execute: <Req extends I>(message: Req) =>
Stream.unwrap(
Stream.unwrapScoped(
Effect.map(
Effect.scoped(backing.get),
backing.get,
(worker) => worker.execute(message)
)
) as any,
executeEffect: <Req extends I>(message: Req) =>
Effect.flatMap(
Effect.scoped(backing.get),
(worker) => worker.executeEffect(message)
Effect.scoped(
Effect.flatMap(backing.get, (worker) => worker.executeEffect(message))
) as any
}

// report any spawn errors
yield* Effect.scoped(backing.get)

return pool
})

Expand Down