From a9864ab18cad3649cc13058dc7dee40f8f984a91 Mon Sep 17 00:00:00 2001
From: Anish Shrigondekar
Date: Wed, 5 Jun 2024 13:12:28 -0700
Subject: [PATCH 1/6] [SPARK-48543] Track state row validation failures using explicit error class

---
 .../main/resources/error/error-conditions.json | 6 ++++++
 .../execution/streaming/state/StateStore.scala | 18 ++++++------------
 .../streaming/state/StateStoreErrors.scala | 13 +++++++++++++
 .../streaming/state/StateStoreSuite.scala | 4 ++--
 ...ingStateStoreFormatCompatibilitySuite.scala | 4 ++--
 5 files changed, 29 insertions(+), 16 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index 5bab14e3eebf7..6f71e14c2988e 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1802,6 +1802,12 @@
     ],
     "sqlState" : "XX000"
   },
+  "INTERNAL_ERROR_STATE_STORE" : {
+    "message" : [
+      ""
+    ],
+    "sqlState" : "XX000"
+  },
   "INTERNAL_ERROR_STORAGE" : {
     "message" : [
       ""
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index b59fe65fb14a7..8d847fee1cacb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -279,16 +279,6 @@ case class StateStoreCustomTimingMetric(name: String, desc: String) extends Stat
     SQLMetrics.createTimingMetric(sparkContext, desc)
 }
 
-/**
- * An exception thrown when an invalid UnsafeRow is detected in state store.
- */
-class InvalidUnsafeRowException(error: String)
-  extends RuntimeException("The streaming query failed by state format invalidation. " +
-    "The following reasons may cause this: 1. An old Spark version wrote the checkpoint that is " +
-    "incompatible with the current one; 2. Broken checkpoint files; 3. The query is changed " +
-    "among restart. For the first case, you can try to restart the application without " +
-    s"checkpoint or use the legacy Spark version to process the streaming state.\n$error", null)
-
 sealed trait KeyStateEncoderSpec
 
 case class NoPrefixKeyStateEncoderSpec(keySchema: StructType) extends KeyStateEncoderSpec
@@ -434,12 +424,16 @@ object StateStoreProvider {
       conf: StateStoreConf): Unit = {
     if (conf.formatValidationEnabled) {
       val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
-      validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
+      validationError.foreach { error =>
+        throw StateStoreErrors.stateFormatValidationFailure(error)
+      }
 
       if (conf.formatValidationCheckValue) {
         val validationError =
           UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
-        validationError.foreach { error => throw new InvalidUnsafeRowException(error) }
+        validationError.foreach { error =>
+          throw StateStoreErrors.stateFormatValidationFailure(error)
+        }
       }
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index b8ab32a00851f..33e1b57a51602 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -39,6 +39,19 @@ object StateStoreErrors {
     )
   }
 
+  def stateFormatValidationFailure(errorMsg: String): SparkException = {
+    SparkException.internalError(
+      msg = "The streaming query failed to validate written state. " +
+        "The following reasons may cause this: " +
+        "1. An old Spark version wrote the checkpoint that is incompatible with the current one; " +
+        "2. Corrupt checkpoint files; " +
+        "3. The query changed in an incompatible way between restarts. " +
+        "For the first case, use a new checkpoint directory or use the original Spark version " +
+        s"to process the streaming state. Retrieved error_message=$errorMsg",
+      category = "STATE_STORE"
+    )
+  }
+
   def unsupportedOperationOnMissingColumnFamily(operationName: String, colFamilyName: String):
     StateStoreUnsupportedOperationOnMissingColumnFamily = {
     new StateStoreUnsupportedOperationOnMissingColumnFamily(operationName, colFamilyName)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 6a6867fbb5523..14e00395c6e7e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -1606,12 +1606,12 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     // By default, when there is an invalid pair of value row and value schema, it should throw
     val keyRow = dataToKeyRow("key", 1)
     val valueRow = dataToValueRow(2)
-    val e = intercept[InvalidUnsafeRowException] {
+    val e = intercept[SparkException] {
       // Here valueRow doesn't match with prefixKeySchema
       StateStoreProvider.validateStateRowFormat(
         keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf())
     }
-    assert(e.getMessage.contains("The streaming query failed by state format invalidation"))
+    assert(e.getMessage.contains("The streaming query failed to validate written state"))
 
     // When sqlConf.stateStoreFormatValidationEnabled is set to false and
     // StateStoreConf.FORMAT_VALIDATION_CHECK_VALUE_CONFIG is set to true,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
index 4827d06d64d07..77cc4bf1efe45 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
 import org.apache.spark.sql.execution.streaming.MemoryStream
-import org.apache.spark.sql.execution.streaming.state.{InvalidUnsafeRowException, StateSchemaNotCompatible}
+import org.apache.spark.sql.execution.streaming.state.StateSchemaNotCompatible
 import org.apache.spark.sql.functions._
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.util.Utils
@@ -254,7 +254,7 @@ class StreamingStateStoreFormatCompatibilitySuite extends StreamTest {
   private def findStateSchemaException(exc: Throwable): Boolean = {
     exc match {
       case _: StateSchemaNotCompatible => true
-      case _: InvalidUnsafeRowException => true
+      case _: SparkException => true
       case e1 if e1.getCause != null => findStateSchemaException(e1.getCause)
       case _ => false
     }

From a31064cf54c11a20b525d2729a2a0bd9f7f9783d Mon Sep 17 00:00:00 2001
From: Anish Shrigondekar
Date: Wed, 12 Jun 2024 12:15:59 -0700
Subject: [PATCH 2/6] Change to address Jungtaek's comments

---
 .../resources/error/error-conditions.json | 30 ++++++++++++++----
 .../streaming/state/StateStore.scala | 4 +--
 .../streaming/state/StateStoreErrors.scala | 31 ++++++++++++-------
 3 files changed, 45 insertions(+), 20 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index df75acf2abc16..d127de9d70455 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -1802,12 +1802,6 @@
     ],
     "sqlState" : "XX000"
   },
-  "INTERNAL_ERROR_STATE_STORE" : {
-    "message" : [
-      ""
-    ],
-    "sqlState" : "XX000"
-  },
   "INTERNAL_ERROR_STORAGE" : {
     "message" : [
       ""
@@ -3736,6 +3730,18 @@
     ],
     "sqlState" : "42K06"
   },
+  "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE" : {
+    "message" : [
+      "The streaming query failed to validate written state for value row.",
+      "The following reasons may cause this:",
+      "1. An old Spark version wrote the checkpoint that is incompatible with the current one",
+      "2. Corrupt checkpoint files",
+      "3. The query changed in an incompatible way between restarts",
+      "For the first case, use a new checkpoint directory or use the original Spark version",
+      "to process the streaming state. Retrieved error_message=<errorMsg>"
+    ],
+    "sqlState" : "XX000"
+  },
   "STATE_STORE_KEY_SCHEMA_NOT_COMPATIBLE" : {
     "message" : [
       "Provided key schema does not match existing state key schema.",
@@ -3770,6 +3776,18 @@
     ],
     "sqlState" : "42802"
   },
+  "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE" : {
+    "message" : [
+      "The streaming query failed to validate written state for key row.",
+      "The following reasons may cause this:",
+      "1. An old Spark version wrote the checkpoint that is incompatible with the current one",
+      "2. Corrupt checkpoint files",
+      "3. The query changed in an incompatible way between restarts",
+      "For the first case, use a new checkpoint directory or use the original Spark version",
+      "to process the streaming state. Retrieved error_message=<errorMsg>"
+    ],
+    "sqlState" : "XX000"
+  },
   "STATE_STORE_VALUE_SCHEMA_NOT_COMPATIBLE" : {
     "message" : [
       "Provided value schema does not match existing state value schema.",
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 8d847fee1cacb..2f9ce2c236f4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -425,14 +425,14 @@ object StateStoreProvider {
     if (conf.formatValidationEnabled) {
       val validationError = UnsafeRowUtils.validateStructuralIntegrityWithReason(keyRow, keySchema)
       validationError.foreach { error =>
-        throw StateStoreErrors.stateFormatValidationFailure(error)
+        throw StateStoreErrors.keyRowFormatValidationFailure(error)
       }
 
       if (conf.formatValidationCheckValue) {
         val validationError =
           UnsafeRowUtils.validateStructuralIntegrityWithReason(valueRow, valueSchema)
         validationError.foreach { error =>
-          throw StateStoreErrors.stateFormatValidationFailure(error)
+          throw StateStoreErrors.valueRowFormatValidationFailure(error)
         }
       }
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index 2e924215fc81f..e8da21983592f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
+import org.apache.spark.{SparkException, SparkRuntimeException, SparkUnsupportedOperationException}
 
 /**
  * Object for grouping error messages from (most) exceptions thrown from State API V2
@@ -39,17 +39,14 @@ object StateStoreErrors {
     )
   }
 
-  def stateFormatValidationFailure(errorMsg: String): SparkException = {
-    SparkException.internalError(
-      msg = "The streaming query failed to validate written state. " +
-        "The following reasons may cause this: " +
-        "1. An old Spark version wrote the checkpoint that is incompatible with the current one; " +
-        "2. Corrupt checkpoint files; " +
-        "3. The query changed in an incompatible way between restarts. " +
-        "For the first case, use a new checkpoint directory or use the original Spark version " +
-        s"to process the streaming state. Retrieved error_message=$errorMsg",
-      category = "STATE_STORE"
-    )
+  def keyRowFormatValidationFailure(errorMsg: String):
+    StateStoreKeyRowFormatValidationFailure = {
+    new StateStoreKeyRowFormatValidationFailure(errorMsg)
+  }
+
+  def valueRowFormatValidationFailure(errorMsg: String):
+    StateStoreValueRowFormatValidationFailure = {
+    new StateStoreValueRowFormatValidationFailure(errorMsg)
   }
 
   def unsupportedOperationOnMissingColumnFamily(operationName: String, colFamilyName: String):
     StateStoreUnsupportedOperationOnMissingColumnFamily = {
     new StateStoreUnsupportedOperationOnMissingColumnFamily(operationName, colFamilyName)
@@ -258,3 +255,13 @@ class StateStoreValueSchemaNotCompatible(
       "storedValueSchema" -> storedValueSchema,
       "newValueSchema" -> newValueSchema))
 
+class StateStoreKeyRowFormatValidationFailure(
+    errorMsg: String) extends SparkRuntimeException(
+    errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE",
+    messageParameters = Map("errorMsg" -> errorMsg))
+
+class StateStoreValueRowFormatValidationFailure(
+    errorMsg: String) extends SparkRuntimeException(
+    errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE",
+    messageParameters = Map("errorMsg" -> errorMsg))
+

From effb7cb84464c5c0a16bc0cb4a5f436dfdfc71c8 Mon Sep 17 00:00:00 2001
From: Anish Shrigondekar
Date: Wed, 12 Jun 2024 12:17:48 -0700
Subject: [PATCH 3/6] Misc fix

---
 common/utils/src/main/resources/error/error-conditions.json | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index d127de9d70455..0a1ca3920fd08 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3732,7 +3732,7 @@
   },
   "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE" : {
     "message" : [
-      "The streaming query failed to validate written state for value row.",
+      "The streaming query failed to validate written state for key row.",
       "The following reasons may cause this:",
       "1. An old Spark version wrote the checkpoint that is incompatible with the current one",
       "2. Corrupt checkpoint files",
@@ -3778,7 +3778,7 @@
   },
   "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE" : {
     "message" : [
-      "The streaming query failed to validate written state for key row.",
+      "The streaming query failed to validate written state for value row.",
       "The following reasons may cause this:",
       "1. An old Spark version wrote the checkpoint that is incompatible with the current one",
       "2. Corrupt checkpoint files",

From c137d0643bb9dd50afef5317d1fd691f76cbe004 Mon Sep 17 00:00:00 2001
From: Anish Shrigondekar
Date: Wed, 12 Jun 2024 12:57:41 -0700
Subject: [PATCH 4/6] Fix test

---
 .../spark/sql/execution/streaming/state/StateStoreSuite.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 14e00395c6e7e..98b2030f1bac4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeProj
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorSuite.withCoordinatorRef
+import org.apache.spark.sql.execution.streaming.state.StateStoreValueRowFormatValidationFailure
 import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
@@ -1606,7 +1607,7 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider]
     // By default, when there is an invalid pair of value row and value schema, it should throw
     val keyRow = dataToKeyRow("key", 1)
     val valueRow = dataToValueRow(2)
-    val e = intercept[SparkException] {
+    val e = intercept[StateStoreValueRowFormatValidationFailure] {
      // Here valueRow doesn't match with prefixKeySchema
       StateStoreProvider.validateStateRowFormat(
         keyRow, keySchema, valueRow, keySchema, getDefaultStoreConf())

From 8c39500b989751009091aa899661d2c1c212fe45 Mon Sep 17 00:00:00 2001
From: Anish Shrigondekar
Date: Wed, 12 Jun 2024 13:05:14 -0700
Subject: [PATCH 5/6] Fix indent

---
 .../sql/execution/streaming/state/StateStoreErrors.scala | 9 ++++-----
 1 file changed, 4 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
index e8da21983592f..205e093e755d5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreErrors.scala
@@ -255,13 +255,12 @@ class StateStoreValueSchemaNotCompatible(
       "storedValueSchema" -> storedValueSchema,
       "newValueSchema" -> newValueSchema))
 
-class StateStoreKeyRowFormatValidationFailure(
-    errorMsg: String) extends SparkRuntimeException(
+class StateStoreKeyRowFormatValidationFailure(errorMsg: String)
+  extends SparkRuntimeException(
     errorClass = "STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE",
     messageParameters = Map("errorMsg" -> errorMsg))
 
-class StateStoreValueRowFormatValidationFailure(
-    errorMsg: String) extends SparkRuntimeException(
+class StateStoreValueRowFormatValidationFailure(errorMsg: String)
+  extends SparkRuntimeException(
     errorClass = "STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE",
     messageParameters = Map("errorMsg" -> errorMsg))
-

From 4aa8b2d884284be65269234c29088a2396d7a48b Mon Sep 17 00:00:00 2001
From: Anish Shrigondekar
Date: Wed, 12 Jun 2024 22:43:26 -0700
Subject: [PATCH 6/6] Address Jungtaek's comments

---
 .../streaming/StreamingStateStoreFormatCompatibilitySuite.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
index 4c6374c82f09d..8a9d4d42ef2b5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingStateStoreFormatCompatibilitySuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.{SparkException, SparkUnsupportedOperationException}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Complete
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.{StateStoreKeyRowFormatValidationFailure, StateStoreValueRowFormatValidationFailure}
 import org.apache.spark.sql.functions._
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.util.Utils
@@ -253,7 +254,8 @@ class StreamingStateStoreFormatCompatibilitySuite extends StreamTest {
   private def findStateSchemaException(exc: Throwable): Boolean = {
     exc match {
       case _: SparkUnsupportedOperationException => true
-      case _: SparkException => true
+      case _: StateStoreKeyRowFormatValidationFailure => true
+      case _: StateStoreValueRowFormatValidationFailure => true
       case e1 if e1.getCause != null => findStateSchemaException(e1.getCause)
       case _ => false
     }
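
For illustration only, and not part of the patch series above: a minimal sketch of how calling code could now distinguish the dedicated row-format validation failures instead of matching on a generic SparkException, mirroring the findStateSchemaException helper updated in PATCH 6/6. The helper name isStateRowFormatError is hypothetical; only the two exception classes come from the patches.

    import org.apache.spark.sql.execution.streaming.state.{StateStoreKeyRowFormatValidationFailure, StateStoreValueRowFormatValidationFailure}

    // Walk the cause chain and report whether the failure originated from
    // state row format validation on either the key or the value side.
    def isStateRowFormatError(t: Throwable): Boolean = t match {
      case _: StateStoreKeyRowFormatValidationFailure => true
      case _: StateStoreValueRowFormatValidationFailure => true
      case e if e.getCause != null => isStateRowFormatError(e.getCause)
      case _ => false
    }

Because both classes extend SparkRuntimeException with a dedicated error class, callers that prefer not to depend on the concrete exception types can instead inspect the error class name (STATE_STORE_KEY_ROW_FORMAT_VALIDATION_FAILURE or STATE_STORE_VALUE_ROW_FORMAT_VALIDATION_FAILURE) exposed through the SparkThrowable interface.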