Skip to content

Commit

Permalink
#495 Add operation definitions to the task results object used in not…
Browse files Browse the repository at this point in the history
…ifications.
  • Loading branch information
yruslan committed Nov 7, 2024
1 parent 3687457 commit 910c93d
Show file tree
Hide file tree
Showing 74 changed files with 686 additions and 291 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.jobdef

import java.time.{DayOfWeek, LocalDate}

sealed trait Schedule {
def isEnabled(day: LocalDate): Boolean
}

object Schedule {
case object Incremental extends Schedule {
def isEnabled(day: LocalDate): Boolean = true

override def toString: String = "incremental"
}

case class EveryDay() extends Schedule {
def isEnabled(day: LocalDate): Boolean = true

override def toString: String = "daily"
}

case class Weekly(days: Seq[DayOfWeek]) extends Schedule {
def isEnabled(day: LocalDate): Boolean = days.contains(day.getDayOfWeek)

override def toString: String = s"weekly (${days.mkString(", ")})"
}

case class Monthly(days: Seq[Int]) extends Schedule {
val hasPositiveDays: Boolean = days.exists(_ > 0)
val hasNegativeDays: Boolean = days.exists(_ < 0)

def isEnabled(day: LocalDate): Boolean = {
val isInPositives = hasPositiveDays && days.contains(day.getDayOfMonth)
val isInNegatives = hasNegativeDays && days.contains(-day.lengthOfMonth() + day.getDayOfMonth - 1)
isInPositives || isInNegatives
}

override def toString: String = s"monthly (${days.mkString(", ")})"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.jobdef

import com.typesafe.config.Config

case class SinkTable(
metaTableName: String,
outputTableName: Option[String],
conf: Config,
rangeFromExpr: Option[String],
rangeToExpr: Option[String],
warnMaxExecutionTimeSeconds: Option[Int],
transformations: Seq[TransformExpression],
filters: Seq[String],
columns: Seq[String],
options: Map[String, String],
overrideConf: Option[Config]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.jobdef

import com.typesafe.config.Config
import za.co.absa.pramen.api.Query

case class SourceTable(
metaTableName: String,
query: Query,
conf: Config,
rangeFromExpr: Option[String],
rangeToExpr: Option[String],
warnMaxExecutionTimeSeconds: Option[Int],
transformations: Seq[TransformExpression],
filters: Seq[String],
columns: Seq[String],
overrideConf: Option[Config]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.jobdef

import com.typesafe.config.Config
import za.co.absa.pramen.api.{DataFormat, Query}

import java.time.LocalDate

case class TransferTable(
query: Query,
jobMetaTableName: String,
conf: Config,
rangeFromExpr: Option[String],
rangeToExpr: Option[String],
infoDateStart: LocalDate,
trackDays: Int,
trackDaysExplicitlySet: Boolean,
warnMaxExecutionTimeSeconds: Option[Int],
transformations: Seq[TransformExpression],
filters: Seq[String],
columns: Seq[String],
readOptions: Map[String, String],
writeOptions: Map[String, String],
sparkConfig: Map[String, String],
sourceOverrideConf: Option[Config],
sinkOverrideConf: Option[Config]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.jobdef

case class TransformExpression(
column: String,
expression: Option[String],
comment: Option[String]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.status

import com.typesafe.config.Config
import za.co.absa.pramen.api.jobdef.{SinkTable, SourceTable, TransferTable}

sealed trait JobType {
def name: String
}

object JobType {
case class Ingestion(
sourceName: String,
sourceTable: SourceTable,
sourceConfig: Config
) extends JobType {
override def name: String = "ingestion"
}

case class Transformation(
factoryClass: String
) extends JobType {
override def name: String = "transformation"
}

case class PythonTransformation(
pythonClass: String
) extends JobType {
override def name: String = "python_transformation"
}

case class Sink(
sinkName: String,
sinkTable: SinkTable,
sinkConfig: Config
) extends JobType {
override def name: String = "sink"
}

case class Transfer(
sourceName: String,
sourceConfig: Config,
sinkName: String,
sinkConfig: Config,
transferTable: TransferTable
) extends JobType {
override def name: String = "transfer"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.status

import com.typesafe.config.Config
import za.co.absa.pramen.api.MetaTableDef
import za.co.absa.pramen.api.jobdef.Schedule

case class TaskDef(
name: String,
jobType: JobType,
outputTable: MetaTableDef,
schedule: Schedule,
operationConf: Config
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package za.co.absa.pramen.api.status

import za.co.absa.pramen.api.{MetaTableDef, SchemaDifference}
import za.co.absa.pramen.api.SchemaDifference

case class TaskResult(
jobName: String,
outputTable: MetaTableDef,
taskDef: TaskDef,
runStatus: RunStatus,
runInfo: Option[RunInfo],
applicationId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class HyperdriveNotificationTarget(conf: Config,
log.info(s"Not sending '$token' to the Hyperdrive Kafka topic: '$topic' for the unsuccessful job...")
}
} else {
log.warn(s"$WARNING Token is not configured for ${notification.outputTable.name}. Hyperdrive notification won't be sent. Please, set 'notification.$TOKEN_KEY' option for the job.")
log.warn(s"$WARNING Token is not configured for ${notification.taskDef.outputTable.name}. Hyperdrive notification won't be sent. Please, set 'notification.$TOKEN_KEY' option for the job.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,9 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
private[core] def renderJobException(builder: MessageBuilder, taskResult: TaskResult, ex: Throwable): MessageBuilder = {
val paragraphBuilder = ParagraphBuilder()
.withText("Job ", Style.Exception)
.withText(taskResult.jobName, Style.Error)
.withText(taskResult.taskDef.name, Style.Error)
.withText(" outputting to ", Style.Exception)
.withText(taskResult.outputTable.name, Style.Error)
.withText(taskResult.taskDef.outputTable.name, Style.Error)

taskResult.runInfo.foreach(info =>
paragraphBuilder
Expand Down Expand Up @@ -404,8 +404,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
tasks.foreach(task => {
val row = new ListBuffer[TextElement]

row.append(TextElement(task.jobName, getTransientTextStyle(task)))
row.append(TextElement(task.outputTable.name, getTransientTextStyle(task)))
row.append(TextElement(task.taskDef.name, getTransientTextStyle(task)))
row.append(TextElement(task.taskDef.outputTable.name, getTransientTextStyle(task)))

if (haveHiveColumn) {
val hiveTable = task.runStatus match {
Expand Down Expand Up @@ -490,7 +490,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

val tableHeaders = new ListBuffer[TableHeader]

val taskName = s"Files sourced - ${task.outputTable.name} - ${task.runInfo.map(_.infoDate.toString).getOrElse(" ")}"
val taskName = s"Files sourced - ${task.taskDef.outputTable.name} - ${task.runInfo.map(_.infoDate.toString).getOrElse(" ")}"

tableHeaders.append(TableHeader(TextElement(taskName), Align.Left))
tableBuilder.withHeaders(tableHeaders.toSeq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import com.typesafe.config.Config
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.jobdef.SourceTable
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
import za.co.absa.pramen.api.sql.SqlGeneratorBase
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskDef, TaskRunReason}
import za.co.absa.pramen.api.{Reason, Source}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}
import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package za.co.absa.pramen.core.pipeline
import com.typesafe.config.Config
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
import za.co.absa.pramen.api.jobdef.SourceTable
import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason}
import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult}
import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
Expand Down Expand Up @@ -48,6 +49,8 @@ class IngestionJob(operationDef: OperationDef,
extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, outputTable) {
import JobBase._

override val jobType: JobType = JobType.Ingestion(sourceName, sourceTable, source.config)

override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing

override def trackDays: Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.typesafe.config.Config
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import za.co.absa.pramen.api.Reason
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.api.status.{TaskDef, TaskRunReason}
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategy

Expand All @@ -35,6 +35,8 @@ trait Job {

val scheduleStrategy: ScheduleStrategy

def taskDef: TaskDef

def allowRunningTasksInParallel: Boolean

def notificationTargets: Seq[JobNotificationTarget]
Expand Down
Loading

0 comments on commit 910c93d

Please sign in to comment.