Merge pull request #6 from SeanCheatham/function-block-store-abstraction
Create FunctionsStore and BlocksStore abstractions
SeanCheatham authored Aug 11, 2024
2 parents 81494ab + 474b355 commit d30c2d6
Showing 8 changed files with 124 additions and 175 deletions.
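
At a glance, the diff replaces the generic ObjectStore with two purpose-specific stores. Their public surface, with signatures copied from the ObjectStore.scala diff below (bodies elided):

class BlocksStore[F[_]: Async](baseDir: Path):
  def saveBlock(block: BlockWithChain): F[Unit]
  def getBlock(meta: BlockMeta): F[BlockWithChain]

class FunctionsStore[F[_]: Async](baseDir: Path):
  def get(id: String)(revision: Int): Stream[F, Byte]
  def delete(id: String): F[Unit]
  def save(id: String, revision: Int)(data: Stream[F, Byte]): F[Unit]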
6 changes: 3 additions & 3 deletions backend/core/src/main/scala/chainless/ApiServer.scala
@@ -52,7 +52,7 @@ import scala.concurrent.duration.*
*/
class ApiServer(
functions: FunctionsDb[IO],
- functionStoreClient: ObjectStore[IO],
+ functionStoreClient: FunctionsStore[IO],
invocationsDB: FunctionInvocationsDb[IO],
operator: RunnerOperator[IO],
jobDispatcher: JobDispatcher[IO]
@@ -139,7 +139,7 @@ class ApiServer(
case request @ DELETE -> Root / "functions" / id =>
functions
.delete(id)
- .flatTap(deleted => IO.whenA(deleted)(functionStoreClient.delete("functions")(id).void))
+ .flatTap(deleted => IO.whenA(deleted)(functionStoreClient.delete(id).void))
.ifM(
Response().pure[F],
Response().withStatus(Status.NotFound).pure[F]
@@ -172,7 +172,7 @@
Files.forIO.tempFile
.use(file =>
stream.through(Files.forIO.writeAll(file)).compile.drain >>
functionStoreClient.save("functions")(id + revision.some.filter(_ > 0).fold("")(r => s"-$r"))(
functionStoreClient.save(id, revision)(
Files.forIO.readAll(file)
)
) >> functions.updateRevision(id, revision).as(Response())
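
The upload route shows the shape of the API change: the old ObjectStore call folded the revision into the object key, while FunctionsStore takes the revision as an explicit argument. A minimal before/after sketch, with id, revision, and bytes standing in as hypothetical values:

// Before: revision encoded in the key name, under a "functions" bucket
// e.g. revision 0 -> key "myFunc"; revision 2 -> key "myFunc-2"
functionStoreClient.save("functions")(id + revision.some.filter(_ > 0).fold("")(r => s"-$r"))(bytes)

// After: revision is a first-class parameter
functionStoreClient.save(id, revision)(bytes)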
18 changes: 10 additions & 8 deletions backend/core/src/main/scala/chainless/ChainlessMain.scala
@@ -45,21 +45,22 @@ object ChainlessMain extends ResourceApp.Forever {
functionsDb <- SqlFunctionsDb.make[F](sqliteConnection)
functionInvocationsDb <- SqlFunctionInvocationsDb.make[F](sqliteConnection)
blocksDb <- SqlBlocksDb.make[F](sqliteConnection)
- objectStore = new ObjectStore[F](Path(args.dataDir) / "objects")
+ blocksStore = new BlocksStore[F](Path(args.dataDir) / "objects" / "blocks")
+ functionsStore = new FunctionsStore[F](Path(args.dataDir) / "objects" / "functions")
runnerOperator = new RunnerOperator[F](
blocksDb,
- objectStore,
+ blocksStore,
functionInvocationsDb,
newBlocksTopic.subscribeUnbounded
)
- jobProcessor <- makeJobProcessor(args, objectStore, functionsDb, functionInvocationsDb, blocksDb)
+ jobProcessor <- makeJobProcessor(args, functionsStore, blocksStore, functionsDb, functionInvocationsDb, blocksDb)
providers <- Providers
.make[F](args.bitcoinRpcAddress, args.ethereumRpcAddress, args.apparatusRpcAddress)
.map(_.toList.map(provider => provider.chain -> provider).toMap)
- replicator = new Replicator[F](blocksDb, providers, objectStore, newBlocksTopic.publish1(_).void)
+ replicator = new Replicator[F](blocksDb, providers, blocksStore, newBlocksTopic.publish1(_).void)
dispatcher = new JobDispatcher[F](newBlocksTopic.subscribeUnbounded, functionsDb, jobProcessor)
_ <- healthcheck.setReadiness(Healthcheck.Healthy()).toResource
- apiServer = new ApiServer(functionsDb, objectStore, functionInvocationsDb, runnerOperator, dispatcher)
+ apiServer = new ApiServer(functionsDb, functionsStore, functionInvocationsDb, runnerOperator, dispatcher)
_ <- (
Logger[F].info("Running").toResource,
replicator.replicate.toResource,
@@ -70,7 +71,8 @@ object ChainlessMain extends ResourceApp.Forever {

private def makeJobProcessor(
args: RunnerManagerArgs,
- objectStore: ObjectStore[F],
+ functionsStore: FunctionsStore[F],
+ blocksStore: BlocksStore[F],
functionsDb: FunctionsDb[F],
functionInvocationsDb: FunctionInvocationsDb[F],
blocksDb: BlocksDb[F]
@@ -88,7 +90,7 @@ object ChainlessMain extends ResourceApp.Forever {
.flatMap(localCodeCache =>
DockerDriver.make[F](
s"http://chainless:$port",
- objectStore,
+ functionsStore,
localCodeCache
)
)
@@ -98,7 +100,7 @@ object ChainlessMain extends ResourceApp.Forever {
functionsDb,
functionInvocationsDb,
blocksDb,
- objectStore,
+ blocksStore,
canceledRef
)
)
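
Both stores now share the objects directory, each in its own subdirectory. A minimal wiring sketch mirroring the lines above (the path literal is hypothetical; the real wiring uses args.dataDir):

val dataDir = Path("/var/lib/chainless") // hypothetical data directory
val blocksStore = new BlocksStore[IO](dataDir / "objects" / "blocks")
val functionsStore = new FunctionsStore[IO](dataDir / "objects" / "functions")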
87 changes: 33 additions & 54 deletions backend/core/src/main/scala/chainless/db/ObjectStore.scala
@@ -1,64 +1,43 @@
package chainless.db

- import cats.data.OptionT
import cats.effect.Async
import cats.effect.implicits.*
import cats.implicits.*
- import fs2.Stream
+ import chainless.models.*
import fs2.io.file.{Files, Path}
- import org.typelevel.log4cats.Logger
- import org.typelevel.log4cats.slf4j.Slf4jLogger
+ import fs2.{Chunk, Stream}
+
+ import java.nio.charset.StandardCharsets

- /** Stores byte data in a directory structure. Data is stored in "revisions", meaning if a key already exists, a new
-   * revision is added (and marked "latest") instead of deleting the old entry.
-   * @param baseDir
-   *   a base directory in which "buckets" of data are saved
-   */
- class ObjectStore[F[_]: Async: Files](baseDir: Path):
-
-   private given Logger[F] = Slf4jLogger.getLoggerFromName("ObjectStore")
-
-   def save(bucket: String)(id: String)(data: Stream[F, Byte]): F[Unit] =
-     Logger[F].info(s"Saving id=$id") >>
-       (baseDir / bucket / id)
-         .pure[F]
-         .flatTap(Files[F].createDirectories(_))
-         .flatMap(objectDir =>
-           OptionT(contains(objectDir))
-             .fold(0)(_ + 1)
-             .map(nextRevision => objectDir / nextRevision.toString)
-             .flatMap(file => Files[F].createFile(file) *> data.through(Files[F].writeAll(file)).compile.drain)
-         ) >>
-       Logger[F].info(s"Finished saving id=$id")
-
-   def get(bucket: String)(id: String): Stream[F, Byte] =
-     Stream
-       .eval(
-         OptionT
-           .pure[F](baseDir / bucket / id)
-           .flatMap(objectDir =>
-             OptionT(contains(objectDir))
-               .map(revision => objectDir / revision.toString)
-           )
-           .getOrRaise(new NoSuchElementException("No data"))
-       )
-       .flatMap(Files[F].readAll)
-
-   def exists(bucket: String)(id: String): F[Boolean] =
-     OptionT
-       .pure[F](baseDir / bucket / id)
-       .flatMap(objectDir => OptionT(contains(objectDir)))
-       .isDefined
-
-   def delete(bucket: String)(id: String): F[Boolean] =
-     exists(bucket)(id)
-       .flatTap(Async[F].whenA(_)(Files[F].deleteRecursively(baseDir / bucket / id)))
-
-   private def contains(objectDir: Path): F[Option[Int]] =
-     Files[F]
-       .list(objectDir)
-       .evalFilter(Files[F].isRegularFile(_))
-       .map(_.fileName.toString.toIntOption)
-       .collect { case Some(i) => i }
-       .compile
-       .fold(-1)(_.max(_))
-       .map(_.some.filter(_ >= 0))
+ class BlocksStore[F[_]: Async](baseDir: Path):
+   def saveBlock(block: BlockWithChain): F[Unit] =
+     Files[F].createDirectories(baseDir / block.meta.chain.name) >>
+       Stream(block.block)
+         .map(_.noSpaces)
+         .through(fs2.text.utf8.encode)
+         .through(Files[F].writeAll(baseDir / block.meta.chain.name / block.meta.blockId))
+         .compile
+         .drain
+
+   def getBlock(meta: BlockMeta): F[BlockWithChain] =
+     Files[F]
+       .readAll(baseDir / meta.chain.name / meta.blockId)
+       .compile
+       .to(Chunk)
+       .map(chunk => new String(chunk.toArray[Byte], StandardCharsets.UTF_8))
+       .map(io.circe.parser.parse)
+       .rethrow
+       .map(BlockWithChain(meta, _))
+
+ class FunctionsStore[F[_]: Async](baseDir: Path):
+   def get(id: String)(revision: Int): Stream[F, Byte] =
+     Files[F].readAll(baseDir / id / revision.toString)
+
+   def delete(id: String): F[Unit] =
+     Files[F].deleteRecursively(baseDir / id)
+
+   def save(id: String, revision: Int)(data: Stream[F, Byte]): F[Unit] =
+     Files[F].createDirectories(baseDir / id) >> data
+       .through(Files[F].writeAll(baseDir / id / revision.toString))
+       .compile
+       .drain
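
One behavioral difference is worth noting: ObjectStore computed the next revision itself and never overwrote an existing one, whereas FunctionsStore writes exactly the revision it is given (replacing any file already at that path), so callers such as ApiServer now own revision numbering via functions.updateRevision. A self-contained round-trip sketch, assuming cats-effect and fs2 on the classpath and a hypothetical base directory:

import cats.effect.IO
import fs2.Stream
import fs2.io.file.Path

val store = new FunctionsStore[IO](Path("/tmp/objects/functions")) // hypothetical dir
val code = Stream("console.log('hi')").through(fs2.text.utf8.encode) // hypothetical payload

val roundTrip: IO[String] =
  store.save("my-function", 0)(code) >> // writes /tmp/objects/functions/my-function/0
    store.get("my-function")(0).through(fs2.text.utf8.decode).compile.string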
@@ -23,7 +23,7 @@ import org.typelevel.log4cats.slf4j.Slf4jLogger
class Replicator[F[_]: Async](
blocks: BlocksDb[F],
providers: Map[Chain, BlocksProvider[F]],
- blockStore: ObjectStore[F],
+ blockStore: BlocksStore[F],
broadcaster: BlockMeta => F[Unit]
):

@@ -62,13 +62,7 @@ class Replicator[F[_]: Async](

private def pipe: Pipe[F, BlockWithChain, Unit] =
_.evalTap(block => logger.info(show"Saving block $block"))
- .evalTap(blockWithChain =>
-   blockStore.save(blockWithChain.meta.chain.name)(blockWithChain.meta.blockId)(
-     Stream(blockWithChain.block)
-       .map(_.noSpaces)
-       .through(fs2.text.utf8.encode)
-   )
- )
+ .evalTap(blockStore.saveBlock)
.map(_.meta)
.evalTap(blocks.insert)
.evalTap(broadcaster)
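
Net effect: JSON serialization (circe's noSpaces plus UTF-8 encoding) moves out of the replication pipe and into BlocksStore.saveBlock, so Replicator no longer knows how blocks are laid out on disk. Schematically, with blockWithChain as a hypothetical in-scope value:

// Before: the pipe serialized each block inline
blockStore.save(blockWithChain.meta.chain.name)(blockWithChain.meta.blockId)(
  Stream(blockWithChain.block).map(_.noSpaces).through(fs2.text.utf8.encode)
)

// After: the store owns serialization; the pipe hands over the block as-is
blockStore.saveBlock(blockWithChain)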