diff --git a/.changeset/stream-on-end.md b/.changeset/stream-on-end.md new file mode 100644 index 00000000000..1e9964d4514 --- /dev/null +++ b/.changeset/stream-on-end.md @@ -0,0 +1,22 @@ +--- +"effect": minor +--- + +Implement `Stream.onEnd` that adds an effect to be executed at the end of the stream. + +```ts +import { Console, Effect, Stream } from "effect"; + +const stream = Stream.make(1, 2, 3).pipe( + Stream.map((n) => n * 2), + Stream.tap((n) => Console.log(`after mapping: ${n}`)), + Stream.onEnd(Console.log("Stream ended")) +) + +Effect.runPromise(Stream.runCollect(stream)).then(console.log) +// after mapping: 2 +// after mapping: 4 +// after mapping: 6 +// Stream ended +// { _id: 'Chunk', values: [ 2, 4, 6 ] } +``` \ No newline at end of file diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index ffdc06299a9..ae1a66b8c2b 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -2927,6 +2927,38 @@ export const mkString: (self: Stream) => Effect.Effect = internal.never +/** + * Adds an effect to be executed at the end of the stream. + * + * @example + * import { Console, Effect, Stream } from "effect" + * + * const stream = Stream.make(1, 2, 3).pipe( + * Stream.map((n) => n * 2), + * Stream.tap((n) => Console.log(`after mapping: ${n}`)), + * Stream.onEnd(Console.log("Stream ended")) + * ) + * + * Effect.runPromise(Stream.runCollect(stream)).then(console.log) + * // after mapping: 2 + * // after mapping: 4 + * // after mapping: 6 + * // Stream ended + * // { _id: 'Chunk', values: [ 2, 4, 6 ] } + * + * @since 3.6.0 + * @category sequencing + */ +export const onEnd: { + <_, E2, R2>( + effect: Effect.Effect<_, E2, R2> + ): (self: Stream) => Stream + ( + self: Stream, + effect: Effect.Effect<_, E2, R2> + ): Stream +} = internal.onEnd + /** * Runs the specified effect if this stream fails, providing the error to the * effect if it exists. diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index f3adf2013f3..fe02cea53f8 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -4162,6 +4162,23 @@ export const mkString = (self: Stream.Stream): Effect.Effect /** @internal */ export const never: Stream.Stream = fromEffect(Effect.never) +/** @internal */ +export const onEnd: { + <_, E2, R2>( + effect: Effect.Effect<_, E2, R2> + ): (self: Stream.Stream) => Stream.Stream + ( + self: Stream.Stream, + effect: Effect.Effect<_, E2, R2> + ): Stream.Stream +} = dual( + 2, + ( + self: Stream.Stream, + effect: Effect.Effect<_, E2, R2> + ): Stream.Stream => concat(self, drain(fromEffect(effect))) +) + /** @internal */ export const onError = dual< ( diff --git a/packages/effect/test/Stream/lifecycle.test.ts b/packages/effect/test/Stream/lifecycle.test.ts index 59826f0ab7d..9685d5abca9 100644 --- a/packages/effect/test/Stream/lifecycle.test.ts +++ b/packages/effect/test/Stream/lifecycle.test.ts @@ -15,4 +15,16 @@ describe("Stream", () => { assert.strictEqual(counter, 1) assert.deepStrictEqual(Array.from(result), [1, 1]) })) + + it.effect("onEnd", () => + Effect.gen(function*($) { + let counter = 0 + const result = yield* $( + Stream.make(1, 2, 3), + Stream.onEnd(Effect.sync(() => counter++)), + Stream.runCollect + ) + assert.strictEqual(counter, 1) + assert.deepStrictEqual(Array.from(result), [1, 2, 3]) + })) })