Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-23433][CORE] Late zombie task completions update all tasksets #21131

Closed
wants to merge 3 commits into from

Conversation

squito
Copy link
Contributor

@squito squito commented Apr 23, 2018

Fetch failure lead to multiple tasksets which are active for a given
stage. While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully. So a task completion needs to update every taskset
so that it knows the partition is completed. That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

After a fetch failure and stage retry, we may have multiple tasksets
which are active for a given stage.  A late completion from an earlier
attempt of the stage should update the most recent attempt for the
stage, so it does not try to submit another task for the same partition,
and so that it knows when it is completed.
@SparkQA
Copy link

SparkQA commented Apr 24, 2018

Test build #89739 has finished for PR 21131 at commit 0720a7c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Copy link
Contributor Author

squito commented Apr 24, 2018

@markhamstra @zsxwing @jiangxb1987 @Ngone51 would appreciate a review, thanks

Copy link
Member

@Ngone51 Ngone51 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the improvement is reasonable. Though, DAGScheduler has already supported to handle an earlier stage attempt's successful task as a part of the stage's success at the end, but DAGScheduler lose control of the stage when it is (re)submitted. So, this improvement likes a second fence to avoid submitting a unnecessary task at low level.

And I'm considering, do we need to handle other running tasks in those TaskSet(Manager)s like speculative task do once a task completed. As we do not need to wait a task complete which has already succeed in other TaskSet. Though, maybe, there're some listeners waiting for their own TaskSet's tasks running status. But, I guess they care more about the task's success result.

