Skip to content

Commit

Permalink
refactor(journal): add new metadata bytebuffer to journal
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Dec 28, 2021
1 parent 930edda commit 3a3c4a5
Show file tree
Hide file tree
Showing 2 changed files with 247 additions and 27 deletions.
128 changes: 102 additions & 26 deletions src/journal/src/Journal/Internal/ByteBuffer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ data ByteBuffer = ByteBuffer
, bbSlice :: {-# UNPACK #-} !(IORef Slice)
}

newtype Capacity = Capacity Int
newtype Capacity = Capacity { unCapacity :: Int }
deriving (Num, Integral, Real, Ord, Eq, Enum)

newtype Limit = Limit Int
Expand Down Expand Up @@ -105,11 +105,17 @@ remaining bb = do
------------------------------------------------------------------------
-- * Checks

boundCheck :: ByteBuffer -> Int -> IO ()
boundCheck bb ix = do
if fromIntegral ix <= getCapacity bb
boundCheck :: ByteBuffer -> String -> Int -> IO ()
boundCheck bb ctx ix = do
-- XXX: parametrise on build flag and only do these checks if enabled?
if ix < fromIntegral (getCapacity bb)
then return ()
else throwIO (IndexOutOfBounds "XXX")
else throwIO (IndexOutOfBounds errMsg)
where
errMsg = concat
[ ctx, ": index out of bounds "
, "(", show ix, ",", show (unCapacity (getCapacity bb)), ")"
]

invariant :: ByteBuffer -> IO ()
invariant bb = do
Expand Down Expand Up @@ -243,41 +249,111 @@ getStorable bb = do

putStorableAt :: Storable a => ByteBuffer -> Int -> a -> IO ()
putStorableAt bb ix x = do
boundCheck bb ix
boundCheck bb "putStorableAt" ix
pokeByteOff (bbPtr bb) ix x

getStorableAt :: Storable a => ByteBuffer -> Int -> IO a
getStorableAt bb ix = do
boundCheck bb ix
boundCheck bb "getStorableAt" ix
peekByteOff (bbPtr bb) ix

------------------------------------------------------------------------

-- indexCharOffAddr#
-- indexWideCharOffAddr#
-- indexIntOffAddr#
-- indexWordOffAddr#
-- indexAddrOffAddr#
-- indexFloatOffAddr#
-- indexDoubleOffAddr#
-- indexStablePtrOffAddr#
-- indexInt8OffAddr#
-- indexInt16OffAddr#
-- indexInt32OffAddr#
-- indexInt64OffAddr#
-- indexWord8OffAddr#
-- indexWord16OffAddr#
-- indexWord32OffAddr#
-- indexWord64OffAddr#

