Skip to content

Commit

Permalink
Fixed a concurrency bug.
Browse files Browse the repository at this point in the history
  • Loading branch information
raduom committed Jun 10, 2022
1 parent 4c491b1 commit a130a76
Showing 1 changed file with 8 additions and 54 deletions.
62 changes: 8 additions & 54 deletions plutus-chain-index/app/Marconi.hs
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ optionsParser =
<$> strOption (long "socket-path" <> help "Path to node socket.")
<*> networkIdParser
<*> chainPointParser
<*> optStrParser (long "utxo-db")
<*> optStrParser (long "datum-db")
<*> optStrParser (long "utxo-db" <> help "Path to the utxo database.")
<*> optStrParser (long "datum-db" <> help "Path to the datum database.")

optStrParser :: IsString a => Mod OptionFields a -> Parser (Maybe a)
optStrParser fields = Just <$> strOption fields <|> pure Nothing
Expand Down Expand Up @@ -120,29 +120,6 @@ getDatums (BlockInMode (Block (BlockHeader slotNo _ _) txs) _) = concatMap extra
let hashes = assocs . fst $ scriptDataFromCardanoTxBody txBody
in map (slotNo,) hashes

datumIndexer
:: FilePath
-> S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
datumIndexer path = S.foldM_ step initial finish
where
initial :: IO DatumIndex
initial = Datum.open path (Datum.Depth 2160)

step :: DatumIndex -> ChainSyncEvent (BlockInMode CardanoMode) -> IO DatumIndex
step index (RollForward blk _ct) =
Ix.insert (getDatums blk) index
step index (RollBackward cp _ct) = do
events <- Ix.getEvents (index ^. Ix.storage)
return $
fromMaybe index $ do
slot <- chainPointToSlotNo cp
offset <- findIndex (any (\(s, _) -> s < slot)) events
Ix.rewind offset index

finish :: DatumIndex -> IO ()
finish _index = pure ()

-- UtxoIndexer
getOutputs
:: C.Tx era
Expand Down Expand Up @@ -179,29 +156,6 @@ getUtxoUpdate slot txs =
, _slotNo = slot
}

utxoIndexer
:: FilePath
-> S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
utxoIndexer path = S.foldM_ step initial finish
where
initial :: IO UtxoIndex
initial = Utxo.open path (Utxo.Depth 2160)

step :: UtxoIndex -> ChainSyncEvent (BlockInMode CardanoMode) -> IO UtxoIndex
step index (RollForward (BlockInMode (Block (BlockHeader slotNo _ _) txs) _) _ct) =
Ix.insert (getUtxoUpdate slotNo txs) index
step index (RollBackward cp _ct) = do
events <- Ix.getEvents (index ^. Ix.storage)
return $
fromMaybe index $ do
slot <- chainPointToSlotNo cp
offset <- findIndex (\u -> (u ^. Utxo.slotNo) < slot) events
Ix.rewind offset index

finish :: UtxoIndex -> IO ()
finish _index = pure ()

data Coordinator = Coordinator
{ _channel :: TChan (ChainSyncEvent (BlockInMode CardanoMode))
, _barrier :: QSemN
Expand All @@ -219,12 +173,12 @@ datumWorker
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
-> FilePath
-> IO ()
datumWorker coordinator ch path = Datum.open path (Datum.Depth 2160) >>= innerLoop
datumWorker Coordinator{_barrier} ch path = Datum.open path (Datum.Depth 2160) >>= innerLoop
where
innerLoop :: DatumIndex -> IO ()
innerLoop index = do
signalQSemN _barrier 1
event <- atomically $ readTChan ch
signalQSemN (_barrier coordinator) 1
case event of
RollForward blk _ct ->
Ix.insert (getDatums blk) index >>= innerLoop
Expand All @@ -241,12 +195,12 @@ utxoWorker
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
-> FilePath
-> IO ()
utxoWorker coordinator ch path = Utxo.open path (Utxo.Depth 2160) >>= innerLoop
utxoWorker Coordinator{_barrier} ch path = Utxo.open path (Utxo.Depth 2160) >>= innerLoop
where
innerLoop :: UtxoIndex -> IO ()
innerLoop index = do
signalQSemN _barrier 1
event <- atomically $ readTChan ch
signalQSemN (_barrier coordinator) 1
case event of
RollForward (BlockInMode (Block (BlockHeader slotNo _ _) txs) _) _ct ->
Ix.insert (getUtxoUpdate slotNo txs) index >>= innerLoop
Expand All @@ -259,8 +213,8 @@ utxoWorker coordinator ch path = Utxo.open path (Utxo.Depth 2160) >>= innerLoop
Ix.rewind offset index

combinedIndexer
:: Maybe FilePath -- utxo
-> Maybe FilePath -- datum
:: Maybe FilePath
-> Maybe FilePath
-> S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
combinedIndexer utxoPath datumPath = S.foldM_ step initial finish
Expand Down

0 comments on commit a130a76

Please sign in to comment.