SPARK-25299: add CI infrastructure and SortShuffleWriterBenchmark #498

Merged 70 commits on Mar 14, 2019.

Commits (all by yifeih):
c7abec6  Feb 23, 2019  add initial bypass merge sort shuffle writer benchmarks
22ef648  Feb 25, 2019  dd unsafe shuffle writer benchmarks
4084e27  Feb 25, 2019  changes in bypassmergesort benchmarks
fb8266d  Feb 26, 2019  cleanup
89104e2  Feb 26, 2019  add circle script
b90b381  Feb 26, 2019  add this branch for testing
5e13dd8  Feb 26, 2019  fix circle attempt 1
845e645  Feb 26, 2019  checkout code
a68f459  Feb 26, 2019  add some caches?
757f6fe  Feb 26, 2019  why is it not pull caches...
0bcd5d9  Feb 26, 2019  save as artifact instead of publishing
26c01ec  Feb 26, 2019  mkdir
0d7a036  Feb 26, 2019  typo
3fc5331  Feb 26, 2019  try uploading artifacts again
8c33701  Feb 26, 2019  try print per iteration to avoid circle erroring out on idle
9546397  Feb 27, 2019  blah (#495)
d72ba73  Feb 27, 2019  make a PR comment
1859805  Feb 27, 2019  actually delete files
c20f0be  Feb 27, 2019  run benchmarks on test build branch
444d46a  Feb 27, 2019  oops forgot to enable upload
2322933  Feb 27, 2019  add sort shuffle writer benchmarks
da0d91c  Feb 27, 2019  add stdev
e590917  Feb 27, 2019  cleanup sort a bit
cbfdb99  Feb 27, 2019  fix stdev text
cbe38c6  Feb 27, 2019  fix sort shuffle
acdda71  Feb 28, 2019  initial code for read side
fd7a7c5  Feb 28, 2019  format
d82618b  Feb 28, 2019  use times and sample stdev
610ea1d  Feb 28, 2019  add assert for at least one iteration
295d7f3  Mar 1, 2019   cleanup shuffle write to use fewer mocks and single base interface
0c696dc  Mar 1, 2019   shuffle read works with transport client... needs lots of cleaning
323a296  Mar 1, 2019   test running in cicle
85836c2  Mar 1, 2019   scalastyle
b67d1f3  Mar 1, 2019   dont publish results yet
252963d  Mar 4, 2019   cleanup writer code
f72afb2  Mar 4, 2019   get only git message
3bcd35e  Mar 4, 2019   fix command to get PR number
d8b5d79  Mar 4, 2019   add SortshuffleWriterBenchmark
d9fb78a  Mar 4, 2019   writer code
b142951  Mar 5, 2019   cleanup
d0466b8  Mar 5, 2019   Merge remote-tracking branch 'origin' into yh/add-benchmarks-and-ci
f91dfad  Mar 5, 2019   fix benchmark script
5839b1d  Mar 5, 2019   use ArgumentMatchers
0b8c7ed  Mar 5, 2019   also in shufflewriterbenchmarkbase
d11f87f  Mar 5, 2019   scalastyle
6f2779f  Mar 5, 2019   add apache license
bbe9edc  Mar 5, 2019   fix some scale stuff
567d372  Mar 5, 2019   fix up tests
47c1938  Mar 5, 2019   only copy benchmarks we care about
e79ac28  Mar 5, 2019   increase size for reader again
c3858df  Mar 5, 2019   delete two writers and reader for PR
9d46fae  Mar 5, 2019   SPARK-25299: Add shuffle reader benchmarks (#506)
9f51758  Mar 5, 2019   Revert "SPARK-25299: Add shuffle reader benchmarks (#506)"
bcb09c5  Mar 5, 2019   add -e to bash script
25da723  Mar 5, 2019   blah
13703fa  Mar 6, 2019   enable upload as a PR comment and prevent running benchmarks on this …
e3751cd  Mar 6, 2019   Revert "enable upload as a PR comment and prevent running benchmarks …
33a1b72  Mar 6, 2019   try machine execution
fa1b96c  Mar 7, 2019   try uploading benchmarks (#498)
37cef1f  Mar 11, 2019  only upload results when merging into the feature branch
459e1b5  Mar 12, 2019  lock down machine image
4cabdbd  Mar 13, 2019  don't write input data to disk
47d2dcf  Mar 13, 2019  run benchmark test
c78e491  Mar 13, 2019  stop creating file cleanup threads for every block manager
f28b75c  Mar 13, 2019  use alphanumeric again
a85acf4  Mar 13, 2019  use a new random everytime
f26ab40  Mar 13, 2019  close the writers -__________-
103c660  Mar 13, 2019  delete branch and publish results as comment
c3e58c5  Mar 14, 2019  close in finally
96f1d0d  Mar 14, 2019  Merge branch 'yh/add-benchmarks-and-ci' of github.com:yifeih/spark in…
32 changes: 32 additions & 0 deletions .circleci/config.yml
@@ -8,6 +8,12 @@ defaults: &defaults
     TERM: dumb
     BUILD_SBT_CACHE: "/home/circleci/build-sbt-cache"
 
+spark-25299-config: &spark-25299-config
+  machine:
+    image: circleci/classic:201808-01
+  environment: &defaults-environment
+    TERM: dumb
+    BUILD_SBT_CACHE: "/home/circleci/build-sbt-cache"
 
 test-defaults: &test-defaults
   <<: *defaults
@@ -23,6 +29,12 @@ all-branches-and-tags: &all-branches-and-tags
     tags:
       only: /.*/
 
+spark-25299-branch-only: &spark-25299-branch-only
+  filters:
+    branches:
+      only:
+        - spark-25299
+
 deployable-branches-and-tags: &deployable-branches-and-tags
   filters:
     tags:
@@ -452,6 +464,22 @@ jobs:
           key: v1-maven-dependency-cache-versioned-{{ checksum "pom.xml" }}
           paths: ~/.m2
 
+  run-spark-25299-benchmarks:
+    <<: *spark-25299-config
+    steps:
+      - *checkout-code
+      - attach_workspace:
+          at: .
+      - *restore-build-sbt-cache
+      - *link-in-build-sbt-cache
+      - *restore-ivy-cache
+      - *restore-build-binaries-cache
+      - *restore-home-sbt-cache

Review comment from yifeih (author) on the cache-restore steps: "not sure if i'm using a superset or a subset of the useful caches or some random other combination... just sort of copied things that seemed related to sbt because that's what i'm running in this step"

+      - run:
+          command: ./dev/run-spark-25299-benchmarks.sh -u
+      - store_artifacts:
+          path: /tmp/artifacts/
+
   deploy-gradle:
     <<: *defaults
     docker:
@@ -512,6 +540,10 @@ workflows:
         requires:
           - build-sbt
         <<: *all-branches-and-tags
+      - run-spark-25299-benchmarks:
+          requires:
+            - build-sbt
+          <<: *spark-25299-branch-only
       - run-scala-tests:
           requires:
             - build-sbt
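The job above leans on YAML anchors: `&name` defines a reusable node, `*name` references it, and `<<: *name` merges its keys into the enclosing mapping. As a rough illustration (not CircleCI's literal normalized output), the new job resolves to something like the following once `*spark-25299-config` is merged; the checkout and cache-restore aliases, defined elsewhere in this file, are abbreviated to a comment:

run-spark-25299-benchmarks:
  machine:
    image: circleci/classic:201808-01   # machine image pinned by *spark-25299-config
  environment:
    TERM: dumb
    BUILD_SBT_CACHE: "/home/circleci/build-sbt-cache"
  steps:
    # ... *checkout-code and the restore-*-cache aliases expand here ...
    - run:
        command: ./dev/run-spark-25299-benchmarks.sh -u
    - store_artifacts:
        path: /tmp/artifacts/

The corresponding workflow entry merges `*spark-25299-branch-only`, so the benchmark job runs only on the `spark-25299` feature branch rather than on every branch and tag.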
18 changes: 12 additions & 6 deletions core/src/test/scala/org/apache/spark/benchmark/Benchmark.scala
@@ -111,13 +111,15 @@ private[spark] class Benchmark(
     // The results are going to be processor specific so it is useful to include that.
     out.println(Benchmark.getJVMOSInfo())
     out.println(Benchmark.getProcessorName())
-    out.printf("%-40s %16s %12s %13s %10s\n", name + ":", "Best/Avg Time(ms)", "Rate(M/s)",
+    out.printf("%-40s %14s %14s %11s %12s %13s %10s\n", name + ":", "Best Time(ms)", "Avg Time(ms)", "Stdev(ms)", "Rate(M/s)",
       "Per Row(ns)", "Relative")
-    out.println("-" * 96)
+    out.println("-" * 120)
     results.zip(benchmarks).foreach { case (result, benchmark) =>
-      out.printf("%-40s %16s %12s %13s %10s\n",
+      out.printf("%-40s %14s %14s %11s %12s %13s %10s\n",
         benchmark.name,
-        "%5.0f / %4.0f" format (result.bestMs, result.avgMs),
+        "%5.0f" format result.bestMs,
+        "%4.0f" format result.avgMs,
+        "%5.0f" format result.stdevMs,
         "%10.1f" format result.bestRate,
         "%6.1f" format (1000 / result.bestRate),
         "%3.1fX" format (firstBest / result.bestMs))
@@ -156,9 +158,13 @@ private[spark] class Benchmark(
     // scalastyle:off
     println(s" Stopped after $i iterations, ${runTimes.sum / 1000000} ms")
     // scalastyle:on
+    assert(runTimes.nonEmpty)
     val best = runTimes.min
     val avg = runTimes.sum / runTimes.size
-    Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0)
+    val stdev = if (runTimes.size > 1) {
+      math.sqrt(runTimes.map(time => (time - avg) * (time - avg)).sum / (runTimes.size - 1))
+    } else 0
+    Result(avg / 1000000.0, num / (best / 1000.0), best / 1000000.0, stdev / 1000000.0)
   }
 }

@@ -191,7 +197,7 @@ private[spark] object Benchmark {
   }
 
   case class Case(name: String, fn: Timer => Unit, numIters: Int)
-  case class Result(avgMs: Double, bestRate: Double, bestMs: Double)
+  case class Result(avgMs: Double, bestRate: Double, bestMs: Double, stdevMs: Double)
 
   /**
    * This should return a user helpful processor information. Getting at this depends on the OS.
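The new `stdevMs` field is the Bessel-corrected sample standard deviation of the per-iteration run times, sqrt(sum((t - avg)^2) / (n - 1)). A self-contained sketch of the same arithmetic, with made-up nanosecond timings and the same ns-to-ms conversions as `measure` above (the object name is just for illustration):

// Mirrors the stdev math added above: times are collected in nanoseconds,
// reported in milliseconds, and the variance divisor is n - 1.
object StdevSketch {
  def main(args: Array[String]): Unit = {
    val runTimes: Seq[Long] = Seq(105000000L, 98000000L, 112000000L) // ns per iteration
    val best = runTimes.min
    val avg = runTimes.sum / runTimes.size
    val stdev = if (runTimes.size > 1) {
      math.sqrt(runTimes.map(t => (t - avg) * (t - avg)).sum.toDouble / (runTimes.size - 1))
    } else 0
    // prints: best=98.0 ms, avg=105.0 ms, stdev=7.0 ms
    println(f"best=${best / 1e6}%.1f ms, avg=${avg / 1e6}%.1f ms, stdev=${stdev / 1e6}%.1f ms")
  }
}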
158 changes: 158 additions & 0 deletions ShuffleWriterBenchmarkBase.scala (new file)
@@ -0,0 +1,158 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.shuffle.sort

import java.io.{BufferedInputStream, Closeable, File, FileInputStream, FileOutputStream}
import java.util.UUID

import org.apache.commons.io.FileUtils
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, TaskContext}
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer.{KryoSerializer, Serializer, SerializerManager}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage.{BlockManager, DiskBlockManager, TempShuffleBlockId}
import org.apache.spark.util.Utils

abstract class ShuffleWriterBenchmarkBase extends BenchmarkBase {

protected val DEFAULT_DATA_STRING_SIZE = 5

// This is only used in the writer constructors, so it's ok to mock
@Mock(answer = RETURNS_SMART_NULLS) protected var dependency:
ShuffleDependency[String, String, String] = _
// This is only used in the stop() function, so we can safely mock this without affecting perf
@Mock(answer = RETURNS_SMART_NULLS) protected var taskContext: TaskContext = _
@Mock(answer = RETURNS_SMART_NULLS) protected var rpcEnv: RpcEnv = _
@Mock(answer = RETURNS_SMART_NULLS) protected var rpcEndpointRef: RpcEndpointRef = _

protected val defaultConf: SparkConf = new SparkConf(loadDefaults = false)
protected val serializer: Serializer = new KryoSerializer(defaultConf)
protected val partitioner: HashPartitioner = new HashPartitioner(10)
protected val serializerManager: SerializerManager =
new SerializerManager(serializer, defaultConf)
protected val shuffleMetrics: TaskMetrics = new TaskMetrics

protected val tempFilesCreated: ArrayBuffer[File] = new ArrayBuffer[File]
protected val filenameToFile: mutable.Map[String, File] = new mutable.HashMap[String, File]

class TestDiskBlockManager(tempDir: File) extends DiskBlockManager(defaultConf, false) {
override def getFile(filename: String): File = {
if (filenameToFile.contains(filename)) {
filenameToFile(filename)
} else {
val outputFile = File.createTempFile("shuffle", null, tempDir)
filenameToFile(filename) = outputFile
outputFile
}
}

override def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
      val blockId = new TempShuffleBlockId(UUID.randomUUID())
val file = getFile(blockId)
tempFilesCreated += file
(blockId, file)
}
}

class TestBlockManager(tempDir: File, memoryManager: MemoryManager) extends BlockManager("0",
rpcEnv,
null,
serializerManager,
defaultConf,
memoryManager,
null,
null,
null,
null,
1) {
override val diskBlockManager = new TestDiskBlockManager(tempDir)
override val remoteBlockTempFileManager = null
}

protected var tempDir: File = _

protected var blockManager: BlockManager = _
protected var blockResolver: IndexShuffleBlockResolver = _

protected var memoryManager: TestMemoryManager = _
protected var taskMemoryManager: TaskMemoryManager = _

MockitoAnnotations.initMocks(this)
when(dependency.partitioner).thenReturn(partitioner)
when(dependency.serializer).thenReturn(serializer)
when(dependency.shuffleId).thenReturn(0)
when(taskContext.taskMetrics()).thenReturn(shuffleMetrics)
when(rpcEnv.setupEndpoint(any[String], any[RpcEndpoint])).thenReturn(rpcEndpointRef)

def setup(): Unit = {
memoryManager = new TestMemoryManager(defaultConf)
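    // limit() caps the execution memory TestMemoryManager will grant;
    // MAXIMUM_PAGE_SIZE_BYTES is 1 << 27 bytes (128 MB)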
memoryManager.limit(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES)
taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
tempDir = Utils.createTempDir()
blockManager = new TestBlockManager(tempDir, memoryManager)
blockResolver = new IndexShuffleBlockResolver(
defaultConf,
blockManager)
}

def addBenchmarkCase(benchmark: Benchmark, name: String)(func: Benchmark.Timer => Unit): Unit = {
benchmark.addTimerCase(name) { timer =>
setup()
func(timer)
teardown()
}
}

def teardown(): Unit = {
FileUtils.deleteDirectory(tempDir)
tempFilesCreated.clear()
filenameToFile.clear()
}

  protected class DataIterator(size: Int) extends Iterator[Product2[String, String]] {
    // Fixed seed, so every iterator produces the same pseudo-random key/value stream.
    val random = new Random(123)
var count = 0
override def hasNext: Boolean = {
count < size
}

override def next(): Product2[String, String] = {
      count += 1
      val string = random.alphanumeric.take(DEFAULT_DATA_STRING_SIZE).mkString
(string, string)
}
}
def createDataIterator(size: Int): DataIterator = {
new DataIterator(size)
}

}
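To show how this base class is meant to be consumed, here is a minimal, hypothetical sketch of a concrete benchmark in the shape the PR's SortShuffleWriterBenchmark takes. The object name and the timed body are illustrative stand-ins (real writer construction depends on writer internals not shown here), and it assumes `BenchmarkBase`'s `output` stream and `runBenchmarkSuite(mainArgs)` entry point:

package org.apache.spark.shuffle.sort

import org.apache.spark.benchmark.Benchmark

// Hypothetical usage sketch, not the actual SortShuffleWriterBenchmark from this PR.
object ExampleShuffleWriteBenchmark extends ShuffleWriterBenchmarkBase {

  override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
    val size = 10000
    val benchmark = new Benchmark("Example shuffle write", size, output = output)

    addBenchmarkCase(benchmark, "write alphanumeric records") { timer =>
      // setup() has already run for this iteration, so blockManager,
      // blockResolver, and the memory managers are fresh.
      val records = createDataIterator(size)
      timer.startTiming()
      records.foreach(_ => ()) // stand-in for an actual writer.write(records)
      timer.stopTiming()
      // teardown() runs next, deleting tempDir and clearing the file maps.
    }

    benchmark.run()
  }
}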