Skip to content

Commit

Permalink
#374 Remove parenthesis of several get methods.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Sep 11, 2024
1 parent 9264c6b commit a9643d0
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,15 @@ class PramenImpl extends Pramen {
throw new IllegalStateException("Pipeline state is not available at the context.")
)

pipelineState.getState().pipelineInfo
pipelineState.getState.pipelineInfo
}

override def pipelineState: PipelineStateSnapshot = {
val pipelineState = _pipelineState.getOrElse(
throw new IllegalStateException("Pipeline state is not available at the context.")
)

pipelineState.getState()
pipelineState.getState
}

override def notificationBuilder: NotificationBuilder = notificationBuilderImpl
Expand All @@ -66,7 +66,7 @@ class PramenImpl extends Pramen {
throw new IllegalStateException("Pipeline state is not available at the context.")
)

val state = pipelineState.getState()
val state = pipelineState.getState

state.taskResults
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,15 @@
package za.co.absa.pramen.core.pipeline

import com.typesafe.config.Config
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult}
import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated
import za.co.absa.pramen.core.metastore.Metastore
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.peristence.TransientTableManager
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategySourcing}
import za.co.absa.pramen.core.utils.ConfigUtils
import za.co.absa.pramen.core.utils.Emoji.WARNING
import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental}
import za.co.absa.pramen.core.utils.SparkUtils._

