Skip to content

Commit

Permalink
Date time functions (#355)
Browse files Browse the repository at this point in the history
  • Loading branch information
kujon authored and imarios committed Jan 3, 2019
1 parent 59df2a8 commit 68667d5
Show file tree
Hide file tree
Showing 3 changed files with 231 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -707,6 +707,10 @@ trait NonAggregateFunctions {
def upper[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, String] = {
  // Apply Spark's `upper` to the underlying untyped column ...
  val upperCased = sparkFunctions.upper(str.untyped)
  // ... and lift the result back into the same kind of typed column as the input.
  str.typed(upperCased)
}

//////////////////////////////////////////////////////////////////////////////////////////////
// DateTime functions
//////////////////////////////////////////////////////////////////////////////////////////////

/** Non-Aggregate function: Extracts the year as an integer from a given date/timestamp/string.
*
* Differs from `Column#year` by wrapping its result into an `Option`.
Expand All @@ -715,4 +719,85 @@ trait NonAggregateFunctions {
*/
def year[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] = {
  // Apply Spark's `year` to the underlying untyped column ...
  val yearColumn = sparkFunctions.year(str.untyped)
  // ... and re-type the result as an optional integer column.
  str.typed(yearColumn)
}

/** Non-Aggregate function: Extracts the quarter as an integer from a given date/timestamp/string.
  *
  * Differs from `Column#quarter` by wrapping its result into an `Option`.
  * Delegates to `sparkFunctions.quarter` on the untyped column behind `str`.
  *
  * apache/spark
  */
def quarter[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] = {
  val quarterColumn = sparkFunctions.quarter(str.untyped)
  str.typed(quarterColumn)
}

/** Non-Aggregate function: Extracts the month as an integer from a given date/timestamp/string.
  *
  * Differs from `Column#month` by wrapping its result into an `Option`.
  * Delegates to `sparkFunctions.month` on the untyped column behind `str`.
  *
  * apache/spark
  */
def month[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] = {
  val monthColumn = sparkFunctions.month(str.untyped)
  str.typed(monthColumn)
}

/** Non-Aggregate function: Extracts the day of the week as an integer from a given date/timestamp/string.
  *
  * Differs from `Column#dayofweek` by wrapping its result into an `Option`.
  * Delegates to `sparkFunctions.dayofweek` on the untyped column behind `str`.
  *
  * apache/spark
  */
def dayofweek[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] = {
  val dayOfWeekColumn = sparkFunctions.dayofweek(str.untyped)
  str.typed(dayOfWeekColumn)
}

/** Non-Aggregate function: Extracts the day of the month as an integer from a given date/timestamp/string.
  *
  * Differs from `Column#dayofmonth` by wrapping its result into an `Option`.
  * Delegates to `sparkFunctions.dayofmonth` on the untyped column behind `str`.
  *
  * apache/spark
  */
def dayofmonth[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] = {
  val dayOfMonthColumn = sparkFunctions.dayofmonth(str.untyped)
  str.typed(dayOfMonthColumn)
}

/** Non-Aggregate function: Extracts the day of the year as an integer from a given date/timestamp/string.
  *
  * Differs from `Column#dayofyear` by wrapping its result into an `Option`.
  * Delegates to `sparkFunctions.dayofyear` on the untyped column behind `str`.
  *
  * apache/spark
  */
def dayofyear[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] = {
  val dayOfYearColumn = sparkFunctions.dayofyear(str.untyped)
  str.typed(dayOfYearColumn)
}

/** Non-Aggregate function: Extracts the hours as an integer from a given date/timestamp/string.
  *
  * Differs from `Column#hour` by wrapping its result into an `Option`.
  * Delegates to `sparkFunctions.hour` on the untyped column behind `str`.
  *
  * apache/spark
  */
def hour[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] = {
  val hourColumn = sparkFunctions.hour(str.untyped)
  str.typed(hourColumn)
}

/** Non-Aggregate function: Extracts the minutes as an integer from a given date/timestamp/string.
  *
  * Differs from `Column#minute` by wrapping its result into an `Option`.
  * Delegates to `sparkFunctions.minute` on the untyped column behind `str`.
  *
  * apache/spark
  */
def minute[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] = {
  val minuteColumn = sparkFunctions.minute(str.untyped)
  str.typed(minuteColumn)
}

/** Non-Aggregate function: Extracts the seconds as an integer from a given date/timestamp/string.
  *
  * Differs from `Column#second` by wrapping its result into an `Option`.
  * Delegates to `sparkFunctions.second` on the untyped column behind `str`.
  *
  * apache/spark
  */
def second[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] = {
  val secondColumn = sparkFunctions.second(str.untyped)
  str.typed(secondColumn)
}

/** Non-Aggregate function: Extracts the week number as an integer from a given date/timestamp/string.
  *
  * Differs from `Column#weekofyear` by wrapping its result into an `Option`.
  * Delegates to `sparkFunctions.weekofyear` on the untyped column behind `str`.
  *
  * apache/spark
  */
def weekofyear[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] = {
  val weekOfYearColumn = sparkFunctions.weekofyear(str.untyped)
  str.typed(weekOfYearColumn)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package frameless.functions

import org.apache.spark.sql.Row

/** Helpers for comparing the results of date/time string functions against plain Spark. */
object DateTimeStringBehaviourUtils {

  /** Reads the first field of a [[org.apache.spark.sql.Row]] as an optional integer.
    *
    * Yields `Some(i)` when field 0 holds an `Int`, and `None` for anything else
    * (including a `null` field).
    */
  val nullHandler: Row => Option[Int] = row =>
    Option(row.get(0)).collect { case value: Int => value }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import java.io.File

import frameless.functions.nonAggregate._
import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{Column, Encoder, Row, SaveMode, functions => sparkFunctions}
import org.apache.spark.sql.{Column, Encoder, SaveMode, functions => sparkFunctions}
import org.scalacheck.Prop._
import org.scalacheck.{Arbitrary, Gen, Prop}

Expand Down Expand Up @@ -2206,32 +2206,151 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite {
check(forAll(prop[Option[Boolean], Long] _))
}

/** Builds a property stating that a typed date/time column yields the same values as the
  * corresponding plain Spark function applied to column `a` of the same data.
  *
  * @param typedDS   dataset of single-`String`-field rows to evaluate both sides on
  * @param typedCol  typed column under test, selected from `typedDS`
  * @param sparkFunc untyped Spark function applied to `$"a"` to produce the expected values
  * @return a `Prop` comparing the typed results (left) with the Spark results (right)
  */
def dateTimeStringProp(typedDS: TypedDataset[X1[String]])
                      (typedCol: TypedColumn[X1[String], Option[Int]], sparkFunc: Column => Column): Prop = {
  val spark = session
  import spark.implicits._

  // Expected values: run the untyped Spark function, then map each row's first field
  // to Some(int) / None via the shared null handler.
  val untypedProjection = typedDS.dataset.select(sparkFunc($"a"))
  val expectedFromSpark: List[Option[Int]] =
    untypedProjection
      .map(DateTimeStringBehaviourUtils.nullHandler)
      .collect()
      .toList

  // Actual values: select the typed column through the typed API.
  val typedProjection = typedDS.select(typedCol)
  val actualFromTyped: List[Option[Int]] =
    typedProjection.collect().run().toList

  actualFromTyped ?= expectedFromSpark
}

test("year") {
val spark = session
import spark.implicits._

val nullHandler: Row => Option[Int] = _.get(0) match {
case i: Int => Some(i)
case _ => None
def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(year(ds[String]('a)), sparkFunctions.year)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("quarter") {
  val spark = session
  import spark.implicits._

  // Property: the typed `quarter` column agrees with `sparkFunctions.quarter` on `rows`.
  def quarterAgreesWithSpark(rows: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
    val dataset = TypedDataset.create(rows)
    dateTimeStringProp(dataset)(quarter(dataset[String]('a)), sparkFunctions.quarter)
  }

  // First with inputs from `dateTimeStringGen`, then with arbitrary rows.
  check(forAll(dateTimeStringGen)(generated => quarterAgreesWithSpark(generated.map(X1.apply))))
  check(forAll(quarterAgreesWithSpark _))
}

test("month") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(month(ds[String]('a)), sparkFunctions.month)
}

val sparkResult = ds.toDF()
.select(sparkFunctions.year($"a"))
.map(nullHandler)
.collect()
.toList
check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

val typed = ds
.select(year(ds[String]('a)))
.collect()
.run()
.toList
test("dayofweek") {
val spark = session
import spark.implicits._

typed ?= sparkResult
}
def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(dayofweek(ds[String]('a)), sparkFunctions.dayofweek)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
}

test("dayofmonth") {
  val spark = session
  import spark.implicits._

  // Property: the typed `dayofmonth` column agrees with `sparkFunctions.dayofmonth` on `rows`.
  def dayOfMonthAgreesWithSpark(rows: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
    val dataset = TypedDataset.create(rows)
    dateTimeStringProp(dataset)(dayofmonth(dataset[String]('a)), sparkFunctions.dayofmonth)
  }

  // First with inputs from `dateTimeStringGen`, then with arbitrary rows.
  check(forAll(dateTimeStringGen)(generated => dayOfMonthAgreesWithSpark(generated.map(X1.apply))))
  check(forAll(dayOfMonthAgreesWithSpark _))
}

test("dayofyear") {
  val spark = session
  import spark.implicits._

  // Property: the typed `dayofyear` column agrees with `sparkFunctions.dayofyear` on `rows`.
  def dayOfYearAgreesWithSpark(rows: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
    val dataset = TypedDataset.create(rows)
    dateTimeStringProp(dataset)(dayofyear(dataset[String]('a)), sparkFunctions.dayofyear)
  }

  // First with inputs from `dateTimeStringGen`, then with arbitrary rows.
  check(forAll(dateTimeStringGen)(generated => dayOfYearAgreesWithSpark(generated.map(X1.apply))))
  check(forAll(dayOfYearAgreesWithSpark _))
}

test("hour") {
  val spark = session
  import spark.implicits._

  // Property: the typed `hour` column agrees with `sparkFunctions.hour` on `rows`.
  def hourAgreesWithSpark(rows: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
    val dataset = TypedDataset.create(rows)
    dateTimeStringProp(dataset)(hour(dataset[String]('a)), sparkFunctions.hour)
  }

  // First with inputs from `dateTimeStringGen`, then with arbitrary rows.
  check(forAll(dateTimeStringGen)(generated => hourAgreesWithSpark(generated.map(X1.apply))))
  check(forAll(hourAgreesWithSpark _))
}

test("minute") {
  val spark = session
  import spark.implicits._

  // Property: the typed `minute` column agrees with `sparkFunctions.minute` on `rows`.
  def minuteAgreesWithSpark(rows: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
    val dataset = TypedDataset.create(rows)
    dateTimeStringProp(dataset)(minute(dataset[String]('a)), sparkFunctions.minute)
  }

  // First with inputs from `dateTimeStringGen`, then with arbitrary rows.
  check(forAll(dateTimeStringGen)(generated => minuteAgreesWithSpark(generated.map(X1.apply))))
  check(forAll(minuteAgreesWithSpark _))
}

test("second") {
  val spark = session
  import spark.implicits._

  // Property: the typed `second` column agrees with `sparkFunctions.second` on `rows`.
  def secondAgreesWithSpark(rows: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
    val dataset = TypedDataset.create(rows)
    dateTimeStringProp(dataset)(second(dataset[String]('a)), sparkFunctions.second)
  }

  // First with inputs from `dateTimeStringGen`, then with arbitrary rows.
  check(forAll(dateTimeStringGen)(generated => secondAgreesWithSpark(generated.map(X1.apply))))
  check(forAll(secondAgreesWithSpark _))
}

test("weekofyear") {
val spark = session
import spark.implicits._

def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
val ds = TypedDataset.create(data)
dateTimeStringProp(ds)(weekofyear(ds[String]('a)), sparkFunctions.weekofyear)
}

check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
check(forAll(prop _))
Expand Down

0 comments on commit 68667d5

Please sign in to comment.