From 191ff01c7d022f60e7edee5bcc3052638a44f13f Mon Sep 17 00:00:00 2001 From: Sean Cheatham Date: Sun, 11 Aug 2024 12:10:53 -0400 Subject: [PATCH 1/2] Create FunctionsStore and BlocksStore abstractions --- .../src/main/scala/chainless/ApiServer.scala | 6 +- .../main/scala/chainless/ChainlessMain.scala | 20 +-- .../main/scala/chainless/db/ObjectStore.scala | 66 +++++++-- .../chainless/replicator/Replicator.scala | 10 +- .../runner/persistent/DockerDriver.scala | 130 +++++++++--------- .../runner/persistent/JobProcessor.scala | 28 +--- .../runner/temporary/RunnerOperator.scala | 17 +-- backend/project/plugins.sbt | 3 +- 8 files changed, 145 insertions(+), 135 deletions(-) diff --git a/backend/core/src/main/scala/chainless/ApiServer.scala b/backend/core/src/main/scala/chainless/ApiServer.scala index c8af68a..46e07e9 100644 --- a/backend/core/src/main/scala/chainless/ApiServer.scala +++ b/backend/core/src/main/scala/chainless/ApiServer.scala @@ -52,7 +52,7 @@ import scala.concurrent.duration.* */ class ApiServer( functions: FunctionsDb[IO], - functionStoreClient: ObjectStore[IO], + functionStoreClient: FunctionsStore[IO], invocationsDB: FunctionInvocationsDb[IO], operator: RunnerOperator[IO], jobDispatcher: JobDispatcher[IO] @@ -139,7 +139,7 @@ class ApiServer( case request @ DELETE -> Root / "functions" / id => functions .delete(id) - .flatTap(deleted => IO.whenA(deleted)(functionStoreClient.delete("functions")(id).void)) + .flatTap(deleted => IO.whenA(deleted)(functionStoreClient.delete(id).void)) .ifM( Response().pure[F], Response().withStatus(Status.NotFound).pure[F] @@ -172,7 +172,7 @@ class ApiServer( Files.forIO.tempFile .use(file => stream.through(Files.forIO.writeAll(file)).compile.drain >> - functionStoreClient.save("functions")(id + revision.some.filter(_ > 0).fold("")(r => s"-$r"))( + functionStoreClient.save(id, revision)( Files.forIO.readAll(file) ) ) >> functions.updateRevision(id, revision).as(Response()) diff --git a/backend/core/src/main/scala/chainless/ChainlessMain.scala b/backend/core/src/main/scala/chainless/ChainlessMain.scala index 31a1e61..8bfde64 100644 --- a/backend/core/src/main/scala/chainless/ChainlessMain.scala +++ b/backend/core/src/main/scala/chainless/ChainlessMain.scala @@ -45,21 +45,24 @@ object ChainlessMain extends ResourceApp.Forever { functionsDb <- SqlFunctionsDb.make[F](sqliteConnection) functionInvocationsDb <- SqlFunctionInvocationsDb.make[F](sqliteConnection) blocksDb <- SqlBlocksDb.make[F](sqliteConnection) - objectStore = new ObjectStore[F](Path(args.dataDir) / "objects") + blocksStore <- ObjectStore.make[F](Path(args.dataDir) / "objects" / "blocks", "blocks").map(new BlocksStore[F](_)) + functionsStore <- ObjectStore + .make[F](Path(args.dataDir) / "objects" / "functions", "functions") + .map(new FunctionsStore[F](_)) runnerOperator = new RunnerOperator[F]( blocksDb, - objectStore, + blocksStore, functionInvocationsDb, newBlocksTopic.subscribeUnbounded ) - jobProcessor <- makeJobProcessor(args, objectStore, functionsDb, functionInvocationsDb, blocksDb) + jobProcessor <- makeJobProcessor(args, functionsStore, blocksStore, functionsDb, functionInvocationsDb, blocksDb) providers <- Providers .make[F](args.bitcoinRpcAddress, args.ethereumRpcAddress, args.apparatusRpcAddress) .map(_.toList.map(provider => provider.chain -> provider).toMap) - replicator = new Replicator[F](blocksDb, providers, objectStore, newBlocksTopic.publish1(_).void) + replicator = new Replicator[F](blocksDb, providers, blocksStore, newBlocksTopic.publish1(_).void) dispatcher = new 
JobDispatcher[F](newBlocksTopic.subscribeUnbounded, functionsDb, jobProcessor) _ <- healthcheck.setReadiness(Healthcheck.Healthy()).toResource - apiServer = new ApiServer(functionsDb, objectStore, functionInvocationsDb, runnerOperator, dispatcher) + apiServer = new ApiServer(functionsDb, functionsStore, functionInvocationsDb, runnerOperator, dispatcher) _ <- ( Logger[F].info("Running").toResource, replicator.replicate.toResource, @@ -70,7 +73,8 @@ object ChainlessMain extends ResourceApp.Forever { private def makeJobProcessor( args: RunnerManagerArgs, - objectStore: ObjectStore[F], + functionsStore: FunctionsStore[F], + blocksStore: BlocksStore[F], functionsDb: FunctionsDb[F], functionInvocationsDb: FunctionInvocationsDb[F], blocksDb: BlocksDb[F] @@ -88,7 +92,7 @@ object ChainlessMain extends ResourceApp.Forever { .flatMap(localCodeCache => DockerDriver.make[F]( s"http://chainless:$port", - objectStore, + functionsStore, localCodeCache ) ) @@ -98,7 +102,7 @@ object ChainlessMain extends ResourceApp.Forever { functionsDb, functionInvocationsDb, blocksDb, - objectStore, + blocksStore, canceledRef ) ) diff --git a/backend/core/src/main/scala/chainless/db/ObjectStore.scala b/backend/core/src/main/scala/chainless/db/ObjectStore.scala index 73d5e4e..06d323b 100644 --- a/backend/core/src/main/scala/chainless/db/ObjectStore.scala +++ b/backend/core/src/main/scala/chainless/db/ObjectStore.scala @@ -1,25 +1,31 @@ package chainless.db import cats.data.OptionT -import cats.effect.Async +import cats.effect.{Async, Resource} +import cats.effect.implicits.* import cats.implicits.* -import fs2.Stream +import chainless.models.* +import fs2.{Chunk, Stream} import fs2.io.file.{Files, Path} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger +import java.nio.charset.StandardCharsets + /** Stores byte data in a directory structure. Data is stored in "revisions", meaning if a key already exists, a new * revision is added (and marked "latest") instead of deleting the old entry. 
* @param baseDir - * a base directory in which "buckets" of data are saved + * a base directory in which "objects" of data are saved + * @param name + * the name of this store (for logging) */ -class ObjectStore[F[_]: Async: Files](baseDir: Path): +class ObjectStore[F[_]: Async: Files](baseDir: Path, name: String): - private given Logger[F] = Slf4jLogger.getLoggerFromName("ObjectStore") + private given Logger[F] = Slf4jLogger.getLoggerFromName(s"ObjectStore-$name") - def save(bucket: String)(id: String)(data: Stream[F, Byte]): F[Unit] = + def save(id: String)(data: Stream[F, Byte]): F[Unit] = Logger[F].info(s"Saving id=$id") >> - (baseDir / bucket / id) + (baseDir / id) .pure[F] .flatTap(Files[F].createDirectories(_)) .flatMap(objectDir => @@ -30,11 +36,11 @@ class ObjectStore[F[_]: Async: Files](baseDir: Path): ) >> Logger[F].info(s"Finished saving id=$id") - def get(bucket: String)(id: String): Stream[F, Byte] = + def get(id: String): Stream[F, Byte] = Stream .eval( OptionT - .pure[F](baseDir / bucket / id) + .pure[F](baseDir / id) .flatMap(objectDir => OptionT(contains(objectDir)) .map(revision => objectDir / revision.toString) @@ -43,15 +49,15 @@ class ObjectStore[F[_]: Async: Files](baseDir: Path): ) .flatMap(Files[F].readAll) - def exists(bucket: String)(id: String): F[Boolean] = + def exists(id: String): F[Boolean] = OptionT - .pure[F](baseDir / bucket / id) + .pure[F](baseDir / id) .flatMap(objectDir => OptionT(contains(objectDir))) .isDefined - def delete(bucket: String)(id: String): F[Boolean] = - exists(bucket)(id) - .flatTap(Async[F].whenA(_)(Files[F].deleteRecursively(baseDir / bucket / id))) + def delete(id: String): F[Boolean] = + exists(id) + .flatTap(Async[F].whenA(_)(Files[F].deleteRecursively(baseDir / id))) private def contains(objectDir: Path): F[Option[Int]] = Files[F] @@ -62,3 +68,35 @@ class ObjectStore[F[_]: Async: Files](baseDir: Path): .compile .fold(-1)(_.max(_)) .map(_.some.filter(_ >= 0)) + +object ObjectStore: + def make[F[_]: Async: Files](baseDir: Path, name: String): Resource[F, ObjectStore[F]] = + Files[F].createDirectories(baseDir).toResource.as(new ObjectStore[F](baseDir, name)) + +class BlocksStore[F[_]: Async](objectStore: ObjectStore[F]): + def saveBlock(block: BlockWithChain): F[Unit] = + objectStore.save(s"${block.meta.chain.name}/${block.meta.blockId}")( + Stream(block.block) + .map(_.noSpaces) + .through(fs2.text.utf8.encode) + ) + + def getBlock(meta: BlockMeta): F[BlockWithChain] = + objectStore + .get(s"${meta.chain.name}/${meta.blockId}") + .compile + .to(Chunk) + .map(chunk => new String(chunk.toArray[Byte], StandardCharsets.UTF_8)) + .map(io.circe.parser.parse) + .rethrow + .map(BlockWithChain(meta, _)) + +class FunctionsStore[F[_]: Async](objectStore: ObjectStore[F]): + def get(id: String)(revision: Int): Stream[F, Byte] = + objectStore.get(s"$id/$revision") + + def delete(id: String): F[Boolean] = + objectStore.delete(id) + + def save(id: String, revision: Int)(data: Stream[F, Byte]): F[Unit] = + objectStore.save(s"$id/$revision")(data) diff --git a/backend/core/src/main/scala/chainless/replicator/Replicator.scala b/backend/core/src/main/scala/chainless/replicator/Replicator.scala index 6abdaf6..a698908 100644 --- a/backend/core/src/main/scala/chainless/replicator/Replicator.scala +++ b/backend/core/src/main/scala/chainless/replicator/Replicator.scala @@ -23,7 +23,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger class Replicator[F[_]: Async]( blocks: BlocksDb[F], providers: Map[Chain, BlocksProvider[F]], - blockStore: ObjectStore[F], 
+ blockStore: BlocksStore[F], broadcaster: BlockMeta => F[Unit] ): @@ -62,13 +62,7 @@ class Replicator[F[_]: Async]( private def pipe: Pipe[F, BlockWithChain, Unit] = _.evalTap(block => logger.info(show"Saving block $block")) - .evalTap(blockWithChain => - blockStore.save(blockWithChain.meta.chain.name)(blockWithChain.meta.blockId)( - Stream(blockWithChain.block) - .map(_.noSpaces) - .through(fs2.text.utf8.encode) - ) - ) + .evalTap(blockStore.saveBlock) .map(_.meta) .evalTap(blocks.insert) .evalTap(broadcaster) diff --git a/backend/core/src/main/scala/chainless/runner/persistent/DockerDriver.scala b/backend/core/src/main/scala/chainless/runner/persistent/DockerDriver.scala index 2a985cf..ec8755b 100644 --- a/backend/core/src/main/scala/chainless/runner/persistent/DockerDriver.scala +++ b/backend/core/src/main/scala/chainless/runner/persistent/DockerDriver.scala @@ -5,12 +5,12 @@ import cats.effect.implicits.* import cats.effect.std.Dispatcher import cats.effect.{Async, Resource} import cats.implicits.* -import chainless.db.ObjectStore +import chainless.db.FunctionsStore import chainless.models.Language import com.github.dockerjava.api.DockerClient import com.github.dockerjava.api.async.ResultCallback import com.github.dockerjava.api.command.{AsyncDockerCmd, SyncDockerCmd} -import com.github.dockerjava.api.model.{Bind, HostConfig, PruneType} +import com.github.dockerjava.api.model.{Bind, HostConfig} import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientImpl} import com.github.dockerjava.httpclient5.ApacheDockerHttpClient import fs2.io.file.{Files, Path} @@ -26,7 +26,7 @@ import scala.concurrent.duration.* * the HTTP URL base path which is provided to user functions via the `CALLBACK_BASE_URL` environment variable * @param dockerClient * client to interact with docker - * @param objectStore + * @param functionsStore * object store containing the function code * @param tmpDir * a directory in which a function can be extracted and mounted into a container @@ -34,67 +34,71 @@ import scala.concurrent.duration.* class DockerDriver[F[_]: Async: Files]( callbackBasePath: String, dockerClient: DockerClient, - objectStore: ObjectStore[F], + functionsStore: FunctionsStore[F], tmpDir: Path ) { given Logger[F] = Slf4jLogger.getLoggerFromName("DockerDriver") def createContainer( language: Language, - functionRevisionId: String + functionId: String, + functionRevision: Int ): Resource[F, String] = - prune.toResource >> - currentCachedCodeFunctionId - .filter(_ == functionRevisionId) - .void - .semiflatTap(_ => Logger[F].info(s"Code cache hit for functionRevisionId=$functionRevisionId")) - .getOrElseF(clearLocalCodeCache >> fetchAndSave(language, functionRevisionId)) - .as(tmpDir / functionRevisionId) - .toResource - .flatMap(codeDir => - Async[F] - .delay(DockerImage.forLanguage(language)) - .map(i => - i.split(':') match { - case Array(repository, tag) => (repository, tag) - case Array(repository) => (repository, "latest") - case _ => throw MatchError(i) - } - ) - .toResource - .flatMap((imageRepository, imageTag) => - Logger[F].info(s"Pulling $imageRepository:$imageTag").toResource >> - useDockerAsync(dockerClient.pullImageCmd(imageRepository).withTag(imageTag)) - .timeout(2.minutes) - .toResource >> - Logger[F].info("Creating container").toResource >> - Resource - .make( - useDocker( - dockerClient - .createContainerCmd(s"$imageRepository:$imageTag") - .withEnv(s"CALLBACK_BASE_URL=$callbackBasePath" +: DockerDriver.envForLanguage(language)*) - .withHostConfig( - HostConfig - 
.newHostConfig() - .withBinds(Bind.parse(s"$codeDir:/code:ro")) - .withNetworkMode("chainless") - ) - .withCmd(DockerDriver.commandForLanguage(language)*) - .withWorkingDir("/code") - ) - .map(_.getId) - .timeout(30.seconds) - )(removeContainer) - ) - ) + Resource + .pure(functionId + functionRevision.toString) + .flatMap(functionRevisionId => + currentCachedCodeFunctionRevisionId + .filter(_ == functionRevisionId) + .void + .semiflatTap(_ => Logger[F].info(s"Code cache hit for functionId=$functionId revision=$functionRevision")) + .getOrElseF(clearLocalCodeCache >> fetchAndSave(language, functionId, functionRevision)) + .as(tmpDir / functionRevisionId) + .toResource + .flatMap(codeDir => + Async[F] + .delay(DockerImage.forLanguage(language)) + .map(i => + i.split(':') match { + case Array(repository, tag) => (repository, tag) + case Array(repository) => (repository, "latest") + case _ => throw MatchError(i) + } + ) + .toResource + .flatMap((imageRepository, imageTag) => + Logger[F].info(s"Pulling $imageRepository:$imageTag").toResource >> + useDockerAsync(dockerClient.pullImageCmd(imageRepository).withTag(imageTag)) + .timeout(2.minutes) + .toResource >> + Logger[F].info("Creating container").toResource >> + Resource + .make( + useDocker( + dockerClient + .createContainerCmd(s"$imageRepository:$imageTag") + .withEnv(s"CALLBACK_BASE_URL=$callbackBasePath" +: DockerDriver.envForLanguage(language)*) + .withHostConfig( + HostConfig + .newHostConfig() + .withBinds(Bind.parse(s"$codeDir:/code:ro")) + .withNetworkMode("chainless") + ) + .withCmd(DockerDriver.commandForLanguage(language)*) + .withWorkingDir("/code") + ) + .map(_.getId) + .timeout(30.seconds) + )(removeContainer) + ) + ) + ) def startContainer(containerId: String): F[Unit] = Logger[F].info(s"Starting container=$containerId") >> useDocker(dockerClient.startContainerCmd(containerId)) >> Logger[F].info(s"Started container=$containerId") - private def currentCachedCodeFunctionId: OptionT[F, String] = + private def currentCachedCodeFunctionRevisionId: OptionT[F, String] = OptionT .liftF(Files[F].exists(tmpDir)) .filter(identity) @@ -114,11 +118,12 @@ class DockerDriver[F[_]: Async: Files]( Files[F].createDirectories(tmpDir) ) - private def fetchAndSave(language: Language, functionId: String): F[Unit] = + private def fetchAndSave(language: Language, functionId: String, revision: Int): F[Unit] = for { - _ <- Logger[F].info(s"Fetching function code for functionId=$functionId") - _ <- Files[F].createDirectories(tmpDir / functionId) - data = objectStore.get("functions")(functionId) + _ <- Logger[F].info(s"Fetching function code for functionId=$functionId revision=$revision") + codeDir = tmpDir / s"$functionId${revision.toString}" + _ <- Files[F].createDirectories(codeDir) + data = functionsStore.get(functionId)(revision) _ <- (language match { case Language.JS => data @@ -126,7 +131,7 @@ class DockerDriver[F[_]: Async: Files]( .evalTap((name, isDirectory, data) => if (isDirectory) data.compile.drain else - (tmpDir / functionId / name) + (codeDir / name) .pure[F] .flatTap(file => Logger[F].debug(s"Extracting $file")) .flatTap(_.parent.traverse(Files[F].createDirectories)) @@ -136,7 +141,7 @@ class DockerDriver[F[_]: Async: Files]( .compile .drain case Language.JVM => - (tmpDir / functionId / "function.jar") + (codeDir / "function.jar") .pure[F] .flatTap(file => Logger[F].debug(s"Saving JAR $file")) .flatTap(file => data.through(Files[F].writeAll(file)).compile.drain) @@ -193,19 +198,12 @@ class DockerDriver[F[_]: Async: Files]( onError = (_: 
Throwable, _) => ().pure[F] )(useDocker(dockerClient.pingCmd()).void) - def prune: F[Unit] = - Logger[F].info("Pruning unused docker data") *> - useDocker(dockerClient.pruneCmd(PruneType.BUILD)) >> - useDocker(dockerClient.pruneCmd(PruneType.CONTAINERS)) >> - useDocker(dockerClient.pruneCmd(PruneType.NETWORKS)) >> - useDocker(dockerClient.pruneCmd(PruneType.VOLUMES)).void - } object DockerDriver: def make[F[_]: Async]( callbackBasePath: String, - functionStoreClient: ObjectStore[F], + functionsStore: FunctionsStore[F], tmpDir: Path ): Resource[F, DockerDriver[F]] = Resource @@ -224,7 +222,7 @@ object DockerDriver: Resource.fromAutoCloseable(Async[F].delay(DockerClientImpl.getInstance(config, httpClient))) ) ) - .map(new DockerDriver(callbackBasePath, _, functionStoreClient, tmpDir)) + .map(new DockerDriver(callbackBasePath, _, functionsStore, tmpDir)) .evalTap(_.awaitDockerReady) def commandForLanguage(language: Language): List[String] = diff --git a/backend/core/src/main/scala/chainless/runner/persistent/JobProcessor.scala b/backend/core/src/main/scala/chainless/runner/persistent/JobProcessor.scala index c617ded..25c96ce 100644 --- a/backend/core/src/main/scala/chainless/runner/persistent/JobProcessor.scala +++ b/backend/core/src/main/scala/chainless/runner/persistent/JobProcessor.scala @@ -8,12 +8,10 @@ import cats.implicits.* import cats.{MonadThrow, NonEmptyParallel} import chainless.db.* import chainless.models.* -import fs2.Chunk import io.circe.Json import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger -import java.nio.charset.StandardCharsets import scala.concurrent.duration.* /** Handles a "job" which is a request to run a function. The job is processed by a sub-container which handles each @@ -27,7 +25,7 @@ case class JobProcessorImpl[F[_]: Async: NonEmptyParallel]( functionsDb: FunctionsDb[F], functionInvocationsDb: FunctionInvocationsDb[F], blockApiClient: BlocksDb[F], - blockStoreClient: ObjectStore[F], + blockStoreClient: BlocksStore[F], jobMutex: Mutex[F], stateRef: Ref[F, Option[TaskProcessor[F]]], wasCanceledRef: Ref[F, Boolean] @@ -68,12 +66,11 @@ case class JobProcessorImpl[F[_]: Async: NonEmptyParallel]( functionRevision <- OptionT .fromOption(functionInfo.revision) .getOrRaise(new IllegalArgumentException("Function code not uploaded")) - functionRevisionId = job.functionId + functionRevision.some.filter(_ > 0).fold("")(r => s"-$r") _ <- Logger[F].info( - s"Launching container for functionId=${job.functionId} functionRevisionId=$functionRevisionId invocationId=${job.jobId}" + s"Launching container for functionId=${job.functionId} functionRevision=$functionRevision invocationId=${job.jobId}" ) _ <- dockerDriver - .createContainer(functionInfo.language, functionRevisionId) + .createContainer(functionInfo.language, job.functionId, functionRevision) .use(containerId => processWithInactiveContainer(containerId)(job, functionInfo, liteTasks, job.jobId, completionDeferred) ) @@ -192,17 +189,6 @@ case class JobProcessorImpl[F[_]: Async: NonEmptyParallel]( } } - def fetchBlock(meta: BlockMeta): F[BlockWithChain] = - blockStoreClient - .get(meta.chain.name)(meta.blockId) - .compile - .to(Chunk) - .map(chunk => new String(chunk.toArray[Byte], StandardCharsets.UTF_8)) - .map(io.circe.parser.parse) - .rethrow - .map(BlockWithChain(meta, _)) - .timeout(5.seconds) - } object JobProcessor: @@ -212,7 +198,7 @@ object JobProcessor: functionsDb: FunctionsDb[F], functionInvocationsDb: FunctionInvocationsDb[F], blockApiClient: BlocksDb[F], - blockStoreClient: 
ObjectStore[F], + blocksStore: BlocksStore[F], canceledRef: Ref[F, Boolean] ): Resource[F, JobProcessorImpl[F]] = (Mutex[F].toResource, Ref.of(none[TaskProcessor[F]]).toResource) @@ -222,7 +208,7 @@ object JobProcessor: functionsDb, functionInvocationsDb, blockApiClient, - blockStoreClient, + blocksStore, _, _, canceledRef @@ -320,8 +306,8 @@ case class TasksPending[F[_]: Async: Logger]( case InitLiteTask(config) => InitTask(config).pure[F] case ApplyLiteTask(block) => - jobProcessor - .fetchBlock(block) + jobProcessor.blockStoreClient + .getBlock(block) .onError(e => onFinished(computeDuration, currentState.asRight, e.some)) .map(ApplyBlockTask(currentState, _)) }) diff --git a/backend/core/src/main/scala/chainless/runner/temporary/RunnerOperator.scala b/backend/core/src/main/scala/chainless/runner/temporary/RunnerOperator.scala index 42b3a07..ae837e9 100644 --- a/backend/core/src/main/scala/chainless/runner/temporary/RunnerOperator.scala +++ b/backend/core/src/main/scala/chainless/runner/temporary/RunnerOperator.scala @@ -12,7 +12,6 @@ import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger import scala.concurrent.duration.* -import java.nio.charset.StandardCharsets import java.util.UUID /** Manages new requests to run temporary functions @@ -27,7 +26,7 @@ import java.util.UUID */ class RunnerOperator[F[_]: Async: NonEmptyParallel]( blocksDb: BlocksDb[F], - blocksStore: ObjectStore[F], + blocksStore: BlocksStore[F], invocationsDb: FunctionInvocationsDb[F], newBlocks: Stream[F, BlockMeta] ): @@ -43,7 +42,7 @@ class RunnerOperator[F[_]: Async: NonEmptyParallel]( .flatMap(runner => blocksDb .blocksAfterTimestamp(chains)(timestampMs) - .evalMap(fetchBlock) + .evalMap(blocksStore.getBlock) .evalScan((Duration.Zero, FunctionState(Map.empty, Json.Null))) { case ((_, stateWithChains), blockWithMeta) => Async[F] @@ -61,7 +60,7 @@ class RunnerOperator[F[_]: Async: NonEmptyParallel]( .flatMap(runner => newBlocks .filter(meta => chains.contains(meta.chain)) - .evalMap(fetchBlock) + .evalMap(blocksStore.getBlock) .evalScan((Duration.Zero, stateWithChains)) { case ((_, stateWithChains), blockWithMeta) => Async[F].timed(runner.applyBlock(stateWithChains, blockWithMeta)) } @@ -103,13 +102,3 @@ class RunnerOperator[F[_]: Async: NonEmptyParallel]( ) ) ) - - private def fetchBlock(meta: BlockMeta): F[BlockWithChain] = - blocksStore - .get(meta.chain.name)(meta.blockId) - .compile - .to(Chunk) - .map(chunk => new String(chunk.toArray[Byte], StandardCharsets.UTF_8)) - .map(io.circe.parser.parse) - .rethrow - .map(BlockWithChain(meta, _)) diff --git a/backend/project/plugins.sbt b/backend/project/plugins.sbt index abc3f03..3a0b06d 100644 --- a/backend/project/plugins.sbt +++ b/backend/project/plugins.sbt @@ -3,5 +3,6 @@ Seq( "ch.epfl.scala" % "sbt-scalafix" % "0.12.1", "com.github.sbt" % "sbt-native-packager" % "1.10.0", "com.eed3si9n" % "sbt-buildinfo" % "0.11.0", - "com.github.sbt" % "sbt-dynver" % "5.0.1" + "com.github.sbt" % "sbt-dynver" % "5.0.1", + "org.scoverage" % "sbt-scoverage" % "2.1.0" ).map(addSbtPlugin) From 474b355f0a4c0acf4e57d5cf98524ef1ab65acdd Mon Sep 17 00:00:00 2001 From: Sean Cheatham Date: Sun, 11 Aug 2024 12:43:28 -0400 Subject: [PATCH 2/2] Simplify ObjectStore --- .../main/scala/chainless/ChainlessMain.scala | 6 +- .../main/scala/chainless/db/ObjectStore.scala | 93 ++++--------------- 2 files changed, 19 insertions(+), 80 deletions(-) diff --git a/backend/core/src/main/scala/chainless/ChainlessMain.scala 
b/backend/core/src/main/scala/chainless/ChainlessMain.scala index 8bfde64..f5af563 100644 --- a/backend/core/src/main/scala/chainless/ChainlessMain.scala +++ b/backend/core/src/main/scala/chainless/ChainlessMain.scala @@ -45,10 +45,8 @@ object ChainlessMain extends ResourceApp.Forever { functionsDb <- SqlFunctionsDb.make[F](sqliteConnection) functionInvocationsDb <- SqlFunctionInvocationsDb.make[F](sqliteConnection) blocksDb <- SqlBlocksDb.make[F](sqliteConnection) - blocksStore <- ObjectStore.make[F](Path(args.dataDir) / "objects" / "blocks", "blocks").map(new BlocksStore[F](_)) - functionsStore <- ObjectStore - .make[F](Path(args.dataDir) / "objects" / "functions", "functions") - .map(new FunctionsStore[F](_)) + blocksStore = new BlocksStore[F](Path(args.dataDir) / "objects" / "blocks") + functionsStore = new FunctionsStore[F](Path(args.dataDir) / "objects" / "functions") runnerOperator = new RunnerOperator[F]( blocksDb, blocksStore, diff --git a/backend/core/src/main/scala/chainless/db/ObjectStore.scala b/backend/core/src/main/scala/chainless/db/ObjectStore.scala index 06d323b..140e77c 100644 --- a/backend/core/src/main/scala/chainless/db/ObjectStore.scala +++ b/backend/core/src/main/scala/chainless/db/ObjectStore.scala @@ -1,89 +1,27 @@ package chainless.db -import cats.data.OptionT -import cats.effect.{Async, Resource} +import cats.effect.Async import cats.effect.implicits.* import cats.implicits.* import chainless.models.* -import fs2.{Chunk, Stream} import fs2.io.file.{Files, Path} -import org.typelevel.log4cats.Logger -import org.typelevel.log4cats.slf4j.Slf4jLogger +import fs2.{Chunk, Stream} import java.nio.charset.StandardCharsets -/** Stores byte data in a directory structure. Data is stored in "revisions", meaning if a key already exists, a new - * revision is added (and marked "latest") instead of deleting the old entry. 
- * @param baseDir
- *   a base directory in which "objects" of data are saved
- * @param name
- *   the name of this store (for logging)
- */
-class ObjectStore[F[_]: Async: Files](baseDir: Path, name: String):
-
-  private given Logger[F] = Slf4jLogger.getLoggerFromName(s"ObjectStore-$name")
-
-  def save(id: String)(data: Stream[F, Byte]): F[Unit] =
-    Logger[F].info(s"Saving id=$id") >>
-      (baseDir / id)
-        .pure[F]
-        .flatTap(Files[F].createDirectories(_))
-        .flatMap(objectDir =>
-          OptionT(contains(objectDir))
-            .fold(0)(_ + 1)
-            .map(nextRevision => objectDir / nextRevision.toString)
-            .flatMap(file => Files[F].createFile(file) *> data.through(Files[F].writeAll(file)).compile.drain)
-        ) >>
-      Logger[F].info(s"Finished saving id=$id")
-
-  def get(id: String): Stream[F, Byte] =
-    Stream
-      .eval(
-        OptionT
-          .pure[F](baseDir / id)
-          .flatMap(objectDir =>
-            OptionT(contains(objectDir))
-              .map(revision => objectDir / revision.toString)
-          )
-          .getOrRaise(new NoSuchElementException("No data"))
-      )
-      .flatMap(Files[F].readAll)
-
-  def exists(id: String): F[Boolean] =
-    OptionT
-      .pure[F](baseDir / id)
-      .flatMap(objectDir => OptionT(contains(objectDir)))
-      .isDefined
-
-  def delete(id: String): F[Boolean] =
-    exists(id)
-      .flatTap(Async[F].whenA(_)(Files[F].deleteRecursively(baseDir / id)))
-
-  private def contains(objectDir: Path): F[Option[Int]] =
-    Files[F]
-      .list(objectDir)
-      .evalFilter(Files[F].isRegularFile(_))
-      .map(_.fileName.toString.toIntOption)
-      .collect { case Some(i) => i }
-      .compile
-      .fold(-1)(_.max(_))
-      .map(_.some.filter(_ >= 0))
-
-object ObjectStore:
-  def make[F[_]: Async: Files](baseDir: Path, name: String): Resource[F, ObjectStore[F]] =
-    Files[F].createDirectories(baseDir).toResource.as(new ObjectStore[F](baseDir, name))
-
-class BlocksStore[F[_]: Async](objectStore: ObjectStore[F]):
+class BlocksStore[F[_]: Async](baseDir: Path):
   def saveBlock(block: BlockWithChain): F[Unit] =
-    objectStore.save(s"${block.meta.chain.name}/${block.meta.blockId}")(
+    Files[F].createDirectories(baseDir / block.meta.chain.name) >>
       Stream(block.block)
         .map(_.noSpaces)
         .through(fs2.text.utf8.encode)
-    )
+        .through(Files[F].writeAll(baseDir / block.meta.chain.name / block.meta.blockId))
+        .compile
+        .drain
 
   def getBlock(meta: BlockMeta): F[BlockWithChain] =
-    objectStore
-      .get(s"${meta.chain.name}/${meta.blockId}")
+    Files[F]
+      .readAll(baseDir / meta.chain.name / meta.blockId)
       .compile
       .to(Chunk)
       .map(chunk => new String(chunk.toArray[Byte], StandardCharsets.UTF_8))
@@ -91,12 +29,15 @@ class BlocksStore[F[_]: Async](objectStore: ObjectStore[F]):
       .rethrow
      .map(BlockWithChain(meta, _))
 
-class FunctionsStore[F[_]: Async](objectStore: ObjectStore[F]):
+class FunctionsStore[F[_]: Async](baseDir: Path):
   def get(id: String)(revision: Int): Stream[F, Byte] =
-    objectStore.get(s"$id/$revision")
+    Files[F].readAll(baseDir / id / revision.toString)
 
-  def delete(id: String): F[Boolean] =
-    objectStore.delete(id)
+  def delete(id: String): F[Unit] =
+    Files[F].exists(baseDir / id).flatMap(Async[F].whenA(_)(Files[F].deleteRecursively(baseDir / id)))
 
   def save(id: String, revision: Int)(data: Stream[F, Byte]): F[Unit] =
-    objectStore.save(s"$id/$revision")(data)
+    Files[F].createDirectories(baseDir / id) >> data
      .through(Files[F].writeAll(baseDir / id / revision.toString))
+      .compile
+      .drain
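
A minimal usage sketch of the simplified FunctionsStore as it stands after PATCH 2/2. It assumes only cats-effect 3 and fs2, which are already dependencies of this module; the object name, function id, and payload below are illustrative, not part of the codebase:

    import cats.effect.{IO, IOApp}
    import chainless.db.FunctionsStore
    import fs2.Stream
    import fs2.io.file.Files

    // Illustrative smoke test: save two revisions of a function's code,
    // read one back, then delete every revision of the function.
    object FunctionsStoreSketch extends IOApp.Simple:
      def run: IO[Unit] =
        Files[IO].tempDirectory.use { baseDir =>
          val store = new FunctionsStore[IO](baseDir)
          // Encode the payload the way the backend does: as UTF-8 bytes.
          val code = Stream("console.log('hello')").through(fs2.text.utf8.encode)
          for
            _ <- store.save("example-fn", 0)(code)
            _ <- store.save("example-fn", 1)(code) // revisions sit side by side under baseDir / id
            n <- store.get("example-fn")(1).compile.count
            _ <- IO.println(s"revision 1 holds $n bytes")
            _ <- store.delete("example-fn") // removes the whole id directory, i.e. every revision
          yield ()
        }

BlocksStore is analogous, with paths keyed by chain name and block id instead of function id and revision.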