Skip to content

Commit

Permalink
#188 Use locking to ensure that only one task instance is running at …
Browse files Browse the repository at this point in the history
…the same time.
  • Loading branch information
yruslan committed Apr 17, 2023
1 parent 0657c1e commit a8082d5
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,8 @@ class MetastorePersistenceDelta(query: Query,
case Query.Table(table) =>
writer.saveAsTable(table)
log.info(s"Writing to table '$table' WHERE $whereCondition...")
case Query.Sql(_) =>
throw new IllegalStateException(s"An sql expression is not supported as a write target for Delta.")
case q =>
throw new IllegalStateException(s"The '${q.name}' option is not supported as a write target for Delta.")
}

val stats = getStats(infoDate)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ object AppRunner {
state: PipelineState,
appContext: AppContext): Try[TaskRunner] = {
handleFailure(Try {
new TaskRunnerMultithreaded(conf, appContext.bookkeeper, appContext.journal, state, appContext.appConfig.runtimeConfig)
new TaskRunnerMultithreaded(conf, appContext.bookkeeper, appContext.journal, appContext.tokenLockFactory, state, appContext.appConfig.runtimeConfig)
}, state, "initialization of the task runner")
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ class ScheduleStrategySourcing extends ScheduleStrategy {
getHistorical(outputTable, dateFrom, dateTo, schedule, mode, infoDateExpression, minimumDate, inverseDateOrder, bookkeeper)
}

val dayBeforeMinimum = minimumDate.minusDays(1)

dates.filter(_.infoDate.isAfter(dayBeforeMinimum))
filterOutPastMinimumDates(outputTable, dates, minimumDate)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ class ScheduleStrategyTransformation extends ScheduleStrategy {
getHistorical(outputTable, dateFrom, dateTo, schedule, mode, infoDateExpression, minimumDate, inverseDateOrder, bookkeeper)
}

val dayBeforeMinimum = minimumDate.minusDays(1)

dates.filter(_.infoDate.isAfter(dayBeforeMinimum))
filterOutPastMinimumDates(outputTable, dates, minimumDate)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import za.co.absa.pramen.core.metastore.model.MetastoreDependency
import za.co.absa.pramen.core.pipeline
import za.co.absa.pramen.core.pipeline.{TaskPreDef, TaskRunReason}
import za.co.absa.pramen.core.schedule.Schedule
import za.co.absa.pramen.core.utils.Emoji.WARNING

import java.time.LocalDate
import scala.collection.mutable
Expand Down Expand Up @@ -163,8 +164,19 @@ object ScheduleStrategyUtils {
datesWithAlreadyRanSkipped
}

val beforeMinDate = minimumDate.minusDays(1)
datesWithProperOrder.filter(t => t.infoDate.isAfter(beforeMinDate))
filterOutPastMinimumDates(outputTable, datesWithProperOrder, minimumDate)
}

private[core] def filterOutPastMinimumDates(tableName: String, dates: List[TaskPreDef], minimumDate: LocalDate): List[TaskPreDef] = {
val dayBeforeMinimum = minimumDate.minusDays(1)

dates.filter { taskPreDef =>
val isOk = taskPreDef.infoDate.isAfter(dayBeforeMinimum)
if (!isOk) {
log.warn(s"$WARNING '${taskPreDef.infoDate}' is before the minimum date '$minimumDate' for the table '$tableName'. The task is skipped.")
}
isOk
}
}

