Spark SQL Macros
- Vanilla Spark behavior
- Spark SQL Macro behavior
- Spark Macro: initial scala translation support
- Note on Performance of SQL Macros
- Design Notes
- Writing macros FAQ
This is a generic Spark capability developed by us.
Spark SQL Macros provide a capability to register custom functions into a Spark Session that is similar to the custom UDF registration capability of Spark. The difference is that the SQL macros registration mechanism attempts to generate an equivalent Spark Catalyst expression for the function body.
We explain the capability with an example.
A custom function can be registered in the following way. Consider a very simple function that adds 2 to its argument:
spark.udf.register("intUDF", (i: Int) => {
val j = 2
i + j
})
Under the covers, an Invoke Catalyst Expression is associated with the function name in Spark's Function Registry. At runtime, the Invoke Catalyst Expression runs the associated function body.
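As a quick check (this only uses Spark's public catalog API), the registered name can be listed from the session:
// the UDF registered above appears as a temporary function in the session
spark.catalog.listFunctions().filter("name = 'intUDF'").show(false)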
Then, the following spark-sql query (assuming sparktest.unit_test has a column c_int : Int):
select intUDF(c_int)
from sparktest.unit_test
where intUDF(c_int) < 0
generates the following physical plan:
== Physical Plan ==
Project (3)
+- * Filter (2)
+- BatchScan (1)
(1) BatchScan
Output [1]: [C_INT#2271]
OraPlan: 00 OraSingleQueryBlock [C_INT#2271], [oracolumnref(C_INT#2271)]
01 +- OraTableScan SPARKTEST.UNIT_TEST, [C_INT#2271]
ReadSchema: struct<C_INT:int>
dsKey: DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
oraPushdownSQL: select "C_INT"
from SPARKTEST.UNIT_TEST
(2) Filter [codegen id : 1]
Input [1]: [C_INT#2271]
Condition : (if (isnull(C_INT#2271)) null else intUDF(knownnotnull(C_INT#2271)) < 0)
(3) Project [codegen id : 1]
Output [1]: [if (isnull(C_INT#2271)) null else intUDF(knownnotnull(C_INT#2271)) AS intUDF(c_int)#2278]
Input [1]: [C_INT#2271]
- The intUDF is invoked in the Filter operator for evaluating the intUDF(c_int) < 0 predicate;
- The intUDF is invoked in the Project operator to evaluate the projection intUDF(c_int).
Logically, the evaluation of these function calls involves moving values between Catalyst and Scala and making a function call on the JVM. Catalyst codegen (and the Java JIT) does a good job of optimizing away the serde and function calls for simple functions, but in general this cannot be avoided. Besides, since the function code is a black box, no further optimization, such as expression pushdown to Oracle, is possible.
The intUDF is a trivial function that just adds 2 to its argument. It would be nice if we could convert the function body into an argument + 2 expression.
With Spark SQL Macros you can register the function as a macro like this:
import org.apache.spark.sql.defineMacros._
spark.registerMacro("intUDM", spark.udm((i: Int) => {
val j = 2
i + j
}))
This is almost identical to the registration process for custom functions in Spark. But under the covers, we leverage the Macro mechanics of the Scala Compiler to analyze the Scala AST of the function body and try to generate an equivalent Catalyst Expression for the function body.
If we succeed then what is registered in the Function Registry is a plain old Catalyst Expression with holes that get replaced with argument expressions at any call-site of the function.
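One way to see this (a sketch, using the sparktest.unit_test table from the example above) is to look at the analyzed plan of a query that calls the macro:
// after registration, the analyzed plan should already contain (c_int + 2)
// rather than an opaque UDF invocation
val df = sql("select intUDM(c_int) from sparktest.unit_test")
println(df.queryExecution.analyzed.numberedTreeString)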
The query:
select intUDM(c_int)
from sparktest.unit_test
where intUDM(c_int) < 0
generates the following physical plan:
== Physical Plan ==
Project (2)
+- BatchScan (1)
(1) BatchScan
Output [1]: [(c_int + 2)#2316]
OraPlan: 00 OraSingleQueryBlock [(C_INT#2309 + 2) AS (c_int + 2)#2316], [oraalias((C_INT#2309 + 2) AS (c_int + 2)#2316)], orabinaryopexpression((((C_INT#2309 + 2) < 0) AND isnotnull(C_INT#2309)))
01 +- OraTableScan SPARKTEST.UNIT_TEST, [C_INT#2309]
ReadSchema: struct<(c_int + 2):int>
dsKey: DataSourceKey(jdbc:oracle:thin:@den02ads:1531/cdb1_pdb7.regress.rdbms.dev.us.oracle.com,tpcds)
oraPushdownBindValues: 2, 0
oraPushdownSQL: select ("C_INT" + 2) AS "(c_int + 2)"
from SPARKTEST.UNIT_TEST
where ((("C_INT" + ?) < ?) AND "C_INT" IS NOT NULL)
(2) Project [codegen id : 1]
Output [1]: [(c_int + 2)#2316]
Input [1]: [(c_int + 2)#2316]
- The predicate intUDM(c_int) < 0 becomes ("C_INT" + ?) < ? (the literal in a predicate is converted to a bind value).
- The projection intUDM(c_int) becomes "C_INT" + 2.
- Since macro calls are just plain old Catalyst expressions, the project and filter are pushable to Oracle, so the entire query collapses into an Oracle scan.
We will provide translation support for the following Scala constructs:
- Primitive datatypes, Case Classes in Scala, Tuples, Arrays, Collections and Maps
- For Tuples and Case Classes we will translate field access
- Value Definitions
- Arbitrary References at the macro call-site that can be evaluated at macro definition time
- Arithmetic, Math, Datetime, String, and Decimal functions supported in Catalyst
- Recursive macro invocations
- Scala Case and If statements.
See the Spark SQL Macro examples page.
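For instance, a macro combining a few of these constructs (a value definition, an if expression, and Tuple field access) might look like the following sketch; the examples page covers the full range of supported constructs.
import org.apache.spark.sql.defineMacros._

// clip the second element of a pair so that it is no smaller than the first
spark.registerMacro("clipUDM", spark.udm((p: (Int, Int)) => {
  val lo = p._1
  val v = p._2
  if (v < lo) lo else v
}))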
In addition to supporting user-defined macros, we also plan to provide a way for argument functions provided to the DataFrame map and flatMap higher-order functions to be converted to equivalent Catalyst expressions.
There are many cases where Spark SQL macro-based plans will perform better than equivalent user-defined functions.
Obviously, as we demonstrated in the Macro behavior section, Spark macro plans enable more pushdown processing to the data source. In these cases the macro-based plan will, by and large, provide a large performance gain.
But even without the advantage of pushdown, macro plans can perform better because the serialization/deserialization at function-call boundaries is avoided, as is the function call itself.
Consider the following calculation of tax and discount:
Let's define a Product data set:
case class Product(prod : String, prodCat : String, amt : Double)
val prods = for(i <- (0 until 1000000)) yield {
  val m = i % 3
  val cat = if (m == 0) "alcohol" else if (m == 1) "grocery" else "rest"
  Product(s"p_$i", cat, (i % 200).toDouble)
}
val prodDF = spark.createDataset(prods).coalesce(1).cache
prodDF.count
prodDF.createOrReplaceGlobalTempView("products")
For which tax and discount are calculated as:
- No tax on groceries; alcohol is taxed at 10.5%, everything else at 9.5%.
- On Sundays, give a discount of 5% on alcohol.
We can define this logic as a Spark UDF and as a Spark SQL Macro:
// function registration
spark.udf.register("taxAndDiscountF", {(prodCat : String, amt : Double) =>
import org.apache.spark.sql.catalyst.util.DateTimeUtils._
val taxRate = prodCat match {
case "grocery" => 0.0
case "alcohol" => 10.5
case _ => 9.5
}
val currDate = currentDate(ZoneId.systemDefault())
val discount = if (getDayOfWeek(currDate) == 1 && prodCat == "alcohol") 0.05 else 0.0
amt * ( 1.0 - discount) * (1.0 + taxRate)
})
// macro registration
import org.apache.spark.sql.defineMacros._
import org.apache.spark.sql.sqlmacros.DateTimeUtils._
import java.time.ZoneId
spark.registerMacro("taxAndDiscountM", spark.udm({(prodCat : String, amt : Double) =>
val taxRate = prodCat match {
case "grocery" => 0.0
case "alcohol" => 10.5
case _ => 9.5
}
val currDate = currentDate(ZoneId.systemDefault())
val discount = if (getDayOfWeek(currDate) == 1 && prodCat == "alcohol") 0.05 else 0.0
amt * ( 1.0 - discount) * (1.0 + taxRate)
}))
Now consider the 2 queries:
// queries
val macroBasedResDF = sql("select prod, taxAndDiscountM(prod, amt) from global_temp.products")
val funcBasedResDF = sql("select prod, taxAndDiscountF(prod, amt) from global_temp.products")
val funcBasedRes = funcBasedResDF.collect
val macroBasedRes = macroBasedResDF.collect
In our testing we found the task times for the macro-based query to be around 2-3x faster than those for the function-based query.
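A rough way to reproduce a wall-clock comparison (spark.time simply prints the elapsed time; per-task times are best read from the Spark UI):
// run each a few times; exact numbers depend on the environment
spark.time { funcBasedResDF.collect() }
spark.time { macroBasedResDF.collect() }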
We allow macro call-site static values to be used in the macro code. These values need to be translated to Catalyst expression trees. Spark's org.apache.spark.sql.catalyst.ScalaReflection already provides a mechanism (via org.apache.spark.sql.catalyst.encoders.ExpressionEncoder) for inferring and converting values of supported types to Catalyst expressions, and we leverage this mechanism. But in order to leverage it we need to stand up a runtime Universe inside the macro invocation. This is fine because SQLMacro is invoked in an environment that has all the Spark classes on the classpath. The only issue is that we cannot use the Thread Classloader of the macro invocation. For this reason MacrosScalaReflection is a copy of org.apache.spark.sql.catalyst.ScalaReflection with its mirror set up on org.apache.spark.util.Utils.getSparkClassLoader.
Instead of developing a new builder capability to construct macro universe trees for catalyst expressions, we directly construct catalyst expressions. To lift these catalyst expressions back to the runtime world we use the serialization mechanism of catalyst expressions. So the SQLMacroExpressionBuilder is constructed with the serialized form of the catalyst expression. In the runtime world this serialized form is deserialized and on macro invocation MacroArg positions are replaced with the catalyst expressions at the invocation site.
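The following is an illustrative sketch only, not the project's SQLMacroExpressionBuilder: it shows the general shape of the idea, with the macro argument's "hole" modeled as a BoundReference and plain Java serialization standing in for whatever serialization the project actually uses.
import java.io._
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.IntegerType

// the macro body "i + 2", with the argument modeled as a hole at position 0
val body: Expression = Add(BoundReference(0, IntegerType, nullable = true), Literal(2))

// serialize the body (what would happen at macro definition time)
val bos = new ByteArrayOutputStream()
val oos = new ObjectOutputStream(bos)
oos.writeObject(body)
oos.close()

// later, deserialize and splice the call-site argument expression into the hole
val restored = new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray))
  .readObject().asInstanceOf[Expression]
val expanded = restored.transform {
  case BoundReference(0, _, _) => UnresolvedAttribute("c_int")
}
// expanded is now ('c_int + 2), ready to be placed into the plan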
The following macro definition doesn't get translated.
val a = Array(5)
spark.registerMacro("intUDM", spark.udm((i: Int) => {
val j = a(0)
i + j
}))
- Macro translation happens during the compilation of the above code snippet.
- At macro translation time the value of a is unknown.
- For identifiers that are not macro parameters or well-known symbols (such as java.lang.Math.abs), SQL expression translation attempts to replace them with their value at macro compilation time by issuing a macro context eval. This fails, and so the overall macro translation falls back to registering a user-defined function.
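One way to keep such a macro translatable (a sketch that just mirrors the first intUDM example) is to make the value a plain value definition inside the macro body, which is a supported construct, rather than a reference captured from the call site:
spark.registerMacro("intUDM", spark.udm((i: Int) => {
  val j = 5  // a value definition inside the body is translated; the captured a(0) lookup above is not
  i + j
}))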