Skip to content

Commit

Permalink
Remove partitionMetrics changes from DataWritingCommand
Browse files (browse the repository at this point in the history)
  • Loading branch information
Steve Vaughan Jr committed Mar 26, 2024
1 parent 824d8d4 commit 312f761
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.command

import java.net.URI

- import scala.collection.mutable
-
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkContext
Expand All @@ -29,7 +27,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryCommand}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{SparkPlan, SQLExecution}
- import org.apache.spark.sql.execution.datasources.{BasicWriteJobStatsTracker, PartitionTaskStats}
+ import org.apache.spark.sql.execution.datasources.BasicWriteJobStatsTracker
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.SerializableConfiguration
Expand All @@ -55,12 +53,10 @@ trait DataWritingCommand extends UnaryCommand {
DataWritingCommand.logicalPlanOutputWithNames(query, outputColumnNames)

lazy val metrics: Map[String, SQLMetric] = BasicWriteJobStatsTracker.metrics
- lazy val partitionMetrics: mutable.Map[String, PartitionTaskStats] =
- BasicWriteJobStatsTracker.partitionMetrics

def basicWriteJobStatsTracker(hadoopConf: Configuration): BasicWriteJobStatsTracker = {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
- new BasicWriteJobStatsTracker(serializableHadoopConf, metrics, partitionMetrics)
+ new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
}

def run(sparkSession: SparkSession, child: SparkPlan): Seq[Row]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,16 +229,15 @@ class BasicWriteTaskStatsTracker(
class BasicWriteJobStatsTracker(
serializableHadoopConf: SerializableConfiguration,
@transient val driverSideMetrics: Map[String, SQLMetric],
- @transient val driverSidePartitionMetrics: mutable.Map[String, PartitionTaskStats],
taskCommitTimeMetric: SQLMetric)
extends WriteJobStatsTracker {

+ val partitionMetrics: PartitionMetricsWriteInfo = new PartitionMetricsWriteInfo()

def this(
serializableHadoopConf: SerializableConfiguration,
- metrics: Map[String, SQLMetric],
- partitionMetrics: mutable.Map[String, PartitionTaskStats]) = {
- this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, partitionMetrics,
- metrics(TASK_COMMIT_TIME))
+ metrics: Map[String, SQLMetric]) = {
+ this(serializableHadoopConf, metrics - TASK_COMMIT_TIME, metrics(TASK_COMMIT_TIME))
}

override def newTaskInstance(): WriteTaskStatsTracker = {
Expand All @@ -265,12 +264,7 @@ class BasicWriteJobStatsTracker(
// Check if we know the mapping of the internal row to a partition path
if (partitionsMap.contains(s._1)) {
val path = partitionsMap(s._1)
- val current = partitionMetrics(path)
- driverSidePartitionMetrics(path) = BasicWritePartitionTaskStats(
- current.numFiles + s._2.numFiles,
- current.numBytes + s._2.numBytes,
- current.numRows + s._2.numRows
- )
+ partitionMetrics.update(path, s._2.numBytes, s._2.numRows, s._2.numFiles)
}
})
}
Expand All @@ -284,14 +278,8 @@ class BasicWriteJobStatsTracker(
val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverSideMetrics.values.toList)

- val partitionMetricsWriteInfo = new PartitionMetricsWriteInfo()
- driverSidePartitionMetrics.foreach(entry => {
- val key = entry._1
- val value = entry._2
- partitionMetricsWriteInfo.update(key, value.numBytes, value.numRows, value.numFiles)
- })
SQLPartitionMetrics.postDriverMetricUpdates(sparkContext, executionId,
- partitionMetricsWriteInfo)
+ partitionMetrics)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ trait FileWrite extends Write {
val partitionMetrics: mutable.Map[String, PartitionTaskStats]
= BasicWriteJobStatsTracker.partitionMetrics
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
- val statsTracker = new BasicWriteJobStatsTracker(serializableHadoopConf, metrics,
- partitionMetrics)
+ val statsTracker = new BasicWriteJobStatsTracker(serializableHadoopConf, metrics)
// TODO: after partitioning is supported in V2:
// 1. filter out partition columns in `dataColumns`.
// 2. Don't use Seq.empty for `partitionColumns`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,7 @@ class FileStreamSink(

private def basicWriteJobStatsTracker: BasicWriteJobStatsTracker = {
val serializableHadoopConf = new SerializableConfiguration(hadoopConf)
- new BasicWriteJobStatsTracker(serializableHadoopConf, BasicWriteJobStatsTracker.metrics,
- BasicWriteJobStatsTracker.partitionMetrics)
+ new BasicWriteJobStatsTracker(serializableHadoopConf, BasicWriteJobStatsTracker.metrics)
}

override def addBatch(batchId: Long, data: DataFrame): Unit = {
Expand Down

0 comments on commit 312f761

Please sign in to comment.