[SPARK-50144][SS] Address the limitation of metrics calculation with DSv1 streaming sources
HeartSaVioR committed Oct 28, 2024
1 parent 5538109 commit 1f12679
Showing 27 changed files with 252 additions and 69 deletions.
@@ -2228,20 +2228,21 @@ object DecimalAggregates extends Rule[LogicalPlan] {
object ConvertToLocalRelation extends Rule[LogicalPlan] {
def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning(
_.containsPattern(LOCAL_RELATION), ruleId) {
case Project(projectList, LocalRelation(output, data, isStreaming))
case Project(projectList, LocalRelation(output, data, isStreaming, stream))
if !projectList.exists(hasUnevaluableExpr) =>
val projection = new InterpretedMutableProjection(projectList, output)
projection.initialize(0)
LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()), isStreaming)
LocalRelation(projectList.map(_.toAttribute), data.map(projection(_).copy()),
isStreaming, stream)

case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming)) =>
LocalRelation(output, data.take(limit), isStreaming)
case Limit(IntegerLiteral(limit), LocalRelation(output, data, isStreaming, stream)) =>
LocalRelation(output, data.take(limit), isStreaming, stream)

case Filter(condition, LocalRelation(output, data, isStreaming))
case Filter(condition, LocalRelation(output, data, isStreaming, stream))
if !hasUnevaluableExpr(condition) =>
val predicate = Predicate.create(condition, output)
predicate.initialize(0)
LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming)
LocalRelation(output, data.filter(row => predicate.eval(row)), isStreaming, stream)
}

private def hasUnevaluableExpr(expr: Expression): Boolean = {
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, Literal}
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.trees.TreePattern.{LOCAL_RELATION, TreePattern}
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.util.Utils

@@ -61,12 +62,21 @@ case class LocalRelation(
output: Seq[Attribute],
data: Seq[InternalRow] = Nil,
// Indicates whether this relation has data from a streaming source.
override val isStreaming: Boolean = false)
extends LeafNode with analysis.MultiInstanceRelation {
override val isStreaming: Boolean = false,
@transient stream: Option[SparkDataStream] = None)
extends LeafNode
with StreamSourceAwareLogicalPlan
with analysis.MultiInstanceRelation {

// A local relation must have resolved output.
require(output.forall(_.resolved), "Unresolved attributes found when constructing LocalRelation.")

override def withStream(stream: SparkDataStream): LocalRelation = {
copy(stream = Some(stream))
}

override def getStream: Option[SparkDataStream] = stream

/**
* Returns an identical copy of this relation with new exprIds for all attributes. Different
* attributes are required when a relation is going to be included multiple times in the same
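The new withStream/getStream members above give LocalRelation a copy-based way to carry the stream that produced its rows. A minimal usage sketch, assuming an existing relation and a DSv1 source already wrapped as a SparkDataStream (both placeholders; only the method names come from this diff):

import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.connector.read.streaming.SparkDataStream

// `rel` and `source` are hypothetical: a batch produced by a DSv1 Source and the
// stream it came from. withStream returns a copy carrying the stream; the
// original node is left untouched.
def tagWithSource(rel: LocalRelation, source: SparkDataStream): LocalRelation = {
  val tagged = rel.withStream(source)
  assert(tagged.getStream.contains(source))
  tagged
}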
@@ -0,0 +1,35 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.catalyst.plans.logical

import org.apache.spark.sql.connector.read.streaming.SparkDataStream

/**
* This trait is a mixin for source logical nodes to expose the stream they originate from. It is
* required for logical nodes that can appear as a leaf of the plan returned by Source.getBatch().
*/
trait StreamSourceAwareLogicalPlan extends LogicalPlan {
/**
* Set the stream associated with this node.
* Spark will use this method to set the stream, and the implementation should return a copy of
* the node with the stream set rather than mutating the node in place.
*/
def withStream(stream: SparkDataStream): LogicalPlan

/** Get the stream associated with this node. */
def getStream: Option[SparkDataStream]
}
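For illustration, a leaf node other than LocalRelation or LogicalRDD could mix the trait in as below. MyStreamingRelation is hypothetical and not part of this commit; it only shows the copy-based contract the scaladoc above asks for:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, StreamSourceAwareLogicalPlan}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream

case class MyStreamingRelation(
    output: Seq[Attribute],
    @transient stream: Option[SparkDataStream] = None)
  extends LeafNode with StreamSourceAwareLogicalPlan {

  // Return a copy carrying the stream instead of mutating the node in place.
  override def withStream(stream: SparkDataStream): MyStreamingRelation =
    copy(stream = Some(stream))

  override def getStream: Option[SparkDataStream] = stream
}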
@@ -105,7 +105,7 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
assert(resolved.isInstanceOf[ResolvedInlineTable])

EvalInlineTables(ComputeCurrentTime(resolved)) match {
case LocalRelation(output, data, _) =>
case LocalRelation(output, data, _, _) =>
assert(output.map(_.dataType) == Seq(TimestampType))
assert(data.size == 2)
// Make sure that both CURRENT_TIMESTAMP expressions are evaluated to the same value.
@@ -117,7 +117,8 @@ class ResolveInlineTablesSuite extends AnalysisTest with BeforeAndAfter {
val table = UnresolvedInlineTable(Seq("c1"),
Seq(Seq(Cast(lit("1991-12-06 00:00:00.0"), TimestampType))))
val withTimeZone = ResolveTimeZone.apply(table)
val LocalRelation(output, data, _) = EvalInlineTables(ResolveInlineTables.apply(withTimeZone))
val LocalRelation(output, data, _, _) =
EvalInlineTables(ResolveInlineTables.apply(withTimeZone))
val correct = Cast(lit("1991-12-06 00:00:00.0"), TimestampType)
.withTimeZone(conf.sessionLocalTimeZone).eval().asInstanceOf[Long]
assert(output.map(_.dataType) == Seq(TimestampType))
@@ -87,7 +87,7 @@ class RewriteDistinctAggregatesSuite extends PlanTest {

val rewrite = RewriteDistinctAggregates(input)
rewrite match {
case Aggregate(_, _, LocalRelation(_, _, _)) =>
case Aggregate(_, _, _: LocalRelation) =>
case _ => fail(s"Plan is not as expected:\n$rewrite")
}
}
@@ -31,6 +31,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.{truncatedString, CaseInsensitiveMap}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution
import org.apache.spark.sql.execution.datasources._
@@ -46,7 +47,7 @@ import org.apache.spark.util.ArrayImplicits._
import org.apache.spark.util.Utils
import org.apache.spark.util.collection.BitSet

trait DataSourceScanExec extends LeafExecNode {
trait DataSourceScanExec extends LeafExecNode with StreamSourceAwareSparkPlan {
def relation: BaseRelation
def tableIdentifier: Option[TableIdentifier]

@@ -114,6 +115,7 @@ case class RowDataSourceScanExec(
pushedDownOperators: PushedDownOperators,
rdd: RDD[InternalRow],
@transient relation: BaseRelation,
@transient stream: Option[SparkDataStream],
tableIdentifier: Option[TableIdentifier])
extends DataSourceScanExec with InputRDDCodegen {

@@ -207,6 +209,8 @@ case class RowDataSourceScanExec(
output.map(QueryPlan.normalizeExpressions(_, output)),
rdd = null,
tableIdentifier = None)

override def getStream: Option[SparkDataStream] = stream
}

/**
@@ -599,6 +603,7 @@ trait FileSourceScanLike extends DataSourceScanExec {
*/
case class FileSourceScanExec(
@transient override val relation: HadoopFsRelation,
@transient stream: Option[SparkDataStream],
override val output: Seq[Attribute],
override val requiredSchema: StructType,
override val partitionFilters: Seq[Expression],
@@ -817,6 +822,7 @@ case class FileSourceScanExec(
override def doCanonicalize(): FileSourceScanExec = {
FileSourceScanExec(
relation,
stream,
output.map(QueryPlan.normalizeExpressions(_, output)),
requiredSchema,
QueryPlan.normalizePredicates(
@@ -827,4 +833,6 @@
None,
disableBucketedScan)
}

override def getStream: Option[SparkDataStream] = stream
}
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning}
import org.apache.spark.sql.catalyst.util.truncatedString
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.collection.Utils

@@ -97,13 +98,16 @@ case class LogicalRDD(
rdd: RDD[InternalRow],
outputPartitioning: Partitioning = UnknownPartitioning(0),
override val outputOrdering: Seq[SortOrder] = Nil,
override val isStreaming: Boolean = false)(
override val isStreaming: Boolean = false,
@transient stream: Option[SparkDataStream] = None)(
session: SparkSession,
// originStats and originConstraints are intentionally placed in the "second" parameter list
// to prevent catalyst rules from mistakenly transforming and rewriting them. Do not change this.
originStats: Option[Statistics] = None,
originConstraints: Option[ExpressionSet] = None)
extends LeafNode with MultiInstanceRelation {
extends LeafNode
with StreamSourceAwareLogicalPlan
with MultiInstanceRelation {

import LogicalRDD._

@@ -134,7 +138,8 @@ case class LogicalRDD(
rdd,
rewrittenPartitioning,
rewrittenOrdering,
isStreaming
isStreaming,
stream
)(session, rewrittenStatistics, rewrittenConstraints).asInstanceOf[this.type]
}

@@ -158,6 +163,13 @@
// Therefore we assume that all subqueries are non-deterministic, and we do not expose any
// constraints that contain a subquery.
.filterNot(SubqueryExpression.hasSubquery)

override def withStream(stream: SparkDataStream): LogicalRDD = {
copy(stream = Some(stream))(session, originStats, originConstraints)
}

override def getStream: Option[SparkDataStream] = stream

}

object LogicalRDD extends Logging {
@@ -191,7 +203,8 @@ object LogicalRDD extends Logging {
rdd,
firstLeafPartitioning(executedPlan.outputPartitioning),
executedPlan.outputOrdering,
isStreaming
isStreaming,
None
)(originDataset.sparkSession, stats, constraints)
}

@@ -264,7 +277,11 @@ case class RDDScanExec(
rdd: RDD[InternalRow],
name: String,
override val outputPartitioning: Partitioning = UnknownPartitioning(0),
override val outputOrdering: Seq[SortOrder] = Nil) extends LeafExecNode with InputRDDCodegen {
override val outputOrdering: Seq[SortOrder] = Nil,
@transient stream: Option[SparkDataStream] = None)
extends LeafExecNode
with StreamSourceAwareSparkPlan
with InputRDDCodegen {

private def rddName: String = Option(rdd.name).map(n => s" $n").getOrElse("")

@@ -293,4 +310,6 @@ case class RDDScanExec(
override protected val createUnsafeProjection: Boolean = true

override def inputRDD: RDD[InternalRow] = rdd

override def getStream: Option[SparkDataStream] = stream
}
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, UnsafeProjection}
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.util.ArrayImplicits._

@@ -32,7 +33,11 @@ import org.apache.spark.util.ArrayImplicits._
*/
case class LocalTableScanExec(
output: Seq[Attribute],
@transient rows: Seq[InternalRow]) extends LeafExecNode with InputRDDCodegen {
@transient rows: Seq[InternalRow],
@transient stream: Option[SparkDataStream])
extends LeafExecNode
with StreamSourceAwareSparkPlan
with InputRDDCodegen {

override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"))
@@ -97,6 +102,8 @@ case class LocalTableScanExec(

override def inputRDD: RDD[InternalRow] = rdd

override def getStream: Option[SparkDataStream] = stream

private def sendDriverMetrics(): Unit = {
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, metrics.values.toSeq)
@@ -865,7 +865,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case MemoryPlan(sink, output) =>
val encoder = ExpressionEncoder(DataTypeUtils.fromAttributes(output))
val toRow = encoder.createSerializer()
LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy())) :: Nil
LocalTableScanExec(output, sink.allData.map(r => toRow(r).copy()), None) :: Nil

case logical.Distinct(child) =>
throw SparkException.internalError(
@@ -985,8 +985,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
execution.ExpandExec(e.projections, e.output, planLater(child)) :: Nil
case logical.Sample(lb, ub, withReplacement, seed, child) =>
execution.SampleExec(lb, ub, withReplacement, seed, planLater(child)) :: Nil
case logical.LocalRelation(output, data, _) =>
LocalTableScanExec(output, data) :: Nil
case logical.LocalRelation(output, data, _, stream) =>
LocalTableScanExec(output, data, stream) :: Nil
case logical.EmptyRelation(l) => EmptyRelationExec(l) :: Nil
case CommandResult(output, _, plan, data) => CommandResultExec(output, plan, data) :: Nil
// We should match the combination of limit and offset first, to get the optimal physical
@@ -1036,7 +1036,8 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
shuffleOrigin, r.optAdvisoryPartitionSize) :: Nil
case ExternalRDD(outputObjAttr, rdd) => ExternalRDDScanExec(outputObjAttr, rdd) :: Nil
case r: LogicalRDD =>
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering) :: Nil
RDDScanExec(r.output, r.rdd, "ExistingRDD", r.outputPartitioning, r.outputOrdering,
r.stream) :: Nil
case _: UpdateTable =>
throw QueryExecutionErrors.ddlUnsupportedTemporarilyError("UPDATE TABLE")
case _: MergeIntoTable =>
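With the strategy changes above, the stream carried by the logical leaf survives planning and can be read off the physical scan. A hedged usage sketch of what that enables (the helper below is illustrative, not code from this commit):

import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.{LocalTableScanExec, RDDScanExec, SparkPlan}

// Returns the stream attached to the first DSv1-style leaf scan, if any.
def firstLeafStream(plan: SparkPlan): Option[SparkDataStream] =
  plan.collectFirst {
    case scan: LocalTableScanExec => scan.getStream
    case scan: RDDScanExec        => scan.getStream
  }.flatten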
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.connector.read.streaming.SparkDataStream

/**
* This trait is a mixin for source physical nodes to expose the stream they originate from. It is
* required for physical nodes that are planned from source logical nodes inheriting
* [[org.apache.spark.sql.catalyst.plans.logical.StreamSourceAwareLogicalPlan]].
*/
trait StreamSourceAwareSparkPlan extends SparkPlan {
/** Get the stream associated with this node. */
def getStream: Option[SparkDataStream]
}
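This trait is the hook the metrics calculation can use for DSv1 sources: walk the executed plan's leaves and attribute each scan to the stream it came from. A minimal sketch under that assumption (the grouping helper is hypothetical, not part of this commit):

import org.apache.spark.sql.connector.read.streaming.SparkDataStream
import org.apache.spark.sql.execution.{SparkPlan, StreamSourceAwareSparkPlan}

// Groups leaf scans by the stream they report via getStream; leaves without a
// stream (pure batch scans) are skipped.
def leavesByStream(plan: SparkPlan): Map[SparkDataStream, Seq[SparkPlan]] =
  plan.collectLeaves()
    .flatMap {
      case p: StreamSourceAwareSparkPlan => p.getStream.map(_ -> (p: SparkPlan))
      case _ => None
    }
    .groupBy(_._1)
    .map { case (stream, pairs) => stream -> pairs.map(_._2) }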
@@ -362,6 +362,7 @@ object DataSourceStrategy
PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty),
toCatalystRDD(l, baseRelation.buildScan()),
baseRelation,
l.stream,
None) :: Nil

case _ => Nil
@@ -436,6 +437,7 @@ object DataSourceStrategy
PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation,
relation.stream,
relation.catalogTable.map(_.identifier))
filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan)
} else {
@@ -459,6 +461,7 @@
PushedDownOperators(None, None, None, None, Seq.empty, Seq.empty),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation,
relation.stream,
relation.catalogTable.map(_.identifier))
execution.ProjectExec(
projects, filterCondition.map(execution.FilterExec(_, scan)).getOrElse(scan))
@@ -321,6 +321,7 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
val scan =
FileSourceScanExec(
fsRelation,
l.stream,
outputAttributes,
outputDataSchema,
partitionKeyFilters.toSeq,
(Diff for the remaining changed files is not shown.)
