Commit
improve schema sync logic to support latest pg-client-hs lib
rakeshkky committed Mar 8, 2019
1 parent 4472a21 commit 6338341
Showing 6 changed files with 140 additions and 106 deletions.
30 changes: 6 additions & 24 deletions server/src-exec/Main.hs
@@ -11,7 +11,6 @@ import System.Exit (exitFailure)


import qualified Control.Concurrent as C
import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
@@ -100,7 +99,7 @@ printYaml :: (A.ToJSON a) => a -> IO ()
printYaml = BC.putStrLn . Y.encode

mkPGLogger :: Logger -> Q.PGLogger
mkPGLogger (Logger logger) msg =
mkPGLogger (Logger logger) (Q.PLERetryMsg msg) =
logger $ PGLog LevelWarn msg

main :: IO ()
@@ -128,31 +127,21 @@ main = do
-- log postgres connection info
unLogger logger $ connInfoToLog ci

-- create empty cache update events queue
eventsQueue <- STM.newTQueueIO

pool <- Q.initPGPool ci cp pgLogger

-- start postgres cache update events listener thread in background
listenerTId <- C.forkIO $ schemaUpdateEventListener pool logger eventsQueue
unLogger logger $ mkThreadLog listenerTId instanceId TTListener

-- safe init catalog
initRes <- initialise logger ci httpManager

-- prepare event triggers data
prepareEvents logger ci

(app, cacheRef, cacheBuiltTime) <-
(app, cacheRef, cacheInitTime) <-
mkWaiApp isoL loggerCtx strfyNum pool httpManager am
corsCfg enableConsole enableTelemetry instanceId enabledAPIs

let scRef = _scrCache cacheRef

-- start cache update events processor thread in background
procTId <- C.forkIO $ schemaUpdateEventProcessor strfyNum pool logger httpManager
eventsQueue cacheRef instanceId cacheBuiltTime
unLogger logger $ mkThreadLog procTId instanceId TTProcessor
-- start a background thread for schema sync
void $ C.forkIO $ startSchemaSync strfyNum pool logger httpManager
cacheRef instanceId cacheInitTime

let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings

@@ -162,6 +151,7 @@ main = do

eventEngineCtx <- atomically $ initEventEngineCtx maxEvThrds evFetchMilliSec

let scRef = _scrCache cacheRef
unLogger logger $
mkGenericStrLog "event_triggers" "starting workers"
void $ C.forkIO $ processEventQueue hloggerCtx logEnvHeaders httpManager pool scRef eventEngineCtx
@@ -235,14 +225,6 @@ main = do
res <- runTx pgLogger ci unlockAllEvents
either printErrJExit return res

mkThreadLog threadId instanceId threadType =
let msg = T.pack (show threadType) <> " thread started"
in StartupLog LevelInfo "threads" $
A.object [ "instance_id" A..= getInstanceId instanceId
, "thread_id" A..= show threadId
, "message" A..= msg
]

getUniqIds pgLogger ci = do
eDbId <- runTx pgLogger ci getDbId
dbId <- either printErrJExit return eDbId
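Main.hs no longer builds the events queue or forks the listener and processor threads itself; that wiring now lives behind the single startSchemaSync call defined in SchemaUpdate.hs further down. As a rough orientation, here is a minimal, self-contained sketch of that pattern using only the async and stm primitives that appear in this diff; the names runSyncSketch, listenLoop and processLoop are illustrative, not part of the actual Hasura code.

import Control.Monad (void)

import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM   as STM

-- A listener thread pushes payloads onto a shared queue, a processor
-- thread drains it, and waitAny returns as soon as either thread exits
-- so the caller can notice that schema syncing has stopped.
runSyncSketch
  :: (STM.TQueue payload -> IO ())  -- ^ listener loop
  -> (STM.TQueue payload -> IO ())  -- ^ processor loop
  -> IO ()
runSyncSketch listenLoop processLoop = do
  queue  <- STM.newTQueueIO
  lAsync <- A.async $ listenLoop queue
  pAsync <- A.async $ processLoop queue
  void $ A.waitAny [lAsync, pAsync]

The real startSchemaSync additionally threads the PG pool, logger, HTTP manager, schema cache reference, instance id and cache init time through to both loops.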
4 changes: 2 additions & 2 deletions server/src-lib/Hasura/Server/App.hs
@@ -314,10 +314,10 @@ mkWaiApp isoLevel loggerCtx strfyNum pool httpManager mode corsCfg
pgResp <- runExceptT $ peelRun emptySchemaCache adminUserInfo
httpManager strfyNum pool Q.Serializable $ do
buildSchemaCache
fetchLastUpdateTime
liftTx fetchLastUpdate
(time, sc) <- either initErrExit return pgResp
scRef <- newIORef sc
return (scRef, time)
return (scRef, snd <$> time)

cacheLock <- newMVar ()

9 changes: 3 additions & 6 deletions server/src-lib/Hasura/Server/Init.hs
@@ -23,14 +23,11 @@ import Hasura.Server.Utils
import Network.Wai.Handler.Warp

newtype InstanceId
= InstanceId {getInstanceId :: UUID.UUID}
deriving (Show, Eq)

instanceIdToTxt :: InstanceId -> T.Text
instanceIdToTxt = UUID.toText . getInstanceId
= InstanceId {getInstanceId :: T.Text}
deriving (Show, Eq, J.ToJSON, J.FromJSON)

mkInstanceId :: IO InstanceId
mkInstanceId = InstanceId <$> UUID.nextRandom
mkInstanceId = (InstanceId . UUID.toText) <$> UUID.nextRandom

initErrExit :: (Show e) => e -> IO a
initErrExit e = print e >> exitFailure
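InstanceId is now a Text-backed newtype (still seeded from a random UUID) with ToJSON and FromJSON derived directly, so an instance id serializes as a plain JSON string and can be compared with Eq against the instance_id carried in a notification payload. A small self-contained sketch of the same pattern follows; SampleId and mkSampleId are illustrative stand-ins, not the real definitions.

{-# LANGUAGE GeneralizedNewtypeDeriving #-}

import qualified Data.Aeson   as J
import qualified Data.Text    as T
import qualified Data.UUID    as UUID
import qualified Data.UUID.V4 as UUID

-- A Text-backed id newtype: the JSON instances come from the underlying
-- Text, so encoding yields a bare string rather than an object.
newtype SampleId = SampleId { getSampleId :: T.Text }
  deriving (Show, Eq, J.ToJSON, J.FromJSON)

-- Generate a fresh id from a random (V4) UUID, stored as Text.
mkSampleId :: IO SampleId
mkSampleId = SampleId . UUID.toText <$> UUID.nextRandom

-- J.encode (SampleId "abc")  ==>  "\"abc\""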
19 changes: 10 additions & 9 deletions server/src-lib/Hasura/Server/Query.hs
@@ -30,8 +30,7 @@ import Hasura.RQL.DML.Returning (encodeJSONVector)
import Hasura.RQL.DML.Select
import Hasura.RQL.DML.Update
import Hasura.RQL.Types
import Hasura.Server.Init (InstanceId (..),
instanceIdToTxt)
import Hasura.Server.Init (InstanceId (..))
import Hasura.Server.Utils

import qualified Database.PG.Query as Q
@@ -119,16 +118,18 @@ instance HasHttpManager Run where
instance HasSQLGenCtx Run where
askSQLGenCtx = asks _3

fetchLastUpdateTime :: Run (Maybe UTCTime)
fetchLastUpdateTime = do
l <- liftTx $ Q.listQE defaultTxErrorHandler
fetchLastUpdate :: Q.TxE QErr (Maybe (InstanceId, UTCTime))
fetchLastUpdate = do
l <- Q.listQE defaultTxErrorHandler
[Q.sql|
SELECT occurred_at FROM hdb_catalog.hdb_schema_update_event
ORDER BY id DESC LIMIT 1
SELECT instance_id::text, occurred_at
FROM hdb_catalog.hdb_schema_update_event
ORDER BY occurred_at DESC LIMIT 1
|] () True
case l of
[] -> return Nothing
[Identity t] -> return $ Just t
[(instId, occurredAt)] ->
return $ Just (InstanceId instId, occurredAt)
-- never happens
_ -> throw500 "more than one row returned by query"

@@ -139,7 +140,7 @@ recordSchemaUpdate instanceId =
hdb_catalog.hdb_schema_update_event
(instance_id, occurred_at)
VALUES ($1::uuid, DEFAULT)
|] (Identity $ instanceIdToTxt instanceId) True
|] (Identity $ getInstanceId instanceId) True

peelRun
:: SchemaCache
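fetchLastUpdate now returns both the originating instance id and the timestamp of the latest schema update event, which is exactly what the sync thread needs in order to decide whether its local cache is stale (see shouldRefresh in SchemaUpdate.hs below). A purely illustrative, self-contained version of that decision follows; the helper name needsRefresh and its polymorphic types are assumptions for the sketch, not code from this commit.

-- Refresh only if the last recorded update came from another instance
-- and happened after this server built its local schema cache.
needsRefresh
  :: (Eq instId, Ord time)
  => instId               -- this server's instance id
  -> Maybe time           -- local cache init time (Nothing = unknown)
  -> Maybe (instId, time) -- result of fetchLastUpdate
  -> Bool
needsRefresh _ _ Nothing = False
needsRefresh myId mCacheInit (Just (srcId, occurredAt)) =
  case mCacheInit of
    Nothing        -> True
    Just cacheTime -> srcId /= myId && occurredAt > cacheTime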
182 changes: 118 additions & 64 deletions server/src-lib/Hasura/Server/SchemaUpdate.hs
@@ -1,8 +1,5 @@
module Hasura.Server.SchemaUpdate
( ThreadType(..)
, schemaUpdateEventListener
, schemaUpdateEventProcessor
)
(startSchemaSync)
where

import Hasura.Prelude
@@ -12,13 +9,15 @@ import Hasura.RQL.DDL.Schema.Table (buildSchemaCache)
import Hasura.RQL.Types
import Hasura.Server.App (SchemaCacheRef (..), withSCUpdate)
import Hasura.Server.Init (InstanceId (..))
import Hasura.Server.Logging
import Hasura.Server.Query

import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.UUID

import qualified Control.Concurrent as C
import qualified Control.Concurrent.Async as A
import qualified Control.Concurrent.STM as STM
import qualified Data.Text as T
import qualified Data.Time as UTC
@@ -58,20 +57,11 @@ instance ToEngineLog SchemaUpdateEventLog where

data EventPayload
= EventPayload
{ _epInstanceId :: !UUID
{ _epInstanceId :: !InstanceId
, _epOccurredAt :: !UTC.UTCTime
} deriving (Show, Eq)
$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)
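Since the aesonDrop 3 snakeCase options strip the _ep prefix and snake_case the rest, the payload arriving over NOTIFY is expected to look like {"instance_id": ..., "occurred_at": ...}. A standalone sketch of the same derivation, to make the wire format explicit; SamplePayload is an illustrative stand-in for EventPayload.

{-# LANGUAGE TemplateHaskell #-}

import Data.Aeson.Casing (aesonDrop, snakeCase)
import Data.Aeson.TH     (deriveJSON)
import Data.Text         (Text)
import Data.Time         (UTCTime)

-- Field names drop the "_sp" prefix and are snake_cased in JSON.
data SamplePayload
  = SamplePayload
  { _spInstanceId :: !Text
  , _spOccurredAt :: !UTCTime
  } deriving (Show, Eq)
$(deriveJSON (aesonDrop 3 snakeCase) ''SamplePayload)

-- Data.Aeson.encode (SamplePayload "some-uuid" someTime) yields JSON of
-- the form {"instance_id":"some-uuid","occurred_at":"2019-03-08T10:00:00Z"}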

data SchemaUpdateEvent
= SUESuccess !EventPayload
| SUEPGReConn
deriving (Show, Eq)

instance ToJSON SchemaUpdateEvent where
toJSON (SUESuccess payload) = toJSON payload
toJSON SUEPGReConn = String "postgres reconnection"

data ThreadError
= TEJsonParse !T.Text
| TEQueryError !QErr
@@ -81,83 +71,147 @@ $(deriveToJSON
}
''ThreadError)

-- | An IO action that enables metadata syncing
startSchemaSync
:: Bool
-> PG.PGPool
-> Logger
-> HTTP.Manager
-> SchemaCacheRef
-> InstanceId
-> Maybe UTC.UTCTime -> IO ()
startSchemaSync strfyNum pool logger httpMgr cacheRef instanceId cacheInitTime = do
-- Init events queue
eventsQueue <- STM.newTQueueIO
-- Start listener thread
lAsync <- A.async $ listener strfyNum pool
logger httpMgr eventsQueue cacheRef instanceId cacheInitTime

-- Start processor thread
pAsync <- A.async $ processor strfyNum pool
logger httpMgr eventsQueue cacheRef instanceId

void $ A.waitAny [lAsync, pAsync]

-- | An IO action that listens to postgres for events and pushes them to a Queue
schemaUpdateEventListener
:: PG.PGPool
listener
:: Bool
-> PG.PGPool
-> Logger
-> STM.TQueue SchemaUpdateEvent
-> IO ()
schemaUpdateEventListener pool logger eventsQueue =
-> HTTP.Manager
-> STM.TQueue EventPayload
-> SchemaCacheRef
-> InstanceId
-> Maybe UTC.UTCTime -> IO ()
listener strfyNum pool logger httpMgr eventsQueue
cacheRef instanceId cacheInitTime = do
logThreadStartup logger instanceId threadType
-- Never exits
forever $ do
listenResE <- liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
listenResE <-
liftIO $ runExceptT $ PG.listen pool pgChannel notifyHandler
either onError return listenResE
logWarn
C.threadDelay $ 1 * 1000 * 1000 -- 1 second
where
notifyHandler = PG.NotifyHandler onReconn onMessage
threadType = TTListener

shouldRefresh dbInstId accrdAt =
case cacheInitTime of
Nothing -> True
Just time -> (dbInstId /= instanceId) && accrdAt > time

refreshCache Nothing = return ()
refreshCache (Just (dbInstId, accrdAt)) =
when (shouldRefresh dbInstId accrdAt) $
refreshSchemaCache strfyNum pool logger httpMgr cacheRef
threadType "reloading schema cache on listen start"

notifyHandler = \case
PG.PNEOnStart -> do
eRes <- runExceptT $ PG.runTx pool
(PG.Serializable, Nothing) fetchLastUpdate
case eRes of
Left e -> onError e
Right mLastUpd -> refreshCache mLastUpd

PG.PNEPQNotify notif ->
case eitherDecodeStrict $ PQ.notifyExtra notif of
Left e -> logError logger threadType $ TEJsonParse $ T.pack e
Right payload -> do
logInfo logger threadType $ object ["received_event" .= payload]
-- Push a notify event to Queue
STM.atomically $ STM.writeTQueue eventsQueue payload

onError = logError logger threadType . TEQueryError
logWarn = unLogger logger $
SchemaUpdateEventLog LevelWarn TTListener $ String
"error occured retrying pg listen after 1 second"

onReconn = do
-- emit postgres reconnection event
let event = SUEPGReConn
logInfo logger threadType $ object ["received_event" .= event]
STM.atomically $ STM.writeTQueue eventsQueue event

-- Postgres notification handler
onMessage notif =
case eitherDecodeStrict $ PQ.notifyExtra notif of
Left e -> logError logger threadType $ TEJsonParse $ T.pack e
Right payload -> do
let newEvent = SUESuccess payload
logInfo logger threadType $ object ["received_event" .= newEvent]
-- Push a success event to Queue along with event payload
STM.atomically $ STM.writeTQueue eventsQueue newEvent

-- | An IO action that processes events from Queue
schemaUpdateEventProcessor
processor
:: Bool
-> PG.PGPool
-> Logger
-> HTTP.Manager
-> STM.TQueue SchemaUpdateEvent
-> STM.TQueue EventPayload
-> SchemaCacheRef
-> InstanceId
-> Maybe UTC.UTCTime
-> IO ()
schemaUpdateEventProcessor strfyNum pool logger httpManager
eventsQueue cacheRef instanceId cacheInit =
-> InstanceId -> IO ()
processor strfyNum pool logger httpMgr eventsQueue
cacheRef instanceId = do
logThreadStartup logger instanceId threadType
-- Never exits
forever $ do
event <- STM.atomically $ STM.readTQueue eventsQueue
logInfo logger threadType $ object ["processed_event" .= event]
when (shouldReload event) $ do
-- Reload schema cache from catalog
resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $
peelRun emptySchemaCache adminUserInfo
httpManager strfyNum pool PG.Serializable buildSchemaCache
case resE of
Left e -> logError logger threadType $ TEQueryError e
Right _ ->
logInfo logger threadType $
object ["message" .= ("schema cache reloaded" :: T.Text)]
when (shouldReload event) $
refreshSchemaCache strfyNum pool logger httpMgr cacheRef
threadType "schema cache reloaded"
where
threadType = TTProcessor

-- If postgres reconnect happens reload schema cache
shouldReload SUEPGReConn = True
-- If event is from another server and occurred after
-- init schema cache built then reload
shouldReload (SUESuccess payload) =
(_epInstanceId payload /= getInstanceId instanceId)
&& maybe True (withCacheInit $ _epOccurredAt payload) cacheInit
-- If event is from another server
shouldReload payload = _epInstanceId payload /= instanceId

withCacheInit occurredAt initTime = occurredAt > initTime
logThreadStartup
:: Show a
=> Logger
-> InstanceId
-> a -> IO ()
logThreadStartup logger instanceId threadType =
unLogger logger threadLog
where
threadLog =
let msg = T.pack (show threadType) <> " thread started"
in StartupLog LevelInfo "threads" $
object [ "instance_id" .= getInstanceId instanceId
, "message" .= msg
]

refreshSchemaCache
:: Bool
-> PG.PGPool
-> Logger
-> HTTP.Manager
-> SchemaCacheRef
-> ThreadType
-> T.Text -> IO ()
refreshSchemaCache strfyNum pool logger httpManager cacheRef threadType msg = do
-- Reload schema cache from catalog
resE <- liftIO $ runExceptT $ withSCUpdate cacheRef $
peelRun emptySchemaCache adminUserInfo
httpManager strfyNum pool PG.Serializable buildSchemaCache
case resE of
Left e -> logError logger threadType $ TEQueryError e
Right _ ->
logInfo logger threadType $ object ["message" .= msg]

logInfo :: Logger -> ThreadType -> Value -> IO ()
logInfo logger threadType val = unLogger logger $
SchemaUpdateEventLog LevelInfo threadType val

logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
logError logger threadType err = unLogger logger $
SchemaUpdateEventLog LevelError threadType $ object ["error" .= toJSON err]
logError logger threadType err =
unLogger logger $ SchemaUpdateEventLog LevelError threadType $
object ["error" .= toJSON err]