/**
* Marks the task has completed in all TaskSetManagers for the given stage.
*
* After stage failure and retry, there may be multiple active TaskSetManagers for the stage.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, there's only one active TaskSetManager for a given stage, and with some zombie TaskSetManagers possibly. Though, there may be some running tasks in zombie TaskSetManagers.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah the terminology is a bit of mess here ... I dunno if we consistently distinguish the use of "active" for one taskset which is non-zombie vs. all the tasksets which have some tasks that are running (though all-but-one must be zombies).
@markhamstra @kayousterhout thoughts on naming?

In any case, I think you're right, I will remove "active" here.

* attempt can lead to the entire stage getting marked as successful.
*/
private[scheduler] def markPartitionCompletedInAllTaskSets(stageId: Int, partitionId: Int) = {
taskSetsByStageIdAndAttempt.getOrElse(stageId, Map()).values.foreach { tsm =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally, it seems impossible for a unfinished TaskSet to get an empty Map() in taskSetsByStageIdAndAttempt . But, if it does, maybe, we can tell the caller the stage has already finished.

// we update the blacklist for the stage attempts with all successful tasks. Even though
// some tasksets had failures, we still consider them all successful from a blacklisting
// perspective, as the failures weren't from a problem w/ the tasks themselves.
verify(blacklist).updateBlacklistForSuccessfulTaskSet(meq(0), meq(stageAttempt), anyObject())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is meq() ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is mockito's eq matcher which is renamed to avoid clashing with scala's eq, this is a standard rename we use in the codebase:

https://github.com/apache/spark/blob/master/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala#L24

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, the code is folded, no wonder I didn't find it. Thank you.

taskScheduler.submitTasks(finalAttempt)
val finalTsm = taskScheduler.taskSetManagerForAttempt(0, 2).get
val offers = (0 until 5).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
val finalAttemptLaunchedPartitions = taskScheduler.resourceOffers(offers).flatten.map { task =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yet, launched tasks has nothing to do with other running tasks in other TaskSets. But, is it possible to take those running tasks into consideration when launch a new task (in source code) ? For example, launching FetchFailed task or tasks who do not have a running copy across TaskSets firstly ?

(But, it seems we will always have running copies in other TaskSets for our final TaskSet, except FetchFailed task, right? It's more like we are not talking about resubmitting a stage, but resubmitting tasks who do not have running copies across previous TaskSets.)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we've previously debated about what to do with the tasks still running in a zombie attempt, and there hasn't been any definitive conclusion. I'm just trying to do a correctness fix here. Briefly, in general there is an expectation that those tasks are unlikely to succeed (because they won't be able to get their shuffle input, same as the original fetch failure), so we don't want to delay starting a new attempt of that task. And perhaps we should even actively kill those tasks (you'll see comments about that in various places). But if they do succeed, we need to handle them correctly. Note that even if we did try to actively kill them, you'd still need to handle a late-completion, as killing would only be "best-effort".

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because they won't be able to get their shuffle input, same as the original fetch failure

why? In DAGScheduler, we only unregister one MapStatus of parent stage, so other running tasks within the failed (child) stage (caused by a fetch fail task) may still get MapOutputs from MapOutputTrackerMaster, and fetch data from other Executors. So, they can success normally.
Do I miss something?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the assumption is that a fetchfailure means that all data on that host is unavailable. As shuffles are all-to-all, its very likely that every task is going to need some piece of data from that host. Its possible that they already grabbed all the data they need, before the problem occurred with the host, we don't know. Also, there is no "partial progress" for a task -- tasks don't know how to grab all the shuffle output they can, then just wait until the missing bit becomes available again. They fail as soon as the data they need is unavailable (with some retries, but there is no "pause" nor a check for data on another source).

Also the dagscheduler is a little confusing on this -- it does the unregister in two parts (I have no idea why anymore, to be honest):

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1391

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1406

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The explanation is quite clear and I get understand now. Thank you very mush! @squito

Copy link
Contributor

@jiangxb1987 jiangxb1987 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice patch only a tiny nit! Thanks for working on this!

// should be active in every taskset. We choose a zombie taskset just to make sure that
// we transition the active taskset correctly even if the final completion comes
// from a zombie.
zombieAttempts(partition % 2)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... I know it's pretty nitpick but since remainingTasks is a set, you can't guarantee the final completion comes from a zombie. It's fine to keep this, or we can finish the partition 0 first instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not a nitpick at all, thanks for catching this! I'll update


// finally, lets complete all the tasks. We simulate failures in attempt 1, but everything
// else succeeds, to make sure we get the right updates to the blacklist in all cases.
(zombieAttempts ++ Seq(finalTsm)).foreach { tsm =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you can reuse the val "allTaskSets".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for pointing that out -- though actually I'm going to go the other direction, I realized allTaskSets is not necessary at all.

val offers = (0 until 10).map{ idx => WorkerOffer(s"exec-$idx", s"host-$idx", 1) }
taskScheduler.resourceOffers(offers)
assert(tsm.runningTasks === 10)
if (stageAttempt < 2) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This condition is not needed as the stageAttempt iterates on Range(0, 1).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point, fixed

taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec-1", "host-1", 1))).flatten.isEmpty)

val allTaskSets = zombieAttempts ++ Seq(finalTsm)
val remainingTasks = (0 until 10).toSet.diff(finalAttemptPendingPartitions)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I see remainingTasks is always the same as finalAttemptLaunchedPartitions. I am wondering whether it is more readable to use finalAttemptLaunchedPartitions here for initialisation.

@SparkQA
Copy link

SparkQA commented Apr 24, 2018

Test build #89790 has finished for PR 21131 at commit 707307f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 24, 2018

Test build #89792 has finished for PR 21131 at commit 168fd46.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Ngone51
Copy link
Member

Ngone51 commented Apr 27, 2018

LGTM, and nice UT.

asfgit pushed a commit that referenced this pull request May 3, 2018
Fetch failure lead to multiple tasksets which are active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset
so that it knows the partition is completed.  That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

Author: Imran Rashid <irashid@cloudera.com>

Closes #21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
ghost pushed a commit to dbtsai/spark that referenced this pull request May 3, 2018
Fetch failure lead to multiple tasksets which are active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset
so that it knows the partition is completed.  That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

Author: Imran Rashid <irashid@cloudera.com>

Closes apache#21131 from squito/SPARK-23433.
asfgit pushed a commit that referenced this pull request May 3, 2018
Fetch failure lead to multiple tasksets which are active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset
so that it knows the partition is completed.  That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

Author: Imran Rashid <irashid@cloudera.com>

Closes #21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
@squito
Copy link
Contributor Author

squito commented May 3, 2018

merged to master / 2.3 / 2.2

@squito squito closed this May 3, 2018
csd-jenkins pushed a commit to alteryx/spark that referenced this pull request May 15, 2018
* [SPARK-23816][CORE] Killed tasks should ignore FetchFailures.

SPARK-19276 ensured that FetchFailures do not get swallowed by other
layers of exception handling, but it also meant that a killed task could
look like a fetch failure.  This is particularly a problem with
speculative execution, where we expect to kill tasks as they are reading
shuffle data.  The fix is to ensure that we always check for killed
tasks first.

Added a new unit test which fails before the fix, ran it 1k times to
check for flakiness.  Full suite of tests on jenkins.

Author: Imran Rashid <irashid@cloudera.com>

Closes apache#20987 from squito/SPARK-23816.

(cherry picked from commit 10f45bb)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARK-24007][SQL] EqualNullSafe for FloatType and DoubleType might generate a wrong result by codegen.

`EqualNullSafe` for `FloatType` and `DoubleType` might generate a wrong result by codegen.

```scala
scala> val df = Seq((Some(-1.0d), None), (None, Some(-1.0d))).toDF()
df: org.apache.spark.sql.DataFrame = [_1: double, _2: double]

scala> df.show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+

scala> df.filter("_1 <=> _2").show()
+----+----+
|  _1|  _2|
+----+----+
|-1.0|null|
|null|-1.0|
+----+----+
```

The result should be empty but the result remains two rows.

Added a test.

Author: Takuya UESHIN <ueshin@databricks.com>

Closes apache#21094 from ueshin/issues/SPARK-24007/equalnullsafe.

(cherry picked from commit f09a9e9)
Signed-off-by: gatorsmile <gatorsmile@gmail.com>

* [SPARK-23963][SQL] Properly handle large number of columns in query on text-based Hive table

## What changes were proposed in this pull request?

TableReader would get disproportionately slower as the number of columns in the query increased.

I fixed the way TableReader was looking up metadata for each column in the row. Previously, it had been looking up this data in linked lists, accessing each linked list by an index (column number). Now it looks up this data in arrays, where indexing by column number works better.

## How was this patch tested?

Manual testing
All sbt unit tests
python sql tests

Author: Bruce Robbins <bersprockets@gmail.com>

Closes apache#21043 from bersprockets/tabreadfix.

* [MINOR][DOCS] Fix comments of SQLExecution#withExecutionId

## What changes were proposed in this pull request?
Fix comment. Change `BroadcastHashJoin.broadcastFuture` to `BroadcastExchangeExec.relationFuture`: https://github.com/apache/spark/blob/d28d5732ae205771f1f443b15b10e64dcffb5ff0/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala#L66

## How was this patch tested?
N/A

Author: seancxmao <seancxmao@gmail.com>

Closes apache#21113 from seancxmao/SPARK-13136.

(cherry picked from commit c303b1b)
Signed-off-by: hyukjinkwon <gurwls223@apache.org>

* [SPARK-23941][MESOS] Mesos task failed on specific spark app name

## What changes were proposed in this pull request?
Shell escaped the name passed to spark-submit and change how conf attributes are shell escaped.

## How was this patch tested?
This test has been tested manually with Hive-on-spark with mesos or with the use case described in the issue with the sparkPi application with a custom name which contains illegal shell characters.

With this PR, hive-on-spark on mesos works like a charm with hive 3.0.0-SNAPSHOT.

I state that this contribution is my original work and that I license the work to the project under the project’s open source license

Author: Bounkong Khamphousone <bounkong.khamphousone@ebiznext.com>

Closes apache#21014 from tiboun/fix/SPARK-23941.

(cherry picked from commit 6782359)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARK-23433][CORE] Late zombie task completions update all tasksets

Fetch failure lead to multiple tasksets which are active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset
so that it knows the partition is completed.  That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

Author: Imran Rashid <irashid@cloudera.com>

Closes apache#21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe)
Signed-off-by: Imran Rashid <irashid@cloudera.com>

* [SPARK-23489][SQL][TEST][BRANCH-2.2] HiveExternalCatalogVersionsSuite should verify the downloaded file

## What changes were proposed in this pull request?

This is a backport of apache#21210 because `branch-2.2` also faces the same failures.

Although [SPARK-22654](https://issues.apache.org/jira/browse/SPARK-22654) made `HiveExternalCatalogVersionsSuite` download from Apache mirrors three times, it has been flaky because it didn't verify the downloaded file. Some Apache mirrors terminate the downloading abnormally, the *corrupted* file shows the following errors.

```
gzip: stdin: not in gzip format
tar: Child returned status 1
tar: Error is not recoverable: exiting now
22:46:32.700 WARN org.apache.spark.sql.hive.HiveExternalCatalogVersionsSuite:

===== POSSIBLE THREAD LEAK IN SUITE o.a.s.sql.hive.HiveExternalCatalogVersionsSuite, thread names: Keep-Alive-Timer =====

*** RUN ABORTED ***
  java.io.IOException: Cannot run program "./bin/spark-submit" (in directory "/tmp/test-spark/spark-2.2.0"): error=2, No such file or directory
```

This has been reported weirdly in two ways. For example, the above case is reported as Case 2 `no failures`.

- Case 1. [Test Result (1 failure / +1)](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-sbt-hadoop-2.7/4389/)
- Case 2. [Test Result (no failures)](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.6/4811/)

This PR aims to make `HiveExternalCatalogVersionsSuite` more robust by verifying the downloaded `tgz` file by extracting and checking the existence of `bin/spark-submit`. If it turns out that the file is empty or corrupted, `HiveExternalCatalogVersionsSuite` will do retry logic like the download failure.

## How was this patch tested?

Pass the Jenkins.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#21232 from dongjoon-hyun/SPARK-23489-2.

* [SPARK-23697][CORE] LegacyAccumulatorWrapper should define isZero correctly

## What changes were proposed in this pull request?

It's possible that Accumulators of Spark 1.x may no longer work with Spark 2.x. This is because `LegacyAccumulatorWrapper.isZero` may return wrong answer if `AccumulableParam` doesn't define equals/hashCode.

This PR fixes this by using reference equality check in `LegacyAccumulatorWrapper.isZero`.

## How was this patch tested?

a new test

Author: Wenchen Fan <wenchen@databricks.com>

Closes apache#21229 from cloud-fan/accumulator.

(cherry picked from commit 4d5de4d)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

* [SPARK-21278][PYSPARK] Upgrade to Py4J 0.10.6

This PR aims to bump Py4J in order to fix the following float/double bug.
Py4J 0.10.5 fixes this (py4j/py4j#272) and the latest Py4J is 0.10.6.

**BEFORE**
```
>>> df = spark.range(1)
>>> df.select(df['id'] + 17.133574204226083).show()
+--------------------+
|(id + 17.1335742042)|
+--------------------+
|       17.1335742042|
+--------------------+
```

**AFTER**
```
>>> df = spark.range(1)
>>> df.select(df['id'] + 17.133574204226083).show()
+-------------------------+
|(id + 17.133574204226083)|
+-------------------------+
|       17.133574204226083|
+-------------------------+
```

Manual.

Author: Dongjoon Hyun <dongjoon@apache.org>

Closes apache#18546 from dongjoon-hyun/SPARK-21278.

(cherry picked from commit c8d0aba)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARK-16406][SQL] Improve performance of LogicalPlan.resolve

`LogicalPlan.resolve(...)` uses linear searches to find an attribute matching a name. This is fine in normal cases, but gets problematic when you try to resolve a large number of columns on a plan with a large number of attributes.

This PR adds an indexing structure to `resolve(...)` in order to find potential matches quicker. This PR improves the reference resolution time for the following code by 4x (11.8s -> 2.4s):

``` scala
val n = 4000
val values = (1 to n).map(_.toString).mkString(", ")
val columns = (1 to n).map("column" + _).mkString(", ")
val query =
  s"""
     |SELECT $columns
     |FROM VALUES ($values) T($columns)
     |WHERE 1=2 AND 1 IN ($columns)
     |GROUP BY $columns
     |ORDER BY $columns
     |""".stripMargin

spark.time(sql(query))
```

Existing tests.

Author: Herman van Hovell <hvanhovell@databricks.com>

Closes apache#14083 from hvanhovell/SPARK-16406.

* [PYSPARK] Update py4j to version 0.10.7.

(cherry picked from commit cc613b5)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 323dc3a)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* [SPARKR] Match pyspark features in SparkR communication protocol.

(cherry picked from commit 628c7b5)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
(cherry picked from commit 16cd9ac)
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

* Keep old-style messages for AnalysisException with ambiguous references
curtishoward pushed a commit to twosigma/spark that referenced this pull request Jun 19, 2018
Fetch failure lead to multiple tasksets which are active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset
so that it knows the partition is completed.  That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

Author: Imran Rashid <irashid@cloudera.com>

Closes apache#21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe)
curtishoward pushed a commit to twosigma/spark that referenced this pull request Jun 19, 2018
Fetch failure lead to multiple tasksets which are active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset
so that it knows the partition is completed.  That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

Author: Imran Rashid <irashid@cloudera.com>

Closes apache#21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe)
@cloud-fan
Copy link
Contributor

cloud-fan commented Jul 11, 2018

Fetch failure lead to multiple tasksets which are active for a given stage.

How can this happen? the TaskSetManager will mark itself as zombie when it receives a fetch failed.

@squito
Copy link
Contributor Author

squito commented Jul 12, 2018

Fetch failure lead to multiple tasksets which are active for a given stage.
How can this happen? the TaskSetManager will mark itself as zombie when it receives a fetch failed.

We don't have very precise terminology here -- I'm using "active" to mean a taskset which still has running tasks. Even when a taskset is zombie, it will have many previously launched tasks still going.

@cloud-fan
Copy link
Contributor

ah i see. Does it only apply to the result stage? IIRC shuffle stage tracks shuffle epoch and will ignore the tasks from a killed stage.

@cloud-fan
Copy link
Contributor

hmm, will we have a problem for shuffle here? Assuming a shuffle stage has 2 task sets, one is active, one is zombie. Both of them have running tasks.

If a task from zombie task set finishes, it will send a task completion event to DAG scheduler. The event might be ignored later because the epoch is outdated. When the task in the normal task set finishes, it will not send event to DAG scheduler because this task is already marked as finished in this task set. Then the shuffle stage never finishes.

cc @JoshRosen @zsxwing @vanzin

@squito
Copy link
Contributor Author

squito commented Jul 17, 2018

The DAGSCheudler is notified about successfully completed tasks, whether or not the tsm.successful is already true:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala#L745-L768

I don't think there are problems here ... though I agree its confusing and we could have better tests here ...

if (tasksSuccessful == numTasks) {
isZombie = true
}
maybeFinishTaskSet()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this line needed? We will call maybeFinishTaskSet() at the end of handleSuccessfulTask

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you're right, its not needed, its called when the tasks succeed, fail, or are aborted, and when this called while that taskset still has running tasks, then its a no-op, as it would fail the runningTasks == 0 check inside maybeFinishTaskSet().

do you think its worth removing? I'm fine either way.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's too minor. If we touch this file again, let's remove it. Otherwise maybe not bother about it.

@cloud-fan
Copy link
Contributor

a late LGTM

MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
Fetch failure lead to multiple tasksets which are active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset
so that it knows the partition is completed.  That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

Author: Imran Rashid <irashid@cloudera.com>

Closes apache#21131 from squito/SPARK-23433.

(cherry picked from commit 94641fe)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
pgandhi999 pushed a commit to pgandhi999/spark that referenced this pull request Jan 24, 2019
Reverting redundant method call from PR apache#21131, adding test setup code in test, changing from index to partition id etc.
asfgit pushed a commit that referenced this pull request Mar 6, 2019
…bout the finished partitions

## What changes were proposed in this pull request?

This is an optional solution for #22806 .

#21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen.  But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why #22806 hit the issue.

This pr extends #21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks.

## How was this patch tested?

Add.

Closes #23871 from Ngone51/dev-23433-25250.

Lead-authored-by: wuyi <ngone_5451@163.com>
Co-authored-by: Ngone51 <ngone_5451@163.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit e5c6143)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
asfgit pushed a commit that referenced this pull request Mar 6, 2019
…bout the finished partitions

## What changes were proposed in this pull request?

This is an optional solution for #22806 .

#21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen.  But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why #22806 hit the issue.

This pr extends #21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks.

## How was this patch tested?

Add.

Closes #23871 from Ngone51/dev-23433-25250.

Lead-authored-by: wuyi <ngone_5451@163.com>
Co-authored-by: Ngone51 <ngone_5451@163.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
asfgit pushed a commit that referenced this pull request Mar 6, 2019
…a stage

## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail.

This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error.

#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed.

However, #21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error.

#22806 and #23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.

After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). #22806 and #23871 are its followups to fix the hole.

## How was this patch tested?

existing tests.

Closes #23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
asfgit pushed a commit that referenced this pull request Mar 6, 2019
…a stage

## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail.

This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error.

#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed.

However, #21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error.

#22806 and #23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.

After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). #22806 and #23871 are its followups to fix the hole.

## How was this patch tested?

existing tests.

Closes #23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
asfgit pushed a commit that referenced this pull request Mar 7, 2019
…ould learn about the finished partitions

## What changes were proposed in this pull request?

This is an optional solution for #22806 .

#21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen. But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why #22806 hit the issue.

This pr extends #21131 's behavior by adding stageIdToFinishedPartitions into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into stageIdToFinishedPartitions and won't launch any duplicate tasks.

## How was this patch tested?

Add.

Closes #24007 from Ngone51/dev-23433-25250-branch-2.3.

Authored-by: wuyi <ngone_5451@163.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
cloud-fan added a commit that referenced this pull request Apr 29, 2019
…eption many times

## What changes were proposed in this pull request?

https://issues.apache.org/jira/browse/SPARK-25250 reports a bug that, a task which is failed with `CommitDeniedException` gets retried many times.

This can happen when a stage has 2 task set managers, one is zombie, one is active. A task from the zombie TSM completes, and commits to a central coordinator(assuming it's a file writing task). Then the corresponding task from the active TSM will fail with `CommitDeniedException`. `CommitDeniedException.countTowardsTaskFailures` is false, so the active TSM will keep retrying this task, until the job finishes. This wastes resource a lot.

#21131 firstly implements that a previous successful completed task from zombie `TaskSetManager` could mark the task of the same partition completed in the active `TaskSetManager`. Later #23871 improves the implementation to cover a corner case that, an active `TaskSetManager` hasn't been created when a previous task succeed.

However, #23871 has a bug and was reverted in #24359. With hindsight, #23781 is fragile because we need to sync the states between `DAGScheduler` and `TaskScheduler`, about which partitions are completed.

This PR proposes a new fix:
1. When `DAGScheduler` gets a task success event from an earlier attempt, notify the `TaskSchedulerImpl` about it
2. When `TaskSchedulerImpl` knows a partition is already completed, ask the active `TaskSetManager` to mark the corresponding task as finished, if the task is not finished yet.

This fix covers the corner case, because:
1. If `DAGScheduler` gets the task completion event from zombie TSM before submitting the new stage attempt, then `DAGScheduler` knows that this partition is completed, and it will exclude this partition when creating task set for the new stage attempt. See `DAGScheduler.submitMissingTasks`
2. If `DAGScheduler` gets the task completion event from zombie TSM after submitting the new stage attempt, then the active TSM is already created.

Compared to the previous fix, the message loop becomes longer, so it's likely that, the active task set manager has already retried the task multiple times. But this failure window won't be too big, and we want to avoid the worse case that retries the task many times until the job finishes. So this solution is acceptable.

## How was this patch tested?

a new test case.

Closes #24375 from cloud-fan/fix2.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
…bout the finished partitions

## What changes were proposed in this pull request?

This is an optional solution for apache#22806 .

apache#21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen.  But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why apache#22806 hit the issue.

This pr extends apache#21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks.

## How was this patch tested?

Add.

Closes apache#23871 from Ngone51/dev-23433-25250.

Lead-authored-by: wuyi <ngone_5451@163.com>
Co-authored-by: Ngone51 <ngone_5451@163.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit e5c6143)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 23, 2019
…a stage

## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

apache#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail.

This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error.

apache#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed.

However, apache#21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error.

apache#22806 and apache#23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.

After this PR, apache#21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). apache#22806 and apache#23871 are its followups to fix the hole.

## How was this patch tested?

existing tests.

Closes apache#23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 25, 2019
…bout the finished partitions

## What changes were proposed in this pull request?

This is an optional solution for apache#22806 .

apache#21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen.  But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why apache#22806 hit the issue.

This pr extends apache#21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks.

## How was this patch tested?

Add.

Closes apache#23871 from Ngone51/dev-23433-25250.

Lead-authored-by: wuyi <ngone_5451@163.com>
Co-authored-by: Ngone51 <ngone_5451@163.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit e5c6143)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Jul 25, 2019
…a stage

## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

apache#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail.

This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error.

apache#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed.

However, apache#21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error.

apache#22806 and apache#23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.

After this PR, apache#21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). apache#22806 and apache#23871 are its followups to fix the hole.

## How was this patch tested?

existing tests.

Closes apache#23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019
…bout the finished partitions

## What changes were proposed in this pull request?

This is an optional solution for apache#22806 .

apache#21131 firstly implement that a previous successful completed task from zombie TaskSetManager could also succeed the active TaskSetManager, which based on an assumption that an active TaskSetManager always exists for that stage when this happen.  But that's not always true as an active TaskSetManager may haven't been created when a previous task succeed, and this is the reason why apache#22806 hit the issue.

This pr extends apache#21131 's behavior by adding `stageIdToFinishedPartitions` into TaskSchedulerImpl, which recording the finished partition whenever a task(from zombie or active) succeed. Thus, a later created active TaskSetManager could also learn about the finished partition by looking into `stageIdToFinishedPartitions ` and won't launch any duplicate tasks.

## How was this patch tested?

Add.

Closes apache#23871 from Ngone51/dev-23433-25250.

Lead-authored-by: wuyi <ngone_5451@163.com>
Co-authored-by: Ngone51 <ngone_5451@163.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit e5c6143)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Aug 1, 2019
…a stage

## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

apache#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail.

This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error.

apache#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed.

However, apache#21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error.

apache#22806 and apache#23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.

After this PR, apache#21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). apache#22806 and apache#23871 are its followups to fix the hole.

## How was this patch tested?

existing tests.

Closes apache#23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
zhongjinhan pushed a commit to zhongjinhan/spark-1 that referenced this pull request Sep 3, 2019
…a stage

## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

apache/spark#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail.

This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error.

apache/spark#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed.

However, #21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error.

#22806 and #23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.

After this PR, #21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). #22806 and #23871 are its followups to fix the hole.

## How was this patch tested?

existing tests.

Closes #23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit 7df5aa6)
sumwale pushed a commit to TIBCOSoftware/snappy-spark that referenced this pull request Jun 27, 2021
…a stage

## What changes were proposed in this pull request?

This is another attempt to fix the more-than-one-active-task-set-managers bug.

apache#17208 is the first attempt. It marks the TSM as zombie before sending a task completion event to DAGScheduler. This is necessary, because when the DAGScheduler gets the task completion event, and it's for the last partition, then the stage is finished. However, if it's a shuffle stage and it has missing map outputs, DAGScheduler will resubmit it(see the [code](https://github.com/apache/spark/blob/v2.4.0/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1416-L1422)) and create a new TSM for this stage. This leads to more than one active TSM of a stage and fail.

