From 025de059270e9477731dcd96993fc810f89a85ba Mon Sep 17 00:00:00 2001 From: evelant Date: Fri, 10 May 2024 13:44:09 -0400 Subject: [PATCH 01/10] add TSubscriptionRef --- .changeset/cold-cougars-pretend.md | 5 + packages/effect/src/TPubSub.ts | 9 + packages/effect/src/TQueue.ts | 4 +- packages/effect/src/TSubscriptionRef.ts | 186 ++++++++++++ packages/effect/src/index.ts | 5 + packages/effect/src/internal/stm/core.ts | 3 +- packages/effect/src/internal/stm/tQueue.ts | 4 +- packages/effect/src/internal/stm/tRef.ts | 2 +- .../src/internal/stm/tSubscriptionRef.ts | 285 ++++++++++++++++++ packages/effect/src/internal/stream.ts | 57 ++++ packages/effect/test/TSubscriptionRef.test.ts | 153 ++++++++++ 11 files changed, 706 insertions(+), 7 deletions(-) create mode 100644 .changeset/cold-cougars-pretend.md create mode 100644 packages/effect/src/TSubscriptionRef.ts create mode 100644 packages/effect/src/internal/stm/tSubscriptionRef.ts create mode 100644 packages/effect/test/TSubscriptionRef.test.ts diff --git a/.changeset/cold-cougars-pretend.md b/.changeset/cold-cougars-pretend.md new file mode 100644 index 0000000000..df705b7505 --- /dev/null +++ b/.changeset/cold-cougars-pretend.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +add TSubscriptionRef diff --git a/packages/effect/src/TPubSub.ts b/packages/effect/src/TPubSub.ts index 1a15cfa1b6..f6b7b045ce 100644 --- a/packages/effect/src/TPubSub.ts +++ b/packages/effect/src/TPubSub.ts @@ -107,6 +107,15 @@ export const isEmpty: (self: TPubSub) => STM.STM = internal.isEmp */ export const isFull: (self: TPubSub) => STM.STM = internal.isFull +/** + * Interrupts any fibers that are suspended on `offer` or `take`. Future calls + * to `offer*` and `take*` will be interrupted immediately. + * + * @since 2.0.0 + * @category utils + */ +export const shutdown: (self: TPubSub) => STM.STM = internal.shutdown + /** * Returns `true` if `shutdown` has been called, otherwise returns `false`. * diff --git a/packages/effect/src/TQueue.ts b/packages/effect/src/TQueue.ts index 800555b2c8..83e0e9d7dc 100644 --- a/packages/effect/src/TQueue.ts +++ b/packages/effect/src/TQueue.ts @@ -262,7 +262,7 @@ export const isFull: (self: TQueue) => STM.STM = internal.isFull * @since 2.0.0 * @category getters */ -export const isShutdown: (self: TQueue) => STM.STM = internal.isShutdown +export const isShutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.isShutdown /** * Places one value in the queue. @@ -345,7 +345,7 @@ export const seek: { * @since 2.0.0 * @category mutations */ -export const shutdown: (self: TQueue) => STM.STM = internal.shutdown +export const shutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.shutdown /** * Retrieves the size of the queue, which is equal to the number of elements diff --git a/packages/effect/src/TSubscriptionRef.ts b/packages/effect/src/TSubscriptionRef.ts new file mode 100644 index 0000000000..ef9cd114c1 --- /dev/null +++ b/packages/effect/src/TSubscriptionRef.ts @@ -0,0 +1,186 @@ +/** + * @since 3.9.0 + */ +import type * as Effect from "./Effect.js" +import * as internal from "./internal/stm/tSubscriptionRef.js" +import type * as Option from "./Option.js" +import type { Pipeable } from "./Pipeable.js" +import type * as Scope from "./Scope.js" +import type * as STM from "./STM.js" +import type * as Stream from "./Stream.js" +import type * as TPubSub from "./TPubSub.js" +import type * as TQueue from "./TQueue.js" +import type * as TRef from "./TRef.js" +import type * as Types from "./Types.js" + +/** + * @since 3.9.0 + * @category symbols + */ +export const TSubscriptionRefTypeId: unique symbol = internal.TSubscriptionRefTypeId + +/** + * @since 3.9.0 + * @category symbols + */ +export type TSubscriptionRefTypeId = typeof TSubscriptionRefTypeId + +/** + * A `TSubscriptionRef` is a `TRef` that can be subscribed to in order to + * receive a `TDequeue` of the current value and all committed changes to the value. + * + * @since 3.9.0 + * @category models + */ +export interface TSubscriptionRef extends TSubscriptionRef.Variance, Pipeable { + /** @internal */ + readonly ref: TRef.TRef + /** @internal */ + readonly pubsub: TPubSub.TPubSub + /** + * A TDequeue containing the current value of the `Ref` as well as all changes + * to that value. + */ + readonly changes: STM.STM> + + /** @internal */ + modify(f: (a: A) => readonly [B, A]): STM.STM +} + +/** + * @since 3.9.0 + */ +export declare namespace TSubscriptionRef { + /** + * @since 3.9.0 + * @category models + */ + export interface Variance { + readonly [TSubscriptionRefTypeId]: { + readonly _A: Types.Invariant + } + } +} + +/** + * @since 3.9.0 + * @category mutations + */ +export const get: (self: TSubscriptionRef) => STM.STM = internal.get + +/** + * @since 3.9.0 + * @category mutations + */ +export const getAndSet: { + (value: A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, value: A): STM.STM +} = internal.getAndSet + +/** + * @since 3.9.0 + * @category mutations + */ +export const getAndUpdate: { + (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => A): STM.STM +} = internal.getAndUpdate + +/** + * @since 3.9.0 + * @category mutations + */ +export const getAndUpdateSome: { + (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM +} = internal.getAndUpdateSome + +/** + * @since 3.9.0 + * @category constructors + */ +export const make: (value: A) => STM.STM> = internal.make + +/** + * @since 3.9.0 + * @category mutations + */ +export const modify: { + (f: (a: A) => readonly [B, A]): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => readonly [B, A]): STM.STM +} = internal.modify + +/** + * @since 3.9.0 + * @category mutations + */ +export const modifySome: { + (fallback: B, f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, fallback: B, f: (a: A) => Option.Option): STM.STM +} = internal.modifySome + +/** + * @since 3.9.0 + * @category mutations + */ +export const set: { + (value: A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, value: A): STM.STM +} = internal.set + +/** + * @since 3.9.0 + * @category mutations + */ +export const setAndGet: { + (value: A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, value: A): STM.STM +} = internal.setAndGet + +/** + * @since 3.9.0 + * @category mutations + */ +export const update: { + (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => A): STM.STM +} = internal.update + +/** + * @since 3.9.0 + * @category mutations + */ +export const updateAndGet: { + (f: (a: A) => A): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => A): STM.STM +} = internal.updateAndGet + +/** + * @since 3.9.0 + * @category mutations + */ +export const updateSome: { + (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM +} = internal.updateSome + +/** + * @since 3.9.0 + * @category mutations + */ +export const updateSomeAndGet: { + (f: (a: A) => Option.Option): (self: TSubscriptionRef) => STM.STM + (self: TSubscriptionRef, f: (a: A) => Option.Option): STM.STM +} = internal.updateSomeAndGet + +/** + * @since 3.9.0 + * @category mutations + */ +export const changesScoped: (self: TSubscriptionRef) => Effect.Effect, never, Scope.Scope> = internal.changesScoped + +/** + * @since 3.9.0 + * @category mutations + */ +export const changesStream: (self: TSubscriptionRef) => Stream.Stream = internal.changesStream diff --git a/packages/effect/src/index.ts b/packages/effect/src/index.ts index cb90a4ac72..2106158191 100644 --- a/packages/effect/src/index.ts +++ b/packages/effect/src/index.ts @@ -895,6 +895,11 @@ export * as TSemaphore from "./TSemaphore.js" */ export * as TSet from "./TSet.js" +/** + * @since 2.0.0 + */ +export * as TSubscriptionRef from "./TSubscriptionRef.js" + /** * @since 2.0.0 */ diff --git a/packages/effect/src/internal/stm/core.ts b/packages/effect/src/internal/stm/core.ts index 7c0f558715..1203090752 100644 --- a/packages/effect/src/internal/stm/core.ts +++ b/packages/effect/src/internal/stm/core.ts @@ -15,11 +15,10 @@ import { pipeArguments } from "../../Pipeable.js" import { hasProperty } from "../../Predicate.js" import type * as Scheduler from "../../Scheduler.js" import type * as STM from "../../STM.js" -import { StreamTypeId } from "../../Stream.js" import { YieldWrap } from "../../Utils.js" import { ChannelTypeId } from "../core-stream.js" import { withFiberRuntime } from "../core.js" -import { effectVariance } from "../effectable.js" +import { effectVariance, StreamTypeId } from "../effectable.js" import { OP_COMMIT } from "../opCodes/effect.js" import { SingleShotGen } from "../singleShotGen.js" import { SinkTypeId } from "../sink.js" diff --git a/packages/effect/src/internal/stm/tQueue.ts b/packages/effect/src/internal/stm/tQueue.ts index 90039374f3..8ca27b9865 100644 --- a/packages/effect/src/internal/stm/tQueue.ts +++ b/packages/effect/src/internal/stm/tQueue.ts @@ -3,7 +3,7 @@ import * as Chunk from "../../Chunk.js" import { dual, pipe } from "../../Function.js" import * as Option from "../../Option.js" import { hasProperty, type Predicate } from "../../Predicate.js" -import * as STM from "../../STM.js" +import type * as STM from "../../STM.js" import type * as TQueue from "../../TQueue.js" import type * as TRef from "../../TRef.js" import * as core from "./core.js" @@ -99,7 +99,7 @@ class TQueueImpl implements TQueue.TQueue { size: STM.STM = core.withSTMRuntime((runtime) => { const queue = tRef.unsafeGet(this.ref, runtime.journal) if (queue === undefined) { - return STM.interruptAs(runtime.fiberId) + return core.interruptAs(runtime.fiberId) } return core.succeed(queue.length) }) diff --git a/packages/effect/src/internal/stm/tRef.ts b/packages/effect/src/internal/stm/tRef.ts index c780509360..45566333d0 100644 --- a/packages/effect/src/internal/stm/tRef.ts +++ b/packages/effect/src/internal/stm/tRef.ts @@ -16,7 +16,7 @@ export const TRefTypeId: TRef.TRefTypeId = Symbol.for( TRefSymbolKey ) as TRef.TRefTypeId -const tRefVariance = { +export const tRefVariance = { /* c8 ignore next */ _A: (_: any) => _ } diff --git a/packages/effect/src/internal/stm/tSubscriptionRef.ts b/packages/effect/src/internal/stm/tSubscriptionRef.ts new file mode 100644 index 0000000000..e6a06b9fad --- /dev/null +++ b/packages/effect/src/internal/stm/tSubscriptionRef.ts @@ -0,0 +1,285 @@ +import * as Effect from "../../Effect.js" +import { dual, pipe } from "../../Function.js" +import * as Option from "../../Option.js" +import { pipeArguments } from "../../Pipeable.js" +import * as STM from "../../STM.js" +import * as TPubSub from "../../TPubSub.js" +import * as TQueue from "../../TQueue.js" +import * as TRef from "../../TRef.js" +import type * as TSubscriptionRef from "../../TSubscriptionRef.js" +import * as stream from "../stream.js" + +/** @internal */ +const TSubscriptionRefSymbolKey = "effect/TSubscriptionRef" + +/** @internal */ +export const TSubscriptionRefTypeId: TSubscriptionRef.TSubscriptionRefTypeId = Symbol.for( + TSubscriptionRefSymbolKey +) as TSubscriptionRef.TSubscriptionRefTypeId + +const TSubscriptionRefVariance = { + /* c8 ignore next */ + _A: (_: any) => _ +} + +/** @internal */ +class TSubscriptionRefImpl implements TSubscriptionRef.TSubscriptionRef { + readonly [TSubscriptionRefTypeId] = TSubscriptionRefVariance + constructor( + readonly ref: TRef.TRef, + readonly pubsub: TPubSub.TPubSub + ) { + } + + pipe() { + return pipeArguments(this, arguments) + } + + get changes(): STM.STM>{ + return STM.flatMap(TQueue.unbounded(), queue => pipe( + STM.flatMap(TRef.get(this.ref), a => TQueue.offer(queue, a)), + STM.flatMap(() => TPubSub.subscribe(this.pubsub)), + STM.flatMap(dequeue => pipe( + TQueue.poll(dequeue), + STM.tap(a => Option.isNone(a) ? TQueue.shutdown(dequeue) : TQueue.offer(queue, a.value)), + )), + STM.as(queue) + )) + // return TPubSub.subscribe(this.pubsub) + // return STM.flatMap(TQueue.unbounded(), queue => pipe( + // STM.flatMap(TRef.get(this.ref), a => TQueue.offer(queue, a)), + // STM.flatMap(() => TPubSub.subscribe(this.pubsub)), + // STM.flatMap(dequeue => pipe( + // TQueue.takeAll(dequeue), + // STM.tap(as => TQueue.offerAll(queue, as)), + // STM.map(() => queue) + // )) + // )) + // return pipe( + // TRef.get(this.ref), + // STM.flatMap(a => + // pipe( + // TPubSub.subscribe(this.pubsub), + // STM.flatMap(dequeue => pipe( + // TQueue.unbounded(), + // STM.flatMap(queue => pipe( + // TQueue.offer(queue, a), + // STM.flatMap(() => TQueue.takeAll(dequeue)), + // STM.tap(as => TQueue.offerAll(queue, as)), + // STM.map(() => queue) + // )) + // )) + // ) + // ) + // ) + // return pipe( + // TRef.get(this.ref), + // Effect.flatMap((a) => + // Effect.map( + // streamInternal.fromTPubSub(this.pubsub, { scoped: true }), + // (s) => + // streamInternal.concat( + // streamInternal.make(a), + // s + // ) + // ) + // ), + // streamInternal.unwrapScoped + // ) + } + + modify(f: (a: A) => readonly [B, A]): STM.STM { + return pipe( + TRef.get(this.ref), + STM.map(f), + STM.flatMap(([b, a]) => + pipe( + TRef.set(this.ref, a), + STM.as(b), + STM.zipLeft(TPubSub.publish(this.pubsub, a)) + ) + ) + ) + } +} + +/** @internal */ +export const make = (value: A): STM.STM> => + pipe( + STM.all([ + TPubSub.unbounded(), + TRef.make(value) + ]), + STM.map(([pubsub, ref]) => new TSubscriptionRefImpl(ref, pubsub)) + ) + +/** @internal */ +export const get = (self: TSubscriptionRef.TSubscriptionRef) => TRef.get(self.ref) + +/** @internal */ +export const set = dual< + (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM +>( + 2, + (self: TSubscriptionRef.TSubscriptionRef, value: A): STM.STM => + self.modify((): [void, A] => [void 0, value]) +) + +/** @internal */ +export const getAndSet = dual< + (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM +>(2, (self, value) => self.modify((a) => [a, value])) + +/** @internal */ +export const getAndUpdate = dual< + (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM +>(2, (self, f) => self.modify((a) => [a, f(a)])) + +/** @internal */ +export const getAndUpdateSome = dual< + (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM +>(2, (self, f) => + self.modify((a) => + Option.match(f(a), { + onNone: () => [a, a], + onSome: (b) => [a, b] + }) + )) + +/** @internal */ +export const setAndGet = dual< + (value: A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, value: A) => STM.STM +>(2, (self, value) => self.modify(() => [value, value])) + +/** @internal */ +export const modify = dual< + (f: (a: A) => readonly [B, A]) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => readonly [B, A]) => STM.STM +>(2, (self, f) => self.modify(f)) + +/** @internal */ +export const modifySome = dual< + ( + fallback: B, + f: (a: A) => Option.Option + ) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + ( + self: TSubscriptionRef.TSubscriptionRef, + fallback: B, + f: (a: A) => Option.Option + ) => STM.STM +>(3, (self, fallback, f) => + self.modify((a) => + Option.match(f(a), { + onNone: () => [fallback, a], + onSome: (b) => b + }) + )) + +/** @internal */ +export const update = dual< + (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM +>(2, (self, f) => self.modify((a) => [void 0, f(a)])) + +/** @internal */ +export const updateAndGet = dual< + (f: (a: A) => A) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => A) => STM.STM +>(2, (self, f) => + self.modify((a) => { + const b = f(a) + return [b, b] + })) + +/** @internal */ +export const updateSome = dual< + (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM +>( + 2, + (self, f) => + self.modify((a) => [ + void 0, + Option.match(f(a), { + onNone: () => a, + onSome: (b) => b + }) + ]) +) + +/** @internal */ +export const updateSomeAndGet = dual< + (f: (a: A) => Option.Option) => (self: TSubscriptionRef.TSubscriptionRef) => STM.STM, + (self: TSubscriptionRef.TSubscriptionRef, f: (a: A) => Option.Option) => STM.STM +>( + 2, + (self, f) => + self.modify((a) => + Option.match(f(a), { + onNone: () => [a, a], + onSome: (b) => [b, b] + }) + ) +) + +/** @internal */ +export const changesScoped = (self: TSubscriptionRef.TSubscriptionRef) => + Effect.acquireRelease( + pipe( + Effect.flatMap(TQueue.unbounded(), queue => + Effect.flatMap(TPubSub.subscribeScoped(self.pubsub), dequeue => pipe( + STM.flatMap(TRef.get(self.ref), a => TQueue.offer(queue, a)), + STM.flatMap(() => pipe( + TQueue.takeAll(dequeue), + STM.tap(as => TQueue.offerAll(queue, as)), + )), + STM.as(queue) + ) + )), + ), + (dequeue) => TQueue.shutdown(dequeue) + ) + +/** @internal */ +export const changesStream = (self: TSubscriptionRef.TSubscriptionRef) => + pipe( + TRef.get(self.ref), + Effect.flatMap((a) => + Effect.map( + stream.fromTPubSub(self.pubsub, { scoped: true }), + (s) => + stream.concat( + stream.make(a), + s + ) + ) + ), + stream.unwrapScoped +) + + // pipe( + // changesScoped(self), + // Effect.map(t => stream.fromTQueue(t, { shutdown: true })), + // stream.unwrap + // ) + +// pipe( +// TRef.get(self.ref), +// Effect.flatMap((a) => +// Effect.map( +// stream.fromTPubSub(self.pubsub, { scoped: true }), +// (s) => +// stream.concat( +// stream.make(a), +// s +// ) +// ) +// ), +// stream.unwrapScoped +// ) diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index c5abc3e098..b276c46897 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -31,6 +31,8 @@ import type * as Stream from "../Stream.js" import type * as Emit from "../StreamEmit.js" import * as HaltStrategy from "../StreamHaltStrategy.js" import type * as Take from "../Take.js" +import * as TPubSub from "../TPubSub.js" +import * as TQueue from "../TQueue.js" import type * as Tracer from "../Tracer.js" import * as Tuple from "../Tuple.js" import type { NoInfer, TupleOf } from "../Types.js" @@ -3133,6 +3135,36 @@ export const fromPubSub: { return options?.shutdown ? ensuring(stream, PubSub.shutdown(pubsub)) : stream } +/** @internal */ +export const fromTPubSub: { + (pubsub: TPubSub.TPubSub, options: { + readonly scoped: true + readonly maxChunkSize?: number | undefined + readonly shutdown?: boolean | undefined + }): Effect.Effect, never, Scope.Scope> + (pubsub: TPubSub.TPubSub, options?: { + readonly scoped?: false | undefined + readonly maxChunkSize?: number | undefined + readonly shutdown?: boolean | undefined + }): Stream.Stream +} = (pubsub, options): any => { + const maxChunkSize = options?.maxChunkSize ?? DefaultChunkSize + + if (options?.scoped) { + const effect = Effect.map( + TPubSub.subscribe(pubsub), + (queue) => fromTQueue(queue, { maxChunkSize, shutdown: true }) + ) + + return options.shutdown ? Effect.map(effect, ensuring(TPubSub.shutdown(pubsub))) : effect + } + const stream = flatMap( + scoped(TPubSub.subscribe(pubsub)), + (queue) => fromTQueue(queue, { maxChunkSize }) + ) + return options?.shutdown ? ensuring(stream, TPubSub.shutdown(pubsub)) : stream +} + /** @internal */ export const fromIterable = (iterable: Iterable): Stream.Stream => suspend(() => @@ -3224,6 +3256,31 @@ export const fromQueue = ( options?.shutdown ? ensuring(Queue.shutdown(queue)) : identity ) +/** @internal */ +export const fromTQueue = ( + queue: TQueue.TDequeue, + options?: { + readonly maxChunkSize?: number | undefined + readonly shutdown?: boolean | undefined + } +): Stream.Stream => + pipe( + TQueue.takeBetween(queue, 1, options?.maxChunkSize ?? DefaultChunkSize), + Effect.map(Chunk.fromIterable), + Effect.catchAllCause((cause) => + pipe( + TQueue.isShutdown(queue), + Effect.flatMap((isShutdown) => + isShutdown && Cause.isInterrupted(cause) ? + pull.end() : + pull.failCause(cause) + ) + ) + ), + repeatEffectChunkOption, + options?.shutdown ? ensuring(TQueue.shutdown(queue)) : identity + ) + /** @internal */ export const fromSchedule = (schedule: Schedule.Schedule): Stream.Stream => pipe( diff --git a/packages/effect/test/TSubscriptionRef.test.ts b/packages/effect/test/TSubscriptionRef.test.ts new file mode 100644 index 0000000000..a285c61864 --- /dev/null +++ b/packages/effect/test/TSubscriptionRef.test.ts @@ -0,0 +1,153 @@ +import { Chunk, Deferred, Effect, Equal, Exit, Fiber, pipe, Random, STM, Stream } from "effect" +import * as Number from "effect/Number" +import * as it from "effect/test/utils/extend" +import * as TSubscriptionRef from "effect/TSubscriptionRef" +import { assert, describe } from "vitest" + +describe.concurrent("TSubscriptionRef", () => { + it.effect("only emits comitted values", () => + Effect.gen(function*($) { + const subscriptionRef = yield* $(TSubscriptionRef.make(0)) + + const transaction = pipe( + TSubscriptionRef.update(subscriptionRef, (n) => n + 1), + STM.tap(() => TSubscriptionRef.update(subscriptionRef, (n) => n + 1)) + ) + + const subscriber = yield* $(pipe( + TSubscriptionRef.changesStream(subscriptionRef), + Stream.take(1), + Stream.runCollect, + Effect.fork + )) + // stream doesn't work properly without a yield, it will drop values + yield* $(Effect.yieldNow()) + yield* $(STM.commit(transaction)) + yield* $(Effect.yieldNow()) + const result = yield* $(Fiber.join(subscriber)) + + assert.deepStrictEqual(Array.from(result), [2]) + })) + + it.effect("emits every comitted value", () => + Effect.gen(function*($) { + const subscriptionRef = yield* $(TSubscriptionRef.make(0)) + + const transaction = pipe( + TSubscriptionRef.update(subscriptionRef, (n) => n + 1), + STM.commit, + // stream doesn't work properly without a yield, it will drop the first value without this + Effect.tap(() => Effect.yieldNow()), + Effect.flatMap(() => TSubscriptionRef.update(subscriptionRef, (n) => n + 1)) + ) + + const subscriber = yield* $(pipe( + TSubscriptionRef.changesStream(subscriptionRef), + Stream.take(2), + Stream.runCollect, + Effect.fork + )) + // stream doesn't work properly without a yield, it will drop the first value without this + yield* $(Effect.yieldNow()) + yield* $(transaction) + const result = yield* $(Fiber.join(subscriber)) + + assert.deepStrictEqual(Array.from(result), [1, 2]) + })) + + it.effect("multiple subscribers can receive committed values", () => + Effect.gen(function*($) { + const subscriptionRef = yield* $(TSubscriptionRef.make(0)) + const deferred1 = yield* $(Deferred.make()) + const deferred2 = yield* $(Deferred.make()) + const subscriber1 = yield* $(pipe( + TSubscriptionRef.changesStream(subscriptionRef), + Stream.tap(() => Deferred.succeed(deferred1, void 0)), + Stream.take(3), + Stream.tap((v) => Effect.sync(() => console.log(`received value on subscriber 1: ${v}`))), + Stream.runCollect, + Effect.fork + )) + yield* $(Deferred.await(deferred1)) + yield* $(TSubscriptionRef.update(subscriptionRef, (n) => n + 1)) + const subscriber2 = yield* $(pipe( + TSubscriptionRef.changesStream(subscriptionRef), + Stream.tap(() => Deferred.succeed(deferred2, void 0)), + Stream.take(2), + Stream.tap((v) => Effect.sync(() => console.log(`received value on subscriber 2: ${v}`))), + Stream.runCollect, + Effect.fork + )) + yield* $(Deferred.await(deferred2)) + yield* $(TSubscriptionRef.update(subscriptionRef, (n) => n + 1)) + console.log(`joining fibers`) + const result1 = yield* $(Fiber.join(subscriber1)) + console.log(`joined 1`) + const result2 = yield* $(Fiber.join(subscriber2)) + console.log(`joined 2`) + assert.deepStrictEqual(Array.from(result1), [0, 1, 2]) + assert.deepStrictEqual(Array.from(result2), [1, 2]) + })) + + it.effect("subscriptions are interruptible", () => + Effect.gen(function*($) { + const ref = yield* $(TSubscriptionRef.make(0)) + const deferred1 = yield* $(Deferred.make()) + const deferred2 = yield* $(Deferred.make()) + const subscriber1 = yield* $( + TSubscriptionRef.changesStream(ref), + Stream.tap(() => Deferred.succeed(deferred1, void 0)), + Stream.take(5), + Stream.runCollect, + Effect.fork + ) + yield* $(Deferred.await(deferred1)) + yield* $(TSubscriptionRef.update(ref, (n) => n + 1)) + const subscriber2 = yield* $( + TSubscriptionRef.changesStream(ref), + Stream.tap(() => Deferred.succeed(deferred2, void 0)), + Stream.take(2), + Stream.runCollect, + Effect.fork + ) + yield* $(Deferred.await(deferred2)) + yield* $(TSubscriptionRef.update(ref, (n) => n + 1)) + const result1 = yield* $(Fiber.interrupt(subscriber1)) + const result2 = yield* $(Fiber.join(subscriber2)) + assert.isTrue(Exit.isInterrupted(result1)) + assert.deepStrictEqual(Array.from(result2), [1, 2]) + })) + + it.effect("concurrent subscribes and unsubscribes are handled correctly", () => + Effect.gen(function*($) { + const subscriber = (subscriptionRef: TSubscriptionRef.TSubscriptionRef) => + pipe( + Random.nextIntBetween(0, 200), + Effect.flatMap((n) => + pipe( + TSubscriptionRef.changesStream(subscriptionRef), + Stream.take(n), + Stream.runCollect + ) + ) + ) + const ref = yield* $(TSubscriptionRef.make(0)) + const fiber = yield* $( + TSubscriptionRef.update(ref, (n) => n + 1), + Effect.forever, + Effect.fork + ) + const result = yield* $( + Effect.map( + Effect.all( + Array.from({ length: 2 }, () => subscriber(ref)), + { concurrency: 2 } + ), + Chunk.unsafeFromArray + ) + ) + yield* $(Fiber.interrupt(fiber)) + const isSorted = Chunk.every(result, (chunk) => Equal.equals(chunk, Chunk.sort(chunk, Number.Order))) + assert.isTrue(isSorted) + })) +}) From 58aa96b544d85b5da23bab5fc48a095accb77412 Mon Sep 17 00:00:00 2001 From: Michael Arnaldi Date: Mon, 7 Oct 2024 12:31:43 +0200 Subject: [PATCH 02/10] merge dequeue, implement TRef --- packages/effect/src/TRef.ts | 3 +- packages/effect/src/TSubscriptionRef.ts | 17 +- packages/effect/src/internal/stm/tRef.ts | 7 +- .../src/internal/stm/tSubscriptionRef.ts | 223 +++++++++++------- 4 files changed, 155 insertions(+), 95 deletions(-) diff --git a/packages/effect/src/TRef.ts b/packages/effect/src/TRef.ts index 5b98a7c653..1dd83e9c4e 100644 --- a/packages/effect/src/TRef.ts +++ b/packages/effect/src/TRef.ts @@ -7,6 +7,7 @@ import type * as TxnId from "./internal/stm/stm/txnId.js" import type * as Versioned from "./internal/stm/stm/versioned.js" import * as internal from "./internal/stm/tRef.js" import type * as Option from "./Option.js" +import type { Pipeable } from "./Pipeable.js" import type * as STM from "./STM.js" import type * as Types from "./Types.js" @@ -34,7 +35,7 @@ export type TRefTypeId = typeof TRefTypeId * @since 2.0.0 * @category models */ -export interface TRef extends TRef.Variance { +export interface TRef extends TRef.Variance, Pipeable { /** * Note: the method is unbound, exposed only for potential extensions. */ diff --git a/packages/effect/src/TSubscriptionRef.ts b/packages/effect/src/TSubscriptionRef.ts index ef9cd114c1..e7fbbef689 100644 --- a/packages/effect/src/TSubscriptionRef.ts +++ b/packages/effect/src/TSubscriptionRef.ts @@ -4,7 +4,6 @@ import type * as Effect from "./Effect.js" import * as internal from "./internal/stm/tSubscriptionRef.js" import type * as Option from "./Option.js" -import type { Pipeable } from "./Pipeable.js" import type * as Scope from "./Scope.js" import type * as STM from "./STM.js" import type * as Stream from "./Stream.js" @@ -32,19 +31,19 @@ export type TSubscriptionRefTypeId = typeof TSubscriptionRefTypeId * @since 3.9.0 * @category models */ -export interface TSubscriptionRef extends TSubscriptionRef.Variance, Pipeable { +export interface TSubscriptionRef extends TSubscriptionRef.Variance, TRef.TRef { /** @internal */ readonly ref: TRef.TRef /** @internal */ readonly pubsub: TPubSub.TPubSub + /** @internal */ + modify(f: (a: A) => readonly [B, A]): STM.STM + /** * A TDequeue containing the current value of the `Ref` as well as all changes * to that value. */ - readonly changes: STM.STM> - - /** @internal */ - modify(f: (a: A) => readonly [B, A]): STM.STM + readonly changes: STM.STM> } /** @@ -177,10 +176,12 @@ export const updateSomeAndGet: { * @since 3.9.0 * @category mutations */ -export const changesScoped: (self: TSubscriptionRef) => Effect.Effect, never, Scope.Scope> = internal.changesScoped +export const changesScoped: (self: TSubscriptionRef) => Effect.Effect, never, Scope.Scope> = + internal.changesScoped /** * @since 3.9.0 * @category mutations */ -export const changesStream: (self: TSubscriptionRef) => Stream.Stream = internal.changesStream +export const changesStream: (self: TSubscriptionRef) => Stream.Stream = + internal.changesStream diff --git a/packages/effect/src/internal/stm/tRef.ts b/packages/effect/src/internal/stm/tRef.ts index 45566333d0..3162fc252b 100644 --- a/packages/effect/src/internal/stm/tRef.ts +++ b/packages/effect/src/internal/stm/tRef.ts @@ -1,5 +1,7 @@ import { dual } from "../../Function.js" import * as Option from "../../Option.js" +import type { Pipeable } from "../../Pipeable.js" +import { pipeArguments } from "../../Pipeable.js" import type * as STM from "../../STM.js" import type * as TRef from "../../TRef.js" import * as core from "./core.js" @@ -22,7 +24,7 @@ export const tRefVariance = { } /** @internal */ -export class TRefImpl implements TRef.TRef { +export class TRefImpl implements TRef.TRef, Pipeable { readonly [TRefTypeId] = tRefVariance /** @internal */ todos: Map @@ -40,6 +42,9 @@ export class TRefImpl implements TRef.TRef { return retValue }) } + pipe() { + return pipeArguments(this, arguments) + } } /** @internal */ diff --git a/packages/effect/src/internal/stm/tSubscriptionRef.ts b/packages/effect/src/internal/stm/tSubscriptionRef.ts index e6a06b9fad..c9b743a605 100644 --- a/packages/effect/src/internal/stm/tSubscriptionRef.ts +++ b/packages/effect/src/internal/stm/tSubscriptionRef.ts @@ -8,6 +8,8 @@ import * as TQueue from "../../TQueue.js" import * as TRef from "../../TRef.js" import type * as TSubscriptionRef from "../../TSubscriptionRef.js" import * as stream from "../stream.js" +import { tDequeueVariance } from "./tQueue.js" +import { tRefVariance } from "./tRef.js" /** @internal */ const TSubscriptionRefSymbolKey = "effect/TSubscriptionRef" @@ -22,70 +24,117 @@ const TSubscriptionRefVariance = { _A: (_: any) => _ } +class TDequeueMerge implements TQueue.TDequeue { + [TQueue.TDequeueTypeId] = tDequeueVariance + + constructor( + readonly first: TQueue.TDequeue, + readonly second: TQueue.TDequeue + ) {} + + peek: STM.STM = STM.gen(this, function*() { + const first = yield* this.peekOption + if (first._tag === "Some") { + return first.value + } + return yield* STM.retry + }) + + peekOption: STM.STM, never, never> = STM.gen(this, function*() { + const first = yield* this.first.peekOption + if (first._tag === "Some") { + return first + } + const second = yield* this.second.peekOption + if (second._tag === "Some") { + return second + } + return Option.none() + }) + + take: STM.STM = STM.gen(this, function*() { + if (!(yield* this.first.isEmpty)) { + return yield* this.first.take + } + if (!(yield* this.second.isEmpty)) { + return yield* this.second.take + } + return yield* STM.retry + }) + + takeAll: STM.STM, never, never> = STM.gen(this, function*() { + return [...yield* this.first.takeAll, ...yield* this.second.takeAll] + }) + + takeUpTo(max: number): STM.STM, never, never> { + return STM.gen(this, function*() { + const first = yield* this.first.takeUpTo(max) + if (first.length >= max) { + return first + } + return [...first, ...yield* this.second.takeUpTo(max - first.length)] + }) + } + + capacity(): number { + return this.first.capacity() + this.second.capacity() + } + + size: STM.STM = STM.gen(this, function*() { + return (yield* this.first.size) + (yield* this.second.size) + }) + + isFull: STM.STM = STM.gen(this, function*() { + return (yield* this.first.isFull) && (yield* this.second.isFull) + }) + + isEmpty: STM.STM = STM.gen(this, function*() { + return (yield* this.first.isEmpty) && (yield* this.second.isEmpty) + }) + + shutdown: STM.STM = STM.gen(this, function*() { + yield* this.first.shutdown + yield* this.second.shutdown + }) + + isShutdown: STM.STM = STM.gen(this, function*() { + return (yield* this.first.isShutdown) && (yield* this.second.isShutdown) + }) + + awaitShutdown: STM.STM = STM.gen(this, function*() { + yield* this.first.awaitShutdown + yield* this.second.awaitShutdown + }) +} + /** @internal */ class TSubscriptionRefImpl implements TSubscriptionRef.TSubscriptionRef { readonly [TSubscriptionRefTypeId] = TSubscriptionRefVariance + readonly [TRef.TRefTypeId] = tRefVariance + constructor( readonly ref: TRef.TRef, readonly pubsub: TPubSub.TPubSub - ) { + ) {} + + get todos() { + return this.ref.todos } - + + get versioned() { + return this.ref.versioned + } + pipe() { return pipeArguments(this, arguments) } - - get changes(): STM.STM>{ - return STM.flatMap(TQueue.unbounded(), queue => pipe( - STM.flatMap(TRef.get(this.ref), a => TQueue.offer(queue, a)), - STM.flatMap(() => TPubSub.subscribe(this.pubsub)), - STM.flatMap(dequeue => pipe( - TQueue.poll(dequeue), - STM.tap(a => Option.isNone(a) ? TQueue.shutdown(dequeue) : TQueue.offer(queue, a.value)), - )), - STM.as(queue) - )) - // return TPubSub.subscribe(this.pubsub) - // return STM.flatMap(TQueue.unbounded(), queue => pipe( - // STM.flatMap(TRef.get(this.ref), a => TQueue.offer(queue, a)), - // STM.flatMap(() => TPubSub.subscribe(this.pubsub)), - // STM.flatMap(dequeue => pipe( - // TQueue.takeAll(dequeue), - // STM.tap(as => TQueue.offerAll(queue, as)), - // STM.map(() => queue) - // )) - // )) - // return pipe( - // TRef.get(this.ref), - // STM.flatMap(a => - // pipe( - // TPubSub.subscribe(this.pubsub), - // STM.flatMap(dequeue => pipe( - // TQueue.unbounded(), - // STM.flatMap(queue => pipe( - // TQueue.offer(queue, a), - // STM.flatMap(() => TQueue.takeAll(dequeue)), - // STM.tap(as => TQueue.offerAll(queue, as)), - // STM.map(() => queue) - // )) - // )) - // ) - // ) - // ) - // return pipe( - // TRef.get(this.ref), - // Effect.flatMap((a) => - // Effect.map( - // streamInternal.fromTPubSub(this.pubsub, { scoped: true }), - // (s) => - // streamInternal.concat( - // streamInternal.make(a), - // s - // ) - // ) - // ), - // streamInternal.unwrapScoped - // ) + + get changes(): STM.STM> { + return STM.gen(this, function*() { + const first = yield* TQueue.unbounded() + yield* TQueue.offer(first, yield* TRef.get(this.ref)) + return new TDequeueMerge(first, yield* TPubSub.subscribe(this.pubsub)) + }) } modify(f: (a: A) => readonly [B, A]): STM.STM { @@ -232,43 +281,47 @@ export const updateSomeAndGet = dual< export const changesScoped = (self: TSubscriptionRef.TSubscriptionRef) => Effect.acquireRelease( pipe( - Effect.flatMap(TQueue.unbounded(), queue => - Effect.flatMap(TPubSub.subscribeScoped(self.pubsub), dequeue => pipe( - STM.flatMap(TRef.get(self.ref), a => TQueue.offer(queue, a)), - STM.flatMap(() => pipe( - TQueue.takeAll(dequeue), - STM.tap(as => TQueue.offerAll(queue, as)), - )), - STM.as(queue) - ) - )), + Effect.flatMap(TQueue.unbounded(), (queue) => + Effect.flatMap(TPubSub.subscribeScoped(self.pubsub), (dequeue) => + pipe( + STM.flatMap(TRef.get(self.ref), (a) => + TQueue.offer(queue, a)), + STM.flatMap(() => + pipe( + TQueue.takeAll(dequeue), + STM.tap((as) => TQueue.offerAll(queue, as)) + ) + ), + STM.as(queue) + ))) ), - (dequeue) => TQueue.shutdown(dequeue) + (dequeue) => + TQueue.shutdown(dequeue) ) /** @internal */ export const changesStream = (self: TSubscriptionRef.TSubscriptionRef) => - pipe( - TRef.get(self.ref), - Effect.flatMap((a) => - Effect.map( - stream.fromTPubSub(self.pubsub, { scoped: true }), - (s) => - stream.concat( - stream.make(a), - s - ) - ) - ), - stream.unwrapScoped -) + pipe( + TRef.get(self.ref), + Effect.flatMap((a) => + Effect.map( + stream.fromTPubSub(self.pubsub, { scoped: true }), + (s) => + stream.concat( + stream.make(a), + s + ) + ) + ), + stream.unwrapScoped + ) + +// pipe( +// changesScoped(self), +// Effect.map(t => stream.fromTQueue(t, { shutdown: true })), +// stream.unwrap +// ) - // pipe( - // changesScoped(self), - // Effect.map(t => stream.fromTQueue(t, { shutdown: true })), - // stream.unwrap - // ) - // pipe( // TRef.get(self.ref), // Effect.flatMap((a) => From 0dcdca2d399590193218420cca360c866f4b8a56 Mon Sep 17 00:00:00 2001 From: Michael Arnaldi Date: Mon, 7 Oct 2024 15:55:20 +0200 Subject: [PATCH 03/10] fix test, fix implementation of changesStream and changesScoped, expose changes --- packages/effect/src/TSubscriptionRef.ts | 6 ++ packages/effect/src/internal/stm/tPubSub.ts | 1 + .../src/internal/stm/tSubscriptionRef.ts | 56 +------------------ packages/effect/test/TSubscriptionRef.test.ts | 5 -- 4 files changed, 9 insertions(+), 59 deletions(-) diff --git a/packages/effect/src/TSubscriptionRef.ts b/packages/effect/src/TSubscriptionRef.ts index e7fbbef689..791ea55a6c 100644 --- a/packages/effect/src/TSubscriptionRef.ts +++ b/packages/effect/src/TSubscriptionRef.ts @@ -185,3 +185,9 @@ export const changesScoped: (self: TSubscriptionRef) => Effect.Effect(self: TSubscriptionRef) => Stream.Stream = internal.changesStream + +/** + * @since 3.9.0 + * @category mutations + */ +export const changes: (self: TSubscriptionRef) => STM.STM> = (self) => self.changes diff --git a/packages/effect/src/internal/stm/tPubSub.ts b/packages/effect/src/internal/stm/tPubSub.ts index 089be12ce5..b838ddb90a 100644 --- a/packages/effect/src/internal/stm/tPubSub.ts +++ b/packages/effect/src/internal/stm/tPubSub.ts @@ -201,6 +201,7 @@ class TPubSubSubscriptionImpl implements TQueue.TDequeue { capacity(): number { return this.requestedCapacity } + size: STM.STM = core.withSTMRuntime((runtime) => { let currentSubscriberHead = tRef.unsafeGet(this.subscriberHead, runtime.journal) if (currentSubscriberHead === undefined) { diff --git a/packages/effect/src/internal/stm/tSubscriptionRef.ts b/packages/effect/src/internal/stm/tSubscriptionRef.ts index c9b743a605..0b114cf397 100644 --- a/packages/effect/src/internal/stm/tSubscriptionRef.ts +++ b/packages/effect/src/internal/stm/tSubscriptionRef.ts @@ -279,60 +279,8 @@ export const updateSomeAndGet = dual< /** @internal */ export const changesScoped = (self: TSubscriptionRef.TSubscriptionRef) => - Effect.acquireRelease( - pipe( - Effect.flatMap(TQueue.unbounded(), (queue) => - Effect.flatMap(TPubSub.subscribeScoped(self.pubsub), (dequeue) => - pipe( - STM.flatMap(TRef.get(self.ref), (a) => - TQueue.offer(queue, a)), - STM.flatMap(() => - pipe( - TQueue.takeAll(dequeue), - STM.tap((as) => TQueue.offerAll(queue, as)) - ) - ), - STM.as(queue) - ))) - ), - (dequeue) => - TQueue.shutdown(dequeue) - ) + Effect.acquireRelease(self.changes, TQueue.shutdown) /** @internal */ export const changesStream = (self: TSubscriptionRef.TSubscriptionRef) => - pipe( - TRef.get(self.ref), - Effect.flatMap((a) => - Effect.map( - stream.fromTPubSub(self.pubsub, { scoped: true }), - (s) => - stream.concat( - stream.make(a), - s - ) - ) - ), - stream.unwrapScoped - ) - -// pipe( -// changesScoped(self), -// Effect.map(t => stream.fromTQueue(t, { shutdown: true })), -// stream.unwrap -// ) - -// pipe( -// TRef.get(self.ref), -// Effect.flatMap((a) => -// Effect.map( -// stream.fromTPubSub(self.pubsub, { scoped: true }), -// (s) => -// stream.concat( -// stream.make(a), -// s -// ) -// ) -// ), -// stream.unwrapScoped -// ) + stream.unwrap(Effect.map(self.changes, (queue) => stream.fromTQueue(queue, { shutdown: true }))) diff --git a/packages/effect/test/TSubscriptionRef.test.ts b/packages/effect/test/TSubscriptionRef.test.ts index a285c61864..2cc54e89b9 100644 --- a/packages/effect/test/TSubscriptionRef.test.ts +++ b/packages/effect/test/TSubscriptionRef.test.ts @@ -64,7 +64,6 @@ describe.concurrent("TSubscriptionRef", () => { TSubscriptionRef.changesStream(subscriptionRef), Stream.tap(() => Deferred.succeed(deferred1, void 0)), Stream.take(3), - Stream.tap((v) => Effect.sync(() => console.log(`received value on subscriber 1: ${v}`))), Stream.runCollect, Effect.fork )) @@ -74,17 +73,13 @@ describe.concurrent("TSubscriptionRef", () => { TSubscriptionRef.changesStream(subscriptionRef), Stream.tap(() => Deferred.succeed(deferred2, void 0)), Stream.take(2), - Stream.tap((v) => Effect.sync(() => console.log(`received value on subscriber 2: ${v}`))), Stream.runCollect, Effect.fork )) yield* $(Deferred.await(deferred2)) yield* $(TSubscriptionRef.update(subscriptionRef, (n) => n + 1)) - console.log(`joining fibers`) const result1 = yield* $(Fiber.join(subscriber1)) - console.log(`joined 1`) const result2 = yield* $(Fiber.join(subscriber2)) - console.log(`joined 2`) assert.deepStrictEqual(Array.from(result1), [0, 1, 2]) assert.deepStrictEqual(Array.from(result2), [1, 2]) })) From a168e7aa3155c89f09c8528c0cb3467d9770ba69 Mon Sep 17 00:00:00 2001 From: Michael Arnaldi Date: Mon, 7 Oct 2024 15:56:06 +0200 Subject: [PATCH 04/10] fix changesStream signature --- packages/effect/src/TSubscriptionRef.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/effect/src/TSubscriptionRef.ts b/packages/effect/src/TSubscriptionRef.ts index 791ea55a6c..2087c2c4e1 100644 --- a/packages/effect/src/TSubscriptionRef.ts +++ b/packages/effect/src/TSubscriptionRef.ts @@ -183,8 +183,7 @@ export const changesScoped: (self: TSubscriptionRef) => Effect.Effect(self: TSubscriptionRef) => Stream.Stream = - internal.changesStream +export const changesStream: (self: TSubscriptionRef) => Stream.Stream = internal.changesStream /** * @since 3.9.0 From 4e192dd945996753dfd4e34a42f242e515f70c6a Mon Sep 17 00:00:00 2001 From: Michael Arnaldi Date: Mon, 7 Oct 2024 19:41:09 +0200 Subject: [PATCH 05/10] fix function signatures --- packages/effect/src/TQueue.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/effect/src/TQueue.ts b/packages/effect/src/TQueue.ts index 83e0e9d7dc..e8b9b465fa 100644 --- a/packages/effect/src/TQueue.ts +++ b/packages/effect/src/TQueue.ts @@ -206,7 +206,7 @@ export const isTEnqueue: (u: unknown) => u is TEnqueue = internal.isTEn * @since 2.0.0 * @category mutations */ -export const awaitShutdown: (self: TQueue) => STM.STM = internal.awaitShutdown +export const awaitShutdown: (self: TDequeue | TEnqueue) => STM.STM = internal.awaitShutdown /** * Creates a bounded queue with the back pressure strategy. The queue will @@ -226,7 +226,7 @@ export const bounded: (requestedCapacity: number) => STM.STM> = int * @since 2.0.0 * @category getters */ -export const capacity: (self: TQueue) => number = internal.capacity +export const capacity: (self: TDequeue | TEnqueue) => number = internal.capacity /** * Creates a bounded queue with the dropping strategy. The queue will drop new @@ -245,7 +245,7 @@ export const dropping: (requestedCapacity: number) => STM.STM> = in * @since 2.0.0 * @category getters */ -export const isEmpty: (self: TQueue) => STM.STM = internal.isEmpty +export const isEmpty: (self: TDequeue | TEnqueue) => STM.STM = internal.isEmpty /** * Returns `true` if the `TQueue` contains at least one element, `false` @@ -254,7 +254,7 @@ export const isEmpty: (self: TQueue) => STM.STM = internal.isEmpt * @since 2.0.0 * @category getters */ -export const isFull: (self: TQueue) => STM.STM = internal.isFull +export const isFull: (self: TDequeue | TEnqueue) => STM.STM = internal.isFull /** * Returns `true` if `shutdown` has been called, otherwise returns `false`. @@ -355,7 +355,7 @@ export const shutdown: (self: TDequeue | TEnqueue) => STM.STM = i * @since 2.0.0 * @category getters */ -export const size: (self: TQueue) => STM.STM = internal.size +export const size: (self: TDequeue | TEnqueue) => STM.STM = internal.size /** * Creates a bounded queue with the sliding strategy. The queue will add new From f75693468bdfba4781a123da89729f114d403d97 Mon Sep 17 00:00:00 2001 From: Michael Arnaldi Date: Mon, 7 Oct 2024 19:43:41 +0200 Subject: [PATCH 06/10] fix codegen --- packages/effect/src/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/effect/src/index.ts b/packages/effect/src/index.ts index 2106158191..0185e7e77d 100644 --- a/packages/effect/src/index.ts +++ b/packages/effect/src/index.ts @@ -896,7 +896,7 @@ export * as TSemaphore from "./TSemaphore.js" export * as TSet from "./TSet.js" /** - * @since 2.0.0 + * @since 3.9.0 */ export * as TSubscriptionRef from "./TSubscriptionRef.js" From f96215413849b16c9a51571d963d875f5357aef0 Mon Sep 17 00:00:00 2001 From: Michael Arnaldi Date: Mon, 7 Oct 2024 19:44:32 +0200 Subject: [PATCH 07/10] fix version --- packages/effect/src/TSubscriptionRef.ts | 44 ++++++++++++------------- packages/effect/src/index.ts | 2 +- 2 files changed, 23 insertions(+), 23 deletions(-) diff --git a/packages/effect/src/TSubscriptionRef.ts b/packages/effect/src/TSubscriptionRef.ts index 2087c2c4e1..a80458ad8b 100644 --- a/packages/effect/src/TSubscriptionRef.ts +++ b/packages/effect/src/TSubscriptionRef.ts @@ -1,5 +1,5 @@ /** - * @since 3.9.0 + * @since 3.10.0 */ import type * as Effect from "./Effect.js" import * as internal from "./internal/stm/tSubscriptionRef.js" @@ -13,13 +13,13 @@ import type * as TRef from "./TRef.js" import type * as Types from "./Types.js" /** - * @since 3.9.0 + * @since 3.10.0 * @category symbols */ export const TSubscriptionRefTypeId: unique symbol = internal.TSubscriptionRefTypeId /** - * @since 3.9.0 + * @since 3.10.0 * @category symbols */ export type TSubscriptionRefTypeId = typeof TSubscriptionRefTypeId @@ -28,7 +28,7 @@ export type TSubscriptionRefTypeId = typeof TSubscriptionRefTypeId * A `TSubscriptionRef` is a `TRef` that can be subscribed to in order to * receive a `TDequeue` of the current value and all committed changes to the value. * - * @since 3.9.0 + * @since 3.10.0 * @category models */ export interface TSubscriptionRef extends TSubscriptionRef.Variance, TRef.TRef { @@ -47,11 +47,11 @@ export interface TSubscriptionRef extends TSubscriptionRef.Variance } /** - * @since 3.9.0 + * @since 3.10.0 */ export declare namespace TSubscriptionRef { /** - * @since 3.9.0 + * @since 3.10.0 * @category models */ export interface Variance { @@ -62,13 +62,13 @@ export declare namespace TSubscriptionRef { } /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const get: (self: TSubscriptionRef) => STM.STM = internal.get /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const getAndSet: { @@ -77,7 +77,7 @@ export const getAndSet: { } = internal.getAndSet /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const getAndUpdate: { @@ -86,7 +86,7 @@ export const getAndUpdate: { } = internal.getAndUpdate /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const getAndUpdateSome: { @@ -95,13 +95,13 @@ export const getAndUpdateSome: { } = internal.getAndUpdateSome /** - * @since 3.9.0 + * @since 3.10.0 * @category constructors */ export const make: (value: A) => STM.STM> = internal.make /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const modify: { @@ -110,7 +110,7 @@ export const modify: { } = internal.modify /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const modifySome: { @@ -119,7 +119,7 @@ export const modifySome: { } = internal.modifySome /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const set: { @@ -128,7 +128,7 @@ export const set: { } = internal.set /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const setAndGet: { @@ -137,7 +137,7 @@ export const setAndGet: { } = internal.setAndGet /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const update: { @@ -146,7 +146,7 @@ export const update: { } = internal.update /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const updateAndGet: { @@ -155,7 +155,7 @@ export const updateAndGet: { } = internal.updateAndGet /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const updateSome: { @@ -164,7 +164,7 @@ export const updateSome: { } = internal.updateSome /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const updateSomeAndGet: { @@ -173,20 +173,20 @@ export const updateSomeAndGet: { } = internal.updateSomeAndGet /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const changesScoped: (self: TSubscriptionRef) => Effect.Effect, never, Scope.Scope> = internal.changesScoped /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const changesStream: (self: TSubscriptionRef) => Stream.Stream = internal.changesStream /** - * @since 3.9.0 + * @since 3.10.0 * @category mutations */ export const changes: (self: TSubscriptionRef) => STM.STM> = (self) => self.changes diff --git a/packages/effect/src/index.ts b/packages/effect/src/index.ts index 0185e7e77d..a57bd513ed 100644 --- a/packages/effect/src/index.ts +++ b/packages/effect/src/index.ts @@ -896,7 +896,7 @@ export * as TSemaphore from "./TSemaphore.js" export * as TSet from "./TSet.js" /** - * @since 3.9.0 + * @since 3.10.0 */ export * as TSubscriptionRef from "./TSubscriptionRef.js" From 35e1a10b6e0d05bb36d4828e8cc020820adba879 Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 10 Oct 2024 11:30:04 +1300 Subject: [PATCH 08/10] use subscribeScoped for Stream integration --- .changeset/shiny-squids-sell.md | 5 +++ packages/effect/src/Stream.ts | 42 +++++++++++++++++++ packages/effect/src/TSubscriptionRef.ts | 2 +- .../src/internal/stm/tSubscriptionRef.ts | 22 +++++----- packages/effect/src/internal/stream.ts | 12 +++--- 5 files changed, 65 insertions(+), 18 deletions(-) create mode 100644 .changeset/shiny-squids-sell.md diff --git a/.changeset/shiny-squids-sell.md b/.changeset/shiny-squids-sell.md new file mode 100644 index 0000000000..0e68f8a95f --- /dev/null +++ b/.changeset/shiny-squids-sell.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +add Stream.fromTQueue & Stream.fromTPubSub diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 0441ef8012..7ac2a86f8d 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -29,6 +29,8 @@ import type * as Sink from "./Sink.js" import type * as Emit from "./StreamEmit.js" import type * as HaltStrategy from "./StreamHaltStrategy.js" import type * as Take from "./Take.js" +import type { TPubSub } from "./TPubSub.js" +import type { TDequeue } from "./TQueue.js" import type * as Tracer from "./Tracer.js" import type { Covariant, NoInfer, TupleOf } from "./Types.js" import type * as Unify from "./Unify.js" @@ -2013,6 +2015,32 @@ export const fromPubSub: { ): Stream } = internal.fromPubSub +/** + * Creates a stream from a subscription to a `TPubSub`. + * + * @param shutdown If `true`, the `TPubSub` will be shutdown after the stream is evaluated (defaults to `false`) + * @since 3.10.0 + * @category constructors + */ +export const fromTPubSub: { + ( + pubsub: TPubSub, + options: { + readonly maxChunkSize?: number | undefined + readonly scoped: true + readonly shutdown?: boolean | undefined + } + ): Effect.Effect, never, Scope.Scope> + ( + pubsub: TPubSub, + options?: { + readonly maxChunkSize?: number | undefined + readonly scoped?: false | undefined + readonly shutdown?: boolean | undefined + } + ): Stream +} = internal.fromTPubSub + /** * Creates a new `Stream` from an iterable collection of values. * @@ -2094,6 +2122,20 @@ export const fromQueue: ( } ) => Stream = internal.fromQueue +/** + * Creates a stream from a TQueue of values + * + * @since 3.10.0 + * @category constructors + */ +export const fromTQueue: ( + queue: TDequeue, + options?: { + readonly maxChunkSize?: number | undefined + readonly shutdown?: boolean | undefined + } +) => Stream = internal.fromTQueue + /** * Creates a stream from a `ReadableStream`. * diff --git a/packages/effect/src/TSubscriptionRef.ts b/packages/effect/src/TSubscriptionRef.ts index a80458ad8b..dfc6ccb5ff 100644 --- a/packages/effect/src/TSubscriptionRef.ts +++ b/packages/effect/src/TSubscriptionRef.ts @@ -183,7 +183,7 @@ export const changesScoped: (self: TSubscriptionRef) => Effect.Effect(self: TSubscriptionRef) => Stream.Stream = internal.changesStream +export const changesStream: (self: TSubscriptionRef) => Stream.Stream = internal.changesStream /** * @since 3.10.0 diff --git a/packages/effect/src/internal/stm/tSubscriptionRef.ts b/packages/effect/src/internal/stm/tSubscriptionRef.ts index 0b114cf397..e2f88696ca 100644 --- a/packages/effect/src/internal/stm/tSubscriptionRef.ts +++ b/packages/effect/src/internal/stm/tSubscriptionRef.ts @@ -32,7 +32,7 @@ class TDequeueMerge implements TQueue.TDequeue { readonly second: TQueue.TDequeue ) {} - peek: STM.STM = STM.gen(this, function*() { + peek: STM.STM = STM.gen(this, function*() { const first = yield* this.peekOption if (first._tag === "Some") { return first.value @@ -40,7 +40,7 @@ class TDequeueMerge implements TQueue.TDequeue { return yield* STM.retry }) - peekOption: STM.STM, never, never> = STM.gen(this, function*() { + peekOption: STM.STM> = STM.gen(this, function*() { const first = yield* this.first.peekOption if (first._tag === "Some") { return first @@ -52,7 +52,7 @@ class TDequeueMerge implements TQueue.TDequeue { return Option.none() }) - take: STM.STM = STM.gen(this, function*() { + take: STM.STM = STM.gen(this, function*() { if (!(yield* this.first.isEmpty)) { return yield* this.first.take } @@ -62,11 +62,11 @@ class TDequeueMerge implements TQueue.TDequeue { return yield* STM.retry }) - takeAll: STM.STM, never, never> = STM.gen(this, function*() { + takeAll: STM.STM> = STM.gen(this, function*() { return [...yield* this.first.takeAll, ...yield* this.second.takeAll] }) - takeUpTo(max: number): STM.STM, never, never> { + takeUpTo(max: number): STM.STM> { return STM.gen(this, function*() { const first = yield* this.first.takeUpTo(max) if (first.length >= max) { @@ -80,28 +80,28 @@ class TDequeueMerge implements TQueue.TDequeue { return this.first.capacity() + this.second.capacity() } - size: STM.STM = STM.gen(this, function*() { + size: STM.STM = STM.gen(this, function*() { return (yield* this.first.size) + (yield* this.second.size) }) - isFull: STM.STM = STM.gen(this, function*() { + isFull: STM.STM = STM.gen(this, function*() { return (yield* this.first.isFull) && (yield* this.second.isFull) }) - isEmpty: STM.STM = STM.gen(this, function*() { + isEmpty: STM.STM = STM.gen(this, function*() { return (yield* this.first.isEmpty) && (yield* this.second.isEmpty) }) - shutdown: STM.STM = STM.gen(this, function*() { + shutdown: STM.STM = STM.gen(this, function*() { yield* this.first.shutdown yield* this.second.shutdown }) - isShutdown: STM.STM = STM.gen(this, function*() { + isShutdown: STM.STM = STM.gen(this, function*() { return (yield* this.first.isShutdown) && (yield* this.second.isShutdown) }) - awaitShutdown: STM.STM = STM.gen(this, function*() { + awaitShutdown: STM.STM = STM.gen(this, function*() { yield* this.first.awaitShutdown yield* this.second.awaitShutdown }) diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index b276c46897..1ffffec0eb 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -3138,13 +3138,13 @@ export const fromPubSub: { /** @internal */ export const fromTPubSub: { (pubsub: TPubSub.TPubSub, options: { - readonly scoped: true readonly maxChunkSize?: number | undefined + readonly scoped: true readonly shutdown?: boolean | undefined }): Effect.Effect, never, Scope.Scope> (pubsub: TPubSub.TPubSub, options?: { - readonly scoped?: false | undefined readonly maxChunkSize?: number | undefined + readonly scoped?: false | undefined readonly shutdown?: boolean | undefined }): Stream.Stream } = (pubsub, options): any => { @@ -3152,15 +3152,15 @@ export const fromTPubSub: { if (options?.scoped) { const effect = Effect.map( - TPubSub.subscribe(pubsub), + TPubSub.subscribeScoped(pubsub), (queue) => fromTQueue(queue, { maxChunkSize, shutdown: true }) ) return options.shutdown ? Effect.map(effect, ensuring(TPubSub.shutdown(pubsub))) : effect } const stream = flatMap( - scoped(TPubSub.subscribe(pubsub)), - (queue) => fromTQueue(queue, { maxChunkSize }) + TPubSub.subscribe(pubsub), + (queue) => fromTQueue(queue, { maxChunkSize, shutdown: true }) ) return options?.shutdown ? ensuring(stream, TPubSub.shutdown(pubsub)) : stream } @@ -3266,7 +3266,7 @@ export const fromTQueue = ( ): Stream.Stream => pipe( TQueue.takeBetween(queue, 1, options?.maxChunkSize ?? DefaultChunkSize), - Effect.map(Chunk.fromIterable), + Effect.map(Chunk.unsafeFromArray), Effect.catchAllCause((cause) => pipe( TQueue.isShutdown(queue), From 02ab365a9bff88bb69b3475b0fa4c325ab1b1efe Mon Sep 17 00:00:00 2001 From: Tim Date: Thu, 10 Oct 2024 11:54:23 +1300 Subject: [PATCH 09/10] only take one item at a time --- packages/effect/src/Stream.ts | 7 +------ packages/effect/src/internal/stream.ts | 14 ++++---------- 2 files changed, 5 insertions(+), 16 deletions(-) diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 7ac2a86f8d..ae450ebdbe 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2026,7 +2026,6 @@ export const fromTPubSub: { ( pubsub: TPubSub, options: { - readonly maxChunkSize?: number | undefined readonly scoped: true readonly shutdown?: boolean | undefined } @@ -2034,7 +2033,6 @@ export const fromTPubSub: { ( pubsub: TPubSub, options?: { - readonly maxChunkSize?: number | undefined readonly scoped?: false | undefined readonly shutdown?: boolean | undefined } @@ -2130,10 +2128,7 @@ export const fromQueue: ( */ export const fromTQueue: ( queue: TDequeue, - options?: { - readonly maxChunkSize?: number | undefined - readonly shutdown?: boolean | undefined - } + options?: { readonly shutdown?: boolean | undefined } ) => Stream = internal.fromTQueue /** diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 1ffffec0eb..7ae6d33ef1 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -3138,29 +3138,24 @@ export const fromPubSub: { /** @internal */ export const fromTPubSub: { (pubsub: TPubSub.TPubSub, options: { - readonly maxChunkSize?: number | undefined readonly scoped: true readonly shutdown?: boolean | undefined }): Effect.Effect, never, Scope.Scope> (pubsub: TPubSub.TPubSub, options?: { - readonly maxChunkSize?: number | undefined readonly scoped?: false | undefined readonly shutdown?: boolean | undefined }): Stream.Stream } = (pubsub, options): any => { - const maxChunkSize = options?.maxChunkSize ?? DefaultChunkSize - if (options?.scoped) { const effect = Effect.map( TPubSub.subscribeScoped(pubsub), - (queue) => fromTQueue(queue, { maxChunkSize, shutdown: true }) + (queue) => fromTQueue(queue, { shutdown: true }) ) - return options.shutdown ? Effect.map(effect, ensuring(TPubSub.shutdown(pubsub))) : effect } const stream = flatMap( TPubSub.subscribe(pubsub), - (queue) => fromTQueue(queue, { maxChunkSize, shutdown: true }) + (queue) => fromTQueue(queue, { shutdown: true }) ) return options?.shutdown ? ensuring(stream, TPubSub.shutdown(pubsub)) : stream } @@ -3260,13 +3255,12 @@ export const fromQueue = ( export const fromTQueue = ( queue: TQueue.TDequeue, options?: { - readonly maxChunkSize?: number | undefined readonly shutdown?: boolean | undefined } ): Stream.Stream => pipe( - TQueue.takeBetween(queue, 1, options?.maxChunkSize ?? DefaultChunkSize), - Effect.map(Chunk.unsafeFromArray), + TQueue.take(queue), + Effect.map(Chunk.of), Effect.catchAllCause((cause) => pipe( TQueue.isShutdown(queue), From 55dcb28e25e473eb172d20d43b817f4c0aa7d7f4 Mon Sep 17 00:00:00 2001 From: Michael Arnaldi Date: Thu, 10 Oct 2024 14:08:34 +0200 Subject: [PATCH 10/10] fix TQueue and TPubSub stream constructors --- packages/effect/src/Stream.ts | 23 +----------- .../src/internal/stm/tSubscriptionRef.ts | 2 +- packages/effect/src/internal/stream.ts | 37 ++++--------------- 3 files changed, 10 insertions(+), 52 deletions(-) diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index ae450ebdbe..75bbfb5e70 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2018,26 +2018,10 @@ export const fromPubSub: { /** * Creates a stream from a subscription to a `TPubSub`. * - * @param shutdown If `true`, the `TPubSub` will be shutdown after the stream is evaluated (defaults to `false`) * @since 3.10.0 * @category constructors */ -export const fromTPubSub: { - ( - pubsub: TPubSub, - options: { - readonly scoped: true - readonly shutdown?: boolean | undefined - } - ): Effect.Effect, never, Scope.Scope> - ( - pubsub: TPubSub, - options?: { - readonly scoped?: false | undefined - readonly shutdown?: boolean | undefined - } - ): Stream -} = internal.fromTPubSub +export const fromTPubSub: (pubsub: TPubSub) => Stream = internal.fromTPubSub /** * Creates a new `Stream` from an iterable collection of values. @@ -2126,10 +2110,7 @@ export const fromQueue: ( * @since 3.10.0 * @category constructors */ -export const fromTQueue: ( - queue: TDequeue, - options?: { readonly shutdown?: boolean | undefined } -) => Stream = internal.fromTQueue +export const fromTQueue: (queue: TDequeue) => Stream = internal.fromTQueue /** * Creates a stream from a `ReadableStream`. diff --git a/packages/effect/src/internal/stm/tSubscriptionRef.ts b/packages/effect/src/internal/stm/tSubscriptionRef.ts index e2f88696ca..94a4924055 100644 --- a/packages/effect/src/internal/stm/tSubscriptionRef.ts +++ b/packages/effect/src/internal/stm/tSubscriptionRef.ts @@ -283,4 +283,4 @@ export const changesScoped = (self: TSubscriptionRef.TSubscriptionRef) => /** @internal */ export const changesStream = (self: TSubscriptionRef.TSubscriptionRef) => - stream.unwrap(Effect.map(self.changes, (queue) => stream.fromTQueue(queue, { shutdown: true }))) + stream.unwrap(Effect.map(self.changes, stream.fromTQueue)) diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 7ae6d33ef1..3a3ca4402d 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -3136,28 +3136,11 @@ export const fromPubSub: { } /** @internal */ -export const fromTPubSub: { - (pubsub: TPubSub.TPubSub, options: { - readonly scoped: true - readonly shutdown?: boolean | undefined - }): Effect.Effect, never, Scope.Scope> - (pubsub: TPubSub.TPubSub, options?: { - readonly scoped?: false | undefined - readonly shutdown?: boolean | undefined - }): Stream.Stream -} = (pubsub, options): any => { - if (options?.scoped) { - const effect = Effect.map( - TPubSub.subscribeScoped(pubsub), - (queue) => fromTQueue(queue, { shutdown: true }) - ) - return options.shutdown ? Effect.map(effect, ensuring(TPubSub.shutdown(pubsub))) : effect - } - const stream = flatMap( - TPubSub.subscribe(pubsub), - (queue) => fromTQueue(queue, { shutdown: true }) - ) - return options?.shutdown ? ensuring(stream, TPubSub.shutdown(pubsub)) : stream +export const fromTPubSub = (pubsub: TPubSub.TPubSub): Stream.Stream => { + return unwrapScoped(Effect.map( + TPubSub.subscribeScoped(pubsub), + (queue) => fromTQueue(queue) + )) } /** @internal */ @@ -3252,12 +3235,7 @@ export const fromQueue = ( ) /** @internal */ -export const fromTQueue = ( - queue: TQueue.TDequeue, - options?: { - readonly shutdown?: boolean | undefined - } -): Stream.Stream => +export const fromTQueue = (queue: TQueue.TDequeue): Stream.Stream => pipe( TQueue.take(queue), Effect.map(Chunk.of), @@ -3271,8 +3249,7 @@ export const fromTQueue = ( ) ) ), - repeatEffectChunkOption, - options?.shutdown ? ensuring(TQueue.shutdown(queue)) : identity + repeatEffectChunkOption ) /** @internal */