From 2914723d61ed05c0e07bf324ad9ceffec6e7b5fb Mon Sep 17 00:00:00 2001 From: Michael L Heuer Date: Thu, 6 Dec 2018 10:52:27 -0600 Subject: [PATCH] Add java-specific methods where missing. --- .../adam/api/java/JavaADAMContext.scala | 32 +- .../bdgenomics/adam/rdd/GenomicDataset.scala | 519 +++++------------- .../NucleotideContigFragmentDataset.scala | 44 +- .../adam/rdd/feature/CoverageDataset.scala | 21 +- .../adam/rdd/feature/FeatureDataset.scala | 93 +++- .../adam/rdd/fragment/FragmentDataset.scala | 41 +- .../rdd/read/AlignmentRecordDataset.scala | 183 ++++-- .../adam/rdd/variant/GenotypeDataset.scala | 81 ++- .../rdd/variant/VariantContextDataset.scala | 13 +- .../adam/rdd/variant/VariantDataset.scala | 62 ++- 10 files changed, 589 insertions(+), 500 deletions(-) diff --git a/adam-apis/src/main/scala/org/bdgenomics/adam/api/java/JavaADAMContext.scala b/adam-apis/src/main/scala/org/bdgenomics/adam/api/java/JavaADAMContext.scala index 4135a40e6a..0f7a94817e 100644 --- a/adam-apis/src/main/scala/org/bdgenomics/adam/api/java/JavaADAMContext.scala +++ b/adam-apis/src/main/scala/org/bdgenomics/adam/api/java/JavaADAMContext.scala @@ -51,7 +51,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { def getSparkContext: JavaSparkContext = new JavaSparkContext(ac.sc) /** - * Load alignment records into an AlignmentRecordDataset (java-friendly method). + * (Java-specific) Load alignment records into an AlignmentRecordDataset. * * Loads path names ending in: * * .bam/.cram/.sam as BAM/CRAM/SAM format, @@ -79,7 +79,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load alignment records into an AlignmentRecordDataset (java-friendly method). + * (Java-specific) Load alignment records into an AlignmentRecordDataset. * * Loads path names ending in: * * .bam/.cram/.sam as BAM/CRAM/SAM format, @@ -111,7 +111,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Functions like loadBam, but uses BAM index files to look at fewer blocks, + * (Java-specific) Functions like loadBam, but uses BAM index files to look at fewer blocks, * and only returns records within the specified ReferenceRegions. BAM index file required. * * @param pathName The path name to load indexed BAM formatted alignment records from. @@ -132,7 +132,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load nucleotide contig fragments into a NucleotideContigFragmentDataset (java-friendly method). + * (Java-specific) Load nucleotide contig fragments into a NucleotideContigFragmentDataset. * * If the path name has a .fa/.fasta extension, load as FASTA format. * Else, fall back to Parquet + Avro. @@ -152,7 +152,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load fragments into a FragmentDataset (java-friendly method). + * (Java-specific) Load fragments into a FragmentDataset. * * Loads path names ending in: * * .bam/.cram/.sam as BAM/CRAM/SAM format and @@ -175,7 +175,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load fragments into a FragmentDataset (java-friendly method). + * (Java-specific) Load fragments into a FragmentDataset. * * Loads path names ending in: * * .bam/.cram/.sam as BAM/CRAM/SAM format and @@ -200,7 +200,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load features into a FeatureDataset (java-friendly method). + * (Java-specific) Load features into a FeatureDataset. 
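Usage sketch (illustrative, not part of the patch): the renamed Java-specific loaders are invoked from Java as below. The JavaSparkContext jsc and the input path are hypothetical.

import org.apache.spark.api.java.JavaSparkContext;
import org.bdgenomics.adam.api.java.JavaADAMContext;
import org.bdgenomics.adam.rdd.ADAMContext;
import org.bdgenomics.adam.rdd.read.AlignmentRecordDataset;

// Hypothetical setup: wrap an existing JavaSparkContext named jsc.
JavaADAMContext jac = new JavaADAMContext(new ADAMContext(jsc.sc()));

// Format is chosen by extension: .bam/.cram/.sam, FASTQ, or Parquet + Avro.
AlignmentRecordDataset reads = jac.loadAlignments("sample.bam");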
* * Loads path names ending in: * * .bed as BED6/12 format, @@ -227,7 +227,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load features into a FeatureDataset (java-friendly method). + * (Java-specific) Load features into a FeatureDataset. * * Loads path names ending in: * * .bed as BED6/12 format, @@ -257,7 +257,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load features into a FeatureDataset and convert to a CoverageDataset (java-friendly method). + * (Java-specific) Load features into a FeatureDataset and convert to a CoverageDataset. * Coverage is stored in the score field of Feature. * * Loads path names ending in: @@ -285,7 +285,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load features into a FeatureDataset and convert to a CoverageDataset (java-friendly method). + * (Java-specific) Load features into a FeatureDataset and convert to a CoverageDataset. * Coverage is stored in the score field of Feature. * * Loads path names ending in: @@ -317,7 +317,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load genotypes into a GenotypeDataset (java-friendly method). + * (Java-specific) Load genotypes into a GenotypeDataset. * * If the path name has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, load as VCF format. * Else, fall back to Parquet + Avro. @@ -334,7 +334,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load genotypes into a GenotypeDataset (java-friendly method). + * (Java-specific) Load genotypes into a GenotypeDataset. * * If the path name has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, load as VCF format. * Else, fall back to Parquet + Avro. @@ -354,7 +354,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load variants into a VariantDataset (java-friendly method). + * (Java-specific) Load variants into a VariantDataset. * * If the path name has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, load as VCF format. * Else, fall back to Parquet + Avro. @@ -370,7 +370,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load variants into a VariantDataset (java-friendly method). + * (Java-specific) Load variants into a VariantDataset. * * If the path name has a .vcf/.vcf.gz/.vcf.bgzf/.vcf.bgz extension, load as VCF format. * Else, fall back to Parquet + Avro. @@ -388,7 +388,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load reference sequences into a broadcastable ReferenceFile (java-friendly method). + * (Java-specific) Load reference sequences into a broadcastable ReferenceFile. * * If the path name has a .2bit extension, loads a 2bit file. Else, uses loadContigFragments * to load the reference as an RDD, which is then collected to the driver. @@ -407,7 +407,7 @@ class JavaADAMContext(val ac: ADAMContext) extends Serializable { } /** - * Load reference sequences into a broadcastable ReferenceFile (java-friendly method). + * (Java-specific) Load reference sequences into a broadcastable ReferenceFile. * * If the path name has a .2bit extension, loads a 2bit file. Else, uses loadContigFragments * to load the reference as an RDD, which is then collected to the driver. 
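Sketch of the stringency-taking loaders (illustrative, not part of the patch), reusing jac from the sketch above; the VCF path is hypothetical. LENIENT asks htsjdk to warn on malformed records rather than fail.

import htsjdk.samtools.ValidationStringency;
import org.bdgenomics.adam.rdd.variant.GenotypeDataset;

GenotypeDataset genotypes =
    jac.loadGenotypes("sample.vcf", ValidationStringency.LENIENT);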
Uses a diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicDataset.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicDataset.scala index 970c4f8953..1a4648cb10 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicDataset.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/GenomicDataset.scala @@ -148,7 +148,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Applies a function that transforms the underlying Dataset into a new Dataset + * (Scala-specific) Applies a function that transforms the underlying Dataset into a new Dataset * using the Spark SQL API. * * @param tFn A function that transforms the underlying Dataset as a Dataset. @@ -158,7 +158,17 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg def transformDataset(tFn: Dataset[U] => Dataset[U]): V /** - * Applies a function that transforms the underlying DataFrame into a new DataFrame + * (Java-specific) Applies a function that transforms the underlying Dataset into a new Dataset + * using the Spark SQL API. + * + * @param tFn A function that transforms the underlying Dataset as a Dataset. + * @return A new genomic dataset where the Dataset of genomic data has been replaced, but the + * metadata (sequence dictionary, and etc) are copied without modification. + */ + def transformDataset(tFn: JFunction[Dataset[U], Dataset[U]]): V + + /** + * (Scala-specific) Applies a function that transforms the underlying DataFrame into a new DataFrame * using the Spark SQL API. * * @param tFn A function that transforms the underlying data as a DataFrame. @@ -175,8 +185,8 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Applies a function that transforms the underlying DataFrame into a new DataFrame - * using the Spark SQL API. Java-friendly variant. + * (Java-specific) Applies a function that transforms the underlying DataFrame into a new DataFrame + * using the Spark SQL API. * * @param tFn A function that transforms the underlying DataFrame as a DataFrame. * @return A new genomic dataset where the DataFrame of genomic data has been replaced, but the @@ -189,7 +199,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Applies a function that transmutes the underlying Dataset into a new Dataset of a + * (Scala-specific) Applies a function that transmutes the underlying Dataset into a new Dataset of a * different type. * * @param tFn A function that transforms the underlying Dataset. @@ -204,8 +214,8 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Applies a function that transmutes the underlying Dataset into a new Dataset of a - * different type. Java friendly variant. + * (Java-specific) Applies a function that transmutes the underlying Dataset into a new Dataset of a + * different type. * * @param tFn A function that transforms the underlying Dataset. * @return A new genomic dataset where the Dataset of genomic data has been replaced, but the @@ -220,8 +230,8 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Applies a function that transmutes the underlying DataFrame into a new DataFrame of a - * different type. Java friendly variant. + * (Java-specific) Applies a function that transmutes the underlying DataFrame into a new DataFrame of a + * different type. * * @param tFn A function that transforms the underlying DataFrame. 
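Sketch of the new JFunction overload of transformDataset as seen from Java (illustrative, not part of the patch). It assumes reads from the sketch above and a boolean readMapped column in the SQL schema; declaring the function variable sidesteps any overload ambiguity with the Scala Function1 variant.

import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.Dataset;

// U for AlignmentRecordDataset is the SQL product class org.bdgenomics.adam.sql.AlignmentRecord.
Function<Dataset<org.bdgenomics.adam.sql.AlignmentRecord>,
         Dataset<org.bdgenomics.adam.sql.AlignmentRecord>> keepMapped =
    ds -> ds.filter(ds.col("readMapped"));
AlignmentRecordDataset mappedOnly = reads.transformDataset(keepMapped);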
* @return A new genomic dataset where the DataFrame of genomic data has been replaced, but the @@ -239,8 +249,8 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Applies a function that transmutes the underlying DataFrame into a new DataFrame of a - * different type. Java friendly variant. + * (Java-specific) Applies a function that transmutes the underlying DataFrame into a new DataFrame of a + * different type. * * @param tFn A function that transforms the underlying DataFrame. * @return A new genomic dataset where the DataFrame of genomic data has been replaced, but the @@ -347,7 +357,6 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg * * @param pathName The path to save the partitioned Parquet flag file to. * @param partitionSize Partition bin size, in base pairs, used in Hive-style partitioning. - * */ private def writePartitionedParquetFlag(pathName: String, partitionSize: Int): Unit = { val path = new Path(pathName, "_partitionedByStartPos") @@ -470,14 +479,14 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Unions together multiple genomic datasets. + * (Scala-specific) Unions together multiple genomic datasets. * * @param datasets Genomic datasets to union with this genomic dataset. */ def union(datasets: V*): V /** - * Unions together multiple genomic datasets. + * (Java-specific) Unions together multiple genomic datasets. * * @param datasets Genomic datasets to union with this genomic dataset. */ @@ -487,7 +496,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Applies a function that transforms the underlying RDD into a new RDD. + * (Scala-specific) Applies a function that transforms the underlying RDD into a new RDD. * * @param tFn A function that transforms the underlying RDD. * @return A new genomic dataset where the RDD of genomic data has been replaced, but the @@ -498,8 +507,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Applies a function that transforms the underlying RDD into a new RDD. - * Java friendly variant. + * (Java-specific) Applies a function that transforms the underlying RDD into a new RDD. * * @param tFn A function that transforms the underlying RDD. * @return A new genomic dataset where the RDD of genomic data has been replaced, but the @@ -510,7 +518,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Applies a function that transmutes the underlying RDD into a new RDD of a + * (Scala-specific) Applies a function that transmutes the underlying RDD into a new RDD of a * different type. * * @param tFn A function that transforms the underlying RDD. @@ -523,8 +531,8 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Applies a function that transmutes the underlying RDD into a new RDD of a - * different type. Java friendly variant. + * (Java-specific) Applies a function that transmutes the underlying RDD into a new RDD of a + * different type. * * @param tFn A function that transforms the underlying RDD. * @param convFn The conversion function used to build the final RDD. 
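The RDD-level transform works the same way from Java (sketch, not part of the patch; assumes the Avro getter getDuplicateRead on AlignmentRecord).

import org.apache.spark.api.java.JavaRDD;
import org.bdgenomics.formats.avro.AlignmentRecord;

Function<JavaRDD<AlignmentRecord>, JavaRDD<AlignmentRecord>> dropDuplicates =
    rdd -> rdd.filter(r -> !r.getDuplicateRead());
AlignmentRecordDataset deduped = reads.transform(dropDuplicates);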
@@ -626,7 +634,22 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Sorts our genome aligned data by reference positions, with contigs ordered + * (Java-specific) Sorts our genome aligned data by reference positions, with contigs ordered + * by index. + * + * @param partitions The number of partitions for the new genomic dataset. + * @param stringency The level of ValidationStringency to enforce. + * @return Returns a new genomic dataset containing sorted data. + */ + def sort(partitions: java.lang.Integer, + stringency: ValidationStringency)( + implicit tTag: ClassTag[T]): V = { + sort(partitions = partitions, + stringency = stringency)(ClassTag.AnyRef.asInstanceOf[ClassTag[T]]) + } + + /** + * (Scala-specific) Sorts our genome aligned data by reference positions, with contigs ordered * by index. * * @param partitions The number of partitions for the new genomic dataset. @@ -687,12 +710,34 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Sorts our genome aligned data by reference positions, with contigs ordered + * (Java-specific) Sorts our genome aligned data by reference positions, with contigs ordered + * lexicographically. + * + * @param partitions The number of partitions for the new genomic dataset. + * @param storePartitionMap A Boolean flag to determine whether to store the + * partition bounds from the resulting genomic dataset. + * @param storageLevel The level at which to persist the resulting genomic dataset. + * @param stringency The level of ValidationStringency to enforce. + * @return Returns a new genomic dataset containing sorted data. + */ + def sortLexicographically(partitions: java.lang.Integer, + storePartitionMap: java.lang.Boolean, + storageLevel: StorageLevel, + stringency: ValidationStringency)( + implicit tTag: ClassTag[T]): V = { + sortLexicographically(partitions = partitions, + storePartitionMap = storePartitionMap, + storageLevel = storageLevel, + stringency = stringency)(ClassTag.AnyRef.asInstanceOf[ClassTag[T]]) + } + + /** + * (Scala-specific) Sorts our genome aligned data by reference positions, with contigs ordered * lexicographically. * * @param partitions The number of partitions for the new genomic dataset. * @param storePartitionMap A Boolean flag to determine whether to store the - * partition bounds from the resulting genomic dataset. + * partition bounds from the resulting genomic dataset. * @param storageLevel The level at which to persist the resulting genomic dataset. * @param stringency The level of ValidationStringency to enforce. * @return Returns a new genomic dataset containing sorted data. @@ -741,7 +786,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Pipes genomic data to a subprocess that runs in parallel using Spark. + * (Scala-specific) Pipes genomic data to a subprocess that runs in parallel using Spark. * * Files are substituted in to the command with a $x syntax. E.g., to invoke * a command that uses the first file from the files Seq, use $0. To access @@ -905,9 +950,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Pipes genomic data to a subprocess that runs in parallel using Spark. - * - * SparkR friendly variant. + * (R-specific) Pipes genomic data to a subprocess that runs in parallel using Spark. * * @param cmd Command to run. * @param files Files to make locally available to the commands being run. 
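One caveat worth noting here: these sort and sortLexicographically overloads keep an implicit ClassTag parameter list, which a Java caller sees as a trailing explicit argument. A hedged sketch of what the call would look like under that assumption (imports as in the sketches above):

import scala.reflect.ClassTag$;

// The implicit tTag surfaces as a third argument in the Java view of the method.
AlignmentRecordDataset sorted = reads.sort(
    64,
    ValidationStringency.STRICT,
    ClassTag$.MODULE$.apply(AlignmentRecord.class));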
@@ -944,9 +987,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Pipes genomic data to a subprocess that runs in parallel using Spark. - * - * Java/PySpark friendly variant. + * (Java/Python-specific) Pipes genomic data to a subprocess that runs in parallel using Spark. * * @param cmd Command to run. * @param files Files to make locally available to the commands being run. @@ -1035,7 +1076,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Runs a filter that selects data in the underlying RDD that overlaps + * (Scala-specific) Runs a filter that selects data in the underlying RDD that overlaps * several genomic regions. * * @param querys The regions to query for. @@ -1054,8 +1095,8 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Runs a filter that selects data in the underlying RDD that overlaps - * several genomic regions. Java friendly version. + * (Java-specific) Runs a filter that selects data in the underlying RDD that overlaps + * several genomic regions. * * @param querys The regions to query for. * @return Returns a new GenomicDataset containing only data that overlaps the @@ -1083,35 +1124,13 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a broadcast inner join between this genomic dataset and another genomic dataset. - * - * In a broadcast join, the left genomic dataset (this genomic dataset) is collected to the driver, - * and broadcast to all the nodes in the cluster. The key equality function - * used for this join is the reference region overlap function. Since this - * is an inner join, all values who do not overlap a value from the other - * genomic dataset are dropped. SparkR friendly version. - * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space. - */ - def broadcastRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(T, X), (U, Y)] = { - - broadcastRegionJoin(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a broadcast inner join between this genomic dataset and another genomic dataset. + * (Java-specific) Performs a broadcast inner join between this genomic dataset and another genomic dataset. * * In a broadcast join, the left genomic dataset (this genomic dataset) is collected to the driver, * and broadcast to all the nodes in the cluster. The key equality function * used for this join is the reference region overlap function. Since this * is an inner join, all values who do not overlap a value from the other - * genomic dataset are dropped. Python/Java friendly version. + * genomic dataset are dropped. * * @param genomicDataset The right genomic dataset in the join. 
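Sketch of the Java-specific overlap filter (not part of the patch). The exact collection type is not visible in this hunk; the example assumes a java.util.List, matching the other Java variants in this patch, and the region is hypothetical.

import java.util.Collections;
import java.util.List;
import org.bdgenomics.adam.models.ReferenceRegion;
import org.bdgenomics.formats.avro.Strand;

List<ReferenceRegion> query = Collections.singletonList(
    new ReferenceRegion("chr1", 100000L, 200000L, Strand.INDEPENDENT));
AlignmentRecordDataset overlapping = reads.filterByOverlappingRegions(query);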
* @param flankSize Sets a flankSize for the distance between elements to be @@ -1121,7 +1140,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def broadcastRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(T, X), (U, Y)] = { + flankSize: java.lang.Long): GenericGenomicDataset[(T, X), (U, Y)] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] @@ -1130,11 +1149,11 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg implicit val u2Tag: TypeTag[Y] = genomicDataset.uTag implicit val uyTag = typeTag[(U, Y)] - broadcastRegionJoin(genomicDataset, flankSize.toLong) + broadcastRegionJoin(genomicDataset, flankSize) } /** - * Performs a broadcast inner join between this genomic dataset and another genomic dataset. + * (Scala-specific) Performs a broadcast inner join between this genomic dataset and another genomic dataset. * * In a broadcast join, the left genomic dataset (this genomic dataset) is collected to the driver, * and broadcast to all the nodes in the cluster. The key equality function @@ -1237,7 +1256,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a broadcast right outer join between this genomic dataset and another genomic dataset. + * (Java-specific) Performs a broadcast right outer join between this genomic dataset and another genomic dataset. * * In a broadcast join, the left genomic dataset (this genomic dataset) is collected to the driver, * and broadcast to all the nodes in the cluster. The key equality function @@ -1245,32 +1264,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg * is a right outer join, all values in the left genomic dataset that do not overlap a * value from the right genomic dataset are dropped. If a value from the right genomic dataset does * not overlap any values in the left genomic dataset, it will be paired with a `None` - * in the product of the join. SparkR friendly version. - * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space, and all keys from the - * right genomic dataset that did not overlap a key in the left genomic dataset. - */ - def rightOuterBroadcastRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(Option[T], X), (Option[U], Y)] = { - - rightOuterBroadcastRegionJoin(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a broadcast right outer join between this genomic dataset and another genomic dataset. - * - * In a broadcast join, the left genomic dataset (this genomic dataset) is collected to the driver, - * and broadcast to all the nodes in the cluster. The key equality function - * used for this join is the reference region overlap function. Since this - * is a right outer join, all values in the left genomic dataset that do not overlap a - * value from the right genomic dataset are dropped. 
If a value from the right genomic dataset does - * not overlap any values in the left genomic dataset, it will be paired with a `None` - * in the product of the join. PySpark/Java friendly version. + * in the product of the join. * * @param genomicDataset The right genomic dataset in the join. * @param flankSize Sets a flankSize for the distance between elements to be @@ -1281,7 +1275,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def rightOuterBroadcastRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(Option[T], X), (Option[U], Y)] = { + flankSize: java.lang.Long): GenericGenomicDataset[(Option[T], X), (Option[U], Y)] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] implicit val txTag = ClassTag.AnyRef.asInstanceOf[ClassTag[(Option[T], X)]] @@ -1289,11 +1283,11 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg implicit val u2Tag: TypeTag[Y] = genomicDataset.uTag implicit val uyTag = typeTag[(Option[U], Y)] - rightOuterBroadcastRegionJoin(genomicDataset, flankSize.toLong) + rightOuterBroadcastRegionJoin(genomicDataset, flankSize) } /** - * Performs a broadcast right outer join between this genomic dataset and another genomic dataset. + * (Scala-specific) Performs a broadcast right outer join between this genomic dataset and another genomic dataset. * * In a broadcast join, the left genomic dataset (this genomic dataset) is collected to the driver, * and broadcast to all the nodes in the cluster. The key equality function @@ -1405,37 +1399,13 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a broadcast inner join between this genomic dataset and another genomic dataset. - * - * In a broadcast join, the left genomic dataset (this genomic dataset) is collected to the driver, - * and broadcast to all the nodes in the cluster. The key equality function - * used for this join is the reference region overlap function. Since this - * is an inner join, all values who do not overlap a value from the other - * genomic dataset are dropped. SparkR friendly variant. - * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space. - * - * @see broadcastRegionJoinAgainstAndGroupByRight - */ - def broadcastRegionJoinAndGroupByRight[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(Iterable[T], X), (Seq[U], Y)] = { - - broadcastRegionJoinAndGroupByRight(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a broadcast inner join between this genomic dataset and another genomic dataset. + * (Java-specific) Performs a broadcast inner join between this genomic dataset and another genomic dataset. * * In a broadcast join, the left genomic dataset (this genomic dataset) is collected to the driver, * and broadcast to all the nodes in the cluster. The key equality function * used for this join is the reference region overlap function. 
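With flankSize widened from java.lang.Integer to java.lang.Long, Java callers pass a boxed long directly. Sketch (not part of the patch), joining reads against a hypothetical FeatureDataset named features, e.g. from jac.loadFeatures:

import org.bdgenomics.adam.rdd.GenericGenomicDataset;
import org.bdgenomics.formats.avro.Feature;
import scala.Tuple2;

// Inner broadcast join; the 100 bp flank also pairs elements within 100 bp.
GenericGenomicDataset<Tuple2<AlignmentRecord, Feature>, ?> joined =
    reads.broadcastRegionJoin(features, 100L);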
Since this * is an inner join, all values who do not overlap a value from the other - * genomic dataset are dropped. PySpark/Java friendly variant. + * genomic dataset are dropped. * * @param genomicDataset The right genomic dataset in the join. * @param flankSize Sets a flankSize for the distance between elements to be @@ -1447,7 +1417,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def broadcastRegionJoinAndGroupByRight[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(Iterable[T], X), (Seq[U], Y)] = { + flankSize: java.lang.Long): GenericGenomicDataset[(Iterable[T], X), (Seq[U], Y)] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] @@ -1456,11 +1426,11 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg implicit val u2Tag: TypeTag[Y] = genomicDataset.uTag implicit val uyTag = typeTag[(Seq[U], Y)] - broadcastRegionJoinAndGroupByRight(genomicDataset, flankSize.toLong) + broadcastRegionJoinAndGroupByRight(genomicDataset, flankSize) } /** - * Performs a broadcast inner join between this genomic dataset and another genomic dataset. + * (Scala-specific) Performs a broadcast inner join between this genomic dataset and another genomic dataset. * * In a broadcast join, the left genomic dataset (this genomic dataset) is collected to the driver, * and broadcast to all the nodes in the cluster. The key equality function @@ -1566,34 +1536,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a broadcast right outer join between this genomic dataset and another genomic dataset. - * - * In a broadcast join, the left side of the join (broadcastTree) is broadcast to - * to all the nodes in the cluster. The key equality function - * used for this join is the reference region overlap function. Since this - * is a right outer join, all values in the left genomic dataset that do not overlap a - * value from the right genomic dataset are dropped. If a value from the right genomic dataset does - * not overlap any values in the left genomic dataset, it will be paired with a `None` - * in the product of the join. SparkR friendly variant. - * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space, and all keys from the - * right genomic dataset that did not overlap a key in the left genomic dataset. - * - * @see rightOuterBroadcastRegionJoinAgainstAndGroupByRight - */ - def rightOuterBroadcastRegionJoinAndGroupByRight[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(Iterable[T], X), (Seq[U], Y)] = { - - rightOuterBroadcastRegionJoinAndGroupByRight(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a broadcast right outer join between this genomic dataset and another genomic dataset. + * (Java-specific) Performs a broadcast right outer join between this genomic dataset and another genomic dataset. * * In a broadcast join, the left side of the join (broadcastTree) is broadcast to * to all the nodes in the cluster. 
The key equality function @@ -1601,7 +1544,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg * is a right outer join, all values in the left genomic dataset that do not overlap a * value from the right genomic dataset are dropped. If a value from the right genomic dataset does * not overlap any values in the left genomic dataset, it will be paired with a `None` - * in the product of the join. PySpark/Java friendly variant. + * in the product of the join. * * @param genomicDataset The right genomic dataset in the join. * @param flankSize Sets a flankSize for the distance between elements to be @@ -1614,7 +1557,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def rightOuterBroadcastRegionJoinAndGroupByRight[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(Iterable[T], X), (Seq[U], Y)] = { + flankSize: java.lang.Long): GenericGenomicDataset[(Iterable[T], X), (Seq[U], Y)] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] @@ -1623,11 +1566,11 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg implicit val u2Tag: TypeTag[Y] = genomicDataset.uTag implicit val uyTag = typeTag[(Seq[U], Y)] - rightOuterBroadcastRegionJoinAndGroupByRight(genomicDataset, flankSize.toLong) + rightOuterBroadcastRegionJoinAndGroupByRight(genomicDataset, flankSize) } /** - * Performs a broadcast right outer join between this genomic dataset and another genomic dataset. + * (Scala-specific) Performs a broadcast right outer join between this genomic dataset and another genomic dataset. * * In a broadcast join, the left side of the join (broadcastTree) is broadcast to * to all the nodes in the cluster. The key equality function @@ -1773,36 +1716,13 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge inner join between this genomic dataset and another genomic dataset. - * - * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The - * partitions are then zipped, and we do a merge join on each partition. - * The key equality function used for this join is the reference region - * overlap function. Since this is an inner join, all values who do not - * overlap a value from the other genomic dataset are dropped. SparkR friendly variant. - * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space. - */ - def shuffleRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(T, X), (U, Y)] = { - - shuffleRegionJoin(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a sort-merge inner join between this genomic dataset and another genomic dataset. + * (Java-specific) Performs a sort-merge inner join between this genomic dataset and another genomic dataset. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The * partitions are then zipped, and we do a merge join on each partition. 
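The sort-merge variants take the same shape from Java (sketch, not part of the patch; a flank of 0 requires a true overlap):

GenericGenomicDataset<Tuple2<AlignmentRecord, Feature>, ?> overlaps =
    reads.shuffleRegionJoin(features, 0L);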
* The key equality function used for this join is the reference region * overlap function. Since this is an inner join, all values who do not - * overlap a value from the other genomic dataset are dropped. PySpark/Java friendly - * variant. + * overlap a value from the other genomic dataset are dropped. * * @param genomicDataset The right genomic dataset in the join. * @param flankSize Sets a flankSize for the distance between elements to be @@ -1812,7 +1732,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def shuffleRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(T, X), (U, Y)] = { + flankSize: java.lang.Long): GenericGenomicDataset[(T, X), (U, Y)] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] @@ -1821,7 +1741,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg implicit val u2Tag: TypeTag[Y] = genomicDataset.uTag implicit val uyTag = typeTag[(U, Y)] - shuffleRegionJoin(genomicDataset, flankSize.toLong) + shuffleRegionJoin(genomicDataset, flankSize) } /** @@ -1871,7 +1791,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge inner join between this genomic dataset and another genomic dataset. + * (Scala-specific) Performs a sort-merge inner join between this genomic dataset and another genomic dataset. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The * partitions are then zipped, and we do a merge join on each partition. @@ -1920,33 +1840,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge right outer join between this genomic dataset and another genomic dataset. - * - * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The - * partitions are then zipped, and we do a merge join on each partition. - * The key equality function used for this join is the reference region - * overlap function. Since this is a right outer join, all values in the - * left genomic dataset that do not overlap a value from the right genomic dataset are dropped. - * If a value from the right genomic dataset does not overlap any values in the left - * genomic dataset, it will be paired with a `None` in the product of the join. SparkR - * friendly variant. - * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space, and all keys from the - * right genomic dataset that did not overlap a key in the left genomic dataset. - */ - def rightOuterShuffleRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(Option[T], X), (Option[U], Y)] = { - - rightOuterShuffleRegionJoin(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a sort-merge right outer join between this genomic dataset and another genomic dataset. + * (Java-specific) Performs a sort-merge right outer join between this genomic dataset and another genomic dataset. 
* * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The * partitions are then zipped, and we do a merge join on each partition. @@ -1955,7 +1849,6 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg * left genomic dataset that do not overlap a value from the right genomic dataset are dropped. * If a value from the right genomic dataset does not overlap any values in the left * genomic dataset, it will be paired with a `None` in the product of the join. - * PySpark/Java friendly variant. * * @param genomicDataset The right genomic dataset in the join. * @param flankSize Sets a flankSize for the distance between elements to be @@ -1966,7 +1859,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def rightOuterShuffleRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(Option[T], X), (Option[U], Y)] = { + flankSize: java.lang.Long): GenericGenomicDataset[(Option[T], X), (Option[U], Y)] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] @@ -2030,7 +1923,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge right outer join between this genomic dataset and another genomic dataset. + * (Scala-specific) Performs a sort-merge right outer join between this genomic dataset and another genomic dataset. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The * partitions are then zipped, and we do a merge join on each partition. @@ -2085,33 +1978,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge left outer join between this genomic dataset and another genomic dataset. - * - * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The - * partitions are then zipped, and we do a merge join on each partition. - * The key equality function used for this join is the reference region - * overlap function. Since this is a left outer join, all values in the - * right genomic dataset that do not overlap a value from the left genomic dataset are dropped. - * If a value from the left genomic dataset does not overlap any values in the right - * genomic dataset, it will be paired with a `None` in the product of the join. SparkR - * friendly variant. - * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space, and all keys from the - * left genomic dataset that did not overlap a key in the right genomic dataset. - */ - def leftOuterShuffleRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(T, Option[X]), (U, Option[Y])] = { - - leftOuterShuffleRegionJoin(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a sort-merge left outer join between this genomic dataset and another genomic dataset. + * (Java-specific) Performs a sort-merge left outer join between this genomic dataset and another genomic dataset. 
* * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The * partitions are then zipped, and we do a merge join on each partition. @@ -2120,7 +1987,6 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg * right genomic dataset that do not overlap a value from the left genomic dataset are dropped. * If a value from the left genomic dataset does not overlap any values in the right * genomic dataset, it will be paired with a `None` in the product of the join. - * PySpark/Java friendly variant. * * @param genomicDataset The right genomic dataset in the join. * @param flankSize Sets a flankSize for the distance between elements to be @@ -2131,7 +1997,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def leftOuterShuffleRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(T, Option[X]), (U, Option[Y])] = { + flankSize: java.lang.Long): GenericGenomicDataset[(T, Option[X]), (U, Option[Y])] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] @@ -2140,7 +2006,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg implicit val u2Tag: TypeTag[Y] = genomicDataset.uTag implicit val uyTag = typeTag[(U, Option[Y])] - leftOuterShuffleRegionJoin(genomicDataset, flankSize.toLong) + leftOuterShuffleRegionJoin(genomicDataset, flankSize) } /** @@ -2193,7 +2059,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge left outer join between this genomic dataset and another genomic dataset. + * (Scala-specific) Performs a sort-merge left outer join between this genomic dataset and another genomic dataset. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The * partitions are then zipped, and we do a merge join on each partition. @@ -2248,34 +2114,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge left outer join between this genomic dataset and another genomic dataset, - * followed by a groupBy on the left value. - * - * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The - * partitions are then zipped, and we do a merge join on each partition. - * The key equality function used for this join is the reference region - * overlap function. Since this is a left outer join, all values in the - * right genomic dataset that do not overlap a value from the left genomic dataset are dropped. - * If a value from the left genomic dataset does not overlap any values in the right - * genomic dataset, it will be paired with an empty Iterable in the product of the join. - * SparkR friendly variant. - * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space, and all keys from the - * left genomic dataset that did not overlap a key in the right genomic dataset. 
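In the outer variants the unmatched side comes back as scala.Option, which Java code inspects with isDefined/get. Sketch (not part of the patch):

import scala.Option;

// Keeps every read; reads with no overlapping feature pair with Option.empty().
GenericGenomicDataset<Tuple2<AlignmentRecord, Option<Feature>>, ?> annotated =
    reads.leftOuterShuffleRegionJoin(features, 0L);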
- */ - def leftOuterShuffleRegionJoinAndGroupByLeft[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(T, Iterable[X]), (U, Seq[Y])] = { - - leftOuterShuffleRegionJoinAndGroupByLeft(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a sort-merge left outer join between this genomic dataset and another genomic dataset, + * (Java-specific) Performs a sort-merge left outer join between this genomic dataset and another genomic dataset, * followed by a groupBy on the left value. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The @@ -2285,7 +2124,6 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg * right genomic dataset that do not overlap a value from the left genomic dataset are dropped. * If a value from the left genomic dataset does not overlap any values in the right * genomic dataset, it will be paired with an empty Iterable in the product of the join. - * PySpark/Java friendly variant. * * @param genomicDataset The right genomic dataset in the join. * @param flankSize Sets a flankSize for the distance between elements to be @@ -2296,7 +2134,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def leftOuterShuffleRegionJoinAndGroupByLeft[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(T, Iterable[X]), (U, Seq[Y])] = { + flankSize: java.lang.Long): GenericGenomicDataset[(T, Iterable[X]), (U, Seq[Y])] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] @@ -2305,7 +2143,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg implicit val u2Tag: TypeTag[Y] = genomicDataset.uTag implicit val uyTag = typeTag[(U, Seq[Y])] - leftOuterShuffleRegionJoinAndGroupByLeft(genomicDataset, flankSize.toLong) + leftOuterShuffleRegionJoinAndGroupByLeft(genomicDataset, flankSize) } /** @@ -2359,7 +2197,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge left outer join between this genomic dataset and another genomic dataset, + * (Scala-specific) Performs a sort-merge left outer join between this genomic dataset and another genomic dataset, * followed by a groupBy on the left value. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The @@ -2416,38 +2254,14 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge full outer join between this genomic dataset and another genomic dataset. + * (Java-specific) Performs a sort-merge full outer join between this genomic dataset and another genomic dataset. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The * partitions are then zipped, and we do a merge join on each partition. * The key equality function used for this join is the reference region * overlap function. Since this is a full outer join, if a value from either * genomic dataset does not overlap any values in the other genomic dataset, it will be paired with - * a `None` in the product of the join. SparkR friendly variant. - * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. 
If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space, and values that did not - * overlap will be paired with a `None`. - */ - def fullOuterShuffleRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(Option[T], Option[X]), (Option[U], Option[Y])] = { - - fullOuterShuffleRegionJoin(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a sort-merge full outer join between this genomic dataset and another genomic dataset. - * - * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The - * partitions are then zipped, and we do a merge join on each partition. - * The key equality function used for this join is the reference region - * overlap function. Since this is a full outer join, if a value from either - * genomic dataset does not overlap any values in the other genomic dataset, it will be paired with - * a `None` in the product of the join. PySpark/Java friendly variant. + * a `None` in the product of the join. * * @param genomicDataset The right genomic dataset in the join. * @param flankSize Sets a flankSize for the distance between elements to be @@ -2458,7 +2272,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def fullOuterShuffleRegionJoin[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(Option[T], Option[X]), (Option[U], Option[Y])] = { + flankSize: java.lang.Long): GenericGenomicDataset[(Option[T], Option[X]), (Option[U], Option[Y])] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] @@ -2467,7 +2281,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg implicit val u2Tag: TypeTag[Y] = genomicDataset.uTag implicit val uyTag = typeTag[(Option[U], Option[Y])] - fullOuterShuffleRegionJoin(genomicDataset, flankSize.toLong) + fullOuterShuffleRegionJoin(genomicDataset, flankSize) } /** @@ -2519,7 +2333,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge full outer join between this genomic dataset and another genomic dataset. + * (Scala-specific) Performs a sort-merge full outer join between this genomic dataset and another genomic dataset. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The * partitions are then zipped, and we do a merge join on each partition. @@ -2572,38 +2386,14 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge inner join between this genomic dataset and another genomic dataset, - * followed by a groupBy on the left value. - * - * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The - * partitions are then zipped, and we do a merge join on each partition. - * The key equality function used for this join is the reference region - * overlap function. In the same operation, we group all values by the left - * item in the genomic dataset. SparkR friendly variant. - * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. 
If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space, grouped together by - * the value they overlapped in the left genomic dataset. - */ - def shuffleRegionJoinAndGroupByLeft[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(T, Iterable[X]), (U, Seq[Y])] = { - - shuffleRegionJoinAndGroupByLeft(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a sort-merge inner join between this genomic dataset and another genomic dataset, + * (Java-specific) Performs a sort-merge inner join between this genomic dataset and another genomic dataset, * followed by a groupBy on the left value. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The * partitions are then zipped, and we do a merge join on each partition. * The key equality function used for this join is the reference region * overlap function. In the same operation, we group all values by the left - * item in the genomic dataset. PySpark/Java friendly variant. + * item in the genomic dataset. * * @param genomicDataset The right genomic dataset in the join. * @param flankSize Sets a flankSize for the distance between elements to be @@ -2614,7 +2404,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def shuffleRegionJoinAndGroupByLeft[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(T, Iterable[X]), (U, Seq[Y])] = { + flankSize: java.lang.Long): GenericGenomicDataset[(T, Iterable[X]), (U, Seq[Y])] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] @@ -2623,7 +2413,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg implicit val u2Tag: TypeTag[Y] = genomicDataset.uTag implicit val uyTag = typeTag[(U, Seq[Y])] - shuffleRegionJoinAndGroupByLeft(genomicDataset, flankSize.toLong) + shuffleRegionJoinAndGroupByLeft(genomicDataset, flankSize) } /** @@ -2677,7 +2467,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge inner join between this genomic dataset and another genomic dataset, + * (Scala-specific) Performs a sort-merge inner join between this genomic dataset and another genomic dataset, * followed by a groupBy on the left value. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The @@ -2732,34 +2522,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge right outer join between this genomic dataset and another genomic dataset, - * followed by a groupBy on the left value, if not null. - * - * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The - * partitions are then zipped, and we do a merge join on each partition. - * The key equality function used for this join is the reference region - * overlap function. In the same operation, we group all values by the left - * item in the genomic dataset. Since this is a right outer join, all values from the - * right genomic dataset who did not overlap a value from the left genomic dataset are placed into - * a length-1 Iterable with a `None` key. SparkR friendly variant. 
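The groupBy variants gather right-hand matches per left element into a scala.collection.Iterable. Sketch (not part of the patch):

// All features overlapping each read, grouped under that read.
GenericGenomicDataset<Tuple2<AlignmentRecord, scala.collection.Iterable<Feature>>, ?> grouped =
    reads.shuffleRegionJoinAndGroupByLeft(features, 0L);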
- * - * @param genomicDataset The right genomic dataset in the join. - * @param flankSize Sets a flankSize for the distance between elements to be - * joined. If set to 0, an overlap is required to join two elements. - * @return Returns a new genomic dataset containing all pairs of keys that - * overlapped in the genomic coordinate space, grouped together by - * the value they overlapped in the left genomic dataset, and all values from the - * right genomic dataset that did not overlap an item in the left genomic dataset. - */ - def rightOuterShuffleRegionJoinAndGroupByLeft[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( - genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Double): GenericGenomicDataset[(Option[T], Iterable[X]), (Option[U], Seq[Y])] = { - - rightOuterShuffleRegionJoinAndGroupByLeft(genomicDataset, flankSize.toInt: java.lang.Integer) - } - - /** - * Performs a sort-merge right outer join between this genomic dataset and another genomic dataset, + * (Java-specific) Performs a sort-merge right outer join between this genomic dataset and another genomic dataset, * followed by a groupBy on the left value, if not null. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The @@ -2768,7 +2531,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg * overlap function. In the same operation, we group all values by the left * item in the genomic dataset. Since this is a right outer join, all values from the * right genomic dataset who did not overlap a value from the left genomic dataset are placed into - * a length-1 Iterable with a `None` key. PySpark/Java friendly variant. + * a length-1 Iterable with a `None` key. * * @param genomicDataset The right genomic dataset in the join. * @param flankSize Sets a flankSize for the distance between elements to be @@ -2780,7 +2543,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg */ def rightOuterShuffleRegionJoinAndGroupByLeft[X, Y <: Product, Z <: GenomicDataset[X, Y, Z]]( genomicDataset: GenomicDataset[X, Y, Z], - flankSize: java.lang.Integer): GenericGenomicDataset[(Option[T], Iterable[X]), (Option[U], Seq[Y])] = { + flankSize: java.lang.Long): GenericGenomicDataset[(Option[T], Iterable[X]), (Option[U], Seq[Y])] = { implicit val tTag = ClassTag.AnyRef.asInstanceOf[ClassTag[T]] implicit val xTag = ClassTag.AnyRef.asInstanceOf[ClassTag[X]] @@ -2789,7 +2552,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg implicit val u2Tag: TypeTag[Y] = genomicDataset.uTag implicit val uyTag = typeTag[(Option[U], Seq[Y])] - rightOuterShuffleRegionJoinAndGroupByLeft(genomicDataset, flankSize.toLong) + rightOuterShuffleRegionJoinAndGroupByLeft(genomicDataset, flankSize) } /** @@ -2845,7 +2608,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg } /** - * Performs a sort-merge right outer join between this genomic dataset and another genomic dataset, + * (Scala-specific) Performs a sort-merge right outer join between this genomic dataset and another genomic dataset, * followed by a groupBy on the left value, if not null. * * In a sort-merge join, both genomic datasets are co-partitioned and sorted. 
   /**
@@ -2845,7 +2608,7 @@ trait GenomicDataset[T, U <: Product, V <: GenomicDataset[T, U, V]] extends Logg
   }
 
   /**
-   * Performs a sort-merge right outer join between this genomic dataset and another genomic dataset,
+   * (Scala-specific) Performs a sort-merge right outer join between this genomic dataset and another genomic dataset,
    * followed by a groupBy on the left value, if not null.
    *
    * In a sort-merge join, both genomic datasets are co-partitioned and sorted. The
@@ -3194,12 +2957,19 @@ case class DatasetBoundGenericGenomicDataset[T, U <: Product](
 
   // this cannot be in the GenericGenomicDataset trait due to need for the
   // implicit classtag
-  def transformDataset(tFn: Dataset[U] => Dataset[U]): GenericGenomicDataset[T, U] = {
+  override def transformDataset(tFn: Dataset[U] => Dataset[U]): GenericGenomicDataset[T, U] = {
     DatasetBoundGenericGenomicDataset(tFn(dataset),
       sequences,
       converter,
       tagHolder)
   }
+
+  override def transformDataset(tFn: JFunction[Dataset[U], Dataset[U]]): GenericGenomicDataset[T, U] = {
+    DatasetBoundGenericGenomicDataset(tFn.call(dataset),
+      sequences,
+      converter,
+      tagHolder)
+  }
 }
 
 case class RDDBoundGenericGenomicDataset[T, U <: Product](
@@ -3254,12 +3024,19 @@ case class RDDBoundGenericGenomicDataset[T, U <: Product](
 
   // this cannot be in the GenericGenomicDataset trait due to need for the
   // implicit classtag
-  def transformDataset(tFn: Dataset[U] => Dataset[U]): GenericGenomicDataset[T, U] = {
+  override def transformDataset(tFn: Dataset[U] => Dataset[U]): GenericGenomicDataset[T, U] = {
     DatasetBoundGenericGenomicDataset(tFn(dataset),
       sequences,
       converter,
       tagHolder)
   }
+
+  override def transformDataset(tFn: JFunction[Dataset[U], Dataset[U]]): GenericGenomicDataset[T, U] = {
+    DatasetBoundGenericGenomicDataset(tFn.call(dataset),
+      sequences,
+      converter,
+      tagHolder)
+  }
 }
 
 /**
@@ -3513,7 +3290,7 @@ private[rdd] trait VCFSupportingGenomicDataset[T, U <: Product, V <: VCFSupporti
   }
 
   /**
-   * Adds a VCF header line describing an array format field, with fixed count.
+   * (Scala-specific) Adds a VCF header line describing an array format field, with fixed count.
    *
    * @param id The identifier for the field.
    * @param count The number of elements in the array.
@@ -3529,9 +3306,7 @@ private[rdd] trait VCFSupportingGenomicDataset[T, U <: Product, V <: VCFSupporti
   }
 
   /**
-   * Adds a VCF header line describing an array format field, with fixed count.
-   *
-   * Java friendly variant.
+   * (Java-specific) Adds a VCF header line describing an array format field, with fixed count.
    *
    * @param id The identifier for the field.
    * @param count The number of elements in the array.
@@ -3631,7 +3406,7 @@ private[rdd] trait VCFSupportingGenomicDataset[T, U <: Product, V <: VCFSupporti
   }
 
   /**
-   * Adds a VCF header line describing an array info field, with fixed count.
+   * (Scala-specific) Adds a VCF header line describing an array info field, with fixed count.
    *
    * @param id The identifier for the field.
    * @param count The number of elements in the array.
@@ -3647,9 +3422,7 @@ private[rdd] trait VCFSupportingGenomicDataset[T, U <: Product, V <: VCFSupporti
   }
 
   /**
-   * Adds a VCF header line describing an array info field, with fixed count.
-   *
-   * Java friendly variant.
+   * (Java-specific) Adds a VCF header line describing an array info field, with fixed count.
    *
    * @param id The identifier for the field.
    * @param count The number of elements in the array.
@@ -3880,7 +3653,7 @@ abstract class AvroGenomicDataset[T <% IndexedRecord: Manifest, U <: Product, V
   }
 
   /**
-   * Saves this genomic dataset to disk as a Parquet file.
+   * (Java-specific) Saves this genomic dataset to disk as a Parquet file.
    *
    * @param pathName Path to save the file at.
    * @param blockSize The size in bytes of blocks to write.
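The JFunction overloads added throughout this patch let Java callers run Dataset-level transforms without Scala closures. A minimal sketch against a FeatureDataset (`features` is illustrative; the same shape applies to every transformDataset override added below):

    import org.apache.spark.api.java.function.{ Function => JFunction }
    import org.apache.spark.sql.Dataset
    import org.bdgenomics.adam.sql.{ Feature => FeatureProduct }

    // A trivial Dataset-level transform: keep only the first million rows.
    val takeFirst = new JFunction[Dataset[FeatureProduct], Dataset[FeatureProduct]] {
      override def call(ds: Dataset[FeatureProduct]): Dataset[FeatureProduct] = ds.limit(1000000)
    }
    val trimmed = features.transformDataset(takeFirst)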
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentDataset.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentDataset.scala
index 2e98ce6776..8b52eb0f52 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentDataset.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/contig/NucleotideContigFragmentDataset.scala
@@ -21,6 +21,7 @@ import com.google.common.base.Splitter
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.function.{ Function => JFunction }
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ Dataset, SQLContext }
 import org.bdgenomics.adam.converters.FragmentConverter
@@ -44,6 +45,7 @@ import org.bdgenomics.utils.interval.array.{
   IntervalArraySerializer
 }
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.math.max
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe._
@@ -243,19 +245,16 @@ sealed abstract class NucleotideContigFragmentDataset extends AvroGenomicDataset
     ReferenceRegion(elem).toSeq
   }
 
-  /**
-   * Applies a function that transforms the underlying Dataset into a new Dataset using
-   * the Spark SQL API.
-   *
-   * @param tFn A function that transforms the underlying Dataset as a Dataset.
-   * @return A new genomic dataset where the Dataset of genomic data has been replaced, but the
-   *   metadata (sequence dictionary, and etc) is copied without modification.
-   */
-  def transformDataset(
+  override def transformDataset(
     tFn: Dataset[NucleotideContigFragmentProduct] => Dataset[NucleotideContigFragmentProduct]): NucleotideContigFragmentDataset = {
     DatasetBoundNucleotideContigFragmentDataset(tFn(dataset), sequences)
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[NucleotideContigFragmentProduct], Dataset[NucleotideContigFragmentProduct]]): NucleotideContigFragmentDataset = {
+    DatasetBoundNucleotideContigFragmentDataset(tFn.call(dataset), sequences)
+  }
+
   /**
    * Save nucleotide contig fragments as Parquet or FASTA.
    *
@@ -406,8 +405,19 @@ sealed abstract class NucleotideContigFragmentDataset extends AvroGenomicDataset
   }
 
   /**
-   * From a set of contigs, returns a list of sequences based on reference regions provided
-   * @param regions List of Reference regions over which to get sequences
+   * (Java-specific) From a set of contigs, returns a list of sequences based on reference regions provided.
+   *
+   * @param regions List of Reference regions over which to get sequences.
+   * @return JavaRDD[(ReferenceRegion, String)] of region -> sequence pairs.
+   */
+  def extractRegions(regions: java.util.List[ReferenceRegion]): JavaRDD[(ReferenceRegion, String)] = {
+    extractRegions(asScalaBuffer(regions)).toJavaRDD()
+  }
+
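A sketch of the Java-friendly region extraction above (`contigs` is an illustrative NucleotideContigFragmentDataset; coordinates are arbitrary):

    import java.util.Arrays
    import org.bdgenomics.adam.models.ReferenceRegion

    // Pull (region, sequence) pairs for two example regions.
    val regionSeqs = contigs.extractRegions(
      Arrays.asList(ReferenceRegion("chr1", 100L, 200L),
        ReferenceRegion("chr2", 0L, 50L)))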
+  /**
+   * (Scala-specific) From a set of contigs, returns a list of sequences based on reference regions provided.
+   *
+   * @param regions Reference regions over which to get sequences.
+   * @return RDD[(ReferenceRegion, String)] of region -> sequence pairs.
    */
   def extractRegions(regions: Iterable[ReferenceRegion]): RDD[(ReferenceRegion, String)] = {
@@ -443,11 +453,9 @@ sealed abstract class NucleotideContigFragmentDataset extends AvroGenomicDataset
   }
 
   /**
-   * For all adjacent records in the genomic dataset, we extend the records so that the adjacent
+   * (Java-specific) For all adjacent records in the genomic dataset, we extend the records so that the adjacent
    * records now overlap by _n_ bases, where _n_ is the flank length.
    *
-   * Java friendly variant.
-   *
    * @param flankLength The length to extend adjacent records by.
    * @return Returns the genomic dataset, with all adjacent fragments extended with flanking sequence.
    */
@@ -458,7 +466,7 @@ sealed abstract class NucleotideContigFragmentDataset extends AvroGenomicDataset
   }
 
   /**
-   * For all adjacent records in the genomic dataset, we extend the records so that the adjacent
+   * (Scala-specific) For all adjacent records in the genomic dataset, we extend the records so that the adjacent
    * records now overlap by _n_ bases, where _n_ is the flank length.
    *
    * @param flankLength The length to extend adjacent records by.
@@ -472,7 +480,7 @@ sealed abstract class NucleotideContigFragmentDataset extends AvroGenomicDataset
   }
 
   /**
-   * Counts the k-mers contained in a FASTA contig.
+   * (Scala-specific) Counts the k-mers contained in a FASTA contig.
    *
    * @param kmerLength The length of k-mers to count.
    * @return Returns an RDD containing k-mer/count pairs.
@@ -487,9 +495,7 @@ sealed abstract class NucleotideContigFragmentDataset extends AvroGenomicDataset
   }
 
   /**
-   * Counts the k-mers contained in a FASTA contig.
-   *
-   * Java friendly variant.
+   * (Java-specific) Counts the k-mers contained in a FASTA contig.
    *
    * @param kmerLength The length of k-mers to count.
    * @return Returns an RDD containing k-mer/count pairs.
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageDataset.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageDataset.scala
index 53d6f580ec..34835adb37 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageDataset.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/CoverageDataset.scala
@@ -21,6 +21,7 @@ import com.esotericsoftware.kryo.Kryo
 import com.esotericsoftware.kryo.serializers.FieldSerializer
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
+import org.apache.spark.api.java.function.{ Function => JFunction }
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ Dataset, SQLContext }
 import org.bdgenomics.adam.models.{
@@ -255,11 +256,16 @@ abstract class CoverageDataset
       disableDictionaryEncoding)
   }
 
-  def transformDataset(
+  override def transformDataset(
     tFn: Dataset[Coverage] => Dataset[Coverage]): CoverageDataset = {
     DatasetBoundCoverageDataset(tFn(dataset), sequences, samples)
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[Coverage], Dataset[Coverage]]): CoverageDataset = {
+    DatasetBoundCoverageDataset(tFn.call(dataset), sequences, samples)
+  }
+
   /**
    * Saves coverage as feature file.
    *
@@ -356,11 +362,11 @@ abstract class CoverageDataset
   def toFeatures(): FeatureDataset
 
   /**
-   * Gets coverage overlapping specified ReferenceRegion.
+   * (Java-specific) Gets coverage overlapping specified ReferenceRegion.
    *
    * For large ReferenceRegions, base pairs per bin (bpPerBin) can be specified
    * to bin together ReferenceRegions of equal size. The coverage of each bin is
-   * coverage of the first base pair in that bin. Java friendly variant.
+   * coverage of the first base pair in that bin.
    *
    * @param bpPerBin base pairs per bin, number of bases to combine to one bin.
    * @return Genomic dataset of Coverage Records.
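The two binning semantics side by side (a sketch; `coverageDs` is an illustrative CoverageDataset): coverage keeps the first base pair's coverage per bin, while the aggregatedCoverage variant described below averages over the bin:

    val firstBasePerBin = coverageDs.coverage(bpPerBin = 1000)
    val meanPerBin = coverageDs.aggregatedCoverage(bpPerBin = 1000)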
@@ -371,7 +377,8 @@ abstract class CoverageDataset
   }
 
   /**
-   * Gets coverage overlapping specified ReferenceRegion.
+   * (Scala-specific) Gets coverage overlapping specified ReferenceRegion.
+   *
    * For large ReferenceRegions, base pairs per bin (bpPerBin) can be specified
    * to bin together ReferenceRegions of equal size. The coverage of each bin is
    * coverage of the first base pair in that bin.
@@ -393,11 +400,11 @@ abstract class CoverageDataset
   }
 
   /**
-   * Gets coverage overlapping specified ReferenceRegion.
+   * (Java-specific) Gets coverage overlapping specified ReferenceRegion.
    *
    * For large ReferenceRegions, base pairs per bin (bpPerBin) can be specified
    * to bin together ReferenceRegions of equal size. The coverage of each bin is
-   * the mean coverage over all base pairs in that bin. Java friendly variant.
+   * the mean coverage over all base pairs in that bin.
    *
    * @param bpPerBin base pairs per bin, number of bases to combine to one bin.
    * @return Genomic dataset of Coverage Records.
@@ -408,7 +415,7 @@ abstract class CoverageDataset
   }
 
   /**
-   * Gets coverage overlapping specified ReferenceRegion.
+   * (Scala-specific) Gets coverage overlapping specified ReferenceRegion.
    *
    * For large ReferenceRegions, base pairs per bin (bpPerBin) can be specified
    * to bin together ReferenceRegions of equal size. The coverage of each bin is
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureDataset.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureDataset.scala
index 0ce3dd46b8..078248ab67 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureDataset.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/feature/FeatureDataset.scala
@@ -22,6 +22,7 @@ import java.util.{ Collections, Comparator }
 import org.apache.hadoop.fs.{ FileSystem, Path }
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
+import org.apache.spark.api.java.function.{ Function => JFunction }
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ Dataset, SQLContext }
 import org.bdgenomics.adam.instrumentation.Timers._
@@ -335,6 +336,11 @@ case class DatasetBoundFeatureDataset private[rdd] (
     copy(dataset = tFn(dataset))
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[FeatureProduct], Dataset[FeatureProduct]]): FeatureDataset = {
+    copy(dataset = tFn.call(dataset))
+  }
+
   override def replaceSequences(newSequences: SequenceDictionary): FeatureDataset = {
     copy(sequences = newSequences)
   }
@@ -461,19 +467,16 @@ sealed abstract class FeatureDataset extends AvroGenomicDataset[Feature, Feature
       iterableDatasets.map(_.samples).fold(samples)(_ ++ _))
   }
 
-  /**
-   * Applies a function that transforms the underlying Dataset into a new Dataset using
-   * the Spark SQL API.
-   *
-   * @param tFn A function that transforms the underlying Dataset as a Dataset.
-   * @return A new FeatureDataset where the Dataset of genomic data has been replaced, but the
-   *   metadata (sequence dictionary, and etc) is copied without modification.
-   */
-  def transformDataset(
+  override def transformDataset(
     tFn: Dataset[FeatureProduct] => Dataset[FeatureProduct]): FeatureDataset = {
     DatasetBoundFeatureDataset(tFn(dataset), sequences, samples)
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[FeatureProduct], Dataset[FeatureProduct]]): FeatureDataset = {
+    DatasetBoundFeatureDataset(tFn.call(dataset), sequences, samples)
+  }
+
   /**
    * Java friendly save function. Automatically detects the output format.
    *
@@ -540,7 +543,17 @@ sealed abstract class FeatureDataset extends AvroGenomicDataset[Feature, Feature
   }
 
   /**
-   * Filter this FeatureDataset by feature type to those that match the specified feature types.
+   * (Java-specific) Filter this FeatureDataset by feature type to those that match the specified feature types.
+   *
+   * @param featureTypes List of feature types to filter by.
+   * @return FeatureDataset filtered by the specified feature types.
+   */
+  def filterToFeatureTypes(featureTypes: java.util.List[String]): FeatureDataset = {
+    filterToFeatureTypes(asScalaBuffer(featureTypes))
+  }
+
+  /**
+   * (Scala-specific) Filter this FeatureDataset by feature type to those that match the specified feature types.
    *
    * @param featureType Sequence of feature types to filter by.
    * @return FeatureDataset filtered by the specified feature types.
@@ -560,7 +573,17 @@ sealed abstract class FeatureDataset extends AvroGenomicDataset[Feature, Feature
   }
 
   /**
-   * Filter this FeatureDataset by gene to those that match the specified genes.
+   * (Java-specific) Filter this FeatureDataset by gene to those that match the specified genes.
+   *
+   * @param geneIds List of genes to filter by.
+   * @return FeatureDataset filtered by the specified genes.
+   */
+  def filterToGenes(geneIds: java.util.List[String]): FeatureDataset = {
+    filterToGenes(asScalaBuffer(geneIds))
+  }
+
+  /**
+   * (Scala-specific) Filter this FeatureDataset by gene to those that match the specified genes.
    *
    * @param geneIds Sequence of genes to filter by.
    * @return FeatureDataset filtered by the specified genes.
@@ -580,7 +603,17 @@ sealed abstract class FeatureDataset extends AvroGenomicDataset[Feature, Feature
   }
 
   /**
-   * Filter this FeatureDataset by transcript to those that match the specified transcripts.
+   * (Java-specific) Filter this FeatureDataset by transcript to those that match the specified transcripts.
+   *
+   * @param transcriptIds List of transcripts to filter by.
+   * @return FeatureDataset filtered by the specified transcripts.
+   */
+  def filterToTranscripts(transcriptIds: java.util.List[String]): FeatureDataset = {
+    filterToTranscripts(asScalaBuffer(transcriptIds))
+  }
+
+  /**
+   * (Scala-specific) Filter this FeatureDataset by transcript to those that match the specified transcripts.
    *
    * @param transcriptIds Sequence of transcripts to filter by.
    * @return FeatureDataset filtered by the specified transcripts.
@@ -600,7 +633,17 @@ sealed abstract class FeatureDataset extends AvroGenomicDataset[Feature, Feature
   }
 
   /**
-   * Filter this FeatureDataset by exon to those that match the specified exons.
+   * (Java-specific) Filter this FeatureDataset by exon to those that match the specified exons.
+   *
+   * @param exonIds List of exons to filter by.
+   * @return FeatureDataset filtered by the specified exons.
+   */
+  def filterToExons(exonIds: java.util.List[String]): FeatureDataset = {
+    filterToExons(asScalaBuffer(exonIds))
+  }
+
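Sketch of the list-based feature filters above (`features` is illustrative, as are the IDs):

    import java.util.Arrays

    val exonsAndCds = features.filterToFeatureTypes(Arrays.asList("exon", "CDS"))
    val brca2 = features.filterToGenes(Arrays.asList("ENSG00000139618"))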
+  /**
+   * (Scala-specific) Filter this FeatureDataset by exon to those that match the specified exons.
    *
    * @param exonIds Sequence of exons to filter by.
    * @return FeatureDataset filtered by the specified exons.
@@ -610,7 +653,17 @@ sealed abstract class FeatureDataset extends AvroGenomicDataset[Feature, Feature
   }
 
   /**
-   * Filter this FeatureDataset by score.
+   * (Java-specific) Filter this FeatureDataset by score.
+   *
+   * @param minimumScore Minimum score to filter by, inclusive.
+   * @return FeatureDataset filtered by the specified minimum score.
+   */
+  def filterByScore(minimumScore: java.lang.Double): FeatureDataset = {
+    filterByScore(minimumScore.doubleValue)
+  }
+
+  /**
+   * (Scala-specific) Filter this FeatureDataset by score.
    *
    * @param minimumScore Minimum score to filter by, inclusive.
    * @return FeatureDataset filtered by the specified minimum score.
@@ -630,7 +683,17 @@ sealed abstract class FeatureDataset extends AvroGenomicDataset[Feature, Feature
   }
 
   /**
-   * Filter this FeatureDataset by parent to those that match the specified parents.
+   * (Java-specific) Filter this FeatureDataset by parent to those that match the specified parents.
+   *
+   * @param parentIds List of parents to filter by.
+   * @return FeatureDataset filtered by the specified parents.
+   */
+  def filterToParents(parentIds: java.util.List[String]): FeatureDataset = {
+    filterToParents(asScalaBuffer(parentIds))
+  }
+
+  /**
+   * (Scala-specific) Filter this FeatureDataset by parent to those that match the specified parents.
    *
    * @param parentIds Sequence of parents to filter by.
    * @return FeatureDataset filtered by the specified parents.
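And the score and parent filters, chained (illustrative threshold and ID; a bare Double literal selects the Scala-specific overload):

    val strong = features.filterByScore(500.0)
    val children = strong.filterToParents(Seq("ENST00000380152"))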
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentDataset.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentDataset.scala
index 1e70ab8498..2122aaf601 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentDataset.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/fragment/FragmentDataset.scala
@@ -19,6 +19,7 @@ package org.bdgenomics.adam.rdd.fragment
 
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
+import org.apache.spark.api.java.function.{ Function => JFunction }
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ Dataset, SQLContext }
 import org.bdgenomics.adam.converters.AlignmentRecordConverter
@@ -50,6 +51,7 @@ import org.bdgenomics.utils.interval.array.{
 }
 import org.bdgenomics.utils.misc.Logging
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe._
 
@@ -216,6 +218,11 @@ case class DatasetBoundFragmentDataset private[rdd] (
     copy(dataset = tFn(dataset))
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[FragmentProduct], Dataset[FragmentProduct]]): FragmentDataset = {
+    copy(dataset = tFn.call(dataset))
+  }
+
   def replaceSequences(
     newSequences: SequenceDictionary): FragmentDataset = {
     copy(sequences = newSequences)
@@ -300,15 +307,7 @@ sealed abstract class FragmentDataset extends AvroRecordGroupGenomicDataset[Frag
       iterableDatasets.map(_.processingSteps).fold(processingSteps)(_ ++ _))
   }
 
-  /**
-   * Applies a function that transforms the underlying Dataset into a new Dataset using
-   * the Spark SQL API.
-   *
-   * @param tFn A function that transforms the underlying Dataset as a Dataset.
-   * @return A new genomic dataset where the Dataset of genomic data has been replaced, but the
-   *   metadata (sequence dictionary, and etc) is copied without modification.
-   */
-  def transformDataset(
+  override def transformDataset(
     tFn: Dataset[FragmentProduct] => Dataset[FragmentProduct]): FragmentDataset = {
     DatasetBoundFragmentDataset(tFn(dataset),
       sequences,
@@ -316,6 +315,14 @@ sealed abstract class FragmentDataset extends AvroRecordGroupGenomicDataset[Frag
       processingSteps)
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[FragmentProduct], Dataset[FragmentProduct]]): FragmentDataset = {
+    DatasetBoundFragmentDataset(tFn.call(dataset),
+      sequences,
+      recordGroups,
+      processingSteps)
+  }
+
   /**
    * Essentially, splits up the reads in a Fragment.
    *
@@ -354,7 +361,21 @@ sealed abstract class FragmentDataset extends AvroRecordGroupGenomicDataset[Frag
   }
 
   /**
-   * Rewrites the quality scores of fragments to place all quality scores in bins.
+   * (Java-specific) Rewrites the quality scores of fragments to place all quality scores in bins.
+   *
+   * Quality score binning maps all quality scores to a limited number of
+   * discrete values, thus reducing the entropy of the quality score
+   * distribution, and reducing the amount of space that fragments consume on disk.
+   *
+   * @param bins The bins to use.
+   * @return Fragments whose quality scores are binned.
+   */
+  def binQualityScores(bins: java.util.List[QualityScoreBin]): FragmentDataset = {
+    binQualityScores(asScalaBuffer(bins))
+  }
+
+  /**
+   * (Scala-specific) Rewrites the quality scores of fragments to place all quality scores in bins.
    *
    * Quality score binning maps all quality scores to a limited number of
    * discrete values, thus reducing the entropy of the quality score
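A hedged sketch of fragment-level binning; `bins` stands in for a java.util.List[QualityScoreBin] built elsewhere (for example via the companion's "low,high,score" string parser, whose exact shape should be checked against QualityScoreBin itself):

    import java.util.{ List => JList }
    import org.bdgenomics.adam.rdd.fragment.FragmentDataset
    import org.bdgenomics.adam.rdd.read.QualityScoreBin

    // Routes through the Java-specific overload added above; bins are assumed
    // to have been constructed separately.
    def binFragments(fragments: FragmentDataset, bins: JList[QualityScoreBin]): FragmentDataset = {
      fragments.binQualityScores(bins)
    }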
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordDataset.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordDataset.scala
index 2edb9f75d2..215dc08bcc 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordDataset.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/read/AlignmentRecordDataset.scala
@@ -28,6 +28,7 @@ import org.apache.hadoop.io.LongWritable
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.api.java.function.{ Function => JFunction }
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.MetricsContext._
 import org.apache.spark.rdd.RDD
@@ -62,6 +63,7 @@ import org.bdgenomics.utils.interval.array.{
 }
 import org.seqdoop.hadoop_bam._
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 import scala.math.{ abs, min }
 import scala.reflect.ClassTag
@@ -274,6 +276,11 @@ case class DatasetBoundAlignmentRecordDataset private[rdd] (
     copy(dataset = tFn(dataset))
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[AlignmentRecordProduct], Dataset[AlignmentRecordProduct]]): AlignmentRecordDataset = {
+    copy(dataset = tFn.call(dataset))
+  }
+
   def replaceSequences(
     newSequences: SequenceDictionary): AlignmentRecordDataset = {
     copy(sequences = newSequences)
@@ -389,15 +396,7 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
 
   @transient val uTag: TypeTag[AlignmentRecordProduct] = typeTag[AlignmentRecordProduct]
 
-  /**
-   * Applies a function that transforms the underlying Dataset into a new Dataset using
-   * the Spark SQL API.
-   *
-   * @param tFn A function that transforms the underlying Dataset as a Dataset.
-   * @return A new genomic dataset where the Dataset of genomic data has been replaced, but the
-   *   metadata (sequence dictionary, and etc) is copied without modification.
-   */
-  def transformDataset(
+  override def transformDataset(
     tFn: Dataset[AlignmentRecordProduct] => Dataset[AlignmentRecordProduct]): AlignmentRecordDataset = {
     DatasetBoundAlignmentRecordDataset(dataset,
       sequences,
@@ -406,6 +405,15 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
       recordGroups,
       processingSteps)
       .transformDataset(tFn)
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[AlignmentRecordProduct], Dataset[AlignmentRecordProduct]]): AlignmentRecordDataset = {
+    DatasetBoundAlignmentRecordDataset(dataset,
+      sequences,
+      recordGroups,
+      processingSteps)
+      .transformDataset(tFn)
+  }
+
   /**
    * Replaces the underlying RDD and SequenceDictionary and emits a new object.
    *
@@ -701,9 +709,7 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
-   *
-   * Java friendly variant.
+   * (Java-specific) Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
    *
    * @param kmerLength The value of _k_ to use for cutting _k_-mers.
    * @return Returns an RDD containing k-mer/count pairs.
@@ -717,7 +723,7 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
+   * (Scala-specific) Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
    *
    * @param kmerLength The value of _k_ to use for cutting _k_-mers.
    * @return Returns an RDD containing k-mer/count pairs.
@@ -732,12 +738,31 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
+   * (Java-specific) Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
    *
    * @param kmerLength The value of _k_ to use for cutting _k_-mers.
    * @return Returns a Dataset containing k-mer/count pairs.
    */
-  def countKmersAsDataset(kmerLength: java.lang.Integer): Dataset[(String, Long)] = {
+  def countKmersAsDataset(kmerLength: java.lang.Integer): Dataset[(String, java.lang.Long)] = {
+    import dataset.sqlContext.implicits._
+    val kmers = dataset.select($"sequence".as[String])
+      .flatMap(_.sliding(kmerLength))
+      .as[String]
+
+    kmers.toDF()
+      .groupBy($"value")
+      .count()
+      .select($"value".as("kmer"), $"count".as("count"))
+      .as[(String, java.lang.Long)]
+  }
+
+  /**
+   * (Scala-specific) Cuts reads into _k_-mers, and then counts the number of occurrences of each _k_-mer.
+   *
+   * @param kmerLength The value of _k_ to use for cutting _k_-mers.
+   * @return Returns a Dataset containing k-mer/count pairs.
+   */
+  def countKmersAsDataset(kmerLength: Int): Dataset[(String, Long)] = {
     import dataset.sqlContext.implicits._
     val kmers = dataset.select($"sequence".as[String])
       .flatMap(_.sliding(kmerLength))
       .as[String]
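Usage sketch for the typed k-mer counter (`reads` is an illustrative AlignmentRecordDataset; an Int argument selects the Scala-specific overload, a boxed Integer the Java-specific one):

    // Dataset[(String, Long)] of 21-mers; keep only the frequent ones.
    val kmerCounts = reads.countKmersAsDataset(21)
    kmerCounts.filter(_._2 > 100L).show(10)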
@@ -1033,11 +1058,9 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Runs base quality score recalibration on a set of reads. Uses a table of
+   * (Java-specific) Runs base quality score recalibration on a set of reads. Uses a table of
    * known SNPs to mask true variation during the recalibration process.
    *
-   * Java friendly variant.
-   *
    * @param knownSnps A table of known SNPs to mask valid variants.
    * @param minAcceptableQuality The minimum quality score to recalibrate.
    * @param storageLevel An optional storage level to set for the output
@@ -1057,7 +1080,36 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Runs base quality score recalibration on a set of reads. Uses a table of
+   * (Java-specific) Runs base quality score recalibration on a set of reads. Uses a table of
    * known SNPs to mask true variation during the recalibration process.
    *
+   * @param knownSnps A table of known SNPs to mask valid variants.
+   * @param minAcceptableQuality The minimum quality score to recalibrate.
+   * @param storageLevel Storage level to set for the output
+   *   of the first stage of BQSR. Set to null to omit.
+   * @param samplingFraction Fraction of reads to sample when
+   *   generating the covariate table.
+   * @param samplingSeed Seed to provide if downsampling reads.
+   * @return Returns a genomic dataset of recalibrated reads.
+   */
+  def recalibrateBaseQualities(
+    knownSnps: VariantDataset,
+    minAcceptableQuality: java.lang.Integer,
+    storageLevel: StorageLevel,
+    samplingFraction: java.lang.Double,
+    samplingSeed: java.lang.Long): AlignmentRecordDataset = {
+    val snpTable = SnpTable(knownSnps)
+    val bcastSnps = rdd.context.broadcast(snpTable)
+    val sMinQual: Int = minAcceptableQuality
+    recalibrateBaseQualities(bcastSnps,
+      minAcceptableQuality = sMinQual,
+      optStorageLevel = Option(storageLevel),
+      optSamplingFraction = Option(samplingFraction),
+      optSamplingSeed = Option(samplingSeed))
+  }
+
+  /**
+   * (Scala-specific) Runs base quality score recalibration on a set of reads. Uses a table of
    * known SNPs to mask true variation during the recalibration process.
    *
    * @param knownSnps A table of known SNPs to mask valid variants.
@@ -1103,7 +1155,7 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
    */
   def realignIndels(referenceFile: ReferenceFile): AlignmentRecordDataset = {
     realignIndels(consensusModel = new ConsensusGeneratorFromReads,
-      optReferenceFile = Some(referenceFile)
+      optReferenceFile = Option(referenceFile)
     )
   }
 
@@ -1233,7 +1285,30 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Computes the mismatching positions field (SAM "MD" tag).
+   * (Java-specific) Computes the mismatching positions field (SAM "MD" tag).
+   *
+   * @param referenceFile A reference file that can be broadcast to all nodes.
+   * @param overwriteExistingTags If true, overwrites the MD tags on reads where
+   *   it is already populated. If false, we only tag reads that are currently
+   *   missing an MD tag.
+   * @param validationStringency If we are recalculating existing tags and we
+   *   find that the MD tag that was previously on the read doesn't match our
+   *   new tag, LENIENT will log a warning message, STRICT will throw an
+   *   exception, and SILENT will ignore.
+   * @return Returns a new AlignmentRecordDataset where all reads have the
+   *   mismatchingPositions field populated.
+   */
+  def computeMismatchingPositions(
+    referenceFile: ReferenceFile,
+    overwriteExistingTags: java.lang.Boolean,
+    validationStringency: ValidationStringency): AlignmentRecordDataset = {
+    computeMismatchingPositions(referenceFile = referenceFile,
+      overwriteExistingTags = overwriteExistingTags.booleanValue,
+      validationStringency = validationStringency)
+  }
+
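A sketch of the two read-repair steps above chained together; `reads`, `knownSnps` (a VariantDataset of known sites) and `reference` (a broadcastable ReferenceFile) are illustrative:

    import htsjdk.samtools.ValidationStringency
    import org.apache.spark.storage.StorageLevel

    val recalibrated = reads.recalibrateBaseQualities(
      knownSnps,
      java.lang.Integer.valueOf(5), // minimum acceptable quality
      StorageLevel.MEMORY_ONLY, // cache the first BQSR pass
      java.lang.Double.valueOf(0.25), // sample 25% of reads for the covariate table
      java.lang.Long.valueOf(42L)) // sampling seed
    val withMd = recalibrated.computeMismatchingPositions(
      reference, java.lang.Boolean.TRUE, ValidationStringency.LENIENT)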
+  /**
+   * (Scala-specific) Computes the mismatching positions field (SAM "MD" tag).
    *
    * @param referenceFile A reference file that can be broadcast to all nodes.
    * @param overwriteExistingTags If true, overwrites the MD tags on reads where
    *   it is already populated. If false, we only tag reads that are currently
    *   missing an MD tag.
    * @param validationStringency If we are recalculating existing tags and we
    *   find that the MD tag that was previously on the read doesn't match our
    *   new tag, LENIENT will log a warning message, STRICT will throw an
    *   exception, and SILENT will ignore.
    * @return Returns a new AlignmentRecordDataset where all reads have the
    *   mismatchingPositions field populated.
    */
@@ -1275,10 +1350,10 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Saves these AlignmentRecords to two FASTQ files.
+   * (Java-specific) Saves these AlignmentRecords to two FASTQ files.
    *
    * The files are one for the first mate in each pair, and the other for the
-   * second mate in the pair. Java friendly variant.
+   * second mate in the pair.
    *
    * @param fileName1 Path at which to save a FASTQ file containing the first
    *   mate of each pair.
@@ -1454,9 +1529,7 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Saves reads in FASTQ format.
-   *
-   * Java friendly variant.
+   * (Java-specific) Saves reads in FASTQ format.
    *
    * @param fileName Path to save files at.
    * @param outputOriginalBaseQualities If true, writes out reads with the base
@@ -1556,8 +1629,8 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Reassembles read pairs from two sets of unpaired reads. The assumption is that the two sets
-   * were _originally_ paired together. Java friendly variant.
+   * (Java-specific) Reassembles read pairs from two sets of unpaired reads. The assumption is that the two sets
+   * were _originally_ paired together.
    *
    * @note The RDD that this is called on should be the RDD with the first read from the pair.
    * @param secondPairRdd The rdd containing the second read from the pairs.
@@ -1572,7 +1645,7 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Reassembles read pairs from two sets of unpaired reads. The assumption is that the two sets
+   * (Scala-specific) Reassembles read pairs from two sets of unpaired reads. The assumption is that the two sets
    * were _originally_ paired together.
    *
    * @note The RDD that this is called on should be the RDD with the first read from the pair.
@@ -1639,7 +1712,21 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Rewrites the quality scores of reads to place all quality scores in bins.
+   * (Java-specific) Rewrites the quality scores of reads to place all quality scores in bins.
+   *
+   * Quality score binning maps all quality scores to a limited number of
+   * discrete values, thus reducing the entropy of the quality score
+   * distribution, and reducing the amount of space that reads consume on disk.
+   *
+   * @param bins The bins to use.
+   * @return Reads whose quality scores are binned.
+   */
+  def binQualityScores(bins: java.util.List[QualityScoreBin]): AlignmentRecordDataset = {
+    binQualityScores(asScalaBuffer(bins))
+  }
+
+  /**
+   * (Scala-specific) Rewrites the quality scores of reads to place all quality scores in bins.
    *
    * Quality score binning maps all quality scores to a limited number of
    * discrete values, thus reducing the entropy of the quality score
@@ -1682,7 +1769,17 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Filter this AlignmentRecordDataset by mapping quality.
+   * (Java-specific) Filter this AlignmentRecordDataset by mapping quality.
+   *
+   * @param minimumMapq Minimum mapping quality to filter by, inclusive.
+   * @return AlignmentRecordDataset filtered by mapping quality.
+   */
+  def filterByMapq(minimumMapq: java.lang.Integer): AlignmentRecordDataset = {
+    filterByMapq(minimumMapq.intValue)
+  }
+
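Both mapping-quality overloads in one sketch (`reads` is illustrative; the boxed form is what Java callers see):

    val confident = reads.filterByMapq(30)
    val confidentBoxed = reads.filterByMapq(java.lang.Integer.valueOf(30))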
+  /**
+   * (Scala-specific) Filter this AlignmentRecordDataset by mapping quality.
    *
    * @param minimumMapq Minimum mapping quality to filter by, inclusive.
    * @return AlignmentRecordDataset filtered by mapping quality.
@@ -1738,7 +1835,17 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Filter this AlignmentRecordDataset by record group to those that match the specified record groups.
+   * (Java-specific) Filter this AlignmentRecordDataset by record group to those that match the specified record groups.
+   *
+   * @param recordGroupNames List of record groups to filter by.
+   * @return AlignmentRecordDataset filtered by one or more record groups.
+   */
+  def filterToRecordGroups(recordGroupNames: java.util.List[String]): AlignmentRecordDataset = {
+    filterToRecordGroups(asScalaBuffer(recordGroupNames))
+  }
+
+  /**
+   * (Scala-specific) Filter this AlignmentRecordDataset by record group to those that match the specified record groups.
    *
    * @param recordGroupNames Sequence of record groups to filter by.
    * @return AlignmentRecordDataset filtered by one or more record groups.
@@ -1758,7 +1865,17 @@ sealed abstract class AlignmentRecordDataset extends AvroRecordGroupGenomicDatas
   }
 
   /**
-   * Filter this AlignmentRecordDataset by sample to those that match the specified samples.
+   * (Java-specific) Filter this AlignmentRecordDataset by sample to those that match the specified samples.
+   *
+   * @param recordGroupSamples List of samples to filter by.
+   * @return AlignmentRecordDataset filtered by the specified samples.
+   */
+  def filterToSamples(recordGroupSamples: java.util.List[String]): AlignmentRecordDataset = {
+    filterToSamples(asScalaBuffer(recordGroupSamples))
+  }
+
+  /**
+   * (Scala-specific) Filter this AlignmentRecordDataset by sample to those that match the specified samples.
    *
    * @param recordGroupSamples Sequence of samples to filter by.
    * @return AlignmentRecordDataset filtered by the specified samples.
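Sketch of the record-group and sample filters (names are illustrative):

    import java.util.Arrays

    val oneGroup = reads.filterToRecordGroups(Arrays.asList("rg1"))
    val oneSample = reads.filterToSamples(Arrays.asList("NA12878"))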
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeDataset.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeDataset.scala
index 6cb4d94d13..811a21d085 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeDataset.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/GenotypeDataset.scala
@@ -21,6 +21,7 @@ import htsjdk.variant.vcf.{ VCFHeader, VCFHeaderLine }
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
+import org.apache.spark.api.java.function.{ Function => JFunction }
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ Dataset, SQLContext }
 import org.bdgenomics.adam.converters.DefaultHeaderLines
@@ -185,6 +186,11 @@ case class DatasetBoundGenotypeDataset private[rdd] (
     copy(dataset = tFn(dataset))
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[GenotypeProduct], Dataset[GenotypeProduct]]): GenotypeDataset = {
+    copy(dataset = tFn.call(dataset))
+  }
+
   def replaceSequences(
     newSequences: SequenceDictionary): GenotypeDataset = {
     copy(sequences = newSequences)
@@ -316,19 +322,16 @@ sealed abstract class GenotypeDataset extends MultisampleAvroGenomicDataset[Geno
     IntervalArray(rdd, GenotypeArray.apply(_, _))
   }
 
-  /**
-   * Applies a function that transforms the underlying Dataset into a new Dataset using
-   * the Spark SQL API.
-   *
-   * @param tFn A function that transforms the underlying Dataset as a Dataset.
-   * @return A new genomic dataset where the Dataset of genomic data has been replaced, but the
-   *   metadata (sequence dictionary, and etc) is copied without modification.
-   */
-  def transformDataset(
+  override def transformDataset(
     tFn: Dataset[GenotypeProduct] => Dataset[GenotypeProduct]): GenotypeDataset = {
     DatasetBoundGenotypeDataset(tFn(dataset), sequences, samples, headerLines)
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[GenotypeProduct], Dataset[GenotypeProduct]]): GenotypeDataset = {
+    DatasetBoundGenotypeDataset(tFn.call(dataset), sequences, samples, headerLines)
+  }
+
   /**
    * @return Returns this GenotypeDataset squared off as a VariantContextDataset.
    */
@@ -420,7 +423,17 @@ sealed abstract class GenotypeDataset extends MultisampleAvroGenomicDataset[Geno
   }
 
   /**
-   * Filter this GenotypeDataset by quality (VCF FORMAT field "GQ").
+   * (Java-specific) Filter this GenotypeDataset by quality (VCF FORMAT field "GQ").
+   *
+   * @param minimumQuality Minimum quality to filter by, inclusive.
+   * @return GenotypeDataset filtered by quality.
+   */
+  def filterByQuality(minimumQuality: java.lang.Double): GenotypeDataset = {
+    filterByQuality(minimumQuality.doubleValue)
+  }
+
+  /**
+   * (Scala-specific) Filter this GenotypeDataset by quality (VCF FORMAT field "GQ").
    *
    * @param minimumQuality Minimum quality to filter by, inclusive.
    * @return GenotypeDataset filtered by quality.
@@ -430,7 +443,17 @@ sealed abstract class GenotypeDataset extends MultisampleAvroGenomicDataset[Geno
   }
 
   /**
-   * Filter this GenotypeDataset by read depth (VCF FORMAT field "DP").
+   * (Java-specific) Filter this GenotypeDataset by read depth (VCF FORMAT field "DP").
+   *
+   * @param minimumReadDepth Minimum read depth to filter by, inclusive.
+   * @return GenotypeDataset filtered by read depth.
+   */
+  def filterByReadDepth(minimumReadDepth: java.lang.Integer): GenotypeDataset = {
+    filterByReadDepth(minimumReadDepth.intValue)
+  }
+
+  /**
+   * (Scala-specific) Filter this GenotypeDataset by read depth (VCF FORMAT field "DP").
    *
    * @param minimumReadDepth Minimum read depth to filter by, inclusive.
    * @return GenotypeDataset filtered by read depth.
@@ -440,7 +463,17 @@ sealed abstract class GenotypeDataset extends MultisampleAvroGenomicDataset[Geno
   }
 
   /**
-   * Filter this GenotypeDataset by alternate read depth (VCF FORMAT field "AD").
+   * (Java-specific) Filter this GenotypeDataset by alternate read depth (VCF FORMAT field "AD").
+   *
+   * @param minimumAlternateReadDepth Minimum alternate read depth to filter by, inclusive.
+   * @return GenotypeDataset filtered by alternate read depth.
+   */
+  def filterByAlternateReadDepth(minimumAlternateReadDepth: java.lang.Integer): GenotypeDataset = {
+    filterByAlternateReadDepth(minimumAlternateReadDepth.intValue)
+  }
+
+  /**
+   * (Scala-specific) Filter this GenotypeDataset by alternate read depth (VCF FORMAT field "AD").
    *
    * @param minimumAlternateReadDepth Minimum alternate read depth to filter by, inclusive.
    * @return GenotypeDataset filtered by alternate read depth.
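The genotype filters compose naturally; a sketch with arbitrary thresholds (bare literals select the Scala-specific overloads):

    val highConfidence = genotypes
      .filterByQuality(30.0) // GQ >= 30
      .filterByReadDepth(10) // DP >= 10
      .filterByAlternateReadDepth(5) // alternate AD >= 5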
@@ -450,7 +483,17 @@ sealed abstract class GenotypeDataset extends MultisampleAvroGenomicDataset[Geno
   }
 
   /**
-   * Filter this GenotypeDataset by reference read depth (VCF FORMAT field "AD").
+   * (Java-specific) Filter this GenotypeDataset by reference read depth (VCF FORMAT field "AD").
+   *
+   * @param minimumReferenceReadDepth Minimum reference read depth to filter by, inclusive.
+   * @return GenotypeDataset filtered by reference read depth.
+   */
+  def filterByReferenceReadDepth(minimumReferenceReadDepth: java.lang.Integer): GenotypeDataset = {
+    filterByReferenceReadDepth(minimumReferenceReadDepth.intValue)
+  }
+
+  /**
+   * (Scala-specific) Filter this GenotypeDataset by reference read depth (VCF FORMAT field "AD").
    *
    * @param minimumReferenceReadDepth Minimum reference read depth to filter by, inclusive.
    * @return GenotypeDataset filtered by reference read depth.
@@ -470,7 +513,17 @@ sealed abstract class GenotypeDataset extends MultisampleAvroGenomicDataset[Geno
   }
 
   /**
-   * Filter this GenotypeDataset by sample to those that match the specified samples.
+   * (Java-specific) Filter this GenotypeDataset by sample to those that match the specified samples.
+   *
+   * @param sampleIds List of samples to filter by.
+   * @return GenotypeDataset filtered by one or more samples.
+   */
+  def filterToSamples(sampleIds: java.util.List[String]): GenotypeDataset = {
+    filterToSamples(asScalaBuffer(sampleIds))
+  }
+
+  /**
+   * (Scala-specific) Filter this GenotypeDataset by sample to those that match the specified samples.
    *
    * @param sampleIds Sequence of samples to filter by.
    * return GenotypeDataset filtered by one or more samples.
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextDataset.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextDataset.scala
index 08a8df1ad3..e1c7973280 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextDataset.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantContextDataset.scala
@@ -34,6 +34,7 @@ import org.apache.hadoop.io.compress.CompressionCodec
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
+import org.apache.spark.api.java.function.{ Function => JFunction }
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ Dataset, SQLContext }
 import org.bdgenomics.adam.converters.{
@@ -142,6 +143,11 @@ case class DatasetBoundVariantContextDataset private[rdd] (
     copy(dataset = tFn(dataset))
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[VariantContextProduct], Dataset[VariantContextProduct]]): VariantContextDataset = {
+    copy(dataset = tFn.call(dataset))
+  }
+
   def replaceSequences(
     newSequences: SequenceDictionary): VariantContextDataset = {
     copy(sequences = newSequences)
@@ -227,11 +233,16 @@ sealed abstract class VariantContextDataset extends MultisampleGenomicDataset[Va
     saveMetadata(pathName)
   }
 
-  def transformDataset(
+  override def transformDataset(
     tFn: Dataset[VariantContextProduct] => Dataset[VariantContextProduct]): VariantContextDataset = {
     DatasetBoundVariantContextDataset(tFn(dataset), sequences, samples, headerLines)
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[VariantContextProduct], Dataset[VariantContextProduct]]): VariantContextDataset = {
+    DatasetBoundVariantContextDataset(tFn.call(dataset), sequences, samples, headerLines)
+  }
+
   /**
    * Replaces the header lines attached to this genomic dataset.
    *
diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantDataset.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantDataset.scala
index 22ee6b904a..7a58ee96cc 100644
--- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantDataset.scala
+++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/variant/VariantDataset.scala
@@ -21,6 +21,7 @@ import htsjdk.variant.vcf.{ VCFHeader, VCFHeaderLine }
 import org.apache.hadoop.fs.Path
 import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.spark.SparkContext
+import org.apache.spark.api.java.function.{ Function => JFunction }
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{ Dataset, SQLContext }
 import org.bdgenomics.adam.converters.DefaultHeaderLines
@@ -166,6 +167,11 @@ case class DatasetBoundVariantDataset private[rdd] (
     copy(dataset = tFn(dataset))
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[VariantProduct], Dataset[VariantProduct]]): VariantDataset = {
+    copy(dataset = tFn.call(dataset))
+  }
+
   def replaceSequences(
     newSequences: SequenceDictionary): VariantDataset = {
     copy(sequences = newSequences)
@@ -280,19 +286,16 @@ sealed abstract class VariantDataset extends AvroGenomicDataset[Variant, Variant
       (headerLines ++ iterableRdds.flatMap(_.headerLines)).distinct)
   }
 
-  /**
-   * Applies a function that transforms the underlying RDD into a new RDD using
-   * the Spark SQL API.
-   *
-   * @param tFn A function that transforms the underlying RDD as a Dataset.
-   * @return A new RDD where the RDD of genomic data has been replaced, but the
-   *   metadata (sequence dictionary, and etc) is copied without modification.
-   */
-  def transformDataset(
+  override def transformDataset(
     tFn: Dataset[VariantProduct] => Dataset[VariantProduct]): VariantDataset = {
     DatasetBoundVariantDataset(tFn(dataset), sequences, headerLines)
   }
 
+  override def transformDataset(
+    tFn: JFunction[Dataset[VariantProduct], Dataset[VariantProduct]]): VariantDataset = {
+    DatasetBoundVariantDataset(tFn.call(dataset), sequences, headerLines)
+  }
+
   /**
    * @return Returns this VariantDataset as a VariantContextDataset.
    */
@@ -314,7 +317,18 @@ sealed abstract class VariantDataset extends AvroGenomicDataset[Variant, Variant
   }
 
   /**
-   * Filter this VariantDataset by quality (VCF column 6 "QUAL"). Variants split
+   * (Java-specific) Filter this VariantDataset by quality (VCF column 6 "QUAL"). Variants split
+   * for multi-allelic sites will also be filtered out.
+   *
+   * @param minimumQuality Minimum quality to filter by, inclusive.
+   * @return VariantDataset filtered by quality.
+   */
+  def filterByQuality(minimumQuality: java.lang.Double): VariantDataset = {
+    filterByQuality(minimumQuality.doubleValue)
+  }
+
+  /**
+   * (Scala-specific) Filter this VariantDataset by quality (VCF column 6 "QUAL"). Variants split
+   * for multi-allelic sites will also be filtered out.
    *
    * @param minimumQuality Minimum quality to filter by, inclusive.
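A sketch of the site-level QUAL filter (`variants` is an illustrative VariantDataset; as documented above, split multi-allelic records are filtered out as well):

    val confidentSites = variants.filterByQuality(30.0)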
@@ -325,7 +339,19 @@ sealed abstract class VariantDataset extends AvroGenomicDataset[Variant, Variant
   }
 
   /**
-   * Filter this VariantDataset by total read depth (VCF INFO reserved key AD, Number=R,
+   * (Java-specific) Filter this VariantDataset by total read depth (VCF INFO reserved key AD, Number=R,
+   * split for multi-allelic sites into single integer values for the reference allele
+   * (filterByReferenceReadDepth) and the alternate allele (this method)).
+   *
+   * @param minimumReadDepth Minimum total read depth to filter by, inclusive.
+   * @return VariantDataset filtered by total read depth.
+   */
+  def filterByReadDepth(minimumReadDepth: java.lang.Integer): VariantDataset = {
+    filterByReadDepth(minimumReadDepth.intValue)
+  }
+
+  /**
+   * (Scala-specific) Filter this VariantDataset by total read depth (VCF INFO reserved key AD, Number=R,
    * split for multi-allelic sites into single integer values for the reference allele
    * (filterByReferenceReadDepth) and the alternate allele (this method)).
    *
    * @param minimumReadDepth Minimum total read depth to filter by, inclusive.
@@ -337,7 +363,19 @@ sealed abstract class VariantDataset extends AvroGenomicDataset[Variant, Variant
   }
 
   /**
-   * Filter this VariantDataset by reference total read depth (VCF INFO reserved key AD, Number=R,
+   * (Java-specific) Filter this VariantDataset by reference total read depth (VCF INFO reserved key AD, Number=R,
+   * split for multi-allelic sites into single integer values for the alternate allele
+   * (filterByReadDepth) and the reference allele (this method)).
+   *
+   * @param minimumReferenceReadDepth Minimum reference total read depth to filter by, inclusive.
+   * @return VariantDataset filtered by reference total read depth.
+   */
+  def filterByReferenceReadDepth(minimumReferenceReadDepth: java.lang.Integer): VariantDataset = {
+    filterByReferenceReadDepth(minimumReferenceReadDepth.intValue)
+  }
+
+  /**
+   * (Scala-specific) Filter this VariantDataset by reference total read depth (VCF INFO reserved key AD, Number=R,
    * split for multi-allelic sites into single integer values for the alternate allele
    * (filterByReadDepth) and the reference allele (this method)).
    *