Basic shuffle file consolidation
The Spark shuffle phase can produce a large number of files, as one file is created
per mapper per reducer. For large or repeated jobs, this often produces millions of
shuffle files, which leads to extremely degraded performance from the OS file system.
This patch seeks to reduce that burden by combining multiple shuffle files into one.

This PR draws upon the work of Jason Dai in mesos/spark#669.
However, it simplifies the design in order to get the majority of the gain with less
overall intellectual and code burden. The vast majority of code in this pull request
is a refactor to allow the insertion of a clean layer of indirection between logical
block ids and physical files. This, I feel, provides some design clarity in addition
to enabling shuffle file consolidation.

The main goal is to produce one shuffle file per reducer per active mapper thread.
This allows us to isolate the mappers (simplifying the failure modes), while still
allowing us to reduce the number of files tremendously for large tasks. In order
to accomplish this, we simply create a new set of shuffle files for every parallel
task, and return those files to a pool from which the next task to run will draw.
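
At its core, that pool is just an atomic counter for minting fresh file-group ids plus a
concurrent queue of released ids, as in the ShuffleBlockManager changes further down in this
diff. A minimal standalone sketch of the idea (the FileIdPool name is purely illustrative):

  import java.util.concurrent.ConcurrentLinkedQueue
  import java.util.concurrent.atomic.AtomicInteger

  // Hands out file-group ids: reuse a released id when one is available, otherwise mint a new one.
  class FileIdPool {
    private val nextFileId = new AtomicInteger(0)
    private val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()

    def acquire(): Int = {
      val recycled = unusedFileIds.poll()
      if (recycled == null) nextFileId.getAndIncrement() else recycled
    }

    // Called when a task finishes with its shuffle files, making the id available to the next task.
    def release(fileId: Int): Unit = unusedFileIds.add(fileId)
  }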
aarondav committed Oct 20, 2013
1 parent 861dc40 commit 136b9b3
Showing 8 changed files with 57 additions and 14 deletions.
@@ -51,7 +51,7 @@ public void messageReceived(ChannelHandlerContext ctx, String blockIdString) {
ctx.flush();
return;
}
-long length = file.length();
+long length = fileSegment.length();
if (length > Integer.MAX_VALUE || length <= 0) {
ctx.write(new FileHeader(0, blockId).buffer());
ctx.flush();
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/TaskContext.scala
@@ -25,6 +25,7 @@ class TaskContext(
val stageId: Int,
val partitionId: Int,
val attemptId: Long,
+val executorId: String,
val runningLocally: Boolean = false,
@volatile var interrupted: Boolean = false,
private[spark] val taskMetrics: TaskMetrics = TaskMetrics.empty()
@@ -206,7 +206,7 @@ private[spark] class Executor(

// Run the actual task and measure its runtime.
taskStart = System.currentTimeMillis()
-val value = task.run(taskId.toInt)
+val value = task.run(taskId.toInt, executorId)
val taskFinish = System.currentTimeMillis()

// If the task has been killed, let's fail it.
@@ -152,7 +152,8 @@ private[spark] class ShuffleMapTask(
try {
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
-shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)
+shuffle = blockManager.shuffleBlockManager.forShuffle(
+  dep.shuffleId, context.executorId, numOutputSplits, ser)
buckets = shuffle.acquireWriters(partitionId)

// Write the map output to its associated buckets.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -45,8 +45,8 @@ import org.apache.spark.util.ByteBufferInputStream
*/
private[spark] abstract class Task[T](val stageId: Int, var partitionId: Int) extends Serializable {

-def run(attemptId: Long): T = {
-  context = new TaskContext(stageId, partitionId, attemptId, runningLocally = false)
+final def run(attemptId: Long, executorId: String): T = {
+  context = new TaskContext(stageId, partitionId, attemptId, executorId, runningLocally = false)
if (_killed) {
kill()
}
@@ -578,6 +578,7 @@ private[spark] class BlockManager(
val file = diskBlockManager.createBlockFile(blockId, filename, allowAppending = true)
val writer = new DiskBlockObjectWriter(blockId, file, serializer, bufferSize, compressStream)
writer.registerCloseEventHandler(() => {
+diskBlockManager.mapBlockToFileSegment(blockId, writer.fileSegment())
val myInfo = new BlockInfo(StorageLevel.DISK_ONLY, false)
blockInfo.put(blockId, myInfo)
myInfo.markReady(writer.fileSegment().length)
@@ -17,37 +17,77 @@

package org.apache.spark.storage

-import org.apache.spark.serializer.Serializer
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.serializer.Serializer

private[spark]
-class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter])
+class ShuffleWriterGroup(val id: Int, val fileId: Int, val writers: Array[BlockObjectWriter])

private[spark]
trait ShuffleBlocks {
def acquireWriters(mapId: Int): ShuffleWriterGroup
def releaseWriters(group: ShuffleWriterGroup)
}

/**
* Manages assigning disk-based block writers to shuffle tasks. Each shuffle task gets one writer
* per reducer.
*
* As an optimization to reduce the number of physical shuffle files produced, multiple shuffle
* blocks are aggregated into the same file. There is one "combined shuffle file" per reducer
* per concurrently executing shuffle task. As soon as a task finishes writing to its shuffle files,
* it releases them for another task.
* Regarding the implementation of this feature, shuffle files are identified by a 4-tuple:
* - shuffleId: The unique id given to the entire shuffle stage.
* - executorId: The id of the executor running the task. Required in order to ensure that
* multiple executors running on the same node do not collide.
* - bucketId: The id of the output partition (i.e., reducer id).
* - fileId: The unique id identifying a group of "combined shuffle files." Only one task at a
* time owns a particular fileId, and this id is returned to a pool when the task finishes.
*/
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) {
/** Turning off shuffle file consolidation causes all shuffle blocks to get their own file. */
val consolidateShuffleFiles =
System.getProperty("spark.storage.consolidateShuffleFiles", "true").toBoolean

var nextFileId = new AtomicInteger(0)
val unusedFileIds = new ConcurrentLinkedQueue[java.lang.Integer]()

-def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
+def forShuffle(shuffleId: Int, executorId: String, numBuckets: Int, serializer: Serializer) = {
new ShuffleBlocks {
// Get a group of writers for a map task.
override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
val fileId = getUnusedFileId()
val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
-blockManager.getDiskWriter(blockId, blockId.name, serializer, bufferSize)
+val filename = physicalFileName(shuffleId, executorId, bucketId, fileId)
+blockManager.getDiskWriter(blockId, filename, serializer, bufferSize)
}
-new ShuffleWriterGroup(mapId, writers)
+new ShuffleWriterGroup(mapId, fileId, writers)
}

-override def releaseWriters(group: ShuffleWriterGroup) = {
-  // Nothing really to release here.
+override def releaseWriters(group: ShuffleWriterGroup) {
+  recycleFileId(group.fileId)
}
}
}

private def getUnusedFileId(): Int = {
val fileId = unusedFileIds.poll()
if (fileId == null) nextFileId.getAndIncrement()
else fileId
}

private def recycleFileId(fileId: Int) {
if (!consolidateShuffleFiles) { return } // ensures we always generate new file id
unusedFileIds.add(fileId)
}

private def physicalFileName(shuffleId: Int, executorId: String, bucketId: Int, fileId: Int) = {
"merged_shuffle_%d_%s_%d_%d".format(shuffleId, executorId, bucketId, fileId)
}
}
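
To make the naming scheme concrete, here is a tiny self-contained example of the
physicalFileName format above (the ids below are made up for illustration):

  // merged_shuffle_<shuffleId>_<executorId>_<bucketId>_<fileId>
  def physicalFileName(shuffleId: Int, executorId: String, bucketId: Int, fileId: Int): String =
    "merged_shuffle_%d_%s_%d_%d".format(shuffleId, executorId, bucketId, fileId)

  println(physicalFileName(2, "exec-1", 3, 0))  // merged_shuffle_2_exec-1_3_0

When consolidation is enabled (the default for spark.storage.consolidateShuffleFiles), a later
map task that acquires the same fileId on the same executor appends its output for each bucket
to these same files, which is why BlockManager creates the block file with allowAppending = true.
Setting the property to false makes recycleFileId a no-op, so every task draws a fresh fileId
and gets its own files.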
@@ -40,7 +40,7 @@ class TaskContextSuite extends FunSuite with BeforeAndAfter with LocalSparkConte
val func = (c: TaskContext, i: Iterator[String]) => i.next
val task = new ResultTask[String, String](0, rdd, func, 0, Seq(), 0)
intercept[RuntimeException] {
-task.run(0)
+task.run(0, "test")
}
assert(completed === true)
}
