Skip to content

Commit

Permalink
[SPARK-48543][SS] Track state row validation failures using explicit …
Browse files Browse the repository at this point in the history
…error class

### What changes were proposed in this pull request?
Track state row validation failures using explicit error class

### Why are the changes needed?
We want to track these exceptions explicitly since they could be indicative of underlying corruptions/data loss scenarios.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing unit tests

```
13:06:32.803 INFO org.apache.spark.util.ShutdownHookManager: Deleting directory /Users/anish.shrigondekar/spark/spark/target/tmp/spark-6d90d3f3-0f37-48b8-8506-a8cdee3d25d7
[info] Run completed in 9 seconds, 861 milliseconds.
[info] Total number of tests run: 4
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 4, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
```

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46885 from anishshri-db/task/SPARK-48543.

Authored-by: Anish Shrigondekar <anish.shrigondekar@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
  • Loading branch information
anishshri-db authored and HeartSaVioR committed Jun 13, 2024
1 parent b8c7aee commit bdcb79f
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 17 deletions.
24 changes: 24 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -3735,6 +3735,18 @@
],
"sqlState" : "42K06"
},
"STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE" : {
"message" : [
"The streaming query failed to validate written state for key row.",
"The following reasons may cause this:",
"1. An old Spark version wrote the checkpoint that is incompatible with the current one",
"2. Corrupt checkpoint files",
"3. The query changed in an incompatible way between restarts",
"For the first case, use a new checkpoint directory or use the original Spark version",
"to process the streaming state. Retrieved error_message=<errorMsg>"
],
"sqlState" : "XX000"
},
"STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" : {
"message" : [
"Provided key schema does not match existing state key schema.",
Expand Down Expand Up @@ -3769,6 +3781,18 @@
],
"sqlState" : "42802"
},
"STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE" : {
"message" : [
"The streaming query failed to validate written state for value row.",
"The following reasons may cause this:",
"1. An old Spark version wrote the checkpoint that is incompatible with the current one",
"2. Corrupt checkpoint files",
"3. The query changed in an incompatible way between restarts",
"For the first case, use a new checkpoint directory or use the original Spark version",
"to process the streaming state. Retrieved error_message=<errorMsg>"
],
"sqlState" : "XX000"
},
"STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE" : {
"message" : [
"Provided value schema does not match existing state value schema.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,16 +279,6 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
SQLMetrics.createTimingMetric(sparkContext, desc)
}

/**
* An exception thrown when an invalid UnsafeRow is detected in state store.
*/
class InvalidUnsafeRowException(error: String)
extends RuntimeException("The streaming query failed by state format invalidation. " +
"The following reasons may cause this: 1. An old Spark version wrote the checkpoint that is " +
"incompatible with the current one; 2. Broken checkpoint files; 3. The query is changed " +
"among restart. For the first case, you can try to restart the application without " +
s"checkpoint or use the legacy Spark version to process the streaming state.\n$error", null)

sealed trait KeyStateEncoderSpec

case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends KeyStateEncoderSpec
Expand Down Expand Up @@ -434,12 +424,16 @@ object StateStoreProvider {
conf: StateStoreConf): Unit = {
if (conf.formatValidationEnabled) {
val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
validationError.foreach { error =>
throw StateStoreErrors.keyRowFormatValidationFailure(error)
}

if (conf.formatValidationCheckValue) {
val validationError =
UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
validationError.foreach { error =>
throw StateStoreErrors.valueRowFormatValidationFailure(error)
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.spark.sql.execution.streaming.state

import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException}

/**
* Object for grouping error messages from (most) exceptions thrown from State API V2
Expand All @@ -39,6 +39,16 @@ object StateStoreErrors {
)
}

def keyRowFormatValidationFailure(errorMsg: String):
StateStoreKeyRowFormatValidationFailure = {
new StateStoreKeyRowFormatValidationFailure(errorMsg)
}

def valueRowFormatValidationFailure(errorMsg: String):
StateStoreValueRowFormatValidationFailure = {
new StateStoreValueRowFormatValidationFailure(errorMsg)
}

def unsupportedOperationOnMissingColumnFamily(operationName: String, colFamilyName: String):
StateStoreUnsupportedOperationOnMissingColumnFamily = {
new StateStoreUnsupportedOperationOnMissingColumnFamily(operationName, colFamilyName)
Expand Down Expand Up @@ -245,3 +255,12 @@ class StateStoreValueSchemaNotCompatible(
"storedValueSchema" -> storedValueSchema,
"newValueSchema" -> newValueSchema))

class StateStoreKeyRowFormatValidationFailure(errorMsg: String)
extends SparkRuntimeException(
errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE",
messageParameters = Map("errorMsg" -> errorMsg))

class StateStoreValueRowFormatValidationFailure(errorMsg: String)
extends SparkRuntimeException(
errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE",
messageParameters = Map("errorMsg" -> errorMsg))
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProj
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorSuite.withCoordinatorRef
import org.apache.spark.sql.execution.streaming.state.StateStoreValueRowFormatValidationFailure
import org.apache.spark.sql.functions.count
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
Expand Down Expand Up @@ -1606,12 +1607,12 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
// By default, when there is an invalid pair of value row and value schema, it should throw
val keyRow = dataToKeyRow("key", 1)
val valueRow = dataToValueRow(2)
val e = intercept[InvalidUnsafeRowException] {
val e = intercept[StateStoreValueRowFormatValidationFailure] {
// Here valueRow doesn't match with prefixKeySchema
StateStoreProvider.validateStateRowFormat(
keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf())
}
assert(e.getMessage.contains("The streaming query failed by state format invalidation"))
assert(e.getMessage.contains("The streaming query failed to validate written state"))

// When sqlConf.stateStoreFormatValidationEnabled is set to false and
// StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG is set to true,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.state.InvalidUnsafeRowException
import org.apache.spark.sql.execution.streaming.state.{StateStoreKeyRowFormatValidationFailure, StateStoreValueRowFormatValidationFailure}
import org.apache.spark.sql.functions._
import org.apache.spark.tags.SlowSQLTest
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -254,7 +254,8 @@ class StreamingStateStoreFormatCompatibilitySuite extends StreamTest {
private def findStateSchemaException(exc: Throwable): Boolean = {
exc match {
case _: SparkUnsupportedOperationException => true
case _: InvalidUnsafeRowException => true
case _: StateStoreKeyRowFormatValidationFailure => true
case _: StateStoreValueRowFormatValidationFailure => true
case e1 if e1.getCause != null => findStateSchemaException(e1.getCause)
case _ => false
}
Expand Down

0 comments on commit bdcb79f

Please sign in to comment.