Spark 3.4.0 and DBR 12.2 support (#699)
* #698 - build changes to see 3.4 errors

* #698 - copied over dataTypeFor impl and empty ordering 'works'

* #698 - split out mapgroups impl

* #698 - regen workflows

* #698 - cross build working

* #698 - bump to rebuild as 3.4 is out

* #698 - regenerated

* (cherry picked from commit 31c8db5)

* #698 - regenerated has windows path for mdocs

* #698 - work with new 3.4, needs cross testing

* #698 - kmeans only works locally, extra info for CI builds

* #698 - tidy up before pull

* #698 - fix rlike test, removing unused code from spark c+p, arrays are derived in TypedEncoder so no need for that either

* #698 - jumped the gun on mima version checks

* #698 - jumped the gun on mima version checks - with 'column' method ignore

* #698 - should also be current version

* #698 - clearly the wrong thing to do, reverting

* #698 - tell mima to look for a different name

* #698 - tell mima to look for a different name

* #698 - retry x times if the array error occurs, even more rare for the avg error

* #698 - regex is a, not b...

* #698 - refactor to be minimal change at import only, other than tests, and attempt to remove that from coverage

* #698 - force testing of code gen on DisambiguateLeft/Right

* build.sbt cleanup

* regexp code cleanup

---------

Co-authored-by: Grigory Pomadchin <grigory.pomadchin@disneystreaming.com>
chris-twiner and pomadchin authored Jun 2, 2023
1 parent b75cf4d commit f1fcef1
Showing 17 changed files with 343 additions and 109 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/ci.yml
@@ -31,12 +31,12 @@ jobs:
os: [ubuntu-latest]
scala: [2.13.10, 2.12.16]
java: [temurin@8]
project: [root-spark31, root-spark32, root-spark33]
project: [root-spark32, root-spark33, root-spark34]
exclude:
- scala: 2.13.10
project: root-spark31
- scala: 2.13.10
project: root-spark32
- scala: 2.13.10
project: root-spark33
runs-on: ${{ matrix.os }}
steps:
- name: Checkout current branch (full)
32 changes: 17 additions & 15 deletions README.md
@@ -25,20 +25,22 @@ associated channels (e.g. GitHub, Discord) to be a safe and friendly environment
The compatible versions of [Spark](http://spark.apache.org/) and
[cats](https://github.com/typelevel/cats) are as follows:

| Frameless | Spark | Cats | Cats-Effect | Scala
| --------- | ----- | -------- | ----------- | ---
| 0.4.0 | 2.2.0 | 1.0.0-IF | 0.4 | 2.11
| 0.4.1 | 2.2.0 | 1.x | 0.8 | 2.11
| 0.5.2 | 2.2.1 | 1.x | 0.8 | 2.11
| 0.6.1 | 2.3.0 | 1.x | 0.8 | 2.11
| 0.7.0 | 2.3.1 | 1.x | 1.x | 2.11
| 0.8.0 | 2.4.0 | 1.x | 1.x | 2.11 / 2.12
| 0.9.0 | 3.0.0 | 1.x | 1.x | 2.12
| 0.10.1 | 3.1.0 | 2.x | 2.x | 2.12
| 0.11.0* | 3.2.0 / 3.1.2 / 3.0.1| 2.x | 2.x | 2.12 / 2.13
| 0.11.1 | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13
| 0.12.0 | 3.2.1 / 3.1.3 / 3.0.3 | 2.x | 3.x | 2.12 / 2.13
| 0.13.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13
| Frameless | Spark | Cats | Cats-Effect | Scala |
|-----------|-----------------------|----------|-------------|-------------|
| 0.4.0 | 2.2.0 | 1.0.0-IF | 0.4 | 2.11 |
| 0.4.1 | 2.2.0 | 1.x | 0.8 | 2.11 |
| 0.5.2 | 2.2.1 | 1.x | 0.8 | 2.11 |
| 0.6.1 | 2.3.0 | 1.x | 0.8 | 2.11 |
| 0.7.0 | 2.3.1 | 1.x | 1.x | 2.11 |
| 0.8.0 | 2.4.0 | 1.x | 1.x | 2.11 / 2.12 |
| 0.9.0 | 3.0.0 | 1.x | 1.x | 2.12 |
| 0.10.1 | 3.1.0 | 2.x | 2.x | 2.12 |
| 0.11.0* | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13 |
| 0.11.1 | 3.2.0 / 3.1.2 / 3.0.1 | 2.x | 2.x | 2.12 / 2.13 |
| 0.12.0 | 3.2.1 / 3.1.3 / 3.0.3 | 2.x | 3.x | 2.12 / 2.13 |
| 0.13.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13 |
| 0.14.0 | 3.3.0 / 3.2.2 / 3.1.3 | 2.x | 3.x | 2.12 / 2.13 |
| 0.14.1 | 3.4.0 / 3.3.0 / 3.2.2 | 2.x | 3.x | 2.12 / 2.13 |

_\* 0.11.0 has broken Spark 3.1.2 and 3.0.1 artifacts published._

@@ -50,8 +52,8 @@ Starting 0.11 we introduced Spark cross published artifacts:
Artifact names examples:

* `frameless-dataset` (the latest Spark dependency)
* `frameless-dataset-spark33` (Spark 3.3.x dependency)
* `frameless-dataset-spark32` (Spark 3.2.x dependency)
* `frameless-dataset-spark31` (Spark 3.1.x dependency)
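
For readers choosing a module, a minimal sbt sketch (coordinates follow the artifact names above; the version numbers are illustrative, taken from the compatibility table):

```scala
// build.sbt — pick the artifact matching your Spark line (illustrative versions)
libraryDependencies ++= Seq(
  "org.typelevel" %% "frameless-dataset" % "0.14.1"            // latest Spark (3.4.x)
  // or pin an older Spark line explicitly:
  // "org.typelevel" %% "frameless-dataset-spark33" % "0.14.1", // Spark 3.3.x
  // "org.typelevel" %% "frameless-dataset-spark32" % "0.14.1"  // Spark 3.2.x
)
```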

Versions 0.5.x and 0.6.x have identical features. The first is compatible with Spark 2.2.1 and the second with 2.3.0.

126 changes: 67 additions & 59 deletions build.sbt
@@ -1,6 +1,6 @@
val sparkVersion = "3.3.2"
val sparkVersion = "3.4.0"
val spark33Version = "3.3.2"
val spark32Version = "3.2.4"
val spark31Version = "3.1.3"
val catsCoreVersion = "2.9.0"
val catsEffectVersion = "3.5.0"
val catsMtlVersion = "1.3.1"
@@ -23,23 +23,23 @@ ThisBuild / githubWorkflowArtifactUpload := false // doesn't work with scoverage

lazy val root = project.in(file("."))
.enablePlugins(NoPublishPlugin)
.aggregate(`root-spark33`, `root-spark32`, `root-spark31`, docs)
.aggregate(`root-spark34`, `root-spark33`, `root-spark32`, docs)

lazy val `root-spark34` = project
.in(file(".spark34"))
.enablePlugins(NoPublishPlugin)
.aggregate(core, cats, dataset, refined, ml)

lazy val `root-spark33` = project
.in(file(".spark33"))
.enablePlugins(NoPublishPlugin)
.aggregate(core, cats, dataset, refined, ml)
.aggregate(core, `cats-spark33`, `dataset-spark33`, `refined-spark33`, `ml-spark33`)

lazy val `root-spark32` = project
.in(file(".spark32"))
.enablePlugins(NoPublishPlugin)
.aggregate(core, `cats-spark32`, `dataset-spark32`, `refined-spark32`, `ml-spark32`)

lazy val `root-spark31` = project
.in(file(".spark31"))
.enablePlugins(NoPublishPlugin)
.aggregate(core, `cats-spark31`, `dataset-spark31`, `refined-spark31`, `ml-spark31`)

lazy val core = project
.settings(name := "frameless-core")
.settings(framelessSettings)
@@ -49,61 +49,64 @@ lazy val cats = project
.settings(catsSettings)
.dependsOn(dataset % "test->test;compile->compile;provided->provided")

lazy val `cats-spark33` = project
.settings(name := "frameless-cats-spark33")
.settings(sourceDirectory := (cats / sourceDirectory).value)
.settings(catsSettings)
.settings(spark33Settings)
.dependsOn(`dataset-spark33` % "test->test;compile->compile;provided->provided")

lazy val `cats-spark32` = project
.settings(name := "frameless-cats-spark32")
.settings(sourceDirectory := (cats / sourceDirectory).value)
.settings(catsSettings)
.settings(spark32Settings)
.dependsOn(`dataset-spark32` % "test->test;compile->compile;provided->provided")

lazy val `cats-spark31` = project
.settings(name := "frameless-cats-spark31")
.settings(sourceDirectory := (cats / sourceDirectory).value)
.settings(catsSettings)
.settings(spark31Settings)
.dependsOn(`dataset-spark31` % "test->test;compile->compile;provided->provided")

lazy val dataset = project
.settings(name := "frameless-dataset")
.settings(Compile / unmanagedSourceDirectories += baseDirectory.value / "src" / "main" / "spark-3.4+")
.settings(datasetSettings)
.settings(sparkDependencies(sparkVersion))
.dependsOn(core % "test->test;compile->compile")

lazy val `dataset-spark32` = project
.settings(name := "frameless-dataset-spark32")
lazy val `dataset-spark33` = project
.settings(name := "frameless-dataset-spark33")
.settings(sourceDirectory := (dataset / sourceDirectory).value)
.settings(Compile / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "main" / "spark-3")
.settings(datasetSettings)
.settings(sparkDependencies(spark32Version))
.settings(spark32Settings)
.settings(sparkDependencies(spark33Version))
.settings(spark33Settings)
.dependsOn(core % "test->test;compile->compile")

lazy val `dataset-spark31` = project
.settings(name := "frameless-dataset-spark31")
lazy val `dataset-spark32` = project
.settings(name := "frameless-dataset-spark32")
.settings(sourceDirectory := (dataset / sourceDirectory).value)
.settings(Compile / unmanagedSourceDirectories += (dataset / baseDirectory).value / "src" / "main" / "spark-3")
.settings(datasetSettings)
.settings(sparkDependencies(spark31Version))
.settings(spark31Settings)
.settings(sparkDependencies(spark32Version))
.settings(spark32Settings)
.dependsOn(core % "test->test;compile->compile")

lazy val refined = project
.settings(name := "frameless-refined")
.settings(refinedSettings)
.dependsOn(dataset % "test->test;compile->compile;provided->provided")

lazy val `refined-spark33` = project
.settings(name := "frameless-refined-spark33")
.settings(sourceDirectory := (refined / sourceDirectory).value)
.settings(refinedSettings)
.settings(spark33Settings)
.dependsOn(`dataset-spark33` % "test->test;compile->compile;provided->provided")

lazy val `refined-spark32` = project
.settings(name := "frameless-refined-spark32")
.settings(sourceDirectory := (refined / sourceDirectory).value)
.settings(refinedSettings)
.settings(spark32Settings)
.dependsOn(`dataset-spark32` % "test->test;compile->compile;provided->provided")

lazy val `refined-spark31` = project
.settings(name := "frameless-refined-spark31")
.settings(sourceDirectory := (refined / sourceDirectory).value)
.settings(refinedSettings)
.settings(spark31Settings)
.dependsOn(`dataset-spark31` % "test->test;compile->compile;provided->provided")

lazy val ml = project
.settings(name := "frameless-ml")
.settings(mlSettings)
@@ -113,26 +116,26 @@ lazy val ml = project
dataset % "test->test;compile->compile;provided->provided"
)

lazy val `ml-spark32` = project
.settings(name := "frameless-ml-spark32")
lazy val `ml-spark33` = project
.settings(name := "frameless-ml-spark33")
.settings(sourceDirectory := (ml / sourceDirectory).value)
.settings(mlSettings)
.settings(sparkMlDependencies(spark32Version))
.settings(spark32Settings)
.settings(sparkMlDependencies(spark33Version))
.settings(spark33Settings)
.dependsOn(
core % "test->test;compile->compile",
`dataset-spark32` % "test->test;compile->compile;provided->provided"
`dataset-spark33` % "test->test;compile->compile;provided->provided"
)

lazy val `ml-spark31` = project
.settings(name := "frameless-ml-spark31")
lazy val `ml-spark32` = project
.settings(name := "frameless-ml-spark32")
.settings(sourceDirectory := (ml / sourceDirectory).value)
.settings(mlSettings)
.settings(sparkMlDependencies(spark31Version))
.settings(spark31Settings)
.settings(sparkMlDependencies(spark32Version))
.settings(spark32Settings)
.dependsOn(
core % "test->test;compile->compile",
`dataset-spark31` % "test->test;compile->compile;provided->provided"
`dataset-spark32` % "test->test;compile->compile;provided->provided"
)

lazy val docs = project
@@ -185,23 +188,15 @@ lazy val datasetSettings = framelessSettings ++ framelessTypedDatasetREPL ++ Seq
imt("frameless.RecordEncoderFields.deriveRecordLast"),
mc("frameless.functions.FramelessLit"),
mc(f"frameless.functions.FramelessLit$$"),
dmm("frameless.functions.package.litAggr")
dmm("frameless.functions.package.litAggr"),
dmm("org.apache.spark.sql.FramelessInternals.column")
)
}
},
coverageExcludedPackages := "org.apache.spark.sql.reflection"
)

lazy val refinedSettings = framelessSettings ++ framelessTypedDatasetREPL ++ Seq(
libraryDependencies += "eu.timepit" %% "refined" % refinedVersion,
/**
* The old Scala XML is pulled from Scala 2.12.x.
*
* [error] (update) found version conflict(s) in library dependencies; some are suspected to be binary incompatible:
* [error]
* [error] * org.scala-lang.modules:scala-xml_2.12:2.1.0 (early-semver) is selected over 1.0.6
* [error] +- org.scoverage:scalac-scoverage-reporter_2.12:2.0.7 (depends on 2.1.0)
* [error] +- org.scala-lang:scala-compiler:2.12.16 (depends on 1.0.6)
*/
libraryDependencySchemes += "org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always
libraryDependencies += "eu.timepit" %% "refined" % refinedVersion
)

lazy val mlSettings = framelessSettings ++ framelessTypedDatasetREPL
@@ -266,16 +261,29 @@ lazy val framelessSettings = Seq(
mimaPreviousArtifacts ~= {
_.filterNot(_.revision == "0.11.0") // didn't release properly
},
) ++ consoleSettings

lazy val spark31Settings = Seq(
crossScalaVersions := Seq(Scala212)
)
/**
* The old Scala XML is pulled from Scala 2.12.x.
*
* [error] (update) found version conflict(s) in library dependencies; some are suspected to be binary incompatible:
* [error]
* [error] * org.scala-lang.modules:scala-xml_2.12:2.1.0 (early-semver) is selected over 1.0.6
* [error] +- org.scoverage:scalac-scoverage-reporter_2.12:2.0.7 (depends on 2.1.0)
* [error] +- org.scala-lang:scala-compiler:2.12.16 (depends on 1.0.6)
*/
libraryDependencySchemes += "org.scala-lang.modules" %% "scala-xml" % VersionScheme.Always
) ++ consoleSettings

lazy val spark32Settings = Seq(
tlVersionIntroduced := Map("2.12" -> "0.13.0", "2.13" -> "0.13.0")
)

lazy val spark33Settings = Seq[Setting[_]](
tlVersionIntroduced := Map("2.12" -> "0.13.0", "2.13" -> "0.13.0"),
// frameless-dataset-spark33 was originally frameless-dataset
mimaPreviousArtifacts := Set(organization.value %% moduleName.value.split("-").dropRight(1).mkString("-") % "0.14.0")
)
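
The `mimaPreviousArtifacts` override above exists because `frameless-dataset-spark33` was published as plain `frameless-dataset` up through 0.14.0; the expression derives the old module name by dropping the `-sparkNN` suffix. A worked example of that string manipulation:

```scala
// What the moduleName rewrite in spark33Settings evaluates to:
val moduleName   = "frameless-dataset-spark33"
val previousName = moduleName.split("-").dropRight(1).mkString("-")
// split("-")    => Array("frameless", "dataset", "spark33")
// dropRight(1)  => Array("frameless", "dataset")
// mkString("-") => "frameless-dataset"
assert(previousName == "frameless-dataset")
// => MiMa compares frameless-dataset-spark33 against frameless-dataset % 0.14.0
```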

lazy val consoleSettings = Seq(
Compile / console / scalacOptions ~= {_.filterNot("-Ywarn-unused-import" == _)},
Test / console / scalacOptions := (Compile / console / scalacOptions).value
@@ -341,7 +349,7 @@ ThisBuild / githubWorkflowBuildPreamble ++= Seq(
)
)

val roots = List("root-spark31", "root-spark32", "root-spark33")
val roots = List("root-spark32", "root-spark33", "root-spark34")
ThisBuild / githubWorkflowBuildMatrixAdditions +=
"project" -> roots
ThisBuild / githubWorkflowArtifactDownloadExtraKeys += "project"
2 changes: 1 addition & 1 deletion dataset/src/main/scala/frameless/TypedEncoder.scala
@@ -5,7 +5,7 @@ import scala.reflect.ClassTag

import org.apache.spark.sql.FramelessInternals
import org.apache.spark.sql.FramelessInternals.UserDefinedType
import org.apache.spark.sql.catalyst.ScalaReflection
import org.apache.spark.sql.{reflection => ScalaReflection}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.objects._
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData}
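
This one-line swap keeps every `ScalaReflection.…` call site intact by aliasing the copied shim package at import time. A hedged sketch of the pattern — the member below is a placeholder, not the real shim contents:

```scala
// Sketch only: copied/adapted reflection helpers in a package object, so that
// renaming the package at import makes existing call sites compile unchanged.
package org.apache.spark.sql

package object reflection {
  // placeholder — the real shim copies what frameless needs from
  // catalyst's ScalaReflection (e.g. a dataTypeFor implementation)
  def dataTypeFor[T]: org.apache.spark.sql.types.DataType = ???
}
```

Consumers then keep the old name via `import org.apache.spark.sql.{reflection => ScalaReflection}`, which is exactly the line added above.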
4 changes: 1 addition & 3 deletions dataset/src/main/scala/frameless/functions/Lit.scala
@@ -53,9 +53,7 @@ private[frameless] case class Lit[T <: AnyVal](

def children: Seq[Expression] = Nil

override def genCode(ctx: CodegenContext): ExprCode = toCatalyst(ctx)

protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ???
protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = toCatalyst(ctx)

protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = this
}
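
This change (and the `Udf.scala` one below) follows the Catalyst override contract: `genCode` is the caching entry point that Catalyst itself calls, while `doGenCode` is the hook subclasses are meant to implement, so emitting code from `doGenCode` instead of overriding `genCode` keeps the expression inside Catalyst's normal bookkeeping. A minimal sketch of a conforming pass-through expression (simplified; it mirrors the `Disambiguate*` expressions further down):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Expression, NonSQLExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
import org.apache.spark.sql.types.DataType

// A do-nothing wrapper: evaluation and codegen both delegate to the child.
// Only doGenCode is overridden — genCode stays Catalyst's entry point.
case class PassThrough(child: Expression) extends Expression with NonSQLExpression {
  def eval(input: InternalRow): Any = child.eval(input)
  def nullable: Boolean = child.nullable
  def children: Seq[Expression] = child :: Nil
  def dataType: DataType = child.dataType
  protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode =
    child.genCode(ctx)
  protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression =
    copy(newChildren.head)
}
```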
6 changes: 1 addition & 5 deletions dataset/src/main/scala/frameless/functions/Udf.scala
@@ -197,18 +197,14 @@ case class Spark2_4_LambdaVariable(
}
}

override def genCode(ctx: CodegenContext): ExprCode = {
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
val isNullValue = if (nullable) {
JavaCode.isNullVariable(isNull)
} else {
FalseLiteral
}
ExprCode(value = JavaCode.variable(value, dataType), isNull = isNullValue)
}

// This won't be called as `genCode` is overrided, just overriding it to make
// `LambdaVariable` non-abstract.
override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ev
}

object FramelessUdf {
2 changes: 1 addition & 1 deletion dataset/src/main/scala/frameless/ops/GroupByOps.scala
@@ -2,7 +2,7 @@ package frameless
package ops

import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
import org.apache.spark.sql.catalyst.plans.logical.{MapGroups, Project}
import org.apache.spark.sql.catalyst.plans.logical.Project
import org.apache.spark.sql.{Column, Dataset, FramelessInternals, RelationalGroupedDataset}
import shapeless._
import shapeless.ops.hlist.{Length, Mapped, Prepend, ToList, ToTraversable, Tupler}
dataset/src/main/scala/org/apache/spark/sql/FramelessInternals.scala
@@ -24,8 +24,6 @@ object FramelessInternals {

def expr(column: Column): Expression = column.expr

def column(column: Column): Expression = column.expr

def logicalPlan(ds: Dataset[_]): LogicalPlan = ds.logicalPlan

def executePlan(ds: Dataset[_], plan: LogicalPlan): QueryExecution =
@@ -51,14 +49,15 @@ object FramelessInternals {
// because org.apache.spark.sql.types.UserDefinedType is private[spark]
type UserDefinedType[A >: Null] = org.apache.spark.sql.types.UserDefinedType[A]

// below is only tested in SelfJoinTests: colLeft and colRight are equivalent to col outside of joins;
// compiling via files (codegen) forces doGenCode evaluation.
/** Expression to tag columns from the left hand side of join expression. */
case class DisambiguateLeft[T](tagged: Expression) extends Expression with NonSQLExpression {
def eval(input: InternalRow): Any = tagged.eval(input)
def nullable: Boolean = false
def children: Seq[Expression] = tagged :: Nil
def dataType: DataType = tagged.dataType
protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ???
override def genCode(ctx: CodegenContext): ExprCode = tagged.genCode(ctx)
protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = tagged.genCode(ctx)
protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy(newChildren.head)
}

Expand All @@ -68,8 +67,7 @@ object FramelessInternals {
def nullable: Boolean = false
def children: Seq[Expression] = tagged :: Nil
def dataType: DataType = tagged.dataType
protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = ???
override def genCode(ctx: CodegenContext): ExprCode = tagged.genCode(ctx)
protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = tagged.genCode(ctx)
protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = copy(newChildren.head)
}
}
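
As the comment above notes, these wrappers are exercised in `SelfJoinTests`, where compiling the plan via generated source files forces the `doGenCode` path to run. A hedged sketch of the intended use (dataset and field names illustrative):

```scala
// colLeft/colRight wrap a column in DisambiguateLeft/DisambiguateRight so a
// self-join can tell its two sides apart; outside a join they act like col.
case class Rec(id: Int, value: String)
val ds = TypedDataset.create(Seq(Rec(1, "a"), Rec(2, "b")))
val joined = ds.joinInner(ds)(ds.colLeft('id) === ds.colRight('id))
```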