-
Notifications
You must be signed in to change notification settings - Fork 200
/
Copy pathRunner.hs
197 lines (157 loc) · 7.46 KB
/
Runner.hs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
{-# LANGUAGE AllowAmbiguousTypes #-}
{-|
Module: IHP.Job.Runner
Description: Functions to run jobs
Copyright: (c) digitally induced GmbH, 2020
-}
module IHP.Job.Runner where
import IHP.Prelude
import IHP.ControllerPrelude
import IHP.ScriptSupport
import qualified IHP.Job.Queue as Queue
import qualified Control.Exception as Exception
import qualified Database.PostgreSQL.Simple.FromField as PG
import qualified Data.UUID.V4 as UUID
import qualified Control.Concurrent as Concurrent
import qualified Control.Concurrent.Async as Async
import qualified System.Posix.Signals as Signals
import qualified System.Exit as Exit
import qualified System.Timeout as Timeout
import qualified IHP.PGListener as PGListener
import qualified IHP.Log as Log
-- | Entry point used by the RunJobs binary.
--
-- Hands the given job workers to 'dedicatedProcessMainLoop', which runs them in this
-- (dedicated) process. The argument is kept explicit because 'Script' carries implicit
-- parameters that must be in scope at the call.
runJobWorkers :: [JobWorker] -> Script
runJobWorkers workers = dedicatedProcessMainLoop workers
-- | This job worker main loop is used when the job workers are running as part of their own binary
--
-- In dev mode the IHP dev server is using the 'devServerMainLoop' instead. We have two main loops
-- as the stop handling works differently in those cases.
--
-- Shutdown protocol:
--
-- 1. The first SIGINT/SIGTERM stops all job producers (PG subscriptions and pollers), tells every
--    runner to stop once its current job is done, and then waits for all runners.
-- 2. A second SIGINT/SIGTERM at any point after the first one cancels all runners and exits.
dedicatedProcessMainLoop :: (?modelContext :: ModelContext, ?context :: FrameworkConfig) => [JobWorker] -> IO ()
dedicatedProcessMainLoop jobWorkers = do
    threadId <- Concurrent.myThreadId
    workerId <- UUID.nextRandom

    Log.info ("Starting worker " <> tshow workerId)

    -- The job workers use their own dedicated PG listener as e.g. AutoRefresh or DataSync
    -- could overload the main PGListener connection. In that case we still want jobs to be
    -- run independent of the system being very busy.
    pgListener <- PGListener.init ?modelContext
    waitForExitSignal <- installSignalHandlers

    let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

    processes <- jobWorkers
            |> mapM (\(JobWorker listenAndRun) -> listenAndRun jobWorkerArgs)

    waitForExitSignal

    Log.info ("Waiting for jobs to complete. CTRL+C again to force exit" :: Text)

    -- Watch for a second exit signal *before* starting the graceful shutdown below.
    -- The graceful shutdown can block (e.g. in 'Concurrent.putMVar' or while waiting for
    -- long-running jobs), and the promised force exit has to work during all of it.
    -- If the user sends two exit signals, we just kill all processes.
    _ <- async do
        waitForExitSignal

        Log.info ("Canceling all running jobs. CTRL+C again to force exit" :: Text)

        forEach processes \JobWorkerProcess { runners } ->
            forEach runners Async.cancel

        Concurrent.throwTo threadId Exit.ExitSuccess

    -- Stop subscriptions and pollers first.
    -- This removes all producers for the per-worker queue MVar.
    forEach processes \JobWorkerProcess { poller, subscription, action } -> do
        PGListener.unsubscribe subscription pgListener
        Async.cancel poller
        -- With all producers gone, drop a pending 'JobAvailable' (if any). Otherwise a busy
        -- runner would pick it up and start a *new* job during shutdown, and the 'putMVar'
        -- below would block until such a runner frees the slot.
        _ <- Concurrent.tryTakeMVar action
        -- Every runner that takes 'Stop' puts it back, so one 'Stop' stops all runners.
        Concurrent.putMVar action Stop

    PGListener.stop pgListener

    -- Wait for all runners to finish their current job
    forEach processes \JobWorkerProcess { runners } ->
        forEach runners Async.wait

    Concurrent.throwTo threadId Exit.ExitSuccess
-- | Job worker main loop used when the job workers run inside the IHP dev server.
--
-- Unlike 'dedicatedProcessMainLoop' this installs no signal handlers and does not own the
-- 'PGListener.PGListener': the listener is supplied by the caller and is not stopped here.
-- The loop blocks until the calling thread is interrupted by an exception (presumably the
-- dev server stopping or restarting the job workers), and then shuts all job workers down.
devServerMainLoop :: (?modelContext :: ModelContext) => FrameworkConfig -> PGListener.PGListener -> [JobWorker] -> IO ()
devServerMainLoop frameworkConfig pgListener jobWorkers = do
    workerId <- UUID.nextRandom
    let ?context = frameworkConfig

    Log.info ("Starting worker " <> tshow workerId)

    let jobWorkerArgs = JobWorkerArgs { workerId, modelContext = ?modelContext, frameworkConfig = ?context, pgListener }

    processes <- jobWorkers
            |> mapM (\(JobWorker listenAndRun) -> listenAndRun jobWorkerArgs)

    (forever (Concurrent.threadDelay maxBound)) `Exception.finally` do
        forEach processes \JobWorkerProcess { poller, subscription, runners, action } -> do
            -- The listener is shared with the rest of the dev server and outlives this loop,
            -- so our subscription has to be removed explicitly; otherwise it would keep
            -- feeding the queue MVar of a stopped job worker.
            PGListener.unsubscribe subscription pgListener
            Async.cancel poller
            -- All producers are gone now. Drop a pending 'JobAvailable' so that the 'putMVar'
            -- below cannot block this cleanup until a busy runner finishes its job.
            _ <- Concurrent.tryTakeMVar action
            Concurrent.putMVar action Stop
            forEach runners Async.cancel
-- | Installs handlers for SIGINT and SIGTERM and returns an action that blocks until the
-- next one of these signals has been received.
--
-- Each handled signal fills an 'Concurrent.MVar' once (via 'Concurrent.putMVar'), and each
-- run of the returned action empties it once. So a signal that arrives while nobody is
-- waiting is kept for the next call instead of being dropped.
installSignalHandlers :: IO (IO ())
installSignalHandlers = do
    signalReceived <- Concurrent.newEmptyMVar
    let onSignal = Signals.Catch (Concurrent.putMVar signalReceived ())
    forEach [Signals.sigINT, Signals.sigTERM] \signal -> do
        _previousHandler <- Signals.installHandler signal onSignal Nothing
        pure ()
    pure (Concurrent.takeMVar signalReceived)
-- | Returns @main@ unchanged; the 'JobWorkerArgs' are pattern-matched but otherwise ignored.
--
-- NOTE(review): this looks like a leftover hook for wrapping a worker's main action with
-- exit handling — confirm whether it is still used before relying on it.
stopExitHandler :: JobWorkerArgs -> a -> a
stopExitHandler JobWorkerArgs {} main = main
-- | Builds the 'JobWorker' for the job type selected with a type application,
-- e.g. @worker \@MyJob@.
--
-- When started with its 'JobWorkerArgs', the worker runs 'jobWorkerFetchAndRunLoop' for
-- that job type.
worker :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FilterPrimaryKey (GetTableName job)
    , FromRow job
    , Show (PrimaryKey (GetTableName job))
    , PG.FromField (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , SetField "attemptsCount" job Int
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "runAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , Table job
    ) => JobWorker
worker = JobWorker \workerArgs -> jobWorkerFetchAndRunLoop @job workerArgs
-- | Starts the job processing for one job type and returns the handles needed to stop it.
--
-- Spawns @maxConcurrency \@job@ runner threads that share one 'Concurrent.MVar':
--
-- * On 'JobAvailable', a runner tries to fetch one job via 'Queue.fetchNextJob' and, if it
--   gets one, performs it (with the job's optional timeout) and records the outcome as
--   success, timeout or failure.
-- * On 'Stop', a runner puts 'Stop' back (so the other runners see it too) and exits.
--
-- 'Queue.watchForJob' is then set up on the same MVar; its subscription and poller are
-- presumably the producers of 'JobAvailable' messages.
jobWorkerFetchAndRunLoop :: forall job.
    ( job ~ GetModelByTableName (GetTableName job)
    , FilterPrimaryKey (GetTableName job)
    , FromRow job
    , Show (PrimaryKey (GetTableName job))
    , PG.FromField (PrimaryKey (GetTableName job))
    , KnownSymbol (GetTableName job)
    , SetField "attemptsCount" job Int
    , SetField "lockedBy" job (Maybe UUID)
    , SetField "status" job JobStatus
    , SetField "updatedAt" job UTCTime
    , HasField "attemptsCount" job Int
    , SetField "lastError" job (Maybe Text)
    , Job job
    , CanUpdate job
    , Show job
    , Table job
    ) => JobWorkerArgs -> IO JobWorkerProcess
jobWorkerFetchAndRunLoop JobWorkerArgs { .. } = do
    let ?context = frameworkConfig
    let ?modelContext = modelContext

    -- Starts out as 'JobAvailable' so the runners check for already pending jobs right away.
    action <- Concurrent.newMVar JobAvailable

    runners <- forM [1 .. maxConcurrency @job] \_runnerIndex -> async do
        let loop = do
                receivedAction <- Concurrent.takeMVar action
                case receivedAction of
                    JobAvailable -> do
                        maybeJob <- Queue.fetchNextJob @job (timeoutInMicroseconds @job) (backoffStrategy @job) workerId
                        case maybeJob of
                            Just job -> do
                                Log.info ("Starting job: " <> tshow job)
                                let ?job = job
                                -- A negative value disables 'Timeout.timeout'
                                let timeout :: Int = fromMaybe (-1) (timeoutInMicroseconds @job)
                                resultOrException <- Exception.try @Exception.SomeException (Timeout.timeout timeout (perform job))
                                case resultOrException of
                                    Left exception -> do
                                        Queue.jobDidFail job exception
                                        -- 'SomeException' also matches asynchronous exceptions such as
                                        -- the 'AsyncCancelled' sent by 'Async.cancel'. Those are aimed at
                                        -- this runner thread, not at the job: after recording the failure,
                                        -- re-throw so the runner really terminates instead of looping on
                                        -- and possibly starting the next job.
                                        case Exception.fromException exception of
                                            Just (_ :: Exception.SomeAsyncException) -> Exception.throwIO exception
                                            Nothing -> pure ()
                                    Right Nothing -> Queue.jobDidTimeout job
                                    Right (Just _) -> Queue.jobDidSucceed job
                                loop
                            Nothing -> loop
                    Stop -> do
                        -- Put the stop signal back in to stop the other runners as well
                        Concurrent.putMVar action Stop
                        pure ()
        loop

    (subscription, poller) <- Queue.watchForJob pgListener (tableName @job) (queuePollInterval @job) (timeoutInMicroseconds @job) (backoffStrategy @job) action

    pure JobWorkerProcess { runners, subscription, poller, action }