From 45dbb9ffeaf93d9e4df99d0cd4920e41ba9a3978 Mon Sep 17 00:00:00 2001 From: Milan Suk Date: Fri, 2 Aug 2024 12:06:43 +0200 Subject: [PATCH] add `listen` / `notify` to `PgClient` (#3398) --- .changeset/dull-dryers-double.md | 5 ++++ .changeset/tricky-apes-stare.md | 5 ++++ packages/effect/src/Stream.ts | 2 +- packages/effect/src/internal/stream.ts | 2 +- packages/sql-pg/examples/listen-notify.ts | 30 +++++++++++++++++++++++ packages/sql-pg/src/PgClient.ts | 19 +++++++++++++- 6 files changed, 60 insertions(+), 3 deletions(-) create mode 100644 .changeset/dull-dryers-double.md create mode 100644 .changeset/tricky-apes-stare.md create mode 100644 packages/sql-pg/examples/listen-notify.ts diff --git a/.changeset/dull-dryers-double.md b/.changeset/dull-dryers-double.md new file mode 100644 index 0000000000..0543f9d0fc --- /dev/null +++ b/.changeset/dull-dryers-double.md @@ -0,0 +1,5 @@ +--- +"@effect/sql-pg": patch +--- + +Add `listen` / `notify` to `PgClient`. diff --git a/.changeset/tricky-apes-stare.md b/.changeset/tricky-apes-stare.md new file mode 100644 index 0000000000..6ab2cf97f8 --- /dev/null +++ b/.changeset/tricky-apes-stare.md @@ -0,0 +1,5 @@ +--- +"effect": patch +--- + +Fix `Stream.asyncPush` type signature - allow the `register` effect to fail. diff --git a/packages/effect/src/Stream.ts b/packages/effect/src/Stream.ts index 8704158f82..e35d77f01b 100644 --- a/packages/effect/src/Stream.ts +++ b/packages/effect/src/Stream.ts @@ -406,7 +406,7 @@ export const asyncEffect: ( * @category constructors */ export const asyncPush: ( - register: (emit: Emit.EmitOpsPush) => Effect.Effect, + register: (emit: Emit.EmitOpsPush) => Effect.Effect, options?: { readonly bufferSize: "unbounded" } | { readonly bufferSize?: number | undefined readonly strategy?: "dropping" | "sliding" | undefined diff --git a/packages/effect/src/internal/stream.ts b/packages/effect/src/internal/stream.ts index fe02cea53f..9d28dab7f9 100644 --- a/packages/effect/src/internal/stream.ts +++ b/packages/effect/src/internal/stream.ts @@ -617,7 +617,7 @@ const queueFromBufferOptionsPush = ( /** @internal */ export const asyncPush = ( - register: (emit: Emit.EmitOpsPush) => Effect.Effect, + register: (emit: Emit.EmitOpsPush) => Effect.Effect, options?: { readonly bufferSize: "unbounded" } | { diff --git a/packages/sql-pg/examples/listen-notify.ts b/packages/sql-pg/examples/listen-notify.ts new file mode 100644 index 0000000000..6872074799 --- /dev/null +++ b/packages/sql-pg/examples/listen-notify.ts @@ -0,0 +1,30 @@ +import { PgClient } from "@effect/sql-pg" +import { Config, Console, Effect, Stream } from "effect" + +const program = Effect.gen(function*() { + const sql = yield* PgClient.PgClient + + // start listening for notifications on the channel + yield* sql.listen("channel_name").pipe( + Stream.tap((message) => Console.log("Received message", message)), + Stream.runDrain, + Effect.forkScoped + ) + + // send 5 notifications to the channel + yield* sql.notify("channel_name", "Hello, world!").pipe( + Effect.tap(() => Effect.sleep("1 second")), + Effect.replicateEffect(5) + ) +}).pipe(Effect.scoped) + +const PgLive = PgClient.layer({ + database: Config.succeed("postgres"), + username: Config.succeed("postgres") +}) + +program.pipe( + Effect.provide(PgLive), + Effect.tapErrorCause(Effect.logError), + Effect.runFork +) diff --git a/packages/sql-pg/src/PgClient.ts b/packages/sql-pg/src/PgClient.ts index a12a9d2466..097580d90e 100644 --- a/packages/sql-pg/src/PgClient.ts +++ b/packages/sql-pg/src/PgClient.ts @@ -41,6 +41,8 @@ export interface PgClient extends Client.SqlClient { readonly config: PgClientConfig readonly json: (_: unknown) => Fragment readonly array: (_: ReadonlyArray) => Fragment + readonly listen: (channel: string) => Stream.Stream + readonly notify: (channel: string, payload: string) => Effect.Effect } /** @@ -249,7 +251,22 @@ export const make = ( database: client.options.database }, json: (_: unknown) => PgJson(_), - array: (_: ReadonlyArray) => PgArray(_) + array: (_: ReadonlyArray) => PgArray(_), + listen: (channel: string) => + Stream.asyncPush((emit) => + Effect.acquireRelease( + Effect.tryPromise({ + try: () => client.listen(channel, (payload) => emit.single(payload)), + catch: (cause) => new SqlError({ cause, message: "Failed to listen" }) + }), + ({ unlisten }) => Effect.promise(() => unlisten()) + ) + ), + notify: (channel: string, payload: string) => + Effect.tryPromise({ + try: () => client.notify(channel, payload), + catch: (cause) => new SqlError({ cause, message: "Failed to notify" }) + }) } ) })