[SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management #9241
Conversation
Test build #44210 has finished for PR 9241.
Test build #44242 has finished for PR 9241.
Test build #1948 has started for PR 9241.
Test build #44265 has finished for PR 9241.
Test build #44275 has finished for PR 9241.
class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLContext {
  test("memory acquired on construction") {
This is just a consequence of no longer needing the reserved page now that we have force-spilling support, right?
I know that this is still
offsetInPage += 4 + totalLength;
currentRecordNumber++;
return loc;
numRecords --;
Mind omitting the space between numRecords and --?
Hey @davies, This patch makes sense to me at a high-level, but I have a few questions:
I'm going to focus on merging my memory manager unification patch tonight so that we can start rebasing this tomorrow.
Test build #44447 has finished for PR 9241.
Test build #44453 has finished for PR 9241.
@@ -101,27 +106,95 @@
private final boolean inHeap;

/**
 * The size of memory granted to each consumer.
 */
private HashMap<MemoryConsumer, Long> consumers;
Could be final.
Test build #44450 has finished for PR 9241.
Platform.putInt(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
pageCursor = 4;
return true;
} catch (OutOfMemoryError e) {
If we want to catch OOME here, I think that we should do it at a much smaller scope: in the assignment to currentPage, but not for adding to dataPages or modifying the page cursor. Given the risks of catching OOME that I mentioned above, the scope of the catch should be as narrow as possible.
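To illustrate the point, here is a minimal sketch of the narrowed scope, assuming simplified stand-in types (MemoryBlock, dataPages, and pageCursor here are placeholders, not Spark's real classes): only the allocation itself is guarded, so failures in bookkeeping are never silently swallowed as OOME.

```java
import java.util.ArrayList;
import java.util.List;

public class NarrowOomCatch {
    // Stand-in for an allocated memory page (hypothetical, for illustration).
    static class MemoryBlock {
        final byte[] bytes;
        MemoryBlock(int size) { bytes = new byte[size]; }
    }

    final List<MemoryBlock> dataPages = new ArrayList<>();
    long pageCursor = 0;

    boolean acquireNewPage(int size) {
        MemoryBlock page;
        try {
            // Only the allocation is inside the catch.
            page = new MemoryBlock(size);
        } catch (OutOfMemoryError e) {
            // Signal the caller to spill and retry instead of crashing.
            return false;
        }
        // Bookkeeping stays outside the catch, so its failures surface normally.
        dataPages.add(page);
        pageCursor = 4;
        return true;
    }

    public static void main(String[] args) {
        NarrowOomCatch c = new NarrowOomCatch();
        System.out.println(c.acquireNewPage(1024));
    }
}
```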
Test build #44644 has finished for PR 9241.
}
try {
  reader = spillWriters.getFirst().getReader(blockManager);
  recordsInPage = -1;
Why set this to -1?
Ah, because we rely on reader.hasNext() when we're dealing with on-disk data.
LGTM overall right now (pending Jenkins for commit 4ee1f42). We can address any minor issues in followups. I'll take one final glance at one of the new unit tests when I get home, then will merge this to unblock my patch.
Test build #44647 has finished for PR 9241.
Test build #44652 has finished for PR 9241.
Test build #44656 has finished for PR 9241.
@JoshRosen I'm merging this into master; other comments will be addressed in a followup PR.
Great, sounds good to me! I do think that this patch might benefit from more comments to explain the code, and I'd be happy to help with that in a followup PR.
break;
}
}
} catch (IOException e) {
Should this catch clause be moved to wrap c.spill() at line 142?
## What changes were proposed in this pull request?
Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOMs and also memory under-utilization. This is a regression partially introduced in PR #9241.

## How was this patch tested?
Tested by running a job and observed around a 30% speedup after this change.

Author: Sital Kedia <skedia@fb.com>
Closes #12285 from sitalkedia/executor_oom.
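The leak described above can be sketched minimally, under the assumption of simplified stand-in fields (pointerArray and bytesHeld are illustrative placeholders, not Spark's actual UnsafeExternalSorter internals): once the records have been written to disk, the in-memory pointer array must also be released, otherwise the sorter keeps holding memory it no longer needs.

```java
public class SorterSpillSketch {
    // Hypothetical in-memory record pointers (8 bytes each).
    long[] pointerArray = new long[1 << 20];
    long bytesHeld = pointerArray.length * 8L;

    long spill() {
        // ... write records referenced by pointerArray to disk ...
        long freed = bytesHeld;
        // The fix: drop the pointer array after spilling, so the
        // memory it occupies is actually returned.
        pointerArray = null;
        bytesHeld = 0;
        return freed;
    }

    public static void main(String[] args) {
        SorterSpillSketch s = new SorterSpillSketch();
        System.out.println(s.spill());
    }
}
```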
…same thread for memory

## What changes were proposed in this pull request?
PR #9241 implemented a mechanism to call spill() on those SQL operators that support spilling when there is not enough memory for execution, but ExternalSorter and AppendOnlyMap in Spark core did not participate. This PR makes them benefit from #9241: now, when there is not enough memory for execution, memory can be obtained by spilling ExternalSorter and AppendOnlyMap in Spark core.

## How was this patch tested?
Added two unit tests for it.

Author: Lianhui Wang <lianhuiwang09@gmail.com>
Closes #10024 from lianhuiwang/SPARK-4452-2.
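The cooperative scheme these PRs build on can be sketched roughly as follows, assuming simplified stand-ins (the Consumer interface, acquire method, and free counter here are illustrative, not Spark's actual TaskMemoryManager/MemoryConsumer API): when a request cannot be satisfied, the manager asks the other registered consumers to spill until enough memory is released.

```java
import java.util.ArrayList;
import java.util.List;

public class CooperativeManagerSketch {
    // Hypothetical spillable consumer: returns how many bytes it freed.
    interface Consumer { long spill(long needed); }

    long free;
    final List<Consumer> consumers = new ArrayList<>();

    CooperativeManagerSketch(long total) { free = total; }

    long acquire(long needed, Consumer requester) {
        if (free < needed) {
            // Ask other consumers to give memory back by spilling.
            for (Consumer c : consumers) {
                if (c == requester || free >= needed) continue;
                free += c.spill(needed - free);
            }
        }
        long granted = Math.min(needed, free);
        free -= granted;
        return granted;
    }

    public static void main(String[] args) {
        CooperativeManagerSketch m = new CooperativeManagerSketch(100);
        Consumer requester = needed -> 0;
        m.consumers.add(requester);
        m.consumers.add(needed -> 80); // another consumer that can spill
        System.out.println(m.acquire(150, requester));
    }
}
```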
This PR introduces a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The reserved first page is not needed anymore, so it has been removed.
Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but they could benefit from this (by triggering others' spilling).
The PrepareRDD may not be needed anymore and could be removed in a follow-up PR.
The following script would fail with OOM before this PR; with it, it finishes in 150 seconds with a 2G heap (it also works on the 1.5 branch, with a similar duration).
For thread-safety, here is what I've got:
Without calling spill(), the operators are only used by a single thread, so there are no safety problems.
spill() can be triggered in two cases: by the operator itself, or by other operators. When triggered by itself, we can check trigger == this in spill(), so we know we're still on the same thread and there are no safety problems.
If it's triggered by other operators (right now the cache will not trigger spill()), we only spill the data to disk when the operator is in its scanning stage (building has finished), so the in-memory sorter and memory pages are read-only; we only need to synchronize the iterator and change it.
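The two trigger cases above can be sketched as follows, with simplified stand-ins (the scanning flag, trigger parameter, and dumpToDisk helper are hypothetical, not Spark's actual MemoryConsumer contract): a self-triggered spill runs on the owner's own thread and can always proceed, while a spill triggered by another consumer only proceeds once building is finished and the data is read-only.

```java
public class SpillTriggerSketch {
    boolean scanning = false;   // true once building is finished
    long inMemoryBytes = 1024;

    synchronized long spill(Object trigger) {
        if (trigger == this) {
            // Triggered by ourselves: same thread, always safe to spill.
            return dumpToDisk();
        }
        // Triggered by another consumer: only safe in the read-only
        // scanning stage; refuse while still building.
        if (!scanning) return 0;
        return dumpToDisk();
    }

    private long dumpToDisk() {
        long freed = inMemoryBytes;
        inMemoryBytes = 0;
        return freed;
    }

    public static void main(String[] args) {
        SpillTriggerSketch s = new SpillTriggerSketch();
        System.out.println(s.spill(new Object())); // refused while building
        System.out.println(s.spill(s));            // self-trigger succeeds
    }
}
```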
During scanning, the iterator only uses one record in one page at a time. We can't free that page, because the downstream is currently using it (via UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page and dump all the others to disk. In UnsafeExternalSorter, we keep the page that is used by the current record (the one with the same baseObject) and free it when loading the next record. In ShuffleExternalSorter, spill() will not be triggered during scanning.
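The "skip the page the iterator is on" idea can be sketched minimally, assuming plain byte arrays as stand-ins for pages (not Spark's MemoryBlock): every page except the one backing the current record is dumped, because downstream code may still hold references into that page.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class SkipCurrentPageSketch {
    final List<byte[]> pages = new ArrayList<>();
    byte[] currentPage;  // page backing the record the iterator is on

    long spillExceptCurrent() {
        long freed = 0;
        for (Iterator<byte[]> it = pages.iterator(); it.hasNext(); ) {
            byte[] page = it.next();
            if (page == currentPage) continue;  // downstream still reads it
            // ... write this page's records to disk ...
            freed += page.length;
            it.remove();
        }
        return freed;
    }

    public static void main(String[] args) {
        SkipCurrentPageSketch s = new SkipCurrentPageSketch();
        byte[] a = new byte[10], b = new byte[20], c = new byte[30];
        s.pages.add(a); s.pages.add(b); s.pages.add(c);
        s.currentPage = b;
        System.out.println(s.spillExceptCurrent());
    }
}
```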
In order to avoid deadlock, we don't call acquireMemory during spill() (so we reuse the pointer array in the InMemorySorter).