Skip to content

Commit

Permalink
feat(journal): introduce more newtypes and term rotation
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Dec 29, 2021
1 parent 1f7290e commit bee8690
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 43 deletions.
35 changes: 29 additions & 6 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -90,17 +90,18 @@ tryClaim jour len = do
termCount <- activeTermCount (jMetadata jour)
let index = indexByTermCount termCount
activePartitionIndex = index
rt <- rawTail (jMetadata jour) index
rt <- readRawTail (jMetadata jour) index
initTermId <- initialTermId (jMetadata jour)
termLen <- termLength (jMetadata jour)

let tId = termId rt
tOffset = termOffset rt termLen
termBeginPosition = computeTermBeginPosition tId (positionBitsToShift termLen) initTermId
let termId = rawTailTermId rt
termOffset = rawTailTermOffset rt termLen
termBeginPosition =
computeTermBeginPosition termId (positionBitsToShift termLen) initTermId

limit <- calculatePositionLimit jour
let termAppender = jTermBuffers jour Vector.! activePartitionIndex
position = termBeginPosition + tOffset
let termAppender = jTermBuffers jour Vector.! unPartitionIndex activePartitionIndex
position = termBeginPosition + fromIntegral termOffset
if position < limit
then do
result <- undefined
Expand All @@ -116,6 +117,27 @@ newPosition = undefined
backPressureStatus = undefined

termAppenderClaim = undefined

rotateTerm :: Journal' -> IO ()
rotateTerm jour = do
termCount <- activeTermCount (jMetadata jour)
let activePartitionIndex = indexByTermCount termCount
nextIndex = nextPartitionIndex activePartitionIndex
rawTail <- readRawTail (jMetadata jour) activePartitionIndex
initTermId <- initialTermId (jMetadata jour)
let termId = rawTailTermId rawTail
nextTermId = termId + 1
termCount = fromIntegral (nextTermId - initTermId)

-- XXX: cache this? where exactly?
-- activePartionIndex := nextIndex
-- termOffset := 0
-- termId := nextTermId
-- termBeginPosition += termBufferLength

initialiseTailWithTermId (jMetadata jour) nextIndex nextTermId
writeActiveTermCount (jMetadata jour) termCount

commit = undefined

abort = undefined
Expand Down Expand Up @@ -150,6 +172,7 @@ claim jour len = assert (hEADER_SIZE + len <= getMaxByteSize jour) $ do
-- Check if header is written to offset (if that's the case the active
-- file hasn't been rotated yet)
undefined

mmapFile :: FilePath -> Int -> IO (Ptr Word8, Int)
mmapFile fp maxByteSize = do
(ptr, rawSize, _offset, size) <-
Expand Down
93 changes: 56 additions & 37 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE DerivingStrategies #-}

module Journal.Types
-- ( Journal'(Journal')
Expand Down Expand Up @@ -104,50 +106,66 @@ lOG_META_DATA_LENGTH = lOG_PAGE_SIZE_OFFSET

------------------------------------------------------------------------

rawTail :: ByteBuffer -> Int -> IO Int64
rawTail meta partitionIndex =
newtype RawTail = RawTail { unRawTail :: Int64 }
deriving newtype (Integral, Real, Num, Enum, Ord, Eq, Bits)

newtype PartitionIndex = PartitionIndex { unPartitionIndex :: Int }
deriving newtype (Integral, Real, Num, Enum, Ord, Eq)

newtype TermId = TermId { unTermId :: Int32 }
deriving newtype (Integral, Real, Num, Enum, Ord, Eq)

newtype TermOffset = TermOffset { unTermOffset :: Int32 }
deriving newtype (Integral, Real, Num, Enum, Ord, Eq)

newtype TermCount = TermCount { unTermCount :: Int32 }
deriving newtype (Integral, Real, Num, Enum, Ord, Eq)

readRawTail :: ByteBuffer -> PartitionIndex -> IO RawTail
readRawTail meta (PartitionIndex partitionIndex) = RawTail <$>
readIntOffArrayIx meta
(tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex))

