+    Default number of tasks to use across the cluster for distributed shuffle operations
+    (<code>groupByKey</code>, <code>reduceByKey</code>, etc) when not set by user.
+    … the <code>BlockManager</code> might take a performance hit.
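As a hedged illustration of the setting described above (a sketch, not part of the original diff: `spark.default.parallelism` is the standard Spark configuration key this description belongs to, and the app name, value, and data are assumptions):

{% highlight scala %}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // pair-RDD implicits (needed on older Spark versions)

// Set a cluster-wide default number of tasks for shuffle operations
// such as groupByKey and reduceByKey.
val conf = new SparkConf()
  .setAppName("ParallelismSketch")
  .set("spark.default.parallelism", "16")
val sc = new SparkContext(conf)

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
// ...or override the number of reduce tasks for a single operation.
val counts = pairs.reduceByKey(_ + _, 4)
{% endhighlight %}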
+ +As a consequence, it is often necessary to be able to move between table and graph views of the same +physical data and to leverage the properties of each view to easily and efficiently express +computation. However, existing graph analytics pipelines must compose graph-parallel and data- +parallel systems, leading to extensive data movement and duplication and a complicated programming +model.GraphX optimizes the representation of `VD` and `ED` when they are plain old data-types (e.g., -> int, double, etc...) reducing the in memory footprint. +> GraphX optimizes the representation of vertex and edge types when they are plain old data-types +> (e.g., int, double, etc...) reducing the in memory footprint by storing them in specialized +> arrays. -In some cases we may wish to have vertices with different property types in the same graph. This can -be accomplished through inheritance. For example to model users and products as a bipartite graph -we might do the following: +In some cases it may be desirable to have vertices with different property types in the same graph. +This can be accomplished through inheritance. For example to model users and products as a +bipartite graph we might do the following: {% highlight scala %} class VertexProperty() @@ -116,9 +132,11 @@ var graph: Graph[VertexProperty, String] = null {% endhighlight %} Like RDDs, property graphs are immutable, distributed, and fault-tolerant. Changes to the values or -structure of the graph are accomplished by producing a new graph with the desired changes. The graph -is partitioned across the workers using a range of vertex-partitioning heuristics. As with RDDs, -each partition of the graph can be recreated on a different machine in the event of a failure. +structure of the graph are accomplished by producing a new graph with the desired changes. Note +that substantial parts of the original graph (i.e., unaffected structure, attributes, and indicies) +are reused in the new graph reducing the cost of this inherently functional data-structure. The +graph is partitioned across the workers using a range of vertex-partitioning heuristics. As with +RDDs, each partition of the graph can be recreated on a different machine in the event of a failure. Logically the property graph corresponds to a pair of typed collections (RDDs) encoding the properties for each vertex and edge. As a consequence, the graph class contains members to access @@ -131,12 +149,12 @@ class Graph[VD, ED] { } {% endhighlight %} -The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexId, +The classes `VertexRDD[VD]` and `EdgeRDD[ED]` extend and are optimized versions of `RDD[(VertexID, VD)]` and `RDD[Edge[ED]]` respectively. Both `VertexRDD[VD]` and `EdgeRDD[ED]` provide additional functionality built around graph computation and leverage internal optimizations. We discuss the `VertexRDD` and `EdgeRDD` API in greater detail in the section on [vertex and edge RDDs](#vertex_and_edge_rdds) but for now they can be thought of as simply RDDs of the form: -`RDD[(VertexId, VD)]` and `RDD[Edge[ED]]`. +`RDD[(VertexID, VD)]` and `RDD[Edge[ED]]`. 
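To make the bipartite modeling above concrete, here is a minimal sketch (the data, vertex ids, and edge label are illustrative assumptions, not part of the original guide) that builds such a mixed-type graph and pattern matches on the concrete vertex property type:

{% highlight scala %}
// Assumes `sc` and the GraphX imports used by the surrounding examples.
val bipartiteVertices: RDD[(VertexId, VertexProperty)] =
  sc.parallelize(Array((1L, UserProperty("alice")), (2L, ProductProperty("widget", 9.99))))
val bipartiteEdges: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(1L, 2L, "purchased")))
val bipartiteGraph: Graph[VertexProperty, String] = Graph(bipartiteVertices, bipartiteEdges)

// Pattern match on the concrete subclass to recover user- or product-specific fields.
val numProducts = bipartiteGraph.vertices.filter {
  case (_, ProductProperty(_, _)) => true
  case _ => false
}.count
{% endhighlight %}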
### Example Property Graph @@ -168,7 +186,7 @@ code constructs a graph from a collection of RDDs: // Assume the SparkContext has already been constructed val sc: SparkContext // Create an RDD for the vertices -val users: RDD[(VertexID, (String, String))] = +val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")))) // Create an RDD for edges @@ -183,7 +201,7 @@ val graph = Graph(users, relationships, defaultUser) In the above example we make use of the [`Edge`][Edge] case class. Edges have a `srcId` and a `dstId` corresponding to the source and destination vertex identifiers. In addition, the `Edge` -class contains the `attr` member which contains the edge property. +class has an `attr` member which stores the edge property. [Edge]: api/graphx/index.html#org.apache.spark.graphx.Edge @@ -199,7 +217,7 @@ graph.edges.filter(e => e.srcId > e.dstId).count {% endhighlight %} > Note that `graph.vertices` returns an `VertexRDD[(String, String)]` which extends -> `RDD[(VertexId, (String, String))]` and so we use the scala `case` expression to deconstruct the +> `RDD[(VertexID, (String, String))]` and so we use the scala `case` expression to deconstruct the > tuple. On the other hand, `graph.edges` returns an `EdgeRDD` containing `Edge[String]` objects. > We could have also used the case class type constructor as in the following: > {% highlight scala %} @@ -266,6 +284,75 @@ able to support different graph representations in the future. Each graph repre provide implementations of the core operations and reuse many of the useful operations defined in [`GraphOps`][GraphOps]. +### Summary List of Operators +The following is a quick summary of the functionality defined in both [`Graph`][Graph] and +[`GraphOps`][GraphOps] but presented as members of Graph for simplicity. Note that some function +signatures have been simplified (e.g., default arguments and type constraints removed) and some more +advanced functionality has been removed so please consult the API docs for the official list of +operations. 
+ +{% highlight scala %} +/** Summary of the functionality in the property graph */ +class Graph[VD, ED] { + // Information about the Graph =================================================================== + val numEdges: Long + val numVertices: Long + val inDegrees: VertexRDD[Int] + val outDegrees: VertexRDD[Int] + val degrees: VertexRDD[Int] + // Views of the graph as collections ============================================================= + val vertices: VertexRDD[VD] + val edges: EdgeRDD[ED] + val triplets: RDD[EdgeTriplet[VD, ED]] + // Functions for caching graphs ================================================================== + def persist(newLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] + def cache(): Graph[VD, ED] + def unpersistVertices(blocking: Boolean = true): Graph[VD, ED] + // Change the partitioning heuristic ============================================================ + def partitionBy(partitionStrategy: PartitionStrategy): Graph[VD, ED] + // Transform vertex and edge attributes ========================================================== + def mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED] + def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] + def mapEdges[ED2](map: (PartitionID, Iterator[Edge[ED]]) => Iterator[ED2]): Graph[VD, ED2] + def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] + def mapTriplets[ED2](map: (PartitionID, Iterator[EdgeTriplet[VD, ED]]) => Iterator[ED2]) + : Graph[VD, ED2] + // Modify the graph structure ==================================================================== + def reverse: Graph[VD, ED] + def subgraph( + epred: EdgeTriplet[VD,ED] => Boolean = (x => true), + vpred: (VertexID, VD) => Boolean = ((v, d) => true)) + : Graph[VD, ED] + def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] + def groupEdges(merge: (ED, ED) => ED): Graph[VD, ED] + // Join RDDs with the graph ====================================================================== + def joinVertices[U](table: RDD[(VertexID, U)])(mapFunc: (VertexID, VD, U) => VD): Graph[VD, ED] + def outerJoinVertices[U, VD2](other: RDD[(VertexID, U)]) + (mapFunc: (VertexID, VD, Option[U]) => VD2) + : Graph[VD2, ED] + // Aggregate information about adjacent triplets ================================================= + def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] + def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[Array[(VertexID, VD)]] + def mapReduceTriplets[A: ClassTag]( + mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + reduceFunc: (A, A) => A, + activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) + : VertexRDD[A] + // Iterative graph-parallel computation ========================================================== + def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)( + vprog: (VertexID, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)], + mergeMsg: (A, A) => A) + : Graph[VD, ED] + // Basic graph algorithms ======================================================================== + def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double] + def connectedComponents(): Graph[VertexID, ED] + def triangleCount(): Graph[Int, ED] + def stronglyConnectedComponents(numIter: Int): Graph[VertexID, ED] +} +{% endhighlight %} + + ## Property Operators In direct analogy to the RDD `map` operator, the property @@ -273,7 +360,7 @@ graph contains the following: {% highlight scala %} class Graph[VD, ED] { - def 
mapVertices[VD2](map: (VertexID, VD) => VD2): Graph[VD2, ED] + def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED] def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2] def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2] } @@ -295,7 +382,7 @@ val newGraph = Graph(newVertices, graph.edges) val newGraph = graph.mapVertices((id, attr) => mapUdf(id, attr)) {% endhighlight %} -[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexID,VD)āVD2)(ClassTag[VD2]):Graph[VD2,ED] +[Graph.mapVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapVertices[VD2]((VertexId,VD)āVD2)(ClassTag[VD2]):Graph[VD2,ED] These operators are often used to initialize the graph for a particular computation or project away unnecessary properties. For example, given a graph with the out-degrees as the vertex properties @@ -321,7 +408,7 @@ add more in the future. The following is a list of the basic structural operato class Graph[VD, ED] { def reverse: Graph[VD, ED] def subgraph(epred: EdgeTriplet[VD,ED] => Boolean, - vpred: (VertexID, VD) => Boolean): Graph[VD, ED] + vpred: (VertexId, VD) => Boolean): Graph[VD, ED] def mask[VD2, ED2](other: Graph[VD2, ED2]): Graph[VD, ED] def groupEdges(merge: (ED, ED) => ED): Graph[VD,ED] } @@ -340,11 +427,11 @@ satisfy the edge predicate *and connect vertices that satisfy the vertex predica operator can be used in number of situations to restrict the graph to the vertices and edges of interest or eliminate broken links. For example in the following code we remove broken links: -[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])āBoolean,(VertexID,VD)āBoolean):Graph[VD,ED] +[Graph.subgraph]: api/graphx/index.html#org.apache.spark.graphx.Graph@subgraph((EdgeTriplet[VD,ED])āBoolean,(VertexId,VD)āBoolean):Graph[VD,ED] {% highlight scala %} // Create an RDD for the vertices -val users: RDD[(VertexID, (String, String))] = +val users: RDD[(VertexId, (String, String))] = sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")), (5L, ("franklin", "prof")), (2L, ("istoica", "prof")), (4L, ("peter", "student")))) @@ -407,9 +494,9 @@ using the *join* operators. Below we list the key join operators: {% highlight scala %} class Graph[VD, ED] { - def joinVertices[U](table: RDD[(VertexID, U)])(map: (VertexID, VD, U) => VD) + def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD) : Graph[VD, ED] - def outerJoinVertices[U, VD2](table: RDD[(VertexID, U)])(map: (VertexID, VD, Option[U]) => VD2) + def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2) : Graph[VD2, ED] } {% endhighlight %} @@ -419,13 +506,13 @@ returns a new graph with the vertex properties obtained by applying the user def to the result of the joined vertices. Vertices without a matching value in the RDD retain their original value. -[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexID,U)])((VertexID,VD,U)āVD)(ClassTag[U]):Graph[VD,ED] +[GraphOps.joinVertices]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@joinVertices[U](RDD[(VertexId,U)])((VertexId,VD,U)āVD)(ClassTag[U]):Graph[VD,ED] > Note that if the RDD contains more than one value for a given vertex only one will be used. It > is therefore recommended that the input RDD be first made unique using the following which will > also *pre-index* the resulting values to substantially accelerate the subsequent join. 
> {% highlight scala %} -val nonUniqueCosts: RDD[(VertexId, Double)] +val nonUniqueCosts: RDD[(VertexID, Double)] val uniqueCosts: VertexRDD[Double] = graph.vertices.aggregateUsingIndex(nonUnique, (a,b) => a + b) val joinedGraph = graph.joinVertices(uniqueCosts)( @@ -438,7 +525,7 @@ property type. Because not all vertices may have a matching value in the input function takes an `Option` type. For example, we can setup a graph for PageRank by initializing vertex properties with their `outDegree`. -[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexID,U)])((VertexID,VD,Option[U])āVD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED] +[Graph.outerJoinVertices]: api/graphx/index.html#org.apache.spark.graphx.Graph@outerJoinVertices[U,VD2](RDD[(VertexId,U)])((VertexId,VD,Option[U])āVD2)(ClassTag[U],ClassTag[VD2]):Graph[VD2,ED] {% highlight scala %} @@ -457,7 +544,7 @@ val degreeGraph = graph.outerJoinVertices(outDegrees) { (id, oldAttr, outDegOpt) > provide type annotation for the user defined function: > {% highlight scala %} val joinedGraph = graph.joinVertices(uniqueCosts, - (id: VertexId, oldCost: Double, extraCost: Double) => oldCost + extraCost) + (id: VertexID, oldCost: Double, extraCost: Double) => oldCost + extraCost) {% endhighlight %} @@ -472,7 +559,7 @@ PageRank Value, shortest path to the source, and smallest reachable vertex id). ### Map Reduce Triplets (mapReduceTriplets) -[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=>Iterator[(org.apache.spark.graphx.VertexID,A)],reduceFunc:(A,A)=>A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A] +[Graph.mapReduceTriplets]: api/graphx/index.html#org.apache.spark.graphx.Graph@mapReduceTriplets[A](mapFunc:org.apache.spark.graphx.EdgeTriplet[VD,ED]=>Iterator[(org.apache.spark.graphx.VertexId,A)],reduceFunc:(A,A)=>A,activeSetOpt:Option[(org.apache.spark.graphx.VertexRDD[_],org.apache.spark.graphx.EdgeDirection)])(implicitevidence$10:scala.reflect.ClassTag[A]):org.apache.spark.graphx.VertexRDD[A] The core (heavily optimized) aggregation primitive in GraphX is the [`mapReduceTriplets`][Graph.mapReduceTriplets] operator: @@ -480,7 +567,7 @@ The core (heavily optimized) aggregation primitive in GraphX is the {% highlight scala %} class Graph[VD, ED] { def mapReduceTriplets[A]( - map: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + map: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], reduce: (A, A) => A) : VertexRDD[A] } @@ -495,26 +582,26 @@ containing the aggregate message (of type `A`) destined to each vertex. Vertice receive a message are not included in the returned `VertexRDD`.
-In the following example we use the `mapReduceTriplets` operator to compute the average age of the @@ -547,8 +634,8 @@ val avgAgeOfOlderFollowers: VertexRDD[Double] = avgAgeOfOlderFollowers.collect.foreach(println(_)) {% endhighlight %} -> Note that the `mapReduceTriplets` operation performs optimally when the messages (and their sums) -> are constant sized (e.g., floats and addition instead of lists and concatenation). More +> Note that the `mapReduceTriplets` operation performs optimally when the messages (and the sums of +> messages) are constant sized (e.g., floats and addition instead of lists and concatenation). More > precisely, the result of `mapReduceTriplets` should ideally be sub-linear in the degree of each > vertex. @@ -562,13 +649,13 @@ compute the max in, out, and total degrees: {% highlight scala %} // Define a reduce operation to compute the highest degree vertex -def max(a: (VertexID, Int), b: (VertexID, Int)): (VertexID, Int) = { +def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = { if (a._2 > b._2) a else b } // Compute the max degrees -val maxInDegree: (VertexID, Int) = graph.inDegrees.reduce(max) -val maxOutDegree: (VertexID, Int) = graph.outDegrees.reduce(max) -val maxDegrees: (VertexID, Int) = graph.degrees.reduce(max) +val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max) +val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max) +val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max) {% endhighlight %} ### Collecting Neighbors @@ -578,14 +665,14 @@ attributes at each vertex. This can be easily accomplished using the [`collectNeighborIds`][GraphOps.collectNeighborIds] and the [`collectNeighbors`][GraphOps.collectNeighbors] operators. -[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexID]] -[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexID,VD)]] +[GraphOps.collectNeighborIds]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighborIds(EdgeDirection):VertexRDD[Array[VertexId]] +[GraphOps.collectNeighbors]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@collectNeighbors(EdgeDirection):VertexRDD[Array[(VertexId,VD)]] {% highlight scala %} class GraphOps[VD, ED] { - def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexID]] - def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexID, VD)] ] + def collectNeighborIds(edgeDirection: EdgeDirection): VertexRDD[Array[VertexId]] + def collectNeighbors(edgeDirection: EdgeDirection): VertexRDD[ Array[(VertexId, VD)] ] } {% endhighlight %} @@ -593,11 +680,20 @@ class GraphOps[VD, ED] { > substantial communication. If possible try expressing the same computation using the > `mapReduceTriplets` operator directly. +## Caching and Uncaching + +In Spark, RDDs are not persisted in memory by default. To avoid recomputation, they must be explicitly cached when using them multiple times (see the [Spark Programming Guide][RDD Persistence]). Graphs in GraphX behave the same way. **When using a graph multiple times, make sure to call [`Graph.cache()`][Graph.cache] on it first.** + +[RDD Persistence]: scala-programming-guide.html#rdd-persistence +[Graph.cache]: api/graphx/index.html#org.apache.spark.graphx.Graph@cache():Graph[VD,ED] + +In iterative computations, *uncaching* may also be necessary for best performance. 
By default, cached RDDs and graphs will remain in memory until memory pressure forces them to be evicted in LRU order. For iterative computation, intermediate results from previous iterations will fill up the cache. Though they will eventually be evicted, the unnecessary data stored in memory will slow down garbage collection. It would be more efficient to uncache intermediate results as soon as they are no longer necessary. This involves materializing (caching and forcing) a graph or RDD every iteration, uncaching all other datasets, and only using the materialized dataset in future iterations. However, because graphs are composed of multiple RDDs, it can be difficult to unpersist them correctly. **For iterative computation we recommend using the Pregel API, which correctly unpersists intermediate results.** + # Pregel API Graphs are inherently recursive data-structures as properties of vertices depend on properties of -their neighbors which intern depend on properties of *their* neighbors. As a +their neighbors which in turn depend on properties of *their* neighbors. As a consequence many important graph algorithms iteratively recompute the properties of each vertex until a fixed-point condition is reached. A range of graph-parallel abstractions have been proposed to express these iterative algorithms. GraphX exposes a Pregel-like operator which is a fusion of @@ -620,7 +716,7 @@ messages remaining. The following is the type signature of the [Pregel operator][GraphOps.pregel] as well as a *sketch* of its implementation (note calls to graph.cache have been removed): -[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexID,VD,A)āVD,(EdgeTriplet[VD,ED])āIterator[(VertexID,A)],(A,A)āA)(ClassTag[A]):Graph[VD,ED] +[GraphOps.pregel]: api/graphx/index.html#org.apache.spark.graphx.GraphOps@pregel[A](A,Int,EdgeDirection)((VertexId,VD,A)āVD,(EdgeTriplet[VD,ED])āIterator[(VertexId,A)],(A,A)āA)(ClassTag[A]):Graph[VD,ED] {% highlight scala %} class GraphOps[VD, ED] { @@ -628,8 +724,8 @@ class GraphOps[VD, ED] { (initialMsg: A, maxIter: Int = Int.MaxValue, activeDir: EdgeDirection = EdgeDirection.Out) - (vprog: (VertexID, VD, A) => VD, - sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID, A)], + (vprog: (VertexId, VD, A) => VD, + sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], mergeMsg: (A, A) => A) : Graph[VD, ED] = { // Receive the initial message at each vertex @@ -674,7 +770,7 @@ import org.apache.spark.graphx.util.GraphGenerators // A graph with edge attributes containing distances val graph: Graph[Int, Double] = GraphGenerators.logNormalGraph(sc, numVertices = 100).mapEdges(e => e.attr.toDouble) -val sourceId: VertexID = 42 // The ultimate source +val sourceId: VertexId = 42 // The ultimate source // Initialize the graph such that all vertices except the root have distance infinity. 
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) val sssp = initialGraph.pregel(Double.PositiveInfinity)( @@ -721,7 +817,7 @@ It creates a `Graph` from the specified edges, automatically creating any vertic {% highlight scala %} object Graph { def apply[VD, ED]( - vertices: RDD[(VertexID, VD)], + vertices: RDD[(VertexId, VD)], edges: RDD[Edge[ED]], defaultVertexAttr: VD = null) : Graph[VD, ED] @@ -731,7 +827,7 @@ object Graph { defaultValue: VD): Graph[VD, ED] def fromEdgeTuples[VD]( - rawEdges: RDD[(VertexID, VertexID)], + rawEdges: RDD[(VertexId, VertexId)], defaultValue: VD, uniqueEdges: Option[PartitionStrategy] = None): Graph[VD, Int] @@ -747,8 +843,8 @@ object Graph { [PartitionStrategy]: api/graphx/index.html#org.apache.spark.graphx.PartitionStrategy$ [GraphLoader.edgeListFile]: api/graphx/index.html#org.apache.spark.graphx.GraphLoader$@edgeListFile(SparkContext,String,Boolean,Int):Graph[Int,Int] -[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexID,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED] -[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexID,VertexID)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int] +[Graph.apply]: api/graphx/index.html#org.apache.spark.graphx.Graph$@apply[VD,ED](RDD[(VertexId,VD)],RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED] +[Graph.fromEdgeTuples]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdgeTuples[VD](RDD[(VertexId,VertexId)],VD,Option[PartitionStrategy])(ClassTag[VD]):Graph[VD,Int] [Graph.fromEdges]: api/graphx/index.html#org.apache.spark.graphx.Graph$@fromEdges[VD,ED](RDD[Edge[ED]],VD)(ClassTag[VD],ClassTag[ED]):Graph[VD,ED] # Vertex and Edge RDDs @@ -761,47 +857,46 @@ respectively. In this section we review some of the additional useful functiona ## VertexRDDs -The `VertexRDD[A]` extends the more traditional `RDD[(VertexId, A)]` but adds the additional -constraint that each `VertexId` occurs only *once*. Moreover, `VertexRDD[A]` represents a *set* of -vertices each with an attribute of type `A`. Internally, this is achieved by storing the vertex -attributes in a reusable hash-map data-structure. As a consequence if two `VertexRDD`s are derived -from the same base `VertexRDD` (e.g., by `filter` or `mapValues`) they can be joined in constant -time without hash evaluations. To leverage this indexed data-structure, the `VertexRDD` exposes the -following additional functionality: +The `VertexRDD[A]` extends `RDD[(VertexID, A)]` and adds the additional constraint that each +`VertexID` occurs only *once*. Moreover, `VertexRDD[A]` represents a *set* of vertices each with an +attribute of type `A`. Internally, this is achieved by storing the vertex attributes in a reusable +hash-map data-structure. As a consequence if two `VertexRDD`s are derived from the same base +`VertexRDD` (e.g., by `filter` or `mapValues`) they can be joined in constant time without hash +evaluations. 
To leverage this indexed data-structure, the `VertexRDD` exposes the following +additional functionality: {% highlight scala %} -class VertexRDD[VD] { +class VertexRDD[VD] extends RDD[(VertexID, VD)] { // Filter the vertex set but preserves the internal index - def filter(pred: Tuple2[VertexID, VD] => Boolean): VertexRDD[VD] + def filter(pred: Tuple2[VertexId, VD] => Boolean): VertexRDD[VD] // Transform the values without changing the ids (preserves the internal index) def mapValues[VD2](map: VD => VD2): VertexRDD[VD2] - def mapValues[VD2](map: (VertexID, VD) => VD2): VertexRDD[VD2] + def mapValues[VD2](map: (VertexId, VD) => VD2): VertexRDD[VD2] // Remove vertices from this set that appear in the other set def diff(other: VertexRDD[VD]): VertexRDD[VD] // Join operators that take advantage of the internal indexing to accelerate joins (substantially) - def leftJoin[VD2, VD3](other: RDD[(VertexID, VD2)])(f: (VertexID, VD, Option[VD2]) => VD3): VertexRDD[VD3] - def innerJoin[U, VD2](other: RDD[(VertexID, U)])(f: (VertexID, VD, U) => VD2): VertexRDD[VD2] + def leftJoin[VD2, VD3](other: RDD[(VertexId, VD2)])(f: (VertexId, VD, Option[VD2]) => VD3): VertexRDD[VD3] + def innerJoin[U, VD2](other: RDD[(VertexId, U)])(f: (VertexId, VD, U) => VD2): VertexRDD[VD2] // Use the index on this RDD to accelerate a `reduceByKey` operation on the input RDD. - def aggregateUsingIndex[VD2](other: RDD[(VertexID, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] + def aggregateUsingIndex[VD2](other: RDD[(VertexId, VD2)], reduceFunc: (VD2, VD2) => VD2): VertexRDD[VD2] } {% endhighlight %} Notice, for example, how the `filter` operator returns an `VertexRDD`. Filter is actually implemented using a `BitSet` thereby reusing the index and preserving the ability to do fast joins with other `VertexRDD`s. Likewise, the `mapValues` operators do not allow the `map` function to -change the `VertexId` thereby enabling the same `HashMap` data-structures to be reused. Both the +change the `VertexID` thereby enabling the same `HashMap` data-structures to be reused. Both the `leftJoin` and `innerJoin` are able to identify when joining two `VertexRDD`s derived from the same `HashMap` and implement the join by linear scan rather than costly point lookups. -The `aggregateUsingIndex` operator can be slightly confusing but is also useful for efficient -construction of a new `VertexRDD` from an `RDD[(VertexId, A)]`. Conceptually, if I have constructed -a `VertexRDD[B]` over a set of vertices, *which is a super-set* of the vertices in some -`RDD[(VertexId, A)]` then I can reuse the index to both aggregate and then subsequently index the -RDD. For example: +The `aggregateUsingIndex` operator is useful for efficient construction of a new `VertexRDD` from an +`RDD[(VertexID, A)]`. Conceptually, if I have constructed a `VertexRDD[B]` over a set of vertices, +*which is a super-set* of the vertices in some `RDD[(VertexID, A)]` then I can reuse the index to +both aggregate and then subsequently index the `RDD[(VertexID, A)]`. 
For example: {% highlight scala %} val setA: VertexRDD[Int] = VertexRDD(sc.parallelize(0L until 100L).map(id => (id, 1))) -val rddB: RDD[(VertexID, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0))) +val rddB: RDD[(VertexId, Double)] = sc.parallelize(0L until 100L).flatMap(id => List((id, 1.0), (id, 2.0))) // There should be 200 entries in rddB rddB.count val setB: VertexRDD[Double] = setA.aggregateUsingIndex(rddB, _ + _) @@ -813,10 +908,10 @@ val setC: VertexRDD[Double] = setA.innerJoin(setB)((id, a, b) => a + b) ## EdgeRDDs -The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` is considerably simpler than the `VertexRDD`. -GraphX organizes the edges in blocks partitioned using one of the various partitioning strategies -defined in [`PartitionStrategy`][PartitionStrategy]. Within each partition, edge attributes and -adjacency structure, are stored separately enabling maximum reuse when changing attribute values. +The `EdgeRDD[ED]`, which extends `RDD[Edge[ED]]` organizes the edges in blocks partitioned using one +of the various partitioning strategies defined in [`PartitionStrategy`][PartitionStrategy]. Within +each partition, edge attributes and adjacency structure, are stored separately enabling maximum +reuse when changing attribute values. [PartitionStrategy]: api/graphx/index.html#org.apache.spark.graphx.PartitionStrategy @@ -827,11 +922,11 @@ def mapValues[ED2](f: Edge[ED] => ED2): EdgeRDD[ED2] // Revere the edges reusing both attributes and structure def reverse: EdgeRDD[ED] // Join two `EdgeRDD`s partitioned using the same partitioning strategy. -def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexID, VertexID, ED, ED2) => ED3): EdgeRDD[ED3] +def innerJoin[ED2, ED3](other: EdgeRDD[ED2])(f: (VertexId, VertexId, ED, ED2) => ED3): EdgeRDD[ED3] {% endhighlight %} In most applications we have found that operations on the `EdgeRDD` are accomplished through the -graph or rely on operations defined in the base `RDD` class. +graph operators or rely on operations defined in the base `RDD` class. # Optimized Representation @@ -853,7 +948,9 @@ reduce both the communication and storage overhead. Logically, this corresponds to machines and allowing vertices to span multiple machines. The exact method of assigning edges depends on the [`PartitionStrategy`][PartitionStrategy] and there are several tradeoffs to the various heuristics. Users can choose between different strategies by repartitioning the graph with -the [`Graph.partitionBy`][Graph.partitionBy] operator. +the [`Graph.partitionBy`][Graph.partitionBy] operator. The default partitioning strategy is to use +the initial partitioning of the edges as provided on graph construction. However, users can easily +switch to 2D-partitioning or other heuristics included in GraphX. [Graph.partitionBy]: api/graphx/index.html#org.apache.spark.graphx.Graph$@partitionBy(partitionStrategy:org.apache.spark.graphx.PartitionStrategy):org.apache.spark.graphx.Graph[VD,ED] @@ -867,16 +964,15 @@ the [`Graph.partitionBy`][Graph.partitionBy] operator. Once the edges have be partitioned the key challenge to efficient graph-parallel computation is efficiently joining vertex attributes with the edges. Because real-world graphs typically have more -edges than vertices, we move vertex attributes to the edges. - - - - +edges than vertices, we move vertex attributes to the edges. 
Because not all partitions will +contain edges adjacent to all vertices we internally maintain a routing table which identifies where +to broadcast vertices when implementing the join required for operations like `triplets` and +`mapReduceTriplets`. # Graph Algorithms -GraphX includes a set of graph algorithms in to simplify analytics. The algorithms are contained in the `org.apache.spark.graphx.lib` package and can be accessed directly as methods on `Graph` via [`GraphOps`][GraphOps]. This section describes the algorithms and how they are used. +GraphX includes a set of graph algorithms to simplify analytics tasks. The algorithms are contained in the `org.apache.spark.graphx.lib` package and can be accessed directly as methods on `Graph` via [`GraphOps`][GraphOps]. This section describes the algorithms and how they are used. ## PageRank @@ -953,13 +1049,6 @@ val triCountByUsername = users.join(triCounts).map { case (id, (username, tc)) = println(triCountByUsername.collect().mkString("\n")) {% endhighlight %} --Note that
-`mapReduceTriplets` takes an additional optional `activeSet` (see API docs) which restricts
-the map phase to edges adjacent to the vertices in the provided `VertexRDD`:
+Note that `mapReduceTriplets` takes an additional optional `activeSet`
+(not shown above, see API docs for details) which restricts the map phase to edges adjacent to the
+vertices in the provided `VertexRDD`:

 {% highlight scala %}
   activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None
 {% endhighlight %}

-The EdgeDirection specifies which edges adjacent to the vertex set are included in the map phase.
-If the direction is `In`, `mapFunc` will only be run on edges with destination in the active set.
-If the direction is `Out`, `mapFunc` will only be run on edges originating from vertices in the
-active set.  If the direction is `Either`, `mapFunc` will be run on edges with either vertex in
-the active set.  If the direction is `Both`, `mapFunc` will be run on edges with both vertices in
-the active set.  The active set must be derived from the set of vertices in the graph.
-Restricting computation to triplets adjacent to a subset of the vertices is often necessary in
-incremental iterative computation and is a key part of the GraphX implementation of Pregel.
+The EdgeDirection specifies which edges adjacent to the vertex set are included in the map
+phase. If the direction is `In`, then the user defined `map` function will only be run on edges
+with the destination vertex in the active set. If the direction is `Out`, then the `map` function
+will only be run on edges originating from vertices in the active set. If the direction is
+`Either`, then the `map` function will be run on edges with either vertex in the active set. If
+the direction is `Both`, then the `map` function will be run on edges with both vertices in the
+active set. The active set must be derived from the set of vertices in the graph. Restricting
+computation to triplets adjacent to a subset of the vertices is often necessary in incremental
+iterative computation and is a key part of the GraphX implementation of Pregel.
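As a concrete illustration of the `activeSetOpt` argument, here is a minimal sketch (not from the original guide; the attribute types, the threshold, and the choice of `EdgeDirection.In` are assumptions):

{% highlight scala %}
// Assume graph: Graph[Double, Double]. Mark vertices with attribute > 0.5 as "active".
val activeVertices: VertexRDD[Unit] =
  graph.vertices.filter { case (id, attr) => attr > 0.5 }.mapValues(v => ())

// Only edges whose destination vertex is in the active set are passed to the map function.
val msgs: VertexRDD[Double] = graph.mapReduceTriplets[Double](
  triplet => Iterator((triplet.dstId, triplet.srcAttr)),
  (a, b) => a + b,
  activeSetOpt = Some((activeVertices, EdgeDirection.In)))
{% endhighlight %}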
# Examples diff --git a/docs/img/java-sm.png b/docs/img/java-sm.png new file mode 100644 index 0000000000000..a82ee7d682e49 Binary files /dev/null and b/docs/img/java-sm.png differ diff --git a/docs/img/python-sm.png b/docs/img/python-sm.png new file mode 100644 index 0000000000000..ae01e05252abd Binary files /dev/null and b/docs/img/python-sm.png differ diff --git a/docs/img/scala-sm.png b/docs/img/scala-sm.png new file mode 100644 index 0000000000000..30db034b70cf9 Binary files /dev/null and b/docs/img/scala-sm.png differ diff --git a/docs/img/streaming-arch.png b/docs/img/streaming-arch.png new file mode 100644 index 0000000000000..bc57b460fdf8b Binary files /dev/null and b/docs/img/streaming-arch.png differ diff --git a/docs/img/streaming-dstream-ops.png b/docs/img/streaming-dstream-ops.png new file mode 100644 index 0000000000000..a1c5634aa3c3a Binary files /dev/null and b/docs/img/streaming-dstream-ops.png differ diff --git a/docs/img/streaming-dstream-window.png b/docs/img/streaming-dstream-window.png new file mode 100644 index 0000000000000..276d2fee5e30e Binary files /dev/null and b/docs/img/streaming-dstream-window.png differ diff --git a/docs/img/streaming-dstream.png b/docs/img/streaming-dstream.png new file mode 100644 index 0000000000000..90f43b8c7138c Binary files /dev/null and b/docs/img/streaming-dstream.png differ diff --git a/docs/img/streaming-figures.pptx b/docs/img/streaming-figures.pptx new file mode 100644 index 0000000000000..1b18c2ee0ea3e Binary files /dev/null and b/docs/img/streaming-figures.pptx differ diff --git a/docs/img/streaming-flow.png b/docs/img/streaming-flow.png new file mode 100644 index 0000000000000..a870cb9b1839b Binary files /dev/null and b/docs/img/streaming-flow.png differ diff --git a/docs/index.md b/docs/index.md index debdb33108676..4eb297df39144 100644 --- a/docs/index.md +++ b/docs/index.md @@ -9,7 +9,7 @@ It also supports a rich set of higher-level tools including [Shark](http://shark # Downloading -Get Spark by visiting the [downloads page](http://spark.incubator.apache.org/downloads.html) of the Apache Spark site. This documentation is for Spark version {{site.SPARK_VERSION}}. +Get Spark by visiting the [downloads page](http://spark.apache.org/downloads.html) of the Apache Spark site. This documentation is for Spark version {{site.SPARK_VERSION}}. Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). All you need to run it is to have `java` to installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation. @@ -19,7 +19,7 @@ Spark uses [Simple Build Tool](http://www.scala-sbt.org), which is bundled with sbt/sbt assembly -For its Scala API, Spark {{site.SPARK_VERSION}} depends on Scala {{site.SCALA_VERSION}}. If you write applications in Scala, you will need to use this same version of Scala in your own program -- newer major versions may not work. You can get the right version of Scala from [scala-lang.org](http://www.scala-lang.org/download/). +For its Scala API, Spark {{site.SPARK_VERSION}} depends on Scala {{site.SCALA_BINARY_VERSION}}. If you write applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X) -- newer major versions may not work. You can get the right version of Scala from [scala-lang.org](http://www.scala-lang.org/download/). 
# Running the Examples and Shell @@ -75,7 +75,7 @@ For this version of Spark (0.8.1) Hadoop 2.2.x (or newer) users will have to bui * [Spark Programming Guide](scala-programming-guide.html): an overview of Spark concepts, and details on the Scala API * [Java Programming Guide](java-programming-guide.html): using Spark from Java * [Python Programming Guide](python-programming-guide.html): using Spark from Python -* [Spark Streaming](streaming-programming-guide.html): using the alpha release of Spark Streaming +* [Spark Streaming](streaming-programming-guide.html): Spark's API for processing data streams * [MLlib (Machine Learning)](mllib-guide.html): Spark's built-in machine learning library * [Bagel (Pregel on Spark)](bagel-programming-guide.html): simple graph processing model * [GraphX (Graphs on Spark)](graphx-programming-guide.html): Spark's new API for graphs @@ -96,7 +96,7 @@ For this version of Spark (0.8.1) Hadoop 2.2.x (or newer) users will have to bui * [Amazon EC2](ec2-scripts.html): scripts that let you launch a cluster on EC2 in about 5 minutes * [Standalone Deploy Mode](spark-standalone.html): launch a standalone cluster quickly without a third-party cluster manager * [Mesos](running-on-mesos.html): deploy a private cluster using - [Apache Mesos](http://incubator.apache.org/mesos) + [Apache Mesos](http://mesos.apache.org) * [YARN](running-on-yarn.html): deploy Spark on top of Hadoop NextGen (YARN) **Other documents:** @@ -110,20 +110,20 @@ For this version of Spark (0.8.1) Hadoop 2.2.x (or newer) users will have to bui **External resources:** -* [Spark Homepage](http://spark.incubator.apache.org) +* [Spark Homepage](http://spark.apache.org) * [Shark](http://shark.cs.berkeley.edu): Apache Hive over Spark -* [Mailing Lists](http://spark.incubator.apache.org/mailing-lists.html): ask questions about Spark here +* [Mailing Lists](http://spark.apache.org/mailing-lists.html): ask questions about Spark here * [AMP Camps](http://ampcamp.berkeley.edu/): a series of training camps at UC Berkeley that featured talks and exercises about Spark, Shark, Mesos, and more. [Videos](http://ampcamp.berkeley.edu/agenda-2012), [slides](http://ampcamp.berkeley.edu/agenda-2012) and [exercises](http://ampcamp.berkeley.edu/exercises-2012) are available online for free. -* [Code Examples](http://spark.incubator.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/incubator-spark/tree/master/examples/src/main/scala/) of Spark +* [Code Examples](http://spark.apache.org/examples.html): more are also available in the [examples subfolder](https://github.com/apache/spark/tree/master/examples/src/main/scala/) of Spark * [Paper Describing Spark](http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf) * [Paper Describing Spark Streaming](http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf) # Community -To get help using Spark or keep up with Spark development, sign up for the [user mailing list](http://spark.incubator.apache.org/mailing-lists.html). +To get help using Spark or keep up with Spark development, sign up for the [user mailing list](http://spark.apache.org/mailing-lists.html). If you're in the San Francisco Bay Area, there's a regular [Spark meetup](http://www.meetup.com/spark-users/) every few weeks. Come by to meet the developers and other users. 
diff --git a/docs/java-programming-guide.md b/docs/java-programming-guide.md index 07732fa1229f3..5c73dbb25ede8 100644 --- a/docs/java-programming-guide.md +++ b/docs/java-programming-guide.md @@ -189,7 +189,7 @@ We hope to generate documentation with Java-style syntax in the future. # Where to Go from Here Spark includes several sample programs using the Java API in -[`examples/src/main/java`](https://github.com/apache/incubator-spark/tree/master/examples/src/main/java/org/apache/spark/examples). You can run them by passing the class name to the +[`examples/src/main/java`](https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples). You can run them by passing the class name to the `bin/run-example` script included in Spark; for example: ./bin/run-example org.apache.spark.examples.JavaWordCount diff --git a/docs/js/main.js b/docs/js/main.js index 8b137891791fe..0bd2286cced19 100755 --- a/docs/js/main.js +++ b/docs/js/main.js @@ -1 +1,80 @@ +function codeTabs() { + var counter = 0; + var langImages = { + "scala": "img/scala-sm.png", + "python": "img/python-sm.png", + "java": "img/java-sm.png" + }; + $("div.codetabs").each(function() { + $(this).addClass("tab-content"); + // Insert the tab bar + var tabBar = $(' '); + $(this).before(tabBar); + + // Add each code sample to the tab bar: + var codeSamples = $(this).children("div"); + codeSamples.each(function() { + $(this).addClass("tab-pane"); + var lang = $(this).data("lang"); + var image = $(this).data("image"); + var notabs = $(this).data("notabs"); + var capitalizedLang = lang.substr(0, 1).toUpperCase() + lang.substr(1); + var id = "tab_" + lang + "_" + counter; + $(this).attr("id", id); + if (image != null && langImages[lang]) { + var buttonLabel = ""; + } else if (notabs == null) { + var buttonLabel = "" + capitalizedLang + ""; + } else { + var buttonLabel = "" + } + tabBar.append( + '+ +
+ +Internally, it works as follows. Spark Streaming receives live input data streams and divides +the data into batches, which are then processed by the Spark engine to generate the final +stream of results in batches. + ++ +
+ +Spark Streaming provides a high-level abstraction called *discretized stream* or *DStream*, +which represents a continuous stream of data. DStreams can be created either from input data +stream from sources such as Kafka and Flume, or by applying high-level +operations on other DStreams. Internally, a DStream is represented as a sequence of +[RDDs](api/core/index.html#org.apache.spark.rdd.RDD). + +This guide shows you how to start writing Spark Streaming programs with DStreams. You can +write Spark Streaming programs in Scala or Java, both of which are presented in this guide. You +will find tabs throughout this guide that let you choose between Scala and Java +code snippets. + +*************************************************************************************************** + +# A Quick Example +Before we go into the details of how to write your own Spark Streaming program, +let's take a quick look at what a simple Spark Streaming program looks like. Let's say we want to +count the number of words in text data received from a data server listening on a TCP +socket. All you need to +do is as follows. + ++{% highlight bash %} +# TERMINAL 1: +# Running Netcat + +$ nc -lk 9999 + +hello world + + + +... +{% endhighlight %} + | ++ | +{% highlight bash %} +# TERMINAL 2: RUNNING NetworkWordCount or JavaNetworkWordCount + +$ ./bin/run-example org.apache.spark.streaming.examples.NetworkWordCount local[2] localhost 9999 +... +------------------------------------------- +Time: 1357008430000 ms +------------------------------------------- +(hello,1) +(world,1) +... +{% endhighlight %} + | +
Source | Artifact |
---|---|
Kafka | spark-streaming-kafka_{{site.SCALA_BINARY_VERSION}} |
Flume | spark-streaming-flume_{{site.SCALA_BINARY_VERSION}} |
Twitter | spark-streaming-twitter_{{site.SCALA_BINARY_VERSION}} |
ZeroMQ | spark-streaming-zeromq_{{site.SCALA_BINARY_VERSION}} |
MQTT | spark-streaming-mqtt_{{site.SCALA_BINARY_VERSION}} |
+ +
+ +Any operation applied on a DStream translates to operations on the underlying RDDs. For example, +in the [earlier example](#a-quick-example) of converting a stream of lines to words, +the `flatmap` operation is applied on each RDD in the `lines` DStream to generate the RDDs of the + `words` DStream. This is shown the following figure. + ++ +
+ + +These underlying RDD transformations are computed by the Spark engine. The DStream operations +hide most of these details and provides the developer with higher-level API for convenience. +These operations are discussed in detail in later sections. + +## Input Sources + +We have already taken a look at the `streamingContext.socketTextStream(...)` in the [quick +example](#a-quick-example) which creates a DStream from text +data received over a TCP socket connection. Besides sockets, the core Spark Streaming API provides +methods for creating DStreams from files and Akka actors as input sources. + +Specifically, for files, the DStream can be created as + +Transformation | Meaning | ||
---|---|---|---|
Transformation | Meaning | ||
map(func) | -Returns a new DStream formed by passing each element of the source DStream through a function func. | -||
filter(func) | -Returns a new DStream formed by selecting those elements of the source DStream on which func returns true. | +Return a new DStream by passing each element of the source DStream through a + function func. | |
flatMap(func) | - Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item). |
+ Similar to map, but each input item can be mapped to 0 or more output items. | |
mapPartitions(func) | -Similar to map, but runs separately on each partition (block) of the DStream, so func must be of type - Iterator[T] => Iterator[U] when running on an DStream of type T. | +filter(func) | +Return a new DStream by selecting only the records of the source DStream on which + func returns true. |
repartition(numPartitions) | @@ -82,329 +465,681 @@ DStreams support many of the transformations available on normal Spark RDD's:|||
union(otherStream) | -Return a new DStream that contains the union of the elements in the source DStream and the argument DStream. | +Return a new DStream that contains the union of the elements in the source DStream and + otherDStream. | |
count() | -Returns a new DStream of single-element RDDs by counting the number of elements in each RDD of the source DStream. | +Return a new DStream of single-element RDDs by counting the number of elements in each RDD + of the source DStream. | |
reduce(func) | -Returns a new DStream of single-element RDDs by aggregating the elements in each RDD of the source DStream using a function func (which takes two arguments and returns one). The function should be associative so that it can be computed in parallel. | +Return a new DStream of single-element RDDs by aggregating the elements in each RDD of the + source DStream using a function func (which takes two arguments and returns one). + The function should be associative so that it can be computed in parallel. | |
countByValue() | -When called on a DStream of elements of type K, returns a new DStream of (K, Long) pairs where the value of each key is its frequency in each RDD of the source DStream. | -||
groupByKey([numTasks]) | - When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together all the values of each key in the RDDs of the source DStream. - Note: By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks.
- |
+ When called on a DStream of elements of type K, return a new DStream of (K, Long) pairs + where the value of each key is its frequency in each RDD of the source DStream. | |
reduceByKey(func, [numTasks]) | - When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function. Like in groupByKey , the number of reduce tasks is configurable through an optional second argument. |
+ When called on a DStream of (K, V) pairs, return a new DStream of (K, V) pairs where the
+ values for each key are aggregated using the given reduce function. Note: By default,
+ this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to
+ do the grouping. You can pass an optional numTasks argument to set a different
+ number of tasks. |
|
join(otherStream, [numTasks]) | -When called on two DStreams of (K, V) and (K, W) pairs, returns a new DStream of (K, (V, W)) pairs with all pairs of elements for each key. | +When called on two DStreams of (K, V) and (K, W) pairs, return a new DStream of (K, (V, W)) + pairs with all pairs of elements for each key. | |
cogroup(otherStream, [numTasks]) | -When called on DStream of (K, V) and (K, W) pairs, returns a new DStream of (K, Seq[V], Seq[W]) tuples. | +When called on DStream of (K, V) and (K, W) pairs, return a new DStream of + (K, Seq[V], Seq[W]) tuples. | |
transform(func) | -Returns a new DStream by applying func (a RDD-to-RDD function) to every RDD of the stream. This can be used to do arbitrary RDD operations on the DStream. | +Return a new DStream by applying a RDD-to-RDD function to every RDD of the source DStream. + This can be used to do arbitrary RDD operations on the DStream. | |
updateStateByKey(func) | -Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of each key. This can be used to track session state by using the session-id as the key and updating the session state as new data is received. | -
Transformation | Meaning | |
---|---|---|
window(windowDuration, slideDuration) | -Return a new DStream which is computed based on windowed batches of the source DStream. windowDuration is the width of the window and slideTime is the frequency during which the window is calculated. Both times must be multiples of the batch interval. - | -|
countByWindow(windowDuration, slideDuration) | - Return a sliding count of elements in the stream. windowDuration and slideDuration are exactly as defined in window() .
- |
-|
reduceByWindow(func, windowDuration, slideDuration) | - Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel. windowDuration and slideDuration are exactly as defined in window() .
- |
-|
groupByKeyAndWindow(windowDuration, slideDuration, [numTasks]) - | - When called on a DStream of (K, V) pairs, returns a new DStream of (K, Seq[V]) pairs by grouping together values of each key over batches in a sliding window. -Note: By default, this uses Spark's default number of parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional numTasks argument to set a different number of tasks. |
-|
reduceByKeyAndWindow(func, windowDuration, slideDuration, [numTasks]) | - When called on a DStream of (K, V) pairs, returns a new DStream of (K, V) pairs where the values for each key are aggregated using the given reduce function func over batches in a sliding window. Like in groupByKeyAndWindow , the number of reduce tasks is configurable through an optional second argument.
- windowDuration and slideDuration are exactly as defined in window() .
- |
-|
reduceByKeyAndWindow(func, invFunc, windowDuration, slideDuration, [numTasks]) | - A more efficient version of the above reduceByKeyAndWindow() where the reduce value of each window is calculated
- incrementally using the reduce values of the previous window. This is done by reducing the new data that enter the sliding window, and "inverse reducing" the old data that leave the window. An example would be that of "adding" and "subtracting" counts of keys as the window slides. However, it is applicable to only "invertible reduce functions", that is, those reduce functions which have a corresponding "inverse reduce" function (taken as parameter invFunc. Like in groupByKeyAndWindow , the number of reduce tasks is configurable through an optional second argument.
- windowDuration and slideDuration are exactly as defined in window() .
- |
-|
countByValueAndWindow(windowDuration, slideDuration, [numTasks]) | - When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the value of each key is its frequency within a sliding window. Like in groupByKeyAndWindow , the number of reduce tasks is configurable through an optional second argument.
- windowDuration and slideDuration are exactly as defined in window() .
- |
+      Return a new "state" DStream where the state for each key is updated by applying the
+      given function on the previous state of the key and the new values for the key. This can be
+      used to maintain arbitrary state data for each key.
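For example, a running count per key can be maintained with `updateStateByKey`; a minimal sketch (the `pairs` DStream follows the earlier example, and checkpointing is assumed to be enabled, which this operation requires):

{% highlight scala %}
// Combine the values seen in this batch with the count carried over from previous batches.
val updateFunc = (newValues: Seq[Int], runningCount: Option[Int]) =>
  Some(runningCount.getOrElse(0) + newValues.sum)

val runningCounts = pairs.updateStateByKey[Int](updateFunc)
{% endhighlight %}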
Operator | Meaning |
---|---|
foreachRDD(func) | -The fundamental output operator. Applies a function, func, to each RDD generated from the stream. This function should have side effects, such as printing output, saving the RDD to external files, or writing it over the network to an external system. | -
print() | -Prints first ten elements of every batch of data in a DStream on the driver. | -
saveAsObjectFiles(prefix, [suffix]) | - Save this DStream's contents as a SequenceFile of serialized objects. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]".
- |
-
saveAsTextFiles(prefix, [suffix]) | -Save this DStream's contents as a text files. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". | -
saveAsHadoopFiles(prefix, [suffix]) | -Save this DStream's contents as a Hadoop file. The file name at each batch interval is generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". | -
-{% highlight bash %}
-# TERMINAL 1
-# RUNNING NETCAT
+In fact, you can also use [machine learning](mllib-guide.html) and
+[graph computation](graphx-programming-guide.html) algorithms in the `transform` method.
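As a concrete illustration of `transform` (a sketch; the `wordCounts` and `spamInfoRDD` names are assumptions, not from the original guide — `spamInfoRDD` stands for a precomputed `RDD[(String, Boolean)]` of spam flags):

{% highlight scala %}
// transform applies an arbitrary RDD-to-RDD function to every batch of the DStream,
// so ordinary RDD operations (joins, MLlib models, GraphX algorithms) can be mixed in.
val cleanedCounts = wordCounts.transform { rdd =>
  rdd.join(spamInfoRDD).filter { case (word, (count, isSpam)) => !isSpam }
}
{% endhighlight %}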
-$ nc -lk 9999
-hello world
+## Window Operations
+Finally, Spark Streaming also provides *windowed computations*, which allow you to apply
+transformations over a sliding window of data. The following figure illustrates this sliding
+window.
+
+
+As shown in the figure, every time the window *slides* over a source DStream,
+the source RDDs that fall within the window are combined and operated upon to produce the
+RDDs of the windowed DStream. In this specific case, the operation is applied over the last 3 time
+units of data, and slides by 2 time units. This shows that any window-based operation needs to
+specify two parameters.
+ * window length - The duration of the window (3 in the figure)
+ * slide interval - The interval at which the window-based operation is performed (2 in
+   the figure).
-...
+These two parameters must be multiples of the batch interval of the source DStream (1 in the
+figure).
+
+Let's illustrate the window operations with an example. Say, you want to extend the
+[earlier example](#a-quick-example) by generating word counts over the last 30 seconds of data,
+every 10 seconds. To do this, we have to apply the `reduceByKey` operation on the `pairs` DStream of
+`(word, 1)` pairs over the last 30 seconds of data. This is done using the
+operation `reduceByKeyAndWindow`.
+
+
+
+
+{% highlight scala %}
+// Reduce last 30 seconds of data, every 10 seconds
+val windowedWordCounts = pairs.reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
{% endhighlight %}
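The more efficient incremental variant described in the window-operations table below can be sketched in the same way (the `ssc` name and checkpoint directory are assumptions; this form requires checkpointing to be enabled):

{% highlight scala %}
// Incremental form: add counts entering the window and "subtract" counts leaving it.
ssc.checkpoint("checkpoint")
val incrementalWordCounts = pairs.reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b,   // reduce new data entering the window
  (a: Int, b: Int) => a - b,   // "inverse reduce" old data leaving the window
  Seconds(30), Seconds(10))
{% endhighlight %}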
- |
-
-{% highlight bash %}
-# TERMINAL 2: RUNNING NetworkWordCount
-...
--------------------------------------------
-Time: 1357008430000 ms
--------------------------------------------
-(hello,1)
-(world,1)
-...
+
+
+
+{% highlight java %}
+// Reduce function adding two integers, defined separately for clarity
+Function2<Integer, Integer, Integer> reduceFunc = new Function2<Integer, Integer, Integer>() {
+  @Override public Integer call(Integer i1, Integer i2) throws Exception {
+    return i1 + i2;
+  }
+};
+
+// Reduce last 30 seconds of data, every 10 seconds
+JavaPairDStream<String, Integer> windowedWordCounts =
+  pairs.reduceByKeyAndWindow(reduceFunc, new Duration(30000), new Duration(10000));
+{% endhighlight %}
-
Transformation | Meaning |
---|---|
window(windowLength, slideInterval) | +Return a new DStream which is computed based on windowed batches of the source DStream. + | +
countByWindow(windowLength, slideInterval) | +Return a sliding window count of elements in the stream. + | +
reduceByWindow(func, windowLength, slideInterval) | +Return a new single-element stream, created by aggregating elements in the stream over a + sliding interval using func. The function should be associative so that it can be computed + correctly in parallel. + | +
reduceByKeyAndWindow(func, windowLength, slideInterval, + [numTasks]) | + When called on a DStream of (K, V) pairs, returns a new DStream of (K, V)
+ pairs where the values for each key are aggregated using the given reduce function func
+ over batches in a sliding window. Note: By default, this uses Spark's default number of
+ parallel tasks (2 for local machine, 8 for a cluster) to do the grouping. You can pass an optional
+ numTasks argument to set a different number of tasks.
+ |
+
reduceByKeyAndWindow(func, invFunc, windowLength, + slideInterval, [numTasks]) | + A more efficient version of the above reduceByKeyAndWindow() where the reduce
+ value of each window is calculated incrementally using the reduce values of the previous window.
+ This is done by reducing the new data that enter the sliding window, and "inverse reducing" the
+ old data that leave the window. An example would be that of "adding" and "subtracting" counts
+ of keys as the window slides. However, it is applicable to only "invertible reduce functions",
+ that is, those reduce functions which have a corresponding "inverse reduce" function (taken as
+ parameter invFunc. Like in reduceByKeyAndWindow , the number of reduce tasks
+ is configurable through an optional argument.
+ |
+
countByValueAndWindow(windowLength, + slideInterval, [numTasks]) | + When called on a DStream of (K, V) pairs, returns a new DStream of (K, Long) pairs where the
+ value of each key is its frequency within a sliding window. Like in
+ reduceByKeyAndWindow , the number of reduce tasks is configurable through an
+ optional argument.
+ |
+
Output Operation | Meaning |
---|---|
print() | +Prints first ten elements of every batch of data in a DStream on the driver. | +
foreachRDD(func) | +The fundamental output operator. Applies a function, func, to each RDD generated from + the stream. This function should have side effects, such as printing output, saving the RDD to + external files, or writing it over the network to an external system. | +
saveAsObjectFiles(prefix, [suffix]) | + Save this DStream's contents as a SequenceFile of serialized objects. The file
+ name at each batch interval is generated based on prefix and
+ suffix: "prefix-TIME_IN_MS[.suffix]".
+ |
+
saveAsTextFiles(prefix, [suffix]) | +Save this DStream's contents as a text files. The file name at each batch interval is + generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". | +
saveAsHadoopFiles(prefix, [suffix]) | +Save this DStream's contents as a Hadoop file. The file name at each batch interval is + generated based on prefix and suffix: "prefix-TIME_IN_MS[.suffix]". | +