Skip to content

Commit

Permalink
feat(journal): add batching read
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Mar 18, 2022
1 parent 0effab0 commit 57ae88c
Show file tree
Hide file tree
Showing 2 changed files with 110 additions and 28 deletions.
76 changes: 75 additions & 1 deletion src/journal/src/Journal/MP.hs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module Journal.MP where
import Control.Monad (when)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int (Int64)
import Data.Int (Int32, Int64)
import qualified Data.Vector as Vector
import Network.Socket (Socket, recvBuf)
import Foreign (plusPtr)
Expand Down Expand Up @@ -46,6 +46,80 @@ recvBytesOffset bc sock offset len = withPtr bc $ \ptr ->
recvBytes :: BufferClaim -> Socket -> Int -> IO Int
recvBytes bc sock len = recvBytesOffset bc sock hEADER_LENGTH len

readManyJournalSC :: Journal -> Subscriber -> IO [ByteString]
readManyJournalSC jour sub = do
offset <- readBytesConsumed (jMetadata jour) sub
-- let jLog = logg (jLogger jour)
-- jLog ("readJournal, offset: " ++ show offset)

-- jLog ("readJournal, readIndex: " ++ show (unPartitionIndex readIndex))

termCount <- activeTermCount (jMetadata jour)
let activeTermIndex = indexByTermCount termCount
rawTail <- readRawTail (jMetadata jour) activeTermIndex
let activeTermId = rawTailTermId rawTail
termOffset = rawTailTermOffset rawTail termLength

-- jLog ("readJournal, termOffset: " ++ show (unTermOffset termOffset))
-- jLog ("readJournal, initTermId: " ++ show (unTermId initTermId))
let position =
computePosition activeTermId termOffset posBitsToShift initTermId
-- putStrLn ("readJournal, offset: " ++ show offset ++ ", position: " ++ show position)
-- assertM (int2Int64 offset <= position)

-- jLog ("readJournal, readTermCount: " ++ show readTermCount)
go offset position []
where
termLength :: Int32
termLength = jTermLength jour

initTermId :: TermId
initTermId = jInitialTermId jour

posBitsToShift :: Int32
posBitsToShift = jPositionBitsToShift jour

go offset position acc
| int2Int64 offset == position = return (reverse acc)
| otherwise = do
-- assertM (int2Int64 offset < position)

let readIndex = indexByPosition (int2Int64 offset) posBitsToShift
termBuffer = jTermBuffers jour Vector.! unPartitionIndex readIndex
readTermCount =
computeTermIdFromPosition (int2Int64 offset) posBitsToShift initTermId
- unTermId initTermId

relativeOffset = int2Int32 (align offset fRAME_ALIGNMENT) -
readTermCount * termLength
-- jLog ("readJournal, relativeOffset: " ++ show relativeOffset)
tag <- readFrameType termBuffer (TermOffset relativeOffset)
-- jLog ("readJournal, tag: " ++ show tag)
HeaderLength len <- readFrameLength termBuffer (TermOffset relativeOffset)
-- jLog ("readJournal, len: " ++ show len)
if tag == Padding
then do
incrBytesConsumed_ (jMetadata jour) sub (int322Int (abs len))
go (offset + int322Int (abs len)) position acc
else if len <= 0 || tag == Empty
then go offset position acc
else do
assertMMsg (show len) (len > 0)
-- jLog ("readJournal, termCount: " ++ show (unTermCount termCount))
-- NOTE: We need to read the bytestring before the CAS, otherwise the
-- bytes can be cleaned away before read. In case the CAS fails this
-- causes us to do unnecessary work, as we have to throw away the
-- bytestring we just read. A potentially better solution would be to
-- do cleaning asynchronously and somehow account there being a
-- buffer between the last reader and the cleaner...
bs <- getByteStringAt termBuffer
(int322Int relativeOffset + hEADER_LENGTH)
(int322Int len - hEADER_LENGTH)
assertM (BS.length bs == int322Int len - hEADER_LENGTH)
incrBytesConsumed_ (jMetadata jour) sub
(align (int322Int len) fRAME_ALIGNMENT)
go (offset + align (int322Int len) fRAME_ALIGNMENT) position (bs : acc)

