Skip to content

Commit

Permalink
feat(sut): Add a Logger to dumblog
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-daniel-gustafsson committed Mar 14, 2022
1 parent 710c304 commit 9084a00
Show file tree
Hide file tree
Showing 6 changed files with 62 additions and 17 deletions.
2 changes: 1 addition & 1 deletion src/sut/dumblog/bench/journal/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ main :: IO ()
main = do
removePathForcibly dUMBLOG_JOURNAL
removePathForcibly dUMBLOG_SNAPSHOT
commonMain "Journal" (journalDumblog Run bUFFER_CAPACITY pORT . Just)
commonMain "Journal" (journalDumblog quietRun bUFFER_CAPACITY pORT . Just)
1 change: 1 addition & 0 deletions src/sut/dumblog/dumblog.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ library
Dumblog.Journal.Blocker
Dumblog.Journal.Codec
Dumblog.Journal.FrontEnd
Dumblog.Journal.Logger
Dumblog.Journal.Main
Dumblog.Journal.Metrics
Dumblog.Journal.Snapshot
Expand Down
26 changes: 26 additions & 0 deletions src/sut/dumblog/src/Dumblog/Journal/Logger.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module Dumblog.Journal.Logger where

import Data.Foldable
import Data.Int
import Data.IORef
import Data.Sequence

type Logger = LogItem -> IO ()

nullLogger :: Logger
nullLogger = const (pure ())

ioLogger :: Logger
ioLogger = putStrLn

type LogItem = String
newtype QueueLogger = QueueLogger (IORef (Seq LogItem))

newQueueLogger :: IO QueueLogger
newQueueLogger = QueueLogger <$> newIORef empty

queueLogger :: QueueLogger -> Logger
queueLogger (QueueLogger ref) = \logItem -> atomicModifyIORef' ref $ \xs -> (xs |> logItem, ())

