Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SQLite: Prepare schema for rollback #672

Merged
merged 1 commit into from
Sep 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 103 additions & 60 deletions lib/core/src/Cardano/Wallet/DB/Sqlite.hs
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@ import Cardano.Wallet.DB.Sqlite.TH
, PrivateKey (..)
, RndState (..)
, RndStateAddress (..)
, RndStateId
, RndStateCheckpoint (..)
, RndStateCheckpointId
, RndStatePendingAddress (..)
, SeqState (..)
, SeqStateAddresses (..)
, SeqStateId
, SeqStateAddress (..)
, SeqStateCheckpoint (..)
, SeqStateCheckpointId
, SeqStatePendingIx (..)
, TxIn (..)
, TxMeta (..)
Expand Down Expand Up @@ -133,6 +136,7 @@ import Database.Persist.Sql
, insertMany_
, insert_
, rawExecute
, repsert
, repsertMany
, runMigrationSilent
, runSqlConn
Expand Down Expand Up @@ -528,10 +532,10 @@ mkCheckpointEntity wid wal =
cp = Checkpoint
{ checkpointWalletId = wid
, checkpointSlot = sl
, checkpointBlock = fromIntegral bh
, checkpointBlockHeight = fromIntegral bh
, checkpointParent = BlockId parent
}
utxo = [ UTxO wid sl (TxId input) ix addr coin
utxo = [ UTxO wid sl Nothing (TxId input) ix addr coin
| (W.TxIn input ix, W.TxOut addr coin) <- utxoMap ]
utxoMap = Map.assocs (W.getUTxO (W.utxo wal))

Expand All @@ -551,7 +555,7 @@ checkpointFromEntity cp utxo txs s =
header = (W.BlockHeader slot parentHeaderHash)
utxo' = W.UTxO . Map.fromList $
[ (W.TxIn input ix, W.TxOut addr coin)
| UTxO _ _ (TxId input) ix addr coin <- utxo
| UTxO _ _ _ (TxId input) ix addr coin <- utxo
]
pending = Set.fromList $ map snd txs
blockHeight' = Quantity . toEnum . fromEnum $ bh
Expand Down Expand Up @@ -751,7 +755,7 @@ selectUTxO
selectUTxO (Checkpoint wid sl _parent _bh) = fmap entityVal <$>
selectList
[ UtxoWalletId ==. wid
, UtxoCheckpointSlot ==. sl
, UtxoSlot ==. sl
] []

selectTxs
Expand Down Expand Up @@ -840,66 +844,73 @@ instance W.KeyToAddress t Seq.SeqKey => PersistState (Seq.SeqState t) where
(uncurry (==))
let eGap = Seq.gap extPool
let iGap = Seq.gap intPool
ssid <- insert (SeqState wid sl eGap iGap (AddressPoolXPub xpub))
insertAddressPool ssid intPool
insertAddressPool ssid extPool
insertMany_ $ mkSeqStatePendingIxs ssid $ Seq.pendingChangeIxs st
repsert (SeqStateKey wid) (SeqState wid eGap iGap (AddressPoolXPub xpub))
insertAddressPool wid sl intPool
insertAddressPool wid sl extPool
cpid <- insert (SeqStateCheckpoint wid sl)
insertMany_ $ mkSeqStatePendingIxs cpid $ Seq.pendingChangeIxs st

selectState (wid, sl) = runMaybeT $ do
st <- MaybeT $ selectFirst
[ SeqStateWalletId ==. wid
st <- MaybeT $ selectFirst [SeqStateWalletId ==. wid] []
stc <- MaybeT $ selectFirst
[ SeqStateCheckpointWalletId ==. wid
, SeqStateCheckpointSlot ==. sl
] []
let (ssid, SeqState _ _ eGap iGap xPub) = (entityKey st, entityVal st)
intPool <- lift $ selectAddressPool ssid iGap xPub
extPool <- lift $ selectAddressPool ssid eGap xPub
pendingChangeIxs <- lift $ selectSeqStatePendingIxs ssid

let SeqState _ eGap iGap xPub = entityVal st
intPool <- lift $ selectAddressPool wid sl iGap xPub
extPool <- lift $ selectAddressPool wid sl eGap xPub
pendingChangeIxs <- lift $ selectSeqStatePendingIxs (entityKey stc)
pure $ Seq.SeqState intPool extPool pendingChangeIxs

deleteState wid = do
ssid <- fmap entityKey <$> selectList [ SeqStateWalletId ==. wid ] []
deleteWhere [ SeqStateAddressesSeqStateId <-. ssid ]
deleteWhere [ SeqStatePendingIxSeqStateId <-. ssid ]
cpid <- selectList [ SeqStateCheckpointWalletId ==. wid ] []
deleteWhere [ SeqStatePendingIxCheckpointId <-. fmap entityKey cpid ]
Copy link
Member

@Anviking Anviking Sep 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not directly relevant to this, but these are run as "separate SQL statements"?

Should we use something like Sqlite transactions, to protect against inconsistent state from the program crashing in the middle of a complex sequence of deletes/updates/inserts?

We do use DB.withLock from the WalletLayer, but I think that just protects against multiple wallets/requests mutating the DB at the same time.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indeed (as mentioned here: https://github.com/input-output-hk/cardano-wallet/blob/master/lib/core/src/Cardano/Wallet.hs#L682-L685). The lock is only there to prevent concurrent issues in case clients are making concurrent conflicting requests, but it doesn't prevent data corruption / inconsistency like a transaction would do.

I recall @rvl had a reason for not using sqlite transaction here. I think it was something about being only partially supported by our driver library 🤔 ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very good point.
It is running each DBLayer method in a transaction because we are using runSqlConn.
At the time of writing the initial DBLayer we were a bit unsure where to draw the lines with transactions and locking.
We ought to have another look. In particular, see if we can put a transaction around restoreBlocks in the wallet layer.

deleteCascadeWhere [ SeqStateCheckpointWalletId ==. wid ]
deleteWhere [ SeqStateAddressWalletId ==. wid ]
deleteWhere [ SeqStateWalletId ==. wid ]

insertAddressPool
:: forall t c. (Typeable c)
=> SeqStateId
=> W.WalletId
-> W.SlotId
-> Seq.AddressPool t c
-> SqlPersistT IO ()
insertAddressPool ssid pool =
insertAddressPool wid sl pool =
void $ dbChunked insertMany_
[ SeqStateAddresses ssid addr ix (Seq.changeChain @c)
[ SeqStateAddress wid sl addr ix (Seq.changeChain @c)
| (ix, addr) <- zip [0..] (Seq.addresses pool)
]

selectAddressPool
:: forall t c. (W.KeyToAddress t Seq.SeqKey, Typeable c)
=> SeqStateId
=> W.WalletId
-> W.SlotId
-> Seq.AddressPoolGap
-> AddressPoolXPub
-> SqlPersistT IO (Seq.AddressPool t c)
selectAddressPool ssid gap (AddressPoolXPub xpub) = do
selectAddressPool wid sl gap (AddressPoolXPub xpub) = do
addrs <- fmap entityVal <$> selectList
[ SeqStateAddressesSeqStateId ==. ssid
, SeqStateAddressesChangeChain ==. Seq.changeChain @c
] [Asc SeqStateAddressesIndex]
[ SeqStateAddressWalletId ==. wid
, SeqStateAddressSlot ==. sl
, SeqStateAddressChangeChain ==. Seq.changeChain @c
] [Asc SeqStateAddressIndex]
pure $ addressPoolFromEntity addrs
where
addressPoolFromEntity
:: [SeqStateAddresses]
:: [SeqStateAddress]
-> Seq.AddressPool t c
addressPoolFromEntity addrs =
Seq.mkAddressPool @t @c xpub gap (map seqStateAddressesAddress addrs)
Seq.mkAddressPool @t @c xpub gap (map seqStateAddressAddress addrs)

mkSeqStatePendingIxs :: SeqStateId -> Seq.PendingIxs -> [SeqStatePendingIx]
mkSeqStatePendingIxs ssid =
fmap (SeqStatePendingIx ssid . W.getIndex) . Seq.pendingIxsToList
mkSeqStatePendingIxs :: SeqStateCheckpointId -> Seq.PendingIxs -> [SeqStatePendingIx]
mkSeqStatePendingIxs cpid =
fmap (SeqStatePendingIx cpid . W.getIndex) . Seq.pendingIxsToList

selectSeqStatePendingIxs :: SeqStateId -> SqlPersistT IO Seq.PendingIxs
selectSeqStatePendingIxs ssid =
selectSeqStatePendingIxs :: SeqStateCheckpointId -> SqlPersistT IO Seq.PendingIxs
selectSeqStatePendingIxs cpid =
Seq.pendingIxsFromList . fromRes <$> selectList
[SeqStatePendingIxSeqStateId ==. ssid]
[SeqStatePendingIxCheckpointId ==. cpid]
[Desc SeqStatePendingIxIndex]
where
fromRes = fmap (W.Index . seqStatePendingIxIndex . entityVal)
Expand All @@ -908,62 +919,94 @@ selectSeqStatePendingIxs ssid =
HD Random address discovery
-------------------------------------------------------------------------------}

-- | Type alias for the index -> address map so that lines do not exceed 80
-- characters in width.
type RndStateAddresses = Map
(W.Index 'W.Hardened 'W.AccountK, W.Index 'W.Hardened 'W.AddressK)
W.Address

-- Persisting 'RndState' requires that the wallet root key has already been
-- added to the database with 'putPrivateKey'. Unlike sequential AD, random
-- address discovery requires a root key to recognize addresses.
instance PersistState (Rnd.RndState t) where
insertState (wid, sl) st = do
rsid <- insert $ RndState wid sl
(W.getIndex (st ^. #accountIndex))
(st ^. #gen)
insertRndStateAddresses rsid False $ Rnd.addresses st
insertRndStateAddresses rsid True $ Rnd.pendingAddresses st
repsert (RndStateKey wid)
(RndState wid (W.getIndex (st ^. #accountIndex)))
insertRndStateAddresses wid sl (Rnd.addresses st)
cpid <- insert $ RndStateCheckpoint wid sl (st ^. #gen)
insertRndStatePending cpid (Rnd.pendingAddresses st)

selectState (wid, sl) = runMaybeT $ do
st <- MaybeT $ selectFirst
[ RndStateWalletId ==. wid
] []
stc <- MaybeT $ selectFirst
[ RndStateCheckpointWalletId ==. wid
, RndStateCheckpointSlot ==. sl
] []
let (rsid, RndState _ _ accIx gen) = (entityKey st, entityVal st)
addresses <- lift $ selectRndStateAddresses rsid False
pendingAddresses <- lift $ selectRndStateAddresses rsid True
addresses <- lift $ selectRndStateAddresses wid sl
pendingAddresses <- lift $ selectRndStatePending (entityKey stc)
rndKey <- lift $ selectRndStateKey wid
pure $ Rnd.RndState
{ rndKey = rndKey
, accountIndex = W.Index accIx
, accountIndex = W.Index (rndStateAccountIndex (entityVal st))
, addresses = addresses
, pendingAddresses = pendingAddresses
, gen = gen
, gen = rndStateCheckpointGen (entityVal stc)
}

deleteState wid = do
rsid <- fmap entityKey <$> selectList [ RndStateWalletId ==. wid ] []
deleteWhere [ RndStateAddressRndStateId <-. rsid ]
cpid <- fmap entityKey <$> selectList [ RndStateCheckpointWalletId ==. wid ] []
deleteWhere [ RndStatePendingAddressCheckpointId <-. cpid ]
deleteWhere [ RndStateCheckpointWalletId ==. wid ]
deleteWhere [ RndStateAddressWalletId ==. wid ]
deleteWhere [ RndStateWalletId ==. wid ]

insertRndStateAddresses
:: RndStateId
-> Bool
-> Map (W.Index 'W.Hardened 'W.AccountK, W.Index 'W.Hardened 'W.AddressK) W.Address
:: W.WalletId
-> W.SlotId
-> RndStateAddresses
-> SqlPersistT IO ()
insertRndStateAddresses rsid pending addresses =
insertRndStateAddresses wid sl addresses =
void $ dbChunked insertMany_
[ RndStateAddress rsid accIx addrIx addr pending
[ RndStateAddress wid sl accIx addrIx addr
| ((W.Index accIx, W.Index addrIx), addr) <- Map.assocs addresses
]

insertRndStatePending
:: RndStateCheckpointId
-> RndStateAddresses
-> SqlPersistT IO ()
insertRndStatePending cpid addresses =
void $ dbChunked insertMany_
[ RndStatePendingAddress cpid accIx addrIx addr
| ((W.Index accIx, W.Index addrIx), addr) <- Map.assocs addresses
]

selectRndStateAddresses
:: RndStateId
-> Bool
-> SqlPersistT IO (Map (W.Index 'W.Hardened 'W.AccountK, W.Index 'W.Hardened 'W.AddressK) W.Address)
selectRndStateAddresses rsid pending = do
:: W.WalletId
-> W.SlotId
-> SqlPersistT IO RndStateAddresses
selectRndStateAddresses wid sl = do
addrs <- fmap entityVal <$> selectList
[ RndStateAddressWalletId ==. wid
, RndStateAddressSlot ==. sl
] []
pure $ Map.fromList $ map assocFromEntity addrs
where
assocFromEntity (RndStateAddress _ _ accIx addrIx addr) =
((W.Index accIx, W.Index addrIx), addr)

selectRndStatePending
:: RndStateCheckpointId
-> SqlPersistT IO RndStateAddresses
selectRndStatePending cpid = do
addrs <- fmap entityVal <$> selectList
[ RndStateAddressRndStateId ==. rsid
, RndStateAddressPending ==. pending
[ RndStatePendingAddressCheckpointId ==. cpid
] []
pure $ Map.fromList $ map assocFromEntity addrs
where
assocFromEntity (RndStateAddress _ accIx addrIx addr _) =
assocFromEntity (RndStatePendingAddress _ accIx addrIx addr) =
((W.Index accIx, W.Index addrIx), addr)

-- | Gets the wallet root key to put in RndState. If there is none yet, just
Expand Down
Loading