-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-6194] [SPARK-677] [PySpark] fix memory leak in collect() #4923
Conversation
Test build #28324 has started for PR 4923 at commit
|
It looks like we take the result of |
Test build #28324 has finished for PR 4923 at commit
|
Test FAILed. |
I had some thought of that, found that by separating these two then we can reuse |
Test build #28331 has started for PR 4923 at commit
|
Test build #28331 has finished for PR 4923 at commit
|
Test FAILed. |
Test build #28347 has started for PR 4923 at commit
|
Test build #28347 has finished for PR 4923 at commit
|
Test PASSed. |
Now that this has been updated to collect results via a socket, it looks like we may finally be able to close https://issues.apache.org/jira/browse/SPARK-677, one of the oldest Spark issues. |
@@ -341,7 +342,7 @@ private[spark] object PythonRDD extends Logging { | |||
/** | |||
* Adapter for calling SparkContext#runJob from Python. | |||
* | |||
* This method will return an iterator of an array that contains all elements in the RDD | |||
* This method will serve an iterator of an array that contains all elements in the RDD | |||
* (effectively a collect()), but allows you to run on a certain subset of partitions, | |||
* or to enable local execution. | |||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Even with the updated description, this method's return type could be confusing to new readers.
It might help to add an explicit description of the return type, e.g.
@return the port number of a local socket which serves the data collected from this job.
We should also document the lifecycle of the server socket created by this method: what happens if a client does not consume the whole iterator? Is it the caller's responsibility to close the socket at that point?
This looks really good to me overall. It would be great if you could update the PR description to reflect the most recent changes (collecting via a socket instead of a temporary file). A couple of quick questions:
|
Test build #28405 has started for PR 4923 at commit
|
@JoshRosen This should have positive performance impact, the end-to-end time of a job (collect 50M integers) changed from 10.6s to 9.2s. For the additional failure tests, it's nice to have. I'd like leave them out of this PR, do it later if we really need it. I think we should backport this as-is to maintenance branches. |
LGTM; I'll merge this as soon as Jenkins passes. Thanks! |
Test build #28405 has finished for PR 4923 at commit
|
Test PASSed. |
Because circular reference between JavaObject and JavaMember, an Java object can not be released until Python GC kick in, then it will cause memory leak in collect(), which may consume lots of memory in JVM. This PR change the way we sending collected data back into Python from local file to socket, which could avoid any disk IO during collect, also avoid any referrers of Java object in Python. cc JoshRosen Author: Davies Liu <davies@databricks.com> Closes #4923 from davies/fix_collect and squashes the following commits: d730286 [Davies Liu] address comments 24c92a4 [Davies Liu] fix style ba54614 [Davies Liu] use socket to transfer data from JVM 9517c8f [Davies Liu] fix memory leak in collect() (cherry picked from commit 8767565) Signed-off-by: Josh Rosen <joshrosen@databricks.com> Conflicts: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala python/pyspark/rdd.py python/pyspark/sql/dataframe.py
I've merged this into |
I have now cherry-picked this into |
Because circular reference between JavaObject and JavaMember, an Java object can not be released until Python GC kick in, then it will cause memory leak in collect(), which may consume lots of memory in JVM. This PR change the way we sending collected data back into Python from local file to socket, which could avoid any disk IO during collect, also avoid any referrers of Java object in Python. cc JoshRosen Author: Davies Liu <davies@databricks.com> Closes #4923 from davies/fix_collect and squashes the following commits: d730286 [Davies Liu] address comments 24c92a4 [Davies Liu] fix style ba54614 [Davies Liu] use socket to transfer data from JVM 9517c8f [Davies Liu] fix memory leak in collect() (cherry picked from commit 8767565) Signed-off-by: Josh Rosen <joshrosen@databricks.com>
Note that this patch introduced a bug which can call |
Because circular reference between JavaObject and JavaMember, an Java object can not be released until Python GC kick in, then it will cause memory leak in collect(), which may consume lots of memory in JVM.
This PR change the way we sending collected data back into Python from local file to socket, which could avoid any disk IO during collect, also avoid any referrers of Java object in Python.
cc @JoshRosen