Skip to content

Commit

Permalink
[SPARK-3116] Remove the excessive lockings in TorrentBroadcast
Browse files Browse the repository at this point in the history
Author: Reynold Xin <rxin@apache.org>

Closes #2028 from rxin/torrentBroadcast and squashes the following commits:

92c62a5 [Reynold Xin] Revert the MEMORY_AND_DISK_SER changes.
03a5221 [Reynold Xin] [SPARK-3116] Remove the excessive lockings in TorrentBroadcast
  • Loading branch information
rxin committed Aug 19, 2014
1 parent 1f1819b commit 8257733
Showing 1 changed file with 27 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@

package org.apache.spark.broadcast

import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream,
ObjectInputStream, ObjectOutputStream, OutputStream}
import java.io._

import scala.reflect.ClassTag
import scala.util.Random
Expand Down Expand Up @@ -53,10 +52,8 @@ private[spark] class TorrentBroadcast[T: ClassTag](

private val broadcastId = BroadcastBlockId(id)

TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}
SparkEnv.get.blockManager.putSingle(
broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)

@transient private var arrayOfBlocks: Array[TorrentBlock] = null
@transient private var totalBlocks = -1
Expand Down Expand Up @@ -91,18 +88,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](
// Store meta-info
val metaId = BroadcastBlockId(id, "meta")
val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
SparkEnv.get.blockManager.putSingle(
metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)

// Store individual pieces
for (i <- 0 until totalBlocks) {
val pieceId = BroadcastBlockId(id, "piece" + i)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.putSingle(
pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
SparkEnv.get.blockManager.putSingle(
pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
}

Expand Down Expand Up @@ -165,21 +158,20 @@ private[spark] class TorrentBroadcast[T: ClassTag](
val metaId = BroadcastBlockId(id, "meta")
var attemptId = 10
while (attemptId > 0 && totalBlocks == -1) {
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(metaId) match {
case Some(x) =>
val tInfo = x.asInstanceOf[TorrentInfo]
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
hasBlocks = 0

case None =>
Thread.sleep(500)
}
SparkEnv.get.blockManager.getSingle(metaId) match {
case Some(x) =>
val tInfo = x.asInstanceOf[TorrentInfo]
totalBlocks = tInfo.totalBlocks
totalBytes = tInfo.totalBytes
arrayOfBlocks = new Array[TorrentBlock](totalBlocks)
hasBlocks = 0

case None =>
Thread.sleep(500)
}
attemptId -= 1
}

if (totalBlocks == -1) {
return false
}
Expand All @@ -192,17 +184,15 @@ private[spark] class TorrentBroadcast[T: ClassTag](
val recvOrder = new Random().shuffle(Array.iterate(0, totalBlocks)(_ + 1).toList)
for (pid <- recvOrder) {
val pieceId = BroadcastBlockId(id, "piece" + pid)
TorrentBroadcast.synchronized {
SparkEnv.get.blockManager.getSingle(pieceId) match {
case Some(x) =>
arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
hasBlocks += 1
SparkEnv.get.blockManager.putSingle(
pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
SparkEnv.get.blockManager.getSingle(pieceId) match {
case Some(x) =>
arrayOfBlocks(pid) = x.asInstanceOf[TorrentBlock]
hasBlocks += 1
SparkEnv.get.blockManager.putSingle(
pieceId, arrayOfBlocks(pid), StorageLevel.MEMORY_AND_DISK, tellMaster = true)

case None =>
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
}
case None =>
throw new SparkException("Failed to get " + pieceId + " of " + broadcastId)
}
}

Expand Down Expand Up @@ -291,9 +281,7 @@ private[broadcast] object TorrentBroadcast extends Logging {
* If removeFromDriver is true, also remove these persisted blocks on the driver.
*/
def unpersist(id: Long, removeFromDriver: Boolean, blocking: Boolean) = {
synchronized {
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
SparkEnv.get.blockManager.master.removeBroadcast(id, removeFromDriver, blocking)
}
}

Expand Down

0 comments on commit 8257733

Please sign in to comment.