This fix has a hole: Let's say a stage has 10 partitions and 2 task set managers: TSM1(zombie) and TSM2(active). TSM1 has a running task for partition 10 and it completes. TSM2 finishes tasks for partitions 1-9, and thinks he is still active because he hasn't finished partition 10 yet. However, DAGScheduler gets task completion events for all the 10 partitions and thinks the stage is finished. Then the same problem occurs: DAGScheduler may resubmit the stage and cause more than one actice TSM error.

apache#21131 fixed this hole by notifying all the task set managers when a task finishes. For the above case, TSM2 will know that partition 10 is already completed, so he can mark himself as zombie after partitions 1-9 are completed.

However, apache#21131 still has a hole: TSM2 may be created after the task from TSM1 is completed. Then TSM2 can't get notified about the task completion, and leads to the more than one active TSM error.

apache#22806 and apache#23871 are created to fix this hole. However the fix is complicated and there are still ongoing discussions.

This PR proposes a simple fix, which can be easy to backport: mark all existing task set managers as zombie when trying to create a new task set manager.

After this PR, apache#21131 is still necessary, to avoid launching unnecessary tasks and fix [SPARK-25250](https://issues.apache.org/jira/browse/SPARK-25250 ). apache#22806 and apache#23871 are its followups to fix the hole.

## How was this patch tested?

existing tests.

Closes apache#23927 from cloud-fan/scheduler.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
(cherry picked from commit cb20fbc)
Signed-off-by: Imran Rashid <irashid@cloudera.com>
otterc pushed a commit to linkedin/spark that referenced this pull request Mar 22, 2023
Ref: LIHADOOP-52383

Fetch failure lead to multiple tasksets which are active for a given
stage.  While there is only one "active" version of the taskset, the
earlier attempts can still have running tasks, which can complete
successfully.  So a task completion needs to update every taskset
so that it knows the partition is completed.  That way the final active
taskset does not try to submit another task for the same partition,
and so that it knows when it is completed and when it should be
marked as a "zombie".

Added a regression test.

Author: Imran Rashid <irashid@cloudera.com>

Closes apache#21131 from squito/SPARK-23433.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants