Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A story of two indexers #511

Merged
merged 11 commits into from
Jun 16, 2022
177 changes: 113 additions & 64 deletions plutus-chain-index/app/Marconi.hs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,22 @@ import Cardano.Api qualified as C
import Cardano.BM.Setup (withTrace)
import Cardano.BM.Trace (logError)
import Cardano.BM.Tracing (defaultConfigStdout)
import Control.Concurrent (forkIO)
import Control.Concurrent.QSemN (QSemN, newQSemN, signalQSemN, waitQSemN)
import Control.Concurrent.STM (atomically)
import Control.Concurrent.STM.TChan (TChan, dupTChan, newBroadcastTChanIO, readTChan, writeTChan)
import Control.Exception (catch)
import Control.Lens.Operators ((&), (<&>), (^.))
import Control.Monad (void, when)
import Data.ByteString.Char8 qualified as C8
import Data.Foldable (foldl')
import Data.List (findIndex)
import Data.Map (assocs)
import Data.Maybe (catMaybes, fromMaybe)
import Data.Maybe (catMaybes, fromJust, fromMaybe, isJust)
import Data.Proxy (Proxy (Proxy))
import Data.Set (Set)
import Data.Set qualified as Set
import Data.String (IsString)
import Index.VSplit qualified as Ix
import Ledger (TxIn (..), TxOut (..), TxOutRef (..))
import Ledger.Tx.CardanoAPI (fromCardanoTxId, fromCardanoTxIn, fromCardanoTxOut, fromTxScriptValidity,
Expand All @@ -31,8 +37,8 @@ import Marconi.Index.Datum qualified as Datum
import Marconi.Index.Utxo (UtxoIndex, UtxoUpdate (..))
import Marconi.Index.Utxo qualified as Utxo
import Marconi.Logging (logging)
import Options.Applicative (Parser, auto, execParser, flag, flag', help, helper, info, long, maybeReader, metavar,
option, readerError, strOption, (<**>), (<|>))
import Options.Applicative (Mod, OptionFields, Parser, auto, execParser, flag', help, helper, info, long, maybeReader,
metavar, option, readerError, strOption, (<**>), (<|>))
import Plutus.Script.Utils.V1.Scripts (Datum, DatumHash)
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound),
withChainSyncEventStream)
Expand All @@ -49,16 +55,12 @@ import Streaming.Prelude qualified as S
-- > select slotNo, datumHash, datum from kv_datumhsh_datum where slotNo = 39920450;
-- 39920450|679a55b523ff8d61942b2583b76e5d49498468164802ef1ebe513c685d6fb5c2|X(002f9787436835852ea78d3c45fc3d436b324184

data IndexerType = DatumIndexer
| UtxoIndexer
deriving (Show, Eq)

data Options = Options
{ optionsSocketPath :: String,
optionsNetworkId :: NetworkId,
optionsChainPoint :: ChainPoint,
optionsDatabasePath :: FilePath,
optionsIndexerType :: IndexerType
{ optionsSocketPath :: String,
optionsNetworkId :: NetworkId,
optionsChainPoint :: ChainPoint,
optionsUtxoPath :: Maybe FilePath,
optionsDatumPath :: Maybe FilePath
Comment on lines +62 to +63
Copy link
Contributor

@koslambrou koslambrou Jun 12, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious, is it possible (or desirable) to put the data from the indexers in the same database?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the case of SQLite, most likely not due to higher contention.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
deriving (Show)

Expand All @@ -71,8 +73,11 @@ optionsParser =
<$> strOption (long "socket-path" <> help "Path to node socket.")
<*> networkIdParser
<*> chainPointParser
<*> strOption (long "database-path" <> help "Path to database.")
<*> flag DatumIndexer UtxoIndexer (long "with-utxo-indexer")
<*> optStrParser (long "utxo-db" <> help "Path to the utxo database.")
<*> optStrParser (long "datum-db" <> help "Path to the datum database.")

optStrParser :: IsString a => Mod OptionFields a -> Parser (Maybe a)
optStrParser fields = Just <$> strOption fields <|> pure Nothing

networkIdParser :: Parser NetworkId
networkIdParser =
Expand Down Expand Up @@ -115,29 +120,6 @@ getDatums (BlockInMode (Block (BlockHeader slotNo _ _) txs) _) = concatMap extra
let hashes = assocs . fst $ scriptDataFromCardanoTxBody txBody
in map (slotNo,) hashes

datumIndexer
:: FilePath
-> S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
datumIndexer path = S.foldM_ step initial finish
where
initial :: IO DatumIndex
initial = Datum.open path (Datum.Depth 2160)

step :: DatumIndex -> ChainSyncEvent (BlockInMode CardanoMode) -> IO DatumIndex
step index (RollForward blk _ct) =
Ix.insert (getDatums blk) index
step index (RollBackward cp _ct) = do
events <- Ix.getEvents (index ^. Ix.storage)
return $
fromMaybe index $ do
slot <- chainPointToSlotNo cp
offset <- findIndex (any (\(s, _) -> s < slot)) events
Ix.rewind offset index

finish :: DatumIndex -> IO ()
finish _index = pure ()

-- UtxoIndexer
getOutputs
:: C.Tx era
Expand Down Expand Up @@ -174,50 +156,117 @@ getUtxoUpdate slot txs =
, _slotNo = slot
}

utxoIndexer
:: FilePath
{- | The way we synchronise channel consumption is by waiting on a QSemN for each
of the spawn indexers to finish processing the current event.

The channel is used to transmit the next event to the listening indexers. Note
that even if the channel is unbound it will actually only ever hold one event
because it will be blocked until the processing of the event finishes on all
indexers.

The indexer count is where we save the number of running indexers so we know for
how many we are waiting.
-}
data Coordinator = Coordinator
raduom marked this conversation as resolved.
Show resolved Hide resolved
{ _channel :: TChan (ChainSyncEvent (BlockInMode CardanoMode))
, _barrier :: QSemN
, _indexerCount :: Int
}

initialCoordinator :: Int -> IO Coordinator
initialCoordinator indexerCount =
Coordinator <$> newBroadcastTChanIO
<*> newQSemN 0
<*> pure indexerCount

datumWorker
:: Coordinator
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
-> FilePath
-> IO ()
datumWorker Coordinator{_barrier} ch path = Datum.open path (Datum.Depth 2160) >>= innerLoop
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The security param 2160 shouldn't be hardcoded right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure why it shouldn't be hardcoded in this case. Would you like to expose it as a command-line argument? One for all indexers, or one command-line argument for each? I can write the code if you think it's important to do so.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect the security param to be same for all indexers in order to guaranty that rollbacks will only modify the data that is in memory.
I'll add a story about querying the security param from the local node directly (without needing to pass it as a CLI param).

where
innerLoop :: DatumIndex -> IO ()
innerLoop index = do
signalQSemN _barrier 1
event <- atomically $ readTChan ch
case event of
RollForward blk _ct ->
Ix.insert (getDatums blk) index >>= innerLoop
RollBackward cp _ct -> do
events <- Ix.getEvents (index ^. Ix.storage)
innerLoop $
fromMaybe index $ do
slot <- chainPointToSlotNo cp
offset <- findIndex (any (\(s, _) -> s < slot)) events
Ix.rewind offset index

utxoWorker
:: Coordinator
-> TChan (ChainSyncEvent (BlockInMode CardanoMode))
-> FilePath
-> IO ()
utxoWorker Coordinator{_barrier} ch path = Utxo.open path (Utxo.Depth 2160) >>= innerLoop
where
innerLoop :: UtxoIndex -> IO ()
innerLoop index = do
signalQSemN _barrier 1
event <- atomically $ readTChan ch
case event of
RollForward (BlockInMode (Block (BlockHeader slotNo _ _) txs) _) _ct ->
Ix.insert (getUtxoUpdate slotNo txs) index >>= innerLoop
RollBackward cp _ct -> do
events <- Ix.getEvents (index ^. Ix.storage)
innerLoop $
fromMaybe index $ do
slot <- chainPointToSlotNo cp
offset <- findIndex (\u -> (u ^. Utxo.slotNo) < slot) events
Ix.rewind offset index

combinedIndexer
:: Maybe FilePath
-> Maybe FilePath
Comment on lines +227 to +228
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach does not seem extensible for multiple indexers. Why not pass a list of workers? They seem to have a similar interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can probably be done, but it's not trivial. You need to provide specific configurations for each indexer, and the startup may also be different (not the case here). The issue mentioned 2 indexers and I am not sure if supporting multiple indexers is on the bird path, but if it is, it should be a separate issue, I think.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

-> S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r
-> IO ()
utxoIndexer path = S.foldM_ step initial finish
combinedIndexer utxoPath datumPath = S.foldM_ step initial finish
where
initial :: IO UtxoIndex
initial = Utxo.open path (Utxo.Depth 2160)

step :: UtxoIndex -> ChainSyncEvent (BlockInMode CardanoMode) -> IO UtxoIndex
step index (RollForward (BlockInMode (Block (BlockHeader slotNo _ _) txs) _) _ct) =
Ix.insert (getUtxoUpdate slotNo txs) index
step index (RollBackward cp _ct) = do
events <- Ix.getEvents (index ^. Ix.storage)
return $
fromMaybe index $ do
slot <- chainPointToSlotNo cp
offset <- findIndex (\u -> (u ^. Utxo.slotNo) < slot) events
Ix.rewind offset index

finish :: UtxoIndex -> IO ()
finish _index = pure ()
initial :: IO Coordinator
initial = do
let indexerCount = length . catMaybes $ [utxoPath, datumPath]
coordinator <- initialCoordinator indexerCount
when (isJust datumPath) $ do
ch <- atomically . dupTChan $ _channel coordinator
void . forkIO . datumWorker coordinator ch $ fromJust datumPath
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not do the dupTChan inside the datumWorker instead of passing it as a parameter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually thinking about how this works, this is a possible multi-threading bug. Let me fix it first :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. So there is no bug.

Moving the channel creation in the function would mean I need to define an inner loop function which I preferred not to do.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

when (isJust utxoPath) $ do
ch <- atomically . dupTChan $ _channel coordinator
void . forkIO . utxoWorker coordinator ch $ fromJust utxoPath
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above

pure coordinator

step :: Coordinator -> ChainSyncEvent (BlockInMode CardanoMode) -> IO Coordinator
step c@Coordinator{_barrier, _indexerCount, _channel} event = do
waitQSemN _barrier _indexerCount
atomically $ writeTChan _channel event
pure c

finish :: Coordinator -> IO ()
finish _ = pure ()

main :: IO ()
main = do
Options { optionsSocketPath
, optionsNetworkId
, optionsChainPoint
, optionsDatabasePath
, optionsIndexerType } <- parseOptions
, optionsUtxoPath
, optionsDatumPath } <- parseOptions

c <- defaultConfigStdout

let processor =
case optionsIndexerType of
DatumIndexer -> datumIndexer optionsDatabasePath
UtxoIndexer -> utxoIndexer optionsDatabasePath

withTrace c "marconi" $ \trace ->
withChainSyncEventStream
optionsSocketPath
optionsNetworkId
optionsChainPoint
(processor . logging trace)
(combinedIndexer optionsUtxoPath optionsDatumPath . logging trace)
`catch` \NoIntersectionFound ->
logError trace $
renderStrict $
Expand Down
1 change: 1 addition & 0 deletions plutus-chain-index/plutus-chain-index.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ executable marconi
, prettyprinter
, serialise
, sqlite-simple
, stm
, streaming
, text
, time