Skip to content

Commit

Permalink
Logging, in memory repos for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
epifab committed Jan 30, 2024
1 parent 90d8cab commit 5b65bc7
Show file tree
Hide file tree
Showing 32 changed files with 351 additions and 176 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ jobs:
uses: actions/setup-java@v3
with:
distribution: temurin
java-version: 8
java-version: 17
- name: Build
run: sbt compile Test/compile
- name: Test
Expand Down
18 changes: 10 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@ import org.scalajs.jsenv.nodejs.NodeJSEnv
import sbtcrossproject.CrossPlugin.autoImport.{crossProject, CrossType}

Global / version := "SNAPSHOT"
Global / scalaVersion := "3.2.0"
Global / scalaVersion := "3.3.1"
Global / jsEnv := new NodeJSEnv(NodeJSEnv.Config().withArgs(List("--dns-result-order=ipv4first")))

val catsCoreVersion = "2.9.0"
val catsEffectVersion = "3.3.0"
val catsEffectTestingVersion = "1.5.0"
val http4sVersion = "0.23.18"
val http4sVersion = "0.23.19"
val http4sBlazeVersion = "0.23.14"
val http4sJdkClientVersion = "0.9.0"
val log4catsVersion = "2.5.0"
val logbackClassicVersion = "1.4.6"
val redis4catsVersion = "1.4.0"
val http4sJdkClientVersion = "0.9.1"
val log4catsVersion = "2.6.0"
val logbackClassicVersion = "1.4.12"
val redis4catsVersion = "1.4.1"
val fs2Version = "3.2.2"
val circeVersion = "0.14.5"
val secureRandomVersion = "1.0.0"
Expand All @@ -37,6 +37,8 @@ lazy val domain = crossProject(JVMPlatform, JSPlatform)
"io.circe" %%% "circe-core" % circeVersion,
"io.circe" %%% "circe-generic" % circeVersion,
"io.circe" %%% "circe-parser" % circeVersion,
"org.typelevel" %% "log4cats-slf4j" % log4catsVersion,
"ch.qos.logback" % "logback-classic" % logbackClassicVersion,
"org.typelevel" %%% "cats-effect-testing-scalatest" % catsEffectTestingVersion % Test,
"org.scalatest" %%% "scalatest" % scalaTestVersion % Test
)
Expand Down Expand Up @@ -72,8 +74,6 @@ lazy val backend = (project in file("modules/backend"))
"org.http4s" %% "http4s-jdk-http-client" % http4sJdkClientVersion,
"org.scala-lang.modules" %% "scala-xml" % scalaXmlVersion,
"dev.profunktor" %% "redis4cats-effects" % redis4catsVersion,
"org.typelevel" %% "log4cats-slf4j" % log4catsVersion,
"ch.qos.logback" % "logback-classic" % logbackClassicVersion,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.typelevel" %% "cats-effect-testing-scalatest" % catsEffectTestingVersion % Test
),
Expand All @@ -86,3 +86,5 @@ lazy val backend = (project in file("modules/backend"))
Runtime / managedClasspath += (Assets / packageBin).value,
fork := true
)

lazy val root = (project in file(".")).aggregate(domain.jvm, backend)
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.server.{Router, Server, ServerBuilder}
import org.http4s.server.middleware.{CORS, CORSConfig, GZip}
import org.http4s.server.websocket.WebSocketBuilder2
import org.typelevel.log4cats.slf4j.Slf4jLogger
import org.typelevel.log4cats.Logger

object BackendApp extends IOApp:

given Logger[IO] = Slf4jLogger.getLogger

