[SPARK-48543][SS] Track state row validation failures using explicit error class #46885

Closed · wants to merge 7 commits
Changes from 2 commits
6 changes: 6 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
@@ -1802,6 +1802,12 @@
],
"sqlState" : "XX000"
},
"INTERNAL_ERROR_STATE_STORE" : {
"message" : [
"<message>"
],
"sqlState" : "XX000"
},
"INTERNAL_ERROR_STORAGE" : {
"message" : [
"<message>"
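Editorial sketch (not part of this diff): a minimal illustration of how the new condition is expected to surface, assuming SparkException.internalError(msg, category) maps the category onto the INTERNAL_ERROR_STATE_STORE condition the same way the other INTERNAL_ERROR_* entries are resolved. The message text below is a placeholder.

import org.apache.spark.SparkException

// Placeholder message; the point is only the condition name and SQLSTATE mapping.
val e = SparkException.internalError(
  msg = "state row failed validation",
  category = "STATE_STORE")
assert(e.getErrorClass == "INTERNAL_ERROR_STATE_STORE")
assert(e.getSqlState == "XX000")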
@@ -279,16 +279,6 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
SQLMetrics.createTimingMetric(sparkContext, desc)
}

/**
* An exception thrown when an invalid UnsafeRow is detected in state store.
*/
class InvalidUnsafeRowException(error: String)
extends RuntimeException("The streaming query failed by state format invalidation. " +
"The following reasons may cause this: 1. An old Spark version wrote the checkpoint that is " +
"incompatible with the current one; 2. Broken checkpoint files; 3. The query is changed " +
"among restart. For the first case, you can try to restart the application without " +
s"checkpoint or use the legacy Spark version to process the streaming state.\n$error", null)

sealed trait KeyStateEncoderSpec

case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends KeyStateEncoderSpec
@@ -434,12 +424,16 @@ object StateStoreProvider {
conf: StateStoreConf): Unit = {
if (conf.formatValidationEnabled) {
val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
validationError.foreach { error =>
throw StateStoreErrors.stateFormatValidationFailure(error)
}

if (conf.formatValidationCheckValue) {
val validationError =
UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
validationError.foreach { error =>
throw StateStoreErrors.stateFormatValidationFailure(error)
}
}
}
}
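Editorial sketch (not part of this diff): from a caller's point of view, a row that fails structural validation now surfaces as a SparkException carrying the new internal-error class instead of InvalidUnsafeRowException. keyRow, keySchema, valueRow, valueSchema, and storeConf are assumed to be in scope.

import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming.state.StateStoreProvider

try {
  StateStoreProvider.validateStateRowFormat(keyRow, keySchema, valueRow, valueSchema, storeConf)
} catch {
  case e: SparkException if e.getErrorClass == "INTERNAL_ERROR_STATE_STORE" =>
    // The structural-integrity reason from UnsafeRowUtils is embedded in the message.
    println(s"state row validation failed: ${e.getMessage}")
}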
@@ -39,6 +39,19 @@ object StateStoreErrors {
)
}

def stateFormatValidationFailure(errorMsg: String): SparkException = {
SparkException.internalError(
msg = "The streaming query failed to validate written state. " +
"The following reasons may cause this: " +
"1. An old Spark version wrote the checkpoint that is incompatible with the current one; " +
"2. Corrupt checkpoint files; " +
"3. The query changed in an incompatible way between restarts. " +
"For the first case, use a new checkpoint directory or use the original Spark version" +
s"to process the streaming state. Retrieved error_message=$errorMsg",
category = "STATE_STORE"
)
}

def unsupportedOperationOnMissingColumnFamily(operationName: String, colFamilyName: String):
StateStoreUnsupportedOperationOnMissingColumnFamily = {
new StateStoreUnsupportedOperationOnMissingColumnFamily(operationName, colFamilyName)
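Editorial sketch (not part of this diff): roughly what the new helper is expected to produce, assuming internalError prefixes the category with INTERNAL_ERROR_. The validation reason below is a made-up placeholder.

import org.apache.spark.sql.execution.streaming.state.StateStoreErrors

// "value schema mismatch at field 0" is a hypothetical validation reason.
val err = StateStoreErrors.stateFormatValidationFailure("value schema mismatch at field 0")
assert(err.getErrorClass == "INTERNAL_ERROR_STATE_STORE")
assert(err.getMessage.contains("The streaming query failed to validate written state"))
assert(err.getMessage.contains("error_message=value schema mismatch at field 0"))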
@@ -1606,12 +1606,12 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
// By default, when there is an invalid pair of value row and value schema, it should throw
val keyRow = dataToKeyRow("key", 1)
val valueRow = dataToValueRow(2)
val e = intercept[InvalidUnsafeRowException] {
val e = intercept[SparkException] {
// Here valueRow doesn't match with prefixKeySchema
StateStoreProvider.validateStateRowFormat(
keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf())
}
assert(e.getMessage.contains("The streaming query failed by state format invalidation"))
assert(e.getMessage.contains("The streaming query failed to validate written state"))

// When sqlConf.stateStoreFormatValidationEnabled is set to false and
// StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG is set to true,
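Editorial sketch (not part of this diff): an alternative assertion on the error class rather than the message text, assuming the thrown SparkException carries the new condition; intercept, keyRow, keySchema, valueRow, and getDefaultStoreConf() come from the surrounding test.

val e = intercept[SparkException] {
  StateStoreProvider.validateStateRowFormat(
    keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf())
}
assert(e.getErrorClass == "INTERNAL_ERROR_STATE_STORE")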
@@ -27,7 +27,6 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.execution.streaming.state.InvalidUnsafeRowException
import org.apache.spark.sql.functions._
import org.apache.spark.tags.SlowSQLTest
import org.apache.spark.util.Utils
@@ -254,7 +253,7 @@ class StreamingStateStoreFormatCompatibilitySuite extends StreamTest {
private def findStateSchemaException(exc: Throwable): Boolean = {
exc match {
case _: SparkUnsupportedOperationException => true
case _: InvalidUnsafeRowException => true
case _: SparkException => true
case e1 if e1.getCause != null => findStateSchemaException(e1.getCause)
case _ => false
}
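Editorial sketch (not part of this diff): matching any SparkException here is broad; a narrower variant could key off the new error class instead. This is only an illustration, not what the PR does.

private def isStateFormatValidationError(exc: Throwable): Boolean = exc match {
  case se: SparkException => se.getErrorClass == "INTERNAL_ERROR_STATE_STORE"
  case e if e.getCause != null => isStateFormatValidationError(e.getCause)
  case _ => false
}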