[SPARK-40777][SQL][PROTOBUF] Protobuf import support and move error-classes

This is a follow-up PR to #37972 and #38212.

### What changes were proposed in this pull request?
1. Move the spark-protobuf error classes to the Spark error-classes framework (core/src/main/resources/error/error-classes.json).
2. Support protobuf imports (see the usage sketch below).
3. Validate protobuf timestamp and duration types.
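
As a hedged illustration of the import support (not part of this diff; the message name, descriptor path, and column names are hypothetical, and the descriptor set is assumed to be generated with `protoc --include_imports` so imported definitions travel with it):

```scala
// Hypothetical usage of the from_protobuf API from the earlier PRs in this series.
// events.proto imports google/protobuf/timestamp.proto and a local common.proto;
// the descriptor set is assumed to be built with:
//   protoc --include_imports --descriptor_set_out=events.desc events.proto
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

// binaryDf is assumed to hold serialized Event messages in a binary column "value".
val parsed = binaryDf.select(
  from_protobuf(col("value"), "Event", "/tmp/descriptors/events.desc").as("event"))
```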

### Why are the changes needed?
N/A

### Does this PR introduce _any_ user-facing change?
None

### How was this patch tested?
Existing tests should cover the validation of this PR.

CC: rangadi mposdev21 gengliangwang

Closes #38344 from SandishKumarHN/SPARK-40777-ProtoErrorCls.

Authored-by: SandishKumarHN <sanysandish@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
SandishKumarHN authored and HeartSaVioR committed Nov 4, 2022
1 parent d1dfa43 commit 5741d38
Showing 20 changed files with 625 additions and 191 deletions.
ProtobufDataToCatalyst.scala
@@ -21,11 +21,10 @@ import scala.util.control.NonFatal

import com.google.protobuf.DynamicMessage

import org.apache.spark.SparkException
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.{ExpectsInputTypes, Expression, SpecificInternalRow, UnaryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGenerator, ExprCode}
import org.apache.spark.sql.catalyst.util.{FailFastMode, ParseMode, PermissiveMode}
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.protobuf.utils.{ProtobufOptions, ProtobufUtils, SchemaConverters}
import org.apache.spark.sql.types.{AbstractDataType, BinaryType, DataType, StructType}

@@ -71,16 +70,11 @@ private[protobuf] case class ProtobufDataToCatalyst(
@transient private lazy val parseMode: ParseMode = {
val mode = protobufOptions.parseMode
if (mode != PermissiveMode && mode != FailFastMode) {
throw new AnalysisException(unacceptableModeMessage(mode.name))
throw QueryCompilationErrors.parseModeUnsupportedError(prettyName, mode)
}
mode
}

private def unacceptableModeMessage(name: String): String = {
s"from_protobuf() doesn't support the $name mode. " +
s"Acceptable modes are ${PermissiveMode.name} and ${FailFastMode.name}."
}

@transient private lazy val nullResultRow: Any = dataType match {
case st: StructType =>
val resultRow = new SpecificInternalRow(st.map(_.dataType))
@@ -98,13 +92,9 @@ private[protobuf] case class ProtobufDataToCatalyst(
case PermissiveMode =>
nullResultRow
case FailFastMode =>
throw new SparkException(
"Malformed records are detected in record parsing. " +
s"Current parse Mode: ${FailFastMode.name}. To process malformed records as null " +
"result, try setting the option 'mode' as 'PERMISSIVE'.",
e)
throw QueryExecutionErrors.malformedProtobufMessageDetectedInMessageParsingError(e)
case _ =>
throw new AnalysisException(unacceptableModeMessage(parseMode.name))
throw QueryCompilationErrors.parseModeUnsupportedError(prettyName, parseMode)
}
}
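
For context, a hedged sketch of how the two supported parse modes surface to a caller (assuming the options overload of from_protobuf and the "mode" option key; the DataFrame and descriptor path are hypothetical):

```scala
import java.util.Collections
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.protobuf.functions.from_protobuf

// PERMISSIVE: malformed records are replaced by a null result row.
val permissive = binaryDf.select(
  from_protobuf(col("value"), "Event", "/tmp/descriptors/events.desc",
    Collections.singletonMap("mode", "PERMISSIVE")).as("event"))

// FAILFAST: the first malformed record raises the malformed-protobuf-message error above.
val failFast = binaryDf.select(
  from_protobuf(col("value"), "Event", "/tmp/descriptors/events.desc",
    Collections.singletonMap("mode", "FAILFAST")).as("event"))
```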

@@ -119,8 +109,8 @@ private[protobuf] case class ProtobufDataToCatalyst(
case Some(number) =>
// Unknown fields contain a field with same number as a known field. Must be due to
// mismatch of schema between writer and reader here.
throw new IllegalArgumentException(s"Type mismatch encountered for field:" +
s" ${messageDescriptor.getFields.get(number)}")
throw QueryCompilationErrors.protobufFieldTypeMismatchError(
messageDescriptor.getFields.get(number).toString)
case None =>
}

ProtobufDeserializer.scala
@@ -22,14 +22,14 @@ import com.google.protobuf.{ByteString, DynamicMessage, Message}
import com.google.protobuf.Descriptors._
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, StructFilters}
import org.apache.spark.sql.catalyst.expressions.{SpecificInternalRow, UnsafeArrayData}
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, DateTimeUtils, GenericArrayData}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import org.apache.spark.sql.protobuf.utils.ProtobufUtils.ProtoMatchedField
import org.apache.spark.sql.protobuf.utils.ProtobufUtils.toFieldStr
import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String

@@ -61,10 +61,10 @@ private[sql] class ProtobufDeserializer(
}
}
} catch {
case ise: IncompatibleSchemaException =>
throw new IncompatibleSchemaException(
s"Cannot convert Protobuf type ${rootDescriptor.getName} " +
s"to SQL type ${rootCatalystType.sql}.",
case ise: AnalysisException =>
throw QueryCompilationErrors.cannotConvertProtobufTypeToCatalystTypeError(
rootDescriptor.getName,
rootCatalystType,
ise)
}

@@ -152,11 +152,6 @@ private[sql] class ProtobufDeserializer(
catalystType: DataType,
protoPath: Seq[String],
catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {
val errorPrefix = s"Cannot convert Protobuf ${toFieldStr(protoPath)} to " +
s"SQL ${toFieldStr(catalystPath)} because "
val incompatibleMsg = errorPrefix +
s"schema is incompatible (protoType = ${protoType} ${protoType.toProto.getLabel} " +
s"${protoType.getJavaType} ${protoType.getType}, sqlType = ${catalystType.sql})"

(protoType.getJavaType, catalystType) match {

@@ -175,8 +170,9 @@
case (INT, ShortType) =>
(updater, ordinal, value) => updater.setShort(ordinal, value.asInstanceOf[Short])

case (BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
case (
BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
newArrayWriter(protoType, protoPath, catalystPath, dataType, containsNull)

case (LONG, LongType) =>
@@ -199,7 +195,8 @@
(updater, ordinal, value) =>
val byte_array = value match {
case s: ByteString => s.toByteArray
case _ => throw new Exception("Invalid ByteString format")
case unsupported =>
throw QueryCompilationErrors.invalidByteStringFormatError(unsupported)
}
updater.set(ordinal, byte_array)

@@ -244,7 +241,13 @@
case (ENUM, StringType) =>
(updater, ordinal, value) => updater.set(ordinal, UTF8String.fromString(value.toString))

case _ => throw new IncompatibleSchemaException(incompatibleMsg)
case _ =>
throw QueryCompilationErrors.cannotConvertProtobufTypeToSqlTypeError(
toFieldStr(protoPath),
catalystPath,
s"${protoType} ${protoType.toProto.getLabel} ${protoType.getJavaType}" +
s" ${protoType.getType}",
catalystType)
}
}

ProtobufSerializer.scala
@@ -23,13 +23,14 @@ import com.google.protobuf.Descriptors.{Descriptor, FieldDescriptor}
import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, IntervalUtils}
import org.apache.spark.sql.catalyst.util.IntervalStringStyles.ANSI_STYLE
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.protobuf.utils.ProtobufUtils
import org.apache.spark.sql.protobuf.utils.ProtobufUtils.{toFieldStr, ProtoMatchedField}
import org.apache.spark.sql.protobuf.utils.SchemaConverters.IncompatibleSchemaException
import org.apache.spark.sql.types._

/**
@@ -53,10 +54,10 @@ private[sql] class ProtobufSerializer(
newStructConverter(st, rootDescriptor, Nil, Nil).asInstanceOf[Any => Any]
}
} catch {
case ise: IncompatibleSchemaException =>
throw new IncompatibleSchemaException(
s"Cannot convert SQL type ${rootCatalystType.sql} to Protobuf type " +
s"${rootDescriptor.getName}.",
case ise: AnalysisException =>
throw QueryCompilationErrors.cannotConvertSqlTypeToProtobufError(
rootDescriptor.getName,
rootCatalystType,
ise)
}
if (nullable) { (data: Any) =>
@@ -77,8 +78,6 @@
fieldDescriptor: FieldDescriptor,
catalystPath: Seq[String],
protoPath: Seq[String]): Converter = {
val errorPrefix = s"Cannot convert SQL ${toFieldStr(catalystPath)} " +
s"to Protobuf ${toFieldStr(protoPath)} because "
(catalystType, fieldDescriptor.getJavaType) match {
case (NullType, _) =>
(getter, ordinal) => null
@@ -104,10 +103,11 @@
(getter, ordinal) =>
val data = getter.getUTF8String(ordinal).toString
if (!enumSymbols.contains(data)) {
throw new IncompatibleSchemaException(
errorPrefix +
s""""$data" cannot be written since it's not defined in enum """ +
enumSymbols.mkString("\"", "\", \"", "\""))
throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufEnumTypeError(
catalystPath,
toFieldStr(protoPath),
data,
enumSymbols.mkString("\"", "\", \"", "\""))
}
fieldDescriptor.getEnumType.findValueByName(data)
case (StringType, STRING) =>
@@ -124,7 +124,8 @@
case (TimestampType, MESSAGE) =>
(getter, ordinal) =>
val millis = DateTimeUtils.microsToMillis(getter.getLong(ordinal))
Timestamp.newBuilder()
Timestamp
.newBuilder()
.setSeconds((millis / 1000))
.setNanos(((millis % 1000) * 1000000).toInt)
.build()
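
As an aside, a small sketch with hypothetical values of the seconds/nanos split used above when a Catalyst TimestampType value (microseconds since the epoch) becomes a protobuf Timestamp:

```scala
// Hypothetical value: 2022-11-04 00:00:00.123456 UTC as microseconds since the epoch.
val micros = 1667520000123456L
val millis = micros / 1000                     // what DateTimeUtils.microsToMillis yields for non-negative inputs
val seconds = millis / 1000                    // 1667520000 -> Timestamp.seconds
val nanos = ((millis % 1000) * 1000000).toInt  // 123000000  -> Timestamp.nanos
```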
@@ -201,7 +202,8 @@
val calendarInterval = IntervalUtils.fromIntervalString(dayTimeIntervalString)

val millis = DateTimeUtils.microsToMillis(calendarInterval.microseconds)
val duration = Duration.newBuilder()
val duration = Duration
.newBuilder()
.setSeconds((millis / 1000))
.setNanos(((millis % 1000) * 1000000).toInt)

@@ -215,10 +217,12 @@
duration.build()

case _ =>
throw new IncompatibleSchemaException(
errorPrefix +
s"schema is incompatible (sqlType = ${catalystType.sql}, " +
s"protoType = ${fieldDescriptor.getJavaType})")
throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError(
catalystPath,
toFieldStr(protoPath),
catalystType,
s"${fieldDescriptor} ${fieldDescriptor.toProto.getLabel} ${fieldDescriptor.getJavaType}" +
s" ${fieldDescriptor.getType}")
}
}

(Diffs for the remaining changed files are not shown here.)

1 comment on commit 5741d38


baganokodo2022 commented on 5741d38, Dec 2, 2022


In many places (microservices), engineers love to use the oneof data type and circular references in their schema models for the sake of flexibility, whereas handling them in a data warehouse or data lake is non-trivial.

We can't simply reject these use cases by erroring out on any circular reference to prevent infinite looping during schema parsing. I therefore propose the following configuration parameters to let users choose how circular references are handled.

protobufDescriptorConfig: {
  descriptorFilePath: /dbfs/FileStore/users/xinyu_liu/protobuf/event-trading-prime.desc
  messageName: MaterializedEvent
  circularReferenceTolerance: 0
  circularReferenceType: field_name
}

Here circularReferenceType has two enum values:
- FIELD_NAME: while navigating a Protobuf schema, a repeated **fully-qualified field name** is considered a circular reference.
- FIELD_TYPE: while navigating a Protobuf schema, a repeated **field type** is considered a circular reference.

circularReferenceTolerance is an Int and may take a value of -1, 0, 1, 2, and so on.
With circularReferenceTolerance=-1, a RuntimeException is raised as soon as a circular reference is detected. circularReferenceTolerance=0 drops the field the first time it is re-entered, and circularReferenceTolerance=1 allows the same Protobuf message name/type to be entered twice but drops it the third time it is encountered (see the sketch below).
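
A minimal sketch of the proposed tolerance rule (illustrative only; shouldDrop, seen, and the key choice are hypothetical names, not an actual implementation):

```scala
// Illustrative only: decide whether to drop a nested message while walking the schema.
// `key` is the fully-qualified field name (FIELD_NAME) or the message type (FIELD_TYPE);
// `seen` counts how many times that key has already been entered on the current path.
def shouldDrop(seen: Map[String, Int], key: String, tolerance: Int): Boolean = {
  val timesEntered = seen.getOrElse(key, 0)
  if (tolerance < 0 && timesEntered > 0) {
    // circularReferenceTolerance = -1: fail as soon as a circular reference is detected
    throw new RuntimeException(s"Circular reference detected for $key")
  }
  // tolerance = 0 drops on the first re-entry, tolerance = 1 on the second, and so on
  timesEntered > tolerance
}
```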

I hope this design is simple yet flexible enough to help engineers cope with circular references in their schemas.

The above is a follow-up to the delightful discussion with Sandish.

Thank you
Xinyu Liu
