Skip to content

Commit

Permalink
Example of using named accumulators for custom RDD metrics.
Browse files Browse the repository at this point in the history
  • Loading branch information
pwendell committed Jul 6, 2014
1 parent 0b72660 commit ad85076
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 0 deletions.
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
context.taskMetrics.inputMetrics = Some(blockResult.inputMetrics)
if (blockResult.inputMetrics.bytesRead > 0) {
rdd.inputBytes += blockResult.inputMetrics.bytesRead
}
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])

case None =>
Expand Down
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ class HadoopRDD[K, V](
minPartitions)
}

// Named accumulator exposing the total input bytes this HadoopRDD reads, registered
// under "rdd-<id>.input.bytes.hadoop". The explicit LongAccumulatorParam pins the
// accumulator's element type to Long.
val hadoopInputBytes = sc.accumulator(0L, s"rdd-$id.input.bytes.hadoop")(SparkContext.LongAccumulatorParam)

protected val jobConfCacheKey = "rdd_%d_job_conf".format(id)

protected val inputFormatCacheKey = "rdd_%d_input_format".format(id)
Expand Down Expand Up @@ -205,6 +207,7 @@ class HadoopRDD[K, V](
* always at record boundaries, so tasks may need to read into other splits to complete
* a record. */
inputMetrics.bytesRead = split.inputSplit.value.getLength()
hadoopInputBytes += split.inputSplit.value.getLength()
} catch {
case e: java.io.IOException =>
logWarning("Unable to get input size to set InputMetrics for task", e)
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1270,4 +1270,10 @@ abstract class RDD[T: ClassTag](
/** Wraps this RDD in a `JavaRDD` so it can be used from the Java API, carrying
  * along the element class tag required by the Java wrapper. */
def toJavaRDD() : JavaRDD[T] = new JavaRDD(this)(elementClassTag)

// =======================================================================
// Common metrics
// =======================================================================
// Named accumulator ("rdd-<id>.input.bytes.persisted") recording the bytes read
// when this RDD's partitions are served from persisted (cached) data or a
// filesystem. NOTE(review): element type Long is inferred from the 0L initial
// value via the implicit accumulator param — confirm this matches the explicit
// LongAccumulatorParam style used elsewhere.
val inputBytes = sc.accumulator(0L, s"rdd-$id.input.bytes.persisted")
}

0 comments on commit ad85076

Please sign in to comment.