test PR apache#498 (take 1) (apache#501)
yifeih authored and bulldozer-bot[bot] committed Feb 27, 2019
1 parent 3084918 commit 33c0a8b
Showing 4 changed files with 565 additions and 0 deletions.
29 changes: 29 additions & 0 deletions .circleci/config.yml
@@ -23,6 +23,13 @@ all-branches-and-tags: &all-branches-and-tags
    tags:
      only: /.*/

spark-25299-branch-only: &spark-25299-branch-only
  filters:
    branches:
      only:
      - spark-25299
      - spark-25299-test-build

deployable-branches-and-tags: &deployable-branches-and-tags
  filters:
    tags:
@@ -452,6 +459,24 @@ jobs:
          key: v1-maven-dependency-cache-versioned-{{ checksum "pom.xml" }}
          paths: ~/.m2

  run-spark-25299-benchmarks:
    <<: *defaults
    docker:
      - image: palantirtechnologies/circle-spark-r:0.1.3
    steps:
      - *checkout-code
      - attach_workspace:
          at: .
      - *restore-build-sbt-cache
      - *link-in-build-sbt-cache
      - *restore-ivy-cache
      - *restore-build-binaries-cache
      - *restore-home-sbt-cache
      - run:
          command: ./dev/run-spark-25299-benchmarks.sh
      - store_artifacts:
          path: /tmp/artifacts/

  deploy-gradle:
    <<: *defaults
    docker:
@@ -512,6 +537,10 @@ workflows:
          requires:
            - build-sbt
          <<: *all-branches-and-tags
      - run-spark-25299-benchmarks:
          requires:
            - build-sbt
          <<: *spark-25299-branch-only
      - run-scala-tests:
          requires:
            - build-sbt
211 changes: 211 additions & 0 deletions core/src/test/scala/org/apache/spark/shuffle/sort/BypassMergeSortShuffleWriterBenchmark.scala
@@ -0,0 +1,211 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort

import java.io.File
import java.util.UUID

import org.apache.commons.io.FileUtils
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.Matchers.{any, anyInt}
import org.mockito.Mockito.{doAnswer, when}
import org.mockito.invocation.InvocationOnMock
import org.mockito.stubbing.Answer
import scala.collection.mutable
import scala.util.Random

import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, TaskContext}
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.executor.{ShuffleWriteMetrics, TaskMetrics}
import org.apache.spark.serializer.{JavaSerializer, SerializerInstance, SerializerManager}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage.{BlockId, BlockManager, DiskBlockManager, DiskBlockObjectWriter, TempShuffleBlockId}
import org.apache.spark.util.Utils

/**
 * Benchmark to measure performance of the BypassMergeSortShuffleWriter.
 * {{{
 *   To run this benchmark:
 *   1. without sbt: bin/spark-submit --class <this class> <spark core test jar>
 *   2. build/sbt "core/test:runMain <this class>"
 *   3. generate result: SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
 *      Results will be written to "benchmarks/<this class>-results.txt".
 * }}}
 */
object BypassMergeSortShuffleWriterBenchmark extends BenchmarkBase {

  @Mock(answer = RETURNS_SMART_NULLS) private var blockManager: BlockManager = _
  @Mock(answer = RETURNS_SMART_NULLS) private var diskBlockManager: DiskBlockManager = _
  @Mock(answer = RETURNS_SMART_NULLS) private var taskContext: TaskContext = _
  @Mock(answer = RETURNS_SMART_NULLS) private var blockResolver: IndexShuffleBlockResolver = _
  @Mock(answer = RETURNS_SMART_NULLS) private var dependency:
    ShuffleDependency[String, String, String] = _

  private var tempDir: File = _
  private var shuffleHandle: BypassMergeSortShuffleHandle[String, String] = _
  private val blockIdToFileMap: mutable.Map[BlockId, File] = new mutable.HashMap[BlockId, File]
  private val partitioner: HashPartitioner = new HashPartitioner(10)
  private val defaultConf: SparkConf = new SparkConf(loadDefaults = false)
  private val javaSerializer: JavaSerializer = new JavaSerializer(defaultConf)

  private val MIN_NUM_ITERS = 10

  def setup(transferTo: Boolean): BypassMergeSortShuffleWriter[String, String] = {
    MockitoAnnotations.initMocks(this)
    val conf = new SparkConf(loadDefaults = false)
    conf.set("spark.file.transferTo", String.valueOf(transferTo))
    conf.set("spark.shuffle.file.buffer", "32k")

    if (shuffleHandle == null) {
      shuffleHandle = new BypassMergeSortShuffleHandle[String, String](
        shuffleId = 0,
        numMaps = 1,
        dependency = dependency
      )
    }

    val taskMetrics = new TaskMetrics
    when(dependency.partitioner).thenReturn(partitioner)
    when(dependency.serializer).thenReturn(javaSerializer)
    when(dependency.shuffleId).thenReturn(0)

    // Create the temporary directory that holds the local shuffle and temp files
    tempDir = Utils.createTempDir()
    val outputFile = File.createTempFile("shuffle", null, tempDir)
    // Final mapper data file output
    when(blockResolver.getDataFile(0, 0)).thenReturn(outputFile)

    // Create the temporary writers (backed by files), one for each partition
    when(blockManager.diskBlockManager).thenReturn(diskBlockManager)
    when(diskBlockManager.createTempShuffleBlock()).thenAnswer(
      (invocation: InvocationOnMock) => {
        val blockId = new TempShuffleBlockId(UUID.randomUUID)
        val file = new File(tempDir, blockId.name)
        blockIdToFileMap.put(blockId, file)
        (blockId, file)
      })
    // Return real disk writers so the benchmark exercises actual file I/O
    when(blockManager.getDiskWriter(
      any[BlockId],
      any[File],
      any[SerializerInstance],
      anyInt(),
      any[ShuffleWriteMetrics]
    )).thenAnswer(new Answer[DiskBlockObjectWriter] {
      override def answer(invocation: InvocationOnMock): DiskBlockObjectWriter = {
        val args = invocation.getArguments
        val manager = new SerializerManager(javaSerializer, conf)
        new DiskBlockObjectWriter(
          args(1).asInstanceOf[File],
          manager,
          args(2).asInstanceOf[SerializerInstance],
          args(3).asInstanceOf[Int],
          syncWrites = false,
          args(4).asInstanceOf[ShuffleWriteMetrics],
          blockId = args(0).asInstanceOf[BlockId]
        )
      }
    })

    // Writing the index file commits the temp file as the final output file
    doAnswer(new Answer[Void] {
      def answer(invocationOnMock: InvocationOnMock): Void = {
        val tmp: File = invocationOnMock.getArguments()(3).asInstanceOf[File]
        if (tmp != null) {
          outputFile.delete()
          tmp.renameTo(outputFile)
        }
        null
      }
    }).when(blockResolver)
      .writeIndexFileAndCommit(anyInt, anyInt, any(classOf[Array[Long]]), any(classOf[File]))

    val shuffleWriter = new BypassMergeSortShuffleWriter[String, String](
      blockManager,
      blockResolver,
      shuffleHandle,
      0,
      conf,
      taskMetrics.shuffleWriteMetrics
    )

    shuffleWriter
  }

  def cleanupTempFiles(): Unit = {
    FileUtils.deleteDirectory(tempDir)
  }

  def writeBenchmarkWithLargeDataset(): Unit = {
    val size = 10000000
    val random = new Random(123)
    val data = (1 to size).map { _ =>
      val x = random.alphanumeric.take(5).mkString
      Tuple2(x, x)
    }.toArray
    val benchmark = new Benchmark(
      "BypassMergeSortShuffleWrite (with spill) " + size,
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    benchmark.addTimerCase("without transferTo") { timer =>
      val shuffleWriter = setup(false)
      timer.startTiming()
      shuffleWriter.write(data.iterator)
      timer.stopTiming()
      cleanupTempFiles()
    }
    benchmark.addTimerCase("with transferTo") { timer =>
      val shuffleWriter = setup(true)
      timer.startTiming()
      shuffleWriter.write(data.iterator)
      timer.stopTiming()
      cleanupTempFiles()
    }
    benchmark.run()
  }

  def writeBenchmarkWithSmallDataset(): Unit = {
    val size = 10000
    val random = new Random(123)
    val data = (1 to size).map { _ =>
      val x = random.alphanumeric.take(5).mkString
      Tuple2(x, x)
    }.toArray
    val benchmark = new Benchmark(
      "BypassMergeSortShuffleWrite (in memory buffer) " + size,
      size,
      minNumIters = MIN_NUM_ITERS,
      output = output)
    benchmark.addTimerCase("small dataset without spills on disk") { timer =>
      val shuffleWriter = setup(false)
      timer.startTiming()
      shuffleWriter.write(data.iterator)
      timer.stopTiming()
      cleanupTempFiles()
    }
    benchmark.run()
  }

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    runBenchmark("BypassMergeSortShuffleWriter write") {
      writeBenchmarkWithSmallDataset()
      writeBenchmarkWithLargeDataset()
    }
  }
}
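A note on the timing pattern used in both benchmark methods above: Benchmark.addTimerCase (unlike addCase) hands each case an explicit timer, so the per-iteration mock wiring done by setup() and the trailing cleanupTempFiles() call stay outside the measured region; only the write() call is timed. A minimal sketch of that pattern, assuming Spark's org.apache.spark.benchmark test API as used in this file; the TimerCaseExample object and its workload are hypothetical and not part of this commit:

package org.apache.spark.shuffle.sort

import org.apache.spark.benchmark.Benchmark

// Hypothetical sketch: Benchmark lives in Spark's core test sources (and is
// visible only inside the org.apache.spark package), so this compiles there.
object TimerCaseExample {
  def main(args: Array[String]): Unit = {
    val benchmark = new Benchmark("timer-case example", valuesPerIteration = 1000, minNumIters = 10)
    benchmark.addTimerCase("measured region only") { timer =>
      val data = (1 to 1000).map(_.toString) // untimed per-iteration setup
      timer.startTiming()
      val grouped = data.groupBy(_.length)   // only this region is timed
      timer.stopTiming()
      require(grouped.nonEmpty)              // untimed validation/cleanup
    }
    benchmark.run()
  }
}

This is also why setup() runs inside each timer case rather than once per benchmark: every iteration gets a fresh temp directory and a fresh writer, keeping iterations independent without polluting the reported numbers.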