Skip to content
This repository was archived by the owner on Dec 2, 2024. It is now read-only.

Commit

Permalink
Add combineIndexers which can start any number of indexers
Browse files Browse the repository at this point in the history
  • Loading branch information
eyeinsky committed Sep 30, 2022
1 parent 11d65a9 commit 84634a6
Showing 1 changed file with 25 additions and 33 deletions.
58 changes: 25 additions & 33 deletions marconi/src/Marconi/Indexers.hs
Original file line number Diff line number Diff line change
Expand Up @@ -118,11 +118,9 @@ initialCoordinator indexerCount =
<*> newQSemN 0
<*> pure indexerCount

datumWorker
:: Coordinator
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
-> FilePath
-> IO ()
type Worker = Coordinator -> TChan (ChainSyncEvent (BlockInMode CardanoMode)) -> FilePath -> IO ()

datumWorker :: Worker
datumWorker Coordinator{_barrier} ch path = Datum.open path (Datum.Depth 2160) >>= innerLoop
where
innerLoop :: DatumIndex -> IO ()
Expand All @@ -140,12 +138,7 @@ datumWorker Coordinator{_barrier} ch path = Datum.open path (Datum.Depth 2160) >
offset <- findIndex (any (\(s, _) -> s < slot)) events
Ix.rewind offset index

utxoWorker
:: Maybe TargetAddresses
-> Coordinator
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
-> FilePath
-> IO ()
utxoWorker :: Maybe TargetAddresses -> Worker
utxoWorker maybeTargetAddresses Coordinator{_barrier} ch path = Utxo.open path (Utxo.Depth 2160) >>= innerLoop
where
innerLoop :: UtxoIndex -> IO ()
Expand All @@ -163,12 +156,8 @@ utxoWorker maybeTargetAddresses Coordinator{_barrier} ch path = Utxo.open path (
offset <- findIndex (\u -> (u ^. Utxo.slotNo) < slot) events
Ix.rewind offset index

scriptTxWorker
:: Coordinator
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
-> FilePath
-> IO ()
scriptTxWorker Coordinator{_barrier} ch path = ScriptTx.open path (ScriptTx.Depth 2160) >>= loop
scriptTxWorker :: Worker
scriptTxWorker Coordinator{_barrier} ch path = ScriptTx.open path (ScriptTx.Depth 0) >>= loop
where
loop :: ScriptTx.ScriptTxIndex -> IO ()
loop index = do
Expand All @@ -192,17 +181,24 @@ combinedIndexer
-> Maybe TargetAddresses
-> S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
combinedIndexer utxoPath datumPath scriptTxPath maybeTargetAddresses = S.foldM_ step initial finish
combinedIndexer utxoPath datumPath scriptTxPath maybeTargetAddresses = combineIndexers remainingIndexers
where
liftMaybe (worker, maybePath) = case maybePath of
Just path -> Just (worker, path)
_ -> Nothing
pairs = [(utxoWorker maybeTargetAddresses, utxoPath), (datumWorker, datumPath), (scriptTxWorker, scriptTxPath)]
remainingIndexers = catMaybes $ map liftMaybe pairs

combineIndexers
:: [(Worker, FilePath)]
-> S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
combineIndexers indexers = S.foldM_ step initial finish
where

initial :: IO Coordinator
initial = do
let indexerCount = length . catMaybes $ [utxoPath, datumPath, scriptTxPath]
coordinator <- initialCoordinator indexerCount
let forkIndexer' worker maybePath = maybe (pure ()) (forkIndexer coordinator worker) maybePath
forkIndexer' datumWorker datumPath
forkIndexer' (utxoWorker maybeTargetAddresses) utxoPath
forkIndexer' scriptTxWorker scriptTxPath
coordinator <- initialCoordinator $ length indexers
mapM_ (uncurry (forkIndexer coordinator)) indexers
pure coordinator

step :: Coordinator -> ChainSyncEvent (BlockInMode CardanoMode) -> IO Coordinator
Expand All @@ -214,11 +210,7 @@ combinedIndexer utxoPath datumPath scriptTxPath maybeTargetAddresses = S.foldM_
finish :: Coordinator -> IO ()
finish _ = pure ()

forkIndexer
:: Coordinator
-> (Coordinator -> TChan (ChainSyncEvent (BlockInMode CardanoMode)) -> a -> IO ())
-> a
-> IO ()
forkIndexer coordinator worker path = do
ch <- atomically . dupTChan $ _channel coordinator
void . forkIO . worker coordinator ch $ path
forkIndexer :: Coordinator -> Worker -> FilePath -> IO ()
forkIndexer coordinator worker path = do
ch <- atomically . dupTChan $ _channel coordinator
void . forkIO . worker coordinator ch $ path

0 comments on commit 84634a6

Please sign in to comment.