From b11823c74c3c1cea411fa3c58ffcb47862608631 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20M=C3=A9lois?= Date: Tue, 21 Feb 2023 10:21:48 +0100 Subject: [PATCH 1/6] Wrote server/client interpreters --- build.sc | 61 ++++++++++++- .../META-INF/smithy/jsonrpclib.smithy | 23 +++++ smithy/resources/META-INF/smithy/manifest | 1 + .../smithy4sinterop/ClientStub.scala | 86 +++++++++++++++++++ .../smithy4sinterop/EndpointSpec.scala | 15 ++++ .../smithy4sinterop/ServerEndpoints.scala | 50 +++++++++++ 6 files changed, 235 insertions(+), 1 deletion(-) create mode 100644 smithy/resources/META-INF/smithy/jsonrpclib.smithy create mode 100644 smithy/resources/META-INF/smithy/manifest create mode 100644 smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala create mode 100644 smithy4s/src/jsonrpclib/smithy4sinterop/EndpointSpec.scala create mode 100644 smithy4s/src/jsonrpclib/smithy4sinterop/ServerEndpoints.scala diff --git a/build.sc b/build.sc index 385e4d9..4fb977b 100644 --- a/build.sc +++ b/build.sc @@ -1,8 +1,10 @@ +import mill.define.Sources import mill.define.Target import mill.util.Jvm import $ivy.`com.lihaoyi::mill-contrib-bloop:$MILL_VERSION` import $ivy.`io.github.davidgregory084::mill-tpolecat::0.3.1` import $ivy.`io.chris-kipp::mill-ci-release::0.1.1` +import $ivy.`com.disneystreaming.smithy4s::smithy4s-mill-codegen-plugin::0.17.4` import os.Path import mill._ @@ -13,11 +15,12 @@ import scalanativelib._ import mill.scalajslib.api._ import io.github.davidgregory084._ import io.kipp.mill.ci.release.CiReleaseModule +import _root_.smithy4s.codegen.mill._ object versions { val scala212Version = "2.12.16" val scala213Version = "2.13.8" - val scala3Version = "3.1.2" + val scala3Version = "3.2.1" val scalaJSVersion = "1.10.1" val scalaNativeVersion = "0.4.8" val munitVersion = "0.7.29" @@ -83,6 +86,46 @@ object fs2 extends RPCCrossPlatformModule { cross => } +object smithy extends JavaModule {} + +object smithy4s extends RPCCrossPlatformModule { cross => + + override def crossPlatformModuleDeps = Seq(fs2) + def crossPlatformIvyDeps: T[Agg[Dep]] = Agg( + ivy"com.disneystreaming.smithy4s::smithy4s-json::${_root_.smithy4s.codegen.BuildInfo.version}" + ) + + // A module holding the code-generation logic to help cache that task + object gen extends Smithy4sModule { + def scalaVersion = "2.13.10" + def smithy4sInternalDependenciesAsJars = T { + smithy.jar() +: super.smithy4sInternalDependenciesAsJars() + } + } + + object jvm extends mill.Cross[JvmModule](scala213, scala3) + class JvmModule(cv: String) extends cross.JVM(cv) { + override def sources: Sources = T.sources { + super.sources() ++ gen.generatedSources() + } + } + + object js extends mill.Cross[JsModule](scala213, scala3) + class JsModule(cv: String) extends cross.JS(cv) { + override def sources: Sources = T.sources { + super.sources() ++ gen.generatedSources() + } + } + + object native extends mill.Cross[NativeModule](scala3) + class NativeModule(cv: String) extends cross.Native(cv) { + override def sources: Sources = T.sources { + super.sources() ++ gen.generatedSources() + } + } + +} + object examples extends mill.define.Module { object server extends ScalaModule { @@ -101,6 +144,22 @@ object examples extends mill.define.Module { } } + // object smithyServer extends ScalaModule { + // def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}") + // def moduleDeps = Seq(fs2.jvm(versions.scala213), smithy4s.jvm(versions.scala213)) + // def scalaVersion = versions.scala213Version + // } + + // object smithyClient extends ScalaModule { + // def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}") + // def moduleDeps = Seq(fs2.jvm(versions.scala213), smithy4s.jvm(versions.scala213)) + // def scalaVersion = versions.scala213Version + // def forkEnv: Target[Map[String, String]] = T { + // val assembledServer = smithyServer.assembly() + // super.forkEnv() ++ Map("SERVER_JAR" -> assembledServer.path.toString()) + // } + // } + } // ############################################################################# diff --git a/smithy/resources/META-INF/smithy/jsonrpclib.smithy b/smithy/resources/META-INF/smithy/jsonrpclib.smithy new file mode 100644 index 0000000..5d4e6f3 --- /dev/null +++ b/smithy/resources/META-INF/smithy/jsonrpclib.smithy @@ -0,0 +1,23 @@ +$version: "2.0" + +namespace jsonrpclib + +/// the JSON-RPC protocol, +/// see https://www.jsonrpc.org/specification +@protocolDefinition(traits: [ + jsonRequest + jsonNotification +]) +@trait(selector: "service") +structure jsonRPC { +} + +/// Identifies an operation that abides by request/response semantics +/// https://www.jsonrpc.org/specification#request_object +@trait(selector: "operation") +string jsonRequest + +/// Identifies an operation that abides by fire-and-forget semantics +/// see https://www.jsonrpc.org/specification#notification +@trait(selector: "operation") +string jsonNotification diff --git a/smithy/resources/META-INF/smithy/manifest b/smithy/resources/META-INF/smithy/manifest new file mode 100644 index 0000000..94839e2 --- /dev/null +++ b/smithy/resources/META-INF/smithy/manifest @@ -0,0 +1 @@ +jsonrpclib.smithy diff --git a/smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala b/smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala new file mode 100644 index 0000000..a800b3d --- /dev/null +++ b/smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala @@ -0,0 +1,86 @@ +package jsonrpclib.smithy4sinterop + +import cats.MonadThrow +import jsonrpclib.fs2._ +import smithy4s.Service +import smithy4s.http.json.JCodec +import smithy4s.schema._ +import cats.effect.kernel.Async +import smithy4s.kinds.PolyFunction5 +import smithy4s.ShapeId +import cats.syntax.all._ + +object ClientStub { + + def apply[Alg[_[_, _, _, _, _]], F[_]](service: Service[Alg], channel: FS2Channel[F])(implicit + F: Async[F] + ): F[service.Impl[F]] = new ClientStub(service, channel).compile + + def stream[Alg[_[_, _, _, _, _]], F[_]](service: Service[Alg], channel: FS2Channel[F])(implicit + F: Async[F] + ): fs2.Stream[F, service.Impl[F]] = fs2.Stream.eval(new ClientStub(service, channel).compile) +} + +private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg], channel: FS2Channel[F])(implicit + F: Async[F] +) { + + def compile: F[service.Impl[F]] = precompileAll.map { stubCache => + val interpreter = new service.FunctorInterpreter[F] { + def apply[I, E, O, SI, SO](op: service.Operation[I, E, O, SI, SO]): F[O] = { + val (input, smithy4sEndpoint) = service.endpoint(op) + (stubCache(smithy4sEndpoint): F[I => F[O]]).flatMap { stub => + stub(input) + } + } + } + service.fromPolyFunction(interpreter) + } + + private type Stub[I, E, O, SI, SO] = F[I => F[O]] + private val precompileAll: F[PolyFunction5[service.Endpoint, Stub]] = { + F.ref(Map.empty[ShapeId, Any]).flatMap { cache => + service.endpoints + .traverse_ { ep => + val shapeId = ep.id + EndpointSpec.fromHints(ep.hints).liftTo[F](NotJsonRPCEndpoint(shapeId)).map { epSpec => + val stub = jsonRPCStub(ep, epSpec) + cache.update(_ + (shapeId -> stub)) + } + } + .as { + new PolyFunction5[service.Endpoint, Stub] { + def apply[I, E, O, SI, SO](ep: service.Endpoint[I, E, O, SI, SO]): Stub[I, E, O, SI, SO] = { + cache.get.map { _(ep.id).asInstanceOf[I => F[O]] } + } + } + } + } + } + + def jsonRPCStub[I, E, O, SI, SO]( + smithy4sEndpoint: service.Endpoint[I, E, O, SI, SO], + endpointSpec: EndpointSpec + ): I => F[O] = { + + implicit val inputCodec: JCodec[I] = JCodec.fromSchema(smithy4sEndpoint.input) + implicit val outputCodec: JCodec[O] = JCodec.fromSchema(smithy4sEndpoint.output) + + endpointSpec match { + case EndpointSpec.Notification(methodName) => + val coerce = coerceUnit[O](smithy4sEndpoint.output) + channel.notificationStub[I](methodName).andThen(_ *> coerce) + case EndpointSpec.Request(methodName) => + channel.simpleStub[I, O](methodName) + } + } + + case class NotJsonRPCEndpoint(shapeId: ShapeId) extends Throwable + case object NotUnitReturnType extends Throwable + private def coerceUnit[A](schema: Schema[A]): F[A] = + schema match { + case Schema.PrimitiveSchema(_, _, Primitive.PUnit) => MonadThrow[F].unit + case _ => MonadThrow[F].raiseError[A](NotUnitReturnType) + } + +} diff --git a/smithy4s/src/jsonrpclib/smithy4sinterop/EndpointSpec.scala b/smithy4s/src/jsonrpclib/smithy4sinterop/EndpointSpec.scala new file mode 100644 index 0000000..2e29930 --- /dev/null +++ b/smithy4s/src/jsonrpclib/smithy4sinterop/EndpointSpec.scala @@ -0,0 +1,15 @@ +package jsonrpclib.smithy4sinterop + +import smithy4s.Hints + +sealed trait EndpointSpec +object EndpointSpec { + case class Notification(methodName: String) extends EndpointSpec + case class Request(methodName: String) extends EndpointSpec + + def fromHints(hints: Hints): Option[EndpointSpec] = hints match { + case jsonrpclib.JsonRequest.hint(r) => Some(Request(r.value)) + case jsonrpclib.JsonNotification.hint(r) => Some(Notification(r.value)) + case _ => None + } +} diff --git a/smithy4s/src/jsonrpclib/smithy4sinterop/ServerEndpoints.scala b/smithy4s/src/jsonrpclib/smithy4sinterop/ServerEndpoints.scala new file mode 100644 index 0000000..3aa3c57 --- /dev/null +++ b/smithy4s/src/jsonrpclib/smithy4sinterop/ServerEndpoints.scala @@ -0,0 +1,50 @@ +package jsonrpclib.smithy4sinterop + +import _root_.smithy4s.{Endpoint => Smithy4sEndpoint} +import cats.MonadThrow +import cats.syntax.all._ +import jsonrpclib.Endpoint +import jsonrpclib.fs2._ +import smithy4s.Service +import smithy4s.http.json.JCodec +import smithy4s.kinds.FunctorAlgebra +import smithy4s.kinds.FunctorInterpreter + +object ServerEndpoints { + + def apply[Alg[_[_, _, _, _, _]], F[_]]( + impl: FunctorAlgebra[Alg, F] + )(implicit service: Service[Alg], F: MonadThrow[F]): List[Endpoint[F]] = { + val interpreter: service.FunctorInterpreter[F] = service.toPolyFunction(impl) + service.endpoints.flatMap { smithy4sEndpoint => + EndpointSpec.fromHints(smithy4sEndpoint.hints).map { endpointSpec => + jsonRPCEndpoint(smithy4sEndpoint, endpointSpec, interpreter) + } + } + + } + + // TODO : codify errors at smithy level and handle them. + def jsonRPCEndpoint[F[_]: MonadThrow, Op[_, _, _, _, _], I, E, O, SI, SO]( + smithy4sEndpoint: Smithy4sEndpoint[Op, I, E, O, SI, SO], + endpointSpec: EndpointSpec, + impl: FunctorInterpreter[Op, F] + ): Endpoint[F] = { + + implicit val inputCodec: JCodec[I] = JCodec.fromSchema(smithy4sEndpoint.input) + implicit val outputCodec: JCodec[O] = JCodec.fromSchema(smithy4sEndpoint.output) + + endpointSpec match { + case EndpointSpec.Notification(methodName) => + Endpoint[F](methodName).notification { (input: I) => + val op = smithy4sEndpoint.wrap(input) + impl(op).void + } + case EndpointSpec.Request(methodName) => + Endpoint[F](methodName).simple { (input: I) => + val op = smithy4sEndpoint.wrap(input) + impl(op) + } + } + } +} From 8b5795fe6e2ef7ea9dfcf1626de6d0efcf1a3f3e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20M=C3=A9lois?= Date: Tue, 21 Feb 2023 11:13:48 +0100 Subject: [PATCH 2/6] Example compiles --- build.sc | 56 +++++++-------- .../examples/smithy/client/ChildProcess.scala | 68 +++++++++++++++++++ .../examples/smithy/client/ClientMain.scala | 61 +++++++++++++++++ .../examples/smithy/server/ServerMain.scala | 41 +++++++++++ examples/smithyShared/smithy/spec.smithy | 45 ++++++++++++ 5 files changed, 243 insertions(+), 28 deletions(-) create mode 100644 examples/smithyClient/src/examples/smithy/client/ChildProcess.scala create mode 100644 examples/smithyClient/src/examples/smithy/client/ClientMain.scala create mode 100644 examples/smithyServer/src/examples/smithy/server/ServerMain.scala create mode 100644 examples/smithyShared/smithy/spec.smithy diff --git a/build.sc b/build.sc index 4fb977b..00daa76 100644 --- a/build.sc +++ b/build.sc @@ -99,29 +99,24 @@ object smithy4s extends RPCCrossPlatformModule { cross => object gen extends Smithy4sModule { def scalaVersion = "2.13.10" def smithy4sInternalDependenciesAsJars = T { - smithy.jar() +: super.smithy4sInternalDependenciesAsJars() + List(smithy.jar()) } } object jvm extends mill.Cross[JvmModule](scala213, scala3) - class JvmModule(cv: String) extends cross.JVM(cv) { - override def sources: Sources = T.sources { - super.sources() ++ gen.generatedSources() - } + def sharedSmithy = T.sources(T.workspace / "smithy" / "resources" / "META-INF" / "smithy") + class JvmModule(cv: String) extends cross.JVM(cv) with Smithy4sModule { + def smithy4sInputDirs = sharedSmithy } object js extends mill.Cross[JsModule](scala213, scala3) - class JsModule(cv: String) extends cross.JS(cv) { - override def sources: Sources = T.sources { - super.sources() ++ gen.generatedSources() - } + class JsModule(cv: String) extends cross.JS(cv) with Smithy4sModule { + def smithy4sInputDirs = sharedSmithy } object native extends mill.Cross[NativeModule](scala3) - class NativeModule(cv: String) extends cross.Native(cv) { - override def sources: Sources = T.sources { - super.sources() ++ gen.generatedSources() - } + class NativeModule(cv: String) extends cross.Native(cv) with Smithy4sModule { + def smithy4sInputDirs = sharedSmithy } } @@ -144,21 +139,26 @@ object examples extends mill.define.Module { } } - // object smithyServer extends ScalaModule { - // def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}") - // def moduleDeps = Seq(fs2.jvm(versions.scala213), smithy4s.jvm(versions.scala213)) - // def scalaVersion = versions.scala213Version - // } - - // object smithyClient extends ScalaModule { - // def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}") - // def moduleDeps = Seq(fs2.jvm(versions.scala213), smithy4s.jvm(versions.scala213)) - // def scalaVersion = versions.scala213Version - // def forkEnv: Target[Map[String, String]] = T { - // val assembledServer = smithyServer.assembly() - // super.forkEnv() ++ Map("SERVER_JAR" -> assembledServer.path.toString()) - // } - // } + object smithyShared extends Smithy4sModule { + def moduleDeps = Seq(smithy4s.jvm(versions.scala213)) + def scalaVersion = versions.scala213Version + } + + object smithyServer extends ScalaModule { + def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}") + def moduleDeps = Seq(fs2.jvm(versions.scala213), smithyShared) + def scalaVersion = versions.scala213Version + } + + object smithyClient extends ScalaModule { + def ivyDeps = Agg(ivy"co.fs2::fs2-io:${versions.fs2}") + def moduleDeps = Seq(fs2.jvm(versions.scala213), smithyShared) + def scalaVersion = versions.scala213Version + def forkEnv: Target[Map[String, String]] = T { + val assembledServer = smithyServer.assembly() + super.forkEnv() ++ Map("SERVER_JAR" -> assembledServer.path.toString()) + } + } } diff --git a/examples/smithyClient/src/examples/smithy/client/ChildProcess.scala b/examples/smithyClient/src/examples/smithy/client/ChildProcess.scala new file mode 100644 index 0000000..4056634 --- /dev/null +++ b/examples/smithyClient/src/examples/smithy/client/ChildProcess.scala @@ -0,0 +1,68 @@ +package examples.smithy.client + +import fs2.Stream +import cats.effect._ +import cats.syntax.all._ +import scala.jdk.CollectionConverters._ +import java.io.OutputStream + +trait ChildProcess[F[_]] { + def stdin: fs2.Pipe[F, Byte, Unit] + def stdout: Stream[F, Byte] + def stderr: Stream[F, Byte] +} + +object ChildProcess { + + def spawn[F[_]: Async](command: String*): Stream[F, ChildProcess[F]] = + Stream.bracket(start[F](command))(_._2).map(_._1) + + val readBufferSize = 512 + private def start[F[_]: Async](command: Seq[String]) = Async[F].interruptible { + val p = + new java.lang.ProcessBuilder(command.asJava) + .start() // .directory(new java.io.File(wd)).start() + val done = Async[F].fromCompletableFuture(Sync[F].delay(p.onExit())) + + val terminate: F[Unit] = Sync[F].interruptible(p.destroy()) + + import cats._ + val onGlobal = new (F ~> F) { + def apply[A](fa: F[A]): F[A] = Async[F].evalOn(fa, scala.concurrent.ExecutionContext.global) + } + + val cp = new ChildProcess[F] { + def stdin: fs2.Pipe[F, Byte, Unit] = + writeOutputStreamFlushingChunks[F](Sync[F].interruptible(p.getOutputStream())) + + def stdout: fs2.Stream[F, Byte] = fs2.io + .readInputStream[F](Sync[F].interruptible(p.getInputStream()), chunkSize = readBufferSize) + .translate(onGlobal) + + def stderr: fs2.Stream[F, Byte] = fs2.io + .readInputStream[F](Sync[F].blocking(p.getErrorStream()), chunkSize = readBufferSize) + .translate(onGlobal) + // Avoids broken pipe - we cut off when the program ends. + // Users can decide what to do with the error logs using the exitCode value + .interruptWhen(done.void.attempt) + } + (cp, terminate) + } + + /** Adds a flush after each chunk + */ + def writeOutputStreamFlushingChunks[F[_]]( + fos: F[OutputStream], + closeAfterUse: Boolean = true + )(implicit F: Sync[F]): fs2.Pipe[F, Byte, Nothing] = + s => { + def useOs(os: OutputStream): Stream[F, Nothing] = + s.chunks.foreach(c => F.interruptible(os.write(c.toArray)) >> F.blocking(os.flush())) + + val os = + if (closeAfterUse) Stream.bracket(fos)(os => F.blocking(os.close())) + else Stream.eval(fos) + os.flatMap(os => useOs(os) ++ Stream.exec(F.blocking(os.flush()))) + } + +} diff --git a/examples/smithyClient/src/examples/smithy/client/ClientMain.scala b/examples/smithyClient/src/examples/smithy/client/ClientMain.scala new file mode 100644 index 0000000..e74ff63 --- /dev/null +++ b/examples/smithyClient/src/examples/smithy/client/ClientMain.scala @@ -0,0 +1,61 @@ +package examples.smithy.client + +import cats.effect._ +import cats.syntax.all._ +import fs2.Stream +import fs2.io._ +import jsonrpclib.CallId +import jsonrpclib.fs2._ +import jsonrpclib.smithy4sinterop.ClientStub +import jsonrpclib.smithy4sinterop.ServerEndpoints +import test._ + +import java.io.InputStream +import java.io.OutputStream + +object SmithyClientMain extends IOApp.Simple { + + // Reserving a method for cancelation. + val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity) + + type IOStream[A] = fs2.Stream[IO, A] + def log(str: String): IOStream[Unit] = Stream.eval(IO.consoleForIO.errorln(str)) + + // Implementing the generated interface + object Client extends TestClient[IO] { + def greet(name: String): IO[GreetOutput] = IO.pure(GreetOutput(s"Client says: hello $name !")) + def pong(pong: String): IO[Unit] = IO.consoleForIO.errorln(s"Client received pong: $pong") + } + + def run: IO[Unit] = { + import scala.concurrent.duration._ + val run = for { + //////////////////////////////////////////////////////// + /////// BOOTSTRAPPING + //////////////////////////////////////////////////////// + _ <- log("Starting client") + serverJar <- sys.env.get("SERVER_JAR").liftTo[IOStream](new Exception("SERVER_JAR env var does not exist")) + // Starting the server + rp <- ChildProcess.spawn[IO]("java", "-jar", serverJar) + // Creating a channel that will be used to communicate to the server + fs2Channel <- FS2Channel[IO](cancelTemplate = cancelEndpoint.some) + // Mounting our implementation of the generated interface onto the channel + _ <- fs2Channel.withEndpointsStream(ServerEndpoints(Client)) + // Creating stubs to talk to the remote server + server: TestServer[IO] <- ClientStub.stream(test.TestServer, fs2Channel) + _ <- Stream(()) + .concurrently(fs2Channel.output.through(lsp.encodePayloads).through(rp.stdin)) + .concurrently(rp.stdout.through(lsp.decodePayloads).through(fs2Channel.input)) + .concurrently(rp.stderr.through(fs2.io.stderr[IO])) + + //////////////////////////////////////////////////////// + /////// INTERACTION + //////////////////////////////////////////////////////// + result1 <- Stream.eval(server.greet("Client")) + _ <- log(s"Client received $result1") + _ <- Stream.eval(server.ping("Ping")) + } yield () + run.compile.drain + } + +} diff --git a/examples/smithyServer/src/examples/smithy/server/ServerMain.scala b/examples/smithyServer/src/examples/smithy/server/ServerMain.scala new file mode 100644 index 0000000..890a575 --- /dev/null +++ b/examples/smithyServer/src/examples/smithy/server/ServerMain.scala @@ -0,0 +1,41 @@ +package examples.smithy.server + +import jsonrpclib.CallId +import jsonrpclib.fs2._ +import cats.effect._ +import fs2.io._ +import jsonrpclib.Endpoint +import cats.syntax.all._ +import test._ // smithy4s-generated package +import jsonrpclib.smithy4sinterop.ClientStub +import jsonrpclib.smithy4sinterop.ServerEndpoints + +object ServerMain extends IOApp.Simple { + + // Reserving a method for cancelation. + val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity) + + // Implementing an incrementation endpoint + class ServerImpl(client: TestClient[IO]) extends TestServer[IO] { + def greet(name: String): IO[GreetOutput] = IO.pure(GreetOutput(s"Server says: hello $name !")) + + def ping(ping: String): IO[Unit] = client.pong(s"Returned to sender: $ping") + } + + def run: IO[Unit] = { + val run = for { + channel <- FS2Channel[IO](cancelTemplate = Some(cancelEndpoint)) + testClient <- ClientStub.stream(TestClient, channel) + _ <- channel.withEndpointsStream(ServerEndpoints(new ServerImpl(testClient))) + _ <- fs2.Stream + .eval(IO.never) // running the server forever + .concurrently(stdin[IO](512).through(lsp.decodePayloads).through(channel.input)) + .concurrently(channel.output.through(lsp.encodePayloads).through(stdout[IO])) + } yield {} + + // Using errorln as stdout is used by the RPC channel + IO.consoleForIO.errorln("Starting server") >> run.compile.drain + .guarantee(IO.consoleForIO.errorln("Terminating server")) + } + +} diff --git a/examples/smithyShared/smithy/spec.smithy b/examples/smithyShared/smithy/spec.smithy new file mode 100644 index 0000000..f832f90 --- /dev/null +++ b/examples/smithyShared/smithy/spec.smithy @@ -0,0 +1,45 @@ +$version: "2.0" + +namespace test + +use jsonrpclib#jsonRequest +use jsonrpclib#jsonRPC +use jsonrpclib#jsonNotification + +@jsonRPC +service TestServer { + operations: [Greet, Ping] +} + +@jsonRPC +service TestClient { + operations: [Greet, Pong] +} + +@jsonRequest("greet") +operation Greet { + input := { + @required + name: String + } + output := { + @required + message: String + } +} + +@jsonNotification("ping") +operation Ping { + input := { + @required + ping: String + } +} + +@jsonNotification("pong") +operation Pong { + input := { + @required + pong: String + } +} From c18a8141b0120fb72156640c57dc17423696540e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20M=C3=A9lois?= Date: Tue, 21 Feb 2023 12:20:38 +0100 Subject: [PATCH 3/6] Fixed un-bound flatMap, tried a few other things --- build.sc | 2 +- .../examples/smithy/client/ChildProcess.scala | 49 ++++++++----------- .../examples/smithy/client/ClientMain.scala | 3 +- .../examples/smithy/server/ServerMain.scala | 5 +- examples/smithyShared/smithy/spec.smithy | 2 +- fs2/src/jsonrpclib/fs2/FS2Channel.scala | 9 ++-- .../smithy4sinterop/ClientStub.scala | 6 ++- 7 files changed, 36 insertions(+), 40 deletions(-) diff --git a/build.sc b/build.sc index 00daa76..e224253 100644 --- a/build.sc +++ b/build.sc @@ -25,7 +25,7 @@ object versions { val scalaNativeVersion = "0.4.8" val munitVersion = "0.7.29" val munitNativeVersion = "1.0.0-M7" - val fs2 = "3.3.0" + val fs2 = "3.6.1" val weaver = "0.8.0" val scala213 = "2.13" diff --git a/examples/smithyClient/src/examples/smithy/client/ChildProcess.scala b/examples/smithyClient/src/examples/smithy/client/ChildProcess.scala index 4056634..154e66d 100644 --- a/examples/smithyClient/src/examples/smithy/client/ChildProcess.scala +++ b/examples/smithyClient/src/examples/smithy/client/ChildProcess.scala @@ -15,39 +15,32 @@ trait ChildProcess[F[_]] { object ChildProcess { def spawn[F[_]: Async](command: String*): Stream[F, ChildProcess[F]] = - Stream.bracket(start[F](command))(_._2).map(_._1) + Stream.resource(startRes(command)) val readBufferSize = 512 - private def start[F[_]: Async](command: Seq[String]) = Async[F].interruptible { - val p = - new java.lang.ProcessBuilder(command.asJava) - .start() // .directory(new java.io.File(wd)).start() - val done = Async[F].fromCompletableFuture(Sync[F].delay(p.onExit())) - val terminate: F[Unit] = Sync[F].interruptible(p.destroy()) - - import cats._ - val onGlobal = new (F ~> F) { - def apply[A](fa: F[A]): F[A] = Async[F].evalOn(fa, scala.concurrent.ExecutionContext.global) + private def startRes[F[_]: Async](command: Seq[String]) = Resource + .make { + Async[F].interruptible(new java.lang.ProcessBuilder(command.asJava).start()) + } { p => + Sync[F].interruptible(p.destroy()) } - - val cp = new ChildProcess[F] { - def stdin: fs2.Pipe[F, Byte, Unit] = - writeOutputStreamFlushingChunks[F](Sync[F].interruptible(p.getOutputStream())) - - def stdout: fs2.Stream[F, Byte] = fs2.io - .readInputStream[F](Sync[F].interruptible(p.getInputStream()), chunkSize = readBufferSize) - .translate(onGlobal) - - def stderr: fs2.Stream[F, Byte] = fs2.io - .readInputStream[F](Sync[F].blocking(p.getErrorStream()), chunkSize = readBufferSize) - .translate(onGlobal) - // Avoids broken pipe - we cut off when the program ends. - // Users can decide what to do with the error logs using the exitCode value - .interruptWhen(done.void.attempt) + .map { p => + val done = Async[F].fromCompletableFuture(Sync[F].delay(p.onExit())) + new ChildProcess[F] { + def stdin: fs2.Pipe[F, Byte, Unit] = + writeOutputStreamFlushingChunks[F](Sync[F].interruptible(p.getOutputStream())) + + def stdout: fs2.Stream[F, Byte] = fs2.io + .readInputStream[F](Sync[F].interruptible(p.getInputStream()), chunkSize = readBufferSize) + + def stderr: fs2.Stream[F, Byte] = fs2.io + .readInputStream[F](Sync[F].blocking(p.getErrorStream()), chunkSize = readBufferSize) + // Avoids broken pipe - we cut off when the program ends. + // Users can decide what to do with the error logs using the exitCode value + .interruptWhen(done.void.attempt) + } } - (cp, terminate) - } /** Adds a flush after each chunk */ diff --git a/examples/smithyClient/src/examples/smithy/client/ClientMain.scala b/examples/smithyClient/src/examples/smithy/client/ClientMain.scala index e74ff63..66a3c1f 100644 --- a/examples/smithyClient/src/examples/smithy/client/ClientMain.scala +++ b/examples/smithyClient/src/examples/smithy/client/ClientMain.scala @@ -23,7 +23,6 @@ object SmithyClientMain extends IOApp.Simple { // Implementing the generated interface object Client extends TestClient[IO] { - def greet(name: String): IO[GreetOutput] = IO.pure(GreetOutput(s"Client says: hello $name !")) def pong(pong: String): IO[Unit] = IO.consoleForIO.errorln(s"Client received pong: $pong") } @@ -55,7 +54,7 @@ object SmithyClientMain extends IOApp.Simple { _ <- log(s"Client received $result1") _ <- Stream.eval(server.ping("Ping")) } yield () - run.compile.drain + run.compile.drain.guarantee(IO.consoleForIO.errorln("Terminating client")) } } diff --git a/examples/smithyServer/src/examples/smithy/server/ServerMain.scala b/examples/smithyServer/src/examples/smithy/server/ServerMain.scala index 890a575..266c1e1 100644 --- a/examples/smithyServer/src/examples/smithy/server/ServerMain.scala +++ b/examples/smithyServer/src/examples/smithy/server/ServerMain.scala @@ -22,6 +22,8 @@ object ServerMain extends IOApp.Simple { def ping(ping: String): IO[Unit] = client.pong(s"Returned to sender: $ping") } + def printErr(s: String): IO[Unit] = IO.consoleForIO.errorln(s) + def run: IO[Unit] = { val run = for { channel <- FS2Channel[IO](cancelTemplate = Some(cancelEndpoint)) @@ -34,8 +36,7 @@ object ServerMain extends IOApp.Simple { } yield {} // Using errorln as stdout is used by the RPC channel - IO.consoleForIO.errorln("Starting server") >> run.compile.drain - .guarantee(IO.consoleForIO.errorln("Terminating server")) + printErr("Starting server") >> run.compile.drain.guarantee(printErr("Terminating server")) } } diff --git a/examples/smithyShared/smithy/spec.smithy b/examples/smithyShared/smithy/spec.smithy index f832f90..518c745 100644 --- a/examples/smithyShared/smithy/spec.smithy +++ b/examples/smithyShared/smithy/spec.smithy @@ -13,7 +13,7 @@ service TestServer { @jsonRPC service TestClient { - operations: [Greet, Pong] + operations: [Pong] } @jsonRequest("greet") diff --git a/fs2/src/jsonrpclib/fs2/FS2Channel.scala b/fs2/src/jsonrpclib/fs2/FS2Channel.scala index 4e4a454..77aa640 100644 --- a/fs2/src/jsonrpclib/fs2/FS2Channel.scala +++ b/fs2/src/jsonrpclib/fs2/FS2Channel.scala @@ -14,6 +14,7 @@ import cats.syntax.all._ import cats.effect.syntax.all._ import jsonrpclib.internals.MessageDispatcher import jsonrpclib.internals._ +import _root_.fs2.concurrent.{Channel => ConcurrentChannel} import scala.util.Try @@ -53,7 +54,7 @@ object FS2Channel { for { supervisor <- Stream.resource(Supervisor[F]) ref <- Ref[F].of(State[F](Map.empty, Map.empty, Map.empty, 0)).toStream - queue <- cats.effect.std.Queue.bounded[F, Payload](bufferSize).toStream + queue <- Stream.bracket(ConcurrentChannel.bounded[F, Payload](bufferSize))(_.closed) impl = new Impl(queue, ref, supervisor, cancelTemplate) // Creating a bespoke endpoint to receive cancelation requests @@ -98,7 +99,7 @@ object FS2Channel { } private class Impl[F[_]]( - private val queue: cats.effect.std.Queue[F, Payload], + private val queue: ConcurrentChannel[F, Payload], private val state: Ref[F, FS2Channel.State[F]], supervisor: Supervisor[F], maybeCancelTemplate: Option[CancelTemplate] @@ -106,7 +107,7 @@ object FS2Channel { extends MessageDispatcher[F] with FS2Channel[F] { - def output: Stream[F, Payload] = Stream.fromQueueUnterminated(queue) + def output: Stream[F, Payload] = queue.stream def input: Pipe[F, Payload, Unit] = _.evalMap(handleReceivedPayload) def mountEndpoint(endpoint: Endpoint[F]): F[Unit] = state @@ -136,7 +137,7 @@ object FS2Channel { } protected def reportError(params: Option[Payload], error: ProtocolError, method: String): F[Unit] = ??? protected def getEndpoint(method: String): F[Option[Endpoint[F]]] = state.get.map(_.endpoints.get(method)) - protected def sendMessage(message: Message): F[Unit] = queue.offer(Codec.encode(message)) + protected def sendMessage(message: Message): F[Unit] = queue.send(Codec.encode(message)).void protected def nextCallId(): F[CallId] = state.modify(_.nextCallId) protected def createPromise[A](callId: CallId): F[(Try[A] => F[Unit], () => F[A])] = Deferred[F, Try[A]].map { diff --git a/smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala b/smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala index a800b3d..07f72ec 100644 --- a/smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala +++ b/smithy4s/src/jsonrpclib/smithy4sinterop/ClientStub.scala @@ -43,7 +43,7 @@ private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg], service.endpoints .traverse_ { ep => val shapeId = ep.id - EndpointSpec.fromHints(ep.hints).liftTo[F](NotJsonRPCEndpoint(shapeId)).map { epSpec => + EndpointSpec.fromHints(ep.hints).liftTo[F](NotJsonRPCEndpoint(shapeId)).flatMap { epSpec => val stub = jsonRPCStub(ep, epSpec) cache.update(_ + (shapeId -> stub)) } @@ -51,7 +51,9 @@ private class ClientStub[Alg[_[_, _, _, _, _]], F[_]](val service: Service[Alg], .as { new PolyFunction5[service.Endpoint, Stub] { def apply[I, E, O, SI, SO](ep: service.Endpoint[I, E, O, SI, SO]): Stub[I, E, O, SI, SO] = { - cache.get.map { _(ep.id).asInstanceOf[I => F[O]] } + cache.get.map { c => + c(ep.id).asInstanceOf[I => F[O]] + } } } } From cce5d1df52ca3da6e65b01c1bae2d28c3f0bde68 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20M=C3=A9lois?= Date: Tue, 21 Feb 2023 12:29:41 +0100 Subject: [PATCH 4/6] Fix comment --- .../smithyServer/src/examples/smithy/server/ServerMain.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/smithyServer/src/examples/smithy/server/ServerMain.scala b/examples/smithyServer/src/examples/smithy/server/ServerMain.scala index 40d8153..279ef38 100644 --- a/examples/smithyServer/src/examples/smithy/server/ServerMain.scala +++ b/examples/smithyServer/src/examples/smithy/server/ServerMain.scala @@ -15,7 +15,7 @@ object ServerMain extends IOApp.Simple { // Reserving a method for cancelation. val cancelEndpoint = CancelTemplate.make[CallId]("$/cancel", identity, identity) - // Implementing an incrementation endpoint + // Implementing the generated interface class ServerImpl(client: TestClient[IO]) extends TestServer[IO] { def greet(name: String): IO[GreetOutput] = IO.pure(GreetOutput(s"Server says: hello $name !")) From 0c3251b2ded24d371e7d6eb307f1cdc29c3cc065 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20M=C3=A9lois?= Date: Tue, 21 Feb 2023 15:13:19 +0100 Subject: [PATCH 5/6] Revert changes to FS2Channel --- fs2/src/jsonrpclib/fs2/FS2Channel.scala | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/fs2/src/jsonrpclib/fs2/FS2Channel.scala b/fs2/src/jsonrpclib/fs2/FS2Channel.scala index 14c5e7d..811ced9 100644 --- a/fs2/src/jsonrpclib/fs2/FS2Channel.scala +++ b/fs2/src/jsonrpclib/fs2/FS2Channel.scala @@ -12,8 +12,7 @@ import cats.effect.kernel._ import cats.effect.std.Supervisor import cats.syntax.all._ import cats.effect.syntax.all._ -import jsonrpclib.internals._ -import _root_.fs2.concurrent.{Channel => ConcurrentChannel} +import jsonrpclib.internals.MessageDispatcher import scala.util.Try import java.util.regex.Pattern @@ -55,7 +54,7 @@ object FS2Channel { for { supervisor <- Stream.resource(Supervisor[F]) ref <- Ref[F].of(State[F](Map.empty, Map.empty, Map.empty, Vector.empty, 0)).toStream - queue <- Stream.bracket(ConcurrentChannel.bounded[F, Message](bufferSize))(_.closed) + queue <- cats.effect.std.Queue.bounded[F, Message](bufferSize).toStream impl = new Impl(queue, ref, supervisor, cancelTemplate) // Creating a bespoke endpoint to receive cancelation requests @@ -117,7 +116,7 @@ object FS2Channel { } private class Impl[F[_]]( - private val queue: ConcurrentChannel[F, Message], + private val queue: cats.effect.std.Queue[F, Message], private val state: Ref[F, FS2Channel.State[F]], supervisor: Supervisor[F], maybeCancelTemplate: Option[CancelTemplate] @@ -125,7 +124,7 @@ object FS2Channel { extends MessageDispatcher[F] with FS2Channel[F] { - def output: Stream[F, Message] = queue.stream + def output: Stream[F, Message] = Stream.fromQueueUnterminated(queue) def inputOrBounce: Pipe[F, Either[ProtocolError, Message], Unit] = _.evalMap { case Left(error) => sendProtocolError(error) case Right(message) => handleReceivedMessage(message) @@ -159,7 +158,7 @@ object FS2Channel { } protected def reportError(params: Option[Payload], error: ProtocolError, method: String): F[Unit] = ??? protected def getEndpoint(method: String): F[Option[Endpoint[F]]] = state.get.map(_.getEndpoint(method)) - protected def sendMessage(message: Message): F[Unit] = queue.send(message).void + protected def sendMessage(message: Message): F[Unit] = queue.offer(message) protected def nextCallId(): F[CallId] = state.modify(_.nextCallId) protected def createPromise[A](callId: CallId): F[(Try[A] => F[Unit], () => F[A])] = Deferred[F, Try[A]].map { From 2cfb5891e7bc2e5097fabe662bf30d1b25a28044 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Olivier=20M=C3=A9lois?= Date: Tue, 21 Feb 2023 15:47:52 +0100 Subject: [PATCH 6/6] Bump remaining versions --- .mill-version | 2 +- build.sc | 9 +++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/.mill-version b/.mill-version index cd47247..223df19 100644 --- a/.mill-version +++ b/.mill-version @@ -1 +1 @@ -0.10.10 \ No newline at end of file +0.10.11 diff --git a/build.sc b/build.sc index 16cbe39..6cb4d6e 100644 --- a/build.sc +++ b/build.sc @@ -20,11 +20,12 @@ import _root_.smithy4s.codegen.mill._ object versions { val scala212Version = "2.12.16" val scala213Version = "2.13.10" - val scala3Version = "3.2.1" - val scalaJSVersion = "1.10.1" - val scalaNativeVersion = "0.4.8" + val scala3Version = "3.2.2" + val scalaJSVersion = "1.13.0" + val scalaNativeVersion = "0.4.10" val munitVersion = "0.7.29" val munitNativeVersion = "1.0.0-M7" + val jsoniterVersion = "2.21.0" val fs2 = "3.6.1" val weaver = "0.8.0" @@ -43,7 +44,7 @@ import versions._ object core extends RPCCrossPlatformModule { cross => def crossPlatformIvyDeps: T[Agg[Dep]] = Agg( - ivy"com.github.plokhotnyuk.jsoniter-scala::jsoniter-scala-macros::2.17.0" + ivy"com.github.plokhotnyuk.jsoniter-scala::jsoniter-scala-macros::$jsoniterVersion" ) object jvm extends mill.Cross[JvmModule](scala213, scala3)