Commit e6f3cab

spark-protobuf: error class framework, import support, timestamp and duration validation

SandishKumarHN committed Oct 22, 2022
1 parent 7c866de commit e6f3cab

Showing 7 changed files with 65 additions and 23 deletions.
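
Note on the pattern: this commit replaces the connector's ad-hoc throws (IllegalArgumentException, bare Exception, hand-formatted AnalysisException and SparkException messages) with factory methods on QueryCompilationErrors, each backed by a message template in core's error-classes.json. A minimal sketch of the pattern, with a hypothetical error class and factory name (the errorClass constructor of AnalysisException is usable inside Spark's sql package, as the diff below shows):

    import org.apache.spark.sql.AnalysisException

    object ExampleErrors {
      // One factory per error condition. The message text lives in
      // error-classes.json under the error class name, and <exampleField>
      // is a named placeholder filled from messageParameters.
      def exampleFieldMismatchError(exampleField: String): Throwable =
        new AnalysisException(
          errorClass = "_LEGACY_ERROR_TEMP_9999", // hypothetical error class
          messageParameters = Map("exampleField" -> exampleField))
    }

    // Call sites throw the factory's result instead of formatting strings inline:
    //   throw ExampleErrors.exampleFieldMismatchError(fieldName)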
1 change: 1 addition & 0 deletions connector/protobuf/pom.xml
@@ -123,6 +123,7 @@
           <configuration>
             <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
             <protocVersion>${protobuf.version}</protocVersion>
+            <includeMavenTypes>direct</includeMavenTypes>
             <inputDirectories>
               <include>src/test/resources/protobuf</include>
             </inputDirectories>
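
(includeMavenTypes is a protoc-jar-maven-plugin option; with the value direct, proto files from the module's direct Maven dependencies, such as the well-known google/protobuf/timestamp.proto and duration.proto types, become importable from the test protos. This is presumably the "import support" and the timestamp/duration validation named in the commit title.)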
@@ -21,11 +21,10 @@ import scala.util.control.NonFatal
 
 import com.google.protobuf.DynamicMessage
 
-import org.apache.spark.SparkException
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
 import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}
 import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}
 
@@ -71,16 +70,11 @@ private[protobuf] case class ProtobufDataToCatalyst(
   @transient private lazy val parseMode: ParseMode = {
     val mode = protobufOptions.parseMode
     if (mode != PermissiveMode && mode != FailFastMode) {
-      throw new AnalysisException(unacceptableModeMessage(mode.name))
+      throw QueryCompilationErrors.parseModeUnsupportedError("from_protobuf", mode)
     }
     mode
   }
 
-  private def unacceptableModeMessage(name: String): String = {
-    s"from_protobuf() doesn't support the $name mode. " +
-      s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
-  }
-
   @transient private lazy val nullResultRow: Any = dataType match {
     case st: StructType =>
       val resultRow = new SpecificInternalRow(st.map(_.dataType))
@@ -98,13 +92,9 @@ private[protobuf] case class ProtobufDataToCatalyst(
       case PermissiveMode =>
         nullResultRow
       case FailFastMode =>
-        throw new SparkException(
-          "Malformed records are detected in record parsing. " +
-            s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
-            "result, try setting the option 'mode' as 'PERMISSIVE'.",
-          e)
+        throw QueryCompilationErrors.malformedRecordsDetectedInRecordParsingError(e)
       case _ =>
-        throw new AnalysisException(unacceptableModeMessage(parseMode.name))
+        throw QueryCompilationErrors.parseModeUnsupportedError("from_protobuf", parseMode)
     }
   }
 
@@ -119,8 +109,8 @@ private[protobuf] case class ProtobufDataToCatalyst(
       case Some(number) =>
         // Unknown fields contain a field with same number as a known field. Must be due to
        // mismatch of schema between writer and reader here.
-        throw new IllegalArgumentException(s"Type mismatch encountered for field:" +
-          s" ${messageDescriptor.getFields.get(number)}")
+        throw QueryCompilationErrors.protobufFieldTypeMismatchError(
+          messageDescriptor.getFields.get(number).toString)
       case None =>
     }
 
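
User-level behavior of the two accepted modes, as a usage sketch (assumes the options overload of from_protobuf; the message name, descriptor path, and df, a DataFrame with a binary "value" column, are hypothetical):

    import scala.collection.JavaConverters._
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.protobuf.functions.from_protobuf

    // PERMISSIVE: malformed records are returned as the null result row.
    val permissive = df.select(
      from_protobuf(col("value"), "SimpleMessage", "/tmp/simple.desc",
        Map("mode" -> "PERMISSIVE").asJava).as("msg"))

    // FAILFAST: malformed records raise the SparkException built by
    // malformedRecordsDetectedInRecordParsingError; any other mode is
    // rejected up front via parseModeUnsupportedError.
    val failFast = df.select(
      from_protobuf(col("value"), "SimpleMessage", "/tmp/simple.desc",
        Map("mode" -> "FAILFAST").asJava).as("msg"))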
@@ -194,7 +194,7 @@ private[sql] class ProtobufDeserializer(
       (updater, ordinal, value) =>
         val byte_array = value match {
           case s: ByteString => s.toByteArray
-          case _ => throw new Exception("Invalid ByteString format")
+          case _ => throw QueryCompilationErrors.invalidBytStringFormatError()
         }
         updater.set(ordinal, byte_array)
 
@@ -159,14 +159,12 @@ private[sql] object ProtobufUtils extends Logging {
     } catch {
       case _: ClassNotFoundException =>
         val hasDots = protobufClassName.contains(".")
-        throw new IllegalArgumentException(
-          s"Could not load Protobuf class with name '$protobufClassName'" +
-            (if (hasDots) "" else ". Ensure the class name includes package prefix.")
-        )
+        throw QueryCompilationErrors.protobufClassLoadError(protobufClassName,
+          if (hasDots) "" else ". Ensure the class name includes package prefix.")
     }
 
     if (!classOf[Message].isAssignableFrom(protobufClass)) {
-      throw new IllegalArgumentException(s"$protobufClassName is not a Protobuf message type")
+      throw QueryCompilationErrors.protobufMessageTypeError(protobufClassName)
       // TODO: Need to support V2. This might work with V2 classes too.
     }
 
@@ -324,7 +324,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Serializable {
       .setField(messageEnumDesc.findFieldByName("value"), "value")
       .setField(
         messageEnumDesc.findFieldByName("nested_enum"),
-        messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("ESTED_NOTHING"))
+        messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("NESTED_NOTHING"))
       .setField(
         messageEnumDesc.findFieldByName("nested_enum"),
         messageEnumDesc.findEnumTypeByName("NestedEnum").findValueByName("NESTED_FIRST"))
20 changes: 20 additions & 0 deletions core/src/main/resources/error/error-classes.json
@@ -4355,5 +4355,25 @@
     "message" : [
       "Found recursive reference in Protobuf schema, which can not be processed by Spark: <fieldDescriptor>"
     ]
+  },
+  "_LEGACY_ERROR_TEMP_2267" : {
+    "message" : [
+      "Type mismatch encountered for field: <field>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2268" : {
+    "message" : [
+      "Could not load Protobuf class with name <protobufClassName> <errorMessage>"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2269" : {
+    "message" : [
+      "<protobufClassName> is not a Protobuf message type"
+    ]
+  },
+  "_LEGACY_ERROR_TEMP_2270" : {
+    "message" : [
+      "Invalid ByteString format"
+    ]
   }
 }
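
Each <...> token in these templates is a named parameter substituted from the messageParameters map passed by the factories added to QueryCompilationErrors below. For example (a sketch; QueryCompilationErrors is private[sql], so this only compiles inside Spark's sql package):

    import org.apache.spark.sql.errors.QueryCompilationErrors

    val err = QueryCompilationErrors.protobufMessageTypeError("com.example.NotAMessage")
    // The "_LEGACY_ERROR_TEMP_2269" template renders as:
    //   "com.example.NotAMessage is not a Protobuf message type"
    println(err.getMessage)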
@@ -21,6 +21,7 @@ import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.SparkException
 import org.apache.spark.SparkThrowableHelper
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, QualifiedTableName, TableIdentifier}
@@ -3357,4 +3358,36 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
       errorClass = "_LEGACY_ERROR_TEMP_2266",
       messageParameters = Map("fieldDescriptor" -> fieldDescriptor))
   }
+
+  def protobufFieldTypeMismatchError(field: String): Throwable = {
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_2267",
+      messageParameters = Map("field" -> field))
+  }
+
+  def protobufClassLoadError(protobufClassName: String, errorMessage: String): Throwable = {
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_2268",
+      messageParameters = Map(
+        "protobufClassName" -> protobufClassName,
+        "errorMessage" -> errorMessage))
+  }
+
+  def protobufMessageTypeError(protobufClassName: String): Throwable = {
+    new AnalysisException(
+      errorClass = "_LEGACY_ERROR_TEMP_2269",
+      messageParameters = Map("protobufClassName" -> protobufClassName))
+  }
+
+  def invalidBytStringFormatError(): Throwable = {
+    new AnalysisException(errorClass = "_LEGACY_ERROR_TEMP_2270", messageParameters = Map.empty)
+  }
+
+  def malformedRecordsDetectedInRecordParsingError(e1: Throwable): Throwable = {
+    new SparkException(
+      errorClass = "_LEGACY_ERROR_TEMP_2177",
+      messageParameters = Map(
+        "failFastMode" -> FailFastMode.name),
+      cause = e1)
+  }
 }
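
Design note: the four new _LEGACY_ERROR_TEMP_2267..2270 errors are analysis-time AnalysisExceptions, while the FAILFAST path reuses the existing _LEGACY_ERROR_TEMP_2177 template as a runtime SparkException, the same class used elsewhere for FailFast record parsing. A test-style sketch (assumes the failFast DataFrame from the earlier sketch; in practice the exception may arrive wrapped in a task-failure exception):

    import org.apache.spark.SparkException

    try {
      failFast.collect() // triggers parsing of malformed binary payloads
    } catch {
      case e: SparkException =>
        // getErrorClass comes from the SparkThrowable interface.
        assert(e.getErrorClass == "_LEGACY_ERROR_TEMP_2177")
    }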
