Skip to content

Commit

Permalink
fix(sut): correlate requests and responses with key
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Mar 28, 2022
1 parent fbb323a commit 2e7c365
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 54 deletions.
53 changes: 30 additions & 23 deletions src/sut/dumblog/src/Dumblog/Common/HttpClient.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
module Dumblog.Common.HttpClient where

import Control.Exception (try)
import qualified Data.ByteString.Char8 as BSChar8
import qualified Data.ByteString.Char8 as BS8
import Data.ByteString.Lazy.Char8 (ByteString)
import qualified Data.ByteString.Lazy.Char8 as LBSChar8
import qualified Data.ByteString.Lazy.Char8 as LBS8
import Network.HTTP.Client
( HttpException(HttpExceptionRequest, InvalidUrlException)
, Manager
Expand All @@ -19,21 +19,24 @@ import Network.HTTP.Client
, parseRequest
, path
, requestBody
, requestHeaders
, responseBody
, responseTimeoutMicro
)
import Network.Wai.Handler.Warp (Port)
import Text.Read (readMaybe)

import Dumblog.Journal.Types (SeqNum, unSeqNum)

------------------------------------------------------------------------

data HttpClient = HttpClient
{ hcManager :: Manager -- | NOTE: If possible, you should share a single
-- `Manager` between multiple threads and requests.
, hcWriteReq :: ByteString -> Request
, hcReadReq :: Int -> Request
, hcBackupReq :: Int -> ByteString -> Request
, hcAckReq :: Int -> Request
, hcBackupReq :: Int -> ByteString -> SeqNum -> Request
, hcAckReq :: Int -> SeqNum -> Request
-- , hcErrors :: AtomicCounter
}

Expand All @@ -50,19 +53,23 @@ newHttpClient host port = do

readReq :: Int -> Request
readReq ix = initReq { method = "GET"
, path = path initReq <> BSChar8.pack (show ix)
, path = path initReq <> BS8.pack (show ix)
}

backupReq :: Int -> ByteString -> Request
backupReq ix bs = initReq { method = "PUT"
, path = path initReq <> BSChar8.pack (show ix)
, requestBody = RequestBodyLBS bs
}
backupReq :: Int -> ByteString -> SeqNum -> Request
backupReq ix bs sn = initReq
{ method = "PUT"
, path = path initReq <> BS8.pack (show ix)
, requestBody = RequestBodyLBS bs
, requestHeaders = requestHeaders initReq ++ [("key", BS8.pack (show (unSeqNum sn)))]
}

ackReq :: Int -> Request
ackReq ix = initReq { method = "PUT"
, path = path initReq <> BSChar8.pack (show ix)
}
ackReq :: Int -> SeqNum -> Request
ackReq ix sn = initReq
{ method = "PUT"
, path = path initReq <> BS8.pack (show ix)
, requestHeaders = requestHeaders initReq ++ [("key", BS8.pack (show (unSeqNum sn)))]
}

return (HttpClient mgr writeReq readReq backupReq ackReq)

Expand All @@ -75,7 +82,7 @@ writeHttp hc bs = do
putStrLn ("writeHttp, exception context: " ++ show exceptCtx)
return Nothing
Left InvalidUrlException {} -> error "writeHttp, impossible: invalid url"
Right resp -> return (readMaybe (LBSChar8.unpack (responseBody resp)))
Right resp -> return (readMaybe (LBS8.unpack (responseBody resp)))

readHttp :: HttpClient -> Int -> IO (Maybe ByteString)
readHttp hc ix = do
Expand All @@ -87,20 +94,20 @@ readHttp hc ix = do
Left InvalidUrlException {} -> error "readHttp, impossible: invalid url"
Right resp -> return (Just (responseBody resp))

backupHttp :: HttpClient -> Int -> ByteString -> IO ()
backupHttp hc ix bs = do
eResp <- try (httpLbs (hcBackupReq hc ix bs) (hcManager hc))
backupHttp :: HttpClient -> Int -> ByteString -> SeqNum -> IO ()
backupHttp hc ix bs sn = do
eResp <- try (httpLbs (hcBackupReq hc ix bs sn) (hcManager hc))
case eResp of
Left (HttpExceptionRequest _req exceptCtx) -> do
putStrLn ("readHttp, exception context: " ++ show exceptCtx)
putStrLn ("backupHttp, exception context: " ++ show exceptCtx)
Left InvalidUrlException {} -> error "backupHttp, impossible: invalid url"
Right _resp -> return ()