-- writeIntArray#
-- readCharOffArray#
-- readWideCharOffArray#
-- readIntOffArray#
-- readWordOffArray#
-- readArrayOffAddr#
-- readFloatOffArray#
-- readDoubleOffArray#
-- readStablePtrOffArray#
-- readInt8OffArray#
-- readInt16OffArray#
readInt32OffArrayIx :: ByteBuffer -> Int -> IO Int32
readInt32OffArrayIx bb (I# ix#) = IO $ \s ->
case readInt32Array# (bbData bb) ix# s of
(# s', i #) -> (# s', fromIntegral (I# i) #)

readInt64OffArrayIx :: ByteBuffer -> Int -> IO Int64
readInt64OffArrayIx bb (I# ix#) = IO $ \s ->
case readInt64Array# (bbData bb) ix# s of
(# s', i #) -> (# s', fromIntegral (I# i) #)
-- readWord8OffArray#
-- readWord16OffArray#
-- readWord32OffArray#
-- readWord64OffArray#

-- writeCharOffArray#
-- writeWideCharOffArray#
-- writeIntOffArray#
-- writeWordOffArray#
-- writeArrayOffAddr#
-- writeFloatOffArray#
-- writeDoubleOffArray#
-- writeStablePtrOffArray#
-- writeInt8OffArray#
-- writeInt16OffArray#

writeInt32OffArrayIx :: ByteBuffer -> Int -> Int32 -> IO ()
writeInt32OffArrayIx bb (I# ix#) value = IO $ \s ->
case writeInt32Array# (bbData bb) ix# value# s of
s' -> (# s', () #)
where
I# value# = fromIntegral value

writeInt64OffArrayIx :: ByteBuffer -> Int -> Int64 -> IO ()
writeInt64OffArrayIx bb (I# ix#) value = IO $ \s ->
case writeInt64Array# (bbData bb) ix# value# s of
s' -> (# s', () #)
where
I# value# = fromIntegral value

-- writeWord8OffArray#
-- writeWord16OffArray#
-- writeWord32OffArray#
-- writeWord64OffArray#

-- | Given a bytebuffer, an offset in machine words, the expected old value, and
-- the new value, perform an atomic compare and swap i.e. write the new value if
-- the current value matches the provided old value. Returns a boolean
-- indicating whether the compare and swap succeded or not. Implies a full
-- memory barrier.
casIntArray :: ByteBuffer -> Int -> Int -> Int -> IO Bool
casIntArray bb (I# offset#) (I# old#) (I# new#) = IO $ \s ->
case casIntArray# (bbData bb) offset# old# new# s of
(# s', before# #) -> case before# ==# old# of
1# -> (# s', True #)
0# -> (# s', False #)

-- | Given a bytebuffer, and offset in machine words, and a value to add,
-- atomically add the value to the element. Returns the value of the element
-- before the operation. Implies a full memory barrier.
fetchAddIntArray :: ByteBuffer -> Int -> Int -> IO Int
fetchAddIntArray bb (I# offset#) (I# incr#) = IO $ \s ->
case fetchAddIntArray# (bbData bb) offset# incr# s of
(# s', before# #) -> (# s', I# before# #)

-- | Given a bytebuffer, and offset in machine words, and a value to add,
-- atomically add the value to the element. Implies a full memory barrier.
fetchAddIntArray_ :: ByteBuffer -> Int -> Int -> IO ()
fetchAddIntArray_ bb (I# offset#) (I# incr#) = IO $ \s ->
case fetchAddIntArray# (bbData bb) offset# incr# s of
(# s', _before# #) -> (# s', () #)

-- | Given a bytebuffer, and offset in machine words, and a value to add,
-- atomically add the value to the element. Returns the value of the element
-- after the operation. Implies a full memory barrier.
fetchAddIntArray' :: ByteBuffer -> Int -> Int -> IO Int
fetchAddIntArray' bb (I# offset#) (I# incr#) = IO $ \s ->
case fetchAddIntArray# (bbData bb) offset# incr# s of
(# s', before# #) -> (# s', I# (before# +# incr#) #)

------------------------------------------------------------------------
-- * Mapped

-- | Calls `msync` which forces the data in memory to be synced to disk.
force :: ByteBuffer -> IO ()
force = undefined
force bb = msync (bbPtr bb) (fromIntegral (bbCapacity bb)) MS_SYNC False

------------------------------------------------------------------------

Expand Down
146 changes: 145 additions & 1 deletion src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
{-# LANGUAGE NumericUnderscores #-}

module Journal.Types
( Journal(Journal)
, jMaxByteSize
Expand All @@ -19,20 +21,161 @@ module Journal.Types
, getMaxByteSize
, readFileCount
, bumpFileCount
, module Journal.Types.AtomicCounter)
, module Journal.Types.AtomicCounter
, packTail
, termId
, termOffset
)
where

import Control.Concurrent.STM
import Control.Concurrent.STM (TVar)
import Data.ByteString (ByteString)
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
import Data.Int
import Data.Bits
import Data.Vector (Vector)
import Data.Word (Word32, Word64, Word8)
import Foreign.Ptr (Ptr, plusPtr)
import Foreign.Storable (sizeOf)

import Journal.Internal.ByteBuffer
import Journal.Types.AtomicCounter

------------------------------------------------------------------------

pARTITION_COUNT :: Int
pARTITION_COUNT = 3

data Journal' = Journal'
{ jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer)
, jMetadata :: {-# UNPACK #-} !ByteBuffer
}

data JMetadata = JMetadata
{ mdRawTail0 :: Int64
, mdRawTail1 :: Int64
, mdRawTail2 :: Int64
, mdActiveCount :: Int32
-- padding
, mdInitialTermId :: Int32
-- mdDefaultFrameHeaderLength :: Int32?
-- mdMTULength :: Int32, only needed if we want to fragment large messages...
, mdTermLength :: Int32
, mdPageSize :: Int32
-- padding
-- , mdDefaultFrameHeader :: Bytestring???
}

tERM_TAIL_COUNTERS_OFFSET :: Int
tERM_TAIL_COUNTERS_OFFSET = 0

lOG_ACTIVE_TERM_COUNT_OFFSET :: Int
lOG_ACTIVE_TERM_COUNT_OFFSET = tERM_TAIL_COUNTERS_OFFSET +
sizeOf (8 :: Int64) * pARTITION_COUNT

lOG_INITIAL_TERM_ID_OFFSET :: Int
lOG_INITIAL_TERM_ID_OFFSET = lOG_ACTIVE_TERM_COUNT_OFFSET +
sizeOf (4 :: Int32)

lOG_TERM_LENGTH_OFFSET :: Int
lOG_TERM_LENGTH_OFFSET = lOG_INITIAL_TERM_ID_OFFSET +
sizeOf (4 :: Int32)

lOG_PAGE_SIZE_OFFSET :: Int
lOG_PAGE_SIZE_OFFSET = lOG_TERM_LENGTH_OFFSET +
sizeOf (4 :: Int32)

lOG_META_DATA_LENGTH :: Int
lOG_META_DATA_LENGTH = lOG_PAGE_SIZE_OFFSET

------------------------------------------------------------------------

rawTail :: ByteBuffer -> Int -> IO Int64
rawTail metadataBuffer partitionIndex =
readInt64OffArrayIx metadataBuffer
(tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex))

termId :: Int64 -> Int32
termId = fromIntegral . (`shiftR` 32)

termOffset :: Int64 -> Int64 -> Int32
termOffset rawTail0 termLen =
fromIntegral (min (rawTail0 .&. 0xFFFF_FFFF) termLen)

packTail :: Int32 -> Int32 -> Int64
packTail termId0 termOffset0 =
(fromIntegral termId0 `shiftL` 32) .|. (fromIntegral termOffset0 .&. 0xFFFF_FFFF);

setRawTail :: ByteBuffer -> Int32 -> Int32 -> Int -> IO ()
setRawTail meta termId0 termOffset0 partitionIndex =
writeInt64OffArrayIx meta
(tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex))
(packTail termId0 termOffset0)

casRawTail :: ByteBuffer -> Int -> Int64 -> Int64 -> IO Bool
casRawTail meta partitionIndex expectedRawTail newRawTail =
casIntArray meta
(tERM_TAIL_COUNTERS_OFFSET + (sizeOf (8 :: Int64) * partitionIndex))
(fromIntegral expectedRawTail) (fromIntegral newRawTail) -- XXX: 32-bit systems?

initialiseTailWithTermId :: ByteBuffer -> Int -> Int32 -> IO ()
initialiseTailWithTermId meta partitionIndex termId0 =
setRawTail meta termId0 0 partitionIndex

activeTermCount :: ByteBuffer -> IO Int32
activeTermCount meta = readInt32OffArrayIx meta lOG_ACTIVE_TERM_COUNT_OFFSET

setActiveTermCount :: ByteBuffer -> Int32 -> IO ()
setActiveTermCount meta = writeInt32OffArrayIx meta lOG_ACTIVE_TERM_COUNT_OFFSET

casActiveTermCount :: ByteBuffer -> Int32 -> Int32 -> IO Bool
casActiveTermCount meta expectedTermCount newTermCount =
undefined
-- casIntArray only works on `Int`, does it mean we need to change all `Int32`
-- to `Int`? Or can we keep `Int32` and use casIntArray + fromIntegral?

-- casIntArray meta lOG_ACTIVE_TERM_COUNT_OFFSET expectedTermCount newTermCount

initialTermId :: ByteBuffer -> IO Int32
initialTermId meta = readInt32OffArrayIx meta lOG_INITIAL_TERM_ID_OFFSET

-- should never be changed?
-- setInitialTermId :: ByteBuffer -> Int32 -> IO ()
-- setInitialTermId meta = writeInt32OffArrayIx meta lOG_INITIAL_TERM_ID_OFFSET

termLength :: ByteBuffer -> IO Int32
termLength meta = readInt32OffArrayIx meta lOG_TERM_LENGTH_OFFSET

-- should never be changed?
-- setTermLength :: ByteBuffer -> Int32 -> IO ()
-- setTermLength meta = writeInt32OffArrayIx meta lOG_TERM_LENGTH_OFFSET

pageSize :: ByteBuffer -> IO Int32
pageSize meta = readInt32OffArrayIx meta lOG_PAGE_SIZE_OFFSET

------------------------------------------------------------------------

indexByTermCount :: Int32 -> Int
indexByTermCount termCount = fromIntegral termCount `mod` pARTITION_COUNT

computeTermBeginPosition :: Int32 -> Int32 -> Int32 -> Int64
computeTermBeginPosition activeTermId posBitsToShift initTermId =
let
termCount :: Int64
-- Copes with negative `activeTermId` on rollover.
termCount = fromIntegral (activeTermId - initTermId)
in
termCount `shiftL` fromIntegral posBitsToShift

rotateLog :: ByteBuffer -> Int32 -> Int32 -> IO Bool
rotateLog meta termCount termId0 = do
-- XXX:
undefined
casActiveTermCount meta termCount (termCount + 1)

------------------------------------------------------------------------

data Journal = Journal
{ jPtr :: {-# UNPACK #-} !(TVar (Ptr Word8))
, jOffset :: {-# UNPACK #-} !AtomicCounter
Expand Down Expand Up @@ -76,6 +219,7 @@ data Options = Options
-- max disk space in total? multiple of maxSize?
-- checksum? none, crc32 or sha256?
-- wait strategy?
-- page size? (for prefetching (see ghc-prim) and buffering writes?)

data JournalConsumer = JournalConsumer
{ jcPtr :: {-# UNPACK #-} !(IORef (Ptr Word8))
Expand Down

0 comments on commit 3a3c4a5

Please sign in to comment.