Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#495 Add operation definitions to the task results object used in notifications. #514

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.jobdef

import java.time.{DayOfWeek, LocalDate}

sealed trait Schedule {
def isEnabled(day: LocalDate): Boolean
}

object Schedule {
case object Incremental extends Schedule {
def isEnabled(day: LocalDate): Boolean = true

override def toString: String = "incremental"
}

case class EveryDay() extends Schedule {
def isEnabled(day: LocalDate): Boolean = true

override def toString: String = "daily"
}

case class Weekly(days: Seq[DayOfWeek]) extends Schedule {
def isEnabled(day: LocalDate): Boolean = days.contains(day.getDayOfWeek)

override def toString: String = s"weekly (${days.mkString(", ")})"
}

case class Monthly(days: Seq[Int]) extends Schedule {
val hasPositiveDays: Boolean = days.exists(_ > 0)
val hasNegativeDays: Boolean = days.exists(_ < 0)

def isEnabled(day: LocalDate): Boolean = {
val isInPositives = hasPositiveDays && days.contains(day.getDayOfMonth)
val isInNegatives = hasNegativeDays && days.contains(-day.lengthOfMonth() + day.getDayOfMonth - 1)
isInPositives || isInNegatives
}

override def toString: String = s"monthly (${days.mkString(", ")})"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.jobdef

import com.typesafe.config.Config

case class SinkTable(
metaTableName: String,
outputTableName: Option[String],
conf: Config,
rangeFromExpr: Option[String],
rangeToExpr: Option[String],
warnMaxExecutionTimeSeconds: Option[Int],
transformations: Seq[TransformExpression],
filters: Seq[String],
columns: Seq[String],
options: Map[String, String],
overrideConf: Option[Config]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.jobdef

import com.typesafe.config.Config
import za.co.absa.pramen.api.Query

case class SourceTable(
metaTableName: String,
query: Query,
conf: Config,
rangeFromExpr: Option[String],
rangeToExpr: Option[String],
warnMaxExecutionTimeSeconds: Option[Int],
transformations: Seq[TransformExpression],
filters: Seq[String],
columns: Seq[String],
overrideConf: Option[Config]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.jobdef

import com.typesafe.config.Config
import za.co.absa.pramen.api.{DataFormat, Query}

import java.time.LocalDate

case class TransferTable(
query: Query,
jobMetaTableName: String,
conf: Config,
rangeFromExpr: Option[String],
rangeToExpr: Option[String],
infoDateStart: LocalDate,
trackDays: Int,
trackDaysExplicitlySet: Boolean,
warnMaxExecutionTimeSeconds: Option[Int],
transformations: Seq[TransformExpression],
filters: Seq[String],
columns: Seq[String],
readOptions: Map[String, String],
writeOptions: Map[String, String],
sparkConfig: Map[String, String],
sourceOverrideConf: Option[Config],
sinkOverrideConf: Option[Config]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.jobdef

case class TransformExpression(
column: String,
expression: Option[String],
comment: Option[String]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.status

import com.typesafe.config.Config
import za.co.absa.pramen.api.jobdef.{SinkTable, SourceTable, TransferTable}

sealed trait JobType {
def name: String
}

object JobType {
case class Ingestion(
sourceName: String,
sourceTable: SourceTable,
sourceConfig: Config
) extends JobType {
override def name: String = "ingestion"
}

case class Transformation(
factoryClass: String
) extends JobType {
override def name: String = "transformation"
}

case class PythonTransformation(
pythonClass: String
) extends JobType {
override def name: String = "python_transformation"
}

case class Sink(
sinkName: String,
sinkTable: SinkTable,
sinkConfig: Config
) extends JobType {
override def name: String = "sink"
}

case class Transfer(
sourceName: String,
sourceConfig: Config,
sinkName: String,
sinkConfig: Config,
transferTable: TransferTable
) extends JobType {
override def name: String = "transfer"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api.status

import com.typesafe.config.Config
import za.co.absa.pramen.api.MetaTableDef
import za.co.absa.pramen.api.jobdef.Schedule

case class TaskDef(
name: String,
jobType: JobType,
outputTable: MetaTableDef,
schedule: Schedule,
operationConf: Config
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@

package za.co.absa.pramen.api.status

import za.co.absa.pramen.api.{MetaTableDef, SchemaDifference}
import za.co.absa.pramen.api.SchemaDifference

case class TaskResult(
jobName: String,
outputTable: MetaTableDef,
taskDef: TaskDef,
runStatus: RunStatus,
runInfo: Option[RunInfo],
applicationId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class HyperdriveNotificationTarget(conf: Config,
log.info(s"Not sending '$token' to the Hyperdrive Kafka topic: '$topic' for the unsuccessful job...")
}
} else {
log.warn(s"$WARNING Token is not configured for ${notification.outputTable.name}. Hyperdrive notification won't be sent. Please, set 'notification.$TOKEN_KEY' option for the job.")
log.warn(s"$WARNING Token is not configured for ${notification.taskDef.outputTable.name}. Hyperdrive notification won't be sent. Please, set 'notification.$TOKEN_KEY' option for the job.")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,9 +295,9 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
private[core] def renderJobException(builder: MessageBuilder, taskResult: TaskResult, ex: Throwable): MessageBuilder = {
val paragraphBuilder = ParagraphBuilder()
.withText("Job ", Style.Exception)
.withText(taskResult.jobName, Style.Error)
.withText(taskResult.taskDef.name, Style.Error)
.withText(" outputting to ", Style.Exception)
.withText(taskResult.outputTable.name, Style.Error)
.withText(taskResult.taskDef.outputTable.name, Style.Error)

taskResult.runInfo.foreach(info =>
paragraphBuilder
Expand Down Expand Up @@ -404,8 +404,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
tasks.foreach(task => {
val row = new ListBuffer[TextElement]

row.append(TextElement(task.jobName, getTransientTextStyle(task)))
row.append(TextElement(task.outputTable.name, getTransientTextStyle(task)))
row.append(TextElement(task.taskDef.name, getTransientTextStyle(task)))
row.append(TextElement(task.taskDef.outputTable.name, getTransientTextStyle(task)))

if (haveHiveColumn) {
val hiveTable = task.runStatus match {
Expand Down Expand Up @@ -490,7 +490,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

val tableHeaders = new ListBuffer[TableHeader]

val taskName = s"Files sourced - ${task.outputTable.name} - ${task.runInfo.map(_.infoDate.toString).getOrElse(" ")}"
val taskName = s"Files sourced - ${task.taskDef.outputTable.name} - ${task.runInfo.map(_.infoDate.toString).getOrElse(" ")}"

tableHeaders.append(TableHeader(TextElement(taskName), Align.Left))
tableBuilder.withHeaders(tableHeaders.toSeq)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ import com.typesafe.config.Config
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.jobdef.SourceTable
import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
import za.co.absa.pramen.api.sql.SqlGeneratorBase
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskDef, TaskRunReason}
import za.co.absa.pramen.api.{Reason, Source}
import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}
import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ package za.co.absa.pramen.core.pipeline
import com.typesafe.config.Config
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
import za.co.absa.pramen.api.jobdef.SourceTable
import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason}
import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult}
import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
Expand Down Expand Up @@ -48,6 +49,8 @@ class IngestionJob(operationDef: OperationDef,
extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, outputTable) {
import JobBase._

override val jobType: JobType = JobType.Ingestion(sourceName, sourceTable, source.config)

override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing

override def trackDays: Int = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.typesafe.config.Config
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType
import za.co.absa.pramen.api.Reason
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.api.status.{TaskDef, TaskRunReason}
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategy

Expand All @@ -35,6 +35,8 @@ trait Job {

val scheduleStrategy: ScheduleStrategy

def taskDef: TaskDef

def allowRunningTasksInParallel: Boolean

def notificationTargets: Seq[JobNotificationTarget]
Expand Down
Loading
Loading