Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-4672][Core]Checkpoint() should clear f to shorten the serialization chain #3545

Closed
wants to merge 3 commits into from

Conversation

JerryLead
Copy link
Contributor

The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672

The f closure of PartitionsRDD(ZippedPartitionsRDD2) contains a $outer that references EdgeRDD/VertexRDD, which causes task's serialization chain become very long in iterative GraphX applications. As a result, StackOverflow error will occur. If we set "f = null" in clearDependencies(), checkpoint() can cut off the long serialization chain. More details and explanation can be found in the JIRA.

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@rxin
Copy link
Contributor

rxin commented Dec 2, 2014

cc @tdas can you take a look at this?

@rxin
Copy link
Contributor

rxin commented Dec 2, 2014

I talked to @tdas and this is fine, but even with this, we should figure out why f is capturing its outer this way and remove that since it is expensive for serialization. cc @ankurdave

@ankurdave
Copy link
Contributor

ok to test

@SparkQA
Copy link

SparkQA commented Dec 3, 2014

Test build #24064 has started for PR 3545 at commit f7faea5.

  • This patch merges cleanly.

@SparkQA
Copy link

SparkQA commented Dec 3, 2014

Test build #24064 has finished for PR 3545 at commit f7faea5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins
Copy link

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24064/
Test PASSed.

@jason-dai
Copy link

Maybe we can try something like:

class ZippedPartitionsRDD2 (sc, f, …) {
  val cleanF = (part1, part2, ctx) => sc.clean(f(rdd1.iterator(part1, ctx), rdd2.iterator(part2, context)))

  override def compute(s: Partition, context: TaskContext): Iterator[V] = {
    …
    cleanF(partitions(0), partitions(1), context)
  }
  …
}

@JerryLead
Copy link
Contributor Author

I write a simple example as follows to show how the abnormal $outer is generated. I think the reason is that zipPartitions() in foo() uses the VertexRDD.bar(). As a result, VertexRDD becomes the $outer and its member variable PartitionsRDD will be serialized. @rxin @tdas @ankurdave @jason-dai

The f=>$outer=>VertexRDD=>PartitionsRDD=>v will be serialized while the task is being serialized (shown in the following figures). P.S., Does cleanF in compute() have problems when this RDD is recomputed?

import org.apache.spark.SparkContext
import org.apache.spark.rdd._


class PartitionsRDD extends Serializable {
  val v = "partitionsRDD"
}

class VertexRDD extends Serializable {

  val partitionsRDD = new PartitionsRDD

  def foo(pairs1: RDD[(Int, Char)], pairs2: RDD[(Int, Char)]) = {
    val zipRDD = pairs1.zipPartitions(pairs2) {
      (thisIter, otherIter) =>
        val p1 = thisIter.next()
        val p2 = otherIter.next()

        Iterator((p1._1 + p2._1 + bar(), p1._2 + ":" + p2._2))
    }
    zipRDD
  }

  def bar() = 5
}

object AbnormalFClosure {


  def main(args: Array[String]) {

    val sc = new SparkContext("local", "ZippedPartition Test")


    val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'),
      (3, 'c'), (4, 'd'),
      (5, 'e'), (3, 'f'),
      (2, 'g'), (1, 'h'))
    val pairs1 = sc.parallelize(data1, 2)


    val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'),
      (3, 'C'), (4, 'D'))
    val pairs2 = sc.parallelize(data2, 2)

    val zipRDD = new VertexRDD().foo(pairs1, pairs2)

    zipRDD.count()

  }
}

The serialization chain of f=>$outer(i.e., VertexRDD)=>PartitionsRDD=>v:
bb7f311ac041c1affbe633e96fe12634

PartitionsRDD is being serialized:
2d06b6eb2694f9b5dd268f373dcae9de

@jason-dai
Copy link

I believe ClosureCleaner.clean() is defined to deal with exactly this issue: scala may capture the entire class in closure, even if only one member variable is used.

asfgit pushed a commit that referenced this pull request Dec 3, 2014
…ation chain

The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672

The f closure of `PartitionsRDD(ZippedPartitionsRDD2)` contains a `$outer` that references EdgeRDD/VertexRDD, which causes task's serialization chain become very long in iterative GraphX applications. As a result, StackOverflow error will occur. If we set "f = null" in `clearDependencies()`, checkpoint() can cut off the long serialization chain. More details and explanation can be found in the JIRA.

Author: JerryLead <JerryLead@163.com>
Author: Lijie Xu <csxulijie@gmail.com>

Closes #3545 from JerryLead/my_core and squashes the following commits:

f7faea5 [JerryLead] checkpoint() should clear the f to avoid StackOverflow error
c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark
52799e3 [Lijie Xu] Merge pull request #1 from apache/master

(cherry picked from commit 77be8b9)
Signed-off-by: Ankur Dave <ankurdave@gmail.com>
@asfgit asfgit closed this in 77be8b9 Dec 3, 2014
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants