Skip to content

Commit

Permalink
[SPARK-6621][Core] Fix the bug that calling EventLoop.stop in EventLo…
Browse files Browse the repository at this point in the history
…op.onReceive/onError/onStart doesn't call onStop

Author: zsxwing <zsxwing@gmail.com>

Closes #5280 from zsxwing/SPARK-6621 and squashes the following commits:

521125e [zsxwing] Fix the bug that calling EventLoop.stop in EventLoop.onReceive and EventLoop.onError doesn't call onStop
  • Loading branch information
zsxwing authored and JoshRosen committed Apr 3, 2015
1 parent 6e1c1ec commit 440ea31
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 3 deletions.
18 changes: 15 additions & 3 deletions core/src/main/scala/org/apache/spark/util/EventLoop.scala
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,21 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging {
def stop(): Unit = {
if (stopped.compareAndSet(false, true)) {
eventThread.interrupt()
eventThread.join()
// Call onStop after the event thread exits to make sure onReceive happens before onStop
onStop()
var onStopCalled = false
try {
eventThread.join()
// Call onStop after the event thread exits to make sure onReceive happens before onStop
onStopCalled = true
onStop()
} catch {
case ie: InterruptedException =>
Thread.currentThread().interrupt()
if (!onStopCalled) {
// ie is thrown from `eventThread.join()`. Otherwise, we should not call `onStop` since
// it's already called.
onStop()
}
}
} else {
// Keep quiet to allow calling `stop` multiple times.
}
Expand Down
72 changes: 72 additions & 0 deletions core/src/test/scala/org/apache/spark/util/EventLoopSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -203,4 +203,76 @@ class EventLoopSuite extends FunSuite with Timeouts {
assert(!eventLoop.isActive)
}
}

test("EventLoop: stop() in onStart should call onStop") {
@volatile var onStopCalled: Boolean = false
val eventLoop = new EventLoop[Int]("test") {

override def onStart(): Unit = {
stop()
}

override def onReceive(event: Int): Unit = {
}

override def onError(e: Throwable): Unit = {
}

override def onStop(): Unit = {
onStopCalled = true
}
}
eventLoop.start()
eventually(timeout(5 seconds), interval(5 millis)) {
assert(!eventLoop.isActive)
}
assert(onStopCalled)
}

test("EventLoop: stop() in onReceive should call onStop") {
@volatile var onStopCalled: Boolean = false
val eventLoop = new EventLoop[Int]("test") {

override def onReceive(event: Int): Unit = {
stop()
}

override def onError(e: Throwable): Unit = {
}

override def onStop(): Unit = {
onStopCalled = true
}
}
eventLoop.start()
eventLoop.post(1)
eventually(timeout(5 seconds), interval(5 millis)) {
assert(!eventLoop.isActive)
}
assert(onStopCalled)
}

test("EventLoop: stop() in onError should call onStop") {
@volatile var onStopCalled: Boolean = false
val eventLoop = new EventLoop[Int]("test") {

override def onReceive(event: Int): Unit = {
throw new RuntimeException("Oops")
}

override def onError(e: Throwable): Unit = {
stop()
}

override def onStop(): Unit = {
onStopCalled = true
}
}
eventLoop.start()
eventLoop.post(1)
eventually(timeout(5 seconds), interval(5 millis)) {
assert(!eventLoop.isActive)
}
assert(onStopCalled)
}
}

0 comments on commit 440ea31

Please sign in to comment.