-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathJobBase.scala
269 lines (225 loc) · 10.8 KB
/
JobBase.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package za.co.absa.pramen.core.pipeline
import com.typesafe.config.Config
import org.apache.spark.sql.types.StructType
import org.slf4j.{Logger, LoggerFactory}
import za.co.absa.pramen.api.jobdef.Schedule
import za.co.absa.pramen.api.status.{DependencyFailure, DependencyWarning, JobType, MetastoreDependency, TaskDef, TaskRunReason}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.expr.DateExprEvaluator
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.{Emoji, TimeUtils}
import java.time.{Instant, LocalDate}
import scala.util.{Failure, Success, Try}
/**
  * Base class for pipeline jobs. Encapsulates logic common to all job types:
  * dependency validation before a run, Hive table creation/repair, detection of
  * already-ran / outdated states, info date range evaluation, and execution time warnings.
  *
  * @param operationDef           the definition of the operation this job executes.
  * @param metastore              the metastore used to read/write tables and check data availability.
  * @param bookkeeper             the bookkeeper used to query previous run information (data chunks).
  * @param jobNotificationTargets notification targets to invoke for this job.
  * @param outputTableDef         the metastore table this job writes to.
  */
abstract class JobBase(operationDef: OperationDef,
                       metastore: Metastore,
                       bookkeeper: Bookkeeper,
                       jobNotificationTargets: Seq[JobNotificationTarget],
                       outputTableDef: MetaTable
                      ) extends Job {
  protected val log: Logger = LoggerFactory.getLogger(this.getClass)

  /** The concrete type of the job (ingestion, transformation, sink, etc.). */
  def jobType: JobType

  override def taskDef: TaskDef = TaskDef(
    name, jobType, MetaTable.getMetaTableDef(outputTableDef), operationDef.schedule, operationDef.operationConf
  )

  override val name: String = operationDef.name

  override val outputTable: MetaTable = outputTableDef

  override val operation: OperationDef = operationDef

  // Running in parallel is unsafe when the job depends on its own output table.
  override val allowRunningTasksInParallel: Boolean = operationDef.allowParallel && !hasSelfDependencies

  override def notificationTargets: Seq[JobNotificationTarget] = jobNotificationTargets

  override def trackDays: Int = outputTable.trackDays

  /**
    * Job-type-specific pre-run check, invoked only after dependency validation has passed
    * (possibly with warnings from optional dependencies).
    *
    * @param infoDate           the information date of the run.
    * @param runReason          the reason the task is running (new data, rerun, etc.).
    * @param jobConfig          the configuration of the job.
    * @param dependencyWarnings warnings from optional dependencies that failed validation.
    * @return the result of the pre-run check.
    */
  def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult

  def isIncremental: Boolean = operationDef.schedule == Schedule.Incremental

  /**
    * Validates all dependencies of the operation for the given info date, then delegates
    * to the job-type-specific check.
    *
    * Non-optional dependency failures short-circuit with `FailedDependencies`; optional
    * dependency failures are converted to warnings and passed down.
    */
  final override def preRunCheck(infoDate: LocalDate,
                                 runReason: TaskRunReason,
                                 conf: Config): JobPreRunResult = {
    val validationFailures = operationDef.dependencies.flatMap(dependency => {
      checkDependency(dependency, infoDate)
    })

    val dependencyErrors = validationFailures.filter(!_.dep.isOptional)

    val dependencyWarnings = validationFailures
      .filter(_.dep.isOptional)
      .flatMap(failure => failure.failedTables)
      .sortBy(identity)
      .map(table => DependencyWarning(table))

    if (dependencyErrors.nonEmpty) {
      log.warn(s"Job for table ${outputTableDef.name} at $infoDate has validation failures.")
      // Passive dependencies fail the check without failing the pipeline.
      val isFailure = dependencyErrors.exists(!_.dep.isPassive)
      JobPreRunResult(JobPreRunStatus.FailedDependencies(isFailure, dependencyErrors), None, dependencyWarnings, Seq.empty[String])
    } else {
      if (dependencyWarnings.nonEmpty) {
        log.info(s"Job for table ${outputTableDef.name} at $infoDate has validation warnings: ${dependencyWarnings.map(_.table).mkString(", ")}.")
      } else {
        log.info(s"Job for table ${outputTableDef.name} at $infoDate has no validation failures.")
      }
      preRunCheckJob(infoDate, runReason, conf, dependencyWarnings)
    }
  }

  /**
    * Creates or repairs the Hive table attached to the output table, if any.
    *
    * @return a sequence of warning messages (non-empty only when the Hive update
    *         failed and `ignoreFailures` is enabled); rethrows the failure otherwise.
    */
  override def createOrRefreshHiveTable(schema: StructType, infoDate: LocalDate, recreate: Boolean): Seq[String] = {
    if (outputTableDef.hiveTable.isEmpty)
      return Seq.empty

    val hiveHelper = metastore.getHiveHelper(outputTableDef.name)

    val attempt = Try {
      metastore.repairOrCreateHiveTable(outputTableDef.name, infoDate, Option(schema), hiveHelper, recreate)
    }

    attempt match {
      case Success(_) => Seq.empty
      case Failure(ex) =>
        if (outputTableDef.hiveConfig.ignoreFailures) {
          val cause = if (ex.getCause != null) s" ${ex.getCause.getMessage}" else ""
          val msg = s"Failed to create or update Hive table '${outputTableDef.hiveTable.get}': ${ex.getMessage}$cause"
          log.error(s"$FAILURE $msg")
          Seq(msg)
        } else {
          throw ex
        }
    }
  }

  /**
    * Standard pre-run check for transformation-like jobs: incremental schedules and
    * explicit reruns are always ready; otherwise already-ran / needs-update states are detected.
    */
  protected def preRunTransformationCheck(infoDate: LocalDate, runReason: TaskRunReason, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = {
    if (isIncremental || runReason == TaskRunReason.Rerun) {
      JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String])
    } else {
      validateTransformationAlreadyRanCases(infoDate, dependencyWarnings) match {
        case Some(result) => result
        case None => JobPreRunResult(JobPreRunStatus.Ready, None, dependencyWarnings, Seq.empty[String])
      }
    }
  }

  /**
    * Checks whether the transformation has already run for the info date.
    *
    * @return `Some(AlreadyRan)` if it ran and inputs are up to date, `Some(NeedsUpdate)`
    *         if it ran but an input table was updated afterwards, `None` if it never ran.
    */
  protected def validateTransformationAlreadyRanCases(infoDate: LocalDate, dependencyWarnings: Seq[DependencyWarning]): Option[JobPreRunResult] = {
    bookkeeper.getLatestDataChunk(outputTableDef.name, infoDate, infoDate) match {
      case Some(chunk) =>
        val outOfDateTables = getOutdatedTables(infoDate, chunk.jobFinished)
        if (outOfDateTables.nonEmpty) {
          // Fixed typo: "as already ran" -> "has already run"
          log.info(s"Job for table ${outputTableDef.name} has already run for $infoDate, but has outdated tables: ${outOfDateTables.mkString(", ")}")
          val warning = s"Based on outdated tables: ${outOfDateTables.mkString(", ")}"
          Some(JobPreRunResult(JobPreRunStatus.NeedsUpdate, None, dependencyWarnings, Seq(warning)))
        } else {
          log.info(s"Job for table ${outputTableDef.name} has already run for $infoDate.")
          Some(JobPreRunResult(JobPreRunStatus.AlreadyRan, None, dependencyWarnings, Seq.empty[String]))
        }
      case None =>
        log.info(s"Job for table ${outputTableDef.name} has not yet run for $infoDate.")
        None
    }
  }

  /**
    * Returns mandatory, non-passive dependent tables that were updated after this job last finished,
    * i.e. tables that make the job's output retrospectively stale.
    */
  private def getOutdatedTables(infoDate: LocalDate, targetJobFinishedSeconds: Long): Seq[String] = {
    operationDef.dependencies
      .filter(d => !d.isOptional && !d.isPassive)
      .flatMap(_.tables)
      .distinct
      .filter { table =>
        bookkeeper.getLatestDataChunk(table, infoDate, infoDate) match {
          case Some(chunk) if chunk.jobFinished >= targetJobFinishedSeconds =>
            log.warn(s"${Emoji.WARNING} The dependent table '$table' has been updated at ${Instant.ofEpochSecond(chunk.jobFinished)} retrospectively " +
              s"after the transformation at ${Instant.ofEpochSecond(targetJobFinishedSeconds)} .")
            true
          case _ =>
            false
        }
      }
  }

  /**
    * Validates a single dependency: evaluates its date range expressions against the info date
    * and checks data availability for every table in the dependency.
    *
    * @return `None` if all tables have data, otherwise a [[DependencyFailure]] distinguishing
    *         tables with no data in the range from tables with no bookkeeping information at all.
    */
  protected def checkDependency(dep: MetastoreDependency, infoDate: LocalDate): Option[DependencyFailure] = {
    val evaluator = new DateExprEvaluator
    evaluator.setValue("infoDate", infoDate)

    val dateFrom = evaluator.evalDate(dep.dateFromExpr)
    val dateUntilOpt = dep.dateUntilExpr.map(dateUntilExpr => evaluator.evalDate(dateUntilExpr))

    val q = '\"'
    // Fixed log labels: the evaluated expressions yield dateFrom/dateUntil, not infoDate.
    log.info(s"Given @infoDate = '$infoDate', $q${dep.dateFromExpr}$q => dateFrom = '$dateFrom'")
    dateUntilOpt.foreach(dateUntil => log.info(s"Given @infoDate = '$infoDate', $q${dep.dateUntilExpr.get}$q => dateUntil = '$dateUntil'"))

    val range = dateUntilOpt match {
      case Some(dateUntil) => s"from '$dateFrom' to '$dateUntil'" // fixed stray doubled quote
      case None => s"from '$dateFrom'"
    }

    log.info(s"Validating @infoDate $range")

    // Each element is (failedTable, emptyTable): exactly one side is defined.
    val failures = dep.tables.flatMap(table => {
      val isAvailable = metastore.isDataAvailable(table, Option(dateFrom), dateUntilOpt)
      if (!isAvailable) {
        if (metastore.isDataAvailable(table, None, None)) {
          log.warn(s"$WARNING No data found for '$table' $range.")
          Some((Some(table), None)) // explicit tuple (auto-tupling is deprecated)
        } else {
          log.warn(s"$STAR Empty input table (no bookkeeping information) for '$table'.")
          Some((None, Some(table)))
        }
      } else {
        None
      }
    })

    val failedTables = failures.flatMap(_._1)
    val emptyTables = failures.flatMap(_._2)
    val failedDateRanges = failedTables.map(_ => range)

    if (failedTables.isEmpty && emptyTables.isEmpty) {
      None
    } else {
      Some(DependencyFailure(dep, emptyTables, failedTables, failedDateRanges))
    }
  }

  private[core] def hasSelfDependencies: Boolean = {
    operationDef.dependencies.exists(_.tables.contains(outputTableDef.name))
  }

  /**
    * Evaluates optional `from`/`to` date expressions against the info date.
    * Missing bounds default to the info date itself.
    *
    * @throws IllegalArgumentException if the resulting range has `from` after `to`.
    */
  private[core] def getInfoDateRange(infoDate: LocalDate, fromExpr: Option[String], toExpr: Option[String]): (LocalDate, LocalDate) = {
    val evaluator = new DateExprEvaluator
    evaluator.setValue("infoDate", infoDate)
    evaluator.setValue("date", infoDate)

    val fromDate = fromExpr.map(expr => {
      evaluator.evalDate(expr)
    })
    val fromTo = toExpr.map(expr => {
      evaluator.evalDate(expr)
    })

    val (effectiveFrom, effectiveTo) = (fromDate, fromTo) match {
      case (None, None) => (infoDate, infoDate)
      case (Some(from), None) => (from, infoDate)
      case (None, Some(to)) => (infoDate, to)
      case (Some(from), Some(to)) => (from, to)
    }

    if (effectiveTo.isBefore(effectiveFrom)) {
      throw new IllegalArgumentException(s"Incorrect date range specified for ${outputTable.name}: from=$effectiveFrom > to=$effectiveTo.")
    }

    log.info(s"Input date range for ${outputTable.name}: from $effectiveFrom to $effectiveTo")
    (effectiveFrom, effectiveTo)
  }

  /**
    * Returns a warning message if the job ran longer than the configured maximum.
    * When both a table-level and operation-level maximum are set, the stricter (smaller) one wins.
    *
    * @param jobStarted        when the job started.
    * @param jobFinished       when the job finished.
    * @param maxTimeSecondsOpt the table-level maximum execution time, if configured.
    */
  private[core] def getTookTooLongWarnings(jobStarted: Instant, jobFinished: Instant, maxTimeSecondsOpt: Option[Int]): Seq[String] = {
    val effectiveMaxTimeOpt = maxTimeSecondsOpt match {
      case Some(maxTimeTable) =>
        operationDef.warnMaxExecutionTimeSeconds match {
          case Some(maxTimeOp) => Option(Math.min(maxTimeTable, maxTimeOp))
          case None => maxTimeSecondsOpt
        }
      case None => operationDef.warnMaxExecutionTimeSeconds
    }

    effectiveMaxTimeOpt.flatMap { maxTimeSeconds =>
      val actualTime = jobFinished.getEpochSecond - jobStarted.getEpochSecond
      if (actualTime > maxTimeSeconds) {
        val prettyActualTime = TimeUtils.prettyPrintElapsedTimeShort(actualTime * 1000)
        // Widen to Long before multiplying to avoid Int overflow for very large thresholds.
        val prettyMaxTime = TimeUtils.prettyPrintElapsedTimeShort(maxTimeSeconds.toLong * 1000)
        Some(s"The job took longer ($prettyActualTime) than expected ($prettyMaxTime).")
      } else {
        None
      }
    }.toSeq
  }
}
/**
  * Companion object holding configuration keys and defaults shared by job implementations.
  */
object JobBase {
  /** Configuration key for the minimum number of records required for a run to be considered successful. */
  val MINIMUM_RECORDS_KEY: String = "minimum.records"

  /** Configuration keys controlling whether the absence of (any / late / new) data fails the job. */
  val FAIL_NO_DATA_KEY: String = "fail.if.no.data"
  val FAIL_NO_LATE_DATA_KEY: String = "fail.if.no.late.data"
  val FAIL_NO_NEW_DATA_KEY: String = "fail.if.no.new.data"

  /** By default at least one record is expected. */
  val MINIMUM_RECORDS_DEFAULT: Int = 1
}