SPARK-2711. Create a ShuffleMemoryManager to track memory for all spilling collections #1707
Conversation
QA tests have started for PR 1707. This patch merges cleanly.
QA results for PR 1707:
test this please
QA tests have started for PR 1707. This patch merges cleanly.
QA results for PR 1707:
threadMemory(threadId) = curMem + numBytes
// Notify other waiting threads because the # of active threads may have increased, so
// they may cancel their current waits
notifyAll()
perhaps notifyAll unconditionally (or conditioned only on having increased the number of active threads)
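A minimal sketch of that suggestion, assuming the same per-thread bookkeeping map as in the diff above (the class and method names here are placeholders, not the patch itself): notify waiters only when the thread count actually grew, since a larger N is what shrinks every other thread's 1 / 2N threshold and may end their waits.

```scala
import scala.collection.mutable

// Placeholder sketch, not the actual patch: notifyAll only when a new thread registers.
class MemoryBookkeeping {
  private val threadMemory = mutable.HashMap[Long, Long]()  // threadId -> bytes held

  def record(numBytes: Long): Unit = synchronized {
    val threadId = Thread.currentThread().getId
    val isNewThread = !threadMemory.contains(threadId)
    threadMemory(threadId) = threadMemory.getOrElse(threadId, 0L) + numBytes
    if (isNewThread) {
      notifyAll()  // the number of active threads increased; waiters should re-check their bounds
    }
  }
}
```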
QA tests have started for PR 1707. This patch merges cleanly.
QA results for PR 1707:
* some situations, to make sure each thread has a chance to ramp up to a reasonable share of
* the available memory before being forced to spill.
*/
def tryToAcquire(numBytes: Long): Boolean = synchronized {
We talked offline about possibly having this allocate "as much as possible" rather than all-or-nothing. Did you decide one way or another?
Yeah, I'm still working on that.
QA tests have started for PR 1707. This patch merges cleanly.
QA results for PR 1707:
BTW that failing test ^ is exactly the one that fails on my laptop due to the issue that #1722 fixes.
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
// Manages the memory used by externally spilling collections in shuffle operations
val shuffleMemoryManager = new ShuffleMemoryManager(conf)
Do we want to add this to the constructor, like we do for other *Managers?
This tracks memory properly if there are multiple spilling collections in the same task (which was a problem before), and also implements an algorithm that lets each thread grow up to 1 / 2N of the memory pool (where N is the number of threads) before spilling, which avoids an inefficiency with small spills we had before (some threads would spill many times at 0-1 MB because the pool was allocated elsewhere).
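To make the 1 / 2N and 1 / N bounds concrete, here is a small illustrative calculation; the 512 MB pool size is just an example value, not one taken from the patch.

```scala
// With N active threads, a thread may keep growing (rather than spill) until it holds
// 1/(2N) of the pool, and may never hold more than 1/N of it.
val maxMemory = 512L * 1024 * 1024  // hypothetical 512 MB shuffle memory pool
for (n <- Seq(1, 2, 4, 8)) {
  val floor = maxMemory / (2 * n)  // guaranteed share before a thread is forced to spill
  val cap   = maxMemory / n        // hard per-thread upper bound
  println(f"N=$n%d  floor=${floor / (1024 * 1024)}%d MB  cap=${cap / (1024 * 1024)}%d MB")
}
```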
- Always notifyAll if a new thread was added in tryToAcquire
- Log when a thread blocks
Instead of returning false if we can't grant all the memory a caller requested, we can now grant part of their request, while still keeping the previous behavior of not forcing a thread to spill if it has less than 1 / 2N, and not letting any thread get more than 1 / N. This should better utilize the available shuffle memory pool.
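As a rough sketch of what such a partially granting API could look like (not the actual Spark ShuffleMemoryManager; the class name, method names, and fixed `maxMemory` constructor argument are illustrative assumptions), the request method can return the number of bytes granted instead of a Boolean, block only while a thread is below its 1/(2N) floor, and cap every thread at 1/N of the pool:

```scala
import scala.collection.mutable

// Illustrative sketch of a partially granting shuffle memory pool.
class PartialGrantShufflePool(maxMemory: Long) {
  // threadId -> bytes currently held; all access is guarded by this object's lock
  private val threadMemory = mutable.HashMap[Long, Long]()

  /** Request up to `numBytes`; returns how many bytes were actually granted (possibly 0). */
  def tryToAcquire(numBytes: Long): Long = synchronized {
    val threadId = Thread.currentThread().getId
    if (!threadMemory.contains(threadId)) {
      threadMemory(threadId) = 0L
      notifyAll()  // N grew, so waiters should re-check their 1/(2N) floor
    }
    while (true) {
      val n = threadMemory.size
      val curMem = threadMemory(threadId)
      val freeMemory = maxMemory - threadMemory.values.sum
      // Never let one thread exceed 1/N of the pool
      val maxToGrant = math.min(numBytes, math.max(0L, maxMemory / n - curMem))
      if (curMem < maxMemory / (2 * n)) {
        // Below the 1/(2N) floor: wait for memory to free up instead of forcing a tiny spill
        val neededForFloor = math.min(maxToGrant, maxMemory / (2 * n) - curMem)
        if (freeMemory >= neededForFloor) {
          val toGrant = math.min(maxToGrant, freeMemory)
          threadMemory(threadId) = curMem + toGrant
          return toGrant
        } else {
          wait()  // woken by releases or by new threads registering
        }
      } else {
        // At or above the floor: hand out whatever is free (possibly 0); the caller may spill
        val toGrant = math.min(maxToGrant, freeMemory)
        threadMemory(threadId) = curMem + toGrant
        return toGrant
      }
    }
    0L  // unreachable; satisfies the declared return type
  }

  /** Release all memory held by the calling thread, e.g. at task end. */
  def releaseMemoryForThisThread(): Unit = synchronized {
    threadMemory.remove(Thread.currentThread().getId)
    notifyAll()
  }
}
```

A caller of such an API would add whatever was granted to its own bookkeeping and spill when the grant falls short of what it needs, which is how the pool stays fully utilized without letting any one thread dominate it.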
Thanks Andrew. I think I've addressed all the comments.
QA tests have started for PR 1707. This patch merges cleanly.
LGTM
Thanks for the review, going to merge this then.
Actually let me retest it since the previous run was cancelled.
Jenkins, test this please
QA tests have started for PR 1707. This patch merges cleanly.
test this please
QA tests have started for PR 1707. This patch merges cleanly.
Jenkins actually passed this (see https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17919/consoleFull) but a glitch in the reporting script made it not post here, so going to merge it.
Thanks for the review.
…lling collections

This tracks memory properly if there are multiple spilling collections in the same task (which was a problem before), and also implements an algorithm that lets each thread grow up to 1 / 2N of the memory pool (where N is the number of threads) before spilling, which avoids an inefficiency with small spills we had before (some threads would spill many times at 0-1 MB because the pool was allocated elsewhere).

Author: Matei Zaharia <matei@databricks.com>

Closes #1707 from mateiz/spark-2711 and squashes the following commits:

debf75b [Matei Zaharia] Review comments
24f28f3 [Matei Zaharia] Small rename
c8f3a8b [Matei Zaharia] Update ShuffleMemoryManager to be able to partially grant requests
315e3a5 [Matei Zaharia] Some review comments
b810120 [Matei Zaharia] Create central manager to track memory for all spilling collections

(cherry picked from commit 4fde28c)
Signed-off-by: Matei Zaharia <matei@databricks.com>