[SPARK-1991] Support custom storage levels for vertices and edges
This PR adds support for specifying custom storage levels for the vertices and edges of a graph. This enables GraphX to handle graphs larger than available memory: the user specifies MEMORY_AND_DISK and repartitions the graph into many small partitions, each of which does fit in memory. Spark then automatically loads partitions from disk as needed.

The user specifies the desired vertex and edge storage levels when building the graph by passing them to the graph constructor. These are then stored in the `targetStorageLevel` attribute of the VertexRDD and EdgeRDD respectively. Whenever GraphX needs to cache a VertexRDD or EdgeRDD (because it plans to use it more than once, for example), it uses the specified target storage level. Also, when the user calls `Graph#cache()`, the vertices and edges are persisted using their target storage levels.
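
For illustration, a minimal usage sketch (assuming an existing SparkContext `sc`; the parameter names match those added in this PR):

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.storage.StorageLevel

// Build a tiny graph whose vertices and edges spill to disk
// instead of failing when they do not fit in memory.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
val graph = Graph.fromEdges(
  edges,
  defaultValue = 0,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
graph.cache()  // persists vertices and edges at their target storage levels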

To make it easy to propagate the target storage levels across VertexRDD and EdgeRDD operations, we replace raw calls to the constructors with the new `withPartitionsRDD` and `withTargetStorageLevel` methods.

I tested this change by running PageRank and triangle count on a severely memory-constrained cluster (one executor with 300 MB of memory and a 1 GB graph). Before this PR, these algorithms failed with OutOfMemoryErrors; with this PR and the DISK_ONLY storage level, they succeed.
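
A sketch of that test scenario (hypothetical path and partition count; `pageRank` is provided by GraphOps):

import org.apache.spark.graphx.GraphLoader
import org.apache.spark.storage.StorageLevel

// Many small partitions, each cached on disk only, so only a few
// partitions need to be resident in memory at any time.
val graph = GraphLoader.edgeListFile(
  sc, "hdfs:///data/large-graph.txt",  // hypothetical path
  minEdgePartitions = 256,             // hypothetical partition count
  edgeStorageLevel = StorageLevel.DISK_ONLY,
  vertexStorageLevel = StorageLevel.DISK_ONLY)
val ranks = graph.pageRank(tol = 0.001).vertices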

Author: Ankur Dave <ankurdave@gmail.com>

Closes apache#946 from ankurdave/SPARK-1991 and squashes the following commits:

ce17d95 [Ankur Dave] Move pickStorageLevel to StorageLevel.fromString
ccaf06f [Ankur Dave] Shadow members in withXYZ() methods rather than using underscores
c34abc0 [Ankur Dave] Exclude all of GraphX from compatibility checks vs. 1.0.0
c5ca068 [Ankur Dave] Revert "Exclude all of GraphX from binary compatibility checks"
34bcefb [Ankur Dave] Exclude all of GraphX from binary compatibility checks
6fdd137 [Ankur Dave] [SPARK-1991] Support custom storage levels for vertices and edges
ankurdave authored and conviva-zz committed Sep 4, 2014
1 parent 4cc9bea commit 73e4dba
Showing 8 changed files with 229 additions and 97 deletions.
21 changes: 21 additions & 0 deletions core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -147,6 +147,27 @@ object StorageLevel {
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
val OFF_HEAP = new StorageLevel(false, false, true, false)

/**
* :: DeveloperApi ::
* Return the StorageLevel object with the specified name.
*/
@DeveloperApi
def fromString(s: String): StorageLevel = s match {
case "NONE" => NONE
case "DISK_ONLY" => DISK_ONLY
case "DISK_ONLY_2" => DISK_ONLY_2
case "MEMORY_ONLY" => MEMORY_ONLY
case "MEMORY_ONLY_2" => MEMORY_ONLY_2
case "MEMORY_ONLY_SER" => MEMORY_ONLY_SER
case "MEMORY_ONLY_SER_2" => MEMORY_ONLY_SER_2
case "MEMORY_AND_DISK" => MEMORY_AND_DISK
case "MEMORY_AND_DISK_2" => MEMORY_AND_DISK_2
case "MEMORY_AND_DISK_SER" => MEMORY_AND_DISK_SER
case "MEMORY_AND_DISK_SER_2" => MEMORY_AND_DISK_SER_2
case "OFF_HEAP" => OFF_HEAP
case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + s)
}

/**
* :: DeveloperApi ::
* Create a new StorageLevel object without setting useOffHeap.
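
As an aside, the new `StorageLevel.fromString` makes levels configurable by name; a minimal sketch (the config key here is hypothetical):

import org.apache.spark.storage.StorageLevel

val name = sys.props.getOrElse("example.graphx.storageLevel", "MEMORY_AND_DISK")
val level = StorageLevel.fromString(name)  // returns the matching singleton level
// An unrecognized name such as "NOT_A_LEVEL" throws IllegalArgumentException.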
67 changes: 64 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.graphx.impl.EdgePartitionBuilder

/**
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
@@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition
* `impl.ReplicatedVertexView`.
*/
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
- val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
+ val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
+ val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {

partitionsRDD.setName("EdgeRDD")
@@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

/**
* Persists the edge partitions at the specified storage level, ignoring any existing target
* storage level.
*/
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
@@ -68,9 +74,15 @@
this
}

/** Persists the edge partitions using `targetStorageLevel`, which defaults to `MEMORY_ONLY`. */
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}

private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
- new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
+ this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
@@ -118,11 +130,60 @@
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
- new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
+ this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}

/** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
new EdgeRDD(partitionsRDD, this.targetStorageLevel)
}

/**
* Changes the target storage level while preserving all other properties of the
* EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level.
*
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
*/
private[graphx] def withTargetStorageLevel(
targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
new EdgeRDD(this.partitionsRDD, targetStorageLevel)
}

}

object EdgeRDD {
/**
* Creates an EdgeRDD from a set of edges.
*
* @tparam ED the edge attribute type
* @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
*/
def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED, VD]
iter.foreach { e =>
builder.add(e.srcId, e.dstId, e.attr)
}
Iterator((pid, builder.toEdgePartition))
}
EdgeRDD.fromEdgePartitions(edgePartitions)
}

/**
* Creates an EdgeRDD from already-constructed edge partitions.
*
* @tparam ED the edge attribute type
* @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
*/
def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
new EdgeRDD(edgePartitions)
}
}
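
For illustration, a sketch of how the target level threads through EdgeRDD (assuming an RDD[Edge[String]] named `rawEdges`; note that `withTargetStorageLevel` is `private[graphx]`, so this only compiles inside the org.apache.spark.graphx package):

import org.apache.spark.storage.StorageLevel

val edges: EdgeRDD[String, Int] = EdgeRDD.fromEdges(rawEdges)  // target level defaults to MEMORY_ONLY
val onDisk = edges.withTargetStorageLevel(StorageLevel.DISK_ONLY)
onDisk.cache()  // persists the edge partitions with DISK_ONLY
// Operations such as mapEdgePartitions go through withPartitionsRDD,
// so their results keep the DISK_ONLY target level.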
34 changes: 24 additions & 10 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable
@transient val triplets: RDD[EdgeTriplet[VD, ED]]

/**
- * Caches the vertices and edges associated with this graph at the specified storage level.
+ * Caches the vertices and edges associated with this graph at the specified storage level,
+ * ignoring any target storage levels previously set.
*
* @param newLevel the level at which to cache the graph.
*
@@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

/**
- * Caches the vertices and edges associated with this graph. This is used to
- * pin a graph in memory enabling multiple queries to reuse the same
- * construction process.
+ * Caches the vertices and edges associated with this graph at the previously-specified target
+ * storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory enabling
+ * multiple queries to reuse the same construction process.
*/
def cache(): Graph[VD, ED]

@@ -358,20 +359,25 @@ object Graph {
* Construct a graph from a collection of edges encoded as vertex id pairs.
*
* @param rawEdges a collection of edges in (src, dst) form
* @param defaultValue the vertex attributes with which to create vertices referenced by the edges
* @param uniqueEdges if multiple identical edges are found they are combined and the edge
* attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
* `uniqueEdges`, a [[PartitionStrategy]] must be provided.
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes containing either the count of duplicate edges or 1
* (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
*/
def fromEdgeTuples[VD: ClassTag](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
- uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
+ uniqueEdges: Option[PartitionStrategy] = None,
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
{
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
- val graph = GraphImpl(edges, defaultValue)
+ val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
uniqueEdges match {
case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
case None => graph
@@ -383,14 +389,18 @@
*
* @param edges the RDD containing the set of edges in the graph
* @param defaultValue the default vertex attribute to use for each vertex
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
def fromEdges[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
- defaultValue: VD): Graph[VD, ED] = {
- GraphImpl(edges, defaultValue)
+ defaultValue: VD,
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
+ GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
}

/**
@@ -405,12 +415,16 @@
* @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for vertices that are
* mentioned in edges but not in vertices
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
- defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
- GraphImpl(vertices, edges, defaultVertexAttr)
+ defaultVertexAttr: VD = null.asInstanceOf[VD],
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
+ GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}

/**
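
A usage sketch for the extended `fromEdgeTuples` (assuming `sc`; the RandomVertexCut strategy is an illustrative choice):

import org.apache.spark.graphx.{Graph, PartitionStrategy}
import org.apache.spark.storage.StorageLevel

val rawEdges = sc.parallelize(Seq((1L, 2L), (1L, 2L), (2L, 3L)))
// Merge duplicate edges (attributes are summed) and keep the graph on disk.
val graph = Graph.fromEdgeTuples(
  rawEdges,
  defaultValue = 1,
  uniqueEdges = Some(PartitionStrategy.RandomVertexCut),
  edgeStorageLevel = StorageLevel.DISK_ONLY,
  vertexStorageLevel = StorageLevel.DISK_ONLY)
// graph.edges now holds Edge(1, 2, 2) and Edge(2, 3, 1).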
12 changes: 9 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -17,6 +17,7 @@

package org.apache.spark.graphx

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}

@@ -48,12 +49,16 @@ object GraphLoader extends Logging {
* @param canonicalOrientation whether to orient edges in the positive
* direction
* @param minEdgePartitions the number of partitions for the edge RDD
* @param edgeStorageLevel the desired storage level at which to cache the edge partitions
* @param vertexStorageLevel the desired storage level at which to cache the vertex partitions
*/
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
- minEdgePartitions: Int = 1)
+ minEdgePartitions: Int = 1,
+ edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
: Graph[Int, Int] =
{
val startTime = System.currentTimeMillis
@@ -78,12 +83,13 @@
}
}
Iterator((pid, builder.toEdgePartition))
- }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
+ }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()

logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

- GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
+ GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
+   vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFile

}
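
Finally, a sketch of loading an edge list at a custom storage level and running the triangle count mentioned above (hypothetical path; `triangleCount` requires canonical orientation and an explicit partitioning):

import org.apache.spark.graphx.{GraphLoader, PartitionStrategy}
import org.apache.spark.storage.StorageLevel

val graph = GraphLoader.edgeListFile(
  sc, "hdfs:///data/edges.txt",  // hypothetical path
  canonicalOrientation = true,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)
val triangleCounts = graph
  .partitionBy(PartitionStrategy.RandomVertexCut)
  .triangleCount()
  .vertices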