Skip to content

Commit

Permalink
support new Pool options in /platform WorkerPool (#2876)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart committed Jun 6, 2024
1 parent 0f40d98 commit 2b9ddfc
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 29 deletions.
5 changes: 5 additions & 0 deletions .changeset/afraid-meals-tell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform": minor
---

support new Pool options in /platform WorkerPool
25 changes: 19 additions & 6 deletions packages/platform/src/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ export declare namespace Worker {
readonly transfers?: ((message: I) => ReadonlyArray<unknown>) | undefined
readonly queue?: WorkerQueue<I> | undefined
readonly initialMessage?: LazyArg<I> | undefined
readonly permits?: number | undefined
}

/**
Expand Down Expand Up @@ -157,10 +156,21 @@ export declare namespace WorkerPool {
* @since 1.0.0
* @category models
*/
export interface Options<I> extends Worker.Options<I> {
readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect.Effect<void, WorkerError>
readonly size: number
}
export type Options<I> =
& Worker.Options<I>
& ({
readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect.Effect<void, WorkerError>
readonly size: number
readonly concurrency?: number | undefined
readonly targetUtilization?: number | undefined
} | {
readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect.Effect<void, WorkerError>
readonly minSize: number
readonly maxSize: number
readonly concurrency?: number | undefined
readonly targetUtilization?: number | undefined
readonly timeToLive: Duration.DurationInput
})
}

/**
Expand Down Expand Up @@ -270,7 +280,6 @@ export declare namespace SerializedWorker {
* @category models
*/
export interface BaseOptions<I> {
readonly permits?: number | undefined
readonly queue?: WorkerQueue<I> | undefined
}
}
Expand Down Expand Up @@ -312,10 +321,14 @@ export declare namespace SerializedWorkerPool {
& ({
readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect.Effect<void, WorkerError>
readonly size: number
readonly concurrency?: number | undefined
readonly targetUtilization?: number | undefined
} | {
readonly onCreate?: (worker: Worker<I, unknown, unknown>) => Effect.Effect<void, WorkerError>
readonly minSize: number
readonly maxSize: number
readonly concurrency?: number | undefined
readonly targetUtilization?: number | undefined
readonly timeToLive: Duration.DurationInput
})
}
Expand Down
62 changes: 39 additions & 23 deletions packages/platform/src/internal/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,13 @@ export const makeManager = Effect.gen(function*() {
spawn<I, O, E>({
encode,
initialMessage,
permits = 1,
queue,
transfers = (_) => []
}: Worker.Worker.Options<I>) {
return Effect.gen(function*(_) {
const spawn = yield* _(Spawner)
const id = idCounter++
let requestIdCounter = 0
const semaphore = Effect.unsafeMakeSemaphore(permits)
const requestMap = new Map<
number,
readonly [Queue.Queue<Exit.Exit<ReadonlyArray<O>, E | WorkerError>>, Deferred.Deferred<void>]
Expand Down Expand Up @@ -237,10 +235,9 @@ export const makeManager = Effect.gen(function*() {
executeRelease
)

yield* semaphore.take(1).pipe(
Effect.andThen(outbound.take),
yield* outbound.take.pipe(
Effect.flatMap(([id, request, span]) =>
pipe(
Effect.fork(
Effect.suspend(() => {
const result = requestMap.get(id)
if (!result) return Effect.void
Expand All @@ -260,14 +257,12 @@ export const makeManager = Effect.gen(function*() {
Effect.catchAllCause((cause) => Queue.offer(result[0], Exit.failCause(cause))),
Effect.zipRight(Deferred.await(result[1]))
)
}),
Effect.ensuring(semaphore.release(1)),
Effect.fork
})
)
),
Effect.forever,
Effect.interruptible,
Effect.forkScoped
Effect.forkScoped,
Effect.interruptible
)

if (initialMessage) {
Expand Down Expand Up @@ -299,24 +294,42 @@ export const makePool = <I, O, E>(
Effect.tap((worker) => Effect.addFinalizer(() => Effect.sync(() => workers.delete(worker)))),
options.onCreate ? Effect.tap(options.onCreate) : identity
)
const backing = yield* Pool.make({
acquire,
size: options.size
})
const get = Effect.scoped(backing.get)
const backing = "minSize" in options ?
yield* Pool.makeWithTTL({
acquire,
min: options.minSize,
max: options.maxSize,
concurrency: options.concurrency,
targetUtilization: options.targetUtilization,
timeToLive: options.timeToLive
}) :
yield* Pool.make({
acquire,
size: options.size,
concurrency: options.concurrency,
targetUtilization: options.targetUtilization
})
const pool: Worker.WorkerPool<I, O, E> = {
backing,
broadcast: (message: I) =>
Effect.forEach(workers, (worker) => worker.executeEffect(message), {
concurrency: "unbounded",
discard: true
}),
execute: (message: I) => Stream.unwrap(Effect.map(get, (worker) => worker.execute(message))),
executeEffect: (message: I) => Effect.flatMap(get, (worker) => worker.executeEffect(message))
execute: (message: I) =>
Stream.unwrapScoped(Effect.map(
backing.get,
(worker) => worker.execute(message)
)),
executeEffect: (message: I) =>
Effect.scoped(Effect.flatMap(
backing.get,
(worker) => worker.executeEffect(message)
))
}

// report any spawn errors
yield* get
yield* Effect.scoped(backing.get)

return pool
})
Expand Down Expand Up @@ -391,13 +404,16 @@ export const makePoolSerialized = <I extends Schema.TaggedRequest.Any>(
acquire,
min: options.minSize,
max: options.maxSize,
concurrency: options.concurrency,
targetUtilization: options.targetUtilization,
timeToLive: options.timeToLive
}) :
Pool.make({
acquire,
size: options.size
size: options.size,
concurrency: options.concurrency,
targetUtilization: options.targetUtilization
})
const get = Effect.scoped(backing.get)
const pool: Worker.SerializedWorkerPool<I> = {
backing,
broadcast: <Req extends I>(message: Req) =>
Expand All @@ -406,13 +422,13 @@ export const makePoolSerialized = <I extends Schema.TaggedRequest.Any>(
discard: true
}) as any,
execute: <Req extends I>(message: Req) =>
Stream.unwrap(Effect.map(get, (worker) => worker.execute(message))) as any,
Stream.unwrapScoped(Effect.map(backing.get, (worker) => worker.execute(message))) as any,
executeEffect: <Req extends I>(message: Req) =>
Effect.flatMap(get, (worker) => worker.executeEffect(message)) as any
Effect.scoped(Effect.flatMap(backing.get, (worker) => worker.executeEffect(message))) as any
}

// report any spawn errors
yield* get
yield* Effect.scoped(backing.get)

return pool
})
Expand Down

0 comments on commit 2b9ddfc

Please sign in to comment.