[WIP] [SQL] [PROTO] : Protobuf support for Spark - from_proto AND to_proto #3

Closed
wants to merge 3 commits from the PROTOBUF_SUPPORT branch

Conversation

SandishKumarHN
Owner

From SandishKumarHN (sanysandish@gmail.com) and Mohan Parthasarathy (mposdev21@gmail.com)

Introduction

Protocol Buffers are Google's language-neutral, platform-neutral, extensible mechanism for serializing structured data, and they are widely used in Kafka-based data pipelines. Unlike Avro, Spark has no native support for Protobuf. This PR adds two new functions, from_proto and to_proto, to read and write Protobuf data within a DataFrame.

The implementation is closely modeled on the Avro implementation so that the changes are easy to understand and review.

The following is an example of typical usage.

// `from_proto` requires absolute path of Protobuf schema file
// and the protobuf message within the file
val userProtoFile = "./examples/src/main/resources/user.desc"
val userProtoMsg = "User"

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("subscribe", "proto-topic-in")
  .load()

// 1. Decode the Protobuf data into a struct;
// 2. Filter by column `favorite_color`;
// 3. Encode the column `name` in Protobuf format.
val output = df
  .select(from_proto('value, userProtoFile, userProtoMsg) as 'user)
  .where("user.favorite_color == \"red\"")
  .select(to_proto($"user.name") as 'value)

val query = output
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "proto-topic-out")
  .start()

The new functions are very similar to their Avro counterparts:

  • from_proto requires the Protobuf descriptor file and the message type within that file, much as from_avro requires the JSON schema.
  • to_proto is similar to to_avro and does not require the descriptor file, since it can build the schema (Protobuf descriptor) from the Catalyst types. Like to_avro, to_proto can also take a descriptor file describing the schema; both variants are sketched below.
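
A minimal sketch of the two to_proto variants (the three-argument form is inferred from the description above, and its exact signature may differ in the final API):

// Descriptor built from the Catalyst types of the input column
val valueFromCatalyst = df.select(to_proto($"user") as 'value)

// Descriptor read from the provided file, as with from_proto
val valueFromDescriptor = df.select(to_proto($"user", userProtoFile, userProtoMsg) as 'value)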

What is supported

  • Protobuf format proto3 is supported (even though proto2 and proto3 are interoperable, we have explicitly tested only with proto3)
  • Google Protobuf supported types (reading these back from the decoded struct is sketched after this list)
    • Scalar value types
    • Enumerations
    • Message types as field types
    • Nested Messages
    • Maps
    • Unknown fields are well-formed Protobuf serialized data representing fields that the parser does not recognize. The original version of proto3 did not retain them when there were parsing problems. Retaining unknown fields is needed to detect schemas that do not match the message type and to support the FAILFAST and PERMISSIVE modes; it is available in proto3 from version 3.5 onwards
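
Once from_proto has decoded the payload, these types surface as ordinary Catalyst types and can be queried with regular column expressions. A minimal sketch, assuming hypothetical fields on the User message from the earlier example:

// Nested messages become structs and maps become Spark map columns; all field
// names below are illustrative only.
val decoded = df.select(from_proto('value, userProtoFile, userProtoMsg) as 'user)

decoded.select(
  $"user.name",                        // scalar string field
  $"user.address.city",                // field of a nested message
  $"user.attributes".getItem("team"),  // lookup in a map<string, string> field
  $"user.favorite_color")              // enum field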

What is not supported

  • Any requires knowledge of the underlying object type when deserializing the message and is generally not considered type safe
  • OneOf requires knowledge of the object type that was encoded when deserializing the message
  • Custom Options is an advanced Protobuf feature that lets users define their own options
  • Catalyst types that are not natively supported in Protobuf. This normally happens during serialization, and an exception will be thrown when the following types are encountered (a possible workaround is sketched after this list):
    • DecimalType
    • DateType
    • TimestampType
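
A possible workaround (not part of this PR; column names are hypothetical) is to cast the unsupported columns to supported types before serializing:

// struct comes from org.apache.spark.sql.functions
import org.apache.spark.sql.functions.struct

val serializable = df.select(
  to_proto(struct(
    $"name",
    $"price".cast("string") as "price",            // DecimalType -> string
    $"event_date".cast("string") as "event_date",  // DateType -> string
    $"event_time".cast("long") as "event_secs"     // TimestampType -> epoch seconds
  )) as 'value)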

Test cases covered

Tests have been written at different levels:

  • from_proto / to_proto (ProtoFunctionSuite)
  • ProtoToCatalyst / CatalystToProto (ProtoCatalystDataConversionSuite)
  • ProtoDeserializer / ProtoSerializer (ProtoSerdeSuite)

ProtoFunctionSuite

A set of roundtrip tests that go through to_proto(from_proto(...)) or from_proto(to_proto(...)) and compare the results. Some of the tests are repeated with to_proto called without a descriptor file, in which case the Protobuf descriptor is built from the Catalyst types; the general shape of these checks is sketched after the list below.

  • roundtrip in to_proto and from_proto for struct for protobuf scalar types
  • roundtrip in to_proto(without descriptor params) and from_proto - struct for protobuf scalar types
  • roundtrip in from_proto and to_proto - Repeated protobuf types
  • roundtrip in from_proto and to_proto - Repeated Message Once
  • roundtrip in from_proto and to_proto(without descriptor params) - Repeated Message Once
  • roundtrip in from_proto and to_proto - Repeated Message Twice
  • roundtrip in from_proto and to_proto(without descriptor params) - Repeated Message Twice
  • roundtrip in from_proto and to_proto - Map
  • roundtrip in from_proto and to_proto(without descriptor params) - Map
  • roundtrip in from_proto and to_proto - Enum
  • roundtrip in from_proto and to_proto - Multiple Message
  • roundtrip in from_proto and to_proto(without descriptor params) - Multiple Message
  • roundtrip in to_proto and from_proto - with null
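
The general shape of these roundtrip checks, reusing the descriptor file and user struct from the earlier example (simplified; the actual suites use Spark's test utilities):

// Encode with to_proto (descriptor built from the Catalyst types), decode with
// from_proto, and verify the struct survives the roundtrip unchanged.
val original     = df.select($"user")
val roundTripped = df.select(
  from_proto(to_proto($"user"), userProtoFile, userProtoMsg) as "user")
assert(roundTripped.collect().toSeq == original.collect().toSeq)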

ProtoSerdeSuite

  • Test basic conversion - serialize(deserialize(message)) == message
  • Fail to convert with field type mismatch - make sure the right exception is thrown for an incompatible schema, for both the serializer and the deserializer
  • Fail to convert with missing nested Proto fields
  • Fail to convert with deeply nested field type mismatch
  • Fail to convert with missing Catalyst fields

ProtoCatalystDataConversionSuite

  • ProtoToCatalyst(to_proto(basic_catalyst_types)): Boolean, Integer, Double, Float, Binary, String, Byte, Short
  • Handle unsupported input of Message type: Serialize a message first and deserialize using a bad schema. Test with FAILFAST to get an exception and PERMISSIVE to get a null row
  • Filter push-down to the Proto deserializer: rows are filtered by the given predicate during Proto deserialization
  • Test ProtoDeserializer with binary message type

@SandishKumarHN changed the title from [WIP] [SQL] [INFRS] : Protobuf support for Spark - from_proto AND to_proto to [WIP] [SQL] [PROTO] : Protobuf support for Spark - from_proto AND to_proto on Sep 25, 2022
@SandishKumarHN force-pushed the PROTOBUF_SUPPORT branch 4 times, most recently from e5964f6 to 39b7bc9 on September 27, 2022 08:38
SandishKumarHN pushed a commit that referenced this pull request Oct 18, 2022
…ly equivalent children in `RewriteDistinctAggregates`

### What changes were proposed in this pull request?

In `RewriteDistinctAggregates`, when grouping aggregate expressions by function children, treat children that are semantically equivalent as the same.
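
As a small illustration of what "semantically equivalent" means here (a sketch using Catalyst expression classes directly; the rule itself groups aggregate children rather than comparing pairs):

```scala
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, Literal}
import org.apache.spark.sql.types.IntegerType

val b  = AttributeReference("b", IntegerType)()
val e1 = Add(b, Literal(1))   // b + 1
val e2 = Add(Literal(1), b)   // 1 + b

e1 == e2                // false: the expression trees differ structurally
e1.semanticEquals(e2)   // true: canonicalization handles commutativity, so they compare equal
```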

### Why are the changes needed?

This PR will reduce the number of projections in the Expand operator when there are multiple distinct aggregations with superficially different children. In some cases, it will eliminate the need for an Expand operator.

Example: In the following query, the Expand operator creates 3\*n rows (where n is the number of incoming rows) because it has a projection for each of the function children `b + 1`, `1 + b`, and `c`.

```
create or replace temp view v1 as
select * from values
(1, 2, 3.0),
(1, 3, 4.0),
(2, 4, 2.5),
(2, 3, 1.0)
v1(a, b, c);

select
  a,
  count(distinct b + 1),
  avg(distinct 1 + b) filter (where c > 0),
  sum(c)
from
  v1
group by a;
```
The Expand operator has three projections (each producing a row for each incoming row):
```
[a#87, null, null, 0, null, UnscaledValue(c#89)], <== projection #1 (for regular aggregation)
[a#87, (b#88 + 1), null, 1, null, null],          <== projection #2 (for distinct aggregation of b + 1)
[a#87, null, (1 + b#88), 2, (c#89 > 0.0), null]], <== projection #3 (for distinct aggregation of 1 + b)
```
In reality, the Expand only needs one projection for `1 + b` and `b + 1`, because they are semantically equivalent.

With the proposed change, the Expand operator's projections look like this:
```
[a#67, null, 0, null, UnscaledValue(c#69)],  <== projection #1 (for regular aggregations)
[a#67, (b#68 + 1), 1, (c#69 > 0.0), null]],  <== projection #2 (for distinct aggregation on b + 1 and 1 + b)
```
With one less projection, Expand produces 2\*n rows instead of 3\*n rows, but still produces the correct result.

In the case where all distinct aggregates have semantically equivalent children, the Expand operator is not needed at all.

Benchmark code in the JIRA (SPARK-40382).

Before the PR:
```
distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
all semantically equivalent                       14721          14859         195          5.7         175.5       1.0X
some semantically equivalent                      14569          14572           5          5.8         173.7       1.0X
none semantically equivalent                      14408          14488         113          5.8         171.8       1.0X
```
After the PR:
```
distinct aggregates:                      Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
all semantically equivalent                        3658           3692          49         22.9          43.6       1.0X
some semantically equivalent                       9124           9214         127          9.2         108.8       0.4X
none semantically equivalent                      14601          14777         250          5.7         174.1       0.3X
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit tests.

Closes apache#37825 from bersprockets/rewritedistinct_issue.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
SandishKumarHN pushed a commit that referenced this pull request Dec 12, 2022
…ly equivalent children in `RewriteDistinctAggregates`

SandishKumarHN pushed a commit that referenced this pull request Apr 28, 2023
…edExpression()

### What changes were proposed in this pull request?

In `EquivalentExpressions.addExpr()`, add a guard `supportedExpression()` to make it consistent with `addExprTree()` and `getExprState()`.

### Why are the changes needed?

This fixes a regression caused by apache#39010 which added the `supportedExpression()` to `addExprTree()` and `getExprState()` but not `addExpr()`.

One example of a use case affected by the inconsistency is the `PhysicalAggregation` pattern in physical planning. There, `addExpr()` is called to deduplicate the aggregate expressions, and `getExprState()` is then called to deduplicate the result expressions. Guarding the two inconsistently causes the aggregate and result expressions to go out of sync, eventually resulting in a query execution error (or a whole-stage codegen error).
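
The shape of the fix, sketched from the description above (the real method lives in `EquivalentExpressions`; details may differ):

```scala
// addExpr() now applies the same supportedExpression() guard that
// addExprTree() and getExprState() already use, so all three entry points
// skip the same set of unsupported expressions (such as those containing
// lambda variables).
def addExpr(expr: Expression): Boolean = {
  if (supportedExpression(expr)) {
    updateExprInMap(expr, equivalenceMap)
  } else {
    false
  }
}
```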

### Does this PR introduce _any_ user-facing change?

This fixes a regression affecting Spark 3.3.2+, where it may manifest as an error running aggregate operators with higher-order functions.

Example running the SQL command:
```sql
select max(transform(array(id), x -> x)), max(transform(array(id), x -> x)) from range(2)
```
Example error message before the fix:
```
java.lang.IllegalStateException: Couldn't find max(transform(array(id#0L), lambdafunction(lambda x#2L, lambda x#2L, false)))#4 in [max(transform(array(id#0L), lambdafunction(lambda x#1L, lambda x#1L, false)))#3]
```
After the fix, this error is gone.

### How was this patch tested?

Added new test cases to `SubexpressionEliminationSuite` for the immediate issue, and to `DataFrameAggregateSuite` for an example of user-visible symptom.

Closes apache#40473 from rednaxelafx/spark-42851.

Authored-by: Kris Mok <kris.mok@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>