SPARK-2792. Fix reading too much or too little data from each stream in ExternalMap / Sorter #1722
@@ -17,7 +17,7 @@

 package org.apache.spark.util.collection

-import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException}
+import java.io._
 import java.util.Comparator

 import scala.collection.BufferedIterator
@@ -28,7 +28,7 @@ import com.google.common.io.ByteStreams

 import org.apache.spark.{Logging, SparkEnv}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.serializer.Serializer
+import org.apache.spark.serializer.{DeserializationStream, Serializer}
 import org.apache.spark.storage.{BlockId, BlockManager}
 import org.apache.spark.util.collection.ExternalAppendOnlyMap.HashComparator
@@ -199,13 +199,16 @@ class ExternalAppendOnlyMap[K, V, C](

     // Flush the disk writer's contents to disk, and update relevant variables
     def flush() = {
-      writer.commitAndClose()
-      val bytesWritten = writer.bytesWritten
+      val w = writer
+      writer = null
+      w.commitAndClose()
+      val bytesWritten = w.bytesWritten
       batchSizes.append(bytesWritten)
       _diskBytesSpilled += bytesWritten
       objectsWritten = 0
     }

+    var success = false
     try {
       val it = currentMap.destructiveSortedIterator(keyComparator)
       while (it.hasNext) {
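The point of the reworked flush() is that the shared writer field is cleared before the writer is committed, so the error-cleanup code further down never sees a writer that has already been closed. A minimal sketch of that hand-off, with a placeholder Writer trait standing in for Spark's disk writer (the real API is not modeled here):

    object FlushPattern {
      // Placeholder for Spark's disk writer; only the calls used here are modeled.
      trait Writer {
        def commitAndClose(): Unit
        def revertPartialWritesAndClose(): Unit
      }

      var writer: Writer = _  // shared with the error-cleanup path

      def flush(): Unit = {
        val w = writer
        writer = null         // the cleanup path now sees no live writer...
        w.commitAndClose()    // ...so a failure here cannot cause a second close
      }
    }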
@@ -215,16 +218,28 @@ class ExternalAppendOnlyMap[K, V, C](

         if (objectsWritten == serializerBatchSize) {
           flush()
-          writer.close()
           writer = blockManager.getDiskWriter(blockId, file, serializer, fileBufferSize)
         }
       }
       if (objectsWritten > 0) {
         flush()
+      } else if (writer != null) {

Review comment: We don't appear to call writer.close() if objectsWritten == 0, is that the case?
Reply: When objectsWritten == 0, writer != null will hold.
Reply: Ah, right, so it would be.

+        val w = writer
+        writer = null
+        w.revertPartialWritesAndClose()
       }
+      success = true
     } finally {
-      // Partial failures cannot be tolerated; do not revert partial writes
-      writer.close()
+      if (!success) {
+        // This code path only happens if an exception was thrown above before we set success;
+        // close our stuff and let the exception be thrown further
+        if (writer != null) {
+          writer.revertPartialWritesAndClose()
+        }
+        if (file.exists()) {
+          file.delete()
+        }
+      }
     }

     currentMap = new SizeTrackingAppendOnlyMap[K, C]
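Together with flush(), the success flag turns the finally block into a pure error path: on a normal exit it does nothing, and on an exception it reverts whatever was partially written and deletes the spill file before the exception propagates. A hedged sketch of the shape (writeAll and the file handling are illustrative stand-ins, not Spark's API):

    import java.io.File

    object SpillErrorPath {
      // Run `writeAll`, undoing partial output on failure; the exception still propagates.
      def writeOrCleanUp(file: File)(writeAll: () => Unit): Unit = {
        var success = false
        try {
          writeAll()       // may throw at any point
          success = true   // reached only if every write completed
        } finally {
          if (!success) {
            // Error path only: remove the partial spill file
            if (file.exists()) {
              file.delete()
            }
          }
        }
      }
    }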
@@ -389,27 +404,51 @@ class ExternalAppendOnlyMap[K, V, C](
    * An iterator that returns (K, C) pairs in sorted order from an on-disk map
    */
   private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])
-    extends Iterator[(K, C)] {
-    private val fileStream = new FileInputStream(file)
-    private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize)
+    extends Iterator[(K, C)]
+  {

Review comment: Here I also removed some of the more paranoid asserts about batchSizes.
Reply: Those asserts caught the bugs :-) But yeah, some of them might have been expensive.

+    private val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // Size will be batchSizes.length + 1
+    assert(file.length() == batchOffsets(batchOffsets.length - 1))

+    private var batchIndex = 0  // Which batch we're in
+    private var fileStream: FileInputStream = null

     // An intermediate stream that reads from exactly one batch
     // This guards against pre-fetching and other arbitrary behavior of higher level streams
-    private var batchStream = nextBatchStream()
-    private var compressedStream = blockManager.wrapForCompression(blockId, batchStream)
-    private var deserializeStream = ser.deserializeStream(compressedStream)
+    private var deserializeStream = nextBatchStream()
     private var nextItem: (K, C) = null
     private var objectsRead = 0
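scanLeft turns the per-batch byte counts into cumulative offsets with a leading zero, so batch i occupies bytes [batchOffsets(i), batchOffsets(i + 1)) of the file and the final offset equals the file length, which is exactly what the assert checks. For example (sizes made up):

    import scala.collection.mutable.ArrayBuffer

    val batchSizes = ArrayBuffer(10L, 20L, 5L)         // bytes written by each flush()
    val batchOffsets = batchSizes.scanLeft(0L)(_ + _)  // ArrayBuffer(0, 10, 30, 35)

    // Batch 1 spans bytes [10, 30); the last offset is the total file length.
    assert(batchOffsets.last == batchSizes.sum)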
     /**
      * Construct a stream that reads only from the next batch.
      */
-    private def nextBatchStream(): InputStream = {
-      if (batchSizes.length > 0) {
-        ByteStreams.limit(bufferedStream, batchSizes.remove(0))
+    private def nextBatchStream(): DeserializationStream = {
+      // Note that batchOffsets.length = numBatches + 1 since we did a scan above; check whether
+      // we're still in a valid batch.
+      if (batchIndex < batchOffsets.length - 1) {
+        if (deserializeStream != null) {
+          deserializeStream.close()
+          fileStream.close()
+          deserializeStream = null
+          fileStream = null
+        }

+        val start = batchOffsets(batchIndex)
+        fileStream = new FileInputStream(file)
+        fileStream.getChannel.position(start)
+        batchIndex += 1

+        val end = batchOffsets(batchIndex)

+        assert(end >= start, "start = " + start + ", end = " + end +
+          ", batchOffsets = " + batchOffsets.mkString("[", ", ", "]"))

+        val bufferedStream = new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
+        val compressedStream = blockManager.wrapForCompression(blockId, bufferedStream)
+        ser.deserializeStream(compressedStream)

Review comment: One delta w.r.t. your patch, @mridulm: you used to do …
Reply: So that is something I was not sure of: particularly with kryo (not java).

       } else {
         // No more batches left
-        bufferedStream
+        cleanup()
+        null
       }
     }
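Each call now opens its own FileInputStream, seeks the channel to the batch's start offset, and caps it at the batch length with Guava's ByteStreams.limit, so read-ahead by the buffering, compression, or serialization layers can never consume a neighboring batch's bytes. A standalone sketch of that slicing, without Spark's compression and serializer wrappers:

    import java.io.{BufferedInputStream, File, FileInputStream, InputStream}
    import com.google.common.io.ByteStreams

    object BatchSlice {
      // Returns a stream that can read exactly the bytes in [start, end) of `file`.
      def sliceOfFile(file: File, start: Long, end: Long): InputStream = {
        require(end >= start, s"start = $start, end = $end")
        val fileStream = new FileInputStream(file)
        fileStream.getChannel.position(start)  // seek to the batch's first byte
        // limit() makes end-of-batch look like end-of-stream to the layers above
        new BufferedInputStream(ByteStreams.limit(fileStream, end - start))
      }
    }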
@@ -424,10 +463,8 @@ class ExternalAppendOnlyMap[K, V, C](
         val item = deserializeStream.readObject().asInstanceOf[(K, C)]
         objectsRead += 1
         if (objectsRead == serializerBatchSize) {
-          batchStream = nextBatchStream()
-          compressedStream = blockManager.wrapForCompression(blockId, batchStream)
-          deserializeStream = ser.deserializeStream(compressedStream)
           objectsRead = 0
+          deserializeStream = nextBatchStream()
         }
         item
       } catch {
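The read side resets objectsRead and rolls to the next stream after exactly serializerBatchSize objects, mirroring when the write side called flush(); that shared count is what keeps the recorded batch sizes and the reader's stream boundaries aligned. A tiny illustration of the invariant (the numbers are made up):

    val serializerBatchSize = 3
    val itemsSpilled = 8

    // The writer flushes every 3 objects, so 8 items produce batches of 3, 3, 2;
    // the reader must switch streams after the same counts to stay on a boundary.
    val batchSizes = (1 to itemsSpilled).grouped(serializerBatchSize).map(_.size).toList
    assert(batchSizes == List(3, 3, 2))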
@@ -439,6 +476,9 @@ class ExternalAppendOnlyMap[K, V, C](

     override def hasNext: Boolean = {
       if (nextItem == null) {
+        if (deserializeStream == null) {
+          return false
+        }
         nextItem = readNextItem()
       }
       nextItem != null
@@ -455,7 +495,11 @@ class ExternalAppendOnlyMap[K, V, C](

     // TODO: Ensure this gets called even if the iterator isn't drained.
     private def cleanup() {
-      deserializeStream.close()
+      batchIndex = batchOffsets.length  // Prevent reading any other batch
+      val ds = deserializeStream
+      deserializeStream = null
+      fileStream = null
+      ds.close()
       file.delete()
     }
   }
Review comment: This is the right behavior, but it is a slight change ... I don't think anyone is expecting the earlier behavior, though!
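The new cleanup() applies the same discipline as the writer side: it first makes further reads impossible by pushing batchIndex past the last offset, then clears the stream fields before closing, so a failure inside close() still leaves the iterator in a terminal state. A sketch of that ordering with illustrative fields (not the real DiskMapIterator members):

    import java.io.Closeable

    object CleanupOrdering {
      var batchIndex = 0
      val numBatchOffsets = 4        // stands in for batchOffsets.length
      var stream: Closeable = null   // stands in for deserializeStream/fileStream

      def cleanup(): Unit = {
        batchIndex = numBatchOffsets // nextBatchStream() can never reopen the file
        val s = stream
        stream = null                // fields are cleared even if close() throws
        if (s != null) s.close()
      }
    }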