-
Notifications
You must be signed in to change notification settings - Fork 2
/
ConcurrentJobRunnerImpl.scala
176 lines (149 loc) · 6.43 KB
/
ConcurrentJobRunnerImpl.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package za.co.absa.pramen.core.runner.jobrunner
import com.github.yruslan.channel.{Channel, ReadChannel}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.DataFormat
import za.co.absa.pramen.api.status.{RunStatus, TaskResult}
import za.co.absa.pramen.core.app.config.RuntimeConfig
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.exceptions.FatalErrorWrapper
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.peristence.TransientJobManager
import za.co.absa.pramen.core.pipeline.Job
import za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunner.JobRunResults
import za.co.absa.pramen.core.runner.splitter.ScheduleParams
import za.co.absa.pramen.core.runner.task.TaskRunner
import za.co.absa.pramen.core.utils.Emoji
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors.newFixedThreadPool
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext.fromExecutorService
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContextExecutorService, Future}
import scala.util.control.NonFatal
/**
  * Runs pipeline jobs concurrently on a fixed-size pool of worker threads.
  *
  * `runtimeConfig.parallelTasks` workers read jobs from the channel passed to [[startWorkerLoop]].
  * For every job a result is sent to the channel returned by [[getCompletedJobsChannel]]:
  *  - `(job, Nil, isSucceeded)` when the job returns normally;
  *  - `(job, failedTaskResult :: Nil, false)` when running the job throws.
  *
  * The completed jobs channel is closed exactly once, by the last worker to exit.
  *
  * @param runtimeConfig the runtime configuration (degree of parallelism, scheduling parameters).
  * @param bookkeeper    the bookkeeper passed to a job's schedule strategy.
  * @param taskRunner    runs the tasks of eager (non-lazy) jobs.
  * @param applicationId the application id recorded in failure task results.
  */
