Skip to content

Commit

Permalink
add seperate db connections for vote and pool
Browse files Browse the repository at this point in the history
  • Loading branch information
Cmdv committed Dec 20, 2023
1 parent 3750473 commit 489ee3d
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 19 deletions.
2 changes: 1 addition & 1 deletion cardano-db-sync/src/Cardano/DbSync.hs
Original file line number Diff line number Diff line change
Expand Up @@ -167,12 +167,12 @@ runSyncNode metricsSetters trce iomgr dbConnString ranMigrations runMigrationFnc
runOrThrowIO $ runExceptT $ do
genCfg <- readCardanoGenesisConfig syncNodeConfig
logProtocolMagicId trce $ genesisProtocolMagicId genCfg

syncEnv <-
ExceptT $
mkSyncEnvFromConfig
trce
backend
dbConnString
syncOptions
genCfg
syncNodeParams
Expand Down
9 changes: 7 additions & 2 deletions cardano-db-sync/src/Cardano/DbSync/Api.hs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ import Control.Concurrent.Class.MonadSTM.Strict (
import Control.Monad.Trans.Maybe (MaybeT (..))
import qualified Data.Strict.Maybe as Strict
import Data.Time.Clock (getCurrentTime)
import Database.Persist.Postgresql (ConnectionString)
import Database.Persist.Sql (SqlBackend)
import Ouroboros.Consensus.Block.Abstract (BlockProtocol, HeaderHash, Point (..), fromRawHash)
import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (..))
Expand Down Expand Up @@ -331,6 +332,7 @@ getCurrentTipBlockNo env = do
mkSyncEnv ::
Trace IO Text ->
SqlBackend ->
ConnectionString ->
SyncOptions ->
ProtocolInfo CardanoBlock ->
Ledger.Network ->
Expand All @@ -340,7 +342,7 @@ mkSyncEnv ::
Bool ->
RunMigration ->
IO SyncEnv
mkSyncEnv trce backend syncOptions protoInfo nw nwMagic systemStart syncNP ranMigrations runMigrationFnc = do
mkSyncEnv trce backend connectionString syncOptions protoInfo nw nwMagic systemStart syncNP ranMigrations runMigrationFnc = do
dbCNamesVar <- newTVarIO =<< dbConstraintNamesExists backend
cache <- if soptCache syncOptions then newEmptyCache 250000 50000 else pure uninitiatedCache
consistentLevelVar <- newTVarIO Unchecked
Expand Down Expand Up @@ -379,6 +381,7 @@ mkSyncEnv trce backend syncOptions protoInfo nw nwMagic systemStart syncNP ranMi
SyncEnv
{ envBackend = backend
, envCache = cache
, envConnectionString = connectionString
, envConsistentLevel = consistentLevelVar
, envDbConstraints = dbCNamesVar
, envEpochState = epochVar
Expand All @@ -400,6 +403,7 @@ mkSyncEnv trce backend syncOptions protoInfo nw nwMagic systemStart syncNP ranMi
mkSyncEnvFromConfig ::
Trace IO Text ->
SqlBackend ->
ConnectionString ->
SyncOptions ->
GenesisConfig ->
SyncNodeParams ->
Expand All @@ -408,7 +412,7 @@ mkSyncEnvFromConfig ::
-- | run migration function
RunMigration ->
IO (Either SyncNodeError SyncEnv)
mkSyncEnvFromConfig trce backend syncOptions genCfg syncNodeParams ranMigration runMigrationFnc =
mkSyncEnvFromConfig trce backend connectionString syncOptions genCfg syncNodeParams ranMigration runMigrationFnc =
case genCfg of
GenesisCardano _ bCfg sCfg _ _
| unProtocolMagicId (Byron.configProtocolMagicId bCfg) /= Shelley.sgNetworkMagic (scConfig sCfg) ->
Expand Down Expand Up @@ -436,6 +440,7 @@ mkSyncEnvFromConfig trce backend syncOptions genCfg syncNodeParams ranMigration
<$> mkSyncEnv
trce
backend
connectionString
syncOptions
(fst $ mkProtocolInfoCardano genCfg [])
(Shelley.sgNetworkId $ scConfig sCfg)
Expand Down
2 changes: 2 additions & 0 deletions cardano-db-sync/src/Cardano/DbSync/Api/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,15 @@ import Control.Concurrent.Class.MonadSTM.Strict (
import Control.Concurrent.Class.MonadSTM.Strict.TBQueue (StrictTBQueue)
import qualified Data.Strict.Maybe as Strict
import Data.Time.Clock (UTCTime)
import Database.Persist.Postgresql (ConnectionString)
import Database.Persist.Sql (SqlBackend)
import Ouroboros.Consensus.BlockchainTime.WallClock.Types (SystemStart (..))
import Ouroboros.Network.Magic (NetworkMagic (..))

data SyncEnv = SyncEnv
{ envBackend :: !SqlBackend
, envCache :: !Cache
, envConnectionString :: !ConnectionString
, envConsistentLevel :: !(StrictTVar IO ConsistentLevel)
, envDbConstraints :: !(StrictTVar IO DB.ManualDbConstraints)
, envEpochState :: !(StrictTVar IO EpochState)
Expand Down
40 changes: 24 additions & 16 deletions cardano-db-sync/src/Cardano/DbSync/OffChain.hs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ module Cardano.DbSync.OffChain (
) where

import Cardano.BM.Trace (Trace, logInfo)
import Cardano.Db (runIohkLogging)
import qualified Cardano.Db as DB
import Cardano.DbSync.Api
import Cardano.DbSync.Api.Types (InsertOptions (..), SyncEnv (..))
Expand All @@ -34,6 +35,7 @@ import Control.Concurrent.Class.MonadSTM.Strict (
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.Time.Clock.POSIX (POSIXTime)
import qualified Data.Time.Clock.POSIX as Time
import Database.Persist.Postgresql (withPostgresqlConn)
import Database.Persist.Sql (SqlBackend)
import qualified Network.HTTP.Client as Http
import Network.HTTP.Client.TLS (tlsManagerSettings)
Expand Down Expand Up @@ -167,14 +169,17 @@ runFetchOffChainPoolThread syncEnv = do
-- if dissable gov is active then don't run voting anchor thread
when (ioOffChainPoolData iopts) $ do
logInfo trce "Running Offchain Pool fetch thread"
forever $ do
tDelay
-- load the offChain vote work queue using the db
_ <- runReaderT (loadOffChainPoolWorkQueue trce (envOffChainPoolWorkQueue syncEnv)) (envBackend syncEnv)
poolq <- atomically $ flushTBQueue (envOffChainPoolWorkQueue syncEnv)
manager <- Http.newManager tlsManagerSettings
now <- liftIO Time.getPOSIXTime
mapM_ (queuePoolInsert <=< fetchOffChainPoolData trce manager now) poolq
runIohkLogging trce $
withPostgresqlConn (envConnectionString syncEnv) $
\backendPool -> liftIO $
forever $ do
tDelay
-- load the offChain vote work queue using the db
_ <- runReaderT (loadOffChainPoolWorkQueue trce (envOffChainPoolWorkQueue syncEnv)) backendPool
poolq <- atomically $ flushTBQueue (envOffChainPoolWorkQueue syncEnv)
manager <- Http.newManager tlsManagerSettings
now <- liftIO Time.getPOSIXTime
mapM_ (queuePoolInsert <=< fetchOffChainPoolData trce manager now) poolq
where
trce = getTrace syncEnv
iopts = getInsertOptions syncEnv
Expand All @@ -187,14 +192,17 @@ runFetchOffChainVoteThread syncEnv = do
-- if dissable gov is active then don't run voting anchor thread
when (ioGov iopts) $ do
logInfo trce "Running Offchain Vote Anchor fetch thread"
forever $ do
tDelay
-- load the offChain pool work queue using the db
_ <- runReaderT (loadOffChainVoteWorkQueue trce (envOffChainVoteWorkQueue syncEnv)) (envBackend syncEnv)
voteq <- atomically $ flushTBQueue (envOffChainVoteWorkQueue syncEnv)
manager <- Http.newManager tlsManagerSettings
now <- liftIO Time.getPOSIXTime
mapM_ (queueVoteInsert <=< fetchOffChainVoteData trce manager now) voteq
runIohkLogging trce $
withPostgresqlConn (envConnectionString syncEnv) $
\backendVote -> liftIO $
forever $ do
tDelay
-- load the offChain pool work queue using the db
_ <- runReaderT (loadOffChainVoteWorkQueue trce (envOffChainVoteWorkQueue syncEnv)) backendVote
voteq <- atomically $ flushTBQueue (envOffChainVoteWorkQueue syncEnv)
manager <- Http.newManager tlsManagerSettings
now <- liftIO Time.getPOSIXTime
mapM_ (queueVoteInsert <=< fetchOffChainVoteData trce manager now) voteq
where
trce = getTrace syncEnv
iopts = getInsertOptions syncEnv
Expand Down

0 comments on commit 489ee3d

Please sign in to comment.