Make Literals foldable, ensure Parquet predicates pushdown #721

Merged · 27 commits · Jun 10, 2023

Changes from 3 commits

Commits (27):
- 3bbdb9c · #343 - unpack to Literals (chris-twiner, Jun 5, 2023)
- 3df02ec · #343 - unpack to Literals - more test (chris-twiner, Jun 5, 2023)
- c8ecea8 · #343 - unpack to Literals - comment (chris-twiner, Jun 5, 2023)
- b7c3132 · #343 - per review - docs missing (chris-twiner, Jun 5, 2023)
- 81d9315 · #343 - per review - docs missing - fix reflection for all versions (chris-twiner, Jun 5, 2023)
- a3567c2 · #343 - add struct test showing difference between extension and exper… (chris-twiner, Jun 6, 2023)
- bee3cd0 · #343 - toString test to stop the patch complaint (chris-twiner, Jun 6, 2023)
- bba92cb · #343 - sample docs (chris-twiner, Jun 6, 2023)
- 28bde88 · #343 - package rename and adding logging that the extension is injected (chris-twiner, Jun 6, 2023)
- f4e99b5 · #343 - doc fixes (chris-twiner, Jun 6, 2023)
- 0cbe684 · #343 - doc fixes (chris-twiner, Jun 6, 2023)
- c308241 · #343 - can't run that code (chris-twiner, Jun 6, 2023)
- 381931c · #343 - didn't stop the new sparkSession (chris-twiner, Jun 6, 2023)
- 3df725f · Apply suggestions from code review (chris-twiner, Jun 6, 2023)
- 23c3eb7 · #343 - more z's, debug removal, comment adjust and brackets around ex… (chris-twiner, Jun 6, 2023)
- 2a83510 · Refactor LitRule and LitRules tests by making them slightly more gene… (pomadchin, Jun 7, 2023)
- e7ba599 · Fix mdoc compilation (pomadchin, Jun 7, 2023)
- e9999c1 · #343 - added the struct test back (chris-twiner, Jun 7, 2023)
- 4e7bee3 · #343 - disable the rule, foldable and eval evals (chris-twiner, Jun 7, 2023)
- 27e7c25 · #343 - cleaned up (chris-twiner, Jun 7, 2023)
- 18f2bc6 · More code cleanup (pomadchin, Jun 7, 2023)
- 82bf013 · #343 - true with link for 3.2 support (chris-twiner, Jun 7, 2023)
- c6bbe2c · #343 - bring back code gen with lazy to stop recompiles (chris-twiner, Jun 7, 2023)
- 31a023f · #343 - disable tests on 3.2, document why and renable the proper fold… (chris-twiner, Jun 8, 2023)
- d7db649 · #343 - more compat and a foldable only backport of SPARK-39106 and SP… (chris-twiner, Jun 8, 2023)
- 0e6c561 · #343 - option 3 - let 3.2 fail as per oss impl, seperate tests (chris-twiner, Jun 8, 2023)
- 411871b · #343 - option 3 - better dir names (chris-twiner, Jun 8, 2023)
5 changes: 4 additions & 1 deletion build.sbt

@@ -10,6 +10,7 @@ val shapeless = "2.3.10"
 val scalacheck = "1.17.0"
 val scalacheckEffect = "1.0.4"
 val refinedVersion = "0.10.3"
+val nakedFSVersion = "0.1.0"

 val Scala212 = "2.12.17"
 val Scala213 = "2.13.10"
@@ -192,7 +193,9 @@ lazy val datasetSettings = framelessSettings ++ framelessTypedDatasetREPL ++ Seq
       dmm("org.apache.spark.sql.FramelessInternals.column")
     )
   },
-  coverageExcludedPackages := "org.apache.spark.sql.reflection"
+  coverageExcludedPackages := "org.apache.spark.sql.reflection",
+
+  libraryDependencies += "com.globalmentor" % "hadoop-bare-naked-local-fs" % nakedFSVersion % Test exclude("org.apache.hadoop", "hadoop-commons")
6 changes: 3 additions & 3 deletions dataset/src/main/scala/frameless/functions/Lit.scala

@@ -8,8 +8,8 @@ import org.apache.spark.sql.types.DataType
 private[frameless] case class Lit[T <: AnyVal](
   dataType: DataType,
   nullable: Boolean,
-  toCatalyst: CodegenContext => ExprCode,
-  show: () => String
+  show: () => String,
+  convertedExpr: Expression // must be the same toCatalyst as the toCatalyst function
 ) extends Expression with NonSQLExpression {
   override def toString: String = s"FramelessLit(${show()})"

@@ -53,7 +53,7 @@
   def children: Seq[Expression] = Nil

-  protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = toCatalyst(ctx)
+  protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = convertedExpr.genCode(ctx)

   protected def withNewChildrenInternal(newChildren: IndexedSeq[Expression]): Expression = this
 }
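The key change here is that Lit now carries the converted Catalyst Expression itself rather than only a codegen thunk. A hedged sketch of why this matters (illustrative method names, not the PR's code): a CodegenContext => ExprCode function can only emit generated code, whereas a stored Expression can also be evaluated eagerly, which is exactly what constant folding and predicate pushdown require.

    import org.apache.spark.sql.catalyst.expressions.Expression
    import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}

    // A codegen thunk supports exactly one operation: emitting generated code.
    def viaThunk(toCatalyst: CodegenContext => ExprCode, ctx: CodegenContext): ExprCode =
      toCatalyst(ctx)

    // A stored Expression supports codegen *and* eager evaluation; the latter is
    // what LiteralRule (below) uses to fold a frameless Lit into a Spark Literal.
    def viaExpression(convertedExpr: Expression, ctx: CodegenContext): (ExprCode, Any) =
      (convertedExpr.genCode(ctx), convertedExpr.eval())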
12 changes: 6 additions & 6 deletions dataset/src/main/scala/frameless/functions/package.scala

@@ -45,8 +45,8 @@ package object functions extends Udf with UnaryFunctions {
       Lit(
         dataType = encoder.catalystRepr,
         nullable = encoder.nullable,
-        toCatalyst = encoder.toCatalyst(expr).genCode(_),
-        show = () => value.toString
+        show = () => value.toString,
+        encoder.toCatalyst(expr)
       )
     )
   }
@@ -84,8 +84,8 @@
       Lit(
         dataType = i7.catalystRepr,
         nullable = i7.nullable,
-        toCatalyst = i7.toCatalyst(expr).genCode(_),
-        show = () => value.toString
+        show = () => value.toString,
+        i7.toCatalyst(expr)
       )
     )
   }
@@ -127,8 +127,8 @@
       Lit(
         dataType = i7.catalystRepr,
         nullable = true,
-        toCatalyst = i7.toCatalyst(expr).genCode(_),
-        show = () => value.toString
+        show = () => value.toString,
+        i7.toCatalyst(expr)
       )
     )
   }
17 changes: 17 additions & 0 deletions dataset/src/main/scala/frameless/optimiser/LiteralRule.scala (new file)

@@ -0,0 +1,17 @@
package frameless.optimiser

import frameless.functions.Lit
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

object LiteralRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan.transformExpressions {

    /*
     * replace all literals to allow constant folding and push down
     */
    case Lit(dataType, _, _, convertedValue) =>
      Literal(convertedValue.eval(), dataType)
  }
}
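Since this is a plain Catalyst Rule[LogicalPlan], it only takes effect once registered. A minimal sketch, assuming a SparkSession named `spark` is in scope (this mirrors what LitTests does further down):

    import frameless.optimiser.LiteralRule

    // Register the rule as an experimental extra optimisation; once in place,
    // frameless Lit nodes are folded into Spark Literals, so Parquet filter
    // pushdown can see them.
    spark.sqlContext.experimental.extraOptimizations ++= Seq(LiteralRule)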
15 changes: 14 additions & 1 deletion dataset/src/test/scala/frameless/TypedDatasetSuite.scala

@@ -1,20 +1,33 @@
 package frameless

+import com.globalmentor.apache.hadoop.fs.BareLocalFileSystem
+import org.apache.hadoop.fs.local.StreamingFS
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.{SQLContext, SparkSession}
 import org.scalactic.anyvals.PosZInt
 import org.scalatest.BeforeAndAfterAll
 import org.scalatestplus.scalacheck.Checkers
 import org.scalacheck.Prop
 import org.scalacheck.Prop._

 import scala.util.{Properties, Try}
 import org.scalatest.funsuite.AnyFunSuite

 trait SparkTesting { self: BeforeAndAfterAll =>

   val appID: String = new java.util.Date().toString + math.floor(math.random * 10E4).toLong.toString

-  val conf: SparkConf = new SparkConf()
+  /**
+   * Allows bare naked to be used instead of winutils for testing / dev
+   */
+  def registerFS(sparkConf: SparkConf): SparkConf =
+    if (System.getProperty("os.name").startsWith("Windows"))
+      sparkConf.set("spark.hadoop.fs.file.impl", classOf[BareLocalFileSystem].getName).
+        set("spark.hadoop.fs.AbstractFileSystem.file.impl", classOf[StreamingFS].getName)
+    else
+      sparkConf
+
+  val conf: SparkConf = registerFS(new SparkConf())
     .setMaster("local[*]")
     .setAppName("test")
     .set("spark.ui.enabled", "false")
44 changes: 44 additions & 0 deletions dataset/src/test/scala/frameless/optimiser/LitTests.scala (new file)

@@ -0,0 +1,44 @@
package frameless.optimiser

import frameless._
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec

class LitTests extends TypedDatasetSuite {
  test("sqlTimestamp pushdown") {
    val ms = SQLTimestamp(System.currentTimeMillis())
    TypedDataset.create(Seq(X1(ms))).write.mode("overwrite").parquet("./target/testData")
    val dataset = TypedDataset.createUnsafe[X1[SQLTimestamp]](session.read.parquet("./target/testData"))

    {
      val pushDowns = getPushDowns(dataset.filter(dataset('a) >= ms))
      assert(!pushDowns.exists(_.contains("GreaterThanOrEqual")), "Should NOT have had a push down predicate for the underlying long as no optimiser is yet used")
    }

    val orig = session.sqlContext.experimental.extraOptimizations
    try {
      session.sqlContext.experimental.extraOptimizations ++= Seq(LiteralRule)

      val pushDowns = getPushDowns(dataset.filter(dataset('a) >= ms))
      assert(pushDowns.exists(_.contains("GreaterThanOrEqual")), "Should have had a push down predicate for the underlying long")
    } finally {
      session.sqlContext.experimental.extraOptimizations = orig
    }
  }

  def getPushDowns(dataset: TypedDataset[_]): Seq[String] = {
    val sparkPlan = dataset.queryExecution.executedPlan

    (if (sparkPlan.children.isEmpty)
      // assume it's AQE
      sparkPlan match {
        case aq: AdaptiveSparkPlanExec => aq.initialPlan
        case _ => sparkPlan
      }
    else
      sparkPlan).collect {
      case fs: FileSourceScanExec =>
        fs.metadata.collect { case ("PushedFilters", value) if value != "[]" => value }
    }.flatten
  }
}
7 changes: 7 additions & 0 deletions StreamingFS.scala (new file)

@@ -0,0 +1,7 @@
package org.apache.hadoop.fs.local

import com.globalmentor.apache.hadoop.fs.BareLocalFileSystem
import org.apache.hadoop.fs.DelegateToFileSystem

class StreamingFS(uri: java.net.URI, conf: org.apache.hadoop.conf.Configuration) extends
  DelegateToFileSystem(uri, new BareLocalFileSystem(), conf, "file", false) {}
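This shim is the AbstractFileSystem implementation that registerFS (in TypedDatasetSuite above) wires in via the spark.hadoop.fs.AbstractFileSystem.file.impl setting, letting Windows test runs use the bare local filesystem instead of winutils.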