Skip to content

Commit

Permalink
[SPARK-2503] Lower shuffle output buffer (spark.shuffle.file.buffer.k…
Browse files Browse the repository at this point in the history
…b) to 32KB.

This can substantially reduce memory usage during shuffle.

Author: Reynold Xin <rxin@apache.org>

Closes apache#1781 from rxin/SPARK-2503-spark.shuffle.file.buffer.kb and squashes the following commits:

104b8d8 [Reynold Xin] [SPARK-2503] Lower shuffle output buffer (spark.shuffle.file.buffer.kb) to 32KB.
  • Loading branch information
rxin committed Aug 5, 2014
1 parent cc491f6 commit acff9a7
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ private[spark] class SortShuffleWriter[K, V, C](
private val ser = Serializer.getSerializer(dep.serializer.orNull)

private val conf = SparkEnv.get.conf
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

private var sorter: ExternalSorter[K, V, _] = null
private var outputFile: File = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class ShuffleBlockManager(blockManager: BlockManager) extends Logging {
val sortBasedShuffle =
conf.get("spark.shuffle.manager", "") == classOf[SortShuffleManager].getName

private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val bufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

/**
* Contains all the state related to a particular shuffle. This includes a pool of unused
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ class ExternalAppendOnlyMap[K, V, C](
private var _memoryBytesSpilled = 0L
private var _diskBytesSpilled = 0L

private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val fileBufferSize = sparkConf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024
private val keyComparator = new HashComparator[K]
private val ser = serializer.newInstance()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ private[spark] class ExternalSorter[K, V, C](

private val conf = SparkEnv.get.conf
private val spillingEnabled = conf.getBoolean("spark.shuffle.spill", true)
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 100) * 1024
private val fileBufferSize = conf.getInt("spark.shuffle.file.buffer.kb", 32) * 1024

// Size of object batches when reading/writing from serializers.
//
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ Apart from these, the following properties are also available, and may be useful
</tr>
<tr>
<td><code>spark.shuffle.file.buffer.kb</code></td>
<td>100</td>
<td>32</td>
<td>
Size of the in-memory buffer for each shuffle file output stream, in kilobytes. These buffers
reduce the number of disk seeks and system calls made in creating intermediate shuffle files.
Expand Down

0 comments on commit acff9a7

Please sign in to comment.