Skip to content

Commit

Permalink
refactor(journal): almost finish new try claim
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Dec 30, 2021
1 parent 5a65f1d commit ad1aae1
Show file tree
Hide file tree
Showing 2 changed files with 125 additions and 32 deletions.
86 changes: 63 additions & 23 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
{-# LANGUAGE DerivingStrategies #-}
{-# LANGUAGE NumericUnderscores #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE PatternSynonyms #-}
{-# LANGUAGE ScopedTypeVariables #-}
Expand All @@ -10,7 +11,7 @@ import Control.Concurrent.STM (atomically, writeTVar)
import Control.Exception (assert)
import Control.Monad (unless, when)
import Data.Binary (Binary, decode, encode)
import Data.Bits (Bits, (.&.))
import Data.Bits (Bits, (.&.), (.|.), shiftL)
import qualified Data.ByteString as BS
import Data.ByteString.Internal (fromForeignPtr)
import qualified Data.ByteString.Lazy as LBS
Expand All @@ -34,11 +35,11 @@ import Journal.Types.AtomicCounter

------------------------------------------------------------------------

newtype HeaderTag = HeaderTag Word8
newtype HeaderTag = HeaderTag { unHeaderTag :: Word8 }
deriving newtype (Eq, Binary, Bits, Show, Num, Storable)

newtype HeaderVersion = HeaderVersion Word8
deriving newtype (Eq, Binary, Num, Storable)
deriving newtype (Eq, Binary, Num, Storable, Integral, Real, Ord, Enum)

newtype HeaderLength = HeaderLength Word32
deriving newtype (Eq, Ord, Binary, Enum, Real, Integral, Num, Storable)
Expand Down Expand Up @@ -107,13 +108,36 @@ tryClaim jour len = do
if position < limit
then do
result <- termAppenderClaim (jMetadata jour) termAppender termId termOffset len
return (newPosition result)
newPosition (jMetadata jour) result
else
return (backPressureStatus position len)

calculatePositionLimit = undefined

newPosition = undefined
newPosition :: Metadata -> Maybe TermOffset -> IO (Maybe Int64)
newPosition meta mResultingOffset =
case mResultingOffset of
Just resultingOffset -> do
-- XXX: cache
-- termOffset := resultingOffset
termCount <- activeTermCount meta
let index = indexByTermCount termCount
rt <- readRawTail meta index
initTermId <- initialTermId meta
termLen <- readTermLength meta

let termId = rawTailTermId rt
termOffset = rawTailTermOffset rt termLen
termBeginPosition =
computeTermBeginPosition termId (positionBitsToShift termLen) initTermId
return (Just (termBeginPosition + fromIntegral resultingOffset))
Nothing -> do
-- XXX:
-- if termBeginPosition + termBufferLength >= maxPossiblePosition
-- then return Nothing -- return MAX_POSSILBE_POSITION_EXCEEDED ?
-- else do
rotateTerm meta
return Nothing -- ADMIN_ACTION

backPressureStatus = undefined

Expand All @@ -136,36 +160,52 @@ termAppenderClaim meta termBuffer termId termOffset len = do
handleEndOfLogCondition termBuffer termOffset termLength termId
return Nothing
else do
headerWrite termBuffer termOffset frameLength termId
headerWrite termBuffer termOffset (fromIntegral frameLength) termId
-- claimBuffer <- wrapPart termBuffer termOffset frameLength
return (Just resultingOffset)

handleEndOfLogCondition :: ByteBuffer -> TermOffset -> Capacity -> TermId -> IO ()
handleEndOfLogCondition termBuffer termOffset (Capacity termLen) termId = do
when (termOffset < fromIntegral termLen) $ do

let offset :: Int32
offset = fromIntegral termOffset
let paddingLength :: HeaderLength
paddingLength = fromIntegral (termLen - fromIntegral termOffset)

paddingLength :: Int32
paddingLength = termLen - offset
headerWrite termBuffer termOffset paddingLength termId
writeFrameType termBuffer termOffset Padding
writeFrameLength termBuffer termOffset paddingLength

headerWrite termBuffer offset paddingLength termId
writeFrameType termBuffer offset Padding
writeFrameLength termBuffer offset paddingLength
fRAME_LENGTH_FIELD_OFFSET :: Int
fRAME_LENGTH_FIELD_OFFSET = 0

headerWrite = undefined
tAG_FIELD_OFFSET :: Int
tAG_FIELD_OFFSET = 6

writeFrameType = undefined
writeFrameLength = undefined
headerWrite :: ByteBuffer -> TermOffset -> HeaderLength -> TermId -> IO ()
headerWrite termBuffer termOffset len _termId = do
let versionFlagsType :: Int64
versionFlagsType = fromIntegral cURRENT_VERSION `shiftL` 32
-- XXX: Atomic write?
writeIntOffArrayIx termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET)
(versionFlagsType .|. ((- fromIntegral len) .&. 0xFFFF_FFFF))
-- XXX: store termId and offset (only need for replication?)

rotateTerm :: Journal' -> IO ()
rotateTerm jour = do
termCount <- activeTermCount (jMetadata jour)
writeFrameType :: ByteBuffer -> TermOffset -> HeaderTag -> IO ()
writeFrameType termBuffer termOffset (HeaderTag tag) = do
putByteAt termBuffer (fromIntegral termOffset + tAG_FIELD_OFFSET) tag

writeFrameLength :: ByteBuffer -> TermOffset -> HeaderLength -> IO ()
writeFrameLength termBuffer termOffset (HeaderLength len) = do
writeWord32OffArrayIx termBuffer (fromIntegral termOffset + fRAME_LENGTH_FIELD_OFFSET)
len

rotateTerm :: Metadata -> IO ()
rotateTerm meta = do
termCount <- activeTermCount meta
let activePartitionIndex = indexByTermCount termCount
nextIndex = nextPartitionIndex activePartitionIndex
rawTail <- readRawTail (jMetadata jour) activePartitionIndex
initTermId <- initialTermId (jMetadata jour)
rawTail <- readRawTail meta activePartitionIndex
initTermId <- initialTermId meta
let termId = rawTailTermId rawTail
nextTermId = termId + 1
termCount = fromIntegral (nextTermId - initTermId)
Expand All @@ -176,8 +216,8 @@ rotateTerm jour = do
-- termId := nextTermId
-- termBeginPosition += termBufferLength

initialiseTailWithTermId (jMetadata jour) nextIndex nextTermId
writeActiveTermCount (jMetadata jour) termCount
initialiseTailWithTermId meta nextIndex nextTermId
writeActiveTermCount meta termCount

commit = undefined

Expand Down
71 changes: 62 additions & 9 deletions src/journal/src/Journal/Internal/ByteBuffer.hs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MagicHash #-}
{-# LANGUAGE UnboxedTuples #-}

module Journal.Internal.ByteBuffer where

import Control.Exception
import Control.Monad
import qualified Data.ByteString as BS
import qualified Data.ByteString.Internal as BS
import qualified Data.ByteString.Lazy as LBS
import qualified Data.ByteString.Lazy.Internal as LBS
import Data.IORef
import Foreign
import GHC.ForeignPtr
import GHC.Exts
import GHC.Types
import GHC.ForeignPtr
import GHC.IO
import System.Posix.IO (openFd, defaultFileFlags, OpenMode(ReadWrite))
import GHC.Stack
import GHC.Types
import System.Posix.IO (OpenMode(ReadWrite), defaultFileFlags, openFd)

import Journal.Internal.Mmap

Expand All @@ -37,7 +42,7 @@ newtype Capacity = Capacity { unCapacity :: Int }
newtype Limit = Limit Int
deriving (Num, Integral, Real, Ord, Eq, Enum)

newtype Position = Position Int
newtype Position = Position { unPosition :: Int }
deriving (Num, Integral, Real, Ord, Eq, Enum)

newtype Slice = Slice Int
Expand Down Expand Up @@ -233,7 +238,11 @@ putByte :: ByteBuffer -> Word8 -> IO ()
putByte = undefined

getByte :: ByteBuffer -> IO Word8
getByte = undefined
getByte bb = do
pos <- readPosition bb
w8 <- readWord8OffArrayIx bb (unPosition pos)
writePosition bb (pos + 1)
return w8

putByteAt :: ByteBuffer -> Int -> Word8 -> IO ()
putByteAt = undefined
Expand All @@ -242,7 +251,7 @@ getByteAt :: ByteBuffer -> Int -> IO Word8
getByteAt = undefined

------------------------------------------------------------------------
-- * Multi-byte operations
-- * Multi-byte relative and absolute operations

putBytes :: ByteBuffer -> ByteBuffer -> IO ()
putBytes src dest = do
Expand All @@ -257,6 +266,36 @@ putBytes src dest = do
getBytes :: ByteBuffer -> Int -> Int -> IO [Word8]
getBytes bb offset len = undefined

putByteString :: ByteBuffer -> BS.ByteString -> IO ()
putByteString bb bs = do
let (fptr, I# offset#, I# len#) = BS.toForeignPtr bs
boundCheck bb (I# (len# -# 1#))
withForeignPtr fptr $ \(Ptr addr#) -> IO $ \s ->
case copyAddrToByteArray# addr# (bbData bb) offset# len# s of
s' -> (# s', () #)

putLazyByteString :: ByteBuffer -> LBS.ByteString -> IO ()
putLazyByteString bb lbs = do
let (fptr, I# offset#, I# len#) = BS.toForeignPtr (LBS.toStrict lbs)
boundCheck bb (I# (len# -# 1#))
withForeignPtr fptr $ \(Ptr addr#) -> IO $ \s ->
case copyAddrToByteArray# addr# (bbData bb) offset# len# s of
s' -> (# s', () #)

getByteString :: ByteBuffer -> Int -> IO BS.ByteString
getByteString bb len = do
bytes <- replicateM len (getByte bb)
return (BS.packBytes bytes)

getLazyByteString :: ByteBuffer -> Int -> IO LBS.ByteString
getLazyByteString bb len = do
bytes <- replicateM len (getByte bb)
return (LBS.packBytes bytes)

getByteStringAt :: ByteBuffer -> Int -> Int -> IO BS.ByteString
getByteStringAt bb offset len = do
undefined

------------------------------------------------------------------------
-- * Relative operations on `Storable` elements

Expand Down Expand Up @@ -316,7 +355,14 @@ readInt64OffArrayIx bb ix@(I# ix#) = do
IO $ \s ->
case readInt64Array# (bbData bb) ix# s of
(# s', i #) -> (# s', fromIntegral (I# i) #)
-- readWord8OffArray#

readWord8OffArrayIx :: ByteBuffer -> Int -> IO Word8
readWord8OffArrayIx bb offset@(I# offset#) = do
boundCheck bb offset
IO $ \s ->
case readWord8Array# (bbData bb) offset# s of
(# s', w8# #) -> (# s', fromIntegral (W# w8#) #)

-- readWord16OffArray#
-- readWord32OffArray#
-- readWord64OffArray#
Expand Down Expand Up @@ -357,7 +403,14 @@ writeInt64OffArrayIx bb ix@(I# ix#) value = do

-- writeWord8OffArray#
-- writeWord16OffArray#
-- writeWord32OffArray#
writeWord32OffArrayIx :: ByteBuffer -> Int -> Word32 -> IO ()
writeWord32OffArrayIx bb offset@(I# offset#) value = do
boundCheck bb offset
IO $ \s ->
case writeWord32Array# (bbData bb) offset# value# s of
s' -> (# s', () #)
where
W# value# = fromIntegral value
-- writeWord64OffArray#

-- atomicReadIntArray#
Expand Down

0 comments on commit ad1aae1

Please sign in to comment.