private def webServer(gameController: GameController[IO]): Resource[IO, Server] =
BlazeServerBuilder[IO]
.withHttpWebSocketApp((webSocket: WebSocketBuilder2[IO]) =>
Expand All @@ -38,3 +42,4 @@ object BackendApp extends IOApp:

override def run(args: List[String]): IO[ExitCode] =
resource.use(_.compile.drain.as(ExitCode.Success))
end BackendApp
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,15 @@ import bastoni.domain.model.PlayerState.*
import bastoni.domain.view.{FromPlayer, ToPlayer}
import bastoni.domain.view.FromPlayer.*
import cats.effect.{Sync, Temporal}
import cats.effect.kernel.Async
import cats.effect.syntax.temporal.*
import org.typelevel.log4cats.Logger

import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.util.chaining.*

object DumbPlayer extends ActStrategy:
def apply[F[_]: Sync: Temporal](
def apply[F[_]: Async: Logger](
controller: GameController[F],
pause: FiniteDuration = 0.millis
): VirtualPlayer[F] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import bastoni.domain.model.*
import bastoni.domain.view.{FromPlayer, ToPlayer}
import bastoni.domain.view.FromPlayer.{Connect, JoinTable}
import cats.effect.{Sync, Temporal}
import cats.effect.kernel.Async
import cats.effect.syntax.temporal.*
import cats.syntax.all.*
import org.typelevel.log4cats.Logger

import scala.concurrent.duration.DurationInt
import scala.concurrent.duration.FiniteDuration
Expand All @@ -19,7 +22,7 @@ case class ActContext(
trait ActStrategy:
def act(context: ActContext, action: Action): FromPlayer.GameCommand

class VirtualPlayer[F[_]: Sync: Temporal](
class VirtualPlayer[F[_]: Async: Logger](
controller: GameController[F],
strategy: ActStrategy,
pause: FiniteDuration = 0.millis
Expand All @@ -46,7 +49,10 @@ class VirtualPlayer[F[_]: Sync: Temporal](
case Some((context, ToPlayer.Request(Command.Act(playerId, action, _)))) if me.is(playerId) =>
strategy.act(context, action)
}
.evalMap(command => Sync[F].pure(command).delayBy(pause))
.evalMap(command =>
Logger[F].debug(Console.CYAN + show"VirtualPlayer: $me will play $command" + Console.RESET) *>
Sync[F].pure(command).delayBy(pause)
)

actions.through(controller.publish(me, roomId))
end VirtualPlayer
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package bastoni.domain.logic

import bastoni.domain.model.*
import bastoni.domain.model.Event.*
import cats.implicits.showInterpolator
import cats.syntax.show
import io.circe.generic.semiauto.deriveCodec
import io.circe.Codec

Expand All @@ -18,20 +20,24 @@ case class GameContext(room: RoomServerView, stateMachine: Option[GameStateMachi
case Command.JoinTable(user, seed) =>
room.join(user, seed) match
case Right((newRoom, seat)) => newRoom -> List(PlayerJoinedTable(user, seat))
case Left(error) => room -> Nil
case Left(error) => room -> List(ClientError(s"Failed to join the table: $error"))

case Command.LeaveTable(user) =>
room.leave(user) match
case Right((newRoom, seat)) => newRoom -> List(PlayerLeftTable(user, seat))
case Left(error) => room -> Nil
case Left(error) => room -> List(ClientError(s"Failed to leave the table: $error"))

case _ => room -> Nil

val (newStateMachine, gameMessages) = (stateMachine, message) match
case (None, Command.StartMatch(playerId, gameType))
if updatedRoom.contains(playerId) && updatedRoom.round.size > 1 =>
val (machine, initialEvents) = GameStateMachineFactory(gameType)(updatedRoom)
Some(machine) -> initialEvents
case (None, Command.StartMatch(playerId, gameType)) =>
if !updatedRoom.contains(playerId)
then stateMachine -> (ClientError(show"Player $playerId cannot start a match: not in the room") :: Nil)
else if updatedRoom.round.size <= 1
then stateMachine -> (ClientError(show"Cannot start a match in an empty room") :: Nil)
else
GameStateMachineFactory(gameType)(updatedRoom) match
case (m, e) => Some(m) -> e
case (None, _) => stateMachine -> Nil
case (Some(stateMachine), message) => stateMachine(message)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import cats.effect.{Resource, Sync}
import cats.effect.syntax.all.*
import cats.syntax.all.*
import cats.Monad
import org.typelevel.log4cats.Logger

import scala.util.Random

Expand All @@ -23,18 +24,39 @@ trait GameController[F[_]] extends GameSubscriber[F] with GamePublisher[F]

object GameController:

def apply[F[_]: Sync](messageBus: MessageBus[F]): GameController[F] =
def apply[F[_]: Sync: Logger](messageBus: MessageBus[F]): GameController[F] =
val pub = publisher(messageBus)
val sub = subscriber(messageBus)
new GameController[F]:
override def publish(me: User, roomId: RoomId)(input: fs2.Stream[F, GameCommand]): fs2.Stream[F, Unit] =
pub.publish(me, roomId)(input)
override def publish(me: User, roomId: RoomId)(stream: fs2.Stream[F, GameCommand]): fs2.Stream[F, Unit] =
pub.publish(me, roomId)(
stream.evalTap(command =>
Logger[F].debug(
Console.CYAN +
show"GameController: $me publishes $command to $roomId" +
Console.RESET
)
)
)

override def publish1(me: User, roomId: RoomId)(input: GameCommand): F[Unit] =
pub.publish1(me, roomId)(input)
override def publish1(me: User, roomId: RoomId)(command: GameCommand): F[Unit] =
pub.publish1(me, roomId)(command) <* Logger[F].debug(
Console.CYAN +
show"GameController: $me publishes $command to $roomId" +
Console.RESET
)

override def subscribe(me: User, roomId: RoomId): fs2.Stream[F, ToPlayer] =
sub.subscribe(me, roomId)
sub
.subscribe(me, roomId)
.evalTap(message =>
Logger[F].debug(
Console.CYAN +
show"GameController: $me receives $message from $roomId" +
Console.RESET
)
)
end apply

def subscriber[F[_]: Sync](messageBus: MessageBus[F]): GameSubscriber[F] =
(me: User, roomId: RoomId) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import cats.syntax.all.*
import cats.Applicative
import io.circe.{Decoder, DecodingFailure, Encoder, Json}
import io.circe.syntax.EncoderOps
import org.typelevel.log4cats.Logger

import scala.concurrent.duration.*

Expand All @@ -25,7 +26,7 @@ extension (data: List[Command | Delayed[Command] | ServerEvent])

object GameService:

def apply[F[_]: Concurrent](newId: F[MessageId], gameRepo: GameRepo[F], messageRepo: MessageRepo[F])(
def apply[F[_]: Concurrent: Logger](newId: F[MessageId], gameRepo: GameRepo[F], messageRepo: MessageRepo[F])(
messages: fs2.Stream[F, Message]
): fs2.Stream[F, Message | Delayed[Message]] =
messages
Expand All @@ -42,7 +43,7 @@ object GameService:
}
.flatMap(fs2.Stream.iterable)

def runner[F[_]: Async](
def runner[F[_]: Async: Logger](
name: String,
messageQueue: MessageConsumer[F],
messageBus: MessagePublisher[F],
Expand All @@ -51,14 +52,20 @@ object GameService:
delayDuration: Delay => FiniteDuration = Delay.default
): ServiceRunner[F] =

val oldMessages: fs2.Stream[F, Message | Delayed[Message]] = messageRepo.inFlight
val oldMessages: fs2.Stream[F, Message | Delayed[Message]] =
messageRepo.inFlight

val newMessages: fs2.Stream[F, Message | Delayed[Message]] = messageQueue.consume
.through(GameService(Async[F].delay(MessageId.newId), gameRepo, messageRepo))
.through(GameService.apply(Async[F].delay(MessageId.newId), gameRepo, messageRepo))

(oldMessages ++ newMessages)
.evalMap {
case Delayed(message, delay) => messageBus.publish1(message).delayBy(delayDuration(delay)).start.void
case message: Message => messageBus.publish1(message)
case delayed @ Delayed(message, delay) =>
Logger[F].debug(Console.RED + show"GameService($name): Emitting $delayed" + Console.RESET) *>
messageBus.publish1(message).delayBy(delayDuration(delay)).start.void
case message: Message =>
Logger[F].debug(Console.GREEN + show"GameService($name): Emitting $message" + Console.RESET) *>
messageBus.publish1(message)
}

end GameService
Original file line number Diff line number Diff line change
@@ -1,31 +1,37 @@
package bastoni.domain.logic

import bastoni.domain.model.*
import cats.{Functor, Monad, Show}
import cats.effect.{Concurrent, Resource, Sync}
import cats.effect.std.Queue
import cats.syntax.flatMap.toFlatMapOps
import cats.syntax.functor.toFunctorOps
import cats.Monad
import cats.syntax.all.*
import fs2.concurrent.Topic
import org.typelevel.log4cats.Logger

trait Bus[F[_], A] extends Subscriber[F, A], Publisher[F, A]:
def run: fs2.Stream[F, Unit]
def subscribe: Resource[F, fs2.Stream[F, A]]

object InMemoryBus:

private class InMemoryBus[F[_], A](
private class InMemoryBus[F[_]: Monad: Logger, A: Show](
topic: Topic[F, A],
queue: Queue[F, A],
val run: fs2.Stream[F, Unit]
) extends Bus[F, A]:
def publish1(message: A): F[Unit] = queue.offer(message)
def publish1(message: A): F[Unit] =
Logger[F].debug(
Console.GREEN +
show"InMemoryBus: Publishing $message" +
Console.RESET
) *> queue.offer(message)

def publish(messages: fs2.Stream[F, A]): fs2.Stream[F, Unit] = messages.evalMap(publish1)
def publish(messages: fs2.Stream[F, A]): fs2.Stream[F, Unit] =
messages.evalMap(publish1)

val subscribe: Resource[F, fs2.Stream[F, A]] = topic.subscribeAwait(128)

def apply[F[_]: Concurrent, A]: F[Bus[F, A]] =
def apply[F[_]: Logger: Concurrent, A: Show]: F[Bus[F, A]] =
for
topic <- Topic[F, A]
queue <- Queue.bounded[F, A](128)
Expand All @@ -35,4 +41,4 @@ object InMemoryBus:
type MessageBus[F[_]] = Bus[F, Message]

object MessageBus:
def inMemory[F[_]: Concurrent]: F[MessageBus[F]] = InMemoryBus[F, Message]
def inMemory[F[_]: Concurrent: Logger]: F[MessageBus[F]] = InMemoryBus[F, Message]
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@ package bastoni.domain.logic

import bastoni.domain.repos.*
import cats.effect.{Async, Resource}
import org.typelevel.log4cats.Logger

object Services:

def apply[F[_]: Async](
def apply[F[_]: Async: Logger](
instances: Int,
messageQueue: MessageQueue[F],
messageBus: MessageBus[F],
Expand All @@ -30,7 +31,7 @@ object Services:
.concurrently(runners)
(gameController, servicesRunner)

def inMemory[F[_]: Async]: Resource[F, (GameController[F], fs2.Stream[F, Unit])] =
def inMemory[F[_]: Async: Logger]: Resource[F, (GameController[F], fs2.Stream[F, Unit])] =
for
messageBus <- Resource.eval(MessageBus.inMemory)
messageQueue <- MessageQueue.inMemory(messageBus)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package bastoni.domain.logic

import bastoni.domain.model.{Command, Delay, Delayed, PotentiallyDelayed, ServerEvent}
import bastoni.domain.model.*
import cats.Show
import io.circe.{Decoder, DecodingFailure, Encoder}
import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax.*

type StateMachineInput = ServerEvent | Command
Expand All @@ -26,6 +26,11 @@ object StateMachineInput:
}
}

given Show[StateMachineInput] = Show {
case serverEvent: ServerEvent => Show[ServerEvent].show(serverEvent)
case command: Command => Show[Command].show(command)
}

object StateMachineOutput:
given encoder: Encoder[StateMachineOutput] = Encoder.instance {
case e: ServerEvent => Encoder[ServerEvent].apply(e).mapObject(_.add("supertype", "Event".asJson))
Expand All @@ -49,3 +54,11 @@ object StateMachineOutput:
case supertype => Left(DecodingFailure(s"Not an event nor a command: $supertype", typeCursor.history))
}
}

given Show[StateMachineOutput] = Show {
case serverEvent: ServerEvent => Show[ServerEvent].show(serverEvent)
case command: Command => Show[Command].show(command)
case delayedCommand: Delayed[Command] => Show[Delayed[Command]].show(delayedCommand)
}

end StateMachineOutput
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package bastoni.domain.model

import bastoni.domain.model.Command.PlayCard
import cats.Show
import io.circe.{Decoder, Encoder}
import io.circe.derivation.{ConfiguredDecoder, ConfiguredEncoder}

Expand All @@ -22,3 +23,5 @@ object Command:

given Encoder[Command] = ConfiguredEncoder.derive(discriminator = Some("type"))
given Decoder[Command] = ConfiguredDecoder.derive(discriminator = Some("type"))

given Show[Command] = Show(Encoder[Command].apply(_).noSpaces)
Loading

0 comments on commit 5b65bc7

Please sign in to comment.