Skip to content

Commit

Permalink
Create FunctionsStore and BlocksStore abstractions
Browse files Browse the repository at this point in the history
  • Loading branch information
SeanCheatham committed Aug 11, 2024
1 parent 81494ab commit 191ff01
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 135 deletions.
6 changes: 3 additions & 3 deletions backend/core/src/main/scala/chainless/ApiServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ import scala.concurrent.duration.*
*/
class ApiServer(
functions: FunctionsDb[IO],
functionStoreClient: ObjectStore[IO],
functionStoreClient: FunctionsStore[IO],
invocationsDB: FunctionInvocationsDb[IO],
operator: RunnerOperator[IO],
jobDispatcher: JobDispatcher[IO]
Expand Down Expand Up @@ -139,7 +139,7 @@ class ApiServer(
case request @ DELETE -> Root / "functions" / id =>
functions
.delete(id)
.flatTap(deleted => IO.whenA(deleted)(functionStoreClient.delete("functions")(id).void))
.flatTap(deleted => IO.whenA(deleted)(functionStoreClient.delete(id).void))
.ifM(
Response().pure[F],
Response().withStatus(Status.NotFound).pure[F]
Expand Down Expand Up @@ -172,7 +172,7 @@ class ApiServer(
Files.forIO.tempFile
.use(file =>
stream.through(Files.forIO.writeAll(file)).compile.drain >>
functionStoreClient.save("functions")(id + revision.some.filter(_ > 0).fold("")(r => s"-$r"))(
functionStoreClient.save(id, revision)(
Files.forIO.readAll(file)
)
) >> functions.updateRevision(id, revision).as(Response())
Expand Down
20 changes: 12 additions & 8 deletions backend/core/src/main/scala/chainless/ChainlessMain.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,24 @@ object ChainlessMain extends ResourceApp.Forever {
functionsDb <- SqlFunctionsDb.make[F](sqliteConnection)
functionInvocationsDb <- SqlFunctionInvocationsDb.make[F](sqliteConnection)
blocksDb <- SqlBlocksDb.make[F](sqliteConnection)
objectStore = new ObjectStore[F](Path(args.dataDir) / "objects")
blocksStore <- ObjectStore.make[F](Path(args.dataDir) / "objects" / "blocks", "blocks").map(new BlocksStore[F](_))
functionsStore <- ObjectStore
.make[F](Path(args.dataDir) / "objects" / "functions", "functions")
.map(new FunctionsStore[F](_))
runnerOperator = new RunnerOperator[F](
blocksDb,
objectStore,
blocksStore,
functionInvocationsDb,
newBlocksTopic.subscribeUnbounded
)
jobProcessor <- makeJobProcessor(args, objectStore, functionsDb, functionInvocationsDb, blocksDb)
jobProcessor <- makeJobProcessor(args, functionsStore, blocksStore, functionsDb, functionInvocationsDb, blocksDb)
providers <- Providers
.make[F](args.bitcoinRpcAddress, args.ethereumRpcAddress, args.apparatusRpcAddress)
.map(_.toList.map(provider => provider.chain -> provider).toMap)
replicator = new Replicator[F](blocksDb, providers, objectStore, newBlocksTopic.publish1(_).void)
replicator = new Replicator[F](blocksDb, providers, blocksStore, newBlocksTopic.publish1(_).void)
dispatcher = new JobDispatcher[F](newBlocksTopic.subscribeUnbounded, functionsDb, jobProcessor)
_ <- healthcheck.setReadiness(Healthcheck.Healthy()).toResource
apiServer = new ApiServer(functionsDb, objectStore, functionInvocationsDb, runnerOperator, dispatcher)
apiServer = new ApiServer(functionsDb, functionsStore, functionInvocationsDb, runnerOperator, dispatcher)
_ <- (
Logger[F].info("Running").toResource,
replicator.replicate.toResource,
Expand All @@ -70,7 +73,8 @@ object ChainlessMain extends ResourceApp.Forever {

private def makeJobProcessor(
args: RunnerManagerArgs,
objectStore: ObjectStore[F],
functionsStore: FunctionsStore[F],
blocksStore: BlocksStore[F],
functionsDb: FunctionsDb[F],
functionInvocationsDb: FunctionInvocationsDb[F],
blocksDb: BlocksDb[F]
Expand All @@ -88,7 +92,7 @@ object ChainlessMain extends ResourceApp.Forever {
.flatMap(localCodeCache =>
DockerDriver.make[F](
s"http://chainless:$port",
objectStore,
functionsStore,
localCodeCache
)
)
Expand All @@ -98,7 +102,7 @@ object ChainlessMain extends ResourceApp.Forever {
functionsDb,
functionInvocationsDb,
blocksDb,
objectStore,
blocksStore,
canceledRef
)
)
Expand Down
66 changes: 52 additions & 14 deletions backend/core/src/main/scala/chainless/db/ObjectStore.scala
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
package chainless.db

import cats.data.OptionT
import cats.effect.Async
import cats.effect.{Async, Resource}
import cats.effect.implicits.*
import cats.implicits.*
import fs2.Stream
import chainless.models.*
import fs2.{Chunk, Stream}
import fs2.io.file.{Files, Path}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger

import java.nio.charset.StandardCharsets

/** Stores byte data in a directory structure. Data is stored in "revisions", meaning if a key already exists, a new
* revision is added (and marked "latest") instead of deleting the old entry.
* @param baseDir
* a base directory in which "buckets" of data are saved
* a base directory in which "objects" of data are saved
* @param name
* the name of this store (for logging)
*/
class ObjectStore[F[_]: Async: Files](baseDir: Path):
class ObjectStore[F[_]: Async: Files](baseDir: Path, name: String):

private given Logger[F] = Slf4jLogger.getLoggerFromName("ObjectStore")
private given Logger[F] = Slf4jLogger.getLoggerFromName(s"ObjectStore-$name")

def save(bucket: String)(id: String)(data: Stream[F, Byte]): F[Unit] =
def save(id: String)(data: Stream[F, Byte]): F[Unit] =
Logger[F].info(s"Saving id=$id") >>
(baseDir / bucket / id)
(baseDir / id)
.pure[F]
.flatTap(Files[F].createDirectories(_))
.flatMap(objectDir =>
Expand All @@ -30,11 +36,11 @@ class ObjectStore[F[_]: Async: Files](baseDir: Path):
) >>
Logger[F].info(s"Finished saving id=$id")

def get(bucket: String)(id: String): Stream[F, Byte] =
def get(id: String): Stream[F, Byte] =
Stream
.eval(
OptionT
.pure[F](baseDir / bucket / id)
.pure[F](baseDir / id)
.flatMap(objectDir =>
OptionT(contains(objectDir))
.map(revision => objectDir / revision.toString)
Expand All @@ -43,15 +49,15 @@ class ObjectStore[F[_]: Async: Files](baseDir: Path):
)
.flatMap(Files[F].readAll)

def exists(bucket: String)(id: String): F[Boolean] =
def exists(id: String): F[Boolean] =
OptionT
.pure[F](baseDir / bucket / id)
.pure[F](baseDir / id)
.flatMap(objectDir => OptionT(contains(objectDir)))
.isDefined

def delete(bucket: String)(id: String): F[Boolean] =
exists(bucket)(id)
.flatTap(Async[F].whenA(_)(Files[F].deleteRecursively(baseDir / bucket / id)))
def delete(id: String): F[Boolean] =
exists(id)
.flatTap(Async[F].whenA(_)(Files[F].deleteRecursively(baseDir / id)))

private def contains(objectDir: Path): F[Option[Int]] =
Files[F]
Expand All @@ -62,3 +68,35 @@ class ObjectStore[F[_]: Async: Files](baseDir: Path):
.compile
.fold(-1)(_.max(_))
.map(_.some.filter(_ >= 0))

object ObjectStore:
  /** Ensures the base directory exists, then yields an [[ObjectStore]] rooted at it.
    * @param baseDir directory under which object revisions are stored
    * @param name label used only for logging
    */
  def make[F[_]: Async: Files](baseDir: Path, name: String): Resource[F, ObjectStore[F]] =
    Resource.eval(Files[F].createDirectories(baseDir)).as(new ObjectStore[F](baseDir, name))

/** Persists and retrieves chain blocks as JSON, keyed by `chainName/blockId`,
  * on top of a generic [[ObjectStore]].
  */
class BlocksStore[F[_]: Async](objectStore: ObjectStore[F]):
  /** Serializes the block's JSON to UTF-8 bytes and stores it under `chain/blockId`. */
  def saveBlock(block: BlockWithChain): F[Unit] =
    val key = s"${block.meta.chain.name}/${block.meta.blockId}"
    val bytes = Stream.emit(block.block.noSpaces).through(fs2.text.utf8.encode)
    objectStore.save(key)(bytes)

  /** Loads the stored bytes for `meta`, decodes them as UTF-8 JSON, and pairs the
    * parsed block with its metadata. Parse failures are raised into `F`.
    */
  def getBlock(meta: BlockMeta): F[BlockWithChain] =
    objectStore
      .get(s"${meta.chain.name}/${meta.blockId}")
      .compile
      .to(Chunk)
      .map(raw => new String(raw.toArray[Byte], StandardCharsets.UTF_8))
      .map(io.circe.parser.parse)
      .rethrow
      .map(json => BlockWithChain(meta, json))

/** Stores function code artifacts keyed by `functionId/revision` on top of a
  * generic [[ObjectStore]].
  */
class FunctionsStore[F[_]: Async](objectStore: ObjectStore[F]):
  /** Streams the stored bytes for function `id` at the given `revision`. */
  def get(id: String)(revision: Int): Stream[F, Byte] =
    objectStore.get(id + "/" + revision)

  /** Deletes every stored revision of function `id`; yields true if anything existed. */
  def delete(id: String): F[Boolean] =
    objectStore.delete(id)

  /** Persists `data` as the artifact for function `id` at the given `revision`. */
  def save(id: String, revision: Int)(data: Stream[F, Byte]): F[Unit] =
    objectStore.save(id + "/" + revision)(data)
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
class Replicator[F[_]: Async](
blocks: BlocksDb[F],
providers: Map[Chain, BlocksProvider[F]],
blockStore: ObjectStore[F],
blockStore: BlocksStore[F],
broadcaster: BlockMeta => F[Unit]
):

Expand Down Expand Up @@ -62,13 +62,7 @@ class Replicator[F[_]: Async](

private def pipe: Pipe[F, BlockWithChain, Unit] =
_.evalTap(block => logger.info(show"Saving block $block"))
.evalTap(blockWithChain =>
blockStore.save(blockWithChain.meta.chain.name)(blockWithChain.meta.blockId)(
Stream(blockWithChain.block)
.map(_.noSpaces)
.through(fs2.text.utf8.encode)
)
)
.evalTap(blockStore.saveBlock)
.map(_.meta)
.evalTap(blocks.insert)
.evalTap(broadcaster)
Expand Down
Loading

0 comments on commit 191ff01

Please sign in to comment.