[SPARK-18186] Migrate HiveUDAFFunction to TypedImperativeAggregate for partial aggregation support #15703
Conversation
Test build #67842 has finished for PR 15703 at commit
Test build #67844 has finished for PR 15703 at commit
cc @tejasapatil
I can't reproduce those test failures when executing the failed test cases individually. It seems to be related to execution order. Still investigating.
private lazy val partialResultInspector =
  function.init(GenericUDAFEvaluator.Mode.PARTIAL1, inspectors)

// The following two lines initialize `function: GenericUDAFEvaluator` eagerly. These two fields
This feels a little weird. If you want eager init and no serialization, why not remove `lazy` and just keep it `@transient`?
Good point, I didn't check whether these two fields are really used on the executor side. If not, it's true that we can remove the `lazy` modifier.
Confirmed that we can remove this hack. Thanks!
This will surely improve performance for the UDAFs where the data shrinks (e.g. …). It would be good to see some perf numbers to compare (native Spark UDFs) vs (before this change) vs (after the change).
@tejasapatil For …, in the long run this PR is mostly useful for running legacy UDAFs developed by users rather than Hive built-in ones, since we'd prefer to eventually migrate all the built-in ones to Spark.
I found that I'm not handling bridged UDAFs properly, which caused a few test failures. Working on it.
@tejasapatil Another point that I'd like to add is that even if the performance for a single UDAF like …
There is no doubt that this PR is not going to make things worse. I'm already sold :)
Test build #67910 has finished for PR 15703 at commit
It turned out that I didn't initialize Hive UDAF evaluators properly. Quoted from the commit message of my previous commit:
Test build #67927 has finished for PR 15703 at commit
Test build #67930 has finished for PR 15703 at commit
Test build #67938 has finished for PR 15703 at commit
OK, now it's ready for review and merge.
override def eval(input: InternalRow): Any = unwrapper(function.evaluate(buffer))

private lazy val finalModeEvaluator = {
  val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
  resolver.getEvaluator(parameterInfo)
These two lines are duplicated many times; should we abstract them into a method?
Good point, thanks!
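For reference, here's a minimal sketch of the extracted helper (the PR later names it `newEvaluator()`, as seen in a diff further down); the body is exactly the two duplicated lines, and `resolver`/`inputInspectors` are existing fields of `HiveUDAFFunction`:

```scala
// Sketch: factor the duplicated evaluator-creation lines into a single helper.
private def newEvaluator(): GenericUDAFEvaluator = {
  val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
  resolver.getEvaluator(parameterInfo)
}
```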
private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType)

private val projection = UnsafeProjection.create(Array(partialResultDataType))
Can you try to run a Hive UDAF in the spark shell? IIUC, we can't create an unsafe projection inside a UDAF.
It does work as expected:
scala> sql("CREATE TEMPORARY FUNCTION hive_max AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax'")
res0: org.apache.spark.sql.DataFrame = []
scala> spark.range(100).createOrReplaceTempView("t")
scala> sql("SELECT hive_max(id) FROM t").explain()
== Physical Plan ==
SortAggregate(key=[], functions=[hive_max(hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@144792d5), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- SortAggregate(key=[], functions=[partial_hive_max(hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@144792d5), id#1L, false, 0, 0)])
      +- *Range (0, 100, step=1, splits=Some(8))

scala> sql("SELECT hive_max(id) FROM t").show()
+-------------+
|hive_max( id)|
+-------------+
|           99|
+-------------+
Why do you think UnsafeProjection can't be used here?
I tried to use unsafe projection in percentile_approx before, but it failed in the spark shell; maybe it's a different problem, nvm.
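For context, a sketch of how the unsafe projection ends up being used during serialization (field names follow this PR; the exact body is simplified and partly assumed):

```scala
// Wrap the Hive partial result into a Catalyst value, project it into an
// UnsafeRow of schema [partialResultDataType], then copy the bytes out.
def serialize(buffer: AggregationBuffer): Array[Byte] = {
  mutableRow.update(0, partialResultWrapper(partial1ModeEvaluator.terminatePartial(buffer)))
  val unsafeRow = projection(mutableRow)
  val bytes = ByteBuffer.allocate(unsafeRow.getSizeInBytes)
  unsafeRow.writeTo(bytes)
  bytes.array()
}
```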
Test build #68433 has finished for PR 15703 at commit
The last build failure was because of a logical conflict between this PR and the master branch. Resolving it.
(force-pushed from 4c6284b to 3ffafe7)
Test build #68488 has finished for PR 15703 at commit
Hive UDAFs are sensitive to aggregation mode, and must be initialized with proper modes before being used. Basically, it means that you can't use an evaluator initialized with mode PARTIAL1 to merge two aggregation states (although it still works for aggregate functions whose partial result type is the same as the final result type).
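Concretely, the fix keeps one evaluator per aggregation mode, each `init()`-ed exactly once with that mode. Here is a hedged sketch using the field names quoted elsewhere in this PR (the bodies are illustrative, not copied verbatim):

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

// PARTIAL1: iterate() over raw input rows, terminatePartial() to emit partial states.
@transient
private lazy val partial1ModeEvaluator = newEvaluator()

// init() returns the ObjectInspector for the partial results produced above.
@transient
private lazy val partialResultInspector =
  partial1ModeEvaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, inputInspectors)

// PARTIAL2: merge() partial states into partial states.
@transient
private lazy val partial2ModeEvaluator = {
  val evaluator = newEvaluator()
  evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, Array(partialResultInspector))
  evaluator
}

// FINAL: merge() partial states, then terminate() to produce the final value.
@transient
private lazy val finalModeEvaluator = newEvaluator()
```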
(force-pushed from 3ffafe7 to b418cd7)
We're now using ObjectHashAggregateExec instead of SortAggregateExec to evaluate Hive UDAFs.
Test build #68491 has finished for PR 15703 at commit
Test build #68493 has finished for PR 15703 at commit
 @transient
-private lazy val function = functionAndInspector._1
+private lazy val partial1ModeEvaluator = newEvaluator()
Do we need to make it a lazy val, since partialResultInspector uses it right below?
Yes. It has to be transient and lazy so that it's also available on the executor side, since Hive UDAF evaluators are not serializable.
ok. I think in general we should avoid using this pattern. If we have to use it now, let's explain it in the comment.
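For readers unfamiliar with the idiom, a minimal sketch of why `@transient lazy val` shows up here (a general Spark pattern, not code lifted from this PR):

```scala
// The evaluator is not serializable, so it must not travel with the closure:
// @transient drops the field during serialization, and lazy re-creates it on
// first access after deserialization, independently on the driver and on each executor.
@transient
private lazy val evaluator: GenericUDAFEvaluator = newEvaluator()
```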
// We rely on Hive to check the input data types, so use `AnyDataType` here to bypass our
// catalyst type checking framework.
override def inputTypes: Seq[AbstractDataType] = children.map(_ => AnyDataType)

override def nullable: Boolean = true

-override def supportsPartial: Boolean = false
+override def supportsPartial: Boolean = true
Is there any Hive UDAF that does not support partial aggregation?
I don't think so. Hive doesn't have an equivalent flag, and all UDAFs inherently support partial aggregation since they have to implement the callbacks for all phases.
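For context, an abridged Scala mirror of Hive's `GenericUDAFEvaluator` contract (the real class is Java and the shapes are simplified here): because every UDAF has to implement all of these callbacks, every UDAF can participate in partial aggregation.

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer

trait EvaluatorCallbacks {
  def getNewAggregationBuffer(): AggregationBuffer               // fresh per-group state
  def iterate(agg: AggregationBuffer, args: Array[AnyRef]): Unit // map-side update
  def terminatePartial(agg: AggregationBuffer): AnyRef           // emit shuffleable partial state
  def merge(agg: AggregationBuffer, partial: AnyRef): Unit       // combine partial states
  def terminate(agg: AggregationBuffer): AnyRef                  // final result
}
```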
partial1ModeEvaluator.getNewAggregationBuffer

@transient
private lazy val inputProjection = new InterpretedProjection(children)
Why InterpretedProjection?
Copied from the original code. Replaced with UnsafeProjection instead. Thanks.
override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
  aggBufferSerDe.deserialize(bytes)
}
Let's add docs to explain what these functions are doing.
Done.
override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
  partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
Let's explain what we are trying to do using partial1ModeEvaluator.terminatePartial(input).
Comment added.
private val mutableRow = new GenericInternalRow(1)

def serialize(buffer: AggregationBuffer): Array[Byte] = {
doc?
Done.
test("customized Hive UDAF") {
  val df = sql("SELECT mock(value) FROM t GROUP BY key % 2")
How about we also keep the key?
Done.
test("built-in Hive UDAF") {
  val df = sql("SELECT hive_max(key) FROM t GROUP BY key % 2")
Also keep the key?
Done. Thanks.
 * 1. a Spark SQL value, or
 * 2. an instance of some concrete `GenericUDAFEvaluator.AggregationBuffer` class, or
 * 3. a Java object that can be inspected using the `ObjectInspector` returned by the
 *    `GenericUDAFEvaluator.init()` method.
Besides explaining what these three formats are, let's also explain when we will use each of them.
(we can just put the PR description here)
(is the doc below enough?)
Thanks, added.
 @transient
-private lazy val inspectors = children.map(toInspector).toArray
+private lazy val inputInspectors = children.map(toInspector).toArray
Let's add docs to explain when these internal vals are used (like which vals are needed for a given mode).
// Hive `ObjectInspector` used to inspect partial aggregation results.
The partial aggregation result is the aggregation buffer, right?
Yea. They are the objects returned by terminatePartial(), which is the inspectable version of Hive's AggregationBuffer.
// in the superclass because that will lead to initialization ordering issues.
override val inputAggBufferAttributes: Seq[AttributeReference] = Nil

@transient
private lazy val aggBufferSerDe: AggregationBufferSerDe = new AggregationBufferSerDe
doc
// Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
private class AggregationBufferSerDe {
Can we take this class out of HiveUDAFFunction?
We can, but it doesn't seem necessary. Making it a nested class also simplifies the implementation, since it has access to the fields of the outer class.
val unsafeRow = projection(mutableRow)
val bytes = ByteBuffer.allocate(unsafeRow.getSizeInBytes)
unsafeRow.writeTo(bytes)
bytes.array()
Should we just use unsafeRow.getBytes?
Aren't they equivalent in this case? UnsafeRow.getBytes also performs some extra checks that are not necessary here.
But you also create an unnecessary ByteBuffer... Since they are equivalent, isn't unsafeRow.getBytes simpler?
Actually, they are different. If the buffer type is fixed-length, then the unsafeRow is backed by a fixed-length byte array, and UnsafeRow.getBytes will just return that array instead of copying the memory.
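A small illustrative snippet of the distinction (hypothetical, not from the PR): the explicit `ByteBuffer` path always yields a private copy, while `getBytes` may hand back the row's backing array.

```scala
import java.nio.ByteBuffer
// Assuming `unsafeRow: UnsafeRow` as in the serializer above.

// Explicit copy: a fresh array that later row mutations can never touch.
val copied: Array[Byte] = {
  val buf = ByteBuffer.allocate(unsafeRow.getSizeInBytes)
  unsafeRow.writeTo(buf)
  buf.array()
}

// For a row backed by exactly one byte array, getBytes returns that array
// directly (no copy), so the caller ends up sharing memory with the row.
val maybeShared: Array[Byte] = unsafeRow.getBytes
```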
Test build #68734 has finished for PR 15703 at commit
Code changes look good to me. Let's also do a benchmark to sanity-check our implementation.
LGTM. Merging to master.
Thanks everyone for the review!
[SPARK-18186] Migrate HiveUDAFFunction to TypedImperativeAggregate for partial aggregation support

## What changes were proposed in this pull request?

While being evaluated in Spark SQL, Hive UDAFs don't support partial aggregation. This PR migrates `HiveUDAFFunction`s to `TypedImperativeAggregate`, which already provides partial aggregation support for aggregate functions that may use arbitrary Java objects as aggregation states.

The following snippet shows the effect of this PR:

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax
sql(s"CREATE FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")
spark.range(100).createOrReplaceTempView("t")

// A query using both Spark SQL native `max` and Hive `max`
sql(s"SELECT max(id), hive_max(id) FROM t").explain()
```

Before this PR:

```
== Physical Plan ==
SortAggregate(key=[], functions=[max(id#1L), default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax7475f57e), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- *Range (0, 100, step=1, splits=Some(1))
```

After this PR:

```
== Physical Plan ==
SortAggregate(key=[], functions=[max(id#1L), default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax5e18a6a7), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- SortAggregate(key=[], functions=[partial_max(id#1L), partial_default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax5e18a6a7), id#1L, false, 0, 0)])
      +- *Range (0, 100, step=1, splits=Some(1))
```

The tricky part of the PR is mostly about updating and passing around aggregation states of `HiveUDAFFunction`s, since the aggregation state of a Hive UDAF may appear in three different forms. Let's take a look at the testing `MockUDAF` added in this PR as an example. This UDAF computes the count of non-null values together with the count of nulls of a given column. Its aggregation state may appear in the following forms at different times:

1. A `MockUDAFBuffer`, which is a concrete subclass of `GenericUDAFEvaluator.AggregationBuffer`

   The form used by the Hive UDAF API. This form is required by the following scenarios:

   - Calling `GenericUDAFEvaluator.iterate()` to update an existing aggregation state with new input values.
   - Calling `GenericUDAFEvaluator.terminate()` to get the final aggregated value from an existing aggregation state.
   - Calling `GenericUDAFEvaluator.merge()` to merge other aggregation states into an existing aggregation state. The existing aggregation state to be updated must be in this form.

   Conversions:

   - To form 2: `GenericUDAFEvaluator.terminatePartial()`
   - To form 3: Convert to form 2 first, and then to 3.

2. An `Object[]` array containing two `java.lang.Long` values

   The form used to interact with Hive's `ObjectInspector`s. This form is required by the following scenarios:

   - Calling `GenericUDAFEvaluator.terminatePartial()` to convert an existing aggregation state in form 1 to form 2.
   - Calling `GenericUDAFEvaluator.merge()` to merge other aggregation states into an existing aggregation state. The input aggregation state must be in this form.

   Conversions:

   - To form 1: No direct method. Have to create an empty `AggregationBuffer` and merge it into the empty buffer.
   - To form 3: `unwrapperFor()`/`unwrap()` method of `HiveInspectors`

3. The byte array that holds data of an `UnsafeRow` with two `LongType` fields

   The form used by Spark SQL to shuffle partial aggregation results. This form is required because `TypedImperativeAggregate` always asks its subclasses to serialize their aggregation states into a byte array.

   Conversions:

   - To form 1: Convert to form 2 first, and then to 1.
   - To form 2: `wrapperFor()`/`wrap()` method of `HiveInspectors`

Here're some micro-benchmark results produced by the most recent master and this PR branch.

Master:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

hive udaf vs spark af:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o groupBy                              339 /  372          3.1         323.2       1.0X
w/ groupBy                               503 /  529          2.1         479.7       0.7X
```

This PR:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

hive udaf vs spark af:             Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o groupBy                              116 /  126          9.0         110.8       1.0X
w/ groupBy                               151 /  159          6.9         144.0       0.8X
```

Benchmark code snippet:

```scala
test("Hive UDAF benchmark") {
  val N = 1 << 20

  sparkSession.sql(s"CREATE TEMPORARY FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")

  val benchmark = new Benchmark(
    name = "hive udaf vs spark af",
    valuesPerIteration = N,
    minNumIters = 5,
    warmupTime = 5.seconds,
    minTime = 5.seconds,
    outputPerIteration = true
  )

  benchmark.addCase("w/o groupBy") { _ =>
    sparkSession.range(N).agg("id" -> "hive_max").collect()
  }

  benchmark.addCase("w/ groupBy") { _ =>
    sparkSession.range(N).groupBy($"id" % 10).agg("id" -> "hive_max").collect()
  }

  benchmark.run()

  sparkSession.sql(s"DROP TEMPORARY FUNCTION IF EXISTS hive_max")
}
```

## How was this patch tested?

New test suite `HiveUDAFSuite` is added.

Author: Cheng Lian <lian@databricks.com>

Closes apache#15703 from liancheng/partial-agg-hive-udaf.
// buffer in the 3rd format mentioned in the ScalaDoc of this class. Originally, Hive converts
// these `AggregationBuffer`s into this format before shuffling partial aggregation results,
// and calls `GenericUDAFEvaluator.terminatePartial()` to do the conversion.
partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
If we follow the code flow from interfaces.scala, we see that the aggregation buffer in PARTIAL2 mode is merged with a partial result produced in PARTIAL1 mode. I am new to Spark and Hive, so I just wanted to know the reason behind this behaviour. If there are any docs explaining this, do let me know. Thank you.
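To make the mode asymmetry concrete, here is the merge path again with the conversions spelled out (a sketch based on the code above):

```scala
// Both `buffer` and `input` arrive in form 1 (AggregationBuffer). Hive's merge()
// expects its *second* argument in form 2 (the inspectable partial result), so the
// input side goes through terminatePartial() first; the PARTIAL2-mode evaluator
// then merges that form-2 value into the form-1 buffer.
override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
  val inspectablePartial = partial1ModeEvaluator.terminatePartial(input) // form 1 -> form 2
  partial2ModeEvaluator.merge(buffer, inspectablePartial)
}
```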