Skip to content

Commit

Permalink
Add explain support for DF
Browse files Browse the repository at this point in the history
  • Loading branch information
chenghao-intel committed Feb 11, 2015
1 parent 6195e24 commit 552aa58
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 10 deletions.
8 changes: 8 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/Column.scala
Original file line number Diff line number Diff line change
Expand Up @@ -600,6 +600,14 @@ trait Column extends DataFrame {
def desc: Column = exprToColumn(SortOrder(expr, Descending), computable = false)

def asc: Column = exprToColumn(SortOrder(expr, Ascending), computable = false)

/**
 * Prints this column's expression to the console.
 *
 * @param extended when true, prints `expr` itself; otherwise prints `expr.prettyString`
 */
override def explain(extended: Boolean): Unit = {
  // Typed as Any so both branches go through the same println(Any) call.
  val rendered: Any = if (extended) expr else expr.prettyString
  println(rendered)
}
}


Expand Down
6 changes: 6 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,12 @@ trait DataFrame extends RDDApi[Row] {
/** Prints the schema to the console in a nice tree format. */
def printSchema(): Unit

/**
 * Prints the plans (logical and physical) to the console for debugging purposes.
 *
 * @param extended controls how much plan detail is printed.
 *                 NOTE(review): presumably `false` limits output to the physical plan,
 *                 as the no-argument overload calls `explain(false)` -- confirm in implementations.
 */
def explain(extended: Boolean): Unit

/** Prints only the physical plan to the console, for debugging purposes. */
def explain(): Unit = explain(extended = false)

/**
* Returns true if the `collect` and `take` methods can be run locally
* (without any Spark executors).
Expand Down
13 changes: 10 additions & 3 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,11 @@ import org.apache.spark.api.python.SerDeUtil
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel
import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection}
import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, ResolvedStar, UnresolvedRelation}
import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.sideBySide
import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
import org.apache.spark.sql.execution.{ExplainCommand, LogicalRDD, EvaluatePython}
import org.apache.spark.sql.json.JsonRDD
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.{NumericType, StructType}
Expand Down Expand Up @@ -113,6 +112,14 @@ private[sql] class DataFrameImpl protected[sql](

override def printSchema(): Unit = println(schema.treeString)

/**
 * Prints the explain output for this DataFrame's logical plan to the console.
 *
 * Wraps `logicalPlan` in an `ExplainCommand`, executes it, and prints the string in the
 * first column of each collected row on its own line.
 *
 * @param extended forwarded unchanged to `ExplainCommand`
 */
override def explain(extended: Boolean): Unit = {
  // foreach rather than map: the rows are printed purely for the side effect, so there is
  // no reason to build (and immediately discard) an Array[Unit].
  ExplainCommand(logicalPlan, extended = extended)
    .queryExecution
    .executedPlan
    .executeCollect()
    .foreach(row => println(row.getString(0)))
}

override def isLocal: Boolean = {
logicalPlan.isInstanceOf[LocalRelation]
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ package org.apache.spark.sql.execution
import org.apache.spark.Logging
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.{DataFrame, SQLConf, SQLContext}
import org.apache.spark.sql.catalyst.errors.TreeNodeException
import org.apache.spark.sql.catalyst.expressions.{Row, Attribute}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row, Attribute}
import org.apache.spark.sql.catalyst.plans.logical
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import scala.collection.mutable.ArrayBuffer
Expand Down Expand Up @@ -116,7 +117,9 @@ case class SetCommand(
@DeveloperApi
case class ExplainCommand(
logicalPlan: LogicalPlan,
override val output: Seq[Attribute], extended: Boolean = false) extends RunnableCommand {
override val output: Seq[Attribute] =
Seq(AttributeReference("plan", StringType, nullable = false)()),
extended: Boolean = false) extends RunnableCommand {

// Run through the optimizer to generate the physical plan.
override def run(sqlContext: SQLContext) = try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,23 +466,21 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C
// Just fake explain for any of the native commands.
case Token("TOK_EXPLAIN", explainArgs)
if noExplainCommands.contains(explainArgs.head.getText) =>
ExplainCommand(NoRelation, Seq(AttributeReference("plan", StringType, nullable = false)()))
ExplainCommand(NoRelation)
case Token("TOK_EXPLAIN", explainArgs)
if "TOK_CREATETABLE" == explainArgs.head.getText =>
val Some(crtTbl) :: _ :: extended :: Nil =
getClauses(Seq("TOK_CREATETABLE", "FORMATTED", "EXTENDED"), explainArgs)
ExplainCommand(
nodeToPlan(crtTbl),
Seq(AttributeReference("plan", StringType,nullable = false)()),
extended != None)
extended = extended.isDefined)
case Token("TOK_EXPLAIN", explainArgs) =>
// Ignore FORMATTED if present.
val Some(query) :: _ :: extended :: Nil =
getClauses(Seq("TOK_QUERY", "FORMATTED", "EXTENDED"), explainArgs)
ExplainCommand(
nodeToPlan(query),
Seq(AttributeReference("plan", StringType, nullable = false)()),
extended != None)
extended = extended.isDefined)

case Token("TOK_DESCTABLE", describeArgs) =>
// Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
Expand Down

0 comments on commit 552aa58

Please sign in to comment.