diff --git a/.changeset/clean-elephants-wave.md b/.changeset/clean-elephants-wave.md new file mode 100644 index 0000000000..eee47d19d5 --- /dev/null +++ b/.changeset/clean-elephants-wave.md @@ -0,0 +1,5 @@ +--- +"@effect/platform": patch +--- + +ensure Socket.toChannel fiber is attached to Scope diff --git a/packages/platform/src/Socket.ts b/packages/platform/src/Socket.ts index 6fce62d45d..fe30db1c40 100644 --- a/packages/platform/src/Socket.ts +++ b/packages/platform/src/Socket.ts @@ -209,12 +209,12 @@ export const toChannel = ( } }) ), - Effect.tap(({ exitQueue }) => + Effect.tap(({ exitQueue, scope }) => self.run((data) => Queue.offer(exitQueue, Exit.succeed(Chunk.of(data)))).pipe( Effect.zipRight(Effect.failCause(Cause.empty)), Effect.exit, Effect.tap((exit) => Queue.offer(exitQueue, exit)), - Effect.fork, + Effect.forkIn(scope), Effect.interruptible ) ), @@ -348,16 +348,12 @@ export const fromWebSocket = ( Effect.map( Queue.dropping(fiber.getFiberRef(currentSendQueueCapacity)), (sendQueue) => { - const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context> + const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>) => - Effect.scope.pipe( - Effect.bindTo("scope"), - Effect.bind("ws", ({ scope }) => - acquire.pipe( - Effect.provide(Context.add(acquireContext, Scope.Scope, scope)) - ) as Effect.Effect), - Effect.bind("fiberSet", (_) => FiberSet.make()), + acquire.pipe( + Effect.bindTo("ws"), + Effect.bind("fiberSet", () => FiberSet.make()), Effect.bind("run", ({ fiberSet, ws }) => Effect.provideService(FiberSet.runtime(fiberSet)(), WebSocket, ws)), Effect.tap(({ fiberSet, run, ws }) => { @@ -392,19 +388,12 @@ export const fromWebSocket = ( } if (ws.readyState !== 1) { - return Effect.async((resume) => { - function onOpen() { - ws.removeEventListener("open", onOpen) - resume(Effect.void) - } - ws.addEventListener("open", onOpen) - return Effect.sync(() => { - ws.removeEventListener("open", onOpen) - }) - }).pipe( - Effect.tap((_) => { - open = true - }), + const openDeferred = Deferred.unsafeMake(fiber.id()) + ws.onopen = () => { + open = true + Deferred.unsafeDone(openDeferred, Effect.void) + } + return Deferred.await(openDeferred).pipe( Effect.timeoutFail({ duration: options?.openTimeout ?? 10000, onTimeout: () => @@ -445,6 +434,7 @@ export const fromWebSocket = ( (_) => Effect.void ) ), + Effect.mapInputContext((input: Context.Context) => Context.merge(acquireContext, input)), Effect.scoped, Effect.interruptible ) @@ -534,15 +524,11 @@ export const fromTransformStream = (acquire: Effect.Effect(fiber.getFiberRef(currentSendQueueCapacity)), (sendQueue) => { - const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context> + const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>) => - Effect.scope.pipe( - Effect.bindTo("scope"), - Effect.bind("stream", ({ scope }) => - acquire.pipe( - Effect.provide(Context.add(acquireContext, Scope.Scope, scope)) - ) as Effect.Effect), + acquire.pipe( + Effect.bindTo("stream"), Effect.bind("reader", ({ stream }) => Effect.acquireRelease( Effect.sync(() => stream.readable.getReader()), @@ -615,6 +601,7 @@ export const fromTransformStream = (acquire: Effect.Effect Effect.void ) ), + Effect.mapInputContext((input: Context.Context) => Context.merge(acquireContext, input)), Effect.scoped, Effect.interruptible )