[SPARK-4433] fix a racing condition in zipWithIndex #3291

Closed
wants to merge 1 commit into apache:master from mengxr:SPARK-4433

Conversation

mengxr (Contributor) commented Nov 16, 2014

Spark hangs with the following code:

~~~
sc.parallelize(1 to 10).zipWithIndex.repartition(10).count()
~~~

This is because `ZippedWithIndexRDD` triggers a job in `getPartitions`, which causes a deadlock in `DAGScheduler.getPreferredLocs` (a synchronized method). The fix is to compute `startIndices` during construction instead.

This should be applied to branch-1.0, branch-1.1, and branch-1.2.
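
For reference, a minimal sketch of the eager approach (not the verbatim patch): it assumes Spark 1.x internals, namely the `runJob(rdd, func, partitions, allowLocal)` overload and the private `Utils.getIteratorSize` helper, with `getPartitions`/`compute` elided.

~~~
// Sketch of the approach only, not the merged patch. Assumes it lives in
// package org.apache.spark.rdd so the RDD(oneParent) constructor is accessible.
package org.apache.spark.rdd

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.util.Utils

class ZippedWithIndexRDD[T: ClassTag](prev: RDD[T]) extends RDD[(T, Long)](prev) {

  // Computed eagerly in the constructor. Launching this job lazily from
  // getPartitions is what deadlocked against the synchronized
  // DAGScheduler.getPreferredLocs.
  @transient private val startIndices: Array[Long] = {
    val n = prev.partitions.length
    if (n == 0) {
      Array.empty
    } else if (n == 1) {
      Array(0L) // a single partition always starts at index 0
    } else {
      prev.context.runJob(
        prev,
        Utils.getIteratorSize _,
        0 until n - 1, // the last partition's size is never needed
        allowLocal = false
      ).scanLeft(0L)(_ + _) // prefix sums: the start index of each partition
    }
  }

  // getPartitions and compute are unchanged by the fix: they read startIndices
  // instead of computing it, and are elided here.
  override def getPartitions: Array[Partition] = ???
  override def compute(split: Partition, context: TaskContext): Iterator[(T, Long)] = ???
}
~~~

Because the partition sizes are now computed once up front, zipWithIndex becomes eager, which is the point rxin raises below.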

@pwendell

SparkQA commented Nov 16, 2014

Test build #23433 has started for PR 3291 at commit c284d9f.

  • This patch merges cleanly.

SparkQA commented Nov 16, 2014

Test build #23433 has finished for PR 3291 at commit c284d9f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

AmplabJenkins commented

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/23433/

rxin (Contributor) commented Nov 16, 2014

This makes the whole thing eager, doesn't it?

mengxr (Contributor, Author) commented Nov 17, 2014

One way to do this lazily is via shuffle:

~~~
// Inside ZippedWithIndexRDD: p is the number of partitions of this RDD.
val identityPartitioner = new Partitioner {
  override def numPartitions: Int = p
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}
// Every partition except the last sends its size to all later partitions.
val startIndices = PartitionPruningRDD.create(this, _ < p - 1) // skip the last partition
  .mapPartitionsWithIndex { (split, iter) =>
    val size = Utils.getIteratorSize(iter)
    Iterator.range(split + 1, p).map(i => (i, size))
  }
  // Partition i sums the sizes of partitions 0 until i: its start index.
  .reduceByKey(identityPartitioner, _ + _)
  .values
this.zipPartitions(startIndices) { (iter, startIndexIter) =>
  // Partition 0 receives no sizes, so it starts at 0.
  val startIndex = if (startIndexIter.hasNext) startIndexIter.next() else 0L
  iter.zipWithIndex.map { case (item, localIndex) =>
    (item, startIndex + localIndex)
  }
}
~~~

But I think this is more expensive: it still counts every partition except the last, and adds a shuffle of O(p^2) small records on top of that.
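
For context, a quick spark-shell check of the contract both variants must satisfy: globally contiguous indices, and the repro from the description completing instead of hanging.

~~~
val rdd = sc.parallelize(1 to 10, 3)
rdd.zipWithIndex().collect()
// Array((1,0), (2,1), ..., (10,9)): indices are global and contiguous
rdd.zipWithIndex().repartition(10).count() // returns 10 instead of hanging
~~~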

mengxr (Contributor, Author) commented Nov 18, 2014

@rxin Does it look good to you? I hope that this fix can get into 1.1.1.

rxin (Contributor) commented Nov 18, 2014

lgtm

mengxr (Contributor, Author) commented Nov 19, 2014

Thanks! Merged into master, branch-1.2, branch-1.1, and branch-1.0.

davies pushed a commit to davies/spark that referenced this pull request Nov 19, 2014

guavuslabs-builder pushed a commit to ThalesGroup/spark that referenced this pull request Nov 19, 2014

mengxr added a commit that referenced this pull request Nov 19, 2014

Author: Xiangrui Meng <meng@databricks.com>

Closes #3291 from mengxr/SPARK-4433 and squashes the following commits:

c284d9f [Xiangrui Meng] fix a racing condition in zipWithIndex

(cherry picked from commit bb46046)
Signed-off-by: Xiangrui Meng <meng@databricks.com>

andrewor14 (Contributor) commented

Hey @mengxr, can you close this?

mengxr closed this Nov 20, 2014