Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#415 Add notification targets for ECS cleanup (to the extras package) #418

Merged
merged 10 commits into from
Jun 4, 2024
Merged
1 change: 1 addition & 0 deletions pramen/build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,7 @@ lazy val assemblySettingsCommon = Seq(
)

lazy val assemblySettingsExtras = assemblySettingsCommon ++ Seq(assembly / assemblyShadeRules:= Seq(
ShadeRule.rename(shade("org.apache.http")).inAll,
ShadeRule.zap("com.101tec.**").inAll,
ShadeRule.zap("buildinfo.**").inAll,
ShadeRule.zap("com.databricks.**").inAll,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ object AppRunner {
jobs <- filterJobs(state, jobsOrig, appContext.appConfig.runtimeConfig)
_ <- runStartupHook(state, appContext.appConfig.hookConfig)
_ <- validateShutdownHook(state, appContext.appConfig.hookConfig)
_ <- initPipelineNotificationTargets(state)
_ <- validatePipeline(jobs, state, appContext, spark)
_ <- runPipeline(conf, jobs, state, appContext, taskRunner, spark)
_ <- shutdownTaskRunner(taskRunner, state)
Expand Down Expand Up @@ -117,6 +118,12 @@ object AppRunner {
}, state, "initialization of the task runner")
}

private[core] def initPipelineNotificationTargets(implicit state: PipelineState): Try[Unit] = {
handleFailure(Try {
state.asInstanceOf[PipelineStateImpl].initNotificationTargets()
}, state, "Initialization of piepline notification targets")
}

private[core] def getSparkSession(implicit conf: Config,
state: PipelineState): Try[SparkSession] = {
handleFailure(Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import java.io.FileNotFoundException
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal
import scala.util.matching.Regex

/**
Expand Down Expand Up @@ -189,10 +190,17 @@ object RawFileSource extends ExternalChannelFactory[RawFileSource] {
private[core] def getListOfFiles(pathPattern: String, caseSensitive: Boolean)(implicit spark: SparkSession): Seq[FileStatus] = {
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, pathPattern)
val hadoopPath = new Path(pathPattern)

val parentPath = new Path(pathPattern).getParent

if (!parentPath.isRoot && !fsUtils.exists(parentPath)) {
val pathExists = {
try {
parentPath.isRoot || fsUtils.exists(parentPath)
} catch {
case NonFatal(ex) => throw new IllegalArgumentException(s"Unable to access path: $parentPath", ex)
}
}

if (!pathExists) {
throw new FileNotFoundException(s"Input path does not exist: $parentPath")
}

Expand All @@ -205,7 +213,10 @@ object RawFileSource extends ExternalChannelFactory[RawFileSource] {
fsUtils.getHadoopFilesCaseInsensitive(hadoopPath, includeHiddenFiles = true)
}
} catch {
case ex: IllegalArgumentException if ex.getMessage.contains("Input path does not exist") => Seq.empty[FileStatus]
case ex: IllegalArgumentException if ex.getMessage.contains("Input path does not exist") =>
Seq.empty[FileStatus]
case NonFatal(ex) =>
throw new IllegalArgumentException(s"Unable to access path: $hadoopPath", ex)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
private val environmentName = conf.getString(ENVIRONMENT_NAME)
private val sendEmailIfNoNewData: Boolean = conf.getBoolean(EMAIL_IF_NO_CHANGES)
private val hookConfig = HookConfig.fromConfig(conf)
private val pipelineNotificationTargets = PipelineNotificationTargetFactory.fromConfig(conf)
private var pipelineNotificationTargets: Seq[PipelineNotificationTarget] = Seq.empty

// State
private val startedInstant = Instant.now
Expand All @@ -70,6 +70,10 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
setSignalHandler(new Signal("HUP"), "SIGHUP (network connection to the terminal has been lost)")
}

private[core] def initNotificationTargets(): Unit = {
pipelineNotificationTargets = PipelineNotificationTargetFactory.fromConfig(conf)
}

override def getState(): PipelineStateSnapshot = synchronized {
PipelineStateSnapshot(
isFinished,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.notification

import com.typesafe.config.Config
import org.apache.hadoop.fs.Path
import org.apache.http.HttpStatus
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{DataFormat, MetaTableDef, NotificationTarget, TaskNotification}
import za.co.absa.pramen.core.utils.Emoji
import za.co.absa.pramen.extras.utils.ConfigUtils
import za.co.absa.pramen.extras.utils.httpclient.{HttpMethod, SimpleHttpClient, SimpleHttpRequest}

import java.time.LocalDate
import java.time.format.DateTimeFormatter

/**
* Runs the ECS cleanup API against the target partition after the job jas completed.
*/
class EcsNotificationTarget(conf: Config) extends NotificationTarget {
private val log = LoggerFactory.getLogger(this.getClass)

override def config: Config = conf

override def sendNotification(notification: TaskNotification): Unit = {
if (notification.infoDate.isEmpty) {
log.warn(s"Information date not provided - skipping ECS cleanup.")
return
}

val (ecsApiUrl, ecsApiKey, trustAllSslCerts) = EcsNotificationTarget.getEcsDetails(conf)
val tableDef = notification.tableDef
val httpClient = getHttpClient(trustAllSslCerts)

try {
EcsNotificationTarget.cleanUpS3VersionsForTable(tableDef, notification.infoDate.get, ecsApiUrl, ecsApiKey, httpClient)
} finally {
httpClient.close()
}
}

protected def getHttpClient(trustAllSslCerts: Boolean): SimpleHttpClient = {
EcsNotificationTarget.getHttpClient(trustAllSslCerts)
}
}

object EcsNotificationTarget {
private val log = LoggerFactory.getLogger(this.getClass)

val ECS_API_URL_KEY = "ecs.api.url"
val ECS_API_SECRET_KEY = "ecs.api.key"
val ECS_API_TRUST_SSL_KEY = "ecs.api.trust.all.ssl.certificates"

val ECS_PREFIXES: Seq[String] = Seq("s3a://")

/**
* Cleans up a Pramen metatable via a special REST API call.
*/
def cleanUpS3VersionsForTable(tableDef: MetaTableDef,
infoDate: LocalDate,
apiUrl: String,
apiKey: String,
httpClient: SimpleHttpClient): Unit = {
log.info(s"Running the ECS cleanup notification target: $apiUrl, metatable=${tableDef.name}, format=${tableDef.format.name}, " +
s"Info date column=${tableDef.infoDateColumn}, Info date format=${tableDef.infoDateFormat}, Info date=$infoDate")

val formatter = DateTimeFormatter.ofPattern(tableDef.infoDateFormat)
val infoDateStr = formatter.format(infoDate)

tableDef.format match {
case DataFormat.Parquet(basePath, _) =>
log.info(s"Base path: $basePath")
val basePathLowerCase = basePath.toLowerCase
if (!ECS_PREFIXES.exists(prefix => basePathLowerCase.startsWith(prefix))) {
log.info(s"The base bath ($basePath) is not on S3. S3 versions cleanup won't be done.")
return
}

val partitionPath = new Path(basePath, s"${tableDef.infoDateColumn}=$infoDateStr")
log.info(s"Partition path: $partitionPath")

EcsNotificationTarget.cleanUpS3VersionsForPath(partitionPath, apiUrl, apiKey, httpClient)
case format =>
log.warn(s"Format ${format.name} is not supported. Skipping cleanup.")
}
}

/**
* Cleans up an ECS path via a special REST API call.
*/
def cleanUpS3VersionsForPath(partitionPath: Path,
apiUrl: String,
apiKey: String,
httpClient: SimpleHttpClient): Unit = {
val body = getCleanUpS3VersionsRequestBody(partitionPath)
log.info(s"Sending: $body")

val httpDelete = getCleanUpS3VersionsRequest(body, apiUrl, apiKey)

try {
val response = httpClient.execute(httpDelete)
val statusCode = response.statusCode
val responseBody = response.body.getOrElse("")

if (statusCode != HttpStatus.SC_OK) {
log.error(s"${Emoji.FAILURE} Failed to clean up S3 versions for $partitionPath. Response: $statusCode $responseBody")
} else {
log.info(s"${Emoji.SUCCESS} S3 versions cleanup for $partitionPath was successful. Response: $responseBody")
}
} catch {
case ex: Throwable =>
log.error(s"${Emoji.FAILURE} Unable to call the cleanup API via URL: $apiUrl.", ex)
}
}

/**
* Returns an instance of an HTTP client.
*
* Do not forget to close the client after use.
*
* @param trustAllSslCerts if true, the client will trust any SSL certificate.
* @return an Http Client
*/
def getHttpClient(trustAllSslCerts: Boolean): SimpleHttpClient = {
log.info(s"Trust all SSL certificates: $trustAllSslCerts")
SimpleHttpClient(trustAllSslCerts)
}

private[extras] def getEcsDetails(conf: Config): (String, String, Boolean) = {
require(conf.hasPath(ECS_API_URL_KEY), s"The key is not defined: '$ECS_API_URL_KEY'")
require(conf.hasPath(ECS_API_SECRET_KEY), s"The key is not defined: '$ECS_API_SECRET_KEY'")

val ecsApiUrl = conf.getString(ECS_API_URL_KEY)
val ecsApiKey = conf.getString(ECS_API_SECRET_KEY)
val trustAllSslCerts = ConfigUtils.getOptionBoolean(conf, ECS_API_TRUST_SSL_KEY).getOrElse(false)

(ecsApiUrl, ecsApiKey, trustAllSslCerts)
}

private[extras] def getCleanUpS3VersionsRequestBody(partitionPath: Path): String = {
val partitionPathWithoutAuthority = removeAuthority(partitionPath)
s"""{"ecs_path":"$partitionPathWithoutAuthority"}"""
}

private[extras] def getCleanUpS3VersionsRequest(requestBody: String, apiUrl: String, apiKey: String): SimpleHttpRequest = {
val effectiveUrl = if (apiUrl.endsWith("/kk")) {
apiUrl
} else {
s"$apiUrl/kk"
}

SimpleHttpRequest(
effectiveUrl,
HttpMethod.DELETE,
Map("x-api-key" -> apiKey),
Some(requestBody)
)
}

private[extras] def removeAuthority(path: Path): String = {
val uri = path.toUri
if (uri.getHost != null) {
s"${uri.getHost}${uri.getPath}"
} else {
s"${uri.getPath}"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.extras.notification

import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{DataFormat, PipelineNotificationTarget, TaskNotification, TaskStatus}
import za.co.absa.pramen.extras.utils.ConfigUtils
import za.co.absa.pramen.extras.utils.httpclient.SimpleHttpClient

import java.time.Instant

/**
* Runs the ECS cleanup API against the target partition after the job jas completed.
*
* Example usage:
* {{{
* pramen.ecs.api {
* url = "https://dummy.local"
* secret = "aabbcc"
* trust.all.ssl.certificates = false
* }
*
* pramen.pipeline.notification.targets = [ "za.co.absa.pramen.extras.notification.EcsPipelineNotificationTarget" ]
* }}}
*/
class EcsPipelineNotificationTarget(conf: Config) extends PipelineNotificationTarget {
private val log = LoggerFactory.getLogger(this.getClass)

override def config: Config = conf

/** Sends a notification after completion of the pipeline. */
override def sendNotification(pipelineStarted: Instant,
applicationId: Option[String],
appException: Option[Throwable],
tasksCompleted: Seq[TaskNotification]): Unit = {
log.info(s"Running the ECS cleanup pipeline notification target...")
val (ecsApiUrl, ecsApiKey, trustAllSslCerts) = EcsPipelineNotificationTarget.getEcsDetails(conf)

val httpClient = getHttpClient(trustAllSslCerts)

try {
tasksCompleted.foreach { task =>
(task.infoDate, task.status) match {
case (Some(infoDate), _: TaskStatus.Succeeded) =>
if (!task.tableDef.format.isTransient &&
!task.tableDef.format.isInstanceOf[DataFormat.Null] &&
!task.tableDef.format.isInstanceOf[DataFormat.Raw]) {
EcsNotificationTarget.cleanUpS3VersionsForTable(task.tableDef, infoDate, ecsApiUrl, ecsApiKey, httpClient)
} else {
log.info(s"The task outputting to '${task.tableName}' for '$infoDate' outputs to ${task.tableDef.format.name} format - skipping ECS cleanup...")
}
case (Some(infoDate), _) =>
log.info(s"The task outputting to '${task.tableName}' for '$infoDate' status is not a success - skipping ECS cleanup...")
case (None, status) =>
log.info(s"The task outputting to '${task.tableName}' status is not a success - skipping ECS cleanup...")
}
}
} finally {
httpClient.close()
}
}

protected def getHttpClient(trustAllSslCerts: Boolean): SimpleHttpClient = {
EcsNotificationTarget.getHttpClient(trustAllSslCerts)
}
}

object EcsPipelineNotificationTarget {
val ECS_API_URL_KEY = "pramen.ecs.api.url"
val ECS_API_SECRET_KEY = "pramen.ecs.api.secret"
val ECS_API_TRUST_SSL_KEY = "pramen.ecs.api.trust.all.ssl.certificates"

private[extras] def getEcsDetails(conf: Config): (String, String, Boolean) = {
require(conf.hasPath(ECS_API_URL_KEY), s"The key is not defined: '$ECS_API_URL_KEY'")
require(conf.hasPath(ECS_API_SECRET_KEY), s"The key is not defined: '$ECS_API_SECRET_KEY'")

val ecsApiUrl = conf.getString(ECS_API_URL_KEY)
val ecsApiKey = conf.getString(ECS_API_SECRET_KEY)
val trustAllSslCerts = ConfigUtils.getOptionBoolean(conf, ECS_API_TRUST_SSL_KEY).getOrElse(false)

(ecsApiUrl, ecsApiKey, trustAllSslCerts)
}

}
Loading
Loading