-
Notifications
You must be signed in to change notification settings - Fork 214
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
A story of two indexers #511
Changes from all commits
80733d2
9262ac5
fa42ee4
6c573e8
744a075
6a90253
21f887f
c2a0ddf
4319579
99dffa9
265f074
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,16 +12,22 @@ import Cardano.Api qualified as C | |
import Cardano.BM.Setup (withTrace) | ||
import Cardano.BM.Trace (logError) | ||
import Cardano.BM.Tracing (defaultConfigStdout) | ||
import Control.Concurrent (forkIO) | ||
import Control.Concurrent.QSemN (QSemN, newQSemN, signalQSemN, waitQSemN) | ||
import Control.Concurrent.STM (atomically) | ||
import Control.Concurrent.STM.TChan (TChan, dupTChan, newBroadcastTChanIO, readTChan, writeTChan) | ||
import Control.Exception (catch) | ||
import Control.Lens.Operators ((&), (<&>), (^.)) | ||
import Control.Monad (void, when) | ||
import Data.ByteString.Char8 qualified as C8 | ||
import Data.Foldable (foldl') | ||
import Data.List (findIndex) | ||
import Data.Map (assocs) | ||
import Data.Maybe (catMaybes, fromMaybe) | ||
import Data.Maybe (catMaybes, fromJust, fromMaybe, isJust) | ||
import Data.Proxy (Proxy (Proxy)) | ||
import Data.Set (Set) | ||
import Data.Set qualified as Set | ||
import Data.String (IsString) | ||
import Index.VSplit qualified as Ix | ||
import Ledger (TxIn (..), TxOut (..), TxOutRef (..)) | ||
import Ledger.Tx.CardanoAPI (fromCardanoTxId, fromCardanoTxIn, fromCardanoTxOut, fromTxScriptValidity, | ||
|
@@ -31,8 +37,8 @@ import Marconi.Index.Datum qualified as Datum | |
import Marconi.Index.Utxo (UtxoIndex, UtxoUpdate (..)) | ||
import Marconi.Index.Utxo qualified as Utxo | ||
import Marconi.Logging (logging) | ||
import Options.Applicative (Parser, auto, execParser, flag, flag', help, helper, info, long, maybeReader, metavar, | ||
option, readerError, strOption, (<**>), (<|>)) | ||
import Options.Applicative (Mod, OptionFields, Parser, auto, execParser, flag', help, helper, info, long, maybeReader, | ||
metavar, option, readerError, strOption, (<**>), (<|>)) | ||
import Plutus.Script.Utils.V1.Scripts (Datum, DatumHash) | ||
import Plutus.Streaming (ChainSyncEvent (RollBackward, RollForward), ChainSyncEventException (NoIntersectionFound), | ||
withChainSyncEventStream) | ||
|
@@ -49,16 +55,12 @@ import Streaming.Prelude qualified as S | |
-- > select slotNo, datumHash, datum from kv_datumhsh_datum where slotNo = 39920450; | ||
-- 39920450|679a55b523ff8d61942b2583b76e5d49498468164802ef1ebe513c685d6fb5c2|X(002f9787436835852ea78d3c45fc3d436b324184 | ||
|
||
data IndexerType = DatumIndexer | ||
| UtxoIndexer | ||
deriving (Show, Eq) | ||
|
||
data Options = Options | ||
{ optionsSocketPath :: String, | ||
optionsNetworkId :: NetworkId, | ||
optionsChainPoint :: ChainPoint, | ||
optionsDatabasePath :: FilePath, | ||
optionsIndexerType :: IndexerType | ||
{ optionsSocketPath :: String, | ||
optionsNetworkId :: NetworkId, | ||
optionsChainPoint :: ChainPoint, | ||
optionsUtxoPath :: Maybe FilePath, | ||
optionsDatumPath :: Maybe FilePath | ||
} | ||
deriving (Show) | ||
|
||
|
@@ -71,8 +73,11 @@ optionsParser = | |
<$> strOption (long "socket-path" <> help "Path to node socket.") | ||
<*> networkIdParser | ||
<*> chainPointParser | ||
<*> strOption (long "database-path" <> help "Path to database.") | ||
<*> flag DatumIndexer UtxoIndexer (long "with-utxo-indexer") | ||
<*> optStrParser (long "utxo-db" <> help "Path to the utxo database.") | ||
<*> optStrParser (long "datum-db" <> help "Path to the datum database.") | ||
|
||
optStrParser :: IsString a => Mod OptionFields a -> Parser (Maybe a) | ||
optStrParser fields = Just <$> strOption fields <|> pure Nothing | ||
|
||
networkIdParser :: Parser NetworkId | ||
networkIdParser = | ||
|
@@ -115,29 +120,6 @@ getDatums (BlockInMode (Block (BlockHeader slotNo _ _) txs) _) = concatMap extra | |
let hashes = assocs . fst $ scriptDataFromCardanoTxBody txBody | ||
in map (slotNo,) hashes | ||
|
||
datumIndexer | ||
:: FilePath | ||
-> S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r | ||
-> IO () | ||
datumIndexer path = S.foldM_ step initial finish | ||
where | ||
initial :: IO DatumIndex | ||
initial = Datum.open path (Datum.Depth 2160) | ||
|
||
step :: DatumIndex -> ChainSyncEvent (BlockInMode CardanoMode) -> IO DatumIndex | ||
step index (RollForward blk _ct) = | ||
Ix.insert (getDatums blk) index | ||
step index (RollBackward cp _ct) = do | ||
events <- Ix.getEvents (index ^. Ix.storage) | ||
return $ | ||
fromMaybe index $ do | ||
slot <- chainPointToSlotNo cp | ||
offset <- findIndex (any (\(s, _) -> s < slot)) events | ||
Ix.rewind offset index | ||
|
||
finish :: DatumIndex -> IO () | ||
finish _index = pure () | ||
|
||
-- UtxoIndexer | ||
getOutputs | ||
:: C.Tx era | ||
|
@@ -174,50 +156,117 @@ getUtxoUpdate slot txs = | |
, _slotNo = slot | ||
} | ||
|
||
utxoIndexer | ||
:: FilePath | ||
{- | The way we synchronise channel consumption is by waiting on a QSemN for each | ||
of the spawn indexers to finish processing the current event. | ||
|
||
The channel is used to transmit the next event to the listening indexers. Note | ||
that even if the channel is unbound it will actually only ever hold one event | ||
because it will be blocked until the processing of the event finishes on all | ||
indexers. | ||
|
||
The indexer count is where we save the number of running indexers so we know for | ||
how many we are waiting. | ||
-} | ||
data Coordinator = Coordinator | ||
raduom marked this conversation as resolved.
Show resolved
Hide resolved
|
||
{ _channel :: TChan (ChainSyncEvent (BlockInMode CardanoMode)) | ||
, _barrier :: QSemN | ||
, _indexerCount :: Int | ||
} | ||
|
||
initialCoordinator :: Int -> IO Coordinator | ||
initialCoordinator indexerCount = | ||
Coordinator <$> newBroadcastTChanIO | ||
<*> newQSemN 0 | ||
<*> pure indexerCount | ||
|
||
datumWorker | ||
:: Coordinator | ||
-> TChan (ChainSyncEvent (BlockInMode CardanoMode)) | ||
-> FilePath | ||
-> IO () | ||
datumWorker Coordinator{_barrier} ch path = Datum.open path (Datum.Depth 2160) >>= innerLoop | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The security param There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure why it shouldn't be hardcoded in this case. Would you like to expose it as a command-line argument? One for all indexers, or one command-line argument for each? I can write the code if you think it's important to do so. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect the security param to be same for all indexers in order to guaranty that rollbacks will only modify the data that is in memory. |
||
where | ||
innerLoop :: DatumIndex -> IO () | ||
innerLoop index = do | ||
signalQSemN _barrier 1 | ||
event <- atomically $ readTChan ch | ||
case event of | ||
RollForward blk _ct -> | ||
Ix.insert (getDatums blk) index >>= innerLoop | ||
RollBackward cp _ct -> do | ||
events <- Ix.getEvents (index ^. Ix.storage) | ||
innerLoop $ | ||
fromMaybe index $ do | ||
slot <- chainPointToSlotNo cp | ||
offset <- findIndex (any (\(s, _) -> s < slot)) events | ||
Ix.rewind offset index | ||
|
||
utxoWorker | ||
:: Coordinator | ||
-> TChan (ChainSyncEvent (BlockInMode CardanoMode)) | ||
-> FilePath | ||
-> IO () | ||
utxoWorker Coordinator{_barrier} ch path = Utxo.open path (Utxo.Depth 2160) >>= innerLoop | ||
where | ||
innerLoop :: UtxoIndex -> IO () | ||
innerLoop index = do | ||
signalQSemN _barrier 1 | ||
event <- atomically $ readTChan ch | ||
case event of | ||
RollForward (BlockInMode (Block (BlockHeader slotNo _ _) txs) _) _ct -> | ||
Ix.insert (getUtxoUpdate slotNo txs) index >>= innerLoop | ||
RollBackward cp _ct -> do | ||
events <- Ix.getEvents (index ^. Ix.storage) | ||
innerLoop $ | ||
fromMaybe index $ do | ||
slot <- chainPointToSlotNo cp | ||
offset <- findIndex (\u -> (u ^. Utxo.slotNo) < slot) events | ||
Ix.rewind offset index | ||
|
||
combinedIndexer | ||
:: Maybe FilePath | ||
-> Maybe FilePath | ||
Comment on lines
+227
to
+228
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This approach does not seem extensible for multiple indexers. Why not pass a list of workers? They seem to have a similar interface. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It can probably be done, but it's not trivial. You need to provide specific configurations for each indexer, and the startup may also be different (not the case here). The issue mentioned 2 indexers and I am not sure if supporting multiple indexers is on the bird path, but if it is, it should be a separate issue, I think. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
-> S.Stream (S.Of (ChainSyncEvent (BlockInMode CardanoMode))) IO r | ||
-> IO () | ||
utxoIndexer path = S.foldM_ step initial finish | ||
combinedIndexer utxoPath datumPath = S.foldM_ step initial finish | ||
where | ||
initial :: IO UtxoIndex | ||
initial = Utxo.open path (Utxo.Depth 2160) | ||
|
||
step :: UtxoIndex -> ChainSyncEvent (BlockInMode CardanoMode) -> IO UtxoIndex | ||
step index (RollForward (BlockInMode (Block (BlockHeader slotNo _ _) txs) _) _ct) = | ||
Ix.insert (getUtxoUpdate slotNo txs) index | ||
step index (RollBackward cp _ct) = do | ||
events <- Ix.getEvents (index ^. Ix.storage) | ||
return $ | ||
fromMaybe index $ do | ||
slot <- chainPointToSlotNo cp | ||
offset <- findIndex (\u -> (u ^. Utxo.slotNo) < slot) events | ||
Ix.rewind offset index | ||
|
||
finish :: UtxoIndex -> IO () | ||
finish _index = pure () | ||
initial :: IO Coordinator | ||
initial = do | ||
let indexerCount = length . catMaybes $ [utxoPath, datumPath] | ||
coordinator <- initialCoordinator indexerCount | ||
when (isJust datumPath) $ do | ||
ch <- atomically . dupTChan $ _channel coordinator | ||
void . forkIO . datumWorker coordinator ch $ fromJust datumPath | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why not do the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually thinking about how this works, this is a possible multi-threading bug. Let me fix it first :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok. So there is no bug. Moving the channel creation in the function would mean I need to define an inner loop function which I preferred not to do. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 👍 |
||
when (isJust utxoPath) $ do | ||
ch <- atomically . dupTChan $ _channel coordinator | ||
void . forkIO . utxoWorker coordinator ch $ fromJust utxoPath | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above |
||
pure coordinator | ||
|
||
step :: Coordinator -> ChainSyncEvent (BlockInMode CardanoMode) -> IO Coordinator | ||
step c@Coordinator{_barrier, _indexerCount, _channel} event = do | ||
waitQSemN _barrier _indexerCount | ||
atomically $ writeTChan _channel event | ||
pure c | ||
|
||
finish :: Coordinator -> IO () | ||
finish _ = pure () | ||
|
||
main :: IO () | ||
main = do | ||
Options { optionsSocketPath | ||
, optionsNetworkId | ||
, optionsChainPoint | ||
, optionsDatabasePath | ||
, optionsIndexerType } <- parseOptions | ||
, optionsUtxoPath | ||
, optionsDatumPath } <- parseOptions | ||
|
||
c <- defaultConfigStdout | ||
|
||
let processor = | ||
case optionsIndexerType of | ||
DatumIndexer -> datumIndexer optionsDatabasePath | ||
UtxoIndexer -> utxoIndexer optionsDatabasePath | ||
|
||
withTrace c "marconi" $ \trace -> | ||
withChainSyncEventStream | ||
optionsSocketPath | ||
optionsNetworkId | ||
optionsChainPoint | ||
(processor . logging trace) | ||
(combinedIndexer optionsUtxoPath optionsDatumPath . logging trace) | ||
`catch` \NoIntersectionFound -> | ||
logError trace $ | ||
renderStrict $ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -126,6 +126,7 @@ executable marconi | |
, prettyprinter | ||
, serialise | ||
, sqlite-simple | ||
, stm | ||
, streaming | ||
, text | ||
, time |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curious, is it possible (or desirable) to put the data from the indexers in the same database?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the case of SQLite, most likely not due to higher contention.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