
How to filter genotypeRDD on sample names? org.apache.spark.SparkException: Task not serializable? #891

Closed
NeillGibson opened this issue Nov 29, 2015 · 5 comments


@NeillGibson
Contributor

Hi,

I am trying to filter a genotypeRDD based on a set of sample names.
The error I get is:

org.apache.spark.SparkException: Task not serializable
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
        - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1f1ffc18)
        - field (class: org.bdgenomics.adam.rdd.ADAMContext, name: sc, type: class org.apache.spark.SparkContext)
        - object (class org.bdgenomics.adam.rdd.ADAMContext, org.bdgenomics.adam.rdd.ADAMContext@33ec5c0b)

with the following code

import org.bdgenomics.adam.rdd.ADAMContext
import org.bdgenomics.formats.avro._
import org.bdgenomics.adam.rdd.ADAMContext._

// Load the genotype RDDs using the ADAMContext
val ac = new ADAMContext(sc)

val sample_names = sc.textFile("/user/ec2-user/1kg/chr22_sample_names.list").collect.toList
val genotypesRDD = ac.loadGenotypes("/user/ec2-user/1kg/chr22.adam")

// This works
genotypesRDD.filter(gt => List("HG00096","HG00100").contains(gt.sampleId)).first

// This throws the error
genotypesRDD.filter(gt => sample_names.contains(gt.sampleId))

// This also throws the error
val test_two_sample_names = List("HG00096","HG00100")
genotypesRDD.filter(gt => test_two_sample_names.contains(gt.sampleId)).first

// This also throws the error
val sample_names_BC = sc.broadcast(sample_names)
genotypesRDD.filter(gt => sample_names_BC.value.contains(gt.sampleId))

Somehow it seems that the ADAMContext/SparkContext gets pulled into the filter closure.

At first I thought it came in through the list of sample names, but maybe it is captured through the genotype objects?

But why does it only show up when I reference an externally defined list of names in the filter statement?
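For context, the capture behavior at issue here can be reproduced without Spark at all. The sketch below (hypothetical stand-in classes, not ADAM's actual ones) shows why a closure over a field of an enclosing object, which is how the Spark REPL wraps each input line, drags the whole object, and any non-serializable fields it holds, into serialization:

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for SparkContext: deliberately NOT Serializable.
class FakeSparkContext

// Stand-in for a REPL line object: Serializable itself, but it holds
// a reference to the non-serializable context.
class ReplLine extends Serializable {
  val sc = new FakeSparkContext
  val names = List("HG00096", "HG00100")

  // This closure reads `names` via `this`, so serializing the closure
  // tries to serialize `this` -- and with it the non-serializable `sc`.
  def makeFilter: String => Boolean = (s: String) => names.contains(s)
}

object ClosureCaptureDemo {
  // Returns true iff `obj` survives Java serialization.
  def trySerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val line = new ReplLine
    println(s"closure over field serializable: ${trySerialize(line.makeFilter)}")

    // Copying the list into a local val first severs the link to `this`,
    // so only the List itself is captured.
    val localNames = line.names
    val localClosure: String => Boolean = (s: String) => localNames.contains(s)
    println(s"closure over local serializable: ${trySerialize(localClosure)}")
  }
}
```

This is the same mechanism Spark's ClosureCleaner reports in the serialization stack above: the anonymous function's `$outer` chain reaches an object holding the SparkContext.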

@NeillGibson
Contributor Author

The full error message is:

org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
        at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:2032)
        at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:335)
        at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:334)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
        at org.apache.spark.rdd.RDD.withScope(RDD.scala:310)
        at org.apache.spark.rdd.RDD.filter(RDD.scala:334)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:35)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:40)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:42)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:44)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:46)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:48)
        at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:50)
        at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:52)
        at $iwC$$iwC$$iwC$$iwC.<init>(<console>:54)
        at $iwC$$iwC$$iwC.<init>(<console>:56)
        at $iwC$$iwC.<init>(<console>:58)
        at $iwC.<init>(<console>:60)
        at <init>(<console>:62)
        at .<init>(<console>:66)
        at .<clinit>(<console>)
        at .<init>(<console>:7)
        at .<clinit>(<console>)
        at $print(<console>)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
        at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340)
        at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
        at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
        at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
        at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
        at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
        at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
        at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
        at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
        at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
        at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
        at org.apache.spark.repl.Main$.main(Main.scala:31)
        at org.apache.spark.repl.Main.main(Main.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:497)
        at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:674)
        at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.spark.SparkContext
Serialization stack:
        - object not serializable (class: org.apache.spark.SparkContext, value: org.apache.spark.SparkContext@1f1ffc18)
        - field (class: org.bdgenomics.adam.rdd.ADAMContext, name: sc, type: class org.apache.spark.SparkContext)
        - object (class org.bdgenomics.adam.rdd.ADAMContext, org.bdgenomics.adam.rdd.ADAMContext@1f7eaf31)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: ac, type: class org.bdgenomics.adam.rdd.ADAMContext)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@96345d9)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@79455bd7)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@691448ba)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@62ff9c57)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@2a4e1b6)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@4c2142af)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC@10a6a0c0)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC@29a87635)
        - field (class: $iwC$$iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC@133ce0c1)
        - field (class: $iwC$$iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC, $iwC$$iwC$$iwC@3c712f6e)
        - field (class: $iwC$$iwC, name: $iw, type: class $iwC$$iwC$$iwC)
        - object (class $iwC$$iwC, $iwC$$iwC@442dd426)
        - field (class: $iwC, name: $iw, type: class $iwC$$iwC)
        - object (class $iwC, $iwC@110ff7e2)
        - field (class: $line142.$read, name: $iw, type: class $iwC)
        - object (class $line142.$read, $line142.$read@5847aa1)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $VAL215, type: class $line142.$read)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@7bacb1b0)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC@2a9566f6)
        - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, name: $outer, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC)
        - object (class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1, <function1>)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:84)
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:301)
        ... 59 more

@fnothaft
Member

fnothaft commented Dec 1, 2015

Interesting. I'm not seeing this on my side:

scala> import org.bdgenomics.adam.rdd.ADAMContext._
import org.bdgenomics.adam.rdd.ADAMContext._

scala> val samples = sc.loadGenotypes("adam-core/src/test/resources/small.vcf").map(_.getSampleId).collect
samples: Array[String] = Array(NA12878, NA12891, NA12892, NA12878, NA12891, NA12892, NA12878, NA12891, NA12892, NA12878, NA12891, NA12892, NA12878, NA12891, NA12892)

scala> samples.toList
res0: List[String] = List(NA12878, NA12891, NA12892, NA12878, NA12891, NA12892, NA12878, NA12891, NA12892, NA12878, NA12891, NA12892, NA12878, NA12891, NA12892)

scala> val fewSamples = samples.take(3).toList
fewSamples: List[String] = List(NA12878, NA12891, NA12892)

scala> val rdd = sc.loadGenotypes("adam-core/src/test/resources/small.vcf").filter(gt => fewSamples.contains(gt.getSampleId))
rdd: org.apache.spark.rdd.RDD[org.bdgenomics.formats.avro.Genotype] = MapPartitionsRDD[7] at filter at <console>:28

scala> rdd.count()
res1: Long = 15

I'll play around with this on our cluster later. You may not want to explicitly instantiate an ADAMContext. Actually, part of the issue may come from the fact that when we declare class ADAMContext(sc: SparkContext), we don't declare the SparkContext as transient. I haven't seen this come up as an issue before, but will see if I can get it to reproduce. What version of Spark are you running, BTW?
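The `@transient` fix described above can be sketched in plain Scala (hypothetical stand-in classes, no Spark required): marking a constructor val `@transient` excludes the underlying field from Java serialization, so an object holding a non-serializable context can still be shipped inside a closure.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for SparkContext: deliberately NOT Serializable.
class FakeSparkContext

// Without @transient, serializing this fails on the `sc` field.
class EagerHolder(val sc: FakeSparkContext) extends Serializable

// With @transient, the field is skipped during serialization
// (it will be null after deserialization, which is fine: contexts
// should only be used on the driver anyway).
class TransientHolder(@transient val sc: FakeSparkContext) extends Serializable

object TransientDemo {
  def trySerialize(obj: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(s"eager holder:     ${trySerialize(new EagerHolder(new FakeSparkContext))}")
    println(s"transient holder: ${trySerialize(new TransientHolder(new FakeSparkContext))}")
  }
}
```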

@NeillGibson
Contributor Author

Hi Frank. Thank you for the information. I am running Spark 1.5.2. I'll try tomorrow without explicitly instantiating the ADAMContext.

@NeillGibson
Contributor Author

Hi @fnothaft. Not explicitly instantiating the ADAMContext, but just importing
import org.bdgenomics.adam.rdd.ADAMContext._, solved the issue.

I don't fully understand how this works, not instantiating an ADAMContext yet still having access to its methods, but I am happy that it works.

fnothaft added a commit to fnothaft/adam that referenced this issue Dec 2, 2015
Closes bigdatagenomics#891. In org.bdgenomics.adam.rdd.ADAMContext, the val sparkContext:
SparkContext is not marked as @transient, which causes serialization issues.
SparkContexts are not serializable and should only be called from the driver.
@fnothaft
Member

fnothaft commented Dec 2, 2015

Hi @NeillGibson !

I've opened a PR that should allow your old code to work: #894.

The singleton object for ADAMContext defines a set of implicit methods. The import org.bdgenomics.adam.rdd.ADAMContext._ import makes these implicit methods available. One of these methods converts a SparkContext into an ADAMContext.
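That pattern can be sketched in a few lines of plain Scala (hypothetical names, not ADAM's actual API): the companion object defines an implicit conversion, and importing its members lets the "plain" type appear to gain the enriched type's methods.

```scala
import scala.language.implicitConversions

// Stand-in for SparkContext.
class Context(val name: String)

// Stand-in for ADAMContext: wraps a Context and adds methods to it.
class RichContext(ctx: Context) {
  def loadThings(path: String): List[String] = List(s"${ctx.name}:$path")
}

object RichContext {
  // Brought into scope by `import RichContext._`, analogous to
  // `import org.bdgenomics.adam.rdd.ADAMContext._`.
  implicit def contextToRich(ctx: Context): RichContext = new RichContext(ctx)
}

object ImplicitDemo {
  def main(args: Array[String]): Unit = {
    import RichContext._
    val sc = new Context("sc")
    // loadThings is not defined on Context; the compiler inserts the
    // implicit conversion, just as loadGenotypes appears on SparkContext.
    println(sc.loadThings("/data")) // List(sc:/data)
  }
}
```

Because the conversion happens at each call site, no long-lived ADAMContext instance ends up referenced from your REPL line objects, which is why the import-only approach avoids the serialization trap.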
