[SPARK] Don't cache failed future result in LazyCheckpointProvider
Don't cache failed future result in LazyCheckpointProvider.

Currently we define LazyCheckpointProvider as follows:
```
    def createCheckpointProvider(): CheckpointProvider = {
        ...
        val v2Actions = ThreadUtils.awaitResult(readActionsFuture, Duration.Inf)
        CheckpointProvider(v2Actions, ...)
    }

    lazy val underlyingCheckpointProvider = createCheckpointProvider()

```

If the future fails, then `createCheckpointProvider()` fails and whoever accesses `underlyingCheckpointProvider` also gets the exception. `underlyingCheckpointProvider` is accessed by the Snapshot class, so the snapshot hits that error and the query fails.

But a user might rerun the same query later. Since the snapshot is cached under the DeltaLog, it again invokes methods on the checkpointProvider, which again accesses `lazyCheckpointProvider.underlyingCheckpointProvider`. That access fails once more, because the future already failed at the beginning due to an intermittent error.
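
Part of the reason the failure sticks is that a Scala `Future` memoizes its outcome, so a failed future keeps surfacing the same exception on every await. A small standalone snippet (illustrative only, not part of this patch) shows the behavior:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.Try

object FailedFutureDemo extends App {
  // A Future caches its outcome: once it fails, every later await sees the same
  // exception, even if the underlying cause (a transient I/O error) is long gone.
  val prefetch: Future[Int] = Future { throw new RuntimeException("intermittent read failure") }

  println(Try(Await.result(prefetch, Duration.Inf))) // Failure(java.lang.RuntimeException: ...)
  println(Try(Await.result(prefetch, Duration.Inf))) // still the exact same Failure
}
```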

The solution is to stop relying on the future and instead invoke the real readV2Actions method on subsequent invocations of `createCheckpointProvider()`. If `createCheckpointProvider()` already succeeded on the first attempt, the checkpoint provider is cached under `lazy val underlyingCheckpointProvider`. But if the first attempt failed, the next attempt must not use the failed future's result to get the v2 actions; it should do the I/O and read the v2 actions again.
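
In other words, the prefetched result is only trusted when the future succeeded; a failed prefetch is discarded so later attempts redo the read. A minimal sketch of that idea (simplified, with made-up names such as `RetryingPrefetch` and `readV2Actions`; the commit's real implementation is the `NonFateSharingFuture` helper shown in the diff below):

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

// Simplified sketch; `readV2Actions` stands in for the real checkpoint I/O.
class RetryingPrefetch[T](readV2Actions: () => T) {
  // Prefetch eagerly; the result is only trusted if the future succeeds.
  @volatile private var prefetchOpt: Option[Future[T]] = Some(Future(readV2Actions()))

  def get(timeout: Duration): T = prefetchOpt match {
    case Some(prefetch) =>
      Await.ready(prefetch, timeout).value.get match {
        case Success(value) => value
        case Failure(e) =>
          prefetchOpt = None // forget the failed prefetch so later calls redo the I/O
          throw e            // this query still fails, but the failure is not cached
      }
    case None =>
      readV2Actions() // an earlier prefetch failed; read the v2 actions again directly
  }
}
```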

GitOrigin-RevId: a5631e91a15ec7b991bf5c7ba213a59a465b1d1a
prakharjain09 authored and vkorukanti committed Oct 6, 2023
1 parent 4e7cb5f commit bbf19c3
Showing 3 changed files with 117 additions and 36 deletions.
@@ -17,22 +17,22 @@
package org.apache.spark.sql.delta

import scala.collection.mutable.ArrayBuffer
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.SnapshotManagement.checkpointV2ThreadPool
import org.apache.spark.sql.delta.actions._
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.LogStore
import org.apache.spark.sql.delta.util.FileNames._
import org.apache.spark.sql.delta.util.NonFateSharingFuture
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.ThreadUtils

/**
* Represents basic information about a checkpoint.
@@ -96,8 +96,14 @@ object CheckpointProvider extends DeltaLogging {
case uninitializedV2CheckpointProvider: UninitializedV2CheckpointProvider =>
new LazyCompleteCheckpointProvider(uninitializedV2CheckpointProvider) {
override def createCheckpointProvider(): CheckpointProvider = {
val (checkpointMetadata, sidecarFiles) =
uninitializedV2CheckpointProvider.finishIOAndGetActions()
val (checkpointMetadataOpt, sidecarFiles) =
uninitializedV2CheckpointProvider.nonFateSharingCheckpointReadFuture.get(Duration.Inf)
// This must be a v2 checkpoint, so checkpointMetadataOpt must be non-empty.
val checkpointMetadata = checkpointMetadataOpt.getOrElse {
val checkpointFile = uninitializedV2CheckpointProvider.topLevelFiles.head
throw new IllegalStateException(s"V2 Checkpoint ${checkpointFile.getPath} " +
s"has no CheckpointMetadata action")
}
require(isV2CheckpointEnabled(snapshotDescriptor.protocol))
V2CheckpointProvider(uninitializedV2CheckpointProvider, checkpointMetadata, sidecarFiles)
}
@@ -113,14 +119,13 @@ object CheckpointProvider extends DeltaLogging {
// Start a future to read the v2 actions from the parquet checkpoint, and return a lazy
// checkpoint provider wrapping the future. We won't wait on the future unless/until
// somebody calls a complete checkpoint provider method.
val checkpointReadFuture = SnapshotManagement.checkpointV2ThreadPool.submit(spark) {
val future = checkpointV2ThreadPool.submitNonFateSharing { spark: SparkSession =>
readV2ActionsFromParquetCheckpoint(
spark, provider.logPath, provider.fileStatus, snapshotDescriptor.deltaLog.options)
}
new LazyCompleteCheckpointProvider(provider) {
override def createCheckpointProvider(): CheckpointProvider = {
val (checkpointMetadataOpt, sidecarFiles) =
ThreadUtils.awaitResult(checkpointReadFuture, Duration.Inf)
val (checkpointMetadataOpt, sidecarFiles) = future.get(Duration.Inf)
checkpointMetadataOpt match {
case Some(cm) =>
require(isV2CheckpointEnabled(snapshotDescriptor))
@@ -184,8 +189,7 @@ object CheckpointProvider extends DeltaLogging {
try {
var checkpointMetadataOpt: Option[CheckpointMetadata] = None
val sidecarFileActions: ArrayBuffer[SidecarFile] = ArrayBuffer.empty
logStore
.readAsIterator(fileStatus, hadoopConf)
logStore.readAsIterator(fileStatus, hadoopConf).processAndClose { _
.map(Action.fromJson)
.foreach {
case cm: CheckpointMetadata if checkpointMetadataOpt.isEmpty =>
@@ -197,6 +201,7 @@
sidecarFileActions.append(sidecarFile)
case _ => ()
}
}
val checkpointMetadata = checkpointMetadataOpt.getOrElse {
throw new IllegalStateException("Json V2 Checkpoint has no CheckpointMetadata action")
}
@@ -360,42 +365,27 @@ case class UninitializedV2CheckpointProvider(
}

/** Helper method to do I/O and read v2 actions from the underlying v2 checkpoint file */
private def readV2Actions(
spark: SparkSession, logStore: LogStore): (CheckpointMetadata, Seq[SidecarFile]) = {
private def readV2Actions(spark: SparkSession): (Option[CheckpointMetadata], Seq[SidecarFile]) = {
v2CheckpointFormat match {
case V2Checkpoint.Format.JSON =>
CheckpointProvider.readV2ActionsFromJsonCheckpoint(
logStore, logPath, fileStatus, hadoopConf)
val (checkpointMetadata, sidecars) = CheckpointProvider.readV2ActionsFromJsonCheckpoint(
logStore, logPath, fileStatus, hadoopConf)
(Some(checkpointMetadata), sidecars)
case V2Checkpoint.Format.PARQUET =>
val (checkpointMetadataOpt, sidecarFiles) =
CheckpointProvider.readV2ActionsFromParquetCheckpoint(
CheckpointProvider.readV2ActionsFromParquetCheckpoint(
spark, logPath, fileStatus, deltaLogOptions)
val checkpointMetadata = checkpointMetadataOpt.getOrElse {
throw new IllegalStateException(
s"CheckpointMetadata action not found in v2 checkpoint ${fileStatus.getPath}")
}
(checkpointMetadata, sidecarFiles)
}
}

private val checkpointReadFuture: Future[(CheckpointMetadata, Seq[SidecarFile])] = {
v2ActionsFromLastCheckpointOpt match {
case Some(result) => Future.successful(result)
case None =>
val spark = SparkSession.getActiveSession.getOrElse {
throw DeltaErrors.sparkSessionNotSetException()
}
SnapshotManagement.checkpointV2ThreadPool.submit(spark) { readV2Actions(spark, logStore) }
val nonFateSharingCheckpointReadFuture
: NonFateSharingFuture[(Option[CheckpointMetadata], Seq[SidecarFile])] = {
checkpointV2ThreadPool.submitNonFateSharing { spark: SparkSession =>
v2ActionsFromLastCheckpointOpt match {
case Some((cm, sidecars)) => Some(cm) -> sidecars
case None => readV2Actions(spark)
}
}
}

/**
* Finish any underlying I/O if needed and returns the v2 checkpoint actions
* i.e. [[CheckpointMetadata]] and Seq of [[SidecarFile]]s
*/
def finishIOAndGetActions(
timeout: Duration = Duration.Inf): (CheckpointMetadata, Seq[SidecarFile]) =
ThreadUtils.awaitResult(checkpointReadFuture, timeout)
}

/**
@@ -21,6 +21,9 @@ import java.util.concurrent.ThreadPoolExecutor
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration.Duration

import org.apache.spark.sql.delta.DeltaErrors
import org.apache.spark.sql.delta.metering.DeltaLogging

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.ThreadUtils

@@ -44,6 +47,9 @@ private[delta] class DeltaThreadPool(tpe: ThreadPoolExecutor) {
val futures = items.map(i => submit(spark)(f(i)))
ThreadUtils.awaitResult(Future.sequence(futures), timeout)
}

def submitNonFateSharing[T](f: SparkSession => T): NonFateSharingFuture[T] =
new NonFateSharingFuture(this)(f)
}


@@ -52,3 +58,40 @@ private[delta] object DeltaThreadPool {
def apply(prefix: String, numThreads: Int): DeltaThreadPool =
new DeltaThreadPool(ThreadUtils.newDaemonCachedThreadPool(prefix, numThreads))
}

/**
* Helper class that runs a function `f` immediately in a thread pool and avoids fate-sharing
* across retries if the first attempt of `f` (run inside the future) fails for any reason.
* Everyone uses the future that prefetches `f` as long as it succeeds; once the future has
* failed, callers fall back to invoking `f` themselves.
*/
class NonFateSharingFuture[T](pool: DeltaThreadPool)(f: SparkSession => T)
extends DeltaLogging {

// We may not have a spark session yet, but that's ok (the future is best-effort)
// Submit a future if a spark session is available
private var futureOpt = SparkSession.getActiveSession.map { spark =>
pool.submit(spark) { f(spark) }
}

def get(timeout: Duration): T = {
// Prefer to get a prefetched result from the future
futureOpt.foreach { future =>
try {
return ThreadUtils.awaitResult(future, timeout)
} catch {
case e: Throwable =>
logError("Failed to get result from future", e)
futureOpt = None // avoid excessive log spam
throw e
}
}

// Future missing or failed, so fall back to direct execution
SparkSession.getActiveSession match {
case Some(spark) => f(spark)
case _ => throw DeltaErrors.sparkSessionNotSetException()
}
}
}
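
For illustration, a caller inside the delta package could use the new helper roughly like this (the pool name and the `expensiveRead` function are invented for the example; only `DeltaThreadPool.apply`, `submitNonFateSharing`, and `get` come from the code above):

```scala
// Sketch only, e.g. inside a test or REPL within the org.apache.spark.sql.delta package.
import scala.concurrent.duration.Duration
import org.apache.spark.sql.SparkSession

// Invented stand-in for an expensive read that can fail transiently.
def expensiveRead(spark: SparkSession): Seq[String] =
  spark.range(10).collect().map(_.toString).toSeq

val pool = DeltaThreadPool("example-prefetch", numThreads = 2)
val prefetched = pool.submitNonFateSharing(expensiveRead)

// get() blocks on the prefetched result; if that prefetch failed, later get() calls
// run expensiveRead directly instead of re-surfacing the cached failure.
val rows: Seq[String] = prefetched.get(Duration.Inf)
```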
@@ -653,6 +653,54 @@ class CheckpointsSuite
}
}
}

def checkIntermittentError(tempDir: File, lastCheckpointMissing: Boolean): Unit = {
// Create a table with commit version 0, 1 and a checkpoint.
val tablePath = tempDir.getAbsolutePath
spark.range(10).write.format("delta").save(tablePath)
spark.sql(s"INSERT INTO delta.`$tablePath`" +
s"SELECT * FROM delta.`$tablePath` WHERE id = 1").collect()

val log = DeltaLog.forTable(spark, tablePath)
val conf = log.newDeltaHadoopConf()
log.checkpoint()

// Delete the _last_checkpoint file based on the test configuration.
val fs = log.logPath.getFileSystem(conf)
if (lastCheckpointMissing) {
fs.delete(log.LAST_CHECKPOINT)
}

// In order to trigger an intermittent failure while reading the checkpoint, this test corrupts
// the checkpoint temporarily so that the json/parquet checkpoint reader fails. The corrupted
// file is written with the same length so that, once the file is restored later, we can verify
// that Delta is able to read it and produce correct results. If the "bad" file had a different
// length, the read of the "good" file would also fail, because the parquet reader uses the
// cached file status's getLen to locate the footer and would not find the magic bytes.
val checkpointFileStatus =
log.listFrom(0).filter(FileNames.isCheckpointFile).toSeq.head
// Rename the real checkpoint to a temp path and write a fake checkpoint consisting of the
// character 'r' repeated.
val tempPath = checkpointFileStatus.getPath.suffix(".temp")
fs.rename(checkpointFileStatus.getPath, tempPath)
val randomContentToWrite = Seq("r" * (checkpointFileStatus.getLen.toInt - 1)) // -1: write() appends '\n'
log.store.write(
checkpointFileStatus.getPath, randomContentToWrite.toIterator, overwrite = true, conf)
assert(log.store.read(checkpointFileStatus.getPath, conf) === randomContentToWrite)
assert(fs.getFileStatus(tempPath).getLen === checkpointFileStatus.getLen)

DeltaLog.clearCache()
sql(s"SELECT * FROM delta.`$tablePath`").collect()
val snapshot = DeltaLog.forTable(spark, tablePath).unsafeVolatileSnapshot
snapshot.computeChecksum
assert(snapshot.checkpointProvider.isEmpty)
}
for (lastCheckpointMissing <- BOOLEAN_DOMAIN)
testDifferentCheckpoints("intermittent error while reading checkpoint should not" +
s" stick to snapshot [lastCheckpointMissing: $lastCheckpointMissing]") { (_, _) =>
withTempDir { tempDir => checkIntermittentError(tempDir, lastCheckpointMissing) }
}
}

/**