flushQueue :: QueueLogger -> IO [LogItem]
flushQueue (QueueLogger ref) = atomicModifyIORef' ref $ \xs -> (empty, toList xs)
28 changes: 19 additions & 9 deletions src/sut/dumblog/src/Dumblog/Journal/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import Options.Generic
import Dumblog.Journal.Blocker (emptyBlocker)
import Dumblog.Journal.Codec (Envelope(..), decode)
import Dumblog.Journal.FrontEnd (FrontEndInfo(..), runFrontEnd)
import qualified Dumblog.Journal.Logger as DLogger
import Dumblog.Journal.Metrics (dumblogSchema)
import Dumblog.Journal.Snapshot (Snapshot)
import qualified Dumblog.Journal.Snapshot as Snapshot
Expand Down Expand Up @@ -69,21 +70,24 @@ replay [] s = do
pure s
replay (cmd:cmds) s = do
putStrLn $ "[REPLAY] running: " <> show cmd
(s', _) <- runCommand s cmd
(s', _) <- runCommand DLogger.ioLogger s cmd
replay cmds s'

type DebugFile = Vector InstanceStateRepr

-- TODO: merge with `replay`
replayDebug :: [Command] -> InMemoryDumblog -> IO DebugFile
replayDebug = go 0 mempty
replayDebug originCommands originState = do
queueLogger <- DLogger.newQueueLogger
go queueLogger 0 mempty originCommands originState
where
go _logTime dfile [] _s = do
go _ _logTime dfile [] _s = do
putStrLn "[REPLAY-DEBUG] finished!"
pure dfile
go logTime dfile (cmd:cmds) s = do
go logger logTime dfile (cmd:cmds) s = do
putStrLn $ "[REPLAY-DEBUG] running: " <> show cmd
(s', _) <- runCommand s cmd
(s', _) <- runCommand (DLogger.queueLogger logger) s cmd
logLines <- DLogger.flushQueue logger
let
(ev, msg) = case cmd of
Read i -> ("read", show i)
Expand All @@ -98,10 +102,10 @@ replayDebug = go 0 mempty
is = InstanceStateRepr
{ state = LText.unpack (LEncoding.decodeUtf8 (Aeson.encode (mergePatch (Aeson.toJSON s) (Aeson.toJSON s'))))
, currentEvent = ce
, logs = []
, logs = logLines
, sent = []
}
go (succ logTime) (Vector.snoc dfile is) cmds s'
go logger (succ logTime) (Vector.snoc dfile is) cmds s'

collectAll :: Journal -> IO [Command]
collectAll jour = do
Expand All @@ -123,13 +127,17 @@ startingState (Just snap) = Snapshot.ssState snap

data DumblogConfig
= Run
{ quiet :: Bool <?> "Should we suppress program log messages"}
| DebugFile
{ output :: FilePath <?> "Where to output the debug file"
}
deriving (Generic, Show)

instance ParseRecord DumblogConfig

quietRun :: DumblogConfig
quietRun = Run (Helpful True)

{-
Unclear how to:
* How to archive the journal
Expand Down Expand Up @@ -158,7 +166,7 @@ journalDumblog cfg _capacity port mReady = do
fps = dUMBLOG_SNAPSHOT
untilSnapshot = 1000
case cfg of
Run -> do
Run q -> do
mSnapshot <- Snapshot.readFile fps
journal <- fetchJournal mSnapshot fpj dumblogOptions
metrics <- Metrics.newMetrics dumblogSchema fpm
Expand All @@ -168,7 +176,9 @@ journalDumblog cfg _capacity port mReady = do
let
events = length cmds
feInfo = FrontEndInfo blocker
wInfo = WorkerInfo blocker fps events untilSnapshot
logger | unHelpful q = DLogger.nullLogger
| otherwise = DLogger.ioLogger
wInfo = WorkerInfo blocker logger fps events untilSnapshot
withAsync (worker journal metrics wInfo workerState) $ \a -> do
link a
runFrontEnd port journal metrics feInfo mReady
Expand Down
14 changes: 10 additions & 4 deletions src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Data.Text.Encoding (decodeUtf8)
import Data.Sequence
import GHC.Generics (Generic)

import Dumblog.Journal.Logger
import Dumblog.Journal.Types
import Dumblog.Journal.Metrics
import Journal.Internal.Metrics (incrCounter)
Expand All @@ -40,10 +41,15 @@ instance ToJSON InMemoryDumblog
initState :: InMemoryDumblog
initState = InMemoryDumblog empty 0

runCommand :: InMemoryDumblog -> Command -> IO (InMemoryDumblog, Response)
runCommand state@(InMemoryDumblog appLog ix) cmd = case cmd of
Write bs -> pure (InMemoryDumblog (appLog |> DumblogByteString bs) (ix+1), LBS8.pack (show ix))
runCommand :: Logger -> InMemoryDumblog -> Command -> IO (InMemoryDumblog, Response)
runCommand logger state@(InMemoryDumblog appLog ix) cmd = case cmd of
Write bs -> do
logger "Performing a write"
pure (InMemoryDumblog (appLog |> DumblogByteString bs) (ix+1), LBS8.pack (show ix))
Read i
| i < ix -> pure (state, LBS.fromStrict $ innerByteString (index appLog i))
| otherwise -> pure (state, "Transaction not in the store!")
| otherwise -> do
logger $ "Oh no, request not in log"
logger $ ("Max index is " ++ show (ix - 1))
pure (state, "Transaction not in the store!")
-- ^ XXX: we probably should really signal failure
8 changes: 5 additions & 3 deletions src/sut/dumblog/src/Dumblog/Journal/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import Journal.Types

import Dumblog.Journal.Blocker
import Dumblog.Journal.Codec
import Dumblog.Journal.Logger
import Dumblog.Journal.Metrics
import qualified Dumblog.Journal.Snapshot as Snapshot
import Dumblog.Journal.StateMachine
Expand All @@ -27,6 +28,7 @@ import Dumblog.Journal.Types

data WorkerInfo = WorkerInfo
{ wiBlockers :: Blocker (Either Response Response)
, wiLogger :: Logger
, wiSnapshotFile :: FilePath
, wiEvents :: Int -- how many events since last snapshot
, wiEventsInRound :: Int -- how many events in one snapshot
Expand All @@ -40,12 +42,12 @@ wakeUpFrontend blocker key resp = do
error $ "Frontend never added MVar"

worker :: Journal -> DumblogMetrics -> WorkerInfo -> InMemoryDumblog -> IO ()
worker journal metrics (WorkerInfo blocker snapshotFile eventCount untilSnapshot) =
worker journal metrics (WorkerInfo blocker logger snapshotFile eventCount untilSnapshot) =
go eventCount
where
go ev s
| ev >= untilSnapshot = do
putStrLn $ "[worker] Performing Snapshot"
logger "[worker] Performing Snapshot"
bytes <- readBytesConsumed (jMetadata journal) Sub1
Snapshot.toFile (Snapshot.Snapshot bytes s) snapshotFile
writeBytesConsumed (jMetadata journal) Sub2 bytes
Expand All @@ -63,7 +65,7 @@ worker journal metrics (WorkerInfo blocker snapshotFile eventCount untilSnapshot
-- -- ^ should be better error message
--
!startTime <- getCurrentNanosSinceEpoch
(s', r) <- runCommand s cmd
(s', r) <- runCommand logger s cmd
wakeUpFrontend blocker key (Right r)
!endTime <- getCurrentNanosSinceEpoch
-- Convert from nano s to µs with `* 10^-3`.
Expand Down

0 comments on commit 9084a00

Please sign in to comment.