Skip to content

Commit

Permalink
Doric now always carries the schema in the columns. No more hidden se…
Browse files Browse the repository at this point in the history
…lects. CurrentDate is now resolved at creation.
  • Loading branch information
alfonsorr committed Jun 17, 2022
1 parent fa3c0a5 commit a82b36c
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 30 deletions.
23 changes: 13 additions & 10 deletions core/src/main/scala/doric/DoricColumn.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import doric.types.{LiteralSparkType, SparkType}
import org.apache.spark.sql.{Column, Dataset}
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.types.DataType

sealed trait DoricColumn[T] extends DynamicFieldAccessor[T] {
val elem: Doric[Column]
Expand Down Expand Up @@ -62,30 +63,32 @@ object DoricColumn extends ColGetters[NamedDoricColumn] {
private[doric] def apply[T](dcolumn: Doric[Column]): DoricColumn[T] =
TransformationDoricColumn(dcolumn)

private[doric] def apply[T](f: Dataset[_] => Column): DoricColumn[T] =
Kleisli[DoricValidated, Dataset[_], Column](f(_).valid).toDC

override protected def constructSide[T](
column: Doric[Column],
colName: String
): NamedDoricColumn[T] = NamedDoricColumn(column, colName)

private[doric] def uncheckedTypeAndExistence[T](
col: Column
): DoricColumn[T] = {
Kleisli[DoricValidated, Dataset[_], Column]((_: Dataset[_]) => col.valid)
}.toDC

def apply[T: SparkType](
column: Column
)(implicit location: Location): DoricColumn[T] =
Kleisli[DoricValidated, Dataset[_], Column](df => {
try {
val head = df.select(column).schema.head
if (SparkType[T].isEqual(head.dataType))
val dataType: DataType =
try {
column.expr.dataType
} catch {
case _: Throwable => df.select(column).schema.head.dataType
}
if (SparkType[T].isEqual(dataType))
Validated.valid(column)
else
ColumnTypeError(
head.name,
df.select(column).schema.head.name,
SparkType[T].dataType,
head.dataType
dataType
).invalidNec
} catch {
case e: Throwable => SparkErrorWrapper(e).invalidNec
Expand Down
10 changes: 7 additions & 3 deletions core/src/main/scala/doric/syntax/DateColumns.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import doric.types.{DateType, SparkType}
import java.sql.Date

import org.apache.spark.sql.{Column, functions => f}
import org.apache.spark.sql.catalyst.expressions.{AddMonths, DateAdd, DateFormatClass, DateSub, MonthsBetween, NextDay, TruncDate, TruncTimestamp}
import org.apache.spark.sql.catalyst.expressions.{AddMonths, CurrentDate, DateAdd, DateFormatClass, DateSub, MonthsBetween, NextDay, TruncDate, TruncTimestamp}

private[syntax] trait DateColumns {

Expand All @@ -17,7 +17,7 @@ private[syntax] trait DateColumns {
* @group Date Type
* @see [[org.apache.spark.sql.functions.current_date]]
*/
  // NOTE(review): diff-page artifact — the two lines below are the pre- and
  // post-commit bodies of the SAME method rendered without +/- markers; only
  // the second (delegating to currentDateT[Date]) exists after this commit.
  def currentDate(): DateColumn = f.current_date().asDoric[Date]
  def currentDate(): DateColumn = currentDateT[Date]()

/**
* Returns the current date at the start of query evaluation as a date column typed with the provided T.
Expand All @@ -27,7 +27,11 @@ private[syntax] trait DateColumns {
* @see [[org.apache.spark.sql.functions.current_date]]
*/
  def currentDateT[T: DateType: SparkType](): DoricColumn[T] =
    // NOTE(review): diff-page artifact — the next line is the REMOVED
    // implementation; the DoricColumn block after it is the ADDED one. A real
    // source file holds only one body.
    f.current_date().asDoric[T]
    // New implementation: build the Catalyst CurrentDate expression eagerly at
    // column creation, passing the session-local time zone from the dataset's
    // session config instead of relying on functions.current_date.
    DoricColumn({ df =>
      new Column(
        CurrentDate(df.sparkSession.sessionState.conf.sessionLocalTimeZone.some)
      )
    })

implicit class DateColumnLikeSyntax[T: DateType: SparkType](
column: DoricColumn[T]
Expand Down
15 changes: 12 additions & 3 deletions core/src/main/scala/doric/syntax/TimestampColumns.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@ package doric
package syntax

import cats.implicits._
import doric.types.TimestampType
import java.sql.Timestamp
import doric.types.{SparkType, TimestampType}

import org.apache.spark.sql.{Column, functions => f}
import org.apache.spark.sql.catalyst.expressions.{FromUTCTimestamp, ToUTCTimestamp}
Expand All @@ -18,7 +17,17 @@ private[syntax] trait TimestampColumns {
* @see [[org.apache.spark.sql.functions.current_timestamp]]
*/
  def currentTimestamp(): TimestampColumn =
    // NOTE(review): diff-page artifact — the first body line below was removed
    // by this commit and the second (delegating to currentTimestampT) added;
    // only one body exists in the actual source.
    f.current_timestamp().asDoric[Timestamp]
    currentTimestampT()

/**
* Returns the current timestamp at the start of query evaluation as a timestamp column.
* All calls of current_timestamp within the same query return the same value.
*
* @group Timestamp Type
* @see [[org.apache.spark.sql.functions.current_timestamp]]
*/
def currentTimestampT[T: TimestampType: SparkType](): DoricColumn[T] =
f.current_timestamp().asDoric[T]

implicit class TimestampColumnLikeSyntax[T: TimestampType](
column: DoricColumn[T]
Expand Down
14 changes: 0 additions & 14 deletions core/src/test/scala/doric/DoricColumnSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package doric
import doric.sem.{DoricMultiError, SparkErrorWrapper}
import doric.syntax.User
import doric.types.SparkType

import java.sql.{Date, Timestamp}
import org.scalatest.EitherValues

Expand Down Expand Up @@ -96,19 +95,6 @@ class DoricColumnSpec extends DoricTestElements with EitherValues {
describe("DoricColumn") {
val df = Seq("val1", "val2").toDF("myColumn")

it("should create a column using uncheckedTypeAndExistence") {
val unchecked = DoricColumn.uncheckedTypeAndExistence(f.col("myColumn"))

unchecked.elem.run(df).toEither shouldBe Right(f.col("myColumn"))
}

it("should not fail creating a column using uncheckedTypeAndExistence") {
val unchecked =
DoricColumn.uncheckedTypeAndExistence(f.col("nonExistentCol"))

unchecked.elem.run(df).toEither shouldBe Right(f.col("nonExistentCol"))
}

it("should create an uncheckedType") {
val dCol: DoricColumn[_] = DoricColumn.uncheckedType(f.col("myColumn"))

Expand Down

0 comments on commit a82b36c

Please sign in to comment.