[SQL] Add toString to DataFrame/Column
Author: Michael Armbrust <michael@databricks.com>

Closes #4436 from marmbrus/dfToString and squashes the following commits:

8a3c35f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into dfToString
b72a81b [Michael Armbrust] add toString
marmbrus committed Feb 10, 2015
1 parent c49a404 commit de80b1b
Showing 8 changed files with 86 additions and 8 deletions.
2 changes: 1 addition & 1 deletion python/pyspark/sql/dataframe.py
@@ -447,7 +447,7 @@ def selectExpr(self, *expr):
         `select` that accepts SQL expressions.
         >>> df.selectExpr("age * 2", "abs(age)").collect()
-        [Row(('age * 2)=4, Abs('age)=2), Row(('age * 2)=10, Abs('age)=5)]
+        [Row((age * 2)=4, Abs(age)=2), Row((age * 2)=10, Abs(age)=5)]
         """
         jexpr = ListConverter().convert(expr, self._sc._gateway._gateway_client)
         jdf = self._jdf.selectExpr(self._sc._jvm.PythonUtils.toSeq(jexpr))
12 changes: 12 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.trees
 import org.apache.spark.sql.catalyst.trees.TreeNode
@@ -66,6 +67,17 @@ abstract class Expression extends TreeNode[Expression] {
    */
   def childrenResolved = !children.exists(!_.resolved)
 
+  /**
+   * Returns a string representation of this expression that does not have developer centric
+   * debugging information like the expression id.
+   */
+  def prettyString: String = {
+    transform {
+      case a: AttributeReference => PrettyAttribute(a.name)
+      case u: UnresolvedAttribute => PrettyAttribute(u.name)
+    }.toString
+  }
+
   /**
    * A set of helper functions that return the correct descendant of `scala.math.Numeric[T]` type
    * and do any casting necessary of child evaluation.
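A minimal sketch, not part of this commit, of what prettyString changes; it assumes a resolved attribute named `age`, and the output shown in comments is indicative:

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// toString keeps the expression id; prettyString rewrites attributes to
// PrettyAttribute so only the user-facing name is printed.
val expr = Add(AttributeReference("age", IntegerType)(), Literal(2))
expr.toString     // e.g. "(age#0 + 2)"
expr.prettyString // "(age + 2)"
```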
20 changes: 20 additions & 0 deletions sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -190,6 +190,26 @@ case class AttributeReference(
   override def toString: String = s"$name#${exprId.id}$typeSuffix"
 }
 
+/**
+ * A place holder used when printing expressions without debugging information such as the
+ * expression id or the unresolved indicator.
+ */
+case class PrettyAttribute(name: String) extends Attribute with trees.LeafNode[Expression] {
+  type EvaluatedType = Any
+
+  override def toString = name
+
+  override def withNullability(newNullability: Boolean): Attribute = ???
+  override def newInstance(): Attribute = ???
+  override def withQualifiers(newQualifiers: Seq[String]): Attribute = ???
+  override def withName(newName: String): Attribute = ???
+  override def qualifiers: Seq[String] = ???
+  override def exprId: ExprId = ???
+  override def eval(input: Row): EvaluatedType = ???
+  override def nullable: Boolean = ???
+  override def dataType: DataType = ???
+}
+
 object VirtualColumn {
   val groupingIdName = "grouping__id"
   def newGroupingId = AttributeReference(groupingIdName, IntegerType, false)()
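A note on the `???` stubs: PrettyAttribute exists purely so expressions can be printed, so every member without a sensible value is left unimplemented. A small sketch of the resulting behavior (Scala's `???` throws scala.NotImplementedError at the call site):

```scala
val pretty = PrettyAttribute("age")
pretty.toString // "age"
pretty.dataType // throws scala.NotImplementedError if ever called
```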
8 changes: 8 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
+import scala.util.control.NonFatal
+
 
 private[sql] object DataFrame {
   def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
@@ -92,6 +94,12 @@ trait DataFrame extends RDDApi[Row] {
    */
   def toDataFrame: DataFrame = this
 
+  override def toString =
+    try schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") catch {
+      case NonFatal(e) =>
+        s"Invalid tree; ${e.getMessage}:\n$queryExecution"
+    }
+
   /**
    * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion
    * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example:
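The exact rendering is pinned down by the new DataFrameSuite tests at the bottom of this commit; for the suite's two-column testData table it looks like:

```scala
testData.toString        // "[key: int, value: string]"
testData("key").toString // "[key: int]"
```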
10 changes: 4 additions & 6 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -201,13 +201,11 @@ private[sql] class DataFrameImpl protected[sql](
   override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan)
 
   override def select(cols: Column*): DataFrame = {
-    val exprs = cols.zipWithIndex.map {
-      case (Column(expr: NamedExpression), _) =>
-        expr
-      case (Column(expr: Expression), _) =>
-        Alias(expr, expr.toString)()
+    val namedExpressions = cols.map {
+      case Column(expr: NamedExpression) => expr
+      case Column(expr: Expression) => Alias(expr, expr.prettyString)()
     }
-    Project(exprs.toSeq, logicalPlan)
+    Project(namedExpressions.toSeq, logicalPlan)
   }
 
   override def select(col: String, cols: String*): DataFrame = {
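The user-visible effect: columns without an explicit name are auto-aliased, and the alias now comes from prettyString rather than toString, so generated column names no longer leak expression ids. A hypothetical before/after, assuming a DataFrame `df` with an int column `age`:

```scala
df.select(df("age") + 1)
// old generated name: "(age#0 + 1)" -- expression id leaked into the schema
// new generated name: "(age + 1)"   -- only the user-facing name survives
```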
2 changes: 2 additions & 0 deletions sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -40,6 +40,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
     throw new UnsupportedOperationException("Cannot run this method on an UncomputableColumn")
   }
 
+  override def toString = expr.prettyString
+
   override def isComputable: Boolean = false
 
   override val sqlContext: SQLContext = null
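An IncomputableColumn wraps an expression that is not yet attached to a query plan, so it cannot print a schema; it now falls back to the expression's pretty form, as the suite's "incomputable toString" test pins down:

```scala
import org.apache.spark.sql.Dsl._

$"test".toString // "test"
```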
11 changes: 10 additions & 1 deletion sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.HashSet
 import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.SparkContext._
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
 import org.apache.spark.sql.types._
 
@@ -37,6 +37,15 @@ import org.apache.spark.sql.types._
  */
 package object debug {
 
+  /**
+   * Augments [[SQLContext]] with debug methods.
+   */
+  implicit class DebugSQLContext(sqlContext: SQLContext) {
+    def debug() = {
+      sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false")
+    }
+  }
+
   /**
    * :: DeveloperApi ::
    * Augments [[DataFrame]]s with debug methods.
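Usage sketch, mirroring the new test below: importing the debug package object brings the implicit class into scope, so any SQLContext gains a debug() method that disables eager DataFrame analysis, letting invalid plans be constructed and printed:

```scala
import org.apache.spark.sql.execution.debug._

sqlContext.debug() // sets SQLConf.DATAFRAME_EAGER_ANALYSIS to "false"
```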
29 changes: 29 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.sql.TestData._
+
 import scala.language.postfixOps
 
 import org.apache.spark.sql.Dsl._
@@ -53,6 +55,33 @@ class DataFrameSuite extends QueryTest {
     TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
   }
 
+  test("dataframe toString") {
+    assert(testData.toString === "[key: int, value: string]")
+    assert(testData("key").toString === "[key: int]")
+  }
+
+  test("incomputable toString") {
+    assert($"test".toString === "test")
+  }
+
+  test("invalid plan toString, debug mode") {
+    val oldSetting = TestSQLContext.conf.dataFrameEagerAnalysis
+    TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true")
+
+    // Turn on debug mode so we can see invalid query plans.
+    import org.apache.spark.sql.execution.debug._
+    TestSQLContext.debug()
+
+    val badPlan = testData.select('badColumn)
+
+    assert(badPlan.toString contains badPlan.queryExecution.toString,
+      "toString on bad query plans should include the query execution but was:\n" +
+      badPlan.toString)
+
+    // Set the flag back to original value before this test.
+    TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString)
+  }
+
   test("table scan") {
     checkAnswer(
       testData,
