[SPARK-18186] Migrate HiveUDAFFunction to TypedImperativeAggregate for partial aggregation support #15703

Closed

liancheng wants to merge 10 commits into apache:master from liancheng:partial-agg-hive-udaf

Conversation

@liancheng (Contributor) commented Oct 31, 2016

What changes were proposed in this pull request?

When evaluated in Spark SQL, Hive UDAFs don't support partial aggregation. This PR migrates `HiveUDAFFunction`s to `TypedImperativeAggregate`, which already provides partial aggregation support for aggregate functions that may use arbitrary Java objects as aggregation states.

The following snippet shows the effect of this PR:

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax
sql(s"CREATE FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")

spark.range(100).createOrReplaceTempView("t")

// A query using both Spark SQL native `max` and Hive `max`
sql(s"SELECT max(id), hive_max(id) FROM t").explain()
```

Before this PR:

```
== Physical Plan ==
SortAggregate(key=[], functions=[max(id#1L), default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@7475f57e), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- *Range (0, 100, step=1, splits=Some(1))
```

After this PR:

```
== Physical Plan ==
SortAggregate(key=[], functions=[max(id#1L), default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@5e18a6a7), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- SortAggregate(key=[], functions=[partial_max(id#1L), partial_default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@5e18a6a7), id#1L, false, 0, 0)])
      +- *Range (0, 100, step=1, splits=Some(1))
```

The tricky part of the PR is mostly about updating and passing around aggregation states of `HiveUDAFFunction`s, since the aggregation state of a Hive UDAF may appear in three different forms. Let's take the testing `MockUDAF` added in this PR as an example. This UDAF computes the count of non-null values together with the count of nulls of a given column. Its aggregation state may appear in the following forms at different times (a small sketch of the conversion paths follows this list):

1. A `MockUDAFBuffer`, which is a concrete subclass of `GenericUDAFEvaluator.AggregationBuffer`

   The form used by the Hive UDAF API. This form is required by the following scenarios:

   - Calling `GenericUDAFEvaluator.iterate()` to update an existing aggregation state with new input values.
   - Calling `GenericUDAFEvaluator.terminate()` to get the final aggregated value from an existing aggregation state.
   - Calling `GenericUDAFEvaluator.merge()` to merge other aggregation states into an existing aggregation state. The existing aggregation state to be updated must be in this form.

   Conversions:

   - To form 2: `GenericUDAFEvaluator.terminatePartial()`
   - To form 3: convert to form 2 first, and then to 3.

2. An `Object[]` array containing two `java.lang.Long` values.

   The form used to interact with Hive's `ObjectInspector`s. This form is required by the following scenarios:

   - Calling `GenericUDAFEvaluator.terminatePartial()` to convert an existing aggregation state in form 1 to form 2.
   - Calling `GenericUDAFEvaluator.merge()` to merge other aggregation states into an existing aggregation state. The input aggregation state must be in this form.

   Conversions:

   - To form 1: no direct method; we have to create an empty `AggregationBuffer` and merge the form 2 state into it.
   - To form 3: the `unwrapperFor()`/`unwrap()` methods of `HiveInspectors`.

3. The byte array that holds the data of an `UnsafeRow` with two `LongType` fields.

   The form used by Spark SQL to shuffle partial aggregation results. This form is required because `TypedImperativeAggregate` always asks its subclasses to serialize their aggregation states into a byte array.

   Conversions:

   - To form 1: convert to form 2 first, and then to 1.
   - To form 2: the `wrapperFor()`/`wrap()` methods of `HiveInspectors`.
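To make the conversion paths concrete, here is a small sketch in the spirit of the `MockUDAF` example. The buffer and helper names are illustrative rather than the exact code added by this PR, and the form 2 ↔ form 3 steps (via `HiveInspectors` and `UnsafeProjection`) are omitted:

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator

// Form 1: a concrete aggregation buffer holding the two counters.
class MockUDAFBuffer(var nonNullCount: Long, var nullCount: Long)
  extends GenericUDAFEvaluator.AbstractAggregationBuffer

// Form 1 -> form 2: what `terminatePartial()` produces for this UDAF,
// an Object[] containing two java.lang.Long values.
def toInspectable(buf: MockUDAFBuffer): Array[AnyRef] =
  Array(Long.box(buf.nonNullCount), Long.box(buf.nullCount))

// Form 2 -> form 1: no direct method, as noted above. Create an empty
// buffer and merge the inspectable state into it (the merge is inlined
// here; the real code calls `GenericUDAFEvaluator.merge()`).
def toBuffer(state: Array[AnyRef]): MockUDAFBuffer = {
  val buf = new MockUDAFBuffer(0L, 0L)
  buf.nonNullCount += state(0).asInstanceOf[java.lang.Long]
  buf.nullCount += state(1).asInstanceOf[java.lang.Long]
  buf
}
```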

Here are some micro-benchmark results produced by the most recent master and this PR branch.

Master:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o groupBy                                    339 /  372          3.1         323.2       1.0X
w/ groupBy                                     503 /  529          2.1         479.7       0.7X
```

This PR:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

hive udaf vs spark af:                   Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
w/o groupBy                                    116 /  126          9.0         110.8       1.0X
w/ groupBy                                     151 /  159          6.9         144.0       0.8X
```

Benchmark code snippet:

  test("Hive UDAF benchmark") {
    val N = 1 << 20

    sparkSession.sql(s"CREATE TEMPORARY FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")

    val benchmark = new Benchmark(
      name = "hive udaf vs spark af",
      valuesPerIteration = N,
      minNumIters = 5,
      warmupTime = 5.seconds,
      minTime = 5.seconds,
      outputPerIteration = true
    )

    benchmark.addCase("w/o groupBy") { _ =>
      sparkSession.range(N).agg("id" -> "hive_max").collect()
    }

    benchmark.addCase("w/ groupBy") { _ =>
      sparkSession.range(N).groupBy($"id" % 10).agg("id" -> "hive_max").collect()
    }

    benchmark.run()

    sparkSession.sql(s"DROP TEMPORARY FUNCTION IF EXISTS hive_max")
  }

How was this patch tested?

New test suite HiveUDAFSuite is added.

@SparkQA commented Nov 1, 2016

Test build #67842 has finished for PR 15703 at commit c0029f1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 1, 2016

Test build #67844 has finished for PR 15703 at commit 5a23a97.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@rxin (Contributor) commented Nov 1, 2016

cc @tejasapatil

@liancheng (Contributor Author):

I can't reproduce those test failures when executing the failed test cases individually. It seems to be related to execution order. Still investigating.

```scala
private lazy val partialResultInspector =
  function.init(GenericUDAFEvaluator.Mode.PARTIAL1, inspectors)

// The following two lines initialize `function: GenericUDAFEvaluator` eagerly. These two fields
```
Contributor:

This feels a little weird. If you want eager init and no serialization, why not remove `lazy` and just keep it `@transient`?

Contributor Author:

Good point, I didn't check whether these two fields are really used on the executor side. If not, it's true that we can remove the `lazy` modifier.

Contributor Author:

Confirmed that we can remove this hack. Thanks!

@tejasapatil (Contributor):

This will surely improve performance for UDAFs where the data shrinks (e.g. `max`, as you pointed out). I am not sure it would be better for UDAFs like `GenericUDAFCollectSet` and `GenericUDAFCollectList`, where aggregation does not shrink the data (it might be worse because of conversion cost?).

It would be good to see some perf numbers comparing (native Spark UDFs) vs (before this change) vs (after this change).

@liancheng (Contributor Author) commented Nov 1, 2016

@tejasapatil For `collect_set` and `collect_list`, we'll simply migrate them to `TypedImperativeAggregate` so that they become Spark native aggregate functions. We can also handle other built-in Hive UDAFs that share the same issue using this approach.

In the long run, this PR is mostly useful for running legacy UDAFs developed by users rather than Hive built-in ones, since we'd prefer to eventually migrate all the built-in ones to Spark.
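(For reference, a minimal sketch of what such a migration looks like. The method names below match the `TypedImperativeAggregate` contract, but the trait itself is a simplified stand-in: the real `update` takes an `InternalRow` and the real class carries buffer offsets, and Java serialization is used here only to keep the sketch self-contained.)

```scala
import scala.collection.mutable.ArrayBuffer
import java.io._

// Simplified stand-in for the TypedImperativeAggregate contract.
trait TypedAggSketch[T] {
  def createAggregationBuffer(): T
  def update(buffer: T, input: Any): T
  def merge(buffer: T, other: T): T
  def eval(buffer: T): Any
  def serialize(buffer: T): Array[Byte]
  def deserialize(bytes: Array[Byte]): T
}

// A collect_list-style function: the buffer keeps every input, so partial
// aggregation doesn't shrink the data, but it still composes with other
// aggregate functions in the same query.
class CollectListSketch extends TypedAggSketch[ArrayBuffer[Any]] {
  def createAggregationBuffer() = ArrayBuffer.empty[Any]
  def update(buffer: ArrayBuffer[Any], input: Any) = { buffer += input; buffer }
  def merge(buffer: ArrayBuffer[Any], other: ArrayBuffer[Any]) = { buffer ++= other; buffer }
  def eval(buffer: ArrayBuffer[Any]) = buffer.toList
  def serialize(buffer: ArrayBuffer[Any]) = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    oos.writeObject(buffer); oos.close(); bos.toByteArray
  }
  def deserialize(bytes: Array[Byte]) =
    new ObjectInputStream(new ByteArrayInputStream(bytes))
      .readObject().asInstanceOf[ArrayBuffer[Any]]
}
```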

@liancheng (Contributor Author) commented Nov 1, 2016

I found that I'm not handling bridged UDAFs properly, which caused a few test failures. Working on it.

@liancheng (Contributor Author):

@tejasapatil Another point I'd like to add: even if the performance of a single UDAF like `GenericUDAFCollectList` regresses, you still gain performance when such UDAFs are used together with other aggregate functions within a single query. Before this PR, all the aggregate functions in such a query had to be evaluated without partial aggregation support.

@tejasapatil (Contributor):

There is no doubt that this PR is going to make things better. I'm already sold :)

@SparkQA commented Nov 1, 2016

Test build #67910 has finished for PR 15703 at commit 88ef7ea.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng (Contributor Author):

It turned out that I didn't initialize Hive UDAF evaluators properly.

Quoting the commit message of my previous commit:

Hive UDAFs are sensitive to aggregation mode, and must be initialized with proper modes before being used. Basically, it means that you can't use an evaluator initialized with mode PARTIAL1 to merge two aggregation states (although it still works for aggregate functions whose partial result type is the same as the final result type).
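(A sketch of what "initialized with proper modes" means in practice. `resolver`, `parameterInfo`, `inputInspectors`, and `partialResultInspector` are assumed to be in scope, as they are in the PR's `HiveUDAFFunction`; this is an illustration, not the exact code.)

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.Mode

// One evaluator per mode: PARTIAL1 consumes raw input rows, while PARTIAL2
// consumes partial results, so they must be initialized with different
// object inspectors and cannot be interchanged.
@transient private lazy val partial1ModeEvaluator = {
  val evaluator = resolver.getEvaluator(parameterInfo)
  evaluator.init(Mode.PARTIAL1, inputInspectors)               // raw input -> partial result
  evaluator
}

@transient private lazy val partial2ModeEvaluator = {
  val evaluator = resolver.getEvaluator(parameterInfo)
  evaluator.init(Mode.PARTIAL2, Array(partialResultInspector)) // partial -> partial
  evaluator
}
```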

@SparkQA commented Nov 1, 2016

Test build #67927 has finished for PR 15703 at commit 3af4f21.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 2, 2016

Test build #67930 has finished for PR 15703 at commit 6f28688.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 2, 2016

Test build #67938 has finished for PR 15703 at commit 1a2f4c7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng (Contributor Author):

OK, now it's ready for review and merge.

cc @yhuai @JoshRosen @cloud-fan

```scala
override def eval(input: InternalRow): Any = unwrapper(function.evaluate(buffer))

private lazy val finalModeEvaluator = {
  val parameterInfo = new SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
  resolver.getEvaluator(parameterInfo)
```
Contributor:

These 2 lines are duplicated many times; should we abstract them into a method?

Contributor Author:

Good point, thanks!


```scala
private val partialResultWrapper = wrapperFor(partialResultInspector, partialResultDataType)

private val projection = UnsafeProjection.create(Array(partialResultDataType))
```
Contributor:

Can you try to run a Hive UDAF in the Spark shell? IIUC, we can't create an unsafe projection inside a UDAF.

Contributor Author:

It does work as expected:

scala> sql("CREATE TEMPORARY FUNCTION hive_max AS 'org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax'")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.range(100).createOrReplaceTempView("t")

scala> sql("SELECT hive_max(id) FROM t").explain()
== Physical Plan ==
SortAggregate(key=[], functions=[hive_max(hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@144792d5), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- SortAggregate(key=[], functions=[partial_hive_max(hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@144792d5), id#1L, false, 0, 0)])
      +- *Range (0, 100, step=1, splits=Some(8))

scala> sql("SELECT hive_max(id) FROM t").show()
+-------------+
|hive_max( id)|
+-------------+
|           99|
+-------------+

Why do you think UnsafeProjection can't be used here?

Contributor:

I tried to use an unsafe projection in percentile_approx before but failed in the Spark shell. Maybe it was a different problem, nvm.

@SparkQA commented Nov 10, 2016

Test build #68433 has finished for PR 15703 at commit 4c6284b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@liancheng (Contributor Author):

The last build failure was because of a logical conflict between this PR and the master branch. Resolving it.

@liancheng force-pushed the partial-agg-hive-udaf branch from 4c6284b to 3ffafe7 on November 10, 2016 at 19:31
@SparkQA commented Nov 10, 2016

Test build #68488 has finished for PR 15703 at commit 3ffafe7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Commit message:

> Hive UDAFs are sensitive to aggregation mode, and must be initialized with proper modes before being used. Basically, it means that you can't use an evaluator initialized with mode PARTIAL1 to merge two aggregation states (although it still works for aggregate functions whose partial result type is the same as the final result type).
@liancheng force-pushed the partial-agg-hive-udaf branch from 3ffafe7 to b418cd7 on November 10, 2016 at 21:51
Commit message:

> We're now using ObjectHashAggregateExec instead of SortAggregateExec to evaluate Hive UDAFs.
@SparkQA commented Nov 10, 2016

Test build #68491 has finished for PR 15703 at commit b418cd7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Nov 11, 2016

Test build #68493 has finished for PR 15703 at commit e88db5c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```scala
@transient
private lazy val function = functionAndInspector._1
private lazy val partial1ModeEvaluator = newEvaluator()
```
Contributor:

Do we need to make it a lazy val, since `partialResultInspector` uses it right below?

Contributor Author:

Yes. It has to be transient and lazy so that it's also available on the executor side, since Hive UDAF evaluators are not serializable.

Contributor:

OK. I think in general we should avoid using this pattern. If we have to use it now, let's explain it in a comment.
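(For readers unfamiliar with the pattern under discussion, a toy illustration with hypothetical names: `@transient` excludes the field from serialization, and `lazy` re-creates it on first access in whatever JVM ends up using it, which is what makes a non-serializable member usable on executors.)

```scala
// Stands in for a non-serializable dependency such as a Hive UDAF evaluator.
class Evaluator { def eval(x: Long): Long = x }

class Holder extends Serializable {
  // Not written out during serialization; lazily re-initialized on first
  // access after deserialization, e.g. on an executor.
  @transient private lazy val evaluator = new Evaluator
  def apply(x: Long): Long = evaluator.eval(x)
}
```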


```diff
 // We rely on Hive to check the input data types, so use `AnyDataType` here to bypass our
 // catalyst type checking framework.
 override def inputTypes: Seq[AbstractDataType] = children.map(_ => AnyDataType)

 override def nullable: Boolean = true

-override def supportsPartial: Boolean = false
+override def supportsPartial: Boolean = true
```
Contributor:

Is there any Hive UDAF that does not support partial aggregation?

Contributor Author:

I don't think so. Hive doesn't have an equivalent flag, and all UDAFs inherently support partial aggregation since they have to implement the callbacks for all phases.
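(For context, these are the callbacks in question, summarized as a Scala-style trait; the real `GenericUDAFEvaluator` is a Java class, and `AggBuffer` stands in for its `AggregationBuffer`. Because every UDAF implements all five, a partial-aggregation plan can always be expressed.)

```scala
trait EvaluatorCallbacks[AggBuffer] {
  def getNewAggregationBuffer(): AggBuffer               // fresh, empty state
  def iterate(agg: AggBuffer, args: Array[AnyRef]): Unit // map side: consume raw input
  def terminatePartial(agg: AggBuffer): AnyRef           // map side: emit inspectable partial state
  def merge(agg: AggBuffer, partial: AnyRef): Unit       // reduce side: combine partial states
  def terminate(agg: AggBuffer): AnyRef                  // reduce side: produce the final value
}
```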

```scala
partial1ModeEvaluator.getNewAggregationBuffer

@transient
private lazy val inputProjection = new InterpretedProjection(children)
```
Contributor:

Why `InterpretedProjection`?

Contributor Author:

It was copied from the original code. Replaced with `UnsafeProjection` instead. Thanks.


```scala
override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
  aggBufferSerDe.deserialize(bytes)
}
```
Contributor:

Let's add docs to explain what these functions are doing.

Contributor Author:

Done.

```scala
}

override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
  partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
```
Contributor:

Let's explain what we are trying to do with `partial1ModeEvaluator.terminatePartial(input)`.

Contributor Author:

Comment added.


```scala
private val mutableRow = new GenericInternalRow(1)

def serialize(buffer: AggregationBuffer): Array[Byte] = {
```
Contributor:

Doc?

Contributor Author:

Done.

```scala
}

test("customized Hive UDAF") {
  val df = sql("SELECT mock(value) FROM t GROUP BY key % 2")
```
Contributor:

How about we also keep the key?

Contributor Author:

Done.

```scala
}

test("built-in Hive UDAF") {
  val df = sql("SELECT hive_max(key) FROM t GROUP BY key % 2")
```
Contributor:

Also keep the key?

Contributor Author:

Done. Thanks.

```scala
 * 1. a Spark SQL value, or
 * 2. an instance of some concrete `GenericUDAFEvaluator.AggregationBuffer` class, or
 * 3. a Java object that can be inspected using the `ObjectInspector` returned by the
 *    `GenericUDAFEvaluator.init()` method.
```
Contributor:

Besides explaining what these three formats are, let's also explain when we use each of them.

Contributor:

(We can just put the PR description here.)

Contributor:

(Is the doc below enough?)

Contributor Author:

Thanks, added.

```diff
 @transient
-private lazy val inspectors = children.map(toInspector).toArray
+private lazy val inputInspectors = children.map(toInspector).toArray
```
Contributor:

Let's add docs to explain when these internal vals are used (e.g. which vals are needed for a given mode).


```scala
// Hive `ObjectInspector` used to inspect partial aggregation results.
```
Contributor:

The partial aggregation result is the aggregation buffer, right?

Contributor Author:

Yeah. They are the objects returned by `terminatePartial()`, which is the inspectable version of Hive's `AggregationBuffer`.

```scala
// in the superclass because that will lead to initialization ordering issues.
override val inputAggBufferAttributes: Seq[AttributeReference] = Nil

@transient
private lazy val aggBufferSerDe: AggregationBufferSerDe = new AggregationBufferSerDe
```
Contributor:

Doc?

```scala
}

// Helper class used to de/serialize Hive UDAF `AggregationBuffer` objects
private class AggregationBufferSerDe {
```
Contributor:

Can we take this class out of `HiveUDAFFunction`?

Contributor Author:

We can, but it doesn't seem necessary. Making it a nested class also simplifies the implementation since it has access to the fields of the outer class.
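(A toy illustration of the trade-off, with hypothetical names: the nested class reads the outer instance's fields directly, which extraction would otherwise have to thread through constructor parameters.)

```scala
class OuterFunction {
  private val resultInspector: String = "inspector" // stands in for real outer state

  // Nested class: sees `resultInspector` without any constructor plumbing.
  private class SerDe {
    def describe: String = s"serde over $resultInspector"
  }
}
```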

```scala
val unsafeRow = projection(mutableRow)
val bytes = ByteBuffer.allocate(unsafeRow.getSizeInBytes)
unsafeRow.writeTo(bytes)
bytes.array()
```
Contributor:

Should we just use `unsafeRow.getBytes`?

Contributor Author:

Aren't they equivalent in this case? `UnsafeRow.getBytes` also performs some extra checks that aren't necessary here.

Contributor:

But you also create an unnecessary `ByteBuffer`... if they are equivalent, isn't `unsafeRow.getBytes` simpler?

Contributor:

Actually, they are different. If the buffer type is fixed-length, the `UnsafeRow` data is just a fixed-length byte array, and `UnsafeRow.getBytes` will return that array directly instead of copying the memory.
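(A sketch of the difference being discussed, assuming `unsafeRow` is the `UnsafeRow` produced by the projection.)

```scala
import java.nio.ByteBuffer

// Explicit copy: always allocates a fresh array and copies the row bytes.
val byteBuffer = ByteBuffer.allocate(unsafeRow.getSizeInBytes)
unsafeRow.writeTo(byteBuffer)
val copied: Array[Byte] = byteBuffer.array()

// getBytes: when the row is backed by a single on-heap byte array of exactly
// the right size (typical for fixed-length buffers), it can return that
// array directly and skip the copy; otherwise it falls back to copying.
val maybeZeroCopy: Array[Byte] = unsafeRow.getBytes
```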

@SparkQA commented Nov 16, 2016

Test build #68734 has finished for PR 15703 at commit ca3978c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai (Contributor) commented Nov 16, 2016

Code changes look good to me. Let's also do a benchmark to sanity-check our implementation.

@yhuai (Contributor) commented Nov 16, 2016

LGTM. Merging to master.

@liancheng (Contributor Author):

Thanks everyone for the review!

@asfgit closed this in 2ca8ae9 on Nov 16, 2016
@liancheng deleted the partial-agg-hive-udaf branch on November 21, 2016 at 20:27
liancheng added a commit to liancheng/spark that referenced this pull request Nov 22, 2016
ericl pushed a commit to ericl/spark that referenced this pull request Dec 30, 2016. The commit message reproduces the PR description above, and closes apache#15703 from liancheng/partial-agg-hive-udaf and apache#144 from yhuai/branch-2.1-hive-udaf (Author: Cheng Lian <lian@databricks.com>).
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017. The commit message likewise reproduces the PR description above and closes apache#15703 from liancheng/partial-agg-hive-udaf (Author: Cheng Lian <lian@databricks.com>).
```scala
// buffer in the 3rd format mentioned in the ScalaDoc of this class. Originally, Hive converts
// these `AggregationBuffer`s into this format before shuffling partial aggregation results, and
// calls `GenericUDAFEvaluator.terminatePartial()` to do the conversion.
partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
```

Comment:

If we follow the code flow from interfaces.scala, we see that the aggregation buffer in PARTIAL2 mode is merged with the aggregation buffer from PARTIAL1 mode. I am new to Spark and Hive, so I just wanted to know the reason behind this behaviour. If there are any docs explaining it, do let me know. Thank you.
