From c2ce5661d3d87ebff4e6d247efc672064d756b7a Mon Sep 17 00:00:00 2001 From: Shixiong Zhu Date: Tue, 25 Feb 2020 12:28:31 +0800 Subject: [PATCH] [SPARK-30936][CORE] Set FAIL_ON_UNKNOWN_PROPERTIES to false by default to parse Spark events ### What changes were proposed in this pull request? Set `FAIL_ON_UNKNOWN_PROPERTIES` to `false` in `JsonProtocol` to allow ignoring unknown fields in a Spark event. After this change, if we add new fields to a Spark event parsed by `ObjectMapper`, the event json string generated by a new Spark version can still be read by an old Spark History Server. Since Spark History Server is an extra service, it usually takes time to upgrade, and it's possible that a Spark application is upgraded before SHS. Forwards-compatibility will allow an old SHS to support new Spark applications (it may lose some new features but most functions should still work). ### Why are the changes needed? `JsonProtocol` is supposed to provide strong backwards-compatibility and forwards-compatibility guarantees: any version of Spark should be able to read JSON output written by any other version, including newer versions. However, the forwards-compatibility guarantee is broken for events parsed by `ObjectMapper`. If a new field is added to an event parsed by `ObjectMapper` (e.g., https://github.com/apache/spark/commit/6dc5921e66d56885b95c07e56e687f9f6c1eaca7#diff-dc5c7a41fbb7479cef48b67eb41ad254R33), the event json string generated by a new Spark version cannot be parsed by an old version of SHS right now. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? The newly added tests. Closes #27680 from zsxwing/SPARK-30936. 
Authored-by: Shixiong Zhu Signed-off-by: Wenchen Fan --- .../org/apache/spark/util/JsonProtocol.scala | 3 +- .../apache/spark/util/JsonProtocolSuite.scala | 24 +++++++++++++++ .../StreamingQueryListenerSuite.scala | 29 ++++++++++--------- 3 files changed, 41 insertions(+), 15 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 53824735d2fc5..9254ac94005f1 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -22,7 +22,7 @@ import java.util.{Properties, UUID} import scala.collection.JavaConverters._ import scala.collection.Map -import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.json4s.DefaultFormats import org.json4s.JsonAST._ @@ -59,6 +59,7 @@ private[spark] object JsonProtocol { private implicit val format = DefaultFormats private val mapper = new ObjectMapper().registerModule(DefaultScalaModule) + .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) /** ------------------------------------------------- * * JSON serialization methods for SparkListenerEvents | diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index edc0662a0f73e..eb7f3079bee36 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -483,6 +483,28 @@ class JsonProtocolSuite extends SparkFunSuite { testAccumValue(Some("anything"), blocks, JString(blocks.toString)) testAccumValue(Some("anything"), 123, JString("123")) } + + test("SPARK-30936: forwards compatibility - ignore unknown fields") { + val expected = TestListenerEvent("foo", 123) + val 
unknownFieldsJson = + """{ + | "Event" : "org.apache.spark.util.TestListenerEvent", + | "foo" : "foo", + | "bar" : 123, + | "unknown" : "unknown" + |}""".stripMargin + assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) + } + + test("SPARK-30936: backwards compatibility - set default values for missing fields") { + val expected = TestListenerEvent("foo", 0) + val unknownFieldsJson = + """{ + | "Event" : "org.apache.spark.util.TestListenerEvent", + | "foo" : "foo" + |}""".stripMargin + assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected) + } } @@ -2313,3 +2335,5 @@ private[spark] object JsonProtocolSuite extends Assertions { |} """.stripMargin } + +case class TestListenerEvent(foo: String, bar: Int) extends SparkListenerEvent diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 9d0f829ac9684..6bb1646becf22 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -382,28 +382,27 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } - testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_0") { // query-event-logs-version-2.0.0.txt has all types of events generated by - // Structured Streaming in Spark 2.0.0. + // Structured Streaming in Spark 2.0.0. Because we renamed the classes, // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it // to verify that we can skip broken jsons generated by Structured Streaming. 
- testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt") + testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1) } - testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") { // query-event-logs-version-2.0.1.txt has all types of events generated by - // Structured Streaming in Spark 2.0.1. + // Structured Streaming in Spark 2.0.1. Because we renamed the classes, // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt") + testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1) } - testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2") { + testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") { // query-event-logs-version-2.0.2.txt has all types of events generated by - // Structured Streaming in Spark 2.0.2. - // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it - // to verify that we can skip broken jsons generated by Structured Streaming. - testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt") + // Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events + // in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2. 
+ testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5) } test("listener propagates observable metrics") { @@ -463,7 +462,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } } - private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = { + private def testReplayListenerBusWithBorkenEventJsons( + fileName: String, + expectedEventSize: Int): Unit = { val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName") val events = mutable.ArrayBuffer[SparkListenerEvent]() try { @@ -479,8 +480,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { replayer.addListener(new SparkListener {}) replayer.replay(input, fileName) // SparkListenerApplicationEnd is the only valid event - assert(events.size === 1) - assert(events(0).isInstanceOf[SparkListenerApplicationEnd]) + assert(events.size === expectedEventSize) + assert(events.last.isInstanceOf[SparkListenerApplicationEnd]) } finally { input.close() }