Skip to content

Commit

Permalink
#423 Add pipeline info to a special API case class, and use it in all…
Browse files Browse the repository at this point in the history
… notification targets.
  • Loading branch information
yruslan committed Jun 17, 2024
1 parent 48f0ab2 commit e7446d8
Show file tree
Hide file tree
Showing 20 changed files with 110 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,5 +18,5 @@ package za.co.absa.pramen.api

trait NotificationTarget extends ExternalChannel {
/** Sends a notification after completion of a task. */
def sendNotification(notification: TaskNotification): Unit
def sendNotification(pipelineInfo: PipelineInfo, notification: TaskNotification): Unit
}
27 changes: 27 additions & 0 deletions pramen/api/src/main/scala/za/co/absa/pramen/api/PipelineInfo.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.api

import java.time.Instant

case class PipelineInfo(
pipelineName: String,
environment: String,
startedAt: Instant,
sparkApplicationId: Option[String],
failureException: Option[Throwable]
)
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,8 @@

package za.co.absa.pramen.api

import java.time.Instant

trait PipelineNotificationTarget extends ExternalChannel {
/** Sends a notification after completion of the pipeline. */
def sendNotification(pipelineStarted: Instant,
applicationId: Option[String],
appException: Option[Throwable],
def sendNotification(pipelineInfo: PipelineInfo,
tasksCompleted: Seq[TaskNotification]): Unit
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.notify
import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, TaskNotification, TaskStatus}
import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, PipelineInfo, TaskNotification, TaskStatus}
import za.co.absa.pramen.core.notify.mq.{SingleMessageProducer, SingleMessageProducerKafka}
import za.co.absa.pramen.core.utils.ConfigUtils
import za.co.absa.pramen.core.utils.Emoji._
Expand All @@ -38,7 +38,7 @@ class HyperdriveNotificationTarget(conf: Config,
// conditions are met to send a notification.
}

