Merge remote-tracking branch 'origin/master' into thread-local-date-format

# Conflicts:
#	sql/core/src/test/scala/org/apache/spark/sql/execution/QueryExecutionSuite.scala
MaxGekk committed Dec 28, 2018
2 parents 56bdae4 + f2adb61 commit 8541602
Showing 45 changed files with 218 additions and 135 deletions.
@@ -57,7 +57,8 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
* a long time to ramp up under heavy workloads.
*
* The remove policy is simpler: If an executor has been idle for K seconds, meaning it has not
- * been scheduled to run any tasks, then it is removed.
+ * been scheduled to run any tasks, then it is removed. Note that an executor caching any data
+ * blocks will be removed if it has been idle for more than L seconds.
*
* There is no retry logic in either case because we make the assumption that the cluster manager
* will eventually fulfill all requests it receives asynchronously.
@@ -81,7 +82,12 @@ import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
* This is used only after the initial backlog timeout is exceeded
*
* spark.dynamicAllocation.executorIdleTimeout (K) -
- *   If an executor has been idle for this duration, remove it
+ *   If an executor without caching any data blocks has been idle for this duration, remove it
+ *
+ * spark.dynamicAllocation.cachedExecutorIdleTimeout (L) -
+ *   If an executor with caching data blocks has been idle for more than this duration,
+ *   the executor will be removed
+ *
*/
private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
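For reference, the two idle timeouts documented above are plain Spark configuration keys. A minimal sketch of setting both, with illustrative values rather than Spark's defaults:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative configuration of the K and L timeouts documented above.
// Real deployments also need the external shuffle service (or similar)
// for dynamic allocation to release executors safely.
val spark = SparkSession.builder()
  .appName("dynamic-allocation-sketch")
  .config("spark.dynamicAllocation.enabled", "true")
  .config("spark.dynamicAllocation.executorIdleTimeout", "60s")         // K
  .config("spark.dynamicAllocation.cachedExecutorIdleTimeout", "600s")  // L
  .getOrCreate()
```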
12 changes: 6 additions & 6 deletions python/pyspark/sql/functions.py
@@ -798,7 +798,7 @@ def factorial(col):
# --------------- Window functions ------------------------

@since(1.4)
- def lag(col, count=1, default=None):
+ def lag(col, offset=1, default=None):
"""
Window function: returns the value that is `offset` rows before the current row, and
`defaultValue` if there is less than `offset` rows before the current row. For example,
@@ -807,15 +807,15 @@ def lag(col, count=1, default=None):
This is equivalent to the LAG function in SQL.
:param col: name of column or expression
-     :param count: number of row to extend
+     :param offset: number of row to extend
:param default: default value
"""
sc = SparkContext._active_spark_context
-     return Column(sc._jvm.functions.lag(_to_java_column(col), count, default))
+     return Column(sc._jvm.functions.lag(_to_java_column(col), offset, default))


@since(1.4)
- def lead(col, count=1, default=None):
+ def lead(col, offset=1, default=None):
"""
Window function: returns the value that is `offset` rows after the current row, and
`defaultValue` if there is less than `offset` rows after the current row. For example,
Expand All @@ -824,11 +824,11 @@ def lead(col, count=1, default=None):
This is equivalent to the LEAD function in SQL.
:param col: name of column or expression
-     :param count: number of row to extend
+     :param offset: number of row to extend
:param default: default value
"""
sc = SparkContext._active_spark_context
-     return Column(sc._jvm.functions.lead(_to_java_column(col), count, default))
+     return Column(sc._jvm.functions.lead(_to_java_column(col), offset, default))


@since(1.4)
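The Python wrappers above delegate to the JVM functions of the same name (via sc._jvm.functions), so this rename only aligns the Python signature with the Scala one. A minimal Scala sketch of lag/lead over a window, with illustrative data and column names:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lag, lead}

val spark = SparkSession.builder().master("local[*]").appName("lag-lead-sketch").getOrCreate()
import spark.implicits._

val df = Seq(("a", 1), ("a", 2), ("a", 3)).toDF("key", "value")
val w = Window.partitionBy("key").orderBy("value")

df.select(
  col("value"),
  lag("value", 1).over(w).as("prev"),   // row `offset` positions before; null at frame start
  lead("value", 1).over(w).as("next")   // row `offset` positions after; null at frame end
).show()
```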
@@ -979,7 +979,7 @@ class Analyzer(
a.mapExpressions(resolveExpressionTopDown(_, appendColumns))

case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString}")
logTrace(s"Attempting to resolve ${q.simpleString(SQLConf.get.maxToStringFields)}")
q.mapExpressions(resolveExpressionTopDown(_, q))
}

@@ -1777,7 +1777,7 @@ class Analyzer(

case p if p.expressions.exists(hasGenerator) =>
throw new AnalysisException("Generators are not supported outside the SELECT clause, but " +
"got: " + p.simpleString)
"got: " + p.simpleString(SQLConf.get.maxToStringFields))
}
}

@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
+ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
@@ -303,7 +304,7 @@ trait CheckAnalysis extends PredicateHelper {
val missingAttributes = o.missingInput.mkString(",")
val input = o.inputSet.mkString(",")
val msgForMissingAttributes = s"Resolved attribute(s) $missingAttributes missing " +
s"from $input in operator ${operator.simpleString}."
s"from $input in operator ${operator.simpleString(SQLConf.get.maxToStringFields)}."

val resolver = plan.conf.resolver
val attrsWithSameName = o.missingInput.filter { missing =>
@@ -368,7 +369,7 @@ trait CheckAnalysis extends PredicateHelper {
s"""nondeterministic expressions are only allowed in
|Project, Filter, Aggregate or Window, found:
| ${o.expressions.map(_.sql).mkString(",")}
-            |in operator ${operator.simpleString}
+            |in operator ${operator.simpleString(SQLConf.get.maxToStringFields)}
""".stripMargin)

case _: UnresolvedHint =>
@@ -380,7 +381,8 @@
}
extendedCheckRules.foreach(_(plan))
plan.foreachUp {
-       case o if !o.resolved => failAnalysis(s"unresolved operator ${o.simpleString}")
+       case o if !o.resolved =>
+         failAnalysis(s"unresolved operator ${o.simpleString(SQLConf.get.maxToStringFields)}")
case _ =>
}

@@ -1069,8 +1069,8 @@ trait TypeCoercionRule extends Rule[LogicalPlan] with Logging {
// Leave the same if the dataTypes match.
case Some(newType) if a.dataType == newType.dataType => a
case Some(newType) =>
-         logDebug(
-           s"Promoting $a from ${a.dataType} to ${newType.dataType} in ${q.simpleString}")
+         logDebug(s"Promoting $a from ${a.dataType} to ${newType.dataType} in " +
+           s" ${q.simpleString(SQLConf.get.maxToStringFields)}")
newType
}
}
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{GenerateSafeProjection
import org.apache.spark.sql.catalyst.expressions.objects.{AssertNotNull, InitializeJavaBean, Invoke, NewInstance}
import org.apache.spark.sql.catalyst.optimizer.SimplifyCasts
import org.apache.spark.sql.catalyst.plans.logical.{CatalystSerde, DeserializeToObject, LocalRelation}
+ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ObjectType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.util.Utils
@@ -323,8 +324,8 @@ case class ExpressionEncoder[T](
extractProjection(inputRow)
} catch {
case e: Exception =>
-       throw new RuntimeException(
-         s"Error while encoding: $e\n${serializer.map(_.simpleString).mkString("\n")}", e)
+       throw new RuntimeException(s"Error while encoding: $e\n" +
+         s"${serializer.map(_.simpleString(SQLConf.get.maxToStringFields)).mkString("\n")}", e)
}

/**
@@ -336,7 +337,8 @@
constructProjection(row).get(0, ObjectType(clsTag.runtimeClass)).asInstanceOf[T]
} catch {
case e: Exception =>
throw new RuntimeException(s"Error while decoding: $e\n${deserializer.simpleString}", e)
throw new RuntimeException(s"Error while decoding: $e\n" +
s"${deserializer.simpleString(SQLConf.get.maxToStringFields)}", e)
}

/**
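For context, a minimal sketch of the encode/decode round trip whose error messages these hunks rewrap. ExpressionEncoder is an internal catalyst API, so treat the exact calls (resolveAndBind, toRow, fromRow) as an assumption about this era of the codebase:

```scala
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder

case class Point(x: Int, y: Int)

// toRow can raise "Error while encoding", fromRow "Error while decoding";
// both messages now truncate the serializer/deserializer plan strings via
// SQLConf.get.maxToStringFields.
val enc = ExpressionEncoder[Point]()
val bound = enc.resolveAndBind()    // binding is required before fromRow
val row = bound.toRow(Point(1, 2))
val back = bound.fromRow(row)       // back == Point(1, 2)
```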
@@ -259,12 +259,12 @@ abstract class Expression extends TreeNode[Expression] {

// Marks this as final, Expression.verboseString should never be called, and thus shouldn't be
// overridden by concrete classes.
-   final override def verboseString: String = simpleString
+   final override def verboseString(maxFields: Int): String = simpleString(maxFields)

-   override def simpleString: String = toString
+   override def simpleString(maxFields: Int): String = toString

override def toString: String = prettyName + truncatedString(
-     flatArguments.toSeq, "(", ", ", ")")
+     flatArguments.toSeq, "(", ", ", ")", SQLConf.get.maxToStringFields)

/**
* Returns SQL representation of this expression. For expressions extending [[NonSQLExpression]],
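This hunk is the heart of the merge: simpleString and verboseString now take an explicit maxFields argument, and callers thread SQLConf.get.maxToStringFields through instead of relying on a global. A simplified stand-in for catalyst's truncatedString helper (hypothetical code, not the actual implementation) illustrates the truncation idiom:

```scala
// Hypothetical stand-in for catalyst's truncatedString: render at most
// maxFields elements, summarizing however many were dropped.
def truncatedString(
    seq: Seq[String], start: String, sep: String, end: String, maxFields: Int): String = {
  if (seq.length <= maxFields) {
    seq.mkString(start, sep, end)
  } else {
    (seq.take(maxFields - 1) :+ s"... ${seq.length - maxFields + 1} more fields")
      .mkString(start, sep, end)
  }
}

truncatedString(Seq("a", "b", "c", "d"), "(", ", ", ")", 2)  // "(a, ... 3 more fields)"
```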
@@ -197,7 +197,7 @@ trait Block extends TreeNode[Block] with JavaCode {
case _ => code"$this\n$other"
}

-   override def verboseString: String = toString
+   override def verboseString(maxFields: Int): String = toString
}

object Block {
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.{ArrayData, MapData}
+ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
@@ -101,7 +102,7 @@ case class UserDefinedGenerator(
inputRow = new InterpretedProjection(children)
convertToScala = {
val inputSchema = StructType(children.map { e =>
-         StructField(e.simpleString, e.dataType, nullable = true)
+         StructField(e.simpleString(SQLConf.get.maxToStringFields), e.dataType, nullable = true)
})
CatalystTypeConverters.createToScalaConverter(inputSchema)
}.asInstanceOf[InternalRow => Row]
@@ -76,7 +76,9 @@ case class NamedLambdaVariable(

override def toString: String = s"lambda $name#${exprId.id}$typeSuffix"

override def simpleString: String = s"lambda $name#${exprId.id}: ${dataType.simpleString}"
override def simpleString(maxFields: Int): String = {
s"lambda $name#${exprId.id}: ${dataType.simpleString(maxFields)}"
}
}

/**
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen._
import org.apache.spark.sql.catalyst.expressions.codegen.Block._
import org.apache.spark.sql.catalyst.util.RandomUUIDGenerator
+ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -40,7 +41,7 @@ case class PrintToStderr(child: Expression) extends UnaryExpression {
input
}

-   private val outputPrefix = s"Result of ${child.simpleString} is "
+   private val outputPrefix = s"Result of ${child.simpleString(SQLConf.get.maxToStringFields)} is "

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val outputPrefixField = ctx.addReferenceObj("outputPrefix", outputPrefix)
@@ -72,7 +73,7 @@ case class AssertTrue(child: Expression) extends UnaryExpression with ImplicitCa

override def prettyName: String = "assert_true"

-   private val errMsg = s"'${child.simpleString}' is not true!"
+   private val errMsg = s"'${child.simpleString(SQLConf.get.maxToStringFields)}' is not true!"

override def eval(input: InternalRow) : Any = {
val v = child.eval(input)
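The errMsg above is what reaches users when assert_true fails from SQL. A brief illustrative usage; the exact message text is an approximation of what child.simpleString renders:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("assert-true-sketch").getOrCreate()

// Passing predicate: assert_true evaluates to null.
spark.sql("SELECT assert_true(1 < 2)").show()

// Failing predicate: throws a RuntimeException along the lines of
// "'(1 > 2)' is not true!".
// spark.sql("SELECT assert_true(1 > 2)").show()
```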
@@ -311,7 +311,7 @@ case class AttributeReference(
}
}

-   override def withMetadata(newMetadata: Metadata): Attribute = {
+   override def withMetadata(newMetadata: Metadata): AttributeReference = {
AttributeReference(name, dataType, nullable, newMetadata)(exprId, qualifier)
}

@@ -330,7 +330,9 @@ case class AttributeReference(

// Since the expression id is not in the first constructor it is missing from the default
// tree string.
override def simpleString: String = s"$name#${exprId.id}: ${dataType.simpleString}"
override def simpleString(maxFields: Int): String = {
s"$name#${exprId.id}: ${dataType.simpleString(maxFields)}"
}

override def sql: String = {
val qualifierPrefix = if (qualifier.nonEmpty) qualifier.mkString(".") + "." else ""
@@ -172,9 +172,9 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT
*/
protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""

-   override def simpleString: String = statePrefix + super.simpleString
+   override def simpleString(maxFields: Int): String = statePrefix + super.simpleString(maxFields)

-   override def verboseString: String = simpleString
+   override def verboseString(maxFields: Int): String = simpleString(maxFields)

/**
* All the subqueries of current plan.
@@ -36,8 +36,8 @@ abstract class LogicalPlan
/** Returns true if this subtree has data from a streaming data source. */
def isStreaming: Boolean = children.exists(_.isStreaming == true)

-   override def verboseStringWithSuffix: String = {
-     super.verboseString + statsCache.map(", " + _.toString).getOrElse("")
+   override def verboseStringWithSuffix(maxFields: Int): String = {
+     super.verboseString(maxFields) + statsCache.map(", " + _.toString).getOrElse("")
}

/**
@@ -468,7 +468,7 @@ case class View(

override def newInstance(): LogicalPlan = copy(output = output.map(_.newInstance()))

-   override def simpleString: String = {
+   override def simpleString(maxFields: Int): String = {
s"View (${desc.identifier}, ${output.mkString("[", ",", "]")})"
}
}
@@ -484,8 +484,8 @@ case class View(
case class With(child: LogicalPlan, cteRelations: Seq[(String, SubqueryAlias)]) extends UnaryNode {
override def output: Seq[Attribute] = child.output

-   override def simpleString: String = {
-     val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]")
+   override def simpleString(maxFields: Int): String = {
+     val cteAliases = truncatedString(cteRelations.map(_._1), "[", ", ", "]", maxFields)
s"CTE $cteAliases"
}

@@ -557,7 +557,7 @@ case class Range(

override def newInstance(): Range = copy(output = output.map(_.newInstance()))

-   override def simpleString: String = {
+   override def simpleString(maxFields: Int): String = {
s"Range ($start, $end, step=$step, splits=$numSlices)"
}
