Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

manage schema cache when horizontally scaled (closes #1182) #1574

Merged
merged 25 commits into from
Mar 12, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
a79a6cc
manage schema cache when horizontally scaled, closes #1182
rakeshkky Feb 7, 2019
428f70d
Merge branch 'master' into issue-1182-cache-update
rakeshkky Feb 8, 2019
4539bdc
support pg-client-hs library with retries & improve naming
rakeshkky Feb 21, 2019
2c4fa62
Merge branch 'master' into issue-1182-cache-update
rakeshkky Feb 22, 2019
933b734
Merge branch 'master' into issue-1182-cache-update
rakeshkky Mar 6, 2019
e9c7c2b
added basic tests
arvi3411301 Mar 7, 2019
9fb165f
unset HASURA_GRAPHQL_AUTH_HOOK
arvi3411301 Mar 7, 2019
9504395
add server option in scaling tests
arvi3411301 Mar 7, 2019
1632dd1
fix test script
arvi3411301 Mar 7, 2019
bf30119
fix 'shouldReload' function
rakeshkky Mar 7, 2019
1d8c647
added relationship test
arvi3411301 Mar 7, 2019
054b26a
Merge branch 'issue-1182-cache-update' of github.com:rakeshkky/graphq…
arvi3411301 Mar 7, 2019
cf28c95
added pgbouncer to restart postgres connection
arvi3411301 Mar 7, 2019
4472a21
fix pgbouncer config path
arvi3411301 Mar 7, 2019
6338341
improve schema sync logic to support latest pg-client-hs lib
rakeshkky Mar 8, 2019
7f98d5f
improve invoking schema sync threads & improve thread logging
rakeshkky Mar 11, 2019
cc45860
Merge branch 'master' into issue-1182-cache-update
rakeshkky Mar 11, 2019
4a74839
add pgbouncer user
shahidhk Mar 11, 2019
e5b9b85
add user flag to pgbouncer command
arvi3411301 Mar 11, 2019
d0df8ac
fix tests
arvi3411301 Mar 11, 2019
a5cf3a3
add sleep for 30s
arvi3411301 Mar 11, 2019
db55aa7
add 30s delay for test validation
arvi3411301 Mar 11, 2019
6a8c4a1
fix tests
arvi3411301 Mar 11, 2019
81edef2
fix tests
arvi3411301 Mar 11, 2019
524bf91
run pgbouncer as daemon
arvi3411301 Mar 11, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions server/graphql-engine.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ library
, Hasura.Server.Version
, Hasura.Server.CheckUpdates
, Hasura.Server.Telemetry
, Hasura.Server.CacheUpdate
, Hasura.RQL.Types
, Hasura.RQL.Instances
, Hasura.RQL.Types.SchemaCache
Expand Down Expand Up @@ -316,6 +317,7 @@ executable graphql-engine
, wreq
, connection
, string-conversions
, uuid

other-modules: Ops

Expand Down
36 changes: 32 additions & 4 deletions server/src-exec/Main.hs
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,32 @@ import Options.Applicative
import System.Environment (getEnvironment, lookupEnv)
import System.Exit (exitFailure)


import qualified Control.Concurrent as C
import qualified Control.Concurrent.STM as STM
import qualified Data.Aeson as A
import qualified Data.ByteString.Char8 as BC
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BLC
import qualified Data.Text as T
import qualified Data.UUID.V4 as UUID
import qualified Data.Yaml as Y
import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Client.TLS as HTTP
import qualified Network.Wai.Handler.Warp as Warp

import Hasura.Events.Lib
import Hasura.Logging (Logger (..), defaultLoggerSettings,
mkLogger, mkLoggerCtx)
import Hasura.Logging
import Hasura.Prelude
import Hasura.RQL.DDL.Metadata (fetchMetadata)
import Hasura.RQL.Types (QErr, adminUserInfo,
emptySchemaCache)
import Hasura.Server.App (mkWaiApp)
import Hasura.Server.Auth
import Hasura.Server.CacheUpdate
import Hasura.Server.CheckUpdates (checkForUpdates)
import Hasura.Server.Init
import Hasura.Server.Logging
import Hasura.Server.Query (peelRun)
import Hasura.Server.Telemetry
import Hasura.Server.Version (currentVersion)
Expand Down Expand Up @@ -100,6 +104,7 @@ main = do
-- global http manager
httpManager <- HTTP.newManager HTTP.tlsManagerSettings
loggerCtx <- mkLoggerCtx $ defaultLoggerSettings True
serverId <- UUID.nextRandom
rakeshkky marked this conversation as resolved.
Show resolved Hide resolved
let logger = mkLogger loggerCtx
case hgeCmd of
HCServe so@(ServeOptions port host cp isoL mAccessKey mAuthHook mJwtSecret
Expand All @@ -117,15 +122,30 @@ main = do
-- log postgres connection info
unLogger logger $ connInfoToLog ci

-- create empty cache update events queue
eventsQueue <- STM.newTQueueIO

-- start postgres cache update events listener thread in background
listenerPool <- getMinimalPool ci
rakeshkky marked this conversation as resolved.
Show resolved Hide resolved
listenerTId <- C.forkIO $ cacheUpdateEventListener listenerPool logger eventsQueue
unLogger logger $ mkThreadLog listenerTId serverId TTListener

-- safe init catalog
initRes <- initialise logger ci httpManager

-- prepare event triggers data
prepareEvents logger ci

pool <- Q.initPGPool ci cp
(app, cacheRef) <- mkWaiApp isoL loggerCtx pool httpManager
am corsCfg enableConsole enableTelemetry

(app, cacheRef, cacheLk, cacheBuiltTime) <-
mkWaiApp isoL loggerCtx pool httpManager am corsCfg enableConsole
enableTelemetry serverId

-- start cache update events processor thread in background
procTId <- C.forkIO $ cacheUpdateEventProcessor pool logger httpManager
eventsQueue cacheRef cacheLk serverId cacheBuiltTime
unLogger logger $ mkThreadLog procTId serverId TTProcessor

let warpSettings = Warp.setPort port $ Warp.setHost host Warp.defaultSettings

Expand Down Expand Up @@ -209,6 +229,14 @@ main = do
res <- runTx ci unlockAllEvents
either printErrJExit return res

mkThreadLog threadId serverId threadType =
let msg = T.pack (show threadType) <> " thread started"
in StartupLog LevelInfo "threads" $
A.object [ "server_id" A..= serverId
, "thread_id" A..= show threadId
, "message" A..= msg
]

getUniqIds ci = do
eDbId <- runTx ci getDbId
dbId <- either printErrJExit return eDbId
Expand Down
15 changes: 13 additions & 2 deletions server/src-exec/Ops.hs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import qualified Database.PG.Query as Q
import qualified Database.PG.Query.Connection as Q

curCatalogVer :: T.Text
curCatalogVer = "9"
curCatalogVer = "10"

initCatalogSafe
:: (QErrM m, UserInfoM m, CacheRWM m, MonadTx m, MonadIO m, HasHttpManager m)
Expand Down Expand Up @@ -318,6 +318,12 @@ from8To9 = do
migrateMetadataFrom8 =
$(unTypeQ (Y.decodeFile "src-rsr/migrate_metadata_from_8_to_9.yaml" :: Q (TExp RQLQuery)))

from9To10 :: MonadTx m => m ()
from9To10 = do
-- migrate database
Q.Discard () <- liftTx $ Q.multiQE defaultTxErrorHandler
$(Q.sqlFromFile "src-rsr/migrate_from_9_to_10.sql")
return ()

migrateCatalog
:: (MonadTx m, CacheRWM m, MonadIO m, UserInfoM m, HasHttpManager m)
Expand All @@ -335,12 +341,17 @@ migrateCatalog migrationTime = do
| preVer == "6" -> from6ToCurrent
| preVer == "7" -> from7ToCurrent
| preVer == "8" -> from8ToCurrent
| preVer == "9" -> from9ToCurrent
| otherwise -> throw400 NotSupported $
"unsupported version : " <> preVer
where
from9ToCurrent = do
from9To10
postMigrate

from8ToCurrent = do
from8To9
postMigrate
from9ToCurrent

from7ToCurrent = do
from7To8
Expand Down
29 changes: 21 additions & 8 deletions server/src-lib/Hasura/Server/App.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import Data.Aeson hiding (json)
import Data.IORef
import Data.Time.Clock (UTCTime,
getCurrentTime)
import Data.UUID (UUID)
import Network.Wai (requestHeaders,
strictRequestBody)
import Web.Spock.Core
Expand Down Expand Up @@ -96,6 +97,7 @@ data ServerCtx
, scCacheLock :: MVar ()
, scAuthMode :: AuthMode
, scManager :: HTTP.Manager
, scServerId :: UUID
}

data HandlerCtx
Expand Down Expand Up @@ -214,7 +216,8 @@ v1QueryHandler query = do
httpMgr <- scManager . hcServerCtx <$> ask
pool <- scPGPool . hcServerCtx <$> ask
isoL <- scIsolation . hcServerCtx <$> ask
runQuery pool isoL userInfo schemaCache httpMgr query
serverId <- scServerId . hcServerCtx <$> ask
runQuery pool isoL serverId userInfo schemaCache httpMgr query

-- Also update the schema cache
dbActionReload = do
Expand Down Expand Up @@ -289,18 +292,24 @@ mkWaiApp
-> CorsConfig
-> Bool
-> Bool
-> IO (Wai.Application, IORef SchemaCache)
mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg enableConsole enableTelemetry = do
cacheRef <- do
-> UUID
rakeshkky marked this conversation as resolved.
Show resolved Hide resolved
-> IO (Wai.Application, IORef SchemaCache, MVar (), UTCTime)
mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg
enableConsole enableTelemetry serverId = do
(cacheRef, cacheBuiltTime) <- do
pgResp <- runExceptT $ peelRun emptySchemaCache adminUserInfo
httpManager pool Q.Serializable buildSchemaCache
either initErrExit return pgResp >>= newIORef . snd
httpManager pool Q.Serializable $ do
buildSchemaCache
liftIO getCurrentTime
rakeshkky marked this conversation as resolved.
Show resolved Hide resolved
(time, sc) <- either initErrExit return pgResp
scRef <- newIORef sc
return (scRef, time)

cacheLock <- newMVar ()

let serverCtx =
ServerCtx isoLevel pool (L.mkLogger loggerCtx) cacheRef
cacheLock mode httpManager
cacheLock mode httpManager serverId

spockApp <- spockAsApp $ spockT id $
httpApp corsCfg serverCtx enableConsole enableTelemetry
Expand All @@ -309,7 +318,11 @@ mkWaiApp isoLevel loggerCtx pool httpManager mode corsCfg enableConsole enableTe

wsServerEnv <- WS.createWSServerEnv (scLogger serverCtx) httpManager cacheRef runTx
let wsServerApp = WS.createWSServerApp mode wsServerEnv
return (WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp, cacheRef)
return ( WS.websocketsOr WS.defaultConnectionOptions wsServerApp spockApp
, cacheRef
, cacheLock
, cacheBuiltTime
)

httpApp :: CorsConfig -> ServerCtx -> Bool -> Bool -> SpockT IO ()
httpApp corsCfg serverCtx enableConsole enableTelemetry = do
Expand Down
178 changes: 178 additions & 0 deletions server/src-lib/Hasura/Server/CacheUpdate.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
module Hasura.Server.CacheUpdate
( ThreadType(..)
, cacheUpdateEventListener
, cacheUpdateEventProcessor
)
where

import Hasura.Prelude

import Hasura.Logging
import Hasura.RQL.DDL.Schema.Table (buildSchemaCache)
import Hasura.RQL.Types
import Hasura.Server.App (withLock)
import Hasura.Server.Query
import Hasura.Server.Utils (bsToTxt)

import Control.Concurrent.MVar
import Data.Aeson
import Data.Aeson.Casing
import Data.Aeson.TH
import Data.UUID
import Database.PG.Query

import qualified Control.Concurrent as C
import qualified Control.Concurrent.STM as STM
import qualified Data.IORef as IORef
import qualified Data.Text as T
import qualified Data.Time as UTC
import qualified Database.PostgreSQL.LibPQ as PQ
import qualified Network.HTTP.Client as HTTP

pgChannel :: PGChannel
pgChannel = "hasura_cache_update"

data ThreadType
= TTListener
| TTProcessor
deriving (Eq)

instance Show ThreadType where
show TTListener = "listener"
show TTProcessor = "processor"


data CacheUpdateEventLog
= CacheUpdateEventLog
{ cuelLogLevel :: !LogLevel
, cuelThreadType :: !ThreadType
, cuelInfo :: !Value
} deriving (Show, Eq)

instance ToJSON CacheUpdateEventLog where
toJSON (CacheUpdateEventLog _ t info) =
object [ "thread_type" .= show t
, "info" .= info
]

instance ToEngineLog CacheUpdateEventLog where
toEngineLog threadLog =
(cuelLogLevel threadLog, "cache_update_event", toJSON threadLog)

data EventPayload
= EventPayload
{ _epServerId :: !UUID
, _epOccurredAt :: !UTC.UTCTime
} deriving (Show, Eq)
$(deriveJSON (aesonDrop 3 snakeCase) ''EventPayload)

data CacheUpdateEvent
= CUEListenSuccess !EventPayload
| CUEListenFail
deriving (Show, Eq)

instance ToJSON CacheUpdateEvent where
toJSON (CUEListenSuccess payload) = toJSON payload
toJSON CUEListenFail = String "event listening failed"

data ThreadError
= TEJsonParse !T.Text
| TEQueryError !QErr
$(deriveToJSON
defaultOptions { constructorTagModifier = snakeCase . drop 2
, sumEncoding = TaggedObject "type" "info"
}
''ThreadError)

-- | An IO action that listens to postgres for events and pushes them to a Queue
cacheUpdateEventListener
:: PGPool
-> Logger
-> STM.TQueue CacheUpdateEvent
-> IO ()
cacheUpdateEventListener pool logger eventsQueue =
-- Never exits
forever $ do
listenResE <- liftIO $ runExceptT $ listen pool pgChannel notifyHandler
either onError return listenResE
where
threadType = TTListener

onError e = do
logError logger threadType $ TEQueryError e
-- Push a listen failed event to queue
STM.atomically $ STM.writeTQueue eventsQueue CUEListenFail
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't emit a 'listen error' event. Emit an event once the connection is re-established after an error. You'll probably need to expose this functionality from pg-client-hs, probably listen can take something like this as an argument:

data NotifyHandler 
  = NotifyHandler 
  { _ehOnInit :: IO ()
  , _ehOnMessage :: Notify -> IO ()
  }

logInfo logger threadType $
object ["message" .= ("retrying in 10 seconds" :: T.Text)]
C.threadDelay $ 10 * 1000 * 1000

-- Postgres notification handler
notifyHandler notif = do
let eventChannel = PGChannel $ bsToTxt $ PQ.notifyRelname notif
when (eventChannel == pgChannel) $
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do this check again?

case eitherDecodeStrict $ PQ.notifyExtra notif of
Left e -> logError logger threadType $ TEJsonParse $ T.pack e
Right payload -> do
let newEvent = CUEListenSuccess payload
logInfo logger threadType $ object [ "received_event" .= newEvent]
-- Push a success event to Queue along with event payload
STM.atomically $ STM.writeTQueue eventsQueue newEvent

-- | An IO action that processes events from Queue
cacheUpdateEventProcessor
:: PGPool
-> Logger
-> HTTP.Manager
-> STM.TQueue CacheUpdateEvent
-> IORef.IORef SchemaCache
-> MVar ()
-> UUID
-> UTC.UTCTime
-> IO ()
cacheUpdateEventProcessor pool logger httpManager eventsQueue
cacheRef lk serverId cacheInit = do
-- Initiate previous event IO reference
prevEventRef <- IORef.newIORef Nothing
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You don't have to store the previous event ref after the above suggested change (emit event on reconnection).


-- Never exits
forever $ do
event <- STM.atomically $ STM.readTQueue eventsQueue
prevEvent <- IORef.readIORef prevEventRef
logInfo logger threadType $
object [ "previous_event" .= prevEvent
, "processed_event" .= event
]
when (shouldReload prevEvent event) $ do
-- Build schema cache from catalog
scE <- liftIO $ runExceptT $ withLock lk $
snd <$> peelRun emptySchemaCache adminUserInfo
httpManager pool Serializable buildSchemaCache
case scE of
Left e -> logError logger threadType $ TEQueryError e
Right sc -> do
-- Updata cache reference with newly built cache
IORef.writeIORef cacheRef sc
logInfo logger threadType $
object ["message" .= ("schema cache reloaded" :: T.Text)]

IORef.writeIORef prevEventRef $ Just event
where
threadType = TTProcessor

-- If previous event is failed and current event is success,
-- reload irrespective current event payload
shouldReload (Just CUEListenFail) (CUEListenSuccess _) = True
-- If current event is failed, do not reload
shouldReload _ CUEListenFail = False
shouldReload _ (CUEListenSuccess payload) =
-- If event is from another server and occurred after
-- init schema cache built then reload
(_epServerId payload /= serverId) && (_epOccurredAt payload > cacheInit)

logInfo :: Logger -> ThreadType -> Value -> IO ()
logInfo logger threadType val = unLogger logger $
CacheUpdateEventLog LevelInfo threadType val

logError :: ToJSON a => Logger -> ThreadType -> a -> IO ()
logError logger threadType err = unLogger logger $
CacheUpdateEventLog LevelError threadType $ object ["error" .= toJSON err]
Loading