import java.time.{Instant, LocalDate}
Expand All @@ -48,9 +43,8 @@ class IncrementalIngestionJob(operationDef: OperationDef,
specialCharacters: String)
(implicit spark: SparkSession)
extends IngestionJob(operationDef, metastore, bookkeeper, notificationTargets, sourceName, source, sourceTable, outputTable, specialCharacters, None, false) {
import JobBase._

override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategySourcing
override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategyIncremental(latestOffset)

override def trackDays: Int = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ object AppRunner {
private[core] def createPipelineState(implicit conf: Config): Try[PipelineState] = {
Try {
val state = new PipelineStateImpl()(conf, PramenImpl.instance.notificationBuilder)
log.info(s"Starting pipeline, batchId=${state.getBatchId()}")
log.info(s"Starting pipeline, batchId=${state.getBatchId}")
state
}
}
Expand All @@ -107,7 +107,7 @@ object AppRunner {
spark: SparkSession): Try[AppContext] = {
handleFailure(Try {
PramenImpl.instance.asInstanceOf[PramenImpl].setPipelineState(state)
AppContextImpl(conf, state.getBatchId())
AppContextImpl(conf, state.getBatchId)
}, state, "initialization of the pipeline")
}

Expand Down Expand Up @@ -185,7 +185,7 @@ object AppRunner {
spark: SparkSession): Try[Seq[Job]] = {
handleFailure(Try {
val isHistoricalRun = appContext.appConfig.runtimeConfig.runDateTo.nonEmpty
val splitter = new OperationSplitter(conf, appContext.metastore, appContext.bookkeeper, state.getBatchId())
val splitter = new OperationSplitter(conf, appContext.metastore, appContext.bookkeeper, state.getBatchId)

if (isHistoricalRun)
log.info("This is a historical run. Making all dependencies 'passive' for all jobs...")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.runner.splitter

import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated
import za.co.absa.pramen.core.pipeline
import za.co.absa.pramen.core.pipeline.TaskPreDef
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._
import za.co.absa.pramen.core.schedule.Schedule

import java.time.LocalDate

class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated]) extends ScheduleStrategy {
private val log = org.slf4j.LoggerFactory.getLogger(this.getClass)

override def getDaysToRun(
outputTable: String,
dependencies: Seq[MetastoreDependency],
bookkeeper: Bookkeeper,
infoDateExpression: String,
schedule: Schedule,
params: ScheduleParams,
initialSourcingDateExpr: String,
minimumDate: LocalDate
): Seq[TaskPreDef] = {
val dates = params match {
case ScheduleParams.Normal(runDate, _, _, _, _) =>
val infoDate = evaluateRunDate(runDate, infoDateExpression)
log.info(s"Normal run strategy: runDate=$runDate, infoDate=$infoDate")

val runInfoDays = lastOffsets match {
case Some(offset) =>
if (offset.maximumInfoDate.isAfter(infoDate)) {
Seq.empty
} else {
Seq(infoDate)
.map(d => pipeline.TaskPreDef(d, TaskRunReason.New))
}
case None =>
Seq(infoDate)
.map(d => pipeline.TaskPreDef(d, TaskRunReason.New))
}

log.info(s"Days to run: ${runInfoDays.map(_.infoDate).mkString(", ")}")

runInfoDays.toList
case ScheduleParams.Rerun(runDate) =>
log.info(s"Rerun strategy for a single day: $runDate")
getRerun(outputTable, runDate, schedule, infoDateExpression, bookkeeper)
case ScheduleParams.Historical(dateFrom, dateTo, inverseDateOrder, mode) =>
log.info(s"Ranged strategy: from $dateFrom to $dateTo, mode = '${mode.toString}', minimumDate = $minimumDate")
getHistorical(outputTable, dateFrom, dateTo, schedule, mode, infoDateExpression, minimumDate, inverseDateOrder, bookkeeper)
}

filterOutPastMinimumDates(dates, minimumDate)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ import za.co.absa.pramen.core.pipeline._
import za.co.absa.pramen.core.state.PipelineState
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.SparkUtils._
import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils}
import za.co.absa.pramen.core.utils.hive.HiveHelper
import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils}

import java.sql.Date
import java.time.{Instant, LocalDate}
Expand Down Expand Up @@ -453,7 +453,7 @@ abstract class TaskRunnerBase(conf: Config,
log.warn("Skipping the interrupted exception of the killed task.")
} else {
pipelineState.addTaskCompletion(Seq(updatedResult))
addJournalEntry(task, updatedResult, pipelineState.getState().pipelineInfo)
addJournalEntry(task, updatedResult, pipelineState.getState.pipelineInfo)
}

updatedResult.runStatus
Expand Down Expand Up @@ -491,7 +491,7 @@ abstract class TaskRunnerBase(conf: Config,
}

private def sendNotifications(task: Task, result: TaskResult): Seq[NotificationFailure] = {
val pipelineInfo = pipelineState.getState().pipelineInfo
val pipelineInfo = pipelineState.getState.pipelineInfo
task.job.notificationTargets.flatMap(notificationTarget => sendNotifications(task, result, notificationTarget, pipelineInfo))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ package za.co.absa.pramen.core.state
import za.co.absa.pramen.api.status.{PipelineStateSnapshot, TaskResult}

trait PipelineState extends AutoCloseable {
def getState(): PipelineStateSnapshot
def getState: PipelineStateSnapshot

def getBatchId(): Long
def getBatchId: Long

def setShutdownHookCanRun(): Unit

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
pipelineNotificationTargets = PipelineNotificationTargetFactory.fromConfig(conf)
}

override def getState(): PipelineStateSnapshot = synchronized {
override def getState: PipelineStateSnapshot = synchronized {
val appException = if (!exitedNormally && failureException.isEmpty && signalException.isDefined) {
signalException
} else
Expand Down Expand Up @@ -127,7 +127,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
)
}

override def getBatchId(): Long = batchId
override def getBatchId: Long = batchId

override def setShutdownHookCanRun(): Unit = synchronized {
customShutdownHookCanRun = true
Expand Down Expand Up @@ -232,7 +232,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
}

private[state] def sendPipelineNotifications(): Unit = {
pipelineNotificationTargets.foreach(notificationTarget => sendCustomNotification(notificationTarget, getState(), taskResults.toSeq))
pipelineNotificationTargets.foreach(notificationTarget => sendCustomNotification(notificationTarget, getState, taskResults.toSeq))
}

private[state] def sendCustomNotification(pipelineNotificationTarget: PipelineNotificationTarget, pipelineStateSnapshot: PipelineStateSnapshot, taskResults: Seq[TaskResult]): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ class PramenImplSuite extends AnyWordSpec {
)

val pipelineState = mock(classOf[PipelineState])
when(pipelineState.getState()).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))
when(pipelineState.getState).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))

pramen.setPipelineState(pipelineState)

Expand Down Expand Up @@ -153,7 +153,7 @@ class PramenImplSuite extends AnyWordSpec {
)

val pipelineState = mock(classOf[PipelineState])
when(pipelineState.getState()).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))
when(pipelineState.getState).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))

pramen.setPipelineState(pipelineState)

Expand Down Expand Up @@ -188,7 +188,7 @@ class PramenImplSuite extends AnyWordSpec {
)

val pipelineState = mock(classOf[PipelineState])
when(pipelineState.getState()).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))
when(pipelineState.getState).thenReturn(PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(taskResults = taskResults))

