Skip to content

Commit

Permalink
feat(sut): more work towards being able to run multiple dumblogs
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Mar 25, 2022
1 parent cbfa699 commit 06827f2
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 19 deletions.
2 changes: 1 addition & 1 deletion src/sut/dumblog/app/journal/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@ import Dumblog.Journal.Main
main :: IO ()
main = do
(cfg, _help) <- getWithHelp "Dumblog (journal)"
journalDumblog cfg (64 * 1024) 8054 Nothing
journalDumblog cfg (64 * 1024) Nothing
6 changes: 2 additions & 4 deletions src/sut/dumblog/bench/Common.hs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ import Text.Read (readMaybe)

import Dumblog.Common.HttpClient
import Dumblog.Common.Utils (showBytes)
import Dumblog.Journal.Main (dUMBLOG_PORT)

------------------------------------------------------------------------

hOST :: String
hOST = "localhost"

pORT :: Int
pORT = 8054

wRITE_FREQUENCY :: Int
wRITE_FREQUENCY = 20

Expand Down Expand Up @@ -70,7 +68,7 @@ commonSetup msg io = do
a <- async (io ready)
link a
() <- takeMVar ready
hc <- newHttpClient hOST pORT
hc <- newHttpClient hOST dUMBLOG_PORT
return (a, hc)

commonTeardown :: (Async (), HttpClient) -> IO ()
Expand Down
2 changes: 1 addition & 1 deletion src/sut/dumblog/bench/journal/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ main :: IO ()
main = do
removePathForcibly dUMBLOG_JOURNAL
removePathForcibly dUMBLOG_SNAPSHOT
commonMain "Journal" (journalDumblog quietRun bUFFER_CAPACITY pORT . Just)
commonMain "Journal" (journalDumblog quietRun bUFFER_CAPACITY . Just)
25 changes: 15 additions & 10 deletions src/sut/dumblog/src/Dumblog/Journal/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ import Control.Concurrent.Async (link, withAsync)
import Control.Concurrent.MVar (MVar)
import Control.Exception (bracket_)
import qualified Data.Aeson as Aeson
import Data.Binary (decode)
import Data.Int (Int64)
import Data.Maybe (fromMaybe)
import Data.TreeDiff (ansiWlPretty, ediff, ppEditExpr)
import Data.Vector (Vector)
import qualified Data.Vector as Vector
Expand Down Expand Up @@ -40,7 +42,6 @@ import Journal.Types
, writeBytesConsumed
)
import Options.Generic
import Data.Binary (decode)
import System.Directory (copyFile, getTemporaryDirectory, removeFile)
import System.FilePath ((<.>), (</>))

Expand All @@ -51,9 +52,8 @@ import Dumblog.Journal.FrontEnd (FrontEndInfo(..), runFrontEnd)
import qualified Dumblog.Journal.Logger as DLogger
import Dumblog.Journal.Snapshot (Snapshot)
import qualified Dumblog.Journal.Snapshot as Snapshot
import Dumblog.Journal.StateMachine
(InMemoryDumblog, initState)
import Dumblog.Journal.Types (Input(..), ClientRequest(..))
import Dumblog.Journal.StateMachine (InMemoryDumblog, initState)
import Dumblog.Journal.Types (ClientRequest(..), Input(..))
import Dumblog.Journal.Versions (dUMBLOG_CURRENT_VERSION, runCommand)
import Dumblog.Journal.Worker (WorkerInfo(..), worker)

Expand Down Expand Up @@ -148,7 +148,9 @@ startingState (Just snap) = Snapshot.ssState snap

data DumblogConfig
= Run
{ quiet :: Bool <?> "Should we suppress program log messages" }
{ quiet :: Bool <?> "Should we suppress program log messages"
, port :: Maybe Int
}
| DebugFile
{ output :: FilePath <?> "Where to output the debug file"
}
Expand All @@ -160,7 +162,7 @@ data DumblogConfig
instance ParseRecord DumblogConfig

quietRun :: DumblogConfig
quietRun = Run (Helpful True)
quietRun = Run (Helpful True) Nothing

