[SPARK-6222][Streaming] Dont delete checkpoint data when doing pre-batch-start checkpoint #5008

Closed
wants to merge 5 commits

Conversation

tdas
Contributor

@tdas tdas commented Mar 13, 2015

This is another alternative approach to #4964
I think this is a simpler fix that can be backported easily to other branches (1.2 and 1.3).

All it does is introduce a flag so that the pre-batch-start checkpoint does not clear the checkpoint data.

There is no unit test yet. I will add one once this approach has been commented on. I am not sure how easily this can be tested.
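In essence, the idea is the following (a minimal, self-contained sketch with illustrative names; this is not the exact code touched by the patch):

```scala
// Sketch only: the names here are illustrative placeholders, not the actual
// Spark Streaming classes changed by this patch.
object CheckpointFlagSketch {

  private def writeCheckpoint(time: Long): Unit =
    println(s"wrote checkpoint for batch time $time")

  private def clearOldCheckpointData(time: Long): Unit =
    println(s"cleared checkpoint data up to batch time $time")

  // The pre-batch-start checkpoint (taken when jobs are generated) passes
  // clearCheckpointDataLater = false, so writing it does not trigger cleanup.
  // The checkpoint taken after the batch completes passes true, so older
  // checkpoint data is cleaned up only once the batch has actually finished.
  def doCheckpoint(time: Long, clearCheckpointDataLater: Boolean): Unit = {
    writeCheckpoint(time)
    if (clearCheckpointDataLater) {
      clearOldCheckpointData(time)
    }
  }

  def main(args: Array[String]): Unit = {
    doCheckpoint(1000L, clearCheckpointDataLater = false) // pre-batch-start: keep data
    doCheckpoint(1000L, clearCheckpointDataLater = true)  // after batch completion: safe to clean
  }
}
```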

@tdas
Contributor Author

tdas commented Mar 13, 2015

@harishreedharan Can you take a look?

@SparkQA

SparkQA commented Mar 13, 2015

Test build #28551 has finished for PR 5008 at commit 295ca5c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • sealed abstract class AttributeType(val name: String)
    • sealed abstract class Attribute extends Serializable
    • class CheckpointWriteHandler(

@SparkQA

SparkQA commented Mar 13, 2015

Test build #28552 has finished for PR 5008 at commit 50cb60b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • sealed abstract class AttributeType(val name: String)
    • sealed abstract class Attribute extends Serializable
    • class CheckpointWriteHandler(

@harishreedharan
Contributor

@tdas - Actually, this fixes one part of the problem: the checkpoint that is kicked off at the time the job is generated.

But this can still cause an issue if concurrentJobs is set pretty high. With that parameter high enough, a batch that started at time t + maxRememberDuration might end up completing and checkpointing before a batch at time t, if the batch at time t takes longer to process.

I have seen people set concurrentJobs pretty high when the cluster is large and the processing order is not really relevant. #4964 actually takes care of that situation (which is what the maps are for). If that is not required, there is an even easier fix than this one, which was in a previous commit in that PR: harishreedharan@fa93b87
Simply keep track of the last completed batch and delete the checkpoint data only when the checkpoint time matches that of the last completed batch (sketched below).
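A minimal sketch of that simpler idea (illustrative names only, not the actual code in harishreedharan@fa93b87):

```scala
// Sketch: only clear checkpoint data for a checkpoint whose time matches the
// most recently completed batch, so pre-batch-start checkpoints never trigger cleanup.
class CheckpointCleanupTracker {

  @volatile private var lastCompletedBatchTime: Long = -1L

  // Call this from the batch-completion path.
  def onBatchCompleted(batchTime: Long): Unit = {
    lastCompletedBatchTime = math.max(lastCompletedBatchTime, batchTime)
  }

  // Call this after a checkpoint has been written for `checkpointTime`.
  // `clear` performs the actual deletion of old checkpoint data.
  def maybeClearCheckpointData(checkpointTime: Long)(clear: Long => Unit): Unit = {
    if (checkpointTime == lastCompletedBatchTime) {
      clear(checkpointTime)
    }
  }
}
```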

@harishreedharan
Contributor

For the case where concurrentJobs == 1, this works. So let's merge this in, while I work on a cleaner approach for the case where concurrentJobs > 1.

@tdas
Contributor Author

tdas commented Mar 17, 2015

@harishreedharan Check out the unit test.

@tdas
Contributor Author

tdas commented Mar 17, 2015

@harishreedharan If you set cleanCheckpointDataLater = true in the clearMetadata function to emulate the current master, then this unit test fails. You can also check out this new test suite file in a Spark repo and run the test there.

@tdas
Contributor Author

tdas commented Mar 17, 2015

@pwendell If @harishreedharan gives an LGTM on this patch and needs it merged urgently for CDH 5.3 (while I am in flight), please merge it.

@SparkQA

SparkQA commented Mar 17, 2015

Test build #28721 has finished for PR 5008 at commit 7315bc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class CheckpointWriteHandler(

@SparkQA

SparkQA commented Mar 17, 2015

Test build #627 has finished for PR 5008 at commit 7315bc2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@harishreedharan
Contributor

LGTM. I ran the previously failing test on a real cluster - no data loss anymore when concurrentJobs = 1.

The unit test looks good too. I ran it against the current master and it fails there, so it works as well.

@harishreedharan
Contributor

@tdas, @pwendell - I think this one is ready to be merged.

@tdas
Contributor Author

tdas commented Mar 17, 2015

Just landed in NY. Will merge when I get a chance.

@harishreedharan
Contributor

Great!

asfgit pushed a commit that referenced this pull request Mar 19, 2015
[SPARK-6222][Streaming] Dont delete checkpoint data when doing pre-batch-start checkpoint

This is another alternative approach to #4964
I think this is a simpler fix that can be backported easily to other branches (1.2 and 1.3).

All it does is introduce a flag so that the pre-batch-start checkpoint does not clear the checkpoint data.

There is no unit test yet. I will add one once this approach has been commented on. I am not sure how easily this can be tested.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #5008 from tdas/SPARK-6222 and squashes the following commits:

7315bc2 [Tathagata Das] Removed empty line.
c438de4 [Tathagata Das] Revert unnecessary change.
5e98374 [Tathagata Das] Added unit test
50cb60b [Tathagata Das] Fixed style issue
295ca5c [Tathagata Das] Fixing SPARK-6222

(cherry picked from commit 645cf3f)
Signed-off-by: Tathagata Das <tathagata.das1565@gmail.com>
@asfgit asfgit closed this in 645cf3f Mar 19, 2015
vanzin pushed a commit to vanzin/spark that referenced this pull request Apr 20, 2015
[SPARK-6222][Streaming] Dont delete checkpoint data when doing pre-batch-start checkpoint

@sardetushar

sardetushar commented Jan 6, 2017

Hi @harishreedharan @tdas, can you please help me? I am losing data when I kill the Spark app (driver). Here is my case:

  • I have one JeroMQ publisher.
  • Spark Streaming starts my custom JeroMQ receiver with the WAL enabled; I can see the messages being stored in the receivedData dir in the checkpoint.
  • Now, when I restart the Spark Streaming app or driver, if the receivedData message count is 25000, only 400 are processed after the restart.
  • I can confirm by looking at the receivedData dir that the WAL received 25000 messages, but after the restart the driver is not able to fetch or process all the records from the WAL dir.

After looking at the above patch, do I need to add any parameters to SparkConf, like spark.streaming.receiver.writeAheadLog.enable for the WAL? I am using the Java API.
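For context, driver-restart recovery with the receiver WAL generally needs both the WAL flag and checkpoint-based context recreation via getOrCreate. Below is a minimal Scala sketch (the Java API's JavaStreamingContext.getOrCreate is analogous); the checkpoint directory, host/port, and batch interval are placeholders, and the socket stream merely stands in for a custom receiver:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WalRecoverySketch {
  // Placeholder checkpoint directory; use a fault-tolerant filesystem (e.g. HDFS).
  val checkpointDir = "hdfs:///tmp/streaming-checkpoint"

  def createContext(): StreamingContext = {
    val conf = new SparkConf()
      .setAppName("wal-recovery-sketch")
      // Log received blocks to the write-ahead log before they are processed.
      .set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val ssc = new StreamingContext(conf, Seconds(10))
    ssc.checkpoint(checkpointDir) // metadata checkpointing, required for recovery

    // Stand-in for the custom receiver-based stream (e.g. a JeroMQ receiver).
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On restart, rebuild the context from the checkpoint instead of creating a
    // fresh one, so that the data logged in receivedData/ can be replayed.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```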

@sardetushar

Hi, I managed to solve this issue; now I am able to recover all the data. But, as discussed in this thread ([Data Duplicate issue](https://www.mail-archive.com/user@spark.apache.org/msg52687.html)), how do I avoid duplicate messages with Spark Streaming using checkpointing after a restart in case of failure?
For example, if I published 500 records and Spark processed 300, and I then killed the driver and restarted it, I see a count of 800 messages processed in the output directory. This means Spark is processing the same 300 records again.
