
feat: Add ArrayType and explode function #25

Open · wants to merge 1 commit into base: main
2 changes: 1 addition & 1 deletion src/main/api/api.scala
@@ -28,7 +28,7 @@ export org.virtuslab.iskra.$
export org.virtuslab.iskra.{Column, DataFrame, UntypedColumn, UntypedDataFrame, :=, /}

object functions:
  export org.virtuslab.iskra.functions.{lit, when}
  export org.virtuslab.iskra.functions.{explode, lit, when}
  export org.virtuslab.iskra.functions.Aggregates.*

export org.apache.spark.sql.SparkSession
8 changes: 8 additions & 0 deletions src/main/functions/explode.scala
@@ -0,0 +1,8 @@
package org.virtuslab.iskra.functions

import org.apache.spark.sql
import org.virtuslab.iskra.Column
import org.virtuslab.iskra.types.{ ArrayOptType, DataType }

def explode[T <: DataType](c: Column[ArrayOptType[T]]): Column[T] = Column(sql.functions.explode(c.untyped))
Collaborator:
We should have a way to prevent users from using explode more than once in the same select clause, as that would result in a runtime error. This constraint doesn't seem to be easy to express in the current model of iskra. However, I'm in the middle of a major redesign of the library's model, so I'll try to take this use case into account.
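For reference, the runtime failure mentioned in this comment can be reproduced with plain Spark. This is a hypothetical standalone sketch, not part of this PR; the session setup and column names are assumptions:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode

@main def twoGenerators(): Unit =
  val spark = SparkSession.builder().master("local[1]").getOrCreate()
  import spark.implicits.*

  val df = Seq((Seq(1, 2), Seq(3, 4))).toDF("xs", "ys")

  // A single generator per select clause is fine:
  df.select(explode($"xs")).show()

  // Two generators in one select clause fail at analysis time with an
  // AnalysisException ("only one generator allowed per select clause") --
  // the error that the typed API would ideally rule out at compile time.
  df.select(explode($"xs"), explode($"ys")).show()
```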


5 changes: 5 additions & 0 deletions src/main/types/DataType.scala
@@ -21,6 +21,7 @@ object DataType:
    case FloatOptType => FloatOptType
    case DoubleOptType => DoubleOptType
    case StructOptType[schema] => StructOptType[schema]
    case ArrayOptType[schema] => ArrayOptType[schema]

  type NonNullable[T <: DataType] <: DataType = T match
    case BooleanOptType => BooleanType
@@ -32,6 +33,7 @@
    case FloatOptType => FloatOptType
    case DoubleOptType => DoubleOptType
    case StructOptType[schema] => StructOptType[schema]
    case ArrayOptType[schema] => ArrayType[schema]

  type CommonNumericNullableType[T1 <: DataType, T2 <: DataType] <: NumericOptType = (T1, T2) match
    case (DoubleOptType, _) | (_, DoubleOptType) => DoubleOptType
@@ -77,3 +79,6 @@ final class DoubleType extends DoubleOptType, NotNull

sealed class StructOptType[Schema <: Tuple] extends DataType
final class StructType[Schema <: Tuple] extends StructOptType[Schema], NotNull

sealed class ArrayOptType[T <: DataType] extends DataType
final class ArrayType[T <: DataType] extends ArrayOptType[T], NotNull
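As a compile-time sanity check, the new `NonNullable` case could be exercised with `summon`. A sketch, assuming the iskra imports below are on the classpath; it is not part of this PR:

```scala
import org.virtuslab.iskra.types.*

// Should compile: NonNullable strips the outer nullability of the array
// column type while the element type parameter is preserved as-is.
val ev = summon[DataType.NonNullable[ArrayOptType[DoubleOptType]] =:= ArrayType[DoubleOptType]]
```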
8 changes: 8 additions & 0 deletions src/main/types/Encoder.scala
@@ -86,6 +86,14 @@ object Encoder:
    type ColumnType = DoubleOptType
    def catalystType = sql.types.DoubleType

  inline given arrayFromMirror[A](using encoder: Encoder[A]): (Encoder[Seq[A]] { type ColumnType = ArrayOptType[encoder.ColumnType] }) =
Collaborator:

Adding basic support for arrays is something that probably deserves a separate PR of its own, as it's slightly more complex: we should reuse encoders of element types and support both nullable and non-nullable arrays. I have some drafts of the implementation mixed with other changes locally, but I'll try to extract it and get it merged into main.

    new Encoder[Seq[A]]:
      override type ColumnType = ArrayOptType[encoder.ColumnType]
      override def encode(value: Seq[A]): Any = if (value == null) Seq() else value.map(encoder.encode)
      override def decode(value: Any): Any = value.asInstanceOf[Seq[Any]].map(encoder.decode)
      override def catalystType = sql.types.ArrayType(encoder.catalystType)
      override def isNullable = true

  export StructEncoder.{fromMirror, optFromMirror}

trait StructEncoder[-A] extends Encoder[A]:
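The composition pattern in `arrayFromMirror` — lifting an element encoder to a sequence encoder, so both directions recurse element-wise — can be sketched standalone. The `SimpleEncoder` typeclass below is hypothetical and independent of iskra and Spark:

```scala
// A minimal typeclass mirroring the shape of iskra's Encoder.
trait SimpleEncoder[A]:
  def encode(value: A): Any
  def decode(value: Any): A

// Base instance for a primitive element type.
given SimpleEncoder[Int] with
  def encode(value: Int): Any = value
  def decode(value: Any): Int = value.asInstanceOf[Int]

// Lift an element encoder to a Seq encoder: encode and decode both
// delegate element-wise to the underlying encoder.
given [A](using enc: SimpleEncoder[A]): SimpleEncoder[Seq[A]] with
  def encode(value: Seq[A]): Any = value.map(enc.encode)
  def decode(value: Any): Seq[A] =
    value.asInstanceOf[Seq[Any]].map(enc.decode)

@main def roundTrip(): Unit =
  val enc = summon[SimpleEncoder[Seq[Int]]]
  val encoded = enc.encode(Seq(1, 2, 3))
  assert(enc.decode(encoded) == Seq(1, 2, 3)) // element-wise round trip
```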
24 changes: 24 additions & 0 deletions src/test/ExplodeTest.scala
@@ -0,0 +1,24 @@
package org.virtuslab.iskra.test

class ExplodeTest extends SparkUnitTest:
  import org.virtuslab.iskra.api.*
  import functions.explode

  case class Foo(ints: Seq[Int])
Collaborator:

We should consider more cases here: not only Seq[Int] but also Seq[Option[Int]], Option[Seq[Int]], and Option[Seq[Option[Int]]], and check how these should behave at compile time and at runtime.


  val foos = Seq(
    Foo(Seq(1)),
    Foo(Seq(2)),
    Foo(Seq()),
    Foo(null),
Collaborator:

For maximal type safety, optional values should be represented by Option[...]. TBH I haven't thought about how to prevent users from using nulls explicitly yet. Maybe -Yexplicit-nulls could come to the rescue. Alternatively, we could have some runtime assertions performed when toTypedDF is called. However, both of these would probably have to be opt-in.

    Foo(Seq(3, 4))
  ).toTypedDF

  test("explode") {
    val result = foos
      .select(explode($.ints).as("int"))
      .collectAs[Int]

    result shouldEqual Seq(1, 2, 3, 4)
  }