diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
index f311141148cb0..0b4892f98f039 100644
--- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
+++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala
@@ -102,4 +102,9 @@ class ShuffleWriteMetrics extends Serializable {
    * Number of bytes written for a shuffle
    */
   var shuffleBytesWritten: Long = _
+
+  /**
+   * Time spent blocking on writes to disk or buffer cache, in nanoseconds.
+   */
+  var shuffleWriteTime: Long = _
 }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 802791797a6f3..40baea69e8c44 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -164,17 +164,20 @@ private[spark] class ShuffleMapTask(
 
       // Commit the writes. Get the size of each bucket block (total block size).
       var totalBytes = 0L
+      var totalTime = 0L
       val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
         writer.commit()
         writer.close()
         val size = writer.size()
         totalBytes += size
+        totalTime += writer.timeWriting()
         MapOutputTracker.compressSize(size)
       }
 
       // Update shuffle metrics.
       val shuffleMetrics = new ShuffleWriteMetrics
       shuffleMetrics.shuffleBytesWritten = totalBytes
+      shuffleMetrics.shuffleWriteTime = totalTime
       metrics.get.shuffleWriteMetrics = Some(shuffleMetrics)
 
       new MapStatus(blockManager.blockManagerId, compressedSizes)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
index 2a67800c45a4d..76c92cefd8072 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -62,4 +62,9 @@ abstract class BlockObjectWriter(val blockId: BlockId) {
    * Size of the valid writes, in bytes.
    */
   def size(): Long
+
+  /**
+   * Cumulative time spent performing blocking writes, in ns.
+   */
+  def timeWriting(): Long
 }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
index b7ca61e9381f2..2a9a3f61bd3ba 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -45,19 +45,40 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
   class DiskBlockObjectWriter(blockId: BlockId, serializer: Serializer, bufferSize: Int)
     extends BlockObjectWriter(blockId) {
 
+    /** Intercepts write calls and tracks total time spent writing. Not thread safe. */
+    private class TimeTrackingOutputStream(out: OutputStream) extends OutputStream {
+      def timeWriting = _timeWriting
+      private var _timeWriting = 0L
+
+      private def callWithTiming(f: => Unit) = {
+        val start = System.nanoTime()
+        f
+        _timeWriting += (System.nanoTime() - start)
+      }
+
+      def write(i: Int): Unit = callWithTiming(out.write(i))
+      override def write(b: Array[Byte]) = callWithTiming(out.write(b))
+      override def write(b: Array[Byte], off: Int, len: Int) = callWithTiming(out.write(b, off, len))
+    }
+
     private val f: File = createFile(blockId /*, allowAppendExisting */)
+    private val syncWrites = System.getProperty("spark.shuffle.sync", "false").toBoolean
 
     // The file channel, used for repositioning / truncating the file.
     private var channel: FileChannel = null
     private var bs: OutputStream = null
+    private var fos: FileOutputStream = null
+    private var ts: TimeTrackingOutputStream = null
     private var objOut: SerializationStream = null
     private var lastValidPosition = 0L
     private var initialized = false
+    private var _timeWriting = 0L
 
     override def open(): DiskBlockObjectWriter = {
-      val fos = new FileOutputStream(f, true)
+      fos = new FileOutputStream(f, true)
+      ts = new TimeTrackingOutputStream(fos)
       channel = fos.getChannel()
-      bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(fos, bufferSize))
+      bs = blockManager.wrapForCompression(blockId, new FastBufferedOutputStream(ts, bufferSize))
       objOut = serializer.newInstance().serializeStream(bs)
       initialized = true
       this
@@ -65,9 +86,23 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
 
     override def close() {
       if (initialized) {
-        objOut.close()
+        if (syncWrites) {
+          // Force outstanding writes to disk and track how long it takes
+          objOut.flush()
+          val start = System.nanoTime()
+          fos.getFD.sync()
+          _timeWriting += System.nanoTime() - start
+          objOut.close()
+        } else {
+          objOut.close()
+        }
+
+        _timeWriting += ts.timeWriting
+
         channel = null
         bs = null
+        fos = null
+        ts = null
         objOut = null
       }
       // Invoke the close callback handler.
@@ -110,6 +145,9 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
     }
 
     override def size(): Long = lastValidPosition
+
+    // Only valid if called after close()
+    override def timeWriting = _timeWriting
   }
 
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 163a3746ea00e..b7c81d091cd22 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -86,7 +86,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
       Seq("Task ID", "Status", "Locality Level", "Executor", "Launch Time", "Duration") ++
       Seq("GC Time") ++
       {if (hasShuffleRead) Seq("Shuffle Read") else Nil} ++
-      {if (hasShuffleWrite) Seq("Shuffle Write") else Nil} ++
+      {if (hasShuffleWrite) Seq("Write Time", "Shuffle Write") else Nil} ++
       Seq("Errors")
 
     val taskTable = listingTable(taskHeaders, taskRow(hasShuffleRead, hasShuffleWrite), tasks)
@@ -169,6 +169,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
           Utils.bytesToString(s.remoteBytesRead)}.getOrElse("")}</td>
       }}
       {if (shuffleWrite) {
+