[SPARK-14205][SQL] remove trait Queryable #12001

Closed · wants to merge 3 commits
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
@@ -589,6 +589,9 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.util.MLUtils.loadLabeledData"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.optimization.LBFGS.setMaxNumIterations"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.evaluation.BinaryClassificationEvaluator.setScoreCol")
) ++ Seq(
// [SPARK-14205][SQL] remove trait Queryable
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.Dataset")
)
case v if v.startsWith("1.6") =>
Seq(
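For context on the exclusion added above: removing a parent type from a public class changes the class's binary signature, which MiMa reports as a MissingTypesProblem. A minimal, hypothetical before/after sketch of that kind of break (names are illustrative only):

trait Queryable                                              // parent trait removed by this PR
class DatasetBefore[T] extends Queryable with Serializable   // old parent list
class DatasetAfter[T] extends Serializable                   // new parent list: Queryable is gone

Clients compiled against the old parent list can fail at link time, so the change is whitelisted in MimaExcludes.scala rather than treated as an accidental regression.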
88 changes: 79 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -22,8 +22,10 @@ import java.io.CharArrayWriter
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal

import com.fasterxml.jackson.core.JsonFactory
import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
Expand All @@ -39,7 +41,7 @@ import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, Queryable, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
@@ -150,10 +152,10 @@ private[sql] object Dataset {
* @since 1.6.0
*/
class Dataset[T] private[sql](
@transient override val sqlContext: SQLContext,
@DeveloperApi @transient override val queryExecution: QueryExecution,
@transient val sqlContext: SQLContext,
@DeveloperApi @transient val queryExecution: QueryExecution,
encoder: Encoder[T])
extends Queryable with Serializable {
extends Serializable {

queryExecution.assertAnalyzed()

@@ -224,7 +226,7 @@ class Dataset[T] private[sql](
* @param _numRows Number of rows to show
* @param truncate Whether to truncate long strings and align cells right
*/
override private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
val numRows = _numRows.max(0)
val takeResult = take(numRows + 1)
val hasMoreData = takeResult.length > numRows
@@ -249,7 +251,75 @@ class Dataset[T] private[sql](
}: Seq[String]
}

formatString ( rows, numRows, hasMoreData, truncate )
val sb = new StringBuilder
val numCols = schema.fieldNames.length

// Initialise the width of each column to a minimum value of '3'
val colWidths = Array.fill(numCols)(3)

// Compute the width of each column
for (row <- rows) {
for ((cell, i) <- row.zipWithIndex) {
colWidths(i) = math.max(colWidths(i), cell.length)
}
}

// Create the separator line
val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()

// column names
rows.head.zipWithIndex.map { case (cell, i) =>
if (truncate) {
StringUtils.leftPad(cell, colWidths(i))
} else {
StringUtils.rightPad(cell, colWidths(i))
}
}.addString(sb, "|", "|", "|\n")

sb.append(sep)

// data
rows.tail.map {
_.zipWithIndex.map { case (cell, i) =>
if (truncate) {
StringUtils.leftPad(cell.toString, colWidths(i))
} else {
StringUtils.rightPad(cell.toString, colWidths(i))
}
}.addString(sb, "|", "|", "|\n")
}

sb.append(sep)

// For Data that has more than "numRows" records
if (hasMoreData) {
val rowsString = if (numRows == 1) "row" else "rows"
sb.append(s"only showing top $numRows $rowsString\n")
}

sb.toString()
}
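The table-building logic above, formerly supplied by the removed trait's formatString and now inlined into showString, amounts to computing one width per column and padding every cell to it. A self-contained sketch of the same approach, assuming commons-lang3 is on the classpath (the object name is hypothetical):

import org.apache.commons.lang3.StringUtils

object ShowStringSketch {
  // rows.head is the header row; the remaining rows are data, already rendered as strings.
  def format(rows: Seq[Seq[String]], truncate: Boolean = true): String = {
    // Start every column at a minimum width of 3, then widen to the longest cell.
    val colWidths = Array.fill(rows.head.length)(3)
    for (row <- rows; (cell, i) <- row.zipWithIndex) {
      colWidths(i) = math.max(colWidths(i), cell.length)
    }
    val sep = colWidths.map("-" * _).mkString("+", "+", "+\n")
    def pad(row: Seq[String]): String =
      row.zipWithIndex.map { case (cell, i) =>
        // Right-align (leftPad) when truncating, left-align (rightPad) otherwise,
        // mirroring the branch in showString above.
        if (truncate) StringUtils.leftPad(cell, colWidths(i))
        else StringUtils.rightPad(cell, colWidths(i))
      }.mkString("|", "|", "|\n")
    sep + pad(rows.head) + sep + rows.tail.map(pad).mkString + sep
  }
}

For example, ShowStringSketch.format(Seq(Seq("age", "name"), Seq("30", "Alice"))) produces a bordered two-column grid in the same style as show().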

override def toString: String = {
try {
val builder = new StringBuilder
val fields = schema.take(2).map {
case f => s"${f.name}: ${f.dataType.simpleString(2)}"
}
builder.append("[")
builder.append(fields.mkString(", "))
if (schema.length > 2) {
if (schema.length - fields.size == 1) {
builder.append(" ... 1 more field")
} else {
builder.append(" ... " + (schema.length - 2) + " more fields")
}
}
builder.append("]").toString()
} catch {
case NonFatal(e) =>
s"Invalid tree; ${e.getMessage}:\n$queryExecution"
}
}
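The toString above keeps at most the first two schema fields and summarizes the rest. A tiny self-contained sketch of the same truncation rule on a plain field list (field names are hypothetical; Dataset derives them from its schema):

val fields = Seq("name: string", "age: int", "city: string", "zip: int")
val shown = fields.take(2)
val suffix =
  if (fields.length <= 2) ""
  else if (fields.length - shown.size == 1) " ... 1 more field"
  else s" ... ${fields.length - 2} more fields"
println(shown.mkString("[", ", ", "") + suffix + "]")   // prints: [name: string, age: int ... 2 more fields]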

/**
@@ -325,7 +395,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
// scalastyle:off println
override def printSchema(): Unit = println(schema.treeString)
def printSchema(): Unit = println(schema.treeString)
// scalastyle:on println

/**
@@ -334,7 +404,7 @@ class Dataset[T] private[sql](
* @group basic
* @since 1.6.0
*/
override def explain(extended: Boolean): Unit = {
def explain(extended: Boolean): Unit = {
val explain = ExplainCommand(queryExecution.logical, extended = extended)
sqlContext.executePlan(explain).executedPlan.executeCollect().foreach {
// scalastyle:off println
@@ -349,7 +419,7 @@ class Dataset[T] private[sql](
* @group basic
* @since 1.6.0
*/
override def explain(): Unit = explain(extended = false)
def explain(): Unit = explain(extended = false)
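Since these methods are now defined directly on Dataset rather than inherited from the removed trait, existing call sites are unaffected. A hedged usage sketch, assuming an existing SQLContext named sqlContext with its implicits in scope:

import sqlContext.implicits._

case class Person(name: String, age: Long)
val ds = Seq(Person("Alice", 30L), Person("Bob", 25L)).toDS()

ds.printSchema()              // prints the schema tree for name and age
ds.explain(extended = true)   // logical and physical plans
ds.explain()                  // equivalent to explain(extended = false)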

/**
* Returns all column names and their data types as an array.
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -57,13 +57,6 @@ class KeyValueGroupedDataset[K, V] private[sql](
private def logicalPlan = queryExecution.analyzed
private def sqlContext = queryExecution.sqlContext

private def groupedData = {
new RelationalGroupedDataset(
Dataset.ofRows(sqlContext, logicalPlan),
groupingAttributes,
RelationalGroupedDataset.GroupByType)
}

/**
* Returns a new [[KeyValueGroupedDataset]] where the type of the key has been mapped to the
* specified type. The mapping of key columns to the type follows the same rules as `as` on
@@ -207,12 +200,6 @@ class KeyValueGroupedDataset[K, V] private[sql](
reduceGroups(f.call _)
}

private def withEncoder(c: Column): Column = c match {
case tc: TypedColumn[_, _] =>
tc.withInputType(resolvedVEncoder.bind(dataAttributes), dataAttributes)
case _ => c
}

/**
* Internal helper function for building typed aggregations that return tuples. For simplicity
* and code reuse, we do this without the help of the type system and then use helper functions
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -272,11 +272,11 @@ class SQLContext private[sql](
}

/**
* Returns true if the [[Queryable]] is currently cached in-memory.
* Returns true if the [[Dataset]] is currently cached in-memory.
* @group cachemgmt
* @since 1.3.0
*/
private[sql] def isCached(qName: Queryable): Boolean = {
private[sql] def isCached(qName: Dataset[_]): Boolean = {
cacheManager.lookupCachedData(qName).nonEmpty
}

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.Dataset
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

@@ -74,12 +75,12 @@ private[sql] class CacheManager extends Logging {
}

/**
* Caches the data produced by the logical representation of the given [[Queryable]].
* Caches the data produced by the logical representation of the given [[Dataset]].
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
private[sql] def cacheQuery(
query: Queryable,
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
@@ -99,20 +100,20 @@
}
}

/** Removes the data for the given [[Queryable]] from the cache */
private[sql] def uncacheQuery(query: Queryable, blocking: Boolean = true): Unit = writeLock {
/** Removes the data for the given [[Dataset]] from the cache */
private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
require(dataIndex >= 0, s"Table $query is not cached.")
cachedData(dataIndex).cachedRepresentation.uncache(blocking)
cachedData.remove(dataIndex)
}

/** Tries to remove the data for the given [[Queryable]] from the cache
/** Tries to remove the data for the given [[Dataset]] from the cache
* if it's cached
*/
private[sql] def tryUncacheQuery(
query: Queryable,
query: Dataset[_],
blocking: Boolean = true): Boolean = writeLock {
val planToCache = query.queryExecution.analyzed
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
@@ -124,8 +125,8 @@
found
}

/** Optionally returns cached data for the given [[Queryable]] */
private[sql] def lookupCachedData(query: Queryable): Option[CachedData] = readLock {
/** Optionally returns cached data for the given [[Dataset]] */
private[sql] def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
lookupCachedData(query.queryExecution.analyzed)
}

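The CacheManager contract is unchanged by this PR; only its parameter types move from Queryable to Dataset[_]. Entries are keyed by the analyzed logical plan, matched with sameResult, and guarded by a read/write lock. A simplified, self-contained analogue with a toy Plan type standing in for Spark's LogicalPlan (all names hypothetical):

import java.util.concurrent.locks.ReentrantReadWriteLock
import scala.collection.mutable.ArrayBuffer

// Toy stand-ins for the analyzed plan and its cached representation.
case class Plan(canonical: String) {
  def sameResult(other: Plan): Boolean = canonical == other.canonical
}
case class CachedEntry(plan: Plan, data: Seq[Any])

class ToyCacheManager {
  private val cachedData = new ArrayBuffer[CachedEntry]
  private val lock = new ReentrantReadWriteLock

  private def readLock[A](f: => A): A = {
    val l = lock.readLock(); l.lock(); try f finally l.unlock()
  }
  private def writeLock[A](f: => A): A = {
    val l = lock.writeLock(); l.lock(); try f finally l.unlock()
  }

  // Cache the data for a plan unless an equivalent plan is already cached.
  def cacheQuery(plan: Plan, data: Seq[Any]): Unit = writeLock {
    if (lookupCachedData(plan).isEmpty) cachedData += CachedEntry(plan, data)
  }

  // Remove the entry whose plan produces the same result; fail if none exists.
  def uncacheQuery(plan: Plan): Unit = writeLock {
    val dataIndex = cachedData.indexWhere(cd => plan.sameResult(cd.plan))
    require(dataIndex >= 0, s"Plan $plan is not cached.")
    cachedData.remove(dataIndex)
  }

  def lookupCachedData(plan: Plan): Option[CachedEntry] = readLock {
    cachedData.find(cd => plan.sameResult(cd.plan))
  }
}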
124 changes: 0 additions & 124 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala

This file was deleted.
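The deleted source is not reproduced in this view. A rough reconstruction of the trait's surface, inferred only from the override keywords and the formatString call removed elsewhere in this diff; visibility modifiers are omitted and this is not the verbatim file:

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.QueryExecution

trait Queryable {
  def sqlContext: SQLContext
  def queryExecution: QueryExecution

  // Table rendering shared by show(); this PR inlines it into Dataset.showString.
  def showString(_numRows: Int, truncate: Boolean): String
  def formatString(rows: Seq[Seq[String]], numRows: Int, hasMoreData: Boolean, truncate: Boolean): String

  def printSchema(): Unit
  def explain(extended: Boolean): Unit
  def explain(): Unit
}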