termId :: Int64 -> Int32
termId = fromIntegral . (`shiftR` 32)
rawTailTermId :: RawTail -> TermId
rawTailTermId = fromIntegral . (`shiftR` 32) . unRawTail

termOffset :: Int64 -> Int64 -> Int32
termOffset rawTail0 termLen =
fromIntegral (min (rawTail0 .&. 0xFFFF_FFFF) termLen)
rawTailTermOffset :: RawTail -> Int64 -> TermOffset
rawTailTermOffset (RawTail rt) termLen =
fromIntegral (min (rt .&. 0xFFFF_FFFF) termLen)

packTail :: Int32 -> Int32 -> Int64
packTail :: TermId -> TermOffset -> RawTail
packTail termId0 termOffset0 =
(fromIntegral termId0 `shiftL` 32) .|. (fromIntegral termOffset0 .&. 0xFFFF_FFFF);

setRawTail :: ByteBuffer -> Int32 -> Int32 -> Int -> IO ()
setRawTail meta termId0 termOffset0 partitionIndex =
writeRawTail :: ByteBuffer -> TermId -> TermOffset -> PartitionIndex -> IO ()
writeRawTail meta termId0 termOffset0 (PartitionIndex partitionIndex) =
writeIntOffArrayIx meta
(tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex))
(packTail termId0 termOffset0)
(unRawTail (packTail termId0 termOffset0))

casRawTail :: ByteBuffer -> Int -> Int64 -> Int64 -> IO Bool
casRawTail meta partitionIndex expectedRawTail newRawTail =
casRawTail :: ByteBuffer -> PartitionIndex -> RawTail -> RawTail -> IO Bool
casRawTail meta (PartitionIndex partitionIndex) expectedRawTail newRawTail =
casIntArray meta
(tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex))
(fromIntegral expectedRawTail) (fromIntegral newRawTail) -- XXX: 32-bit systems?

initialiseTailWithTermId :: ByteBuffer -> Int -> Int32 -> IO ()
initialiseTailWithTermId :: ByteBuffer -> PartitionIndex -> TermId -> IO ()
initialiseTailWithTermId meta partitionIndex termId0 =
setRawTail meta termId0 0 partitionIndex
writeRawTail meta termId0 0 partitionIndex

activeTermCount :: ByteBuffer -> IO Int32
activeTermCount meta = readIntOffArrayIx meta lOG_ACTIVE_TERM_COUNT_OFFSET
activeTermCount :: ByteBuffer -> IO TermCount
activeTermCount meta = TermCount <$> readIntOffArrayIx meta lOG_ACTIVE_TERM_COUNT_OFFSET

setActiveTermCount :: ByteBuffer -> Int32 -> IO ()
setActiveTermCount meta = writeIntOffArrayIx meta lOG_ACTIVE_TERM_COUNT_OFFSET
writeActiveTermCount :: ByteBuffer -> TermCount -> IO ()
writeActiveTermCount meta =
writeIntOffArrayIx meta lOG_ACTIVE_TERM_COUNT_OFFSET . fromIntegral

casActiveTermCount :: ByteBuffer -> Int32 -> Int32 -> IO Bool
casActiveTermCount meta expectedTermCount newTermCount =
casIntArray meta lOG_ACTIVE_TERM_COUNT_OFFSET expectedTermCount newTermCount
casActiveTermCount :: ByteBuffer -> TermCount -> TermCount -> IO Bool
casActiveTermCount meta (TermCount expected) (TermCount new) =
casIntArray meta lOG_ACTIVE_TERM_COUNT_OFFSET expected new

initialTermId :: ByteBuffer -> IO Int32
initialTermId meta = readIntOffArrayIx meta lOG_INITIAL_TERM_ID_OFFSET
initialTermId :: ByteBuffer -> IO TermId
initialTermId meta = TermId <$> readIntOffArrayIx meta lOG_INITIAL_TERM_ID_OFFSET

