From 2e7c365eb0b5bc1321bb2ab6d784f17437d17370 Mon Sep 17 00:00:00 2001 From: Stevan Andjelkovic Date: Mon, 28 Mar 2022 12:42:15 +0200 Subject: [PATCH] fix(sut): correlate requests and responses with key --- .../dumblog/src/Dumblog/Common/HttpClient.hs | 53 +++++++++++-------- .../dumblog/src/Dumblog/Journal/FrontEnd.hs | 20 +++++-- src/sut/dumblog/src/Dumblog/Journal/Main.hs | 4 +- .../src/Dumblog/Journal/StateMachine.hs | 20 +++---- src/sut/dumblog/src/Dumblog/Journal/Types.hs | 12 +++-- src/sut/dumblog/src/Dumblog/Journal/Worker.hs | 20 +++---- 6 files changed, 75 insertions(+), 54 deletions(-) diff --git a/src/sut/dumblog/src/Dumblog/Common/HttpClient.hs b/src/sut/dumblog/src/Dumblog/Common/HttpClient.hs index 49628482..3585fbfd 100644 --- a/src/sut/dumblog/src/Dumblog/Common/HttpClient.hs +++ b/src/sut/dumblog/src/Dumblog/Common/HttpClient.hs @@ -3,9 +3,9 @@ module Dumblog.Common.HttpClient where import Control.Exception (try) -import qualified Data.ByteString.Char8 as BSChar8 +import qualified Data.ByteString.Char8 as BS8 import Data.ByteString.Lazy.Char8 (ByteString) -import qualified Data.ByteString.Lazy.Char8 as LBSChar8 +import qualified Data.ByteString.Lazy.Char8 as LBS8 import Network.HTTP.Client ( HttpException(HttpExceptionRequest, InvalidUrlException) , Manager @@ -19,12 +19,15 @@ import Network.HTTP.Client , parseRequest , path , requestBody + , requestHeaders , responseBody , responseTimeoutMicro ) import Network.Wai.Handler.Warp (Port) import Text.Read (readMaybe) +import Dumblog.Journal.Types (SeqNum, unSeqNum) + ------------------------------------------------------------------------ data HttpClient = HttpClient @@ -32,8 +35,8 @@ data HttpClient = HttpClient -- `Manager` between multiple threads and requests. , hcWriteReq :: ByteString -> Request , hcReadReq :: Int -> Request - , hcBackupReq :: Int -> ByteString -> Request - , hcAckReq :: Int -> Request + , hcBackupReq :: Int -> ByteString -> SeqNum -> Request + , hcAckReq :: Int -> SeqNum -> Request -- , hcErrors :: AtomicCounter } @@ -50,19 +53,23 @@ newHttpClient host port = do readReq :: Int -> Request readReq ix = initReq { method = "GET" - , path = path initReq <> BSChar8.pack (show ix) + , path = path initReq <> BS8.pack (show ix) } - backupReq :: Int -> ByteString -> Request - backupReq ix bs = initReq { method = "PUT" - , path = path initReq <> BSChar8.pack (show ix) - , requestBody = RequestBodyLBS bs - } + backupReq :: Int -> ByteString -> SeqNum -> Request + backupReq ix bs sn = initReq + { method = "PUT" + , path = path initReq <> BS8.pack (show ix) + , requestBody = RequestBodyLBS bs + , requestHeaders = requestHeaders initReq ++ [("key", BS8.pack (show (unSeqNum sn)))] + } - ackReq :: Int -> Request - ackReq ix = initReq { method = "PUT" - , path = path initReq <> BSChar8.pack (show ix) - } + ackReq :: Int -> SeqNum -> Request + ackReq ix sn = initReq + { method = "PUT" + , path = path initReq <> BS8.pack (show ix) + , requestHeaders = requestHeaders initReq ++ [("key", BS8.pack (show (unSeqNum sn)))] + } return (HttpClient mgr writeReq readReq backupReq ackReq) @@ -75,7 +82,7 @@ writeHttp hc bs = do putStrLn ("writeHttp, exception context: " ++ show exceptCtx) return Nothing Left InvalidUrlException {} -> error "writeHttp, impossible: invalid url" - Right resp -> return (readMaybe (LBSChar8.unpack (responseBody resp))) + Right resp -> return (readMaybe (LBS8.unpack (responseBody resp))) readHttp :: HttpClient -> Int -> IO (Maybe ByteString) readHttp hc ix = do @@ -87,20 +94,20 @@ readHttp hc ix = do Left InvalidUrlException {} -> error "readHttp, impossible: invalid url" Right resp -> return (Just (responseBody resp)) -backupHttp :: HttpClient -> Int -> ByteString -> IO () -backupHttp hc ix bs = do - eResp <- try (httpLbs (hcBackupReq hc ix bs) (hcManager hc)) +backupHttp :: HttpClient -> Int -> ByteString -> SeqNum -> IO () +backupHttp hc ix bs sn = do + eResp <- try (httpLbs (hcBackupReq hc ix bs sn) (hcManager hc)) case eResp of Left (HttpExceptionRequest _req exceptCtx) -> do - putStrLn ("readHttp, exception context: " ++ show exceptCtx) + putStrLn ("backupHttp, exception context: " ++ show exceptCtx) Left InvalidUrlException {} -> error "backupHttp, impossible: invalid url" Right _resp -> return () -ackHttp :: HttpClient -> Int -> IO () -ackHttp hc ix = do - eResp <- try (httpLbs (hcAckReq hc ix) (hcManager hc)) +ackHttp :: HttpClient -> Int -> SeqNum -> IO () +ackHttp hc ix sn = do + eResp <- try (httpLbs (hcAckReq hc ix sn) (hcManager hc)) case eResp of Left (HttpExceptionRequest _req exceptCtx) -> do - putStrLn ("readHttp, exception context: " ++ show exceptCtx) + putStrLn ("ackHttp, exception context: " ++ show exceptCtx) Left InvalidUrlException {} -> error "ackHttp, impossible: invalid url" Right _resp -> return () diff --git a/src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs b/src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs index 618f3bcb..f8f97952 100644 --- a/src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs +++ b/src/sut/dumblog/src/Dumblog/Journal/FrontEnd.hs @@ -6,6 +6,7 @@ module Dumblog.Journal.FrontEnd where import Control.Concurrent.MVar (MVar, putMVar) import Data.Binary (encode) +import qualified Data.ByteString.Char8 as BS8 import Data.ByteString.Lazy (ByteString) import qualified Data.ByteString.Lazy.Char8 as LBS8 import Data.Int (Int64) @@ -50,9 +51,12 @@ httpFrontend journal metrics (FrontEndInfo blocker cVersion) req respond = do respond (Wai.responseLBS status400 [] err) Right ix -> do reqBody <- Wai.consumeRequestBodyStrict req - if not (LBS8.null reqBody) - then appendInputNoWaitForResp (InternalMessageIn (Backup ix reqBody)) - else appendInputNoWaitForResp (InternalMessageIn (Ack ix)) + case parseSeqNum of + Nothing -> respond (Wai.responseLBS status400 [] "Missing sequence number") + Just sn -> + if not (LBS8.null reqBody) + then appendInputNoWaitForResp (InternalMessageIn (Backup ix reqBody sn)) + else appendInputNoWaitForResp (InternalMessageIn (Ack ix sn)) "CONNECT" -> do case parseIndex of Left err -> do @@ -72,11 +76,17 @@ httpFrontend journal metrics (FrontEndInfo blocker cVersion) req respond = do _otherwise -> Left (LBS8.pack "parseIndex: GET /:ix, :ix isn't an integer") _otherwise -> Left (LBS8.pack "parseIndex: GET /:ix, :ix missing") - appendInputWaitForResp :: Input -> IO Wai.ResponseReceived + parseSeqNum :: Maybe SeqNum + parseSeqNum = do + key <- lookup "key" (Wai.requestHeaders req) + fmap (SeqNum . fst) (BS8.readInt key) + + appendInputWaitForResp :: (SeqNum -> Input) -> IO Wai.ResponseReceived appendInputWaitForResp input = do key <- newKey blocker !arrivalTime <- getCurrentNanosSinceEpoch - let bs = encode (Envelope (sequenceNumber key) input cVersion arrivalTime) + let bs = encode (Envelope (sequenceNumber key) (input (SeqNum (sequenceNumber key))) + cVersion arrivalTime) success = do incrCounter metrics QueueDepth 1 blockRespond key diff --git a/src/sut/dumblog/src/Dumblog/Journal/Main.hs b/src/sut/dumblog/src/Dumblog/Journal/Main.hs index e3c116f6..a75736c0 100644 --- a/src/sut/dumblog/src/Dumblog/Journal/Main.hs +++ b/src/sut/dumblog/src/Dumblog/Journal/Main.hs @@ -102,8 +102,8 @@ replayDebug originCommands originState = do logLines <- DLogger.flushQueue logger let ev = case cmd of - ClientRequest (Read {}) -> "read" - ClientRequest (Write {})-> "write" + ClientRequest (Read {}) _ -> "read" + ClientRequest (Write {}) _ -> "write" msg = show cmd ce = DebEvent { from = "client" diff --git a/src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs b/src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs index 09fadc0a..7e4c49a1 100644 --- a/src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs +++ b/src/sut/dumblog/src/Dumblog/Journal/StateMachine.hs @@ -35,33 +35,33 @@ initState = InMemoryDumblog empty 0 Nothing runCommand :: Bool -> Logger -> InMemoryDumblog -> Input -> IO (InMemoryDumblog, Output) runCommand hasBug logger state@(InMemoryDumblog appLog ix mPeerPort) input = case input of - ClientRequest req -> case req of + ClientRequest req sn -> case req of Read i | hasBug && ix == 3 -> do logger "Weird reset happend" - pure (initState, ClientResponse (Error (LBS8.pack "Dumblog!"))) - | i < ix -> pure (state, ClientResponse (OK (index appLog i))) + pure (initState, ClientResponse (Error (LBS8.pack "Dumblog!")) sn) + | i < ix -> pure (state, ClientResponse (OK (index appLog i)) sn) | otherwise -> do logger $ "Oh no, request not in log" logger $ ("Max index is " ++ show (ix - 1)) - pure (state, ClientResponse NotFound) + pure (state, ClientResponse NotFound sn) Write bs | isJust mPeerPort -> do logger "Forwarding write to backup" pure (InMemoryDumblog (appLog |> bs) (ix+1) mPeerPort, - InternalMessageOut (Backup (ix + 1) bs)) + InternalMessageOut (Backup (ix + 1) bs sn)) | otherwise -> do logger "Performing a write" pure (InMemoryDumblog (appLog |> bs) (ix+1) mPeerPort, - ClientResponse (OK (LBS8.pack (show (ix + 1))))) + ClientResponse (OK (LBS8.pack (show (ix + 1)))) sn) InternalMessageIn msg -> case msg of - Backup ix' bs -> do + Backup ix' bs sn -> do logger "Performing a backup" - pure (InMemoryDumblog (appLog |> bs) ix' mPeerPort, InternalMessageOut (Ack ix')) - Ack ix' -> do + pure (InMemoryDumblog (appLog |> bs) ix' mPeerPort, InternalMessageOut (Ack ix' sn)) + Ack ix' sn -> do logger "Acknowledging a backup" - pure (state, ClientResponse (OK (LBS8.pack (show ix')))) + pure (state, ClientResponse (OK (LBS8.pack (show ix'))) sn) AdminCommand (Connect port) -> do logger ("Adding peer on port: " ++ show port) diff --git a/src/sut/dumblog/src/Dumblog/Journal/Types.hs b/src/sut/dumblog/src/Dumblog/Journal/Types.hs index d8fc41ca..b7279265 100644 --- a/src/sut/dumblog/src/Dumblog/Journal/Types.hs +++ b/src/sut/dumblog/src/Dumblog/Journal/Types.hs @@ -1,4 +1,5 @@ {-# LANGUAGE DeriveAnyClass #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DerivingStrategies #-} @@ -10,8 +11,11 @@ import GHC.Generics (Generic) ------------------------------------------------------------------------ +newtype SeqNum = SeqNum { unSeqNum :: Int } + deriving newtype (Show, Binary) + data Input - = ClientRequest ClientRequest + = ClientRequest ClientRequest SeqNum | InternalMessageIn InternalMessage | AdminCommand AdminCommand deriving stock (Generic, Show) @@ -29,7 +33,7 @@ data AdminCommand deriving anyclass Binary data Output - = ClientResponse ClientResponse + = ClientResponse ClientResponse SeqNum | InternalMessageOut InternalMessage | AdminResponse deriving stock Show @@ -41,7 +45,7 @@ data ClientResponse deriving stock Show data InternalMessage - = Backup Int ByteString - | Ack Int + = Backup Int ByteString SeqNum + | Ack Int SeqNum deriving stock (Generic, Show) deriving anyclass Binary diff --git a/src/sut/dumblog/src/Dumblog/Journal/Worker.hs b/src/sut/dumblog/src/Dumblog/Journal/Worker.hs index bb758978..76a97dd9 100644 --- a/src/sut/dumblog/src/Dumblog/Journal/Worker.hs +++ b/src/sut/dumblog/src/Dumblog/Journal/Worker.hs @@ -74,35 +74,35 @@ worker journal metrics wi = go (wiEvents wi) !startTime <- getCurrentNanosSinceEpoch (s', output) <- runCommand version (wiLogger wi) s input case output of - ClientResponse resp -> do + ClientResponse resp (SeqNum key) -> do wakeUpFrontend (wiBlockers wi) key resp !endTime <- getCurrentNanosSinceEpoch -- Convert from nano s to µs with `* 10^-3`. let latency = realToFrac ((startTime - arrivalTime)) * 0.001 -- µs. serviceTime = realToFrac ((endTime - startTime)) * 0.001 Metrics.measure metrics Latency latency - Metrics.measure metrics (case input of - ClientRequest (Write {}) -> ServiceTimeWrites - ClientRequest (Read {}) -> ServiceTimeReads - _otherwise -> error "impossible") serviceTime + case input of + ClientRequest (Write {}) _ -> Metrics.measure metrics ServiceTimeWrites serviceTime + ClientRequest (Read {}) _ -> Metrics.measure metrics ServiceTimeReads serviceTime + _otherwise -> return () Metrics.measure metrics ResponseTime (latency + serviceTime) case input of - ClientRequest (Write bs) -> Metrics.measure metrics WriteSize (realToFrac (LBS.length bs)) + ClientRequest (Write bs) _ -> Metrics.measure metrics WriteSize (realToFrac (LBS.length bs)) _otherwise -> return () return s' InternalMessageOut msg -> case msg of - Ack ix -> do + Ack ix sn -> do let port = fromMaybe (error "No peer port") (peerPort s') -- XXX: avoid creating a new http client every time... hc <- newHttpClient "localhost" port - ackHttp hc ix + ackHttp hc ix sn return s' - Backup ix bs -> do + Backup ix bs sn -> do let port = fromMaybe (error "No peer port") (peerPort s') hc <- newHttpClient "localhost" port - backupHttp hc ix bs + backupHttp hc ix bs sn return s' AdminResponse -> return s'