Patch for SPARK-942 #50

Closed. 29 commits.

Commits
efe1102 - Changing CacheManager and BlockManager to pass iterators directly to … (kellrott, Nov 13, 2013)
cac1fad - Fixing MemoryStore, so that it converts incoming iterators to ArrayBu… (kellrott, Nov 13, 2013)
d32992f - Merge remote-tracking branch 'origin/master' into iterator-to-disk (kellrott, Nov 13, 2013)
81d670c - Adding unit test for straight to disk iterator methods. (kellrott, Nov 14, 2013)
f403826 - Merge branch 'master' into iterator-to-disk (kellrott, Nov 15, 2013)
5eb2b7e - Changing the JavaSerializer reset to occur every 1000 objects. (kellrott, Nov 17, 2013)
44ec35a - Adding some comments. (kellrott, Nov 17, 2013)
56f71cd - Merge branch 'master' into iterator-to-disk (kellrott, Feb 4, 2014)
95c7f67 - Simplifying StorageLevel checks (kellrott, Feb 24, 2014)
0e6f808 - Deleting temp output directory when done (kellrott, Feb 24, 2014)
2eeda75 - Fixing dumb mistake ("||" instead of "&&") (kellrott, Feb 25, 2014)
a6424ba - Wrapping long line (kellrott, Feb 25, 2014)
9df0276 - Added check to make sure that streamed-to-dist RDD actually returns g… (kellrott, Feb 25, 2014)
31fe08e - Removing un-needed semi-colons (kellrott, Feb 25, 2014)
40fe1d7 - Removing rouge space (kellrott, Feb 25, 2014)
00c98e0 - Making the Java ObjectStreamSerializer reset rate configurable by the… (kellrott, Feb 25, 2014)
8644ee8 - Merge branch 'master' into iterator-to-disk (kellrott, Feb 25, 2014)
656c33e - Fixing the JavaSerializer to read from the SparkConf rather then the … (kellrott, Feb 25, 2014)
0f28ec7 - Adding second putValues to BlockStore interface that accepts an Array… (kellrott, Feb 25, 2014)
627a8b7 - Wrapping a few long lines (kellrott, Feb 25, 2014)
c2fb430 - Removing more un-needed array-buffer to iterator conversions (kellrott, Feb 26, 2014)
16a4cea - Streamlined the LargeIteratorSuite unit test. It should now run in ~2… (kellrott, Feb 26, 2014)
7ccc74b - Moving the 'LargeIteratorSuite' to simply test persistance of iterato… (kellrott, Feb 27, 2014)
f70d069 - Adding docs for spark.serializer.objectStreamReset configuration (kellrott, Feb 27, 2014)
2f684ea - Refactoring the BlockManager to replace the Either[Either[A,B]] usage… (kellrott, Feb 27, 2014)
33ac390 - Merge branch 'iterator-to-disk' of github.com:kellrott/incubator-spar… (kellrott, Feb 27, 2014)
8aa31cd - Merge ../incubator-spark into iterator-to-disk (kellrott, Feb 28, 2014)
60e0c57 - Fixing issues (formatting, variable names, etc.) from review comments (kellrott, Mar 4, 2014)
9ef7cb8 - Fixing formatting issues. (kellrott, Mar 4, 2014)
Changes from all commits
28 changes: 24 additions & 4 deletions core/src/main/scala/org/apache/spark/CacheManager.scala
@@ -71,10 +71,30 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
val computedValues = rdd.computeOrReadCheckpoint(split, context)
// Persist the result, so long as the task is not running locally
if (context.runningLocally) { return computedValues }
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
elements.iterator.asInstanceOf[Iterator[T]]
if (storageLevel.useDisk && !storageLevel.useMemory) {
Review comment (Contributor):

This is not the only condition where we want to do this. For example we might also want it for MEMORY_ONLY_SER, where the serialized data might fit in RAM but the ArrayBuffer of raw objects might not. (Especially if you set spark.rdd.compress to compress the serialized data.)
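As a concrete illustration of the case the reviewer describes (not part of this patch; the application name, local master, and data are made up), a job can ask for serialized in-memory caching with compression like this:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    // Hypothetical driver program: keep cached partitions serialized in memory
    // and compress them, so the serialized blocks may fit in RAM even when the
    // raw ArrayBuffer of objects would not.
    val conf = new SparkConf()
      .setAppName("MemoryOnlySerExample")    // made-up application name
      .setMaster("local[2]")                 // local mode, for the sketch only
      .set("spark.rdd.compress", "true")     // compress serialized RDD partitions
    val sc = new SparkContext(conf)

    val records = sc.parallelize(1 to 1000000).map(i => ("key-" + i, i.toString * 10))
    records.persist(StorageLevel.MEMORY_ONLY_SER)
    println(records.count())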

// In the case that this RDD is to be persisted using DISK_ONLY
// the iterator will be passed directly to the blockManager (rather than
// caching it to an ArrayBuffer first), then the resulting block data iterator
// will be passed back to the user. If the iterator generates a lot of data,
// this means that it doesn't all have to be held in memory at one time.
// This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
// blocks aren't dropped by the block store before enabling that.
blockManager.put(key, computedValues, storageLevel, tellMaster = true)
return blockManager.get(key) match {
case Some(values) =>
return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
case None =>
logInfo("Failure to store %s".format(key))
throw new Exception("Block manager failed to return persisted values")
}
} else {
// In this case the RDD is cached to an array buffer. This will save the results
// if we're dealing with a 'one-time' iterator
val elements = new ArrayBuffer[Any]
elements ++= computedValues
blockManager.put(key, elements, storageLevel, tellMaster = true)
return elements.iterator.asInstanceOf[Iterator[T]]
}
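The comment about a "one-time" iterator is the key constraint in this branch: a plain Scala Iterator can be traversed only once, so the non-disk path copies it into an ArrayBuffer before both caching and returning it. A tiny standalone illustration (not from the patch):

    import scala.collection.mutable.ArrayBuffer

    // An iterator is exhausted after a single pass.
    val once = Iterator(1, 2, 3)
    println(once.sum)        // 6
    println(once.hasNext)    // false: nothing left for a second consumer

    // Buffering the values lets them be both stored and handed back to the caller.
    val buffered = new ArrayBuffer[Int]()
    buffered ++= Iterator(1, 2, 3)
    println(buffered.sum)              // 6
    println(buffered.iterator.toList)  // List(1, 2, 3): re-readable as often as needed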
Review comment (Contributor):

Is there any reason why we still want to keep around the legacy case? It seems that the IteratorValues and ArrayBufferValues are handled in very similar ways downstream (e.g. in BlockManager.doPut, and memoryStore.putValues), and it would be nice if we could somehow merge these two code paths into a simpler one.

The other thing is that computeValues in L71 is already an iterator anyway, so we can just feed this directly into BlockManager. The existing way of making it an ArrayBuffer arbitrarily and back into an Iterator seems a little unnecessary.

(That said, there might be a performance reason that I haven't thought of.)

Review comment (Contributor):

Yes, performance and correctness are actually both reasons. The code path for disk first writes the data to disk and then has to read and deserialize it from there, which is slow. Also, if you used the memory store in the same way, the store might drop it before you have a chance to call get(). See my comments on the main discussion.

Review comment (Contributor):

Even if we use the same code path for say MEM_ONLY, we won't actually write the data to disk since we also pass the storage level to BlockManager along with the data.

But I see your second point about possibly dropping a block before we read it. That does seem to prevent us from merging the two cases.
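To make the correctness concern concrete, here is a self-contained toy sketch (no Spark classes; the bounded cache below is only a stand-in for the MemoryStore) of how a block put into a size-limited store can already be gone by the time it is read back:

    import scala.collection.mutable

    // A toy bounded store: keeps at most `capacity` blocks, evicting the oldest.
    class BoundedStore(capacity: Int) {
      private val blocks = mutable.LinkedHashMap[String, Seq[Int]]()
      def put(id: String, values: Seq[Int]): Unit = {
        if (blocks.size >= capacity) blocks.remove(blocks.head._1)  // evict oldest entry
        blocks(id) = values
      }
      def get(id: String): Option[Seq[Int]] = blocks.get(id)
    }

    val store = new BoundedStore(capacity = 2)
    store.put("rdd_0_0", Seq(1, 2, 3))
    // Other tasks fill the store before our block is read back...
    store.put("rdd_0_1", Seq(4, 5, 6))
    store.put("rdd_0_2", Seq(7, 8, 9))
    println(store.get("rdd_0_0"))  // None: evicted, so a get-after-put would have nothing to return

With DISK_ONLY the file stays on disk between the put and the get, which is why the streaming path is limited to that case for now.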

} finally {
loading.synchronized {
loading.remove(key)
core/src/main/scala/org/apache/spark/serializer/JavaSerializer.scala
@@ -23,9 +23,28 @@ import java.nio.ByteBuffer
import org.apache.spark.SparkConf
import org.apache.spark.util.ByteBufferInputStream

private[spark] class JavaSerializationStream(out: OutputStream) extends SerializationStream {
private[spark] class JavaSerializationStream(out: OutputStream, conf: SparkConf)
extends SerializationStream {
val objOut = new ObjectOutputStream(out)
def writeObject[T](t: T): SerializationStream = { objOut.writeObject(t); this }
var counter = 0
val counterReset = conf.getInt("spark.serializer.objectStreamReset", 10000)

/**
* Calling reset to avoid memory leak:
* http://stackoverflow.com/questions/1281549/memory-leak-traps-in-the-java-standard-api
* But only call it every 10,000th time to avoid bloated serialization streams (when
* the stream 'resets' object class descriptions have to be re-written)
*/
def writeObject[T](t: T): SerializationStream = {
objOut.writeObject(t)
if (counterReset > 0 && counter >= counterReset) {
objOut.reset()
counter = 0
} else {
counter += 1
}
this
}
def flush() { objOut.flush() }
def close() { objOut.close() }
}
@@ -41,7 +60,7 @@ extends DeserializationStream {
def close() { objIn.close() }
}

private[spark] class JavaSerializerInstance extends SerializerInstance {
private[spark] class JavaSerializerInstance(conf: SparkConf) extends SerializerInstance {
def serialize[T](t: T): ByteBuffer = {
val bos = new ByteArrayOutputStream()
val out = serializeStream(bos)
@@ -63,7 +82,7 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
}

def serializeStream(s: OutputStream): SerializationStream = {
new JavaSerializationStream(s)
new JavaSerializationStream(s, conf)
}

def deserializeStream(s: InputStream): DeserializationStream = {
@@ -79,5 +98,5 @@ private[spark] class JavaSerializerInstance extends SerializerInstance {
* A Spark serializer that uses Java's built-in serialization.
*/
class JavaSerializer(conf: SparkConf) extends Serializer {
def newInstance(): SerializerInstance = new JavaSerializerInstance
def newInstance(): SerializerInstance = new JavaSerializerInstance(conf)
}
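A short usage sketch (not part of the diff; the reset value 1000 is arbitrary) showing how the new spark.serializer.objectStreamReset setting reaches the stream created above:

    import java.io.ByteArrayOutputStream
    import org.apache.spark.SparkConf
    import org.apache.spark.serializer.JavaSerializer

    // Reset the underlying ObjectOutputStream every 1000 objects instead of the
    // default 10000: the reference table stays small at the cost of re-writing
    // class descriptors more often.
    val conf = new SparkConf().set("spark.serializer.objectStreamReset", "1000")

    val out = new ByteArrayOutputStream()
    val stream = new JavaSerializer(conf).newInstance().serializeStream(out)
    (1 to 5000).foreach(i => stream.writeObject(Some(i)))  // reset fires roughly every 1000 writes
    stream.close()
    println("Serialized " + out.size() + " bytes")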
87 changes: 51 additions & 36 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -35,6 +35,12 @@ import org.apache.spark.network._
import org.apache.spark.serializer.Serializer
import org.apache.spark.util._

sealed trait Values

case class ByteBufferValues(buffer: ByteBuffer) extends Values
case class IteratorValues(iterator: Iterator[Any]) extends Values
case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values

private[spark] class BlockManager(
executorId: String,
actorSystem: ActorSystem,
@@ -455,9 +461,7 @@ private[spark] class BlockManager(

def put(blockId: BlockId, values: Iterator[Any], level: StorageLevel, tellMaster: Boolean)
: Long = {
val elements = new ArrayBuffer[Any]
elements ++= values
put(blockId, elements, level, tellMaster)
doPut(blockId, IteratorValues(values), level, tellMaster)
}

/**
Expand All @@ -479,7 +483,7 @@ private[spark] class BlockManager(
def put(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
tellMaster: Boolean = true) : Long = {
require(values != null, "Values is null")
doPut(blockId, Left(values), level, tellMaster)
doPut(blockId, ArrayBufferValues(values), level, tellMaster)
}

/**
Expand All @@ -488,10 +492,11 @@ private[spark] class BlockManager(
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel,
tellMaster: Boolean = true) {
require(bytes != null, "Bytes is null")
doPut(blockId, Right(bytes), level, tellMaster)
doPut(blockId, ByteBufferValues(bytes), level, tellMaster)
}

private def doPut(blockId: BlockId, data: Either[ArrayBuffer[Any], ByteBuffer],
private def doPut(blockId: BlockId,
data: Values,
level: StorageLevel, tellMaster: Boolean = true): Long = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
@@ -534,8 +539,9 @@ private[spark] class BlockManager(

// If we're storing bytes, then initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = if (data.isRight && level.replication > 1) {
val bufferView = data.right.get.duplicate() // Doesn't copy the bytes, just creates a wrapper
val replicationFuture = if (data.isInstanceOf[ByteBufferValues] && level.replication > 1) {
// Duplicate doesn't copy the bytes, just creates a wrapper
val bufferView = data.asInstanceOf[ByteBufferValues].buffer.duplicate()
Future {
replicate(blockId, bufferView, level)
}
@@ -549,34 +555,43 @@

var marked = false
try {
data match {
case Left(values) => {
if (level.useMemory) {
// Save it just to memory first, even if it also has useDisk set to true; we will
// drop it to disk later if the memory store can't hold it.
val res = memoryStore.putValues(blockId, values, level, true)
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
} else {
// Save directly to disk.
// Don't get back the bytes unless we replicate them.
val askForBytes = level.replication > 1
val res = diskStore.putValues(blockId, values, level, askForBytes)
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
if (level.useMemory) {
// Save it just to memory first, even if it also has useDisk set to true; we will
// drop it to disk later if the memory store can't hold it.
val res = data match {
case IteratorValues(iterator) =>
memoryStore.putValues(blockId, iterator, level, true)
case ArrayBufferValues(array) =>
memoryStore.putValues(blockId, array, level, true)
case ByteBufferValues(bytes) => {
bytes.rewind();
memoryStore.putBytes(blockId, bytes, level)
}
}
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case Left(newIterator) => valuesAfterPut = newIterator
}
} else {
// Save directly to disk.
// Don't get back the bytes unless we replicate them.
val askForBytes = level.replication > 1

val res = data match {
case IteratorValues(iterator) =>
diskStore.putValues(blockId, iterator, level, askForBytes)
case ArrayBufferValues(array) =>
diskStore.putValues(blockId, array, level, askForBytes)
case ByteBufferValues(bytes) => {
bytes.rewind();
diskStore.putBytes(blockId, bytes, level)
}
}
case Right(bytes) => {
bytes.rewind()
// Store it only in memory at first, even if useDisk is also set to true
(if (level.useMemory) memoryStore else diskStore).putBytes(blockId, bytes, level)
size = bytes.limit
size = res.size
res.data match {
case Right(newBytes) => bytesAfterPut = newBytes
case _ =>
}
}

@@ -605,8 +620,8 @@ private[spark] class BlockManager(
// values and need to serialize and replicate them now:
if (level.replication > 1) {
data match {
case Right(bytes) => Await.ready(replicationFuture, Duration.Inf)
case Left(values) => {
case ByteBufferValues(bytes) => Await.ready(replicationFuture, Duration.Inf)
case _ => {
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
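For readers skimming the diff, a self-contained sketch (simplified stand-ins, not the real BlockManager) of the dispatch pattern the new Values hierarchy enables: one entry point, three payload shapes, and no forced copy into an ArrayBuffer:

    import java.nio.ByteBuffer
    import scala.collection.mutable.ArrayBuffer

    // Simplified copies of the wrappers introduced in this file.
    sealed trait Values
    case class ByteBufferValues(buffer: ByteBuffer) extends Values
    case class IteratorValues(iterator: Iterator[Any]) extends Values
    case class ArrayBufferValues(buffer: ArrayBuffer[Any]) extends Values

    // A toy doPut: each payload kind is routed directly, mirroring the match above.
    def describePut(data: Values): String = data match {
      case IteratorValues(_)       => "iterator (consumed lazily, never fully buffered)"
      case ArrayBufferValues(buf)  => "array buffer of " + buf.size + " elements"
      case ByteBufferValues(bytes) => bytes.remaining() + " already-serialized bytes"
    }

    println(describePut(IteratorValues(Iterator(1, 2, 3))))
    println(describePut(ArrayBufferValues(ArrayBuffer[Any]("a", "b"))))
    println(describePut(ByteBufferValues(ByteBuffer.allocate(64))))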
core/src/main/scala/org/apache/spark/storage/BlockStore.scala
@@ -28,7 +28,7 @@ import org.apache.spark.Logging
*/
private[spark]
abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel)
def putBytes(blockId: BlockId, bytes: ByteBuffer, level: StorageLevel) : PutResult

/**
* Put in a block and, possibly, also return its content as either bytes or another Iterator.
@@ -37,6 +37,9 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
* @return a PutResult that contains the size of the data, as well as the values put if
* returnValues is true (if not, the result's data field can be null)
*/
def putValues(blockId: BlockId, values: Iterator[Any], level: StorageLevel,
returnValues: Boolean) : PutResult

def putValues(blockId: BlockId, values: ArrayBuffer[Any], level: StorageLevel,
returnValues: Boolean) : PutResult

14 changes: 12 additions & 2 deletions core/src/main/scala/org/apache/spark/storage/DiskStore.scala
@@ -37,7 +37,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManage
diskManager.getBlockLocation(blockId).length
}

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
// So that we do not modify the input offsets !
// duplicate does not copy buffer, so inexpensive
val bytes = _bytes.duplicate()
@@ -52,20 +52,30 @@
val finishTime = System.currentTimeMillis
logDebug("Block %s stored as %s file on disk in %d ms".format(
file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
return PutResult(bytes.limit(), Right(bytes.duplicate()))
}

override def putValues(
blockId: BlockId,
values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {
Review comment (Contributor):

The formatting of this parameter list is wrong (sorry, not sure why Jenkins is not catching this); the proper formatting should be

  override def putValues(
      blockId: BlockId,
      values: ArrayBuffer[Any],
      level: StorageLevel,
      returnValues: Boolean): PutResult = {
    ...
  }

See https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide for some examples

return putValues(blockId, values.toIterator, level, returnValues)
}

override def putValues(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {

logDebug("Attempting to write values for block " + blockId)
val startTime = System.currentTimeMillis
val file = diskManager.getFile(blockId)
val outputStream = new FileOutputStream(file)
blockManager.dataSerializeStream(blockId, outputStream, values.iterator)
blockManager.dataSerializeStream(blockId, outputStream, values)
val length = file.length

val timeTaken = System.currentTimeMillis - startTime
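The heart of this DiskStore change is that an iterator is serialized to the file as it is consumed. A standalone sketch of the same idea using plain Java serialization (illustrative only; the path and record shape are made up):

    import java.io.{File, FileOutputStream, ObjectOutputStream}

    // Write a (potentially huge) iterator straight to disk, one object at a time,
    // so only the current element needs to be in memory.
    def writeIteratorToFile(path: String, values: Iterator[AnyRef]): Long = {
      val out = new ObjectOutputStream(new FileOutputStream(path))
      try {
        values.foreach(out.writeObject)
      } finally {
        out.close()
      }
      new File(path).length()
    }

    val bytesWritten = writeIteratorToFile(
      "/tmp/large-block.bin",  // made-up path
      Iterator.range(0, 1000000).map(i => "record-" + i))
    println("Wrote " + bytesWritten + " bytes")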
31 changes: 26 additions & 5 deletions core/src/main/scala/org/apache/spark/storage/MemoryStore.scala
@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
// Work on a duplicate - since the original input might be used elsewhere.
val bytes = _bytes.duplicate()
bytes.rewind()
@@ -59,8 +59,10 @@
elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
tryToPut(blockId, elements, sizeEstimate, true)
PutResult(sizeEstimate, Left(values.toIterator))
} else {
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}

@@ -69,14 +71,33 @@
values: ArrayBuffer[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {

: PutResult = {
Review comment (Contributor):

Same thing on formatting

if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
tryToPut(blockId, values, sizeEstimate, true)
PutResult(sizeEstimate, Left(values.iterator))
PutResult(sizeEstimate, Left(values.toIterator))
} else {
val bytes = blockManager.dataSerialize(blockId, values.toIterator)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}

override def putValues(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
returnValues: Boolean)
: PutResult = {

if (level.deserialized) {
val valueEntries = new ArrayBuffer[Any]()
valueEntries ++= values
val sizeEstimate = SizeEstimator.estimate(valueEntries.asInstanceOf[AnyRef])
tryToPut(blockId, valueEntries, sizeEstimate, true)
PutResult(sizeEstimate, Left(valueEntries.toIterator))
} else {
val bytes = blockManager.dataSerialize(blockId, values.iterator)
val bytes = blockManager.dataSerialize(blockId, values)
tryToPut(blockId, bytes, bytes.limit, false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}