class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,
                              bookkeeper: Bookkeeper,
                              taskRunner: TaskRunner,
                              applicationId: String) extends ConcurrentJobRunner {
  private val log = LoggerFactory.getLogger(this.getClass)

  private var loopStarted = false

  private val completedJobsChannel = Channel.make[JobRunResults](1)

  // Number of workers that have not exited their loop yet. The last one to exit closes completedJobsChannel.
  private val activeWorkers = new AtomicInteger(0)

  private var workersFuture: Future[Unit] = _

  private val executor: ExecutorService = newFixedThreadPool(runtimeConfig.parallelTasks)
  implicit private val executionContext: ExecutionContextExecutorService = fromExecutorService(executor)

  /** Returns the channel on which a result is published for every job processed by the workers. */
  override def getCompletedJobsChannel: ReadChannel[JobRunResults] = completedJobsChannel

  /**
    * Starts `runtimeConfig.parallelTasks` workers that consume jobs from `incomingJobs`.
    *
    * @param incomingJobs the channel the workers read jobs from. Each worker exits when its
    *                     iteration over this channel ends (expected: once the channel is closed).
    * @throws IllegalStateException if the worker loop has already been started.
    */
  override def startWorkerLoop(incomingJobs: ReadChannel[Job]): Unit = {
    if (loopStarted) {
      throw new IllegalStateException("Worker loop already started")
    }
    loopStarted = true

    // Must be set before any worker is started so that no worker can see the count reach zero prematurely.
    activeWorkers.set(runtimeConfig.parallelTasks)

    val workers = (0 until runtimeConfig.parallelTasks).map { workerNum =>
      Future {
        workerLoop(workerNum, incomingJobs)
      }
    }

    workersFuture = Future.sequence(workers).map(_ => ())
  }

  /**
    * Blocks until all workers have finished, then shuts down the worker thread pool.
    *
    * The thread pool is shut down even when a worker failed; in that case the failure is rethrown
    * by `Await.result` after the pool has been shut down.
    *
    * @throws IllegalStateException if the worker loop has not been started.
    */
  def shutdown(): Unit = {
    if (!loopStarted) {
      throw new IllegalStateException("Worker loop hasn't started yet")
    }

    log.info("Waiting for worker threads to finish...")
    try {
      Await.result(workersFuture, Duration.Inf)
      log.info("Workers have finished. Shutting down the execution context...")
    } finally {
      // Executors.newFixedThreadPool uses non-daemon threads, so skipping this on a worker failure
      // could keep the JVM alive.
      executionContext.shutdown()
    }
    log.info("The execution context is now finished.")
    loopStarted = false
  }

  /**
    * Processes jobs from `incomingJobs` until iteration over it ends, publishing one result per job.
    *
    * Exceptions thrown while running a job are reported as failure results instead of stopping the worker.
    * The completed jobs channel is closed only by the last worker to exit (normally or abnormally).
    * Closing it from whichever worker becomes idle first would race with workers that are still
    * running a job and have yet to send its result.
    */
  private def workerLoop(workerNum: Int, incomingJobs: ReadChannel[Job]): Unit = {
    try {
      incomingJobs.foreach { job =>
        val isTransient = job.outputTable.format.isTransient
        try {
          log.info(s"Worker $workerNum starting job '${job.name}' that outputs to '${job.outputTable.name}'...")
          val isSucceeded = runJob(job)

          completedJobsChannel.send((job, Nil, isSucceeded))
        } catch {
          // Unwrap so that onFatalException wraps the original cause exactly once.
          case ex: FatalErrorWrapper if ex.cause != null => onFatalException(ex.cause, job, isTransient)
          case NonFatal(ex) => onNonFatalException(ex, job, isTransient)
          // Deliberately catches everything else so that even fatal errors are reported as a job failure.
          case ex: Throwable => onFatalException(ex, job, isTransient)
        }
      }
    } finally {
      if (activeWorkers.decrementAndGet() == 0) {
        completedJobsChannel.close()
      }
    }
  }

  /**
    * Logs a fatal error and reports the job as failed with the error wrapped in a `FatalErrorWrapper`.
    *
    * @param ex          the fatal error.
    * @param job         the job that failed.
    * @param isTransient whether the job's output table format is transient.
    */
  private[core] def onFatalException(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
    log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex)
    val fatalEx = new FatalErrorWrapper("FATAL exception encountered, stopping the pipeline.", ex)
    sendFailure(fatalEx, job, isTransient)
  }

  /**
    * Logs a non-fatal error thrown by a job and reports the job as failed with that error.
    *
    * @param ex          the error thrown by the job.
    * @param job         the job that failed.
    * @param isTransient whether the job's output table format is transient.
    */
  private[core] def onNonFatalException(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
    log.error(s"${Emoji.FAILURE} Job '${job.name}' outputting to '${job.outputTable.name}' has thrown an error", ex)
    sendFailure(ex, job, isTransient)
  }

  /**
    * Publishes a failure result for `job`: a single `TaskResult` with status `RunStatus.Failed(ex)`
    * and the success flag set to `false`.
    *
    * @param ex          the error to record in the task result.
    * @param job         the job that failed.
    * @param isTransient whether the job's output table format is transient.
    */
  private[core] def sendFailure(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
    completedJobsChannel.send((job,
      TaskResult(job.name,
        MetaTable.getMetaTableDef(job.outputTable),
        RunStatus.Failed(ex),
        None,
        applicationId,
        isTransient,
        job.outputTable.format.isInstanceOf[DataFormat.Raw],
        Nil,
        Nil,
        Nil,
        job.operation.extraOptions
      ) :: Nil, false))
  }

  /**
    * Runs a job: lazy jobs are registered for later execution, other jobs are run immediately.
    *
    * @return `true` if the job succeeded.
    */
  private[core] def runJob(job: Job): Boolean = {
    if (job.outputTable.format.isLazy) {
      runLazyJob(job)
    } else {
      runEagerJob(job)
    }
  }

  /**
    * Runs a job now.
    *
    * The job's schedule strategy picks the information dates to run. The resulting tasks are
    * executed via the task runner, and this call blocks until all of them have finished.
    *
    * @return `true` if no task finished with a failure status.
    * @throws FatalErrorWrapper if a task failed with a `FatalErrorWrapper`, so the pipeline can be stopped.
    */
  private[core] def runEagerJob(job: Job): Boolean = {
    val trackDays = job.trackDays
    log.info(s"Effective track days for ${job.name} outputting to ${job.outputTable.name} = $trackDays")

    val scheduleParams = ScheduleParams.fromRuntimeConfig(runtimeConfig, trackDays, job.operation.expectedDelayDays)

    val taskDefs = job.scheduleStrategy.getDaysToRun(
      job.outputTable.name,
      job.operation.dependencies,
      bookkeeper,
      job.operation.outputInfoDateExpression,
      job.operation.schedule,
      scheduleParams,
      job.operation.initialSourcingDateExpression,
      job.outputTable.infoDateStart
    )

    log.info(s"Dates selected to run for '${job.outputTable.name}': ${taskDefs.map(_.infoDate).mkString(", ")}")

    val fut = taskRunner.runJobTasks(job, taskDefs)

    log.info("Waiting for all job tasks to finish...")
    val statuses = Await.result(fut, Duration.Inf)
    log.info("All job tasks have finished.")

    // Rethrow fatal errors so the pipeline can be stopped asap.
    statuses.foreach {
      case RunStatus.Failed(ex: FatalErrorWrapper) => throw ex
      case _ => // skip
    }

    statuses.forall(s => !s.isFailure)
  }

  /**
    * Registers a lazy job with `TransientJobManager` instead of running it now.
    *
    * @return always `true`.
    */
  private[core] def runLazyJob(job: Job): Boolean = {
    TransientJobManager.addLazyJob(job)
    true
  }
}