ackHttp :: HttpClient -> Int -> IO ()
ackHttp hc ix = do
eResp <- try (httpLbs (hcAckReq hc ix) (hcManager hc))
ackHttp :: HttpClient -> Int -> SeqNum -> IO ()
ackHttp hc ix sn = do
eResp <- try (httpLbs (hcAckReq hc ix sn) (hcManager hc))
case eResp of
Left (HttpExceptionRequest _req exceptCtx) -> do
putStrLn ("readHttp, exception context: " ++ show exceptCtx)
putStrLn ("ackHttp, exception context: " ++ show exceptCtx)
Left InvalidUrlException {} -> error "ackHttp, impossible: invalid url"
Right _resp -> return ()
20 changes: 15 additions & 5 deletions src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ module Dumblog.Journal.FrontEnd where

import Control.Concurrent.MVar (MVar, putMVar)
import Data.Binary (encode)
import qualified Data.ByteString.Char8 as BS8
import Data.ByteString.Lazy (ByteString)
import qualified Data.ByteString.Lazy.Char8 as LBS8
import Data.Int (Int64)
Expand Down Expand Up @@ -50,9 +51,12 @@ httpFrontend journal metrics (FrontEndInfo blocker cVersion) req respond = do
respond (Wai.responseLBS status400 [] err)
Right ix -> do
reqBody <- Wai.consumeRequestBodyStrict req
if not (LBS8.null reqBody)
then appendInputNoWaitForResp (InternalMessageIn (Backup ix reqBody))
else appendInputNoWaitForResp (InternalMessageIn (Ack ix))
case parseSeqNum of
Nothing -> respond (Wai.responseLBS status400 [] "Missing sequence number")
Just sn ->
if not (LBS8.null reqBody)
then appendInputNoWaitForResp (InternalMessageIn (Backup ix reqBody sn))
else appendInputNoWaitForResp (InternalMessageIn (Ack ix sn))
"CONNECT" -> do
case parseIndex of
Left err -> do
Expand All @@ -72,11 +76,17 @@ httpFrontend journal metrics (FrontEndInfo blocker cVersion) req respond = do
_otherwise -> Left (LBS8.pack "parseIndex: GET /:ix, :ix isn't an integer")
_otherwise -> Left (LBS8.pack "parseIndex: GET /:ix, :ix missing")

appendInputWaitForResp :: Input -> IO Wai.ResponseReceived
parseSeqNum :: Maybe SeqNum
parseSeqNum = do
key <- lookup "key" (Wai.requestHeaders req)
fmap (SeqNum . fst) (BS8.readInt key)

appendInputWaitForResp :: (SeqNum -> Input) -> IO Wai.ResponseReceived
appendInputWaitForResp input = do
key <- newKey blocker
!arrivalTime <- getCurrentNanosSinceEpoch
let bs = encode (Envelope (sequenceNumber key) input cVersion arrivalTime)
let bs = encode (Envelope (sequenceNumber key) (input (SeqNum (sequenceNumber key)))
cVersion arrivalTime)
success = do
incrCounter metrics QueueDepth 1
blockRespond key
Expand Down
4 changes: 2 additions & 2 deletions src/sut/dumblog/src/Dumblog/Journal/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,8 @@ replayDebug originCommands originState = do
logLines <- DLogger.flushQueue logger
let
ev = case cmd of
ClientRequest (Read {}) -> "read"
ClientRequest (Write {})-> "write"
ClientRequest (Read {}) _ -> "read"
ClientRequest (Write {}) _ -> "write"
msg = show cmd
ce = DebEvent
{ from = "client"
Expand Down
20 changes: 10 additions & 10 deletions src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs
Original file line number Diff line number Diff line change
Expand Up @@ -35,33 +35,33 @@ initState = InMemoryDumblog empty 0 Nothing
runCommand :: Bool -> Logger -> InMemoryDumblog -> Input -> IO (InMemoryDumblog, Output)
runCommand hasBug logger state@(InMemoryDumblog appLog ix mPeerPort) input =
case input of
ClientRequest req -> case req of
ClientRequest req sn -> case req of
Read i
| hasBug && ix == 3 -> do
logger "Weird reset happend"
pure (initState, ClientResponse (Error (LBS8.pack "Dumblog!")))
| i < ix -> pure (state, ClientResponse (OK (index appLog i)))
pure (initState, ClientResponse (Error (LBS8.pack "Dumblog!")) sn)
| i < ix -> pure (state, ClientResponse (OK (index appLog i)) sn)
| otherwise -> do
logger $ "Oh no, request not in log"
logger $ ("Max index is " ++ show (ix - 1))
pure (state, ClientResponse NotFound)
pure (state, ClientResponse NotFound sn)
Write bs
| isJust mPeerPort -> do
logger "Forwarding write to backup"
pure (InMemoryDumblog (appLog |> bs) (ix+1) mPeerPort,
InternalMessageOut (Backup (ix + 1) bs))
InternalMessageOut (Backup (ix + 1) bs sn))
| otherwise -> do
logger "Performing a write"
pure (InMemoryDumblog (appLog |> bs) (ix+1) mPeerPort,
ClientResponse (OK (LBS8.pack (show (ix + 1)))))
ClientResponse (OK (LBS8.pack (show (ix + 1)))) sn)

