[SPARK-23375][SQL][FOLLOWUP][TEST] Test Sort metrics while Sort is missing

## What changes were proposed in this pull request?
apache#20560/[SPARK-23375](https://issues.apache.org/jira/browse/SPARK-23375) introduced an optimizer rule, `RemoveRedundantSorts`, to eliminate redundant Sort operators. In the test case named "Sort metrics" in `SQLMetricsSuite`, the output of `range` is already sorted, so the Sort is removed by `RemoveRedundantSorts`, which makes the test case meaningless.

This PR modifies the query used to test Sort metrics and additionally checks that a Sort node exists in the plan.
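
For illustration only (not part of this patch), a minimal spark-shell sketch of the behavior described above, assuming a Spark build that includes `RemoveRedundantSorts`; `hasSort` is a hypothetical helper:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.SortExec
import spark.implicits._

// Hypothetical helper: does the physical plan still contain a SortExec node?
def hasSort(df: DataFrame): Boolean =
  df.queryExecution.executedPlan.collect { case s: SortExec => s }.nonEmpty

hasSort(spark.range(10).sort('id).toDF())  // false: range output is already sorted,
                                           // so RemoveRedundantSorts drops the Sort
hasSort(Seq(1, 3, 2).toDF("id").sort('id)) // true: Sort survives, so its metrics exist
```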

## How was this patch tested?
Modify the existing test case.

Closes apache#23258 from seancxmao/sort-metrics.

Authored-by: seancxmao <seancxmao@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
seancxmao authored and jackylee-ch committed Feb 18, 2019
1 parent 4c10479 commit 605a833
Showing 2 changed files with 51 additions and 10 deletions.
@@ -194,10 +194,20 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
   }
 
   test("Sort metrics") {
-    // Assume the execution plan is
-    // WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Sort(nodeId = 1))
-    val ds = spark.range(10).sort('id)
-    testSparkPlanMetrics(ds.toDF(), 2, Map.empty)
+    // Assume the execution plan with node id is
+    // Sort(nodeId = 0)
+    //   Exchange(nodeId = 1)
+    //     Project(nodeId = 2)
+    //       LocalTableScan(nodeId = 3)
+    // Because of SPARK-25267, ConvertToLocalRelation is disabled in the test cases of sql/core,
+    // so Project here is not collapsed into LocalTableScan.
+    val df = Seq(1, 3, 2).toDF("id").sort('id)
+    testSparkPlanMetricsWithPredicates(df, 2, Map(
+      0L -> (("Sort", Map(
+        "sort time total (min, med, max)" -> {_.toString.matches(timingMetricPattern)},
+        "peak memory total (min, med, max)" -> {_.toString.matches(sizeMetricPattern)},
+        "spill size total (min, med, max)" -> {_.toString.matches(sizeMetricPattern)})))
+    ))
   }
 
   test("SortMergeJoin metrics") {
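
For reference, the timing and size predicates above match SQLMetric values rendered as strings. A standalone sketch, not part of the commit: the regexes are copied from the `SQLMetricsTestUtils` change below, and the example strings from its comments.

```scala
object MetricPatternDemo extends App {
  // Copied from the sizeMetricPattern/timingMetricPattern definitions below.
  val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
  val sizeMetricPattern = s"\\n$bytes \\($bytes, $bytes, $bytes\\)"
  val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
  val timingMetricPattern = s"\\n$duration \\($duration, $duration, $duration\\)"

  // Rendered metric values start with a newline, hence the \n prefix.
  assert("\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)".matches(timingMetricPattern))
  assert("\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)".matches(sizeMetricPattern))
}
```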
@@ -40,6 +40,18 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
 
   protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore
 
+  // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
+  protected val sizeMetricPattern = {
+    val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
+    s"\\n$bytes \\($bytes, $bytes, $bytes\\)"
+  }
+
+  // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+  protected val timingMetricPattern = {
+    val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
+    s"\\n$duration \\($duration, $duration, $duration\\)"
+  }
+
   /**
    * Get execution metrics for the SQL execution and verify metrics values.
    *
@@ -185,15 +197,34 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
       df: DataFrame,
       expectedNumOfJobs: Int,
       expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
-    val optActualMetrics = getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetrics.keySet)
+    val expectedMetricsPredicates = expectedMetrics.mapValues { case (nodeName, nodeMetrics) =>
+      (nodeName, nodeMetrics.mapValues(expectedMetricValue =>
+        (actualMetricValue: Any) => expectedMetricValue.toString === actualMetricValue))
+    }
+    testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates)
+  }
+
+  /**
+   * Call `df.collect()` and verify if the collected metrics satisfy the specified predicates.
+   * @param df `DataFrame` to run
+   * @param expectedNumOfJobs number of jobs that will run
+   * @param expectedMetricsPredicates the expected metrics predicates. The format is
+   *                                  `nodeId -> (operatorName, metric name -> metric predicate)`.
+   */
+  protected def testSparkPlanMetricsWithPredicates(
+      df: DataFrame,
+      expectedNumOfJobs: Int,
+      expectedMetricsPredicates: Map[Long, (String, Map[String, Any => Boolean])]): Unit = {
+    val optActualMetrics =
+      getSparkPlanMetrics(df, expectedNumOfJobs, expectedMetricsPredicates.keySet)
     optActualMetrics.foreach { actualMetrics =>
-      assert(expectedMetrics.keySet === actualMetrics.keySet)
-      for (nodeId <- expectedMetrics.keySet) {
-        val (expectedNodeName, expectedMetricsMap) = expectedMetrics(nodeId)
+      assert(expectedMetricsPredicates.keySet === actualMetrics.keySet)
+      for ((nodeId, (expectedNodeName, expectedMetricsPredicatesMap))
+          <- expectedMetricsPredicates) {
         val (actualNodeName, actualMetricsMap) = actualMetrics(nodeId)
         assert(expectedNodeName === actualNodeName)
-        for (metricName <- expectedMetricsMap.keySet) {
-          assert(expectedMetricsMap(metricName).toString === actualMetricsMap(metricName))
+        for ((metricName, metricPredicate) <- expectedMetricsPredicatesMap) {
+          assert(metricPredicate(actualMetricsMap(metricName)))
         }
       }
     }
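
A hedged usage sketch of the two entry points in a suite mixing in `SQLMetricsTestUtils`: exact values go through `testSparkPlanMetrics`, which now wraps each expected value in an equality predicate, while run-dependent metrics call `testSparkPlanMetricsWithPredicates` directly. The node id, metric name, and row count below are illustrative, not taken from this commit.

```scala
// Hypothetical test body; assumes import spark.implicits._ and that the
// Filter node has id 0 in the actual physical plan, which may differ.
test("Filter metrics (illustrative)") {
  val df = Seq(1, 3, 2).toDF("id").filter('id > 1)
  // Exact match: each value is compared via expectedMetricValue.toString === actual.
  testSparkPlanMetrics(df, 1, Map(
    0L -> (("Filter", Map("number of output rows" -> 2L)))))
  // Predicate match: useful for timing/size metrics whose values vary per run.
  testSparkPlanMetricsWithPredicates(df, 1, Map(
    0L -> (("Filter", Map(
      "number of output rows" -> { (v: Any) => v.toString.toLong == 2L })))))
}
```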
