Skip to content

Commit

Permalink
add Stream.asyncPush api (#3277)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored and gcanti committed Jul 29, 2024
1 parent efb3960 commit a8ff9fa
Show file tree
Hide file tree
Showing 9 changed files with 345 additions and 24 deletions.
35 changes: 35 additions & 0 deletions .changeset/forty-beers-refuse.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
---
"effect": minor
---

add Stream.asyncPush api

This api creates a stream from an external push-based resource.

You can use the `emit` helper to emit values to the stream. You can also use
the `emit` helper to signal the end of the stream by using apis such as
`emit.end` or `emit.fail`.

By default it uses an "unbounded" buffer size.
You can customize the buffer size and strategy by passing an object as the
second argument with the `bufferSize` and `strategy` fields.

```ts
import { Effect, Stream } from "effect";

Stream.asyncPush<string>(
(emit) =>
Effect.acquireRelease(
Effect.gen(function* () {
yield* Effect.log("subscribing");
return setInterval(() => emit.single("tick"), 1000);
}),
(handle) =>
Effect.gen(function* () {
yield* Effect.log("unsubscribing");
clearInterval(handle);
}),
),
{ bufferSize: 16, strategy: "dropping" },
);
```
5 changes: 5 additions & 0 deletions .changeset/tricky-cheetahs-help.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

add `bufferSize` option to Stream.fromEventListener
41 changes: 41 additions & 0 deletions packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,46 @@ export const asyncEffect: <A, E = never, R = never>(
} | undefined
) => Stream<A, E, R> = internal.asyncEffect

/**
* Creates a stream from an external push-based resource.
*
* You can use the `emit` helper to emit values to the stream. The `emit` helper
* returns a boolean indicating whether the value was emitted or not.
*
* You can also use the `emit` helper to signal the end of the stream by
* using apis such as `emit.end` or `emit.fail`.
*
* By default it uses an "unbounded" buffer size.
* You can customize the buffer size and strategy by passing an object as the
* second argument with the `bufferSize` and `strategy` fields.
*
* @example
* import { Effect, Stream } from "effect"
*
* Stream.asyncPush<string>((emit) =>
* Effect.acquireRelease(
* Effect.gen(function*() {
* yield* Effect.log("subscribing")
* return setInterval(() => emit.single("tick"), 1000)
* }),
* (handle) =>
* Effect.gen(function*() {
* yield* Effect.log("unsubscribing")
* clearInterval(handle)
* })
* ), { bufferSize: 16, strategy: "dropping" })
*
* @since 3.6.0
* @category constructors
*/
export const asyncPush: <A, E = never, R = never>(
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, never, R | Scope.Scope>,
options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | undefined
} | undefined
) => Stream<A, E, Exclude<R, Scope.Scope>> = internal.asyncPush

/**
* Creates a stream from an asynchronous callback that can be called multiple
* times. The registration of the callback itself returns an a scoped
Expand Down Expand Up @@ -5955,5 +5995,6 @@ export const fromEventListener: <A = unknown>(
readonly capture?: boolean
readonly passive?: boolean
readonly once?: boolean
readonly bufferSize?: number | "unbounded" | undefined
} | undefined
) => Stream<A> = internal.fromEventListener
53 changes: 53 additions & 0 deletions packages/effect/src/StreamEmit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,56 @@ export interface EmitOps<in R, in E, in A, out B> {
*/
single(value: A): Promise<B>
}

