Skip to content

Commit

Permalink
fix memory leak in Socket's (#2750)
Browse files Browse the repository at this point in the history
  • Loading branch information
tim-smart authored May 15, 2024
1 parent 40c2b1d commit 6ac4847
Show file tree
Hide file tree
Showing 8 changed files with 97 additions and 46 deletions.
7 changes: 7 additions & 0 deletions .changeset/fast-lamps-flash.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@effect/platform-node-shared": patch
"@effect/experimental": patch
"@effect/platform": patch
---

fix memory leak in Socket's
5 changes: 5 additions & 0 deletions .changeset/mean-bats-learn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

ensure exponential schedules don't reach Infinity
4 changes: 3 additions & 1 deletion packages/effect/src/internal/schedule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,9 @@ export const exponential = (
factor = 2.0
): Schedule.Schedule<Duration.Duration> => {
const base = Duration.decode(baseInput)
return delayedSchedule(map(forever, (i) => Duration.times(base, Math.pow(factor, i))))
return delayedSchedule(
map(forever, (i) => Duration.times(base, Math.min(Number.MAX_SAFE_INTEGER, Math.pow(factor, i))))
)
}

/** @internal */
Expand Down
7 changes: 1 addition & 6 deletions packages/experimental/src/DevTools/Client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,7 @@ export const make: Effect.Effect<ClientImpl, never, Scope.Scope | Socket.Socket>
}
}
}),
Effect.tapErrorCause(Effect.logDebug),
Effect.retry(
Schedule.exponential("500 millis").pipe(
Schedule.union(Schedule.spaced("10 seconds"))
)
),
Effect.retry(Schedule.spaced("3 seconds")),
Effect.forkScoped,
Effect.interruptible
)
Expand Down
5 changes: 3 additions & 2 deletions packages/platform-bun/src/internal/http/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,14 @@ class ServerRequestImpl extends Inspectable.Class implements ServerRequest.Serve
}
resume(Effect.map(Deferred.await(deferred), (ws) => {
const write = (chunk: Uint8Array | string | Socket.CloseEvent) =>
Effect.sync(() =>
Effect.sync(() => {
typeof chunk === "string"
? ws.sendText(chunk)
: Socket.isCloseEvent(chunk)
? ws.close(chunk.code, chunk.reason)
: ws.sendBinary(chunk)
)
return true
})
const writer = Effect.succeed(write)
const runRaw = <R, E, _>(
handler: (_: Uint8Array | string) => Effect.Effect<_, E, R>
Expand Down
42 changes: 29 additions & 13 deletions packages/platform-node-shared/src/NodeSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import * as Channel from "effect/Channel"
import type * as Chunk from "effect/Chunk"
import * as Context from "effect/Context"
import * as Effect from "effect/Effect"
import * as FiberRef from "effect/FiberRef"
import * as FiberSet from "effect/FiberSet"
import * as Layer from "effect/Layer"
import * as Option from "effect/Option"
import * as Queue from "effect/Queue"
import type * as Scope from "effect/Scope"
import * as Scope from "effect/Scope"
import * as Net from "node:net"

/**
Expand All @@ -36,7 +37,7 @@ const EOF = Symbol.for("@effect/experimental/Socket/Node/EOF")
*/
export const makeNet = (
options: Net.NetConnectOpts
): Effect.Effect<Socket.Socket, Socket.SocketError, Scope.Scope> =>
): Effect.Effect<Socket.Socket, Socket.SocketError> =>
fromNetSocket(
Effect.acquireRelease(
Effect.async<Net.Socket, Socket.SocketError, never>((resume) => {
Expand Down Expand Up @@ -70,15 +71,21 @@ export const makeNet = (
* @since 1.0.0
* @category constructors
*/
export const fromNetSocket = (
open: Effect.Effect<Net.Socket, Socket.SocketError, Scope.Scope>
): Effect.Effect<Socket.Socket> =>
export const fromNetSocket = <RO>(
open: Effect.Effect<Net.Socket, Socket.SocketError, RO>
): Effect.Effect<Socket.Socket, never, Exclude<RO, Scope.Scope>> =>
Effect.gen(function*(_) {
const sendQueue = yield* _(Queue.unbounded<Uint8Array | string | Socket.CloseEvent | typeof EOF>())
const sendQueue = yield* _(Queue.dropping<Uint8Array | string | Socket.CloseEvent | typeof EOF>(
yield* FiberRef.get(Socket.currentSendQueueCapacity)
))
const openContext = yield* Effect.context<Exclude<RO, Scope.Scope>>()

const run = <R, E, _>(handler: (_: Uint8Array) => Effect.Effect<_, E, R>) =>
Effect.gen(function*(_) {
const conn = yield* _(open)
const scope = yield* Effect.scope
const conn = yield* open.pipe(
Effect.provide(Context.add(openContext, Scope.Scope, scope))
) as Effect.Effect<Net.Socket>
const fiberSet = yield* _(FiberSet.make<any, E | Socket.SocketError>())
const run = yield* _(
FiberSet.runtime(fiberSet)<R>(),
Expand All @@ -97,6 +104,7 @@ export const fromNetSocket = (
resume(error ? Effect.fail(new Socket.SocketGenericError({ reason: "Write", error })) : Effect.void)
})
}
return Effect.void
})
),
Effect.forever,
Expand All @@ -108,13 +116,13 @@ export const fromNetSocket = (
})
yield* _(
Effect.async<void, Socket.SocketError, never>((resume) => {
conn.on("end", () => {
function onEnd() {
resume(Effect.void)
})
conn.on("error", (error) => {
}
function onError(error: Error) {
resume(Effect.fail(new Socket.SocketGenericError({ reason: "Read", error })))
})
conn.on("close", (hadError) => {
}
function onClose(hadError: boolean) {
resume(
Effect.fail(
new Socket.SocketCloseError({
Expand All @@ -123,6 +131,14 @@ export const fromNetSocket = (
})
)
)
}
conn.on("end", onEnd)
conn.on("error", onError)
conn.on("close", onClose)
return Effect.sync(() => {
conn.off("end", onEnd)
conn.off("error", onError)
conn.off("close", onClose)
})
}),
Effect.raceFirst(FiberSet.join(fiberSet))
Expand Down Expand Up @@ -169,7 +185,7 @@ export const makeNetChannel = <IE = never>(
* @category layers
*/
export const layerNet = (options: Net.NetConnectOpts): Layer.Layer<Socket.Socket, Socket.SocketError> =>
Layer.scoped(
Layer.effect(
Socket.Socket,
makeNet(options)
)
63 changes: 46 additions & 17 deletions packages/platform/src/Socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import * as Context from "effect/Context"
import * as Deferred from "effect/Deferred"
import type { DurationInput } from "effect/Duration"
import * as Effect from "effect/Effect"
import * as ExecutionStrategy from "effect/ExecutionStrategy"
import * as Exit from "effect/Exit"
import * as FiberRef from "effect/FiberRef"
import * as FiberSet from "effect/FiberSet"
import { globalValue } from "effect/GlobalValue"
import * as Layer from "effect/Layer"
import * as Predicate from "effect/Predicate"
import * as Queue from "effect/Queue"
Expand Down Expand Up @@ -49,7 +52,11 @@ export interface Socket {
readonly runRaw: <R, E, _>(
handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>
) => Effect.Effect<void, SocketError | E, R>
readonly writer: Effect.Effect<(chunk: Uint8Array | string | CloseEvent) => Effect.Effect<void>, never, Scope.Scope>
readonly writer: Effect.Effect<
(chunk: Uint8Array | string | CloseEvent) => Effect.Effect<boolean>,
never,
Scope.Scope
>
}

/**
Expand Down Expand Up @@ -173,9 +180,10 @@ export const toChannel = <IE>(
void,
unknown
> =>
Channel.unwrap(
Channel.unwrapScoped(
Effect.gen(function*(_) {
const writeScope = yield* _(Scope.make())
const parentScope = yield* Effect.scope
const writeScope = yield* _(Scope.fork(parentScope, ExecutionStrategy.sequential))
const write = yield* _(Scope.extend(self.writer, writeScope))
const exitQueue = yield* _(Queue.unbounded<Exit.Exit<Chunk.Chunk<Uint8Array>, SocketError | IE>>())

Expand Down Expand Up @@ -299,7 +307,14 @@ export const makeWebSocket = (url: string | Effect.Effect<string>, options?: {
(typeof url === "string" ? Effect.succeed(url) : url).pipe(
Effect.flatMap((url) => Effect.map(WebSocketConstructor, (f) => f(url)))
),
(ws) => Effect.sync(() => ws.close())
(ws) =>
Effect.sync(() => {
ws.onclose = null
ws.onerror = null
ws.onmessage = null
ws.onopen = null
return ws.close()
})
),
options
)
Expand All @@ -317,16 +332,17 @@ export const fromWebSocket = <R>(
): Effect.Effect<Socket, never, Exclude<R, Scope.Scope>> =>
Effect.gen(function*(_) {
const closeCodeIsError = options?.closeCodeIsError ?? defaultCloseCodeIsError
const sendQueue = yield* Queue.unbounded<Uint8Array | string | CloseEvent>()
const sendQueue = yield* _(Queue.dropping<Uint8Array | string | CloseEvent>(
yield* FiberRef.get(currentSendQueueCapacity)
))
const acquireContext = yield* Effect.context<Exclude<R, Scope.Scope>>()
const acquireWithContext = Effect.provide(acquire, acquireContext) as Effect.Effect<
globalThis.WebSocket,
SocketError
>

const runRaw = <R, E, _>(handler: (_: string | Uint8Array) => Effect.Effect<_, E, R>) =>
Effect.gen(function*(_) {
const ws = yield* acquireWithContext
const scope = yield* Effect.scope
const ws = yield* (acquire.pipe(
Effect.provide(Context.add(acquireContext, Scope.Scope, scope))
) as Effect.Effect<globalThis.WebSocket>)
const fiberSet = yield* _(FiberSet.make<any, E | SocketError>())
const run = yield* _(
FiberSet.runtime(fiberSet)<R>(),
Expand Down Expand Up @@ -363,12 +379,16 @@ export const fromWebSocket = <R>(
}

if (ws.readyState !== 1) {
yield* _(
Effect.async<void, SocketError, never>((resume) => {
ws.onopen = () => {
resume(Effect.void)
}
}),
yield* Effect.async<void, SocketError, never>((resume) => {
function onOpen() {
ws.removeEventListener("open", onOpen)
resume(Effect.void)
}
ws.addEventListener("open", onOpen)
return Effect.sync(() => {
ws.removeEventListener("open", onOpen)
})
}).pipe(
Effect.timeoutFail({
duration: options?.openTimeout ?? 10000,
onTimeout: () => new SocketGenericError({ reason: "OpenTimeout", error: "timeout waiting for \"open\"" })
Expand Down Expand Up @@ -460,7 +480,16 @@ export const makeWebSocketChannel = <IE = never>(
export const layerWebSocket = (url: string, options?: {
readonly closeCodeIsError?: (code: number) => boolean
}): Layer.Layer<Socket, never, WebSocketConstructor> =>
Layer.scoped(
Layer.effect(
Socket,
makeWebSocket(url, options)
)

/**
* @since 1.0.0
* @category fiber refs
*/
export const currentSendQueueCapacity: FiberRef.FiberRef<number> = globalValue(
"@effect/platform/Socket/currentSendQueueCapacity",
() => FiberRef.unsafeMake(16)
)
10 changes: 3 additions & 7 deletions packages/platform/src/internal/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ export const makeManager = Effect.gen(function*() {
queue,
transfers = (_) => []
}: Worker.Worker.Options<I>) {
return Effect.gen(function*() {
const spawn = yield* Spawner
return Effect.gen(function*(_) {
const spawn = yield* _(Spawner)
const id = idCounter++
let requestIdCounter = 0
const semaphore = yield* Effect.makeSemaphore(permits)
Expand Down Expand Up @@ -125,11 +125,7 @@ export const makeManager = Effect.gen(function*() {
Effect.onError((cause) =>
Effect.forEach(requestMap.values(), ([queue]) => Queue.offer(queue, Exit.failCause(cause)))
),
Effect.retry(
Schedule.exponential("250 millis").pipe(
Schedule.union(Schedule.spaced("30 seconds"))
)
),
Effect.retry(Schedule.spaced(1000)),
Effect.annotateLogs({
package: "@effect/platform",
module: "Worker"
Expand Down

0 comments on commit 6ac4847

Please sign in to comment.