#234 Add cleanup of transient tables.
yruslan committed Sep 13, 2023
1 parent 9f93461 commit 3d600bb
Showing 7 changed files with 64 additions and 11 deletions.
25 changes: 25 additions & 0 deletions README.md
@@ -2145,6 +2145,31 @@ You can use any source/sink combination in transfer jobs.

Here we describe more complicated use cases.

### Transient tables in the metastore
Transformers are useful as reusable components and for persisting intermediate results. However, when splitting up
a pipeline into small reusable components, it is not always desirable to persist every intermediate result. Transient
tables solve this: a table defined as transient in the metastore is not persisted to storage.
You can use transient tables in transformers and sinks, but only within the same pipeline run and for the same information date.

You can define a transient table in the metastore like this:
```hocon
pramen.metastore {
  tables = [
    {
      name = "table1"
      format = "transient"
      cache.policy = "no_cache"
    }
  ]
}
```

The cache policy can be one of:
- `no_cache` - the table is not cached, so Spark may recompute the underlying dataframe each time the table is read;
- `cache` - the table is cached using the Spark cache;
- `persist` - the table is persisted to a temporary directory for the duration of the pipeline run and cleaned up when the pipeline finishes (see the example below).
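
For example, intermediate data that is too big to keep in memory can be spilled to the temporary directory instead. Here is
a minimal sketch (the table name is hypothetical; the keys are the same as in the example above):
```hocon
pramen.metastore {
  tables = [
    {
      # A hypothetical intermediate table produced and consumed within the same pipeline run
      name = "intermediate_table"
      format = "transient"
      # Write the data to the pipeline's temporary directory instead of keeping the dataframe in memory;
      # the data is deleted when the pipeline run finishes
      cache.policy = "persist"
    }
  ]
}
```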


### File-based sourcing
Let's consider a use case where your data lake has a 'landing' area, in addition to the classic 'raw' area, into which
data is loaded from external sources. The 'landing' area is owned by the data producer, and source systems can write files there.
@@ -56,7 +56,7 @@ object DataFormatParser {
val path = Query.Path(conf.getString(PATH_KEY)).path
DataFormat.Raw(path)
case FORMAT_TRANSIENT =>
val cachePolicy = getCachePolicy(conf).getOrElse(CachePolicy.Persist)
val cachePolicy = getCachePolicy(conf).getOrElse(CachePolicy.NoCache)
DataFormat.Transient(cachePolicy)
case _ => throw new IllegalArgumentException(s"Unknown format: $format")
}
@@ -18,6 +18,7 @@ package za.co.absa.pramen.core.metastore.peristence

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.CachePolicy
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.HiveConfig
@@ -31,7 +32,6 @@ class MetastorePersistenceTransient(tempPath: String,
tableName: String,
cachePolicy: CachePolicy
)(implicit spark: SparkSession) extends MetastorePersistence {

import MetastorePersistenceTransient._

override def loadTable(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
@@ -85,13 +85,17 @@ class MetastorePersistenceTransient(tempPath: String,
}

object MetastorePersistenceTransient {
private val log = LoggerFactory.getLogger(this.getClass)

case class MetastorePartition(tableName: String, infoDateStr: String)

private val rawDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
private val cachedDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
private val persistedLocations = new mutable.HashMap[MetastorePartition, String]()
private var spark: SparkSession = _

private[core] def addRawDataFrame(tableName: String, infoDate: LocalDate, df: DataFrame): (DataFrame, Option[Long]) = synchronized {
spark = df.sparkSession
val partition = getMetastorePartition(tableName, infoDate)

rawDataframes += partition -> df
@@ -105,6 +109,7 @@ object MetastorePersistenceTransient {
val cachedDf = df.cache()

this.synchronized {
spark = df.sparkSession
cachedDataframes += partition -> cachedDf
}

@@ -113,7 +118,10 @@

private[core] def addPersistedDataFrame(tableName: String, infoDate: LocalDate, df: DataFrame, tempDir: String): (DataFrame, Option[Long]) = {
val partition = getMetastorePartition(tableName, infoDate)
val spark = df.sparkSession
this.synchronized {
spark = df.sparkSession
}

val partitionFolder = s"temp_partition_date=$infoDate"
val outputPath = new Path(tempDir, partitionFolder).toString

@@ -129,35 +137,41 @@
persistedLocations += partition -> outputPath
}

(df.sparkSession.read.parquet(outputPath), Option(sizeBytes))
(spark.read.parquet(outputPath), Option(sizeBytes))
}

private[core] def getDataForTheDate(tableName: String, infoDate: LocalDate)(implicit spark: SparkSession): DataFrame = synchronized {
val partition = getMetastorePartition(tableName, infoDate)

if (MetastorePersistenceTransient.rawDataframes.contains(partition)) {
log.info(s"Using non-cached dataframe for '$tableName' for '$infoDate'...")
MetastorePersistenceTransient.rawDataframes(partition)
} else if (MetastorePersistenceTransient.cachedDataframes.contains(partition)) {
log.info(s"Using cached dataframe for '$tableName' for '$infoDate'...")
MetastorePersistenceTransient.cachedDataframes(partition)
} else if (MetastorePersistenceTransient.persistedLocations.contains(partition)) {
val path = MetastorePersistenceTransient.persistedLocations(partition)
log.info(s"Reading persisted transient table from $path...")
spark.read.parquet(path)
} else {
throw new IllegalStateException(s"No data for transient table '$tableName' for '$infoDate'")
}
}

private[core] def cleanup()(implicit spark: SparkSession): Unit = synchronized {
private[core] def cleanup(): Unit = synchronized {
rawDataframes.clear()
cachedDataframes.foreach { case (_, df) => df.unpersist() }
cachedDataframes.clear()

persistedLocations.foreach { case (_, path) =>
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, path)
if (spark != null) {
persistedLocations.foreach { case (_, path) =>
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, path)

fsUtils.deleteDirectoryRecursively(new Path(path))
log.info(s"Deleting $path...")
fsUtils.deleteDirectoryRecursively(new Path(path))
}
persistedLocations.clear()
}
persistedLocations.clear()
}

private def getMetastorePartition(tableName: String, infoDate: LocalDate): MetastorePartition = {
@@ -22,6 +22,7 @@ import org.slf4j.LoggerFactory
import za.co.absa.pramen.core.app.config.RuntimeConfig
import za.co.absa.pramen.core.app.{AppContext, AppContextImpl}
import za.co.absa.pramen.core.config.Keys.LOG_EXECUTOR_NODES
import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceTransient
import za.co.absa.pramen.core.pipeline.{Job, OperationSplitter, PipelineDef}
import za.co.absa.pramen.core.runner.jobrunner.{ConcurrentJobRunner, ConcurrentJobRunnerImpl}
import za.co.absa.pramen.core.runner.orchestrator.OrchestratorImpl
@@ -217,6 +218,7 @@ object AppRunner {
private[core] def shutdown(taskRunner: TaskRunner, state: PipelineState): Try[Unit] = {
handleFailure(Try {
taskRunner.shutdown()
MetastorePersistenceTransient.cleanup()
}, state, "shutting down task runner execution context")
}
}
@@ -20,13 +20,15 @@ import com.typesafe.config.Config
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.Pramen
import za.co.absa.pramen.core.app.config.RuntimeConfig.EMAIL_IF_NO_CHANGES
import za.co.absa.pramen.core.metastore.peristence.MetastorePersistenceTransient
import za.co.absa.pramen.core.notify.pipeline.{PipelineNotification, PipelineNotificationEmail}
import za.co.absa.pramen.core.pipeline.PipelineDef._
import za.co.absa.pramen.core.runner.task.RunStatus.{NotRan, Skipped}
import za.co.absa.pramen.core.runner.task.RunStatus.NotRan
import za.co.absa.pramen.core.runner.task.TaskResult

import java.time.Instant
import scala.collection.mutable.ListBuffer
import scala.util.Try
import scala.util.control.NonFatal

class PipelineStateImpl(implicit conf: Config) extends PipelineState {
@@ -87,6 +89,12 @@ class PipelineStateImpl(implicit conf: Config) extends PipelineState {
failureException = Some(new IllegalStateException("The application exited unexpectedly."))
}
sendNotificationEmail()
Try {
// Clean up transient metastore tables if any
MetastorePersistenceTransient.cleanup()
}.recover {
case NonFatal(ex) => log.error("Unable to clean up transient metastore tables.", ex)
}
}
}
}
@@ -86,7 +86,7 @@ class DataFormatSuite extends AnyWordSpec {

assert(format.name == "transient")
assert(format.isInstanceOf[Transient])
assert(format.asInstanceOf[Transient].cachePolicy == CachePolicy.Persist)
assert(format.asInstanceOf[Transient].cachePolicy == CachePolicy.NoCache)
}

"support cache policies for 'transient' format" in {
@@ -48,6 +48,10 @@ class MetastorePersistenceTransientSuite extends AnyWordSpec with BeforeAndAfter
super.afterAll()
}

"cleanup should do nothing if spark session is not available" in {
MetastorePersistenceTransient.cleanup()
}

"loadTable" should {
"return data for a date when it is available" in {
val persistor = new MetastorePersistenceTransient(null, "table1", CachePolicy.NoCache)