readJournal :: Journal -> Subscriber -> IO (Maybe ByteString)
readJournal jour sub = do
offset <- readBytesConsumed (jMetadata jour) sub
Expand Down
62 changes: 35 additions & 27 deletions src/sut/dumblog/src/Dumblog/Journal/Worker.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ module Dumblog.Journal.Worker where

import Control.Concurrent (threadDelay)
import Control.Monad (unless)
import Data.ByteString (ByteString)
import qualified Data.ByteString as BS
import Data.Int (Int64)
import qualified Journal.Internal.Metrics as Metrics
Expand Down Expand Up @@ -57,30 +58,37 @@ worker journal metrics (WorkerInfo blocker logger snapshotFile currentVersion ev
writeBytesConsumed (jMetadata journal) Sub2 bytes
go 0 s
else do
mEntry <- Journal.readJournal journal Sub1
case mEntry of
Nothing -> threadDelay 0 >> go ev s
Just entry -> do
Metrics.decrCounter_ metrics QueueDepth 1
let Envelope key cmd version arrivalTime = decode entry
-- XXX: In case of decode error:
-- Metrics.incrCounter metrics ErrorsEncountered 1
-- wakeUpFrontend blocker key $ Left "Couldn't parse request"
-- -- ^ should be better error message
--
!startTime <- getCurrentNanosSinceEpoch
(s', r) <- runCommand version logger s cmd
wakeUpFrontend blocker key r
!endTime <- getCurrentNanosSinceEpoch
-- Convert from nano s to µs with `* 10^-3`.
let latency = realToFrac ((startTime - arrivalTime)) * 0.001 -- µs.
serviceTime = realToFrac ((endTime - startTime)) * 0.001
Metrics.measure metrics Latency latency
Metrics.measure metrics (case cmd of
Write {} -> ServiceTimeWrites
Read {} -> ServiceTimeReads) serviceTime
Metrics.measure metrics ResponseTime (latency + serviceTime)
case cmd of
Write bs -> Metrics.measure metrics WriteSize (realToFrac (BS.length bs))
_otherwise -> return ()
go (ev + 1) s'
entries <- Journal.readManyJournalSC journal Sub1
if null entries
then threadDelay 10 >> go ev s
else do
s' <- go' entries s
go (ev + length entries) s'

go' :: [ByteString] -> InMemoryDumblog -> IO InMemoryDumblog
go' [] s = return s
go' (entry : entries) s = do
Metrics.decrCounter_ metrics QueueDepth 1
let Envelope key cmd version arrivalTime = decode entry
-- XXX: In case of decode error:
-- Metrics.incrCounter metrics ErrorsEncountered 1
-- wakeUpFrontend blocker key $ Left "Couldn't parse request"
-- -- ^ should be better error message
--
!startTime <- getCurrentNanosSinceEpoch
(s', r) <- runCommand version logger s cmd
wakeUpFrontend blocker key r
!endTime <- getCurrentNanosSinceEpoch
-- Convert from nano s to µs with `* 10^-3`.
let latency = realToFrac ((startTime - arrivalTime)) * 0.001 -- µs.
serviceTime = realToFrac ((endTime - startTime)) * 0.001
Metrics.measure metrics Latency latency
Metrics.measure metrics (case cmd of
Write {} -> ServiceTimeWrites
Read {} -> ServiceTimeReads) serviceTime
Metrics.measure metrics ResponseTime (latency + serviceTime)
case cmd of
Write bs -> Metrics.measure metrics WriteSize (realToFrac (BS.length bs))
_otherwise -> return ()

go' entries s'

0 comments on commit 57ae88c

Please sign in to comment.