diff --git a/.changeset/forty-beers-refuse.md b/.changeset/forty-beers-refuse.md new file mode 100644 index 00000000000..17c20ae72c5 --- /dev/null +++ b/.changeset/forty-beers-refuse.md @@ -0,0 +1,35 @@ +--- +"effect": minor +--- + +add Stream.asyncPush api + +This api creates a stream from an external push-based resource. + +You can use the `emit` helper to emit values to the stream. You can also use +the `emit` helper to signal the end of the stream by using apis such as +`emit.end` or `emit.fail`. + +By default it uses an "unbounded" buffer size. +You can customize the buffer size and strategy by passing an object as the +second argument with the `bufferSize` and `strategy` fields. + +```ts +import { Effect, Stream } from "effect"; + +Stream.asyncPush( + (emit) => + Effect.acquireRelease( + Effect.gen(function* () { + yield* Effect.log("subscribing"); + return setInterval(() => emit.single("tick"), 1000); + }), + (handle) => + Effect.gen(function* () { + yield* Effect.log("unsubscribing"); + clearInterval(handle); + }), + ), + { bufferSize: 16, strategy: "dropping" }, +); +``` diff --git a/.changeset/tricky-cheetahs-help.md b/.changeset/tricky-cheetahs-help.md new file mode 100644 index 00000000000..4667c66beae --- /dev/null +++ b/.changeset/tricky-cheetahs-help.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +add `bufferSize` option to Stream.fromEventListener diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 4f148dc6f80..ffdc06299a9 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -373,6 +373,46 @@ export const asyncEffect: ( } | undefined ) => Stream = internal.asyncEffect +/** + * Creates a stream from an external push-based resource. + * + * You can use the `emit` helper to emit values to the stream. The `emit` helper + * returns a boolean indicating whether the value was emitted or not. + * + * You can also use the `emit` helper to signal the end of the stream by + * using apis such as `emit.end` or `emit.fail`. + * + * By default it uses an "unbounded" buffer size. + * You can customize the buffer size and strategy by passing an object as the + * second argument with the `bufferSize` and `strategy` fields. + * + * @example + * import { Effect, Stream } from "effect" + * + * Stream.asyncPush((emit) => + * Effect.acquireRelease( + * Effect.gen(function*() { + * yield* Effect.log("subscribing") + * return setInterval(() => emit.single("tick"), 1000) + * }), + * (handle) => + * Effect.gen(function*() { + * yield* Effect.log("unsubscribing") + * clearInterval(handle) + * }) + * ), { bufferSize: 16, strategy: "dropping" }) + * + * @since 3.6.0 + * @category constructors + */ +export const asyncPush: ( + register: (emit: Emit.EmitOpsPush) => Effect.Effect, + options?: { readonly bufferSize: "unbounded" } | { + readonly bufferSize?: number | undefined + readonly strategy?: "dropping" | "sliding" | undefined + } | undefined +) => Stream> = internal.asyncPush + /** * Creates a stream from an asynchronous callback that can be called multiple * times. The registration of the callback itself returns an a scoped @@ -5955,5 +5995,6 @@ export const fromEventListener: ( readonly capture?: boolean readonly passive?: boolean readonly once?: boolean + readonly bufferSize?: number | "unbounded" | undefined } | undefined ) => Stream = internal.fromEventListener diff --git a/packages/effect/src/StreamEmit.ts b/packages/effect/src/StreamEmit.ts index 4c6a1884c55..fdb935d1e44 100644 --- a/packages/effect/src/StreamEmit.ts +++ b/packages/effect/src/StreamEmit.ts @@ -81,3 +81,56 @@ export interface EmitOps { */ single(value: A): Promise } + +/** + * @since 3.6.0 + * @category models + */ +export interface EmitOpsPush { + /** + * Emits a chunk containing the specified values. + */ + chunk(chunk: Chunk.Chunk): boolean + + /** + * Emits a chunk containing the specified values. + */ + array(chunk: ReadonlyArray): boolean + + /** + * Terminates with a cause that dies with the specified defect. + */ + die(defect: Err): void + + /** + * Terminates with a cause that dies with a `Throwable` with the specified + * message. + */ + dieMessage(message: string): void + + /** + * Either emits the specified value if this `Exit` is a `Success` or else + * terminates with the specified cause if this `Exit` is a `Failure`. + */ + done(exit: Exit.Exit): void + + /** + * Terminates with an end of stream signal. + */ + end(): void + + /** + * Terminates with the specified error. + */ + fail(error: E): void + + /** + * Terminates the stream with the specified cause. + */ + halt(cause: Cause.Cause): void + + /** + * Emits a chunk containing the specified value. + */ + single(value: A): boolean +} diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index 0fa592eb753..f3adf2013f3 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -10,6 +10,7 @@ import * as Either from "../Either.js" import * as Equal from "../Equal.js" import * as Exit from "../Exit.js" import * as Fiber from "../Fiber.js" +import * as FiberRef from "../FiberRef.js" import type { LazyArg } from "../Function.js" import { constTrue, dual, identity, pipe } from "../Function.js" import * as Layer from "../Layer.js" @@ -597,6 +598,51 @@ export const asyncEffect = ( fromChannel ) +const queueFromBufferOptionsPush = ( + options?: { readonly bufferSize: "unbounded" } | { + readonly bufferSize?: number | undefined + readonly strategy?: "dropping" | "sliding" | undefined + } | undefined +): Effect.Effect | Exit.Exit>> => { + if (options?.bufferSize === "unbounded" || (options?.bufferSize === undefined && options?.strategy === undefined)) { + return Queue.unbounded() + } + switch (options?.strategy) { + case "sliding": + return Queue.sliding(options.bufferSize ?? 16) + default: + return Queue.dropping(options?.bufferSize ?? 16) + } +} + +/** @internal */ +export const asyncPush = ( + register: (emit: Emit.EmitOpsPush) => Effect.Effect, + options?: { + readonly bufferSize: "unbounded" + } | { + readonly bufferSize?: number | undefined + readonly strategy?: "dropping" | "sliding" | undefined + } | undefined +): Stream.Stream> => + Effect.acquireRelease( + queueFromBufferOptionsPush(options), + Queue.shutdown + ).pipe( + Effect.tap((queue) => + FiberRef.getWith(FiberRef.currentScheduler, (scheduler) => register(emit.makePush(queue, scheduler))) + ), + Effect.map((queue) => { + const loop: Channel.Channel, unknown, E> = core.flatMap(Queue.take(queue), (item) => + Exit.isExit(item) + ? Exit.isSuccess(item) ? core.void : core.failCause(item.cause) + : channel.zipRight(core.write(Chunk.unsafeFromArray(item)), loop)) + return loop + }), + channel.unwrapScoped, + fromChannel + ) + /** @internal */ export const asyncScoped = ( register: (emit: Emit.Emit) => Effect.Effect, @@ -8341,23 +8387,11 @@ export const fromEventListener = ( readonly capture?: boolean readonly passive?: boolean readonly once?: boolean + readonly bufferSize?: number | "unbounded" | undefined } | undefined ): Stream.Stream => - _async((emit) => { - let batch: Array = [] - let taskRunning = false - function cb(e: A) { - batch.push(e) - if (!taskRunning) { - taskRunning = true - queueMicrotask(() => { - const events = batch - batch = [] - taskRunning = false - emit.chunk(Chunk.unsafeFromArray(events)) - }) - } - } - target.addEventListener(type, cb as any, options) - return Effect.sync(() => target.removeEventListener(type, cb, options)) - }, "unbounded") + asyncPush((emit) => + Effect.acquireRelease( + Effect.sync(() => target.addEventListener(type, emit.single as any, options)), + () => Effect.sync(() => target.removeEventListener(type, emit.single, options)) + ), { bufferSize: typeof options === "object" ? options.bufferSize : undefined }) diff --git a/packages/effect/src/internal/stream/emit.ts b/packages/effect/src/internal/stream/emit.ts index 613e7d49b16..7dd1623d430 100644 --- a/packages/effect/src/internal/stream/emit.ts +++ b/packages/effect/src/internal/stream/emit.ts @@ -4,6 +4,8 @@ import * as Effect from "../../Effect.js" import * as Exit from "../../Exit.js" import { pipe } from "../../Function.js" import * as Option from "../../Option.js" +import type * as Queue from "../../Queue.js" +import type * as Scheduler from "../../Scheduler.js" import type * as Emit from "../../StreamEmit.js" /** @internal */ @@ -44,3 +46,78 @@ export const make = ( } return Object.assign(emit, ops) } + +/** @internal */ +export const makePush = ( + queue: Queue.Queue | Exit.Exit>, + scheduler: Scheduler.Scheduler +): Emit.EmitOpsPush => { + let finished = false + let buffer: Array = [] + let running = false + function array(items: ReadonlyArray) { + if (finished) return false + if (items.length <= 50_000) { + buffer.push.apply(buffer, items as Array) + } else { + for (let i = 0; i < items.length; i++) { + buffer.push(items[0]) + } + } + if (!running) { + running = true + scheduler.scheduleTask(flush, 0) + } + return true + } + function flush() { + running = false + if (buffer.length > 0) { + queue.unsafeOffer(buffer) + buffer = [] + } + } + function done(exit: Exit.Exit) { + if (finished) return + finished = true + if (exit._tag === "Success") { + buffer.push(exit.value) + } + flush() + queue.unsafeOffer(exit._tag === "Success" ? Exit.void : exit) + } + return { + single(value: A) { + if (finished) return false + buffer.push(value) + if (!running) { + running = true + scheduler.scheduleTask(flush, 0) + } + return true + }, + array, + chunk(chunk) { + return array(Chunk.toReadonlyArray(chunk)) + }, + done, + end() { + if (finished) return + finished = true + flush() + queue.unsafeOffer(Exit.void) + }, + halt(cause: Cause.Cause) { + return done(Exit.failCause(cause)) + }, + fail(error: E) { + return done(Exit.fail(error)) + }, + die(defect: Err): void { + return done(Exit.die(defect)) + }, + dieMessage(message: string): void { + return done(Exit.die(new Error(message))) + } + } +} diff --git a/packages/effect/test/Stream/async.test.ts b/packages/effect/test/Stream/async.test.ts index 3d6e5efa2fa..7b7e979f6d0 100644 --- a/packages/effect/test/Stream/async.test.ts +++ b/packages/effect/test/Stream/async.test.ts @@ -406,4 +406,60 @@ describe("Stream", () => { yield* $(Fiber.interrupt(fiber), Effect.exit) assert.isFalse(result) })) + + it.effect("asyncPush", () => + Effect.gen(function*() { + const array = [1, 2, 3, 4, 5] + const latch = yield* Deferred.make() + const fiber = yield* Stream.asyncPush((emit) => { + array.forEach((n) => { + emit.single(n) + }) + return pipe( + Deferred.succeed(latch, void 0), + Effect.asVoid + ) + }).pipe( + Stream.take(array.length), + Stream.run(Sink.collectAll()), + Effect.fork + ) + yield* Deferred.await(latch) + const result = yield* Fiber.join(fiber) + assert.deepStrictEqual(Array.from(result), array) + })) + + it.effect("asyncPush - signals the end of the stream", () => + Effect.gen(function*() { + const result = yield* Stream.asyncPush((emit) => { + emit.end() + return Effect.void + }).pipe(Stream.runCollect) + assert.isTrue(Chunk.isEmpty(result)) + })) + + it.effect("asyncPush - handles errors", () => + Effect.gen(function*() { + const error = new Cause.RuntimeException("boom") + const result = yield* Stream.asyncPush((emit) => { + emit.fail(error) + return Effect.void + }).pipe( + Stream.runCollect, + Effect.exit + ) + assert.deepStrictEqual(result, Exit.fail(error)) + })) + + it.effect("asyncPush - handles defects", () => + Effect.gen(function*() { + const error = new Cause.RuntimeException("boom") + const result = yield* Stream.asyncPush(() => { + throw error + }).pipe( + Stream.runCollect, + Effect.exit + ) + assert.deepStrictEqual(result, Exit.die(error)) + })) }) diff --git a/packages/platform-browser/src/BrowserStream.ts b/packages/platform-browser/src/BrowserStream.ts index 345372b7bae..50c22d45ec3 100644 --- a/packages/platform-browser/src/BrowserStream.ts +++ b/packages/platform-browser/src/BrowserStream.ts @@ -11,8 +11,13 @@ import * as internal from "./internal/stream.js" */ export const fromEventListenerWindow: ( type: K, - options?: boolean | Omit -) => Stream.Stream = internal.fromEventListenerWindow + options?: boolean | { + readonly capture?: boolean + readonly passive?: boolean + readonly once?: boolean + readonly bufferSize?: number | "unbounded" | undefined + } | undefined +) => Stream.Stream = internal.fromEventListenerWindow /** * Creates a `Stream` from document.addEventListener. @@ -20,5 +25,10 @@ export const fromEventListenerWindow: ( */ export const fromEventListenerDocument: ( type: K, - options?: boolean | Omit -) => Stream.Stream = internal.fromEventListenerDocument + options?: boolean | { + readonly capture?: boolean + readonly passive?: boolean + readonly once?: boolean + readonly bufferSize?: number | "unbounded" | undefined + } | undefined +) => Stream.Stream = internal.fromEventListenerDocument diff --git a/packages/platform-browser/src/internal/stream.ts b/packages/platform-browser/src/internal/stream.ts index a37cdca36c5..b38e5901353 100644 --- a/packages/platform-browser/src/internal/stream.ts +++ b/packages/platform-browser/src/internal/stream.ts @@ -7,11 +7,21 @@ import * as Stream from "effect/Stream" /** @internal */ export const fromEventListenerWindow = ( type: K, - options?: boolean | Omit + options?: boolean | { + readonly capture?: boolean + readonly passive?: boolean + readonly once?: boolean + readonly bufferSize?: number | "unbounded" | undefined + } | undefined ) => Stream.fromEventListener(window, type, options) /** @internal */ export const fromEventListenerDocument = ( type: K, - options?: boolean | Omit + options?: boolean | { + readonly capture?: boolean + readonly passive?: boolean + readonly once?: boolean + readonly bufferSize?: number | "unbounded" | undefined + } | undefined ) => Stream.fromEventListener(document, type, options)