review changes, name protobuf error class and comment in pom.xml
SandishKumarHN committed Oct 24, 2022
1 parent 70a5983 commit 26e471b
Showing 8 changed files with 169 additions and 160 deletions.
1 change: 1 addition & 0 deletions connector/protobuf/pom.xml
```diff
@@ -123,6 +123,7 @@
       <configuration>
         <protocArtifact>com.google.protobuf:protoc:${protobuf.version}</protocArtifact>
         <protocVersion>${protobuf.version}</protocVersion>
+        <!-- This extracts .proto files from maven dependencies and adds them to the protoc import path -->
         <includeMavenTypes>direct</includeMavenTypes>
         <inputDirectories>
           <include>src/test/resources/protobuf</include>
```
ProtobufDeserializer.scala

```diff
@@ -64,7 +64,7 @@ private[sql] class ProtobufDeserializer(
       case ise: AnalysisException =>
         throw QueryCompilationErrors.cannotConvertProtobufTypeToCatalystTypeError(
           rootDescriptor.getName,
-          rootCatalystType.sql,
+          rootCatalystType,
           ise)
     }
```
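The same change recurs throughout this commit: the `QueryCompilationErrors` helpers now receive the Catalyst `DataType` itself instead of its pre-rendered `.sql` string, so the error framework can format the type consistently. A REPL-style sketch of the assumed message construction (helper name and wording inferred from the test expectations at the end of this diff, not copied from Spark's sources):

```scala
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
import org.apache.spark.sql.types.DataType

// Hypothetical rendering of the deserializer error message, assuming the
// helper formats the DataType with toSQLType (which quotes the SQL form).
def protobufToCatalystErrorMessage(protobufType: String, sqlType: DataType): String =
  s"[PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR] Unable to convert" +
    s" $protobufType of Protobuf to SQL type ${toSQLType(sqlType)}."
```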

```diff
@@ -170,8 +170,9 @@ private[sql] class ProtobufDeserializer(
       case (INT, ShortType) =>
         (updater, ordinal, value) => updater.setShort(ordinal, value.asInstanceOf[Short])
 
-      case (BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
-          ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
+      case (
+          BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
+          ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
         newArrayWriter(protoType, protoPath, catalystPath, dataType, containsNull)
 
       case (LONG, LongType) =>
```
```diff
@@ -245,7 +246,7 @@ private[sql] class ProtobufDeserializer(
           toFieldStr(catalystPath),
           s"${protoType} ${protoType.toProto.getLabel} ${protoType.getJavaType}" +
             s" ${protoType.getType}",
-          catalystType.sql)
+          catalystType)
     }
   }
 
```
ProtobufSerializer.scala

```diff
@@ -57,7 +57,7 @@ private[sql] class ProtobufSerializer(
       case ise: AnalysisException =>
         throw QueryCompilationErrors.cannotConvertSqlTypeToProtobufError(
           rootDescriptor.getName,
-          rootCatalystType.sql,
+          rootCatalystType,
           ise)
     }
     if (nullable) { (data: Any) =>
```
```diff
@@ -104,7 +104,9 @@ private[sql] class ProtobufSerializer(
         val data = getter.getUTF8String(ordinal).toString
         if (!enumSymbols.contains(data)) {
           throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufEnumTypeError(
-            toFieldStr(catalystPath), toFieldStr(protoPath), data,
+            toFieldStr(catalystPath),
+            toFieldStr(protoPath),
+            data,
             enumSymbols.mkString("\"", "\", \"", "\""))
         }
         fieldDescriptor.getEnumType.findValueByName(data)
```
```diff
@@ -122,7 +124,8 @@ private[sql] class ProtobufSerializer(
       case (TimestampType, MESSAGE) =>
         (getter, ordinal) =>
           val millis = DateTimeUtils.microsToMillis(getter.getLong(ordinal))
-          Timestamp.newBuilder()
+          Timestamp
+            .newBuilder()
             .setSeconds((millis / 1000))
             .setNanos(((millis % 1000) * 1000000).toInt)
             .build()
```
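As a sanity check on the seconds/nanos arithmetic above, a small worked example (values invented; assumes a non-negative timestamp so plain integer division matches `DateTimeUtils.microsToMillis`, and note that sub-millisecond precision is dropped by the conversion to millis):

```scala
import com.google.protobuf.Timestamp

val micros = 1666656000123456L // 2022-10-25 00:00:00.123456 UTC, in microseconds
val millis = micros / 1000     // 1666656000123 (the trailing 456 µs are dropped)
val ts = Timestamp
  .newBuilder()
  .setSeconds(millis / 1000)                   // 1666656000
  .setNanos(((millis % 1000) * 1000000).toInt) // 123000000
  .build()
```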
```diff
@@ -199,7 +202,8 @@ private[sql] class ProtobufSerializer(
           val calendarInterval = IntervalUtils.fromIntervalString(dayTimeIntervalString)
 
           val millis = DateTimeUtils.microsToMillis(calendarInterval.microseconds)
-          val duration = Duration.newBuilder()
+          val duration = Duration
+            .newBuilder()
             .setSeconds((millis / 1000))
             .setNanos(((millis % 1000) * 1000000).toInt)
 
```
```diff
@@ -216,7 +220,7 @@ private[sql] class ProtobufSerializer(
         throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError(
           toFieldStr(catalystPath),
           toFieldStr(protoPath),
-          catalystType.sql,
+          catalystType,
           s"${fieldDescriptor} ${fieldDescriptor.toProto.getLabel} ${fieldDescriptor.getJavaType}" +
             s" ${fieldDescriptor.getType}")
     }
```
ProtobufUtils.scala

```diff
@@ -79,9 +79,9 @@ private[sql] object ProtobufUtils extends Logging {
 
     /**
      * Validate that there are no Catalyst fields which don't have a matching Protobuf field,
-     * throwing [[AnalysisException]] if such extra fields are found. If
-     * `ignoreNullable` is false, consider nullable Catalyst fields to be eligible to be an extra
-     * field; otherwise, ignore nullable Catalyst fields when checking for extras.
+     * throwing [[AnalysisException]] if such extra fields are found. If `ignoreNullable` is
+     * false, consider nullable Catalyst fields to be eligible to be an extra field; otherwise,
+     * ignore nullable Catalyst fields when checking for extras.
      */
     def validateNoExtraCatalystFields(ignoreNullable: Boolean): Unit =
       catalystSchema.fields.foreach { sqlField =>
```
```diff
@@ -94,8 +94,8 @@ private[sql] object ProtobufUtils extends Logging {
 
     /**
      * Validate that there are no Protobuf fields which don't have a matching Catalyst field,
-     * throwing [[AnalysisException]] if such extra fields are found. Only required
-     * (non-nullable) fields are checked; nullable fields are ignored.
+     * throwing [[AnalysisException]] if such extra fields are found. Only required (non-nullable)
+     * fields are checked; nullable fields are ignored.
      */
     def validateNoExtraRequiredProtoFields(): Unit = {
       val extraFields = protoFieldArray.toSet -- matchedFields.map(_.fieldDescriptor)
```
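The two reflowed doc comments describe an asymmetric matching rule: extra Catalyst fields may be excused when nullable, while only required Protobuf fields count as extras. A toy illustration of that rule (types and data invented here, not Spark's internals):

```scala
// Toy model: a field is an "extra" only if it is required (non-nullable)
// and has no counterpart on the other side of the schema match.
case class Field(name: String, nullable: Boolean)

val catalystFields  = Seq(Field("id", nullable = false), Field("note", nullable = true))
val protoFieldNames = Set("id")

val extras = catalystFields.filter(f => !f.nullable && !protoFieldNames(f.name))
assert(extras.isEmpty) // "note" is nullable, so it is ignored when checking for extras
```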
```diff
@@ -183,7 +183,8 @@ private[sql] object ProtobufUtils extends Logging {
     descriptor match {
       case Some(d) => d
       case None =>
-        throw QueryCompilationErrors.unableToLocateProtobuMessageError(messageName) }
+        throw QueryCompilationErrors.unableToLocateProtobufMessageError(messageName)
+    }
   }
 
   private def parseFileDescriptor(descFilePath: String): Descriptors.FileDescriptor = {
```
```diff
@@ -199,7 +200,7 @@ private[sql] object ProtobufUtils extends Logging {
     }
 
     val descriptorProto: DescriptorProtos.FileDescriptorProto =
-      fileDescriptorSet.getFile(fileDescriptorSet.getFileList.size() - 1)
+      fileDescriptorSet.getFileList.asScala.last
 
     var fileDescriptorList = List[Descriptors.FileDescriptor]()
     for (fd <- fileDescriptorSet.getFileList.asScala) {
```
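The rewrite above appears behavior-preserving: the code relies on the descriptor set listing dependencies first and the main file last, and `.asScala.last` selects the same element as `.getFile(size - 1)`. A tiny stand-alone check with plain strings standing in for descriptors:

```scala
import scala.collection.JavaConverters._

val files = java.util.Arrays.asList("dep1.proto", "dep2.proto", "main.proto")
// Both expressions pick the final entry, "main.proto".
assert(files.asScala.last == files.get(files.size() - 1))
```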
```diff
@@ -209,9 +210,8 @@ private[sql] object ProtobufUtils extends Logging {
       }
     }
     try {
-      val fileDescriptor: Descriptors.FileDescriptor = Descriptors.FileDescriptor.buildFrom(
-        descriptorProto,
-        fileDescriptorList.toArray)
+      val fileDescriptor: Descriptors.FileDescriptor =
+        Descriptors.FileDescriptor.buildFrom(descriptorProto, fileDescriptorList.toArray)
       if (fileDescriptor.getMessageTypes().isEmpty()) {
         throw QueryCompilationErrors.noProtobufMessageTypeReturnError(fileDescriptor.getName())
       }
```
SchemaConverters.scala

```diff
@@ -63,12 +63,14 @@ object SchemaConverters {
       case STRING => Some(StringType)
       case BYTE_STRING => Some(BinaryType)
       case ENUM => Some(StringType)
-      case MESSAGE if fd.getMessageType.getName == "Duration" &&
+      case MESSAGE
+          if fd.getMessageType.getName == "Duration" &&
             fd.getMessageType.getFields.size() == 2 &&
             fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
             fd.getMessageType.getFields.get(1).getName.equals("nanos") =>
         Some(DayTimeIntervalType.defaultConcreteType)
-      case MESSAGE if fd.getMessageType.getName == "Timestamp" &&
+      case MESSAGE
+          if fd.getMessageType.getName == "Timestamp" &&
             fd.getMessageType.getFields.size() == 2 &&
             fd.getMessageType.getFields.get(0).getName.equals("seconds") &&
             fd.getMessageType.getFields.get(1).getName.equals("nanos") =>
```
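Both reindented cases apply the same structural test: a message is treated as protobuf's well-known `Duration` or `Timestamp` when the name matches and its only two fields are `seconds` followed by `nanos`. Restated as a stand-alone predicate (a sketch, not code from this commit):

```scala
import com.google.protobuf.Descriptors.FieldDescriptor

// "Looks like" a well-known wrapper: the guard from the match above,
// with an explicit MESSAGE check since getMessageType is only valid then.
def looksLikeWellKnown(fd: FieldDescriptor, expectedName: String): Boolean =
  fd.getJavaType == FieldDescriptor.JavaType.MESSAGE && {
    val mt = fd.getMessageType
    mt.getName == expectedName &&
      mt.getFields.size() == 2 &&
      mt.getFields.get(0).getName == "seconds" &&
      mt.getFields.get(1).getName == "nanos"
  }
```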
ProtobufSerdeSuite.scala

```diff
@@ -20,8 +20,8 @@ package org.apache.spark.sql.protobuf
 import com.google.protobuf.Descriptors.Descriptor
 import com.google.protobuf.DynamicMessage
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.NoopFilters
+import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
 import org.apache.spark.sql.protobuf.utils.ProtobufUtils
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types.{IntegerType, StructType}
```
```diff
@@ -163,15 +163,16 @@ class ProtobufSerdeSuite extends SharedSparkSession {
       fieldMatchType: MatchType,
       expectedCauseMessage: String,
       catalystSchema: StructType = CATALYST_STRUCT): Unit = {
-    val e = intercept[AnalysisException] {
+    val e = intercept[Exception] {
       serdeFactory.create(catalystSchema, protoSchema, fieldMatchType)
     }
     val expectMsg = serdeFactory match {
       case Deserializer =>
-        s"Unable to convert ${protoSchema.getName} of Protobuf to SQL type ${catalystSchema.sql}."
+        s"[PROTOBUF_TYPE_TO_CATALYST_TYPE_ERROR] Unable to convert" +
+          s" ${protoSchema.getName} of Protobuf to SQL type ${toSQLType(catalystSchema)}."
       case Serializer =>
-        s"Unable to convert SQL type ${catalystSchema.sql} to Protobuf type" +
-          s" ${protoSchema.getName}."
+        s"[UNABLE_TO_CONVERT_TO_PROTOBUF_TYPE] Unable to convert SQL type" +
+          s" ${toSQLType(catalystSchema)} to Protobuf type ${protoSchema.getName}."
     }
 
     assert(e.getMessage === expectMsg)
```
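The expected messages now carry the error-class name in brackets and render the schema with `toSQLType` instead of `.sql`; `toSQLType` additionally wraps the rendered type in double quotes, as used in Spark's error framework. A REPL-style illustration (schema invented here, not the suite's CATALYST_STRUCT):

```scala
import org.apache.spark.sql.catalyst.expressions.Cast.toSQLType
import org.apache.spark.sql.types.{IntegerType, StructType}

val schema = new StructType().add("col", IntegerType)
println(schema.sql)        // STRUCT<col: INT>
println(toSQLType(schema)) // "STRUCT<col: INT>" (note the added quotes)
```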