override def sendNotification(notification: TaskNotification): Unit = {
override def sendNotification(pipelineInfo: PipelineInfo, notification: TaskNotification): Unit = {
if (notification.options.contains(TOKEN_KEY)) {
val token = notification.options(TOKEN_KEY)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import com.typesafe.config.Config
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.lit
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{DataFormat, Reason, SchemaDifference, TaskNotification}
import za.co.absa.pramen.api.{DataFormat, PipelineInfo, Reason, SchemaDifference, TaskNotification}
import za.co.absa.pramen.core.app.config.RuntimeConfig
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.exceptions.{FatalErrorWrapper, ReasonException}
Expand All @@ -32,7 +32,7 @@ import za.co.absa.pramen.core.metastore.{MetaTableStats, MetastoreImpl}
import za.co.absa.pramen.core.notify.NotificationTargetManager
import za.co.absa.pramen.core.pipeline.JobPreRunStatus._
import za.co.absa.pramen.core.pipeline._
import za.co.absa.pramen.core.state.PipelineState
import za.co.absa.pramen.core.state.{PipelineState, PipelineStateSnapshot}
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.SparkUtils._
import za.co.absa.pramen.core.utils.hive.HiveHelper
Expand Down Expand Up @@ -383,10 +383,11 @@ abstract class TaskRunnerBase(conf: Config,
}

private def sendNotifications(task: Task, result: TaskResult): Seq[NotificationFailure] = {
task.job.notificationTargets.flatMap(notificationTarget => sendNotifications(task, result, notificationTarget))
val pipelineInfo = pipelineState.getState().pipelineInfo
task.job.notificationTargets.flatMap(notificationTarget => sendNotifications(task, result, notificationTarget, pipelineInfo))
}

private def sendNotifications(task: Task, result: TaskResult, notificationTarget: JobNotificationTarget): Option[NotificationFailure] = {
private def sendNotifications(task: Task, result: TaskResult, notificationTarget: JobNotificationTarget, pipelineInfo: PipelineInfo): Option[NotificationFailure] = {
Try {
val target = notificationTarget.target

Expand All @@ -408,7 +409,7 @@ abstract class TaskRunnerBase(conf: Config,

target.connect()
try {
target.sendNotification(notification)
target.sendNotification(pipelineInfo, notification)
} finally {
target.close()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.state
import com.typesafe.config.Config
import org.slf4j.{Logger, LoggerFactory}
import sun.misc.Signal
import za.co.absa.pramen.api.{NotificationBuilder, PipelineNotificationTarget, TaskNotification}
import za.co.absa.pramen.api.{NotificationBuilder, PipelineInfo, PipelineNotificationTarget, TaskNotification}
import za.co.absa.pramen.core.app.config.HookConfig
import za.co.absa.pramen.core.app.config.RuntimeConfig.EMAIL_IF_NO_CHANGES
import za.co.absa.pramen.core.metastore.MetastoreImpl
Expand Down Expand Up @@ -81,14 +81,17 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
failureException

PipelineStateSnapshot(
pipelineName,
environmentName,
sparkAppId,
PipelineInfo(
pipelineName,
environmentName,
startedInstant,
sparkAppId,
appException
),
isFinished,
exitedNormally,
exitCode,
customShutdownHookCanRun,
appException,
taskResults.toList,
pipelineNotificationFailures.toList
)
Expand Down Expand Up @@ -193,16 +196,15 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification

private[state] def sendPipelineNotifications(): Unit = {
val taskNotifications = taskResults.flatMap(taskResultToTaskNotification).toSeq
val pipelineInfo = getState().pipelineInfo

pipelineNotificationTargets.foreach(notificationTarget => sendCustomNotification(notificationTarget, taskNotifications))
pipelineNotificationTargets.foreach(notificationTarget => sendCustomNotification(notificationTarget, pipelineInfo, taskNotifications))
}

private[state] def sendCustomNotification(pipelineNotificationTarget: PipelineNotificationTarget, taskNotifications: Seq[TaskNotification]): Unit = {
private[state] def sendCustomNotification(pipelineNotificationTarget: PipelineNotificationTarget, pipelineInfo: PipelineInfo, taskNotifications: Seq[TaskNotification]): Unit = {
try {
pipelineNotificationTarget.sendNotification(
startedInstant,
sparkAppId,
failureException,
pipelineInfo,
taskNotifications
)
} catch {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,15 @@

package za.co.absa.pramen.core.state

import za.co.absa.pramen.api.PipelineInfo
import za.co.absa.pramen.core.runner.task.{PipelineNotificationFailure, TaskResult}

case class PipelineStateSnapshot(
pipelineName: String,
environmentName: String,
sparkAppId: Option[String],
pipelineInfo: PipelineInfo,
isFinished: Boolean,
exitedNormally: Boolean,
exitCode: Int,
customShutdownHookCanRun: Boolean,
failureException: Option[Throwable] = None,
taskResults: Seq[TaskResult],
pipelineNotificationFailures: Seq[PipelineNotificationFailure]
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.mocks

import za.co.absa.pramen.api.PipelineInfo

import java.time.Instant

object PipelineInfoFactory {
def getDummyPipelineInfo(pipelineName: String = "Dummy Pipeline",
environment: String = "DEV",
startedAt: Instant = Instant.ofEpochSecond(1718609409),
sparkApplicationId: Option[String] = Some("testid-12345"),
failureException: Option[Throwable] = None): PipelineInfo = {
PipelineInfo(pipelineName, environment, startedAt, sparkApplicationId, failureException)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,23 @@

package za.co.absa.pramen.core.mocks

import za.co.absa.pramen.api.PipelineInfo
import za.co.absa.pramen.core.runner.task.{PipelineNotificationFailure, TaskResult}
import za.co.absa.pramen.core.state.PipelineStateSnapshot

object PipelineStateSnapshotFactory {
def getDummyPipelineStateSnapshot(pipelineName: String = "TestApp",
environmentName: String = "DEV",
sparkAppId: Option[String] = Some("testid-12345"),
def getDummyPipelineStateSnapshot(pipelineInfo: PipelineInfo = PipelineInfoFactory.getDummyPipelineInfo(),
isFinished: Boolean = false,
exitedNormally: Boolean = false,
exitCode: Int = 0,
customShutdownHookCanRun: Boolean = false,
failureException: Option[Throwable] = None,
taskResults: Seq[TaskResult] = Seq.empty,
pipelineNotificationFailures: Seq[PipelineNotificationFailure] = Seq.empty): PipelineStateSnapshot = {
PipelineStateSnapshot(pipelineName,
environmentName,
sparkAppId,
PipelineStateSnapshot(pipelineInfo,
isFinished,
exitedNormally,
exitCode,
customShutdownHookCanRun,
failureException,
taskResults,
pipelineNotificationFailures
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.mocks.notify

import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, TaskNotification}
import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, PipelineInfo, TaskNotification}
import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock.TEST_NOTIFICATION_FAIL_KEY

import scala.collection.mutable.ListBuffer
Expand All @@ -28,7 +28,7 @@ class NotificationTargetMock(conf: Config) extends NotificationTarget {

override def config: Config = conf

override def sendNotification(notification: TaskNotification): Unit = {
override def sendNotification(pipelineInfo: PipelineInfo, notification: TaskNotification): Unit = {
if (conf.hasPath(TEST_NOTIFICATION_FAIL_KEY) && conf.getBoolean(TEST_NOTIFICATION_FAIL_KEY)) {
System.setProperty("pramen.test.notification.target.failure", "true")
throw new RuntimeException("Notification target test exception")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.mocks.notify

import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, TaskNotification}
import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, PipelineInfo, TaskNotification}

import scala.collection.mutable.ListBuffer

Expand All @@ -27,7 +27,7 @@ class NotificationTargetSpy(conf: Config, action: TaskNotification => Unit) exte

override def config: Config = conf

override def sendNotification(notification: TaskNotification): Unit = {
override def sendNotification(pipelineInfo: PipelineInfo, notification: TaskNotification): Unit = {
notificationsSent += notification
action(notification)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@
package za.co.absa.pramen.core.mocks.notify

import com.typesafe.config.Config
import za.co.absa.pramen.api.{PipelineNotificationTarget, TaskNotification}
import za.co.absa.pramen.api.{PipelineInfo, PipelineNotificationTarget, TaskNotification}
import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock.TEST_NOTIFICATION_FAIL_KEY

import java.time.Instant

class PipelineNotificationTargetMock(conf: Config) extends PipelineNotificationTarget {

override def sendNotification(pipelineStarted: Instant, applicationId: Option[String], appException: Option[Throwable], tasksCompleted: Seq[TaskNotification]): Unit = {
override def sendNotification(pipelineInfo: PipelineInfo, tasksCompleted: Seq[TaskNotification]): Unit = {
if (conf.hasPath(TEST_NOTIFICATION_FAIL_KEY) && conf.getBoolean(TEST_NOTIFICATION_FAIL_KEY)) {
System.setProperty("pramen.test.notification.pipeline.failure", "true")
throw new RuntimeException("Pipeline notification target test exception")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.pramen.core.mocks.state

import za.co.absa.pramen.core.mocks.PipelineStateSnapshotFactory
import za.co.absa.pramen.core.mocks.{PipelineInfoFactory, PipelineStateSnapshotFactory}
import za.co.absa.pramen.core.runner.task.TaskResult
import za.co.absa.pramen.core.state.{PipelineState, PipelineStateSnapshot}

Expand All @@ -33,8 +33,7 @@ class PipelineStateSpy extends PipelineState {
var sparkAppId: Option[String] = None

override def getState(): PipelineStateSnapshot = {
PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(
sparkAppId = sparkAppId,
PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(PipelineInfoFactory.getDummyPipelineInfo(sparkApplicationId = sparkAppId),
customShutdownHookCanRun = setShutdownHookCanRunCount > 0,
taskResults = completedStatuses.toList
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class PipelineStateSuite extends AnyWordSpec {

assert(stateManager.getState().isFinished)
assert(stateManager.getState().exitedNormally)
assert(stateManager.getState().failureException.isEmpty)
assert(stateManager.getState().pipelineInfo.failureException.isEmpty)
}
}

Expand All @@ -72,7 +72,7 @@ class PipelineStateSuite extends AnyWordSpec {

assert(stateManager.getState().isFinished)
assert(!stateManager.getState().exitedNormally)
assert(stateManager.getState().failureException.exists(_.isInstanceOf[RuntimeException]))
assert(stateManager.getState().pipelineInfo.failureException.exists(_.isInstanceOf[RuntimeException]))
}
}

Expand Down Expand Up @@ -127,7 +127,7 @@ class PipelineStateSuite extends AnyWordSpec {
stateManager.runCustomShutdownHook()

assert(ShutdownHookFailureMock.ranTimes > 0)
assert(stateManager.getState().failureException.exists(_.isInstanceOf[LinkageError]))
assert(stateManager.getState().pipelineInfo.failureException.exists(_.isInstanceOf[LinkageError]))
}

"handle class does not exists errors" in {
Expand All @@ -137,8 +137,8 @@ class PipelineStateSuite extends AnyWordSpec {
stateManager.runCustomShutdownHook()

assert(ShutdownHookFailureMock.ranTimes > 0)
assert(stateManager.getState().failureException.isDefined)
assert(stateManager.getState().failureException.exists(_.isInstanceOf[ClassNotFoundException]))
assert(stateManager.getState().pipelineInfo.failureException.isDefined)
assert(stateManager.getState().pipelineInfo.failureException.exists(_.isInstanceOf[ClassNotFoundException]))

}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class HyperdriveNotificationTargetSuite extends AnyWordSpec with SparkTestBase {

val taskNotification = TaskNotificationFactory.getDummyTaskNotification(options = options)

notificationTarget.sendNotification(taskNotification)
notificationTarget.sendNotification(null, taskNotification)

assert(producer.connectInvoked == 1)
assert(producer.sendInvoked == 1)
Expand All @@ -83,7 +83,7 @@ class HyperdriveNotificationTargetSuite extends AnyWordSpec with SparkTestBase {

val taskNotification = TaskNotificationFactory.getDummyTaskNotification(status = TaskStatus.Skipped("dummy"), options = options)

notificationTarget.sendNotification(taskNotification)
notificationTarget.sendNotification(null, taskNotification)

assert(producer.connectInvoked == 0)
assert(producer.sendInvoked == 0)
Expand All @@ -94,7 +94,7 @@ class HyperdriveNotificationTargetSuite extends AnyWordSpec with SparkTestBase {

val taskNotification = TaskNotificationFactory.getDummyTaskNotification()

notificationTarget.sendNotification(taskNotification)
notificationTarget.sendNotification(null, taskNotification)

assert(producer.connectInvoked == 0)
assert(producer.sendInvoked == 0)
Expand All @@ -108,7 +108,7 @@ class HyperdriveNotificationTargetSuite extends AnyWordSpec with SparkTestBase {

val taskNotification = TaskNotificationFactory.getDummyTaskNotification(options = options)

notificationTarget.sendNotification(taskNotification)
notificationTarget.sendNotification(null, taskNotification)
notificationTarget.close()

assert(producer.connectInvoked == 1)
Expand Down
Loading

0 comments on commit e7446d8

Please sign in to comment.