Skip to content

Commit

Permalink
Merge pull request #420 from symbiont-io/journal
Browse files Browse the repository at this point in the history
journal
  • Loading branch information
symbiont-stevan-andjelkovic authored Jan 12, 2022
2 parents 34c251f + 12fc98e commit d7854a3
Show file tree
Hide file tree
Showing 12 changed files with 433 additions and 52 deletions.
10 changes: 10 additions & 0 deletions shell.nix
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ pkgs.mkShell {
name = "dev-shell";

buildInputs = [
gdb
bazel_4
buildifier # Bazel BUILD file formatter
go
Expand Down Expand Up @@ -49,4 +50,13 @@ pkgs.mkShell {
[ darwin.apple_sdk.frameworks.Foundation
darwin.apple_sdk.frameworks.CoreFoundation
];

# The following is needed for gdb to work, otherwise we get an error saying:
# `ModuleNotFoundError: No module named '_sysconfigdata__linux_x86_64-linux-gnu'`.
# (Source: https://github.com/NixOS/nixpkgs/issues/88711).
shellHook = ''
unset _PYTHON_HOST_PLATFORM
unset _PYTHON_SYSCONFIGDATA_NAME
'';

}
1 change: 1 addition & 0 deletions src/journal/app/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import Data.Word
import Foreign.Ptr

import Journal
import Journal.Types.AtomicCounter

------------------------------------------------------------------------

Expand Down
4 changes: 4 additions & 0 deletions src/journal/journal.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,15 @@ test-suite test
other-modules:
Journal.CRC32Test
Journal.Internal.AtomicsTest
Journal.Internal.ByteBufferTest
Journal.Internal.MmapTest
JournalTest
TastyDiscover

build-depends:
, async
, base
, binary
, bytestring
, directory
, HUnit
Expand All @@ -97,6 +100,7 @@ test-suite test
, tasty-hunit
, tasty-quickcheck
, vector
, unix
, zlib

ghc-options: -threaded -rtsopts -with-rtsopts=-N -fno-ignore-asserts
Expand Down
22 changes: 9 additions & 13 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,17 @@ import qualified Data.ByteString.Char8 as BSChar8
import Data.ByteString.Internal (fromForeignPtr)
import Data.IORef (newIORef)
import qualified Data.Vector as Vector
import Data.Word (Word32, Word8)
import Foreign.ForeignPtr (newForeignPtr_)
import Foreign.Marshal.Alloc (callocBytes, free)
import Foreign.Ptr (plusPtr)
import Network.Socket (Socket, recvBuf)
import System.Directory
( createDirectoryIfMissing
, doesDirectoryExist
, doesFileExist
, getFileSize
, removeFile
)
import System.FilePath (takeDirectory, (</>))
import System.Posix.IO (fdWriteBuf)

import Journal.Internal
import Journal.Internal.BufferClaim
Expand All @@ -58,28 +56,25 @@ allocateJournal fp (Options _ termBufferLen) = do
unless (popCount termBufferLen == 1) $
-- XXX: check bounds
error "allocateJournal: oTermBufferLength must be a power of 2"
-- XXX: only for debugging:
putStrLn ("removing " ++ fp)
removeFile fp
b <- doesFileExist fp
when (not b) $ do

putStrLn ("allocateJournal, creating new journal: " ++ fp)
let dir = takeDirectory fp
dirExists <- doesDirectoryExist dir
unless dirExists (createDirectoryIfMissing True dir)

let logLength = termBufferLen * pARTITION_COUNT + lOG_META_DATA_LENGTH

withRWFd fp $ \fd -> do
fileAllocate fd 0 (fromIntegral logLength)
-- NOTE: `fileAllocate` only allocates the space it doesn't zero it,
-- unlike `fallocate(1)`, so we do that next.
bracket (callocBytes logLength) free $ \zeroesPtr -> do
bytesWritten <- fdWriteBuf fd zeroesPtr (fromIntegral logLength)
assertM (bytesWritten == fromIntegral logLength)

fallocate fp (fromIntegral logLength)
bb <- mmapped fp logLength
meta <- wrapPart bb (logLength - lOG_META_DATA_LENGTH) lOG_META_DATA_LENGTH

writeTermLength meta (fromIntegral termBufferLen)
writeInitialTermId meta 0 -- XXX: should be random rather than 0.
writeInitialTermId meta 4 -- XXX: should be random rather than 4.
pageSize <- sysconfPageSize
writePageSize (Metadata meta) (int2Int32 pageSize)

Expand Down Expand Up @@ -276,7 +271,8 @@ tj = do
opts = defaultOptions
allocateJournal fp opts
jour <- startJournal' fp opts
-- Just (_five, claimBuf) <- tryClaim jour 5
-- Just (five, claimBuf) <- tryClaim jour 5
-- print five
-- putBS claimBuf (BSChar8.pack "hello")
-- commit claimBuf
dumpMetadata (jMetadata jour)
Expand Down
33 changes: 21 additions & 12 deletions src/journal/src/Journal/Internal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ tryClaim jour len = do
let index = indexByTermCount termCount
activePartitionIndex = index
rt <- readRawTail (jMetadata jour) index
initTermId <- initialTermId (jMetadata jour)
initTermId <- readInitialTermId (jMetadata jour)
termLen <- readTermLength (jMetadata jour)

-- XXX: cache and read these from there?
Expand Down Expand Up @@ -98,7 +98,7 @@ newPosition meta mResult =
termCount <- activeTermCount meta
let index = indexByTermCount termCount
rt <- readRawTail meta index
initTermId <- initialTermId meta
initTermId <- readInitialTermId meta
termLen <- readTermLength meta

let termId = rawTailTermId rt
Expand Down Expand Up @@ -163,7 +163,7 @@ rotateTerm meta = do
let activePartitionIndex = indexByTermCount termCount
nextIndex = nextPartitionIndex activePartitionIndex
rawTail <- readRawTail meta activePartitionIndex
initTermId <- initialTermId meta
initTermId <- readInitialTermId meta
let termId = rawTailTermId rawTail
nextTermId = termId + 1
termCount = fromIntegral (nextTermId - initTermId)
Expand Down Expand Up @@ -451,20 +451,29 @@ dumpFile fp = do

dumpMetadata :: Metadata -> IO ()
dumpMetadata meta = do
termCount <- activeTermCount meta
putStrLn ("termCount: " ++ show (unTermCount termCount))
putStrLn "Metadata"
putStrLn "========"
flip mapM_ [0 .. pARTITION_COUNT - 1] $ \i -> do
RawTail rawTail <- readRawTail meta (PartitionIndex i)
putStrLn ("rawTail" ++ show i ++ ": " ++ show rawTail)

let index = indexByTermCount termCount
activePartitionIndex = index
putStrLn ("activePartitionIndex: " ++ show (unPartitionIndex activePartitionIndex))
termCount <- activeTermCount meta
putStrLn ("activeTermCount: " ++ show (unTermCount termCount))

rawTail <- readRawTail meta index
initTermId <- initialTermId meta
initTermId <- readInitialTermId meta
putStrLn ("initialTermId: " ++ show (unTermId initTermId))

termLen <- readTermLength meta
putStrLn ("termBufferLength: " ++ show termLen)

pageSize <- readPageSize meta
putStrLn ("pageSize: " ++ show pageSize)

putStrLn "--------"

let index = indexByTermCount termCount
activePartitionIndex = index
rawTail <- readRawTail meta index
let termId = rawTailTermId rawTail
termOffset = rawTailTermOffset rawTail termLen
termBeginPosition =
Expand All @@ -473,8 +482,8 @@ dumpMetadata meta = do
putStrLn ("termId: " ++ show (unTermId termId))
putStrLn ("termOffset: " ++ show (unTermOffset termOffset))
putStrLn ("termBeginPosition: " ++ show termBeginPosition)
pageSize <- readPageSize meta
putStrLn ("pageSize: " ++ show pageSize)
putStrLn ("activePartitionIndex: " ++ show (unPartitionIndex activePartitionIndex))


dumpJournal' :: Journal' -> IO ()
dumpJournal' jour = do
Expand Down
6 changes: 6 additions & 0 deletions src/journal/src/Journal/Internal/Atomics.hs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ module Journal.Internal.Atomics where
import Foreign
import Foreign.C.Types

import Journal.Internal.Utils (int2Int64)

------------------------------------------------------------------------

foreign import ccall unsafe "stdatomic.h __atomic_fetch_add_1"
Expand Down Expand Up @@ -82,3 +84,7 @@ casInt64Ptr ptr expected desired = do
1 -> return True
_ ->
error "casInt64Addr: impossible, c_atomic_compare_exchange_strong should return a _Bool"

casIntPtr :: Ptr Int -> Int -> Int -> IO Bool
casIntPtr ptr expected desired =
casInt64Ptr (castPtr ptr) (int2Int64 expected) (int2Int64 desired)
34 changes: 16 additions & 18 deletions src/journal/src/Journal/Internal/ByteBufferPtr.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import Foreign.Storable
import GHC.Exts
import GHC.ForeignPtr
import GHC.IO (IO(IO))
import GHC.Int (Int32(I32#), Int64(I64#))
import GHC.Stack
import System.Posix.IO
(OpenMode(ReadWrite), closeFd, defaultFileFlags, openFd)
Expand Down Expand Up @@ -353,26 +354,26 @@ getStorableAt bb ix = do
------------------------------------------------------------------------

primitiveInt :: (Addr# -> Int# -> State# RealWorld -> (# State# RealWorld, Int# #))
-> ByteBuffer -> Int -> IO Int
primitiveInt f bb offset@(I# offset#) = do
-> (Int# -> i) -> ByteBuffer -> Int -> IO i
primitiveInt f c bb offset@(I# offset#) = do
boundCheck bb offset
withForeignPtr (bbPtr bb) $ \(Ptr addr#) ->
IO $ \s ->
case f addr# offset# s of
(# s', i #) -> (# s', I# i #)
case f (addr# `plusAddr#` offset#) 0# s of
(# s', i #) -> (# s', c i #)

primitiveInt32 :: (Addr# -> Int# -> State# RealWorld -> (# State# RealWorld, Int# #))
-> ByteBuffer -> Int -> IO Int32
primitiveInt32 f bb offset = int2Int32 <$> primitiveInt f bb offset
primitiveInt32 f bb offset = primitiveInt f I32# bb offset

primitiveInt64 :: (Addr# -> Int# -> State# RealWorld -> (# State# RealWorld, Int# #))
-> ByteBuffer -> Int -> IO Int64
primitiveInt64 f bb offset = int2Int64 <$> primitiveInt f bb offset
primitiveInt64 f bb offset = primitiveInt f I64# bb offset

-- readCharOffArray#
-- readWideCharOffArray#
readIntOffArrayIx :: ByteBuffer -> Int -> IO Int
readIntOffArrayIx = primitiveInt readIntOffAddr#
readIntOffArrayIx = primitiveInt readIntOffAddr# I#

-- readWordOffArray#
-- readArrayOffAddr#
Expand Down Expand Up @@ -405,12 +406,13 @@ indexWord8OffAddr bb offset@(I# offset#) = do
writeInt = writeIntOffAddr

writeIntOffAddr :: ByteBuffer -> Int -> Int -> IO ()
writeIntOffAddr bb ix@(I# ix#) (I# value#) = do
boundCheck bb ix
writeIntOffAddr bb offset@(I# offset#) (I# value#) = do
boundCheck bb offset
withForeignPtr (bbPtr bb) $ \(Ptr addr#) ->
IO $ \s ->
case writeIntOffAddr# addr# ix# value# s of
case writeIntOffAddr# (addr# `plusAddr#` offset#) 0# value# s of
s' -> (# s', () #)

-- writeWordOffArray#
-- writeArrayOffAddr#
-- writeFloatOffArray#
Expand All @@ -420,24 +422,20 @@ writeIntOffAddr bb ix@(I# ix#) (I# value#) = do
-- writeInt16OffArray#

writeInt32OffAddr :: ByteBuffer -> Int -> Int32 -> IO ()
writeInt32OffAddr bb offset@(I# offset#) value = do
writeInt32OffAddr bb offset@(I# offset#) (I32# value#) = do
boundCheck bb offset
withForeignPtr (bbPtr bb) $ \(Ptr addr#) ->
IO $ \s ->
case writeInt32OffAddr# addr# offset# value# s of
case writeInt32OffAddr# (addr# `plusAddr#` offset#) 0# value# s of
s' -> (# s', () #)
where
I# value# = fromIntegral value

writeInt64OffAddr :: ByteBuffer -> Int -> Int64 -> IO ()
writeInt64OffAddr bb offset@(I# offset#) value = do
writeInt64OffAddr bb offset@(I# offset#) (I64# value#) = do
boundCheck bb offset
withForeignPtr (bbPtr bb) $ \(Ptr addr#) ->
IO $ \s ->
case writeInt64OffAddr# addr# offset# value# s of
case writeInt64OffAddr# (addr# `plusAddr#` offset#) 0# value# s of
s' -> (# s', () #)
where
I# value# = fromIntegral value

-- writeWord8OffArray#
-- writeWord16OffArray#
Expand Down
36 changes: 32 additions & 4 deletions src/journal/src/Journal/Internal/Utils.hs
Original file line number Diff line number Diff line change
@@ -1,14 +1,27 @@
{-# LANGUAGE MagicHash #-}

module Journal.Internal.Utils where

import Control.Exception (assert, bracket)
import Data.Int (Int32, Int64)
import Data.Bits ((.|.))
import Data.Int (Int32, Int64)
import Foreign.Marshal.Alloc (callocBytes, free)
import GHC.Int (Int(I#), Int32(I32#), Int64(I64#))
import GHC.Stack (HasCallStack)
import System.Directory (canonicalizePath, getTemporaryDirectory)
import System.IO (Handle, hClose, openTempFile)
import System.Posix.Files (ownerReadMode, ownerWriteMode)
import System.Posix.IO
(OpenMode(ReadWrite), closeFd, defaultFileFlags, openFd)
( OpenMode(ReadWrite)
, closeFd
, defaultFileFlags
, fdWriteBuf
, openFd
)
import System.Posix.Types (Fd)

import Journal.Internal.FileAllocate

------------------------------------------------------------------------

assertM :: (HasCallStack, Monad m) => Bool -> m ()
Expand All @@ -22,7 +35,22 @@ withRWFd fp k =
k

int2Int32 :: Int -> Int32
int2Int32 i = assert (i <= fromIntegral (maxBound :: Int32)) (fromIntegral i)
int2Int32 i@(I# i#) = assert (i <= fromIntegral (maxBound :: Int32)) (I32# i#)

int2Int64 :: Int -> Int64
int2Int64 i = assert (i <= fromIntegral (maxBound :: Int64)) (fromIntegral i)
int2Int64 i@(I# i#) = assert (i <= fromIntegral (maxBound :: Int64)) (I64# i#)

fallocate :: FilePath -> Int -> IO ()
fallocate fp len = do
withRWFd fp $ \fd -> do
fileAllocate fd 0 (fromIntegral len)
-- NOTE: `fileAllocate` only allocates the space it doesn't zero it,
-- unlike `fallocate(1)`, so we do that next.
bracket (callocBytes len) free $ \zeroesPtr -> do
bytesWritten <- fdWriteBuf fd zeroesPtr (fromIntegral len)
assertM (fromIntegral bytesWritten == len)

withTempFile :: String -> (FilePath -> Handle -> IO a) -> IO a
withTempFile name k = do
tmp <- canonicalizePath =<< getTemporaryDirectory
bracket (openTempFile tmp name) (\(_fp, h) -> hClose h) (uncurry k)
4 changes: 2 additions & 2 deletions src/journal/src/Journal/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ casActiveTermCount :: Metadata -> TermCount -> TermCount -> IO Bool
casActiveTermCount (Metadata meta) (TermCount expected) (TermCount new) =
casInt32Addr meta lOG_ACTIVE_TERM_COUNT_OFFSET expected new

initialTermId :: Metadata -> IO TermId
initialTermId (Metadata meta) =
readInitialTermId :: Metadata -> IO TermId
readInitialTermId (Metadata meta) =
TermId <$> readInt32OffAddr meta lOG_INITIAL_TERM_ID_OFFSET

writeInitialTermId :: ByteBuffer -> TermId -> IO ()
Expand Down
6 changes: 3 additions & 3 deletions src/journal/test/Journal/Internal/AtomicsTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@ unit_atomicCASSequential :: Assertion
unit_atomicCASSequential =
alloca $ \ptr -> do
poke ptr 0
b <- casIntPtr ptr 0 1
b <- casInt32Ptr ptr 0 1
assertBool "casIntPtr 0 1" b
b' <- casIntPtr ptr 2 3
b' <- casInt32Ptr ptr 2 3
assertBool "casIntPtr 2 3" (not b')
b'' <- casIntPtr ptr 1 2
b'' <- casInt32Ptr ptr 1 2
assertBool "casIntPtr 1 2" b''
result <- peek ptr
assertEqual "" 2 result
Expand Down
Loading

0 comments on commit d7854a3

Please sign in to comment.