From a9643d059263667014dad185bd7e1663d32afbde Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Wed, 11 Sep 2024 08:17:03 +0200
Subject: [PATCH] #374 Remove parentheses from several get methods.

---
 .../za/co/absa/pramen/core/PramenImpl.scala   |  6 +-
 .../pipeline/IncrementalIngestionJob.scala    | 10 +--
 .../absa/pramen/core/runner/AppRunner.scala   |  6 +-
 .../ScheduleStrategyIncremental.scala         | 73 +++++++++++++++++++
 .../core/runner/task/TaskRunnerBase.scala     |  6 +-
 .../pramen/core/state/PipelineState.scala     |  4 +-
 .../pramen/core/state/PipelineStateImpl.scala |  6 +-
 .../co/absa/pramen/core/PramenImplSuite.scala |  6 +-
 .../core/mocks/state/PipelineStateSpy.scala   |  4 +-
 .../pramen/core/schedule/ScheduleSuite.scala  |  9 +++
 .../core/state/PipelineStateSuite.scala       | 37 +++++-----
 11 files changed, 121 insertions(+), 46 deletions(-)
 create mode 100644 pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala

diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala
index f70c08806..48765f908 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/PramenImpl.scala
@@ -44,7 +44,7 @@ class PramenImpl extends Pramen {
       throw new IllegalStateException("Pipeline state is not available at the context.")
     )

-    pipelineState.getState().pipelineInfo
+    pipelineState.getState.pipelineInfo
   }

   override def pipelineState: PipelineStateSnapshot = {
@@ -52,7 +52,7 @@ class PramenImpl extends Pramen {
       throw new IllegalStateException("Pipeline state is not available at the context.")
     )

-    pipelineState.getState()
+    pipelineState.getState
   }

   override def notificationBuilder: NotificationBuilder = notificationBuilderImpl
@@ -66,7 +66,7 @@ class PramenImpl extends Pramen {
       throw new IllegalStateException("Pipeline state is not available at the context.")
     )

-    val state = pipelineState.getState()
+    val state = pipelineState.getState

     state.taskResults
   }
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
index 7e364c754..13dbc2bb1 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala
@@ -17,20 +17,15 @@ package za.co.absa.pramen.core.pipeline

 import com.typesafe.config.Config
-import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.functions.lit
 import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
 import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
 import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult}
-import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY
 import za.co.absa.pramen.core.bookkeeper.Bookkeeper
 import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated
 import za.co.absa.pramen.core.metastore.Metastore
 import za.co.absa.pramen.core.metastore.model.MetaTable
-import za.co.absa.pramen.core.metastore.peristence.TransientTableManager
-import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing}
-import za.co.absa.pramen.core.utils.ConfigUtils
-import za.co.absa.pramen.core.utils.Emoji.WARNING
+import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental}
 import za.co.absa.pramen.core.utils.SparkUtils._

 import java.time.{Instant, LocalDate}

