[SPARK-4433] fix a racing condition in zipWithIndex #3291
Conversation
Test build #23433 has started for PR 3291 at commit
Test build #23433 has finished for PR 3291 at commit
Test PASSed.
This makes the whole thing eager, doesn't it?
One way to do this lazily is via shuffle:

~~~scala
val identityPartitioner = new Partitioner {
  override def numPartitions: Int = p
  override def getPartition(key: Any): Int = key.asInstanceOf[Int]
}
val startIndices = PartitionPruningRDD.create(this, _ < p - 1) // skip the last partition
  .mapPartitionsWithIndex { (split, iter) =>
    val size = Utils.getIteratorSize(iter)
    Iterator.range(split + 1, p).map { i =>
      (i, size)
    }
  }.reduceByKey(identityPartitioner, _ + _)
  .values
this.zipPartitions(startIndices) { (iter, startIndexIter) =>
  val startIndex = if (startIndexIter.hasNext) startIndexIter.next() else 0L
  iter.zipWithIndex.map { case (item, localIndex) =>
    (item, startIndex + localIndex)
  }
}
~~~

But I think this is more expensive.
@rxin Does it look good to you? I hope that this fix can get into 1.1.1.
lgtm
Thanks! Merged into master, branch-1.2, 1.1, and 1.0.
Spark hangs with the following code:

~~~
sc.parallelize(1 to 10).zipWithIndex.repartition(10).count()
~~~

This is because ZippedWithIndexRDD triggers a job in getPartitions and it causes a deadlock in DAGScheduler.getPreferredLocs (synced). The fix is to compute `startIndices` during construction. This should be applied to branch-1.0, branch-1.1, and branch-1.2. pwendell

Author: Xiangrui Meng <meng@databricks.com>

Closes apache#3291 from mengxr/SPARK-4433 and squashes the following commits:

c284d9f [Xiangrui Meng] fix a racing condition in zipWithIndex
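For context, here is a minimal sketch of the eager approach the commit describes: count every partition except the last with a single job at construction time, then prefix-sum the counts to get each partition's starting index, so getPartitions itself never launches a job. The standalone helper name and the exact runJob arguments below are illustrative, not the merged patch verbatim (the real change lives inside ZippedWithIndexRDD and the runJob overloads differ slightly across Spark versions):

~~~scala
import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// Illustrative sketch of the eager fix: compute per-partition start offsets
// once, at RDD construction time, instead of inside getPartitions, which is
// what avoids the DAGScheduler.getPreferredLocs deadlock.
def computeStartIndices[T: ClassTag](prev: RDD[T]): Array[Long] = {
  val n = prev.partitions.length
  if (n == 0) {
    Array.empty[Long]
  } else if (n == 1) {
    Array(0L)
  } else {
    prev.context
      // Count elements in partitions 0 .. n-2; the last partition's size is
      // never needed to know where any partition starts.
      .runJob(prev, (iter: Iterator[T]) => iter.size.toLong, 0 until n - 1)
      // Prefix sums: element i is the global start index of partition i.
      .scanLeft(0L)(_ + _)
  }
}
~~~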
Hey @mengxr can you close this?
Spark hangs with the following code:

~~~
sc.parallelize(1 to 10).zipWithIndex.repartition(10).count()
~~~

This is because ZippedWithIndexRDD triggers a job in getPartitions, and it causes a deadlock in DAGScheduler.getPreferredLocs (which is synchronized). The fix is to compute `startIndices` during construction.

This should be applied to branch-1.0, branch-1.1, and branch-1.2.

@pwendell
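For reference, a quick illustration of the zipWithIndex behavior this fix preserves, together with the reproduction from the description. The sample output is a sketch assuming this exact four-element input split across two partitions:

~~~scala
// zipWithIndex assigns each element a global, contiguous index, which is why
// per-partition start offsets must be known up front.
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 2)
rdd.zipWithIndex().collect()
// => Array((a,0), (b,1), (c,2), (d,3))

// The reproduction from the description: before this fix, getPartitions on the
// zipped RDD triggered a job from inside the synchronized
// DAGScheduler.getPreferredLocs, so this call could hang.
sc.parallelize(1 to 10).zipWithIndex.repartition(10).count()
~~~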