[SPARK-10342] [SPARK-10309] [SPARK-10474] [SPARK-10929] [SQL] Cooperative memory management #9241

Closed
wants to merge 17 commits into from

@davies davies commented Oct 23, 2015

This PR introduces a mechanism to call spill() on those SQL operators that support spilling (for example, BytesToBytesMap, UnsafeExternalSorter and ShuffleExternalSorter) if there is not enough memory for execution. The preserved first page is no longer needed, so it is removed.

Other Spillable objects in Spark core (ExternalSorter and AppendOnlyMap) are not included in this PR, but those could benefit from this (trigger others' spilling).

PrepareRDD may no longer be needed and could be removed in a follow-up PR.

The following script fails with OOM before this PR; with this PR it finishes in 150 seconds with a 2G heap (it also works in the 1.5 branch, with similar duration).

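# Use a single shuffle partition so that one task processes all shuffled data.
sqlContext.setConf("spark.sql.shuffle.partitions", "1")
# 2^25 rows, each with a string column built by repeating the id twice.
df = sqlContext.range(1<<25).selectExpr("id", "repeat(id, 2) as s")
df2 = df.select(df.id.alias('id2'), df.s.alias('s2'))
# Join the two frames, then aggregate on the join key.
j = df.join(df2, df.id==df2.id2).groupBy(df.id).max("id", "id2")
j.explain()
print(j.count())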

For thread-safety, here is what I've got:

  1. Without calling spill(), the operators should only be used by a single thread, so there are no safety problems.

  2. spill() can be triggered in two ways: by the operator itself, or by other operators. We can check trigger == this in spill(); in that case it is still in the same thread, so there are no safety problems.

  3. If it's triggered by other operators (right now cache will not trigger spill()), we only spill the data to disk when the operator is in the scanning stage (building is finished), so the in-memory sorter or memory pages are read-only; we only need to synchronize the iterator and change it.

  4. During scanning, the iterator only uses one record in one page at a time. We can't free this page, because downstream is currently using it (through UnsafeRow or other objects). In BytesToBytesMap, we just skip the current page and dump all the others to disk. In UnsafeExternalSorter, we keep the page that is used by the current record (having the same baseObject) and free it when loading the next record. In ShuffleExternalSorter, spill() will not be triggered during scanning.

  5. In order to avoid deadlock, we don't call acquireMemory during spill (so we reuse the pointer array in InMemorySorter).
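
To make the mechanism above concrete, here is a minimal Python sketch of cooperative spilling. It is a toy model, not Spark's implementation: the names SketchConsumer and SketchMemoryManager, the byte budget, and the spill-everything policy are all assumptions made for illustration. It only shows the shape of the idea: a consumer asks for memory, the manager asks other consumers to spill first, then the requester itself, and spill() never calls back into acquire().

```python
class SketchConsumer:
    """Hypothetical stand-in for a spillable operator (not Spark's MemoryConsumer)."""

    def __init__(self, name, manager):
        self.name = name
        self.manager = manager
        self.used = 0      # bytes currently held in memory
        self.spilled = 0   # bytes written to "disk" so far

    def acquire(self, size):
        granted = self.manager.acquire(size, self)
        self.used += granted
        return granted

    def spill(self, trigger):
        # trigger is self: the request came from this consumer's own thread.
        # trigger is another consumer: a real operator would only spill here
        # while it is in its read-only scanning stage (assumption of this sketch).
        # spill() never calls acquire(), mirroring point 5 above.
        freed = self.used
        self.spilled += freed
        self.used = 0
        self.manager.release(freed, self)
        return freed


class SketchMemoryManager:
    """Hypothetical per-task manager with a fixed byte budget."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.granted = {}  # consumer -> bytes granted

    def free(self):
        return self.capacity - sum(self.granted.values())

    def release(self, size, consumer):
        self.granted[consumer] = self.granted.get(consumer, 0) - size

    def acquire(self, size, consumer):
        # First ask the other consumers to spill until the request fits.
        for other in list(self.granted):
            if self.free() >= size:
                break
            if other is not consumer and self.granted[other] > 0:
                other.spill(trigger=consumer)
        # If that was not enough, ask the requester to spill its own data.
        if self.free() < size and self.granted.get(consumer, 0) > 0:
            consumer.spill(trigger=consumer)
        got = min(size, self.free())
        self.granted[consumer] = self.granted.get(consumer, 0) + got
        return got


manager = SketchMemoryManager(capacity=100)
sorter = SketchConsumer("sorter", manager)
hash_map = SketchConsumer("map", manager)

sorter.acquire(80)    # fits in the empty budget
hash_map.acquire(50)  # only 20 bytes free, so the sorter is asked to spill
print(sorter.used, sorter.spilled, hash_map.used)  # prints: 0 80 50
```

In this toy model a spill always releases everything the consumer holds; a real operator would have to keep the page backing the record currently in use, as described in point 4.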

SparkQA commented Oct 23, 2015

Test build #44210 has finished for PR 9241 at commit 3562476.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • final class ShuffleExternalSorter extends MemoryConsumer
    • public final class BytesToBytesMap extends MemoryConsumer
    • public final class BytesToBytesMapIterator implements Iterator<Location>
    • public final class UnsafeExternalSorter extends MemoryConsumer
    • class SpillableIterator extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillWriter
    • abstract class MemoryConsumer

SparkQA commented Oct 23, 2015

Test build #44242 has finished for PR 9241 at commit 0c77c94.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • final class ShuffleExternalSorter extends MemoryConsumer
    • public final class BytesToBytesMap extends MemoryConsumer
    • public final class BytesToBytesMapIterator implements Iterator<Location>
    • public final class UnsafeExternalSorter extends MemoryConsumer
    • class SpillableIterator extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillWriter
    • abstract class MemoryConsumer

SparkQA commented Oct 23, 2015

Test build #1948 has started for PR 9241 at commit 5c198cf.

SparkQA commented Oct 24, 2015

Test build #44265 has finished for PR 9241 at commit 5c198cf.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • final class ShuffleExternalSorter extends MemoryConsumer
    • public final class BytesToBytesMap extends MemoryConsumer
    • public final class BytesToBytesMapIterator implements Iterator<Location>
    • public final class UnsafeExternalSorter extends MemoryConsumer
    • class SpillableIterator extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillReader extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillWriter
    • abstract class MemoryConsumer

SparkQA commented Oct 24, 2015

Test build #44275 has finished for PR 9241 at commit 86e47ca.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds no public classes.


class TungstenAggregationIteratorSuite extends SparkFunSuite with SharedSQLContext {

test("memory acquired on construction") {

This is just a consequence of no longer needing the reserved page now that we have force-spilling support, right?

@JoshRosen

I know that this is still WIP, but were you thinking about also enabling this for the two Spillable collections (ExternalAppendOnlyMap and ExternalSorter)? That's probably a lower priority given that we're most concerned about optimizing SQL's memory usage, but it would still be nice to do. If we decide to defer it for now, let's create a followup task to do it in the next release.

offsetInPage += 4 + totalLength;
currentRecordNumber++;
return loc;
numRecords --;

Mind omitting the space between numRecords and --?

@JoshRosen

Hey @davies,

This patch makes sense to me at a high-level, but I have a few questions:

  • Could you add a description to this pull request and share your list of remaining TODOs with me (maybe by posting it as a checklist in the PR description)?
  • Can you comment on the thread-safety concerns here? My current understanding is that we don't have to worry about memory-manager-triggered spills racing with other interactions on the spillables because the iterator model provides some implicit synchronization. In order to convince ourselves that this is safe in all cases, however, I'd like to think through two corner-cases:
    • What happens if a single task contains multiple threads? Currently, this can happen in PythonRDD, PipedRDD, and a couple of other places. All of these cases are situations where we have a writer or reader thread for interacting with an external process. Although we have separate threads, they are somewhat synchronous / coupled via their interaction with the external process. This could be tricky, though, so I'd like to talk through some examples to make sure we've covered all of the tricky cases.
    • What happens if operator B is in the middle of processing a next() call on its iterator, which calls its parent's next() method, which requires memory to grow, which triggers a spill that drains memory from A and de-allocates or spills data structures that it's relying on? Do we have to make any distinctions between asking an upstream operator to spill versus a downstream one?
    • Are there any risks of deadlocks with the extra synchronization added here?

I'm going to focus on merging my memory manager unification patch tonight so that we can start rebasing this tomorrow.

SparkQA commented Oct 27, 2015

Test build #44447 has finished for PR 9241 at commit d0ada7b.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class MemoryConsumer
    • final class ShuffleExternalSorter extends MemoryConsumer
    • public final class BytesToBytesMap extends MemoryConsumer
    • public final class MapIterator implements Iterator<Location>
    • public final class UnsafeExternalSorter extends MemoryConsumer
    • class SpillableIterator extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillReader extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillWriter

@davies davies changed the title [SPARK-10342] [SQL] [WIP] Cooperative memory management [SPARK-10342] [SPARK-10309] [SQL] [WIP] Cooperative memory management Oct 27, 2015

SparkQA commented Oct 27, 2015

Test build #44453 has finished for PR 9241 at commit 49b8135.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -101,27 +106,95 @@
private final boolean inHeap;

/**
* The size of memory granted to each consumer.
*/
private HashMap<MemoryConsumer, Long> consumers;

Could be final.

SparkQA commented Oct 27, 2015

Test build #44450 has finished for PR 9241 at commit ee6b9a4.

  • This patch fails from timeout after a configured wait of 250m.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public class MemoryConsumer
    • final class ShuffleExternalSorter extends MemoryConsumer
    • public final class BytesToBytesMap extends MemoryConsumer
    • public final class MapIterator implements Iterator<Location>
    • public final class UnsafeExternalSorter extends MemoryConsumer
    • class SpillableIterator extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillReader extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillWriter

Platform.putInt(currentPage.getBaseObject(), currentPage.getBaseOffset(), 0);
pageCursor = 4;
return true;
} catch (OutOfMemoryError e) {

If we want to catch OOME here, I think that we should do it at a much smaller scope (in the assignment to currentPage, but not for adding to dataPages or modifying the page cursor). Given the risks of catching OOME that I mentioned above, the scope of the catch should be as narrow as possible.
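
As a rough illustration of the narrower scope suggested here, the following Python analogue wraps only the allocation in the handler and does the bookkeeping outside it. This is a sketch under assumptions, not the patch's Java code: SketchAllocator, SketchPagedBuffer, and the use of Python's MemoryError in place of Java's OutOfMemoryError are all hypothetical stand-ins.

```python
class SketchAllocator:
    """Hypothetical allocator that fails once its byte budget is exhausted."""

    def __init__(self, budget):
        self.budget = budget

    def allocate(self, size):
        if size > self.budget:
            raise MemoryError("cannot allocate %d bytes" % size)
        self.budget -= size
        return bytearray(size)


class SketchPagedBuffer:
    """Hypothetical structure that tracks data pages and a write cursor."""

    def __init__(self, allocator):
        self.allocator = allocator
        self.data_pages = []
        self.current_page = None
        self.page_cursor = 0

    def acquire_new_page(self, size):
        # Only the allocation itself sits inside the try block.
        try:
            page = self.allocator.allocate(size)
        except MemoryError:
            return False
        # Bookkeeping happens outside the handler, so a failure here is
        # never mistaken for memory exhaustion.
        self.current_page = page
        self.data_pages.append(page)
        page[0:4] = (0).to_bytes(4, "little")  # 4-byte zero header
        self.page_cursor = 4
        return True


buf = SketchPagedBuffer(SketchAllocator(budget=64))
first_ok = buf.acquire_new_page(48)   # fits in the budget
second_ok = buf.acquire_new_page(48)  # exceeds what is left
print(first_ok, second_ok, len(buf.data_pages))  # prints: True False 1
```

With this structure, a failed allocation leaves data_pages and the cursor untouched, because nothing after the allocation runs inside the handler.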

SparkQA commented Oct 30, 2015

Test build #44644 has finished for PR 9241 at commit afc8c7c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public abstract class MemoryConsumer
    • final class ShuffleExternalSorter extends MemoryConsumer
    • public final class BytesToBytesMap extends MemoryConsumer
    • public final class MapIterator implements Iterator<Location>
    • public final class UnsafeExternalSorter extends MemoryConsumer
    • class SpillableIterator extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillReader extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillWriter

}
try {
reader = spillWriters.getFirst().getReader(blockManager);
recordsInPage = -1;

Why set this to -1?

Ah, because we rely on reader.hasNext() when we're dealing with on-disk data.
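
A small Python sketch of the idea discussed in this exchange: once the remaining records have been moved to disk, the in-memory record count is set to a sentinel of -1 and the iterator defers to the reader to decide when the data ends. The class name SketchSpillableIterator, the list-backed "disk" reader, and the lock are assumptions made for illustration; the patch's actual iterator differs.

```python
import threading


class SketchSpillableIterator:
    """Hypothetical iterator over records that may be spilled while scanning."""

    def __init__(self, records):
        self._lock = threading.Lock()
        self._in_memory = list(records)
        self._pos = 0
        self._records_in_page = len(self._in_memory)
        self._reader = None

    def spill(self):
        """Move the unread records to a stand-in "disk" reader."""
        with self._lock:
            if self._reader is not None:
                return 0  # already spilled
            remaining = self._in_memory[self._pos:]
            self._reader = iter(remaining)
            self._in_memory = []
            # Sentinel: in-memory counts no longer apply, ask the reader.
            self._records_in_page = -1
            return len(remaining)

    def __iter__(self):
        return self

    def __next__(self):
        with self._lock:
            if self._records_in_page == -1:
                # The reader raises StopIteration when the disk data ends.
                return next(self._reader)
            if self._pos >= self._records_in_page:
                raise StopIteration
            record = self._in_memory[self._pos]
            self._pos += 1
            return record


it = SketchSpillableIterator(["a", "b", "c", "d"])
first = next(it)    # served from memory
moved = it.spill()  # the three unread records move to the reader
rest = list(it)     # served from the reader
print(first, moved, rest)  # prints: a 3 ['b', 'c', 'd']
```

The lock stands in for the iterator synchronization mentioned in the PR description: spill() may be called from another consumer's request while next() is in progress.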

@JoshRosen

LGTM overall right now (pending Jenkins for commit 4ee1f42). We can address any minor issues in followups.

I'll take one final glance at one of the new unit tests when I get home, then will merge this to unblock my patch.

SparkQA commented Oct 30, 2015

Test build #44647 has finished for PR 9241 at commit cda4b2a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public abstract class MemoryConsumer
    • final class ShuffleExternalSorter extends MemoryConsumer
    • public final class BytesToBytesMap extends MemoryConsumer
    • public final class MapIterator implements Iterator<Location>
    • public final class UnsafeExternalSorter extends MemoryConsumer
    • class SpillableIterator extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillReader extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillWriter

SparkQA commented Oct 30, 2015

Test build #44652 has finished for PR 9241 at commit 4ee1f42.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public abstract class MemoryConsumer
    • final class ShuffleExternalSorter extends MemoryConsumer
    • public final class BytesToBytesMap extends MemoryConsumer
    • public final class MapIterator implements Iterator<Location>
    • public final class UnsafeExternalSorter extends MemoryConsumer
    • class SpillableIterator extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillReader extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillWriter

SparkQA commented Oct 30, 2015

Test build #44656 has finished for PR 9241 at commit e943e74.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public abstract class MemoryConsumer
    • final class ShuffleExternalSorter extends MemoryConsumer
    • public final class BytesToBytesMap extends MemoryConsumer
    • public final class MapIterator implements Iterator<Location>
    • public final class UnsafeExternalSorter extends MemoryConsumer
    • class SpillableIterator extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillReader extends UnsafeSorterIterator
    • public final class UnsafeSorterSpillWriter

davies commented Oct 30, 2015

@JoshRosen I'm merging this into master; other comments will be addressed in a followup PR.

@JoshRosen

Great, sounds good to me! I do think that this patch might benefit from more comments to explain the code and I'd be happy to help with that in a followup PR.

@asfgit asfgit closed this in 56419cf Oct 30, 2015
break;
}
}
} catch (IOException e) {

Should this catch clause be moved to wrap c.spill() at line 142 ?

asfgit pushed a commit that referenced this pull request Apr 12, 2016
## What changes were proposed in this pull request?

Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization.
This is a regression partially introduced in PR #9241

## How was this patch tested?

Tested by running a job and observed around 30% speedup after this change.

Author: Sital Kedia <skedia@fb.com>

Closes #12285 from sitalkedia/executor_oom.
asfgit pushed a commit that referenced this pull request Apr 12, 2016
Fix memory leak in the Sorter. When the UnsafeExternalSorter spills the data to disk, it does not free up the underlying pointer array. As a result, we see a lot of executor OOM and also memory under utilization.
This is a regression partially introduced in PR #9241

Tested by running a job and observed around 30% speedup after this change.

Author: Sital Kedia <skedia@fb.com>

Closes #12285 from sitalkedia/executor_oom.

(cherry picked from commit d187e7d)
Signed-off-by: Davies Liu <davies.liu@gmail.com>

Conflicts:
	core/src/main/java/org/apache/spark/shuffle/sort/ShuffleInMemorySorter.java
	core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
zzcclp pushed a commit to zzcclp/spark that referenced this pull request Apr 13, 2016
(cherry picked from commit 413d060)
liyezhang556520 pushed a commit to liyezhang556520/spark that referenced this pull request Apr 15, 2016
asfgit pushed a commit that referenced this pull request Apr 21, 2016
…same thread for memory

## What changes were proposed in this pull request?
#9241 implemented a mechanism to call spill() on those SQL operators that support spilling if there is not enough memory for execution.
However, it does not cover ExternalSorter and AppendOnlyMap in Spark core. This PR makes them benefit from #9241: now, when there is not enough memory for execution, memory can be obtained by spilling ExternalSorter and AppendOnlyMap in Spark core.

## How was this patch tested?
Added two unit tests for it.

Author: Lianhui Wang <lianhuiwang09@gmail.com>

Closes #10024 from lianhuiwang/SPARK-4452-2.
Parth-Brahmbhatt pushed a commit to Parth-Brahmbhatt/spark that referenced this pull request Jul 25, 2016
Parth-Brahmbhatt pushed a commit to Parth-Brahmbhatt/spark that referenced this pull request Jul 25, 2016