Skip to content

Commit

Permalink
Make objectStreamReset counter count the last object written too
Browse files Browse the repository at this point in the history
This makes the counter precise -- previously the stream was only reset after (counterReset + 1) writes, because the object written on the resetting call was not counted
  • Loading branch information
mateiz committed Aug 3, 2014
1 parent 18fe865 commit 5d4bfb5
Show file tree
Hide file tree
Showing 3 changed files with 4 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,10 @@ private[spark] class JavaSerializationStream(out: OutputStream, counterReset: In
*/
def writeObject[T: ClassTag](t: T): SerializationStream = {
objOut.writeObject(t)
if (counterReset >= 0 && counter >= counterReset) {
counter += 1
if (counterReset > 0 && counter >= counterReset) {
objOut.reset()
counter = 0
} else {
counter += 1
}
this
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class ExternalAppendOnlyMapSuite extends FunSuite with LocalSparkContext {
val conf = new SparkConf(loadDefaults)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
conf.set("spark.serializer.objectStreamReset", "0")
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
// Ensure that we actually have multiple batches per spill file
conf.set("spark.shuffle.spill.batchSize", "10")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext {
val conf = new SparkConf(loadDefaults)
// Make the Java serializer write a reset instruction (TC_RESET) after each object to test
// for a bug we had with bytes written past the last object in a batch (SPARK-2792)
conf.set("spark.serializer.objectStreamReset", "0")
conf.set("spark.serializer.objectStreamReset", "1")
conf.set("spark.serializer", "org.apache.spark.serializer.JavaSerializer")
// Ensure that we actually have multiple batches per spill file
conf.set("spark.shuffle.spill.batchSize", "10")
Expand Down

0 comments on commit 5d4bfb5

Please sign in to comment.