[SPARK-30936][Core] Set FAIL_ON_UNKNOWN_PROPERTIES to false by default to parse Spark events #27680

Closed · wants to merge 4 commits
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala (2 additions, 1 deletion)
@@ -22,7 +22,7 @@ import java.util.{Properties, UUID}
import scala.collection.JavaConverters._
import scala.collection.Map

-import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.JsonAST._
@@ -59,6 +59,7 @@ private[spark] object JsonProtocol {
private implicit val format = DefaultFormats

private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+  .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
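For a sense of what the new `configure` call changes, here is a minimal standalone sketch (not part of the diff; `OldEvent`, `UnknownFieldDemo`, and the JSON payload are hypothetical) of Jackson ignoring an unknown field once FAIL_ON_UNKNOWN_PROPERTIES is disabled. As the tests below suggest, this mapper is the fallback path JsonProtocol uses for event classes it has no handwritten parser for, which is why relaxing it affects arbitrary SparkListenerEvent subclasses.

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical event with only the fields an older reader knows about.
case class OldEvent(appId: String, time: Long)

object UnknownFieldDemo {
  def main(args: Array[String]): Unit = {
    // Same mapper setup as the patched JsonProtocol: Scala module registered,
    // failures on unknown properties turned off.
    val mapper = new ObjectMapper()
      .registerModule(DefaultScalaModule)
      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

    // Log line written by a newer version that added an extra field.
    val json = """{"appId" : "app-1", "time" : 123, "newField" : "ignored"}"""

    // Parses fine: "newField" is silently dropped. With Jackson's default
    // (FAIL_ON_UNKNOWN_PROPERTIES = true) this call would throw
    // UnrecognizedPropertyException instead.
    val event = mapper.readValue(json, classOf[OldEvent])
    println(event) // OldEvent(app-1,123)
  }
}
```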
core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala (24 additions, 0 deletions)
@@ -483,6 +483,28 @@ class JsonProtocolSuite extends SparkFunSuite {
testAccumValue(Some("anything"), blocks, JString(blocks.toString))
testAccumValue(Some("anything"), 123, JString("123"))
}

test("SPARK-30936: forwards compatibility - ignore unknown fields") {
val expected = TestListenerEvent("foo", 123)
val unknownFieldsJson =
"""{
| "Event" : "org.apache.spark.util.TestListenerEvent",
| "foo" : "foo",
| "bar" : 123,
| "unknown" : "unknown"
|}""".stripMargin
assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected)
}

test("SPARK-30936: backwards compatibility - set default values for missing fields") {
val expected = TestListenerEvent("foo", 0)
val unknownFieldsJson =
"""{
| "Event" : "org.apache.spark.util.TestListenerEvent",
| "foo" : "foo"
|}""".stripMargin
assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected)
}
}


@@ -2313,3 +2335,5 @@ private[spark] object JsonProtocolSuite extends Assertions {
|}
""".stripMargin
}

case class TestListenerEvent(foo: String, bar: Int) extends SparkListenerEvent
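The backwards-compatibility test above relies on a second effect of the same mapper: a constructor parameter that is missing from the JSON is left at its type's zero value (0 for the Int field `bar`) rather than causing a failure. A minimal sketch of that behavior, using a hypothetical `MiniEvent` class and demo object:

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical: a newer event class with a field that old logs never wrote.
case class MiniEvent(foo: String, bar: Int)

object MissingFieldDemo {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
      .registerModule(DefaultScalaModule)
      .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

    // Old log line from before "bar" existed.
    val json = """{"foo" : "foo"}"""

    // "bar" is absent, so it ends up as Int's default 0 -- the same outcome
    // the test above expects with TestListenerEvent("foo", 0).
    val event = mapper.readValue(json, classOf[MiniEvent])
    println(event) // MiniEvent(foo,0)
  }
}
```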
sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -382,28 +382,27 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_0") {
Comment from the PR author:
I changed the test names to remove "." so that we can get a better test report.

Reply from a project member:
Yes, this is sad. It's actually a bug in SBT; see #25630.

// query-event-logs-version-2.0.0.txt has all types of events generated by
-// Structured Streaming in Spark 2.0.0.
+// Structured Streaming in Spark 2.0.0. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1)
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") {
// query-event-logs-version-2.0.1.txt has all types of events generated by
-// Structured Streaming in Spark 2.0.1.
+// Structured Streaming in Spark 2.0.1. Because we renamed the classes,
// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
// to verify that we can skip broken jsons generated by Structured Streaming.
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt")
testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1)
}

testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2") {
testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") {
// query-event-logs-version-2.0.2.txt has all types of events generated by
-// Structured Streaming in Spark 2.0.2.
-// SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
-// to verify that we can skip broken jsons generated by Structured Streaming.
-testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt")
+// Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events
+// in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2.
+testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5)
}

test("listener propagates observable metrics") {
@@ -463,7 +462,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
}
}

-private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = {
+private def testReplayListenerBusWithBorkenEventJsons(
+    fileName: String,
+    expectedEventSize: Int): Unit = {
val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
val events = mutable.ArrayBuffer[SparkListenerEvent]()
try {
@@ -479,8 +480,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
replayer.addListener(new SparkListener {})
replayer.replay(input, fileName)
// SparkListenerApplicationEnd is the only valid event
-assert(events.size === 1)
-assert(events(0).isInstanceOf[SparkListenerApplicationEnd])
+assert(events.size === expectedEventSize)
+assert(events.last.isInstanceOf[SparkListenerApplicationEnd])
} finally {
input.close()
}