From 6906b69cf568015f20c7d7c77cbcba650e5431a9 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Thu, 7 Aug 2014 18:04:49 -0700 Subject: [PATCH 01/11] SPARK-2787: Make sort-based shuffle write files directly when there's no sorting/aggregation and # partitions is small As described in https://issues.apache.org/jira/browse/SPARK-2787, right now sort-based shuffle is more expensive than hash-based for map operations that do no partial aggregation or sorting, such as groupByKey. This is because it has to serialize each data item twice (once when spilling to intermediate files, and then again when merging these files object-by-object). This patch adds a code path to just write separate files directly if the # of output partitions is small, and concatenate them at the end to produce a sorted file. On the unit test side, I added some tests that force or don't force this bypass path to be used, and checked that our tests for other features (e.g. all the operations) cover both cases. Author: Matei Zaharia Closes #1799 from mateiz/SPARK-2787 and squashes the following commits: 88cf26a [Matei Zaharia] Fix rebase 10233af [Matei Zaharia] Review comments 398cb95 [Matei Zaharia] Fix looking up shuffle manager in conf ca3efd9 [Matei Zaharia] Add docs for shuffle manager properties, and allow short names for them d0ae3c5 [Matei Zaharia] Fix some comments 90d084f [Matei Zaharia] Add code path to bypass merge-sort in ExternalSorter, and tests 31e5d7c [Matei Zaharia] Move existing logic for writing partitioned files into ExternalSorter --- .../scala/org/apache/spark/SparkEnv.scala | 27 +- .../shuffle/hash/HashShuffleReader.scala | 2 +- .../shuffle/sort/SortShuffleWriter.scala | 80 ++---- .../util/collection/ExternalSorter.scala | 233 +++++++++++++++--- .../util/collection/ExternalSorterSuite.scala | 165 +++++++++++-- docs/configuration.md | 18 ++ 6 files changed, 407 insertions(+), 118 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 9d4edeb6d96cf..22d8d1cb1ddcf 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -156,11 +156,9 @@ object SparkEnv extends Logging { conf.set("spark.driver.port", boundPort.toString) } - // Create an instance of the class named by the given Java system property, or by - // defaultClassName if the property is not set, and return it as a T - def instantiateClass[T](propertyName: String, defaultClassName: String): T = { - val name = conf.get(propertyName, defaultClassName) - val cls = Class.forName(name, true, Utils.getContextOrSparkClassLoader) + // Create an instance of the class with the given name, possibly initializing it with our conf + def instantiateClass[T](className: String): T = { + val cls = Class.forName(className, true, Utils.getContextOrSparkClassLoader) // Look for a constructor taking a SparkConf and a boolean isDriver, then one taking just // SparkConf, then one taking no arguments try { @@ -178,11 +176,17 @@ object SparkEnv extends Logging { } } - val serializer = instantiateClass[Serializer]( + // Create an instance of the class named by the given SparkConf property, or defaultClassName + // if the property is not set, possibly initializing it with our conf + def instantiateClassFromConf[T](propertyName: String, defaultClassName: String): T = { + instantiateClass[T](conf.get(propertyName, defaultClassName)) + } + + val serializer = instantiateClassFromConf[Serializer]( "spark.serializer", "org.apache.spark.serializer.JavaSerializer") logDebug(s"Using serializer: ${serializer.getClass}") - val closureSerializer = instantiateClass[Serializer]( + val closureSerializer = instantiateClassFromConf[Serializer]( "spark.closure.serializer", "org.apache.spark.serializer.JavaSerializer") def registerOrLookup(name: String, newActor: => Actor): ActorRef = { @@ -246,8 +250,13 @@ object SparkEnv extends Logging { "." } - val shuffleManager = instantiateClass[ShuffleManager]( - "spark.shuffle.manager", "org.apache.spark.shuffle.hash.HashShuffleManager") + // Let the user specify short names for shuffle managers + val shortShuffleMgrNames = Map( + "hash" -> "org.apache.spark.shuffle.hash.HashShuffleManager", + "sort" -> "org.apache.spark.shuffle.sort.SortShuffleManager") + val shuffleMgrName = conf.get("spark.shuffle.manager", "hash") + val shuffleMgrClass = shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase, shuffleMgrName) + val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass) val shuffleMemoryManager = new ShuffleMemoryManager(conf) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 7c9dc8e5f88ef..88a5f1e5ddf58 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -58,7 +58,7 @@ private[spark] class HashShuffleReader[K, C]( // Create an ExternalSorter to sort the data. Note that if spark.shuffle.spill is disabled, // the ExternalSorter won't spill to disk. val sorter = new ExternalSorter[K, C, C](ordering = Some(keyOrd), serializer = Some(ser)) - sorter.write(aggregatedIter) + sorter.insertAll(aggregatedIter) context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled sorter.iterator diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala index e54e6383d2ccc..22f656fa371ea 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala @@ -44,6 +44,7 @@ private[spark] class SortShuffleWriter[K, V, C]( private var sorter: ExternalSorter[K, V, _] = null private var outputFile: File = null + private var indexFile: File = null // Are we in the process of stopping? Because map tasks can call stop() with success = true // and then call stop() with success = false if they get an exception, we want to make sure @@ -57,78 +58,36 @@ private[spark] class SortShuffleWriter[K, V, C]( /** Write a bunch of records to this task's output */ override def write(records: Iterator[_ <: Product2[K, V]]): Unit = { - // Get an iterator with the elements for each partition ID - val partitions: Iterator[(Int, Iterator[Product2[K, _]])] = { - if (dep.mapSideCombine) { - if (!dep.aggregator.isDefined) { - throw new IllegalStateException("Aggregator is empty for map-side combine") - } - sorter = new ExternalSorter[K, V, C]( - dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) - sorter.write(records) - sorter.partitionedIterator - } else { - // In this case we pass neither an aggregator nor an ordering to the sorter, because we - // don't care whether the keys get sorted in each partition; that will be done on the - // reduce side if the operation being run is sortByKey. - sorter = new ExternalSorter[K, V, V]( - None, Some(dep.partitioner), None, dep.serializer) - sorter.write(records) - sorter.partitionedIterator + if (dep.mapSideCombine) { + if (!dep.aggregator.isDefined) { + throw new IllegalStateException("Aggregator is empty for map-side combine") } + sorter = new ExternalSorter[K, V, C]( + dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer) + sorter.insertAll(records) + } else { + // In this case we pass neither an aggregator nor an ordering to the sorter, because we don't + // care whether the keys get sorted in each partition; that will be done on the reduce side + // if the operation being run is sortByKey. + sorter = new ExternalSorter[K, V, V]( + None, Some(dep.partitioner), None, dep.serializer) + sorter.insertAll(records) } // Create a single shuffle file with reduce ID 0 that we'll write all results to. We'll later // serve different ranges of this file using an index file that we create at the end. val blockId = ShuffleBlockId(dep.shuffleId, mapId, 0) - outputFile = blockManager.diskBlockManager.getFile(blockId) - - // Track location of each range in the output file - val offsets = new Array[Long](numPartitions + 1) - val lengths = new Array[Long](numPartitions) - - for ((id, elements) <- partitions) { - if (elements.hasNext) { - val writer = blockManager.getDiskWriter(blockId, outputFile, ser, fileBufferSize, - writeMetrics) - for (elem <- elements) { - writer.write(elem) - } - writer.commitAndClose() - val segment = writer.fileSegment() - offsets(id + 1) = segment.offset + segment.length - lengths(id) = segment.length - } else { - // The partition is empty; don't create a new writer to avoid writing headers, etc - offsets(id + 1) = offsets(id) - } - } - - context.taskMetrics.memoryBytesSpilled += sorter.memoryBytesSpilled - context.taskMetrics.diskBytesSpilled += sorter.diskBytesSpilled - // Write an index file with the offsets of each block, plus a final offset at the end for the - // end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure - // out where each block begins and ends. + outputFile = blockManager.diskBlockManager.getFile(blockId) + indexFile = blockManager.diskBlockManager.getFile(blockId.name + ".index") - val diskBlockManager = blockManager.diskBlockManager - val indexFile = diskBlockManager.getFile(blockId.name + ".index") - val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) - try { - var i = 0 - while (i < numPartitions + 1) { - out.writeLong(offsets(i)) - i += 1 - } - } finally { - out.close() - } + val partitionLengths = sorter.writePartitionedFile(blockId, context) // Register our map output with the ShuffleBlockManager, which handles cleaning it over time blockManager.shuffleBlockManager.addCompletedMap(dep.shuffleId, mapId, numPartitions) mapStatus = new MapStatus(blockManager.blockManagerId, - lengths.map(MapOutputTracker.compressSize)) + partitionLengths.map(MapOutputTracker.compressSize)) } /** Close this writer, passing along whether the map completed */ @@ -145,6 +104,9 @@ private[spark] class SortShuffleWriter[K, V, C]( if (outputFile != null) { outputFile.delete() } + if (indexFile != null) { + indexFile.delete() + } return None } } finally { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala index eb4849ebc6e52..b73d5e0cf1714 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala @@ -25,10 +25,10 @@ import scala.collection.mutable import com.google.common.io.ByteStreams -import org.apache.spark.{Aggregator, SparkEnv, Logging, Partitioner} +import org.apache.spark._ import org.apache.spark.serializer.{DeserializationStream, Serializer} -import org.apache.spark.storage.BlockId import org.apache.spark.executor.ShuffleWriteMetrics +import org.apache.spark.storage.{BlockObjectWriter, BlockId} /** * Sorts and potentially merges a number of key-value pairs of type (K, V) to produce key-combiner @@ -67,6 +67,13 @@ import org.apache.spark.executor.ShuffleWriteMetrics * for equality to merge values. * * - Users are expected to call stop() at the end to delete all the intermediate files. + * + * As a special case, if no Ordering and no Aggregator is given, and the number of partitions is + * less than spark.shuffle.sort.bypassMergeThreshold, we bypass the merge-sort and just write to + * separate files for each partition each time we spill, similar to the HashShuffleWriter. We can + * then concatenate these files to produce a single sorted file, without having to serialize and + * de-serialize each item twice (as is needed during the merge). This speeds up the map side of + * groupBy, sort, etc operations since they do no partial aggregation. */ private[spark] class ExternalSorter[K, V, C]( aggregator: Option[Aggregator[K, V, C]] = None, @@ -124,6 +131,18 @@ private[spark] class ExternalSorter[K, V, C]( // How much of the shared memory pool this collection has claimed private var myMemoryThreshold = 0L + // If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't need + // local aggregation and sorting, write numPartitions files directly and just concatenate them + // at the end. This avoids doing serialization and deserialization twice to merge together the + // spilled files, which would happen with the normal code path. The downside is having multiple + // files open at a time and thus more memory allocated to buffers. + private val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) + private val bypassMergeSort = + (numPartitions <= bypassMergeThreshold && aggregator.isEmpty && ordering.isEmpty) + + // Array of file writers for each partition, used if bypassMergeSort is true and we've spilled + private var partitionWriters: Array[BlockObjectWriter] = null + // A comparator for keys K that orders them within a partition to allow aggregation or sorting. // Can be a partial ordering by hash code if a total ordering is not provided through by the // user. (A partial ordering means that equal keys have comparator.compare(k, k) = 0, but some @@ -137,7 +156,14 @@ private[spark] class ExternalSorter[K, V, C]( } }) - // A comparator for (Int, K) elements that orders them by partition and then possibly by key + // A comparator for (Int, K) pairs that orders them by only their partition ID + private val partitionComparator: Comparator[(Int, K)] = new Comparator[(Int, K)] { + override def compare(a: (Int, K), b: (Int, K)): Int = { + a._1 - b._1 + } + } + + // A comparator that orders (Int, K) pairs by partition ID and then possibly by key private val partitionKeyComparator: Comparator[(Int, K)] = { if (ordering.isDefined || aggregator.isDefined) { // Sort by partition ID then key comparator @@ -153,11 +179,7 @@ private[spark] class ExternalSorter[K, V, C]( } } else { // Just sort it by partition ID - new Comparator[(Int, K)] { - override def compare(a: (Int, K), b: (Int, K)): Int = { - a._1 - b._1 - } - } + partitionComparator } } @@ -171,7 +193,7 @@ private[spark] class ExternalSorter[K, V, C]( elementsPerPartition: Array[Long]) private val spills = new ArrayBuffer[SpilledFile] - def write(records: Iterator[_ <: Product2[K, V]]): Unit = { + def insertAll(records: Iterator[_ <: Product2[K, V]]): Unit = { // TODO: stop combining if we find that the reduction factor isn't high val shouldCombine = aggregator.isDefined @@ -242,6 +264,38 @@ private[spark] class ExternalSorter[K, V, C]( val threadId = Thread.currentThread().getId logInfo("Thread %d spilling in-memory batch of %d MB to disk (%d spill%s so far)" .format(threadId, memorySize / (1024 * 1024), spillCount, if (spillCount > 1) "s" else "")) + + if (bypassMergeSort) { + spillToPartitionFiles(collection) + } else { + spillToMergeableFile(collection) + } + + if (usingMap) { + map = new SizeTrackingAppendOnlyMap[(Int, K), C] + } else { + buffer = new SizeTrackingPairBuffer[(Int, K), C] + } + + // Release our memory back to the shuffle pool so that other threads can grab it + shuffleMemoryManager.release(myMemoryThreshold) + myMemoryThreshold = 0 + + _memoryBytesSpilled += memorySize + } + + /** + * Spill our in-memory collection to a sorted file that we can merge later (normal code path). + * We add this file into spilledFiles to find it later. + * + * Alternatively, if bypassMergeSort is true, we spill to separate files for each partition. + * See spillToPartitionedFiles() for that code path. + * + * @param collection whichever collection we're using (map or buffer) + */ + private def spillToMergeableFile(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = { + assert(!bypassMergeSort) + val (blockId, file) = diskBlockManager.createTempBlock() curWriteMetrics = new ShuffleWriteMetrics() var writer = blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics) @@ -304,18 +358,36 @@ private[spark] class ExternalSorter[K, V, C]( } } - if (usingMap) { - map = new SizeTrackingAppendOnlyMap[(Int, K), C] - } else { - buffer = new SizeTrackingPairBuffer[(Int, K), C] - } + spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)) + } - // Release our memory back to the shuffle pool so that other threads can grab it - shuffleMemoryManager.release(myMemoryThreshold) - myMemoryThreshold = 0 + /** + * Spill our in-memory collection to separate files, one for each partition. This is used when + * there's no aggregator and ordering and the number of partitions is small, because it allows + * writePartitionedFile to just concatenate files without deserializing data. + * + * @param collection whichever collection we're using (map or buffer) + */ + private def spillToPartitionFiles(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = { + assert(bypassMergeSort) + + // Create our file writers if we haven't done so yet + if (partitionWriters == null) { + curWriteMetrics = new ShuffleWriteMetrics() + partitionWriters = Array.fill(numPartitions) { + val (blockId, file) = diskBlockManager.createTempBlock() + blockManager.getDiskWriter(blockId, file, ser, fileBufferSize, curWriteMetrics).open() + } + } - spills.append(SpilledFile(file, blockId, batchSizes.toArray, elementsPerPartition)) - _memoryBytesSpilled += memorySize + val it = collection.iterator // No need to sort stuff, just write each element out + while (it.hasNext) { + val elem = it.next() + val partitionId = elem._1._1 + val key = elem._1._2 + val value = elem._2 + partitionWriters(partitionId).write((key, value)) + } } /** @@ -479,7 +551,6 @@ private[spark] class ExternalSorter[K, V, C]( skipToNextPartition() - // Intermediate file and deserializer streams that read from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams var fileStream: FileInputStream = null @@ -619,23 +690,25 @@ private[spark] class ExternalSorter[K, V, C]( def partitionedIterator: Iterator[(Int, Iterator[Product2[K, C]])] = { val usingMap = aggregator.isDefined val collection: SizeTrackingPairCollection[(Int, K), C] = if (usingMap) map else buffer - if (spills.isEmpty) { + if (spills.isEmpty && partitionWriters == null) { // Special case: if we have only in-memory data, we don't need to merge streams, and perhaps // we don't even need to sort by anything other than partition ID if (!ordering.isDefined) { - // The user isn't requested sorted keys, so only sort by partition ID, not key - val partitionComparator = new Comparator[(Int, K)] { - override def compare(a: (Int, K), b: (Int, K)): Int = { - a._1 - b._1 - } - } + // The user hasn't requested sorted keys, so only sort by partition ID, not key groupByPartition(collection.destructiveSortedIterator(partitionComparator)) } else { // We do need to sort by both partition ID and key groupByPartition(collection.destructiveSortedIterator(partitionKeyComparator)) } + } else if (bypassMergeSort) { + // Read data from each partition file and merge it together with the data in memory; + // note that there's no ordering or aggregator in this case -- we just partition objects + val collIter = groupByPartition(collection.destructiveSortedIterator(partitionComparator)) + collIter.map { case (partitionId, values) => + (partitionId, values ++ readPartitionFile(partitionWriters(partitionId))) + } } else { - // General case: merge spilled and in-memory data + // Merge spilled and in-memory data merge(spills, collection.destructiveSortedIterator(partitionKeyComparator)) } } @@ -645,9 +718,113 @@ private[spark] class ExternalSorter[K, V, C]( */ def iterator: Iterator[Product2[K, C]] = partitionedIterator.flatMap(pair => pair._2) + /** + * Write all the data added into this ExternalSorter into a file in the disk store, creating + * an .index file for it as well with the offsets of each partition. This is called by the + * SortShuffleWriter and can go through an efficient path of just concatenating binary files + * if we decided to avoid merge-sorting. + * + * @param blockId block ID to write to. The index file will be blockId.name + ".index". + * @param context a TaskContext for a running Spark task, for us to update shuffle metrics. + * @return array of lengths, in bytes, of each partition of the file (used by map output tracker) + */ + def writePartitionedFile(blockId: BlockId, context: TaskContext): Array[Long] = { + val outputFile = blockManager.diskBlockManager.getFile(blockId) + + // Track location of each range in the output file + val offsets = new Array[Long](numPartitions + 1) + val lengths = new Array[Long](numPartitions) + + if (bypassMergeSort && partitionWriters != null) { + // We decided to write separate files for each partition, so just concatenate them. To keep + // this simple we spill out the current in-memory collection so that everything is in files. + spillToPartitionFiles(if (aggregator.isDefined) map else buffer) + partitionWriters.foreach(_.commitAndClose()) + var out: FileOutputStream = null + var in: FileInputStream = null + try { + out = new FileOutputStream(outputFile) + for (i <- 0 until numPartitions) { + val file = partitionWriters(i).fileSegment().file + in = new FileInputStream(file) + org.apache.spark.util.Utils.copyStream(in, out) + in.close() + in = null + lengths(i) = file.length() + offsets(i + 1) = offsets(i) + lengths(i) + } + } finally { + if (out != null) { + out.close() + } + if (in != null) { + in.close() + } + } + } else { + // Either we're not bypassing merge-sort or we have only in-memory data; get an iterator by + // partition and just write everything directly. + for ((id, elements) <- this.partitionedIterator) { + if (elements.hasNext) { + val writer = blockManager.getDiskWriter( + blockId, outputFile, ser, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) + for (elem <- elements) { + writer.write(elem) + } + writer.commitAndClose() + val segment = writer.fileSegment() + offsets(id + 1) = segment.offset + segment.length + lengths(id) = segment.length + } else { + // The partition is empty; don't create a new writer to avoid writing headers, etc + offsets(id + 1) = offsets(id) + } + } + } + + context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled + context.taskMetrics.diskBytesSpilled += diskBytesSpilled + + // Write an index file with the offsets of each block, plus a final offset at the end for the + // end of the output file. This will be used by SortShuffleManager.getBlockLocation to figure + // out where each block begins and ends. + + val diskBlockManager = blockManager.diskBlockManager + val indexFile = diskBlockManager.getFile(blockId.name + ".index") + val out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(indexFile))) + try { + var i = 0 + while (i < numPartitions + 1) { + out.writeLong(offsets(i)) + i += 1 + } + } finally { + out.close() + } + + lengths + } + + /** + * Read a partition file back as an iterator (used in our iterator method) + */ + def readPartitionFile(writer: BlockObjectWriter): Iterator[Product2[K, C]] = { + if (writer.isOpen) { + writer.commitAndClose() + } + blockManager.getLocalFromDisk(writer.blockId, ser).get.asInstanceOf[Iterator[Product2[K, C]]] + } + def stop(): Unit = { spills.foreach(s => s.file.delete()) spills.clear() + if (partitionWriters != null) { + partitionWriters.foreach { w => + w.revertPartialWritesAndClose() + diskBlockManager.getFile(w.blockId).delete() + } + partitionWriters = null + } } def memoryBytesSpilled: Long = _memoryBytesSpilled diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 57dcb4ffabac1..706faed980f31 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -19,12 +19,12 @@ package org.apache.spark.util.collection import scala.collection.mutable.ArrayBuffer -import org.scalatest.FunSuite +import org.scalatest.{PrivateMethodTester, FunSuite} import org.apache.spark._ import org.apache.spark.SparkContext._ -class ExternalSorterSuite extends FunSuite with LocalSparkContext { +class ExternalSorterSuite extends FunSuite with LocalSparkContext with PrivateMethodTester { private def createSparkConf(loadDefaults: Boolean): SparkConf = { val conf = new SparkConf(loadDefaults) // Make the Java serializer write a reset instruction (TC_RESET) after each object to test @@ -36,6 +36,16 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { conf } + private def assertBypassedMergeSort(sorter: ExternalSorter[_, _, _]): Unit = { + val bypassMergeSort = PrivateMethod[Boolean]('bypassMergeSort) + assert(sorter.invokePrivate(bypassMergeSort()), "sorter did not bypass merge-sort") + } + + private def assertDidNotBypassMergeSort(sorter: ExternalSorter[_, _, _]): Unit = { + val bypassMergeSort = PrivateMethod[Boolean]('bypassMergeSort) + assert(!sorter.invokePrivate(bypassMergeSort()), "sorter bypassed merge-sort") + } + test("empty data stream") { val conf = new SparkConf(false) conf.set("spark.shuffle.memoryFraction", "0.001") @@ -86,28 +96,28 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { // Both aggregator and ordering val sorter = new ExternalSorter[Int, Int, Int]( Some(agg), Some(new HashPartitioner(7)), Some(ord), None) - sorter.write(elements.iterator) + sorter.insertAll(elements.iterator) assert(sorter.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) sorter.stop() // Only aggregator val sorter2 = new ExternalSorter[Int, Int, Int]( Some(agg), Some(new HashPartitioner(7)), None, None) - sorter2.write(elements.iterator) + sorter2.insertAll(elements.iterator) assert(sorter2.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) sorter2.stop() // Only ordering val sorter3 = new ExternalSorter[Int, Int, Int]( None, Some(new HashPartitioner(7)), Some(ord), None) - sorter3.write(elements.iterator) + sorter3.insertAll(elements.iterator) assert(sorter3.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) sorter3.stop() // Neither aggregator nor ordering val sorter4 = new ExternalSorter[Int, Int, Int]( None, Some(new HashPartitioner(7)), None, None) - sorter4.write(elements.iterator) + sorter4.insertAll(elements.iterator) assert(sorter4.partitionedIterator.map(p => (p._1, p._2.toSet)).toSet === expected) sorter4.stop() } @@ -118,13 +128,37 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") sc = new SparkContext("local", "test", conf) - val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) val ord = implicitly[Ordering[Int]] val elements = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2)) + val sorter = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(7)), Some(ord), None) + assertDidNotBypassMergeSort(sorter) + sorter.insertAll(elements) + assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled + val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList)) + assert(iter.next() === (0, Nil)) + assert(iter.next() === (1, List((1, 1)))) + assert(iter.next() === (2, (0 until 100000).map(x => (2, 2)).toList)) + assert(iter.next() === (3, Nil)) + assert(iter.next() === (4, Nil)) + assert(iter.next() === (5, List((5, 5)))) + assert(iter.next() === (6, Nil)) + sorter.stop() + } + + test("empty partitions with spilling, bypass merge-sort") { + val conf = createSparkConf(false) + conf.set("spark.shuffle.memoryFraction", "0.001") + conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") + sc = new SparkContext("local", "test", conf) + + val elements = Iterator((1, 1), (5, 5)) ++ (0 until 100000).iterator.map(x => (2, 2)) + val sorter = new ExternalSorter[Int, Int, Int]( None, Some(new HashPartitioner(7)), None, None) - sorter.write(elements) + assertBypassedMergeSort(sorter) + sorter.insertAll(elements) assert(sc.env.blockManager.diskBlockManager.getAllFiles().length > 0) // Make sure it spilled val iter = sorter.partitionedIterator.map(p => (p._1, p._2.toList)) assert(iter.next() === (0, Nil)) @@ -286,14 +320,43 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test", conf) val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager + val ord = implicitly[Ordering[Int]] + + val sorter = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(3)), Some(ord), None) + assertDidNotBypassMergeSort(sorter) + sorter.insertAll((0 until 100000).iterator.map(i => (i, i))) + assert(diskBlockManager.getAllFiles().length > 0) + sorter.stop() + assert(diskBlockManager.getAllBlocks().length === 0) + + val sorter2 = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(3)), Some(ord), None) + assertDidNotBypassMergeSort(sorter2) + sorter2.insertAll((0 until 100000).iterator.map(i => (i, i))) + assert(diskBlockManager.getAllFiles().length > 0) + assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet) + sorter2.stop() + assert(diskBlockManager.getAllBlocks().length === 0) + } + + test("cleanup of intermediate files in sorter, bypass merge-sort") { + val conf = createSparkConf(true) // Load defaults, otherwise SPARK_HOME is not found + conf.set("spark.shuffle.memoryFraction", "0.001") + conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") + sc = new SparkContext("local", "test", conf) + val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager + val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None) - sorter.write((0 until 100000).iterator.map(i => (i, i))) + assertBypassedMergeSort(sorter) + sorter.insertAll((0 until 100000).iterator.map(i => (i, i))) assert(diskBlockManager.getAllFiles().length > 0) sorter.stop() assert(diskBlockManager.getAllBlocks().length === 0) val sorter2 = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None) - sorter2.write((0 until 100000).iterator.map(i => (i, i))) + assertBypassedMergeSort(sorter2) + sorter2.insertAll((0 until 100000).iterator.map(i => (i, i))) assert(diskBlockManager.getAllFiles().length > 0) assert(sorter2.iterator.toSet === (0 until 100000).map(i => (i, i)).toSet) sorter2.stop() @@ -307,9 +370,35 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test", conf) val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager + val ord = implicitly[Ordering[Int]] + + val sorter = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(3)), Some(ord), None) + assertDidNotBypassMergeSort(sorter) + intercept[SparkException] { + sorter.insertAll((0 until 100000).iterator.map(i => { + if (i == 99990) { + throw new SparkException("Intentional failure") + } + (i, i) + })) + } + assert(diskBlockManager.getAllFiles().length > 0) + sorter.stop() + assert(diskBlockManager.getAllBlocks().length === 0) + } + + test("cleanup of intermediate files in sorter if there are errors, bypass merge-sort") { + val conf = createSparkConf(true) // Load defaults, otherwise SPARK_HOME is not found + conf.set("spark.shuffle.memoryFraction", "0.001") + conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") + sc = new SparkContext("local", "test", conf) + val diskBlockManager = SparkEnv.get.blockManager.diskBlockManager + val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None) + assertBypassedMergeSort(sorter) intercept[SparkException] { - sorter.write((0 until 100000).iterator.map(i => { + sorter.insertAll((0 until 100000).iterator.map(i => { if (i == 99990) { throw new SparkException("Intentional failure") } @@ -365,7 +454,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { sc = new SparkContext("local", "test", conf) val sorter = new ExternalSorter[Int, Int, Int](None, Some(new HashPartitioner(3)), None, None) - sorter.write((0 until 100000).iterator.map(i => (i / 4, i))) + sorter.insertAll((0 until 100000).iterator.map(i => (i / 4, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet val expected = (0 until 3).map(p => { (p, (0 until 100000).map(i => (i / 4, i)).filter(_._1 % 3 == p).toSet) @@ -381,7 +470,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None) - sorter.write((0 until 100).iterator.map(i => (i / 2, i))) + sorter.insertAll((0 until 100).iterator.map(i => (i / 2, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet val expected = (0 until 3).map(p => { (p, (0 until 50).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet) @@ -397,7 +486,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), None, None) - sorter.write((0 until 100000).iterator.map(i => (i / 2, i))) + sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet val expected = (0 until 3).map(p => { (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet) @@ -414,7 +503,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) val ord = implicitly[Ordering[Int]] val sorter = new ExternalSorter(Some(agg), Some(new HashPartitioner(3)), Some(ord), None) - sorter.write((0 until 100000).iterator.map(i => (i / 2, i))) + sorter.insertAll((0 until 100000).iterator.map(i => (i / 2, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSet)}.toSet val expected = (0 until 3).map(p => { (p, (0 until 50000).map(i => (i, i * 4 + 1)).filter(_._1 % 3 == p).toSet) @@ -431,7 +520,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { val ord = implicitly[Ordering[Int]] val sorter = new ExternalSorter[Int, Int, Int]( None, Some(new HashPartitioner(3)), Some(ord), None) - sorter.write((0 until 100).iterator.map(i => (i, i))) + sorter.insertAll((0 until 100).iterator.map(i => (i, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq val expected = (0 until 3).map(p => { (p, (0 until 100).map(i => (i, i)).filter(_._1 % 3 == p).toSeq) @@ -448,7 +537,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { val ord = implicitly[Ordering[Int]] val sorter = new ExternalSorter[Int, Int, Int]( None, Some(new HashPartitioner(3)), Some(ord), None) - sorter.write((0 until 100000).iterator.map(i => (i, i))) + sorter.insertAll((0 until 100000).iterator.map(i => (i, i))) val results = sorter.partitionedIterator.map{case (p, vs) => (p, vs.toSeq)}.toSeq val expected = (0 until 3).map(p => { (p, (0 until 100000).map(i => (i, i)).filter(_._1 % 3 == p).toSeq) @@ -495,7 +584,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { val toInsert = (1 to 100000).iterator.map(_.toString).map(s => (s, s)) ++ collisionPairs.iterator ++ collisionPairs.iterator.map(_.swap) - sorter.write(toInsert) + sorter.insertAll(toInsert) // A map of collision pairs in both directions val collisionPairsMap = (collisionPairs ++ collisionPairs.map(_.swap)).toMap @@ -524,7 +613,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { // Insert 10 copies each of lots of objects whose hash codes are either 0 or 1. This causes // problems if the map fails to group together the objects with the same code (SPARK-2043). val toInsert = for (i <- 1 to 10; j <- 1 to 10000) yield (FixedHashObject(j, j % 2), 1) - sorter.write(toInsert.iterator) + sorter.insertAll(toInsert.iterator) val it = sorter.iterator var count = 0 @@ -548,7 +637,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { val agg = new Aggregator[Int, Int, ArrayBuffer[Int]](createCombiner, mergeValue, mergeCombiners) val sorter = new ExternalSorter[Int, Int, ArrayBuffer[Int]](Some(agg), None, None, None) - sorter.write((1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue))) + sorter.insertAll((1 to 100000).iterator.map(i => (i, i)) ++ Iterator((Int.MaxValue, Int.MaxValue))) val it = sorter.iterator while (it.hasNext) { @@ -572,7 +661,7 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { val sorter = new ExternalSorter[String, String, ArrayBuffer[String]]( Some(agg), None, None, None) - sorter.write((1 to 100000).iterator.map(i => (i.toString, i.toString)) ++ Iterator( + sorter.insertAll((1 to 100000).iterator.map(i => (i.toString, i.toString)) ++ Iterator( (null.asInstanceOf[String], "1"), ("1", null.asInstanceOf[String]), (null.asInstanceOf[String], null.asInstanceOf[String]) @@ -584,4 +673,38 @@ class ExternalSorterSuite extends FunSuite with LocalSparkContext { it.next() } } + + test("conditions for bypassing merge-sort") { + val conf = createSparkConf(false) + conf.set("spark.shuffle.memoryFraction", "0.001") + conf.set("spark.shuffle.manager", "org.apache.spark.shuffle.sort.SortShuffleManager") + sc = new SparkContext("local", "test", conf) + + val agg = new Aggregator[Int, Int, Int](i => i, (i, j) => i + j, (i, j) => i + j) + val ord = implicitly[Ordering[Int]] + + // Numbers of partitions that are above and below the default bypassMergeThreshold + val FEW_PARTITIONS = 50 + val MANY_PARTITIONS = 10000 + + // Sorters with no ordering or aggregator: should bypass unless # of partitions is high + + val sorter1 = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(FEW_PARTITIONS)), None, None) + assertBypassedMergeSort(sorter1) + + val sorter2 = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(MANY_PARTITIONS)), None, None) + assertDidNotBypassMergeSort(sorter2) + + // Sorters with an ordering or aggregator: should not bypass even if they have few partitions + + val sorter3 = new ExternalSorter[Int, Int, Int]( + None, Some(new HashPartitioner(FEW_PARTITIONS)), Some(ord), None) + assertDidNotBypassMergeSort(sorter3) + + val sorter4 = new ExternalSorter[Int, Int, Int]( + Some(agg), Some(new HashPartitioner(FEW_PARTITIONS)), None, None) + assertDidNotBypassMergeSort(sorter4) + } } diff --git a/docs/configuration.md b/docs/configuration.md index 5e3eb0f0871af..4d27c5a918fe0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -281,6 +281,24 @@ Apart from these, the following properties are also available, and may be useful overhead per reduce task, so keep it small unless you have a large amount of memory. + + spark.shuffle.manager + HASH + + Implementation to use for shuffling data. A hash-based shuffle manager is the default, but + starting in Spark 1.1 there is an experimental sort-based shuffle manager that is more + memory-efficient in environments with small executors, such as YARN. To use that, change + this value to SORT. + + + + spark.shuffle.sort.bypassMergeThreshold + 200 + + (Advanced) In the sort-based shuffle manager, avoid merge-sorting data if there is no + map-side aggregation and there are at most this many reduce partitions. + + #### Spark UI From 4c51098f320f164eb66f92ff0f26b0b595a58f38 Mon Sep 17 00:00:00 2001 From: Sandy Ryza Date: Thu, 7 Aug 2014 18:09:03 -0700 Subject: [PATCH 02/11] SPARK-2565. Update ShuffleReadMetrics as blocks are fetched Author: Sandy Ryza Closes #1507 from sryza/sandy-spark-2565 and squashes the following commits: 74dad41 [Sandy Ryza] SPARK-2565. Update ShuffleReadMetrics as blocks are fetched --- .../org/apache/spark/executor/Executor.scala | 1 + .../apache/spark/executor/TaskMetrics.scala | 55 ++++++++++++++----- .../hash/BlockStoreShuffleFetcher.scala | 13 ++--- .../shuffle/hash/HashShuffleReader.scala | 4 +- .../spark/storage/BlockFetcherIterator.scala | 40 +++++--------- .../apache/spark/storage/BlockManager.scala | 11 ++-- .../org/apache/spark/util/JsonProtocol.scala | 5 +- .../storage/BlockFetcherIteratorSuite.scala | 13 +++-- .../ui/jobs/JobProgressListenerSuite.scala | 4 +- .../apache/spark/util/JsonProtocolSuite.scala | 2 +- 10 files changed, 84 insertions(+), 64 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index c2b9c660ddaec..eac1f2326a29d 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -374,6 +374,7 @@ private[spark] class Executor( for (taskRunner <- runningTasks.values()) { if (!taskRunner.attemptedTask.isEmpty) { Option(taskRunner.task).flatMap(_.metrics).foreach { metrics => + metrics.updateShuffleReadMetrics tasksMetrics += ((taskRunner.taskId, metrics)) } } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 11a6e10243211..99a88c13456df 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -17,6 +17,8 @@ package org.apache.spark.executor +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.annotation.DeveloperApi import org.apache.spark.storage.{BlockId, BlockStatus} @@ -81,12 +83,27 @@ class TaskMetrics extends Serializable { var inputMetrics: Option[InputMetrics] = None /** - * If this task reads from shuffle output, metrics on getting shuffle data will be collected here + * If this task reads from shuffle output, metrics on getting shuffle data will be collected here. + * This includes read metrics aggregated over all the task's shuffle dependencies. */ private var _shuffleReadMetrics: Option[ShuffleReadMetrics] = None def shuffleReadMetrics = _shuffleReadMetrics + /** + * This should only be used when recreating TaskMetrics, not when updating read metrics in + * executors. + */ + private[spark] def setShuffleReadMetrics(shuffleReadMetrics: Option[ShuffleReadMetrics]) { + _shuffleReadMetrics = shuffleReadMetrics + } + + /** + * ShuffleReadMetrics per dependency for collecting independently while task is in progress. + */ + @transient private lazy val depsShuffleReadMetrics: ArrayBuffer[ShuffleReadMetrics] = + new ArrayBuffer[ShuffleReadMetrics]() + /** * If this task writes to shuffle output, metrics on the written shuffle data will be collected * here @@ -98,19 +115,31 @@ class TaskMetrics extends Serializable { */ var updatedBlocks: Option[Seq[(BlockId, BlockStatus)]] = None - /** Adds the given ShuffleReadMetrics to any existing shuffle metrics for this task. */ - def updateShuffleReadMetrics(newMetrics: ShuffleReadMetrics) = synchronized { - _shuffleReadMetrics match { - case Some(existingMetrics) => - existingMetrics.shuffleFinishTime = math.max( - existingMetrics.shuffleFinishTime, newMetrics.shuffleFinishTime) - existingMetrics.fetchWaitTime += newMetrics.fetchWaitTime - existingMetrics.localBlocksFetched += newMetrics.localBlocksFetched - existingMetrics.remoteBlocksFetched += newMetrics.remoteBlocksFetched - existingMetrics.remoteBytesRead += newMetrics.remoteBytesRead - case None => - _shuffleReadMetrics = Some(newMetrics) + /** + * A task may have multiple shuffle readers for multiple dependencies. To avoid synchronization + * issues from readers in different threads, in-progress tasks use a ShuffleReadMetrics for each + * dependency, and merge these metrics before reporting them to the driver. This method returns + * a ShuffleReadMetrics for a dependency and registers it for merging later. + */ + private [spark] def createShuffleReadMetricsForDependency(): ShuffleReadMetrics = synchronized { + val readMetrics = new ShuffleReadMetrics() + depsShuffleReadMetrics += readMetrics + readMetrics + } + + /** + * Aggregates shuffle read metrics for all registered dependencies into shuffleReadMetrics. + */ + private[spark] def updateShuffleReadMetrics() = synchronized { + val merged = new ShuffleReadMetrics() + for (depMetrics <- depsShuffleReadMetrics) { + merged.fetchWaitTime += depMetrics.fetchWaitTime + merged.localBlocksFetched += depMetrics.localBlocksFetched + merged.remoteBlocksFetched += depMetrics.remoteBlocksFetched + merged.remoteBytesRead += depMetrics.remoteBytesRead + merged.shuffleFinishTime = math.max(merged.shuffleFinishTime, depMetrics.shuffleFinishTime) } + _shuffleReadMetrics = Some(merged) } } diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala index 99788828981c7..12b475658e29d 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala @@ -32,7 +32,8 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { shuffleId: Int, reduceId: Int, context: TaskContext, - serializer: Serializer) + serializer: Serializer, + shuffleMetrics: ShuffleReadMetrics) : Iterator[T] = { logDebug("Fetching outputs for shuffle %d, reduce %d".format(shuffleId, reduceId)) @@ -73,17 +74,11 @@ private[hash] object BlockStoreShuffleFetcher extends Logging { } } - val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer) + val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer, shuffleMetrics) val itr = blockFetcherItr.flatMap(unpackBlock) val completionIter = CompletionIterator[T, Iterator[T]](itr, { - val shuffleMetrics = new ShuffleReadMetrics - shuffleMetrics.shuffleFinishTime = System.currentTimeMillis - shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime - shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead - shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks - shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks - context.taskMetrics.updateShuffleReadMetrics(shuffleMetrics) + context.taskMetrics.updateShuffleReadMetrics() }) new InterruptibleIterator[T](context, completionIter) diff --git a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala index 88a5f1e5ddf58..7bed97a63f0f6 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/hash/HashShuffleReader.scala @@ -36,8 +36,10 @@ private[spark] class HashShuffleReader[K, C]( /** Read the combined key-values for this reduce task */ override def read(): Iterator[Product2[K, C]] = { + val readMetrics = context.taskMetrics.createShuffleReadMetricsForDependency() val ser = Serializer.getSerializer(dep.serializer) - val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser) + val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context, ser, + readMetrics) val aggregatedIter: Iterator[Product2[K, C]] = if (dep.aggregator.isDefined) { if (dep.mapSideCombine) { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala index 938af6f5b923a..5f44f5f3197fd 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockFetcherIterator.scala @@ -27,6 +27,7 @@ import scala.util.{Failure, Success} import io.netty.buffer.ByteBuf import org.apache.spark.{Logging, SparkException} +import org.apache.spark.executor.ShuffleReadMetrics import org.apache.spark.network.BufferMessage import org.apache.spark.network.ConnectionManagerId import org.apache.spark.network.netty.ShuffleCopier @@ -47,10 +48,6 @@ import org.apache.spark.util.Utils private[storage] trait BlockFetcherIterator extends Iterator[(BlockId, Option[Iterator[Any]])] with Logging { def initialize() - def numLocalBlocks: Int - def numRemoteBlocks: Int - def fetchWaitTime: Long - def remoteBytesRead: Long } @@ -72,14 +69,12 @@ object BlockFetcherIterator { class BasicBlockFetcherIterator( private val blockManager: BlockManager, val blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], - serializer: Serializer) + serializer: Serializer, + readMetrics: ShuffleReadMetrics) extends BlockFetcherIterator { import blockManager._ - private var _remoteBytesRead = 0L - private var _fetchWaitTime = 0L - if (blocksByAddress == null) { throw new IllegalArgumentException("BlocksByAddress is null") } @@ -89,13 +84,9 @@ object BlockFetcherIterator { protected var startTime = System.currentTimeMillis - // This represents the number of local blocks, also counting zero-sized blocks - private var numLocal = 0 // BlockIds for local blocks that need to be fetched. Excludes zero-sized blocks protected val localBlocksToFetch = new ArrayBuffer[BlockId]() - // This represents the number of remote blocks, also counting zero-sized blocks - private var numRemote = 0 // BlockIds for remote blocks that need to be fetched. Excludes zero-sized blocks protected val remoteBlocksToFetch = new HashSet[BlockId]() @@ -132,7 +123,10 @@ object BlockFetcherIterator { val networkSize = blockMessage.getData.limit() results.put(new FetchResult(blockId, sizeMap(blockId), () => dataDeserialize(blockId, blockMessage.getData, serializer))) - _remoteBytesRead += networkSize + // TODO: NettyBlockFetcherIterator has some race conditions where multiple threads can + // be incrementing bytes read at the same time (SPARK-2625). + readMetrics.remoteBytesRead += networkSize + readMetrics.remoteBlocksFetched += 1 logDebug("Got remote block " + blockId + " after " + Utils.getUsedTimeMs(startTime)) } } @@ -155,14 +149,14 @@ object BlockFetcherIterator { // Split local and remote blocks. Remote blocks are further split into FetchRequests of size // at most maxBytesInFlight in order to limit the amount of data in flight. val remoteRequests = new ArrayBuffer[FetchRequest] + var totalBlocks = 0 for ((address, blockInfos) <- blocksByAddress) { + totalBlocks += blockInfos.size if (address == blockManagerId) { - numLocal = blockInfos.size // Filter out zero-sized blocks localBlocksToFetch ++= blockInfos.filter(_._2 != 0).map(_._1) _numBlocksToFetch += localBlocksToFetch.size } else { - numRemote += blockInfos.size val iterator = blockInfos.iterator var curRequestSize = 0L var curBlocks = new ArrayBuffer[(BlockId, Long)] @@ -192,7 +186,7 @@ object BlockFetcherIterator { } } logInfo("Getting " + _numBlocksToFetch + " non-empty blocks out of " + - (numLocal + numRemote) + " blocks") + totalBlocks + " blocks") remoteRequests } @@ -205,6 +199,7 @@ object BlockFetcherIterator { // getLocalFromDisk never return None but throws BlockException val iter = getLocalFromDisk(id, serializer).get // Pass 0 as size since it's not in flight + readMetrics.localBlocksFetched += 1 results.put(new FetchResult(id, 0, () => iter)) logDebug("Got local block " + id) } catch { @@ -238,12 +233,6 @@ object BlockFetcherIterator { logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms") } - override def numLocalBlocks: Int = numLocal - override def numRemoteBlocks: Int = numRemote - override def fetchWaitTime: Long = _fetchWaitTime - override def remoteBytesRead: Long = _remoteBytesRead - - // Implementing the Iterator methods with an iterator that reads fetched blocks off the queue // as they arrive. @volatile protected var resultsGotten = 0 @@ -255,7 +244,7 @@ object BlockFetcherIterator { val startFetchWait = System.currentTimeMillis() val result = results.take() val stopFetchWait = System.currentTimeMillis() - _fetchWaitTime += (stopFetchWait - startFetchWait) + readMetrics.fetchWaitTime += (stopFetchWait - startFetchWait) if (! result.failed) bytesInFlight -= result.size while (!fetchRequests.isEmpty && (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) { @@ -269,8 +258,9 @@ object BlockFetcherIterator { class NettyBlockFetcherIterator( blockManager: BlockManager, blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], - serializer: Serializer) - extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer) { + serializer: Serializer, + readMetrics: ShuffleReadMetrics) + extends BasicBlockFetcherIterator(blockManager, blocksByAddress, serializer, readMetrics) { import blockManager._ diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 8d21b02b747ff..e8bbd298c631a 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -29,7 +29,7 @@ import akka.actor.{ActorSystem, Cancellable, Props} import sun.nio.ch.DirectBuffer import org.apache.spark._ -import org.apache.spark.executor.{DataReadMethod, InputMetrics, ShuffleWriteMetrics} +import org.apache.spark.executor._ import org.apache.spark.io.CompressionCodec import org.apache.spark.network._ import org.apache.spark.serializer.Serializer @@ -539,12 +539,15 @@ private[spark] class BlockManager( */ def getMultiple( blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])], - serializer: Serializer): BlockFetcherIterator = { + serializer: Serializer, + readMetrics: ShuffleReadMetrics): BlockFetcherIterator = { val iter = if (conf.getBoolean("spark.shuffle.use.netty", false)) { - new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer) + new BlockFetcherIterator.NettyBlockFetcherIterator(this, blocksByAddress, serializer, + readMetrics) } else { - new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer) + new BlockFetcherIterator.BasicBlockFetcherIterator(this, blocksByAddress, serializer, + readMetrics) } iter.initialize() iter diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index b112b359368cd..6f8eb1ee12634 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -560,9 +560,8 @@ private[spark] object JsonProtocol { metrics.resultSerializationTime = (json \ "Result Serialization Time").extract[Long] metrics.memoryBytesSpilled = (json \ "Memory Bytes Spilled").extract[Long] metrics.diskBytesSpilled = (json \ "Disk Bytes Spilled").extract[Long] - Utils.jsonOption(json \ "Shuffle Read Metrics").map { shuffleReadMetrics => - metrics.updateShuffleReadMetrics(shuffleReadMetricsFromJson(shuffleReadMetrics)) - } + metrics.setShuffleReadMetrics( + Utils.jsonOption(json \ "Shuffle Read Metrics").map(shuffleReadMetricsFromJson)) metrics.shuffleWriteMetrics = Utils.jsonOption(json \ "Shuffle Write Metrics").map(shuffleWriteMetricsFromJson) metrics.inputMetrics = diff --git a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala index 1538995a6b404..bcbfe8baf36ad 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockFetcherIteratorSuite.scala @@ -33,6 +33,7 @@ import org.mockito.invocation.InvocationOnMock import org.apache.spark.storage.BlockFetcherIterator._ import org.apache.spark.network.{ConnectionManager, Message} +import org.apache.spark.executor.ShuffleReadMetrics class BlockFetcherIteratorSuite extends FunSuite with Matchers { @@ -70,8 +71,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq) ) - val iterator = new BasicBlockFetcherIterator(blockManager, - blocksByAddress, null) + val iterator = new BasicBlockFetcherIterator(blockManager, blocksByAddress, null, + new ShuffleReadMetrics()) iterator.initialize() @@ -121,8 +122,8 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { (bmId, blIds.map(blId => (blId, 1.asInstanceOf[Long])).toSeq) ) - val iterator = new BasicBlockFetcherIterator(blockManager, - blocksByAddress, null) + val iterator = new BasicBlockFetcherIterator(blockManager, blocksByAddress, null, + new ShuffleReadMetrics()) iterator.initialize() @@ -165,7 +166,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { ) val iterator = new BasicBlockFetcherIterator(blockManager, - blocksByAddress, null) + blocksByAddress, null, new ShuffleReadMetrics()) iterator.initialize() iterator.foreach{ @@ -219,7 +220,7 @@ class BlockFetcherIteratorSuite extends FunSuite with Matchers { ) val iterator = new BasicBlockFetcherIterator(blockManager, - blocksByAddress, null) + blocksByAddress, null, new ShuffleReadMetrics()) iterator.initialize() iterator.foreach{ case (_, r) => { diff --git a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala index cb8252515238e..f5ba31c309277 100644 --- a/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/jobs/JobProgressListenerSuite.scala @@ -65,7 +65,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc // finish this task, should get updated shuffleRead shuffleReadMetrics.remoteBytesRead = 1000 - taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics) + taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics)) var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false) taskInfo.finishTime = 1 var task = new ShuffleMapTask(0) @@ -142,7 +142,7 @@ class JobProgressListenerSuite extends FunSuite with LocalSparkContext with Matc val taskMetrics = new TaskMetrics() val shuffleReadMetrics = new ShuffleReadMetrics() val shuffleWriteMetrics = new ShuffleWriteMetrics() - taskMetrics.updateShuffleReadMetrics(shuffleReadMetrics) + taskMetrics.setShuffleReadMetrics(Some(shuffleReadMetrics)) taskMetrics.shuffleWriteMetrics = Some(shuffleWriteMetrics) shuffleReadMetrics.remoteBytesRead = base + 1 shuffleReadMetrics.remoteBlocksFetched = base + 2 diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 2002a817d9168..97ffb07662482 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -539,7 +539,7 @@ class JsonProtocolSuite extends FunSuite { sr.localBlocksFetched = e sr.fetchWaitTime = a + d sr.remoteBlocksFetched = f - t.updateShuffleReadMetrics(sr) + t.setShuffleReadMetrics(Some(sr)) } sw.shuffleBytesWritten = a + b + c sw.shuffleWriteTime = b + c + d From 9de6a42bb34ea8963225ce90f1a45adcfee38b58 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 7 Aug 2014 18:53:15 -0700 Subject: [PATCH 03/11] [SPARK-2904] Remove non-used local variable in SparkSubmitArguments Author: Kousuke Saruta Closes #1834 from sarutak/SPARK-2904 and squashes the following commits: 38e7d45 [Kousuke Saruta] Removed non-used variable in SparkSubmitArguments --- .../scala/org/apache/spark/deploy/SparkSubmitArguments.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 087dd4d633db0..c21f1529a1837 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -219,7 +219,6 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { /** Fill in values by parsing user options. */ private def parseOpts(opts: Seq[String]): Unit = { - var inSparkOpts = true val EQ_SEPARATED_OPT="""(--[^=]+)=(.+)""".r // Delineates parsing of Spark options from parsing of user options. From 9a54de16ed9de536e0436d532c587384e1ea0af6 Mon Sep 17 00:00:00 2001 From: Erik Erlandson Date: Thu, 7 Aug 2014 23:45:16 -0700 Subject: [PATCH 04/11] [SPARK-2911]: provide rdd.parent[T](j) to obtain jth parent RDD Author: Erik Erlandson Closes #1841 from erikerlandson/spark-2911-pr and squashes the following commits: 4699e2f [Erik Erlandson] [SPARK-2911]: provide rdd.parent[T](j) to obtain jth parent RDD --- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 5 +++++ .../src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 10 ++++++++++ 2 files changed, 15 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 0159003c88e06..19e10bd04681b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1233,6 +1233,11 @@ abstract class RDD[T: ClassTag]( dependencies.head.rdd.asInstanceOf[RDD[U]] } + /** Returns the jth parent RDD: e.g. rdd.parent[T](0) is equivalent to rdd.firstParent[T] */ + protected[spark] def parent[U: ClassTag](j: Int) = { + dependencies(j).rdd.asInstanceOf[RDD[U]] + } + /** The [[org.apache.spark.SparkContext]] that this RDD was created on. */ def context = sc diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 4a7dc8dca25e2..926d4fecb5b91 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -726,6 +726,16 @@ class RDDSuite extends FunSuite with SharedSparkContext { jrdd.rdd.retag.collect() } + test("parent method") { + val rdd1 = sc.parallelize(1 to 10, 2) + val rdd2 = rdd1.filter(_ % 2 == 0) + val rdd3 = rdd2.map(_ + 1) + val rdd4 = new UnionRDD(sc, List(rdd1, rdd2, rdd3)) + assert(rdd4.parent(0).isInstanceOf[ParallelCollectionRDD[_]]) + assert(rdd4.parent(1).isInstanceOf[FilteredRDD[_]]) + assert(rdd4.parent(2).isInstanceOf[MappedRDD[_, _]]) + } + test("getNarrowAncestors") { val rdd1 = sc.parallelize(1 to 100, 4) val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1) From 9016af3f2729101027e33593e094332f05f48d92 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 8 Aug 2014 11:01:51 -0700 Subject: [PATCH 05/11] [SPARK-2888] [SQL] Fix addColumnMetadataToConf in HiveTableScan JIRA: https://issues.apache.org/jira/browse/SPARK-2888 Author: Yin Huai Closes #1817 from yhuai/fixAddColumnMetadataToConf and squashes the following commits: fba728c [Yin Huai] Fix addColumnMetadataToConf. --- .../sql/hive/execution/HiveTableScan.scala | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala index 8920e2a76a27f..577ca928b43b6 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveTableScan.scala @@ -72,17 +72,12 @@ case class HiveTableScan( } private def addColumnMetadataToConf(hiveConf: HiveConf) { - // Specifies IDs and internal names of columns to be scanned. - val neededColumnIDs = attributes.map(a => relation.output.indexWhere(_.name == a.name): Integer) - val columnInternalNames = neededColumnIDs.map(HiveConf.getColumnInternalName(_)).mkString(",") - - if (attributes.size == relation.output.size) { - // SQLContext#pruneFilterProject guarantees no duplicated value in `attributes` - ColumnProjectionUtils.setFullyReadColumns(hiveConf) - } else { - ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs) - } + // Specifies needed column IDs for those non-partitioning columns. + val neededColumnIDs = + attributes.map(a => + relation.attributes.indexWhere(_.name == a.name): Integer).filter(index => index >= 0) + ColumnProjectionUtils.appendReadColumnIDs(hiveConf, neededColumnIDs) ColumnProjectionUtils.appendReadColumnNames(hiveConf, attributes.map(_.name)) // Specifies types and object inspectors of columns to be scanned. @@ -99,7 +94,7 @@ case class HiveTableScan( .mkString(",") hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) - hiveConf.set(serdeConstants.LIST_COLUMNS, columnInternalNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.attributes.map(_.name).mkString(",")) } addColumnMetadataToConf(context.hiveconf) From 0489cee6b24ca34f1adab03a75d157e04a9e06b7 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 8 Aug 2014 11:10:11 -0700 Subject: [PATCH 06/11] [SPARK-2908] [SQL] JsonRDD.nullTypeToStringType does not convert all NullType to StringType JIRA: https://issues.apache.org/jira/browse/SPARK-2908 Author: Yin Huai Closes #1840 from yhuai/SPARK-2908 and squashes the following commits: 86e833e [Yin Huai] Update test. cb11759 [Yin Huai] nullTypeToStringType should check columns with the type of array of structs. --- .../scala/org/apache/spark/sql/json/JsonRDD.scala | 4 +++- .../scala/org/apache/spark/sql/json/JsonSuite.scala | 11 ++++++++--- .../org/apache/spark/sql/json/TestJsonData.scala | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index a3d2a1c7a51f8..1c0b03c684f10 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -109,7 +109,9 @@ private[sql] object JsonRDD extends Logging { val newType = dataType match { case NullType => StringType case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull) - case struct: StructType => nullTypeToStringType(struct) + case ArrayType(struct: StructType, containsNull) => + ArrayType(nullTypeToStringType(struct), containsNull) + case struct: StructType =>nullTypeToStringType(struct) case other: DataType => other } StructField(fieldName, newType, nullable) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index 75c0589eb208e..58b1e23891a3b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -213,7 +213,8 @@ class JsonSuite extends QueryTest { StructField("arrayOfStruct", ArrayType( StructType( StructField("field1", BooleanType, true) :: - StructField("field2", StringType, true) :: Nil)), true) :: + StructField("field2", StringType, true) :: + StructField("field3", StringType, true) :: Nil)), true) :: StructField("struct", StructType( StructField("field1", BooleanType, true) :: StructField("field2", DecimalType, true) :: Nil), true) :: @@ -263,8 +264,12 @@ class JsonSuite extends QueryTest { // Access elements of an array of structs. checkAnswer( - sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2] from jsonTable"), - (true :: "str1" :: Nil, false :: null :: Nil, null) :: Nil + sql("select arrayOfStruct[0], arrayOfStruct[1], arrayOfStruct[2], arrayOfStruct[3] " + + "from jsonTable"), + (true :: "str1" :: null :: Nil, + false :: null :: null :: Nil, + null :: null :: null :: Nil, + null) :: Nil ) // Access a struct and fields inside of it. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala index d0180f3754f22..a88310b5f1b46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala @@ -43,7 +43,7 @@ object TestJsonData { "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308], "arrayOfBoolean":[true, false, true], "arrayOfNull":[null, null, null, null], - "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}], + "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]], "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]] }""" :: Nil) From c874723fa844b49f057bb2434a12228b2f717e99 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 8 Aug 2014 11:15:16 -0700 Subject: [PATCH 07/11] [SPARK-2877] [SQL] MetastoreRelation should use SparkClassLoader when creating the tableDesc JIRA: https://issues.apache.org/jira/browse/SPARK-2877 Author: Yin Huai Closes #1806 from yhuai/SPARK-2877 and squashes the following commits: 4142bcb [Yin Huai] Use Spark's classloader. --- .../org/apache/spark/sql/hive/HiveMetastoreCatalog.scala | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index 301cf51c00e2b..82e9c1a248626 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -19,8 +19,6 @@ package org.apache.spark.sql.hive import scala.util.parsing.combinator.RegexParsers -import org.apache.hadoop.fs.Path -import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.metastore.api.{FieldSchema, StorageDescriptor, SerDeInfo} import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition} import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table} @@ -39,6 +37,7 @@ import org.apache.spark.sql.catalyst.rules._ import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.columnar.InMemoryRelation import org.apache.spark.sql.hive.execution.HiveTableScan +import org.apache.spark.util.Utils /* Implicit conversions */ import scala.collection.JavaConversions._ @@ -288,7 +287,10 @@ private[hive] case class MetastoreRelation ) val tableDesc = new TableDesc( - Class.forName(hiveQlTable.getSerializationLib).asInstanceOf[Class[Deserializer]], + Class.forName( + hiveQlTable.getSerializationLib, + true, + Utils.getContextOrSparkClassLoader).asInstanceOf[Class[Deserializer]], hiveQlTable.getInputFormatClass, // The class of table should be org.apache.hadoop.hive.ql.metadata.Table because // getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to From 45d8f4deab50ae069ecde2201bd486d464a4501e Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 8 Aug 2014 11:23:58 -0700 Subject: [PATCH 08/11] [SPARK-2919] [SQL] Basic support for analyze command in HiveQl The command we will support is ``` ANALYZE TABLE tablename COMPUTE STATISTICS noscan ``` Other cases shown in https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables will still be treated as Hive native commands. JIRA: https://issues.apache.org/jira/browse/SPARK-2919 Author: Yin Huai Closes #1848 from yhuai/sqlAnalyze and squashes the following commits: 0b79d36 [Yin Huai] Typo and format. c59d94b [Yin Huai] Support "ANALYZE TABLE tableName COMPUTE STATISTICS noscan". --- .../org/apache/spark/sql/hive/HiveQl.scala | 21 +++++++-- .../spark/sql/hive/HiveStrategies.scala | 2 + .../{DropTable.scala => commands.scala} | 26 +++++++++++ .../spark/sql/hive/StatisticsSuite.scala | 45 ++++++++++++++++++- 4 files changed, 89 insertions(+), 5 deletions(-) rename sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/{DropTable.scala => commands.scala} (72%) diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index bc2fefafd58c8..05b2f5f6cd3f7 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -46,6 +46,8 @@ private[hive] case class AddFile(filePath: String) extends Command private[hive] case class DropTable(tableName: String, ifExists: Boolean) extends Command +private[hive] case class AnalyzeTable(tableName: String) extends Command + /** Provides a mapping from HiveQL statements to catalyst logical plans and expression trees. */ private[hive] object HiveQl { protected val nativeCommands = Seq( @@ -74,7 +76,6 @@ private[hive] object HiveQl { "TOK_CREATEFUNCTION", "TOK_DROPFUNCTION", - "TOK_ANALYZE", "TOK_ALTERDATABASE_PROPERTIES", "TOK_ALTERINDEX_PROPERTIES", "TOK_ALTERINDEX_REBUILD", @@ -92,7 +93,6 @@ private[hive] object HiveQl { "TOK_ALTERTABLE_SKEWED", "TOK_ALTERTABLE_TOUCH", "TOK_ALTERTABLE_UNARCHIVE", - "TOK_ANALYZE", "TOK_CREATEDATABASE", "TOK_CREATEFUNCTION", "TOK_CREATEINDEX", @@ -239,7 +239,6 @@ private[hive] object HiveQl { ShellCommand(sql.drop(1)) } else { val tree = getAst(sql) - if (nativeCommands contains tree.getText) { NativeCommand(sql) } else { @@ -387,6 +386,22 @@ private[hive] object HiveQl { ifExists) => val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".") DropTable(tableName, ifExists.nonEmpty) + // Support "ANALYZE TABLE tableNmae COMPUTE STATISTICS noscan" + case Token("TOK_ANALYZE", + Token("TOK_TAB", Token("TOK_TABNAME", tableNameParts) :: partitionSpec) :: + isNoscan) => + // Reference: + // https://cwiki.apache.org/confluence/display/Hive/StatsDev#StatsDev-ExistingTables + if (partitionSpec.nonEmpty) { + // Analyze partitions will be treated as a Hive native command. + NativePlaceholder + } else if (isNoscan.isEmpty) { + // If users do not specify "noscan", it will be treated as a Hive native command. + NativePlaceholder + } else { + val tableName = tableNameParts.map { case Token(p, Nil) => p }.mkString(".") + AnalyzeTable(tableName) + } // Just fake explain for any of the native commands. case Token("TOK_EXPLAIN", explainArgs) if noExplainCommands.contains(explainArgs.head.getText) => diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 2175c5f3835a6..85d2496a34cfb 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -83,6 +83,8 @@ private[hive] trait HiveStrategies { case DropTable(tableName, ifExists) => execution.DropTable(tableName, ifExists) :: Nil + case AnalyzeTable(tableName) => execution.AnalyzeTable(tableName) :: Nil + case describe: logical.DescribeCommand => val resolvedTable = context.executePlan(describe.table).analyzed resolvedTable match { diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala similarity index 72% rename from sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala rename to sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala index 9cd0c86c6c796..2985169da033c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/DropTable.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala @@ -23,6 +23,32 @@ import org.apache.spark.sql.catalyst.expressions.Row import org.apache.spark.sql.execution.{Command, LeafNode} import org.apache.spark.sql.hive.HiveContext +/** + * :: DeveloperApi :: + * Analyzes the given table in the current database to generate statistics, which will be + * used in query optimizations. + * + * Right now, it only supports Hive tables and it only updates the size of a Hive table + * in the Hive metastore. + */ +@DeveloperApi +case class AnalyzeTable(tableName: String) extends LeafNode with Command { + + def hiveContext = sqlContext.asInstanceOf[HiveContext] + + def output = Seq.empty + + override protected[sql] lazy val sideEffectResult = { + hiveContext.analyze(tableName) + Seq.empty[Any] + } + + override def execute(): RDD[Row] = { + sideEffectResult + sparkContext.emptyRDD[Row] + } +} + /** * :: DeveloperApi :: * Drops a table from the metastore and removes it if it is cached. diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index bf5931bbf97ee..7c82964b5ecdc 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -19,13 +19,54 @@ package org.apache.spark.sql.hive import scala.reflect.ClassTag + import org.apache.spark.sql.{SQLConf, QueryTest} +import org.apache.spark.sql.catalyst.plans.logical.NativeCommand import org.apache.spark.sql.execution.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ class StatisticsSuite extends QueryTest { + test("parse analyze commands") { + def assertAnalyzeCommand(analyzeCommand: String, c: Class[_]) { + val parsed = HiveQl.parseSql(analyzeCommand) + val operators = parsed.collect { + case a: AnalyzeTable => a + case o => o + } + + assert(operators.size === 1) + if (operators(0).getClass() != c) { + fail( + s"""$analyzeCommand expected command: $c, but got ${operators(0)} + |parsed command: + |$parsed + """.stripMargin) + } + } + + assertAnalyzeCommand( + "ANALYZE TABLE Table1 COMPUTE STATISTICS", + classOf[NativeCommand]) + assertAnalyzeCommand( + "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS", + classOf[NativeCommand]) + assertAnalyzeCommand( + "ANALYZE TABLE Table1 PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan", + classOf[NativeCommand]) + assertAnalyzeCommand( + "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS", + classOf[NativeCommand]) + assertAnalyzeCommand( + "ANALYZE TABLE Table1 PARTITION(ds, hr) COMPUTE STATISTICS noscan", + classOf[NativeCommand]) + + assertAnalyzeCommand( + "ANALYZE TABLE Table1 COMPUTE STATISTICS nOscAn", + classOf[AnalyzeTable]) + } + test("analyze MetastoreRelations") { def queryTotalSize(tableName: String): BigInt = catalog.lookupRelation(None, tableName).statistics.sizeInBytes @@ -37,7 +78,7 @@ class StatisticsSuite extends QueryTest { assert(queryTotalSize("analyzeTable") === defaultSizeInBytes) - analyze("analyzeTable") + sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan") assert(queryTotalSize("analyzeTable") === BigInt(11624)) @@ -66,7 +107,7 @@ class StatisticsSuite extends QueryTest { assert(queryTotalSize("analyzeTable_part") === defaultSizeInBytes) - analyze("analyzeTable_part") + sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan") assert(queryTotalSize("analyzeTable_part") === BigInt(17436)) From b7c89a7f0ca73153dce36e0f01b81a3947ee1189 Mon Sep 17 00:00:00 2001 From: chutium Date: Fri, 8 Aug 2014 13:31:08 -0700 Subject: [PATCH 09/11] [SPARK-2700] [SQL] Hidden files (such as .impala_insert_staging) should be filtered out by sqlContext.parquetFile Author: chutium Closes #1691 from chutium/SPARK-2700 and squashes the following commits: b76ae8c [chutium] [SPARK-2700] [SQL] fixed styling issue d75a8bd [chutium] [SPARK-2700] [SQL] Hidden files (such as .impala_insert_staging) should be filtered out by sqlContext.parquetFile --- .../scala/org/apache/spark/sql/parquet/ParquetTypes.scala | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala index aaef1a1d474fe..2867dc0a8b1f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala @@ -373,8 +373,9 @@ private[parquet] object ParquetTypesConverter extends Logging { } ParquetRelation.enableLogForwarding() - val children = fs.listStatus(path).filterNot { - _.getPath.getName == FileOutputCommitter.SUCCEEDED_FILE_NAME + val children = fs.listStatus(path).filterNot { status => + val name = status.getPath.getName + name(0) == '.' || name == FileOutputCommitter.SUCCEEDED_FILE_NAME } // NOTE (lian): Parquet "_metadata" file can be very slow if the file consists of lots of row From 74d6f62264babfc6045c21545552f0a2e6958155 Mon Sep 17 00:00:00 2001 From: Xiangrui Meng Date: Fri, 8 Aug 2014 15:07:31 -0700 Subject: [PATCH 10/11] [SPARK-1997][MLLIB] update breeze to 0.9 0.9 dependences (this version doesn't depend on scalalogging and I excluded commons-math3 from its transitive dependencies): ~~~ +-org.scalanlp:breeze_2.10:0.9 [S] +-com.github.fommil.netlib:core:1.1.2 +-com.github.rwl:jtransforms:2.4.0 +-net.sf.opencsv:opencsv:2.3 +-net.sourceforge.f2j:arpack_combined_all:0.1 +-org.scalanlp:breeze-macros_2.10:0.3.1 [S] | +-org.scalamacros:quasiquotes_2.10:2.0.0 [S] | +-org.slf4j:slf4j-api:1.7.5 +-org.spire-math:spire_2.10:0.7.4 [S] +-org.scalamacros:quasiquotes_2.10:2.0.0 [S] | +-org.spire-math:spire-macros_2.10:0.7.4 [S] +-org.scalamacros:quasiquotes_2.10:2.0.0 [S] ~~~ Closes #1749 CC: witgo avati Author: Xiangrui Meng Closes #1857 from mengxr/breeze-0.9 and squashes the following commits: 7fc16b6 [Xiangrui Meng] don't know why but exclude a private method for mima dcc502e [Xiangrui Meng] update breeze to 0.9 --- mllib/pom.xml | 2 +- .../org/apache/spark/mllib/linalg/distributed/RowMatrix.scala | 4 ++-- .../spark/mllib/linalg/distributed/RowMatrixSuite.scala | 2 +- project/MimaExcludes.scala | 4 ++++ 4 files changed, 8 insertions(+), 4 deletions(-) diff --git a/mllib/pom.xml b/mllib/pom.xml index 9a33bd1cf6ad1..fc1ecfbea708f 100644 --- a/mllib/pom.xml +++ b/mllib/pom.xml @@ -57,7 +57,7 @@ org.scalanlp breeze_${scala.binary.version} - 0.7 + 0.9 diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala index 45486b2c7d82d..e76bc9fefff01 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala @@ -222,7 +222,7 @@ class RowMatrix( EigenValueDecomposition.symmetricEigs(v => G * v, n, k, tol, maxIter) case SVDMode.LocalLAPACK => val G = computeGramianMatrix().toBreeze.asInstanceOf[BDM[Double]] - val (uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G) + val brzSvd.SVD(uFull: BDM[Double], sigmaSquaresFull: BDV[Double], _) = brzSvd(G) (sigmaSquaresFull, uFull) case SVDMode.DistARPACK => require(k < n, s"k must be smaller than n in dist-eigs mode but got k=$k and n=$n.") @@ -338,7 +338,7 @@ class RowMatrix( val Cov = computeCovariance().toBreeze.asInstanceOf[BDM[Double]] - val (u: BDM[Double], _, _) = brzSvd(Cov) + val brzSvd.SVD(u: BDM[Double], _, _) = brzSvd(Cov) if (k == n) { Matrices.dense(n, k, u.data) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala index 325b817980f68..1d3a3221365cc 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/distributed/RowMatrixSuite.scala @@ -99,7 +99,7 @@ class RowMatrixSuite extends FunSuite with LocalSparkContext { for (mat <- Seq(denseMat, sparseMat)) { for (mode <- Seq("auto", "local-svd", "local-eigs", "dist-eigs")) { val localMat = mat.toBreeze() - val (localU, localSigma, localVt) = brzSvd(localMat) + val brzSvd.SVD(localU, localSigma, localVt) = brzSvd(localMat) val localV: BDM[Double] = localVt.t.toDenseMatrix for (k <- 1 to n) { val skip = (mode == "local-eigs" || mode == "dist-eigs") && k == n diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 537ca0dcf267d..b4653c72c10b5 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -110,6 +110,10 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.LabelParser$"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.mllib.util.MulticlassLabelParser$") + ) ++ + Seq ( // package-private classes removed in MLlib + ProblemFilters.exclude[MissingMethodProblem]( + "org.apache.spark.mllib.regression.GeneralizedLinearAlgorithm.org$apache$spark$mllib$regression$GeneralizedLinearAlgorithm$$prependOne") ) case v if v.startsWith("1.0") => Seq( From ec79063fad44751a6689f5e58d47886babeaecff Mon Sep 17 00:00:00 2001 From: GuoQiang Li Date: Fri, 8 Aug 2014 16:57:26 -0700 Subject: [PATCH 11/11] [SPARK-2897][SPARK-2920]TorrentBroadcast does use the serializer class specified in the spark option "spark.serializer" Author: GuoQiang Li Closes #1836 from witgo/SPARK-2897 and squashes the following commits: 23cdc5b [GuoQiang Li] review commit ada4fba [GuoQiang Li] TorrentBroadcast does not support broadcast compression fb91792 [GuoQiang Li] org.apache.spark.broadcast.TorrentBroadcast does use the serializer class specified in the spark option "spark.serializer" --- .../spark/broadcast/TorrentBroadcast.scala | 31 +++++++++++++++---- .../spark/broadcast/BroadcastSuite.scala | 10 ++++-- 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index 86731b684f441..fe73456ef8fad 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -17,14 +17,15 @@ package org.apache.spark.broadcast -import java.io.{ByteArrayInputStream, ObjectInputStream, ObjectOutputStream} +import java.io.{ByteArrayOutputStream, ByteArrayInputStream, InputStream, + ObjectInputStream, ObjectOutputStream, OutputStream} import scala.reflect.ClassTag import scala.util.Random import org.apache.spark.{Logging, SparkConf, SparkEnv, SparkException} +import org.apache.spark.io.CompressionCodec import org.apache.spark.storage.{BroadcastBlockId, StorageLevel} -import org.apache.spark.util.Utils /** * A [[org.apache.spark.broadcast.Broadcast]] implementation that uses a BitTorrent-like @@ -214,11 +215,15 @@ private[broadcast] object TorrentBroadcast extends Logging { private lazy val BLOCK_SIZE = conf.getInt("spark.broadcast.blockSize", 4096) * 1024 private var initialized = false private var conf: SparkConf = null + private var compress: Boolean = false + private var compressionCodec: CompressionCodec = null def initialize(_isDriver: Boolean, conf: SparkConf) { TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests synchronized { if (!initialized) { + compress = conf.getBoolean("spark.broadcast.compress", true) + compressionCodec = CompressionCodec.createCodec(conf) initialized = true } } @@ -228,8 +233,13 @@ private[broadcast] object TorrentBroadcast extends Logging { initialized = false } - def blockifyObject[T](obj: T): TorrentInfo = { - val byteArray = Utils.serialize[T](obj) + def blockifyObject[T: ClassTag](obj: T): TorrentInfo = { + val bos = new ByteArrayOutputStream() + val out: OutputStream = if (compress) compressionCodec.compressedOutputStream(bos) else bos + val ser = SparkEnv.get.serializer.newInstance() + val serOut = ser.serializeStream(out) + serOut.writeObject[T](obj).close() + val byteArray = bos.toByteArray val bais = new ByteArrayInputStream(byteArray) var blockNum = byteArray.length / BLOCK_SIZE @@ -255,7 +265,7 @@ private[broadcast] object TorrentBroadcast extends Logging { info } - def unBlockifyObject[T]( + def unBlockifyObject[T: ClassTag]( arrayOfBlocks: Array[TorrentBlock], totalBytes: Int, totalBlocks: Int): T = { @@ -264,7 +274,16 @@ private[broadcast] object TorrentBroadcast extends Logging { System.arraycopy(arrayOfBlocks(i).byteArray, 0, retByteArray, i * BLOCK_SIZE, arrayOfBlocks(i).byteArray.length) } - Utils.deserialize[T](retByteArray, Thread.currentThread.getContextClassLoader) + + val in: InputStream = { + val arrIn = new ByteArrayInputStream(retByteArray) + if (compress) compressionCodec.compressedInputStream(arrIn) else arrIn + } + val ser = SparkEnv.get.serializer.newInstance() + val serIn = ser.deserializeStream(in) + val obj = serIn.readObject[T]() + serIn.close() + obj } /** diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 7c3d0208b195a..17c64455b2429 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -44,7 +44,10 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { test("Accessing HttpBroadcast variables in a local cluster") { val numSlaves = 4 - sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", httpConf) + val conf = httpConf.clone + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.broadcast.compress", "true") + sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf) val list = List[Int](1, 2, 3, 4) val broadcast = sc.broadcast(list) val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum)) @@ -69,7 +72,10 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { test("Accessing TorrentBroadcast variables in a local cluster") { val numSlaves = 4 - sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", torrentConf) + val conf = torrentConf.clone + conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + conf.set("spark.broadcast.compress", "true") + sc = new SparkContext("local-cluster[%d, 1, 512]".format(numSlaves), "test", conf) val list = List[Int](1, 2, 3, 4) val broadcast = sc.broadcast(list) val results = sc.parallelize(1 to numSlaves).map(x => (x, broadcast.value.sum))