private[core] def anyDependencyUpdatedRetrospectively(outputTable: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.exceptions.ReasonException
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.journal.model.TaskCompleted
import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.notify.NotificationTargetManager
import za.co.absa.pramen.core.notify.pipeline.SchemaDifference
Expand All @@ -42,6 +43,7 @@ import scala.util.{Failure, Success, Try}
abstract class TaskRunnerBase(conf: Config,
bookkeeper: Bookkeeper,
journal: Journal,
lockFactory: TokenLockFactory,
runtimeConfig: RuntimeConfig,
pipelineState: PipelineState) extends TaskRunner {
implicit private val ecDefault: ExecutionContext = ExecutionContext.global
Expand Down Expand Up @@ -219,7 +221,12 @@ abstract class TaskRunnerBase(conf: Config,
* @return an instance of TaskResult.
*/
private[core] def run(task: Task, started: Instant, validationResult: JobPreRunResult): TaskResult = {
Try {
val lock = lockFactory.getLock(getTokenName(task))

val attempt = Try {
if (runtimeConfig.useLocks && !lock.tryAcquire())
throw new IllegalStateException(s"Another instance is already running for ${task.job.outputTable.name} for ${task.infoDate}")

val recordCountOldOpt = bookkeeper.getLatestDataChunk(task.job.outputTable.name, task.infoDate, task.infoDate).map(_.outputRecordCount)

val runResult = task.job.run(task.infoDate, conf)
Expand Down Expand Up @@ -286,7 +293,13 @@ abstract class TaskRunnerBase(conf: Config,
schemaChangesBeforeTransform ::: schemaChangesAfterTransform,
validationResult.dependencyWarnings,
Seq.empty)
} match {
}

if (runtimeConfig.useLocks) {
lock.release()
}

attempt match {
case Success(result) =>
result
case Failure(ex) =>
Expand All @@ -295,6 +308,10 @@ abstract class TaskRunnerBase(conf: Config,
}
}

private def getTokenName(task: Task): String = {
s"${task.job.outputTable.name}_${task.infoDate}"
}

/** Logs task completion and sends corresponding notifications. */
private def onTaskCompletion(task: Task, taskResult: TaskResult): RunStatus = {
val notificationTargetErrors = sendNotifications(task, taskResult)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import org.slf4j.LoggerFactory
import za.co.absa.pramen.core.app.config.RuntimeConfig
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.pipeline.Task
import za.co.absa.pramen.core.state.PipelineState

import java.util.concurrent.{ExecutorService, Semaphore}
import java.util.concurrent.Executors.newFixedThreadPool
import java.util.concurrent.{ExecutorService, Semaphore}
import scala.concurrent.ExecutionContext.fromExecutorService
import scala.concurrent.{ExecutionContextExecutorService, Future}
import scala.util.Try
Expand All @@ -37,8 +38,9 @@ import scala.util.Try
class TaskRunnerMultithreaded(conf: Config,
bookkeeper: Bookkeeper,
journal: Journal,
lockFactory: TokenLockFactory,
pipelineState: PipelineState,
runtimeConfig: RuntimeConfig) extends TaskRunnerBase(conf, bookkeeper, journal, runtimeConfig, pipelineState) {
runtimeConfig: RuntimeConfig) extends TaskRunnerBase(conf, bookkeeper, journal, lockFactory, runtimeConfig, pipelineState) {
private val log = LoggerFactory.getLogger(this.getClass)

private val executor: ExecutorService = newFixedThreadPool(runtimeConfig.parallelTasks)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package za.co.absa.pramen.core.source
import com.typesafe.config.Config
import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.{ExternalChannelFactory, Query, Source, SourceResult, TableReader}
import za.co.absa.pramen.api._
import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
import za.co.absa.pramen.core.reader.{TableReaderJdbc, TableReaderJdbcNative}

Expand Down Expand Up @@ -54,8 +54,8 @@ class JdbcSource(sourceConfig: Config,
case Query.Sql(sql) =>
log.info(s"Using TableReaderJdbcNative to read the query: $sql")
new TableReaderJdbcNative(jdbcReaderConfig.jdbcConfig)
case Query.Path(_) =>
throw new IllegalArgumentException(s"Unexpected 'path' spec for the JDBC reader. Only 'table' or 'sql' are supported. Config path: $sourceConfigParentPath")
case q =>
throw new IllegalArgumentException(s"Unexpected '${q.name}' spec for the JDBC reader. Only 'table' or 'sql' are supported. Config path: $sourceConfigParentPath")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,16 @@ class SparkSource(val format: String,

def getReader(query: Query): TableReader = {
val tableReader = query match {
case Query.Table(table) =>
case Query.Table(table) =>
log.info(s"Using TableReaderSpark to read table: $table")
new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateFormat, options)
case Query.Sql(sql) =>
log.info(s"Using TableReaderSpark to read SQL for: $sql")
new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateFormat, options)
case Query.Path(path) =>
case Query.Path(path) =>
log.info(s"Using TableReaderSpark to read '$format' from: $path")
new TableReaderSpark(format, schema, hasInfoDateCol, infoDateColumn, infoDateFormat, options)
case other => throw new IllegalArgumentException(s"'${other.name}' is not supported by the Spark source. Use 'path', 'table' or 'sql' instead.")
}
log.info(s"Options passed for '$format':")
ConfigUtils.renderExtraOptions(options, KEYS_TO_REDACT)(s => log.info(s))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,18 +20,19 @@ import com.github.yruslan.channel.Channel
import com.typesafe.config.ConfigFactory
import org.apache.spark.sql.DataFrame
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.core.{OperationDefFactory, RuntimeConfigFactory}
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.pipeline.{Job, RunResult}
import za.co.absa.pramen.core.mocks.bookkeeper.SyncBookkeeperMock
import za.co.absa.pramen.core.mocks.job.JobSpy
import za.co.absa.pramen.core.mocks.journal.JournalMock
import za.co.absa.pramen.core.mocks.lock.TokenLockFactoryMock
import za.co.absa.pramen.core.mocks.state.PipelineStateSpy
import za.co.absa.pramen.core.pipeline.{Job, RunResult}
import za.co.absa.pramen.core.runner.jobrunner.ConcurrentJobRunnerImpl
import za.co.absa.pramen.core.runner.task.RunStatus.{Failed, Succeeded}
import za.co.absa.pramen.core.runner.task.TaskRunnerMultithreaded
import za.co.absa.pramen.core.{OperationDefFactory, RuntimeConfigFactory}

import java.time.{Instant, LocalDate, Duration => Dur}

Expand Down Expand Up @@ -182,6 +183,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {

val bookkeeper = new SyncBookkeeperMock
val journal = new JournalMock
val tokenLockFactory = new TokenLockFactoryMock

val state = new PipelineStateSpy

Expand All @@ -192,7 +194,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase {
val operationDef = OperationDefFactory.getDummyOperationDef(consumeThreads = consumeThreads)
val job = new JobSpy(runFunction = runFunction, saveStats = stats, operationDef = operationDef, allowParallel = allowParallel)

val taskRunner = new TaskRunnerMultithreaded(conf, bookkeeper, journal, state, runtimeConfig)
val taskRunner = new TaskRunnerMultithreaded(conf, bookkeeper, journal, tokenLockFactory, state, runtimeConfig)

val jobRunner = new ConcurrentJobRunnerImpl(runtimeConfig, bookkeeper, taskRunner)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import za.co.absa.pramen.core.metastore.model.MetastoreDependency
import za.co.absa.pramen.core.mocks.bookkeeper.SyncBookkeeperMock
import za.co.absa.pramen.core.mocks.job.JobSpy
import za.co.absa.pramen.core.mocks.journal.JournalMock
import za.co.absa.pramen.core.mocks.lock.TokenLockFactoryMock
import za.co.absa.pramen.core.mocks.state.PipelineStateSpy
import za.co.absa.pramen.core.pipeline._
import za.co.absa.pramen.core.runner.task.RunStatus.{Failed, NotRan, Skipped, Succeeded}
Expand Down Expand Up @@ -481,6 +482,7 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar

val bookkeeper = if (bookkeeperIn == null) new SyncBookkeeperMock else bookkeeperIn
val journal = new JournalMock
val tokenLockFactory = new TokenLockFactoryMock

val state = new PipelineStateSpy

Expand All @@ -501,7 +503,7 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar

val tasks = infoDates.map(d => core.pipeline.Task(job, d, TaskRunReason.New))

val runner = new TaskRunnerMultithreaded(conf, bookkeeper, journal, state, runtimeConfig)
val runner = new TaskRunnerMultithreaded(conf, bookkeeper, journal, tokenLockFactory, state, runtimeConfig)

(runner, bookkeeper, journal, state, tasks)
}
Expand Down

0 comments on commit a8082d5

Please sign in to comment.