From a1c1dd3484a4dcd7c38fe256e69dbaaaf10d1a92 Mon Sep 17 00:00:00 2001 From: Maxim Gekk Date: Thu, 27 Dec 2018 11:13:16 +0100 Subject: [PATCH 1/4] [SPARK-26191][SQL] Control truncation of Spark plans via maxFields parameter ## What changes were proposed in this pull request? In the PR, I propose to add `maxFields` parameter to all functions involved in creation of textual representation of spark plans such as `simpleString` and `verboseString`. New parameter restricts number of fields converted to truncated strings. Any elements beyond the limit will be dropped and replaced by a `"... N more fields"` placeholder. The threshold is bumped up to `Int.MaxValue` for `toFile()`. ## How was this patch tested? Added a test to `QueryExecutionSuite` which checks `maxFields` impacts on number of truncated fields in `LocalRelation`. Closes #23159 from MaxGekk/to-file-max-fields. Lead-authored-by: Maxim Gekk Co-authored-by: Maxim Gekk Signed-off-by: Herman van Hovell --- .../sql/catalyst/analysis/Analyzer.scala | 4 +- .../sql/catalyst/analysis/CheckAnalysis.scala | 8 ++- .../sql/catalyst/analysis/TypeCoercion.scala | 4 +- .../catalyst/encoders/ExpressionEncoder.scala | 8 ++- .../sql/catalyst/expressions/Expression.scala | 6 +- .../expressions/codegen/javaCode.scala | 2 +- .../sql/catalyst/expressions/generators.scala | 3 +- .../expressions/higherOrderFunctions.scala | 4 +- .../spark/sql/catalyst/expressions/misc.scala | 5 +- .../expressions/namedExpressions.scala | 4 +- .../spark/sql/catalyst/plans/QueryPlan.scala | 4 +- .../catalyst/plans/logical/LogicalPlan.scala | 4 +- .../plans/logical/basicLogicalOperators.scala | 8 +-- .../spark/sql/catalyst/trees/TreeNode.scala | 56 +++++++++++-------- .../spark/sql/catalyst/util/package.scala | 10 ++-- .../apache/spark/sql/types/StructType.scala | 6 +- .../aggregate/PercentileSuite.scala | 2 +- .../org/apache/spark/sql/util/UtilSuite.scala | 2 +- .../sql/execution/DataSourceScanExec.scala | 12 ++-- .../spark/sql/execution/ExistingRDD.scala | 6 +- .../spark/sql/execution/QueryExecution.scala | 21 ++++--- .../spark/sql/execution/SparkPlanInfo.scala | 6 +- .../sql/execution/WholeStageCodegenExec.scala | 28 ++++++++-- .../aggregate/HashAggregateExec.scala | 12 ++-- .../aggregate/ObjectHashAggregateExec.scala | 12 ++-- .../aggregate/SortAggregateExec.scala | 12 ++-- .../execution/basicPhysicalOperators.scala | 4 +- .../execution/columnar/InMemoryRelation.scala | 4 +- .../datasources/LogicalRelation.scala | 4 +- .../SaveIntoDataSourceCommand.scala | 2 +- .../spark/sql/execution/datasources/ddl.scala | 2 +- .../datasources/v2/DataSourceV2Relation.scala | 8 ++- .../datasources/v2/DataSourceV2ScanExec.scala | 4 +- .../v2/DataSourceV2StreamingScanExec.scala | 2 +- .../v2/DataSourceV2StringFormat.scala | 6 +- .../spark/sql/execution/debug/package.scala | 3 +- .../apache/spark/sql/execution/limit.scala | 6 +- .../streaming/MicroBatchExecution.scala | 6 +- .../continuous/ContinuousExecution.scala | 7 ++- .../sql/execution/streaming/memory.scala | 5 +- .../apache/spark/sql/execution/subquery.scala | 2 +- .../sql/execution/QueryExecutionSuite.scala | 13 +++++ .../CreateHiveTableAsSelectCommand.scala | 2 +- 43 files changed, 203 insertions(+), 126 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 777053168a056..198645d875c47 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -979,7 +979,7 @@ class Analyzer( a.mapExpressions(resolveExpressionTopDown(_, appendColumns)) case q: LogicalPlan => - logTrace(s"Attempting to resolve ${q.simpleString}") + logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}") q.mapExpressions(resolveExpressionTopDown(_, q)) } @@ -1777,7 +1777,7 @@ class Analyzer( case p if p.expressions.exists(hasGenerator) => throw new AnalysisException("Generators are not supported outside the SELECT clause, but " + - "got: " + p.simpleString) + "got: " + p.simpleString(SQLConf.get.maxToStringFields)) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala index 88d41e8824405..c28a97839fe49 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** @@ -303,7 +304,7 @@ trait CheckAnalysis extends PredicateHelper { val missingAttributes = o.missingInput.mkString(",") val input = o.inputSet.mkString(",") val msgForMissingAttributes = s"Resolved attribute(s) $missingAttributes missing " + - s"from $input in operator ${operator.simpleString}." + s"from $input in operator ${operator.simpleString(SQLConf.get.maxToStringFields)}." val resolver = plan.conf.resolver val attrsWithSameName = o.missingInput.filter { missing => @@ -368,7 +369,7 @@ trait CheckAnalysis extends PredicateHelper { s"""nondeterministic expressions are only allowed in |Project, Filter, Aggregate or Window, found: | ${o.expressions.map(_.sql).mkString(",")} - |in operator ${operator.simpleString} + |in operator ${operator.simpleString(SQLConf.get.maxToStringFields)} """.stripMargin) case _: UnresolvedHint => @@ -380,7 +381,8 @@ trait CheckAnalysis extends PredicateHelper { } extendedCheckRules.foreach(_(plan)) plan.foreachUp { - case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}") + case o if !o.resolved => + failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}") case _ => } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala index 1706b3eece6d7..b19aa50ba2156 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala @@ -1069,8 +1069,8 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging { // Leave the same if the dataTypes match. case Some(newType) if a.dataType == newType.dataType => a case Some(newType) => - logDebug( - s"Promoting $a from ${a.dataType} to ${newType.dataType} in ${q.simpleString}") + logDebug(s"Promoting $a from ${a.dataType} to ${newType.dataType} in " + + s" ${q.simpleString(SQLConf.get.maxToStringFields)}") newType } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala index fbf0bd68b9584..da5c1fd0feb01 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, InitializeJavaBean, Invoke, NewInstance} import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, DeserializeToObject, LocalRelation} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{ObjectType, StringType, StructField, StructType} import org.apache.spark.unsafe.types.UTF8String import org.apache.spark.util.Utils @@ -323,8 +324,8 @@ case class ExpressionEncoder[T]( extractProjection(inputRow) } catch { case e: Exception => - throw new RuntimeException( - s"Error while encoding: $e\n${serializer.map(_.simpleString).mkString("\n")}", e) + throw new RuntimeException(s"Error while encoding: $e\n" + + s"${serializer.map(_.simpleString(SQLConf.get.maxToStringFields)).mkString("\n")}", e) } /** @@ -336,7 +337,8 @@ case class ExpressionEncoder[T]( constructProjection(row).get(0, ObjectType(clsTag.runtimeClass)).asInstanceOf[T] } catch { case e: Exception => - throw new RuntimeException(s"Error while decoding: $e\n${deserializer.simpleString}", e) + throw new RuntimeException(s"Error while decoding: $e\n" + + s"${deserializer.simpleString(SQLConf.get.maxToStringFields)}", e) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index c89c2272be752..d5d119543da77 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -259,12 +259,12 @@ abstract class Expression extends TreeNode[Expression] { // Marks this as final, Expression.verboseString should never be called, and thus shouldn't be // overridden by concrete classes. - final override def verboseString: String = simpleString + final override def verboseString(maxFields: Int): String = simpleString(maxFields) - override def simpleString: String = toString + override def simpleString(maxFields: Int): String = toString override def toString: String = prettyName + truncatedString( - flatArguments.toSeq, "(", ", ", ")") + flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields) /** * Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]], diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala index 17d4a0dc4e884..17fff64a1b7df 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala @@ -197,7 +197,7 @@ trait Block extends TreeNode[Block] with JavaCode { case _ => code"$this\n$other" } - override def verboseString: String = toString + override def verboseString(maxFields: Int): String = toString } object Block { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala index 9c74fdf6c9a14..6b6da1c8b4142 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala @@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.util.{ArrayData, MapData} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ /** @@ -101,7 +102,7 @@ case class UserDefinedGenerator( inputRow = new InterpretedProjection(children) convertToScala = { val inputSchema = StructType(children.map { e => - StructField(e.simpleString, e.dataType, nullable = true) + StructField(e.simpleString(SQLConf.get.maxToStringFields), e.dataType, nullable = true) }) CatalystTypeConverters.createToScalaConverter(inputSchema) }.asInstanceOf[InternalRow => Row] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala index 7141b6e996389..e6cc11d1ad280 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala @@ -76,7 +76,9 @@ case class NamedLambdaVariable( override def toString: String = s"lambda $name#${exprId.id}$typeSuffix" - override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}" + override def simpleString(maxFields: Int): String = { + s"lambda $name#${exprId.id}: ${dataType.simpleString(maxFields)}" + } } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala index 0cdeda9b10516..1f1decc45a3f6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/misc.scala @@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ import org.apache.spark.sql.catalyst.util.RandomUUIDGenerator +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -40,7 +41,7 @@ case class PrintToStderr(child: Expression) extends UnaryExpression { input } - private val outputPrefix = s"Result of ${child.simpleString} is " + private val outputPrefix = s"Result of ${child.simpleString(SQLConf.get.maxToStringFields)} is " override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val outputPrefixField = ctx.addReferenceObj("outputPrefix", outputPrefix) @@ -72,7 +73,7 @@ case class AssertTrue(child: Expression) extends UnaryExpression with ImplicitCa override def prettyName: String = "assert_true" - private val errMsg = s"'${child.simpleString}' is not true!" + private val errMsg = s"'${child.simpleString(SQLConf.get.maxToStringFields)}' is not true!" override def eval(input: InternalRow) : Any = { val v = child.eval(input) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 02b48f9e30f2d..131459bf27bc8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -330,7 +330,9 @@ case class AttributeReference( // Since the expression id is not in the first constructor it is missing from the default // tree string. - override def simpleString: String = s"$name#${exprId.id}: ${dataType.simpleString}" + override def simpleString(maxFields: Int): String = { + s"$name#${exprId.id}: ${dataType.simpleString(maxFields)}" + } override def sql: String = { val qualifierPrefix = if (qualifier.nonEmpty) qualifier.mkString(".") + "." else "" diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index ca0cea6ba7de3..125181fb213f8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -172,9 +172,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT */ protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else "" - override def simpleString: String = statePrefix + super.simpleString + override def simpleString(maxFields: Int): String = statePrefix + super.simpleString(maxFields) - override def verboseString: String = simpleString + override def verboseString(maxFields: Int): String = simpleString(maxFields) /** * All the subqueries of current plan. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala index 3ad2ee6923615..51e0f4b4c84dc 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala @@ -36,8 +36,8 @@ abstract class LogicalPlan /** Returns true if this subtree has data from a streaming data source. */ def isStreaming: Boolean = children.exists(_.isStreaming == true) - override def verboseStringWithSuffix: String = { - super.verboseString + statsCache.map(", " + _.toString).getOrElse("") + override def verboseStringWithSuffix(maxFields: Int): String = { + super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("") } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index a26ec4eed8648..d8b3a4af4f7bf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -468,7 +468,7 @@ case class View( override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance())) - override def simpleString: String = { + override def simpleString(maxFields: Int): String = { s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})" } } @@ -484,8 +484,8 @@ case class View( case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode { override def output: Seq[Attribute] = child.output - override def simpleString: String = { - val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]") + override def simpleString(maxFields: Int): String = { + val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]", maxFields) s"CTE $cteAliases" } @@ -557,7 +557,7 @@ case class Range( override def newInstance(): Range = copy(output = output.map(_.newInstance())) - override def simpleString: String = { + override def simpleString(maxFields: Int): String = { s"Range ($start, $end, step=$step, splits=$numSlices)" } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 2e9f9f53e94ac..21e59bbd283e4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.JoinType import org.apache.spark.sql.catalyst.plans.physical.{BroadcastMode, Partitioning} import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel @@ -433,17 +434,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { private lazy val allChildren: Set[TreeNode[_]] = (children ++ innerChildren).toSet[TreeNode[_]] /** Returns a string representing the arguments to this node, minus any children */ - def argString: String = stringArgs.flatMap { + def argString(maxFields: Int): String = stringArgs.flatMap { case tn: TreeNode[_] if allChildren.contains(tn) => Nil case Some(tn: TreeNode[_]) if allChildren.contains(tn) => Nil - case Some(tn: TreeNode[_]) => tn.simpleString :: Nil - case tn: TreeNode[_] => tn.simpleString :: Nil + case Some(tn: TreeNode[_]) => tn.simpleString(maxFields) :: Nil + case tn: TreeNode[_] => tn.simpleString(maxFields) :: Nil case seq: Seq[Any] if seq.toSet.subsetOf(allChildren.asInstanceOf[Set[Any]]) => Nil case iter: Iterable[_] if iter.isEmpty => Nil - case seq: Seq[_] => truncatedString(seq, "[", ", ", "]") :: Nil - case set: Set[_] => truncatedString(set.toSeq, "{", ", ", "}") :: Nil + case seq: Seq[_] => truncatedString(seq, "[", ", ", "]", maxFields) :: Nil + case set: Set[_] => truncatedString(set.toSeq, "{", ", ", "}", maxFields) :: Nil case array: Array[_] if array.isEmpty => Nil - case array: Array[_] => truncatedString(array, "[", ", ", "]") :: Nil + case array: Array[_] => truncatedString(array, "[", ", ", "]", maxFields) :: Nil case null => Nil case None => Nil case Some(null) => Nil @@ -456,24 +457,33 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { case other => other :: Nil }.mkString(", ") - /** ONE line description of this node. */ - def simpleString: String = s"$nodeName $argString".trim + /** + * ONE line description of this node. + * @param maxFields Maximum number of fields that will be converted to strings. + * Any elements beyond the limit will be dropped. + */ + def simpleString(maxFields: Int): String = { + s"$nodeName ${argString(maxFields)}".trim + } /** ONE line description of this node with more information */ - def verboseString: String + def verboseString(maxFields: Int): String /** ONE line description of this node with some suffix information */ - def verboseStringWithSuffix: String = verboseString + def verboseStringWithSuffix(maxFields: Int): String = verboseString(maxFields) override def toString: String = treeString /** Returns a string representation of the nodes in this tree */ def treeString: String = treeString(verbose = true) - def treeString(verbose: Boolean, addSuffix: Boolean = false): String = { + def treeString( + verbose: Boolean, + addSuffix: Boolean = false, + maxFields: Int = SQLConf.get.maxToStringFields): String = { val writer = new StringBuilderWriter() try { - treeString(writer, verbose, addSuffix) + treeString(writer, verbose, addSuffix, maxFields) writer.toString } finally { writer.close() @@ -483,8 +493,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { def treeString( writer: Writer, verbose: Boolean, - addSuffix: Boolean): Unit = { - generateTreeString(0, Nil, writer, verbose, "", addSuffix) + addSuffix: Boolean, + maxFields: Int): Unit = { + generateTreeString(0, Nil, writer, verbose, "", addSuffix, maxFields) } /** @@ -550,7 +561,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { writer: Writer, verbose: Boolean, prefix: String = "", - addSuffix: Boolean = false): Unit = { + addSuffix: Boolean = false, + maxFields: Int): Unit = { if (depth > 0) { lastChildren.init.foreach { isLast => @@ -560,9 +572,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } val str = if (verbose) { - if (addSuffix) verboseStringWithSuffix else verboseString + if (addSuffix) verboseStringWithSuffix(maxFields) else verboseString(maxFields) } else { - simpleString + simpleString(maxFields) } writer.write(prefix) writer.write(str) @@ -571,17 +583,17 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { if (innerChildren.nonEmpty) { innerChildren.init.foreach(_.generateTreeString( depth + 2, lastChildren :+ children.isEmpty :+ false, writer, verbose, - addSuffix = addSuffix)) + addSuffix = addSuffix, maxFields = maxFields)) innerChildren.last.generateTreeString( depth + 2, lastChildren :+ children.isEmpty :+ true, writer, verbose, - addSuffix = addSuffix) + addSuffix = addSuffix, maxFields = maxFields) } if (children.nonEmpty) { children.init.foreach(_.generateTreeString( - depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix)) + depth + 1, lastChildren :+ false, writer, verbose, prefix, addSuffix, maxFields)) children.last.generateTreeString( - depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix) + depth + 1, lastChildren :+ true, writer, verbose, prefix, addSuffix, maxFields) } } @@ -664,7 +676,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { t.forall(_.isInstanceOf[Partitioning]) || t.forall(_.isInstanceOf[DataType]) => JArray(t.map(parseToJson).toList) case t: Seq[_] if t.length > 0 && t.head.isInstanceOf[String] => - JString(truncatedString(t, "[", ", ", "]")) + JString(truncatedString(t, "[", ", ", "]", SQLConf.get.maxToStringFields)) case t: Seq[_] => JNull case m: Map[_, _] => JNull // if it's a scala object, we can simply keep the full class path. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala index 277584b20dcd2..7f5860e12cfd5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/package.scala @@ -184,14 +184,14 @@ package object util extends Logging { start: String, sep: String, end: String, - maxNumFields: Int = SQLConf.get.maxToStringFields): String = { - if (seq.length > maxNumFields) { + maxFields: Int): String = { + if (seq.length > maxFields) { if (truncationWarningPrinted.compareAndSet(false, true)) { logWarning( "Truncated the string representation of a plan since it was too large. This " + s"behavior can be adjusted by setting '${SQLConf.MAX_TO_STRING_FIELDS.key}'.") } - val numFields = math.max(0, maxNumFields - 1) + val numFields = math.max(0, maxFields - 1) seq.take(numFields).mkString( start, sep, sep + "... " + (seq.length - numFields) + " more fields" + end) } else { @@ -200,7 +200,9 @@ package object util extends Logging { } /** Shorthand for calling truncatedString() without start or end strings. */ - def truncatedString[T](seq: Seq[T], sep: String): String = truncatedString(seq, "", sep, "") + def truncatedString[T](seq: Seq[T], sep: String, maxFields: Int): String = { + truncatedString(seq, "", sep, "", maxFields) + } /* FIX ME implicit class debugLogging(a: Any) { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala index e01d7c59cac52..d563276a5711d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala @@ -28,6 +28,7 @@ import org.apache.spark.annotation.Stable import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, InterpretedOrdering} import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, LegacyTypeStringParser} import org.apache.spark.sql.catalyst.util.{quoteIdentifier, truncatedString} +import org.apache.spark.sql.internal.SQLConf /** * A [[StructType]] object can be constructed by @@ -343,7 +344,10 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru override def simpleString: String = { val fieldTypes = fields.view.map(field => s"${field.name}:${field.dataType.simpleString}") - truncatedString(fieldTypes, "struct<", ",", ">") + truncatedString( + fieldTypes, + "struct<", ",", ">", + SQLConf.get.maxToStringFields) } override def catalogString: String = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala index 63c7b42978025..0e0c8e167a0a7 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/aggregate/PercentileSuite.scala @@ -215,7 +215,7 @@ class PercentileSuite extends SparkFunSuite { val percentile2 = new Percentile(child, percentage) assertEqual(percentile2.checkInputDataTypes(), TypeCheckFailure(s"Percentage(s) must be between 0.0 and 1.0, " + - s"but got ${percentage.simpleString}")) + s"but got ${percentage.simpleString(100)}")) } val nonFoldablePercentage = Seq(NonFoldableLiteral(0.5), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala index 9c162026942f6..d95de71e897a2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/util/UtilSuite.scala @@ -26,6 +26,6 @@ class UtilSuite extends SparkFunSuite { assert(truncatedString(Seq(1, 2), "[", ", ", "]", 2) == "[1, 2]") assert(truncatedString(Seq(1, 2, 3), "[", ", ", "]", 2) == "[1, ... 2 more fields]") assert(truncatedString(Seq(1, 2, 3), "[", ", ", "]", -5) == "[, ... 3 more fields]") - assert(truncatedString(Seq(1, 2, 3), ", ") == "1, 2, 3") + assert(truncatedString(Seq(1, 2, 3), ", ", 10) == "1, 2, 3") } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala index 322ffffca564b..1d7dd73706c48 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala @@ -52,19 +52,19 @@ trait DataSourceScanExec extends LeafExecNode with CodegenSupport { // Metadata that describes more details of this scan. protected def metadata: Map[String, String] - override def simpleString: String = { + override def simpleString(maxFields: Int): String = { val metadataEntries = metadata.toSeq.sorted.map { case (key, value) => key + ": " + StringUtils.abbreviate(redact(value), 100) } - val metadataStr = truncatedString(metadataEntries, " ", ", ", "") - s"$nodeNamePrefix$nodeName${truncatedString(output, "[", ",", "]")}$metadataStr" + val metadataStr = truncatedString(metadataEntries, " ", ", ", "", maxFields) + s"$nodeNamePrefix$nodeName${truncatedString(output, "[", ",", "]", maxFields)}$metadataStr" } - override def verboseString: String = redact(super.verboseString) + override def verboseString(maxFields: Int): String = redact(super.verboseString(maxFields)) - override def treeString(verbose: Boolean, addSuffix: Boolean): String = { - redact(super.treeString(verbose, addSuffix)) + override def treeString(verbose: Boolean, addSuffix: Boolean, maxFields: Int): String = { + redact(super.treeString(verbose, addSuffix, maxFields)) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala index 49fb288fdea6a..981ecae80a724 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala @@ -79,7 +79,7 @@ case class ExternalRDDScanExec[T]( } } - override def simpleString: String = { + override def simpleString(maxFields: Int): String = { s"$nodeName${output.mkString("[", ",", "]")}" } } @@ -156,8 +156,8 @@ case class RDDScanExec( } } - override def simpleString: String = { - s"$nodeName${truncatedString(output, "[", ",", "]")}" + override def simpleString(maxFields: Int): String = { + s"$nodeName${truncatedString(output, "[", ",", "]", maxFields)}" } // Input can be InternalRow, has to be turned into UnsafeRows. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index eef5a3f899f55..9b8d2e830867d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.command.{DescribeTableCommand, ExecutedCommandExec, ShowTablesCommand} import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReuseExchange} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.{BinaryType, DateType, DecimalType, TimestampType, _} import org.apache.spark.util.Utils @@ -208,27 +209,27 @@ class QueryExecution( } } - private def writePlans(writer: Writer): Unit = { + private def writePlans(writer: Writer, maxFields: Int): Unit = { val (verbose, addSuffix) = (true, false) writer.write("== Parsed Logical Plan ==\n") - writeOrError(writer)(logical.treeString(_, verbose, addSuffix)) + writeOrError(writer)(logical.treeString(_, verbose, addSuffix, maxFields)) writer.write("\n== Analyzed Logical Plan ==\n") val analyzedOutput = stringOrError(truncatedString( - analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ")) + analyzed.output.map(o => s"${o.name}: ${o.dataType.simpleString}"), ", ", maxFields)) writer.write(analyzedOutput) writer.write("\n") - writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix)) + writeOrError(writer)(analyzed.treeString(_, verbose, addSuffix, maxFields)) writer.write("\n== Optimized Logical Plan ==\n") - writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix)) + writeOrError(writer)(optimizedPlan.treeString(_, verbose, addSuffix, maxFields)) writer.write("\n== Physical Plan ==\n") - writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix)) + writeOrError(writer)(executedPlan.treeString(_, verbose, addSuffix, maxFields)) } override def toString: String = withRedaction { val writer = new StringBuilderWriter() try { - writePlans(writer) + writePlans(writer, SQLConf.get.maxToStringFields) writer.toString } finally { writer.close() @@ -280,14 +281,16 @@ class QueryExecution( /** * Dumps debug information about query execution into the specified file. + * + * @param maxFields maximim number of fields converted to string representation. */ - def toFile(path: String): Unit = { + def toFile(path: String, maxFields: Int = Int.MaxValue): Unit = { val filePath = new Path(path) val fs = filePath.getFileSystem(sparkSession.sessionState.newHadoopConf()) val writer = new BufferedWriter(new OutputStreamWriter(fs.create(filePath))) try { - writePlans(writer) + writePlans(writer, maxFields) writer.write("\n== Whole Stage Codegen ==\n") org.apache.spark.sql.execution.debug.writeCodegen(writer, executedPlan) } finally { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala index 59ffd16381116..f554ff0aa775f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi import org.apache.spark.sql.execution.exchange.ReusedExchangeExec import org.apache.spark.sql.execution.metric.SQLMetricInfo +import org.apache.spark.sql.internal.SQLConf /** * :: DeveloperApi :: @@ -62,7 +63,10 @@ private[execution] object SparkPlanInfo { case fileScan: FileSourceScanExec => fileScan.metadata case _ => Map[String, String]() } - new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan), + new SparkPlanInfo( + plan.nodeName, + plan.simpleString(SQLConf.get.maxToStringFields), + children.map(fromSparkPlan), metadata, metrics) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala index fbda0d87a175f..f4927dedabe56 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala @@ -87,7 +87,7 @@ trait CodegenSupport extends SparkPlan { this.parent = parent ctx.freshNamePrefix = variablePrefix s""" - |${ctx.registerComment(s"PRODUCE: ${this.simpleString}")} + |${ctx.registerComment(s"PRODUCE: ${this.simpleString(SQLConf.get.maxToStringFields)}")} |${doProduce(ctx)} """.stripMargin } @@ -188,7 +188,7 @@ trait CodegenSupport extends SparkPlan { parent.doConsume(ctx, inputVars, rowVar) } s""" - |${ctx.registerComment(s"CONSUME: ${parent.simpleString}")} + |${ctx.registerComment(s"CONSUME: ${parent.simpleString(SQLConf.get.maxToStringFields)}")} |$evaluated |$consumeFunc """.stripMargin @@ -496,8 +496,16 @@ case class InputAdapter(child: SparkPlan) extends UnaryExecNode with InputRDDCod writer: Writer, verbose: Boolean, prefix: String = "", - addSuffix: Boolean = false): Unit = { - child.generateTreeString(depth, lastChildren, writer, verbose, prefix = "", addSuffix = false) + addSuffix: Boolean = false, + maxFields: Int): Unit = { + child.generateTreeString( + depth, + lastChildren, + writer, + verbose, + prefix = "", + addSuffix = false, + maxFields) } override def needCopyResult: Boolean = false @@ -772,8 +780,16 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int) writer: Writer, verbose: Boolean, prefix: String = "", - addSuffix: Boolean = false): Unit = { - child.generateTreeString(depth, lastChildren, writer, verbose, s"*($codegenStageId) ", false) + addSuffix: Boolean = false, + maxFields: Int): Unit = { + child.generateTreeString( + depth, + lastChildren, + writer, + verbose, + s"*($codegenStageId) ", + false, + maxFields) } override def needStopCheck: Boolean = true diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 4827f838fc514..2355d305c38e5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -922,18 +922,18 @@ case class HashAggregateExec( """ } - override def verboseString: String = toString(verbose = true) + override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields) - override def simpleString: String = toString(verbose = false) + override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields) - private def toString(verbose: Boolean): String = { + private def toString(verbose: Boolean, maxFields: Int): String = { val allAggregateExpressions = aggregateExpressions testFallbackStartsAt match { case None => - val keyString = truncatedString(groupingExpressions, "[", ", ", "]") - val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]") - val outputString = truncatedString(output, "[", ", ", "]") + val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields) + val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields) + val outputString = truncatedString(output, "[", ", ", "]", maxFields) if (verbose) { s"HashAggregate(keys=$keyString, functions=$functionString, output=$outputString)" } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 7145bb03028d9..bd52c6321647a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -137,15 +137,15 @@ case class ObjectHashAggregateExec( } } - override def verboseString: String = toString(verbose = true) + override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields) - override def simpleString: String = toString(verbose = false) + override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields) - private def toString(verbose: Boolean): String = { + private def toString(verbose: Boolean, maxFields: Int): String = { val allAggregateExpressions = aggregateExpressions - val keyString = truncatedString(groupingExpressions, "[", ", ", "]") - val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]") - val outputString = truncatedString(output, "[", ", ", "]") + val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields) + val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields) + val outputString = truncatedString(output, "[", ", ", "]", maxFields) if (verbose) { s"ObjectHashAggregate(keys=$keyString, functions=$functionString, output=$outputString)" } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index d732b905dcdd5..7ab6ecc08a7bc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -107,16 +107,16 @@ case class SortAggregateExec( } } - override def simpleString: String = toString(verbose = false) + override def simpleString(maxFields: Int): String = toString(verbose = false, maxFields) - override def verboseString: String = toString(verbose = true) + override def verboseString(maxFields: Int): String = toString(verbose = true, maxFields) - private def toString(verbose: Boolean): String = { + private def toString(verbose: Boolean, maxFields: Int): String = { val allAggregateExpressions = aggregateExpressions - val keyString = truncatedString(groupingExpressions, "[", ", ", "]") - val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]") - val outputString = truncatedString(output, "[", ", ", "]") + val keyString = truncatedString(groupingExpressions, "[", ", ", "]", maxFields) + val functionString = truncatedString(allAggregateExpressions, "[", ", ", "]", maxFields) + val outputString = truncatedString(output, "[", ", ", "]", maxFields) if (verbose) { s"SortAggregate(key=$keyString, functions=$functionString, output=$outputString)" } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 09effe087e195..2570b36b3166d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -586,7 +586,9 @@ case class RangeExec(range: org.apache.spark.sql.catalyst.plans.logical.Range) } } - override def simpleString: String = s"Range ($start, $end, step=$step, splits=$numSlices)" + override def simpleString(maxFields: Int): String = { + s"Range ($start, $end, step=$step, splits=$numSlices)" + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 73eb65f84489c..4109d9994dd8f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -209,6 +209,6 @@ case class InMemoryRelation( override protected def otherCopyArgs: Seq[AnyRef] = Seq(statsOfPlanToCache) - override def simpleString: String = - s"InMemoryRelation [${truncatedString(output, ", ")}], ${cacheBuilder.storageLevel}" + override def simpleString(maxFields: Int): String = + s"InMemoryRelation [${truncatedString(output, ", ", maxFields)}], ${cacheBuilder.storageLevel}" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala index 1023572d19e2e..db3604fe92cc0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala @@ -63,7 +63,9 @@ case class LogicalRelation( case _ => // Do nothing. } - override def simpleString: String = s"Relation[${truncatedString(output, ",")}] $relation" + override def simpleString(maxFields: Int): String = { + s"Relation[${truncatedString(output, ",", maxFields)}] $relation" + } } object LogicalRelation { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala index 00b1b5dedb593..f29e7869fb27c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SaveIntoDataSourceCommand.scala @@ -48,7 +48,7 @@ case class SaveIntoDataSourceCommand( Seq.empty[Row] } - override def simpleString: String = { + override def simpleString(maxFields: Int): String = { val redacted = SQLConf.get.redactOptions(options) s"SaveIntoDataSourceCommand ${dataSource}, ${redacted}, ${mode}" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala index fdc5e85f3c2ea..042320edea4f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala @@ -68,7 +68,7 @@ case class CreateTempViewUsing( s"Temporary view '$tableIdent' should not have specified a database") } - override def argString: String = { + override def argString(maxFields: Int): String = { s"[tableIdent:$tableIdent " + userSpecifiedSchema.map(_ + " ").getOrElse("") + s"replace:$replace " + diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala index 0a6b0afe6cfe5..7bf2b8bff3732 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala @@ -52,8 +52,8 @@ case class DataSourceV2Relation( override def name: String = table.name() - override def simpleString: String = { - s"RelationV2${truncatedString(output, "[", ", ", "]")} $name" + override def simpleString(maxFields: Int): String = { + s"RelationV2${truncatedString(output, "[", ", ", "]", maxFields)} $name" } def newWriteSupport(): BatchWriteSupport = source.createWriteSupport(options, schema) @@ -96,7 +96,9 @@ case class StreamingDataSourceV2Relation( override def isStreaming: Boolean = true - override def simpleString: String = "Streaming RelationV2 " + metadataString + override def simpleString(maxFields: Int): String = { + "Streaming RelationV2 " + metadataString(maxFields) + } override def pushedFilters: Seq[Expression] = Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala index 725bcc3af3ca5..53e4e77c65e26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExec.scala @@ -35,8 +35,8 @@ case class DataSourceV2ScanExec( @transient batch: Batch) extends LeafExecNode with ColumnarBatchScan { - override def simpleString: String = { - s"ScanV2${truncatedString(output, "[", ", ", "]")} $scanDesc" + override def simpleString(maxFields: Int): String = { + s"ScanV2${truncatedString(output, "[", ", ", "]", maxFields)} $scanDesc" } // TODO: unify the equal/hashCode implementation for all data source v2 query plans. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala index c872940909964..be75fe4f596dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StreamingScanExec.scala @@ -42,7 +42,7 @@ case class DataSourceV2StreamingScanExec( @transient scanConfig: ScanConfig) extends LeafExecNode with DataSourceV2StringFormat with ColumnarBatchScan { - override def simpleString: String = "ScanV2 " + metadataString + override def simpleString(maxFields: Int): String = "ScanV2 " + metadataString(maxFields) // TODO: unify the equal/hashCode implementation for all data source v2 query plans. override def equals(other: Any): Boolean = other match { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala index e829f621b4ea3..f11703c8a2773 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2StringFormat.scala @@ -59,7 +59,7 @@ trait DataSourceV2StringFormat { case _ => Utils.getSimpleName(source.getClass) } - def metadataString: String = { + def metadataString(maxFields: Int): String = { val entries = scala.collection.mutable.ArrayBuffer.empty[(String, String)] if (pushedFilters.nonEmpty) { @@ -73,12 +73,12 @@ trait DataSourceV2StringFormat { }.mkString("[", ",", "]") } - val outputStr = truncatedString(output, "[", ", ", "]") + val outputStr = truncatedString(output, "[", ", ", "]", maxFields) val entriesStr = if (entries.nonEmpty) { truncatedString(entries.map { case (key, value) => key + ": " + StringUtils.abbreviate(value, 100) - }, " (", ", ", ")") + }, " (", ", ", ")", maxFields) } else { "" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala index 3511cefa7c292..ae8197f617a28 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala @@ -33,6 +33,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodeFormatter, Codegen import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.trees.TreeNodeRef import org.apache.spark.sql.execution.streaming.{StreamExecution, StreamingQueryWrapper} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.util.{AccumulatorV2, LongAccumulator} @@ -216,7 +217,7 @@ package object debug { val columnStats: Array[ColumnMetrics] = Array.fill(child.output.size)(new ColumnMetrics()) def dumpStats(): Unit = { - debugPrint(s"== ${child.simpleString} ==") + debugPrint(s"== ${child.simpleString(SQLConf.get.maxToStringFields)} ==") debugPrint(s"Tuples output: ${tupleCount.value}") child.output.zip(columnStats).foreach { case (attr, metric) => // This is called on driver. All accumulator updates have a fixed value. So it's safe to use diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index bfaf080292bce..56973af8fd648 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -198,9 +198,9 @@ case class TakeOrderedAndProjectExec( override def outputPartitioning: Partitioning = SinglePartition - override def simpleString: String = { - val orderByString = truncatedString(sortOrder, "[", ",", "]") - val outputString = truncatedString(output, "[", ",", "]") + override def simpleString(maxFields: Int): String = { + val orderByString = truncatedString(sortOrder, "[", ",", "]", maxFields) + val outputString = truncatedString(output, "[", ",", "]", maxFields) s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index 8ad436a4ff57d..38ecb0dd12daa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec} import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWritSupport, RateControlMicroBatchReadSupport} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2} import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} @@ -482,9 +483,10 @@ class MicroBatchExecution( val newBatchesPlan = logicalPlan transform { case StreamingExecutionRelation(source, output) => newData.get(source).map { dataPlan => + val maxFields = SQLConf.get.maxToStringFields assert(output.size == dataPlan.output.size, - s"Invalid batch: ${truncatedString(output, ",")} != " + - s"${truncatedString(dataPlan.output, ",")}") + s"Invalid batch: ${truncatedString(output, ",", maxFields)} != " + + s"${truncatedString(dataPlan.output, ",", maxFields)}") val aliases = output.zip(dataPlan.output).map { case (to, from) => Alias(from, to.name)(exprId = to.exprId, explicitMetadata = Some(from.metadata)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index f0859aaaa3041..89033b70f1431 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.util.truncatedString import org.apache.spark.sql.execution.SQLExecution import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2StreamingScanExec, StreamingDataSourceV2Relation} import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _} +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2 import org.apache.spark.sql.sources.v2.{ContinuousReadSupportProvider, DataSourceOptions, StreamingWriteSupportProvider} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReadSupport, PartitionOffset} @@ -166,10 +167,10 @@ class ContinuousExecution( val readSupport = continuousSources(insertedSourceId) insertedSourceId += 1 val newOutput = readSupport.fullSchema().toAttributes - + val maxFields = SQLConf.get.maxToStringFields assert(output.size == newOutput.size, - s"Invalid reader: ${truncatedString(output, ",")} != " + - s"${truncatedString(newOutput, ",")}") + s"Invalid reader: ${truncatedString(output, ",", maxFields)} != " + + s"${truncatedString(newOutput, ",", maxFields)}") replacements ++= output.zip(newOutput) val loggedOffset = offsets.offsets(0) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index daee089f3871d..13b75ae4a4339 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._ import org.apache.spark.sql.catalyst.util.truncatedString +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2.reader._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReadSupport, Offset => OffsetV2} import org.apache.spark.sql.streaming.OutputMode @@ -117,7 +118,9 @@ case class MemoryStream[A : Encoder](id: Int, sqlContext: SQLContext) } } - override def toString: String = s"MemoryStream[${truncatedString(output, ",")}]" + override def toString: String = { + s"MemoryStream[${truncatedString(output, ",", SQLConf.get.maxToStringFields)}]" + } override def deserializeOffset(json: String): OffsetV2 = LongOffset(json.toLong) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 310ebcdf67686..e180d2228c3b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -51,7 +51,7 @@ case class ScalarSubquery( override def dataType: DataType = plan.schema.fields.head.dataType override def children: Seq[Expression] = Nil override def nullable: Boolean = true - override def toString: String = plan.simpleString + override def toString: String = plan.simpleString(SQLConf.get.maxToStringFields) override def withNewPlan(query: SubqueryExec): ScalarSubquery = copy(plan = query) override def semanticEquals(other: Expression): Boolean = other match { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala index 0c47a2040f171..3cc97c995702a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala @@ -106,6 +106,19 @@ class QueryExecutionSuite extends SharedSQLContext { } } + test("check maximum fields restriction") { + withTempDir { dir => + val path = dir.getCanonicalPath + "/plans.txt" + val ds = spark.createDataset(Seq(QueryExecutionTestRecord( + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, + 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26))) + ds.queryExecution.debug.toFile(path) + val localRelations = Source.fromFile(path).getLines().filter(_.contains("LocalRelation")) + + assert(!localRelations.exists(_.contains("more fields"))) + } + } + test("toString() exception/error handling") { spark.experimental.extraStrategies = Seq( new SparkStrategy { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala index 608f21e726259..7249eacfbf9a6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala @@ -83,7 +83,7 @@ trait CreateHiveTableAsSelectBase extends DataWritingCommand { tableDesc: CatalogTable, tableExists: Boolean): DataWritingCommand - override def argString: String = { + override def argString(maxFields: Int): String = { s"[Database:${tableDesc.database}, " + s"TableName: ${tableDesc.identifier.table}, " + s"InsertIntoHiveTable]" From add287f397d41c1725464dff89d4a555ffc9db04 Mon Sep 17 00:00:00 2001 From: Kevin Yu Date: Thu, 27 Dec 2018 22:26:37 +0800 Subject: [PATCH 2/4] [SPARK-25892][SQL] Change AttributeReference.withMetadata's return type to AttributeReference ## What changes were proposed in this pull request? Currently the `AttributeReference.withMetadata` method have return type `Attribute`, the rest of with methods in the `AttributeReference` return type are `AttributeReference`, as the [spark-25892](https://issues.apache.org/jira/browse/SPARK-25892?jql=project%20%3D%20SPARK%20AND%20component%20in%20(ML%2C%20PySpark%2C%20SQL)) mentioned. This PR will change `AttributeReference.withMetadata` method's return type from `Attribute` to `AttributeReference`. ## How was this patch tested? Run all `sql/test,` `catalyst/test` and `org.apache.spark.sql.execution.streaming.*` Closes #22918 from kevinyu98/spark-25892. Authored-by: Kevin Yu Signed-off-by: Hyukjin Kwon --- .../spark/sql/catalyst/expressions/namedExpressions.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 131459bf27bc8..7ebb171f34ba2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -311,7 +311,7 @@ case class AttributeReference( } } - override def withMetadata(newMetadata: Metadata): Attribute = { + override def withMetadata(newMetadata: Metadata): AttributeReference = { AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier) } From 68496c1af310aadfb1b226cb05be510252769d43 Mon Sep 17 00:00:00 2001 From: deepyaman Date: Fri, 28 Dec 2018 00:02:41 +0800 Subject: [PATCH 3/4] [SPARK-26451][SQL] Change lead/lag argument name from count to offset ## What changes were proposed in this pull request? Change aligns argument name with that in Scala version and documentation. ## How was this patch tested? (Please explain how this patch was tested. E.g. unit tests, integration tests, manual tests) (If this patch involves UI changes, please attach a screenshot; otherwise, remove this) Please review http://spark.apache.org/contributing.html before opening a pull request. Closes #23357 from deepyaman/patch-1. Authored-by: deepyaman Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/functions.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index d2a771e9bb8ea..3c33e2bed92d9 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -798,7 +798,7 @@ def factorial(col): # --------------- Window functions ------------------------ @since(1.4) -def lag(col, count=1, default=None): +def lag(col, offset=1, default=None): """ Window function: returns the value that is `offset` rows before the current row, and `defaultValue` if there is less than `offset` rows before the current row. For example, @@ -807,15 +807,15 @@ def lag(col, count=1, default=None): This is equivalent to the LAG function in SQL. :param col: name of column or expression - :param count: number of row to extend + :param offset: number of row to extend :param default: default value """ sc = SparkContext._active_spark_context - return Column(sc._jvm.functions.lag(_to_java_column(col), count, default)) + return Column(sc._jvm.functions.lag(_to_java_column(col), offset, default)) @since(1.4) -def lead(col, count=1, default=None): +def lead(col, offset=1, default=None): """ Window function: returns the value that is `offset` rows after the current row, and `defaultValue` if there is less than `offset` rows after the current row. For example, @@ -824,11 +824,11 @@ def lead(col, count=1, default=None): This is equivalent to the LEAD function in SQL. :param col: name of column or expression - :param count: number of row to extend + :param offset: number of row to extend :param default: default value """ sc = SparkContext._active_spark_context - return Column(sc._jvm.functions.lead(_to_java_column(col), count, default)) + return Column(sc._jvm.functions.lead(_to_java_column(col), offset, default)) @since(1.4) From f2adb610680f9addec51bf470acf64f3849073e9 Mon Sep 17 00:00:00 2001 From: wuqingxin Date: Fri, 28 Dec 2018 00:15:57 -0800 Subject: [PATCH 4/4] [SPARK-26446][CORE] Add cachedExecutorIdleTimeout docs at ExecutorAllocationManager ## What changes were proposed in this pull request? Add docs to describe how remove policy act while considering the property `spark.dynamicAllocation.cachedExecutorIdleTimeout` in ExecutorAllocationManager ## How was this patch tested? comment-only PR. Closes #23386 from TopGunViper/SPARK-26446. Authored-by: wuqingxin Signed-off-by: Dongjoon Hyun --- .../org/apache/spark/ExecutorAllocationManager.scala | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala index c3e5b96a55884..3f0b71bbe17f1 100644 --- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala +++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala @@ -57,7 +57,8 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} * a long time to ramp up under heavy workloads. * * The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not - * been scheduled to run any tasks, then it is removed. + * been scheduled to run any tasks, then it is removed. Note that an executor caching any data + * blocks will be removed if it has been idle for more than L seconds. * * There is no retry logic in either case because we make the assumption that the cluster manager * will eventually fulfill all requests it receives asynchronously. @@ -81,7 +82,12 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils} * This is used only after the initial backlog timeout is exceeded * * spark.dynamicAllocation.executorIdleTimeout (K) - - * If an executor has been idle for this duration, remove it + * If an executor without caching any data blocks has been idle for this duration, remove it + * + * spark.dynamicAllocation.cachedExecutorIdleTimeout (L) - + * If an executor with caching data blocks has been idle for more than this duration, + * the executor will be removed + * */ private[spark] class ExecutorAllocationManager( client: ExecutorAllocationClient,