diff --git a/core/src/main/scala/org/apache/spark/CacheManager.scala b/core/src/main/scala/org/apache/spark/CacheManager.scala
index 8f867686a0443..38fd14acab917 100644
--- a/core/src/main/scala/org/apache/spark/CacheManager.scala
+++ b/core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -45,6 +45,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
       case Some(blockResult) =>
         // Partition is already materialized, so just return its values
         context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
+        if (blockResult.inputMetrics.bytesRead > 0) {
+          rdd.inputBytes += blockResult.inputMetrics.bytesRead
+        }
         new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
 
       case None =>
diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 041028514399b..1987e9ef99930 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -119,6 +119,8 @@ class HadoopRDD[K, V](
       minPartitions)
   }
 
+  val hadoopInputBytes = sc.accumulator(0L, s"rdd-$id.input.bytes.hadoop")(SparkContext.LongAccumulatorParam)
+
   protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)
 
   protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
@@ -205,6 +207,7 @@ class HadoopRDD[K, V](
          * always at record boundaries, so tasks may need to read into other splits to complete
          * a record. */
         inputMetrics.bytesRead = split.inputSplit.value.getLength()
+        hadoopInputBytes += split.inputSplit.value.getLength()
       } catch {
         case e: java.io.IOException =>
           logWarning("Unable to get input size to set InputMetrics for task", e)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 4e841bc992bff..e43625ea34950 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1270,4 +1270,10 @@ abstract class RDD[T: ClassTag](
   def toJavaRDD() : JavaRDD[T] = {
     new JavaRDD(this)(elementClassTag)
   }
+
+  // =======================================================================
+  // Common metrics
+  // =======================================================================
+  // Input bytes if this RDD was read from persisted data or a filesystem
+  val inputBytes = sc.accumulator(0L, s"rdd-$id.input.bytes.persisted")
 }
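The patch tracks input bytes with named long accumulators (`rdd-<id>.input.bytes.hadoop` and `rdd-<id>.input.bytes.persisted`), created through the Spark 1.x `sc.accumulator(initialValue, name)` API; named accumulators are reported per stage in the web UI. Below is a minimal standalone sketch, not part of the patch, showing how such a named accumulator behaves: tasks add to it from their closures and only the driver reads the final value. The application name, accumulator name, and sample data are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object InputBytesSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("input-bytes-sketch").setMaster("local[2]"))

    // A named long accumulator, analogous to rdd-<id>.input.bytes.hadoop above.
    // The name makes it visible in the per-stage accumulator table of the web UI.
    val inputBytes =
      sc.accumulator(0L, "example.input.bytes")(SparkContext.LongAccumulatorParam)

    // Tasks only add to the accumulator; its value is merged on the driver.
    sc.parallelize(Seq("a", "bb", "ccc"), 2).foreach { s =>
      inputBytes += s.length.toLong
    }

    println(s"bytes counted: ${inputBytes.value}") // 6 for this toy input
    sc.stop()
  }
}
```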