diff --git a/.changeset/healthy-radios-cough.md b/.changeset/healthy-radios-cough.md new file mode 100644 index 0000000000..3618f698e5 --- /dev/null +++ b/.changeset/healthy-radios-cough.md @@ -0,0 +1,6 @@ +--- +"@effect/rpc-http": minor +"@effect/rpc": minor +--- + +add Rpc prefix to rpc modules diff --git a/.changeset/two-sloths-yell.md b/.changeset/two-sloths-yell.md new file mode 100644 index 0000000000..dd779d95da --- /dev/null +++ b/.changeset/two-sloths-yell.md @@ -0,0 +1,5 @@ +--- +"@effect/rpc": patch +--- + +support backpressure for Rpc streams diff --git a/packages/cluster-browser/src/RpcBroadcastChannel.ts b/packages/cluster-browser/src/RpcBroadcastChannel.ts index 6ae42016e0..31b89f091a 100644 --- a/packages/cluster-browser/src/RpcBroadcastChannel.ts +++ b/packages/cluster-browser/src/RpcBroadcastChannel.ts @@ -1,9 +1,9 @@ /** * @since 1.0.0 */ -import * as Resolver from "@effect/rpc/Resolver" -import * as Router from "@effect/rpc/Router" import type * as Rpc from "@effect/rpc/Rpc" +import * as RpcResolver from "@effect/rpc/RpcResolver" +import * as RpcRouter from "@effect/rpc/RpcRouter" import * as Schema from "@effect/schema/Schema" import type * as Serializable from "@effect/schema/Serializable" import * as Effect from "effect/Effect" @@ -27,8 +27,8 @@ const BroadcastMessage = Schema.Union(ClientRequest, ServerResponse) /** * @since 1.0.0 */ -export const toBroadcastChannelRouter = >(self: R, channelId: string) => { - const handler = Router.toHandlerEffect(self) +export const toBroadcastChannelRouter = >(self: R, channelId: string) => { + const handler = RpcRouter.toHandlerNoStream(self) return Effect.gen(function*($) { const queue = yield* $(Queue.unbounded()) @@ -69,13 +69,13 @@ export const toBroadcastChannelRouter = >(self /** * @since 1.0.0 */ -export const make = >( +export const make = >( channelId: string ): RequestResolver.RequestResolver< - Rpc.Request>, - Serializable.SerializableWithResult.Context> -> => { - return Resolver.make((requests) => { + Rpc.Request>, + Serializable.SerializableWithResult.Context> +> => + RpcResolver.make((requests) => { return Effect.gen(function*($) { const queue = yield* $(Queue.unbounded()) yield* $(Effect.addFinalizer(() => Queue.shutdown(queue))) @@ -109,4 +109,3 @@ export const make = >( ) }).pipe(Effect.scoped) })() -} diff --git a/packages/cluster-node/examples/sample-connect.ts b/packages/cluster-node/examples/sample-connect.ts index 724a9d20f4..d030c84954 100644 --- a/packages/cluster-node/examples/sample-connect.ts +++ b/packages/cluster-node/examples/sample-connect.ts @@ -7,8 +7,8 @@ import * as Sharding from "@effect/cluster/Sharding" import * as ShardingConfig from "@effect/cluster/ShardingConfig" import { HttpClient, HttpClientRequest } from "@effect/platform" import { NodeHttpClient, NodeRuntime } from "@effect/platform-node" -import { Resolver } from "@effect/rpc" -import { HttpResolver } from "@effect/rpc-http" +import { RpcResolver } from "@effect/rpc" +import { HttpRpcResolver } from "@effect/rpc-http" import { Effect, Layer, Logger, LogLevel, Ref } from "effect" import { CounterEntity, GetCurrent, Increment } from "./sample-common.js" @@ -31,23 +31,23 @@ const liveLayer = Effect.gen(function*() { Layer.provide(Sharding.live), Layer.provide(StorageFile.storageFile), Layer.provide(PodsRpc.podsRpc((podAddress) => - HttpResolver.make( + HttpRpcResolver.make( HttpClient.fetchOk.pipe( HttpClient.mapRequest( HttpClientRequest.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`) ) ) - ).pipe(Resolver.toClient) + ).pipe(RpcResolver.toClient) )), Layer.provide(ShardManagerClientRpc.shardManagerClientRpc( (shardManagerUri) => - HttpResolver.make( + HttpRpcResolver.make( HttpClient.fetchOk.pipe( HttpClient.mapRequest( HttpClientRequest.prependUrl(shardManagerUri) ) ) - ).pipe(Resolver.toClient) + ).pipe(RpcResolver.toClient) )), Layer.provide(ShardingConfig.withDefaults({ shardingPort: 54322 })), Layer.provide(Serialization.json), diff --git a/packages/cluster-node/examples/sample-manager.ts b/packages/cluster-node/examples/sample-manager.ts index d349bd2aab..01321024b0 100644 --- a/packages/cluster-node/examples/sample-manager.ts +++ b/packages/cluster-node/examples/sample-manager.ts @@ -7,8 +7,8 @@ import * as PodsHealth from "@effect/cluster/PodsHealth" import * as ShardManager from "@effect/cluster/ShardManager" import { HttpClient, HttpClientRequest, HttpMiddleware, HttpRouter, HttpServer } from "@effect/platform" import { NodeHttpServer, NodeRuntime } from "@effect/platform-node" -import { Resolver } from "@effect/rpc" -import { HttpResolver, HttpRouter as RpcHttpRouter } from "@effect/rpc-http" +import { RpcResolver } from "@effect/rpc" +import { HttpRpcResolver, HttpRpcRouter } from "@effect/rpc-http" import { Context, Effect, Layer, Logger, LogLevel } from "effect" import { createServer } from "node:http" @@ -16,7 +16,7 @@ const HttpLive = Layer.flatMap( Layer.effect(ManagerConfig.ManagerConfig, ManagerConfig.ManagerConfig), (config) => HttpRouter.empty.pipe( - HttpRouter.post("/api/rest", RpcHttpRouter.toHttpApp(ShardManagerServiceRpc.router)), + HttpRouter.post("/api/rest", HttpRpcRouter.toHttpApp(ShardManagerServiceRpc.router)), HttpServer.serve(HttpMiddleware.logger), HttpServer.withLogAddress, Layer.provide( @@ -35,13 +35,13 @@ const liveShardingManager = Effect.never.pipe( Layer.provide(StorageFile.storageFile), Layer.provide(PodsHealth.local), Layer.provide(PodsRpc.podsRpc((podAddress) => - HttpResolver.make( + HttpRpcResolver.make( HttpClient.fetchOk.pipe( HttpClient.mapRequest( HttpClientRequest.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`) ) ) - ).pipe(Resolver.toClient) + ).pipe(RpcResolver.toClient) )), Layer.provide(ManagerConfig.fromConfig), Layer.provide(HttpClient.layer) diff --git a/packages/cluster-node/examples/sample-shard.ts b/packages/cluster-node/examples/sample-shard.ts index 0b73fa6d0a..85854704a4 100644 --- a/packages/cluster-node/examples/sample-shard.ts +++ b/packages/cluster-node/examples/sample-shard.ts @@ -9,8 +9,8 @@ import * as Sharding from "@effect/cluster/Sharding" import * as ShardingConfig from "@effect/cluster/ShardingConfig" import { HttpClient, HttpClientRequest, HttpMiddleware, HttpRouter, HttpServer } from "@effect/platform" import { NodeHttpClient, NodeHttpServer, NodeRuntime } from "@effect/platform-node" -import { Resolver } from "@effect/rpc" -import { HttpResolver, HttpRouter as RpcHttpRouter } from "@effect/rpc-http" +import { RpcResolver } from "@effect/rpc" +import { HttpRpcResolver, HttpRpcRouter } from "@effect/rpc-http" import { Context, Effect, Exit, Layer, Logger, LogLevel, Ref } from "effect" import { createServer } from "node:http" import { CounterEntity } from "./sample-common.js" @@ -19,7 +19,7 @@ const HttpLive = Layer.flatMap( Layer.effect(ShardingConfig.ShardingConfig, ShardingConfig.ShardingConfig), (config) => HttpRouter.empty.pipe( - HttpRouter.post("/api/rest", RpcHttpRouter.toHttpApp(ShardingServiceRpc.router)), + HttpRouter.post("/api/rest", HttpRpcRouter.toHttpApp(ShardingServiceRpc.router)), HttpServer.serve(HttpMiddleware.logger), HttpServer.withLogAddress, Layer.provide( @@ -63,23 +63,23 @@ const liveLayer = Sharding.registerEntity( Layer.provideMerge(Sharding.live), Layer.provide(StorageFile.storageFile), Layer.provide(PodsRpc.podsRpc((podAddress) => - HttpResolver.make( + HttpRpcResolver.make( HttpClient.fetchOk.pipe( HttpClient.mapRequest( HttpClientRequest.prependUrl(`http://${podAddress.host}:${podAddress.port}/api/rest`) ) ) - ).pipe(Resolver.toClient) + ).pipe(RpcResolver.toClient) )), Layer.provide(ShardManagerClientRpc.shardManagerClientRpc( (shardManagerUri) => - HttpResolver.make( + HttpRpcResolver.make( HttpClient.fetchOk.pipe( HttpClient.mapRequest( HttpClientRequest.prependUrl(shardManagerUri) ) ) - ).pipe(Resolver.toClient) + ).pipe(RpcResolver.toClient) )), Layer.provide(Serialization.json), Layer.provide(NodeHttpClient.layerUndici), diff --git a/packages/cluster-node/src/PodsRpc.ts b/packages/cluster-node/src/PodsRpc.ts index bc4ee0e350..06fce51aee 100644 --- a/packages/cluster-node/src/PodsRpc.ts +++ b/packages/cluster-node/src/PodsRpc.ts @@ -6,8 +6,8 @@ import * as Pods from "@effect/cluster/Pods" import type * as SerializedEnvelope from "@effect/cluster/SerializedEnvelope" import type * as ShardId from "@effect/cluster/ShardId" import * as ShardingException from "@effect/cluster/ShardingException" -import type * as Resolver from "@effect/rpc/Resolver" import type * as Rpc from "@effect/rpc/Rpc" +import type * as RpcResolver from "@effect/rpc/RpcResolver" import * as Effect from "effect/Effect" import { pipe } from "effect/Function" import type * as HashSet from "effect/HashSet" @@ -25,7 +25,7 @@ import type * as ShardingServiceRpc from "./ShardingServiceRpc.js" export function podsRpc( buildClient: ( podAddress: PodAddress.PodAddress - ) => Resolver.Client< + ) => RpcResolver.Client< RequestResolver.RequestResolver, never> > ): Layer.Layer { diff --git a/packages/cluster-node/src/ShardManagerClientRpc.ts b/packages/cluster-node/src/ShardManagerClientRpc.ts index 2fa2523131..04fa03ae43 100644 --- a/packages/cluster-node/src/ShardManagerClientRpc.ts +++ b/packages/cluster-node/src/ShardManagerClientRpc.ts @@ -4,8 +4,8 @@ import * as Pod from "@effect/cluster/Pod" import * as ShardingConfig from "@effect/cluster/ShardingConfig" import * as ShardManagerClient from "@effect/cluster/ShardManagerClient" -import type * as Resolver from "@effect/rpc/Resolver" import type * as Rpc from "@effect/rpc/Rpc" +import type * as RpcResolver from "@effect/rpc/RpcResolver" import * as Effect from "effect/Effect" import * as Layer from "effect/Layer" import type * as RequestResolver from "effect/RequestResolver" @@ -17,7 +17,7 @@ import type * as ShardManagerServiceRpc from "./ShardManagerServiceRpc.js" * @category layer */ export function shardManagerClientRpc( - makeClient: (shardManagerUri: string) => Resolver.Client< + makeClient: (shardManagerUri: string) => RpcResolver.Client< RequestResolver.RequestResolver, never> > ) { diff --git a/packages/cluster-node/src/ShardManagerServiceRpc.ts b/packages/cluster-node/src/ShardManagerServiceRpc.ts index 68187778f4..d4e9aa9c41 100644 --- a/packages/cluster-node/src/ShardManagerServiceRpc.ts +++ b/packages/cluster-node/src/ShardManagerServiceRpc.ts @@ -2,8 +2,8 @@ * @since 1.0.0 */ import * as ShardManager from "@effect/cluster/ShardManager" -import * as Router from "@effect/rpc/Router" import * as Rpc from "@effect/rpc/Rpc" +import * as RpcRouter from "@effect/rpc/RpcRouter" import * as Effect from "effect/Effect" import { pipe } from "effect/Function" import * as ShardManagerProtocolHttp from "./ShardManagerProtocol.js" @@ -12,7 +12,7 @@ import * as ShardManagerProtocolHttp from "./ShardManagerProtocol.js" * @since 1.0.0 * @category rpc */ -export const router = Router.make( +export const router = RpcRouter.make( Rpc.effect(ShardManagerProtocolHttp.Register, (request) => pipe( ShardManager.ShardManager, @@ -45,4 +45,4 @@ export type ShardManagerServiceRpc = typeof router * @since 1.0.0 * @category models */ -export type ShardManagerServiceRpcRequest = Router.Router.Request +export type ShardManagerServiceRpcRequest = RpcRouter.RpcRouter.Request diff --git a/packages/cluster-node/src/ShardingServiceRpc.ts b/packages/cluster-node/src/ShardingServiceRpc.ts index 66361666e5..bd9d37e657 100644 --- a/packages/cluster-node/src/ShardingServiceRpc.ts +++ b/packages/cluster-node/src/ShardingServiceRpc.ts @@ -2,8 +2,8 @@ * @since 1.0.0 */ import * as Sharding from "@effect/cluster/Sharding" -import * as Router from "@effect/rpc/Router" import * as Rpc from "@effect/rpc/Rpc" +import * as RpcRouter from "@effect/rpc/RpcRouter" import * as Effect from "effect/Effect" import { pipe } from "effect/Function" import * as ShardingProtocol from "./ShardingProtocol.js" @@ -12,7 +12,7 @@ import * as ShardingProtocol from "./ShardingProtocol.js" * @since 1.0.0 * @category rpc */ -export const router = Router.make( +export const router = RpcRouter.make( Rpc.effect(ShardingProtocol.AssignShards, (request) => pipe( Sharding.Tag, @@ -41,4 +41,4 @@ export type ShardingServiceRpc = typeof router * @since 1.0.0 * @category models */ -export type ShardingServiceRpcRequest = Router.Router.Request +export type ShardingServiceRpcRequest = RpcRouter.RpcRouter.Request diff --git a/packages/rpc-http/examples/client.ts b/packages/rpc-http/examples/client.ts index cdd71a363f..f54966b862 100644 --- a/packages/rpc-http/examples/client.ts +++ b/packages/rpc-http/examples/client.ts @@ -1,16 +1,16 @@ import { HttpClient, HttpClientRequest } from "@effect/platform" -import { Resolver } from "@effect/rpc" -import { HttpResolver } from "@effect/rpc-http" +import { RpcResolver } from "@effect/rpc" +import { HttpRpcResolver } from "@effect/rpc-http" import { Console, Effect, Stream } from "effect" import type { UserRouter } from "./router.js" import { GetUser, GetUserIds } from "./schema.js" // Create the client -const client = HttpResolver.make( +const client = HttpRpcResolver.make( HttpClient.fetchOk.pipe( HttpClient.mapRequest(HttpClientRequest.prependUrl("http://localhost:3000/rpc")) ) -).pipe(Resolver.toClient) +).pipe(RpcResolver.toClient) // Use the client client(new GetUserIds()).pipe( diff --git a/packages/rpc-http/examples/router.ts b/packages/rpc-http/examples/router.ts index 87752561d2..625cbc70dd 100644 --- a/packages/rpc-http/examples/router.ts +++ b/packages/rpc-http/examples/router.ts @@ -1,13 +1,13 @@ import { HttpMiddleware, HttpRouter, HttpServer } from "@effect/platform" import { NodeHttpServer, NodeRuntime } from "@effect/platform-node" -import { Router, Rpc } from "@effect/rpc" -import { HttpRouter as RpcHttpRouter } from "@effect/rpc-http" +import { Rpc, RpcRouter } from "@effect/rpc" +import { HttpRpcRouter } from "@effect/rpc-http" import { Array, Effect, Layer, Stream } from "effect" import { createServer } from "http" import { GetUser, GetUserIds, User, UserId } from "./schema.js" // Implement the RPC server router -const router = Router.make( +const router = RpcRouter.make( Rpc.stream(GetUserIds, () => Stream.fromIterable(Array.makeBy(1000, UserId.make))), Rpc.effect(GetUser, ({ id }) => Effect.succeed(new User({ id, name: "John Doe" }))) ) @@ -16,7 +16,7 @@ export type UserRouter = typeof router // Create the http server const HttpLive = HttpRouter.empty.pipe( - HttpRouter.post("/rpc", RpcHttpRouter.toHttpApp(router)), + HttpRouter.post("/rpc", HttpRpcRouter.toHttpApp(router)), HttpServer.serve(HttpMiddleware.logger), HttpServer.withLogAddress, Layer.provide(NodeHttpServer.layer(createServer, { port: 3000 })) diff --git a/packages/rpc-http/src/HttpResolver.ts b/packages/rpc-http/src/HttpRpcResolver.ts similarity index 74% rename from packages/rpc-http/src/HttpResolver.ts rename to packages/rpc-http/src/HttpRpcResolver.ts index 0f66ded9a6..6d00d43b6b 100644 --- a/packages/rpc-http/src/HttpResolver.ts +++ b/packages/rpc-http/src/HttpRpcResolver.ts @@ -4,9 +4,9 @@ import * as Body from "@effect/platform/HttpBody" import * as Client from "@effect/platform/HttpClient" import * as ClientRequest from "@effect/platform/HttpClientRequest" -import * as Resolver from "@effect/rpc/Resolver" -import type * as Router from "@effect/rpc/Router" import type * as Rpc from "@effect/rpc/Rpc" +import * as Resolver from "@effect/rpc/RpcResolver" +import type * as Router from "@effect/rpc/RpcRouter" import type * as Serializable from "@effect/schema/Serializable" import * as Chunk from "effect/Chunk" import * as Effect from "effect/Effect" @@ -18,11 +18,11 @@ import * as Stream from "effect/Stream" * @category constructors * @since 1.0.0 */ -export const make = >( +export const make = >( client: Client.HttpClient.Default ): RequestResolver.RequestResolver< - Rpc.Request>, - Serializable.SerializableWithResult.Context> + Rpc.Request>, + Serializable.SerializableWithResult.Context> > => Resolver.make((requests) => client(ClientRequest.post("", { @@ -44,11 +44,11 @@ export const make = >( * @category constructors * @since 1.0.0 */ -export const makeClient = >( +export const makeClient = >( baseUrl: string -): Serializable.SerializableWithResult.Context> extends never ? Resolver.Client< +): Serializable.SerializableWithResult.Context> extends never ? Resolver.Client< RequestResolver.RequestResolver< - Rpc.Request> + Rpc.Request> > > : "HttpResolver.makeClient: request context is not `never`" => diff --git a/packages/rpc-http/src/HttpResolverNoStream.ts b/packages/rpc-http/src/HttpRpcResolverNoStream.ts similarity index 68% rename from packages/rpc-http/src/HttpResolverNoStream.ts rename to packages/rpc-http/src/HttpRpcResolverNoStream.ts index d835589119..4668adce19 100644 --- a/packages/rpc-http/src/HttpResolverNoStream.ts +++ b/packages/rpc-http/src/HttpRpcResolverNoStream.ts @@ -4,10 +4,10 @@ import * as Body from "@effect/platform/HttpBody" import * as Client from "@effect/platform/HttpClient" import * as ClientRequest from "@effect/platform/HttpClientRequest" -import * as Resolver from "@effect/rpc/Resolver" -import * as ResolverNoStream from "@effect/rpc/ResolverNoStream" -import type * as Router from "@effect/rpc/Router" import type * as Rpc from "@effect/rpc/Rpc" +import * as Resolver from "@effect/rpc/RpcResolver" +import * as ResolverNoStream from "@effect/rpc/RpcResolverNoStream" +import type * as Router from "@effect/rpc/RpcRouter" import type * as Serializable from "@effect/schema/Serializable" import * as Effect from "effect/Effect" import type * as RequestResolver from "effect/RequestResolver" @@ -17,11 +17,11 @@ import * as Schedule from "effect/Schedule" * @category constructors * @since 1.0.0 */ -export const make = >( +export const make = >( client: Client.HttpClient.Default ): RequestResolver.RequestResolver< - Rpc.Request>, - Serializable.SerializableWithResult.Context> + Rpc.Request>, + Serializable.SerializableWithResult.Context> > => ResolverNoStream.make((requests) => client(ClientRequest.post("", { @@ -36,11 +36,11 @@ export const make = >( * @category constructors * @since 1.0.0 */ -export const makeClient = >( +export const makeClient = >( baseUrl: string -): Serializable.SerializableWithResult.Context> extends never ? Resolver.Client< +): Serializable.SerializableWithResult.Context> extends never ? Resolver.Client< RequestResolver.RequestResolver< - Rpc.Request> + Rpc.Request> > > : "HttpResolver.makeClientEffect: request context is not `never`" => diff --git a/packages/rpc-http/src/HttpRouter.ts b/packages/rpc-http/src/HttpRpcRouter.ts similarity index 87% rename from packages/rpc-http/src/HttpRouter.ts rename to packages/rpc-http/src/HttpRpcRouter.ts index 0232813fc1..e1ea1b5dc5 100644 --- a/packages/rpc-http/src/HttpRouter.ts +++ b/packages/rpc-http/src/HttpRpcRouter.ts @@ -5,7 +5,7 @@ import type * as App from "@effect/platform/HttpApp" import type * as ServerError from "@effect/platform/HttpServerError" import * as ServerRequest from "@effect/platform/HttpServerRequest" import * as ServerResponse from "@effect/platform/HttpServerResponse" -import * as Router from "@effect/rpc/Router" +import * as Router from "@effect/rpc/RpcRouter" import * as Chunk from "effect/Chunk" import * as Context from "effect/Context" import * as Effect from "effect/Effect" @@ -16,9 +16,9 @@ import * as Stream from "effect/Stream" * @since 1.0.0 * @category conversions */ -export const toHttpApp = >(self: R): App.Default< +export const toHttpApp = >(self: R): App.Default< ServerError.RequestError, - Router.Router.Context + Router.RpcRouter.Context > => { const handler = Router.toHandler(self) return Effect.withFiberRuntime((fiber) => { diff --git a/packages/rpc-http/src/HttpRouterNoStream.ts b/packages/rpc-http/src/HttpRpcRouterNoStream.ts similarity index 75% rename from packages/rpc-http/src/HttpRouterNoStream.ts rename to packages/rpc-http/src/HttpRpcRouterNoStream.ts index d506b057b7..c9da867ca7 100644 --- a/packages/rpc-http/src/HttpRouterNoStream.ts +++ b/packages/rpc-http/src/HttpRpcRouterNoStream.ts @@ -5,7 +5,7 @@ import type * as App from "@effect/platform/HttpApp" import type * as ServerError from "@effect/platform/HttpServerError" import * as ServerRequest from "@effect/platform/HttpServerRequest" import * as ServerResponse from "@effect/platform/HttpServerResponse" -import * as Router from "@effect/rpc/Router" +import * as Router from "@effect/rpc/RpcRouter" import type { ParseError } from "@effect/schema/ParseResult" import * as Effect from "effect/Effect" @@ -13,11 +13,11 @@ import * as Effect from "effect/Effect" * @since 1.0.0 * @category conversions */ -export const toHttpApp = >(self: R): App.Default< +export const toHttpApp = >(self: R): App.Default< ServerError.RequestError | ParseError, - Router.Router.Context + Router.RpcRouter.Context > => { - const handler = Router.toHandlerEffect(self) + const handler = Router.toHandlerNoStream(self) return ServerRequest.HttpServerRequest.pipe( Effect.flatMap((_) => _.json), Effect.flatMap(handler), diff --git a/packages/rpc-http/src/index.ts b/packages/rpc-http/src/index.ts index 74f8245a74..488ff5fa7f 100644 --- a/packages/rpc-http/src/index.ts +++ b/packages/rpc-http/src/index.ts @@ -1,19 +1,19 @@ /** * @since 1.0.0 */ -export * as HttpResolver from "./HttpResolver.js" +export * as HttpRpcResolver from "./HttpRpcResolver.js" /** * @since 1.0.0 */ -export * as HttpResolverNoStream from "./HttpResolverNoStream.js" +export * as HttpRpcResolverNoStream from "./HttpRpcResolverNoStream.js" /** * @since 1.0.0 */ -export * as HttpRouter from "./HttpRouter.js" +export * as HttpRpcRouter from "./HttpRpcRouter.js" /** * @since 1.0.0 */ -export * as HttpRouterNoStream from "./HttpRouterNoStream.js" +export * as HttpRpcRouterNoStream from "./HttpRpcRouterNoStream.js" diff --git a/packages/rpc/src/Resolver.ts b/packages/rpc/src/RpcResolver.ts similarity index 94% rename from packages/rpc/src/Resolver.ts rename to packages/rpc/src/RpcResolver.ts index 87c70e918c..3c38c8e6a5 100644 --- a/packages/rpc/src/Resolver.ts +++ b/packages/rpc/src/RpcResolver.ts @@ -14,8 +14,8 @@ import * as Request from "effect/Request" import * as RequestResolver from "effect/RequestResolver" import * as Stream from "effect/Stream" import { StreamRequestTypeId, withRequestTag } from "./internal/rpc.js" -import type * as Router from "./Router.js" import * as Rpc from "./Rpc.js" +import type * as Router from "./RpcRouter.js" /** * @since 1.0.0 @@ -24,9 +24,9 @@ import * as Rpc from "./Rpc.js" export const make = ( handler: (u: ReadonlyArray) => Stream.Stream ) => ->(): RequestResolver.RequestResolver< - Rpc.Request>, - Serializable.SerializableWithResult.Context> | HR +>(): RequestResolver.RequestResolver< + Rpc.Request>, + Serializable.SerializableWithResult.Context> | HR > => { const getDecode = withRequestTag((req) => Schema.decodeUnknown(Serializable.exitSchema(req))) const getDecodeChunk = withRequestTag((req) => Schema.decodeUnknown(Schema.Chunk(Serializable.exitSchema(req)))) @@ -48,7 +48,7 @@ export const make = ( Stream.runForEach( Stream.filter( handler(payload), - (_): _ is Router.Router.Response => Arr.isArray(_) && _.length === 2 + (_): _ is Router.RpcRouter.Response => Arr.isArray(_) && _.length === 2 ), ([index, response]): Effect.Effect => { const request = effectRequests[index] @@ -78,7 +78,7 @@ export const make = ( Effect.map((payload) => pipe( handler([payload]), - Stream.mapEffect((_) => Effect.orDie(decode((_ as Router.Router.Response)[1]))), + Stream.mapEffect((_) => Effect.orDie(decode((_ as Router.RpcRouter.Response)[1]))), Stream.flattenChunks, Stream.flatMap(Exit.match({ onFailure: (cause) => Cause.isEmptyType(cause) ? Stream.empty : Stream.failCause(cause), diff --git a/packages/rpc/src/ResolverNoStream.ts b/packages/rpc/src/RpcResolverNoStream.ts similarity index 93% rename from packages/rpc/src/ResolverNoStream.ts rename to packages/rpc/src/RpcResolverNoStream.ts index 21d4d2e760..286d6cf131 100644 --- a/packages/rpc/src/ResolverNoStream.ts +++ b/packages/rpc/src/RpcResolverNoStream.ts @@ -13,8 +13,8 @@ import * as Request from "effect/Request" import * as RequestResolver from "effect/RequestResolver" import * as Stream from "effect/Stream" import { StreamRequestTypeId, withRequestTag } from "./internal/rpc.js" -import type * as Router from "./Router.js" import type * as Rpc from "./Rpc.js" +import type * as Router from "./RpcRouter.js" /** * @since 1.0.0 @@ -23,9 +23,9 @@ import type * as Rpc from "./Rpc.js" export const make = ( handler: (u: ReadonlyArray) => Effect.Effect ) => ->(): RequestResolver.RequestResolver< - Rpc.Request>, - Serializable.SerializableWithResult.Context> | HR +>(): RequestResolver.RequestResolver< + Rpc.Request>, + Serializable.SerializableWithResult.Context> | HR > => { const getDecode = withRequestTag((req) => Schema.decodeUnknown(Serializable.exitSchema(req))) const getDecodeChunk = withRequestTag((req) => Schema.decodeUnknown(Schema.Chunk(Serializable.exitSchema(req)))) diff --git a/packages/rpc/src/Router.ts b/packages/rpc/src/RpcRouter.ts similarity index 83% rename from packages/rpc/src/Router.ts rename to packages/rpc/src/RpcRouter.ts index 691716ba49..c3bf8aae33 100644 --- a/packages/rpc/src/Router.ts +++ b/packages/rpc/src/RpcRouter.ts @@ -22,7 +22,7 @@ import * as Rpc from "./Rpc.js" * @since 1.0.0 * @category type ids */ -export const TypeId: unique symbol = Symbol.for("@effect/rpc/Router") +export const TypeId: unique symbol = Symbol.for("@effect/rpc/RpcRouter") /** * @since 1.0.0 @@ -34,13 +34,13 @@ export type TypeId = typeof TypeId * @since 1.0.0 * @category refinements */ -export const isRouter = (u: unknown): u is Router => Predicate.hasProperty(u, TypeId) +export const isRpcRouter = (u: unknown): u is RpcRouter => Predicate.hasProperty(u, TypeId) /** * @since 1.0.0 * @category models */ -export interface Router extends Pipeable { +export interface RpcRouter extends Pipeable { readonly [TypeId]: TypeId readonly rpcs: ReadonlySet> } @@ -49,12 +49,12 @@ export interface Router extends Pipeab * @since 1.0.0 * @category models */ -export declare namespace Router { +export declare namespace RpcRouter { /** * @since 1.0.0 * @category models */ - export type Context> = A extends Router + export type Context> = A extends RpcRouter ? R | Serializable.SerializableWithResult.Context : never @@ -62,7 +62,7 @@ export declare namespace Router { * @since 1.0.0 * @category models */ - export type ContextRaw> = A extends Router + export type ContextRaw> = A extends RpcRouter ? R | Serializable.Serializable.Context : never @@ -70,7 +70,7 @@ export declare namespace Router { * @since 1.0.0 * @category models */ - export type Request> = A extends Router ? Req + export type Request> = A extends RpcRouter ? Req : never /** @@ -93,7 +93,7 @@ export declare namespace Router { const fromSet = ( rpcs: ReadonlySet> -): Router => ({ +): RpcRouter => ({ [TypeId]: TypeId, rpcs, pipe() { @@ -105,25 +105,25 @@ const fromSet = ( * @since 1.0.0 * @category constructors */ -export const make = | Router>>( +export const make = | RpcRouter>>( ...rpcs: Rpcs -): Router< +): RpcRouter< | Rpc.Rpc.Request< Extract > - | Router.Request< + | RpcRouter.Request< Extract >, | Rpc.Rpc.Context< Extract > - | Router.Context< + | RpcRouter.Context< Extract > > => { const rpcSet = new Set>() rpcs.forEach((rpc) => { - if (isRouter(rpc)) { + if (isRpcRouter(rpc)) { rpc.rpcs.forEach((rpc) => rpcSet.add(rpc)) } else { rpcSet.add(rpc) @@ -140,17 +140,17 @@ export const provideServiceEffect: { ( tag: Context.Tag, effect: Effect.Effect - ): (self: Router) => Router | R2> + ): (self: RpcRouter) => RpcRouter | R2> ( - self: Router, + self: RpcRouter, tag: Context.Tag, effect: Effect.Effect - ): Router | R2> + ): RpcRouter | R2> } = dual(3, ( - self: Router, + self: RpcRouter, tag: Context.Tag, effect: Effect.Effect -): Router | R2> => fromSet(new Set([...self.rpcs].map(Rpc.provideServiceEffect(tag, effect))))) +): RpcRouter | R2> => fromSet(new Set([...self.rpcs].map(Rpc.provideServiceEffect(tag, effect))))) /** * @since 1.0.0 @@ -160,17 +160,17 @@ export const provideService: { ( tag: Context.Tag, service: S - ): (self: Router) => Router> + ): (self: RpcRouter) => RpcRouter> ( - self: Router, + self: RpcRouter, tag: Context.Tag, service: S - ): Router> + ): RpcRouter> } = dual(3, ( - self: Router, + self: RpcRouter, tag: Context.Tag, service: S -): Router> => fromSet(new Set([...self.rpcs].map(Rpc.provideService(tag, service))))) +): RpcRouter> => fromSet(new Set([...self.rpcs].map(Rpc.provideService(tag, service))))) const EOF = Symbol.for("@effect/rpc/Router/EOF") @@ -197,7 +197,7 @@ const emptyExit = Schema.encodeSync(Schema.Exit({ * @since 1.0.0 * @category combinators */ -export const toHandler = >(router: R, options?: { +export const toHandler = >(router: R, options?: { readonly spanPrefix?: string }) => { const spanPrefix = options?.spanPrefix ?? "Rpc.router " @@ -216,10 +216,10 @@ export const toHandler = >(router: R, options?: { const getEncode = withRequestTag((req) => Schema.encode(Serializable.exitSchema(req))) const getEncodeChunk = withRequestTag((req) => Schema.encode(Schema.Chunk(Serializable.exitSchema(req)))) - return (u: unknown): Stream.Stream> => + return (u: unknown): Stream.Stream> => pipe( decode(u), - Effect.zip(Queue.unbounded()), + Effect.zip(Queue.bounded(4)), Effect.tap(([requests, queue]) => pipe( Effect.forEach(requests, (req, index) => { @@ -298,7 +298,7 @@ export const toHandler = >(router: R, options?: { * @since 1.0.0 * @category combinators */ -export const toHandlerEffect = >(router: R, options?: { +export const toHandlerNoStream = >(router: R, options?: { readonly spanPrefix?: string }) => { const spanPrefix = options?.spanPrefix ?? "Rpc.router " @@ -317,10 +317,10 @@ export const toHandlerEffect = >(router: R, options?: const getEncode = withRequestTag((req) => Schema.encode(Serializable.exitSchema(req))) const getEncodeChunk = withRequestTag((req) => Schema.encode(Schema.Chunk(Serializable.exitSchema(req)))) - return (u: unknown): Effect.Effect, ParseError, Router.Context> => + return (u: unknown): Effect.Effect, ParseError, RpcRouter.Context> => Effect.flatMap( decode(u), - Effect.forEach((req): Effect.Effect => { + Effect.forEach((req): Effect.Effect => { const [request, rpc] = req.request if (rpc._tag === "Effect") { const encode = getEncode(request) @@ -370,11 +370,11 @@ export const toHandlerEffect = >(router: R, options?: * @since 1.0.0 * @category combinators */ -export const toHandlerRaw = >(router: R) => { +export const toHandlerRaw = >(router: R) => { const schema: Schema.Schema< readonly [Schema.TaggedRequest.All, Rpc.Rpc], unknown, - Router.ContextRaw + RpcRouter.ContextRaw > = Schema.Union(...[...router.rpcs].map((rpc) => Schema.transform( Schema.typeSchema(rpc.schema), @@ -384,7 +384,7 @@ export const toHandlerRaw = >(router: R) => { )) const parse = Schema.decode(schema) - return >(request: Req): Rpc.Rpc.Result> => { + return >(request: Req): Rpc.Rpc.Result> => { const isStream = StreamRequestTypeId in request const withHandler = parse(request) if (isStream) { @@ -404,11 +404,11 @@ export const toHandlerRaw = >(router: R) => { * @since 1.0.0 * @category combinators */ -export const toHandlerUndecoded = >(router: R) => { +export const toHandlerUndecoded = >(router: R) => { const handler = toHandlerRaw(router) const getEncode = withRequestTag((req) => Schema.encode(Serializable.successSchema(req))) const getEncodeChunk = withRequestTag((req) => Schema.encode(Schema.ChunkFromSelf(Serializable.successSchema(req)))) - return >(request: Req): Rpc.Rpc.ResultUndecoded> => { + return >(request: Req): Rpc.Rpc.ResultUndecoded> => { const result = handler(request) if (Effect.isEffect(result)) { const encode = getEncode(request) diff --git a/packages/rpc/src/index.ts b/packages/rpc/src/index.ts index 6ddb6e579d..56a86163a9 100644 --- a/packages/rpc/src/index.ts +++ b/packages/rpc/src/index.ts @@ -1,19 +1,19 @@ /** * @since 1.0.0 */ -export * as Resolver from "./Resolver.js" +export * as Rpc from "./Rpc.js" /** * @since 1.0.0 */ -export * as ResolverNoStream from "./ResolverNoStream.js" +export * as RpcResolver from "./RpcResolver.js" /** * @since 1.0.0 */ -export * as Router from "./Router.js" +export * as RpcResolverNoStream from "./RpcResolverNoStream.js" /** * @since 1.0.0 */ -export * as Rpc from "./Rpc.js" +export * as RpcRouter from "./RpcRouter.js" diff --git a/packages/rpc/test/Router.test.ts b/packages/rpc/test/Router.test.ts index 6c479a8389..cb75b0c956 100644 --- a/packages/rpc/test/Router.test.ts +++ b/packages/rpc/test/Router.test.ts @@ -1,6 +1,4 @@ -import * as Resolver from "@effect/rpc/Resolver" -import * as ResolverNoStream from "@effect/rpc/ResolverNoStream" -import * as Router from "@effect/rpc/Router" +import { RpcResolver, RpcResolverNoStream, RpcRouter } from "@effect/rpc" import * as Rpc from "@effect/rpc/Rpc" import { Schema } from "@effect/schema" import * as S from "@effect/schema/Schema" @@ -34,7 +32,7 @@ class CreatePost extends S.TaggedRequest()("CreatePost", { } }) {} -const posts = Router.make( +const posts = RpcRouter.make( Rpc.effect(CreatePost, ({ body }) => Effect.succeed(new Post({ id: 1, body }))) ) @@ -102,7 +100,7 @@ class FailStream extends Rpc.StreamRequest()( { failure: SomeError, success: S.Number, payload: {} } ) {} -const router = Router.make( +const router = RpcRouter.make( posts, Rpc.effect(Greet, ({ name }) => Effect.succeed(`Hello, ${name}!`)), Rpc.effect(Fail, () => @@ -137,12 +135,12 @@ const router = Router.make( Stream.mapEffect((i) => i === 3 ? Effect.fail(new SomeError({ message: "fail" })) : Effect.succeed(i)) )) ).pipe( - Router.provideService(Name, "John") + RpcRouter.provideService(Name, "John") ) -const handler = Router.toHandler(router) -const handlerEffect = Router.toHandlerEffect(router) -const handlerUndecoded = Router.toHandlerUndecoded(router) +const handler = RpcRouter.toHandler(router) +const handlerEffect = RpcRouter.toHandlerNoStream(router) +const handlerUndecoded = RpcRouter.toHandlerUndecoded(router) const handlerArray = (u: ReadonlyArray) => handler(u.map((request, i) => ({ request, @@ -168,15 +166,15 @@ const handlerEffectArray = (u: ReadonlyArray) => }))).pipe( Effect.map(Array.filter((_): _ is S.ExitEncoded => Array.isArray(_) === false)) ) -const resolver = Resolver.make(handler)() -const resolverEffect = ResolverNoStream.make(handlerEffect)() -const resolverWithHeaders = Resolver.annotateHeadersEffect( +const resolver = RpcResolver.make(handler)() +const resolverEffect = RpcResolverNoStream.make(handlerEffect)() +const resolverWithHeaders = RpcResolver.annotateHeadersEffect( resolver, Effect.succeed({ BAZ: "qux" }) ) -const client = Resolver.toClient(resolver) +const client = RpcResolver.toClient(resolver) describe("Router", () => { it("handler/", async () => {