@@ -48,9 +43,8 @@ class IncrementalIngestionJob(operationDef: OperationDef,
                               specialCharacters: String)
                              (implicit spark: SparkSession)
   extends IngestionJob(operationDef, metastore, bookkeeper, notificationTargets, sourceName, source, sourceTable, outputTable, specialCharacters, None, false) {
-  import JobBase._

-  override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing
+  override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategyIncremental(latestOffset)

   override def trackDays: Int = 0

diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala
index a1334c900..d700a6280 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala
@@ -97,7 +97,7 @@ object AppRunner {
   private[core] def createPipelineState(implicit conf: Config): Try[PipelineState] = {
     Try {
       val state = new PipelineStateImpl()(conf, PramenImpl.instance.notificationBuilder)
-      log.info(s"Starting pipeline, batchId=${state.getBatchId()}")
+      log.info(s"Starting pipeline, batchId=${state.getBatchId}")
       state
     }
   }
@@ -107,7 +107,7 @@ object AppRunner {
                                         spark: SparkSession): Try[AppContext] = {
     handleFailure(Try {
       PramenImpl.instance.asInstanceOf[PramenImpl].setPipelineState(state)
-      AppContextImpl(conf, state.getBatchId())
+      AppContextImpl(conf, state.getBatchId)
     }, state, "initialization of the pipeline")
   }

@@ -185,7 +185,7 @@ object AppRunner {
                                      spark: SparkSession): Try[Seq[Job]] = {
     handleFailure(Try {
       val isHistoricalRun = appContext.appConfig.runtimeConfig.runDateTo.nonEmpty
-      val splitter = new OperationSplitter(conf, appContext.metastore, appContext.bookkeeper, state.getBatchId())
+      val splitter = new OperationSplitter(conf, appContext.metastore, appContext.bookkeeper, state.getBatchId)

       if (isHistoricalRun)
         log.info("This is a historical run. Making all dependencies 'passive' for all jobs...")
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala
new file mode 100644
index 000000000..f0beee233
--- /dev/null
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2022 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.pramen.core.runner.splitter
+
+import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason}
+import za.co.absa.pramen.core.bookkeeper.Bookkeeper
+import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated
+import za.co.absa.pramen.core.pipeline
+import za.co.absa.pramen.core.pipeline.TaskPreDef
+import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._
+import za.co.absa.pramen.core.schedule.Schedule
+
+import java.time.LocalDate
+
+class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated]) extends ScheduleStrategy {
+  private val log = org.slf4j.LoggerFactory.getLogger(this.getClass)
+
+  override def getDaysToRun(
+                             outputTable: String,
+                             dependencies: Seq[MetastoreDependency],
+                             bookkeeper: Bookkeeper,
+                             infoDateExpression: String,
+                             schedule: Schedule,
+                             params: ScheduleParams,
+                             initialSourcingDateExpr: String,
+                             minimumDate: LocalDate
+                           ): Seq[TaskPreDef] = {
+    val dates = params match {
+      case ScheduleParams.Normal(runDate, _, _, _, _) =>
+        val infoDate = evaluateRunDate(runDate, infoDateExpression)
+        log.info(s"Normal run strategy: runDate=$runDate, infoDate=$infoDate")
+
+        val runInfoDays = lastOffsets match {
+          case Some(offset) =>
+            if (offset.maximumInfoDate.isAfter(infoDate)) {
+              Seq.empty
+            } else {
+              Seq(infoDate)
+                .map(d => pipeline.TaskPreDef(d, TaskRunReason.New))
+            }
+          case None =>
+            Seq(infoDate)
+              .map(d => pipeline.TaskPreDef(d, TaskRunReason.New))
+        }
+
+        log.info(s"Days to run: ${runInfoDays.map(_.infoDate).mkString(", ")}")
+
+        runInfoDays.toList
+      case ScheduleParams.Rerun(runDate) =>
+        log.info(s"Rerun strategy for a single day: $runDate")
+        getRerun(outputTable, runDate, schedule, infoDateExpression, bookkeeper)
+      case ScheduleParams.Historical(dateFrom, dateTo, inverseDateOrder, mode) =>
+        log.info(s"Ranged strategy: from $dateFrom to $dateTo, mode = '${mode.toString}', minimumDate = $minimumDate")
+        getHistorical(outputTable, dateFrom, dateTo, schedule, mode, infoDateExpression, minimumDate, inverseDateOrder, bookkeeper)
+    }
+
+    filterOutPastMinimumDates(dates, minimumDate)
+  }
+}
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
index de2683a82..6df1d0816 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala
@@ -35,8 +35,8 @@ import za.co.absa.pramen.core.pipeline._
 import za.co.absa.pramen.core.state.PipelineState
 import za.co.absa.pramen.core.utils.Emoji._
 import za.co.absa.pramen.core.utils.SparkUtils._
-import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils}
 import za.co.absa.pramen.core.utils.hive.HiveHelper
+import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils}

 import java.sql.Date
 import java.time.{Instant, LocalDate}
@@ -453,7 +453,7 @@ abstract class TaskRunnerBase(conf: Config,
           log.warn("Skipping the interrupted exception of the killed task.")
         } else {
           pipelineState.addTaskCompletion(Seq(updatedResult))
-          addJournalEntry(task, updatedResult, pipelineState.getState().pipelineInfo)
+          addJournalEntry(task, updatedResult, pipelineState.getState.pipelineInfo)
         }

         updatedResult.runStatus
@@ -491,7 +491,7 @@ abstract class TaskRunnerBase(conf: Config,
   }

   private def sendNotifications(task: Task, result: TaskResult): Seq[NotificationFailure] = {
-    val pipelineInfo = pipelineState.getState().pipelineInfo
+    val pipelineInfo = pipelineState.getState.pipelineInfo

     task.job.notificationTargets.flatMap(notificationTarget => sendNotifications(task, result, notificationTarget, pipelineInfo))
   }
diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala
index 8038023d9..cd61e3efe 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala
@@ -19,9 +19,9 @@ package za.co.absa.pramen.core.state
 import za.co.absa.pramen.api.status.{PipelineStateSnapshot, TaskResult}

 trait PipelineState extends AutoCloseable {
-  def getState(): PipelineStateSnapshot
+  def getState: PipelineStateSnapshot

-  def getBatchId(): Long
+  def getBatchId: Long

   def setShutdownHookCanRun(): Unit

diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
index 3bc3606d6..5ec62bc21 100644
--- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
+++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala
@@ -80,7 +80,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
     pipelineNotificationTargets = PipelineNotificationTargetFactory.fromConfig(conf)
   }

-  override def getState(): PipelineStateSnapshot = synchronized {
+  override def getState: PipelineStateSnapshot = synchronized {
     val appException = if (!exitedNormally && failureException.isEmpty && signalException.isDefined) {
       signalException
     } else
@@ -127,7 +127,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
     )
   }

-  override def getBatchId(): Long = batchId
+  override def getBatchId: Long = batchId

   override def setShutdownHookCanRun(): Unit = synchronized {
     customShutdownHookCanRun = true
@@ -232,7 +232,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
   }

   private[state] def sendPipelineNotifications(): Unit = {
-    pipelineNotificationTargets.foreach(notificationTarget => sendCustomNotification(notificationTarget, getState(), taskResults.toSeq))
+    pipelineNotificationTargets.foreach(notificationTarget => sendCustomNotification(notificationTarget, getState, taskResults.toSeq))
   }

   private[state] def sendCustomNotification(pipelineNotificationTarget: PipelineNotificationTarget, pipelineStateSnapshot: PipelineStateSnapshot, taskResults: Seq[TaskResult]): Unit = {
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/PramenImplSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/PramenImplSuite.scala
index 62bc9f6e4..590db7ab1 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/PramenImplSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/PramenImplSuite.scala
@@ -119,7 +119,7 @@ class PramenImplSuite extends AnyWordSpec {
      )

      val pipelineState = mock(classOf[PipelineState])
-      when(pipelineState.getState()).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))
+      when(pipelineState.getState).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))

      pramen.setPipelineState(pipelineState)

@@ -153,7 +153,7 @@ class PramenImplSuite extends AnyWordSpec {
      )

      val pipelineState = mock(classOf[PipelineState])
-      when(pipelineState.getState()).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))
+      when(pipelineState.getState).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))

      pramen.setPipelineState(pipelineState)

@@ -188,7 +188,7 @@ class PramenImplSuite extends AnyWordSpec {
      )

      val pipelineState = mock(classOf[PipelineState])
-      when(pipelineState.getState()).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))
+      when(pipelineState.getState).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))

      pramen.setPipelineState(pipelineState)

diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala
index 92015b8f1..fd68398bd 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala
@@ -33,14 +33,14 @@ class PipelineStateSpy extends PipelineState {
   var closeCalled = 0
   var sparkAppId: Option[String] = None

-  override def getState(): PipelineStateSnapshot = {
+  override def getState: PipelineStateSnapshot = {
     PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(PipelineInfoFactory.getDummyPipelineInfo(sparkApplicationId = sparkAppId),
       customShutdownHookCanRun = setShutdownHookCanRunCount > 0,
       taskResults = completedStatuses.toList
     )
   }

-  override def getBatchId(): Long = 0L
+  override def getBatchId: Long = 0L

   override def setShutdownHookCanRun(): Unit = synchronized {
     setShutdownHookCanRunCount += 1
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/schedule/ScheduleSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/schedule/ScheduleSuite.scala
index 46447b841..576b09bbe 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/schedule/ScheduleSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/schedule/ScheduleSuite.scala
@@ -26,6 +26,15 @@ import java.time.{DayOfWeek, LocalDate}

 class ScheduleSuite extends AnyWordSpec {
   "Schedule.fromConfig" should {
+    "Deserialize incremental jobs" when {
+      "a normal incremental job is provided" in {
+        val config = ConfigFactory.parseString(s"$SCHEDULE_TYPE_KEY = incremental")
+        val schedule = fromConfig(config)
+
+        assert(schedule == Schedule.Incremental)
+      }
+    }
+
     "Deserialize daily jobs" when {
       "a normal daily job is provided" in {
         val config = ConfigFactory.parseString(s"$SCHEDULE_TYPE_KEY = daily")
diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/state/PipelineStateSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/state/PipelineStateSuite.scala
index f8d014e2a..6c1dac961 100644
--- a/pramen/core/src/test/scala/za/co/absa/pramen/core/state/PipelineStateSuite.scala
+++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/state/PipelineStateSuite.scala
@@ -38,11 +38,11 @@ class PipelineStateSuite extends AnyWordSpec {
     "set the flag to true" in {
       val stateManager = new PipelineStateImpl()(conf, PramenImpl.instance.notificationBuilder)

-      assert(!stateManager.getState().customShutdownHookCanRun)
+      assert(!stateManager.getState.customShutdownHookCanRun)

       stateManager.setShutdownHookCanRun()

-      assert(stateManager.getState().customShutdownHookCanRun)
+      assert(stateManager.getState.customShutdownHookCanRun)
     }
   }

@@ -50,14 +50,14 @@ class PipelineStateSuite extends AnyWordSpec {
     "set the success flag to true" in {
       val stateManager = getMockPipelineState()

-      assert(!stateManager.getState().isFinished)
-      assert(!stateManager.getState().exitedNormally)
+      assert(!stateManager.getState.isFinished)
+      assert(!stateManager.getState.exitedNormally)

       stateManager.setSuccess()

-      assert(stateManager.getState().isFinished)
-      assert(stateManager.getState().exitedNormally)
-      assert(stateManager.getState().pipelineInfo.failureException.isEmpty)
+      assert(stateManager.getState.isFinished)
+      assert(stateManager.getState.exitedNormally)
+      assert(stateManager.getState.pipelineInfo.failureException.isEmpty)
     }
   }

@@ -65,14 +65,14 @@ class PipelineStateSuite extends AnyWordSpec {
     "set the failure flag to true" in {
       val stateManager = getMockPipelineState()

-      assert(!stateManager.getState().isFinished)
-      assert(!stateManager.getState().exitedNormally)
+      assert(!stateManager.getState.isFinished)
+      assert(!stateManager.getState.exitedNormally)

       stateManager.setFailure("test", new RuntimeException("test"))

-      assert(stateManager.getState().isFinished)
-      assert(!stateManager.getState().exitedNormally)
-      assert(stateManager.getState().pipelineInfo.failureException.exists(_.isInstanceOf[RuntimeException]))
+      assert(stateManager.getState.isFinished)
+      assert(!stateManager.getState.exitedNormally)
+      assert(stateManager.getState.pipelineInfo.failureException.exists(_.isInstanceOf[RuntimeException]))
     }
   }

@@ -80,14 +80,14 @@ class PipelineStateSuite extends AnyWordSpec {
     "add the task completion statuses" in {
       val stateManager = getMockPipelineState()

-      assert(stateManager.getState().taskResults.isEmpty)
+      assert(stateManager.getState.taskResults.isEmpty)

       stateManager.addTaskCompletion(Seq(
         TaskResultFactory.getDummyTaskResult(runStatus = RunStatus.Failed(new RuntimeException("test")))
       ))

-      assert(stateManager.getState().taskResults.size == 1)
-      assert(stateManager.getState().exitCode == 2)
+      assert(stateManager.getState.taskResults.size == 1)
+      assert(stateManager.getState.exitCode == 2)
     }
   }

@@ -127,7 +127,7 @@ class PipelineStateSuite extends AnyWordSpec {
       stateManager.runCustomShutdownHook()

       assert(ShutdownHookFailureMock.ranTimes > 0)
-      assert(stateManager.getState().pipelineInfo.failureException.exists(_.isInstanceOf[LinkageError]))
+      assert(stateManager.getState.pipelineInfo.failureException.exists(_.isInstanceOf[LinkageError]))
     }

     "handle class does not exists errors" in {
@@ -137,9 +137,8 @@ class PipelineStateSuite extends AnyWordSpec {
       stateManager.runCustomShutdownHook()

       assert(ShutdownHookFailureMock.ranTimes > 0)
-      assert(stateManager.getState().pipelineInfo.failureException.isDefined)
-      assert(stateManager.getState().pipelineInfo.failureException.exists(_.isInstanceOf[ClassNotFoundException]))
-
+      assert(stateManager.getState.pipelineInfo.failureException.isDefined)
+      assert(stateManager.getState.pipelineInfo.failureException.exists(_.isInstanceOf[ClassNotFoundException]))
     }