Skip to content

Commit

Permalink
Update validation model attribute names to allow for YAML parsing to …
Browse files Browse the repository at this point in the history
…be correct, parse YAML validation files if they exist
  • Loading branch information
pflooky committed Jun 23, 2024
1 parent 099c147 commit e16d33b
Show file tree
Hide file tree
Showing 19 changed files with 449 additions and 168 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package io.github.datacatering.datacaterer.api
import io.github.datacatering.datacaterer.api.converter.Converters.toScalaMap
import io.github.datacatering.datacaterer.api.model.Constants._
import com.softwaremill.quicklens.ModifyPimp
import io.github.datacatering.datacaterer.api.connection.{CassandraBuilder, ConnectionTaskBuilder, FileBuilder, HttpBuilder, KafkaBuilder, MySqlBuilder, PostgresBuilder, SolaceBuilder}
import io.github.datacatering.datacaterer.api.connection.{CassandraBuilder, ConnectionTaskBuilder, FileBuilder, HttpBuilder, KafkaBuilder, MySqlBuilder, NoopBuilder, PostgresBuilder, SolaceBuilder}
import io.github.datacatering.datacaterer.api.model.DataCatererConfiguration

import scala.annotation.varargs
Expand Down Expand Up @@ -372,6 +372,12 @@ final case class ConnectionConfigWithTaskBuilder(
) {
def this() = this(DEFAULT_DATA_SOURCE_NAME, Map())

def noop(): NoopBuilder = {
val noopBuilder = NoopBuilder()
noopBuilder.connectionConfigWithTaskBuilder = this
noopBuilder
}

def file(name: String, format: String, path: String = "", options: Map[String, String] = Map()): FileBuilder = {
val configBuilder = DataCatererConfigurationBuilder()
val fileConnectionConfig = format match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ case class ValidationBuilder(validation: Validation = ExpressionValidation(), op
val grpWithExpr = GroupByValidation(grpCols, aggCol, aggType, expr)
copyWithDescAndThreshold(grpWithExpr)
case expressionValidation: ExpressionValidation =>
val withExpr = expressionValidation.modify(_.whereExpr).setTo(expr)
val withExpr = expressionValidation.modify(_.expr).setTo(expr)
copyWithDescAndThreshold(withExpr)
case _ => copyWithDescAndThreshold(ExpressionValidation(expr))
}
Expand Down Expand Up @@ -925,7 +925,7 @@ case class CombinationPreFilterBuilder(
validationPreFilterBuilders.map {
case Left(validationBuilder) =>
validationBuilder.validation match {
case exprValidation: ExpressionValidation => exprValidation.whereExpr
case exprValidation: ExpressionValidation => exprValidation.expr
case _ => "true"
}
case Right(conditionType) => conditionType.toString
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ trait ConnectionTaskBuilder[T] {
}
}

case class NoopBuilder() extends ConnectionTaskBuilder[NoopBuilder] {
override def fromBaseConfig(connectionTaskBuilder: ConnectionTaskBuilder[NoopBuilder]): NoopBuilder = {
this.connectionConfigWithTaskBuilder = connectionTaskBuilder.connectionConfigWithTaskBuilder
this
}
}

case class FileBuilder() extends ConnectionTaskBuilder[FileBuilder] {
override def fromBaseConfig(connectionTaskBuilder: ConnectionTaskBuilder[FileBuilder]): FileBuilder = {
this.connectionConfigWithTaskBuilder = connectionTaskBuilder.connectionConfigWithTaskBuilder
Expand Down
Original file line number Diff line number Diff line change
@@ -1,83 +1,116 @@
package io.github.datacatering.datacaterer.api.model

import com.fasterxml.jackson.annotation.JsonTypeInfo.Id
import com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonTypeInfo}
import com.fasterxml.jackson.databind.annotation.{JsonDeserialize, JsonTypeIdResolver}
import Constants.{AGGREGATION_SUM, DEFAULT_VALIDATION_COLUMN_NAME_TYPE, DEFAULT_VALIDATION_CONFIG_NAME, DEFAULT_VALIDATION_DESCRIPTION, DEFAULT_VALIDATION_JOIN_TYPE, DEFAULT_VALIDATION_WEBHOOK_HTTP_METHOD, DEFAULT_VALIDATION_WEBHOOK_HTTP_STATUS_CODES, VALIDATION_COLUMN_NAME_COUNT_BETWEEN, VALIDATION_COLUMN_NAME_COUNT_EQUAL, VALIDATION_COLUMN_NAME_MATCH_ORDER, VALIDATION_COLUMN_NAME_MATCH_SET}
import io.github.datacatering.datacaterer.api.{CombinationPreFilterBuilder, PreFilterBuilder, ValidationBuilder}
import com.fasterxml.jackson.annotation.JsonSubTypes.Type
import com.fasterxml.jackson.annotation.{JsonIgnoreProperties, JsonSubTypes, JsonTypeInfo}
import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import io.github.datacatering.datacaterer.api.connection.{ConnectionTaskBuilder, FileBuilder}
import io.github.datacatering.datacaterer.api.parser.ValidationIdResolver
import io.github.datacatering.datacaterer.api.model.Constants.{AGGREGATION_SUM, DEFAULT_VALIDATION_COLUMN_NAME_TYPE, DEFAULT_VALIDATION_CONFIG_NAME, DEFAULT_VALIDATION_DESCRIPTION, DEFAULT_VALIDATION_JOIN_TYPE, DEFAULT_VALIDATION_WEBHOOK_HTTP_METHOD, DEFAULT_VALIDATION_WEBHOOK_HTTP_STATUS_CODES, VALIDATION_COLUMN_NAME_COUNT_BETWEEN, VALIDATION_COLUMN_NAME_COUNT_EQUAL, VALIDATION_COLUMN_NAME_MATCH_ORDER, VALIDATION_COLUMN_NAME_MATCH_SET}
import io.github.datacatering.datacaterer.api.{CombinationPreFilterBuilder, ValidationBuilder}


@JsonTypeInfo(use = Id.CUSTOM, defaultImpl = classOf[ExpressionValidation])
@JsonTypeIdResolver(classOf[ValidationIdResolver])
@JsonSubTypes(Array(
new Type(value = classOf[YamlUpstreamDataSourceValidation]),
new Type(value = classOf[GroupByValidation]),
new Type(value = classOf[ColumnNamesValidation]),
new Type(value = classOf[ExpressionValidation]),
))
@JsonTypeInfo(use = JsonTypeInfo.Id.DEDUCTION)
@JsonIgnoreProperties(ignoreUnknown = true)
trait Validation {
var description: Option[String] = None
@JsonDeserialize(contentAs = classOf[java.lang.Double]) var errorThreshold: Option[Double] = None
var preFilter: Option[CombinationPreFilterBuilder] = None
var preFilterExpr: Option[String] = None

def toOptions: List[List[String]]

def baseOptions: List[List[String]] = {
val descriptionOption = description.map(d => List(List("description", d))).getOrElse(List())
val errorThresholdOption = errorThreshold.map(e => List(List("errorThreshold", e.toString))).getOrElse(List())
val preFilterOptions = getPreFilterExpression match {
case Some(preFilterStringExpr) => List(List("preFilterExpr", preFilterStringExpr))
case _ => List()
}
descriptionOption ++ errorThresholdOption ++ preFilterOptions
}

def getPreFilterExpression: Option[String] = {
(preFilterExpr, preFilter) match {
case (Some(preFilterStringExpr), _) => Some(preFilterStringExpr)
case (_, Some(preFilterBuilder)) =>
if (preFilterBuilder.validate()) Some(preFilterBuilder.toExpression) else None
case _ => None
}
}
}

case class ExpressionValidation(
whereExpr: String = "true",
expr: String = "true",
selectExpr: List[String] = List("*")
) extends Validation {
override def toOptions: List[List[String]] = List(
List("selectExpr", selectExpr.mkString(", ")),
List("whereExpr", whereExpr),
List("errorThreshold", errorThreshold.getOrElse(0.0).toString)
)
List("expr", expr),
) ++ baseOptions
}

case class GroupByValidation(
groupByCols: Seq[String] = Seq(),
aggCol: String = "",
aggType: String = AGGREGATION_SUM,
expr: String = "true"
aggExpr: String = "true"
) extends Validation {
override def toOptions: List[List[String]] = List(
List("expr", expr),
List("aggExpr", aggExpr),
List("groupByColumns", groupByCols.mkString(",")),
List("aggregationColumn", aggCol),
List("aggregationType", aggType),
List("errorThreshold", errorThreshold.getOrElse(0.0).toString)
)
List("aggCol", aggCol),
List("aggType", aggType),
) ++ baseOptions
}

case class UpstreamDataSourceValidation(
validationBuilder: ValidationBuilder = ValidationBuilder(),
validation: ValidationBuilder = ValidationBuilder(),
upstreamDataSource: ConnectionTaskBuilder[_] = FileBuilder(),
upstreamReadOptions: Map[String, String] = Map(),
joinCols: List[String] = List(),
joinColumns: List[String] = List(),
joinType: String = DEFAULT_VALIDATION_JOIN_TYPE,
) extends Validation {
override def toOptions: List[List[String]] = {
val nestedValidation = validationBuilder.validation.toOptions
val nestedValidation = validation.validation.toOptions
List(
List("upstreamDataSource", upstreamDataSource.connectionConfigWithTaskBuilder.dataSourceName),
List("joinColumns", joinCols.mkString(",")),
List("upstreamReadOptions", upstreamReadOptions.mkString(", ")),
List("joinColumns", joinColumns.mkString(",")),
List("joinType", joinType),
) ++ nestedValidation
) ++ nestedValidation ++ baseOptions
}
}

case class YamlUpstreamDataSourceValidation(
upstreamDataSource: String,
validation: Validation = ExpressionValidation(),
upstreamReadOptions: Map[String, String] = Map(),
joinColumns: List[String] = List(),
joinType: String = DEFAULT_VALIDATION_JOIN_TYPE,
) extends Validation {
override def toOptions: List[List[String]] = List()
}

case class ColumnNamesValidation(
`type`: String = DEFAULT_VALIDATION_COLUMN_NAME_TYPE,
columnNameType: String = DEFAULT_VALIDATION_COLUMN_NAME_TYPE,
count: Int = 0,
minCount: Int = 0,
maxCount: Int = 0,
names: Array[String] = Array()
) extends Validation {
override def toOptions: List[List[String]] = {
val baseAttributes = `type` match {
val baseAttributes = columnNameType match {
case VALIDATION_COLUMN_NAME_COUNT_EQUAL => List(List("count", count.toString))
case VALIDATION_COLUMN_NAME_COUNT_BETWEEN => List(List("min", minCount.toString), List("max", maxCount.toString))
case VALIDATION_COLUMN_NAME_MATCH_ORDER => List(List("matchOrder", names.mkString(",")))
case VALIDATION_COLUMN_NAME_MATCH_SET => List(List("matchSet", names.mkString(",")))
case VALIDATION_COLUMN_NAME_COUNT_BETWEEN => List(List("minCount", minCount.toString), List("maxCount", maxCount.toString))
case VALIDATION_COLUMN_NAME_MATCH_ORDER => List(List("names", names.mkString(",")))
case VALIDATION_COLUMN_NAME_MATCH_SET => List(List("names", names.mkString(",")))
}
List(List("columnNameValidationType", `type`)) ++ baseAttributes
List(List("columnNameValidationType", columnNameType)) ++ baseAttributes ++ baseOptions
}
}

Expand All @@ -93,6 +126,18 @@ case class DataSourceValidation(
validations: List[ValidationBuilder] = List()
)

case class YamlValidationConfiguration(
name: String = DEFAULT_VALIDATION_CONFIG_NAME,
description: String = DEFAULT_VALIDATION_DESCRIPTION,
dataSources: Map[String, List[YamlDataSourceValidation]] = Map()
)

case class YamlDataSourceValidation(
options: Map[String, String] = Map(),
waitCondition: WaitCondition = PauseWaitCondition(),
validations: List[Validation] = List()
)

trait WaitCondition {
val isRetryable: Boolean = true
val maxRetries: Int = 10
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package io.github.datacatering.datacaterer.api.parser

import com.fasterxml.jackson.annotation.JsonTypeInfo.Id
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.core.{JsonGenerator, JsonParser}
import com.fasterxml.jackson.databind.jsontype.impl.TypeIdResolverBase
import com.fasterxml.jackson.databind.{DatabindContext, JavaType, JsonSerializer, SerializerProvider}
import com.fasterxml.jackson.databind.{DatabindContext, DeserializationContext, JavaType, JsonDeserializer, JsonSerializer, SerializerProvider}
import io.github.datacatering.datacaterer.api.ValidationBuilder
import io.github.datacatering.datacaterer.api.model.{ExpressionValidation, GroupByValidation}
import io.github.datacatering.datacaterer.api.model.Constants.{VALIDATION_COLUMN_NAME_COUNT_BETWEEN, VALIDATION_COLUMN_NAME_COUNT_EQUAL, VALIDATION_COLUMN_NAME_MATCH_ORDER, VALIDATION_COLUMN_NAME_MATCH_SET}
import io.github.datacatering.datacaterer.api.model.{ColumnNamesValidation, ExpressionValidation, GroupByValidation, UpstreamDataSourceValidation, Validation}

import scala.util.Try

class ValidationIdResolver extends TypeIdResolverBase {
private var superType: JavaType = null
Expand All @@ -14,22 +17,52 @@ class ValidationIdResolver extends TypeIdResolverBase {
superType = bt
}

override def idFromValue(value: Any): String = null
override def idFromValue(value: Any): String = {
idFromValueAndType(value, value.getClass)
}

override def idFromBaseType(): String = {
idFromValueAndType(null, superType.getRawClass)
}

override def idFromValueAndType(value: Any, suggestedType: Class[_]): String = null
override def idFromValueAndType(value: Any, suggestedType: Class[_]): String = {
val Expr = classOf[ExpressionValidation]
val Group = classOf[GroupByValidation]
val Upstream = classOf[UpstreamDataSourceValidation]
val Columns = classOf[ColumnNamesValidation]
suggestedType match {
case Expr => "ExpressionValidation"
case Group => "GroupByValidation"
case Upstream => "UpstreamDataSourceValidation"
case Columns => "ColumnNamesValidation"
case _ => "ExpressionValidation"
}
}

override def getMechanism: Id = null

override def typeFromId(context: DatabindContext, id: String): JavaType = {
val subType = classOf[ExpressionValidation]
val subType = id match {
case "ExpressionValidation" => classOf[ExpressionValidation]
case "GroupByValidation" => classOf[GroupByValidation]
case "UpstreamDataSourceValidation" => classOf[UpstreamDataSourceValidation]
case "ColumnNamesValidation" => classOf[ColumnNamesValidation]
case _ => classOf[ExpressionValidation]
}
context.constructSpecializedType(superType, subType)
}
}

//class ValidationDeserializer extends JsonDeserializer[Validation] {
// override def deserialize(p: JsonParser, ctxt: DeserializationContext): Validation = {
//
// }
//}

class ValidationBuilderSerializer extends JsonSerializer[ValidationBuilder] {
override def serialize(value: ValidationBuilder, gen: JsonGenerator, serializers: SerializerProvider): Unit = {
val validation = value.validation
gen.writeStartObject()
Try(gen.writeStartObject())
validation.preFilter.foreach(preFilter => {
gen.writeStringField("preFilterExpr", preFilter.toExpression)
})
Expand All @@ -46,6 +79,28 @@ class ValidationBuilderSerializer extends JsonSerializer[ValidationBuilder] {
gen.writeStringField("aggCol", aggCol)
gen.writeStringField("aggType", aggType)
gen.writeStringField("expr", expr)
case ColumnNamesValidation(columnNameValidationType, count, minCount, maxCount, names) =>
gen.writeStringField("columnNameType", columnNameValidationType)
columnNameValidationType match {
case VALIDATION_COLUMN_NAME_COUNT_EQUAL =>
gen.writeStringField("count", count.toString)
case VALIDATION_COLUMN_NAME_COUNT_BETWEEN =>
gen.writeStringField("min", minCount.toString)
gen.writeStringField("max", maxCount.toString)
case VALIDATION_COLUMN_NAME_MATCH_ORDER =>
gen.writeStringField("matchOrder", names.mkString(","))
case VALIDATION_COLUMN_NAME_MATCH_SET =>
gen.writeStringField("matchSet", names.mkString(","))
}
case UpstreamDataSourceValidation(validationBuilder, upstreamDataSource, upstreamReadOptions, joinCols, joinType) =>
gen.writeStringField("upstreamDataSource", upstreamDataSource.connectionConfigWithTaskBuilder.dataSourceName)
gen.writeObjectFieldStart("upstreamReadOptions")
upstreamReadOptions.foreach(opt => gen.writeObjectField(opt._1, opt._2))
gen.writeEndObject()
gen.writeStringField("joinColumns", joinCols.mkString(","))
gen.writeStringField("joinType", joinType)
gen.writeObjectFieldStart("validation")
serialize(validationBuilder, gen, serializers)
case _ =>
}
gen.writeEndObject()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class PlanBuilderTest extends AnyFunSuite {
assert(validationHead.description.contains("name is equal to Peter"))
assert(validationHead.errorThreshold.contains(0.1))
assert(validationHead.isInstanceOf[ExpressionValidation])
assert(validationHead.asInstanceOf[ExpressionValidation].whereExpr == "name == 'Peter'")
assert(validationHead.asInstanceOf[ExpressionValidation].expr == "name == 'Peter'")
assert(dataSourceHead._2.head.options == Map("path" -> "test/path/json"))
assert(dataSourceHead._2.head.waitCondition == PauseWaitCondition())
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class PlanRunTest extends AnyFunSuite {
assert(dsValidation._2.head.validations.size == 1)
assert(dsValidation._2.head.validations.head.validation.isInstanceOf[ExpressionValidation])
val expressionValidation = dsValidation._2.head.validations.head.validation.asInstanceOf[ExpressionValidation]
assert(expressionValidation.whereExpr == "account_id != ''")
assert(expressionValidation.expr == "account_id != ''")
}

test("Can create plan with multiple validations for one data source") {
Expand All @@ -105,10 +105,10 @@ class PlanRunTest extends AnyFunSuite {
assert(dsValidation._1 == "my_postgres")
val accountValid = dsValidation._2.filter(_.options.get(JDBC_TABLE).contains("account.accounts")).head
assert(accountValid.validations.size == 1)
assert(accountValid.validations.exists(v => v.validation.asInstanceOf[ExpressionValidation].whereExpr == "account_id != ''"))
assert(accountValid.validations.exists(v => v.validation.asInstanceOf[ExpressionValidation].expr == "account_id != ''"))
val txnValid = dsValidation._2.filter(_.options.get(JDBC_TABLE).contains("account.transactions")).head
assert(txnValid.validations.size == 1)
assert(txnValid.validations.exists(v => v.validation.asInstanceOf[ExpressionValidation].whereExpr == "txn_id IS NOT NULL"))
assert(txnValid.validations.exists(v => v.validation.asInstanceOf[ExpressionValidation].expr == "txn_id IS NOT NULL"))
}

test("Can create plan with validations only defined") {
Expand All @@ -124,7 +124,7 @@ class PlanRunTest extends AnyFunSuite {
assert(result._validations.head.dataSources.contains("my_csv"))
val validRes = result._validations.head.dataSources("my_csv").head
assert(validRes.validations.size == 1)
assert(validRes.validations.head.validation.asInstanceOf[ExpressionValidation].whereExpr == "account_id != 'acc123'")
assert(validRes.validations.head.validation.asInstanceOf[ExpressionValidation].expr == "account_id != 'acc123'")
assert(validRes.options.nonEmpty)
assert(validRes.options == Map(FORMAT -> "csv", PATH -> "/my/csv"))
}
Expand Down
Loading

0 comments on commit e16d33b

Please sign in to comment.