Patch for SPARK-942 #50
CacheManager.scala

@@ -71,10 +71,30 @@ private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
           val computedValues = rdd.computeOrReadCheckpoint(split, context)
           // Persist the result, so long as the task is not running locally
           if (context.runningLocally) { return computedValues }
-          val elements = new ArrayBuffer[Any]
-          elements ++= computedValues
-          blockManager.put(key, elements, storageLevel, tellMaster = true)
-          elements.iterator.asInstanceOf[Iterator[T]]
+          if (storageLevel.useDisk && !storageLevel.useMemory) {
+            // In the case that this RDD is to be persisted using DISK_ONLY
+            // the iterator will be passed directly to the blockManager (rather then
+            // caching it to an ArrayBuffer first), then the resulting block data iterator
+            // will be passed back to the user. If the iterator generates a lot of data,
+            // this means that it doesn't all have to be held in memory at one time.
+            // This could also apply to MEMORY_ONLY_SER storage, but we need to make sure
+            // blocks aren't dropped by the block store before enabling that.
+            blockManager.put(key, computedValues, storageLevel, tellMaster = true)
+            return blockManager.get(key) match {
+              case Some(values) =>
+                return new InterruptibleIterator(context, values.asInstanceOf[Iterator[T]])
+              case None =>
+                logInfo("Failure to store %s".format(key))
+                throw new Exception("Block manager failed to return persisted valued")
+            }
+          } else {
+            // In this case the RDD is cached to an array buffer. This will save the results
+            // if we're dealing with a 'one-time' iterator
+            val elements = new ArrayBuffer[Any]
+            elements ++= computedValues
+            blockManager.put(key, elements, storageLevel, tellMaster = true)
+            return elements.iterator.asInstanceOf[Iterator[T]]
+          }
Review comment: Is there any reason why we still want to keep around the legacy case? It seems that IteratorValues and ArrayBufferValues are handled in very similar ways downstream (e.g. in BlockManager.doPut and memoryStore.putValues), and it would be nice if we could somehow merge these two code paths into a simpler one. The other thing is that computedValues in L71 is already an iterator anyway, so we can just feed it directly into BlockManager; the existing way of making it an ArrayBuffer arbitrarily and back into an Iterator seems a little unnecessary. (That said, there might be a performance reason that I haven't thought of.)

Reply: Yes, performance and correctness are actually both reasons. The code path for disk first writes the data to disk and then has to read and deserialize it from there, which is slow. Also, if you used the memory store in the same way, the store might drop the block before you have a chance to call get(). See my comments on the main discussion.

Reply: Even if we use the same code path for, say, MEM_ONLY, we won't actually write the data to disk, since we also pass the storage level to BlockManager along with the data. But I see your second point about possibly dropping a block before we read it. That does seem to prevent us from merging the two cases.
         } finally {
           loading.synchronized {
             loading.remove(key)
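To make the trade-off discussed in the review thread concrete, here is a minimal Scala sketch (not Spark code; the Store class and its putIterator/get methods are invented stand-ins for BlockManager) contrasting the two ways a one-time iterator can be cached: streaming it into the store and reading the block back, versus materializing it into an ArrayBuffer first.

```scala
import scala.collection.mutable.ArrayBuffer

object CachingSketch {
  // Hypothetical stand-in for the block store: putIterator consumes the
  // iterator and persists it; get re-reads the persisted values.
  class Store {
    private val blocks = scala.collection.mutable.Map[String, Seq[Any]]()
    def putIterator(key: String, values: Iterator[Any]): Unit = {
      // Consuming the iterator here exhausts it -- it cannot be handed back afterwards.
      blocks(key) = values.toSeq
    }
    def get(key: String): Option[Iterator[Any]] = blocks.get(key).map(_.iterator)
  }

  def cacheStreaming(store: Store, key: String, computed: Iterator[Any]): Iterator[Any] = {
    // DISK_ONLY-style path: hand the one-time iterator straight to the store,
    // then read the block back to obtain a fresh iterator for the caller.
    // This only works if the store is guaranteed not to drop the block in between.
    store.putIterator(key, computed)
    store.get(key).getOrElse(
      throw new Exception(s"Block $key was dropped before it could be read back"))
  }

  def cacheMaterialized(store: Store, key: String, computed: Iterator[Any]): Iterator[Any] = {
    // In-memory path: materialize the values first, so we can both store them
    // and return them without depending on a successful get() afterwards.
    val elements = new ArrayBuffer[Any]
    elements ++= computed
    store.putIterator(key, elements.iterator)
    elements.iterator
  }
}
```

The streaming variant only works if the store is guaranteed to still hold the block when get is called; that is exactly the correctness concern raised above for applying the same path to the memory store.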
DiskStore.scala

@@ -37,7 +37,7 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
     diskManager.getBlockLocation(blockId).length
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
     // So that we do not modify the input offsets !
     // duplicate does not copy buffer, so inexpensive
     val bytes = _bytes.duplicate()

@@ -52,20 +52,30 @@ private class DiskStore(blockManager: BlockManager, diskManager: DiskBlockManager)
     val finishTime = System.currentTimeMillis
     logDebug("Block %s stored as %s file on disk in %d ms".format(
       file.getName, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
+    return PutResult(bytes.limit(), Right(bytes.duplicate()))
   }
 
   override def putValues(
       blockId: BlockId,
       values: ArrayBuffer[Any],
       level: StorageLevel,
       returnValues: Boolean)
     : PutResult = {
Review comment: The formatting of this parameter list is wrong (sorry, not sure why Jenkins is not catching this); the proper formatting should be along the lines of the sketch below. See https://cwiki.apache.org/confluence/display/SPARK/Spark+Code+Style+Guide for some examples.
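The code snippet the reviewer included here is not preserved above; based on the linked style guide, it was presumably along these lines (parameters double-indented, with the return type on the same line as the closing parenthesis), shown applied to the ArrayBuffer overload:

```scala
// Hypothetical reconstruction of the formatting the reviewer is suggesting.
override def putValues(
    blockId: BlockId,
    values: ArrayBuffer[Any],
    level: StorageLevel,
    returnValues: Boolean): PutResult = {
  putValues(blockId, values.toIterator, level, returnValues)
}
```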
+    return putValues(blockId, values.toIterator, level, returnValues)
+  }
+
+  override def putValues(
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel,
+      returnValues: Boolean)
+    : PutResult = {
 
     logDebug("Attempting to write values for block " + blockId)
     val startTime = System.currentTimeMillis
     val file = diskManager.getFile(blockId)
     val outputStream = new FileOutputStream(file)
-    blockManager.dataSerializeStream(blockId, outputStream, values.iterator)
+    blockManager.dataSerializeStream(blockId, outputStream, values)
     val length = file.length
 
     val timeTaken = System.currentTimeMillis - startTime
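The ArrayBuffer overload above now simply forwards to the new Iterator overload. As a design note, that delegation pattern can be sketched in isolation like this (a toy SimpleStore trait, not the actual Spark BlockStore API):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch of the overload pattern: the ArrayBuffer-based putValues delegates to
// the Iterator-based one, so a store only needs to implement the streaming variant.
trait SimpleStore {
  // Streaming entry point: values are consumed as they are produced.
  def putValues(key: String, values: Iterator[Any]): Long

  // Materialized entry point, kept for callers that already hold an ArrayBuffer.
  def putValues(key: String, values: ArrayBuffer[Any]): Long =
    putValues(key, values.toIterator)
}

// Example implementation that "writes" by counting elements as they stream past.
class CountingStore extends SimpleStore {
  override def putValues(key: String, values: Iterator[Any]): Long = {
    var n = 0L
    values.foreach(_ => n += 1) // stand-in for serializing each element to disk
    n
  }
}
```

Keeping the Iterator variant as the real implementation means callers never have to materialize the data just to satisfy the store's interface.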
MemoryStore.scala

@@ -49,7 +49,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
     }
   }
 
-  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) {
+  override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel) : PutResult = {
     // Work on a duplicate - since the original input might be used elsewhere.
     val bytes = _bytes.duplicate()
     bytes.rewind()

@@ -59,8 +59,10 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       elements ++= values
       val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
       tryToPut(blockId, elements, sizeEstimate, true)
+      PutResult(sizeEstimate, Left(values.toIterator))
     } else {
       tryToPut(blockId, bytes, bytes.limit, false)
+      PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
   }
 
@@ -69,14 +71,33 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
       values: ArrayBuffer[Any],
       level: StorageLevel,
       returnValues: Boolean)
-    : PutResult = {
-
+    : PutResult = {
Review comment: Same thing on formatting.
     if (level.deserialized) {
       val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
       tryToPut(blockId, values, sizeEstimate, true)
-      PutResult(sizeEstimate, Left(values.iterator))
+      PutResult(sizeEstimate, Left(values.toIterator))
     } else {
+      val bytes = blockManager.dataSerialize(blockId, values.toIterator)
+      tryToPut(blockId, bytes, bytes.limit, false)
+      PutResult(bytes.limit(), Right(bytes.duplicate()))
+    }
+  }
+
+  override def putValues(
+      blockId: BlockId,
+      values: Iterator[Any],
+      level: StorageLevel,
+      returnValues: Boolean)
+    : PutResult = {
+
+    if (level.deserialized) {
+      val valueEntries = new ArrayBuffer[Any]()
+      valueEntries ++= values
+      val sizeEstimate = SizeEstimator.estimate(valueEntries.asInstanceOf[AnyRef])
+      tryToPut(blockId, valueEntries, sizeEstimate, true)
+      PutResult(sizeEstimate, Left(valueEntries.toIterator))
+    } else {
-      val bytes = blockManager.dataSerialize(blockId, values.iterator)
+      val bytes = blockManager.dataSerialize(blockId, values)
       tryToPut(blockId, bytes, bytes.limit, false)
       PutResult(bytes.limit(), Right(bytes.duplicate()))
     }
Review comment: This is not the only condition where we want to do this. For example, we might also want it for MEMORY_ONLY_SER, where the serialized data might fit in RAM but the ArrayBuffer of raw objects might not. (Especially if you set spark.rdd.compress to compress the serialized data.)
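For reference, the storage-level flags that the comment contrasts can be sketched as follows (a simplified illustration, not Spark's actual StorageLevel class): the useDisk && !useMemory test in CacheManager matches DISK_ONLY but not MEMORY_ONLY_SER, even though the latter also avoids keeping deserialized objects around.

```scala
// Simplified sketch of the flags behind a few Spark storage levels (illustrative only).
case class Level(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean)

object Levels {
  val DISK_ONLY       = Level(useDisk = true,  useMemory = false, deserialized = false)
  val MEMORY_ONLY     = Level(useDisk = false, useMemory = true,  deserialized = true)
  val MEMORY_ONLY_SER = Level(useDisk = false, useMemory = true,  deserialized = false)

  // The condition used in CacheManager above: it selects DISK_ONLY,
  // but not MEMORY_ONLY_SER, which is the reviewer's point.
  def streamsDirectlyToStore(l: Level): Boolean = l.useDisk && !l.useMemory
}
```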