/**
* @since 3.6.0
* @category models
*/
export interface EmitOpsPush<in E, in A> {
/**
* Emits a chunk containing the specified values.
*/
chunk(chunk: Chunk.Chunk<A>): boolean

/**
* Emits a chunk containing the specified values.
*/
array(chunk: ReadonlyArray<A>): boolean

/**
* Terminates with a cause that dies with the specified defect.
*/
die<Err>(defect: Err): void

/**
* Terminates with a cause that dies with a `Throwable` with the specified
* message.
*/
dieMessage(message: string): void

/**
* Either emits the specified value if this `Exit` is a `Success` or else
* terminates with the specified cause if this `Exit` is a `Failure`.
*/
done(exit: Exit.Exit<A, E>): void

/**
* Terminates with an end of stream signal.
*/
end(): void

/**
* Terminates with the specified error.
*/
fail(error: E): void

/**
* Terminates the stream with the specified cause.
*/
halt(cause: Cause.Cause<E>): void

/**
* Emits a chunk containing the specified value.
*/
single(value: A): boolean
}
70 changes: 52 additions & 18 deletions packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import * as Either from "../Either.js"
import * as Equal from "../Equal.js"
import * as Exit from "../Exit.js"
import * as Fiber from "../Fiber.js"
import * as FiberRef from "../FiberRef.js"
import type { LazyArg } from "../Function.js"
import { constTrue, dual, identity, pipe } from "../Function.js"
import * as Layer from "../Layer.js"
Expand Down Expand Up @@ -597,6 +598,51 @@ export const asyncEffect = <A, E = never, R = never>(
fromChannel
)

const queueFromBufferOptionsPush = <A, E>(
options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | undefined
} | undefined
): Effect.Effect<Queue.Queue<Array<A> | Exit.Exit<void, E>>> => {
if (options?.bufferSize === "unbounded" || (options?.bufferSize === undefined && options?.strategy === undefined)) {
return Queue.unbounded()
}
switch (options?.strategy) {
case "sliding":
return Queue.sliding(options.bufferSize ?? 16)
default:
return Queue.dropping(options?.bufferSize ?? 16)
}
}

/** @internal */
export const asyncPush = <A, E = never, R = never>(
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, never, R | Scope.Scope>,
options?: {
readonly bufferSize: "unbounded"
} | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | undefined
} | undefined
): Stream.Stream<A, E, Exclude<R, Scope.Scope>> =>
Effect.acquireRelease(
queueFromBufferOptionsPush<A, E>(options),
Queue.shutdown
).pipe(
Effect.tap((queue) =>
FiberRef.getWith(FiberRef.currentScheduler, (scheduler) => register(emit.makePush(queue, scheduler)))
),
Effect.map((queue) => {
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E> = core.flatMap(Queue.take(queue), (item) =>
Exit.isExit(item)
? Exit.isSuccess(item) ? core.void : core.failCause(item.cause)
: channel.zipRight(core.write(Chunk.unsafeFromArray(item)), loop))
return loop
}),
channel.unwrapScoped,
fromChannel
)

/** @internal */
export const asyncScoped = <A, E = never, R = never>(
register: (emit: Emit.Emit<R, E, A, void>) => Effect.Effect<unknown, E, R | Scope.Scope>,
Expand Down Expand Up @@ -8341,23 +8387,11 @@ export const fromEventListener = <A = unknown>(
readonly capture?: boolean
readonly passive?: boolean
readonly once?: boolean
readonly bufferSize?: number | "unbounded" | undefined
} | undefined
): Stream.Stream<A> =>
_async<A>((emit) => {
let batch: Array<A> = []
let taskRunning = false
function cb(e: A) {
batch.push(e)
if (!taskRunning) {
taskRunning = true
queueMicrotask(() => {
const events = batch
batch = []
taskRunning = false
emit.chunk(Chunk.unsafeFromArray(events))
})
}
}
target.addEventListener(type, cb as any, options)
return Effect.sync(() => target.removeEventListener(type, cb, options))
}, "unbounded")
asyncPush<A>((emit) =>
Effect.acquireRelease(
Effect.sync(() => target.addEventListener(type, emit.single as any, options)),
() => Effect.sync(() => target.removeEventListener(type, emit.single, options))
), { bufferSize: typeof options === "object" ? options.bufferSize : undefined })
77 changes: 77 additions & 0 deletions packages/effect/src/internal/stream/emit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import * as Effect from "../../Effect.js"
import * as Exit from "../../Exit.js"
import { pipe } from "../../Function.js"
import * as Option from "../../Option.js"
import type * as Queue from "../../Queue.js"
import type * as Scheduler from "../../Scheduler.js"
import type * as Emit from "../../StreamEmit.js"