InternalMessageIn msg -> case msg of
Backup ix' bs -> do
Backup ix' bs sn -> do
logger "Performing a backup"
pure (InMemoryDumblog (appLog |> bs) ix' mPeerPort, InternalMessageOut (Ack ix'))
Ack ix' -> do
pure (InMemoryDumblog (appLog |> bs) ix' mPeerPort, InternalMessageOut (Ack ix' sn))
Ack ix' sn -> do
logger "Acknowledging a backup"
pure (state, ClientResponse (OK (LBS8.pack (show ix'))))
pure (state, ClientResponse (OK (LBS8.pack (show ix'))) sn)

AdminCommand (Connect port) -> do
logger ("Adding peer on port: " ++ show port)
Expand Down
12 changes: 8 additions & 4 deletions src/sut/dumblog/src/Dumblog/Journal/Types.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE DeriveAnyClass #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DerivingStrategies #-}

Expand All @@ -10,8 +11,11 @@ import GHC.Generics (Generic)

------------------------------------------------------------------------

newtype SeqNum = SeqNum { unSeqNum :: Int }
deriving newtype (Show, Binary)

data Input
= ClientRequest ClientRequest
= ClientRequest ClientRequest SeqNum
| InternalMessageIn InternalMessage
| AdminCommand AdminCommand
deriving stock (Generic, Show)
Expand All @@ -29,7 +33,7 @@ data AdminCommand
deriving anyclass Binary

data Output
= ClientResponse ClientResponse
= ClientResponse ClientResponse SeqNum
| InternalMessageOut InternalMessage
| AdminResponse
deriving stock Show
Expand All @@ -41,7 +45,7 @@ data ClientResponse
deriving stock Show

data InternalMessage
= Backup Int ByteString
| Ack Int
= Backup Int ByteString SeqNum
| Ack Int SeqNum
deriving stock (Generic, Show)
deriving anyclass Binary
20 changes: 10 additions & 10 deletions src/sut/dumblog/src/Dumblog/Journal/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -74,35 +74,35 @@ worker journal metrics wi = go (wiEvents wi)
!startTime <- getCurrentNanosSinceEpoch
(s', output) <- runCommand version (wiLogger wi) s input
case output of
ClientResponse resp -> do
ClientResponse resp (SeqNum key) -> do
wakeUpFrontend (wiBlockers wi) key resp
!endTime <- getCurrentNanosSinceEpoch
-- Convert from nano s to µs with `* 10^-3`.
let latency = realToFrac ((startTime - arrivalTime)) * 0.001 -- µs.
serviceTime = realToFrac ((endTime - startTime)) * 0.001
Metrics.measure metrics Latency latency
Metrics.measure metrics (case input of
ClientRequest (Write {}) -> ServiceTimeWrites
ClientRequest (Read {}) -> ServiceTimeReads
_otherwise -> error "impossible") serviceTime
case input of
ClientRequest (Write {}) _ -> Metrics.measure metrics ServiceTimeWrites serviceTime
ClientRequest (Read {}) _ -> Metrics.measure metrics ServiceTimeReads serviceTime
_otherwise -> return ()
Metrics.measure metrics ResponseTime (latency + serviceTime)
case input of
ClientRequest (Write bs) -> Metrics.measure metrics WriteSize (realToFrac (LBS.length bs))
ClientRequest (Write bs) _ -> Metrics.measure metrics WriteSize (realToFrac (LBS.length bs))
_otherwise -> return ()

return s'
InternalMessageOut msg ->
case msg of
Ack ix -> do
Ack ix sn -> do
let port = fromMaybe (error "No peer port") (peerPort s')
-- XXX: avoid creating a new http client every time...
hc <- newHttpClient "localhost" port
ackHttp hc ix
ackHttp hc ix sn
return s'
Backup ix bs -> do
Backup ix bs sn -> do
let port = fromMaybe (error "No peer port") (peerPort s')
hc <- newHttpClient "localhost" port
backupHttp hc ix bs
backupHttp hc ix bs sn
return s'

AdminResponse -> return s'

0 comments on commit 2e7c365

Please sign in to comment.