Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-1991] Support custom storage levels for vertices and edges #946

Closed
wants to merge 6 commits into from

Conversation

ankurdave
Copy link
Contributor

This PR adds support for specifying custom storage levels for the vertices and edges of a graph. This enables GraphX to handle graphs larger than memory size by specifying MEMORY_AND_DISK and then repartitioning the graph to use many small partitions, each of which does fit in memory. Spark will then automatically load partitions from disk as needed.

The user specifies the desired vertex and edge storage levels when building the graph by passing them to the graph constructor. These are then stored in the targetStorageLevel attribute of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use it more than once, for example), it uses the specified target storage level. Also, when the user calls Graph#cache(), the vertices and edges are persisted using their target storage levels.

In order to facilitate propagating the target storage levels across VertexRDD and EdgeRDD operations, we remove raw calls to the constructors and instead introduce the withPartitionsRDD and withTargetStorageLevel methods.

I tested this change by running PageRank and triangle count on a severely memory-constrained cluster (1 executor with 300 MB of memory, and a 1 GB graph). Before this PR, these algorithms used to fail with OutOfMemoryErrors. With this PR, and using the DISK_ONLY storage level, they succeed.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15359/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished.

@AmplabJenkins
Copy link

Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15361/

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15368/

@ankurdave
Copy link
Contributor Author

@rxin

* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
*/
private[graphx] def withTargetStorageLevel(
targetStorageLevel_ : StorageLevel): EdgeRDD[ED, VD] = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is ok to just shadow the class member targetStorageLevel rather than adding a weird _ at the end ...

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@@ -52,25 +56,48 @@ object Analytics extends Logging {
}
}

def pickStorageLevel(v: String): StorageLevel = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

perhaps move this into spark storagelevle itself.

@rxin
Copy link
Contributor

rxin commented Jun 3, 2014

The changes look good to me, other than the minor thing on storage level.

@AmplabJenkins
Copy link

Merged build triggered.

@AmplabJenkins
Copy link

Merged build started.

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15398/

@@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition
* `impl.ReplicatedVertexView`.
*/
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should EdgeRDD be marked @DeveloperAPI? Or can users use it directly? This is technically a binary-compat breaking change (though it doesn't affect source compat).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW you could avoid the breakage by having separate 2-arg and 3-arg constructors but if this is an internal API it's fine to leave it. Just wanted to ask whether users call this directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users may manipulate it directly, because it's returned by Graph#edges, but they should never call the constructor. I actually wanted to make this constructor private, but that interfered with Scala specialization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, weird. Probably long-term the way to do it might be to create a trait EdgeRDD that users see, and an EdgeRDDImpl that is private[graphx].

@AmplabJenkins
Copy link

Merged build finished. All automated tests passed.

@AmplabJenkins
Copy link

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/15402/

@mateiz
Copy link
Contributor

mateiz commented Jun 3, 2014

Hey so one comment, overall this patch has several areas where it introduces default parameters, including some public APIs (e.g. Graph.fromEdgeTuples). This will break binary compatibility but not source compatibility in Scala. Since GraphX is alpha, maybe that's okay, but we need to decide on what granularity of releases we can make such changes.

  • Do we want these changes in 1.0.1? I think it would be bad to break binary compatibility even for an alpha component in a maintenance release.
  • In a future revamp of GraphX, we should consider switching away from default values if we expect to add more optional parameters.
  • If there's a way you can do that in this patch without harming the API too much we might consider doing that, though I'm not sure that's the case here. (One way would be to have people do Graph.fromEdgeTuples and then call .withStorageLevels on the result).

@ankurdave
Copy link
Contributor Author

I think it's not essential to get this into 1.0.1 since that'll be a bugfix release, but I agree about default parameters. In a future PR, or maybe even this one, I can remove the default values.

Unfortunately it won't work to use the builder pattern for this, because Graph.fromEdgeTuples calls cache() and therefore needs the storage levels immediately.

@rxin
Copy link
Contributor

rxin commented Jun 3, 2014

Thanks. I am merging this in master.

@asfgit asfgit closed this in b1feb60 Jun 3, 2014
ankurdave added a commit to ankurdave/spark that referenced this pull request Jun 4, 2014
asfgit pushed a commit that referenced this pull request Jun 4, 2014
Author: Ankur Dave <ankurdave@gmail.com>

Closes #970 from ankurdave/SPARK-1991_docfix and squashes the following commits:

6d07343 [Ankur Dave] Minor: Fix documentation error from #946
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
This PR adds support for specifying custom storage levels for the vertices and edges of a graph. This enables GraphX to handle graphs larger than memory size by specifying MEMORY_AND_DISK and then repartitioning the graph to use many small partitions, each of which does fit in memory. Spark will then automatically load partitions from disk as needed.

The user specifies the desired vertex and edge storage levels when building the graph by passing them to the graph constructor. These are then stored in the `targetStorageLevel` attribute of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use it more than once, for example), it uses the specified target storage level. Also, when the user calls `Graph#cache()`, the vertices and edges are persisted using their target storage levels.

In order to facilitate propagating the target storage levels across VertexRDD and EdgeRDD operations, we remove raw calls to the constructors and instead introduce the `withPartitionsRDD` and `withTargetStorageLevel` methods.

I tested this change by running PageRank and triangle count on a severely memory-constrained cluster (1 executor with 300 MB of memory, and a 1 GB graph). Before this PR, these algorithms used to fail with OutOfMemoryErrors. With this PR, and using the DISK_ONLY storage level, they succeed.

Author: Ankur Dave <ankurdave@gmail.com>

Closes apache#946 from ankurdave/SPARK-1991 and squashes the following commits:

ce17d95 [Ankur Dave] Move pickStorageLevel to StorageLevel.fromString
ccaf06f [Ankur Dave] Shadow members in withXYZ() methods rather than using underscores
c34abc0 [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0
c5ca068 [Ankur Dave] Revert "Exclude all of GraphX from binary compatibility checks"
34bcefb [Ankur Dave] Exclude all of GraphX from binary compatibility checks
6fdd137 [Ankur Dave] [SPARK-1991] Support custom storage levels for vertices and edges
pdeyhim pushed a commit to pdeyhim/spark-1 that referenced this pull request Jun 25, 2014
Author: Ankur Dave <ankurdave@gmail.com>

Closes apache#970 from ankurdave/SPARK-1991_docfix and squashes the following commits:

6d07343 [Ankur Dave] Minor: Fix documentation error from apache#946
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
This PR adds support for specifying custom storage levels for the vertices and edges of a graph. This enables GraphX to handle graphs larger than memory size by specifying MEMORY_AND_DISK and then repartitioning the graph to use many small partitions, each of which does fit in memory. Spark will then automatically load partitions from disk as needed.

The user specifies the desired vertex and edge storage levels when building the graph by passing them to the graph constructor. These are then stored in the `targetStorageLevel` attribute of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use it more than once, for example), it uses the specified target storage level. Also, when the user calls `Graph#cache()`, the vertices and edges are persisted using their target storage levels.

In order to facilitate propagating the target storage levels across VertexRDD and EdgeRDD operations, we remove raw calls to the constructors and instead introduce the `withPartitionsRDD` and `withTargetStorageLevel` methods.

I tested this change by running PageRank and triangle count on a severely memory-constrained cluster (1 executor with 300 MB of memory, and a 1 GB graph). Before this PR, these algorithms used to fail with OutOfMemoryErrors. With this PR, and using the DISK_ONLY storage level, they succeed.

Author: Ankur Dave <ankurdave@gmail.com>

Closes apache#946 from ankurdave/SPARK-1991 and squashes the following commits:

ce17d95 [Ankur Dave] Move pickStorageLevel to StorageLevel.fromString
ccaf06f [Ankur Dave] Shadow members in withXYZ() methods rather than using underscores
c34abc0 [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0
c5ca068 [Ankur Dave] Revert "Exclude all of GraphX from binary compatibility checks"
34bcefb [Ankur Dave] Exclude all of GraphX from binary compatibility checks
6fdd137 [Ankur Dave] [SPARK-1991] Support custom storage levels for vertices and edges
xiliu82 pushed a commit to xiliu82/spark that referenced this pull request Sep 4, 2014
Author: Ankur Dave <ankurdave@gmail.com>

Closes apache#970 from ankurdave/SPARK-1991_docfix and squashes the following commits:

6d07343 [Ankur Dave] Minor: Fix documentation error from apache#946
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants