Skip to content

Commit

Permalink
add listen / notify to PgClient (#3398)
Browse files Browse the repository at this point in the history
  • Loading branch information
sukovanej authored Aug 2, 2024
1 parent 3f8e3d4 commit 45dbb9f
Show file tree
Hide file tree
Showing 6 changed files with 60 additions and 3 deletions.
5 changes: 5 additions & 0 deletions .changeset/dull-dryers-double.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@effect/sql-pg": patch
---

Add `listen` / `notify` to `PgClient`.
5 changes: 5 additions & 0 deletions .changeset/tricky-apes-stare.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": patch
---

Fix `Stream.asyncPush` type signature - allow the `register` effect to fail.
2 changes: 1 addition & 1 deletion packages/effect/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ export const asyncEffect: <A, E = never, R = never>(
* @category constructors
*/
export const asyncPush: <A, E = never, R = never>(
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, never, R | Scope.Scope>,
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, E, R | Scope.Scope>,
options?: { readonly bufferSize: "unbounded" } | {
readonly bufferSize?: number | undefined
readonly strategy?: "dropping" | "sliding" | undefined
Expand Down
2 changes: 1 addition & 1 deletion packages/effect/src/internal/stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,7 @@ const queueFromBufferOptionsPush = <A, E>(

/** @internal */
export const asyncPush = <A, E = never, R = never>(
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, never, R | Scope.Scope>,
register: (emit: Emit.EmitOpsPush<E, A>) => Effect.Effect<unknown, E, R | Scope.Scope>,
options?: {
readonly bufferSize: "unbounded"
} | {
Expand Down
30 changes: 30 additions & 0 deletions packages/sql-pg/examples/listen-notify.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { PgClient } from "@effect/sql-pg"
import { Config, Console, Effect, Stream } from "effect"

const program = Effect.gen(function*() {
const sql = yield* PgClient.PgClient

// start listening for notifications on the channel
yield* sql.listen("channel_name").pipe(
Stream.tap((message) => Console.log("Received message", message)),
Stream.runDrain,
Effect.forkScoped
)

// send 5 notifications to the channel
yield* sql.notify("channel_name", "Hello, world!").pipe(
Effect.tap(() => Effect.sleep("1 second")),
Effect.replicateEffect(5)
)
}).pipe(Effect.scoped)

const PgLive = PgClient.layer({
database: Config.succeed("postgres"),
username: Config.succeed("postgres")
})

program.pipe(
Effect.provide(PgLive),
Effect.tapErrorCause(Effect.logError),
Effect.runFork
)
19 changes: 18 additions & 1 deletion packages/sql-pg/src/PgClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ export interface PgClient extends Client.SqlClient {
readonly config: PgClientConfig
readonly json: (_: unknown) => Fragment
readonly array: (_: ReadonlyArray<Primitive>) => Fragment
readonly listen: (channel: string) => Stream.Stream<string, SqlError>
readonly notify: (channel: string, payload: string) => Effect.Effect<void, SqlError>
}

/**
Expand Down Expand Up @@ -249,7 +251,22 @@ export const make = (
database: client.options.database
},
json: (_: unknown) => PgJson(_),
array: (_: ReadonlyArray<Primitive>) => PgArray(_)
array: (_: ReadonlyArray<Primitive>) => PgArray(_),
listen: (channel: string) =>
Stream.asyncPush<string, SqlError>((emit) =>
Effect.acquireRelease(
Effect.tryPromise({
try: () => client.listen(channel, (payload) => emit.single(payload)),
catch: (cause) => new SqlError({ cause, message: "Failed to listen" })
}),
({ unlisten }) => Effect.promise(() => unlisten())
)
),
notify: (channel: string, payload: string) =>
Effect.tryPromise({
try: () => client.notify(channel, payload),
catch: (cause) => new SqlError({ cause, message: "Failed to notify" })
})
}
)
})
Expand Down

0 comments on commit 45dbb9f

Please sign in to comment.