From de80b1ba4d3c4b1b3316d482d62e4668b996f6ac Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Tue, 10 Feb 2015 13:14:01 -0800 Subject: [PATCH] [SQL] Add toString to DataFrame/Column Author: Michael Armbrust Closes #4436 from marmbrus/dfToString and squashes the following commits: 8a3c35f [Michael Armbrust] Merge remote-tracking branch 'origin/master' into dfToString b72a81b [Michael Armbrust] add toString --- python/pyspark/sql/dataframe.py | 2 +- .../sql/catalyst/expressions/Expression.scala | 12 ++++++++ .../expressions/namedExpressions.scala | 20 +++++++++++++ .../org/apache/spark/sql/DataFrame.scala | 8 +++++ .../org/apache/spark/sql/DataFrameImpl.scala | 10 +++---- .../apache/spark/sql/IncomputableColumn.scala | 2 ++ .../spark/sql/execution/debug/package.scala | 11 ++++++- .../org/apache/spark/sql/DataFrameSuite.scala | 29 +++++++++++++++++++ 8 files changed, 86 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index cda704eea75f5..04be65fe241c4 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -447,7 +447,7 @@ def selectExpr(self, *expr): `select` that accepts SQL expressions. >>> df.selectExpr("age * 2", "abs(age)").collect() - [Row(('age * 2)=4, Abs('age)=2), Row(('age * 2)=10, Abs('age)=5)] + [Row((age * 2)=4, Abs(age)=2), Row((age * 2)=10, Abs(age)=5)] """ jexpr = ListConverter().convert(expr, self._sc._gateway._gateway_client) jdf = self._jdf.selectExpr(self._sc._jvm.PythonUtils.toSeq(jexpr)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index cf14992ef835c..c32a4b886eb82 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.expressions +import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute import org.apache.spark.sql.catalyst.errors.TreeNodeException import org.apache.spark.sql.catalyst.trees import org.apache.spark.sql.catalyst.trees.TreeNode @@ -66,6 +67,17 @@ abstract class Expression extends TreeNode[Expression] { */ def childrenResolved = !children.exists(!_.resolved) + /** + * Returns a string representation of this expression that does not have developer centric + * debugging information like the expression id. + */ + def prettyString: String = { + transform { + case a: AttributeReference => PrettyAttribute(a.name) + case u: UnresolvedAttribute => PrettyAttribute(u.name) + }.toString + } + /** * A set of helper functions that return the correct descendant of `scala.math.Numeric[T]` type * and do any casting necessary of child evaluation. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index e6ab1fd8d7939..7f122e9d55734 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -190,6 +190,26 @@ case class AttributeReference( override def toString: String = s"$name#${exprId.id}$typeSuffix" } +/** + * A place holder used when printing expressions without debugging information such as the + * expression id or the unresolved indicator. + */ +case class PrettyAttribute(name: String) extends Attribute with trees.LeafNode[Expression] { + type EvaluatedType = Any + + override def toString = name + + override def withNullability(newNullability: Boolean): Attribute = ??? + override def newInstance(): Attribute = ??? + override def withQualifiers(newQualifiers: Seq[String]): Attribute = ??? + override def withName(newName: String): Attribute = ??? + override def qualifiers: Seq[String] = ??? + override def exprId: ExprId = ??? + override def eval(input: Row): EvaluatedType = ??? + override def nullable: Boolean = ??? + override def dataType: DataType = ??? +} + object VirtualColumn { val groupingIdName = "grouping__id" def newGroupingId = AttributeReference(groupingIdName, IntegerType, false)() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 6abfb7853cf1c..04e0d09947492 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -27,6 +27,8 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.types.StructType import org.apache.spark.util.Utils +import scala.util.control.NonFatal + private[sql] object DataFrame { def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = { @@ -92,6 +94,12 @@ trait DataFrame extends RDDApi[Row] { */ def toDataFrame: DataFrame = this + override def toString = + try schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") catch { + case NonFatal(e) => + s"Invalid tree; ${e.getMessage}:\n$queryExecution" + } + /** * Returns a new [[DataFrame]] with columns renamed. This can be quite convenient in conversion * from a RDD of tuples into a [[DataFrame]] with meaningful names. For example: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala index 73393295ab0a5..1ee16ad5161c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala @@ -201,13 +201,11 @@ private[sql] class DataFrameImpl protected[sql]( override def as(alias: Symbol): DataFrame = Subquery(alias.name, logicalPlan) override def select(cols: Column*): DataFrame = { - val exprs = cols.zipWithIndex.map { - case (Column(expr: NamedExpression), _) => - expr - case (Column(expr: Expression), _) => - Alias(expr, expr.toString)() + val namedExpressions = cols.map { + case Column(expr: NamedExpression) => expr + case Column(expr: Expression) => Alias(expr, expr.prettyString)() } - Project(exprs.toSeq, logicalPlan) + Project(namedExpressions.toSeq, logicalPlan) } override def select(col: String, cols: String*): DataFrame = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala index 0600dcc226b4d..ce0557b88196f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala @@ -40,6 +40,8 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten throw new UnsupportedOperationException("Cannot run this method on an UncomputableColumn") } + override def toString = expr.prettyString + override def isComputable: Boolean = false override val sqlContext: SQLContext = null diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 5cc67cdd13944..acef49aabfe70 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -22,7 +22,7 @@ import scala.collection.mutable.HashSet import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.SparkContext._ -import org.apache.spark.sql.{DataFrame, Row} +import org.apache.spark.sql.{SQLConf, SQLContext, DataFrame, Row} import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.types._ @@ -37,6 +37,15 @@ import org.apache.spark.sql.types._ */ package object debug { + /** + * Augments [[SQLContext]] with debug methods. + */ + implicit class DebugSQLContext(sqlContext: SQLContext) { + def debug() = { + sqlContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "false") + } + } + /** * :: DeveloperApi :: * Augments [[DataFrame]]s with debug methods. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 5aa3db720c886..02623f73c7f76 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import org.apache.spark.sql.TestData._ + import scala.language.postfixOps import org.apache.spark.sql.Dsl._ @@ -53,6 +55,33 @@ class DataFrameSuite extends QueryTest { TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) } + test("dataframe toString") { + assert(testData.toString === "[key: int, value: string]") + assert(testData("key").toString === "[key: int]") + } + + test("incomputable toString") { + assert($"test".toString === "test") + } + + test("invalid plan toString, debug mode") { + val oldSetting = TestSQLContext.conf.dataFrameEagerAnalysis + TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, "true") + + // Turn on debug mode so we can see invalid query plans. + import org.apache.spark.sql.execution.debug._ + TestSQLContext.debug() + + val badPlan = testData.select('badColumn) + + assert(badPlan.toString contains badPlan.queryExecution.toString, + "toString on bad query plans should include the query execution but was:\n" + + badPlan.toString) + + // Set the flag back to original value before this test. + TestSQLContext.setConf(SQLConf.DATAFRAME_EAGER_ANALYSIS, oldSetting.toString) + } + test("table scan") { checkAnswer( testData,