diff --git a/.changeset/bright-apricots-sit.md b/.changeset/bright-apricots-sit.md
new file mode 100644
index 00000000000..d5dc12b8ce9
--- /dev/null
+++ b/.changeset/bright-apricots-sit.md
@@ -0,0 +1,5 @@
+---
+"effect": minor
+---
+
+Add Stream.toReadableStreamEffect / .toReadableStreamRuntime
diff --git a/.changeset/seven-cougars-jump.md b/.changeset/seven-cougars-jump.md
new file mode 100644
index 00000000000..5848a537026
--- /dev/null
+++ b/.changeset/seven-cougars-jump.md
@@ -0,0 +1,5 @@
+---
+"@effect/platform": patch
+---
+
+Run client request stream with a current runtime.
diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts
index 83d0564512d..ea1e2b8c124 100644
--- a/packages/effect/src/Stream.ts
+++ b/packages/effect/src/Stream.ts
@@ -22,6 +22,7 @@ import type { Pipeable } from "./Pipeable.js"
import type { Predicate, Refinement } from "./Predicate.js"
import type * as PubSub from "./PubSub.js"
import type * as Queue from "./Queue.js"
+import type { Runtime } from "./Runtime.js"
import type * as Schedule from "./Schedule.js"
import type * as Scope from "./Scope.js"
import type * as Sink from "./Sink.js"
@@ -3859,7 +3860,36 @@ export const toQueueOfElements: {
* @since 2.0.0
* @category destructors
*/
-export const toReadableStream: (source: Stream) => ReadableStream = internal.toReadableStream
+export const toReadableStream: (self: Stream) => ReadableStream = internal.toReadableStream
+
+/**
+ * Converts the stream to a `Effect`.
+ *
+ * See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream.
+ *
+ * @since 2.0.0
+ * @category destructors
+ */
+export const toReadableStreamEffect: (self: Stream) => Effect.Effect, never, R> =
+ internal.toReadableStreamEffect
+
+/**
+ * Converts the stream to a `ReadableStream` using the provided runtime.
+ *
+ * See https://developer.mozilla.org/en-US/docs/Web/API/ReadableStream.
+ *
+ * @since 2.0.0
+ * @category destructors
+ */
+export const toReadableStreamRuntime: {
+ (
+ runtime: Runtime
+ ): (self: Stream) => ReadableStream
+ (
+ self: Stream,
+ runtime: Runtime
+ ): ReadableStream
+} = internal.toReadableStreamRuntime
/**
* Applies the transducer to the stream and emits its outputs.
diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts
index f095b2b7d1e..15ec125e449 100644
--- a/packages/effect/src/internal/stream.ts
+++ b/packages/effect/src/internal/stream.ts
@@ -6541,16 +6541,30 @@ export const toQueueOfElements = dual<
))
/** @internal */
-export const toReadableStream = (source: Stream.Stream) => {
- let pull: Effect.Effect
+export const toReadableStream = (self: Stream.Stream) =>
+ toReadableStreamRuntime(self, Runtime.defaultRuntime)
+
+/** @internal */
+export const toReadableStreamEffect = (self: Stream.Stream) =>
+ Effect.map(Effect.runtime(), (runtime) => toReadableStreamRuntime(self, runtime))
+
+/** @internal */
+export const toReadableStreamRuntime = dual<
+ (runtime: Runtime.Runtime) => (self: Stream.Stream) => ReadableStream,
+ (self: Stream.Stream, runtime: Runtime.Runtime) => ReadableStream
+>(2, (self: Stream.Stream, runtime: Runtime.Runtime): ReadableStream => {
+ const runSync = Runtime.runSync(runtime)
+ const runPromise = Runtime.runPromise(runtime)
+
+ let pull: Effect.Effect
let scope: Scope.CloseableScope
return new ReadableStream({
start(controller) {
- scope = Effect.runSync(Scope.make())
+ scope = runSync(Scope.make())
pull = pipe(
- toPull(source),
- Scope.use(scope),
- Effect.runSync,
+ toPull(self),
+ Scope.extend(scope),
+ runSync,
Effect.tap((chunk) =>
Effect.sync(() => {
Chunk.map(chunk, (a) => {
@@ -6573,13 +6587,13 @@ export const toReadableStream = (source: Stream.Stream) => {
)
},
pull() {
- return Effect.runPromise(pull)
+ return runPromise(pull)
},
cancel() {
- return Effect.runPromise(Scope.close(scope, Exit.void))
+ return runPromise(Scope.close(scope, Exit.void))
}
})
-}
+})
/** @internal */
export const transduce = dual<
diff --git a/packages/platform-bun/src/internal/http/server.ts b/packages/platform-bun/src/internal/http/server.ts
index ecb923c1033..d5b0cb8dff7 100644
--- a/packages/platform-bun/src/internal/http/server.ts
+++ b/packages/platform-bun/src/internal/http/server.ts
@@ -27,6 +27,7 @@ import * as Inspectable from "effect/Inspectable"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import type { ReadonlyRecord } from "effect/Record"
+import type * as Runtime from "effect/Runtime"
import type * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
import { Readable } from "node:stream"
@@ -73,25 +74,27 @@ export const make = (
return Server.make({
address: { _tag: "TcpAddress", port: server.port, hostname: server.hostname },
serve(httpApp, middleware) {
- const app = App.toHandled(httpApp, (request, exit) =>
- Effect.sync(() => {
- const impl = request as ServerRequestImpl
- if (exit._tag === "Success") {
- impl.resolve(makeResponse(request, exit.value))
- } else if (Cause.isInterruptedOnly(exit.cause)) {
- impl.resolve(
- new Response(undefined, {
- status: impl.source.signal.aborted ? 499 : 503
- })
- )
- } else {
- impl.reject(Cause.pretty(exit.cause))
- }
- }), middleware)
-
return pipe(
FiberSet.makeRuntime(),
- Effect.flatMap((runFork) =>
+ Effect.bindTo("runFork"),
+ Effect.bind("runtime", () => Effect.runtime()),
+ Effect.let("app", ({ runtime }) =>
+ App.toHandled(httpApp, (request, exit) =>
+ Effect.sync(() => {
+ const impl = request as ServerRequestImpl
+ if (exit._tag === "Success") {
+ impl.resolve(makeResponse(request, exit.value, runtime))
+ } else if (Cause.isInterruptedOnly(exit.cause)) {
+ impl.resolve(
+ new Response(undefined, {
+ status: impl.source.signal.aborted ? 499 : 503
+ })
+ )
+ } else {
+ impl.reject(Cause.pretty(exit.cause))
+ }
+ }), middleware)),
+ Effect.flatMap(({ app, runFork }) =>
Effect.async((_) => {
function handler(request: Request, server: BunServer) {
return new Promise((resolve, reject) => {
@@ -121,7 +124,11 @@ export const make = (
})
})
-const makeResponse = (request: ServerRequest.ServerRequest, response: ServerResponse.ServerResponse): Response => {
+const makeResponse = (
+ request: ServerRequest.ServerRequest,
+ response: ServerResponse.ServerResponse,
+ runtime: Runtime.Runtime
+): Response => {
const fields: {
headers: globalThis.Headers
status?: number
@@ -157,7 +164,10 @@ const makeResponse = (request: ServerRequest.ServerRequest, response: ServerResp
return new Response(body.formData as any, fields)
}
case "Stream": {
- return new Response(Stream.toReadableStream(body.stream), fields)
+ return new Response(
+ Stream.toReadableStreamRuntime(body.stream, runtime),
+ fields
+ )
}
}
}
diff --git a/packages/platform/src/Http/ServerResponse.ts b/packages/platform/src/Http/ServerResponse.ts
index 126b209a472..a84c3b06a8c 100644
--- a/packages/platform/src/Http/ServerResponse.ts
+++ b/packages/platform/src/Http/ServerResponse.ts
@@ -165,7 +165,7 @@ export const formData: (body: FormData, options?: Options.WithContent | undefine
* @since 1.0.0
* @category constructors
*/
-export const stream: (body: Stream.Stream, options?: Options | undefined) => ServerResponse =
+export const stream: (body: Stream.Stream, options?: Options | undefined) => ServerResponse =
internal.stream
/**
diff --git a/packages/platform/src/internal/http/client.ts b/packages/platform/src/internal/http/client.ts
index 2be03beb81f..ca4b08f806b 100644
--- a/packages/platform/src/internal/http/client.ts
+++ b/packages/platform/src/internal/http/client.ts
@@ -14,7 +14,6 @@ import * as Ref from "effect/Ref"
import type * as Schedule from "effect/Schedule"
import * as Scope from "effect/Scope"
import * as Stream from "effect/Stream"
-import type * as Body from "../../Http/Body.js"
import type * as Client from "../../Http/Client.js"
import * as Error from "../../Http/ClientError.js"
import type * as ClientRequest from "../../Http/ClientRequest.js"
@@ -222,26 +221,19 @@ export const fetch: Client.Client.Default = makeDefault((request, url, signal, f
(response) => internalResponse.fromWeb(request, response)
)
if (Method.hasBody(request.method)) {
- return send(convertBody(request.body))
+ switch (request.body._tag) {
+ case "Raw":
+ case "Uint8Array":
+ return send(request.body.body as any)
+ case "FormData":
+ return send(request.body.formData)
+ case "Stream":
+ return Effect.flatMap(Stream.toReadableStreamEffect(request.body.stream), send)
+ }
}
return send(undefined)
})
-const convertBody = (body: Body.Body): BodyInit | undefined => {
- switch (body._tag) {
- case "Empty":
- return undefined
- case "Raw":
- return body.body as any
- case "Uint8Array":
- return body.body
- case "FormData":
- return body.formData
- case "Stream":
- return Stream.toReadableStream(body.stream)
- }
-}
-
/** @internal */
export const transform = dual<
(
diff --git a/packages/platform/src/internal/http/serverResponse.ts b/packages/platform/src/internal/http/serverResponse.ts
index e576c4ca961..3834e6f5500 100644
--- a/packages/platform/src/internal/http/serverResponse.ts
+++ b/packages/platform/src/internal/http/serverResponse.ts
@@ -258,8 +258,8 @@ export const formData = (
)
/** @internal */
-export const stream = (
- body: Stream.Stream,
+export const stream = (
+ body: Stream.Stream,
options?: ServerResponse.Options | undefined
): ServerResponse.ServerResponse =>
new ServerResponseImpl(
diff --git a/packages/platform/test/HttpClient.test.ts b/packages/platform/test/HttpClient.test.ts
index b513e990d49..15d02327d2a 100644
--- a/packages/platform/test/HttpClient.test.ts
+++ b/packages/platform/test/HttpClient.test.ts
@@ -4,6 +4,7 @@ import { Ref } from "effect"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as Layer from "effect/Layer"
+import * as Logger from "effect/Logger"
import * as Stream from "effect/Stream"
import { assert, describe, expect, it } from "vitest"
@@ -129,4 +130,26 @@ describe("HttpClient", () => {
const response = yield* _(Http.request.get("/todos/1"), todoClient)
expect(response.id).toBe(1)
}).pipe(Effect.provide(Http.client.layer), Effect.runPromise))
+
+ it("streamBody accesses the current runtime", () =>
+ Effect.gen(function*(_) {
+ const defaultClient = yield* _(Http.client.Client)
+
+ const requestStream = Stream.fromIterable(["hello", "world"]).pipe(
+ Stream.tap((_) => Effect.log(_)),
+ Stream.encodeText
+ )
+
+ const logs: Array = []
+ const logger = Logger.make(({ message }) => logs.push(message))
+
+ yield* Http.request.post("https://jsonplaceholder.typicode.com").pipe(
+ Http.request.streamBody(requestStream),
+ defaultClient,
+ Effect.provide(Logger.replace(Logger.defaultLogger, logger)),
+ Effect.scoped
+ )
+
+ expect(logs).toEqual(["hello", "world"])
+ }).pipe(Effect.provide(Http.client.layer), Effect.runPromise))
})