Skip to content

Commit

Permalink
#221 Improve IdentityTransformer to use more convenient option.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 12, 2023
1 parent 0c88f50 commit 498b1a3
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 7 deletions.
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1397,6 +1397,10 @@ pramen.operations = [
date.to = "@infoDate" // optional
}
]
option {
input.table = "table1"
}
},
{
name = "A Kafka sink"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot

val taskName = s"Files sourced - ${task.job.outputTable.name} - ${task.runInfo.map(_.infoDate.toString).getOrElse(" ")}"

tableHeaders.append(TableHeader(TextElement(taskName), Align.Center))
tableHeaders.append(TableHeader(TextElement(taskName), Align.Left))
tableBuilder.withHeaders(tableHeaders.toSeq)

runStatus.filesRead.sorted.foreach(fileName => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,19 @@ package za.co.absa.pramen.core.transformers

import org.apache.spark.sql.DataFrame
import za.co.absa.pramen.api.{MetastoreReader, Reason, Transformer}
import za.co.absa.pramen.core.transformers.IdentityTransformer._

import java.time.LocalDate

class IdentityTransformer extends Transformer {
override def validate(metastore: MetastoreReader, infoDate: LocalDate, options: Map[String, String]): Reason = {
if (!options.contains("table")) {
throw new IllegalArgumentException(s"Option 'table' is not defined")
if (!options.contains(INPUT_TABLE_KEY) && !options.contains(INPUT_TABLE_LEGACY_KEY)) {
throw new IllegalArgumentException(s"Option '$INPUT_TABLE_KEY' is not defined")
}

val emptyAllowed = options.getOrElse("empty.allowed", "true").toBoolean
val emptyAllowed = options.getOrElse(EMPTY_ALLOWED_KEY, "true").toBoolean

val tableName = options("table")
val tableName = options.getOrElse(INPUT_TABLE_KEY, options(INPUT_TABLE_LEGACY_KEY))

val df = metastore.getTable(tableName, Option(infoDate), Option(infoDate))

Expand All @@ -43,8 +44,14 @@ class IdentityTransformer extends Transformer {
override def run(metastore: MetastoreReader,
infoDate: LocalDate,
options: Map[String, String]): DataFrame = {
val tableName = options("table")
val tableName = options.getOrElse(INPUT_TABLE_KEY, options(INPUT_TABLE_LEGACY_KEY))

metastore.getTable(tableName, Option(infoDate), Option(infoDate))
}
}

object IdentityTransformer {
val INPUT_TABLE_KEY = "input.table"
val INPUT_TABLE_LEGACY_KEY = "table"
val EMPTY_ALLOWED_KEY = "empty.allowed"
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@ class IdentityTransformerSuite extends AnyWordSpec with SparkTestBase with TextC
"pass when the mandatory option is present" in {
val (transformer, metastore) = getUseCase

val outcome = transformer.validate(metastore, infoDateWithData, Map("input.table" -> "table1"))

assert(outcome == Reason.Ready)
}

"pass when the legacy mandatory option is present" in {
val (transformer, metastore) = getUseCase

val outcome = transformer.validate(metastore, infoDateWithData, Map("table" -> "table1"))

assert(outcome == Reason.Ready)
Expand Down Expand Up @@ -66,7 +74,7 @@ class IdentityTransformerSuite extends AnyWordSpec with SparkTestBase with TextC
transformer.validate(metastore, infoDateWithData, Map.empty)
}

assert(ex.getMessage.contains("Option 'table' is not defined"))
assert(ex.getMessage.contains("Option 'input.table' is not defined"))
}
}

Expand Down

0 comments on commit 498b1a3

Please sign in to comment.