Cross build doric from Spark 3.0 to 3.2 #184

Merged: 6 commits, Mar 3, 2022
2 changes: 1 addition & 1 deletion .scalafix.conf
@@ -1,7 +1,7 @@
OrganizeImports {
blankLines = Auto
coalesceToWildcardImportThreshold = 5
groups = ["habla.", "org.apache.spark.", "*"]
groups = ["doric.", "org.apache.spark.", "*"]
groupedImports = Merge
importSelectorsOrder = Ascii
removeUnused = true
127 changes: 108 additions & 19 deletions build.sbt
@@ -1,4 +1,32 @@
import sbt.Compile
import sbt.{Compile, Def}

val sparkDefaultShortVersion = "3.1"
val spark30Version = "3.0.3"
val spark31Version = "3.1.3"
val spark32Version = "3.2.1"

val versionRegex = """^(.*)\.(.*)\.(.*)$""".r

val scala212 = "2.12.15"
val scala213 = "2.13.8"

val sparkShort: String => String = {
case "3.0" => spark30Version
case "3.1" => spark31Version
case "3.2" => spark32Version
}

val sparkLong2ShortVersion: String => String = {
case versionRegex("3", "0", _) => "3.0"
case versionRegex("3", "1", _) => "3.1"
case versionRegex("3", "2", _) => "3.2"
}

val scalaVersionSelect: String => String = {
case versionRegex("3", "0", _) => scala212
case versionRegex("3", "1", _) => scala212
case versionRegex("3", "2", _) => scala212
}

ThisBuild / organization := "org.hablapps"
ThisBuild / homepage := Some(url("https://github.com/hablapps/doric"))
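A note on the selectors above: they are partial functions, so an unsupported Spark version fails the build fast with a `MatchError` instead of silently compiling against the wrong line. A minimal, standalone sketch of that behaviour, copied from the definitions above:

```scala
val versionRegex = """^(.*)\.(.*)\.(.*)$""".r

val sparkLong2ShortVersion: String => String = {
  case versionRegex("3", "0", _) => "3.0"
  case versionRegex("3", "1", _) => "3.1"
  case versionRegex("3", "2", _) => "3.2"
}

sparkLong2ShortVersion("3.2.1") // "3.2"
// sparkLong2ShortVersion("2.4.8") would throw scala.MatchError:
// only the 3.0, 3.1 and 3.2 lines are supported.
```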
@@ -13,14 +41,23 @@ ThisBuild / developers := List(
url("https://github.com/alfonsorr")
),
Developer(
"AlfonsoRR",
"eruizalo",
"Eduardo Ruiz",
"",
url("https://github.com/eruizalo")
)
)

Global / scalaVersion := "2.12.15"
val sparkVersion = settingKey[String]("Spark version")
Global / sparkVersion :=
System.getProperty(
"sparkVersion",
sparkShort(
System.getProperty("sparkShortVersion", sparkDefaultShortVersion)
)
)
Global / scalaVersion := scalaVersionSelect(sparkVersion.value)
Global / publish / skip := true
Global / publishArtifact := false

// scaladoc settings
Compile / doc / scalacOptions ++= Seq("-groups")
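Resolution order for the new `sparkVersion` key: an explicit `-DsparkVersion` wins, otherwise `-DsparkShortVersion` selects the pinned patch release, otherwise the default short version `"3.1"` applies, so e.g. `sbt -DsparkShortVersion=3.2 test` builds against 3.2.1. A self-contained sketch of that lookup, with the pinned versions inlined:

```scala
// Hedged sketch of the property lookup wired into the build above.
def resolveSpark(props: Map[String, String]): String = {
  val pinned = Map("3.0" -> "3.0.3", "3.1" -> "3.1.3", "3.2" -> "3.2.1")
  props.getOrElse(
    "sparkVersion",
    pinned(props.getOrElse("sparkShortVersion", "3.1"))
  )
}

resolveSpark(Map.empty)                         // "3.1.3"
resolveSpark(Map("sparkShortVersion" -> "3.2")) // "3.2.1"
resolveSpark(Map("sparkVersion" -> "3.0.3"))    // "3.0.3"
```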
@@ -45,20 +82,32 @@ scmInfo := Some(

updateOptions := updateOptions.value.withLatestSnapshots(false)

val sparkVersion = "3.1.3"
val configSpark = Seq(
sparkVersion := System.getProperty(
"sparkVersion",
sparkShort(
System.getProperty("sparkShortVersion", sparkDefaultShortVersion)
)
)
)

lazy val core = project
.in(file("core"))
.settings(
name := "doric",
run / fork := true,
configSpark,
name := "doric_" + sparkLong2ShortVersion(sparkVersion.value),
run / fork := true,
publish / skip := false,
publishArtifact := true,
scalaVersion := scalaVersionSelect(sparkVersion.value),
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % "provided",
"org.typelevel" %% "cats-core" % "2.7.0",
"com.lihaoyi" %% "sourcecode" % "0.2.8",
"io.monix" %% "newtypes-core" % "0.2.1",
"com.github.mrpowers" %% "spark-daria" % "1.2.3" % "test",
"com.github.mrpowers" %% "spark-fast-tests" % "1.2.0" % "test",
"org.scalatest" %% "scalatest" % "3.2.11" % "test"
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",
"org.typelevel" %% "cats-core" % "2.7.0",
"com.lihaoyi" %% "sourcecode" % "0.2.8",
"io.monix" %% "newtypes-core" % "0.2.1",
"com.github.mrpowers" %% "spark-daria" % "1.2.3" % "test",
"com.github.mrpowers" %% "spark-fast-tests" % "1.2.0" % "test",
"org.scalatest" %% "scalatest" % "3.2.11" % "test"
),
// docs
run / fork := true,
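With the suffixed module name, each Spark line publishes its own artifact (doric_3.0, doric_3.1, doric_3.2), and spark-sql stays `provided` so consumers bring their own Spark. An illustrative consumer-side dependency; the version number is an example, not a published release guarantee:

```scala
// Hedged, illustrative: pick the doric artifact matching your Spark line.
libraryDependencies += "org.hablapps" %% "doric_3.1" % "0.0.2"
```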
@@ -68,23 +117,63 @@ lazy val core = project
"-implicits",
"-skip-packages",
"org.apache.spark"
)
),
Compile / unmanagedSourceDirectories ++= {
sparkVersion.value match {
case versionRegex("3", "0", _) =>
Seq(
(Compile / sourceDirectory)(_ / "spark_3.0_mount" / "scala"),
(Compile / sourceDirectory)(_ / "spark_3.0_3.1" / "scala")
).join.value
case versionRegex("3", "1", _) =>
Seq(
(Compile / sourceDirectory)(_ / "spark_3.0_3.1" / "scala"),
(Compile / sourceDirectory)(_ / "spark_3.1" / "scala"),
(Compile / sourceDirectory)(_ / "spark_3.1_mount" / "scala")
).join.value
case versionRegex("3", "2", _) =>
Seq(
(Compile / sourceDirectory)(_ / "spark_3.1" / "scala"),
(Compile / sourceDirectory)(_ / "spark_3.2" / "scala"),
(Compile / sourceDirectory)(_ / "spark_3.2_mount" / "scala")
).join.value
}
},
Test / unmanagedSourceDirectories ++= {
sparkVersion.value match {
case versionRegex("3", "0", _) =>
Seq.empty[Def.Initialize[File]].join.value
case versionRegex("3", "1", _) =>
Seq(
(Test / sourceDirectory)(_ / "spark_3.1" / "scala")
).join.value
case versionRegex("3", "2", _) =>
Seq(
(Test / sourceDirectory)(_ / "spark_3.1" / "scala"),
(Test / sourceDirectory)(_ / "spark_3.2" / "scala")
).join.value
}
}
)

lazy val docs = project
.in(file("docs"))
.dependsOn(core)
.settings(
run / fork := true,
configSpark,
run / fork := true,
publish / skip := true,
publishArtifact := false,
run / javaOptions += "-XX:MaxJavaStackTraceDepth=10",
mdocIn := baseDirectory.value / "docs",
scalaVersion := scalaVersionSelect(sparkVersion.value),
mdocIn := baseDirectory.value / "docs",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion
"org.apache.spark" %% "spark-sql" % sparkVersion.value
),
mdocVariables := Map(
"VERSION" -> version.value,
"STABLE_VERSION" -> "0.0.2",
"SPARK_VERSION" -> sparkVersion
"SPARK_VERSION" -> sparkVersion.value
),
mdocExtraArguments := Seq(
"--clean-target"
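The `unmanagedSourceDirectories` wiring above is the heart of the cross-build: shared code stays in `src/main/scala`, while version-dependent code lives in per-line directories, with the `_mount` suffix marking sources that exist only for one Spark line and `spark_3.0_3.1` shared between the two older lines. Summarised as a sketch, with directory names taken verbatim from the settings:

```scala
// Which extra main source directories each Spark line compiles,
// per the settings above (relative to src/main):
def extraSourceDirs(sparkLine: String): Seq[String] = sparkLine match {
  case "3.0" => Seq("spark_3.0_mount", "spark_3.0_3.1")
  case "3.1" => Seq("spark_3.0_3.1", "spark_3.1", "spark_3.1_mount")
  case "3.2" => Seq("spark_3.1", "spark_3.2", "spark_3.2_mount")
}
```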
76 changes: 9 additions & 67 deletions core/src/main/scala/doric/syntax/AggregationColumns.scala
@@ -2,8 +2,10 @@ package doric
package syntax

import cats.implicits.{catsSyntaxTuple2Semigroupal, toTraverseOps}
import doric.types.{DoubleC, NumericType}
import org.apache.spark.sql.{functions => f}
import doric.types.NumericType

import org.apache.spark.sql.{Column, functions => f}
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum

private[syntax] trait AggregationColumns {

@@ -252,70 +254,6 @@ private[syntax] trait AggregationColumns {
def mean[T: NumericType](col: DoricColumn[T]): DoubleColumn =
col.elem.map(f.mean).toDC

/**
* Aggregate function: returns the approximate `percentile` of the numeric column `col` which
* is the smallest value in the ordered `col` values (sorted from least to greatest) such that
* no more than `percentage` of `col` values is less than the value or equal to that value.
*
* @param percentage each value must be between 0.0 and 1.0.
* @param accuracy controls approximation accuracy at the cost of memory. Higher value of accuracy
* yields better accuracy, 1.0/accuracy is the relative error of the approximation.
* @note Support NumericType, DateType and TimestampType since their internal types are all numeric,
* and can be easily cast to double for processing.
* @group Aggregation DoubleC Type
* @see [[org.apache.spark.sql.functions.percentile_approx]]
*/
def percentileApprox[T: DoubleC](
col: DoricColumn[T],
percentage: Array[Double],
accuracy: Int
): ArrayColumn[T] = {
require(
percentage.forall(x => x >= 0.0 && x <= 1.0),
"Each value of percentage must be between 0.0 and 1.0."
)
require(
accuracy >= 0 && accuracy < Int.MaxValue,
s"The accuracy provided must be a literal between (0, ${Int.MaxValue}]" +
s" (current value = $accuracy)"
)
col.elem
.map(f.percentile_approx(_, f.lit(percentage), f.lit(accuracy)))
.toDC
}

/**
* Aggregate function: returns the approximate `percentile` of the numeric column `col` which
* is the smallest value in the ordered `col` values (sorted from least to greatest) such that
* no more than `percentage` of `col` values is less than the value or equal to that value.
*
* @param percentage must be between 0.0 and 1.0.
* @param accuracy controls approximation accuracy at the cost of memory. Higher value of accuracy
* yields better accuracy, 1.0/accuracy is the relative error of the approximation.
* @note Support NumericType, DateType and TimestampType since their internal types are all numeric,
* and can be easily cast to double for processing.
* @group Aggregation DoubleC Type
* @see [[org.apache.spark.sql.functions.percentile_approx]]
*/
def percentileApprox[T: DoubleC](
col: DoricColumn[T],
percentage: Double,
accuracy: Int
): DoricColumn[T] = {
require(
percentage >= 0.0 && percentage <= 1.0,
"Percentage must be between 0.0 and 1.0."
)
require(
accuracy >= 0 && accuracy < Int.MaxValue,
s"The accuracy provided must be a literal between (0, ${Int.MaxValue}]" +
s" (current value = $accuracy)"
)
col.elem
.map(f.percentile_approx(_, f.lit(percentage), f.lit(accuracy)))
.toDC
}
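Both `percentileApprox` overloads leave the shared sources here, presumably because `functions.percentile_approx` only exists from Spark 3.1 onwards and so cannot compile on the 3.0 line. The underlying SQL aggregate is older than the Scala wrapper, so a 3.0-safe route exists through `expr`; a hedged sketch, where `percentileApproxCompat` and its string-based column reference are illustrative rather than doric API:

```scala
import org.apache.spark.sql.{Column, functions => f}

// Hedged sketch: percentile_approx via SQL, which Spark 3.0 already
// parses even though functions.percentile_approx arrived in 3.1.
def percentileApproxCompat(
    colName: String,
    percentage: Double,
    accuracy: Int
): Column =
  f.expr(s"percentile_approx($colName, $percentage, $accuracy)")
```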

/**
* Aggregate function: returns the skewness of the values in a group.
*
@@ -361,7 +299,11 @@ private[syntax] trait AggregationColumns {
def sumDistinct[T](col: DoricColumn[T])(implicit
nt: NumericType[T]
): DoricColumn[nt.Sum] =
col.elem.map(f.sumDistinct).toDC
col.elem
.map(e =>
new Column(Sum(e.expr).toAggregateExpression(isDistinct = true))
)
.toDC

/**
* Aggregate function: alias for `var_samp`.
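The `sumDistinct` rewrite above is the cross-version workaround for an API rename: Spark 3.2 deprecates `functions.sumDistinct` in favour of `sum_distinct`, which the 3.0 and 3.1 lines do not have, so building the aggregate directly from its catalyst expression compiles warning-free on all three. A self-contained sketch of the same construction; `SumDistinctDemo` and the sample data are illustrative:

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.catalyst.expressions.aggregate.Sum

object SumDistinctDemo extends App {
  // Same construction as the diff: a DISTINCT sum built from the
  // catalyst aggregate, valid on Spark 3.0 through 3.2.
  def sumDistinctCompat(c: Column): Column =
    new Column(Sum(c.expr).toAggregateExpression(isDistinct = true))

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  Seq(1, 1, 2, 3).toDF("n").select(sumDistinctCompat($"n")).show() // 6
}
```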
9 changes: 6 additions & 3 deletions core/src/main/scala/doric/syntax/ArrayColumns.scala
@@ -3,7 +3,8 @@ package syntax

import cats.implicits._
import doric.types.CollectionType
import org.apache.spark.sql.{Column, Row, functions => f}

import org.apache.spark.sql.{Column, functions => f}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.LambdaFunction.identity

@@ -469,12 +470,14 @@ private[syntax] trait ArrayColumns {
* end if `start` is negative) with the specified `length`.
*
* @note
* if `start` == 0 an exception will be thrown
* if `start` == 0 an exception will be thrown
* @group Array Type
* @see [[org.apache.spark.sql.functions.slice(x:org\.apache\.spark\.sql\.Column,start:org\.apache\.spark\.sql\.Column,length* org.apache.spark.sql.functions.slice]]
*/
def slice(start: IntegerColumn, length: IntegerColumn): ArrayColumn[T] =
(col.elem, start.elem, length.elem).mapN(f.slice).toDC
(col.elem, start.elem, length.elem)
.mapN((a, b, c) => new Column(Slice(a.expr, b.expr, c.expr)))
.toDC

/**
* Merge two given arrays, element-wise, into a single array using a function.
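The `slice` change follows the same pattern: the fully column-typed overload `functions.slice(Column, Column, Column)` only appeared in Spark 3.1, so the catalyst `Slice` node is the lowest common denominator across the three lines. A minimal sketch; `sliceCompat` is illustrative, not doric API:

```scala
import org.apache.spark.sql.Column
import org.apache.spark.sql.catalyst.expressions.Slice

// Hedged sketch: Slice itself exists on every supported Spark line,
// even where the column-typed functions.slice overload does not.
def sliceCompat(xs: Column, start: Column, length: Column): Column =
  new Column(Slice(xs.expr, start.expr, length.expr))
```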
21 changes: 3 additions & 18 deletions core/src/main/scala/doric/syntax/BinaryColumns.scala
@@ -1,10 +1,10 @@
package doric
package syntax

import cats.implicits.{catsSyntaxTuple2Semigroupal, toTraverseOps}
import cats.implicits.toTraverseOps
import doric.types.{BinaryType, SparkType}
import org.apache.spark.sql.catalyst.expressions.Decode
import org.apache.spark.sql.{Column, functions => f}

import org.apache.spark.sql.{functions => f}

private[syntax] trait BinaryColumns {

@@ -76,21 +76,6 @@ private[syntax] trait BinaryColumns {
* @see [[org.apache.spark.sql.functions.base64]]
*/
def base64: StringColumn = column.elem.map(f.base64).toDC

/**
* Computes the first argument into a string from a binary using the provided character set
* (one of 'US-ASCII', 'ISO-8859-1', 'UTF-8', 'UTF-16BE', 'UTF-16LE', 'UTF-16').
* If either argument is null, the result will also be null.
*
* @group Binary Type
* @see [[org.apache.spark.sql.functions.decode]]
*/
def decode(charset: StringColumn): StringColumn =
(column.elem, charset.elem)
.mapN((col, char) => {
new Column(Decode(col.expr, char.expr))
})
.toDC
}

}
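The `decode` removal looks like another cross-version casualty: the catalyst `Decode` expression changed shape in Spark 3.2 (the two-argument binary form moved to `StringDecode`), so the charset-as-column variant presumably migrates to version-specific mounts. Where a literal charset suffices, the long-stable public API already covers all three lines; a hedged sketch, with `decodeUtf8` illustrative:

```scala
import org.apache.spark.sql.{Column, functions => f}

// Hedged sketch: functions.decode with a literal charset predates
// Spark 3.0 and needs no catalyst internals.
def decodeUtf8(bin: Column): Column = f.decode(bin, "UTF-8")
```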
21 changes: 1 addition & 20 deletions core/src/main/scala/doric/syntax/BooleanColumns.scala
@@ -1,8 +1,8 @@
package doric
package syntax

import cats.implicits._
import doric.DoricColumn.sparkFunction

import org.apache.spark.sql.{functions => f}

private[syntax] trait BooleanColumns {
@@ -61,24 +61,5 @@ private[syntax] trait BooleanColumns {
*/
def ||(other: DoricColumn[Boolean]): DoricColumn[Boolean] =
or(other)

/**
* Returns null if the condition is true, and throws an exception otherwise.
*
* @throws java.lang.RuntimeException if the condition is false
* @group Boolean Type
* @see [[org.apache.spark.sql.functions.assert_true(c:org\.apache\.spark\.sql\.Column):* org.apache.spark.sql.functions.assert_true]]
*/
def assertTrue: NullColumn = column.elem.map(f.assert_true).toDC

/**
* Returns null if the condition is true; throws an exception with the error message otherwise.
*
* @throws java.lang.RuntimeException if the condition is false
* @group Boolean Type
* @see [[org.apache.spark.sql.functions.assert_true(c:org\.apache\.spark\.sql\.Column,e:* org.apache.spark.sql.functions.assert_true]]
*/
def assertTrue(msg: StringColumn): NullColumn =
(column.elem, msg.elem).mapN(f.assert_true).toDC
}
}
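Likewise, both `assertTrue` wrappers drop out of the shared sources, presumably because `functions.assert_true` was only added in Spark 3.1 and so cannot compile on 3.0. The SQL function of the same name is older, so a 3.0-safe route exists through `expr`; a hedged sketch, where `assertTrueCompat` and the string-based column reference are illustrative:

```scala
import org.apache.spark.sql.{Column, functions => f}

// Hedged sketch: assert_true via SQL, parseable on Spark 3.0 where
// the Scala wrapper is missing.
def assertTrueCompat(condColName: String): Column =
  f.expr(s"assert_true($condColName)")
```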