pramen.setPipelineState(pipelineState)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ class PipelineStateSpy extends PipelineState {
var closeCalled = 0
var sparkAppId: Option[String] = None

override def getState(): PipelineStateSnapshot = {
override def getState: PipelineStateSnapshot = {
PipelineStateSnapshotFactory.getDummyPipelineStateSnapshot(PipelineInfoFactory.getDummyPipelineInfo(sparkApplicationId = sparkAppId),
customShutdownHookCanRun = setShutdownHookCanRunCount > 0,
taskResults = completedStatuses.toList
)
}

override def getBatchId(): Long = 0L
override def getBatchId: Long = 0L

override def setShutdownHookCanRun(): Unit = synchronized {
setShutdownHookCanRunCount += 1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,15 @@ import java.time.{DayOfWeek, LocalDate}
class ScheduleSuite extends AnyWordSpec {

"Schedule.fromConfig" should {
"Deserialize incremental jobs" when {
"a normal incremental job is provided" in {
val config = ConfigFactory.parseString(s"$SCHEDULE_TYPE_KEY = incremental")
val schedule = fromConfig(config)

assert(schedule == Schedule.Incremental)
}
}

"Deserialize daily jobs" when {
"a normal daily job is provided" in {
val config = ConfigFactory.parseString(s"$SCHEDULE_TYPE_KEY = daily")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,56 +38,56 @@ class PipelineStateSuite extends AnyWordSpec {
"set the flag to true" in {
val stateManager = new PipelineStateImpl()(conf, PramenImpl.instance.notificationBuilder)

assert(!stateManager.getState().customShutdownHookCanRun)
assert(!stateManager.getState.customShutdownHookCanRun)

stateManager.setShutdownHookCanRun()

assert(stateManager.getState().customShutdownHookCanRun)
assert(stateManager.getState.customShutdownHookCanRun)
}
}

"setSuccess()" should {
"set the success flag to true" in {
val stateManager = getMockPipelineState()

assert(!stateManager.getState().isFinished)
assert(!stateManager.getState().exitedNormally)
assert(!stateManager.getState.isFinished)
assert(!stateManager.getState.exitedNormally)

stateManager.setSuccess()

assert(stateManager.getState().isFinished)
assert(stateManager.getState().exitedNormally)
assert(stateManager.getState().pipelineInfo.failureException.isEmpty)
assert(stateManager.getState.isFinished)
assert(stateManager.getState.exitedNormally)
assert(stateManager.getState.pipelineInfo.failureException.isEmpty)
}
}

"setFailure()" should {
"set the failure flag to true" in {
val stateManager = getMockPipelineState()

assert(!stateManager.getState().isFinished)
assert(!stateManager.getState().exitedNormally)
assert(!stateManager.getState.isFinished)
assert(!stateManager.getState.exitedNormally)

stateManager.setFailure("test", new RuntimeException("test"))

assert(stateManager.getState().isFinished)
assert(!stateManager.getState().exitedNormally)
assert(stateManager.getState().pipelineInfo.failureException.exists(_.isInstanceOf[RuntimeException]))
assert(stateManager.getState.isFinished)
assert(!stateManager.getState.exitedNormally)
assert(stateManager.getState.pipelineInfo.failureException.exists(_.isInstanceOf[RuntimeException]))
}
}

"addTaskCompletion" should {
"add the task completion statuses" in {
val stateManager = getMockPipelineState()

assert(stateManager.getState().taskResults.isEmpty)
assert(stateManager.getState.taskResults.isEmpty)

stateManager.addTaskCompletion(Seq(
TaskResultFactory.getDummyTaskResult(runStatus = RunStatus.Failed(new RuntimeException("test")))
))

assert(stateManager.getState().taskResults.size == 1)
assert(stateManager.getState().exitCode == 2)
assert(stateManager.getState.taskResults.size == 1)
assert(stateManager.getState.exitCode == 2)
}
}

Expand Down Expand Up @@ -127,7 +127,7 @@ class PipelineStateSuite extends AnyWordSpec {
stateManager.runCustomShutdownHook()

assert(ShutdownHookFailureMock.ranTimes > 0)
assert(stateManager.getState().pipelineInfo.failureException.exists(_.isInstanceOf[LinkageError]))
assert(stateManager.getState.pipelineInfo.failureException.exists(_.isInstanceOf[LinkageError]))
}

"handle class does not exists errors" in {
Expand All @@ -137,9 +137,8 @@ class PipelineStateSuite extends AnyWordSpec {
stateManager.runCustomShutdownHook()

assert(ShutdownHookFailureMock.ranTimes > 0)
assert(stateManager.getState().pipelineInfo.failureException.isDefined)
assert(stateManager.getState().pipelineInfo.failureException.exists(_.isInstanceOf[ClassNotFoundException]))

assert(stateManager.getState.pipelineInfo.failureException.isDefined)
assert(stateManager.getState.pipelineInfo.failureException.exists(_.isInstanceOf[ClassNotFoundException]))
}
}

Expand Down

0 comments on commit a9643d0

Please sign in to comment.