Skip to content

Commit

Permalink
feat(journal): make it possible to dump the metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Jan 7, 2022
1 parent 7a8d0a1 commit eb76c8f
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 44 deletions.
1 change: 1 addition & 0 deletions src/journal/journal.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ library
Journal.Internal.ByteBufferPtr
Journal.Internal.BufferClaim
Journal.Internal.Mmap
Journal.Internal.Utils
Journal.Types
Journal.Types.AtomicCounter

Expand Down
13 changes: 9 additions & 4 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import Data.ByteString.Internal (fromForeignPtr)
import Data.IORef (newIORef)
import qualified Data.Vector as Vector
import Data.Word (Word32, Word8)
import Foreign.Marshal.Alloc (callocBytes, free)
import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Marshal.Alloc (callocBytes, free)
import Foreign.Ptr (plusPtr)
import Network.Socket (Socket, recvBuf)
import System.Directory
Expand All @@ -41,8 +41,10 @@ import System.Posix.IO (fdWriteBuf)
import Journal.Internal
import Journal.Internal.BufferClaim
import Journal.Internal.ByteBufferPtr
import Journal.Internal.Mmap (sysconfPageSize)
import Journal.Types
import Journal.Types.AtomicCounter
import Journal.Internal.Utils

------------------------------------------------------------------------

Expand Down Expand Up @@ -78,6 +80,8 @@ allocateJournal fp (Options _ termBufferLen) = do

writeTermLength meta (fromIntegral termBufferLen)
writeInitialTermId meta 0
pageSize <- sysconfPageSize
writePageSize (Metadata meta) (int2Int32 pageSize)

startJournal' :: FilePath -> Options -> IO Journal'
startJournal' fp (Options _ termLength) = do
Expand Down Expand Up @@ -272,7 +276,8 @@ tj = do
opts = defaultOptions
allocateJournal fp opts
jour <- startJournal' fp opts
Just (_five, claimBuf) <- tryClaim jour 5
putBS claimBuf (BSChar8.pack "hello")
commit claimBuf
-- Just (_five, claimBuf) <- tryClaim jour 5
-- putBS claimBuf (BSChar8.pack "hello")
-- commit claimBuf
dumpMetadata (jMetadata jour)
return ()
62 changes: 43 additions & 19 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,17 @@ import Data.Word (Word32, Word8)
import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Ptr (Ptr, plusPtr)
import Foreign.Storable (Storable, peekByteOff, pokeByteOff, sizeOf)
import GHC.Stack (HasCallStack)
import System.Directory
(copyFile, doesFileExist, listDirectory, renameFile)
import System.FilePath ((</>))
import System.IO.MMap (Mode(ReadWriteEx), mmapFilePtr, munmapFilePtr)
import System.Posix.Fcntl (fileAllocate)
import System.Posix.Files (ownerReadMode, ownerWriteMode)
import System.Posix.IO
(OpenMode(ReadWrite), closeFd, defaultFileFlags, openFd)
import System.Posix.Types (Fd)


import Journal.Internal.BufferClaim
import Journal.Internal.ByteBufferPtr
import Journal.Internal.Parse
import Journal.Internal.Utils
import Journal.Types
import Journal.Types.AtomicCounter

Expand Down Expand Up @@ -69,7 +65,7 @@ tryClaim jour len = do
mResult <- termAppenderClaim (jMetadata jour) termAppender termId termOffset len
newPosition (jMetadata jour) mResult
else
return (backPressureStatus position len)
backPressureStatus position len

-- XXX: Save the result in `producerLimit :: AtomicCounter` and update it in a
-- separate process?
Expand All @@ -91,7 +87,7 @@ calculatePositionLimit jour = do
cleanBufferTo :: Journal' -> Int -> IO ()
cleanBufferTo _ _ = return ()

backPressureStatus = undefined
backPressureStatus _ _ = return Nothing

newPosition :: Metadata -> Maybe (TermOffset, BufferClaim) -> IO (Maybe (Int64, BufferClaim))
newPosition meta mResult =
Expand Down Expand Up @@ -393,18 +389,6 @@ fixInconsistency = undefined

------------------------------------------------------------------------