{-
Unclear how to:
Expand All @@ -180,14 +182,17 @@ dUMBLOG_JOURNAL = "/tmp/dumblog.journal"
dUMBLOG_SNAPSHOT :: FilePath
dUMBLOG_SNAPSHOT = "/tmp/dumblog.snapshot"

journalDumblog :: DumblogConfig -> Int -> Int -> Maybe (MVar ()) -> IO ()
journalDumblog cfg _capacity port mReady = do
dUMBLOG_PORT :: Int
dUMBLOG_PORT = 8054

journalDumblog :: DumblogConfig -> Int -> Maybe (MVar ()) -> IO ()
journalDumblog cfg _capacity mReady = do
let fpj = dUMBLOG_JOURNAL
fpm = dUMBLOG_METRICS
fps = dUMBLOG_SNAPSHOT
untilSnapshot = 1000
case cfg of
Run q -> do
Run q mPort -> do
mSnapshot <- Snapshot.readFile fps
journal <- fetchJournal mSnapshot fpj dumblogOptions
metrics <- Metrics.newMetrics dumblogSchema fpm
Expand All @@ -202,7 +207,7 @@ journalDumblog cfg _capacity port mReady = do
wInfo = WorkerInfo blocker logger fps dUMBLOG_CURRENT_VERSION events untilSnapshot
withAsync (worker journal metrics wInfo workerState) $ \a -> do
link a
runFrontEnd port journal metrics feInfo mReady
runFrontEnd (fromMaybe dUMBLOG_PORT mPort) journal metrics feInfo mReady
DebugFile fp -> debugFile (unHelpful fp)
DebugFileWatch fp -> do
putStrLn "[journal]: waiting for journal changes..."
Expand Down
5 changes: 4 additions & 1 deletion src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ runCommand hasBug logger state@(InMemoryDumblog appLog ix mPeerPort) input =
logger "Performing a write"
pure (InMemoryDumblog (appLog |> bs) (ix+1) mPeerPort,
ClientResponse (OK (LBS8.pack (show (ix + 1)))))

InternalMessageIn msg -> case msg of
Backup ix' bs -> do
logger "Performing a backup"
Expand All @@ -62,4 +63,6 @@ runCommand hasBug logger state@(InMemoryDumblog appLog ix mPeerPort) input =
logger "Acknowledging a backup"
pure (state, ClientResponse (OK (LBS8.pack (show ix'))))

AdminCommand (Connect port) -> pure (state { peerPort = Just port }, AdminResponse)
AdminCommand (Connect port) -> do
logger ("Adding peer on port: " ++ show port)
pure (state { peerPort = Just port }, AdminResponse)
15 changes: 13 additions & 2 deletions src/sut/dumblog/src/Dumblog/Journal/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import Data.Binary (decode)
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy as LBS
import Data.Int (Int64)
import Data.Maybe (fromMaybe)
import qualified Journal.Internal.Metrics as Metrics
import qualified Journal.MP as Journal
import Journal.Types
Expand All @@ -18,6 +19,7 @@ import Journal.Types
, writeBytesConsumed
)

import Dumblog.Common.HttpClient (ackHttp, backupHttp, newHttpClient)
import Dumblog.Common.Metrics
import Dumblog.Journal.Blocker
import Dumblog.Journal.Codec
Expand Down Expand Up @@ -81,7 +83,8 @@ worker journal metrics wi = go (wiEvents wi)
Metrics.measure metrics Latency latency
Metrics.measure metrics (case input of
ClientRequest (Write {}) -> ServiceTimeWrites
ClientRequest (Read {}) -> ServiceTimeReads) serviceTime
ClientRequest (Read {}) -> ServiceTimeReads
_othrwise -> error "impossible") serviceTime
Metrics.measure metrics ResponseTime (latency + serviceTime)
case input of
ClientRequest (Write bs) -> Metrics.measure metrics WriteSize (realToFrac (LBS.length bs))
Expand All @@ -91,7 +94,15 @@ worker journal metrics wi = go (wiEvents wi)
InternalMessageOut msg ->
case msg of
Ack ix -> do
let port = fromMaybe (error "No peer port") (peerPort s')
-- XXX: avoid creating a new http client every time...
hc <- newHttpClient "localhost" port
ackHttp hc ix
return s'
Backup ix bs -> do
let port = fromMaybe (error "No peer port") (peerPort s')
hc <- newHttpClient "localhost" port
backupHttp hc ix bs
return s'
Backup ix bs -> return s'

AdminResponse -> return s'

0 comments on commit 06827f2

Please sign in to comment.