Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix utxo sync delay. #590

Merged
merged 2 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 18 additions & 4 deletions plutus-chain-index/app/Marconi/Index/Utxo.hs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import Cardano.Api (SlotNo)
import Codec.Serialise (deserialiseOrFail, serialise)
import Control.Lens.Operators ((&), (^.))
import Control.Lens.TH (makeLenses)
import Control.Monad (when)
import Data.ByteString.Lazy (toStrict)
import Data.Foldable (foldl', forM_, toList)
import Data.Maybe (fromJust)
Expand All @@ -39,6 +40,7 @@ import Database.SQLite.Simple.ToRow (ToRow (toRow))
import GHC.Generics (Generic)
import Ledger (Address, TxId, TxOut, TxOutRef (TxOutRef, txOutRefId, txOutRefIdx))
import Ledger qualified as Ledger
import System.Random.MWC (createSystemRandom, uniformR)

import Index.VSqlite (SqliteIndex)
import Index.VSqlite qualified as Ix
Expand Down Expand Up @@ -98,7 +100,10 @@ open
-> Depth
-> IO UtxoIndex
open dbPath (Depth k) = do
ix <- fromJust <$> Ix.newBoxed query store onInsert k ((k + 1) * 4) dbPath
-- The second parameter ((k + 1) * 2) specifies the amount of events that are buffered.
-- The larger the number, the more RAM the indexer uses. However, we get improved SQL
-- queries due to batching more events together.
ix <- fromJust <$> Ix.newBoxed query store onInsert k ((k + 1) * 2) dbPath
andreabedini marked this conversation as resolved.
Show resolved Hide resolved
let c = ix ^. Ix.handle
SQL.execute_ c "CREATE TABLE IF NOT EXISTS utxos (address TEXT NOT NULL, txId TEXT NOT NULL, inputIx INT NOT NULL)"
SQL.execute_ c "CREATE TABLE IF NOT EXISTS spent (txId TEXT NOT NULL, inputIx INT NOT NULL)"
Expand All @@ -114,7 +119,12 @@ query
-> IO Result
query ix addr updates = do
-- SELECT all utxos that have not been spent.
storedUtxos <- SQL.query (ix ^. Ix.handle) "SELECT address, txId, inputIx FROM utxos LEFT JOIN spent ON utxos.txId = spent.txId AND utxos.inputIx = spent.inputIx WHERE utxos.txId IS NULL AND utxos.address = ?" (Only addr)
let c = ix ^. Ix.handle
-- Create indexes initially. When created this should be a no-op.
andreabedini marked this conversation as resolved.
Show resolved Hide resolved


-- Perform the db query
storedUtxos <- SQL.query c "SELECT address, txId, inputIx FROM utxos LEFT JOIN spent ON utxos.txId = spent.txId AND utxos.inputIx = spent.inputIx WHERE utxos.txId IS NULL AND utxos.address = ?" (Only addr)
let memoryUtxos = concatMap (filter (onlyAt addr) . toRows) updates
spentOutputs = foldl' Set.union Set.empty $ map _inputs updates
pure . Just $ storedUtxos ++ memoryUtxos
Expand All @@ -136,10 +146,14 @@ store ix = do
SQL.execute c "INSERT INTO utxos (address, txId, inputIx) VALUES (?, ?, ?)"
forM_ spent $
SQL.execute c "INSERT INTO spent (txId, inputIx) VALUES (?, ?)"

SQL.execute_ c "DELETE FROM utxos WHERE utxos.rowid IN (SELECT utxos.rowid FROM utxos LEFT JOIN spent on utxos.txId = spent.txId AND utxos.inputIx = spent.inputIx WHERE spent.txId IS NOT NULL)"
SQL.execute_ c "COMMIT"

-- We want to perform vacuum about once every 100 * buffer ((k + 1) * 2)
rndCheck <- createSystemRandom >>= uniformR (1 :: Int, 100)
when (rndCheck == 42) $ do
andreabedini marked this conversation as resolved.
Show resolved Hide resolved
SQL.execute_ c "DELETE FROM utxos WHERE utxos.rowid IN (SELECT utxos.rowid FROM utxos LEFT JOIN spent on utxos.txId = spent.txId AND utxos.inputIx = spent.inputIx WHERE spent.txId IS NOT NULL)"
SQL.execute_ c "VACUUM"

onInsert :: UtxoIndex -> UtxoUpdate -> IO [()]
onInsert _ix _update = pure []

Expand Down
1 change: 1 addition & 0 deletions plutus-chain-index/plutus-chain-index.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ executable marconi
, bytestring
, containers
, lens
, mwc-random
andreabedini marked this conversation as resolved.
Show resolved Hide resolved
, optparse-applicative
, prettyprinter
, serialise
Expand Down