assertM :: (HasCallStack, Monad m) => Bool -> m ()
assertM b = assert b (return ())

withRWFd :: FilePath -> (Fd -> IO a) -> IO a
withRWFd fp k =
bracket
(openFd fp ReadWrite (Just (ownerReadMode .|. ownerWriteMode)) defaultFileFlags)
closeFd
k

------------------------------------------------------------------------

-- * Debugging

dumpFile :: FilePath -> IO ()
Expand Down Expand Up @@ -464,3 +448,43 @@ dumpFile fp = do
go (ix + 1)
(totBytes + hEADER_LENGTH + fromIntegral len)
(BS.drop (fromIntegral len) bs')

dumpMetadata :: Metadata -> IO ()
dumpMetadata meta = do
termCount <- activeTermCount meta
putStrLn ("termCount: " ++ show (unTermCount termCount))

let index = indexByTermCount termCount
activePartitionIndex = index
putStrLn ("activePartitionIndex: " ++ show (unPartitionIndex activePartitionIndex))

rawTail <- readRawTail meta index
initTermId <- initialTermId meta
putStrLn ("initialTermId: " ++ show (unTermId initTermId))

termLen <- readTermLength meta
putStrLn ("termBufferLength: " ++ show termLen)

let termId = rawTailTermId rawTail
termOffset = rawTailTermOffset rawTail termLen
termBeginPosition =
computeTermBeginPosition termId (positionBitsToShift termLen) initTermId

putStrLn ("termId: " ++ show (unTermId termId))
putStrLn ("termOffset: " ++ show (unTermOffset termOffset))
putStrLn ("termBeginPosition: " ++ show termBeginPosition)
pageSize <- readPageSize meta
putStrLn ("pageSize: " ++ show pageSize)

dumpJournal' :: Journal' -> IO ()
dumpJournal' jour = do
undefined
{-
limit <- calculatePositionLimit jour
let termAppender = jTermBuffers jour Vector.! unPartitionIndex activePartitionIndex
position = termBeginPosition + fromIntegral termOffset
putStrLn $ "limit: " ++ show limit
putStrLn $ "termBeginPosition = " ++ show termBeginPosition
putStrLn $ "termOffset = " ++ show (unTermOffset termOffset)
-}
40 changes: 22 additions & 18 deletions src/journal/src/Journal/Internal/ByteBufferPtr.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,19 @@ import qualified Data.ByteString.Lazy.Internal as LBS
import Data.IORef
import Data.Int
import Data.Word
import Foreign (fillBytes, plusPtr, withForeignPtr, copyBytes)
import Foreign (copyBytes, fillBytes, plusPtr, withForeignPtr)
import Foreign.Concurrent
import Foreign.Storable
import GHC.Exts
import GHC.ForeignPtr
import GHC.Stack
import GHC.IO (IO(IO))
import GHC.Stack
import System.Posix.IO
(OpenMode(ReadWrite), closeFd, defaultFileFlags, openFd)

import Journal.Internal.Atomics
import Journal.Internal.Mmap
import Journal.Internal.Utils

------------------------------------------------------------------------
-- * Types
Expand Down Expand Up @@ -349,14 +350,27 @@ getStorableAt bb ix = do

------------------------------------------------------------------------

primitiveInt :: (Addr# -> Int# -> State# RealWorld -> (# State# RealWorld, Int# #))
-> ByteBuffer -> Int -> IO Int
primitiveInt f bb offset@(I# offset#) = do
boundCheck bb offset
withForeignPtr (bbPtr bb) $ \(Ptr addr#) ->
IO $ \s ->
case f addr# offset# s of
(# s', i #) -> (# s', I# i #)

primitiveInt32 :: (Addr# -> Int# -> State# RealWorld -> (# State# RealWorld, Int# #))
-> ByteBuffer -> Int -> IO Int32
primitiveInt32 f bb offset = int2Int32 <$> primitiveInt f bb offset

primitiveInt64 :: (Addr# -> Int# -> State# RealWorld -> (# State# RealWorld, Int# #))
-> ByteBuffer -> Int -> IO Int64
primitiveInt64 f bb offset = int2Int64 <$> primitiveInt f bb offset

-- readCharOffArray#
-- readWideCharOffArray#
readIntOffArrayIx :: ByteBuffer -> Int -> IO Int
readIntOffArrayIx bb offset@(I# offset#) = do
boundCheck bb offset
withForeignPtr (bbPtr bb) $ \(Ptr addr#) ->
return (fromIntegral (I# (indexIntOffAddr# addr# offset#)))
readIntOffArrayIx = primitiveInt readIntOffAddr#

-- readWordOffArray#
-- readArrayOffAddr#
Expand All @@ -367,20 +381,10 @@ readIntOffArrayIx bb offset@(I# offset#) = do
-- readInt16OffArray#

readInt32OffAddr :: ByteBuffer -> Int -> IO Int32
readInt32OffAddr bb offset@(I# offset#) = do
boundCheck bb offset
withForeignPtr (bbPtr bb) $ \(Ptr addr#) ->
IO $ \s ->
case readInt32OffAddr# addr# offset# s of
(# s', i #) -> (# s', fromIntegral (I# i) #)
readInt32OffAddr = primitiveInt32 readInt32OffAddr#

readInt64OffAddr :: ByteBuffer -> Int -> IO Int64
readInt64OffAddr bb offset@(I# offset#) = do
boundCheck bb offset
withForeignPtr (bbPtr bb) $ \(Ptr addr#) ->
IO $ \s ->
case readInt64OffAddr# addr# offset# s of
(# s', i #) -> (# s', fromIntegral (I# i) #)
readInt64OffAddr = primitiveInt64 readInt64OffAddr#

indexWord8OffAddr :: ByteBuffer -> Int -> IO Word8
indexWord8OffAddr bb offset@(I# offset#) = do
Expand Down
28 changes: 28 additions & 0 deletions src/journal/src/Journal/Internal/Utils.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
module Journal.Internal.Utils where

import Control.Exception (assert, bracket)
import Data.Int (Int32, Int64)
import Data.Bits ((.|.))
import GHC.Stack (HasCallStack)
import System.Posix.Files (ownerReadMode, ownerWriteMode)
import System.Posix.IO
(OpenMode(ReadWrite), closeFd, defaultFileFlags, openFd)
import System.Posix.Types (Fd)

------------------------------------------------------------------------

assertM :: (HasCallStack, Monad m) => Bool -> m ()
assertM b = assert b (return ())

withRWFd :: FilePath -> (Fd -> IO a) -> IO a
withRWFd fp k =
bracket
(openFd fp ReadWrite (Just (ownerReadMode .|. ownerWriteMode)) defaultFileFlags)
closeFd
k

int2Int32 :: Int -> Int32
int2Int32 i = assert (i <= fromIntegral (maxBound :: Int32)) (fromIntegral i)

int2Int64 :: Int -> Int64
int2Int64 i = assert (i <= fromIntegral (maxBound :: Int64)) (fromIntegral i)
9 changes: 6 additions & 3 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ import Journal.Types.AtomicCounter
pARTITION_COUNT :: Int
pARTITION_COUNT = 3

newtype Metadata = Metadata ByteBuffer
newtype Metadata = Metadata { unMetadata :: ByteBuffer }

data Journal' = Journal'
{ jTermBuffers :: {-# UNPACK #-} !(Vector ByteBuffer)
Expand Down Expand Up @@ -176,8 +176,11 @@ readTermLength (Metadata meta) = readInt32OffAddr meta lOG_TERM_LENGTH_OFFSET
writeTermLength :: ByteBuffer -> Int32 -> IO ()
writeTermLength meta = writeInt32OffAddr meta lOG_TERM_LENGTH_OFFSET

pageSize :: ByteBuffer -> IO Int32
pageSize meta = readInt32OffAddr meta lOG_PAGE_SIZE_OFFSET
readPageSize :: Metadata -> IO Int32
readPageSize (Metadata meta) = readInt32OffAddr meta lOG_PAGE_SIZE_OFFSET

writePageSize :: Metadata -> Int32 -> IO ()
writePageSize (Metadata meta) = writeInt32OffAddr meta lOG_PAGE_SIZE_OFFSET

-- | The number of bits to shift when multiplying or dividing by the term buffer
-- length.
Expand Down

0 comments on commit eb76c8f

Please sign in to comment.