From 9f3bc90ed1f2eeb97979b393552f6bb3593cf048 Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 10 Jul 2024 22:56:02 +1200 Subject: [PATCH 1/2] use "dropping" strategy by default for Stream.async apis --- .changeset/chilled-ducks-sniff.md | 24 +++++++++++++++ .changeset/cool-birds-exercise.md | 5 ++++ packages/effect/src/Stream.ts | 6 ++-- packages/effect/src/internal/stream.ts | 36 +++++++++++++---------- packages/effect/test/Stream/async.test.ts | 6 ++-- 5 files changed, 56 insertions(+), 21 deletions(-) create mode 100644 .changeset/chilled-ducks-sniff.md create mode 100644 .changeset/cool-birds-exercise.md diff --git a/.changeset/chilled-ducks-sniff.md b/.changeset/chilled-ducks-sniff.md new file mode 100644 index 0000000000..2f65736f7e --- /dev/null +++ b/.changeset/chilled-ducks-sniff.md @@ -0,0 +1,24 @@ +--- +"effect": major +--- + +Use object options for Stream.async apis + +Instead of: + +```ts +Stream.async((emit) => { + //... +}, 16); +``` + +You can now write: + +```ts +Stream.async( + (emit) => { + //... + }, + { bufferSize: 16 }, +); +``` diff --git a/.changeset/cool-birds-exercise.md b/.changeset/cool-birds-exercise.md new file mode 100644 index 0000000000..c78f3bec89 --- /dev/null +++ b/.changeset/cool-birds-exercise.md @@ -0,0 +1,5 @@ +--- +"effect": major +--- + +use "dropping" strategy by default for Stream.async apis diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index ae1a66b8c2..814ee7d35e 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -312,7 +312,7 @@ export const as: { const _async: ( register: (emit: Emit.Emit) => Effect.Effect | void, - bufferSize?: number | "unbounded" | { + options?: { readonly bufferSize: "unbounded" } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined @@ -367,7 +367,7 @@ export { */ export const asyncEffect: ( register: (emit: Emit.Emit) => Effect.Effect, - bufferSize?: number | "unbounded" | { + options?: { readonly bufferSize: "unbounded" } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined @@ -424,7 +424,7 @@ export const asyncPush: ( */ export const asyncScoped: ( register: (emit: Emit.Emit) => Effect.Effect, - bufferSize?: number | "unbounded" | { + options?: { readonly bufferSize: "unbounded" } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index fe02cea53f..a5790647e7 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -463,23 +463,23 @@ export const as = dual< >(2, (self: Stream.Stream, value: B): Stream.Stream => map(self, () => value)) const queueFromBufferOptions = ( - bufferSize?: number | "unbounded" | { + options?: { + readonly bufferSize: "unbounded" + } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined ): Effect.Effect>> => { - if (bufferSize === "unbounded") { + if (options?.bufferSize === "unbounded") { return Queue.unbounded() - } else if (typeof bufferSize === "number" || bufferSize === undefined) { - return Queue.bounded(bufferSize ?? 16) } - switch (bufferSize.strategy) { - case "dropping": - return Queue.dropping(bufferSize.bufferSize ?? 16) + switch (options?.strategy) { case "sliding": - return Queue.sliding(bufferSize.bufferSize ?? 16) + return Queue.sliding(options.bufferSize ?? 16) + case "suspend": + return Queue.bounded(options.bufferSize ?? 16) default: - return Queue.bounded(bufferSize.bufferSize ?? 16) + return Queue.dropping(options?.bufferSize ?? 16) } } @@ -488,13 +488,15 @@ export const _async = ( register: ( emit: Emit.Emit ) => Effect.Effect | void, - bufferSize?: number | "unbounded" | { + options?: { + readonly bufferSize: "unbounded" + } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined ): Stream.Stream => Effect.acquireRelease( - queueFromBufferOptions(bufferSize), + queueFromBufferOptions(options), (queue) => Queue.shutdown(queue) ).pipe( Effect.flatMap((output) => @@ -543,14 +545,16 @@ export const _async = ( /** @internal */ export const asyncEffect = ( register: (emit: Emit.Emit) => Effect.Effect, - bufferSize?: number | "unbounded" | { + options?: { + readonly bufferSize: "unbounded" + } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined ): Stream.Stream => pipe( Effect.acquireRelease( - queueFromBufferOptions(bufferSize), + queueFromBufferOptions(options), (queue) => Queue.shutdown(queue) ), Effect.flatMap((output) => @@ -646,14 +650,16 @@ export const asyncPush = ( /** @internal */ export const asyncScoped = ( register: (emit: Emit.Emit) => Effect.Effect, - bufferSize?: number | "unbounded" | { + options?: { + readonly bufferSize: "unbounded" + } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | "suspend" | undefined } | undefined ): Stream.Stream> => pipe( Effect.acquireRelease( - queueFromBufferOptions(bufferSize), + queueFromBufferOptions(options), (queue) => Queue.shutdown(queue) ), Effect.flatMap((output) => diff --git a/packages/effect/test/Stream/async.test.ts b/packages/effect/test/Stream/async.test.ts index 7b7e979f6d..3a4a100f6b 100644 --- a/packages/effect/test/Stream/async.test.ts +++ b/packages/effect/test/Stream/async.test.ts @@ -110,7 +110,7 @@ describe("Stream", () => { ) ) return Effect.void - }, 5) + }, { bufferSize: 5 }) const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never)) const fiber = yield* $(stream, Stream.run(sink), Effect.fork) yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 })) @@ -205,7 +205,7 @@ describe("Stream", () => { ) ) return Effect.void - }, 5) + }, { bufferSize: 5 }) const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never)) const fiber = yield* $(stream, Stream.run(sink), Effect.fork) yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 })) @@ -398,7 +398,7 @@ describe("Stream", () => { ) ) return Effect.void - }, 5) + }, { bufferSize: 5 }) const sink = pipe(Sink.take(1), Sink.zipRight(Sink.never)) const fiber = yield* $(stream, Stream.run(sink), Effect.fork) yield* $(Ref.get(refCount), Effect.repeat({ while: (n) => n !== 7 })) From cb0720c3ff847ac0f89f6d2a191aed04541cbbe6 Mon Sep 17 00:00:00 2001 From: Tim Date: Wed, 17 Jul 2024 15:46:49 +1200 Subject: [PATCH 2/2] bounded by default --- .changeset/cool-birds-exercise.md | 5 ----- packages/effect/src/internal/stream.ts | 6 +++--- 2 files changed, 3 insertions(+), 8 deletions(-) delete mode 100644 .changeset/cool-birds-exercise.md diff --git a/.changeset/cool-birds-exercise.md b/.changeset/cool-birds-exercise.md deleted file mode 100644 index c78f3bec89..0000000000 --- a/.changeset/cool-birds-exercise.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"effect": major ---- - -use "dropping" strategy by default for Stream.async apis diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index a5790647e7..c871d34661 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -474,12 +474,12 @@ const queueFromBufferOptions = ( return Queue.unbounded() } switch (options?.strategy) { + case "dropping": + return Queue.dropping(options.bufferSize ?? 16) case "sliding": return Queue.sliding(options.bufferSize ?? 16) - case "suspend": - return Queue.bounded(options.bufferSize ?? 16) default: - return Queue.dropping(options?.bufferSize ?? 16) + return Queue.bounded(options?.bufferSize ?? 16) } }