Make Literals foldable, ensure Parquet predicates pushdown #721

Merged Jun 10, 2023 · 27 commits

Changes from 7 commits

Commits:
3bbdb9c: #343 - unpack to Literals (chris-twiner, Jun 5, 2023)
3df02ec: #343 - unpack to Literals - more test (chris-twiner, Jun 5, 2023)
c8ecea8: #343 - unpack to Literals - comment (chris-twiner, Jun 5, 2023)
b7c3132: #343 - per review - docs missing (chris-twiner, Jun 5, 2023)
81d9315: #343 - per review - docs missing - fix reflection for all versions (chris-twiner, Jun 5, 2023)
a3567c2: #343 - add struct test showing difference between extension and exper… (chris-twiner, Jun 6, 2023)
bee3cd0: #343 - toString test to stop the patch complaint (chris-twiner, Jun 6, 2023)
bba92cb: #343 - sample docs (chris-twiner, Jun 6, 2023)
28bde88: #343 - package rename and adding logging that the extension is injected (chris-twiner, Jun 6, 2023)
f4e99b5: #343 - doc fixes (chris-twiner, Jun 6, 2023)
0cbe684: #343 - doc fixes (chris-twiner, Jun 6, 2023)
c308241: #343 - can't run that code (chris-twiner, Jun 6, 2023)
381931c: #343 - didn't stop the new sparkSession (chris-twiner, Jun 6, 2023)
3df725f: Apply suggestions from code review (chris-twiner, Jun 6, 2023)
23c3eb7: #343 - more z's, debug removal, comment adjust and brackets around ex… (chris-twiner, Jun 6, 2023)
2a83510: Refactor LitRule and LitRules tests by making them slightly more gene… (pomadchin, Jun 7, 2023)
e7ba599: Fix mdoc compilation (pomadchin, Jun 7, 2023)
e9999c1: #343 - added the struct test back (chris-twiner, Jun 7, 2023)
4e7bee3: #343 - disable the rule, foldable and eval evals (chris-twiner, Jun 7, 2023)
27e7c25: #343 - cleaned up (chris-twiner, Jun 7, 2023)
18f2bc6: More code cleanup (pomadchin, Jun 7, 2023)
82bf013: #343 - true with link for 3.2 support (chris-twiner, Jun 7, 2023)
c6bbe2c: #343 - bring back code gen with lazy to stop recompiles (chris-twiner, Jun 7, 2023)
31a023f: #343 - disable tests on 3.2, document why and renable the proper fold… (chris-twiner, Jun 8, 2023)
d7db649: #343 - more compat and a foldable only backport of SPARK-39106 and SP… (chris-twiner, Jun 8, 2023)
0e6c561: #343 - option 3 - let 3.2 fail as per oss impl, seperate tests (chris-twiner, Jun 8, 2023)
411871b: #343 - option 3 - better dir names (chris-twiner, Jun 8, 2023)
5 changes: 4 additions & 1 deletion build.sbt
@@ -10,6 +10,7 @@ val shapeless = "2.3.10"
 val scalacheck = "1.17.0"
 val scalacheckEffect = "1.0.4"
 val refinedVersion = "0.10.3"
+val nakedFSVersion = "0.1.0"

 val Scala212 = "2.12.17"
 val Scala213 = "2.13.10"
@@ -192,7 +193,9 @@ lazy val datasetSettings = framelessSettings ++ framelessTypedDatasetREPL ++ Seq(
       dmm("org.apache.spark.sql.FramelessInternals.column")
     )
   },
-  coverageExcludedPackages := "org.apache.spark.sql.reflection"
+  coverageExcludedPackages := "org.apache.spark.sql.reflection",
+
+  libraryDependencies += "com.globalmentor" % "hadoop-bare-naked-local-fs" % nakedFSVersion % Test exclude("org.apache.hadoop", "hadoop-commons")
 )

 lazy val refinedSettings = framelessSettings ++ framelessTypedDatasetREPL ++ Seq(
6 changes: 3 additions & 3 deletions dataset/src/main/scala/frameless/functions/Lit.scala
@@ -8,8 +8,8 @@ import org.apache.spark.sql.types.DataType
 private[frameless] case class Lit[T <: AnyVal](
   dataType: DataType,
   nullable: Boolean,
-  toCatalyst: CodegenContext => ExprCode,
-  show: () => String
+  show: () => String,
+  catalystExpr: Expression // must be the same expression the former toCatalyst function generated code from
 ) extends Expression with NonSQLExpression {
   override def toString: String = s"FramelessLit(${show()})"

@@ -53,7 +53,7 @@ private[frameless] case class Lit[T <: AnyVal](

   def children: Seq[Expression] = Nil

-  protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = toCatalyst(ctx)
+  protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = catalystExpr.genCode(ctx)

   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = this
 }
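Why carry the Catalyst Expression on the case class rather than only a CodegenContext => ExprCode closure: the literal's value becomes recoverable at planning time, so an optimizer rule can evaluate it once and substitute a foldable Literal. A minimal sketch of that idea (the helper name asFoldableLiteral is illustrative, not part of this PR):

import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}

// Sketch: an expression whose value is fixed at planning time can be
// evaluated once and replaced by a foldable Literal, which Catalyst can then
// constant-fold and push down into the Parquet scan.
def asFoldableLiteral(expr: Expression): Literal =
  Literal(expr.eval(), expr.dataType)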
12 changes: 6 additions & 6 deletions dataset/src/main/scala/frameless/functions/package.scala
@@ -45,8 +45,8 @@ package object functions extends Udf with UnaryFunctions {
     Lit(
       dataType = encoder.catalystRepr,
       nullable = encoder.nullable,
-      toCatalyst = encoder.toCatalyst(expr).genCode(_),
-      show = () => value.toString
+      show = () => value.toString,
+      encoder.toCatalyst(expr)
     )
   )
 }
@@ -84,8 +84,8 @@ package object functions extends Udf with UnaryFunctions {
     Lit(
       dataType = i7.catalystRepr,
       nullable = i7.nullable,
-      toCatalyst = i7.toCatalyst(expr).genCode(_),
-      show = () => value.toString
+      show = () => value.toString,
+      i7.toCatalyst(expr)
     )
   )
 }
@@ -127,8 +127,8 @@ package object functions extends Udf with UnaryFunctions {
     Lit(
       dataType = i7.catalystRepr,
       nullable = true,
-      toCatalyst = i7.toCatalyst(expr).genCode(_),
-      show = () => value.toString
+      show = () => value.toString,
+      i7.toCatalyst(expr)
     )
   )
 }
10 changes: 10 additions & 0 deletions dataset/src/main/scala/frameless/optimiser/Extension.scala
@@ -0,0 +1,10 @@
package frameless.optimiser

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSessionExtensions

class FramelessExtension extends ((SparkSessionExtensions) => Unit) with Logging {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule(_ => LiteralRule)
  }
}
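For reference, the extension route needs no code changes at the call site; a minimal sketch (assuming a local session) of injecting it via configuration, mirroring what ExtensionLitTests below does:

import org.apache.spark.sql.SparkSession

// Spark instantiates FramelessExtension at session build time and hands it
// the session's SparkSessionExtensions, which injects LiteralRule.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.extensions", "frameless.optimiser.FramelessExtension")
  .getOrCreate()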
15 changes: 15 additions & 0 deletions dataset/src/main/scala/frameless/optimiser/LiteralRule.scala
@@ -0,0 +1,15 @@
package frameless.optimiser

import frameless.functions.Lit
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object LiteralRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformExpressions {

    // replace all literals to allow constant folding and push down
    case Lit(dataType, _, _, convertedValue) =>
      Literal(convertedValue.eval(), dataType)
  }
}
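The rule can also be attached to an already-built session through Spark's experimental hooks, as ExperimentalLitTests below does; a minimal sketch:

import org.apache.spark.sql.SparkSession

// extraOptimizations rules are applied as an additional optimizer batch.
def enableLiteralRule(spark: SparkSession): Unit =
  spark.sqlContext.experimental.extraOptimizations ++= Seq(frameless.optimiser.LiteralRule)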
2 changes: 1 addition & 1 deletion dataset/src/test/scala/frameless/GroupByTests.scala
@@ -21,7 +21,7 @@ class GroupByTests extends TypedDatasetSuite {

   val datasetSumByA = dataset.groupByMany(A).agg(sum(B)).collect().run.toVector.sortBy(_._1)
   val sumByA = data.groupBy(_.a).map { case (k, v) => k -> v.map(_.b).map(widen).sum }.toVector.sortBy(_._1)
-
+  dataset.show().run()
   datasetSumByA ?= sumByA
 }
6 changes: 5 additions & 1 deletion dataset/src/test/scala/frameless/LitTests.scala
@@ -79,7 +79,11 @@ class LitTests extends TypedDatasetSuite with Matchers {

     val someIpsum: Option[Name] = Some(new Name("Ipsum"))

-    ds.withColumnReplaced('alias, functions.litValue(someIpsum)).
+    val lit = functions.litValue(someIpsum)
+    val tds = ds.withColumnReplaced('alias, functions.litValue(someIpsum))
+    tds.queryExecution.toString() should include (lit.toString)
+
+    tds.
       collect.run() shouldBe initial.map(_.copy(alias = someIpsum))

     ds.withColumnReplaced('alias, functions.litValue(Option.empty[Name])).
15 changes: 14 additions & 1 deletion dataset/src/test/scala/frameless/TypedDatasetSuite.scala
@@ -1,20 +1,33 @@
 package frameless

+import com.globalmentor.apache.hadoop.fs.BareLocalFileSystem
+import org.apache.hadoop.fs.local.StreamingFS
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.scalactic.anyvals.PosZInt
 import org.scalatest.BeforeAndAfterAll
 import org.scalatestplus.scalacheck.Checkers
 import org.scalacheck.Prop
 import org.scalacheck.Prop._

 import scala.util.{Properties, Try}
 import org.scalatest.funsuite.AnyFunSuite

 trait SparkTesting { self: BeforeAndAfterAll =>

   val appID: String = new java.util.Date().toString + math.floor(math.random * 10E4).toLong.toString

-  val conf: SparkConf = new SparkConf()
+  /**
+   * Allows the bare naked local file system to be used instead of winutils for testing / dev on Windows
+   */
+  def registerFS(sparkConf: SparkConf): SparkConf =
+    if (System.getProperty("os.name").startsWith("Windows"))
+      sparkConf.set("spark.hadoop.fs.file.impl", classOf[BareLocalFileSystem].getName).
+        set("spark.hadoop.fs.AbstractFileSystem.file.impl", classOf[StreamingFS].getName)
+    else
+      sparkConf
+
+  val conf: SparkConf = registerFS(new SparkConf())
     .setMaster("local[*]")
     .setAppName("test")
     .set("spark.ui.enabled", "false")
172 changes: 172 additions & 0 deletions dataset/src/test/scala/frameless/optimiser/LitTests.scala
@@ -0,0 +1,172 @@
package frameless.optimiser

import frameless._
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.catalyst.util.DateTimeUtils.{currentTimestamp, microsToInstant}
import org.apache.spark.sql.sources.{Filter, GreaterThanOrEqual, EqualTo}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers

import java.time.Instant

trait PushDownTests extends Matchers {

  implicit def session: SparkSession

  import Job.framelessSparkDelayForJob

  def withoutOptimisation[A](thunk: => A): A
  def withOptimisation[A](thunk: => A): A

  def gteTest[A: TypedEncoder : CatalystOrdered](payload: A, expected: Any, expectFailureWithExperimental: Boolean = false) =
    pushDownTest[A](payload, GreaterThanOrEqual("a", expected), _ >= payload, expectFailureWithExperimental)

  def eqTest[A: TypedEncoder : CatalystOrdered](payload: A, expected: Any, expectFailureWithExperimental: Boolean = false) =
    pushDownTest[A](payload, EqualTo("a", expected), _ === payload, expectFailureWithExperimental)

  val isExperimental: Boolean

  def pushDownTest[A: TypedEncoder : CatalystOrdered](
      payload: A,
      expected: Any,
      op: TypedColumn[X1[A], A] => TypedColumn[X1[A], Boolean],
      expectFailureWithExperimental: Boolean = false) = {
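    // baseline: the frameless Lit is not foldable as-is, so the expected
    // filter should not reach the Parquet scan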
    withoutOptimisation {
      TypedDataset.create(Seq(X1(payload))).write.mode("overwrite").parquet("./target/optimiserTestData")
      val dataset = TypedDataset.createUnsafe[X1[A]](session.read.parquet("./target/optimiserTestData"))

      val pushDowns = getPushDowns(dataset.filter(op(dataset('a))))

      pushDowns should not contain (expected)
    }

    withOptimisation {
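      // optimised run: with the rule active the Lit folds to a plain Literal,
      // so the filter is expected to be pushed down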
      TypedDataset.create(Seq(X1(payload))).write.mode("overwrite").parquet("./target/optimiserTestData")
      val dataset = TypedDataset.createUnsafe[X1[A]](session.read.parquet("./target/optimiserTestData"))

      val ds = dataset.filter(op(dataset('a)))
      ds.explain(true)
      val pushDowns = getPushDowns(ds)

      // prove the push down worked when expected
      if (isExperimental && expectFailureWithExperimental)
        pushDowns should not contain (expected)
      else
        pushDowns should contain (expected)

      val collected = ds.collect().run.toVector.head
      // prove the serde isn't affected
      collected should be (X1(payload))
    }
  }

  def getPushDowns(dataset: TypedDataset[_]): Seq[Filter] = {
    val sparkPlan = dataset.queryExecution.executedPlan

    (if (sparkPlan.children.isEmpty)
      // assume it's AQE
      sparkPlan match {
        case aq: AdaptiveSparkPlanExec => aq.initialPlan
        case _ => sparkPlan
      }
    else
      sparkPlan).collect {
      case fs: FileSourceScanExec =>
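        // FileSourceScanExec.pushedDownFilters is not publicly accessible on
        // every supported Spark version, so read it via runtime reflection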
        import scala.reflect.runtime.{universe => ru}

        val runtimeMirror = ru.runtimeMirror(getClass.getClassLoader)
        val instanceMirror = runtimeMirror.reflect(fs)
        val getter = ru.typeOf[FileSourceScanExec].member(ru.TermName("pushedDownFilters")).asTerm.getter
        val m = instanceMirror.reflectMethod(getter.asMethod)
        val res = m.apply(fs).asInstanceOf[Seq[Filter]]

        res
    }.flatten
  }

}

trait TheTests extends AnyFunSuite with PushDownTests {

  test("sqlTimestamp pushdown") {
    val now = currentTimestamp()
    val sqlts = java.sql.Timestamp.from(microsToInstant(now))
    val ts = SQLTimestamp(now)
    val expected = sqlts

    gteTest(ts, expected)
  }

  test("instant pushdown") {
    val payload = Instant.now()
    val expected = java.sql.Timestamp.from(payload)

    gteTest(payload, expected)
  }

  test("struct pushdown") {
    val payload = X1(X4(1, 2, 3, 4))
    val expected = new GenericRowWithSchema(Array(Row(1, 2, 3, 4)), implicitly[TypedEncoder[X1[X4[Int, Int, Int, Int]]]].catalystRepr.asInstanceOf[StructType])

    eqTest(payload, expected, expectFailureWithExperimental = true)
  }
}

class ExperimentalLitTests extends TypedDatasetSuite with TheTests {
  val isExperimental = true

  def withoutOptimisation[A](thunk: => A): A = thunk

  def withOptimisation[A](thunk: => A): A = {
    val orig = session.sqlContext.experimental.extraOptimizations
    try {
      session.sqlContext.experimental.extraOptimizations ++= Seq(LiteralRule)

      thunk
    } finally {
      session.sqlContext.experimental.extraOptimizations = orig
    }
  }
}

class ExtensionLitTests extends TypedDatasetSuite with TheTests {
  val isExperimental = false

  var s: SparkSession = null

  override implicit def session: SparkSession = s

  def withoutOptimisation[A](thunk: => A): A =
    try {
      s = SparkSession.builder().config(conf).getOrCreate()

      thunk
    } finally {
      stopSpark()
    }

  def withOptimisation[A](thunk: => A): A =
    try {
      s = SparkSession.builder().config(
        conf.clone().set("spark.sql.extensions", classOf[FramelessExtension].getName)
      ).getOrCreate()

      thunk
    } finally {
      stopSpark()
    }

  def stopSpark(): Unit =
    if (s != null) {
      s.stop()
      s = null
    }

  override def beforeAll(): Unit =
    stopSpark()

  override def afterAll(): Unit =
    stopSpark()
}
New file (7 additions): class org.apache.hadoop.fs.local.StreamingFS
@@ -0,0 +1,7 @@
package org.apache.hadoop.fs.local

import com.globalmentor.apache.hadoop.fs.BareLocalFileSystem
import org.apache.hadoop.fs.DelegateToFileSystem

class StreamingFS(uri: java.net.URI, conf: org.apache.hadoop.conf.Configuration)
  extends DelegateToFileSystem(uri, new BareLocalFileSystem(), conf, "file", false) {}
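DelegateToFileSystem bridges Hadoop's AbstractFileSystem API onto an ordinary FileSystem; registerFS in TypedDatasetSuite points "spark.hadoop.fs.AbstractFileSystem.file.impl" at this shim so the suite can run on Windows without a winutils install.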