Skip to content

Commit

Permalink
feat(sut): display journal metadata (bytes produced and consumed)
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Mar 10, 2022
1 parent 10d2ec0 commit 9fa26e2
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 16 deletions.
15 changes: 13 additions & 2 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ module Journal
( module Journal.Types
, defaultOptions
, allocateJournal
, journalMetadata
, startJournal
, appendBS
-- , tee
Expand All @@ -15,8 +16,8 @@ module Journal
, metricsBytesWritten
) where

import Control.Exception (assert, bracket)
import Control.Monad (unless, when, forM_)
import Control.Exception (IOException, assert, bracket, try)
import Control.Monad (forM_, unless, when)
import Data.Bits (popCount)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
Expand Down Expand Up @@ -101,6 +102,16 @@ allocateJournal fp (Options termBufferLen logger maxSub) = do
unless (int322Int pageSize' == pageSize) $
error "allocateJournal: pageSize doesn't match the metadata"

journalMetadata :: FilePath -> Options -> IO (Either IOException Metadata)
journalMetadata fp opts = do
let logLength = oTermBufferLength opts * pARTITION_COUNT + lOG_META_DATA_LENGTH
ebb <- try (mmapped fp logLength)
case ebb of
Right bb -> do
meta <- wrapPart bb (logLength - lOG_META_DATA_LENGTH) lOG_META_DATA_LENGTH
return (Right (Metadata meta))
Left err -> return (Left err)

startJournal :: FilePath -> Options -> IO Journal
startJournal fp (Options termLength logger _maxSub) = do
logLength <- fromIntegral <$> getFileSize fp
Expand Down
24 changes: 17 additions & 7 deletions src/sut/dumblog/src/Dumblog/Journal/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,29 @@ instance ParseRecord DumblogConfig
Unclear how to:
* How to archive the journal
-}

dumblogOptions :: Options
dumblogOptions = defaultOptions
{ oLogger = Logger.nullLogger
, oMaxSubscriber = Sub2
}

dUMBLOG_JOURNAL :: FilePath
dUMBLOG_JOURNAL = "/tmp/dumblog.journal"

dUMBLOG_METRICS :: FilePath
dUMBLOG_METRICS = "/tmp/dumblog.metrics"

journalDumblog :: DumblogConfig -> Int -> Int -> Maybe (MVar ()) -> IO ()
journalDumblog cfg _capacity port mReady = do
let fpj = "/tmp/dumblog.journal"
fpm = "/tmp/dumblog.metrics"
let fpj = dUMBLOG_JOURNAL
fpm = dUMBLOG_METRICS
fps = "/tmp/dumblog.snapshot"
opts = defaultOptions { oLogger = Logger.nullLogger
, oMaxSubscriber = Sub2
}
untilSnapshot = 1000
case cfg of
Run -> do
mSnapshot <- Snapshot.readFile fps
journal <- fetchJournal mSnapshot fpj opts
journal <- fetchJournal mSnapshot fpj dumblogOptions
metrics <- Metrics.newMetrics dumblogSchema fpm
blocker <- emptyBlocker
counter <- AtomicCounter.newCounter 0 -- it is okay to start over
Expand All @@ -140,7 +150,7 @@ journalDumblog cfg _capacity port mReady = do
runFrontEnd port journal feInfo mReady
DebugFile fp -> do
mSnapshot <- Snapshot.readFile fps
journal <- fetchJournal mSnapshot fpj opts
journal <- fetchJournal mSnapshot fpj dumblogOptions
cmds <- collectAll journal
debugFileContents <- replayDebug cmds (startingState mSnapshot)
Aeson.encodeFile (unHelpful fp) debugFileContents
Expand Down
48 changes: 41 additions & 7 deletions src/sut/dumblog/src/Dumblog/Metrics/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,36 @@

module Dumblog.Metrics.Main where

import Data.Maybe (fromMaybe)
import Control.Monad (forever)
import Control.Concurrent (threadDelay)
import Text.Printf (printf)
import Control.Exception (IOException)
import Control.Monad (forever)
import Data.Maybe (fromMaybe)
import GHC.IO.Encoding (setLocaleEncoding, utf8)
import Text.Printf (printf)

import Dumblog.Journal.Main
import Dumblog.Journal.Metrics
import Journal (journalMetadata)
import Journal.Internal.Metrics
import Journal.Types

------------------------------------------------------------------------

metricsMain :: IO ()
metricsMain = forever $ do
setLocaleEncoding utf8 -- Otherwise we can't print µ...
metrics <- newMetrics dumblogSchema "/tmp/dumblog.metrics"
metrics <- newMetrics dumblogSchema dUMBLOG_METRICS
eMeta <- journalMetadata dUMBLOG_JOURNAL dumblogOptions
putStrLn ansiClearScreen
displayServiceTime metrics
displayQueueDepth eMeta
threadDelay 1_000_000

ansiClearScreen :: String
ansiClearScreen = "\ESC[2J"

displayServiceTime :: DumblogMetrics -> IO ()
displayServiceTime metrics = do
mMin <- percentile metrics ServiceTime 0
mMed <- percentile metrics ServiceTime 50
m90 <- percentile metrics ServiceTime 90
Expand All @@ -35,7 +49,27 @@ metricsMain = forever $ do
printf " max %10.2f µs\n" (fromMaybe 0 mMax)
cnt <- count metrics ServiceTime
putStrLn (printf " count %10d" cnt)
threadDelay 1_000_000

ansiClearScreen :: String
ansiClearScreen = "\ESC[2J"
displayQueueDepth :: Either IOException Metadata -> IO ()
displayQueueDepth (Left _err) = do
putStrLn "Journal metadata:"
printf " 0 bytes produced\n"
printf " 0 bytes consumed\n"
displayQueueDepth (Right meta) = do
putStrLn "Journal metadata:"
termCount <- activeTermCount meta
let index = indexByTermCount termCount
rt <- readRawTail meta index
initTermId <- readInitialTermId meta
termLen <- readTermLength meta

let termId = rawTailTermId rt
termOffset = rawTailTermOffset rt termLen
termBeginPosition =
computeTermBeginPosition termId (positionBitsToShift termLen) initTermId

produced = termBeginPosition + fromIntegral termOffset
consumed <- readBytesConsumed meta Sub1
printf " %d bytes produced\n" produced
printf " %d bytes consumed\n" consumed
printf " %d bytes difference\n" (produced - fromIntegral consumed)

0 comments on commit 9fa26e2

Please sign in to comment.