forked from apache/spark
SPARK-25299: add CI infrastructure and SortShuffleWriterBenchmark #498
Merged: mccheah merged 70 commits into palantir:spark-25299 from yifeih:yh/add-benchmarks-and-ci on Mar 14, 2019.
All 70 commits were authored by yifeih (commit messages verbatim):

c7abec6  add initial bypass merge sort shuffle writer benchmarks
22ef648  dd unsafe shuffle writer benchmarks
4084e27  changes in bypassmergesort benchmarks
fb8266d  cleanup
89104e2  add circle script
b90b381  add this branch for testing
5e13dd8  fix circle attempt 1
845e645  checkout code
a68f459  add some caches?
757f6fe  why is it not pull caches...
0bcd5d9  save as artifact instead of publishing
26c01ec  mkdir
0d7a036  typo
3fc5331  try uploading artifacts again
8c33701  try print per iteration to avoid circle erroring out on idle
9546397  blah (#495)
d72ba73  make a PR comment
1859805  actually delete files
c20f0be  run benchmarks on test build branch
444d46a  oops forgot to enable upload
2322933  add sort shuffle writer benchmarks
da0d91c  add stdev
e590917  cleanup sort a bit
cbfdb99  fix stdev text
cbe38c6  fix sort shuffle
acdda71  initial code for read side
fd7a7c5  format
d82618b  use times and sample stdev
610ea1d  add assert for at least one iteration
295d7f3  cleanup shuffle write to use fewer mocks and single base interface
0c696dc  shuffle read works with transport client... needs lots of cleaning
323a296  test running in cicle
85836c2  scalastyle
b67d1f3  dont publish results yet
252963d  cleanup writer code
f72afb2  get only git message
3bcd35e  fix command to get PR number
d8b5d79  add SortshuffleWriterBenchmark
d9fb78a  writer code
b142951  cleanup
d0466b8  Merge remote-tracking branch 'origin' into yh/add-benchmarks-and-ci
f91dfad  fix benchmark script
5839b1d  use ArgumentMatchers
0b8c7ed  also in shufflewriterbenchmarkbase
d11f87f  scalastyle
6f2779f  add apache license
bbe9edc  fix some scale stuff
567d372  fix up tests
47c1938  only copy benchmarks we care about
e79ac28  increase size for reader again
c3858df  delete two writers and reader for PR
9d46fae  SPARK-25299: Add shuffle reader benchmarks (#506)
9f51758  Revert "SPARK-25299: Add shuffle reader benchmarks (#506)"
bcb09c5  add -e to bash script
25da723  blah
13703fa  enable upload as a PR comment and prevent running benchmarks on this …
e3751cd  Revert "enable upload as a PR comment and prevent running benchmarks …
33a1b72  try machine execution
fa1b96c  try uploading benchmarks (#498)
37cef1f  only upload results when merging into the feature branch
459e1b5  lock down machine image
4cabdbd  don't write input data to disk
47d2dcf  run benchmark test
c78e491  stop creating file cleanup threads for every block manager
f28b75c  use alphanumeric again
a85acf4  use a new random everytime
f26ab40  close the writers -__________-
103c660  delete branch and publish results as comment
c3e58c5  close in finally
96f1d0d  Merge branch 'yh/add-benchmarks-and-ci' of github.com:yifeih/spark in…
core/src/test/scala/org/apache/spark/shuffle/sort/ShuffleWriterBenchmarkBase.scala (new file, 158 additions, 0 deletions)

@@ -0,0 +1,158 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.shuffle.sort

import java.io.{BufferedInputStream, Closeable, File, FileInputStream, FileOutputStream}
import java.util.UUID

import org.apache.commons.io.FileUtils
import org.mockito.{Mock, MockitoAnnotations}
import org.mockito.Answers.RETURNS_SMART_NULLS
import org.mockito.ArgumentMatchers.any
import org.mockito.Mockito.when
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.util.Random

import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, TaskContext}
import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.memory.{MemoryManager, TaskMemoryManager, TestMemoryManager}
import org.apache.spark.rpc.{RpcEndpoint, RpcEndpointRef, RpcEnv}
import org.apache.spark.serializer.{KryoSerializer, Serializer, SerializerManager}
import org.apache.spark.shuffle.IndexShuffleBlockResolver
import org.apache.spark.storage.{BlockManager, DiskBlockManager, TempShuffleBlockId}
import org.apache.spark.util.Utils

abstract class ShuffleWriterBenchmarkBase extends BenchmarkBase {

  protected val DEFAULT_DATA_STRING_SIZE = 5

  // This is only used in the writer constructors, so it's ok to mock
  @Mock(answer = RETURNS_SMART_NULLS) protected var dependency:
    ShuffleDependency[String, String, String] = _
  // This is only used in the stop() function, so we can safely mock this without affecting perf
  @Mock(answer = RETURNS_SMART_NULLS) protected var taskContext: TaskContext = _
  @Mock(answer = RETURNS_SMART_NULLS) protected var rpcEnv: RpcEnv = _
  @Mock(answer = RETURNS_SMART_NULLS) protected var rpcEndpointRef: RpcEndpointRef = _

  protected val defaultConf: SparkConf = new SparkConf(loadDefaults = false)
  protected val serializer: Serializer = new KryoSerializer(defaultConf)
  protected val partitioner: HashPartitioner = new HashPartitioner(10)
  protected val serializerManager: SerializerManager =
    new SerializerManager(serializer, defaultConf)
  protected val shuffleMetrics: TaskMetrics = new TaskMetrics

  protected val tempFilesCreated: ArrayBuffer[File] = new ArrayBuffer[File]
  protected val filenameToFile: mutable.Map[String, File] = new mutable.HashMap[String, File]

  class TestDiskBlockManager(tempDir: File) extends DiskBlockManager(defaultConf, false) {
    override def getFile(filename: String): File = {
      if (filenameToFile.contains(filename)) {
        filenameToFile(filename)
      } else {
        val outputFile = File.createTempFile("shuffle", null, tempDir)
        filenameToFile(filename) = outputFile
        outputFile
      }
    }

    override def createTempShuffleBlock(): (TempShuffleBlockId, File) = {
      val blockId = new TempShuffleBlockId(UUID.randomUUID())
      val file = getFile(blockId)
      tempFilesCreated += file
      (blockId, file)
    }
  }

  class TestBlockManager(tempDir: File, memoryManager: MemoryManager) extends BlockManager("0",
    rpcEnv,
    null,
    serializerManager,
    defaultConf,
    memoryManager,
    null,
    null,
    null,
    null,
    1) {
    override val diskBlockManager = new TestDiskBlockManager(tempDir)
    override val remoteBlockTempFileManager = null
  }

  protected var tempDir: File = _

  protected var blockManager: BlockManager = _
  protected var blockResolver: IndexShuffleBlockResolver = _

  protected var memoryManager: TestMemoryManager = _
  protected var taskMemoryManager: TaskMemoryManager = _

  MockitoAnnotations.initMocks(this)
  when(dependency.partitioner).thenReturn(partitioner)
  when(dependency.serializer).thenReturn(serializer)
  when(dependency.shuffleId).thenReturn(0)
  when(taskContext.taskMetrics()).thenReturn(shuffleMetrics)
  when(rpcEnv.setupEndpoint(any[String], any[RpcEndpoint])).thenReturn(rpcEndpointRef)

  def setup(): Unit = {
    memoryManager = new TestMemoryManager(defaultConf)
    memoryManager.limit(PackedRecordPointer.MAXIMUM_PAGE_SIZE_BYTES)
    taskMemoryManager = new TaskMemoryManager(memoryManager, 0)
    tempDir = Utils.createTempDir()
    blockManager = new TestBlockManager(tempDir, memoryManager)
    blockResolver = new IndexShuffleBlockResolver(
      defaultConf,
      blockManager)
  }

  def addBenchmarkCase(benchmark: Benchmark, name: String)(func: Benchmark.Timer => Unit): Unit = {
    benchmark.addTimerCase(name) { timer =>
      setup()
      func(timer)
      teardown()
    }
  }

  def teardown(): Unit = {
    FileUtils.deleteDirectory(tempDir)
    tempFilesCreated.clear()
    filenameToFile.clear()
  }

  protected class DataIterator(size: Int)
    extends Iterator[Product2[String, String]] {
    val random = new Random(123)
    var count = 0
    override def hasNext: Boolean = {
      count < size
    }

    override def next(): Product2[String, String] = {
      count += 1
      val string = random.alphanumeric.take(DEFAULT_DATA_STRING_SIZE).mkString
      (string, string)
    }
  }

  def createDataIterator(size: Int): DataIterator = {
    new DataIterator(size)
  }

}
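The `DataIterator` above is the core data-generation trick in this benchmark base: a fixed-seed `Random` yields identical key/value pairs on every run, so writer timings are comparable across iterations without materializing input data on disk (see commit 4cabdbd). The following is a minimal standalone sketch of that pattern, runnable without Spark; the names `PairIterator` and `stringSize` are illustrative, not from the PR.

```scala
import scala.util.Random

// Illustrative stand-in for DataIterator: lazily yields `size` (key, value)
// pairs of random alphanumeric strings from a fixed seed, so every run
// produces the same sequence.
class PairIterator(size: Int, stringSize: Int) extends Iterator[(String, String)] {
  private val random = new Random(123) // fixed seed => reproducible data
  private var count = 0

  override def hasNext: Boolean = count < size

  override def next(): (String, String) = {
    count += 1
    val s = random.alphanumeric.take(stringSize).mkString
    (s, s) // benchmark uses the same string as key and value
  }
}

object PairIteratorDemo {
  def main(args: Array[String]): Unit = {
    val pairs = new PairIterator(3, 5).toList
    println(pairs.length)                           // 3
    println(pairs.head._1.length)                   // 5
    // Same seed, same sequence: a fresh iterator reproduces the data exactly.
    println(pairs == new PairIterator(3, 5).toList) // true
  }
}
```

Because each call site constructs a fresh iterator (as `createDataIterator` does), every benchmark case sees byte-identical input, which keeps the measured variance down to the writer under test rather than the data.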
Review comment: not sure if I'm using a superset or a subset of the useful caches, or some random other combination... I just copied the things that seemed related to sbt, because that's what I'm running in this step.
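The comment refers to dependency caching in the CircleCI config added earlier in this PR (commits 89104e2 and a68f459). For context, a typical CircleCI cache stanza for an sbt build looks like the sketch below; the cache key and step names are illustrative assumptions, not the PR's actual config.

```yaml
# Illustrative CircleCI job steps for caching sbt dependencies.
# The key is invalidated whenever build.sbt changes; the trailing
# partial key lets a near-miss cache still be restored.
steps:
  - checkout
  - restore_cache:
      keys:
        - sbt-deps-v1-{{ checksum "build.sbt" }}
        - sbt-deps-v1-
  - run: ./build/sbt test:compile
  - save_cache:
      key: sbt-deps-v1-{{ checksum "build.sbt" }}
      paths:
        - ~/.ivy2/cache
        - ~/.sbt
```

Caching `~/.ivy2/cache` and `~/.sbt` covers the bulk of sbt's downloaded artifacts, which is likely what "things that seemed related to sbt" means here.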