SPARK-47609: Make cache lookup more optimal to minimize cache misses
ashahid committed Apr 8, 2024
1 parent 04bf981 commit 6ced3ce
Showing 8 changed files with 434 additions and 44 deletions.
3 changes: 2 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -3874,7 +3874,8 @@ class Dataset[T] private[sql](
    */
   def storageLevel: StorageLevel = {
     sparkSession.sharedState.cacheManager.lookupCachedData(this).map { cachedData =>
-      cachedData.cachedRepresentation.cacheBuilder.storageLevel
+      cachedData.cachedRepresentation.fold(CacheManager.inMemoryRelationExtractor, identity).
+        cacheBuilder.storageLevel
     }.getOrElse(StorageLevel.NONE)
   }
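The recurring fold(CacheManager.inMemoryRelationExtractor, identity) call suggests that cachedRepresentation is now an Either whose right side is already an InMemoryRelation and whose left side can still be turned into one. A minimal REPL-style sketch of that access pattern, using hypothetical stand-in types (DeferredEntry, FakeRelation) rather than the real CacheManager internals:

// Hypothetical stand-ins; the real left/right types live inside CacheManager.
case class FakeRelation(storageLevel: String)          // plays the role of InMemoryRelation
case class DeferredEntry(build: () => FakeRelation)    // plays the role of the "left" form

// fold applies the extractor to a Left and identity to a Right, so every caller ends up
// holding a relation it can read the storage level from.
def storageLevelOf(rep: Either[DeferredEntry, FakeRelation]): String =
  rep.fold(entry => entry.build(), identity).storageLevel

assert(storageLevelOf(Right(FakeRelation("MEMORY_AND_DISK"))) == "MEMORY_AND_DISK")
assert(storageLevelOf(Left(DeferredEntry(() => FakeRelation("MEMORY_AND_DISK")))) == "MEMORY_AND_DISK")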


Large diffs are not rendered by default.

@@ -62,12 +62,16 @@ case class AnalyzeColumnCommand(
   private def analyzeColumnInCachedData(plan: LogicalPlan, sparkSession: SparkSession): Boolean = {
     val cacheManager = sparkSession.sharedState.cacheManager
     val planToLookup = sparkSession.sessionState.executePlan(plan).analyzed
-    cacheManager.lookupCachedData(planToLookup).map { cachedData =>
-      val columnsToAnalyze = getColumnsToAnalyze(
-        tableIdent, cachedData.cachedRepresentation, columnNames, allColumns)
-      cacheManager.analyzeColumnCacheQuery(sparkSession, cachedData, columnsToAnalyze)
-      cachedData
-    }.isDefined
+    cacheManager.lookupCachedData(planToLookup).exists { cachedData =>
+      if (cachedData.cachedRepresentation.isRight) {
+        val columnsToAnalyze = getColumnsToAnalyze(
+          tableIdent, cachedData.cachedRepresentation.merge, columnNames, allColumns)
+        cacheManager.analyzeColumnCacheQuery(sparkSession, cachedData, columnsToAnalyze)
+        true
+      } else {
+        false
+      }
+    }
   }

   private def analyzeColumnInTempView(plan: LogicalPlan, sparkSession: SparkSession): Unit = {
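Two Scala idioms carry the behavioural change above: Option.exists replaces map { ... }.isDefined, so a cache hit only counts as analyzed when the representation is a Right, and Either.merge widens both alternatives to their common supertype before getColumnsToAnalyze is called. A REPL-style sketch with hypothetical stand-in types (Plan, DeferredEntry, MaterializedEntry), not the real Spark classes:

sealed trait Plan { def name: String }
case class DeferredEntry(name: String) extends Plan      // stand-in for the lightweight "left" form
case class MaterializedEntry(name: String) extends Plan  // stand-in for InMemoryRelation

def analyze(lookup: Option[Either[DeferredEntry, MaterializedEntry]]): Boolean =
  lookup.exists { rep =>
    if (rep.isRight) {
      // Covariance widens the Either to Either[Plan, Plan], so merge returns a Plan.
      val plan: Plan = rep.merge
      println(s"analyzing columns of ${plan.name}")
      true
    } else {
      false
    }
  }

assert(!analyze(None))                                // no cache entry at all
assert(!analyze(Some(Left(DeferredEntry("t1")))))     // entry present but not materialized
assert(analyze(Some(Right(MaterializedEntry("t1"))))) // only this case reports success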
@@ -238,7 +238,7 @@ object CommandUtils extends Logging {
       // Analyzes a catalog view if the view is cached
       val table = sparkSession.table(tableIdent.quotedString)
       val cacheManager = sparkSession.sharedState.cacheManager
-      if (cacheManager.lookupCachedData(table.logicalPlan).isDefined) {
+      if (cacheManager.lookupCachedData(table.logicalPlan).exists(_.cachedRepresentation.isRight)) {
         if (!noScan) {
           // To collect table stats, materializes an underlying columnar RDD
           table.count()
@@ -39,6 +39,7 @@ import org.apache.spark.sql.catalyst.util.{escapeSingleQuotedString, quoteIfNeed
 import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.CURRENT_DEFAULT_COLUMN_METADATA_KEY
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.TableIdentifierHelper
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
+import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
@@ -201,7 +202,8 @@ case class AlterTableRenameCommand(
       // If `optStorageLevel` is defined, the old table was cached.
       val optCachedData = sparkSession.sharedState.cacheManager.lookupCachedData(
         sparkSession.table(oldName.unquotedString))
-      val optStorageLevel = optCachedData.map(_.cachedRepresentation.cacheBuilder.storageLevel)
+      val optStorageLevel = optCachedData.map(_.cachedRepresentation.
+        fold(CacheManager.inMemoryRelationExtractor, identity).cacheBuilder.storageLevel)
       if (optStorageLevel.isDefined) {
         CommandUtils.uncacheTableOrView(sparkSession, oldName)
       }
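The behaviour this hunk preserves can be approximated from the public API: read the cached table's storage level, uncache the old name, and re-cache the renamed table at the same level. A rough, hypothetical spark-shell illustration (table names are illustrative; the command itself performs these steps internally):

import org.apache.spark.storage.StorageLevel

val level = spark.table("old_tbl").storageLevel    // StorageLevel.NONE when the table is not cached
if (level != StorageLevel.NONE) {
  spark.catalog.uncacheTable("old_tbl")
  // ... the rename itself happens here ...
  spark.catalog.cacheTable("new_tbl", level)       // re-cache the renamed table at the old level
}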
@@ -41,7 +41,7 @@ import org.apache.spark.sql.connector.read.LocalScan
 import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
 import org.apache.spark.sql.connector.write.V1Write
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
-import org.apache.spark.sql.execution.{FilterExec, InSubqueryExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
+import org.apache.spark.sql.execution.{CacheManager, FilterExec, InSubqueryExec, LeafExecNode, LocalTableScanExec, ProjectExec, RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, LogicalRelation, PushableColumnAndNestedColumn}
 import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
@@ -85,8 +85,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       val v2Relation = DataSourceV2Relation.create(r.table, Some(r.catalog), Some(r.identifier))
       val cache = session.sharedState.cacheManager.lookupCachedData(v2Relation)
       session.sharedState.cacheManager.uncacheQuery(session, v2Relation, cascade = true)
-      if (cache.isDefined) {
-        val cacheLevel = cache.get.cachedRepresentation.cacheBuilder.storageLevel
+      if (cache.exists(_.cachedRepresentation.isRight)) {
+        val cacheLevel = cache.get.cachedRepresentation.
+          fold(CacheManager.inMemoryRelationExtractor, identity).cacheBuilder.storageLevel
         Some(cacheLevel)
       } else {
         None
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import org.scalatest.concurrent.TimeLimits
 import org.scalatest.time.SpanSugar._

+import org.apache.spark.sql.execution.{ProjectExec, UnaryExecNode}
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.functions._
@@ -244,15 +245,26 @@ class DatasetCacheSuite extends QueryTest
       case i: InMemoryRelation => i.cacheBuilder.cachedPlan
     }
     assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
-
-    // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
-    // on df, since df2's cache had not been loaded before df.unpersist().
     val df2Limit = df2.limit(2)
     val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
       case i: InMemoryRelation => i.cacheBuilder.cachedPlan
     }
+    // The assertion below is incorrect in the context of bug SPARK-47609,
+    // as df2 is derivable from df1 (which is an InMemoryRelation).
+
+    /*
+    // Verify that df2's cache has been re-cached, with a new physical plan rid of dependency
+    // on df, since df2's cache had not been loaded before df.unpersist().
     assert(df2LimitInnerPlan.isDefined &&
       !df2LimitInnerPlan.get.exists(_.isInstanceOf[InMemoryTableScanExec]))
+    */
+    assert(df2LimitInnerPlan.isDefined)
+    val innerImr = df2LimitInnerPlan.get.collectFirst {
+      case imrExec: InMemoryTableScanExec => imrExec.relation
+    }
+    assert(innerImr.isDefined)
+    assert(innerImr.get.cacheBuilder.cachedPlan.asInstanceOf[UnaryExecNode].
+      child.isInstanceOf[ProjectExec])
   }

   test("SPARK-27739 Save stats from optimized plan") {
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.command

 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.execution.CacheManager
 import org.apache.spark.storage.StorageLevel

 /**
@@ -73,7 +74,8 @@ trait AlterTableRenameSuiteBase extends QueryTest with DDLCommandTestUtils {
     def getStorageLevel(tableName: String): StorageLevel = {
       val table = spark.table(tableName)
       val cachedData = spark.sharedState.cacheManager.lookupCachedData(table).get
-      cachedData.cachedRepresentation.cacheBuilder.storageLevel
+      cachedData.cachedRepresentation.fold(CacheManager.inMemoryRelationExtractor, identity).
+        cacheBuilder.storageLevel
     }
     sql(s"CREATE TABLE $src (c0 INT) $defaultUsing")
     sql(s"INSERT INTO $src SELECT 0")
