Skip to content

Commit

Permalink
fix(journal): use position instead of term offset in read
Browse files Browse the repository at this point in the history
  • Loading branch information
symbiont-stevan-andjelkovic committed Jan 28, 2022
1 parent 3a2812c commit 883210b
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 22 deletions.
31 changes: 17 additions & 14 deletions src/journal/src/Journal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -145,33 +145,36 @@ recvBytes bc sock len = withPtr bc $ \ptr -> recvBuf sock ptr len

-- * Consumption

-- XXX: move
readJournal :: Journal -> IO (Maybe ByteString)
readJournal jour = do
offset <- readCounter (jBytesConsumed jour)
putStrLn ("readJournal, offset: " ++ show offset)
(termBuffer , TermOffset termOffset) <- readTermBufferTermOffset jour
assertM (offset <= int322Int termOffset)
if offset == int322Int termOffset

termCount <- activeTermCount (jMetadata jour)
let activeTermIndex = indexByTermCount termCount
rawTail <- readRawTail (jMetadata jour) activeTermIndex
termLen <- readTermLength (jMetadata jour)
let termBuffer = jTermBuffers jour Vector.! unPartitionIndex activeTermIndex
activeTermId = rawTailTermId rawTail
termOffset = rawTailTermOffset rawTail termLen

putStrLn ("readJournal, termOffset: " ++ show (unTermOffset termOffset))
initTermId <- readInitialTermId (jMetadata jour)
let position =
computePosition activeTermId termOffset (positionBitsToShift termLen) initTermId
assertM (int2Int64 offset <= position)
if int2Int64 offset == position
then return Nothing
else do
assertM (offset < int322Int termOffset)
assertM (int2Int64 offset < position)
HeaderLength len <- readFrameLength termBuffer (TermOffset (int2Int32 offset))
putStrLn ("readJournal, len: " ++ show len)
assertM (len > 0)
bs <- getByteStringAt termBuffer
(offset + hEADER_LENGTH) (int322Int len - hEADER_LENGTH)
assertM (BS.length bs == int322Int len - hEADER_LENGTH)
incrCounter_ (int322Int len) (jBytesConsumed jour)
return (Just bs)
where
readTermBufferTermOffset :: Journal -> IO (ByteBuffer, TermOffset)
readTermBufferTermOffset jour = do
termCount <- activeTermCount (jMetadata jour)
let activeTermIndex = indexByTermCount termCount
rawTail <- readRawTail (jMetadata jour) activeTermIndex
termLen <- readTermLength (jMetadata jour)
let bb = jTermBuffers jour Vector.! unPartitionIndex activeTermIndex
return (bb, rawTailTermOffset rawTail termLen)

------------------------------------------------------------------------

Expand Down
24 changes: 16 additions & 8 deletions src/journal/test/JournalTest.hs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,13 @@ data Command
-- TruncateAfterSnapshot
-- LoadSnapshot
-- Replay
| DumpJournal
deriving Show

constructorString :: Command -> String
constructorString AppendBS {} = "AppendBS"
constructorString ReadJournal = "ReadJournal"
constructorString DumpJournal = "DumpJournal"

prettyCommand :: Command -> String
prettyCommand = show
Expand Down Expand Up @@ -134,10 +136,12 @@ precondition :: Model -> Command -> Bool
precondition m ReadJournal = Vector.length (fjJournal m) /= fjIndex m
precondition m (AppendBS rle) = let bs = decodeRunLength rle in
not (BS.null bs) && BS.length bs + hEADER_LENGTH < oTermBufferLength testOptions `div` 2
precondition m DumpJournal = True

step :: Command -> Model -> (Model, Response)
step (AppendBS rle) m = Unit <$> appendBSFake (decodeRunLength rle) m
step ReadJournal m = ByteString <$> readJournalFake m
step DumpJournal m = (m, Unit (Just ()))

exec :: Command -> Journal -> IO Response
exec (AppendBS rle) j = do
Expand All @@ -149,6 +153,7 @@ exec (AppendBS rle) j = do
Unit <$> appendBS j bs
Just () -> return (Unit (Just ()))
exec ReadJournal j = ByteString <$> readJournal j
exec DumpJournal j = Unit . Just <$> dumpJournal j

genRunLenEncoding :: Gen [(Int, Char)]
genRunLenEncoding = sized $ \n -> do
Expand Down Expand Up @@ -263,7 +268,9 @@ runCommands cmds = do
allocateJournal fp testOptions
j <- startJournal fp testOptions
putStrLn ""
go m j cmds []
b <- go m j cmds []
dumpJournal j
return b
where
go :: Model -> Journal -> [Command] -> [(Command, Response)] -> IO Bool
go m j [] _hist = putStrLn "\nSuccess!" >> return True
Expand Down Expand Up @@ -299,24 +306,25 @@ runCommands cmds = do
unit_bug0 :: Assertion
unit_bug0 = assertProgram ""
[ AppendBS [(2, 'E')]
, AppendBS [(32762, 'O')]
, AppendBS [(32752, 'O')]
]

unit_bug1 :: Assertion
unit_bug1 = assertProgram ""
[ AppendBS [(32762, 'O')]
, AppendBS [(32762, 'G')]
[ AppendBS [(32756, 'O')]
, AppendBS [(32756, 'G')]
]

unit_bug11 :: Assertion
unit_bug11 = assertProgram ""
[ AppendBS [(32762, 'O')]
[ AppendBS [(32756, 'O')]
, ReadJournal
, AppendBS [(32762, 'G')]
, AppendBS [(32756, 'G')]
, ReadJournal
, AppendBS [(32762, 'K')]
, AppendBS [(32756, 'K')]
, DumpJournal
, ReadJournal
, AppendBS [(32762, 'J')]
, AppendBS [(32756, 'J')]
]

unit_bug2 :: Assertion
Expand Down

0 comments on commit 883210b

Please sign in to comment.