diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index e79613749f0ce..ec9bb5a338eff 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -22,6 +22,7 @@ import java.net.URI import java.util.concurrent.atomic.AtomicInteger import org.apache.hadoop.fs.Path +import org.json4s.DefaultFormats import org.json4s.JsonAST.JValue import org.json4s.jackson.JsonMethods._ import org.scalatest.BeforeAndAfter @@ -218,8 +219,17 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp // Verify the same events are replayed in the same order assert(sc.eventLogger.isDefined) - val originalEvents = sc.eventLogger.get.loggedEvents - val replayedEvents = eventMonster.loggedEvents + implicit val format = DefaultFormats + def exceptCase(a: JValue) = ( a \ "Event").extract[String] match { + // If we are logging stage executor metrics, there is a bulk call to logEvent with + // SparkListenerStageExecutorMetrics events via a Map.foreach. The Map.foreach bulk + // operation may not log the events with the same order. So here we should not compare + // SparkListenerStageExecutorMetrics here. + case "SparkListenerStageExecutorMetrics" => false + case _ => true + } + val originalEvents = sc.eventLogger.get.loggedEvents.filter(exceptCase(_)) + val replayedEvents = eventMonster.loggedEvents.filter(exceptCase(_)) originalEvents.zip(replayedEvents).foreach { case (e1, e2) => // Don't compare the JSON here because accumulators in StageInfo may be out of order JsonProtocolSuite.assertEquals(