Skip to content

Commit

Permalink
use Mailbox for NodeStream module (#3616)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Sep 16, 2024
1 parent e3eae02 commit cd75658
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 32 deletions.
5 changes: 5 additions & 0 deletions .changeset/great-peaches-jog.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform-node-shared": patch
---

use Mailbox for NodeStream module
57 changes: 25 additions & 32 deletions packages/platform-node-shared/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@ import * as Cause from "effect/Cause"
import * as Channel from "effect/Channel"
import * as Chunk from "effect/Chunk"
import * as Effect from "effect/Effect"
import * as Either from "effect/Either"
import * as Exit from "effect/Exit"
import type { LazyArg } from "effect/Function"
import { dual } from "effect/Function"
import * as Queue from "effect/Queue"
import * as Mailbox from "effect/Mailbox"
import * as Runtime from "effect/Runtime"
import * as Scope from "effect/Scope"
import type * as AsyncInput from "effect/SingleProducerAsyncInput"
Expand Down Expand Up @@ -118,27 +117,27 @@ export const fromDuplex = <IE, E, I = Uint8Array | string, O = Uint8Array>(
Effect.tap(
Effect.zip(
Effect.sync(evaluate),
Queue.unbounded<Either.Either<void, Exit.Exit<void, IE | E>>>()
Mailbox.make<void, IE | E>()
),
([duplex, queue]) => readableOffer(duplex, queue, onError)
([duplex, mailbox]) => readableOffer(duplex, mailbox, onError)
),
([duplex, queue]) =>
([duplex, mailbox]) =>
Channel.embedInput(
readableTake(duplex, queue, options.chunkSize ? Number(options.chunkSize) : undefined),
readableTake(duplex, mailbox, options.chunkSize ? Number(options.chunkSize) : undefined),
writeInput(
duplex,
(cause) => Queue.offer(queue, Either.left(Exit.failCause(cause))),
(cause) => mailbox.failCause(cause),
options
)
),
([duplex, queue]) =>
([duplex, mailbox]) =>
Effect.zipRight(
Effect.sync(() => {
if (!duplex.closed) {
duplex.destroy()
}
}),
Queue.shutdown(queue)
mailbox.shutdown
)
)

Expand Down Expand Up @@ -199,19 +198,19 @@ export const fromReadableChannel = <E, A = Uint8Array>(
Effect.tap(
Effect.zip(
Effect.sync(evaluate),
Queue.unbounded<Either.Either<void, Exit.Exit<void, E>>>()
Mailbox.make<void, E>()
),
([readable, queue]) => readableOffer(readable, queue, onError)
([readable, mailbox]) => readableOffer(readable, mailbox, onError)
),
([readable, queue]) => readableTake(readable, queue, chunkSize),
([readable, queue]) =>
([readable, mailbox]) => readableTake(readable, mailbox, chunkSize),
([readable, mailbox]) =>
Effect.zipRight(
Effect.sync(() => {
if ("closed" in readable && !readable.closed) {
readable.destroy()
}
}),
Queue.shutdown(queue)
mailbox.shutdown
)
)

Expand Down Expand Up @@ -269,42 +268,33 @@ export const writeEffect = <A>(

const readableOffer = <E>(
readable: Readable | NodeJS.ReadableStream,
queue: Queue.Queue<Either.Either<void, Exit.Exit<void, E>>>,
mailbox: Mailbox.Mailbox<void, E>,
onError: (error: unknown) => E
) =>
Effect.sync(() => {
readable.on("readable", () => {
const size = queue.unsafeSize()
if (size._tag === "Some" && size.value <= 0) {
queue.unsafeOffer(Either.right(void 0))
}
mailbox.unsafeOffer(void 0)
})
readable.on("error", (err) => {
queue.unsafeOffer(Either.left(Exit.fail(onError(err))))
mailbox.unsafeDone(Exit.fail(onError(err)))
})
readable.on("end", () => {
queue.unsafeOffer(Either.left(Exit.void))
mailbox.unsafeDone(Exit.void)
})
if (readable.readable) {
queue.unsafeOffer(Either.right(void 0))
mailbox.unsafeOffer(void 0)
}
})

const readableTake = <E, A>(
readable: Readable | NodeJS.ReadableStream,
queue: Queue.Queue<Either.Either<void, Exit.Exit<void, E>>>,
mailbox: Mailbox.Mailbox<void, E>,
chunkSize: number | undefined
) => {
const read = readChunkChannel<A>(readable, chunkSize)
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E, unknown, void, unknown> = Channel.flatMap(
Queue.take(queue),
Either.match({
onLeft: Exit.match({
onFailure: Channel.failCause,
onSuccess: (_) => Channel.void
}),
onRight: (_) => Channel.flatMap(read, () => loop)
})
const loop: Channel.Channel<Chunk.Chunk<A>, unknown, E> = Channel.flatMap(
mailbox.takeAll,
([, done]) => done ? read : Channel.zipRight(read, loop)
)
return loop
}
Expand All @@ -316,6 +306,9 @@ const readChunkChannel = <A>(
Channel.suspend(() => {
const arr: Array<A> = []
let chunk = readable.read(chunkSize)
if (chunk === null) {
return Channel.void
}
while (chunk !== null) {
arr.push(chunk)
chunk = readable.read(chunkSize)
Expand Down

0 comments on commit cd75658

Please sign in to comment.