[SPARK-32629][SQL] Track metrics of BitSet/OpenHashSet in full outer SHJ
### What changes were proposed in this pull request?

This is a followup of apache#29342, and it does two things:
* Per apache#29342 (comment), change from Java `HashSet` to Spark's in-house `OpenHashSet` to track matched rows for non-unique join keys. I checked the `OpenHashSet` implementation, which is built from a key index (`OpenHashSet._bitset`, a `BitSet`) and a key array (`OpenHashSet._data`, an `Array`). Java `HashSet` is built on top of `HashMap`, which stores values in `Node` linked lists and in theory should take more memory than `OpenHashSet`. I reran the same benchmark query used in apache#29342 and verified that the query performs similarly with `HashSet` and `OpenHashSet`.
* Track metrics of the extra `BitSet`/`OpenHashSet` data structure for full outer SHJ. This depends on the first change, because there seems to be no easy way to get the memory size of a Java `HashSet`. See the sketch after this list for how the size is estimated.
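For illustration only, here is a minimal sketch (not part of the patch; the variable names are made up, and `OpenHashSet` is Spark-internal, so this only compiles inside Spark) of the estimate used: the underlying `BitSet` contributes one bit per slot (`capacity / 8` bytes) and the key array one `Long` per slot (`capacity * 8` bytes).

```scala
import org.apache.spark.util.collection.OpenHashSet

val matchedRows = new OpenHashSet[Long]
matchedRows.add((3L << 32) | 7L)  // example packed (key index, value index) entry

// OpenHashSet._bitset marks occupied slots; OpenHashSet._data holds the Long keys.
val bitSetBytes = matchedRows.getBitSet.capacity / 8  // 1 bit per slot
val dataBytes = matchedRows.capacity * 8L             // 8 bytes per Long slot
val estimatedBytes = bitSetBytes + dataBytes          // what buildDataSize records
```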

### Why are the changes needed?

To surface the memory usage of full outer SHJ more accurately.
This can help users and developers debug and improve full outer SHJ.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a unit test in `SQLMetricsSuite.scala`.

Closes apache#29566 from c21/add-metrics.

Authored-by: Cheng Su <chengsu@fb.com>
Signed-off-by: Takeshi Yamamuro <yamamuro@apache.org>
c21 authored and maropu committed Aug 29, 2020
1 parent ccc0250 commit cfe012a
Showing 3 changed files with 73 additions and 25 deletions.
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution.joins

import java.util.concurrent.TimeUnit._

import scala.collection.mutable

import org.apache.spark.TaskContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
@@ -31,7 +29,7 @@ import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution.{RowIterator, SparkPlan}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.util.collection.BitSet
import org.apache.spark.util.collection.{BitSet, OpenHashSet}

/**
* Performs a hash join of two child relations by first shuffling the data using the join keys.
@@ -136,10 +134,10 @@ case class ShuffledHashJoinExec(
* Full outer shuffled hash join with unique join keys:
* 1. Process rows from stream side by looking up hash relation.
* Mark the matched rows from build side be looked up.
* A `BitSet` is used to track matched rows with key index.
* A bit set is used to track matched rows with key index.
* 2. Process rows from build side by iterating hash relation.
* Filter out rows from build side being matched already,
* by checking key index from `BitSet`.
* by checking key index from bit set.
*/
private def fullOuterJoinWithUniqueKey(
streamIter: Iterator[InternalRow],
@@ -150,9 +148,8 @@ case class ShuffledHashJoinExec(
streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
buildNullRow: GenericInternalRow,
streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
// TODO(SPARK-32629):record metrics of extra BitSet/HashSet
// in full outer shuffled hash join
val matchedKeys = new BitSet(hashedRelation.maxNumKeysIndex)
longMetric("buildDataSize") += matchedKeys.capacity / 8

// Process stream side with looking up hash relation
val streamResultIter = streamIter.map { srow =>
@@ -198,11 +195,11 @@ case class ShuffledHashJoinExec(
* Full outer shuffled hash join with non-unique join keys:
* 1. Process rows from stream side by looking up hash relation.
* Mark the matched rows from build side be looked up.
* A `HashSet[Long]` is used to track matched rows with
* A [[OpenHashSet]] (Long) is used to track matched rows with
* key index (Int) and value index (Int) together.
* 2. Process rows from build side by iterating hash relation.
* Filter out rows from build side being matched already,
* by checking key index and value index from `HashSet`.
* by checking key index and value index from [[OpenHashSet]].
*
* The "value index" is defined as the index of the tuple in the chain
* of tuples having the same key. For example, if certain key is found thrice,
@@ -218,9 +215,15 @@ case class ShuffledHashJoinExec(
streamNullJoinRowWithBuild: => InternalRow => JoinedRow,
buildNullRow: GenericInternalRow,
streamNullRow: GenericInternalRow): Iterator[InternalRow] = {
// TODO(SPARK-32629):record metrics of extra BitSet/HashSet
// in full outer shuffled hash join
val matchedRows = new mutable.HashSet[Long]
val matchedRows = new OpenHashSet[Long]
TaskContext.get().addTaskCompletionListener[Unit](_ => {
// At the end of the task, update the task's memory usage for this
// [[OpenHashSet]] to track matched rows, which has two parts:
// [[OpenHashSet._bitset]] and [[OpenHashSet._data]].
val bitSetEstimatedSize = matchedRows.getBitSet.capacity / 8
val dataEstimatedSize = matchedRows.capacity * 8
longMetric("buildDataSize") += bitSetEstimatedSize + dataEstimatedSize
})

def markRowMatched(keyIndex: Int, valueIndex: Int): Unit = {
val rowIndex: Long = (keyIndex.toLong << 32) | valueIndex
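As a side note, here is a minimal sketch (the helper names are illustrative, not from the patch) of how a key index and value index are packed into, and can be unpacked from, a single `Long`, matching `markRowMatched` above:

```scala
// Pack: key index in the high 32 bits, value index in the low 32 bits.
// Both indexes are non-negative, so no masking is needed for the low word.
def packRowIndex(keyIndex: Int, valueIndex: Int): Long =
  (keyIndex.toLong << 32) | valueIndex

// Unpack (illustrative helpers only).
def unpackKeyIndex(rowIndex: Long): Int = (rowIndex >>> 32).toInt
def unpackValueIndex(rowIndex: Long): Int = rowIndex.toInt

assert(unpackKeyIndex(packRowIndex(3, 7)) == 3)
assert(unpackValueIndex(packRowIndex(3, 7)) == 7)
```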
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.{FilterExec, RangeExec, SparkPlan, WholeSt
import org.apache.spark.sql.execution.adaptive.DisableAdaptiveExecutionSuite
import org.apache.spark.sql.execution.aggregate.HashAggregateExec
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -363,6 +364,41 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}
}

test("SPARK-32629: ShuffledHashJoin(full outer) metrics") {
val uniqueLeftDf = Seq(("1", "1"), ("11", "11")).toDF("key", "value")
val nonUniqueLeftDf = Seq(("1", "1"), ("1", "2"), ("11", "11")).toDF("key", "value")
val rightDf = (1 to 10).map(i => (i.toString, i.toString)).toDF("key2", "value")
Seq(
// Test unique key on build side
(uniqueLeftDf, rightDf, 11, 134228048, 10, 134221824),
// Test non-unique key on build side
(nonUniqueLeftDf, rightDf, 12, 134228552, 11, 134221824)
).foreach { case (leftDf, rightDf, fojRows, fojBuildSize, rojRows, rojBuildSize) =>
val fojDf = leftDf.hint("shuffle_hash").join(
rightDf, $"key" === $"key2", "full_outer")
fojDf.collect()
val fojPlan = fojDf.queryExecution.executedPlan.collectFirst {
case s: ShuffledHashJoinExec => s
}
assert(fojPlan.isDefined, "The query plan should have shuffled hash join")
testMetricsInSparkPlanOperator(fojPlan.get,
Map("numOutputRows" -> fojRows, "buildDataSize" -> fojBuildSize))

// Test right outer join as well to verify build data size to be different
// from full outer join. This makes sure we take extra BitSet/OpenHashSet
// for full outer join into account.
val rojDf = leftDf.hint("shuffle_hash").join(
rightDf, $"key" === $"key2", "right_outer")
rojDf.collect()
val rojPlan = rojDf.queryExecution.executedPlan.collectFirst {
case s: ShuffledHashJoinExec => s
}
assert(rojPlan.isDefined, "The query plan should have shuffled hash join")
testMetricsInSparkPlanOperator(rojPlan.get,
Map("numOutputRows" -> rojRows, "buildDataSize" -> rojBuildSize))
}
}

test("BroadcastHashJoin(outer) metrics") {
val df1 = Seq((1, "a"), (1, "b"), (4, "c")).toDF("key", "value")
val df2 = Seq((1, "a"), (1, "b"), (2, "c"), (3, "d")).toDF("key2", "value")
@@ -686,16 +722,6 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
}

test("SPARK-28332: SQLMetric merge should handle -1 properly") {
def checkSparkPlanMetrics(plan: SparkPlan, expected: Map[String, Long]): Unit = {
expected.foreach { case (metricName: String, metricValue: Long) =>
assert(plan.metrics.contains(metricName), s"The query plan should have metric $metricName")
val actualMetric = plan.metrics.get(metricName).get
assert(actualMetric.value == metricValue,
s"The query plan metric $metricName did not match, " +
s"expected:$metricValue, actual:${actualMetric.value}")
}
}

val df = testData.join(testData2.filter('b === 0), $"key" === $"a", "left_outer")
df.collect()
val plan = df.queryExecution.executedPlan
@@ -706,7 +732,8 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils

assert(exchanges.size == 2, "The query plan should have two shuffle exchanges")

checkSparkPlanMetrics(exchanges(0), Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
checkSparkPlanMetrics(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
testMetricsInSparkPlanOperator(exchanges.head,
Map("dataSize" -> 3200, "shuffleRecordsWritten" -> 100))
testMetricsInSparkPlanOperator(exchanges(1), Map("dataSize" -> 0, "shuffleRecordsWritten" -> 0))
}
}
@@ -25,7 +25,7 @@ import org.apache.spark.TestUtils
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.{SparkPlan, SparkPlanInfo}
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SQLAppStatusStore}
import org.apache.spark.sql.internal.SQLConf.WHOLESTAGE_CODEGEN_ENABLED
import org.apache.spark.sql.test.SQLTestUtils
@@ -254,6 +254,24 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
}
}
}

/**
* Verify if the metrics in `SparkPlan` operator are same as expected metrics.
*
* @param plan `SparkPlan` operator to check metrics
* @param expectedMetrics the expected metrics. The format is `metric name -> metric value`.
*/
protected def testMetricsInSparkPlanOperator(
plan: SparkPlan,
expectedMetrics: Map[String, Long]): Unit = {
expectedMetrics.foreach { case (metricName: String, metricValue: Long) =>
assert(plan.metrics.contains(metricName), s"The query plan should have metric $metricName")
val actualMetric = plan.metrics(metricName)
assert(actualMetric.value == metricValue,
s"The query plan metric $metricName did not match, " +
s"expected:$metricValue, actual:${actualMetric.value}")
}
}
}

