Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-1991] Support custom storage levels for vertices and edges #946

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 64 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

import org.apache.spark.graphx.impl.EdgePartition
import org.apache.spark.graphx.impl.EdgePartitionBuilder

/**
* `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each
Expand All @@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition
* `impl.ReplicatedVertexView`.
*/
class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])])
val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])],
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should EdgeRDD be marked @DeveloperAPI? Or can users use it directly? This is technically a binary-compat breaking change (though it doesn't affect source compat).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BTW you could avoid the breakage by having separate 2-arg and 3-arg constructors but if this is an internal API it's fine to leave it. Just wanted to ask whether users call this directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Users may manipulate it directly, because it's returned by Graph#edges, but they should never call the constructor. I actually wanted to make this constructor private, but that interfered with Scala specialization.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, weird. Probably long-term the way to do it might be to create a trait EdgeRDD that users see, and an EdgeRDDImpl that is private[graphx].


partitionsRDD.setName("EdgeRDD")
Expand All @@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](

override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()

/**
* Persists the edge partitions at the specified storage level, ignoring any existing target
* storage level.
*/
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
Expand All @@ -68,9 +74,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
this
}

/** Persists the vertex partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}

private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag](
f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = {
new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter =>
if (iter.hasNext) {
val (pid, ep) = iter.next()
Iterator(Tuple2(pid, f(pid, ep)))
Expand Down Expand Up @@ -118,11 +130,60 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag](
(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = {
val ed2Tag = classTag[ED2]
val ed3Tag = classTag[ED3]
new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) {
(thisIter, otherIter) =>
val (pid, thisEPart) = thisIter.next()
val (_, otherEPart) = otherIter.next()
Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag)))
})
}

/** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag](
partitionsRDD: RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = {
new EdgeRDD(partitionsRDD, this.targetStorageLevel)
}

/**
* Changes the target storage level while preserving all other properties of the
* EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level.
*
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD.
*/
private[graphx] def withTargetStorageLevel(
targetStorageLevel: StorageLevel): EdgeRDD[ED, VD] = {
new EdgeRDD(this.partitionsRDD, targetStorageLevel)
}

}

object EdgeRDD {
/**
* Creates an EdgeRDD from a set of edges.
*
* @tparam ED the edge attribute type
* @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
*/
def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
val builder = new EdgePartitionBuilder[ED, VD]
iter.foreach { e =>
builder.add(e.srcId, e.dstId, e.attr)
}
Iterator((pid, builder.toEdgePartition))
}
EdgeRDD.fromEdgePartitions(edgePartitions)
}

/**
* Creates an EdgeRDD from already-constructed edge partitions.
*
* @tparam ED the edge attribute type
* @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
*/
def fromEdgePartitions[ED: ClassTag, VD: ClassTag](
edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = {
new EdgeRDD(edgePartitions)
}
}
34 changes: 24 additions & 10 deletions graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
@transient val triplets: RDD[EdgeTriplet[VD, ED]]

/**
* Caches the vertices and edges associated with this graph at the specified storage level.
* Caches the vertices and edges associated with this graph at the specified storage level,
* ignoring any target storage levels previously set.
*
* @param newLevel the level at which to cache the graph.
*
Expand All @@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab
def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED]

/**
* Caches the vertices and edges associated with this graph. This is used to
* pin a graph in memory enabling multiple queries to reuse the same
* construction process.
* Caches the vertices and edges associated with this graph at the previously-specified target
* storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory enabling
* multiple queries to reuse the same construction process.
*/
def cache(): Graph[VD, ED]

Expand Down Expand Up @@ -358,20 +359,25 @@ object Graph {
* Construct a graph from a collection of edges encoded as vertex id pairs.
*
* @param rawEdges a collection of edges in (src, dst) form
* @param defaultValue the vertex attributes with which to create vertices referenced by the edges
* @param uniqueEdges if multiple identical edges are found they are combined and the edge
* attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable
* `uniqueEdges`, a [[PartitionStrategy]] must be provided.
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes containing either the count of duplicate edges or 1
* (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
*/
def fromEdgeTuples[VD: ClassTag](
rawEdges: RDD[(VertexId, VertexId)],
defaultValue: VD,
uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] =
uniqueEdges: Option[PartitionStrategy] = None,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
{
val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
val graph = GraphImpl(edges, defaultValue)
val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
uniqueEdges match {
case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
case None => graph
Expand All @@ -383,14 +389,18 @@ object Graph {
*
* @param edges the RDD containing the set of edges in the graph
* @param defaultValue the default vertex attribute to use for each vertex
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes described by `edges` and vertices
* given by all vertices in `edges` with value `defaultValue`
*/
def fromEdges[VD: ClassTag, ED: ClassTag](
edges: RDD[Edge[ED]],
defaultValue: VD): Graph[VD, ED] = {
GraphImpl(edges, defaultValue)
defaultValue: VD,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
}

/**
Expand All @@ -405,12 +415,16 @@ object Graph {
* @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for vertices that are
* mentioned in edges but not in vertices
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr)
defaultVertexAttr: VD = null.asInstanceOf[VD],
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}

/**
Expand Down
12 changes: 9 additions & 3 deletions graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.spark.graphx

import org.apache.spark.storage.StorageLevel
import org.apache.spark.{Logging, SparkContext}
import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl}

Expand Down Expand Up @@ -48,12 +49,16 @@ object GraphLoader extends Logging {
* @param canonicalOrientation whether to orient edges in the positive
* direction
* @param minEdgePartitions the number of partitions for the edge RDD
* @param edgeStorageLevel the desired storage level for the edge partitions. To set the vertex
* storage level, call [[org.apache.spark.graphx.Graph#persistVertices]].
*/
def edgeListFile(
sc: SparkContext,
path: String,
canonicalOrientation: Boolean = false,
minEdgePartitions: Int = 1)
minEdgePartitions: Int = 1,
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
: Graph[Int, Int] =
{
val startTime = System.currentTimeMillis
Expand All @@ -78,12 +83,13 @@ object GraphLoader extends Logging {
}
}
Iterator((pid, builder.toEdgePartition))
}.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path))
}.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path))
edges.count()

logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime))

GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1)
GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel,
vertexStorageLevel = vertexStorageLevel)
} // end of edgeListFile

}
49 changes: 39 additions & 10 deletions graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._
* @tparam VD the vertex attribute associated with each vertex in the set.
*/
class VertexRDD[@specialized VD: ClassTag](
val partitionsRDD: RDD[ShippableVertexPartition[VD]])
val partitionsRDD: RDD[ShippableVertexPartition[VD]],
val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY)
extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) {

require(partitionsRDD.partitioner.isDefined)
Expand All @@ -66,7 +67,7 @@ class VertexRDD[@specialized VD: ClassTag](
* VertexRDD will be based on a different index and can no longer be quickly joined with this
* RDD.
*/
def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex()))
def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex()))

override val partitioner = partitionsRDD.partitioner

Expand All @@ -85,6 +86,10 @@ class VertexRDD[@specialized VD: ClassTag](
}
setName("VertexRDD")

/**
* Persists the vertex partitions at the specified storage level, ignoring any existing target
* storage level.
*/
override def persist(newLevel: StorageLevel): this.type = {
partitionsRDD.persist(newLevel)
this
Expand All @@ -95,6 +100,12 @@ class VertexRDD[@specialized VD: ClassTag](
this
}

/** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */
override def cache(): this.type = {
partitionsRDD.persist(targetStorageLevel)
this
}

/** The number of vertices in the RDD. */
override def count(): Long = {
partitionsRDD.map(_.size).reduce(_ + _)
Expand All @@ -114,7 +125,7 @@ class VertexRDD[@specialized VD: ClassTag](
f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2])
: VertexRDD[VD2] = {
val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true)
new VertexRDD(newPartitionsRDD)
this.withPartitionsRDD(newPartitionsRDD)
}


Expand Down Expand Up @@ -165,7 +176,7 @@ class VertexRDD[@specialized VD: ClassTag](
val otherPart = otherIter.next()
Iterator(thisPart.diff(otherPart))
}
new VertexRDD(newPartitionsRDD)
this.withPartitionsRDD(newPartitionsRDD)
}

/**
Expand All @@ -191,7 +202,7 @@ class VertexRDD[@specialized VD: ClassTag](
val otherPart = otherIter.next()
Iterator(thisPart.leftJoin(otherPart)(f))
}
new VertexRDD(newPartitionsRDD)
this.withPartitionsRDD(newPartitionsRDD)
}

/**
Expand Down Expand Up @@ -220,7 +231,7 @@ class VertexRDD[@specialized VD: ClassTag](
case other: VertexRDD[_] =>
leftZipJoin(other)(f)
case _ =>
new VertexRDD[VD3](
this.withPartitionsRDD[VD3](
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.leftJoin(msgs)(f))
Expand All @@ -242,7 +253,7 @@ class VertexRDD[@specialized VD: ClassTag](
val otherPart = otherIter.next()
Iterator(thisPart.innerJoin(otherPart)(f))
}
new VertexRDD(newPartitionsRDD)
this.withPartitionsRDD(newPartitionsRDD)
}

/**
Expand All @@ -264,7 +275,7 @@ class VertexRDD[@specialized VD: ClassTag](
case other: VertexRDD[_] =>
innerZipJoin(other)(f)
case _ =>
new VertexRDD(
this.withPartitionsRDD(
partitionsRDD.zipPartitions(
other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) {
(partIter, msgs) => partIter.map(_.innerJoin(msgs)(f))
Expand All @@ -290,7 +301,7 @@ class VertexRDD[@specialized VD: ClassTag](
val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) =>
thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc))
}
new VertexRDD[VD2](parts)
this.withPartitionsRDD[VD2](parts)
}

/**
Expand All @@ -309,7 +320,25 @@ class VertexRDD[@specialized VD: ClassTag](
if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty
partIter.map(_.withRoutingTable(routingTable))
}
new VertexRDD(vertexPartitions)
this.withPartitionsRDD(vertexPartitions)
}

/** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */
private[graphx] def withPartitionsRDD[VD2: ClassTag](
partitionsRDD: RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = {
new VertexRDD(partitionsRDD, this.targetStorageLevel)
}

/**
* Changes the target storage level while preserving all other properties of the
* VertexRDD. Operations on the returned VertexRDD will preserve this storage level.
*
* This does not actually trigger a cache; to do this, call
* [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD.
*/
private[graphx] def withTargetStorageLevel(
targetStorageLevel: StorageLevel): VertexRDD[VD] = {
new VertexRDD(this.partitionsRDD, targetStorageLevel)
}

/** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. */
Expand Down
Loading