/** @internal */
Expand Down Expand Up @@ -44,3 +46,78 @@ export const make = <R, E, A, B>(
}
return Object.assign(emit, ops)
}

/** @internal */
export const makePush = <E, A>(
queue: Queue.Queue<Array<A> | Exit.Exit<void, E>>,
scheduler: Scheduler.Scheduler
): Emit.EmitOpsPush<E, A> => {
let finished = false
let buffer: Array<A> = []
let running = false
function array(items: ReadonlyArray<A>) {
if (finished) return false
if (items.length <= 50_000) {
buffer.push.apply(buffer, items as Array<A>)
} else {
for (let i = 0; i < items.length; i++) {
buffer.push(items[0])
}
}
if (!running) {
running = true
scheduler.scheduleTask(flush, 0)
}
return true
}
function flush() {
running = false
if (buffer.length > 0) {
queue.unsafeOffer(buffer)
buffer = []
}
}
function done(exit: Exit.Exit<A, E>) {
if (finished) return
finished = true
if (exit._tag === "Success") {
buffer.push(exit.value)
}
flush()
queue.unsafeOffer(exit._tag === "Success" ? Exit.void : exit)
}
return {
single(value: A) {
if (finished) return false
buffer.push(value)
if (!running) {
running = true
scheduler.scheduleTask(flush, 0)
}
return true
},
array,
chunk(chunk) {
return array(Chunk.toReadonlyArray(chunk))
},
done,
end() {
if (finished) return
finished = true
flush()
queue.unsafeOffer(Exit.void)
},
halt(cause: Cause.Cause<E>) {
return done(Exit.failCause(cause))
},
fail(error: E) {
return done(Exit.fail(error))
},
die<Err>(defect: Err): void {
return done(Exit.die(defect))
},
dieMessage(message: string): void {
return done(Exit.die(new Error(message)))
}
}
}
56 changes: 56 additions & 0 deletions packages/effect/test/Stream/async.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,4 +406,60 @@ describe("Stream", () => {
yield* $(Fiber.interrupt(fiber), Effect.exit)
assert.isFalse(result)
}))

it.effect("asyncPush", () =>
Effect.gen(function*() {
const array = [1, 2, 3, 4, 5]
const latch = yield* Deferred.make<void>()
const fiber = yield* Stream.asyncPush<number>((emit) => {
array.forEach((n) => {
emit.single(n)
})
return pipe(
Deferred.succeed(latch, void 0),
Effect.asVoid
)
}).pipe(
Stream.take(array.length),
Stream.run(Sink.collectAll()),
Effect.fork
)
yield* Deferred.await(latch)
const result = yield* Fiber.join(fiber)
assert.deepStrictEqual(Array.from(result), array)
}))

it.effect("asyncPush - signals the end of the stream", () =>
Effect.gen(function*() {
const result = yield* Stream.asyncPush<number>((emit) => {
emit.end()
return Effect.void
}).pipe(Stream.runCollect)
assert.isTrue(Chunk.isEmpty(result))
}))

it.effect("asyncPush - handles errors", () =>
Effect.gen(function*() {
const error = new Cause.RuntimeException("boom")
const result = yield* Stream.asyncPush<number, Cause.RuntimeException>((emit) => {
emit.fail(error)
return Effect.void
}).pipe(
Stream.runCollect,
Effect.exit
)
assert.deepStrictEqual(result, Exit.fail(error))
}))

it.effect("asyncPush - handles defects", () =>
Effect.gen(function*() {
const error = new Cause.RuntimeException("boom")
const result = yield* Stream.asyncPush<number, Cause.RuntimeException>(() => {
throw error
}).pipe(
Stream.runCollect,
Effect.exit
)
assert.deepStrictEqual(result, Exit.die(error))
}))
})
Loading

0 comments on commit a8ff9fa

Please sign in to comment.