diff --git a/dataset/src/main/scala/frameless/functions/NonAggregateFunctions.scala b/dataset/src/main/scala/frameless/functions/NonAggregateFunctions.scala
index 451f63ad..be5c66e8 100644
--- a/dataset/src/main/scala/frameless/functions/NonAggregateFunctions.scala
+++ b/dataset/src/main/scala/frameless/functions/NonAggregateFunctions.scala
@@ -707,6 +707,10 @@ trait NonAggregateFunctions {
   def upper[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, String] =
     str.typed(sparkFunctions.upper(str.untyped))
 
+  //////////////////////////////////////////////////////////////////////////////////////////////
+  // DateTime functions
+  //////////////////////////////////////////////////////////////////////////////////////////////
+
   /** Non-Aggregate function: Extracts the year as an integer from a given date/timestamp/string.
     *
     * Differs from `Column#year` by wrapping it's result into an `Option`.
@@ -715,4 +719,85 @@ trait NonAggregateFunctions {
     */
   def year[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
     str.typed(sparkFunctions.year(str.untyped))
+
+  /** Non-Aggregate function: Extracts the quarter as an integer from a given date/timestamp/string.
+    *
+    * Differs from `Column#quarter` by wrapping its result into an `Option`.
+    *
+    * apache/spark
+    */
+  def quarter[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
+    str.typed(sparkFunctions.quarter(str.untyped))
+
+  /** Non-Aggregate function: Extracts the month as an integer from a given date/timestamp/string.
+    *
+    * Differs from `Column#month` by wrapping its result into an `Option`.
+    *
+    * apache/spark
+    */
+  def month[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
+    str.typed(sparkFunctions.month(str.untyped))
+
+  /** Non-Aggregate function: Extracts the day of the week as an integer from a given date/timestamp/string.
+    *
+    * Differs from `Column#dayofweek` by wrapping its result into an `Option`.
+    *
+    * apache/spark
+    */
+  def dayofweek[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
+    str.typed(sparkFunctions.dayofweek(str.untyped))
+
+  /** Non-Aggregate function: Extracts the day of the month as an integer from a given date/timestamp/string.
+    *
+    * Differs from `Column#dayofmonth` by wrapping its result into an `Option`.
+    *
+    * apache/spark
+    */
+  def dayofmonth[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
+    str.typed(sparkFunctions.dayofmonth(str.untyped))
+
+  /** Non-Aggregate function: Extracts the day of the year as an integer from a given date/timestamp/string.
+    *
+    * Differs from `Column#dayofyear` by wrapping its result into an `Option`.
+    *
+    * apache/spark
+    */
+  def dayofyear[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
+    str.typed(sparkFunctions.dayofyear(str.untyped))
+
+  /** Non-Aggregate function: Extracts the hours as an integer from a given date/timestamp/string.
+    *
+    * Differs from `Column#hour` by wrapping its result into an `Option`.
+    *
+    * apache/spark
+    */
+  def hour[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
+    str.typed(sparkFunctions.hour(str.untyped))
+
+  /** Non-Aggregate function: Extracts the minutes as an integer from a given date/timestamp/string.
+    *
+    * Differs from `Column#minute` by wrapping its result into an `Option`.
+    *
+    * apache/spark
+    */
+  def minute[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
+    str.typed(sparkFunctions.minute(str.untyped))
+
+  /** Non-Aggregate function: Extracts the seconds as an integer from a given date/timestamp/string.
+    *
+    * Differs from `Column#second` by wrapping its result into an `Option`.
+    *
+    * apache/spark
+    */
+  def second[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
+    str.typed(sparkFunctions.second(str.untyped))
+
+  /** Non-Aggregate function: Extracts the week number as an integer from a given date/timestamp/string.
+    *
+    * Differs from `Column#weekofyear` by wrapping its result into an `Option`.
+    *
+    * apache/spark
+    */
+  def weekofyear[T](str: AbstractTypedColumn[T, String]): str.ThisType[T, Option[Int]] =
+    str.typed(sparkFunctions.weekofyear(str.untyped))
 }
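
For illustration, a minimal usage sketch of the new typed datetime functions (not part of the patch; it assumes an active SparkSession and the frameless TypedDataset implicits are in scope, and `Event` is a hypothetical case class):

    import frameless.TypedDataset
    import frameless.functions.nonAggregate._

    case class Event(ts: String)  // hypothetical record type

    // Unparseable strings surface as None rather than a runtime null,
    // which is the point of the Option[Int] return types above.
    val events = TypedDataset.create(Seq(
      Event("2019-04-10 07:15:42"),
      Event("not a timestamp")
    ))

    val parts = events.select(
      year(events('ts)),
      quarter(events('ts)),
      month(events('ts)),
      hour(events('ts))
    )

    parts.collect().run()
    // Expected: (Some(2019), Some(2), Some(4), Some(7)) for the first row,
    // and (None, None, None, None) for the second.
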
diff --git a/dataset/src/test/scala/frameless/functions/DateTimeStringBehaviourUtils.scala b/dataset/src/test/scala/frameless/functions/DateTimeStringBehaviourUtils.scala
new file mode 100644
index 00000000..e22fe433
--- /dev/null
+++ b/dataset/src/test/scala/frameless/functions/DateTimeStringBehaviourUtils.scala
@@ -0,0 +1,10 @@
+package frameless.functions
+
+import org.apache.spark.sql.Row
+
+object DateTimeStringBehaviourUtils {
+  val nullHandler: Row => Option[Int] = _.get(0) match {
+    case i: Int => Some(i)
+    case _ => None
+  }
+}
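
The helper above is pure, so its contract can be exercised without a Spark session; a quick sketch (not from the patch):

    import org.apache.spark.sql.Row
    import frameless.functions.DateTimeStringBehaviourUtils.nullHandler

    // Spark's datetime extractors yield an Int column that is null for
    // unparseable input; the handler lifts that row value into an Option.
    assert(nullHandler(Row(2019)) == Some(2019))
    assert(nullHandler(Row(null)) == None)
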
diff --git a/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala b/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala
index d4e4c15a..f47891ef 100644
--- a/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala
+++ b/dataset/src/test/scala/frameless/functions/NonAggregateFunctionsTests.scala
@@ -5,7 +5,7 @@ import java.io.File
 
 import frameless.functions.nonAggregate._
 import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.{Column, Encoder, Row, SaveMode, functions => sparkFunctions}
+import org.apache.spark.sql.{Column, Encoder, SaveMode, functions => sparkFunctions}
 import org.scalacheck.Prop._
 import org.scalacheck.{Arbitrary, Gen, Prop}
 
@@ -2206,32 +2206,151 @@ class NonAggregateFunctionsTests extends TypedDatasetSuite {
     check(forAll(prop[Option[Boolean], Long] _))
   }
 
+  def dateTimeStringProp(typedDS: TypedDataset[X1[String]])
+    (typedCol: TypedColumn[X1[String], Option[Int]], sparkFunc: Column => Column): Prop = {
+    val spark = session
+    import spark.implicits._
+
+    val sparkResult = typedDS.dataset
+      .select(sparkFunc($"a"))
+      .map(DateTimeStringBehaviourUtils.nullHandler)
+      .collect()
+      .toList
+
+    val typed = typedDS
+      .select(typedCol)
+      .collect()
+      .run()
+      .toList
+
+    typed ?= sparkResult
+  }
+
   test("year") {
     val spark = session
     import spark.implicits._
 
-    val nullHandler: Row => Option[Int] = _.get(0) match {
-      case i: Int => Some(i)
-      case _ => None
+    def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
+      val ds = TypedDataset.create(data)
+      dateTimeStringProp(ds)(year(ds[String]('a)), sparkFunctions.year)
+    }
+
+    check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
+    check(forAll(prop _))
+  }
+
+  test("quarter") {
+    val spark = session
+    import spark.implicits._
+
+    def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
+      val ds = TypedDataset.create(data)
+      dateTimeStringProp(ds)(quarter(ds[String]('a)), sparkFunctions.quarter)
     }
 
+    check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
+    check(forAll(prop _))
+  }
+
+  test("month") {
+    val spark = session
+    import spark.implicits._
+
     def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
-      val ds = TypedDataset.create(data)
+      val ds = TypedDataset.create(data)
+      dateTimeStringProp(ds)(month(ds[String]('a)), sparkFunctions.month)
+    }
 
-      val sparkResult = ds.toDF()
-        .select(sparkFunctions.year($"a"))
-        .map(nullHandler)
-        .collect()
-        .toList
+    check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
+    check(forAll(prop _))
+  }
 
-      val typed = ds
-        .select(year(ds[String]('a)))
-        .collect()
-        .run()
-        .toList
+  test("dayofweek") {
+    val spark = session
+    import spark.implicits._
 
-      typed ?= sparkResult
-    }
+    def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
+      val ds = TypedDataset.create(data)
+      dateTimeStringProp(ds)(dayofweek(ds[String]('a)), sparkFunctions.dayofweek)
+    }
+
+    check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
+    check(forAll(prop _))
+  }
+
+  test("dayofmonth") {
+    val spark = session
+    import spark.implicits._
+
+    def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
+      val ds = TypedDataset.create(data)
+      dateTimeStringProp(ds)(dayofmonth(ds[String]('a)), sparkFunctions.dayofmonth)
+    }
+
+    check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
+    check(forAll(prop _))
+  }
+
+  test("dayofyear") {
+    val spark = session
+    import spark.implicits._
+
+    def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
+      val ds = TypedDataset.create(data)
+      dateTimeStringProp(ds)(dayofyear(ds[String]('a)), sparkFunctions.dayofyear)
+    }
+
+    check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
+    check(forAll(prop _))
+  }
+
+  test("hour") {
+    val spark = session
+    import spark.implicits._
+
+    def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
+      val ds = TypedDataset.create(data)
+      dateTimeStringProp(ds)(hour(ds[String]('a)), sparkFunctions.hour)
+    }
+
+    check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
+    check(forAll(prop _))
+  }
+
+  test("minute") {
+    val spark = session
+    import spark.implicits._
+
+    def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
+      val ds = TypedDataset.create(data)
+      dateTimeStringProp(ds)(minute(ds[String]('a)), sparkFunctions.minute)
+    }
+
+    check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
+    check(forAll(prop _))
+  }
+
+  test("second") {
+    val spark = session
+    import spark.implicits._
+
+    def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
+      val ds = TypedDataset.create(data)
+      dateTimeStringProp(ds)(second(ds[String]('a)), sparkFunctions.second)
+    }
+
+    check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
+    check(forAll(prop _))
+  }
+
+  test("weekofyear") {
+    val spark = session
+    import spark.implicits._
+
+    def prop(data: List[X1[String]])(implicit E: Encoder[Option[Int]]): Prop = {
+      val ds = TypedDataset.create(data)
+      dateTimeStringProp(ds)(weekofyear(ds[String]('a)), sparkFunctions.weekofyear)
    }
 
     check(forAll(dateTimeStringGen)(data => prop(data.map(X1.apply))))
     check(forAll(prop _))
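
Finally, the round trip that `dateTimeStringProp` automates can be reproduced by hand for a single function; a spark-shell sketch (not part of the diff; assumes the predefined `spark` session, frameless implicits in scope, and an informal stand-in for the suite's `X1` wrapper):

    import frameless.TypedDataset
    import frameless.functions.nonAggregate.hour
    import org.apache.spark.sql.{functions => F}
    import spark.implicits._

    case class X1[A](a: A)  // stand-in for the test suite's X1

    val ds = TypedDataset.create(Seq(X1("2019-04-10 07:15:42"), X1("oops")))

    // Typed side: one Option[Int] per row, None for the unparseable string.
    val typed = ds.select(hour(ds('a))).collect().run().toList

    // Untyped side: a nullable Int column, wrapped into Option by hand.
    val untyped = ds.dataset.toDF()
      .select(F.hour($"a"))
      .collect()
      .map(r => Option(r.get(0)))
      .toList

    // typed == List(Some(7), None); untyped agrees up to the wrapping,
    // which is exactly the property the tests above check for each function.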