From bcc333faa52fe600222c4b9bdde03f7bc235b22e Mon Sep 17 00:00:00 2001
From: SandishKumarHN
Date: Fri, 4 Nov 2022 17:10:57 -0700
Subject: [PATCH 1/5] null check for data generator

---
 .../ProtobufCatalystDataConversionSuite.scala | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index 271c5b0fec894..896f5092a5182 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -123,7 +123,7 @@ class ProtobufCatalystDataConversionSuite
     StringType -> ("StringMsg", ""))
 
   testingTypes.foreach { dt =>
-    val seed = 1 + scala.util.Random.nextInt((1024 - 1) + 1)
+    val seed = scala.util.Random.nextInt(RandomDataGenerator.MAX_STR_LEN)
     test(s"single $dt with seed $seed") {
 
       val (messageName, defaultValue) = catalystTypesToProtoMessages(dt.fields(0).dataType)
@@ -131,8 +131,13 @@ class ProtobufCatalystDataConversionSuite
       val rand = new scala.util.Random(seed)
       val generator = RandomDataGenerator.forType(dt, rand = rand).get
       var data = generator()
-      while (data.asInstanceOf[Row].get(0) == defaultValue) // Do not use default values, since
-        data = generator() // from_protobuf() returns null in v3.
+      // Do not use default values, since from_protobuf() returns null in v3.
+      while (
+        data != null &&
+          (data.asInstanceOf[Row].get(0) == defaultValue ||
+            (data.asInstanceOf[Row].get(0).isInstanceOf[Array[Byte]] &&
+              data.asInstanceOf[Row].get(0).asInstanceOf[Array[Byte]].isEmpty)))
+        data = generator()
 
       val converter = CatalystTypeConverters.createToCatalystConverter(dt)
       val input = Literal.create(converter(data), dt)

From d4e26b9f9ae3f5e1e19a9cd4e6c69850fe2897e7 Mon Sep 17 00:00:00 2001
From: SandishKumarHN
Date: Sat, 5 Nov 2022 12:01:57 -0700
Subject: [PATCH 2/5] bug fix: Map.empty to Map, with unit test

---
 .../spark/sql/protobuf/ProtobufSerdeSuite.scala   | 13 +++++++++++++
 .../spark/sql/errors/QueryCompilationErrors.scala |  4 ++--
 2 files changed, 15 insertions(+), 2 deletions(-)

diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
index 840535654ed6a..a5d6b51c1a1a9 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
@@ -177,6 +177,19 @@ class ProtobufSerdeSuite extends SharedSparkSession {
     withFieldMatchType(Deserializer.create(CATALYST_STRUCT, protoNestedFile, _))
   }
 
+  test("raise cannot parse protobuf descriptor error") {
+    // passing serde_suite.proto instead of serde_suite.desc
+    val testFileDesc = testFile("serde_suite.proto").replace("file:/", "/")
+    val e = intercept[AnalysisException] {
+      ProtobufUtils.buildDescriptor(testFileDesc, "FieldMissingInSQLRoot")
+    }
+
+    checkError(
+      exception = e,
+      errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
+      parameters = Map("descFilePath" -> testFileDesc))
+  }
+
   /**
    * Attempt to convert `catalystSchema` to `protoSchema` (or vice-versa if `deserialize` is
    * true), assert that it fails, and assert that the _cause_ of the thrown exception has a
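A note on the QueryCompilationErrors hunk that follows: Map.empty("descFilePath" -> descFilePath) compiles, but it is not a map constructor. Map.empty(...) is an apply call, i.e. a key lookup on an empty map with the tuple as the key, so building the AnalysisException would itself throw at runtime. A minimal sketch of the difference (the path value is hypothetical, for illustration only):

    val path = "/tmp/serde_suite.desc" // hypothetical path
    // Before the fix: inferred as Map.empty[(String, String), Map[String, String]]
    // applied to a key, which throws
    // java.util.NoSuchElementException: key not found: (descFilePath,/tmp/serde_suite.desc)
    val broken: Map[String, String] = Map.empty("descFilePath" -> path)
    // After the fix: Map.apply builds the intended single-entry map.
    val fixed: Map[String, String] = Map("descFilePath" -> path)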
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index d1b94fc9e32c1..e093b1ad3e07a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -3330,7 +3330,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   def descrioptorParseError(descFilePath: String, cause: Throwable): Throwable = {
     new AnalysisException(
       errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
-      messageParameters = Map.empty("descFilePath" -> descFilePath),
+      messageParameters = Map("descFilePath" -> descFilePath),
       cause = Option(cause.getCause))
   }
 
@@ -3344,7 +3344,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase {
   def failedParsingDescriptorError(descFilePath: String, cause: Throwable): Throwable = {
     new AnalysisException(
       errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
-      messageParameters = Map.empty("descFilePath" -> descFilePath),
+      messageParameters = Map("descFilePath" -> descFilePath),
       cause = Option(cause.getCause))
   }

From f11b04d527bce6abe94c7d6131661adb5f4ee368 Mon Sep 17 00:00:00 2001
From: SandishKumarHN
Date: Sun, 6 Nov 2022 12:53:46 -0800
Subject: [PATCH 3/5] review changes: unit test for cannot construct proto desc

---
 .../sql/protobuf/utils/ProtobufUtils.scala         |  4 ++--
 .../test/resources/protobuf/basicmessage.proto     |  1 +
 .../protobuf/basicmessage_noimports.desc           | 18 ++++++++++++++++++
 .../sql/protobuf/ProtobufSerdeSuite.scala          | 12 ++++++++++++
 4 files changed, 33 insertions(+), 2 deletions(-)
 create mode 100644 connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc

diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
index 4bd59ddce6cee..f1a073197da2b 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
@@ -189,7 +189,7 @@ private[sql] object ProtobufUtils extends Logging {
     }
   }
 
-  private def parseFileDescriptorSet(descFilePath: String): List[Descriptors.FileDescriptor] = {
+  def parseFileDescriptorSet(descFilePath: String): List[Descriptors.FileDescriptor] = {
     var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null
     try {
       val dscFile = new BufferedInputStream(new FileInputStream(descFilePath))
@@ -208,7 +208,7 @@ private[sql] object ProtobufUtils extends Logging {
       ).toList
       fileDescriptorList
     } catch {
-      case e: Descriptors.DescriptorValidationException =>
+      case e: Exception =>
         throw QueryCompilationErrors.failedParsingDescriptorError(descFilePath, e)
     }
   }
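Why the catch above is widened from Descriptors.DescriptorValidationException to Exception: basicmessage_noimports.desc is generated without --include_imports, so the descriptor set still lists nestedenum.proto as a dependency but does not contain it, and the failure can surface outside descriptor validation. A rough sketch of the failure mode against the protobuf-java API (the exact exception type raised for the missing import is an assumption, which is the point of catching Exception):

    import java.io.{BufferedInputStream, FileInputStream}
    import com.google.protobuf.DescriptorProtos
    import com.google.protobuf.Descriptors.FileDescriptor

    val in = new BufferedInputStream(new FileInputStream("basicmessage_noimports.desc"))
    val set = DescriptorProtos.FileDescriptorSet.parseFrom(in)
    val proto = set.getFileList.get(0) // still names nestedenum.proto as a dependency
    // No descriptor for the dependency is available in the set, so the build cannot
    // succeed; this may raise DescriptorValidationException or a plain runtime
    // exception, and either way it should map to CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR.
    FileDescriptor.buildFrom(proto, new Array[FileDescriptor](0))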
diff --git a/connector/protobuf/src/test/resources/protobuf/basicmessage.proto b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto
index 4252f349cf045..8f4c1bb8eae42 100644
--- a/connector/protobuf/src/test/resources/protobuf/basicmessage.proto
+++ b/connector/protobuf/src/test/resources/protobuf/basicmessage.proto
@@ -17,6 +17,7 @@
 // cd connector/protobuf/src/test/resources/protobuf
 // protoc --java_out=./ basicmessage.proto
 // protoc --include_imports --descriptor_set_out=basicmessage.desc --java_out=org/apache/spark/sql/protobuf/ basicmessage.proto
+// protoc --descriptor_set_out=basicmessage_noimports.desc --java_out=org/apache/spark/sql/protobuf/ basicmessage.proto
 
 syntax = "proto3";
 
diff --git a/connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc b/connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc
new file mode 100644
index 0000000000000..26ba6552cb01d
--- /dev/null
+++ b/connector/protobuf/src/test/resources/protobuf/basicmessage_noimports.desc
@@ -0,0 +1,18 @@
+
+È
+basicmessage.proto$org.apache.spark.sql.protobuf.protosnestedenum.proto"Ü
+ BasicMessage
+id (Rid!
+ string_value ( R stringValue
+ int32_value (R
+int32Value
+ int64_value (R
+int64Value!
+ double_value (R doubleValue
+ float_value (R
+floatValue
+
+bool_value (R boolValue
+ bytes_value ( R
+bytesValueS
+ rnested_enum (20.org.apache.spark.sql.protobuf.protos.NestedEnumR rnestedEnumBBBasicMessageProtobproto3
\ No newline at end of file
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
index a5d6b51c1a1a9..fb7e8434cfb1f 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
@@ -190,6 +190,18 @@ class ProtobufSerdeSuite extends SharedSparkSession {
       parameters = Map("descFilePath" -> testFileDesc))
   }
 
+  test("raise cannot construct protobuf descriptor error") {
+    val testFileDesc = testFile("basicmessage_noimports.desc").replace("file:/", "/")
+    val e = intercept[AnalysisException] {
+      ProtobufUtils.parseFileDescriptorSet(testFileDesc)
+    }
+
+    checkError(
+      exception = e,
+      errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
+      parameters = Map("descFilePath" -> testFileDesc))
+  }
+
   /**
    * Attempt to convert `catalystSchema` to `protoSchema` (or vice-versa if `deserialize` is
    * true), assert that it fails, and assert that the _cause_ of the thrown exception has a

From c9abb2d1145df8a94c338d52e5944b1ef2293c82 Mon Sep 17 00:00:00 2001
From: SandishKumarHN
Date: Mon, 7 Nov 2022 10:46:03 -0800
Subject: [PATCH 4/5] review changes: nits, unit test changes

---
 .../sql/protobuf/utils/ProtobufUtils.scala        |  2 +-
 .../ProtobufCatalystDataConversionSuite.scala     | 10 +++----
 .../sql/protobuf/ProtobufFunctionsSuite.scala     | 30 +++++++++++++++++++
 .../sql/protobuf/ProtobufSerdeSuite.scala         | 18 +++++------
 4 files changed, 44 insertions(+), 16 deletions(-)

diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
index f1a073197da2b..52870be5fbe07 100644
--- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
+++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
@@ -189,7 +189,7 @@ private[sql] object ProtobufUtils extends Logging {
     }
   }
 
-  def parseFileDescriptorSet(descFilePath: String): List[Descriptors.FileDescriptor] = {
+  private def parseFileDescriptorSet(descFilePath: String): List[Descriptors.FileDescriptor] = {
     var fileDescriptorSet: DescriptorProtos.FileDescriptorSet = null
     try {
       val dscFile = new BufferedInputStream(new FileInputStream(descFilePath))
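The test changes below keep skipping proto3 default values, now with a single Row cast and an explicit BinaryType check for empty byte arrays. The reason defaults must be skipped: proto3 does not put scalar fields holding their default value on the wire, so after a round trip the field looks absent and from_protobuf() returns null rather than the default. A small illustration of that wire behavior (assumes basicMessageDesc, a Descriptors.Descriptor for the proto3 BasicMessage, has already been built):

    import com.google.protobuf.DynamicMessage

    val msg = DynamicMessage.newBuilder(basicMessageDesc)
      .setField(basicMessageDesc.findFieldByName("int32_value"), 0) // 0 is the proto3 default
      .build()
    // Default-valued proto3 scalars are not serialized, so nothing is written:
    assert(msg.toByteArray.isEmpty)
    // An empty bytes field is likewise the default, which is why the generator loop
    // also rejects rows whose BinaryType column is an empty Array[Byte].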
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
index 896f5092a5182..9f9b51006ca81 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufCatalystDataConversionSuite.scala
@@ -130,14 +130,14 @@ class ProtobufCatalystDataConversionSuite
 
       val rand = new scala.util.Random(seed)
       val generator = RandomDataGenerator.forType(dt, rand = rand).get
-      var data = generator()
+      var data = generator().asInstanceOf[Row]
       // Do not use default values, since from_protobuf() returns null in v3.
       while (
         data != null &&
-          (data.asInstanceOf[Row].get(0) == defaultValue ||
-            (data.asInstanceOf[Row].get(0).isInstanceOf[Array[Byte]] &&
-              data.asInstanceOf[Row].get(0).asInstanceOf[Array[Byte]].isEmpty)))
-        data = generator()
+          (data.get(0) == defaultValue ||
+            (dt == BinaryType &&
+              data.get(0).asInstanceOf[Array[Byte]].isEmpty)))
+        data = generator().asInstanceOf[Row]
 
       val converter = CatalystTypeConverters.createToCatalystConverter(dt)
       val input = Literal.create(converter(data), dt)
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
index 199ef235f1496..3aa76be5c3a3c 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
@@ -677,4 +677,34 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
         === inputDf.select("durationMsg.duration").take(1).toSeq(0).get(0))
     }
   }
+
+  test("raise cannot construct protobuf descriptor error") {
+    val basicMessageDesc = ProtobufUtils.buildDescriptor(testFileDesc, "BasicMessage")
+
+    val basicMessage = DynamicMessage
+      .newBuilder(basicMessageDesc)
+      .setField(basicMessageDesc.findFieldByName("id"), 1111L)
+      .setField(basicMessageDesc.findFieldByName("string_value"), "slam")
+      .setField(basicMessageDesc.findFieldByName("int32_value"), 12345)
+      .setField(basicMessageDesc.findFieldByName("int64_value"), 0x90000000000L)
+      .setField(basicMessageDesc.findFieldByName("double_value"), 10000000000.0d)
+      .setField(basicMessageDesc.findFieldByName("float_value"), 10902.0f)
+      .setField(basicMessageDesc.findFieldByName("bool_value"), true)
+      .setField(
+        basicMessageDesc.findFieldByName("bytes_value"),
+        ByteString.copyFromUtf8("ProtobufDeserializer"))
+      .build()
+
+    val df = Seq(basicMessage.toByteArray).toDF("value")
+    val testFileDescriptor = testFile("basicmessage_noimports.desc").replace("file:/", "/")
+
+    val e = intercept[AnalysisException] {
+      df.select(functions.from_protobuf($"value", "BasicMessage", testFileDescriptor) as 'sample)
+        .where("sample.string_value == \"slam\"").show()
+    }
+    checkError(
+      exception = e,
+      errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
+      parameters = Map("descFilePath" -> testFileDescriptor))
+  }
 }
diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
index fb7e8434cfb1f..22b9d58bbd449 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufSerdeSuite.scala
@@ -177,27 +177,25 @@ class ProtobufSerdeSuite extends SharedSparkSession {
     withFieldMatchType(Deserializer.create(CATALYST_STRUCT, protoNestedFile, _))
   }
 
-  test("raise cannot parse protobuf descriptor error") {
+  test("raise cannot parse and construct protobuf descriptor error") {
     // passing serde_suite.proto instead of serde_suite.desc
-    val testFileDesc = testFile("serde_suite.proto").replace("file:/", "/")
-    val e = intercept[AnalysisException] {
+    var testFileDesc = testFile("serde_suite.proto").replace("file:/", "/")
+    val e1 = intercept[AnalysisException] {
       ProtobufUtils.buildDescriptor(testFileDesc, "FieldMissingInSQLRoot")
     }
 
     checkError(
-      exception = e,
+      exception = e1,
       errorClass = "CANNOT_PARSE_PROTOBUF_DESCRIPTOR",
       parameters = Map("descFilePath" -> testFileDesc))
-  }
 
-  test("raise cannot construct protobuf descriptor error") {
-    val testFileDesc = testFile("basicmessage_noimports.desc").replace("file:/", "/")
-    val e = intercept[AnalysisException] {
-      ProtobufUtils.parseFileDescriptorSet(testFileDesc)
+    testFileDesc = testFile("basicmessage_noimports.desc").replace("file:/", "/")
+    val e2 = intercept[AnalysisException] {
+      ProtobufUtils.buildDescriptor(testFileDesc, "FieldMissingInSQLRoot")
     }
 
     checkError(
-      exception = e,
+      exception = e2,
       errorClass = "CANNOT_CONSTRUCT_PROTOBUF_DESCRIPTOR",
       parameters = Map("descFilePath" -> testFileDesc))
   }

From 44e920960e235336ceb576216fa59387b1234130 Mon Sep 17 00:00:00 2001
From: SandishKumarHN
Date: Mon, 7 Nov 2022 10:55:53 -0800
Subject: [PATCH 5/5] review changes: unit test with empty protobuf

---
 .../sql/protobuf/ProtobufFunctionsSuite.scala | 18 +----------------
 1 file changed, 1 insertion(+), 17 deletions(-)

diff --git a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
index 3aa76be5c3a3c..00ec56f90a632 100644
--- a/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
+++ b/connector/protobuf/src/test/scala/org/apache/spark/sql/protobuf/ProtobufFunctionsSuite.scala
@@ -679,23 +679,7 @@ class ProtobufFunctionsSuite extends QueryTest with SharedSparkSession with Seri
   }
 
   test("raise cannot construct protobuf descriptor error") {
-    val basicMessageDesc = ProtobufUtils.buildDescriptor(testFileDesc, "BasicMessage")
-
-    val basicMessage = DynamicMessage
-      .newBuilder(basicMessageDesc)
-      .setField(basicMessageDesc.findFieldByName("id"), 1111L)
-      .setField(basicMessageDesc.findFieldByName("string_value"), "slam")
-      .setField(basicMessageDesc.findFieldByName("int32_value"), 12345)
-      .setField(basicMessageDesc.findFieldByName("int64_value"), 0x90000000000L)
-      .setField(basicMessageDesc.findFieldByName("double_value"), 10000000000.0d)
-      .setField(basicMessageDesc.findFieldByName("float_value"), 10902.0f)
-      .setField(basicMessageDesc.findFieldByName("bool_value"), true)
-      .setField(
-        basicMessageDesc.findFieldByName("bytes_value"),
-        ByteString.copyFromUtf8("ProtobufDeserializer"))
-      .build()
-
-    val df = Seq(basicMessage.toByteArray).toDF("value")
+    val df = Seq(ByteString.empty().toByteArray).toDF("value")
     val testFileDescriptor = testFile("basicmessage_noimports.desc").replace("file:/", "/")
 
     val e = intercept[AnalysisException] {