-
Notifications
You must be signed in to change notification settings - Fork 28.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-1991] Support custom storage levels for vertices and edges #946
Conversation
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15359/ |
Merged build triggered. |
Merged build started. |
Merged build finished. |
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15361/ |
Merged build triggered. |
Merged build started. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD. | ||
*/ | ||
private[graphx] def withTargetStorageLevel( | ||
targetStorageLevel_ : StorageLevel): EdgeRDD[ED, VD] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it is ok to just shadow the class member targetStorageLevel rather than adding a weird _ at the end ...
Merged build triggered. |
Merged build started. |
@@ -52,25 +56,48 @@ object Analytics extends Logging { | |||
} | |||
} | |||
|
|||
def pickStorageLevel(v: String): StorageLevel = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps move this into spark storagelevle itself.
The changes look good to me, other than the minor thing on storage level. |
Merged build triggered. |
Merged build started. |
Merged build finished. All automated tests passed. |
All automated tests passed. |
@@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition | |||
* `impl.ReplicatedVertexView`. | |||
*/ | |||
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( | |||
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]) | |||
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], | |||
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) | |||
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should EdgeRDD be marked @DeveloperAPI
? Or can users use it directly? This is technically a binary-compat breaking change (though it doesn't affect source compat).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW you could avoid the breakage by having separate 2-arg and 3-arg constructors but if this is an internal API it's fine to leave it. Just wanted to ask whether users call this directly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Users may manipulate it directly, because it's returned by Graph#edges
, but they should never call the constructor. I actually wanted to make this constructor private, but that interfered with Scala specialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, weird. Probably long-term the way to do it might be to create a trait EdgeRDD
that users see, and an EdgeRDDImpl
that is private[graphx]
.
Merged build finished. All automated tests passed. |
All automated tests passed. |
Hey so one comment, overall this patch has several areas where it introduces default parameters, including some public APIs (e.g. Graph.fromEdgeTuples). This will break binary compatibility but not source compatibility in Scala. Since GraphX is alpha, maybe that's okay, but we need to decide on what granularity of releases we can make such changes.
|
I think it's not essential to get this into 1.0.1 since that'll be a bugfix release, but I agree about default parameters. In a future PR, or maybe even this one, I can remove the default values. Unfortunately it won't work to use the builder pattern for this, because Graph.fromEdgeTuples calls cache() and therefore needs the storage levels immediately. |
Thanks. I am merging this in master. |
This PR adds support for specifying custom storage levels for the vertices and edges of a graph. This enables GraphX to handle graphs larger than memory size by specifying MEMORY_AND_DISK and then repartitioning the graph to use many small partitions, each of which does fit in memory. Spark will then automatically load partitions from disk as needed. The user specifies the desired vertex and edge storage levels when building the graph by passing them to the graph constructor. These are then stored in the `targetStorageLevel` attribute of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use it more than once, for example), it uses the specified target storage level. Also, when the user calls `Graph#cache()`, the vertices and edges are persisted using their target storage levels. In order to facilitate propagating the target storage levels across VertexRDD and EdgeRDD operations, we remove raw calls to the constructors and instead introduce the `withPartitionsRDD` and `withTargetStorageLevel` methods. I tested this change by running PageRank and triangle count on a severely memory-constrained cluster (1 executor with 300 MB of memory, and a 1 GB graph). Before this PR, these algorithms used to fail with OutOfMemoryErrors. With this PR, and using the DISK_ONLY storage level, they succeed. Author: Ankur Dave <ankurdave@gmail.com> Closes apache#946 from ankurdave/SPARK-1991 and squashes the following commits: ce17d95 [Ankur Dave] Move pickStorageLevel to StorageLevel.fromString ccaf06f [Ankur Dave] Shadow members in withXYZ() methods rather than using underscores c34abc0 [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0 c5ca068 [Ankur Dave] Revert "Exclude all of GraphX from binary compatibility checks" 34bcefb [Ankur Dave] Exclude all of GraphX from binary compatibility checks 6fdd137 [Ankur Dave] [SPARK-1991] Support custom storage levels for vertices and edges
Author: Ankur Dave <ankurdave@gmail.com> Closes apache#970 from ankurdave/SPARK-1991_docfix and squashes the following commits: 6d07343 [Ankur Dave] Minor: Fix documentation error from apache#946
This PR adds support for specifying custom storage levels for the vertices and edges of a graph. This enables GraphX to handle graphs larger than memory size by specifying MEMORY_AND_DISK and then repartitioning the graph to use many small partitions, each of which does fit in memory. Spark will then automatically load partitions from disk as needed. The user specifies the desired vertex and edge storage levels when building the graph by passing them to the graph constructor. These are then stored in the `targetStorageLevel` attribute of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use it more than once, for example), it uses the specified target storage level. Also, when the user calls `Graph#cache()`, the vertices and edges are persisted using their target storage levels. In order to facilitate propagating the target storage levels across VertexRDD and EdgeRDD operations, we remove raw calls to the constructors and instead introduce the `withPartitionsRDD` and `withTargetStorageLevel` methods. I tested this change by running PageRank and triangle count on a severely memory-constrained cluster (1 executor with 300 MB of memory, and a 1 GB graph). Before this PR, these algorithms used to fail with OutOfMemoryErrors. With this PR, and using the DISK_ONLY storage level, they succeed. Author: Ankur Dave <ankurdave@gmail.com> Closes apache#946 from ankurdave/SPARK-1991 and squashes the following commits: ce17d95 [Ankur Dave] Move pickStorageLevel to StorageLevel.fromString ccaf06f [Ankur Dave] Shadow members in withXYZ() methods rather than using underscores c34abc0 [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0 c5ca068 [Ankur Dave] Revert "Exclude all of GraphX from binary compatibility checks" 34bcefb [Ankur Dave] Exclude all of GraphX from binary compatibility checks 6fdd137 [Ankur Dave] [SPARK-1991] Support custom storage levels for vertices and edges
Author: Ankur Dave <ankurdave@gmail.com> Closes apache#970 from ankurdave/SPARK-1991_docfix and squashes the following commits: 6d07343 [Ankur Dave] Minor: Fix documentation error from apache#946
This PR adds support for specifying custom storage levels for the vertices and edges of a graph. This enables GraphX to handle graphs larger than memory size by specifying MEMORY_AND_DISK and then repartitioning the graph to use many small partitions, each of which does fit in memory. Spark will then automatically load partitions from disk as needed.
The user specifies the desired vertex and edge storage levels when building the graph by passing them to the graph constructor. These are then stored in the
targetStorageLevel
attribute of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use it more than once, for example), it uses the specified target storage level. Also, when the user callsGraph#cache()
, the vertices and edges are persisted using their target storage levels.In order to facilitate propagating the target storage levels across VertexRDD and EdgeRDD operations, we remove raw calls to the constructors and instead introduce the
withPartitionsRDD
andwithTargetStorageLevel
methods.I tested this change by running PageRank and triangle count on a severely memory-constrained cluster (1 executor with 300 MB of memory, and a 1 GB graph). Before this PR, these algorithms used to fail with OutOfMemoryErrors. With this PR, and using the DISK_ONLY storage level, they succeed.