Skip to content

Commit

Permalink
[SPARK-6222][Streaming] Dont delete checkpoint data when doing pre-ba…
Browse files Browse the repository at this point in the history
…tch-start checkpoint

This is another alternative approach to apache#4964
I think this is a simpler fix that can be backported easily to other branches (1.2 and 1.3).

All it does it introduce a flag so that the pre-batch-start checkpoint does not call clear checkpoint.

There is not unit test yet. I will add it when this approach is commented upon. Not sure if this is testable easily.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes apache#5008 from tdas/SPARK-6222 and squashes the following commits:

7315bc2 [Tathagata Das] Removed empty line.
c438de4 [Tathagata Das] Revert unnecessary change.
5e98374 [Tathagata Das] Added unit test
50cb60b [Tathagata Das] Fixed style issue
295ca5c [Tathagata Das] Fixing SPARK-6222
  • Loading branch information
tdas committed Mar 19, 2015
1 parent 540b2a4 commit 645cf3f
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,10 @@ class CheckpointWriter(
private var stopped = false
private var fs_ : FileSystem = _

class CheckpointWriteHandler(checkpointTime: Time, bytes: Array[Byte]) extends Runnable {
class CheckpointWriteHandler(
checkpointTime: Time,
bytes: Array[Byte],
clearCheckpointDataLater: Boolean) extends Runnable {
def run() {
var attempts = 0
val startTime = System.currentTimeMillis()
Expand Down Expand Up @@ -166,7 +169,7 @@ class CheckpointWriter(
val finishTime = System.currentTimeMillis()
logInfo("Checkpoint for time " + checkpointTime + " saved to file '" + checkpointFile +
"', took " + bytes.length + " bytes and " + (finishTime - startTime) + " ms")
jobGenerator.onCheckpointCompletion(checkpointTime)
jobGenerator.onCheckpointCompletion(checkpointTime, clearCheckpointDataLater)
return
} catch {
case ioe: IOException =>
Expand All @@ -180,15 +183,16 @@ class CheckpointWriter(
}
}

def write(checkpoint: Checkpoint) {
def write(checkpoint: Checkpoint, clearCheckpointDataLater: Boolean) {
val bos = new ByteArrayOutputStream()
val zos = compressionCodec.compressedOutputStream(bos)
val oos = new ObjectOutputStream(zos)
oos.writeObject(checkpoint)
oos.close()
bos.close()
try {
executor.execute(new CheckpointWriteHandler(checkpoint.checkpointTime, bos.toByteArray))
executor.execute(new CheckpointWriteHandler(
checkpoint.checkpointTime, bos.toByteArray, clearCheckpointDataLater))
logDebug("Submitted checkpoint of time " + checkpoint.checkpointTime + " writer queue")
} catch {
case rej: RejectedExecutionException =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ import org.apache.spark.util.{Clock, ManualClock}
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
private[scheduler] case class DoCheckpoint(time: Time) extends JobGeneratorEvent
private[scheduler] case class DoCheckpoint(
time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent

/**
Expand Down Expand Up @@ -163,8 +164,10 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
/**
* Callback called when the checkpoint of a batch has been written.
*/
def onCheckpointCompletion(time: Time) {
eventActor ! ClearCheckpointData(time)
def onCheckpointCompletion(time: Time, clearCheckpointDataLater: Boolean) {
if (clearCheckpointDataLater) {
eventActor ! ClearCheckpointData(time)
}
}

/** Processes all events */
Expand All @@ -173,7 +176,8 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
event match {
case GenerateJobs(time) => generateJobs(time)
case ClearMetadata(time) => clearMetadata(time)
case DoCheckpoint(time) => doCheckpoint(time)
case DoCheckpoint(time, clearCheckpointDataLater) =>
doCheckpoint(time, clearCheckpointDataLater)
case ClearCheckpointData(time) => clearCheckpointData(time)
}
}
Expand Down Expand Up @@ -245,7 +249,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
case Failure(e) =>
jobScheduler.reportError("Error generating jobs for time " + time, e)
}
eventActor ! DoCheckpoint(time)
eventActor ! DoCheckpoint(time, clearCheckpointDataLater = false)
}

/** Clear DStream metadata for the given `time`. */
Expand All @@ -255,7 +259,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
// If checkpointing is enabled, then checkpoint,
// else mark batch to be fully processed
if (shouldCheckpoint) {
eventActor ! DoCheckpoint(time)
eventActor ! DoCheckpoint(time, clearCheckpointDataLater = true)
} else {
// If checkpointing is not enabled, then delete metadata information about
// received blocks (block data not saved in any case). Otherwise, wait for
Expand All @@ -278,11 +282,11 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
}

/** Perform checkpoint for the give `time`. */
private def doCheckpoint(time: Time) {
private def doCheckpoint(time: Time, clearCheckpointDataLater: Boolean) {
if (shouldCheckpoint && (time - graph.zeroTime).isMultipleOf(ssc.checkpointDuration)) {
logInfo("Checkpointing graph for time " + time)
ssc.graph.updateCheckpointData(time)
checkpointWriter.write(new Checkpoint(ssc, time))
checkpointWriter.write(new Checkpoint(ssc, time), clearCheckpointDataLater)
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.streaming.scheduler

import java.util.concurrent.CountDownLatch

import scala.concurrent.duration._
import scala.language.postfixOps

import org.scalatest.concurrent.Eventually._

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.util.{ManualClock, Utils}

class JobGeneratorSuite extends TestSuiteBase {

// SPARK-6222 is a tricky regression bug which causes received block metadata
// to be deleted before the corresponding batch has completed. This occurs when
// the following conditions are met.
// 1. streaming checkpointing is enabled by setting streamingContext.checkpoint(dir)
// 2. input data is received through a receiver as blocks
// 3. a batch processing a set of blocks takes a long time, such that a few subsequent
// batches have been generated and submitted for processing.
//
// The JobGenerator (as of Mar 16, 2015) checkpoints twice per batch, once after generation
// of a batch, and another time after the completion of a batch. The cleanup of
// checkpoint data (including block metadata, etc.) from DStream must be done only after the
// 2nd checkpoint has completed, that is, after the batch has been completely processed.
// However, the issue is that the checkpoint data and along with it received block data is
// cleaned even in the case of the 1st checkpoint, causing pre-mature deletion of received block
// data. For example, if the 3rd batch is still being process, the 7th batch may get generated,
// and the corresponding "1st checkpoint" will delete received block metadata of batch older
// than 6th batch. That, is 3rd batch's block metadata gets deleted even before 3rd batch has
// been completely processed.
//
// This test tries to create that scenario by the following.
// 1. enable checkpointing
// 2. generate batches with received blocks
// 3. make the 3rd batch never complete
// 4. allow subsequent batches to be generated (to allow premature deletion of 3rd batch metadata)
// 5. verify whether 3rd batch's block metadata still exists
//
test("SPARK-6222: Do not clear received block data too soon") {
import JobGeneratorSuite._
val checkpointDir = Utils.createTempDir()
val testConf = conf
testConf.set("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock")
testConf.set("spark.streaming.receiver.writeAheadLog.rollingInterval", "1")

withStreamingContext(new StreamingContext(testConf, batchDuration)) { ssc =>
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
val numBatches = 10
val longBatchNumber = 3 // 3rd batch will take a long time
val longBatchTime = longBatchNumber * batchDuration.milliseconds

val testTimeout = timeout(10 seconds)
val inputStream = ssc.receiverStream(new TestReceiver)

inputStream.foreachRDD((rdd: RDD[Int], time: Time) => {
if (time.milliseconds == longBatchTime) {
while (waitLatch.getCount() > 0) {
waitLatch.await()
println("Await over")
}
}
})

val batchCounter = new BatchCounter(ssc)
ssc.checkpoint(checkpointDir.getAbsolutePath)
ssc.start()

// Make sure the only 1 batch of information is to be remembered
assert(inputStream.rememberDuration === batchDuration)
val receiverTracker = ssc.scheduler.receiverTracker

// Get the blocks belonging to a batch
def getBlocksOfBatch(batchTime: Long) = {
receiverTracker.getBlocksOfBatchAndStream(Time(batchTime), inputStream.id)
}

// Wait for new blocks to be received
def waitForNewReceivedBlocks() {
eventually(testTimeout) {
assert(receiverTracker.hasUnallocatedBlocks)
}
}

// Wait for received blocks to be allocated to a batch
def waitForBlocksToBeAllocatedToBatch(batchTime: Long) {
eventually(testTimeout) {
assert(getBlocksOfBatch(batchTime).nonEmpty)
}
}

// Generate a large number of batches with blocks in them
for (batchNum <- 1 to numBatches) {
waitForNewReceivedBlocks()
clock.advance(batchDuration.milliseconds)
waitForBlocksToBeAllocatedToBatch(clock.getTimeMillis())
}

// Wait for 3rd batch to start
eventually(testTimeout) {
ssc.scheduler.getPendingTimes().contains(Time(numBatches * batchDuration.milliseconds))
}

// Verify that the 3rd batch's block data is still present while the 3rd batch is incomplete
assert(getBlocksOfBatch(longBatchTime).nonEmpty, "blocks of incomplete batch already deleted")
assert(batchCounter.getNumCompletedBatches < longBatchNumber)
waitLatch.countDown()
}
}
}

object JobGeneratorSuite {
val waitLatch = new CountDownLatch(1)
}

0 comments on commit 645cf3f

Please sign in to comment.