-- should never be changed?
-- setInitialTermId :: ByteBuffer -> Int32 -> IO ()
Expand Down Expand Up @@ -190,33 +208,34 @@ positionBitsToShift termBufferLength =
------------------------------------------------------------------------

-- | Rotate to the next partition in sequence for the current term id.
nextPartitionIndex :: Int32 -> Int32
nextPartitionIndex :: PartitionIndex -> PartitionIndex
nextPartitionIndex currentIndex =
(currentIndex + 1) `mod` fromIntegral pARTITION_COUNT

-- | Calculate the partition index to be used given the initial term and active
-- term ids.
indexByTerm :: Int32 -> Int32 -> Int32
indexByTerm initTermId activeTermId =
indexByTerm :: TermId -> TermId -> PartitionIndex
indexByTerm initTermId activeTermId = fromIntegral $
(activeTermId - initTermId) `mod` fromIntegral pARTITION_COUNT

-- | Caluclate the partition index based on number of terms that have passed.
indexByTermCount :: Int32 -> Int
indexByTermCount termCount = fromIntegral termCount `mod` pARTITION_COUNT
indexByTermCount :: TermCount -> PartitionIndex
indexByTermCount termCount = PartitionIndex $
fromIntegral termCount `mod` pARTITION_COUNT

-- | Calculate the partition index given a stream position.
indexByPosition :: Int64 -> Int -> Int
indexByPosition :: Int64 -> Int -> PartitionIndex
indexByPosition pos posBitsToShift = fromIntegral $
(pos `shiftR` posBitsToShift) `mod` fromIntegral pARTITION_COUNT

-- | Compute the current position in absolute number of bytes.
computePosition :: Int32 -> Int32 -> Int -> Int32 -> Int64
computePosition :: TermId -> TermOffset -> Int -> TermId -> Int64
computePosition activeTermId termOffset posBitsToShift initTermId =
computeTermBeginPosition activeTermId posBitsToShift initTermId + fromIntegral termOffset

-- | Compute the current position in absolute number of bytes for the beginning
-- of a term.
computeTermBeginPosition :: Int32 -> Int32 -> Int32 -> Int64
computeTermBeginPosition :: TermId -> Int32 -> TermId -> Int64
computeTermBeginPosition activeTermId posBitsToShift initTermId =
let
termCount :: Int64
Expand All @@ -226,7 +245,7 @@ computeTermBeginPosition activeTermId posBitsToShift initTermId =
termCount `shiftL` fromIntegral posBitsToShift

-- | Compute the term id from a position.
computeTermIdFromPosition :: Int64 -> Int -> Int32 -> Int32
computeTermIdFromPosition :: Int64 -> Int -> TermId -> Int32
computeTermIdFromPosition pos posBitsToShift initTermId = fromIntegral $
(pos `shiftR` posBitsToShift) + fromIntegral initTermId

Expand All @@ -250,7 +269,7 @@ align value alignment = (value + (alignment - 1)) .&. (- alignment)

-- | Rotate the log and update the tail counter for the new term. This function
-- is thread safe.
rotateLog :: ByteBuffer -> Int32 -> Int32 -> IO Bool
rotateLog :: ByteBuffer -> TermCount -> TermId -> IO Bool
rotateLog meta termCount termId0 = do
go
casActiveTermCount meta termCount nextTermCount
Expand All @@ -261,11 +280,11 @@ rotateLog meta termCount termId0 = do
expectedTermId = nextTermId - fromIntegral pARTITION_COUNT

go = do
rt <- rawTail meta nextIndex
if expectedTermId /= termId rt
rawTail <- readRawTail meta nextIndex
if expectedTermId /= rawTailTermId rawTail
then return ()
else do
b <- casRawTail meta nextIndex rt (packTail nextTermId 0)
b <- casRawTail meta nextIndex rawTail (packTail nextTermId 0)
if b then return () else go

------------------------------------------------------------------------
Expand Down

0 comments on commit bee8690

Please sign in to comment.