Consolidations of shuffle files from different map tasks #635

Closed
wants to merge 2 commits into from
Changes from 1 commit
update shuffle consolidation code per review comments
jason-dai committed Jun 4, 2013
commit d53d332b7ebc37c0eb5c99e8835d826170f4c876
2 changes: 2 additions & 0 deletions core/src/main/scala/spark/MapOutputTracker.scala
@@ -117,6 +117,8 @@ private[spark] class MapOutputTracker extends Logging {
private val fetching = new HashSet[Int]

// Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
// Returns an array of map output locations for the given reduceId, one for each ShuffleMapTask, in the form of
// (BlockManagerId, groupId of the shuffle file, size of the shuffle file when the task writes its output)
def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Int, Long)] = {
Member

Add a comment on what the Int and Long represent.
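As a rough sketch of what the two fields mean on the consumer side (not part of this patch; tracker stands in for whatever MapOutputTracker instance the caller holds, e.g. the one in SparkEnv):

val statuses: Array[(BlockManagerId, Int, Long)] = tracker.getServerStatuses(shuffleId, reduceId)
statuses.foreach { case (blockManagerId, groupId, size) =>
  // blockManagerId: node that holds the map output
  // groupId: which consolidated shuffle file (writer group) the map task wrote to
  // size: size of that shuffle file at the time the task wrote its output
}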

val statuses = mapStatuses.get(shuffleId).orNull
if (statuses == null) {
4 changes: 2 additions & 2 deletions core/src/main/scala/spark/PairRDDFunctions.scala
@@ -69,14 +69,14 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
} else if (mapSideCombine) {
val mapSideCombined = self.mapPartitions(aggregator.combineValuesByKey(_), true)
val partitioned = new ShuffledRDD[K, C](mapSideCombined, partitioner, serializerClass)
logInfo("serializerClass=" + serializerClass)
logDebug("serializerClass=" + serializerClass)
partitioned.mapPartitions(aggregator.combineCombinersByKey(_), true)
} else {
// Don't apply map-side combiner.
// A sanity check to make sure mergeCombiners is not defined.
assert(mergeCombiners == null)
val values = new ShuffledRDD[K, V](self, partitioner, serializerClass)
logInfo("serializerClass=" + serializerClass)
logDebug("serializerClass=" + serializerClass)
values.mapPartitions(aggregator.combineValuesByKey(_), true)
}
}
4 changes: 2 additions & 2 deletions core/src/main/scala/spark/storage/DiskStore.scala
@@ -76,12 +76,12 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
// Revert by discarding current writes, except that if no values have been committed,
// we revert by recreating the file (otherwise there are errors when reading objects from the file later on)
if (lastValidPosition == initialPosition)
Member

initialPosition is set to the size of the file when the file is opened. Isn't it problematic that if we open an existing file, do some writes, and then want to revert, we would delete the old file?

Author

A shuffle file is closed when we get a shuffle request, and it should never be re-opened (if you re-open the file and append to it, you may corrupt the file, as it may have a trailer). Maybe we should first delete the shuffle file if it exists when we first open it (e.g., in case we need to re-run the map tasks).
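A minimal sketch of that delete-on-first-open idea (not in this patch; getFile is assumed to be DiskStore's existing blockId-to-file lookup, and the helper name is made up):

private def createShuffleFile(blockId: String): java.io.File = {
  val file = getFile(blockId)
  // discard stale output left behind by an earlier run of the map tasks for this shuffle
  if (file.exists()) {
    file.delete()
  }
  file
}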

Member

But if we re-run the map tasks, wouldn't that wipe out the shuffle outputs for all other previous map tasks?

Author

The shuffle file is closed when all the map tasks are done (after the shuffle request is received). If we need to re-run a map task afterwards:
(1) There is no easy way to remove the results of the previous run of this task from the shuffle file.
(2) You cannot re-open a closed shuffle file and append to it (e.g., Kryo will append a trailer when closing the file).
(3) If a fetch fails, Spark will re-run all the map tasks (for the same shuffle) whose output locations are at the failing node; so it's OK to delete their outputs.

I actually also tried throwing random exceptions during shuffle and it works fine. Again, is there a good way to include such tests?

Member

Thanks for the response. It would be useful to actually put whatever you just wrote into the code comment block.

For failure testing, can we do something similar to what FailureSuite does?
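A rough sketch of such a test (assumptions: ScalaTest's FunSuite, and that the local[threads, maxFailures] master string allows one retry; the suite and object names are made up):

import org.scalatest.FunSuite
import spark.SparkContext
import spark.SparkContext._

// Shared singleton so every task attempt sees the same counter, even after the
// closure is serialized for the task (the same trick FailureSuite-style tests use).
object ShuffleFailureState {
  var mapAttempts = 0
}

class ShuffleFailureSketch extends FunSuite {
  test("shuffle completes when a map task fails once") {
    val sc = new SparkContext("local[1,2]", "shuffle failure sketch") // 1 thread, up to 2 attempts per task
    val grouped = sc.parallelize(1 to 4, 2).map { x =>
      ShuffleFailureState.synchronized { ShuffleFailureState.mapAttempts += 1 }
      // fail only the very first map attempt to force a re-run of that task
      if (ShuffleFailureState.mapAttempts == 1) {
        throw new Exception("simulated failure while writing shuffle output")
      }
      (x % 2, x)
    }.groupByKey().collect()
    assert(grouped.map(_._2.size).sum == 4) // all values survive the retried map task
    sc.stop()
  }
}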

recerateFile()
recreateFile()
else
discardWrites()
}

private def recerateFile () {
private def recreateFile () {
close ()
f.delete()
f = createFile(blockId)
84 changes: 50 additions & 34 deletions core/src/main/scala/spark/storage/ShuffleBlockManager.scala
@@ -11,16 +11,36 @@ import scala.collection.JavaConversions
import scala.collection.mutable.ArrayBuffer
import spark.Logging
import spark.SparkException
import scala.collection.JavaConverters._

private[spark] class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter])
private[spark] class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter]) {
def open() {
//open all the writers
writers.map { writer => if (!writer.isOpen) writer.open() }
}
}

private[spark]
trait ShuffleBlocks {
def acquireWriters(mapId: Int): ShuffleWriterGroup
def releaseWriters(group: ShuffleWriterGroup)
}


/**
* On each slave, ShuffleBlockManager maintains a shuffle block pool for each shuffle.
*
* Each pool maintains a list of shuffle block groups; a ShuffleMapTask acquires a free group
* when it needs to write its results, and returns the group when it's done.
*
* Each group maintains a list of block writers, each for a different bucket (reduce partition).
*
* Each block writer is closed when the BlockManager receives a shuffle request for that block
* (i.e., all map tasks are done) and will not be re-opened.
*
* If we need to re-run a map task afterwards, Spark will actually re-run all the map tasks on
* the same slave; these tasks will then acquire new groups, which effectively discards the
* previous shuffle outputs of all these map tasks.
*/
private[spark]
class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
initLogging()
@@ -41,8 +61,13 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
* Get a Shuffle Pool
*/
private def getPool(shuffleId: Int) : ShuffleBlocksPool = {
pools.putIfAbsent(shuffleId,new ShuffleBlocksPool(shuffleId))
pools.get(shuffleId)
val pool = pools.get(shuffleId)
if (pool == null) {
pools.putIfAbsent(shuffleId, new ShuffleBlocksPool(shuffleId))
pools.get(shuffleId)
}
else
pool
}

/**
@@ -57,7 +82,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
logDebug("closeBlock shuffleId: " + shuffleId + ", groupId: " + groupId + ", bucketId: " + bucketId)
val pool = getPool(shuffleId.toInt)
if (pool != null)
pool.closeBlock(groupId.toInt,bucketId.toInt)
pool.closeBlock(groupId.toInt, bucketId.toInt)
case _ =>
throw new SparkException(
"Failed to get block " + blockId + ", which is not a shuffle block")
@@ -70,7 +95,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
* for writing (e.g. in some failure recovery process). 2. Better to consider ttl
*/
def cleanup(cleanupTime: Long){
JavaConversions.asScalaConcurrentMap(pools).retain( (shuffleId,pool) => pool.allGroupsClosed() )
pools.asScala.retain( (shuffleId,pool) => pool.allGroupsClosed() )
}

class ShuffleBlocksPool (val shuffleId: Int) {
@@ -92,22 +117,22 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
* Acquire a new group from pool
*/
def acquireGroup(numBuckets: Int, serializer: Serializer) : ShuffleWriterGroup = {
//TODO. throws exception now. This needs to be handled. maybe reopen it.
if (isClosed)
throw new SparkException("ShuffleBlocksPool "+ shuffleId +" is closed and cannot be used to acquire new blocks")
var group = freeGroups.poll()
if (group == null) {
val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
val groupId = nextGroupID.getAndIncrement()
val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, groupId)
blockManager.getDiskBlockWriter(blockId, serializer, bufferSize)
}
group = new ShuffleWriterGroup(groupId, writers)
openGroup(group)
allGroups += group
//TODO. throws exception now. This needs to be handled. maybe reopen it.
if (isClosed)
throw new SparkException("ShuffleBlocksPool "+ shuffleId +" is closed and cannot be used to acquire new blocks")
var group = freeGroups.poll()
if (group == null) {
val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
val groupId = nextGroupID.getAndIncrement()
val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, groupId)
blockManager.getDiskBlockWriter(blockId, serializer, bufferSize)
}
group
group = new ShuffleWriterGroup(groupId, writers)
group.open()
allGroups += group
}
group
}

/**
@@ -129,8 +154,8 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
/**
* Close all writers in a group. Reserved for possible optimization
*/
def closeGroup(groupId:Int){
allGroups(groupId.toInt).writers.map { _.close()}
def closeGroup(groupId:Int) {
allGroups(groupId.toInt).writers.map { _.close() }
}

/**
@@ -144,7 +169,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
* Check if all the groups in this shuffle are released (no writing is going on)
*/
def allGroupsReleased() : Boolean = {
return (freeGroups.size == nextGroupID.get - 1)
freeGroups.size == nextGroupID.get - 1
}

/**
@@ -157,19 +182,10 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
else
false
}

/**
* Open all writers in a group
*/
private def openGroup(group: ShuffleWriterGroup) : ShuffleWriterGroup = {
//open all the writers
group.writers.map { writer => { if(!writer.isOpen) writer.open()} }
group
}
}

//keep track of pools for all shuffles indexed by Id
val pools = new ConcurrentHashMap[Int,ShuffleBlocksPool]
val pools = new ConcurrentHashMap[Int, ShuffleBlocksPool]

}
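For reference, here is a hedged sketch of the map-side write path that the comment block above describes; forShuffle(...) is an assumed accessor for the per-shuffle ShuffleBlocks handle, and write(...) is assumed on BlockObjectWriter; only acquireWriters/releaseWriters appear in this diff:

val shuffleBlocks = shuffleBlockManager.forShuffle(shuffleId, numBuckets, serializer) // hypothetical accessor
val group = shuffleBlocks.acquireWriters(mapId)        // take a free ShuffleWriterGroup from the pool
try {
  for (pair <- iterator) {
    val bucketId = partitioner.getPartition(pair._1)   // pick the reduce bucket for this record
    group.writers(bucketId).write(pair)                // append to that bucket's consolidated file
  }
} finally {
  shuffleBlocks.releaseWriters(group)                  // return the group so later map tasks can reuse it
}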