[SPARK-14205][SQL] remove trait Queryable
## What changes were proposed in this pull request?

Now that DataFrame and Dataset have been merged, the trait `Queryable` is unnecessary, since it has only one implementation. This patch removes it.
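
For context when reading the diff below, here is a minimal, self-contained sketch of the shape of the refactoring. The types are simplified stand-ins (a `String` in place of `QueryExecution`, a toy `isCached`), not the real Spark classes; see the actual changes to `Dataset.scala`, `SQLContext.scala` and `CacheManager.scala` further down.

```scala
object BeforeThisPatch {
  // Display/debugging members lived on an execution-internal trait ...
  trait Queryable {
    def queryExecution: String
    def printSchema(): Unit
    def explain(extended: Boolean): Unit
  }

  // ... with Dataset as its only implementation.
  class Dataset[T](override val queryExecution: String)
    extends Queryable with Serializable {
    override def printSchema(): Unit = println(s"schema of: $queryExecution")
    override def explain(extended: Boolean): Unit = println(s"plan: $queryExecution")
  }

  // Internal helpers (e.g. the cache manager) were written against the trait.
  def isCached(q: Queryable): Boolean = false
}

object AfterThisPatch {
  // The trait is gone; the members are declared directly on Dataset ...
  class Dataset[T](val queryExecution: String) extends Serializable {
    def printSchema(): Unit = println(s"schema of: $queryExecution")
    def explain(extended: Boolean): Unit = println(s"plan: $queryExecution")
  }

  // ... and internal helpers take Dataset[_] instead of Queryable.
  def isCached(q: Dataset[_]): Boolean = false
}
```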

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenchen@databricks.com>

Closes #12001 from cloud-fan/df-ds.
cloud-fan authored and rxin committed Mar 29, 2016
1 parent 289257c commit 38326ca
Showing 7 changed files with 98 additions and 161 deletions.
3 changes: 3 additions & 0 deletions project/MimaExcludes.scala
@@ -589,6 +589,9 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.util.MLUtils.loadLabeledData"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.optimization.LBFGS.setMaxNumIterations"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.evaluation.BinaryClassificationEvaluator.setScoreCol")
) ++ Seq(
// [SPARK-14205][SQL] remove trait Queryable
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.Dataset")
)
case v if v.startsWith("1.6") =>
Seq(
88 changes: 79 additions & 9 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -22,8 +22,10 @@ import java.io.CharArrayWriter
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.runtime.universe.TypeTag
import scala.util.control.NonFatal

import com.fasterxml.jackson.core.JsonFactory
import org.apache.commons.lang3.StringUtils

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.api.java.JavaRDD
@@ -39,7 +41,7 @@ import org.apache.spark.sql.catalyst.optimizer.CombineUnions
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.usePrettyExpression
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, Queryable, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
import org.apache.spark.sql.execution.command.ExplainCommand
import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
@@ -150,10 +152,10 @@ private[sql] object Dataset {
* @since 1.6.0
*/
class Dataset[T] private[sql](
@transient override val sqlContext: SQLContext,
@DeveloperApi @transient override val queryExecution: QueryExecution,
@transient val sqlContext: SQLContext,
@DeveloperApi @transient val queryExecution: QueryExecution,
encoder: Encoder[T])
extends Queryable with Serializable {
extends Serializable {

queryExecution.assertAnalyzed()

@@ -224,7 +226,7 @@ class Dataset[T] private[sql](
* @param _numRows Number of rows to show
* @param truncate Whether truncate long strings and align cells right
*/
override private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
private[sql] def showString(_numRows: Int, truncate: Boolean = true): String = {
val numRows = _numRows.max(0)
val takeResult = take(numRows + 1)
val hasMoreData = takeResult.length > numRows
@@ -249,7 +251,75 @@
}: Seq[String]
}

formatString ( rows, numRows, hasMoreData, truncate )
val sb = new StringBuilder
val numCols = schema.fieldNames.length

// Initialise the width of each column to a minimum value of '3'
val colWidths = Array.fill(numCols)(3)

// Compute the width of each column
for (row <- rows) {
for ((cell, i) <- row.zipWithIndex) {
colWidths(i) = math.max(colWidths(i), cell.length)
}
}

// Create SeparateLine
val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()

// column names
rows.head.zipWithIndex.map { case (cell, i) =>
if (truncate) {
StringUtils.leftPad(cell, colWidths(i))
} else {
StringUtils.rightPad(cell, colWidths(i))
}
}.addString(sb, "|", "|", "|\n")

sb.append(sep)

// data
rows.tail.map {
_.zipWithIndex.map { case (cell, i) =>
if (truncate) {
StringUtils.leftPad(cell.toString, colWidths(i))
} else {
StringUtils.rightPad(cell.toString, colWidths(i))
}
}.addString(sb, "|", "|", "|\n")
}

sb.append(sep)

// For Data that has more than "numRows" records
if (hasMoreData) {
val rowsString = if (numRows == 1) "row" else "rows"
sb.append(s"only showing top $numRows $rowsString\n")
}

sb.toString()
}

override def toString: String = {
try {
val builder = new StringBuilder
val fields = schema.take(2).map {
case f => s"${f.name}: ${f.dataType.simpleString(2)}"
}
builder.append("[")
builder.append(fields.mkString(", "))
if (schema.length > 2) {
if (schema.length - fields.size == 1) {
builder.append(" ... 1 more field")
} else {
builder.append(" ... " + (schema.length - 2) + " more fields")
}
}
builder.append("]").toString()
} catch {
case NonFatal(e) =>
s"Invalid tree; ${e.getMessage}:\n$queryExecution"
}
}

/**
@@ -325,7 +395,7 @@ class Dataset[T] private[sql](
* @since 1.6.0
*/
// scalastyle:off println
override def printSchema(): Unit = println(schema.treeString)
def printSchema(): Unit = println(schema.treeString)
// scalastyle:on println

/**
@@ -334,7 +404,7 @@
* @group basic
* @since 1.6.0
*/
override def explain(extended: Boolean): Unit = {
def explain(extended: Boolean): Unit = {
val explain = ExplainCommand(queryExecution.logical, extended = extended)
sqlContext.executePlan(explain).executedPlan.executeCollect().foreach {
// scalastyle:off println
@@ -349,7 +419,7 @@
* @group basic
* @since 1.6.0
*/
override def explain(): Unit = explain(extended = false)
def explain(): Unit = explain(extended = false)

/**
* Returns all column names and their data types as an array.
@@ -57,13 +57,6 @@ class KeyValueGroupedDataset[K, V] private[sql](
private def logicalPlan = queryExecution.analyzed
private def sqlContext = queryExecution.sqlContext

private def groupedData = {
new RelationalGroupedDataset(
Dataset.ofRows(sqlContext, logicalPlan),
groupingAttributes,
RelationalGroupedDataset.GroupByType)
}

/**
* Returns a new [[KeyValueGroupedDataset]] where the type of the key has been mapped to the
* specified type. The mapping of key columns to the type follows the same rules as `as` on
@@ -207,12 +200,6 @@ class KeyValueGroupedDataset[K, V] private[sql](
reduceGroups(f.call _)
}

private def withEncoder(c: Column): Column = c match {
case tc: TypedColumn[_, _] =>
tc.withInputType(resolvedVEncoder.bind(dataAttributes), dataAttributes)
case _ => c
}

/**
* Internal helper function for building typed aggregations that return tuples. For simplicity
* and code reuse, we do this without the help of the type system and then use helper functions
4 changes: 2 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -272,11 +272,11 @@ class SQLContext private[sql](
}

/**
* Returns true if the [[Queryable]] is currently cached in-memory.
* Returns true if the [[Dataset]] is currently cached in-memory.
* @group cachemgmt
* @since 1.3.0
*/
private[sql] def isCached(qName: Queryable): Boolean = {
private[sql] def isCached(qName: Dataset[_]): Boolean = {
cacheManager.lookupCachedData(qName).nonEmpty
}

@@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.columnar.InMemoryRelation
import org.apache.spark.sql.Dataset
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK

@@ -74,12 +75,12 @@ private[sql] class CacheManager extends Logging {
}

/**
* Caches the data produced by the logical representation of the given [[Queryable]].
* Caches the data produced by the logical representation of the given [[Dataset]].
* Unlike `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because
* recomputing the in-memory columnar representation of the underlying table is expensive.
*/
private[sql] def cacheQuery(
query: Queryable,
query: Dataset[_],
tableName: Option[String] = None,
storageLevel: StorageLevel = MEMORY_AND_DISK): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
@@ -99,20 +100,20 @@
}
}

/** Removes the data for the given [[Queryable]] from the cache */
private[sql] def uncacheQuery(query: Queryable, blocking: Boolean = true): Unit = writeLock {
/** Removes the data for the given [[Dataset]] from the cache */
private[sql] def uncacheQuery(query: Dataset[_], blocking: Boolean = true): Unit = writeLock {
val planToCache = query.queryExecution.analyzed
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
require(dataIndex >= 0, s"Table $query is not cached.")
cachedData(dataIndex).cachedRepresentation.uncache(blocking)
cachedData.remove(dataIndex)
}

/** Tries to remove the data for the given [[Queryable]] from the cache
/** Tries to remove the data for the given [[Dataset]] from the cache
* if it's cached
*/
private[sql] def tryUncacheQuery(
query: Queryable,
query: Dataset[_],
blocking: Boolean = true): Boolean = writeLock {
val planToCache = query.queryExecution.analyzed
val dataIndex = cachedData.indexWhere(cd => planToCache.sameResult(cd.plan))
@@ -124,8 +125,8 @@
found
}

/** Optionally returns cached data for the given [[Queryable]] */
private[sql] def lookupCachedData(query: Queryable): Option[CachedData] = readLock {
/** Optionally returns cached data for the given [[Dataset]] */
private[sql] def lookupCachedData(query: Dataset[_]): Option[CachedData] = readLock {
lookupCachedData(query.queryExecution.analyzed)
}

124 changes: 0 additions & 124 deletions sql/core/src/main/scala/org/apache/spark/sql/execution/Queryable.scala

This file was deleted.
