Commit
Add in additional debug logs for validation wait conditions
pflooky committed Jun 24, 2024
1 parent 7c20ead commit 7278c0e
Showing 2 changed files with 35 additions and 24 deletions.
@@ -13,20 +13,20 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
 import java.time.LocalDateTime
 import scala.util.{Failure, Success, Try}
 
-/*
-Given a list of validations, check and report on the success and failure of each
-Flag to enable
-Validations can occur on any data source defined in application config
-Validations will only occur on datasets not on the response from the data source (i.e. no HTTP status code validations)
-Defined at plan level what validations are run post data generation
-Validations lie within separate files
-Validations have a wait condition. Wait for: webhook, pause, file exists, data exists
-Different types of validations:
-- simple column validations (amount < 100)
-- aggregates (sum of amount per account is > 500)
-- ordering (transactions are ordered by date)
-- relationship (one account entry in history table per account in accounts table)
-- data profile (how close the generated data profile is compared to the expected data profile)
+/**
+ * Given a list of validations, check and report on the success and failure of each
+ * Flag to enable
+ * Validations can occur on any data source defined in application config
+ * Validations will only occur on datasets not on the response from the data source (i.e. no HTTP status code validations)
+ * Defined at plan level what validations are run post data generation
+ * Validations lie within separate files
+ * Validations have a wait condition. Wait for: webhook, pause, file exists, data exists
+ * Different types of validations:
+ * - simple column validations (amount < 100)
+ * - aggregates (sum of amount per account is > 500)
+ * - ordering (transactions are ordered by date)
+ * - relationship (one account entry in history table per account in accounts table)
+ * - data profile (how close the generated data profile is compared to the expected data profile)
 */
 class ValidationProcessor(
                           connectionConfigsByName: Map[String, Map[String, String]],
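The rewritten class comment above lists the validation types only in prose. As a rough, standalone sketch of what the first two kinds of checks amount to in Spark terms (not part of this commit; the object name and the columns amount and account_id are purely illustrative):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, sum}

    // Hypothetical helper, not from the data-caterer codebase.
    object ValidationSketch {
      // Simple column validation: every generated row satisfies amount < 100.
      def amountBelowLimit(df: DataFrame): Boolean =
        df.filter(col("amount") >= 100).isEmpty

      // Aggregate validation: the sum of amount per account exceeds 500 for every account.
      def sumPerAccountAboveThreshold(df: DataFrame): Boolean =
        df.groupBy("account_id")
          .agg(sum("amount").as("total_amount"))
          .filter(col("total_amount") <= 500)
          .isEmpty
    }

Both checks follow the same shape: filter for rows that violate the rule and pass when none remain. The diff for the second changed file, the wait-condition implicits, follows.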
@@ -3,6 +3,7 @@ package io.github.datacatering.datacaterer.core.validator
 import io.github.datacatering.datacaterer.api.model.Constants.FORMAT
 import io.github.datacatering.datacaterer.api.model.{DataExistsWaitCondition, FileExistsWaitCondition, PauseWaitCondition, WaitCondition, WebhookWaitCondition}
 import io.github.datacatering.datacaterer.core.exception.InvalidWaitConditionException
+import io.github.datacatering.datacaterer.core.util.ConfigUtil
 import io.github.datacatering.datacaterer.core.util.HttpUtil.getAuthHeader
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.log4j.Logger
@@ -14,48 +15,57 @@ import scala.util.{Failure, Success, Try}
 
 object ValidationWaitImplicits {
   implicit class WaitConditionOps(waitCondition: WaitCondition = PauseWaitCondition()) {
-    def checkCondition(implicit sparkSession: SparkSession): Boolean = true
+    private val LOGGER = Logger.getLogger(getClass.getName)
 
     def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = true
 
     def waitForCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Unit = {
       if (waitCondition.isRetryable) {
         var retries = 0
         while (retries < waitCondition.maxRetries) {
-          val isDataAvailable = waitCondition match {
-            case DataExistsWaitCondition(_, _, _) | WebhookWaitCondition(_, _, _, _) => this.checkCondition(connectionConfigByName)
-            case FileExistsWaitCondition(_) => this.checkCondition
-            case x => throw new InvalidWaitConditionException(x.getClass.getName)
-          }
-          if (!isDataAvailable) {
+          if (!checkCondition(connectionConfigByName)) {
+            LOGGER.debug(s"Wait condition failed, pausing before retrying, pause-before-retry-seconds=${waitCondition.waitBeforeRetrySeconds}, " +
+              s"num-retries=$retries, max-retries=${waitCondition.maxRetries}")
             Thread.sleep(waitCondition.waitBeforeRetrySeconds * 1000)
             retries += 1
           } else {
             return
           }
         }
+        LOGGER.warn(s"Max retries has been reached for validation wait condition, continuing to try validation, " +
+          s"max-retries=${waitCondition.maxRetries}")
       } else {
-        this.checkCondition
+        checkCondition(connectionConfigByName)
       }
     }
   }
 
   implicit class PauseWaitConditionOps(pauseWaitCondition: PauseWaitCondition) extends WaitConditionOps(pauseWaitCondition) {
-    override def checkCondition(implicit sparkSession: SparkSession): Boolean = {
+    private val LOGGER = Logger.getLogger(getClass.getName)
+
+    override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
+      LOGGER.debug(s"Pausing execution before starting validation, pause-in-seconds=${pauseWaitCondition.pauseInSeconds}")
       Thread.sleep(pauseWaitCondition.pauseInSeconds * 1000)
       true
     }
   }
 
   implicit class FileExistsWaitConditionOps(fileExistsWaitCondition: FileExistsWaitCondition) extends WaitConditionOps(fileExistsWaitCondition) {
-    override def checkCondition(implicit sparkSession: SparkSession): Boolean = {
+    private val LOGGER = Logger.getLogger(getClass.getName)
+
+    override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
+      LOGGER.debug(s"Checking if file exists before running validations, file-path=${fileExistsWaitCondition.path}")
       val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
       fs.exists(new Path(fileExistsWaitCondition.path))
     }
   }
 
   implicit class DataExistsWaitConditionOps(dataExistsWaitCondition: DataExistsWaitCondition) extends WaitConditionOps(dataExistsWaitCondition) {
+    private val LOGGER = Logger.getLogger(getClass.getName)
+
     override def checkCondition(connectionConfigByName: Map[String, Map[String, String]])(implicit sparkSession: SparkSession): Boolean = {
+      LOGGER.debug(s"Checking if data exists before running validations, data-source-name=${dataExistsWaitCondition.dataSourceName}," +
+        s"data-source-options=${ConfigUtil.cleanseOptions(dataExistsWaitCondition.options)}, expression=${dataExistsWaitCondition.expr}")
       val connectionOptions = connectionConfigByName(dataExistsWaitCondition.dataSourceName)
       val loadData = sparkSession.read
         .format(connectionOptions(FORMAT))
Expand All @@ -75,6 +85,7 @@ object ValidationWaitImplicits {
val authHeader = getAuthHeader(webhookOptions)
val requestWithAuth = if (authHeader.nonEmpty) request.setHeader(authHeader.head._1, authHeader.head._2) else request

LOGGER.debug(s"Attempting HTTP request, url=${webhookWaitCondition.url}")
val tryResponse = Try(requestWithAuth.execute().get())

tryResponse match {
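Stripped of the logging, the refactored waitForCondition above is a bounded retry loop: re-evaluate a predicate, sleep between attempts, and give up with a warning once the retry budget runs out. A minimal self-contained sketch of that pattern, using generic names rather than the project's API:

    // Hypothetical sketch of the bounded-retry pattern used in waitForCondition.
    object RetrySketch {
      def waitUntil(condition: () => Boolean, maxRetries: Int, waitBeforeRetrySeconds: Int): Boolean = {
        var retries = 0
        while (retries < maxRetries) {
          if (condition()) return true                  // condition met, stop waiting
          Thread.sleep(waitBeforeRetrySeconds * 1000L)  // pause before the next attempt
          retries += 1
        }
        false  // retry budget exhausted; the caller decides whether to proceed anyway
      }
    }

    // Example: wait up to ~30 seconds for a marker file to appear.
    // val ready = RetrySketch.waitUntil(() => new java.io.File("/tmp/ready").exists(), 10, 3)

Returning false instead of throwing mirrors the commit's behaviour of continuing with validations after logging that the maximum number of retries was reached.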

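For the DataExistsWaitCondition branch, whose tail is cut off in the diff above, the check loads the configured data source and tests whether any row matches an SQL expression. A hedged approximation of that read-and-filter step (the method and object names here are assumptions for illustration, not the project's exact code):

    import org.apache.spark.sql.SparkSession

    // Hypothetical approximation of the data-exists check.
    object DataExistsSketch {
      def dataExists(spark: SparkSession, format: String, options: Map[String, String], expr: String): Boolean = {
        val df = spark.read
          .format(format)    // e.g. "parquet" or "jdbc", resolved from the connection config
          .options(options)  // connection options looked up via connectionConfigByName
          .load()
        !df.where(expr).isEmpty  // data exists once at least one row satisfies the expression
      }
    }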