-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-18990][SQL] make DatasetBenchmark fairer for Dataset #16391
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,7 +17,6 @@ | |
|
||
package org.apache.spark.sql | ||
|
||
import org.apache.spark.{SparkConf, SparkContext} | ||
import org.apache.spark.sql.expressions.Aggregator | ||
import org.apache.spark.sql.expressions.scalalang.typed | ||
import org.apache.spark.sql.functions._ | ||
|
@@ -34,11 +33,13 @@ object DatasetBenchmark { | |
def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = { | ||
import spark.implicits._ | ||
|
||
val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s")) | ||
val rdd = spark.sparkContext.range(0, numRows) | ||
val ds = spark.range(0, numRows) | ||
val df = ds.toDF("l") | ||
val func = (l: Long) => l + 1 | ||
|
||
val benchmark = new Benchmark("back-to-back map", numRows) | ||
val func = (d: Data) => Data(d.l + 1, d.s) | ||
|
||
val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString)) | ||
benchmark.addCase("RDD") { iter => | ||
var res = rdd | ||
var i = 0 | ||
|
@@ -53,14 +54,14 @@ object DatasetBenchmark { | |
var res = df | ||
var i = 0 | ||
while (i < numChains) { | ||
res = res.select($"l" + 1 as "l", $"s") | ||
res = res.select($"l" + 1 as "l") | ||
i += 1 | ||
} | ||
res.queryExecution.toRdd.foreach(_ => Unit) | ||
} | ||
|
||
benchmark.addCase("Dataset") { iter => | ||
var res = df.as[Data] | ||
var res = ds.as[Long] | ||
var i = 0 | ||
while (i < numChains) { | ||
res = res.map(func) | ||
|
@@ -75,14 +76,14 @@ object DatasetBenchmark { | |
def backToBackFilter(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = { | ||
import spark.implicits._ | ||
|
||
val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s")) | ||
val rdd = spark.sparkContext.range(0, numRows) | ||
val ds = spark.range(0, numRows) | ||
val df = ds.toDF("l") | ||
val func = (l: Long, i: Int) => l % (100L + i) == 0L | ||
val funcs = 0.until(numChains).map { i => (l: Long) => func(l, i) } | ||
|
||
val benchmark = new Benchmark("back-to-back filter", numRows) | ||
val func = (d: Data, i: Int) => d.l % (100L + i) == 0L | ||
val funcs = 0.until(numChains).map { i => | ||
(d: Data) => func(d, i) | ||
} | ||
|
||
val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString)) | ||
benchmark.addCase("RDD") { iter => | ||
var res = rdd | ||
var i = 0 | ||
|
@@ -104,7 +105,7 @@ object DatasetBenchmark { | |
} | ||
|
||
benchmark.addCase("Dataset") { iter => | ||
var res = df.as[Data] | ||
var res = ds.as[Long] | ||
var i = 0 | ||
while (i < numChains) { | ||
res = res.filter(funcs(i)) | ||
|
@@ -133,24 +134,29 @@ object DatasetBenchmark { | |
def aggregate(spark: SparkSession, numRows: Long): Benchmark = { | ||
import spark.implicits._ | ||
|
||
val df = spark.range(1, numRows).select($"id".as("l"), $"id".cast(StringType).as("s")) | ||
val rdd = spark.sparkContext.range(0, numRows) | ||
val ds = spark.range(0, numRows) | ||
val df = ds.toDF("l") | ||
|
||
val benchmark = new Benchmark("aggregate", numRows) | ||
|
||
val rdd = spark.sparkContext.range(1, numRows).map(l => Data(l, l.toString)) | ||
benchmark.addCase("RDD sum") { iter => | ||
rdd.aggregate(0L)(_ + _.l, _ + _) | ||
rdd.map(l => (l % 10, l)).reduceByKey(_ + _).foreach(_ => Unit) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Also test the grouping performance, not only aggregating. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there any reason to add a grouping operation? There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Aggregate without grouping is not a common use case. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I see. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Actually, I think we should also have a test case for aggregation without group by. |
||
} | ||
|
||
benchmark.addCase("DataFrame sum") { iter => | ||
df.select(sum($"l")).queryExecution.toRdd.foreach(_ => Unit) | ||
df.groupBy($"l" % 10).agg(sum($"l")).queryExecution.toRdd.foreach(_ => Unit) | ||
} | ||
|
||
benchmark.addCase("Dataset sum using Aggregator") { iter => | ||
df.as[Data].select(typed.sumLong((d: Data) => d.l)).queryExecution.toRdd.foreach(_ => Unit) | ||
val result = ds.as[Long].groupByKey(_ % 10).agg(typed.sumLong[Long](identity)) | ||
result.queryExecution.toRdd.foreach(_ => Unit) | ||
} | ||
|
||
val complexDs = df.select($"l", $"l".cast(StringType).as("s")).as[Data] | ||
benchmark.addCase("Dataset complex Aggregator") { iter => | ||
df.as[Data].select(ComplexAggregator.toColumn).queryExecution.toRdd.foreach(_ => Unit) | ||
val result = complexDs.groupByKey(_.l % 10).agg(ComplexAggregator.toColumn) | ||
result.queryExecution.toRdd.foreach(_ => Unit) | ||
} | ||
|
||
benchmark | ||
|
@@ -170,36 +176,39 @@ object DatasetBenchmark { | |
val benchmark3 = aggregate(spark, numRows) | ||
|
||
/* | ||
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64 | ||
Intel Xeon E3-12xx v2 (Ivy Bridge) | ||
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1 | ||
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz | ||
|
||
back-to-back map: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
------------------------------------------------------------------------------------------------ | ||
RDD 3448 / 3646 29.0 34.5 1.0X | ||
DataFrame 2647 / 3116 37.8 26.5 1.3X | ||
Dataset 4781 / 5155 20.9 47.8 0.7X | ||
RDD 3963 / 3976 25.2 39.6 1.0X | ||
DataFrame 826 / 834 121.1 8.3 4.8X | ||
Dataset 5178 / 5198 19.3 51.8 0.8X | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. for "back-to-back map", the logic is so simple that the code generated by
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. IIUC, a signature of There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. the method signature in BTW, I think the best solution is to analyze the byte code (class file) of the lambda function, and turn it into expressions. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I noticed that the Scala compiler automatically generates a primitive version. Current Spark eventually calls the primitive version through the generic version. Here is a simple example. When we compile the following Dataset program, we can find that the following class is generated by scalac. Scalac automatically generates a primitive version. Of course, I totally agree that the best solution is to analyze the byte code and turn it into expressions. This was already prototyped. Do you think it is a good time to make this prototype more robust now? test("ds") {
val ds = sparkContext.parallelize((1 to 10), 1).toDS
ds.map(i => i * 7).show
}
$ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
Compiled from "Test.scala"
public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
public static final long serialVersionUID;
public final int apply(int);
Code:
0: aload_0
1: iload_1
2: invokevirtual #18 // Method apply$mcII$sp:(I)I
5: ireturn
public int apply$mcII$sp(int);
Code:
0: iload_1
1: bipush 7
3: imul
4: ireturn
public final java.lang.Object apply(java.lang.Object);
Code:
0: aload_0
1: aload_1
2: invokestatic #29 // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
5: invokevirtual #31 // Method apply:(I)I
8: invokestatic #35 // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
11: areturn
public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
Code:
0: aload_0
1: invokespecial #42 // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
4: return
} There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, the Scala compiler is smart! I think we can create a ticket to optimize this, i.e. call the primitive apply version, and update the benchmark result. For byte code analysis, let's discuss it in the ticket later. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Sure, I will create a JIRA ticket for this optimization. For byte code analysis, let's restart the discussion in the JIRA entry. |
||
*/ | ||
benchmark.run() | ||
|
||
/* | ||
OpenJDK 64-Bit Server VM 1.8.0_91-b14 on Linux 3.10.0-327.18.2.el7.x86_64 | ||
Intel Xeon E3-12xx v2 (Ivy Bridge) | ||
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1 | ||
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz | ||
|
||
back-to-back filter: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
------------------------------------------------------------------------------------------------ | ||
RDD 1346 / 1618 74.3 13.5 1.0X | ||
DataFrame 59 / 72 1695.4 0.6 22.8X | ||
Dataset 2777 / 2805 36.0 27.8 0.5X | ||
RDD 533 / 587 187.6 5.3 1.0X | ||
DataFrame 79 / 91 1269.0 0.8 6.8X | ||
Dataset 550 / 559 181.7 5.5 1.0X | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For "back-to-back filter", |
||
*/ | ||
benchmark2.run() | ||
|
||
/* | ||
Java HotSpot(TM) 64-Bit Server VM 1.8.0_60-b27 on Mac OS X 10.12.1 | ||
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz | ||
|
||
aggregate: Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative | ||
------------------------------------------------------------------------------------------------ | ||
RDD sum 1913 / 1942 52.3 19.1 1.0X | ||
DataFrame sum 46 / 61 2157.7 0.5 41.3X | ||
Dataset sum using Aggregator 4656 / 4758 21.5 46.6 0.4X | ||
Dataset complex Aggregator 6636 / 7039 15.1 66.4 0.3X | ||
RDD sum 2297 / 2440 43.5 23.0 1.0X | ||
DataFrame sum 630 / 637 158.7 6.3 3.6X | ||
Dataset sum using Aggregator 3129 / 3247 32.0 31.3 0.7X | ||
Dataset complex Aggregator 12109 / 12142 8.3 121.1 0.2X | ||
*/ | ||
benchmark3.run() | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be good to update the benchmark results for `aggregate`.