From 910c93d9234433d226b833848dc518dc148d90f4 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 6 Nov 2024 16:30:23 +0100 Subject: [PATCH] #495 Add operation definitions to the task results object used in notifications. --- .../co/absa/pramen/api/jobdef/Schedule.scala | 56 ++++++++++++ .../co/absa/pramen/api/jobdef/SinkTable.scala | 33 +++++++ .../absa/pramen/api/jobdef/SourceTable.scala | 33 +++++++ .../pramen/api/jobdef/TransferTable.scala | 42 +++++++++ .../api/jobdef/TransformExpression.scala | 23 +++++ .../co/absa/pramen/api/status/JobType.scala | 64 +++++++++++++ .../co/absa/pramen/api/status/TaskDef.scala | 29 ++++++ .../absa/pramen/api/status/TaskResult.scala | 5 +- .../notify/HyperdriveNotificationTarget.scala | 2 +- .../PipelineNotificationBuilderHtml.scala | 10 +-- .../pipeline/IncrementalIngestionJob.scala | 3 +- .../pramen/core/pipeline/IngestionJob.scala | 5 +- .../za/co/absa/pramen/core/pipeline/Job.scala | 4 +- .../absa/pramen/core/pipeline/JobBase.scala | 10 ++- .../pramen/core/pipeline/OperationDef.scala | 7 +- .../core/pipeline/OperationSplitter.scala | 10 +-- .../pramen/core/pipeline/OperationType.scala | 7 +- .../pipeline/PythonTransformationJob.scala | 4 +- .../absa/pramen/core/pipeline/SinkJob.scala | 6 +- ...{SinkTable.scala => SinkTableParser.scala} | 19 +--- ...rceTable.scala => SourceTableParser.scala} | 19 +--- .../pramen/core/pipeline/TransferJob.scala | 10 ++- ...rTable.scala => TransferTableParser.scala} | 90 ++++++++++++------- ....scala => TransformExpressionParser.scala} | 11 +-- .../core/pipeline/TransformationJob.scala | 5 +- .../jobrunner/ConcurrentJobRunnerImpl.scala | 3 +- .../orchestrator/OrchestratorImpl.scala | 3 +- .../runner/splitter/ScheduleStrategy.scala | 2 +- .../ScheduleStrategyIncremental.scala | 2 +- .../splitter/ScheduleStrategySourcing.scala | 2 +- .../ScheduleStrategyTransformation.scala | 2 +- .../splitter/ScheduleStrategyUtils.scala | 2 +- .../core/runner/task/TaskRunnerBase.scala | 55 ++++++------ .../{Schedule.scala => ScheduleParser.scala} | 46 ++-------- .../pramen/core/utils/ScheduleUtils.scala | 2 +- .../absa/pramen/core/utils/SparkUtils.scala | 2 +- .../pramen/core/OperationDefFactory.scala | 4 +- .../co/absa/pramen/core/TaskDefFactory.scala | 34 +++++++ .../pramen/core/TaskNotificationFactory.scala | 6 +- .../core/metastore/MetastoreSuite.scala | 2 +- .../pramen/core/mocks/SinkTableFactory.scala | 2 +- .../core/mocks/SourceTableFactory.scala | 2 +- .../pramen/core/mocks/TaskResultFactory.scala | 7 +- .../core/mocks/TransferTableFactory.scala | 2 +- .../pramen/core/mocks/job/JobBaseDummy.scala | 3 +- .../absa/pramen/core/mocks/job/JobSpy.scala | 8 +- .../mocks/notify/NotificationTargetMock.scala | 2 +- .../mocks/runner/ConcurrentJobRunnerSpy.scala | 3 +- .../core/pipeline/IngestionJobSuite.scala | 1 + .../core/pipeline/OperationDefSuite.scala | 2 +- .../core/pipeline/PipelineDefSuite.scala | 2 +- .../pramen/core/pipeline/SinkJobSuite.scala | 3 +- ...Suite.scala => SinkTableParserSuite.scala} | 10 +-- ...ite.scala => SourceTableParserSuite.scala} | 12 +-- .../core/pipeline/TransferJobSuite.scala | 5 +- .../core/pipeline/TransferTableSuite.scala | 18 ++-- .../pipeline/TransformationJobSuite.scala | 2 +- .../pramen/core/schedule/ScheduleSuite.scala | 43 ++++----- .../tests/journal/TaskCompletedSuite.scala | 6 +- .../TaskRunnerMultithreadedSuite.scala | 4 +- .../ScheduleStrategyIncrementalSuite.scala | 2 +- .../splitter/ScheduleStrategySuite.scala | 2 +- .../splitter/ScheduleStrategyUtilsSuite.scala | 2 +- .../runner/task/TaskRunnerBaseSuite.scala | 1 + .../core/tests/schedule/ScheduleSuite.scala | 19 ++-- .../core/tests/utils/ScheduleUtilsSuite.scala | 4 +- .../core/tests/utils/SparkUtilsSuite.scala | 2 +- .../notification/EcsNotificationTarget.scala | 2 +- .../EcsPipelineNotificationTarget.scala | 14 +-- .../pramen/extras/MetaTableDefFactory.scala | 53 +++++++++++ .../absa/pramen/extras/TaskDefFactory.scala | 32 +++++++ .../pramen/extras/mocks/TestPrototypes.scala | 13 ++- .../EcsNotificationTargetSuite.scala | 5 +- .../EcsPipelineNotificationTargetSuite.scala | 15 +++- 74 files changed, 686 insertions(+), 291 deletions(-) create mode 100644 pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/Schedule.scala create mode 100644 pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/SinkTable.scala create mode 100644 pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/SourceTable.scala create mode 100644 pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/TransferTable.scala create mode 100644 pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/TransformExpression.scala create mode 100644 pramen/api/src/main/scala/za/co/absa/pramen/api/status/JobType.scala create mode 100644 pramen/api/src/main/scala/za/co/absa/pramen/api/status/TaskDef.scala rename pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/{SinkTable.scala => SinkTableParser.scala} (81%) rename pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/{SourceTable.scala => SourceTableParser.scala} (78%) rename pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/{TransferTable.scala => TransferTableParser.scala} (73%) rename pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/{TransformExpression.scala => TransformExpressionParser.scala} (89%) rename pramen/core/src/main/scala/za/co/absa/pramen/core/schedule/{Schedule.scala => ScheduleParser.scala} (62%) create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/TaskDefFactory.scala rename pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/{SinkTableSuite.scala => SinkTableParserSuite.scala} (94%) rename pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/{SourceTableSuite.scala => SourceTableParserSuite.scala} (94%) create mode 100644 pramen/extras/src/test/scala/za/co/absa/pramen/extras/MetaTableDefFactory.scala create mode 100644 pramen/extras/src/test/scala/za/co/absa/pramen/extras/TaskDefFactory.scala diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/Schedule.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/Schedule.scala new file mode 100644 index 000000000..4bdf0c0c2 --- /dev/null +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/Schedule.scala @@ -0,0 +1,56 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.api.jobdef + +import java.time.{DayOfWeek, LocalDate} + +sealed trait Schedule { + def isEnabled(day: LocalDate): Boolean +} + +object Schedule { + case object Incremental extends Schedule { + def isEnabled(day: LocalDate): Boolean = true + + override def toString: String = "incremental" + } + + case class EveryDay() extends Schedule { + def isEnabled(day: LocalDate): Boolean = true + + override def toString: String = "daily" + } + + case class Weekly(days: Seq[DayOfWeek]) extends Schedule { + def isEnabled(day: LocalDate): Boolean = days.contains(day.getDayOfWeek) + + override def toString: String = s"weekly (${days.mkString(", ")})" + } + + case class Monthly(days: Seq[Int]) extends Schedule { + val hasPositiveDays: Boolean = days.exists(_ > 0) + val hasNegativeDays: Boolean = days.exists(_ < 0) + + def isEnabled(day: LocalDate): Boolean = { + val isInPositives = hasPositiveDays && days.contains(day.getDayOfMonth) + val isInNegatives = hasNegativeDays && days.contains(-day.lengthOfMonth() + day.getDayOfMonth - 1) + isInPositives || isInNegatives + } + + override def toString: String = s"monthly (${days.mkString(", ")})" + } +} diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/SinkTable.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/SinkTable.scala new file mode 100644 index 000000000..3c5d44977 --- /dev/null +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/SinkTable.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.api.jobdef + +import com.typesafe.config.Config + +case class SinkTable( + metaTableName: String, + outputTableName: Option[String], + conf: Config, + rangeFromExpr: Option[String], + rangeToExpr: Option[String], + warnMaxExecutionTimeSeconds: Option[Int], + transformations: Seq[TransformExpression], + filters: Seq[String], + columns: Seq[String], + options: Map[String, String], + overrideConf: Option[Config] + ) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/SourceTable.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/SourceTable.scala new file mode 100644 index 000000000..4182a855e --- /dev/null +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/SourceTable.scala @@ -0,0 +1,33 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.api.jobdef + +import com.typesafe.config.Config +import za.co.absa.pramen.api.Query + +case class SourceTable( + metaTableName: String, + query: Query, + conf: Config, + rangeFromExpr: Option[String], + rangeToExpr: Option[String], + warnMaxExecutionTimeSeconds: Option[Int], + transformations: Seq[TransformExpression], + filters: Seq[String], + columns: Seq[String], + overrideConf: Option[Config] + ) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/TransferTable.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/TransferTable.scala new file mode 100644 index 000000000..433c3c9db --- /dev/null +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/TransferTable.scala @@ -0,0 +1,42 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.api.jobdef + +import com.typesafe.config.Config +import za.co.absa.pramen.api.{DataFormat, Query} + +import java.time.LocalDate + +case class TransferTable( + query: Query, + jobMetaTableName: String, + conf: Config, + rangeFromExpr: Option[String], + rangeToExpr: Option[String], + infoDateStart: LocalDate, + trackDays: Int, + trackDaysExplicitlySet: Boolean, + warnMaxExecutionTimeSeconds: Option[Int], + transformations: Seq[TransformExpression], + filters: Seq[String], + columns: Seq[String], + readOptions: Map[String, String], + writeOptions: Map[String, String], + sparkConfig: Map[String, String], + sourceOverrideConf: Option[Config], + sinkOverrideConf: Option[Config] + ) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/TransformExpression.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/TransformExpression.scala new file mode 100644 index 000000000..95b51081c --- /dev/null +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/jobdef/TransformExpression.scala @@ -0,0 +1,23 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.api.jobdef + +case class TransformExpression( + column: String, + expression: Option[String], + comment: Option[String] + ) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/JobType.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/JobType.scala new file mode 100644 index 000000000..d358ed11b --- /dev/null +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/JobType.scala @@ -0,0 +1,64 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.api.status + +import com.typesafe.config.Config +import za.co.absa.pramen.api.jobdef.{SinkTable, SourceTable, TransferTable} + +sealed trait JobType { + def name: String +} + +object JobType { + case class Ingestion( + sourceName: String, + sourceTable: SourceTable, + sourceConfig: Config + ) extends JobType { + override def name: String = "ingestion" + } + + case class Transformation( + factoryClass: String + ) extends JobType { + override def name: String = "transformation" + } + + case class PythonTransformation( + pythonClass: String + ) extends JobType { + override def name: String = "python_transformation" + } + + case class Sink( + sinkName: String, + sinkTable: SinkTable, + sinkConfig: Config + ) extends JobType { + override def name: String = "sink" + } + + case class Transfer( + sourceName: String, + sourceConfig: Config, + sinkName: String, + sinkConfig: Config, + transferTable: TransferTable + ) extends JobType { + override def name: String = "transfer" + } +} diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/TaskDef.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/TaskDef.scala new file mode 100644 index 000000000..9686bb4f3 --- /dev/null +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/TaskDef.scala @@ -0,0 +1,29 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.api.status + +import com.typesafe.config.Config +import za.co.absa.pramen.api.MetaTableDef +import za.co.absa.pramen.api.jobdef.Schedule + +case class TaskDef( + name: String, + jobType: JobType, + outputTable: MetaTableDef, + schedule: Schedule, + operationConf: Config + ) diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/TaskResult.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/TaskResult.scala index d057abd16..70c004e37 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/TaskResult.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/TaskResult.scala @@ -16,11 +16,10 @@ package za.co.absa.pramen.api.status -import za.co.absa.pramen.api.{MetaTableDef, SchemaDifference} +import za.co.absa.pramen.api.SchemaDifference case class TaskResult( - jobName: String, - outputTable: MetaTableDef, + taskDef: TaskDef, runStatus: RunStatus, runInfo: Option[RunInfo], applicationId: String, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/HyperdriveNotificationTarget.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/HyperdriveNotificationTarget.scala index d7b1f1bae..d14bc8482 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/HyperdriveNotificationTarget.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/HyperdriveNotificationTarget.scala @@ -52,7 +52,7 @@ class HyperdriveNotificationTarget(conf: Config, log.info(s"Not sending '$token' to the Hyperdrive Kafka topic: '$topic' for the unsuccessful job...") } } else { - log.warn(s"$WARNING Token is not configured for ${notification.outputTable.name}. Hyperdrive notification won't be sent. Please, set 'notification.$TOKEN_KEY' option for the job.") + log.warn(s"$WARNING Token is not configured for ${notification.taskDef.outputTable.name}. Hyperdrive notification won't be sent. Please, set 'notification.$TOKEN_KEY' option for the job.") } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index ae22f4f06..f051330df 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -295,9 +295,9 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot private[core] def renderJobException(builder: MessageBuilder, taskResult: TaskResult, ex: Throwable): MessageBuilder = { val paragraphBuilder = ParagraphBuilder() .withText("Job ", Style.Exception) - .withText(taskResult.jobName, Style.Error) + .withText(taskResult.taskDef.name, Style.Error) .withText(" outputting to ", Style.Exception) - .withText(taskResult.outputTable.name, Style.Error) + .withText(taskResult.taskDef.outputTable.name, Style.Error) taskResult.runInfo.foreach(info => paragraphBuilder @@ -404,8 +404,8 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot tasks.foreach(task => { val row = new ListBuffer[TextElement] - row.append(TextElement(task.jobName, getTransientTextStyle(task))) - row.append(TextElement(task.outputTable.name, getTransientTextStyle(task))) + row.append(TextElement(task.taskDef.name, getTransientTextStyle(task))) + row.append(TextElement(task.taskDef.outputTable.name, getTransientTextStyle(task))) if (haveHiveColumn) { val hiveTable = task.runStatus match { @@ -490,7 +490,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot val tableHeaders = new ListBuffer[TableHeader] - val taskName = s"Files sourced - ${task.outputTable.name} - ${task.runInfo.map(_.infoDate.toString).getOrElse(" ")}" + val taskName = s"Files sourced - ${task.taskDef.outputTable.name} - ${task.runInfo.map(_.infoDate.toString).getOrElse(" ")}" tableHeaders.append(TableHeader(TextElement(taskName), Align.Left)) tableBuilder.withHeaders(tableHeaders.toSeq) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index a6a6e3afb..ea816828c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -20,10 +20,11 @@ import com.typesafe.config.Config import org.apache.spark.sql.functions._ import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession} +import za.co.absa.pramen.api.jobdef.SourceTable import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue} import za.co.absa.pramen.api.sql.SqlGeneratorBase -import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} +import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskDef, TaskRunReason} import za.co.absa.pramen.api.{Reason, Source} import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest} import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala index 5d8b14e41..f84f09f6c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IngestionJob.scala @@ -19,7 +19,8 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.apache.hadoop.fs.Path import org.apache.spark.sql.{DataFrame, SparkSession} -import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} +import za.co.absa.pramen.api.jobdef.SourceTable +import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult} import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY import za.co.absa.pramen.core.bookkeeper.Bookkeeper @@ -48,6 +49,8 @@ class IngestionJob(operationDef: OperationDef, extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, outputTable) { import JobBase._ + override val jobType: JobType = JobType.Ingestion(sourceName, sourceTable, source.config) + override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing override def trackDays: Int = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala index c28a404ea..20d050300 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/Job.scala @@ -20,7 +20,7 @@ import com.typesafe.config.Config import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType import za.co.absa.pramen.api.Reason -import za.co.absa.pramen.api.status.TaskRunReason +import za.co.absa.pramen.api.status.{TaskDef, TaskRunReason} import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.runner.splitter.ScheduleStrategy @@ -35,6 +35,8 @@ trait Job { val scheduleStrategy: ScheduleStrategy + def taskDef: TaskDef + def allowRunningTasksInParallel: Boolean def notificationTargets: Seq[JobNotificationTarget] diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala index ffb4f5a90..dd36fe886 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala @@ -19,12 +19,12 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.apache.spark.sql.types.StructType import org.slf4j.{Logger, LoggerFactory} -import za.co.absa.pramen.api.status.{DependencyFailure, DependencyWarning, MetastoreDependency, TaskRunReason} +import za.co.absa.pramen.api.jobdef.Schedule +import za.co.absa.pramen.api.status.{DependencyFailure, DependencyWarning, JobType, MetastoreDependency, TaskDef, TaskRunReason} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.expr.DateExprEvaluator import za.co.absa.pramen.core.metastore.Metastore import za.co.absa.pramen.core.metastore.model.MetaTable -import za.co.absa.pramen.core.schedule.Schedule import za.co.absa.pramen.core.utils.Emoji._ import za.co.absa.pramen.core.utils.{Emoji, TimeUtils} @@ -39,6 +39,12 @@ abstract class JobBase(operationDef: OperationDef, ) extends Job { protected val log: Logger = LoggerFactory.getLogger(this.getClass) + def jobType: JobType + + override def taskDef: TaskDef = TaskDef( + name, jobType, MetaTable.getMetaTableDef(outputTableDef), operationDef.schedule, operationDef.operationConf + ) + override val name: String = operationDef.name override val outputTable: MetaTable = outputTableDef diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala index 70f92f63d..a098a4493 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationDef.scala @@ -18,12 +18,13 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.jobdef.{Schedule, TransformExpression} import za.co.absa.pramen.api.status.MetastoreDependency import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_INCREMENTAL_INFO_DATE_EXPR import za.co.absa.pramen.core.config.Keys import za.co.absa.pramen.core.metastore.model.MetastoreDependencyFactory -import za.co.absa.pramen.core.schedule.Schedule +import za.co.absa.pramen.core.schedule.ScheduleParser import za.co.absa.pramen.core.utils.ConfigUtils import scala.collection.JavaConverters._ @@ -89,7 +90,7 @@ object OperationDef { } val operationType = OperationType.fromConfig(conf, appConfig, infoDateConfig, parent) - val schedule = Schedule.fromConfig(conf) + val schedule = ScheduleParser.fromConfig(conf) val expectedDelayDays = ConfigUtils.getOptionInt(conf, EXPECTED_DELAY_DAYS_KEY).getOrElse(defaultDelayDays) val consumeThreads = getThreadsToConsume(name, conf, appConfig) val allowParallel = ConfigUtils.getOptionBoolean(conf, ALLOW_PARALLEL_KEY).getOrElse(true) @@ -100,7 +101,7 @@ object OperationDef { val processingTimestampColumn = ConfigUtils.getOptionString(conf, PROCESSING_TIMESTAMP_COLUMN_KEY) val warnMaximumExecutionTimeSeconds = ConfigUtils.getOptionInt(conf, WARN_MAXIMUM_EXECUTION_TIME_SECONDS_KEY) val killMaximumExecutionTimeSeconds = ConfigUtils.getOptionInt(conf, KILL_MAXIMUM_EXECUTION_TIME_SECONDS_KEY) - val schemaTransformations = TransformExpression.fromConfig(conf, SCHEMA_TRANSFORMATIONS_KEY, parent) + val schemaTransformations = TransformExpressionParser.fromConfig(conf, SCHEMA_TRANSFORMATIONS_KEY, parent) val filters = ConfigUtils.getOptListStrings(conf, FILTERS_KEY) val notificationTargets = ConfigUtils.getOptListStrings(conf, NOTIFICATION_TARGETS_KEY) val sparkConfigOptions = ConfigUtils.getExtraOptions(conf, SPARK_CONFIG_PREFIX) ++ ConfigUtils.getExtraOptions(conf, SPARK_CONFIG_PREFIX_V2) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala index a89a0c8c1..b6f868c81 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationSplitter.scala @@ -19,6 +19,7 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.jobdef.{Schedule, SinkTable, SourceTable, TransferTable} import za.co.absa.pramen.api.{DataFormat, Transformer} import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY import za.co.absa.pramen.core.bookkeeper.Bookkeeper @@ -30,7 +31,6 @@ import za.co.absa.pramen.core.pipeline.OperationSplitter.{DISABLE_COUNT_QUERY, g import za.co.absa.pramen.core.pipeline.OperationType._ import za.co.absa.pramen.core.pipeline.PythonTransformationJob._ import za.co.absa.pramen.core.process.ProcessRunner -import za.co.absa.pramen.core.schedule.Schedule import za.co.absa.pramen.core.sink.SinkManager import za.co.absa.pramen.core.source.SourceManager import za.co.absa.pramen.core.utils.{ClassLoaderUtils, ConfigUtils} @@ -100,12 +100,12 @@ class OperationSplitter(conf: Config, } val disableCountQuery = ConfigUtils.getOptionBoolean(source.config, DISABLE_COUNT_QUERY).getOrElse(false) - val outputTable = transferTable.getMetaTable + val outputTable = TransferTableParser.getMetaTable(transferTable) val notificationTargets = operationDef.notificationTargets .map(targetName => getNotificationTarget(conf, targetName, transferTable.conf)) - new TransferJob(operationDef, metastore, bookkeeper, notificationTargets, sourceName, source, transferTable, outputTable, sink, specialCharacters, temporaryDirectory, disableCountQuery) + new TransferJob(operationDef, metastore, bookkeeper, notificationTargets, sourceName, source, transferTable, outputTable, sinkName, sink, specialCharacters, temporaryDirectory, disableCountQuery) }) } @@ -119,7 +119,7 @@ class OperationSplitter(conf: Config, val notificationTargets = operationDef.notificationTargets .map(targetName => getNotificationTarget(conf, targetName, operationDef.operationConf)) - Seq(new TransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, transformer)) + Seq(new TransformationJob(operationDef, metastore, bookkeeper, notificationTargets, outputMetaTable, clazz, transformer)) } def createPythonTransformation(operationDef: OperationDef, @@ -163,7 +163,7 @@ class OperationSplitter(conf: Config, val notificationTargets = operationDef.notificationTargets .map(targetName => getNotificationTarget(conf, targetName, sinkTable.conf)) - new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, outputTable, sink, sinkTable) + new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, outputTable, sinkName, sink, sinkTable) }) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationType.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationType.scala index 3df636e7a..dea6803e5 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationType.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/OperationType.scala @@ -17,6 +17,7 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config +import za.co.absa.pramen.api.jobdef.{SinkTable, SourceTable, TransferTable} import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.utils.ConfigUtils @@ -61,7 +62,7 @@ object OperationType { ConfigUtils.validatePathsExistence(conf, parent, Seq(SOURCE_KEY, TABLES_KEY)) val source = conf.getString(SOURCE_KEY) - val tables = SourceTable.fromConfig(conf, TABLES_KEY) + val tables = SourceTableParser.fromConfig(conf, TABLES_KEY) Ingestion(source, tables) case "transformation" | "transformer" | "transform" => ConfigUtils.validatePathsExistence(conf, parent, Seq(CLASS_KEY, OUTPUT_TABLE_KEY)) @@ -79,14 +80,14 @@ object OperationType { ConfigUtils.validatePathsExistence(conf, parent, Seq(SINK_KEY, TABLES_KEY)) val sink = conf.getString(SINK_KEY) - val tables = SinkTable.fromConfig(conf, TABLES_KEY) + val tables = SinkTableParser.fromConfig(conf, TABLES_KEY) Sink(sink, tables) case "transfer" | "source2sink" => ConfigUtils.validatePathsExistence(conf, parent, Seq(SOURCE_KEY, SINK_KEY, TABLES_KEY)) val source = conf.getString(SOURCE_KEY) val sink = conf.getString(SINK_KEY) - val tables = TransferTable.fromConfig(conf, infoDateConfig, TABLES_KEY, sink) + val tables = TransferTableParser.fromConfig(conf, infoDateConfig, TABLES_KEY, sink) Transfer(source, sink, tables) case _ => throw new IllegalArgumentException(s"Unknown operation type: ${conf.getString(TYPE_KEY)} at $parent") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala index 33d6ebb2b..2a88bdd97 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} -import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} +import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{DataFormat, Reason} import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY import za.co.absa.pramen.core.bookkeeper.Bookkeeper @@ -67,6 +67,8 @@ class PythonTransformationJob(operationDef: OperationDef, (implicit spark: SparkSession) extends JobBase(operationDef, metastore, bookkeeper,notificationTargets, outputTable) { + override val jobType: JobType = JobType.PythonTransformation(pythonClass) + private val minimumRecords: Int = operationDef.extraOptions.getOrElse(MINIMUM_RECORDS_OPTION, "0").toInt override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala index 7d3913474..8e2672ee2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala @@ -18,7 +18,8 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.apache.spark.sql.{DataFrame, SparkSession} -import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} +import za.co.absa.pramen.api.jobdef.SinkTable +import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{Reason, Sink} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.metastore.model.MetaTable @@ -37,12 +38,15 @@ class SinkJob(operationDef: OperationDef, bookkeeper: Bookkeeper, notificationTargets: Seq[JobNotificationTarget], outputTable: MetaTable, + sinkName: String, sink: Sink, sinkTable: SinkTable) (implicit spark: SparkSession) extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, outputTable) { import JobBase._ + override val jobType: JobType = JobType.Sink(sinkName, sinkTable, sink.config) + private val inputTables = operationDef.dependencies.flatMap(_.tables).distinct override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkTableParser.scala similarity index 81% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkTable.scala rename to pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkTableParser.scala index 16399addb..f5ee8f815 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkTableParser.scala @@ -18,26 +18,13 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.jobdef.SinkTable import za.co.absa.pramen.core.pipeline.OperationDef.WARN_MAXIMUM_EXECUTION_TIME_SECONDS_KEY import za.co.absa.pramen.core.utils.{AlgorithmUtils, ConfigUtils} import scala.collection.JavaConverters._ -case class SinkTable( - metaTableName: String, - outputTableName: Option[String], - conf: Config, - rangeFromExpr: Option[String], - rangeToExpr: Option[String], - warnMaxExecutionTimeSeconds: Option[Int], - transformations: Seq[TransformExpression], - filters: Seq[String], - columns: Seq[String], - options: Map[String, String], - overrideConf: Option[Config] - ) - -object SinkTable { +object SinkTableParser { private val log = LoggerFactory.getLogger(this.getClass) val METATABLE_TABLE_KEY = "input.metastore.table" @@ -59,7 +46,7 @@ object SinkTable { val dateFromExpr = ConfigUtils.getOptionString(conf, DATE_FROM_KEY) val dateToExpr = ConfigUtils.getOptionString(conf, DATE_TO_KEY) val maximumExecutionTimeSeconds = ConfigUtils.getOptionInt(conf, WARN_MAXIMUM_EXECUTION_TIME_SECONDS_KEY) - val transformations = TransformExpression.fromConfig(conf, TRANSFORMATIONS_KEY, parentPath) + val transformations = TransformExpressionParser.fromConfig(conf, TRANSFORMATIONS_KEY, parentPath) val filters = ConfigUtils.getOptListStrings(conf, FILTERS_KEY) val columns = ConfigUtils.getOptListStrings(conf, COLUMNS_KEY) val options = ConfigUtils.getExtraOptions(conf, "output") diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SourceTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SourceTableParser.scala similarity index 78% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SourceTable.scala rename to pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SourceTableParser.scala index d94d17b79..8c7db2992 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SourceTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SourceTableParser.scala @@ -18,27 +18,14 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.slf4j.LoggerFactory -import za.co.absa.pramen.api.Query +import za.co.absa.pramen.api.jobdef.{SourceTable, TransformExpression} import za.co.absa.pramen.core.model.QueryBuilder import za.co.absa.pramen.core.pipeline.OperationDef.WARN_MAXIMUM_EXECUTION_TIME_SECONDS_KEY import za.co.absa.pramen.core.utils.{AlgorithmUtils, ConfigUtils} import scala.collection.JavaConverters._ -case class SourceTable( - metaTableName: String, - query: Query, - conf: Config, - rangeFromExpr: Option[String], - rangeToExpr: Option[String], - warnMaxExecutionTimeSeconds: Option[Int], - transformations: Seq[TransformExpression], - filters: Seq[String], - columns: Seq[String], - overrideConf: Option[Config] // ToDo: Add support for arbitrary read options passed to Spark (for cases like mergeSchema etc) - ) - -object SourceTable { +object SourceTableParser { private val log = LoggerFactory.getLogger(this.getClass) val METATABLE_TABLE_KEY = "output.metastore.table" @@ -60,7 +47,7 @@ object SourceTable { val dateToExpr = ConfigUtils.getOptionString(conf, DATE_TO_KEY) val maximumExecutionTimeSeconds = ConfigUtils.getOptionInt(conf, WARN_MAXIMUM_EXECUTION_TIME_SECONDS_KEY) val columns = ConfigUtils.getOptListStrings(conf, COLUMNS_KEY) - val transformations = TransformExpression.fromConfig(conf, TRANSFORMATIONS_KEY, parentPath) + val transformations = TransformExpressionParser.fromConfig(conf, TRANSFORMATIONS_KEY, parentPath) val filters = ConfigUtils.getOptListStrings(conf, FILTERS_KEY) val overrideConf = if (conf.hasPath(SOURCE_OVERRIDE_PREFIX)) { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferJob.scala index 250115a07..7dd2684db 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferJob.scala @@ -18,7 +18,8 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.apache.spark.sql.{DataFrame, SparkSession} -import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} +import za.co.absa.pramen.api.jobdef.TransferTable +import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{Reason, Sink, Source} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.metastore.Metastore @@ -35,6 +36,7 @@ class TransferJob(operationDef: OperationDef, source: Source, table: TransferTable, bookkeepingMetaTable: MetaTable, + sinkName: String, sink: Sink, specialCharacters: String, tempDirectory: Option[String], @@ -42,8 +44,10 @@ class TransferJob(operationDef: OperationDef, (implicit spark: SparkSession) extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, bookkeepingMetaTable) { - val ingestionJob = new IngestionJob(operationDef, metastore, bookkeeper, notificationTargets, sourceName, source, table.getSourceTable, bookkeepingMetaTable, specialCharacters, tempDirectory, disableCountQuery) - val sinkJob = new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, bookkeepingMetaTable, sink, table.getSinkTable) + override val jobType: JobType = JobType.Transfer(sourceName, source.config, sinkName, sink.config, table) + + val ingestionJob = new IngestionJob(operationDef, metastore, bookkeeper, notificationTargets, sourceName, source, TransferTableParser.getSourceTable(table), bookkeepingMetaTable, specialCharacters, tempDirectory, disableCountQuery) + val sinkJob = new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, bookkeepingMetaTable, sinkName, sink, TransferTableParser.getSinkTable(table)) override val scheduleStrategy: ScheduleStrategy = ingestionJob.scheduleStrategy diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferTable.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferTableParser.scala similarity index 73% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferTable.scala rename to pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferTableParser.scala index 6da982287..c7f13d234 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferTable.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransferTableParser.scala @@ -18,6 +18,7 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.jobdef.{SinkTable, SourceTable, TransferTable} import za.co.absa.pramen.api.{DataFormat, Query} import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.config.InfoDateOverride @@ -29,39 +30,7 @@ import za.co.absa.pramen.core.utils.{AlgorithmUtils, ConfigUtils} import java.time.LocalDate import scala.collection.JavaConverters._ -case class TransferTable( - query: Query, - jobMetaTableName: String, - conf: Config, - rangeFromExpr: Option[String], - rangeToExpr: Option[String], - infoDateStart: LocalDate, - trackDays: Int, - trackDaysExplicitlySet: Boolean, - warnMaxExecutionTimeSeconds: Option[Int], - transformations: Seq[TransformExpression], - filters: Seq[String], - columns: Seq[String], - readOptions: Map[String, String], - writeOptions: Map[String, String], - sparkConfig: Map[String, String], - sourceOverrideConf: Option[Config], - sinkOverrideConf: Option[Config] - ) { - def getSourceTable: SourceTable = { - SourceTable(jobMetaTableName, query, conf, rangeFromExpr, rangeToExpr, warnMaxExecutionTimeSeconds, transformations, filters, columns, sourceOverrideConf) - } - - def getSinkTable: SinkTable = { - SinkTable(jobMetaTableName, Option(jobMetaTableName), conf, rangeFromExpr, rangeToExpr, warnMaxExecutionTimeSeconds, transformations, filters, columns, writeOptions, sinkOverrideConf) - } - - def getMetaTable: MetaTable = { - MetaTable(jobMetaTableName, "", DataFormat.Null(), "", "", partitionByInfoDate = false, "", HiveConfig.getNullConfig, None, None, hivePreferAddPartition = true, None, infoDateStart, trackDays, trackDaysExplicitlySet = trackDaysExplicitlySet, None, readOptions, writeOptions, sparkConfig) - } -} - -object TransferTable { +object TransferTableParser { private val log = LoggerFactory.getLogger(this.getClass) val JOB_METASTORE_OUTPUT_TABLE_KEY = "job.metastore.table" @@ -83,7 +52,7 @@ object TransferTable { val trackDays = ConfigUtils.getOptionInt(conf, TRACK_DAYS_KEY).getOrElse(defaultTrackDays) val trackDaysExplicitlySet = conf.hasPath(TRACK_DAYS_KEY) val columns = ConfigUtils.getOptListStrings(conf, COLUMNS_KEY) - val transformations = TransformExpression.fromConfig(conf, TRANSFORMATIONS_KEY, parentPath) + val transformations = TransformExpressionParser.fromConfig(conf, TRANSFORMATIONS_KEY, parentPath) val filters = ConfigUtils.getOptListStrings(conf, FILTERS_KEY) val readOptions = ConfigUtils.getExtraOptions(conf, "read.option") val writeOptions = ConfigUtils.getExtraOptions(conf, "output") @@ -147,4 +116,57 @@ object TransferTable { } } } + + private[core] def getSourceTable(transferTable: TransferTable): SourceTable = { + SourceTable(transferTable.jobMetaTableName, + transferTable.query, + transferTable.conf, + transferTable.rangeFromExpr, + transferTable.rangeToExpr, + transferTable.warnMaxExecutionTimeSeconds, + transferTable.transformations, + transferTable.filters, + transferTable.columns, + transferTable.sourceOverrideConf + ) + } + + private[core] def getSinkTable(transferTable: TransferTable): SinkTable = { + SinkTable(transferTable.jobMetaTableName, + Option(transferTable.jobMetaTableName), + transferTable.conf, + transferTable.rangeFromExpr, + transferTable.rangeToExpr, + transferTable.warnMaxExecutionTimeSeconds, + transferTable.transformations, + transferTable.filters, + transferTable.columns, + transferTable.writeOptions, + transferTable.sinkOverrideConf + ) + } + + private[core] def getMetaTable(transferTable: TransferTable): MetaTable = { + MetaTable(transferTable.jobMetaTableName, + "", + DataFormat.Null(), + "", + "", + partitionByInfoDate = false, + "", + HiveConfig.getNullConfig, + None, + None, + hivePreferAddPartition = true, + None, + transferTable.infoDateStart, + transferTable.trackDays, + trackDaysExplicitlySet = transferTable.trackDaysExplicitlySet, + None, + transferTable.readOptions, + transferTable.writeOptions, + transferTable.sparkConfig + ) + } + } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformExpression.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformExpressionParser.scala similarity index 89% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformExpression.scala rename to pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformExpressionParser.scala index 593c933f8..771315622 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformExpression.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformExpressionParser.scala @@ -17,17 +17,12 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config +import za.co.absa.pramen.api.jobdef.TransformExpression import za.co.absa.pramen.core.utils.ConfigUtils import scala.collection.JavaConverters._ -case class TransformExpression( - column: String, - expression: Option[String], - comment: Option[String] - ) - -object TransformExpression { +object TransformExpressionParser { val COLUMN_KEY = "col" val EXPRESSION_KEY = "expr" val COMMENT_KEY = "comment" @@ -61,4 +56,4 @@ object TransformExpression { Seq.empty[TransformExpression] } } -} \ No newline at end of file +} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala index 18d710f79..0cd611e3f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.Config import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} -import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} +import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.api.{Reason, Transformer} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.metastore.Metastore @@ -32,10 +32,13 @@ class TransformationJob(operationDef: OperationDef, bookkeeper: Bookkeeper, notificationTargets: Seq[JobNotificationTarget], outputTable: MetaTable, + transformerFactoryClass: String, transformer: Transformer) (implicit spark: SparkSession) extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, outputTable) { + override val jobType: JobType = JobType.Transformation(transformerFactoryClass) + private val inputTables = operationDef.dependencies.flatMap(_.tables).distinct override val scheduleStrategy: ScheduleStrategy = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala index 8703c7295..ffb3ab79e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala @@ -112,8 +112,7 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, private[core] def sendFailure(ex: Throwable, job: Job, isTransient: Boolean): Unit = { completedJobsChannel.send((job, - TaskResult(job.name, - MetaTable.getMetaTableDef(job.outputTable), + TaskResult(job.taskDef, RunStatus.Failed(ex), None, applicationId, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala index cfbb4faa5..30076c571 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala @@ -127,8 +127,7 @@ class OrchestratorImpl extends Orchestrator { val isFailure = hasNonPassiveNonOptionalDeps(job, missingTables) val taskResult = TaskResult( - job.name, - MetaTable.getMetaTableDef(job.outputTable), + job.taskDef, RunStatus.MissingDependencies(isFailure, missingTables), None, applicationId, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategy.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategy.scala index 40baf99e6..5a4816e41 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategy.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategy.scala @@ -16,10 +16,10 @@ package za.co.absa.pramen.core.runner.splitter +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.MetastoreDependency import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.pipeline.TaskPreDef -import za.co.absa.pramen.core.schedule.Schedule import java.time.LocalDate diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala index 97ac5c7eb..cbde80dd2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala @@ -16,12 +16,12 @@ package za.co.absa.pramen.core.runner.splitter +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.pipeline import za.co.absa.pramen.core.pipeline.TaskPreDef import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._ -import za.co.absa.pramen.core.schedule.Schedule import java.time.LocalDate diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala index 29a7a9cc2..494668026 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategySourcing.scala @@ -16,12 +16,12 @@ package za.co.absa.pramen.core.runner.splitter +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.pipeline import za.co.absa.pramen.core.pipeline.TaskPreDef import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._ -import za.co.absa.pramen.core.schedule.Schedule import java.time.LocalDate diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyTransformation.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyTransformation.scala index b67efe22a..b150979ed 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyTransformation.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyTransformation.scala @@ -16,12 +16,12 @@ package za.co.absa.pramen.core.runner.splitter +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.pipeline import za.co.absa.pramen.core.pipeline.TaskPreDef import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._ -import za.co.absa.pramen.core.schedule.Schedule import java.time.LocalDate diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala index 070c7d7b5..ba87b5143 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyUtils.scala @@ -16,12 +16,12 @@ package za.co.absa.pramen.core.runner.splitter +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.expr.DateExprEvaluator import za.co.absa.pramen.core.pipeline import za.co.absa.pramen.core.pipeline.TaskPreDef -import za.co.absa.pramen.core.schedule.Schedule import java.time.LocalDate import scala.collection.mutable diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index 48d8bedf1..3f736a9fa 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -172,8 +172,7 @@ abstract class TaskRunnerBase(conf: Config, val runInfo = RunInfo(task.infoDate, now, now) val isTransient = task.job.outputTable.format.isTransient val taskResult = TaskResult( - task.job.name, - MetaTable.getMetaTableDef(task.job.outputTable), + task.job.taskDef, runStatus, Option(runInfo), applicationId, @@ -195,8 +194,7 @@ abstract class TaskRunnerBase(conf: Config, val isTransient = task.job.outputTable.format.isTransient val isLazy = task.job.outputTable.format.isLazy val taskResult = TaskResult( - task.job.name, - MetaTable.getMetaTableDef(task.job.outputTable), + task.job.taskDef, runStatus, Option(runInfo), applicationId, @@ -241,23 +239,23 @@ abstract class TaskRunnerBase(conf: Config, Right(validationResult) case NoData(isFailure) => log.info(s"NO DATA available for the task: $outputTableName for date: ${task.infoDate}.") - Left(TaskResult(jobName, outputTable, RunStatus.NoData(isFailure), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil, options)) + Left(TaskResult(task.job.taskDef, RunStatus.NoData(isFailure), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil, options)) case InsufficientData(actual, expected, oldRecordCount) => log.info(s"INSUFFICIENT DATA available for the task: $outputTableName for date: ${task.infoDate}. Expected = $expected, actual = $actual") - Left(TaskResult(jobName, outputTable, RunStatus.InsufficientData(actual, expected, oldRecordCount), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil, options)) + Left(TaskResult(task.job.taskDef, RunStatus.InsufficientData(actual, expected, oldRecordCount), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil, options)) case AlreadyRan => if (runtimeConfig.isRerun) { log.info(s"RE-RUNNING the task: $outputTableName for date: ${task.infoDate}.") Right(validationResult) } else { log.info(s"SKIPPING already ran job: $outputTableName for date: ${task.infoDate}.") - Left(TaskResult(jobName, outputTable, RunStatus.NotRan, getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil, options)) + Left(TaskResult(task.job.taskDef, RunStatus.NotRan, getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil, options)) } case Skip(msg) => log.info(s"SKIPPING job: $outputTableName for date: ${task.infoDate}. Reason: msg") - Left(TaskResult(jobName, outputTable, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil, options)) + Left(TaskResult(task.job.taskDef, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil, options)) case FailedDependencies(isFailure, failures) => - Left(TaskResult(jobName, outputTable, RunStatus.FailedDependencies(isFailure, failures), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, Nil, Nil, options)) + Left(TaskResult(task.job.taskDef, RunStatus.FailedDependencies(isFailure, failures), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, Nil, Nil, options)) } if (validationResult.dependencyWarnings.nonEmpty) { log.warn(s"$WARNING Validation of the task: $outputTableName for date: ${task.infoDate} has " + @@ -265,7 +263,7 @@ abstract class TaskRunnerBase(conf: Config, } resultToReturn case Failure(ex) => - Left(TaskResult(jobName, outputTable, RunStatus.ValidationFailed(ex), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, Nil, Nil, options)) + Left(TaskResult(task.job.taskDef, RunStatus.ValidationFailed(ex), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, Nil, Nil, options)) } } @@ -304,20 +302,20 @@ abstract class TaskRunnerBase(conf: Config, Right(status.copy(warnings = reason.warnings)) case Reason.NotReady(msg) => log.info(s"NOT READY validation failure for the task: $outputTableName for date: ${task.infoDate}. Reason: $msg") - Left(TaskResult(jobName, outputTable, RunStatus.ValidationFailed(new ReasonException(Reason.NotReady(msg), msg)), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil, Map.empty)) + Left(TaskResult(task.job.taskDef, RunStatus.ValidationFailed(new ReasonException(Reason.NotReady(msg), msg)), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil, Map.empty)) case Reason.Skip(msg) => log.info(s"SKIP validation failure for the task: $outputTableName for date: ${task.infoDate}. Reason: $msg") if (bookkeeper.getLatestDataChunk(outputTableName, task.infoDate, task.infoDate).isEmpty) { val isTransient = task.job.outputTable.format.isTransient bookkeeper.setRecordCount(outputTableName, task.infoDate, task.infoDate, task.infoDate, status.inputRecordsCount.getOrElse(0L), 0, started.getEpochSecond, Instant.now().getEpochSecond, isTransient) } - Left(TaskResult(jobName, outputTable, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil, options)) + Left(TaskResult(task.job.taskDef, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil, options)) case Reason.SkipOnce(msg) => log.info(s"SKIP today validation failure for the task: $outputTableName for date: ${task.infoDate}. Reason: $msg") - Left(TaskResult(jobName, outputTable, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil, options)) + Left(TaskResult(task.job.taskDef, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil, options)) } case Failure(ex) => - Left(TaskResult(jobName, outputTable, RunStatus.ValidationFailed(ex), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil, options)) + Left(TaskResult(task.job.taskDef, RunStatus.ValidationFailed(ex), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil, options)) } } } @@ -403,8 +401,7 @@ abstract class TaskRunnerBase(conf: Config, val warnings = validationResult.warnings ++ runResult.warnings ++ saveResult.warnings ++ hiveWarnings - TaskResult(task.job.name, - MetaTable.getMetaTableDef(task.job.outputTable), + TaskResult(task.job.taskDef, RunStatus.Succeeded(recordCountOldOpt, stats.recordCount, stats.recordCountAppended, @@ -435,8 +432,7 @@ abstract class TaskRunnerBase(conf: Config, case Success(result) => result case Failure(ex) => - TaskResult(task.job.name, - MetaTable.getMetaTableDef(task.job.outputTable), + TaskResult(task.job.taskDef, RunStatus.Failed(ex), getRunInfo(task.infoDate, started), applicationId, @@ -512,8 +508,7 @@ abstract class TaskRunnerBase(conf: Config, val target = notificationTarget.target val notification = status.TaskResult( - task.job.name, - MetaTable.getMetaTableDef(task.job.outputTable), + task.job.taskDef, result.runStatus, result.runInfo, result.applicationId, @@ -535,7 +530,7 @@ abstract class TaskRunnerBase(conf: Config, case Success(_) => None case Failure(ex) => - log.error(s"$EXCLAMATION Failed to send notifications to '${notificationTarget.name}' for task: ${result.outputTable.name} for '${task.infoDate}'.", ex) + log.error(s"$EXCLAMATION Failed to send notifications to '${notificationTarget.name}' for task: ${result.taskDef.outputTable.name} for '${task.infoDate}'.", ex) Option(NotificationFailure( task.job.outputTable.name, notificationTarget.name, @@ -596,23 +591,23 @@ abstract class TaskRunnerBase(conf: Config, result.runStatus match { case _: RunStatus.Succeeded => - log.info(s"$SUCCESS $taskStr '${result.jobName}'$infoDateMsg has SUCCEEDED.$elapsedTimeStr") + log.info(s"$SUCCESS $taskStr '${result.taskDef.name}'$infoDateMsg has SUCCEEDED.$elapsedTimeStr") case RunStatus.ValidationFailed(ex) => - log.error(s"$FAILURE $taskStr '${result.jobName}'$infoDateMsg has FAILED VALIDATION.$elapsedTimeStr", ex) + log.error(s"$FAILURE $taskStr '${result.taskDef.name}'$infoDateMsg has FAILED VALIDATION.$elapsedTimeStr", ex) case RunStatus.Failed(ex) => - log.error(s"$FAILURE $taskStr '${result.jobName}'$infoDateMsg has FAILED.$elapsedTimeStr", ex) + log.error(s"$FAILURE $taskStr '${result.taskDef.name}'$infoDateMsg has FAILED.$elapsedTimeStr", ex) case RunStatus.MissingDependencies(_, tables) => - log.error(s"$emoji $taskStr '${result.jobName}'$infoDateMsg has MISSING TABLES: ${tables.mkString(", ")}") + log.error(s"$emoji $taskStr '${result.taskDef.name}'$infoDateMsg has MISSING TABLES: ${tables.mkString(", ")}") case RunStatus.FailedDependencies(_, deps) => - log.error(s"$emoji $taskStr '${result.jobName}'$infoDateMsg has FAILED DEPENDENCIES: ${deps.map(_.renderText).mkString("; ")}") + log.error(s"$emoji $taskStr '${result.taskDef.name}'$infoDateMsg has FAILED DEPENDENCIES: ${deps.map(_.renderText).mkString("; ")}") case _: RunStatus.NoData => - log.warn(s"$emoji $taskStr '${result.jobName}'$infoDateMsg has NO DATA AT SOURCE.") + log.warn(s"$emoji $taskStr '${result.taskDef.name}'$infoDateMsg has NO DATA AT SOURCE.") case _: RunStatus.InsufficientData => - log.error(s"$FAILURE $taskStr '${result.jobName}'$infoDateMsg has INSUFFICIENT DATA AT SOURCE.") + log.error(s"$FAILURE $taskStr '${result.taskDef.name}'$infoDateMsg has INSUFFICIENT DATA AT SOURCE.") case RunStatus.Skipped(msg, _) => - log.warn(s"$WARNING $taskStr '${result.jobName}'$infoDateMsg is SKIPPED: $msg.") + log.warn(s"$WARNING $taskStr '${result.taskDef.name}'$infoDateMsg is SKIPPED: $msg.") case RunStatus.NotRan => - log.info(s"$taskStr '${result.jobName}'$infoDateMsg is SKIPPED.") + log.info(s"$taskStr '${result.taskDef.name}'$infoDateMsg is SKIPPED.") } } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/schedule/Schedule.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/schedule/ScheduleParser.scala similarity index 62% rename from pramen/core/src/main/scala/za/co/absa/pramen/core/schedule/Schedule.scala rename to pramen/core/src/main/scala/za/co/absa/pramen/core/schedule/ScheduleParser.scala index 702846fd9..4435d8dec 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/schedule/Schedule.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/schedule/ScheduleParser.scala @@ -17,57 +17,23 @@ package za.co.absa.pramen.core.schedule import com.typesafe.config.Config +import za.co.absa.pramen.api.jobdef.Schedule import java.time.{DayOfWeek, LocalDate} import scala.collection.JavaConverters._ import scala.util.Try -sealed trait Schedule { - def isEnabled(day: LocalDate): Boolean -} - -object Schedule { +object ScheduleParser { val SCHEDULE_TYPE_KEY = "schedule.type" val SCHEDULE_DAYS_OF_WEEK_KEY = "schedule.days.of.week" val SCHEDULE_DAYS_OF_MONTH_KEY = "schedule.days.of.month" - case object Incremental extends Schedule { - def isEnabled(day: LocalDate): Boolean = true - - override def toString: String = "incremental" - } - - case class EveryDay() extends Schedule { - def isEnabled(day: LocalDate): Boolean = true - - override def toString: String = "daily" - } - - case class Weekly(days: Seq[DayOfWeek]) extends Schedule { - def isEnabled(day: LocalDate): Boolean = days.contains(day.getDayOfWeek) - - override def toString: String = s"weekly (${days.mkString(", ")})" - } - - case class Monthly(days: Seq[Int]) extends Schedule { - val hasPositiveDays: Boolean = days.exists(_ > 0) - val hasNegativeDays: Boolean = days.exists(_ < 0) - - def isEnabled(day: LocalDate): Boolean = { - val isInPositives = hasPositiveDays && days.contains(day.getDayOfMonth) - val isInNegatives = hasNegativeDays && days.contains(-day.lengthOfMonth() + day.getDayOfMonth - 1) - isInPositives || isInNegatives - } - - override def toString: String = s"monthly (${days.mkString(", ")})" - } - def fromConfig(conf: Config): Schedule = { conf.getString(SCHEDULE_TYPE_KEY) match { - case "incremental" => Incremental - case "daily" => EveryDay() - case "weekly" => Weekly(getDaysOfWeek(conf)) - case "monthly" => Monthly(getDaysOfMonth(conf)) + case "incremental" => Schedule.Incremental + case "daily" => Schedule.EveryDay() + case "weekly" => Schedule.Weekly(getDaysOfWeek(conf)) + case "monthly" => Schedule.Monthly(getDaysOfMonth(conf)) case s => throw new IllegalArgumentException(s"Unknown schedule type: $s") } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ScheduleUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ScheduleUtils.scala index 7ab085d30..2c4b3e703 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ScheduleUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/ScheduleUtils.scala @@ -17,7 +17,7 @@ package za.co.absa.pramen.core.utils import org.slf4j.LoggerFactory -import za.co.absa.pramen.core.schedule.Schedule +import za.co.absa.pramen.api.jobdef.Schedule import java.time.{Instant, LocalDate, ZoneId, ZonedDateTime} import scala.collection.mutable.ListBuffer diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index 0b7bf2cac..b01b54c65 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -25,8 +25,8 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.api.FieldChange +import za.co.absa.pramen.api.jobdef.TransformExpression import za.co.absa.pramen.core.expr.DateExprEvaluator -import za.co.absa.pramen.core.pipeline.TransformExpression import java.io.ByteArrayOutputStream import java.time.format.DateTimeFormatter diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/OperationDefFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/OperationDefFactory.scala index 1391011e1..e3ac4a5fe 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/OperationDefFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/OperationDefFactory.scala @@ -17,9 +17,9 @@ package za.co.absa.pramen.core import com.typesafe.config.{Config, ConfigFactory} +import za.co.absa.pramen.api.jobdef.{Schedule, TransformExpression} import za.co.absa.pramen.api.status.MetastoreDependency -import za.co.absa.pramen.core.pipeline.{OperationDef, OperationType, TransformExpression} -import za.co.absa.pramen.core.schedule.Schedule +import za.co.absa.pramen.core.pipeline.{OperationDef, OperationType} object OperationDefFactory { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskDefFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskDefFactory.scala new file mode 100644 index 000000000..d9fb5cdb9 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskDefFactory.scala @@ -0,0 +1,34 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core + +import com.typesafe.config.{Config, ConfigFactory} +import za.co.absa.pramen.api.jobdef.Schedule +import za.co.absa.pramen.api.status._ +import za.co.absa.pramen.api.{MetaTableDef, SchemaDifference} + +import java.time.{Instant, LocalDate} + +object TaskDefFactory { + def getDummyTaskNotification(name: String = "Dummy Job", + jobType: JobType = JobType.Transformation("dummy.class"), + outputTable: MetaTableDef = MetaTableDefFactory.getDummyMetaTableDef(name = "dummy_table"), + schedule: Schedule = Schedule.EveryDay(), + operationConf: Config = ConfigFactory.empty()): TaskDef = { + TaskDef(name, jobType, outputTable, schedule, operationConf) + } +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala index 1bc2fe436..8c8d87797 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala @@ -22,8 +22,7 @@ import za.co.absa.pramen.api.{MetaTableDef, SchemaDifference} import java.time.{Instant, LocalDate} object TaskNotificationFactory { - def getDummyTaskNotification(jobName: String = "Dummy Job", - outputTable: MetaTableDef = MetaTableDefFactory.getDummyMetaTableDef(name = "dummy_table"), + def getDummyTaskNotification(taskDef: TaskDef = TaskDefFactory.getDummyTaskNotification(), runInfo: Option[RunInfo] = Some(RunInfo( LocalDate.of(2022, 2, 18), Instant.ofEpochMilli(1613600000000L), @@ -37,8 +36,7 @@ object TaskNotificationFactory { dependencyWarnings: Seq[DependencyWarning] = Seq.empty, notificationTargetErrors: Seq[NotificationFailure] = Seq.empty, options: Map[String, String] = Map.empty[String, String]): TaskResult = { - TaskResult(jobName, - outputTable, + TaskResult(taskDef, status, runInfo, applicationId, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala index 488bab536..20a88c039 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala @@ -21,6 +21,7 @@ import org.apache.spark.sql.functions.lit import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{AnalysisException, DataFrame} import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.api.{CachePolicy, DataFormat} import za.co.absa.pramen.core.OperationDefFactory @@ -32,7 +33,6 @@ import za.co.absa.pramen.core.metastore.peristence.TransientJobManager import za.co.absa.pramen.core.mocks.bookkeeper.SyncBookkeeperMock import za.co.absa.pramen.core.mocks.job.JobSpy import za.co.absa.pramen.core.mocks.utils.hive.QueryExecutorMock -import za.co.absa.pramen.core.schedule.Schedule import za.co.absa.pramen.core.utils.SparkUtils import za.co.absa.pramen.core.utils.hive.{HiveHelperSql, HiveQueryTemplates, QueryExecutorSpark} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SinkTableFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SinkTableFactory.scala index d3864b4bc..5d0a05a9f 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SinkTableFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SinkTableFactory.scala @@ -17,7 +17,7 @@ package za.co.absa.pramen.core.mocks import com.typesafe.config.{Config, ConfigFactory} -import za.co.absa.pramen.core.pipeline.{SinkTable, TransformExpression} +import za.co.absa.pramen.api.jobdef.{SinkTable, TransformExpression} object SinkTableFactory { def getDummySinkTable(metaTableName: String = "table1", diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SourceTableFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SourceTableFactory.scala index f76eee9fa..1cec46c86 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SourceTableFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/SourceTableFactory.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.mocks import com.typesafe.config.{Config, ConfigFactory} import za.co.absa.pramen.api.Query -import za.co.absa.pramen.core.pipeline.{SourceTable, TransformExpression} +import za.co.absa.pramen.api.jobdef.{SourceTable, TransformExpression} object SourceTableFactory { def getDummySourceTable(metaTableName: String = "table1", diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala index 3b334963b..887c3b8be 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala @@ -18,13 +18,13 @@ package za.co.absa.pramen.core.mocks import za.co.absa.pramen.api.status._ import za.co.absa.pramen.api.{MetaTableDef, SchemaDifference} +import za.co.absa.pramen.core.TaskDefFactory import za.co.absa.pramen.core.metastore.model.MetaTable import java.time.{Instant, LocalDate} object TaskResultFactory { - def getDummyTaskResult(jobName: String = "DummyJob", - outputTable: MetaTableDef = MetaTable.getMetaTableDef(MetaTableFactory.getDummyMetaTable(name = "table_out")), + def getDummyTaskResult(taskDef: TaskDef = TaskDefFactory.getDummyTaskNotification(name = "DummyJob", outputTable = MetaTable.getMetaTableDef(MetaTableFactory.getDummyMetaTable(name = "table_out"))), runStatus: RunStatus = RunStatus.Succeeded(Some(100), Some(200), None, Some(1000), TaskRunReason.New, Nil, Nil, Nil, Nil), runInfo: Option[RunInfo] = Some(RunInfo(LocalDate.of(2022, 2, 18), Instant.ofEpochSecond(1234), Instant.ofEpochSecond(5678))), applicationId: String = "app_123", @@ -34,8 +34,7 @@ object TaskResultFactory { dependencyWarnings: Seq[DependencyWarning] = Nil, notificationTargetErrors: Seq[NotificationFailure] = Nil, options: Map[String, String] = Map.empty): TaskResult = { - TaskResult(jobName, - outputTable, + TaskResult(taskDef, runStatus, runInfo, applicationId, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TransferTableFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TransferTableFactory.scala index 3f5bd4b23..fd525b032 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TransferTableFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TransferTableFactory.scala @@ -18,7 +18,7 @@ package za.co.absa.pramen.core.mocks import com.typesafe.config.{Config, ConfigFactory} import za.co.absa.pramen.api.Query -import za.co.absa.pramen.core.pipeline.{TransferTable, TransformExpression} +import za.co.absa.pramen.api.jobdef.{TransferTable, TransformExpression} import java.time.LocalDate diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobBaseDummy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobBaseDummy.scala index a764c6aa1..e4a0de3c9 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobBaseDummy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobBaseDummy.scala @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.mocks.job import com.typesafe.config.Config import org.apache.spark.sql.DataFrame import za.co.absa.pramen.api.Reason -import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason} +import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.metastore.Metastore import za.co.absa.pramen.core.metastore.model.MetaTable @@ -34,6 +34,7 @@ class JobBaseDummy(operationDef: OperationDef, bookkeeper: Bookkeeper, outputTableDef: MetaTable) extends JobBase(operationDef, metastore, bookkeeper, jobNotificationTargets, outputTableDef) { + override def jobType: JobType = JobType.Transformation("dummy_class") override def preRunCheckJob(infoDate: LocalDate, runReason: TaskRunReason, jobConfig: Config, dependencyWarnings: Seq[DependencyWarning]): JobPreRunResult = { JobPreRunResult(null, None, dependencyWarnings, Seq.empty[String]) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala index 192800ec6..359c77d31 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala @@ -19,9 +19,9 @@ package za.co.absa.pramen.core.mocks.job import com.typesafe.config.Config import org.apache.spark.sql.DataFrame import org.apache.spark.sql.types.StructType -import za.co.absa.pramen.api.status.TaskRunReason +import za.co.absa.pramen.api.status.{TaskDef, TaskRunReason} import za.co.absa.pramen.api.{DataFormat, Reason} -import za.co.absa.pramen.core.OperationDefFactory +import za.co.absa.pramen.core.{OperationDefFactory, TaskDefFactory} import za.co.absa.pramen.core.metastore.MetaTableStats import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.mocks.MetaTableFactory.getDummyMetaTable @@ -30,7 +30,7 @@ import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrateg import java.time.{Instant, LocalDate} -class JobSpy(jobName: String = "DummyJob", +class JobSpy(jobName: String = "Dummy Job", outputTableIn: String = "table_out", outputTableFormat: DataFormat = DataFormat.Parquet("/tmp/dummy", None), hiveTable: Option[String] = None, @@ -53,6 +53,8 @@ class JobSpy(jobName: String = "DummyJob", var saveDf: DataFrame = _ var createHiveTableCount = 0 + override def taskDef: TaskDef = TaskDefFactory.getDummyTaskNotification(outputTable = MetaTable.getMetaTableDef(outputTable)) + override val name: String = jobName override val outputTable: MetaTable = getDummyMetaTable(outputTableIn, format = outputTableFormat, hiveTable = hiveTable) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala index 2db871bbc..5913ef496 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala @@ -34,7 +34,7 @@ class NotificationTargetMock(conf: Config) extends NotificationTarget { System.setProperty("pramen.test.notification.target.failure", "true") throw new RuntimeException("Notification target test exception") } - System.setProperty("pramen.test.notification.table", notification.outputTable.name) + System.setProperty("pramen.test.notification.table", notification.taskDef.outputTable.name) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala index d84172d3c..7532dc095 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala @@ -76,8 +76,7 @@ class ConcurrentJobRunnerSpy(includeFails: Boolean = false, } val taskResult = TaskResult( - job.name, - MetaTable.getMetaTableDef(job.outputTable), + job.taskDef, status, Some(RunInfo(infoDate, started, finished)), "app_123", diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala index c9f1a556b..806aa3bc7 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala @@ -19,6 +19,7 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import org.apache.spark.sql.DataFrame import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.jobdef.{SourceTable, TransformExpression} import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.api.{Query, Reason} import za.co.absa.pramen.core.OperationDefFactory diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationDefSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationDefSuite.scala index 7cc1c1bdc..962de31dc 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationDefSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/OperationDefSuite.scala @@ -18,10 +18,10 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.fixtures.TempDirFixture import za.co.absa.pramen.core.pipeline.OperationType.{Ingestion, Transformation} -import za.co.absa.pramen.core.schedule.Schedule import java.time.LocalDate diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PipelineDefSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PipelineDefSuite.scala index 68b6fa4dc..2f4849fd2 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PipelineDefSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PipelineDefSuite.scala @@ -18,11 +18,11 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.core.app.config.InfoDateConfig import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.TempDirFixture import za.co.absa.pramen.core.pipeline.OperationType.{Ingestion, Transformation} -import za.co.absa.pramen.core.schedule.Schedule import za.co.absa.pramen.core.sink.{SinkManager, SparkSink} import za.co.absa.pramen.core.source.{JdbcSource, SourceManager} import za.co.absa.pramen.core.utils.ResourceUtils diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala index 118d3d40d..0dc4dbcd5 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkJobSuite.scala @@ -20,6 +20,7 @@ import com.typesafe.config.{ConfigFactory, ConfigValueFactory} import org.apache.spark.sql.functions.col import org.apache.spark.sql.{AnalysisException, DataFrame} import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.jobdef.{SinkTable, TransformExpression} import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.api.{Reason, Sink} import za.co.absa.pramen.core.OperationDefFactory @@ -261,7 +262,7 @@ class SinkJobSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix val outputTable = MetaTableFactory.getDummyMetaTable(name = "table1->mysink") - (new SinkJob(operation, metastore, bk, Nil, outputTable, sink, sinkTable), bk) + (new SinkJob(operation, metastore, bk, Nil, outputTable, "sink_name", sink, sinkTable), bk) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkTableSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkTableParserSuite.scala similarity index 94% rename from pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkTableSuite.scala rename to pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkTableParserSuite.scala index d1d77870c..0703f206b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkTableSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SinkTableParserSuite.scala @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.pipeline import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpec -class SinkTableSuite extends AnyWordSpec { +class SinkTableParserSuite extends AnyWordSpec { "fromConfig" should { "create a list of source tables" in { val conf = ConfigFactory.parseString( @@ -52,7 +52,7 @@ class SinkTableSuite extends AnyWordSpec { |] |""".stripMargin) - val sinkTables = SinkTable.fromConfig(conf, "sink.tables") + val sinkTables = SinkTableParser.fromConfig(conf, "sink.tables") assert(sinkTables.size == 4) @@ -116,7 +116,7 @@ class SinkTableSuite extends AnyWordSpec { |""".stripMargin) val ex = intercept[IllegalArgumentException] { - SinkTable.fromConfig(conf, "sink.tables") + SinkTableParser.fromConfig(conf, "sink.tables") } assert(ex.getMessage.contains("Duplicate sink table definitions for the sink job: table11")) @@ -136,7 +136,7 @@ class SinkTableSuite extends AnyWordSpec { |""".stripMargin) val ex = intercept[IllegalArgumentException] { - SinkTable.fromConfig(conf, "sink.tables") + SinkTableParser.fromConfig(conf, "sink.tables") } assert(ex.getMessage.contains("'col' not set for the transformation")) @@ -156,7 +156,7 @@ class SinkTableSuite extends AnyWordSpec { |""".stripMargin) val ex = intercept[IllegalArgumentException] { - SinkTable.fromConfig(conf, "sink.tables") + SinkTableParser.fromConfig(conf, "sink.tables") } assert(ex.getMessage.contains("Either 'expr' or 'comment' should be defined for for the transformation of '2.2' in sink.tables[0].transformations[0]")) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala similarity index 94% rename from pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableSuite.scala rename to pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala index f1b24f2fa..3e55b6faf 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala @@ -20,7 +20,7 @@ import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.Query -class SourceTableSuite extends AnyWordSpec { +class SourceTableParserSuite extends AnyWordSpec { "fromConfig" should { "create a list of source tables" in { val conf = ConfigFactory.parseString( @@ -54,7 +54,7 @@ class SourceTableSuite extends AnyWordSpec { |] |""".stripMargin) - val sourceTables = SourceTable.fromConfig(conf, "source.tables") + val sourceTables = SourceTableParser.fromConfig(conf, "source.tables") assert(sourceTables.size == 4) @@ -116,7 +116,7 @@ class SourceTableSuite extends AnyWordSpec { |""".stripMargin) val ex = intercept[IllegalArgumentException] { - SourceTable.fromConfig(conf, "source.tables") + SourceTableParser.fromConfig(conf, "source.tables") } assert(ex.getMessage.contains("No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table' at source.tables[0].")) @@ -143,7 +143,7 @@ class SourceTableSuite extends AnyWordSpec { |""".stripMargin) val ex = intercept[IllegalArgumentException] { - SourceTable.fromConfig(conf, "source.tables") + SourceTableParser.fromConfig(conf, "source.tables") } assert(ex.getMessage.contains("Duplicate source table definitions for the sourcing job: table11")) @@ -163,7 +163,7 @@ class SourceTableSuite extends AnyWordSpec { |""".stripMargin) val ex = intercept[IllegalArgumentException] { - SourceTable.fromConfig(conf, "source.tables") + SourceTableParser.fromConfig(conf, "source.tables") } assert(ex.getMessage.contains("'col' not set for the transformation")) @@ -183,7 +183,7 @@ class SourceTableSuite extends AnyWordSpec { |""".stripMargin) val ex = intercept[IllegalArgumentException] { - SourceTable.fromConfig(conf, "source.tables") + SourceTableParser.fromConfig(conf, "source.tables") } assert(ex.getMessage.contains("Either 'expr' or 'comment' should be defined for for the transformation")) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala index 5217ebef2..a8e1a71f6 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferJobSuite.scala @@ -20,6 +20,7 @@ import com.typesafe.config.ConfigFactory import org.apache.spark.sql.{AnalysisException, DataFrame} import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.Reason +import za.co.absa.pramen.api.jobdef.{TransferTable, TransformExpression} import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.core.OperationDefFactory import za.co.absa.pramen.core.base.SparkTestBase @@ -270,9 +271,9 @@ class TransferJobSuite extends AnyWordSpec with SparkTestBase with TextCompariso val sink = new SinkSpy() - val outputTable = transferTable.getMetaTable + val outputTable = TransferTableParser.getMetaTable(transferTable) - (new TransferJob(operation, metastore, bk, Nil, "testSource", source, transferTable, outputTable, sink, " ", tempDirectory, disableCountQuery), bk) + (new TransferJob(operation, metastore, bk, Nil, "testSource", source, transferTable, outputTable, "sink_name", sink, " ", tempDirectory, disableCountQuery), bk) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferTableSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferTableSuite.scala index abffbc552..f2b854348 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferTableSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransferTableSuite.scala @@ -76,7 +76,7 @@ class TransferTableSuite extends AnyWordSpec { .withFallback(ConfigFactory.load()) val infoDateConfig = InfoDateConfig.fromConfig(conf) - val tables = TransferTable.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") + val tables = TransferTableParser.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") assert(tables.size == 4) @@ -172,7 +172,7 @@ class TransferTableSuite extends AnyWordSpec { val infoDateConfig = InfoDateConfig.fromConfig(conf) val ex = intercept[IllegalArgumentException] { - TransferTable.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") + TransferTableParser.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") } assert(ex.getMessage.contains("Duplicate table definitions for the transfer job: db1.table1->mysink")) @@ -191,7 +191,7 @@ class TransferTableSuite extends AnyWordSpec { val infoDateConfig = InfoDateConfig.fromConfig(conf) val ex = intercept[IllegalArgumentException] { - TransferTable.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") + TransferTableParser.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") } assert(ex.getMessage.contains("Cannot determine metastore table name for 'Sql(SELECT * FROM users) -> mysink")) @@ -235,9 +235,9 @@ class TransferTableSuite extends AnyWordSpec { "getSourceTable()" should { "correctly create a source table" in { val infoDateConfig = InfoDateConfig.fromConfig(conf) - val tables = TransferTable.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") + val tables = TransferTableParser.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") - val sourceTable = tables.head.getSourceTable + val sourceTable = TransferTableParser.getSourceTable(tables.head) assert(tables.head.sparkConfig("key1") == "value1") assert(sourceTable.metaTableName == "table1->mysink") @@ -256,9 +256,9 @@ class TransferTableSuite extends AnyWordSpec { "getSinkTable()" should { "correctly create a sink table" in { val infoDateConfig = InfoDateConfig.fromConfig(conf) - val tables = TransferTable.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") + val tables = TransferTableParser.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") - val sinkTable = tables.head.getSinkTable + val sinkTable = TransferTableParser.getSinkTable(tables.head) assert(sinkTable.metaTableName == "table1->mysink") assert(sinkTable.outputTableName.contains("table1->mysink")) @@ -275,9 +275,9 @@ class TransferTableSuite extends AnyWordSpec { "getMetaTable()" should { "correctly create a metastore table" in { val infoDateConfig = InfoDateConfig.fromConfig(conf) - val tables = TransferTable.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") + val tables = TransferTableParser.fromConfig(conf, infoDateConfig, "transfer.tables", "mysink") - val metaTable = tables.head.getMetaTable + val metaTable = TransferTableParser.getMetaTable(tables.head) assert(metaTable.name == "table1->mysink") assert(metaTable.format.isInstanceOf[DataFormat.Null]) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransformationJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransformationJobSuite.scala index 5cf43cf21..3b0531a82 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransformationJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/TransformationJobSuite.scala @@ -137,7 +137,7 @@ class TransformationJobSuite extends AnyWordSpec with SparkTestBase { val outputTable = MetaTableFactory.getDummyMetaTable(name = "table1") - (new TransformationJob(operation, metastore, bk, Nil, outputTable, transformer), metastore) + (new TransformationJob(operation, metastore, bk, Nil, outputTable, "dummy_class", transformer), metastore) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/schedule/ScheduleSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/schedule/ScheduleSuite.scala index 576b09bbe..892369c26 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/schedule/ScheduleSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/schedule/ScheduleSuite.scala @@ -19,7 +19,8 @@ package za.co.absa.pramen.core.schedule import com.typesafe.config.ConfigException.WrongType import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpec -import za.co.absa.pramen.core.schedule.Schedule._ +import za.co.absa.pramen.api.jobdef.Schedule +import za.co.absa.pramen.core.schedule.ScheduleParser.{SCHEDULE_DAYS_OF_MONTH_KEY, SCHEDULE_DAYS_OF_WEEK_KEY, SCHEDULE_TYPE_KEY} import java.time.{DayOfWeek, LocalDate} @@ -29,7 +30,7 @@ class ScheduleSuite extends AnyWordSpec { "Deserialize incremental jobs" when { "a normal incremental job is provided" in { val config = ConfigFactory.parseString(s"$SCHEDULE_TYPE_KEY = incremental") - val schedule = fromConfig(config) + val schedule = ScheduleParser.fromConfig(config) assert(schedule == Schedule.Incremental) } @@ -38,7 +39,7 @@ class ScheduleSuite extends AnyWordSpec { "Deserialize daily jobs" when { "a normal daily job is provided" in { val config = ConfigFactory.parseString(s"$SCHEDULE_TYPE_KEY = daily") - val schedule = fromConfig(config) + val schedule = ScheduleParser.fromConfig(config) assert(schedule == Schedule.EveryDay()) } @@ -49,7 +50,7 @@ class ScheduleSuite extends AnyWordSpec { val config = ConfigFactory.parseString( s"""$SCHEDULE_TYPE_KEY = weekly |$SCHEDULE_DAYS_OF_WEEK_KEY = [ 7 ]""".stripMargin) - val schedule = fromConfig(config) + val schedule = ScheduleParser.fromConfig(config) assert(schedule == Schedule.Weekly(DayOfWeek.SUNDAY :: Nil)) } @@ -57,7 +58,7 @@ class ScheduleSuite extends AnyWordSpec { val config = ConfigFactory.parseString( s"""$SCHEDULE_TYPE_KEY = weekly |$SCHEDULE_DAYS_OF_WEEK_KEY = [ 1, 7 ]""".stripMargin) - val schedule = fromConfig(config) + val schedule = ScheduleParser.fromConfig(config) assert(schedule == Schedule.Weekly(DayOfWeek.MONDAY :: DayOfWeek.SUNDAY :: Nil)) } @@ -67,7 +68,7 @@ class ScheduleSuite extends AnyWordSpec { s"""$SCHEDULE_TYPE_KEY = weekly |$SCHEDULE_DAYS_OF_WEEK_KEY = [ 1, 7, 8 ]""".stripMargin) intercept[java.time.DateTimeException] { - fromConfig(config) + ScheduleParser.fromConfig(config) } } @@ -76,7 +77,7 @@ class ScheduleSuite extends AnyWordSpec { s"""$SCHEDULE_TYPE_KEY = weekly |$SCHEDULE_DAYS_OF_WEEK_KEY = [ 0 ]""".stripMargin) intercept[java.time.DateTimeException] { - fromConfig(config) + ScheduleParser.fromConfig(config) } } @@ -85,7 +86,7 @@ class ScheduleSuite extends AnyWordSpec { s"""$SCHEDULE_TYPE_KEY = weekly |$SCHEDULE_DAYS_OF_WEEK_KEY = [ ]""".stripMargin) val ex = intercept[IllegalArgumentException] { - fromConfig(config) + ScheduleParser.fromConfig(config) } assert(ex.getMessage.contains("No days of week are provided")) @@ -96,7 +97,7 @@ class ScheduleSuite extends AnyWordSpec { s"""$SCHEDULE_TYPE_KEY = weekly |$SCHEDULE_DAYS_OF_WEEK_KEY = aaa""".stripMargin) val ex = intercept[WrongType] { - fromConfig(config) + ScheduleParser.fromConfig(config) } assert(ex.getMessage.contains("has type STRING rather than LIST")) @@ -108,17 +109,17 @@ class ScheduleSuite extends AnyWordSpec { val config = ConfigFactory.parseString( s"""$SCHEDULE_TYPE_KEY = monthly |$SCHEDULE_DAYS_OF_MONTH_KEY = [ 1 ]""".stripMargin) - val schedule = fromConfig(config) + val schedule = ScheduleParser.fromConfig(config) - assert(schedule == Monthly(1 :: Nil)) + assert(schedule == Schedule.Monthly(1 :: Nil)) } "a multiple days weekly job is provided" in { val config = ConfigFactory.parseString( s"""$SCHEDULE_TYPE_KEY = monthly |$SCHEDULE_DAYS_OF_MONTH_KEY = [ 1, 2, 31 ]""".stripMargin) - val schedule = fromConfig(config) + val schedule = ScheduleParser.fromConfig(config) - assert(schedule == Monthly(1 :: 2 :: 31 :: Nil)) + assert(schedule == Schedule.Monthly(1 :: 2 :: 31 :: Nil)) } "throw an exception when a wrong month day is provided 1" in { @@ -126,7 +127,7 @@ class ScheduleSuite extends AnyWordSpec { s"""$SCHEDULE_TYPE_KEY = monthly |$SCHEDULE_DAYS_OF_MONTH_KEY = [ 1, 7, 32 ]""".stripMargin) val ex = intercept[IllegalArgumentException] { - fromConfig(config) + ScheduleParser.fromConfig(config) } assert(ex.getMessage.contains("Invalid day of month")) @@ -137,7 +138,7 @@ class ScheduleSuite extends AnyWordSpec { s"""$SCHEDULE_TYPE_KEY = monthly |$SCHEDULE_DAYS_OF_MONTH_KEY = [ 0 ]""".stripMargin) val ex = intercept[IllegalArgumentException] { - fromConfig(config) + ScheduleParser.fromConfig(config) } assert(ex.getMessage.contains("Invalid day of month")) @@ -148,7 +149,7 @@ class ScheduleSuite extends AnyWordSpec { s"""$SCHEDULE_TYPE_KEY = monthly |$SCHEDULE_DAYS_OF_MONTH_KEY = [ ]""".stripMargin) val ex = intercept[IllegalArgumentException] { - fromConfig(config) + ScheduleParser.fromConfig(config) } assert(ex.getMessage.contains("No days of month are provided")) @@ -159,7 +160,7 @@ class ScheduleSuite extends AnyWordSpec { s"""$SCHEDULE_TYPE_KEY = monthly |$SCHEDULE_DAYS_OF_MONTH_KEY = aaa""".stripMargin) val ex = intercept[WrongType] { - fromConfig(config) + ScheduleParser.fromConfig(config) } assert(ex.getMessage.contains("has type STRING rather than LIST")) @@ -171,7 +172,7 @@ class ScheduleSuite extends AnyWordSpec { val config = ConfigFactory.parseString(s"$SCHEDULE_TYPE_KEY = dummy") val ex = intercept[IllegalArgumentException] { - fromConfig(config) + ScheduleParser.fromConfig(config) } assert(ex.getMessage == "Unknown schedule type: dummy") @@ -188,7 +189,7 @@ class ScheduleSuite extends AnyWordSpec { val sunday = LocalDate.of(2020, 8, 16) "Always return true for everyday jobs" in { - val schedule = EveryDay() + val schedule = Schedule.EveryDay() assert(schedule.isEnabled(monday)) assert(schedule.isEnabled(tuesday)) @@ -200,7 +201,7 @@ class ScheduleSuite extends AnyWordSpec { } "Return true for specific week days for weekly jobs" in { - val schedule = Weekly(DayOfWeek.MONDAY :: DayOfWeek.SUNDAY :: Nil) + val schedule = Schedule.Weekly(DayOfWeek.MONDAY :: DayOfWeek.SUNDAY :: Nil) assert(schedule.isEnabled(monday)) assert(!schedule.isEnabled(tuesday)) @@ -212,7 +213,7 @@ class ScheduleSuite extends AnyWordSpec { } "Return true for specific days of month for monthly jobs" in { - val schedule = Monthly(1 :: 10 :: 15 :: Nil) + val schedule = Schedule.Monthly(1 :: 10 :: 15 :: Nil) assert(schedule.isEnabled(LocalDate.of(2020, 8, 1))) assert(schedule.isEnabled(LocalDate.of(2020, 9, 1))) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala index 0e53a3af7..e04779c2d 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala @@ -48,8 +48,7 @@ class TaskCompletedSuite extends AnyWordSpec { val runReason = TaskRunReason.Update val task = Task(job, infoDate, runReason) val taskResult = TaskResult( - job.name, - MetaTable.getMetaTableDef(job.outputTable), + job.taskDef, RunStatus.Succeeded(Some(1000), Some(2000), None, Some(3000), runReason, Nil, Nil, Nil, Nil), Some(RunInfo(infoDate, now.minusSeconds(10), now)), "app_123", @@ -87,8 +86,7 @@ class TaskCompletedSuite extends AnyWordSpec { val runReason = TaskRunReason.Update val task = Task(job, infoDate, runReason) val taskResult = TaskResult( - job.name, - MetaTable.getMetaTableDef(job.outputTable), + job.taskDef, RunStatus.Failed(new IllegalStateException("Dummy Exception")), None, "app_123", diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala index a736f079b..c3a5ffaf1 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala @@ -53,8 +53,8 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase { assert(results.size == 1) assert(results.head.runStatus.isInstanceOf[Succeeded]) - assert(results.head.jobName == job.name) - assert(results.head.outputTable.name == job.outputTable.name) + assert(results.head.taskDef.name == job.name) + assert(results.head.taskDef.outputTable.name == job.outputTable.name) assert(results.head.runInfo.nonEmpty) assert(results.head.runInfo.get.infoDate == runDate) assert(results.head.schemaChanges.isEmpty) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyIncrementalSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyIncrementalSuite.scala index 808f07f29..f5e6506f5 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyIncrementalSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyIncrementalSuite.scala @@ -17,10 +17,10 @@ package za.co.absa.pramen.core.tests.runner.splitter import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.TaskRunReason import za.co.absa.pramen.core.bookkeeper.BookkeeperNull import za.co.absa.pramen.core.runner.splitter.{RunMode, ScheduleParams, ScheduleStrategyIncremental} -import za.co.absa.pramen.core.schedule.Schedule import java.time.LocalDate diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala index 191d47dbe..f07a54b67 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategySuite.scala @@ -19,12 +19,12 @@ package za.co.absa.pramen.core.tests.runner.splitter import org.mockito.ArgumentMatchers import org.mockito.Mockito.{mock, when} import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.mocks.DataChunkFactory.getDummyDataChunk import za.co.absa.pramen.core.pipeline import za.co.absa.pramen.core.runner.splitter.{RunMode, ScheduleParams, ScheduleStrategySourcing, ScheduleStrategyTransformation} -import za.co.absa.pramen.core.schedule.Schedule import java.time.format.DateTimeFormatter import java.time.{DayOfWeek, LocalDate} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyUtilsSuite.scala index 68a75cd36..de75fa8c3 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyUtilsSuite.scala @@ -18,6 +18,7 @@ package za.co.absa.pramen.core.tests.runner.splitter import org.mockito.Mockito.{mock, when} import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason} import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.expr.exceptions.SyntaxErrorException @@ -25,7 +26,6 @@ import za.co.absa.pramen.core.model.DataChunk import za.co.absa.pramen.core.pipeline import za.co.absa.pramen.core.runner.splitter.RunMode import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._ -import za.co.absa.pramen.core.schedule.Schedule import java.time.{DayOfWeek, LocalDate} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala index ad897f1ca..4ee500cbc 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala @@ -20,6 +20,7 @@ import com.typesafe.config.ConfigFactory import org.apache.spark.sql.DataFrame import org.apache.spark.sql.functions.lit import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.jobdef.TransformExpression import za.co.absa.pramen.api.status.RunStatus.{Failed, NotRan, Skipped, Succeeded} import za.co.absa.pramen.api.status._ import za.co.absa.pramen.api.{DataFormat, Reason} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/schedule/ScheduleSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/schedule/ScheduleSuite.scala index 4ac9d8bea..420d12a56 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/schedule/ScheduleSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/schedule/ScheduleSuite.scala @@ -18,7 +18,8 @@ package za.co.absa.pramen.core.tests.schedule import com.typesafe.config.{Config, ConfigFactory} import org.scalatest.wordspec.AnyWordSpec -import za.co.absa.pramen.core.schedule.Schedule +import za.co.absa.pramen.api.jobdef.Schedule +import za.co.absa.pramen.core.schedule.ScheduleParser import java.time.{DayOfWeek, LocalDate} @@ -117,14 +118,14 @@ class ScheduleSuite extends AnyWordSpec { "fromConfig()" should { "work for daily schedules" in { val conf = getConfigString("daily") - val schedule = Schedule.fromConfig(conf) + val schedule = ScheduleParser.fromConfig(conf) assert(schedule == Schedule.EveryDay()) } "work for weekly schedules" in { val conf = getConfigString("weekly", "6, 7") - val schedule = Schedule.fromConfig(conf) + val schedule = ScheduleParser.fromConfig(conf) assert(schedule == Schedule.Weekly(Seq(DayOfWeek.SATURDAY, DayOfWeek.SUNDAY))) } @@ -132,21 +133,21 @@ class ScheduleSuite extends AnyWordSpec { "work for monthly schedules" when { "days of month are numeric" in { val conf = getConfigString("monthly", "2, -1") - val schedule = Schedule.fromConfig(conf) + val schedule = ScheduleParser.fromConfig(conf) assert(schedule == Schedule.Monthly(Seq(2, -1))) } "days of month contains 'last'" in { val conf = getConfigString("monthly", """5, "last"""") - val schedule = Schedule.fromConfig(conf) + val schedule = ScheduleParser.fromConfig(conf) assert(schedule == Schedule.Monthly(Seq(5, -1))) } "days of month contains 'L'" in { val conf = getConfigString("monthly", "L") - val schedule = Schedule.fromConfig(conf) + val schedule = ScheduleParser.fromConfig(conf) assert(schedule == Schedule.Monthly(Seq(-1))) } @@ -155,7 +156,7 @@ class ScheduleSuite extends AnyWordSpec { val conf = getConfigString("monthly", "") assertThrows[IllegalArgumentException] { - Schedule.fromConfig(conf) + ScheduleParser.fromConfig(conf) } } @@ -163,7 +164,7 @@ class ScheduleSuite extends AnyWordSpec { val conf = getConfigString("monthly", "first") assertThrows[IllegalArgumentException] { - Schedule.fromConfig(conf) + ScheduleParser.fromConfig(conf) } } @@ -171,7 +172,7 @@ class ScheduleSuite extends AnyWordSpec { val conf = getConfigString("monthly", "32") assertThrows[IllegalArgumentException] { - Schedule.fromConfig(conf) + ScheduleParser.fromConfig(conf) } } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/ScheduleUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/ScheduleUtilsSuite.scala index 4246248c2..0cf9b7a8e 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/ScheduleUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/ScheduleUtilsSuite.scala @@ -17,7 +17,7 @@ package za.co.absa.pramen.core.tests.utils import org.scalatest.wordspec.AnyWordSpec -import za.co.absa.pramen.core.schedule.Schedule +import za.co.absa.pramen.api.jobdef.Schedule import za.co.absa.pramen.core.utils.ScheduleUtils import java.time._ @@ -25,7 +25,7 @@ import java.time._ class ScheduleUtilsSuite extends AnyWordSpec { // Various schedules - private val everyday = Schedule. EveryDay() + private val everyday = Schedule.EveryDay() private val onSundays = Schedule.Weekly(DayOfWeek.SUNDAY :: Nil) private val onSundaysAndWednesdays = Schedule.Weekly(DayOfWeek.WEDNESDAY :: DayOfWeek.SUNDAY :: Nil) private val onFirstDayOfMonth = Schedule.Monthly(1 :: Nil) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala index 4e40b0506..979150966 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala @@ -22,10 +22,10 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.{DataFrame, Row} import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.FieldChange._ +import za.co.absa.pramen.api.jobdef.TransformExpression import za.co.absa.pramen.core.NestedDataFrameFactory import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture} -import za.co.absa.pramen.core.pipeline.TransformExpression import za.co.absa.pramen.core.samples.SampleCaseClass2 import za.co.absa.pramen.core.utils.SparkUtils import za.co.absa.pramen.core.utils.SparkUtils._ diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsNotificationTarget.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsNotificationTarget.scala index ddef5e1a9..fb719bc9d 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsNotificationTarget.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsNotificationTarget.scala @@ -44,7 +44,7 @@ class EcsNotificationTarget(conf: Config) extends NotificationTarget { } val (ecsApiUrl, ecsApiKey, trustAllSslCerts) = EcsNotificationTarget.getEcsDetails(conf) - val tableDef = notification.outputTable + val tableDef = notification.taskDef.outputTable val httpClient = getHttpClient(trustAllSslCerts) try { diff --git a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTarget.scala b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTarget.scala index 9d4d46fd8..7143cc7b8 100644 --- a/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTarget.scala +++ b/pramen/extras/src/main/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTarget.scala @@ -55,17 +55,17 @@ class EcsPipelineNotificationTarget(conf: Config) extends PipelineNotificationTa tasksCompleted.foreach { task => task.runInfo match { case Some(runInfo) if task.runStatus.isInstanceOf[RunStatus.Succeeded] => - if (!task.outputTable.format.isTransient && - !task.outputTable.format.isInstanceOf[DataFormat.Null] && - !task.outputTable.format.isInstanceOf[DataFormat.Raw]) { - EcsNotificationTarget.cleanUpS3VersionsForTable(task.outputTable, runInfo.infoDate, ecsApiUrl, ecsApiKey, httpClient) + if (!task.taskDef.outputTable.format.isTransient && + !task.taskDef.outputTable.format.isInstanceOf[DataFormat.Null] && + !task.taskDef.outputTable.format.isInstanceOf[DataFormat.Raw]) { + EcsNotificationTarget.cleanUpS3VersionsForTable(task.taskDef.outputTable, runInfo.infoDate, ecsApiUrl, ecsApiKey, httpClient) } else { - log.info(s"The task outputting to '${task.outputTable.name}' for '${runInfo.infoDate}' outputs to ${task.outputTable.format.name} format - skipping ECS cleanup...") + log.info(s"The task outputting to '${task.taskDef.outputTable.name}' for '${runInfo.infoDate}' outputs to ${task.taskDef.outputTable.format.name} format - skipping ECS cleanup...") } case Some(runInfo) => - log.info(s"The task outputting to '${task.outputTable.name}' for '${runInfo.infoDate}' status is not a success - skipping ECS cleanup...") + log.info(s"The task outputting to '${task.taskDef.outputTable.name}' for '${runInfo.infoDate}' status is not a success - skipping ECS cleanup...") case None => - log.info(s"The task outputting to '${task.outputTable.name}' status is not a success - skipping ECS cleanup...") + log.info(s"The task outputting to '${task.taskDef.outputTable.name}' status is not a success - skipping ECS cleanup...") } } } finally { diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/MetaTableDefFactory.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/MetaTableDefFactory.scala new file mode 100644 index 000000000..001ac4271 --- /dev/null +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/MetaTableDefFactory.scala @@ -0,0 +1,53 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.extras + +import za.co.absa.pramen.api.{DataFormat, MetaTableDef} + +import java.time.LocalDate + +object MetaTableDefFactory { + + def getDummyMetaTableDef( + name: String = "table", + description: String = "", + format: DataFormat = DataFormat.Null(), + infoDateColumn: String = "info_date", + infoDateFormat: String = "yyyy-MM-dd", + partitionByInfoDate: Boolean = true, + batchIdColumn: String = "pramen_batchid", + hiveTable: Option[String] = None, + hivePath: Option[String] = None, + infoDateStart: LocalDate = LocalDate.of(2022, 1, 1), + readOptions: Map[String, String] = Map.empty[String, String], + writeOptions: Map[String, String] = Map.empty[String, String] + ): MetaTableDef = { + MetaTableDef( + name, + description, + format, + infoDateColumn, + infoDateFormat, + partitionByInfoDate, + batchIdColumn, + hiveTable, + hivePath, + infoDateStart, + readOptions, + writeOptions) + } +} diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/TaskDefFactory.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/TaskDefFactory.scala new file mode 100644 index 000000000..b2a98ebe8 --- /dev/null +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/TaskDefFactory.scala @@ -0,0 +1,32 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.extras + +import com.typesafe.config.{Config, ConfigFactory} +import za.co.absa.pramen.api.MetaTableDef +import za.co.absa.pramen.api.jobdef.Schedule +import za.co.absa.pramen.api.status._ + +object TaskDefFactory { + def getDummyTaskNotification(name: String = "Dummy Job", + jobType: JobType = JobType.Transformation("dummy.class"), + outputTable: MetaTableDef = MetaTableDefFactory.getDummyMetaTableDef(name = "dummy_table"), + schedule: Schedule = Schedule.EveryDay(), + operationConf: Config = ConfigFactory.empty()): TaskDef = { + TaskDef(name, jobType, outputTable, schedule, operationConf) + } +} diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala index 2f6cb1fff..e0b041c9f 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala @@ -16,7 +16,9 @@ package za.co.absa.pramen.extras.mocks -import za.co.absa.pramen.api.status.{RunInfo, RunStatus, TaskResult, TaskRunReason} +import com.typesafe.config.ConfigFactory +import za.co.absa.pramen.api.jobdef.Schedule +import za.co.absa.pramen.api.status.{JobType, RunInfo, RunStatus, TaskDef, TaskResult, TaskRunReason} import za.co.absa.pramen.api.{DataFormat, MetaTableDef, status} import za.co.absa.pramen.extras.utils.httpclient.SimpleHttpResponse @@ -44,8 +46,13 @@ object TestPrototypes { val taskStatus: RunStatus = RunStatus.Succeeded(None, Some(100), None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty) val taskNotification: TaskResult = status.TaskResult( - "Dummy Job", - metaTableDef, + TaskDef( + "Dummy Job", + JobType.Transformation("dummy_class"), + metaTableDef, + Schedule.EveryDay(), + ConfigFactory.empty() + ), taskStatus, Option(RunInfo(infoDate, Instant.ofEpochSecond(1645274606), Instant.ofEpochSecond(1645278206))), "test-1234", diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala index ad8e76ff6..c671ec71c 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsNotificationTargetSuite.scala @@ -20,6 +20,8 @@ import com.typesafe.config.ConfigFactory import org.apache.hadoop.fs.Path import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.DataFormat +import za.co.absa.pramen.api.status.TaskDef +import za.co.absa.pramen.extras.TaskDefFactory import za.co.absa.pramen.extras.mocks.{SimpleHttpClientSpy, TestPrototypes} import za.co.absa.pramen.extras.notification.EcsNotificationTarget.{ECS_API_SECRET_KEY, ECS_API_TRUST_SSL_KEY, ECS_API_URL_KEY} import za.co.absa.pramen.extras.utils.httpclient.SimpleHttpClient @@ -45,7 +47,8 @@ class EcsNotificationTargetSuite extends AnyWordSpec { override protected def getHttpClient(trustAllSslCerts: Boolean): SimpleHttpClient = httpClient } - notificationTarget.sendNotification(null, TestPrototypes.taskNotification.copy(outputTable = metaTableDef)) + val taskDef = TaskDefFactory.getDummyTaskNotification(outputTable = metaTableDef) + notificationTarget.sendNotification(null, TestPrototypes.taskNotification.copy(taskDef = taskDef)) assert(httpClient.executeCalled == 1) assert(httpClient.requests.head.url == "https://dummyurl.local/kk") diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala index 0e71549b4..0207eea9e 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/notification/EcsPipelineNotificationTargetSuite.scala @@ -20,6 +20,7 @@ import com.typesafe.config.ConfigFactory import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.api.status.{CustomNotification, RuntimeInfo} import za.co.absa.pramen.api.{DataFormat, PipelineInfo, Query} +import za.co.absa.pramen.extras.TaskDefFactory import za.co.absa.pramen.extras.mocks.{SimpleHttpClientSpy, TestPrototypes} import za.co.absa.pramen.extras.notification.EcsPipelineNotificationTarget.{ECS_API_SECRET_KEY, ECS_API_TRUST_SSL_KEY, ECS_API_URL_KEY} import za.co.absa.pramen.extras.utils.httpclient.SimpleHttpClient @@ -46,13 +47,19 @@ class EcsPipelineNotificationTargetSuite extends AnyWordSpec { override protected def getHttpClient(trustAllSslCerts: Boolean): SimpleHttpClient = httpClient } - val task1 = TestPrototypes.taskNotification.copy(outputTable = metaTableDef) val dataFormat2 = DataFormat.Parquet("s3a://dummy_bucket_not_exist/dummy/path2", None) - val metaTableDef2 = TestPrototypes.metaTableDef.copy(name = "table2", format = dataFormat2) - val task2 = TestPrototypes.taskNotification.copy(jobName = "Job 3", outputTable = metaTableDef2) val dataFormat3 = DataFormat.Delta(Query.Table("table2"), None) + + val metaTableDef2 = TestPrototypes.metaTableDef.copy(name = "table2", format = dataFormat2) val metaTableDef3 = TestPrototypes.metaTableDef.copy(name = "table3", format = dataFormat3) - val task3 = TestPrototypes.taskNotification.copy(jobName = "Job 3", outputTable = metaTableDef3) + + val taskDef1 = TaskDefFactory.getDummyTaskNotification(name = "Job 1", outputTable = metaTableDef) + val taskDef2 = TaskDefFactory.getDummyTaskNotification(name = "Job 2", outputTable = metaTableDef2) + val taskDef3 = TaskDefFactory.getDummyTaskNotification(name = "Job 3", outputTable = metaTableDef3) + + val task1 = TestPrototypes.taskNotification.copy(taskDef = taskDef1) + val task2 = TestPrototypes.taskNotification.copy(taskDef = taskDef2) + val task3 = TestPrototypes.taskNotification.copy(taskDef = taskDef3) notificationTarget.sendNotification( PipelineInfo("Dummy", "DEV", RuntimeInfo(), Instant.now, None, None, None, Seq.empty, "pid_123", None),