-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
resolve #427- Spark 3 support #433
Changes from all commits
23d93f9
fe4798c
0d27d43
dd4eb67
af34ebf
fdccfb8
23868dc
f1e6b72
a1d006b
729c56d
a1a7d5b
7896e8a
f39f550
c5a7078
918ca8c
9e0de29
69ae591
4713716
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,48 +1,46 @@ | ||
package frameless | ||
|
||
import org.apache.spark.sql.Encoder | ||
import org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal | ||
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder | ||
import org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateNamedStruct, If, Literal} | ||
import org.apache.spark.sql.catalyst.expressions.{BoundReference, CreateNamedStruct, If} | ||
import org.apache.spark.sql.types.StructType | ||
|
||
object TypedExpressionEncoder { | ||
|
||
/** In Spark, DataFrame has always schema of StructType | ||
* | ||
* DataFrames of primitive types become records with a single field called "_1". | ||
* DataFrames of primitive types become records with a single field called "value" set in ExpressionEncoder. | ||
*/ | ||
def targetStructType[A](encoder: TypedEncoder[A]): StructType = { | ||
encoder.catalystRepr match { | ||
case x: StructType => | ||
if (encoder.nullable) StructType(x.fields.map(_.copy(nullable = true))) | ||
else x | ||
case dt => new StructType().add("_1", dt, nullable = encoder.nullable) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I should have seen this before my last comment. Was there a reason going back to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. per ExpressionEncoder it's no longer possible to inject this schema, so this reflects the "correct", if breaking, change for any other code that uses it. |
||
case dt => new StructType().add("value", dt, nullable = encoder.nullable) | ||
} | ||
} | ||
|
||
def apply[T: TypedEncoder]: ExpressionEncoder[T] = { | ||
def apply[T: TypedEncoder]: Encoder[T] = { | ||
val encoder = TypedEncoder[T] | ||
val schema = targetStructType(encoder) | ||
|
||
val in = BoundReference(0, encoder.jvmRepr, encoder.nullable) | ||
|
||
val (out, toRowExpressions) = encoder.toCatalyst(in) match { | ||
case If(_, _, x: CreateNamedStruct) => | ||
val out = BoundReference(0, encoder.catalystRepr, encoder.nullable) | ||
val (out, serializer) = encoder.toCatalyst(in) match { | ||
case it @ If(_, _, _: CreateNamedStruct) => | ||
val out = GetColumnByOrdinal(0, encoder.catalystRepr) | ||
|
||
(out, x.flatten) | ||
(out, it) | ||
case other => | ||
val out = GetColumnByOrdinal(0, encoder.catalystRepr) | ||
|
||
(out, CreateNamedStruct(Literal("_1") :: other :: Nil).flatten) | ||
(out, other) | ||
} | ||
|
||
new ExpressionEncoder[T]( | ||
schema = schema, | ||
flat = false, | ||
serializer = toRowExpressions, | ||
deserializer = encoder.fromCatalyst(out), | ||
objSerializer = serializer, | ||
objDeserializer = encoder.fromCatalyst(out), | ||
clsTag = encoder.classTag | ||
) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@chris-twiner was this part of the code causing issues? I am trying to think if this has any unexpected side effects. I am already seeing some difference with the previous version here:
In frameless 0.8
In new PR
At least the field name seems to be printed differently (was
_1
before but not it'svalue
).There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The problem is it's not set by frameless code, rather by the expression encoder. You can't inject the schema any more.
But you are right its probably breaking - I've updated the readme to mention this.