-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-23433][CORE] Late zombie task completions update all tasksets #21131
Conversation
After a fetch failure and stage retry, we may have multiple tasksets which are active for a given stage. A late completion from an earlier attempt of the stage should update the most recent attempt for the stage, so it does not try to submit another task for the same partition, and so that it knows when it is completed.
Test build #89739 has finished for PR 21131 at commit
|
@markhamstra @zsxwing @jiangxb1987 @Ngone51 would appreciate a review, thanks |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the improvement is reasonable. Though, DAGScheduler
has already supported to handle an earlier stage attempt's successful task as a part of the stage's success at the end, but DAGScheduler
lose control of the stage when it is (re)submitted. So, this improvement likes a second fence to avoid submitting a unnecessary task at low level.
And I'm considering, do we need to handle other running tasks in those TaskSet(Manager)
s like speculative task do once a task completed. As we do not need to wait a task complete which has already succeed in other TaskSet
. Though, maybe, there're some listeners waiting for their own TaskSet
's tasks running status. But, I guess they care more about the task's success result.
/** | ||
* Marks the task has completed in all TaskSetManagers for the given stage. | ||
* | ||
* After stage failure and retry, there may be multiple active TaskSetManagers for the stage. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIRC, there's only one active TaskSetManager
for a given stage, and with some zombie TaskSetManager
s possibly. Though, there may be some running tasks in zombie TaskSetManager
s.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah the terminology is a bit of mess here ... I dunno if we consistently distinguish the use of "active" for one taskset which is non-zombie vs. all the tasksets which have some tasks that are running (though all-but-one must be zombies).
@markhamstra @kayousterhout thoughts on naming?
In any case, I think you're right, I will remove "active" here.
* attempt can lead to the entire stage getting marked as successful. | ||
*/ | ||
private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = { | ||
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally, it seems impossible for a unfinished TaskSet
to get an empty Map()
in taskSetsByStageIdAndAttempt
. But, if it does, maybe, we can tell the caller the stage has already finished.
// we update the blacklist for the stage attempts with all successful tasks. Even though | ||
// some tasksets had failures, we still consider them all successful from a blacklisting | ||
// perspective, as the failures weren't from a problem w/ the tasks themselves. | ||
verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is meq()
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is mockito's eq
matcher which is renamed to avoid clashing with scala's eq
, this is a standard rename we use in the codebase:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, the code is folded, no wonder I didn't find it. Thank you.
taskScheduler.submitTasks(finalAttempt) | ||
val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get | ||
val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } | ||
val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yet, launched tasks has nothing to do with other running tasks in other TaskSet
s. But, is it possible to take those running tasks into consideration when launch a new task (in source code) ? For example, launching FetchFailed task or tasks who do not have a running copy across TaskSet
s firstly ?
(But, it seems we will always have running copies in other TaskSet
s for our final TaskSet
, except FetchFailed task, right? It's more like we are not talking about resubmitting a stage, but resubmitting tasks who do not have running copies across previous TaskSet
s.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we've previously debated about what to do with the tasks still running in a zombie attempt, and there hasn't been any definitive conclusion. I'm just trying to do a correctness fix here. Briefly, in general there is an expectation that those tasks are unlikely to succeed (because they won't be able to get their shuffle input, same as the original fetch failure), so we don't want to delay starting a new attempt of that task. And perhaps we should even actively kill those tasks (you'll see comments about that in various places). But if they do succeed, we need to handle them correctly. Note that even if we did try to actively kill them, you'd still need to handle a late-completion, as killing would only be "best-effort".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because they won't be able to get their shuffle input, same as the original fetch failure
why? In DAGScheduler
, we only unregister one MapStatus of parent stage, so other running tasks within the failed (child) stage (caused by a fetch fail task) may still get MapOutputs from MapOutputTrackerMaster
, and fetch data from other Executor
s. So, they can success normally.
Do I miss something?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the assumption is that a fetchfailure means that all data on that host is unavailable. As shuffles are all-to-all, its very likely that every task is going to need some piece of data from that host. Its possible that they already grabbed all the data they need, before the problem occurred with the host, we don't know. Also, there is no "partial progress" for a task -- tasks don't know how to grab all the shuffle output they can, then just wait until the missing bit becomes available again. They fail as soon as the data they need is unavailable (with some retries, but there is no "pause" nor a check for data on another source).
Also the dagscheduler is a little confusing on this -- it does the unregister in two parts (I have no idea why anymore, to be honest):
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The explanation is quite clear and I get understand now. Thank you very mush! @squito
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice patch only a tiny nit! Thanks for working on this!
// should be active in every taskset. We choose a zombie taskset just to make sure that | ||
// we transition the active taskset correctly even if the final completion comes | ||
// from a zombie. | ||
zombieAttempts(partition % 2) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm... I know it's pretty nitpick but since remainingTasks
is a set, you can't guarantee the final completion comes from a zombie. It's fine to keep this, or we can finish the partition 0 first instead.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not a nitpick at all, thanks for catching this! I'll update
|
||
// finally, lets complete all the tasks. We simulate failures in attempt 1, but everything | ||
// else succeeds, to make sure we get the right updates to the blacklist in all cases. | ||
(zombieAttempts ++ Seq(finalTsm)).foreach { tsm => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here you can reuse the val "allTaskSets".
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks for pointing that out -- though actually I'm going to go the other direction, I realized allTaskSets
is not necessary at all.
val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) } | ||
taskScheduler.resourceOffers(offers) | ||
assert(tsm.runningTasks === 10) | ||
if (stageAttempt < 2) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This condition is not needed as the stageAttempt iterates on Range(0, 1).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good point, fixed
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty) | ||
|
||
val allTaskSets = zombieAttempts ++ Seq(finalTsm) | ||
val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I see remainingTasks is always the same as finalAttemptLaunchedPartitions. I am wondering whether it is more readable to use finalAttemptLaunchedPartitions here for initialisation.
Test build #89790 has finished for PR 21131 at commit
|
Test build #89792 has finished for PR 21131 at commit
|
LGTM, and nice UT. |
Fetch failure lead to multiple tasksets which are active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset so that it knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and so that it knows when it is completed and when it should be marked as a "zombie". Added a regression test. Author: Imran Rashid <irashid@cloudera.com> Closes #21131 from squito/SPARK-23433. (cherry picked from commit 94641fe) Signed-off-by: Imran Rashid <irashid@cloudera.com>
Fetch failure lead to multiple tasksets which are active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset so that it knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and so that it knows when it is completed and when it should be marked as a "zombie". Added a regression test. Author: Imran Rashid <irashid@cloudera.com> Closes apache#21131 from squito/SPARK-23433.
Fetch failure lead to multiple tasksets which are active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset so that it knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and so that it knows when it is completed and when it should be marked as a "zombie". Added a regression test. Author: Imran Rashid <irashid@cloudera.com> Closes #21131 from squito/SPARK-23433. (cherry picked from commit 94641fe) Signed-off-by: Imran Rashid <irashid@cloudera.com>
merged to master / 2.3 / 2.2 |
* [SPARK-23816][CORE] Killed tasks should ignore FetchFailures. SPARK-19276 ensured that FetchFailures do not get swallowed by other layers of exception handling, but it also meant that a killed task could look like a fetch failure. This is particularly a problem with speculative execution, where we expect to kill tasks as they are reading shuffle data. The fix is to ensure that we always check for killed tasks first. Added a new unit test which fails before the fix, ran it 1k times to check for flakiness. Full suite of tests on jenkins. Author: Imran Rashid <irashid@cloudera.com> Closes apache#20987 from squito/SPARK-23816. (cherry picked from commit 10f45bb) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen. `EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result by codegen. ```scala scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF() df: org.apache.spark.sql.DataFrame = [_1: double, _2: double] scala> df.show() +----+----+ | _1| _2| +----+----+ |-1.0|null| |null|-1.0| +----+----+ scala> df.filter("_1 <=> _2").show() +----+----+ | _1| _2| +----+----+ |-1.0|null| |null|-1.0| +----+----+ ``` The result should be empty but the result remains two rows. Added a test. Author: Takuya UESHIN <ueshin@databricks.com> Closes apache#21094 from ueshin/issues/SPARK-24007/equalnullsafe. (cherry picked from commit f09a9e9) Signed-off-by: gatorsmile <gatorsmile@gmail.com> * [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table ## What changes were proposed in this pull request? TableReader would get disproportionately slower as the number of columns in the query increased. I fixed the way TableReader was looking up metadata for each column in the row. Previously, it had been looking up this data in linked lists, accessing each linked list by an index (column number). Now it looks up this data in arrays, where indexing by column number works better. ## How was this patch tested? Manual testing All sbt unit tests python sql tests Author: Bruce Robbins <bersprockets@gmail.com> Closes apache#21043 from bersprockets/tabreadfix. * [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId ## What changes were proposed in this pull request? Fix comment. Change `BroadcastHashJoin.broadcastFuture` to `BroadcastExchangeExec.relationFuture`: https://github.com/apache/spark/blob/d28d5732ae205771f1f443b15b10e64dcffb5ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L66 ## How was this patch tested? N/A Author: seancxmao <seancxmao@gmail.com> Closes apache#21113 from seancxmao/SPARK-13136. (cherry picked from commit c303b1b) Signed-off-by: hyukjinkwon <gurwls223@apache.org> * [SPARK-23941][MESOS] Mesos task failed on specific spark app name ## What changes were proposed in this pull request? Shell escaped the name passed to spark-submit and change how conf attributes are shell escaped. ## How was this patch tested? This test has been tested manually with Hive-on-spark with mesos or with the use case described in the issue with the sparkPi application with a custom name which contains illegal shell characters. With this PR, hive-on-spark on mesos works like a charm with hive 3.0.0-SNAPSHOT. I state that this contribution is my original work and that I license the work to the project under the project’s open source license Author: Bounkong Khamphousone <bounkong.khamphousone@ebiznext.com> Closes apache#21014 from tiboun/fix/SPARK-23941. (cherry picked from commit 6782359) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * [SPARK-23433][CORE] Late zombie task completions update all tasksets Fetch failure lead to multiple tasksets which are active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset so that it knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and so that it knows when it is completed and when it should be marked as a "zombie". Added a regression test. Author: Imran Rashid <irashid@cloudera.com> Closes apache#21131 from squito/SPARK-23433. (cherry picked from commit 94641fe) Signed-off-by: Imran Rashid <irashid@cloudera.com> * [SPARK-23489][SQL][TEST][BRANCH-2.2] HiveExternalCatalogVersionsSuite should verify the downloaded file ## What changes were proposed in this pull request? This is a backport of apache#21210 because `branch-2.2` also faces the same failures. Although [SPARK-22654](https://issues.apache.org/jira/browse/SPARK-22654) made `HiveExternalCatalogVersionsSuite` download from Apache mirrors three times, it has been flaky because it didn't verify the downloaded file. Some Apache mirrors terminate the downloading abnormally, the *corrupted* file shows the following errors. ``` gzip: stdin: not in gzip format tar: Child returned status 1 tar: Error is not recoverable: exiting now 22:46:32.700 WARN org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite: ===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.hive.HiveExternalCatalogVersionsSuite, thread names: Keep-Alive-Timer ===== *** RUN ABORTED *** java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "/tmp/test-spark/spark-2.2.0"): error=2, No such file or directory ``` This has been reported weirdly in two ways. For example, the above case is reported as Case 2 `no failures`. - Case 1. [Test Result (1 failure / +1)](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/4389/) - Case 2. [Test Result (no failures)](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.6/4811/) This PR aims to make `HiveExternalCatalogVersionsSuite` more robust by verifying the downloaded `tgz` file by extracting and checking the existence of `bin/spark-submit`. If it turns out that the file is empty or corrupted, `HiveExternalCatalogVersionsSuite` will do retry logic like the download failure. ## How was this patch tested? Pass the Jenkins. Author: Dongjoon Hyun <dongjoon@apache.org> Closes apache#21232 from dongjoon-hyun/SPARK-23489-2. * [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly ## What changes were proposed in this pull request? It's possible that Accumulators of Spark 1.x may no longer work with Spark 2.x. This is because `LegacyAccumulatorWrapper.isZero` may return wrong answer if `AccumulableParam` doesn't define equals/hashCode. This PR fixes this by using reference equality check in `LegacyAccumulatorWrapper.isZero`. ## How was this patch tested? a new test Author: Wenchen Fan <wenchen@databricks.com> Closes apache#21229 from cloud-fan/accumulator. (cherry picked from commit 4d5de4d) Signed-off-by: Wenchen Fan <wenchen@databricks.com> * [SPARK-21278][PYSPARK] Upgrade to Py4J 0.10.6 This PR aims to bump Py4J in order to fix the following float/double bug. Py4J 0.10.5 fixes this (py4j/py4j#272) and the latest Py4J is 0.10.6. **BEFORE** ``` >>> df = spark.range(1) >>> df.select(df['id'] + 17.133574204226083).show() +--------------------+ |(id + 17.1335742042)| +--------------------+ | 17.1335742042| +--------------------+ ``` **AFTER** ``` >>> df = spark.range(1) >>> df.select(df['id'] + 17.133574204226083).show() +-------------------------+ |(id + 17.133574204226083)| +-------------------------+ | 17.133574204226083| +-------------------------+ ``` Manual. Author: Dongjoon Hyun <dongjoon@apache.org> Closes apache#18546 from dongjoon-hyun/SPARK-21278. (cherry picked from commit c8d0aba) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * [SPARK-16406][SQL] Improve performance of LogicalPlan.resolve `LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes. This PR adds an indexing structure to `resolve(...)` in order to find potential matches quicker. This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s): ``` scala val n = 4000 val values = (1 to n).map(_.toString).mkString(", ") val columns = (1 to n).map("column" + _).mkString(", ") val query = s""" |SELECT $columns |FROM VALUES ($values) T($columns) |WHERE 1=2 AND 1 IN ($columns) |GROUP BY $columns |ORDER BY $columns |""".stripMargin spark.time(sql(query)) ``` Existing tests. Author: Herman van Hovell <hvanhovell@databricks.com> Closes apache#14083 from hvanhovell/SPARK-16406. * [PYSPARK] Update py4j to version 0.10.7. (cherry picked from commit cc613b5) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> (cherry picked from commit 323dc3a) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * [SPARKR] Match pyspark features in SparkR communication protocol. (cherry picked from commit 628c7b5) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> (cherry picked from commit 16cd9ac) Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com> * Keep old-style messages for AnalysisException with ambiguous references
Fetch failure lead to multiple tasksets which are active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset so that it knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and so that it knows when it is completed and when it should be marked as a "zombie". Added a regression test. Author: Imran Rashid <irashid@cloudera.com> Closes apache#21131 from squito/SPARK-23433. (cherry picked from commit 94641fe)
Fetch failure lead to multiple tasksets which are active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset so that it knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and so that it knows when it is completed and when it should be marked as a "zombie". Added a regression test. Author: Imran Rashid <irashid@cloudera.com> Closes apache#21131 from squito/SPARK-23433. (cherry picked from commit 94641fe)
How can this happen? the |
We don't have very precise terminology here -- I'm using "active" to mean a taskset which still has running tasks. Even when a taskset is zombie, it will have many previously launched tasks still going. |
ah i see. Does it only apply to the result stage? IIRC shuffle stage tracks shuffle epoch and will ignore the tasks from a killed stage. |
hmm, will we have a problem for shuffle here? Assuming a shuffle stage has 2 task sets, one is active, one is zombie. Both of them have running tasks. If a task from zombie task set finishes, it will send a task completion event to DAG scheduler. The event might be ignored later because the epoch is outdated. When the task in the normal task set finishes, it will not send event to DAG scheduler because this task is already marked as finished in this task set. Then the shuffle stage never finishes. |
The DAGSCheudler is notified about successfully completed tasks, whether or not the I don't think there are problems here ... though I agree its confusing and we could have better tests here ... |
if (tasksSuccessful == numTasks) { | ||
isZombie = true | ||
} | ||
maybeFinishTaskSet() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is this line needed? We will call maybeFinishTaskSet()
at the end of handleSuccessfulTask
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you're right, its not needed, its called when the tasks succeed, fail, or are aborted, and when this called while that taskset still has running tasks, then its a no-op, as it would fail the runningTasks == 0
check inside maybeFinishTaskSet()
.
do you think its worth removing? I'm fine either way.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's too minor. If we touch this file again, let's remove it. Otherwise maybe not bother about it.
a late LGTM |
Fetch failure lead to multiple tasksets which are active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset so that it knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and so that it knows when it is completed and when it should be marked as a "zombie". Added a regression test. Author: Imran Rashid <irashid@cloudera.com> Closes apache#21131 from squito/SPARK-23433. (cherry picked from commit 94641fe) Signed-off-by: Imran Rashid <irashid@cloudera.com>
Reverting redundant method call from PR apache#21131, adding test setup code in test, changing from index to partition id etc.
…bout the finished partitions ## What changes were proposed in this pull request? This is an optional solution for #22806 . #21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen. But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why #22806 hit the issue. This pr extends #21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks. ## How was this patch tested? Add. Closes #23871 from Ngone51/dev-23433-25250. Lead-authored-by: wuyi <ngone_5451@163.com> Co-authored-by: Ngone51 <ngone_5451@163.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit e5c6143) Signed-off-by: Imran Rashid <irashid@cloudera.com>
…bout the finished partitions ## What changes were proposed in this pull request? This is an optional solution for #22806 . #21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen. But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why #22806 hit the issue. This pr extends #21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks. ## How was this patch tested? Add. Closes #23871 from Ngone51/dev-23433-25250. Lead-authored-by: wuyi <ngone_5451@163.com> Co-authored-by: Ngone51 <ngone_5451@163.com> Signed-off-by: Imran Rashid <irashid@cloudera.com>
…a stage ## What changes were proposed in this pull request? This is another attempt to fix the more-than-one-active-task-set-managers bug. #17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail. This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error. #21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed. However, #21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error. #22806 and #23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions. This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager. After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). #22806 and #23871 are its followups to fix the hole. ## How was this patch tested? existing tests. Closes #23927 from cloud-fan/scheduler. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit cb20fbc) Signed-off-by: Imran Rashid <irashid@cloudera.com>
…a stage ## What changes were proposed in this pull request? This is another attempt to fix the more-than-one-active-task-set-managers bug. #17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail. This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error. #21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed. However, #21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error. #22806 and #23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions. This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager. After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). #22806 and #23871 are its followups to fix the hole. ## How was this patch tested? existing tests. Closes #23927 from cloud-fan/scheduler. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit cb20fbc) Signed-off-by: Imran Rashid <irashid@cloudera.com>
…ould learn about the finished partitions ## What changes were proposed in this pull request? This is an optional solution for #22806 . #21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen. But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why #22806 hit the issue. This pr extends #21131 's behavior by adding stageIdToFinishedPartitions into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into stageIdToFinishedPartitions and won't launch any duplicate tasks. ## How was this patch tested? Add. Closes #24007 from Ngone51/dev-23433-25250-branch-2.3. Authored-by: wuyi <ngone_5451@163.com> Signed-off-by: Imran Rashid <irashid@cloudera.com>
…eption many times ## What changes were proposed in this pull request? https://issues.apache.org/jira/browse/SPARK-25250 reports a bug that, a task which is failed with `CommitDeniedException` gets retried many times. This can happen when a stage has 2 task set managers, one is zombie, one is active. A task from the zombie TSM completes, and commits to a central coordinator(assuming it's a file writing task). Then the corresponding task from the active TSM will fail with `CommitDeniedException`. `CommitDeniedException.countTowardsTaskFailures` is false, so the active TSM will keep retrying this task, until the job finishes. This wastes resource a lot. #21131 firstly implements that a previous successful completed task from zombie `TaskSetManager` could mark the task of the same partition completed in the active `TaskSetManager`. Later #23871 improves the implementation to cover a corner case that, an active `TaskSetManager` hasn't been created when a previous task succeed. However, #23871 has a bug and was reverted in #24359. With hindsight, #23781 is fragile because we need to sync the states between `DAGScheduler` and `TaskScheduler`, about which partitions are completed. This PR proposes a new fix: 1. When `DAGScheduler` gets a task success event from an earlier attempt, notify the `TaskSchedulerImpl` about it 2. When `TaskSchedulerImpl` knows a partition is already completed, ask the active `TaskSetManager` to mark the corresponding task as finished, if the task is not finished yet. This fix covers the corner case, because: 1. If `DAGScheduler` gets the task completion event from zombie TSM before submitting the new stage attempt, then `DAGScheduler` knows that this partition is completed, and it will exclude this partition when creating task set for the new stage attempt. See `DAGScheduler.submitMissingTasks` 2. If `DAGScheduler` gets the task completion event from zombie TSM after submitting the new stage attempt, then the active TSM is already created. Compared to the previous fix, the message loop becomes longer, so it's likely that, the active task set manager has already retried the task multiple times. But this failure window won't be too big, and we want to avoid the worse case that retries the task many times until the job finishes. So this solution is acceptable. ## How was this patch tested? a new test case. Closes #24375 from cloud-fan/fix2. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…bout the finished partitions ## What changes were proposed in this pull request? This is an optional solution for apache#22806 . apache#21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen. But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why apache#22806 hit the issue. This pr extends apache#21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks. ## How was this patch tested? Add. Closes apache#23871 from Ngone51/dev-23433-25250. Lead-authored-by: wuyi <ngone_5451@163.com> Co-authored-by: Ngone51 <ngone_5451@163.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit e5c6143) Signed-off-by: Imran Rashid <irashid@cloudera.com>
…a stage ## What changes were proposed in this pull request? This is another attempt to fix the more-than-one-active-task-set-managers bug. apache#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail. This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error. apache#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed. However, apache#21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error. apache#22806 and apache#23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions. This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager. After this PR, apache#21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). apache#22806 and apache#23871 are its followups to fix the hole. ## How was this patch tested? existing tests. Closes apache#23927 from cloud-fan/scheduler. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit cb20fbc) Signed-off-by: Imran Rashid <irashid@cloudera.com>
…bout the finished partitions ## What changes were proposed in this pull request? This is an optional solution for apache#22806 . apache#21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen. But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why apache#22806 hit the issue. This pr extends apache#21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks. ## How was this patch tested? Add. Closes apache#23871 from Ngone51/dev-23433-25250. Lead-authored-by: wuyi <ngone_5451@163.com> Co-authored-by: Ngone51 <ngone_5451@163.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit e5c6143) Signed-off-by: Imran Rashid <irashid@cloudera.com>
…a stage ## What changes were proposed in this pull request? This is another attempt to fix the more-than-one-active-task-set-managers bug. apache#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail. This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error. apache#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed. However, apache#21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error. apache#22806 and apache#23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions. This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager. After this PR, apache#21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). apache#22806 and apache#23871 are its followups to fix the hole. ## How was this patch tested? existing tests. Closes apache#23927 from cloud-fan/scheduler. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit cb20fbc) Signed-off-by: Imran Rashid <irashid@cloudera.com>
…bout the finished partitions ## What changes were proposed in this pull request? This is an optional solution for apache#22806 . apache#21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen. But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why apache#22806 hit the issue. This pr extends apache#21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks. ## How was this patch tested? Add. Closes apache#23871 from Ngone51/dev-23433-25250. Lead-authored-by: wuyi <ngone_5451@163.com> Co-authored-by: Ngone51 <ngone_5451@163.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit e5c6143) Signed-off-by: Imran Rashid <irashid@cloudera.com>
…a stage ## What changes were proposed in this pull request? This is another attempt to fix the more-than-one-active-task-set-managers bug. apache#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail. This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error. apache#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed. However, apache#21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error. apache#22806 and apache#23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions. This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager. After this PR, apache#21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). apache#22806 and apache#23871 are its followups to fix the hole. ## How was this patch tested? existing tests. Closes apache#23927 from cloud-fan/scheduler. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit cb20fbc) Signed-off-by: Imran Rashid <irashid@cloudera.com>
…a stage ## What changes were proposed in this pull request? This is another attempt to fix the more-than-one-active-task-set-managers bug. apache/spark#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail. This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error. apache/spark#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed. However, #21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error. #22806 and #23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions. This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager. After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). #22806 and #23871 are its followups to fix the hole. ## How was this patch tested? existing tests. Closes #23927 from cloud-fan/scheduler. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit cb20fbc) Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit 7df5aa6)
…a stage ## What changes were proposed in this pull request? This is another attempt to fix the more-than-one-active-task-set-managers bug. apache#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail. This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error. apache#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed. However, apache#21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error. apache#22806 and apache#23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions. This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager. After this PR, apache#21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). apache#22806 and apache#23871 are its followups to fix the hole. ## How was this patch tested? existing tests. Closes apache#23927 from cloud-fan/scheduler. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Imran Rashid <irashid@cloudera.com> (cherry picked from commit cb20fbc) Signed-off-by: Imran Rashid <irashid@cloudera.com>
Ref: LIHADOOP-52383 Fetch failure lead to multiple tasksets which are active for a given stage. While there is only one "active" version of the taskset, the earlier attempts can still have running tasks, which can complete successfully. So a task completion needs to update every taskset so that it knows the partition is completed. That way the final active taskset does not try to submit another task for the same partition, and so that it knows when it is completed and when it should be marked as a "zombie". Added a regression test. Author: Imran Rashid <irashid@cloudera.com> Closes apache#21131 from squito/SPARK-23433.
Fetch failure lead to multiple tasksets which are active for a given
stage. While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully. So a task completion needs to update every taskset
so that it knows the partition is completed. That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".
Added a regression test.