[SPARK-7375] [SQL] Avoid row copying in exchange when sort.serializeMapOutputs takes effect #5948
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution | |
|
||
import org.apache.spark.annotation.DeveloperApi | ||
import org.apache.spark.shuffle.sort.SortShuffleManager | ||
import org.apache.spark.{SparkEnv, HashPartitioner, RangePartitioner} | ||
import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner, SparkEnv} | ||
import org.apache.spark.rdd.{RDD, ShuffledRDD} | ||
import org.apache.spark.serializer.Serializer | ||
import org.apache.spark.sql.{SQLContext, Row} | ||
|
@@ -59,12 +59,67 @@ case class Exchange( | |
|
||
override def output: Seq[Attribute] = child.output | ||
|
||
/** We must copy rows when sort based shuffle is on */ | ||
protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] | ||
private val sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] | ||
|
||
private val bypassMergeThreshold = | ||
child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) | ||
|
||
private val serializeMapOutputs = | ||
child.sqlContext.sparkContext.conf.getBoolean("spark.shuffle.sort.serializeMapOutputs", true) | ||
|
||
/** | ||
* Determines whether records must be defensively copied before being sent to the shuffle. | ||
* Several of Spark's shuffle components will buffer deserialized Java objects in memory. The | ||
* shuffle code assumes that objects are immutable and hence does not perform its own defensive | ||
* copying. In Spark SQL, however, operators' iterators return the same mutable `Row` object. In | ||
* order to properly shuffle the output of these operators, we need to perform our own copying | ||
* prior to sending records to the shuffle. This copying is expensive, so we try to avoid it | ||
* whenever possible. This method encapsulates the logic for choosing when to copy. | ||
* | ||
* In the long run, we might want to push this logic into core's shuffle APIs so that we don't | ||
* have to rely on knowledge of core internals here in SQL. | ||
* | ||
* See SPARK-2967, SPARK-4479, and SPARK-7375 for more discussion of this issue. | ||
* | ||
* @param partitioner the partitioner for the shuffle | ||
* @param serializer the serializer that will be used to write rows | ||
* @return true if rows should be copied before being shuffled, false otherwise | ||
*/ | ||
private def needToCopyObjectsBeforeShuffle( | ||
partitioner: Partitioner, | ||
serializer: Serializer): Boolean = { | ||
// Note: even though we only use the partitioner's `numPartitions` field, we require it to be | ||
// passed instead of directly passing the number of partitions in order to guard against | ||
// corner-cases where a partitioner constructed with `numPartitions` partitions may output | ||
// fewer partitions (like RangePartitioner, for example). | ||
if (newOrdering.nonEmpty) { | ||
// If a new ordering is required, then records will be sorted with Spark's `ExternalSorter`, | ||
// which requires a defensive copy. | ||
true | ||
} else if (sortBasedShuffleOn) { | ||
// Spark's sort-based shuffle also uses `ExternalSorter` to buffer records in memory. | ||
// However, there are two special cases where we can avoid the copy, described below: | ||
if (partitioner.numPartitions <= bypassMergeThreshold) { | ||
// If the number of output partitions is sufficiently small, then Spark will fall back to | ||
// the old hash-based shuffle write path which doesn't buffer deserialized records. | ||
// Note that we'll have to remove this case if we fix SPARK-6026 and remove this bypass. | ||
false | ||
} else if (serializeMapOutputs && serializer.supportsRelocationOfSerializedObjects) { | ||
// SPARK-4550 extended sort-based shuffle to serialize individual records prior to sorting | ||
// them. This optimization is guarded by a feature-flag and is only applied in cases where | ||
// the shuffle dependency does not specify an ordering and the record serializer has certain | ||
// properties. If this optimization is enabled, we can safely avoid the copy. | ||
false | ||
} else { | ||
// None of the special cases held, so we must copy. | ||
true | ||
} | ||
} else { | ||
// We're using hash-based shuffle, so we don't need to copy. | ||
false | ||
} | ||
} | ||
|
||
private val keyOrdering = { | ||
if (newOrdering.nonEmpty) { | ||
val key = newPartitioning.keyExpressions | ||
|
@@ -81,7 +136,7 @@ case class Exchange( | |
|
||
@transient private lazy val sparkConf = child.sqlContext.sparkContext.getConf | ||
|
||
def serializer( | ||
private def getSerializer( | ||
keySchema: Array[DataType], | ||
valueSchema: Array[DataType], | ||
numPartitions: Int): Serializer = { | ||
|
@@ -123,17 +178,12 @@ case class Exchange( | |
override def execute(): RDD[Row] = attachTree(this , "execute") { | ||
newPartitioning match { | ||
case HashPartitioning(expressions, numPartitions) => | ||
// TODO: Eliminate redundant expressions in grouping key and value. | ||
// This is a workaround for SPARK-4479. When: | ||
// 1. sort based shuffle is on, and | ||
// 2. the partition number is under the merge threshold, and | ||
// 3. no ordering is required | ||
// we can avoid the defensive copies to improve performance. In the long run, we probably | ||
// want to include information in shuffle dependencies to indicate whether elements in the | ||
// source RDD should be copied. | ||
val willMergeSort = sortBasedShuffleOn && numPartitions > bypassMergeThreshold | ||
|
||
val rdd = if (willMergeSort || newOrdering.nonEmpty) { | ||
val keySchema = expressions.map(_.dataType).toArray | ||
val valueSchema = child.output.map(_.dataType).toArray | ||
val serializer = getSerializer(keySchema, valueSchema, numPartitions) | ||
val part = new HashPartitioner(numPartitions) | ||
|
||
val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) { | ||
child.execute().mapPartitions { iter => | ||
@liancheng, I see that you added this comment but I'm a bit confused by it. It doesn't look like orderings are used anywhere in the body of this

To be honest, I'm also confused by this comment now. Especially,
||
val hashExpressions = newMutableProjection(expressions, child.output)() | ||
iter.map(r => (hashExpressions(r).copy(), r.copy())) | ||
|
@@ -145,61 +195,61 @@ case class Exchange( | |
iter.map(r => mutablePair.update(hashExpressions(r), r)) | ||
} | ||
} | ||
val part = new HashPartitioner(numPartitions) | ||
val shuffled = | ||
if (newOrdering.nonEmpty) { | ||
new ShuffledRDD[Row, Row, Row](rdd, part).setKeyOrdering(keyOrdering) | ||
} else { | ||
new ShuffledRDD[Row, Row, Row](rdd, part) | ||
} | ||
val keySchema = expressions.map(_.dataType).toArray | ||
val valueSchema = child.output.map(_.dataType).toArray | ||
shuffled.setSerializer(serializer(keySchema, valueSchema, numPartitions)) | ||
|
||
val shuffled = new ShuffledRDD[Row, Row, Row](rdd, part) | ||
if (newOrdering.nonEmpty) { | ||
shuffled.setKeyOrdering(keyOrdering) | ||
} | ||
shuffled.setSerializer(serializer) | ||
shuffled.map(_._2) | ||
|
||
case RangePartitioning(sortingExpressions, numPartitions) => | ||
val rdd = if (sortBasedShuffleOn || newOrdering.nonEmpty) { | ||
child.execute().mapPartitions { iter => iter.map(row => (row.copy(), null))} | ||
} else { | ||
child.execute().mapPartitions { iter => | ||
val mutablePair = new MutablePair[Row, Null](null, null) | ||
iter.map(row => mutablePair.update(row, null)) | ||
val keySchema = child.output.map(_.dataType).toArray | ||
val serializer = getSerializer(keySchema, null, numPartitions) | ||
|
||
val childRdd = child.execute() | ||
val part: Partitioner = { | ||
// Internally, RangePartitioner runs a job on the RDD that samples keys to compute | ||
// partition bounds. To get accurate samples, we need to copy the mutable keys. | ||
val rddForSampling = childRdd.mapPartitions { iter => | ||
val mutablePair = new MutablePair[Row, Null]() | ||
iter.map(row => mutablePair.update(row.copy(), null)) | ||
In order to know how many partitions will be shuffled, we need to create the RangePartitioner and compute its partition bounds. Depending on the results of the sampling, we may end up with a partitioner that produces fewer than numPartitions partitions. In the old code, the input to the range partitioner was
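A minimal standalone sketch (not part of this patch) of the pitfall described above, using a hypothetical MutableBox class as a stand-in for Spark's mutable Row: any sampler that buffers references to a reused mutable object ends up seeing only the last value, while copying each record first preserves the true distribution that RangePartitioner needs for its bounds.

// Illustration only: buffering references to a reused mutable object corrupts
// the sample, which is what RangePartitioner's sampling would observe if
// Exchange handed it non-copied mutable rows.
object MutableSamplingPitfall {
  final class MutableBox(var value: Int) // hypothetical stand-in for a mutable Row

  def main(args: Array[String]): Unit = {
    val box = new MutableBox(0)
    val records = (1 to 5).iterator.map { i => box.value = i; box }

    // Buffering references, as an in-memory sampler would, sees only the final value.
    val buffered = records.toArray
    println(buffered.map(_.value).mkString(", ")) // prints: 5, 5, 5, 5, 5

    // Copying each record first (the row.copy() step in this diff) keeps the real data.
    val box2 = new MutableBox(0)
    val copied = (1 to 5).iterator
      .map { i => box2.value = i; box2 }
      .map(b => new MutableBox(b.value))
      .toArray
    println(copied.map(_.value).mkString(", "))   // prints: 1, 2, 3, 4, 5
  }
}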
||
} | ||
// TODO: RangePartitioner should take an Ordering. | ||
implicit val ordering = new RowOrdering(sortingExpressions, child.output) | ||
new RangePartitioner(numPartitions, rddForSampling, ascending = true) | ||
} | ||
|
||
// TODO: RangePartitioner should take an Ordering. | ||
implicit val ordering = new RowOrdering(sortingExpressions, child.output) | ||
|
||
val part = new RangePartitioner(numPartitions, rdd, ascending = true) | ||
val shuffled = | ||
if (newOrdering.nonEmpty) { | ||
new ShuffledRDD[Row, Null, Null](rdd, part).setKeyOrdering(keyOrdering) | ||
} else { | ||
new ShuffledRDD[Row, Null, Null](rdd, part) | ||
val rdd = if (needToCopyObjectsBeforeShuffle(part, serializer)) { | ||
childRdd.mapPartitions { iter => iter.map(row => (row.copy(), null))} | ||
} else { | ||
childRdd.mapPartitions { iter => | ||
val mutablePair = new MutablePair[Row, Null]() | ||
iter.map(row => mutablePair.update(row, null)) | ||
} | ||
val keySchema = child.output.map(_.dataType).toArray | ||
shuffled.setSerializer(serializer(keySchema, null, numPartitions)) | ||
} | ||
|
||
val shuffled = new ShuffledRDD[Row, Null, Null](rdd, part) | ||
if (newOrdering.nonEmpty) { | ||
shuffled.setKeyOrdering(keyOrdering) | ||
} | ||
shuffled.setSerializer(serializer) | ||
shuffled.map(_._1) | ||
|
||
case SinglePartition => | ||
// SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since | ||
// operators like `TakeOrdered` may require an ordering within the partition, and currently | ||
// `SinglePartition` doesn't include ordering information. | ||
// TODO Add `SingleOrderedPartition` for operators like `TakeOrdered` | ||
val rdd = if (sortBasedShuffleOn) { | ||
val valueSchema = child.output.map(_.dataType).toArray | ||
val serializer = getSerializer(null, valueSchema, 1) | ||
val partitioner = new HashPartitioner(1) | ||
|
||
val rdd = if (needToCopyObjectsBeforeShuffle(partitioner, serializer)) { | ||
child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) } | ||
} else { | ||
child.execute().mapPartitions { iter => | ||
val mutablePair = new MutablePair[Null, Row]() | ||
iter.map(r => mutablePair.update(null, r)) | ||
} | ||
} | ||
val partitioner = new HashPartitioner(1) | ||
val shuffled = new ShuffledRDD[Null, Row, Row](rdd, partitioner) | ||
val valueSchema = child.output.map(_.dataType).toArray | ||
shuffled.setSerializer(serializer(null, valueSchema, 1)) | ||
shuffled.setSerializer(serializer) | ||
shuffled.map(_._2) | ||
|
||
case _ => sys.error(s"Exchange not implemented for $newPartitioning") | ||
|
I moved the serializer creation calls a bit earlier here since we technically need to know whether the serializer supports object relocation in order to figure out whether spark.shuffle.sort.serializeMapOutputs will actually take effect. In theory, we might be able to avoid this check because SQL only uses SqlSerializer and SqlSerializer2, both of which support relocation. However, SqlSerializer extends KryoSerializer and there's a rare corner case where user-provided Kryo registrators can change Kryo settings in a way that breaks Kryo's relocatability, causing the serialized-sorting optimization to be bypassed. As a result, I think it's safer to create the serializer earlier and perform the full safety check.
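For reference, a condensed standalone sketch of the decision rule this patch encodes. All parameters are plain values standing in for the state that needToCopyObjectsBeforeShuffle reads from SparkEnv, the SparkConf, and the serializer, so this is an illustration rather than Spark's actual API.

// Condensed sketch of the copy/no-copy decision introduced in this patch.
object CopyDecisionSketch {
  def needToCopy(
      hasKeyOrdering: Boolean,               // newOrdering.nonEmpty
      sortBasedShuffle: Boolean,             // shuffleManager is a SortShuffleManager
      numPartitions: Int,                    // partitioner.numPartitions
      bypassMergeThreshold: Int,             // spark.shuffle.sort.bypassMergeThreshold
      serializeMapOutputs: Boolean,          // spark.shuffle.sort.serializeMapOutputs
      serializerSupportsRelocation: Boolean  // serializer.supportsRelocationOfSerializedObjects
  ): Boolean = {
    if (hasKeyOrdering) {
      true  // ExternalSorter will buffer deserialized rows, so they must be copied
    } else if (sortBasedShuffle) {
      if (numPartitions <= bypassMergeThreshold) {
        false // bypass-merge path writes records without buffering deserialized objects
      } else if (serializeMapOutputs && serializerSupportsRelocation) {
        false // records are serialized before buffering, so no defensive copy is needed
      } else {
        true  // none of the special cases held, so we must copy
      }
    } else {
      false   // hash-based shuffle does not buffer deserialized records
    }
  }

  def main(args: Array[String]): Unit = {
    // Example: sort-based shuffle, 300 partitions, serialized map outputs enabled,
    // and a relocation-capable serializer -> no defensive copy is required.
    println(needToCopy(
      hasKeyOrdering = false,
      sortBasedShuffle = true,
      numPartitions = 300,
      bypassMergeThreshold = 200,
      serializeMapOutputs = true,
      serializerSupportsRelocation = true)) // false
  }
}

With spark.shuffle.sort.serializeMapOutputs enabled and a relocation-capable serializer such as SqlSerializer2, the example above returns false, which is exactly the case this PR avoids copying for.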