[SPARK-1991] Support custom storage levels for vertices and edges
ankurdave committed Jun 2, 2014
1 parent d17d221 commit 6fdd137
Showing 7 changed files with 227 additions and 97 deletions.
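At a high level, this commit threads a target storage level through VertexRDD, EdgeRDD, and the Graph builders so that cache() pins a graph at a caller-chosen level instead of the hard-wired MEMORY_ONLY. A minimal sketch of the user-facing API, assuming an existing SparkContext `sc` and toy data:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.storage.StorageLevel

// Toy edge data; `sc` is an assumed, pre-existing SparkContext.
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))

// New in this commit: per-RDD storage levels on the graph builders.
val graph = Graph.fromEdges(
  edges,
  defaultValue = 0,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

// cache() now persists vertices and edges at the levels chosen above,
// rather than unconditionally at MEMORY_ONLY.
graph.cache()

persist(newLevel) remains available for overriding the target level explicitly, as the per-file diffs below show.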
67 changes: 64 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.graphx.impl.EdgePartitionBuilder

/**
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
@@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition
* `impl.ReplicatedVertexView`.
*/
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {

partitionsRDD.setName("EdgeRDD")
@@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

/**
* Persists the edge partitions at the specified storage level, ignoring any existing target
* storage level.
*/
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
@@ -68,9 +74,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
this
}

/** Persists the edge partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}

private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
@@ -118,11 +130,60 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}

/** Replaces the edge partitions while preserving all other properties of the EdgeRDD. */
private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
partitionsRDD_ : RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
new EdgeRDD(partitionsRDD_, targetStorageLevel)
}

/**
* Changes the target storage level while preserving all other properties of the
* EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level.
*
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
*/
private[graphx] def withTargetStorageLevel(
targetStorageLevel_ : StorageLevel): EdgeRDD[ED, VD] = {
new EdgeRDD(partitionsRDD, targetStorageLevel_)
}

}

object EdgeRDD {
/**
* Creates an EdgeRDD from a set of edges.
*
* @tparam ED the edge attribute type
* @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
*/
def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED, VD]
iter.foreach { e =>
builder.add(e.srcId, e.dstId, e.attr)
}
Iterator((pid, builder.toEdgePartition))
}
EdgeRDD.fromEdgePartitions(edgePartitions)
}

/**
* Creates an EdgeRDD from already-constructed edge partitions.
*
* @tparam ED the edge attribute type
* @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
*/
def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
new EdgeRDD(edgePartitions)
}
}
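The new object EdgeRDD above also makes the persist/cache split easy to see in isolation: cache() honors the targetStorageLevel carried by the EdgeRDD, while persist(newLevel) bypasses it. A sketch assuming an existing SparkContext `sc`; note that Spark does not allow changing an RDD's storage level once it has been assigned, so the two calls below are alternatives, not a sequence:

import org.apache.spark.graphx.{Edge, EdgeRDD}
import org.apache.spark.storage.StorageLevel

val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1)))
val edgeRdd = EdgeRDD.fromEdges[Int, Int](edges)

// Pins the edge partitions at targetStorageLevel (MEMORY_ONLY by default).
edgeRdd.cache()

// Alternative: ignore the target level and pick one explicitly.
// edgeRdd.persist(StorageLevel.DISK_ONLY)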
34 changes: 24 additions & 10 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
@@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable
@transient val triplets: RDD[EdgeTriplet[VD, ED]]

/**
* Caches the vertices and edges associated with this graph at the specified storage level.
* Caches the vertices and edges associated with this graph at the specified storage level,
* ignoring any target storage levels previously set.
*
* @param newLevel the level at which to cache the graph.
*
@@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializable
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

/**
* Caches the vertices and edges associated with this graph. This is used to
* pin a graph in memory enabling multiple queries to reuse the same
* construction process.
* Caches the vertices and edges associated with this graph at the previously-specified target
* storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory, enabling
* multiple queries to reuse the same construction process.
*/
def cache(): Graph[VD, ED]

@@ -358,20 +359,25 @@ object Graph {
* Construct a graph from a collection of edges encoded as vertex id pairs.
*
* @param rawEdges a collection of edges in (src, dst) form
* @param defaultValue the vertex attributes with which to create vertices referenced by the edges
* @param uniqueEdges if multiple identical edges are found they are combined and the edge
* attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
* `uniqueEdges`, a [[PartitionStrategy]] must be provided.
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes containing either the count of duplicate edges or 1
* (if `uniqueEdges` is `None`) and vertex attributes given by `defaultValue`.
*/
def fromEdgeTuples[VD: ClassTag](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
uniqueEdges: Option[PartitionStrategy] = None,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
{
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
val graph = GraphImpl(edges, defaultValue)
val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
uniqueEdges match {
case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
case None => graph
@@ -383,14 +389,18 @@
*
* @param edges the RDD containing the set of edges in the graph
* @param defaultValue the default vertex attribute to use for each vertex
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
def fromEdges[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED] = {
GraphImpl(edges, defaultValue)
defaultValue: VD,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
}

/**
@@ -405,12 +415,16 @@
* @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for vertices that are
* mentioned in edges but not in vertices
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr)
defaultVertexAttr: VD = null.asInstanceOf[VD],
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}

/**
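For Graph.fromEdgeTuples, the new storage-level parameters compose with the existing uniqueEdges option. A sketch with toy data and an assumed SparkContext `sc`:

import org.apache.spark.graphx.{Graph, PartitionStrategy}
import org.apache.spark.storage.StorageLevel

val rawEdges = sc.parallelize(Seq((1L, 2L), (1L, 2L), (2L, 3L)))

val graph = Graph.fromEdgeTuples(
  rawEdges,
  defaultValue = 0,
  uniqueEdges = Some(PartitionStrategy.RandomVertexCut),
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK_SER,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK_SER)

// The duplicate (1L, 2L) edges merge into one edge with attribute 2, and
// cache() pins both RDDs at MEMORY_AND_DISK_SER.
graph.cache()

A serialized level such as MEMORY_AND_DISK_SER trades CPU for space, which can suit iterative algorithms on graphs too large to hold deserialized in memory.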
12 changes: 9 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
@@ -17,6 +17,7 @@

package org.apache.spark.graphx

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}

@@ -48,12 +49,16 @@ object GraphLoader extends Logging {
* @param canonicalOrientation whether to orient edges in the positive
* direction (srcId < dstId)
* @param minEdgePartitions the minimum number of partitions for the edge RDD
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*/
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1)
minEdgePartitions: Int = 1,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
: Graph[Int, Int] =
{
val startTime = System.currentTimeMillis
@@ -78,12 +83,13 @@
}
}
Iterator((pid, builder.toEdgePartition))
}.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
}.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()

logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFile

}
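Putting the loader change together: edges are now persisted at edgeStorageLevel during loading instead of being unconditionally cached, and both levels are forwarded to the constructed graph. A sketch with a hypothetical input path and an assumed SparkContext `sc`:

import org.apache.spark.graphx.GraphLoader
import org.apache.spark.storage.StorageLevel

// Input lines are whitespace-separated "srcId dstId" pairs; lines starting
// with "#" are treated as comments.
val graph = GraphLoader.edgeListFile(
  sc,
  "hdfs:///data/followers.txt", // hypothetical path
  canonicalOrientation = true,
  edgeStorageLevel = StorageLevel.MEMORY_AND_DISK,
  vertexStorageLevel = StorageLevel.MEMORY_AND_DISK)

graph.cache() // pins vertices and edges at MEMORY_AND_DISK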
49 changes: 39 additions & 10 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -56,7 +56,8 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
* @tparam VD the vertex attribute associated with each vertex in the set.
*/
class VertexRDD[@specialized VD: ClassTag](
val partitionsRDD: RDD[ShippableVertexPartition[VD]])
val partitionsRDD: RDD[ShippableVertexPartition[VD]],
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {

require(partitionsRDD.partitioner.isDefined)
@@ -66,7 +67,7 @@ class VertexRDD[@specialized VD: ClassTag](
* VertexRDD will be based on a different index and can no longer be quickly joined with this
* RDD.
*/
def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex()))

override val partitioner = partitionsRDD.partitioner

@@ -85,6 +86,10 @@ class VertexRDD[@specialized VD: ClassTag](
}
setName("VertexRDD")

/**
* Persists the vertex partitions at the specified storage level, ignoring any existing target
* storage level.
*/
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
@@ -95,6 +100,12 @@
this
}

/** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}

/** The number of vertices in the RDD. */
override def count(): Long = {
partitionsRDD.map(_.size).reduce(_ + _)
@@ -114,7 +125,7 @@
f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
: VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
new VertexRDD(newPartitionsRDD)
this.withPartitionsRDD(newPartitionsRDD)
}


@@ -165,7 +176,7 @@
val otherPart = otherIter.next()
Iterator(thisPart.diff(otherPart))
}
new VertexRDD(newPartitionsRDD)
this.withPartitionsRDD(newPartitionsRDD)
}

/**
@@ -191,7 +202,7 @@
val otherPart = otherIter.next()
Iterator(thisPart.leftJoin(otherPart)(f))
}
new VertexRDD(newPartitionsRDD)
this.withPartitionsRDD(newPartitionsRDD)
}

/**
@@ -220,7 +231,7 @@
case other: VertexRDD[_] =>
leftZipJoin(other)(f)
case _ =>
new VertexRDD[VD3](
this.withPartitionsRDD[VD3](
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
@@ -242,7 +253,7 @@
val otherPart = otherIter.next()
Iterator(thisPart.innerJoin(otherPart)(f))
}
new VertexRDD(newPartitionsRDD)
this.withPartitionsRDD(newPartitionsRDD)
}

/**
@@ -264,7 +275,7 @@
case other: VertexRDD[_] =>
innerZipJoin(other)(f)
case _ =>
new VertexRDD(
this.withPartitionsRDD(
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
@@ -290,7 +301,7 @@
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
}
new VertexRDD[VD2](parts)
this.withPartitionsRDD[VD2](parts)
}

/**
@@ -309,7 +320,25 @@
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
partIter.map(_.withRoutingTable(routingTable))
}
new VertexRDD(vertexPartitions)
this.withPartitionsRDD(vertexPartitions)
}

/** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
private[graphx] def withPartitionsRDD[VD2: ClassTag](
partitionsRDD_ : RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
new VertexRDD(partitionsRDD_, targetStorageLevel)
}

/**
* Changes the target storage level while preserving all other properties of the
* VertexRDD. Operations on the returned VertexRDD will preserve this storage level.
*
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD.
*/
private[graphx] def withTargetStorageLevel(
targetStorageLevel_ : StorageLevel): VertexRDD[VD] = {
new VertexRDD(partitionsRDD, targetStorageLevel_)
}

/** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
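The vertex side mirrors EdgeRDD: withPartitionsRDD threads targetStorageLevel through every derived VertexRDD, so transformations keep the level chosen at construction time. A sketch assuming an existing SparkContext `sc` and the companion-object constructor VertexRDD(...):

import org.apache.spark.graphx.VertexRDD
import org.apache.spark.storage.StorageLevel

val attrs = sc.parallelize(Seq((1L, "a"), (2L, "b")))
val verts = VertexRDD(attrs)

// Derived VertexRDDs (mapValues, diff, the joins) are built through
// withPartitionsRDD and carry the same target storage level.
val upper = verts.mapValues(_.toUpperCase)

// persist(newLevel) overrides the target level; cache() would use it as-is.
upper.persist(StorageLevel.MEMORY_AND_DISK)
upper.count()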
(Diffs for the remaining three changed files are not shown.)
