Skip to content

Commit

Permalink
ensure Socket.toChannel fiber is attached to Scope (#3550)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Sep 5, 2024
1 parent b63ce8f commit 4a701c4
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 30 deletions.
5 changes: 5 additions & 0 deletions .changeset/clean-elephants-wave.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/platform": patch
---

ensure Socket.toChannel fiber is attached to Scope
47 changes: 17 additions & 30 deletions packages/platform/src/Socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,12 @@ export const toChannel = <IE>(
}
})
),
Effect.tap(({ exitQueue }) =>
Effect.tap(({ exitQueue, scope }) =>
self.run((data) => Queue.offer(exitQueue, Exit.succeed(Chunk.of(data)))).pipe(
Effect.zipRight(Effect.failCause(Cause.empty)),
Effect.exit,
Effect.tap((exit) => Queue.offer(exitQueue, exit)),
Effect.fork,
Effect.forkIn(scope),
Effect.interruptible
)
),
Expand Down Expand Up @@ -348,16 +348,12 @@ export const fromWebSocket = <R>(
Effect.map(
Queue.dropping<Uint8Array | string | CloseEvent>(fiber.getFiberRef(currentSendQueueCapacity)),
(sendQueue) => {
const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context<Exclude<R, Scope.Scope>>
const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context<R>
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>) =>
Effect.scope.pipe(
Effect.bindTo("scope"),
Effect.bind("ws", ({ scope }) =>
acquire.pipe(
Effect.provide(Context.add(acquireContext, Scope.Scope, scope))
) as Effect.Effect<globalThis.WebSocket>),
Effect.bind("fiberSet", (_) => FiberSet.make<any, E | SocketError>()),
acquire.pipe(
Effect.bindTo("ws"),
Effect.bind("fiberSet", () => FiberSet.make<any, E | SocketError>()),
Effect.bind("run", ({ fiberSet, ws }) =>
Effect.provideService(FiberSet.runtime(fiberSet)<R>(), WebSocket, ws)),
Effect.tap(({ fiberSet, run, ws }) => {
Expand Down Expand Up @@ -392,19 +388,12 @@ export const fromWebSocket = <R>(
}

if (ws.readyState !== 1) {
return Effect.async<void, SocketError, never>((resume) => {
function onOpen() {
ws.removeEventListener("open", onOpen)
resume(Effect.void)
}
ws.addEventListener("open", onOpen)
return Effect.sync(() => {
ws.removeEventListener("open", onOpen)
})
}).pipe(
Effect.tap((_) => {
open = true
}),
const openDeferred = Deferred.unsafeMake<void>(fiber.id())
ws.onopen = () => {
open = true
Deferred.unsafeDone(openDeferred, Effect.void)
}
return Deferred.await(openDeferred).pipe(
Effect.timeoutFail({
duration: options?.openTimeout ?? 10000,
onTimeout: () =>
Expand Down Expand Up @@ -445,6 +434,7 @@ export const fromWebSocket = <R>(
(_) => Effect.void
)
),
Effect.mapInputContext((input: Context.Context<R | Scope.Scope>) => Context.merge(acquireContext, input)),
Effect.scoped,
Effect.interruptible
)
Expand Down Expand Up @@ -534,15 +524,11 @@ export const fromTransformStream = <R>(acquire: Effect.Effect<InputTransformStre
Effect.map(
Queue.dropping<Uint8Array | string | CloseEvent | typeof EOF>(fiber.getFiberRef(currentSendQueueCapacity)),
(sendQueue) => {
const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context<Exclude<R, Scope.Scope>>
const acquireContext = fiber.getFiberRef(FiberRef.currentContext) as Context.Context<R>
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
const runRaw = <_, E, R>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>) =>
Effect.scope.pipe(
Effect.bindTo("scope"),
Effect.bind("stream", ({ scope }) =>
acquire.pipe(
Effect.provide(Context.add(acquireContext, Scope.Scope, scope))
) as Effect.Effect<InputTransformStream>),
acquire.pipe(
Effect.bindTo("stream"),
Effect.bind("reader", ({ stream }) =>
Effect.acquireRelease(
Effect.sync(() => stream.readable.getReader()),
Expand Down Expand Up @@ -615,6 +601,7 @@ export const fromTransformStream = <R>(acquire: Effect.Effect<InputTransformStre
(_) => Effect.void
)
),
Effect.mapInputContext((input: Context.Context<R | Scope.Scope>) => Context.merge(acquireContext, input)),
Effect.scoped,
Effect.interruptible
)
Expand Down

0 comments on commit 4a701c4

Please sign in to comment.