[SPARK-5363] Fix bug in PythonRDD: remove() inside iterator is not safe
Removing elements from a mutable HashSet while iterating over it can cause the
iteration to incorrectly skip over entries that were not removed. If this
happened, PythonRDD would write fewer broadcast variables than the Python
worker was expecting to read, which would cause the Python worker to hang
indefinitely.
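
The hazard described above can be avoided by computing the set of stale entries before mutating anything. The following is a minimal sketch of that pattern, not the code from PythonRDD; the object `SafeRemoval` and the method `pruneStale` are hypothetical names used only for illustration.

```scala
import scala.collection.mutable

// Hypothetical helper; these names do not appear in PythonRDD.scala.
object SafeRemoval {
  /** Removes from `old` every id that is absent from `wanted`
    * and returns the ids that were removed.
    */
  def pruneStale(old: mutable.Set[Long], wanted: Set[Long]): Set[Long] = {
    // Take an immutable snapshot of the ids to drop. Iteration then runs
    // over the snapshot, never over the set that is being mutated.
    val stale: Set[Long] = old.filterNot(wanted.contains).toSet
    // Mutate the original set only after the snapshot is complete.
    old --= stale
    stale
  }
}
```

Because the snapshot is a separate collection, its size is also a reliable count of the removals that will be announced to the reader on the other side of the stream.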

Author: Davies Liu <davies@databricks.com>

Closes #4776 from davies/fix_hang and squashes the following commits:

a4384a5 [Davies Liu] fix bug: remove() inside iterator is not safe
Davies Liu authored and JoshRosen committed Feb 26, 2015
1 parent cfff397 commit 7fa960e
Showing 1 changed file with 6 additions and 7 deletions.
13 changes: 6 additions & 7 deletions core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -219,14 +219,13 @@ private[spark] class PythonRDD(
 val oldBids = PythonRDD.getWorkerBroadcasts(worker)
 val newBids = broadcastVars.map(_.id).toSet
 // number of different broadcasts
-val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
+val toRemove = oldBids.diff(newBids)
+val cnt = toRemove.size + newBids.diff(oldBids).size
 dataOut.writeInt(cnt)
-for (bid <- oldBids) {
-  if (!newBids.contains(bid)) {
-    // remove the broadcast from worker
-    dataOut.writeLong(- bid - 1) // bid >= 0
-    oldBids.remove(bid)
-  }
+for (bid <- toRemove) {
+  // remove the broadcast from worker
+  dataOut.writeLong(- bid - 1) // bid >= 0
+  oldBids.remove(bid)
 }
 for (broadcast <- broadcastVars) {
   if (!oldBids.contains(broadcast.id)) {
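
The hunk signals a removal by writing `- bid - 1`, which maps every non-negative id (including 0) to a strictly negative value. A rough sketch of that sign convention follows. It assumes the reader treats a negative value as a removal and a non-negative value as an id to add; that is inferred from the comments in the diff rather than shown here, and the object `BroadcastWire` with its methods is hypothetical.

```scala
// Hypothetical helpers; these names do not appear in PythonRDD.scala.
object BroadcastWire {
  /** Encodes a removal of broadcast `bid` as a strictly negative value. */
  def encodeRemoval(bid: Long): Long = {
    require(bid >= 0, "broadcast ids are assumed to be non-negative")
    -bid - 1 // 0 -> -1, 1 -> -2, ...
  }

  /** Decodes a value from the stream (assumed reader-side behaviour):
    * Left(id) means "remove id", Right(id) means "add id".
    */
  def decode(value: Long): Either[Long, Long] =
    if (value < 0) Left(-value - 1) else Right(value)
}
```

Subtracting one is what keeps id 0 unambiguous: plain negation would leave 0 unchanged, so a removal of broadcast 0 could not be told apart from an addition.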