From 6fdd137c01ea5ce789131681154191fc642901c3 Mon Sep 17 00:00:00 2001 From: Ankur Dave Date: Mon, 2 Jun 2014 16:54:01 -0700 Subject: [PATCH] [SPARK-1991] Support custom storage levels for vertices and edges --- .../org/apache/spark/graphx/EdgeRDD.scala | 67 +++++++++++- .../scala/org/apache/spark/graphx/Graph.scala | 34 ++++-- .../org/apache/spark/graphx/GraphLoader.scala | 12 ++- .../org/apache/spark/graphx/VertexRDD.scala | 49 +++++++-- .../apache/spark/graphx/impl/GraphImpl.scala | 55 +++++----- .../graphx/impl/ReplicatedVertexView.scala | 6 +- .../apache/spark/graphx/lib/Analytics.scala | 101 ++++++++++-------- 7 files changed, 227 insertions(+), 97 deletions(-) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala index a8fc095072512..2a9c3f72957b1 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala @@ -24,6 +24,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx.impl.EdgePartition +import org.apache.spark.graphx.impl.EdgePartitionBuilder /** * `EdgeRDD[ED, VD]` extends `RDD[Edge[ED]]` by storing the edges in columnar format on each @@ -32,7 +33,8 @@ import org.apache.spark.graphx.impl.EdgePartition * `impl.ReplicatedVertexView`. 
*/ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( - val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])]) + val partitionsRDD: RDD[(PartitionID, EdgePartition[ED, VD])], + val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) extends RDD[Edge[ED]](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { partitionsRDD.setName("EdgeRDD") @@ -58,6 +60,10 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect() + /** + * Persists the edge partitions at the specified storage level, ignoring any existing target + * storage level. + */ override def persist(newLevel: StorageLevel): this.type = { partitionsRDD.persist(newLevel) this @@ -68,9 +74,15 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( this } + /** Persists the edge partitions using `targetStorageLevel`, which defaults to MEMORY_ONLY. */ + override def cache(): this.type = { + partitionsRDD.persist(targetStorageLevel) + this + } + private[graphx] def mapEdgePartitions[ED2: ClassTag, VD2: ClassTag]( f: (PartitionID, EdgePartition[ED, VD]) => EdgePartition[ED2, VD2]): EdgeRDD[ED2, VD2] = { - new EdgeRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter => + this.withPartitionsRDD[ED2, VD2](partitionsRDD.mapPartitions({ iter => if (iter.hasNext) { val (pid, ep) = iter.next() Iterator(Tuple2(pid, f(pid, ep))) @@ -118,11 +130,60 @@ class EdgeRDD[@specialized ED: ClassTag, VD: ClassTag]( (f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3, VD] = { val ed2Tag = classTag[ED2] val ed3Tag = classTag[ED3] - new EdgeRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) { + this.withPartitionsRDD[ED3, VD](partitionsRDD.zipPartitions(other.partitionsRDD, true) { (thisIter, otherIter) => val (pid, thisEPart) = thisIter.next() val (_, otherEPart) = otherIter.next() Iterator(Tuple2(pid, thisEPart.innerJoin(otherEPart)(f)(ed2Tag, ed3Tag))) }) } + + /** Replaces the edge partitions while 
preserving all other properties of the EdgeRDD. */ + private[graphx] def withPartitionsRDD[ED2: ClassTag, VD2: ClassTag]( + partitionsRDD_ : RDD[(PartitionID, EdgePartition[ED2, VD2])]): EdgeRDD[ED2, VD2] = { + new EdgeRDD(partitionsRDD_, targetStorageLevel) + } + + /** + * Changes the target storage level while preserving all other properties of the + * EdgeRDD. Operations on the returned EdgeRDD will preserve this storage level. + * + * This does not actually trigger a cache; to do this, call + * [[org.apache.spark.graphx.EdgeRDD#cache]] on the returned EdgeRDD. + */ + private[graphx] def withTargetStorageLevel( + targetStorageLevel_ : StorageLevel): EdgeRDD[ED, VD] = { + new EdgeRDD(partitionsRDD, targetStorageLevel_) + } + +} + +object EdgeRDD { + /** + * Creates an EdgeRDD from a set of edges. + * + * @tparam ED the edge attribute type + * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD + */ + def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = { + val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => + val builder = new EdgePartitionBuilder[ED, VD] + iter.foreach { e => + builder.add(e.srcId, e.dstId, e.attr) + } + Iterator((pid, builder.toEdgePartition)) + } + EdgeRDD.fromEdgePartitions(edgePartitions) + } + + /** + * Creates an EdgeRDD from already-constructed edge partitions. 
+ * + * @tparam ED the edge attribute type + * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD + */ + def fromEdgePartitions[ED: ClassTag, VD: ClassTag]( + edgePartitions: RDD[(Int, EdgePartition[ED, VD])]): EdgeRDD[ED, VD] = { + new EdgeRDD(edgePartitions) + } } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala index dc5dac4fdad57..c4f9d6514cae3 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/Graph.scala @@ -80,7 +80,8 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab @transient val triplets: RDD[EdgeTriplet[VD, ED]] /** - * Caches the vertices and edges associated with this graph at the specified storage level. + * Caches the vertices and edges associated with this graph at the specified storage level, + * ignoring any target storage levels previously set. * * @param newLevel the level at which to cache the graph. * @@ -89,9 +90,9 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] /** - * Caches the vertices and edges associated with this graph. This is used to - * pin a graph in memory enabling multiple queries to reuse the same - * construction process. + * Caches the vertices and edges associated with this graph at the previously-specified target + * storage levels, which default to `MEMORY_ONLY`. This is used to pin a graph in memory enabling + * multiple queries to reuse the same construction process. */ def cache(): Graph[VD, ED] @@ -358,9 +359,12 @@ object Graph { * Construct a graph from a collection of edges encoded as vertex id pairs. 
* * @param rawEdges a collection of edges in (src, dst) form + * @param defaultValue the vertex attributes with which to create vertices referenced by the edges * @param uniqueEdges if multiple identical edges are found they are combined and the edge * attribute is set to the sum. Otherwise duplicate edges are treated as separate. To enable * `uniqueEdges`, a [[PartitionStrategy]] must be provided. + * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary + * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary * * @return a graph with edge attributes containing either the count of duplicate edges or 1 * (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex. @@ -368,10 +372,12 @@ object Graph { def fromEdgeTuples[VD: ClassTag]( rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD, - uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] = + uniqueEdges: Option[PartitionStrategy] = None, + edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, + vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] = { val edges = rawEdges.map(p => Edge(p._1, p._2, 1)) - val graph = GraphImpl(edges, defaultValue) + val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel) uniqueEdges match { case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b) case None => graph @@ -383,14 +389,18 @@ object Graph { * * @param edges the RDD containing the set of edges in the graph * @param defaultValue the default vertex attribute to use for each vertex + * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary + * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary * * @return a graph with edge attributes described by `edges` and vertices * given by all vertices in `edges` with value `defaultValue` */ def fromEdges[VD: ClassTag, ED: 
ClassTag]( edges: RDD[Edge[ED]], - defaultValue: VD): Graph[VD, ED] = { - GraphImpl(edges, defaultValue) + defaultValue: VD, + edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, + vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = { + GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel) } /** @@ -405,12 +415,16 @@ object Graph { * @param edges the collection of edges in the graph * @param defaultVertexAttr the default vertex attribute to use for vertices that are * mentioned in edges but not in vertices + * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary + * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary */ def apply[VD: ClassTag, ED: ClassTag]( vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], - defaultVertexAttr: VD = null.asInstanceOf[VD]): Graph[VD, ED] = { - GraphImpl(vertices, edges, defaultVertexAttr) + defaultVertexAttr: VD = null.asInstanceOf[VD], + edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, + vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = { + GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel) } /** diff --git a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala index 389490c139848..2e814e34f9ad8 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/GraphLoader.scala @@ -17,6 +17,7 @@ package org.apache.spark.graphx +import org.apache.spark.storage.StorageLevel import org.apache.spark.{Logging, SparkContext} import org.apache.spark.graphx.impl.{EdgePartitionBuilder, GraphImpl} @@ -48,12 +49,16 @@ object GraphLoader extends Logging { * @param canonicalOrientation whether to orient edges in the positive * direction * @param minEdgePartitions the number of partitions for the edge RDD + 
* @param edgeStorageLevel the desired storage level for the edge partitions + * @param vertexStorageLevel the desired storage level for the vertex partitions */ def edgeListFile( sc: SparkContext, path: String, canonicalOrientation: Boolean = false, - minEdgePartitions: Int = 1) + minEdgePartitions: Int = 1, + edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY, + vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) : Graph[Int, Int] = { val startTime = System.currentTimeMillis @@ -78,12 +83,13 @@ object GraphLoader extends Logging { } } Iterator((pid, builder.toEdgePartition)) - }.cache().setName("GraphLoader.edgeListFile - edges (%s)".format(path)) + }.persist(edgeStorageLevel).setName("GraphLoader.edgeListFile - edges (%s)".format(path)) edges.count() logInfo("It took %d ms to load the edges".format(System.currentTimeMillis - startTime)) - GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1) + GraphImpl.fromEdgePartitions(edges, defaultVertexAttr = 1, edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel) } // end of edgeListFile } diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala index 8b910fbc5a423..485d3e76d9e42 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala @@ -56,7 +56,8 @@ import org.apache.spark.graphx.impl.VertexRDDFunctions._ * @tparam VD the vertex attribute associated with each vertex in the set. 
*/ class VertexRDD[@specialized VD: ClassTag]( - val partitionsRDD: RDD[ShippableVertexPartition[VD]]) + val partitionsRDD: RDD[ShippableVertexPartition[VD]], + val targetStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY) extends RDD[(VertexId, VD)](partitionsRDD.context, List(new OneToOneDependency(partitionsRDD))) { require(partitionsRDD.partitioner.isDefined) @@ -66,7 +67,7 @@ class VertexRDD[@specialized VD: ClassTag]( * VertexRDD will be based on a different index and can no longer be quickly joined with this * RDD. */ - def reindex(): VertexRDD[VD] = new VertexRDD(partitionsRDD.map(_.reindex())) + def reindex(): VertexRDD[VD] = this.withPartitionsRDD(partitionsRDD.map(_.reindex())) override val partitioner = partitionsRDD.partitioner @@ -85,6 +86,10 @@ class VertexRDD[@specialized VD: ClassTag]( } setName("VertexRDD") + /** + * Persists the vertex partitions at the specified storage level, ignoring any existing target + * storage level. + */ override def persist(newLevel: StorageLevel): this.type = { partitionsRDD.persist(newLevel) this @@ -95,6 +100,12 @@ class VertexRDD[@specialized VD: ClassTag]( this } + /** Persists the vertex partitions at `targetStorageLevel`, which defaults to MEMORY_ONLY. */ + override def cache(): this.type = { + partitionsRDD.persist(targetStorageLevel) + this + } + /** The number of vertices in the RDD. 
*/ override def count(): Long = { partitionsRDD.map(_.size).reduce(_ + _) @@ -114,7 +125,7 @@ class VertexRDD[@specialized VD: ClassTag]( f: ShippableVertexPartition[VD] => ShippableVertexPartition[VD2]) : VertexRDD[VD2] = { val newPartitionsRDD = partitionsRDD.mapPartitions(_.map(f), preservesPartitioning = true) - new VertexRDD(newPartitionsRDD) + this.withPartitionsRDD(newPartitionsRDD) } @@ -165,7 +176,7 @@ class VertexRDD[@specialized VD: ClassTag]( val otherPart = otherIter.next() Iterator(thisPart.diff(otherPart)) } - new VertexRDD(newPartitionsRDD) + this.withPartitionsRDD(newPartitionsRDD) } /** @@ -191,7 +202,7 @@ class VertexRDD[@specialized VD: ClassTag]( val otherPart = otherIter.next() Iterator(thisPart.leftJoin(otherPart)(f)) } - new VertexRDD(newPartitionsRDD) + this.withPartitionsRDD(newPartitionsRDD) } /** @@ -220,7 +231,7 @@ class VertexRDD[@specialized VD: ClassTag]( case other: VertexRDD[_] => leftZipJoin(other)(f) case _ => - new VertexRDD[VD3]( + this.withPartitionsRDD[VD3]( partitionsRDD.zipPartitions( other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) { (partIter, msgs) => partIter.map(_.leftJoin(msgs)(f)) @@ -242,7 +253,7 @@ class VertexRDD[@specialized VD: ClassTag]( val otherPart = otherIter.next() Iterator(thisPart.innerJoin(otherPart)(f)) } - new VertexRDD(newPartitionsRDD) + this.withPartitionsRDD(newPartitionsRDD) } /** @@ -264,7 +275,7 @@ class VertexRDD[@specialized VD: ClassTag]( case other: VertexRDD[_] => innerZipJoin(other)(f) case _ => - new VertexRDD( + this.withPartitionsRDD( partitionsRDD.zipPartitions( other.copartitionWithVertices(this.partitioner.get), preservesPartitioning = true) { (partIter, msgs) => partIter.map(_.innerJoin(msgs)(f)) @@ -290,7 +301,7 @@ class VertexRDD[@specialized VD: ClassTag]( val parts = partitionsRDD.zipPartitions(shuffled, true) { (thisIter, msgIter) => thisIter.map(_.aggregateUsingIndex(msgIter, reduceFunc)) } - new VertexRDD[VD2](parts) + 
this.withPartitionsRDD[VD2](parts) } /** @@ -309,7 +320,25 @@ class VertexRDD[@specialized VD: ClassTag]( if (routingTableIter.hasNext) routingTableIter.next() else RoutingTablePartition.empty partIter.map(_.withRoutingTable(routingTable)) } - new VertexRDD(vertexPartitions) + this.withPartitionsRDD(vertexPartitions) + } + + /** Replaces the vertex partitions while preserving all other properties of the VertexRDD. */ + private[graphx] def withPartitionsRDD[VD2: ClassTag]( + partitionsRDD_ : RDD[ShippableVertexPartition[VD2]]): VertexRDD[VD2] = { + new VertexRDD(partitionsRDD_, targetStorageLevel) + } + + /** + * Changes the target storage level while preserving all other properties of the + * VertexRDD. Operations on the returned VertexRDD will preserve this storage level. + * + * This does not actually trigger a cache; to do this, call + * [[org.apache.spark.graphx.VertexRDD#cache]] on the returned VertexRDD. + */ + private[graphx] def withTargetStorageLevel( + targetStorageLevel_ : StorageLevel): VertexRDD[VD] = { + new VertexRDD(partitionsRDD, targetStorageLevel_) } /** Generates an RDD of vertex attributes suitable for shipping to the edge partitions. 
*/ diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala index 1649b244d2881..59d9a8808e56e 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/GraphImpl.scala @@ -61,7 +61,11 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( this } - override def cache(): Graph[VD, ED] = persist(StorageLevel.MEMORY_ONLY) + override def cache(): Graph[VD, ED] = { + vertices.cache() + replicatedVertexView.edges.cache() + this + } override def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] = { vertices.unpersist(blocking) @@ -70,10 +74,10 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected ( } override def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] = { - val numPartitions = replicatedVertexView.edges.partitions.size + val numPartitions = edges.partitions.size val edTag = classTag[ED] val vdTag = classTag[VD] - val newEdges = new EdgeRDD(replicatedVertexView.edges.map { e => + val newEdges = edges.withPartitionsRDD(edges.map { e => val part: PartitionID = partitionStrategy.getPartition(e.srcId, e.dstId, numPartitions) // Should we be using 3-tuple or an optimized class @@ -256,24 +260,33 @@ object GraphImpl { /** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */ def apply[VD: ClassTag, ED: ClassTag]( edges: RDD[Edge[ED]], - defaultVertexAttr: VD): GraphImpl[VD, ED] = { - fromEdgeRDD(createEdgeRDD(edges), defaultVertexAttr) + defaultVertexAttr: VD, + edgeStorageLevel: StorageLevel, + vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { + fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel) } /** Create a graph from EdgePartitions, setting referenced vertices to `defaultVertexAttr`. 
*/ def fromEdgePartitions[VD: ClassTag, ED: ClassTag]( edgePartitions: RDD[(PartitionID, EdgePartition[ED, VD])], - defaultVertexAttr: VD): GraphImpl[VD, ED] = { - fromEdgeRDD(new EdgeRDD(edgePartitions), defaultVertexAttr) + defaultVertexAttr: VD, + edgeStorageLevel: StorageLevel, + vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { + fromEdgeRDD(EdgeRDD.fromEdgePartitions(edgePartitions), defaultVertexAttr, edgeStorageLevel, + vertexStorageLevel) } /** Create a graph from vertices and edges, setting missing vertices to `defaultVertexAttr`. */ def apply[VD: ClassTag, ED: ClassTag]( vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], - defaultVertexAttr: VD): GraphImpl[VD, ED] = { - val edgeRDD = createEdgeRDD(edges)(classTag[ED], classTag[VD]).cache() + defaultVertexAttr: VD, + edgeStorageLevel: StorageLevel, + vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { + val edgeRDD = EdgeRDD.fromEdges(edges)(classTag[ED], classTag[VD]) + .withTargetStorageLevel(edgeStorageLevel).cache() val vertexRDD = VertexRDD(vertices, edgeRDD, defaultVertexAttr) + .withTargetStorageLevel(vertexStorageLevel).cache() GraphImpl(vertexRDD, edgeRDD) } @@ -309,23 +322,13 @@ object GraphImpl { */ private def fromEdgeRDD[VD: ClassTag, ED: ClassTag]( edges: EdgeRDD[ED, VD], - defaultVertexAttr: VD): GraphImpl[VD, ED] = { - edges.cache() - val vertices = VertexRDD.fromEdges(edges, edges.partitions.size, defaultVertexAttr) - fromExistingRDDs(vertices, edges) - } - - /** Create an EdgeRDD from a set of edges. 
*/ - private def createEdgeRDD[ED: ClassTag, VD: ClassTag]( - edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = { - val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) => - val builder = new EdgePartitionBuilder[ED, VD] - iter.foreach { e => - builder.add(e.srcId, e.dstId, e.attr) - } - Iterator((pid, builder.toEdgePartition)) - } - new EdgeRDD(edgePartitions) + defaultVertexAttr: VD, + edgeStorageLevel: StorageLevel, + vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = { + val edgesCached = edges.withTargetStorageLevel(edgeStorageLevel).cache() + val vertices = VertexRDD.fromEdges(edgesCached, edgesCached.partitions.size, defaultVertexAttr) + .withTargetStorageLevel(vertexStorageLevel) + fromExistingRDDs(vertices, edgesCached) } } // end of object GraphImpl diff --git a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala index 3a0bba1b93b41..86b366eb9202b 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/impl/ReplicatedVertexView.scala @@ -69,7 +69,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag]( .setName("ReplicatedVertexView.upgrade(%s, %s) - shippedVerts %s %s (broadcast)".format( includeSrc, includeDst, shipSrc, shipDst)) .partitionBy(edges.partitioner.get) - val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) { + val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) { (ePartIter, shippedVertsIter) => ePartIter.map { case (pid, edgePartition) => (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))) @@ -91,7 +91,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag]( .setName("ReplicatedVertexView.withActiveSet - shippedActives (broadcast)") .partitionBy(edges.partitioner.get) - val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedActives) { + val 
newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedActives) { (ePartIter, shippedActivesIter) => ePartIter.map { case (pid, edgePartition) => (pid, edgePartition.withActiveSet(shippedActivesIter.flatMap(_._2.iterator))) @@ -111,7 +111,7 @@ class ReplicatedVertexView[VD: ClassTag, ED: ClassTag]( hasSrcId, hasDstId)) .partitionBy(edges.partitioner.get) - val newEdges = new EdgeRDD(edges.partitionsRDD.zipPartitions(shippedVerts) { + val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) { (ePartIter, shippedVertsIter) => ePartIter.map { case (pid, edgePartition) => (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator))) diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala index 069e042ed94a3..bdfb35b6f4602 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/Analytics.scala @@ -17,7 +17,9 @@ package org.apache.spark.graphx.lib +import scala.collection.mutable import org.apache.spark._ +import org.apache.spark.storage.StorageLevel import org.apache.spark.graphx._ import org.apache.spark.graphx.PartitionStrategy._ @@ -28,18 +30,20 @@ object Analytics extends Logging { def main(args: Array[String]): Unit = { if (args.length < 2) { - System.err.println("Usage: Analytics [other options]") + System.err.println( + "Usage: Analytics <taskType> <file> --numEPart=<num_edge_partitions> [other options]") System.exit(1) } val taskType = args(0) val fname = args(1) - val options = args.drop(2).map { arg => + val optionsList = args.drop(2).map { arg => arg.dropWhile(_ == '-').split('=') match { case Array(opt, v) => (opt -> v) case _ => throw new IllegalArgumentException("Invalid argument: " + arg) } } + val options = mutable.Map(optionsList: _*) def pickPartitioner(v: String): PartitionStrategy = { // TODO: Use reflection rather than listing all the partitioning 
strategies here. @@ -52,25 +56,48 @@ object Analytics extends Logging { } } + def pickStorageLevel(v: String): StorageLevel = { + // TODO: Find a way to do this without listing all the storage levels + v match { + case "NONE" => StorageLevel.NONE + case "DISK_ONLY" => StorageLevel.DISK_ONLY + case "DISK_ONLY_2" => StorageLevel.DISK_ONLY_2 + case "MEMORY_ONLY" => StorageLevel.MEMORY_ONLY + case "MEMORY_ONLY_2" => StorageLevel.MEMORY_ONLY_2 + case "MEMORY_ONLY_SER" => StorageLevel.MEMORY_ONLY_SER + case "MEMORY_ONLY_SER_2" => StorageLevel.MEMORY_ONLY_SER_2 + case "MEMORY_AND_DISK" => StorageLevel.MEMORY_AND_DISK + case "MEMORY_AND_DISK_2" => StorageLevel.MEMORY_AND_DISK_2 + case "MEMORY_AND_DISK_SER" => StorageLevel.MEMORY_AND_DISK_SER + case "MEMORY_AND_DISK_SER_2" => StorageLevel.MEMORY_AND_DISK_SER_2 + case "OFF_HEAP" => StorageLevel.OFF_HEAP + case _ => throw new IllegalArgumentException("Invalid StorageLevel: " + v) + } + } + val conf = new SparkConf() .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") .set("spark.kryo.registrator", "org.apache.spark.graphx.GraphKryoRegistrator") .set("spark.locality.wait", "100000") + val numEPart = options.remove("numEPart").map(_.toInt).getOrElse { + println("Set the number of edge partitions using --numEPart.") + sys.exit(1) + } + val partitionStrategy: Option[PartitionStrategy] = options.remove("partStrategy") + .map(pickPartitioner(_)) + val edgeStorageLevel = options.remove("edgeStorageLevel") + .map(pickStorageLevel(_)).getOrElse(StorageLevel.MEMORY_ONLY) + val vertexStorageLevel = options.remove("vertexStorageLevel") + .map(pickStorageLevel(_)).getOrElse(StorageLevel.MEMORY_ONLY) + taskType match { case "pagerank" => - var tol: Float = 0.001F - var outFname = "" - var numEPart = 4 - var partitionStrategy: Option[PartitionStrategy] = None - var numIterOpt: Option[Int] = None - - options.foreach{ - case ("tol", v) => tol = v.toFloat - case ("output", v) => outFname = v - case ("numEPart", v) => 
numEPart = v.toInt - case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) - case ("numIter", v) => numIterOpt = Some(v.toInt) + val tol = options.remove("tol").map(_.toFloat).getOrElse(0.001F) + val outFname = options.remove("output").getOrElse("") + val numIterOpt = options.remove("numIter").map(_.toInt) + + options.foreach { case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } @@ -81,7 +108,9 @@ object Analytics extends Logging { val sc = new SparkContext(conf.setAppName("PageRank(" + fname + ")")) val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart).cache() + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel).cache() val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) println("GRAPHX: Number of vertices " + graph.vertices.count) @@ -102,32 +131,19 @@ object Analytics extends Logging { sc.stop() case "cc" => - var numIter = Int.MaxValue - var numVPart = 4 - var numEPart = 4 - var isDynamic = false - var partitionStrategy: Option[PartitionStrategy] = None - - options.foreach{ - case ("numIter", v) => numIter = v.toInt - case ("dynamic", v) => isDynamic = v.toBoolean - case ("numEPart", v) => numEPart = v.toInt - case ("numVPart", v) => numVPart = v.toInt - case ("partStrategy", v) => partitionStrategy = Some(pickPartitioner(v)) + options.foreach { case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } - if (!isDynamic && numIter == Int.MaxValue) { - println("Set number of iterations!") - sys.exit(1) - } println("======================================") println("| Connected Components |") println("======================================") val sc = new SparkContext(conf.setAppName("ConnectedComponents(" + fname + ")")) val unpartitionedGraph = GraphLoader.edgeListFile(sc, fname, - minEdgePartitions = numEPart).cache() + minEdgePartitions = numEPart, + edgeStorageLevel = 
edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel).cache() val graph = partitionStrategy.foldLeft(unpartitionedGraph)(_.partitionBy(_)) val cc = ConnectedComponents.run(graph) @@ -135,24 +151,25 @@ object Analytics extends Logging { sc.stop() case "triangles" => - var numEPart = 4 - // TriangleCount requires the graph to be partitioned - var partitionStrategy: PartitionStrategy = RandomVertexCut - - options.foreach{ - case ("numEPart", v) => numEPart = v.toInt - case ("partStrategy", v) => partitionStrategy = pickPartitioner(v) + options.foreach { case (opt, _) => throw new IllegalArgumentException("Invalid option: " + opt) } + println("======================================") println("| Triangle Count |") println("======================================") + val sc = new SparkContext(conf.setAppName("TriangleCount(" + fname + ")")) - val graph = GraphLoader.edgeListFile(sc, fname, canonicalOrientation = true, - minEdgePartitions = numEPart).partitionBy(partitionStrategy).cache() + val graph = GraphLoader.edgeListFile(sc, fname, + canonicalOrientation = true, + minEdgePartitions = numEPart, + edgeStorageLevel = edgeStorageLevel, + vertexStorageLevel = vertexStorageLevel) + // TriangleCount requires the graph to be partitioned + .partitionBy(partitionStrategy.getOrElse(RandomVertexCut)).cache() val triangles = TriangleCount.run(graph) println("Triangles: " + triangles.vertices.map { - case (vid,data) => data.toLong + case (vid, data) => data.toLong }.reduce(_ + _) / 3) sc.stop()