Skip to content

Commit

Permalink
refactor Socket to use do notation (#2988)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored Jun 14, 2024
1 parent e32b5eb commit 07e12ec
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 207 deletions.
6 changes: 6 additions & 0 deletions .changeset/cold-queens-act.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"@effect/platform-node-shared": patch
"@effect/platform": patch
---

refactor Socket to use do notation
167 changes: 88 additions & 79 deletions packages/platform-node-shared/src/NodeSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,93 +74,102 @@ export const makeNet = (
export const fromNetSocket = <RO>(
open: Effect.Effect<Net.Socket, Socket.SocketError, RO>
): Effect.Effect<Socket.Socket, never, Exclude<RO, Scope.Scope>> =>
Effect.gen(function*(_) {
const sendQueue = yield* _(Queue.dropping<Uint8Array | string | Socket.CloseEvent | typeof EOF>(
yield* FiberRef.get(Socket.currentSendQueueCapacity)
))
const openContext = yield* Effect.context<Exclude<RO, Scope.Scope>>()

const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) =>
Effect.gen(function*(_) {
const scope = yield* Effect.scope
const conn = yield* open.pipe(
Effect.provide(Context.add(openContext, Scope.Scope, scope))
) as Effect.Effect<Net.Socket>
const fiberSet = yield* _(FiberSet.make<any, E | Socket.SocketError>())
const run = yield* _(
FiberSet.runtime(fiberSet)<R>(),
Effect.provideService(NetSocket, conn)
)
yield* _(
Queue.take(sendQueue),
Effect.tap((chunk) =>
Effect.async<void, Socket.SocketError, never>((resume) => {
if (Socket.isCloseEvent(chunk)) {
conn.destroy(chunk.code > 1000 ? new Error(`closed with code ${chunk.code}`) : undefined)
} else if (chunk === EOF) {
conn.end(() => resume(Effect.void))
} else {
conn.write(chunk, (error) => {
resume(error ? Effect.fail(new Socket.SocketGenericError({ reason: "Write", error })) : Effect.void)
FiberRef.get(Socket.currentSendQueueCapacity).pipe(
Effect.flatMap((sendQueueCapacity) =>
Queue.dropping<Uint8Array | string | Socket.CloseEvent | typeof EOF>(
sendQueueCapacity
)
),
Effect.bindTo("sendQueue"),
Effect.bind("openContext", () => Effect.context<Exclude<RO, Scope.Scope>>()),
Effect.map(({ openContext, sendQueue }) => {
const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) =>
Effect.scope.pipe(
Effect.bindTo("scope"),
Effect.bind("conn", ({ scope }) =>
open.pipe(
Effect.provide(Context.add(openContext, Scope.Scope, scope))
) as Effect.Effect<Net.Socket>),
Effect.bind("fiberSet", (_) => FiberSet.make<any, E | Socket.SocketError>()),
Effect.bind("run", ({ conn, fiberSet }) =>
FiberSet.runtime(fiberSet)<R>().pipe(
Effect.provideService(NetSocket, conn)
)),
Effect.tap(({ conn, fiberSet }) =>
Queue.take(sendQueue).pipe(
Effect.tap((chunk) =>
Effect.async<void, Socket.SocketError, never>((resume) => {
if (Socket.isCloseEvent(chunk)) {
conn.destroy(chunk.code > 1000 ? new Error(`closed with code ${chunk.code}`) : undefined)
} else if (chunk === EOF) {
conn.end(() => resume(Effect.void))
} else {
conn.write(chunk, (error) => {
resume(
error ? Effect.fail(new Socket.SocketGenericError({ reason: "Write", error })) : Effect.void
)
})
}
return Effect.void
})
}
return Effect.void
})
),
Effect.forever,
Effect.withUnhandledErrorLogLevel(Option.none()),
FiberSet.run(fiberSet)
)
),
Effect.forever,
Effect.withUnhandledErrorLogLevel(Option.none()),
FiberSet.run(fiberSet)
)
conn.on("data", (chunk) => {
run(handler(chunk))
})
yield* _(
Effect.async<void, Socket.SocketError, never>((resume) => {
function onEnd() {
resume(Effect.void)
}
function onError(error: Error) {
resume(Effect.fail(new Socket.SocketGenericError({ reason: "Read", error })))
}
function onClose(hadError: boolean) {
resume(
Effect.fail(
new Socket.SocketCloseError({
reason: "Close",
code: hadError ? 1006 : 1000
})
)
)
}
conn.on("end", onEnd)
conn.on("error", onError)
conn.on("close", onClose)
return Effect.sync(() => {
conn.off("end", onEnd)
conn.off("error", onError)
conn.off("close", onClose)
Effect.tap(({ conn, fiberSet, run }) => {
conn.on("data", (chunk) => {
run(handler(chunk))
})

return Effect.async<void, Socket.SocketError, never>((resume) => {
function onEnd() {
resume(Effect.void)
}
function onError(error: Error) {
resume(Effect.fail(new Socket.SocketGenericError({ reason: "Read", error })))
}
function onClose(hadError: boolean) {
resume(
Effect.fail(
new Socket.SocketCloseError({
reason: "Close",
code: hadError ? 1006 : 1000
})
)
)
}
conn.on("end", onEnd)
conn.on("error", onError)
conn.on("close", onClose)
return Effect.sync(() => {
conn.off("end", onEnd)
conn.off("error", onError)
conn.off("close", onClose)
})
}).pipe(
Effect.raceFirst(FiberSet.join(fiberSet))
)
}),
Effect.raceFirst(FiberSet.join(fiberSet))
Effect.scoped,
Effect.interruptible
)
}).pipe(
Effect.scoped,
Effect.interruptible
)

const write = (chunk: Uint8Array | string | Socket.CloseEvent) => Queue.offer(sendQueue, chunk)
const writer = Effect.acquireRelease(
Effect.succeed(write),
() => Queue.offer(sendQueue, EOF)
)
const write = (chunk: Uint8Array | string | Socket.CloseEvent) => Queue.offer(sendQueue, chunk)
const writer = Effect.acquireRelease(
Effect.succeed(write),
() => Queue.offer(sendQueue, EOF)
)

return Socket.Socket.of({
[Socket.TypeId]: Socket.TypeId,
run,
runRaw: run,
writer
return Socket.Socket.of({
[Socket.TypeId]: Socket.TypeId,
run,
runRaw: run,
writer
})
})
})
)

/**
* @since 1.0.0
Expand Down
Loading

0 comments on commit 07e12ec

Please sign in to comment.