diff --git a/src/main/scala/is/hail/HailContext.scala b/src/main/scala/is/hail/HailContext.scala index 533b392188b..9fe6b1d139c 100644 --- a/src/main/scala/is/hail/HailContext.scala +++ b/src/main/scala/is/hail/HailContext.scala @@ -12,7 +12,7 @@ import is.hail.io.gen.GenLoader import is.hail.io.plink.{FamFileConfig, PlinkLoader} import is.hail.io.vcf._ import is.hail.keytable.KeyTable -import is.hail.sparkextras.OrderedRDD2 +import is.hail.rvd.OrderedRVD import is.hail.stats.{BaldingNicholsModel, Distribution, UniformDist} import is.hail.utils.{log, _} import is.hail.variant.{GenericDataset, GenomeReference, Genotype, HTSGenotypeView, Locus, VSMFileMetadata, VSMSubgen, Variant, VariantDataset, VariantSampleMatrix} diff --git a/src/main/scala/is/hail/annotations/WritableRegionValue.scala b/src/main/scala/is/hail/annotations/WritableRegionValue.scala new file mode 100644 index 00000000000..dc3c90e9d3f --- /dev/null +++ b/src/main/scala/is/hail/annotations/WritableRegionValue.scala @@ -0,0 +1,54 @@ +package is.hail.annotations + +import is.hail.expr.{TStruct, Type} + +object WritableRegionValue { + def apply(t: Type, initial: RegionValue): WritableRegionValue = + WritableRegionValue(t, initial.region, initial.offset) + + def apply(t: Type, initialRegion: MemoryBuffer, initialOffset: Long): WritableRegionValue = { + val wrv = WritableRegionValue(t) + wrv.set(initialRegion, initialOffset) + wrv + } + + def apply(t: Type): WritableRegionValue = { + val region = MemoryBuffer() + new WritableRegionValue(t, region, new RegionValueBuilder(region), RegionValue(region, 0)) + } +} + +class WritableRegionValue(val t: Type, + val region: MemoryBuffer, + rvb: RegionValueBuilder, + val value: RegionValue) { + + def offset: Long = value.offset + + def setSelect(fromT: TStruct, toFromFieldIdx: Array[Int], fromRV: RegionValue) { + (t: @unchecked) match { + case t: TStruct => + region.clear() + rvb.start(t) + rvb.startStruct() + var i = 0 + while (i < t.size) { + rvb.addField(fromT, fromRV, toFromFieldIdx(i)) + i += 1 + } + rvb.endStruct() + value.setOffset(rvb.end()) + } + } + + def set(rv: RegionValue): Unit = set(rv.region, rv.offset) + + def set(fromRegion: MemoryBuffer, fromOffset: Long) { + region.clear() + rvb.start(t) + rvb.addRegionValue(t, fromRegion, fromOffset) + value.setOffset(rvb.end()) + } + + def pretty: String = value.pretty(t) +} \ No newline at end of file diff --git a/src/main/scala/is/hail/expr/Relational.scala b/src/main/scala/is/hail/expr/Relational.scala index b35fcb8a928..dad53f0d695 100644 --- a/src/main/scala/is/hail/expr/Relational.scala +++ b/src/main/scala/is/hail/expr/Relational.scala @@ -4,6 +4,7 @@ import is.hail.HailContext import is.hail.annotations._ import is.hail.methods.Aggregators import is.hail.sparkextras._ +import is.hail.rvd.{OrderedRVD, OrderedRVPartitioner, OrderedRVType} import is.hail.variant.{VSMFileMetadata, VSMLocalValue, VSMMetadata} import org.apache.spark.rdd.RDD import org.apache.spark.sql.Row @@ -40,15 +41,15 @@ case class MatrixType( "va" -> vaType, "gs" -> !TArray(genotypeType)) - def orderedRDD2Type: OrderedRDD2Type = { - new OrderedRDD2Type(Array("pk"), + def orderedRVType: OrderedRVType = { + new OrderedRVType(Array("pk"), Array("pk", "v"), rowType) } - def pkType: TStruct = orderedRDD2Type.pkType + def pkType: TStruct = orderedRVType.pkType - def kType: TStruct = orderedRDD2Type.kType + def kType: TStruct = orderedRVType.kType def sampleEC: EvalContext = { val aggregationST = Map( @@ -178,10 +179,10 @@ object MatrixValue { val localNSamples = 
localValue.nSamples val rangeBoundsType = TArray(typ.pkType) new MatrixValue(typ, localValue, - OrderedRDD2(typ.orderedRDD2Type, - new OrderedPartitioner2(rdd.orderedPartitioner.numPartitions, - typ.orderedRDD2Type.partitionKey, - typ.orderedRDD2Type.kType, + OrderedRVD(typ.orderedRVType, + new OrderedRVPartitioner(rdd.orderedPartitioner.numPartitions, + typ.orderedRVType.partitionKey, + typ.orderedRVType.kType, UnsafeIndexedSeq(rangeBoundsType, rdd.orderedPartitioner.rangeBounds.map(b => Row(b)))), rdd.mapPartitions { it => @@ -216,10 +217,10 @@ object MatrixValue { case class MatrixValue( typ: MatrixType, localValue: VSMLocalValue, - rdd2: OrderedRDD2) { + rdd2: OrderedRVD) { def rdd: OrderedRDD[Annotation, Annotation, (Annotation, Iterable[Annotation])] = { - warn("converting OrderedRDD2 => OrderedRDD") + warn("converting OrderedRVD => OrderedRDD") implicit val kOk: OrderedKey[Annotation, Annotation] = typ.vType.orderedKey @@ -239,10 +240,10 @@ case class MatrixValue( ur.getAs[IndexedSeq[Annotation]](3): Iterable[Annotation])) }, OrderedPartitioner( - rdd2.orderedPartitioner.rangeBounds.map { b => + rdd2.partitioner.rangeBounds.map { b => b.asInstanceOf[Row].get(0) }.toArray(kOk.pkct), - rdd2.orderedPartitioner.numPartitions)) + rdd2.partitioner.numPartitions)) } def copyRDD(typ: MatrixType = typ, @@ -280,7 +281,7 @@ case class MatrixValue( localValue.copy( sampleIds = keep.map(sampleIds), sampleAnnotations = keep.map(sampleAnnotations)), - rdd2 = rdd2.mapPartitionsPreservesPartitioning { it => + rdd2 = rdd2.mapPartitionsPreservesPartitioning(typ.orderedRVType) { it => var rv2b = new RegionValueBuilder() var rv2 = RegionValue() @@ -392,16 +393,16 @@ case class MatrixRead( val rdd = if (dropVariants) - OrderedRDD2.empty(hc.sc, typ.orderedRDD2Type) + OrderedRVD.empty(hc.sc, typ.orderedRVType) else { - var rdd = OrderedRDD2( - typ.orderedRDD2Type, - OrderedPartitioner2(hc.sc, + var rdd = OrderedRVD( + typ.orderedRVType, + OrderedRVPartitioner(hc.sc, hc.hadoopConf.readFile(path + "/partitioner.json.gz")(JsonMethods.parse(_))), hc.readRows(path, typ.rowType, nPartitions)) if (dropSamples) { val localRowType = typ.rowType - rdd = rdd.mapPartitionsPreservesPartitioning { it => + rdd = rdd.mapPartitionsPreservesPartitioning(typ.orderedRVType) { it => var rv2b = new RegionValueBuilder() var rv2 = RegionValue() diff --git a/src/main/scala/is/hail/io/LoadMatrix.scala b/src/main/scala/is/hail/io/LoadMatrix.scala index 1d4a47df1a4..d800a091722 100644 --- a/src/main/scala/is/hail/io/LoadMatrix.scala +++ b/src/main/scala/is/hail/io/LoadMatrix.scala @@ -3,7 +3,7 @@ package is.hail.io import is.hail.HailContext import is.hail.annotations._ import is.hail.expr._ -import is.hail.sparkextras.OrderedRDD2 +import is.hail.rvd.OrderedRVD import is.hail.utils._ import is.hail.variant._ import org.apache.hadoop.conf.Configuration @@ -311,6 +311,6 @@ object LoadMatrix { VSMLocalValue(Annotation.empty, sampleIds, Annotation.emptyIndexedSeq(sampleIds.length)), - OrderedRDD2(matrixType.orderedRDD2Type, rdd, Some(rowKeys), None)) + OrderedRVD(matrixType.orderedRVType, rdd, Some(rowKeys), None)) } } diff --git a/src/main/scala/is/hail/io/bgen/BgenLoader.scala b/src/main/scala/is/hail/io/bgen/BgenLoader.scala index f067a7731cf..166eba121a4 100644 --- a/src/main/scala/is/hail/io/bgen/BgenLoader.scala +++ b/src/main/scala/is/hail/io/bgen/BgenLoader.scala @@ -5,7 +5,7 @@ import is.hail.annotations._ import is.hail.expr.{MatrixType, TArray, TCall, TFloat64, TString, TStruct, TVariant} import is.hail.io.vcf.LoadVCF import 
is.hail.io.{HadoopFSDataBinaryReader, IndexBTree} -import is.hail.sparkextras.OrderedRDD2 +import is.hail.rvd.OrderedRVD import is.hail.utils._ import is.hail.variant._ import org.apache.hadoop.io.LongWritable @@ -127,7 +127,7 @@ object BgenLoader { VSMLocalValue(globalAnnotation = Annotation.empty, sampleIds = sampleIds, sampleAnnotations = Array.fill(nSamples)(Annotation.empty)), - OrderedRDD2(matrixType.orderedRDD2Type, rdd2, Some(fastKeys), None)) + OrderedRVD(matrixType.orderedRVType, rdd2, Some(fastKeys), None)) } def index(hConf: org.apache.hadoop.conf.Configuration, file: String) { diff --git a/src/main/scala/is/hail/io/plink/PlinkLoader.scala b/src/main/scala/is/hail/io/plink/PlinkLoader.scala index b2fd60f06c3..384d7502ea8 100644 --- a/src/main/scala/is/hail/io/plink/PlinkLoader.scala +++ b/src/main/scala/is/hail/io/plink/PlinkLoader.scala @@ -4,7 +4,7 @@ import is.hail.HailContext import is.hail.annotations._ import is.hail.expr._ import is.hail.io.vcf.LoadVCF -import is.hail.sparkextras.OrderedRDD2 +import is.hail.rvd.OrderedRVD import is.hail.utils.StringEscapeUtils._ import is.hail.utils._ import is.hail.variant._ @@ -193,7 +193,7 @@ object PlinkLoader { VSMLocalValue(globalAnnotation = Annotation.empty, sampleIds = sampleIds, sampleAnnotations = sampleAnnotations), - OrderedRDD2(matrixType.orderedRDD2Type, rdd2, Some(fastKeys), None)) + OrderedRVD(matrixType.orderedRVType, rdd2, Some(fastKeys), None)) } def apply(hc: HailContext, bedPath: String, bimPath: String, famPath: String, ffConfig: FamFileConfig, diff --git a/src/main/scala/is/hail/io/vcf/LoadVCF.scala b/src/main/scala/is/hail/io/vcf/LoadVCF.scala index 5412d8e03e5..b8aacae79fe 100644 --- a/src/main/scala/is/hail/io/vcf/LoadVCF.scala +++ b/src/main/scala/is/hail/io/vcf/LoadVCF.scala @@ -5,7 +5,7 @@ import is.hail.HailContext import is.hail.annotations._ import is.hail.expr.{TStruct, _} import is.hail.io.{VCFAttributes, VCFMetadata} -import is.hail.sparkextras.OrderedRDD2 +import is.hail.rvd.OrderedRVD import is.hail.utils._ import is.hail.variant._ import org.apache.hadoop @@ -892,8 +892,8 @@ object LoadVCF { // nothing after the key val justVariants = parseLines(() => ())((c, l, rvb) => ())(lines, kType) - val rdd = OrderedRDD2( - matrixType.orderedRDD2Type, + val rdd = OrderedRVD( + matrixType.orderedRVType, parseLines { () => new ParseLineContext(genotypeSignature, new BufferedLineIterator(headerLinesBc.value.iterator.buffered)) } { (c, l, rvb) => diff --git a/src/main/scala/is/hail/methods/FilterAlleles.scala b/src/main/scala/is/hail/methods/FilterAlleles.scala index dd893074416..00f117a0351 100644 --- a/src/main/scala/is/hail/methods/FilterAlleles.scala +++ b/src/main/scala/is/hail/methods/FilterAlleles.scala @@ -2,7 +2,7 @@ package is.hail.methods import is.hail.annotations._ import is.hail.expr.{EvalContext, Parser, TArray, TInt32, TVariant} -import is.hail.sparkextras.OrderedRDD2 +import is.hail.rvd.{OrderedRVD, RVD} import is.hail.utils._ import is.hail.variant.{GenomeReference, Locus, Variant, VariantDataset, VariantSampleMatrix} import org.apache.spark.rdd.RDD @@ -51,8 +51,8 @@ object FilterAlleles { val newMatrixType = vsm.matrixType.copy(vaType = vAnnotator.newT, genotypeType = gAnnotator.newT) - def filter(rdd: RDD[RegionValue], - removeLeftAligned: Boolean, removeMoving: Boolean, verifyLeftAligned: Boolean): RDD[RegionValue] = { + def filter(rdd: RVD, + removeLeftAligned: Boolean, removeMoving: Boolean, verifyLeftAligned: Boolean): RVD = { def filterAllelesInVariant(prevlocus: Locus, v: Variant, va: 
Annotation): Option[(Variant, IndexedSeq[Int], IndexedSeq[Int])] = { var alive = 0 @@ -109,7 +109,7 @@ object FilterAlleles { val localSampleIdsBc = vsm.sampleIdsBc val localSampleAnnotationsBc = vsm.sampleAnnotationsBc - rdd.mapPartitions { it => + rdd.mapPartitions(newRowType) { it => var prevLocus: Locus = null it.flatMap { rv => @@ -158,18 +158,18 @@ object FilterAlleles { } } - val newRDD2: OrderedRDD2 = + val newRDD2: OrderedRVD = if (leftAligned) { - OrderedRDD2(newMatrixType.orderedRDD2Type, - vsm.rdd2.orderedPartitioner, + OrderedRVD(newMatrixType.orderedRVType, + vsm.rdd2.partitioner, filter(vsm.rdd2, removeLeftAligned = false, removeMoving = false, verifyLeftAligned = true)) } else { - val leftAlignedVariants = OrderedRDD2(newMatrixType.orderedRDD2Type, - vsm.rdd2.orderedPartitioner, + val leftAlignedVariants = OrderedRVD(newMatrixType.orderedRVType, + vsm.rdd2.partitioner, filter(vsm.rdd2, removeLeftAligned = false, removeMoving = true, verifyLeftAligned = false)) - val movingVariants = OrderedRDD2.shuffle(newMatrixType.orderedRDD2Type, - vsm.rdd2.orderedPartitioner, + val movingVariants = OrderedRVD.shuffle(newMatrixType.orderedRVType, + vsm.rdd2.partitioner, filter(vsm.rdd2, removeLeftAligned = true, removeMoving = false, verifyLeftAligned = false)) leftAlignedVariants.partitionSortedUnion(movingVariants) diff --git a/src/main/scala/is/hail/methods/LDPrune.scala b/src/main/scala/is/hail/methods/LDPrune.scala index f246d938d48..be7cd458e29 100644 --- a/src/main/scala/is/hail/methods/LDPrune.scala +++ b/src/main/scala/is/hail/methods/LDPrune.scala @@ -7,6 +7,7 @@ import is.hail.expr.{TArray, TFloat64Required, TInt32Required, TInt64Required, T import is.hail.sparkextras.GeneralRDD import org.apache.spark.storage.StorageLevel import is.hail.sparkextras._ +import is.hail.rvd.{OrderedRVD, OrderedRVType, RVD} import is.hail.variant._ import is.hail.utils._ import org.apache.spark.sql.Row @@ -81,7 +82,7 @@ object LDPrune { val genotypesPerPack = 32 val nPartitionsPerCore = 3 - case class GlobalPruneIntermediate(rvrdd: RegionValueRDD, rowType: TStruct, index: Int, persist: Boolean) + case class GlobalPruneIntermediate(rvd: RVD, rowType: TStruct, index: Int, persist: Boolean) val table: Array[Byte] = { val t = Array.ofDim[Byte](256 * 4) @@ -242,10 +243,10 @@ object LDPrune { r2 } - private def pruneLocal(inputRDD: OrderedRDD2, r2Threshold: Double, windowSize: Int, queueSize: Option[Int]): OrderedRDD2 = { + private def pruneLocal(inputRDD: OrderedRVD, r2Threshold: Double, windowSize: Int, queueSize: Option[Int]): OrderedRVD = { val localRowType = inputRDD.typ.rowType - inputRDD.mapPartitionsPreservesPartitioning({ it => + inputRDD.mapPartitionsPreservesPartitioning(inputRDD.typ) { it => val queue = queueSize match { case Some(qs) => new util.ArrayDeque[RegionValue](qs) case None => new util.ArrayDeque[RegionValue] @@ -285,16 +286,16 @@ object LDPrune { keepVariant } - }) + } } - private def pruneGlobal(inputRDD: OrderedRDD2, - r2Threshold: Double, windowSize: Int): (OrderedRDD2, Long) = { + private def pruneGlobal(inputRDD: OrderedRVD, + r2Threshold: Double, windowSize: Int): (OrderedRVD, Long) = { val sc = inputRDD.sparkContext - require(inputRDD.getStorageLevel2 != StorageLevel.NONE) + require(inputRDD.storageLevel == StorageLevel.MEMORY_AND_DISK) - val partitioner = inputRDD.orderedPartitioner + val partitioner = inputRDD.partitioner val rangeBounds = partitioner.rangeBounds.map(a => a.asInstanceOf[Row].getAs[Locus](0)).toArray val partitionIndices = inputRDD.partitions.map(_.index) 
val nPartitions = inputRDD.partitions.length @@ -349,42 +350,43 @@ object LDPrune { val pruneIntermediates = Array.fill[GlobalPruneIntermediate](nPartitions)(null) - def generalRDDInputs(partitionIndex: Int): (Array[RegionValueRDD], Array[(Int, Int)]) = { - val (rvrdds, inputs) = computeDependencies(partitionIndex).zipWithIndex.map { case (depIndex, i) => + def generalRDDInputs(partitionIndex: Int): (Array[RVD], Array[(Int, Int)]) = { + val (rvds, inputs) = computeDependencies(partitionIndex).zipWithIndex.map { case (depIndex, i) => if (depIndex == partitionIndex || contigStartPartitions.contains(depIndex)) - (inputRDD.rvrdd, (i, depIndex)) + (inputRDD, (i, depIndex)) else { val gpi = pruneIntermediates(depIndex) - pruneIntermediates(depIndex) = gpi.copy(rvrdd = gpi.rvrdd.persist2(StorageLevel.MEMORY_AND_DISK)) - (pruneIntermediates(depIndex).rvrdd, (i, gpi.index)) + pruneIntermediates(depIndex) = gpi.copy(rvd = gpi.rvd.persist(StorageLevel.MEMORY_AND_DISK), persist = true) + (pruneIntermediates(depIndex).rvd, (i, gpi.index)) } }.unzip - (rvrdds.toArray, inputs.toArray) + (rvds.toArray, inputs.toArray) } for (i <- partitionIndices) { - val (rvrdds, inputs) = generalRDDInputs(i) + val (rvds, inputs) = generalRDDInputs(i) pruneIntermediates(i) = GlobalPruneIntermediate( - rvrdd = new RegionValueRDD(new GeneralRDD(sc, rvrdds.map(_.rdd), Array((inputs, pruneF))), inputRDD.typ.rowType), + rvd = RVD(inputRDD.typ.rowType, new GeneralRDD(sc, rvds.map(_.rdd), Array((inputs, pruneF)))), rowType = localRowType, index = 0, persist = false) // creating single partition RDDs with partition index = 0 } - val prunedRDD = OrderedRDD2(inputRDD.typ, - inputRDD.orderedPartitioner, - new GeneralRDD[RegionValue](sc, pruneIntermediates.map(_.rvrdd.rdd), + val prunedRDD = OrderedRVD(inputRDD.typ, + inputRDD.partitioner, + new GeneralRDD[RegionValue](sc, pruneIntermediates.map(_.rvd.rdd), pruneIntermediates.zipWithIndex.map { case (gpi, i) => (Array((i, gpi.index)), pruneF) - })).persist2(StorageLevel.MEMORY_AND_DISK) + })) + .persist(StorageLevel.MEMORY_AND_DISK) val nVariantsKept = prunedRDD.count() pruneIntermediates.foreach { gpi => - if (gpi.rvrdd.getStorageLevel2 != StorageLevel.NONE) - gpi.rvrdd.unpersist2() + if (gpi.persist) + gpi.rvd.unpersist() } - inputRDD.unpersist2() + inputRDD.unpersist() (prunedRDD, nVariantsKept) } @@ -441,7 +443,7 @@ object LDPrune { val typ = vsm.rdd2.typ val standardizedRDD = vsm.filterVariantsExpr("v.isBiallelic()").rdd2 - .mapPartitionsPreservesPartitioning(new OrderedRDD2Type(typ.partitionKey, typ.key, bpvType))({ it => + .mapPartitionsPreservesPartitioning(new OrderedRVType(typ.partitionKey, typ.key, bpvType))({ it => val hcView = HardCallView(localRowType) val region = MemoryBuffer() val rvb = new RegionValueBuilder(region) @@ -469,7 +471,7 @@ object LDPrune { }) val ((rddLP1, nVariantsLP1, nPartitionsLP1), durationLP1) = time({ - val prunedRDD = pruneLocal(standardizedRDD, r2Threshold, windowSize, Option(maxQueueSize)).persist2(StorageLevel.MEMORY_AND_DISK) + val prunedRDD = pruneLocal(standardizedRDD, r2Threshold, windowSize, Option(maxQueueSize)).persist(StorageLevel.MEMORY_AND_DISK) val nVariantsKept = prunedRDD.count() val nPartitions = prunedRDD.partitions.length assert(nVariantsKept >= 1) @@ -479,14 +481,14 @@ object LDPrune { val ((rddLP2, nVariantsLP2, nPartitionsLP2), durationLP2) = time({ val (_, nPartitionsRequired) = estimateMemoryRequirements(nVariantsLP1, nSamples, nCores, memoryPerCore) - val repartRDD = rddLP1.coalesce(nPartitionsRequired, shuffle = 
true).persist2(StorageLevel.MEMORY_AND_DISK) + val repartRDD = rddLP1.coalesce(nPartitionsRequired, shuffle = true).persist(StorageLevel.MEMORY_AND_DISK) repartRDD.count() - rddLP1.unpersist2() - val prunedRDD = pruneLocal(repartRDD, r2Threshold, windowSize, None).persist2(StorageLevel.MEMORY_AND_DISK) + rddLP1.unpersist() + val prunedRDD = pruneLocal(repartRDD, r2Threshold, windowSize, None).persist(StorageLevel.MEMORY_AND_DISK) val nVariantsKept = prunedRDD.count() val nPartitions = prunedRDD.partitions.length assert(nVariantsKept >= 1) - repartRDD.unpersist2() + repartRDD.unpersist() (prunedRDD, nVariantsKept, nPartitions) }) info(s"LD prune step 2 of 3: nVariantsKept=$nVariantsLP2, nPartitions=$nPartitionsLP2, time=${ formatTime(durationLP2) }") diff --git a/src/main/scala/is/hail/methods/LinearRegression.scala b/src/main/scala/is/hail/methods/LinearRegression.scala index ba20e93dd04..8b9997bab3c 100644 --- a/src/main/scala/is/hail/methods/LinearRegression.scala +++ b/src/main/scala/is/hail/methods/LinearRegression.scala @@ -4,7 +4,6 @@ import breeze.linalg._ import breeze.numerics.sqrt import is.hail.annotations._ import is.hail.expr._ -import is.hail.sparkextras.WritableRegionValue import is.hail.stats._ import is.hail.utils._ import is.hail.variant._ diff --git a/src/main/scala/is/hail/methods/SplitMulti.scala b/src/main/scala/is/hail/methods/SplitMulti.scala index b3126efc415..9c63b9acffe 100644 --- a/src/main/scala/is/hail/methods/SplitMulti.scala +++ b/src/main/scala/is/hail/methods/SplitMulti.scala @@ -2,7 +2,7 @@ package is.hail.methods import is.hail.annotations._ import is.hail.expr._ -import is.hail.sparkextras.OrderedRDD2 +import is.hail.rvd.OrderedRVD import is.hail.utils._ import is.hail.variant.{Locus, Variant, VariantSampleMatrix} import org.apache.spark.rdd.RDD @@ -124,10 +124,10 @@ class SplitMultiPartitionContext( } object SplitMulti { - def unionMovedVariants(ordered: OrderedRDD2, - moved: RDD[RegionValue]): OrderedRDD2 = { - ordered.partitionSortedUnion(OrderedRDD2.shuffle(ordered.typ, - ordered.orderedPartitioner, + def unionMovedVariants(ordered: OrderedRVD, + moved: RDD[RegionValue]): OrderedRVD = { + ordered.partitionSortedUnion(OrderedRVD.shuffle(ordered.typ, + ordered.partitioner, moved)) } } @@ -179,16 +179,16 @@ class SplitMulti(vsm: VariantSampleMatrix, variantExpr: String, genotypeExpr: St } def split(): VariantSampleMatrix = { - val newRDD2: OrderedRDD2 = + val newRDD2: OrderedRVD = if (leftAligned) - OrderedRDD2( - newMatrixType.orderedRDD2Type, - vsm.rdd2.orderedPartitioner, + OrderedRVD( + newMatrixType.orderedRVType, + vsm.rdd2.partitioner, split(sortAlleles = true, removeLeftAligned = false, removeMoving = false, verifyLeftAligned = true)) else - SplitMulti.unionMovedVariants(OrderedRDD2( - newMatrixType.orderedRDD2Type, - vsm.rdd2.orderedPartitioner, + SplitMulti.unionMovedVariants(OrderedRVD( + newMatrixType.orderedRVType, + vsm.rdd2.partitioner, split(sortAlleles = true, removeLeftAligned = false, removeMoving = true, verifyLeftAligned = false)), split(sortAlleles = false, removeLeftAligned = true, removeMoving = false, verifyLeftAligned = false)) diff --git a/src/main/scala/is/hail/methods/VEP.scala b/src/main/scala/is/hail/methods/VEP.scala index f2b030ec7f2..00a63656475 100644 --- a/src/main/scala/is/hail/methods/VEP.scala +++ b/src/main/scala/is/hail/methods/VEP.scala @@ -298,7 +298,7 @@ object VEP { val localRowType = vsm.rowType val annotations = vsm.rdd2 - .mapPartitions({ it => + .mapPartitions { it => val pb = new 
ProcessBuilder(cmd.toList.asJava) val env = pb.environment() if (perl5lib != null) @@ -387,7 +387,7 @@ object VEP { r } - }, preservesPartitioning = true) + } .persist(StorageLevel.MEMORY_AND_DISK) info(s"vep: annotated ${ annotations.count() } variants") diff --git a/src/main/scala/is/hail/rvd/ConcreteRVD.scala b/src/main/scala/is/hail/rvd/ConcreteRVD.scala new file mode 100644 index 00000000000..2e46895bfac --- /dev/null +++ b/src/main/scala/is/hail/rvd/ConcreteRVD.scala @@ -0,0 +1,7 @@ +package is.hail.rvd + +import is.hail.annotations.RegionValue +import is.hail.expr.Type +import org.apache.spark.rdd.RDD + +class ConcreteRVD(val rowType: Type, val rdd: RDD[RegionValue]) extends RVD diff --git a/src/main/scala/is/hail/rvd/OrderedRVD.scala b/src/main/scala/is/hail/rvd/OrderedRVD.scala new file mode 100644 index 00000000000..9b1a198c0fa --- /dev/null +++ b/src/main/scala/is/hail/rvd/OrderedRVD.scala @@ -0,0 +1,527 @@ +package is.hail.rvd + +import java.util + +import is.hail.annotations._ +import is.hail.expr.{TArray, Type} +import is.hail.sparkextras._ +import is.hail.utils._ +import org.apache.spark.rdd.{RDD, ShuffledRDD} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.SparkContext + +import scala.collection.mutable + +class OrderedRVD private( + val typ: OrderedRVType, + val partitioner: OrderedRVPartitioner, + val rdd: RDD[RegionValue]) extends RVD with Serializable { self => + def rowType: Type = typ.rowType + + def insert[PC](newContext: () => PC)(typeToInsert: Type, + path: List[String], + // rv argument to add is the entire row + add: (PC, RegionValue, RegionValueBuilder) => Unit): OrderedRVD = { + val localTyp = typ + + val (insTyp, inserter) = typ.insert(typeToInsert, path) + OrderedRVD(insTyp, + partitioner, + rdd.mapPartitions { it => + val c = newContext() + val rv2b = new RegionValueBuilder() + val rv2 = RegionValue() + + it.map { rv => + val ur = new UnsafeRow(localTyp.rowType, rv) + rv2b.set(rv.region) + rv2b.start(insTyp.rowType) + inserter(rv.region, rv.offset, rv2b, () => add(c, rv, rv2b)) + rv2.set(rv.region, rv2b.end()) + rv2 + } + }) + } + + def mapPreservesPartitioning(newTyp: OrderedRVType)(f: (RegionValue) => RegionValue): OrderedRVD = + OrderedRVD(newTyp, + partitioner, + rdd.map(f)) + + def mapPartitionsPreservesPartitioning(newTyp: OrderedRVType)(f: (Iterator[RegionValue]) => Iterator[RegionValue]): OrderedRVD = + OrderedRVD(newTyp, + partitioner, + rdd.mapPartitions(f)) + + override def filter(p: (RegionValue) => Boolean): OrderedRVD = + OrderedRVD(typ, + partitioner, + rdd.filter(p)) + + def sample(withReplacement: Boolean, fraction: Double, seed: Long): OrderedRVD = + OrderedRVD(typ, + partitioner, + rdd.sample(withReplacement, fraction, seed)) + + override def persist(level: StorageLevel): OrderedRVD = { + val PersistedRVRDD(persistedRDD, iterationRDD) = persistRVRDD(level) + new OrderedRVD(typ, partitioner, iterationRDD) { + override def storageLevel: StorageLevel = rdd.getStorageLevel + + override def persist(newLevel: StorageLevel): OrderedRVD = { + if (newLevel == StorageLevel.NONE) + unpersist() + else { + rdd.persist(newLevel) + this + } + } + + override def unpersist(): OrderedRVD = { + persistedRDD.unpersist() + self + } + } + } + + override def cache(): OrderedRVD = persist(StorageLevel.MEMORY_ONLY) + + override def unpersist(): OrderedRVD = this + + def orderedJoinDistinct(right: OrderedRVD, joinType: String): RDD[JoinedRegionValue] = { + val lTyp = typ + val rTyp = right.typ + + if (lTyp.kType != rTyp.kType) + fatal( + 
s"""Incompatible join keys. Keys must have same length and types, in order: + | Left key type: ${ lTyp.kType.toPrettyString(compact = true) } + | Right key type: ${ rTyp.kType.toPrettyString(compact = true) } + """.stripMargin) + + joinType match { + case "inner" | "left" => new OrderedJoinDistinctRDD2(this, right, joinType) + case _ => fatal(s"Unknown join type `$joinType'. Choose from `inner' or `left'.") + } + } + + def partitionSortedUnion(rdd2: OrderedRVD): OrderedRVD = { + assert(typ == rdd2.typ) + assert(partitioner == rdd2.partitioner) + + val localTyp = typ + OrderedRVD(typ, partitioner, + rdd.zipPartitions(rdd2.rdd) { case (it, it2) => + new Iterator[RegionValue] { + private val bit = it.buffered + private val bit2 = it2.buffered + + def hasNext: Boolean = bit.hasNext || bit2.hasNext + + def next(): RegionValue = { + if (!bit.hasNext) + bit2.next() + else if (!bit2.hasNext) + bit.next() + else { + val c = localTyp.kInRowOrd.compare(bit.head, bit2.head) + if (c < 0) + bit.next() + else + bit2.next() + } + } + } + }) + } + + def copy(typ: OrderedRVType = typ, + orderedPartitioner: OrderedRVPartitioner = partitioner, + rdd: RDD[RegionValue] = rdd): OrderedRVD = { + OrderedRVD(typ, orderedPartitioner, rdd) + } + + def naiveCoalesce(maxPartitions: Int): OrderedRVD = { + val n = partitioner.numPartitions + if (maxPartitions >= n) + return this + + val newN = maxPartitions + val newNParts = Array.tabulate(newN)(i => (n - i + newN - 1) / newN) + assert(newNParts.sum == n) + assert(newNParts.forall(_ > 0)) + + val newPartEnd = newNParts.scanLeft(-1)(_ + _).tail + assert(newPartEnd.last == n - 1) + + val newRangeBounds = UnsafeIndexedSeq( + TArray(typ.pkType), + newPartEnd.init.map(partitioner.rangeBounds)) + + OrderedRVD( + typ, + new OrderedRVPartitioner(newN, typ.partitionKey, typ.kType, newRangeBounds), + new BlockedRDD(rdd, newPartEnd)) + } + + override def coalesce(maxPartitions: Int, shuffle: Boolean): OrderedRVD = { + require(maxPartitions > 0, "cannot coalesce to nPartitions <= 0") + val n = rdd.partitions.length + if (!shuffle && maxPartitions >= n) + return this + if (shuffle) { + val shuffled = rdd.coalesce(maxPartitions, shuffle = true) + val ranges = OrderedRVD.calculateKeyRanges(typ, OrderedRVD.getPartitionKeyInfo(typ, shuffled), shuffled.getNumPartitions) + OrderedRVD.shuffle(typ, new OrderedRVPartitioner(ranges.length + 1, typ.partitionKey, typ.kType, ranges), shuffled) + } else { + + val partSize = rdd.context.runJob(rdd, getIteratorSize _) + log.info(s"partSize = ${ partSize.toSeq }") + + val partCumulativeSize = mapAccumulate[Array, Long, Long, Long](partSize, 0)((s, acc) => (s + acc, s + acc)) + val totalSize = partCumulativeSize.last + + var newPartEnd = (0 until maxPartitions).map { i => + val t = totalSize * (i + 1) / maxPartitions + + /* j largest index not greater than t */ + var j = util.Arrays.binarySearch(partCumulativeSize, t) + if (j < 0) + j = -j - 1 + while (j < partCumulativeSize.length - 1 + && partCumulativeSize(j + 1) == t) + j += 1 + assert(t <= partCumulativeSize(j) && + (j == partCumulativeSize.length - 1 || + t < partCumulativeSize(j + 1))) + j + }.toArray + + newPartEnd = newPartEnd.zipWithIndex.filter { case (end, i) => i == 0 || newPartEnd(i) != newPartEnd(i - 1) } + .map(_._1) + + info(s"newPartEnd = ${ newPartEnd.toSeq }") + + assert(newPartEnd.last == n - 1) + assert(newPartEnd.zip(newPartEnd.tail).forall { case (i, inext) => i < inext }) + + if (newPartEnd.length < maxPartitions) + warn(s"coalesced to ${ newPartEnd.length } ${ 
plural(newPartEnd.length, "partition") }, less than requested $maxPartitions") + + val newRangeBounds = newPartEnd.init.map(partitioner.rangeBounds).asInstanceOf[UnsafeIndexedSeq] + val newPartitioner = new OrderedRVPartitioner(newRangeBounds.length + 1, typ.partitionKey, typ.kType, newRangeBounds) + OrderedRVD(typ, newPartitioner, new BlockedRDD(rdd, newPartEnd)) + } + } +} + +object OrderedRVD { + type CoercionMethod = Int + + final val ORDERED_PARTITIONER: CoercionMethod = 0 + final val AS_IS: CoercionMethod = 1 + final val LOCAL_SORT: CoercionMethod = 2 + final val SHUFFLE: CoercionMethod = 3 + + def empty(sc: SparkContext, typ: OrderedRVType): OrderedRVD = { + OrderedRVD(typ, + OrderedRVPartitioner.empty(typ), + sc.emptyRDD[RegionValue]) + } + + def cast(typ: OrderedRVType, + rdd: RDD[RegionValue]): OrderedRVD = { + if (rdd.partitions.isEmpty) + OrderedRVD.empty(rdd.sparkContext, typ) + else + (rdd.partitioner: @unchecked) match { + case Some(p: OrderedRVPartitioner) => OrderedRVD(typ, p.asInstanceOf[OrderedRVPartitioner], rdd) + } + } + + def apply(typ: OrderedRVType, + rdd: RDD[RegionValue], fastKeys: Option[RDD[RegionValue]], hintPartitioner: Option[OrderedRVPartitioner]): OrderedRVD = { + val (_, orderedRDD) = coerce(typ, rdd, fastKeys, hintPartitioner) + orderedRDD + } + + /** + * Precondition: the iterator it is PK-sorted. We lazily K-sort each block + * of PK-equivalent elements. + */ + def localKeySort(typ: OrderedRVType, + // it: Iterator[RegionValue[rowType]] + it: Iterator[RegionValue]): Iterator[RegionValue] = { + new Iterator[RegionValue] { + private val bit = it.buffered + + private val q = new mutable.PriorityQueue[RegionValue]()(typ.kInRowOrd.reverse) + + def hasNext: Boolean = bit.hasNext || q.nonEmpty + + def next(): RegionValue = { + if (q.isEmpty) { + do { + val rv = bit.next() + // FIXME ugh, no good answer here + q.enqueue(RegionValue( + rv.region.copy(), + rv.offset)) + } while (bit.hasNext && typ.pkInRowOrd.compare(q.head, bit.head) == 0) + } + + val rv = q.dequeue() + rv + } + } + } + + // getKeys: RDD[RegionValue[kType]] + def getKeys(typ: OrderedRVType, + // rdd: RDD[RegionValue[rowType]] + rdd: RDD[RegionValue]): RDD[RegionValue] = { + rdd.mapPartitions { it => + val wrv = WritableRegionValue(typ.kType) + it.map { rv => + wrv.setSelect(typ.rowType, typ.kRowFieldIdx, rv) + wrv.value + } + } + } + + def getPartitionKeyInfo(typ: OrderedRVType, + // keys: RDD[kType] + keys: RDD[RegionValue]): Array[OrderedRVPartitionInfo] = { + val nPartitions = keys.getNumPartitions + + val rng = new java.util.Random(1) + val partitionSeed = Array.tabulate[Int](nPartitions)(i => rng.nextInt()) + + val sampleSize = math.min(nPartitions * 20, 1000000) + val samplesPerPartition = sampleSize / nPartitions + + val pkis = keys.mapPartitionsWithIndex { case (i, it) => + if (it.hasNext) + Iterator(OrderedRVPartitionInfo(typ, samplesPerPartition, i, it, partitionSeed(i))) + else + Iterator() + }.collect() + + pkis.sortBy(_.min)(typ.pkOrd) + } + + def coerce(typ: OrderedRVType, + // rdd: RDD[RegionValue[rowType]] + rdd: RDD[RegionValue], + // fastKeys: Option[RDD[RegionValue[kType]]] + fastKeys: Option[RDD[RegionValue]] = None, + hintPartitioner: Option[OrderedRVPartitioner] = None): (CoercionMethod, OrderedRVD) = { + val sc = rdd.sparkContext + + if (rdd.partitions.isEmpty) + return (ORDERED_PARTITIONER, empty(sc, typ)) + + // keys: RDD[RegionValue[kType]] + val keys = fastKeys.getOrElse(getKeys(typ, rdd)) + + val pkis = getPartitionKeyInfo(typ, keys) + + if (pkis.isEmpty) + return 
(AS_IS, empty(sc, typ)) + + val partitionsSorted = (pkis, pkis.tail).zipped.forall { case (p, pnext) => + val r = typ.pkOrd.lteq(p.max, pnext.min) + if (!r) + log.info(s"not sorted: p = $p, pnext = $pnext") + r + } + + val sortedness = pkis.map(_.sortedness).min + if (partitionsSorted && sortedness >= PartitionKeyInfo.TSORTED) { + val (adjustedPartitions, rangeBounds, adjSortedness) = rangesAndAdjustments(typ, pkis, sortedness) + + val unsafeRangeBounds = UnsafeIndexedSeq(TArray(typ.pkType), rangeBounds) + val partitioner = new OrderedRVPartitioner(adjustedPartitions.length, + typ.partitionKey, + typ.kType, + unsafeRangeBounds) + + val reorderedPartitionsRDD = rdd.reorderPartitions(pkis.map(_.partitionIndex)) + val adjustedRDD = new AdjustedPartitionsRDD(reorderedPartitionsRDD, adjustedPartitions) + (adjSortedness: @unchecked) match { + case PartitionKeyInfo.KSORTED => + info("Coerced sorted dataset") + (AS_IS, OrderedRVD(typ, + partitioner, + adjustedRDD)) + + case PartitionKeyInfo.TSORTED => + info("Coerced almost-sorted dataset") + (LOCAL_SORT, OrderedRVD(typ, + partitioner, + adjustedRDD.mapPartitions { it => + localKeySort(typ, it) + })) + } + } else { + info("Ordering unsorted dataset with network shuffle") + val p = hintPartitioner + .filter(_.numPartitions >= rdd.partitions.length) + .getOrElse { + val ranges = calculateKeyRanges(typ, pkis, rdd.getNumPartitions) + new OrderedRVPartitioner(ranges.length + 1, typ.partitionKey, typ.kType, ranges) + } + (SHUFFLE, shuffle(typ, p, rdd)) + } + } + + def calculateKeyRanges(typ: OrderedRVType, pkis: Array[OrderedRVPartitionInfo], nPartitions: Int): UnsafeIndexedSeq = { + val keys = pkis + .flatMap(_.samples) + .sorted(typ.pkOrd) + + // FIXME weighted + val rangeBounds = + if (keys.length <= nPartitions) + keys.init + else { + val k = keys.length / nPartitions + assert(k > 0) + Array.tabulate(nPartitions - 1)(i => keys((i + 1) * k)) + } + + UnsafeIndexedSeq(TArray(typ.pkType), rangeBounds) + } + + def shuffle(typ: OrderedRVType, + partitioner: OrderedRVPartitioner, + rdd: RDD[RegionValue]): OrderedRVD = { + OrderedRVD(typ, + partitioner, + new ShuffledRDD[RegionValue, RegionValue, RegionValue]( + rdd.mapPartitions { it => + val wkrv = WritableRegionValue(typ.kType) + it.map { rv => + wkrv.setSelect(typ.rowType, typ.kRowFieldIdx, rv) + (wkrv.value, rv) + } + }, + partitioner) + .setKeyOrdering(typ.kOrd) + .mapPartitionsWithIndex { case (i, it) => + it.map { case (k, v) => + assert(partitioner.getPartition(k) == i) + v + } + }) + } + + def shuffle(typ: OrderedRVType, partitioner: OrderedRVPartitioner, rvd: RVD): OrderedRVD = + shuffle(typ, partitioner, rvd.rdd) + + def rangesAndAdjustments(typ: OrderedRVType, + sortedKeyInfo: Array[OrderedRVPartitionInfo], + sortedness: Int): (IndexedSeq[Array[Adjustment[RegionValue]]], Array[RegionValue], Int) = { + + val rangeBounds = new ArrayBuilder[RegionValue]() + val adjustmentsBuffer = new mutable.ArrayBuffer[Array[Adjustment[RegionValue]]] + val indicesBuilder = new ArrayBuilder[Int]() + + var anyOverlaps = false + + val it = sortedKeyInfo.indices.iterator.buffered + + while (it.nonEmpty) { + indicesBuilder.clear() + val i = it.next() + val thisP = sortedKeyInfo(i) + val min = thisP.min + val max = thisP.max + + indicesBuilder += i + + var continue = true + while (continue && it.hasNext && typ.pkOrd.equiv(sortedKeyInfo(it.head).min, max)) { + anyOverlaps = true + if (typ.pkOrd.equiv(sortedKeyInfo(it.head).max, max)) + indicesBuilder += it.next() + else { + indicesBuilder += it.head + continue = false 
+ } + } + + val adjustments = indicesBuilder.result().zipWithIndex.map { case (partitionIndex, index) => + assert(sortedKeyInfo(partitionIndex).sortedness >= PartitionKeyInfo.TSORTED) + val f: (Iterator[RegionValue]) => Iterator[RegionValue] = + // In the first partition, drop elements that should go in the last if necessary + if (index == 0) + if (adjustmentsBuffer.nonEmpty && typ.pkOrd.equiv(min, sortedKeyInfo(adjustmentsBuffer.last.head.index).max)) + _.dropWhile(rv => typ.pkRowOrd.compare(min.region, min.offset, rv) == 0) + else + identity + else + // In every subsequent partition, only take elements that are the max of the last + _.takeWhile(rv => typ.pkRowOrd.compare(max.region, max.offset, rv) == 0) + Adjustment(partitionIndex, f) + } + + adjustmentsBuffer += adjustments + + if (it.hasNext) + rangeBounds += max + } + + val adjSortedness = if (anyOverlaps) + sortedness.min(PartitionKeyInfo.TSORTED) + else + sortedness + + (adjustmentsBuffer, rangeBounds.result(), adjSortedness) + } + + def apply(typ: OrderedRVType, + partitioner: OrderedRVPartitioner, + rdd: RDD[RegionValue]): OrderedRVD = { + val sc = rdd.sparkContext + + new OrderedRVD(typ, partitioner, rdd.mapPartitionsWithIndex { case (i, it) => + val prev = WritableRegionValue(typ.pkType) + + new Iterator[RegionValue] { + var first = true + + def hasNext: Boolean = it.hasNext + + def next(): RegionValue = { + val rv = it.next() + + if (i < partitioner.rangeBounds.length) { + assert(typ.pkRowOrd.compare( + partitioner.region, partitioner.loadElement(i), + rv) >= 0) + } + if (i > 0) + assert(typ.pkRowOrd.compare(partitioner.region, partitioner.loadElement(i - 1), + rv) < 0) + + if (first) + first = false + else + assert(typ.pkRowOrd.compare(prev.value, rv) <= 0) + + prev.setSelect(typ.rowType, typ.pkRowFieldIdx, rv) + + assert(typ.pkRowOrd.compare(prev.value, rv) == 0) + + rv + } + } + }) + } + + def apply(typ: OrderedRVType, partitioner: OrderedRVPartitioner, rvd: RVD): OrderedRVD = { + assert(typ.rowType == rvd.rowType) + apply(typ, partitioner, rvd.rdd) + } +} diff --git a/src/main/scala/is/hail/rvd/OrderedRVPartitionInfo.scala b/src/main/scala/is/hail/rvd/OrderedRVPartitionInfo.scala new file mode 100644 index 00000000000..ca6852baca9 --- /dev/null +++ b/src/main/scala/is/hail/rvd/OrderedRVPartitionInfo.scala @@ -0,0 +1,81 @@ +package is.hail.rvd + +import is.hail.annotations.{RegionValue, WritableRegionValue} +import is.hail.expr.Type + +case class OrderedRVPartitionInfo( + partitionIndex: Int, + size: Int, + min: RegionValue, + max: RegionValue, + // min, max: RegionValue[pkType] + samples: Array[RegionValue], + sortedness: Int) { + def pretty(t: Type): String = { + s"partitionIndex=$partitionIndex,size=$size,min=${min.pretty(t)},max=${max.pretty(t)},samples=${samples.map(_.pretty(t)).mkString(",")},sortedness=$sortedness" + } +} + +object OrderedRVPartitionInfo { + final val UNSORTED = 0 + final val TSORTED = 1 + final val KSORTED = 2 + + def apply(typ: OrderedRVType, sampleSize: Int, partitionIndex: Int, it: Iterator[RegionValue], seed: Int): OrderedRVPartitionInfo = { + val minF = WritableRegionValue(typ.pkType) + val maxF = WritableRegionValue(typ.pkType) + val prevF = WritableRegionValue(typ.kType) + + assert(it.hasNext) + val f0 = it.next() + + minF.setSelect(typ.kType, typ.pkKFieldIdx, f0) + maxF.setSelect(typ.kType, typ.pkKFieldIdx, f0) + prevF.set(f0) + + var sortedness = KSORTED + + val rng = new java.util.Random(seed) + val samples = new Array[WritableRegionValue](sampleSize) + + var i = 0 + + if (sampleSize > 0) 
{ + samples(0) = WritableRegionValue(typ.pkType, f0) + i += 1 + } + + while (it.hasNext) { + val f = it.next() + + if (typ.kOrd.compare(f, prevF.value) < 0) { + if (typ.pkInKOrd.compare(f, prevF.value) < 0) + sortedness = UNSORTED + else + sortedness = sortedness.min(TSORTED) + } + + if (typ.pkKOrd.compare(minF.value, f) > 0) + minF.setSelect(typ.kType, typ.pkKFieldIdx, f) + if (typ.pkKOrd.compare(maxF.value, f) < 0) + maxF.setSelect(typ.kType, typ.pkKFieldIdx, f) + + prevF.set(f) + + if (i < sampleSize) + samples(i) = WritableRegionValue(typ.pkType, f) + else { + val j = rng.nextInt(i) + if (j < sampleSize) + samples(j).set(f) + } + + i += 1 + } + + OrderedRVPartitionInfo(partitionIndex, i, + minF.value, maxF.value, + Array.tabulate[RegionValue](math.min(i, sampleSize))(i => samples(i).value), + sortedness) + } +} \ No newline at end of file diff --git a/src/main/scala/is/hail/rvd/OrderedRVPartitioner.scala b/src/main/scala/is/hail/rvd/OrderedRVPartitioner.scala new file mode 100644 index 00000000000..eb1a36f4161 --- /dev/null +++ b/src/main/scala/is/hail/rvd/OrderedRVPartitioner.scala @@ -0,0 +1,96 @@ +package is.hail.rvd + +import is.hail.annotations._ +import is.hail.expr.{JSONAnnotationImpex, Parser, TArray, TStruct} +import is.hail.utils._ +import is.hail.sparkextras.BinarySearch +import org.apache.spark.{Partitioner, SparkContext} +import org.json4s.JsonAST._ + +class OrderedRVPartitioner( + val numPartitions: Int, + val partitionKey: Array[String], val kType: TStruct, + // rangeBounds is partition max, sorted ascending + // rangeBounds: Array[pkType] + val rangeBounds: UnsafeIndexedSeq) extends Partitioner { + require((numPartitions == 0 && rangeBounds.isEmpty) || numPartitions == rangeBounds.length + 1, + s"nPartitions = $numPartitions, ranges = ${ rangeBounds.length }") + + val (pkType, _) = kType.select(partitionKey) + + val pkKFieldIdx: Array[Int] = partitionKey.map(n => kType.fieldIdx(n)) + val pkKOrd: UnsafeOrdering = OrderedRVType.selectUnsafeOrdering(pkType, (0 until pkType.size).toArray, kType, pkKFieldIdx) + + val rangeBoundsType = TArray(pkType) + assert(rangeBoundsType.typeCheck(rangeBounds)) + + val ordering: Ordering[Annotation] = pkType.ordering(missingGreatest = true) + require(rangeBounds.isEmpty || rangeBounds.zip(rangeBounds.tail).forall { case (left, right) => ordering.compare(left, right) < 0 }) + + def region: MemoryBuffer = rangeBounds.region + + def loadElement(i: Int): Long = rangeBoundsType.loadElement(rangeBounds.region, rangeBounds.aoff, rangeBounds.length, i) + + // return the smallest partition for which key <= max + // pk: Annotation[pkType] + def getPartitionPK(pk: Any): Int = { + assert(pkType.typeCheck(pk)) + + val part = BinarySearch.binarySearch(numPartitions, + // key.compare(elem) + i => + if (i == numPartitions - 1) + -1 // key.compare(inf) + else + ordering.compare(pk, rangeBounds(i))) + part + } + + // return the smallest partition for which key <= max + // key: RegionValue[kType] + def getPartition(key: Any): Int = { + val keyrv = key.asInstanceOf[RegionValue] + + val part = BinarySearch.binarySearch(numPartitions, + // key.compare(elem) + i => + if (i == numPartitions - 1) + -1 // key.compare(inf) + else + -pkKOrd.compare(rangeBounds.region, loadElement(i), keyrv)) + part + } + + def toJSON: JValue = + JObject(List( + "numPartitions" -> JInt(numPartitions), + "partitionKey" -> JArray(partitionKey.map(n => JString(n)).toList), + "kType" -> JString(kType.toPrettyString(compact = true)), + "rangeBounds" -> 
JSONAnnotationImpex.exportAnnotation(rangeBounds, rangeBoundsType))) +} + +object OrderedRVPartitioner { + def empty(typ: OrderedRVType): OrderedRVPartitioner = { + new OrderedRVPartitioner(0, typ.partitionKey, typ.kType, UnsafeIndexedSeq.empty(TArray(typ.pkType))) + } + + def apply(sc: SparkContext, jv: JValue): OrderedRVPartitioner = { + case class Extract(numPartitions: Int, + partitionKey: Array[String], + kType: String, + rangeBounds: JValue) + val ex = jv.extract[Extract] + + val partitionKey = ex.partitionKey + val kType = Parser.parseType(ex.kType).asInstanceOf[TStruct] + val (pkType, _) = kType.select(partitionKey) + + val rangeBoundsType = TArray(pkType) + new OrderedRVPartitioner(ex.numPartitions, + ex.partitionKey, + kType, + UnsafeIndexedSeq( + rangeBoundsType, + JSONAnnotationImpex.importAnnotation(ex.rangeBounds, rangeBoundsType).asInstanceOf[IndexedSeq[Annotation]])) + } +} \ No newline at end of file diff --git a/src/main/scala/is/hail/rvd/OrderedRVType.scala b/src/main/scala/is/hail/rvd/OrderedRVType.scala new file mode 100644 index 00000000000..2d5bca64b65 --- /dev/null +++ b/src/main/scala/is/hail/rvd/OrderedRVType.scala @@ -0,0 +1,120 @@ +package is.hail.rvd + +import is.hail.annotations.{MemoryBuffer, UnsafeInserter, UnsafeOrdering} +import is.hail.expr.{Parser, TStruct, Type} +import is.hail.utils._ +import org.apache.commons.lang3.builder.HashCodeBuilder +import org.json4s.JsonAST.{JArray, JObject, JString, JValue} + +class OrderedRVType( + val partitionKey: Array[String], + val key: Array[String], // full key + val rowType: TStruct) extends Serializable { + assert(key.startsWith(partitionKey)) + + val (pkType, _) = rowType.select(partitionKey) + val (kType, _) = rowType.select(key) + + val keySet: Set[String] = key.toSet + val (valueType, _) = rowType.filter(f => !keySet.contains(f.name)) + + val valueFieldIdx: Array[Int] = (0 until rowType.size) + .filter(i => !keySet.contains(rowType.fields(i).name)) + .toArray + + val kRowFieldIdx: Array[Int] = key.map(n => rowType.fieldIdx(n)) + val pkRowFieldIdx: Array[Int] = partitionKey.map(n => rowType.fieldIdx(n)) + val pkKFieldIdx: Array[Int] = partitionKey.map(n => kType.fieldIdx(n)) + assert(pkKFieldIdx sameElements (0 until pkType.size)) + + val pkOrd: UnsafeOrdering = pkType.unsafeOrdering(missingGreatest = true) + val kOrd: UnsafeOrdering = kType.unsafeOrdering(missingGreatest = true) + + val pkRowOrd: UnsafeOrdering = OrderedRVType.selectUnsafeOrdering(pkType, (0 until pkType.size).toArray, rowType, pkRowFieldIdx) + val pkKOrd: UnsafeOrdering = OrderedRVType.selectUnsafeOrdering(pkType, (0 until pkType.size).toArray, kType, pkKFieldIdx) + val pkInRowOrd: UnsafeOrdering = OrderedRVType.selectUnsafeOrdering(rowType, pkRowFieldIdx, rowType, pkRowFieldIdx) + val kInRowOrd: UnsafeOrdering = OrderedRVType.selectUnsafeOrdering(rowType, kRowFieldIdx, rowType, kRowFieldIdx) + val pkInKOrd: UnsafeOrdering = OrderedRVType.selectUnsafeOrdering(kType, pkKFieldIdx, kType, pkKFieldIdx) + val kRowOrd: UnsafeOrdering = OrderedRVType.selectUnsafeOrdering(kType, (0 until kType.size).toArray, rowType, kRowFieldIdx) + + def insert(typeToInsert: Type, path: List[String]): (OrderedRVType, UnsafeInserter) = { + assert(path.nonEmpty) + assert(!key.contains(path.head)) + + val (newRowType, inserter) = rowType.unsafeInsert(typeToInsert, path) + + (new OrderedRVType(partitionKey, key, newRowType.asInstanceOf[TStruct]), inserter) + } + + def toJSON: JValue = + JObject(List( + "partitionKey" -> JArray(partitionKey.map(JString).toList), + "key" 
-> JArray(key.map(JString).toList), + "rowType" -> JString(rowType.toString))) + + override def equals(that: Any): Boolean = that match { + case that: OrderedRVType => + (partitionKey sameElements that.partitionKey) && + (key sameElements that.key) && + rowType == that.rowType + case _ => false + } + + override def hashCode: Int = { + val b = new HashCodeBuilder(43, 19) + b.append(partitionKey.length) + partitionKey.foreach(b.append) + + b.append(key.length) + key.foreach(b.append) + + b.append(rowType) + b.toHashCode + } +} + +object OrderedRVType { + def selectUnsafeOrdering(t1: TStruct, fields1: Array[Int], + t2: TStruct, fields2: Array[Int]): UnsafeOrdering = { + require(fields1.length == fields2.length) + require((fields1, fields2).zipped.forall { case (f1, f2) => + t1.fieldType(f1) == t2.fieldType(f2) + }) + + val nFields = fields1.length + val fieldOrderings = fields1.map(f1 => t1.fieldType(f1).unsafeOrdering(missingGreatest = true)) + + new UnsafeOrdering { + def compare(r1: MemoryBuffer, o1: Long, r2: MemoryBuffer, o2: Long): Int = { + var i = 0 + while (i < nFields) { + val f1 = fields1(i) + val f2 = fields2(i) + val leftDefined = t1.isFieldDefined(r1, o1, f1) + val rightDefined = t2.isFieldDefined(r2, o2, f2) + + if (leftDefined && rightDefined) { + val c = fieldOrderings(i).compare(r1, t1.loadField(r1, o1, f1), r2, t2.loadField(r2, o2, f2)) + if (c != 0) + return c + } else if (leftDefined != rightDefined) { + val c = if (leftDefined) -1 else 1 + return c + } + + i += 1 + } + + 0 + } + } + } + + def apply(jv: JValue): OrderedRVType = { + case class Extract(partitionKey: Array[String], + key: Array[String], + rowType: String) + val ex = jv.extract[Extract] + new OrderedRVType(ex.partitionKey, ex.key, Parser.parseType(ex.rowType).asInstanceOf[TStruct]) + } +} \ No newline at end of file diff --git a/src/main/scala/is/hail/rvd/RVD.scala b/src/main/scala/is/hail/rvd/RVD.scala new file mode 100644 index 00000000000..44d93873ca1 --- /dev/null +++ b/src/main/scala/is/hail/rvd/RVD.scala @@ -0,0 +1,124 @@ +package is.hail.rvd + +import is.hail.annotations.{MemoryBuffer, RegionValue, RegionValueBuilder} +import is.hail.expr.Type +import org.apache.spark.{Partition, SparkContext} +import org.apache.spark.rdd.{AggregateWithContext, RDD} +import org.apache.spark.storage.StorageLevel + +import scala.reflect.ClassTag + + +object RVD { + def apply(rowType: Type, rdd: RDD[RegionValue]): ConcreteRVD = new ConcreteRVD(rowType, rdd) +} + +case class PersistedRVRDD( + persistedRDD: RDD[RegionValue], + iterationRDD: RDD[RegionValue]) + +trait RVD { + self => + def rowType: Type + + def rdd: RDD[RegionValue] + + def sparkContext: SparkContext = rdd.sparkContext + + def getNumPartitions: Int = rdd.getNumPartitions + + def partitions: Array[Partition] = rdd.partitions + + def filter(f: (RegionValue) => Boolean): RVD = RVD(rowType, rdd.filter(f)) + + def map(newRowType: Type)(f: (RegionValue) => RegionValue): RVD = RVD(newRowType, rdd.map(f)) + + def mapWithContext[C](newRowType: Type)(makeContext: () => C)(f: (C, RegionValue) => RegionValue) = + RVD(newRowType, rdd.mapPartitions { it => + val c = makeContext() + it.map { rv => f(c, rv) } + }) + + def map[T](f: (RegionValue) => T)(implicit tct: ClassTag[T]): RDD[T] = rdd.map(f) + + def mapPartitions(newRowType: Type)(f: (Iterator[RegionValue]) => Iterator[RegionValue]): RVD = RVD(newRowType, rdd.mapPartitions(f)) + + def mapPartitionsWithIndex[T](f: (Int, Iterator[RegionValue]) => Iterator[T])(implicit tct: ClassTag[T]): RDD[T] = 
rdd.mapPartitionsWithIndex(f) + + def mapPartitions[T](f: (Iterator[RegionValue]) => Iterator[T])(implicit tct: ClassTag[T]): RDD[T] = rdd.mapPartitions(f) + + def treeAggregate[U: ClassTag](zeroValue: U)( + seqOp: (U, RegionValue) => U, + combOp: (U, U) => U, + depth: Int = 2): U = rdd.treeAggregate(zeroValue)(seqOp, combOp, depth) + + def aggregateWithContext[U: ClassTag, V](context: () => V)(zeroValue: U) + (seqOp: (V, U, RegionValue) => U, combOp: (U, U) => U): U = { + AggregateWithContext.aggregateWithContext(rdd)(context)(zeroValue)(seqOp, combOp) + } + + def count(): Long = rdd.count() + + protected def persistRVRDD(level: StorageLevel): PersistedRVRDD = { + val localRowType = rowType + + // copy, persist region values + val persistedRDD = rdd.mapPartitions { it => + val region = MemoryBuffer() + val rvb = new RegionValueBuilder(region) + it.map { rv => + region.clear() + rvb.start(localRowType) + rvb.addRegionValue(localRowType, rv) + val off = rvb.end() + RegionValue(region.copy(), off) + } + } + .persist(level) + + PersistedRVRDD(persistedRDD, + persistedRDD + .mapPartitions { it => + val region = MemoryBuffer() + val rv2 = RegionValue(region) + it.map { rv => + region.setFrom(rv.region) + rv2.setOffset(rv.offset) + rv2 + } + }) + } + + def storageLevel: StorageLevel = StorageLevel.NONE + + def persist(level: StorageLevel): RVD = { + val PersistedRVRDD(persistedRDD, iterationRDD) = persistRVRDD(level) + new RVD { + val rowType: Type = self.rowType + + val rdd: RDD[RegionValue] = iterationRDD + + override def storageLevel: StorageLevel = persistedRDD.getStorageLevel + + override def persist(newLevel: StorageLevel): RVD = { + if (newLevel == StorageLevel.NONE) + unpersist() + else { + persistedRDD.persist(newLevel) + this + } + } + + override def unpersist(): RVD = { + persistedRDD.unpersist() + self + } + } + } + + def cache(): RVD = persist(StorageLevel.MEMORY_ONLY) + + def unpersist(): RVD = this + + def coalesce(maxPartitions: Int, shuffle: Boolean): RVD = RVD(rowType, rdd.coalesce(maxPartitions, shuffle = shuffle)) +} diff --git a/src/main/scala/is/hail/sparkextras/OrderedJoinDistinctIterator.scala b/src/main/scala/is/hail/sparkextras/OrderedJoinDistinctIterator.scala index 3859f96406c..b8408b3ba31 100644 --- a/src/main/scala/is/hail/sparkextras/OrderedJoinDistinctIterator.scala +++ b/src/main/scala/is/hail/sparkextras/OrderedJoinDistinctIterator.scala @@ -1,15 +1,16 @@ package is.hail.sparkextras import is.hail.annotations.{JoinedRegionValue, RegionValue} +import is.hail.rvd.OrderedRVType -abstract class OrderedJoinDistinctIterator(leftTyp: OrderedRDD2Type, rightTyp: OrderedRDD2Type, leftIt: Iterator[RegionValue], +abstract class OrderedJoinDistinctIterator(leftTyp: OrderedRVType, rightTyp: OrderedRVType, leftIt: Iterator[RegionValue], rightIt: Iterator[RegionValue]) extends Iterator[JoinedRegionValue] { private val jrv = JoinedRegionValue() private var lrv: RegionValue = _ private var rrv: RegionValue = if (rightIt.hasNext) rightIt.next() else null private var jrvPresent = false - private val lrKOrd = OrderedRDD2Type.selectUnsafeOrdering(leftTyp.rowType, leftTyp.kRowFieldIdx, rightTyp.rowType, rightTyp.kRowFieldIdx) + private val lrKOrd = OrderedRVType.selectUnsafeOrdering(leftTyp.rowType, leftTyp.kRowFieldIdx, rightTyp.rowType, rightTyp.kRowFieldIdx) def lrCompare(): Int = { assert(lrv != null && rrv != null) @@ -69,7 +70,7 @@ abstract class OrderedJoinDistinctIterator(leftTyp: OrderedRDD2Type, rightTyp: O } } -class OrderedInnerJoinDistinctIterator(leftTyp: 
OrderedRDD2Type, rightTyp: OrderedRDD2Type, leftIt: Iterator[RegionValue], +class OrderedInnerJoinDistinctIterator(leftTyp: OrderedRVType, rightTyp: OrderedRVType, leftIt: Iterator[RegionValue], rightIt: Iterator[RegionValue]) extends OrderedJoinDistinctIterator(leftTyp, rightTyp, leftIt, rightIt) { def hasNext: Boolean = { @@ -90,7 +91,7 @@ class OrderedInnerJoinDistinctIterator(leftTyp: OrderedRDD2Type, rightTyp: Order } } -class OrderedLeftJoinDistinctIterator(leftTyp: OrderedRDD2Type, rightTyp: OrderedRDD2Type, leftIt: Iterator[RegionValue], +class OrderedLeftJoinDistinctIterator(leftTyp: OrderedRVType, rightTyp: OrderedRVType, leftIt: Iterator[RegionValue], rightIt: Iterator[RegionValue]) extends OrderedJoinDistinctIterator(leftTyp, rightTyp, leftIt, rightIt) { def hasNext: Boolean = { diff --git a/src/main/scala/is/hail/sparkextras/OrderedRDD2.scala b/src/main/scala/is/hail/sparkextras/OrderedRDD2.scala index 7b3e84d6ff6..a18d7f9386c 100644 --- a/src/main/scala/is/hail/sparkextras/OrderedRDD2.scala +++ b/src/main/scala/is/hail/sparkextras/OrderedRDD2.scala @@ -1,19 +1,10 @@ package is.hail.sparkextras -import java.util - import is.hail.annotations._ -import is.hail.expr.{JSONAnnotationImpex, Parser, TArray, TStruct, Type} +import is.hail.rvd.{OrderedRVD, OrderedRVPartitioner} import is.hail.utils._ -import org.apache.commons.lang3.builder.HashCodeBuilder import org.apache.spark._ -import org.apache.spark.rdd.{PartitionCoalescer, RDD, ShuffledRDD} -import org.apache.spark.sql.Row -import org.apache.spark.storage.StorageLevel -import org.json4s.MappingException -import org.json4s.JsonAST._ - -import scala.collection.mutable +import org.apache.spark.rdd.RDD object BinarySearch { // return smallest elem such that key <= elem @@ -48,772 +39,13 @@ object BinarySearch { } } -object WritableRegionValue { - def apply(t: Type, initial: RegionValue): WritableRegionValue = - WritableRegionValue(t, initial.region, initial.offset) - - def apply(t: Type, initialRegion: MemoryBuffer, initialOffset: Long): WritableRegionValue = { - val wrv = WritableRegionValue(t) - wrv.set(initialRegion, initialOffset) - wrv - } - - def apply(t: Type): WritableRegionValue = { - val region = MemoryBuffer() - new WritableRegionValue(t, region, new RegionValueBuilder(region), RegionValue(region, 0)) - } -} - -class WritableRegionValue(val t: Type, - val region: MemoryBuffer, - rvb: RegionValueBuilder, - val value: RegionValue) { - - def offset: Long = value.offset - - def setSelect(fromT: TStruct, toFromFieldIdx: Array[Int], fromRV: RegionValue) { - (t: @unchecked) match { - case t: TStruct => - region.clear() - rvb.start(t) - rvb.startStruct() - var i = 0 - while (i < t.size) { - rvb.addField(fromT, fromRV, toFromFieldIdx(i)) - i += 1 - } - rvb.endStruct() - value.setOffset(rvb.end()) - } - } - - def set(rv: RegionValue): Unit = set(rv.region, rv.offset) - - def set(fromRegion: MemoryBuffer, fromOffset: Long) { - region.clear() - rvb.start(t) - rvb.addRegionValue(t, fromRegion, fromOffset) - value.setOffset(rvb.end()) - } - - def pretty: String = value.pretty(t) -} - -object PartitionKeyInfo2 { - final val UNSORTED = 0 - final val TSORTED = 1 - final val KSORTED = 2 - - def apply(typ: OrderedRDD2Type, sampleSize: Int, partitionIndex: Int, it: Iterator[RegionValue], seed: Int): PartitionKeyInfo2 = { - val minF = WritableRegionValue(typ.pkType) - val maxF = WritableRegionValue(typ.pkType) - val prevF = WritableRegionValue(typ.kType) - - assert(it.hasNext) - val f0 = it.next() - - minF.setSelect(typ.kType, 
typ.pkKFieldIdx, f0) - maxF.setSelect(typ.kType, typ.pkKFieldIdx, f0) - prevF.set(f0) - - var sortedness = KSORTED - - val rng = new java.util.Random(seed) - val samples = new Array[WritableRegionValue](sampleSize) - - var i = 0 - - if (sampleSize > 0) { - samples(0) = WritableRegionValue(typ.pkType, f0) - i += 1 - } - - while (it.hasNext) { - val f = it.next() - - if (typ.kOrd.compare(f, prevF.value) < 0) { - if (typ.pkInKOrd.compare(f, prevF.value) < 0) - sortedness = UNSORTED - else - sortedness = sortedness.min(TSORTED) - } - - if (typ.pkKOrd.compare(minF.value, f) > 0) - minF.setSelect(typ.kType, typ.pkKFieldIdx, f) - if (typ.pkKOrd.compare(maxF.value, f) < 0) - maxF.setSelect(typ.kType, typ.pkKFieldIdx, f) - - prevF.set(f) - - if (i < sampleSize) - samples(i) = WritableRegionValue(typ.pkType, f) - else { - val j = rng.nextInt(i) - if (j < sampleSize) - samples(j).set(f) - } - - i += 1 - } - - PartitionKeyInfo2(partitionIndex, i, - minF.value, maxF.value, - Array.tabulate[RegionValue](math.min(i, sampleSize))(i => samples(i).value), - sortedness) - } -} - -case class PartitionKeyInfo2( - partitionIndex: Int, - size: Int, - min: RegionValue, - max: RegionValue, - // min, max: RegionValue[pkType] - samples: Array[RegionValue], - sortedness: Int) { - def pretty(t: Type): String = { - s"partitionIndex=$partitionIndex,size=$size,min=${min.pretty(t)},max=${max.pretty(t)},samples=${samples.map(_.pretty(t)).mkString(",")},sortedness=$sortedness" - } -} - -object OrderedRDD2Type { - def selectUnsafeOrdering(t1: TStruct, fields1: Array[Int], - t2: TStruct, fields2: Array[Int]): UnsafeOrdering = { - require(fields1.length == fields2.length) - require((fields1, fields2).zipped.forall { case (f1, f2) => - t1.fieldType(f1) == t2.fieldType(f2) - }) - - val nFields = fields1.length - val fieldOrderings = fields1.map(f1 => t1.fieldType(f1).unsafeOrdering(missingGreatest = true)) - - new UnsafeOrdering { - def compare(r1: MemoryBuffer, o1: Long, r2: MemoryBuffer, o2: Long): Int = { - var i = 0 - while (i < nFields) { - val f1 = fields1(i) - val f2 = fields2(i) - val leftDefined = t1.isFieldDefined(r1, o1, f1) - val rightDefined = t2.isFieldDefined(r2, o2, f2) - - if (leftDefined && rightDefined) { - val c = fieldOrderings(i).compare(r1, t1.loadField(r1, o1, f1), r2, t2.loadField(r2, o2, f2)) - if (c != 0) - return c - } else if (leftDefined != rightDefined) { - val c = if (leftDefined) -1 else 1 - return c - } - - i += 1 - } - - 0 - } - } - } - - def apply(jv: JValue): OrderedRDD2Type = { - case class Extract(partitionKey: Array[String], - key: Array[String], - rowType: String) - val ex = jv.extract[Extract] - new OrderedRDD2Type(ex.partitionKey, ex.key, Parser.parseType(ex.rowType).asInstanceOf[TStruct]) - } -} - -class OrderedRDD2Type( - val partitionKey: Array[String], - val key: Array[String], // full key - val rowType: TStruct) extends Serializable { - assert(key.startsWith(partitionKey)) - - val (pkType, _) = rowType.select(partitionKey) - val (kType, _) = rowType.select(key) - - val keySet: Set[String] = key.toSet - val (valueType, _) = rowType.filter(f => !keySet.contains(f.name)) - - val valueFieldIdx: Array[Int] = (0 until rowType.size) - .filter(i => !keySet.contains(rowType.fields(i).name)) - .toArray - - val kRowFieldIdx: Array[Int] = key.map(n => rowType.fieldIdx(n)) - val pkRowFieldIdx: Array[Int] = partitionKey.map(n => rowType.fieldIdx(n)) - val pkKFieldIdx: Array[Int] = partitionKey.map(n => kType.fieldIdx(n)) - assert(pkKFieldIdx sameElements (0 until pkType.size)) - - val pkOrd: 
UnsafeOrdering = pkType.unsafeOrdering(missingGreatest = true) - val kOrd: UnsafeOrdering = kType.unsafeOrdering(missingGreatest = true) - - val pkRowOrd: UnsafeOrdering = OrderedRDD2Type.selectUnsafeOrdering(pkType, (0 until pkType.size).toArray, rowType, pkRowFieldIdx) - val pkKOrd: UnsafeOrdering = OrderedRDD2Type.selectUnsafeOrdering(pkType, (0 until pkType.size).toArray, kType, pkKFieldIdx) - val pkInRowOrd: UnsafeOrdering = OrderedRDD2Type.selectUnsafeOrdering(rowType, pkRowFieldIdx, rowType, pkRowFieldIdx) - val kInRowOrd: UnsafeOrdering = OrderedRDD2Type.selectUnsafeOrdering(rowType, kRowFieldIdx, rowType, kRowFieldIdx) - val pkInKOrd: UnsafeOrdering = OrderedRDD2Type.selectUnsafeOrdering(kType, pkKFieldIdx, kType, pkKFieldIdx) - val kRowOrd: UnsafeOrdering = OrderedRDD2Type.selectUnsafeOrdering(kType, (0 until kType.size).toArray, rowType, kRowFieldIdx) - - def insert(typeToInsert: Type, path: List[String]): (OrderedRDD2Type, UnsafeInserter) = { - assert(path.nonEmpty) - assert(!key.contains(path.head)) - - val (newRowType, inserter) = rowType.unsafeInsert(typeToInsert, path) - - (new OrderedRDD2Type(partitionKey, key, newRowType.asInstanceOf[TStruct]), inserter) - } - - def toJSON: JValue = - JObject(List( - "partitionKey" -> JArray(partitionKey.map(JString).toList), - "key" -> JArray(key.map(JString).toList), - "rowType" -> JString(rowType.toString))) - - override def equals(that: Any): Boolean = that match { - case that: OrderedRDD2Type => - (partitionKey sameElements that.partitionKey) && - (key sameElements that.key) && - rowType == that.rowType - case _ => false - } - - override def hashCode: Int = { - val b = new HashCodeBuilder(43, 19) - b.append(partitionKey.length) - partitionKey.foreach(b.append) - - b.append(key.length) - key.foreach(b.append) - - b.append(rowType) - b.toHashCode - } -} - -object OrderedRDD2 { - type CoercionMethod = Int - - final val ORDERED_PARTITIONER: CoercionMethod = 0 - final val AS_IS: CoercionMethod = 1 - final val LOCAL_SORT: CoercionMethod = 2 - final val SHUFFLE: CoercionMethod = 3 - - def empty(sc: SparkContext, typ: OrderedRDD2Type): OrderedRDD2 = { - OrderedRDD2(typ, - OrderedPartitioner2.empty(typ), - sc.emptyRDD[RegionValue]) - } - - def cast(typ: OrderedRDD2Type, - rdd: RDD[RegionValue]): OrderedRDD2 = { - if (rdd.partitions.isEmpty) - OrderedRDD2.empty(rdd.sparkContext, typ) - else - rdd match { - case ordered: OrderedRDD2 => ordered.asInstanceOf[OrderedRDD2] - case _ => - (rdd.partitioner: @unchecked) match { - case Some(p: OrderedPartitioner2) => OrderedRDD2(typ, p.asInstanceOf[OrderedPartitioner2], rdd) - } - } - } - - def apply(typ: OrderedRDD2Type, - rdd: RDD[RegionValue], fastKeys: Option[RDD[RegionValue]], hintPartitioner: Option[OrderedPartitioner2]): OrderedRDD2 = { - val (_, orderedRDD) = coerce(typ, rdd, fastKeys, hintPartitioner) - orderedRDD - } - - /** - * Precondition: the iterator it is PK-sorted. We lazily K-sort each block - * of PK-equivalent elements. 
- */ - def localKeySort(typ: OrderedRDD2Type, - // it: Iterator[RegionValue[rowType]] - it: Iterator[RegionValue]): Iterator[RegionValue] = { - new Iterator[RegionValue] { - private val bit = it.buffered - - private val q = new mutable.PriorityQueue[RegionValue]()(typ.kInRowOrd.reverse) - - def hasNext: Boolean = bit.hasNext || q.nonEmpty - - def next(): RegionValue = { - if (q.isEmpty) { - do { - val rv = bit.next() - // FIXME ugh, no good answer here - q.enqueue(RegionValue( - rv.region.copy(), - rv.offset)) - } while (bit.hasNext && typ.pkInRowOrd.compare(q.head, bit.head) == 0) - } - - val rv = q.dequeue() - rv - } - } - } - - // getKeys: RDD[RegionValue[kType]] - def getKeys(typ: OrderedRDD2Type, - // rdd: RDD[RegionValue[rowType]] - rdd: RDD[RegionValue]): RDD[RegionValue] = { - rdd.mapPartitions { it => - val wrv = WritableRegionValue(typ.kType) - it.map { rv => - wrv.setSelect(typ.rowType, typ.kRowFieldIdx, rv) - wrv.value - } - } - } - - def getPartitionKeyInfo(typ: OrderedRDD2Type, - // keys: RDD[kType] - keys: RDD[RegionValue]): Array[PartitionKeyInfo2] = { - val nPartitions = keys.getNumPartitions - - val rng = new java.util.Random(1) - val partitionSeed = Array.tabulate[Int](nPartitions)(i => rng.nextInt()) - - val sampleSize = math.min(nPartitions * 20, 1000000) - val samplesPerPartition = sampleSize / nPartitions - - val pkis = keys.mapPartitionsWithIndex { case (i, it) => - if (it.hasNext) - Iterator(PartitionKeyInfo2(typ, samplesPerPartition, i, it, partitionSeed(i))) - else - Iterator() - }.collect() - - pkis.sortBy(_.min)(typ.pkOrd) - } - - def coerce(typ: OrderedRDD2Type, - // rdd: RDD[RegionValue[rowType]] - rdd: RDD[RegionValue], - // fastKeys: Option[RDD[RegionValue[kType]]] - fastKeys: Option[RDD[RegionValue]] = None, - hintPartitioner: Option[OrderedPartitioner2] = None): (CoercionMethod, OrderedRDD2) = { - val sc = rdd.sparkContext - - if (rdd.partitions.isEmpty) - return (ORDERED_PARTITIONER, empty(sc, typ)) - - rdd match { - case ordd: OrderedRDD2 => - return (ORDERED_PARTITIONER, ordd.asInstanceOf[OrderedRDD2]) - case _ => - } - - // keys: RDD[RegionValue[kType]] - val keys = fastKeys.getOrElse(getKeys(typ, rdd)) - - val pkis = getPartitionKeyInfo(typ, keys) - - if (pkis.isEmpty) - return (AS_IS, empty(sc, typ)) - - val partitionsSorted = (pkis, pkis.tail).zipped.forall { case (p, pnext) => - val r = typ.pkOrd.lteq(p.max, pnext.min) - if (!r) - log.info(s"not sorted: p = $p, pnext = $pnext") - r - } - - val sortedness = pkis.map(_.sortedness).min - if (partitionsSorted && sortedness >= PartitionKeyInfo.TSORTED) { - val (adjustedPartitions, rangeBounds, adjSortedness) = rangesAndAdjustments(typ, pkis, sortedness) - - val unsafeRangeBounds = UnsafeIndexedSeq(TArray(typ.pkType), rangeBounds) - val partitioner = new OrderedPartitioner2(adjustedPartitions.length, - typ.partitionKey, - typ.kType, - unsafeRangeBounds) - - val reorderedPartitionsRDD = rdd.reorderPartitions(pkis.map(_.partitionIndex)) - val adjustedRDD = new AdjustedPartitionsRDD(reorderedPartitionsRDD, adjustedPartitions) - (adjSortedness: @unchecked) match { - case PartitionKeyInfo.KSORTED => - info("Coerced sorted dataset") - (AS_IS, OrderedRDD2(typ, - partitioner, - adjustedRDD)) - - case PartitionKeyInfo.TSORTED => - info("Coerced almost-sorted dataset") - (LOCAL_SORT, OrderedRDD2(typ, - partitioner, - adjustedRDD.mapPartitions { it => - localKeySort(typ, it) - })) - } - } else { - info("Ordering unsorted dataset with network shuffle") - val p = hintPartitioner - .filter(_.numPartitions >= 
rdd.partitions.length) - .getOrElse { - val ranges = calculateKeyRanges(typ, pkis, rdd.getNumPartitions) - new OrderedPartitioner2(ranges.length + 1, typ.partitionKey, typ.kType, ranges) - } - (SHUFFLE, shuffle(typ, p, rdd)) - } - } - - def calculateKeyRanges(typ: OrderedRDD2Type, pkis: Array[PartitionKeyInfo2], nPartitions: Int): UnsafeIndexedSeq = { - val keys = pkis - .flatMap(_.samples) - .sorted(typ.pkOrd) - - // FIXME weighted - val rangeBounds = - if (keys.length <= nPartitions) - keys.init - else { - val k = keys.length / nPartitions - assert(k > 0) - Array.tabulate(nPartitions - 1)(i => keys((i + 1) * k)) - } - - UnsafeIndexedSeq(TArray(typ.pkType), rangeBounds) - } - - def shuffle(typ: OrderedRDD2Type, - partitioner: OrderedPartitioner2, - rdd: RDD[RegionValue]): OrderedRDD2 = { - OrderedRDD2(typ, - partitioner, - new ShuffledRDD[RegionValue, RegionValue, RegionValue]( - rdd.mapPartitions { it => - val wkrv = WritableRegionValue(typ.kType) - it.map { rv => - wkrv.setSelect(typ.rowType, typ.kRowFieldIdx, rv) - (wkrv.value, rv) - } - }, - partitioner) - .setKeyOrdering(typ.kOrd) - .mapPartitionsWithIndex { case (i, it) => - it.map { case (k, v) => - assert(partitioner.getPartition(k) == i) - v - } - }) - } - - def rangesAndAdjustments(typ: OrderedRDD2Type, - sortedKeyInfo: Array[PartitionKeyInfo2], - sortedness: Int): (IndexedSeq[Array[Adjustment[RegionValue]]], Array[RegionValue], Int) = { - - val rangeBounds = new ArrayBuilder[RegionValue]() - val adjustmentsBuffer = new mutable.ArrayBuffer[Array[Adjustment[RegionValue]]] - val indicesBuilder = new ArrayBuilder[Int]() - - var anyOverlaps = false - - val it = sortedKeyInfo.indices.iterator.buffered - - while (it.nonEmpty) { - indicesBuilder.clear() - val i = it.next() - val thisP = sortedKeyInfo(i) - val min = thisP.min - val max = thisP.max - - indicesBuilder += i - - var continue = true - while (continue && it.hasNext && typ.pkOrd.equiv(sortedKeyInfo(it.head).min, max)) { - anyOverlaps = true - if (typ.pkOrd.equiv(sortedKeyInfo(it.head).max, max)) - indicesBuilder += it.next() - else { - indicesBuilder += it.head - continue = false - } - } - - val adjustments = indicesBuilder.result().zipWithIndex.map { case (partitionIndex, index) => - assert(sortedKeyInfo(partitionIndex).sortedness >= PartitionKeyInfo.TSORTED) - val f: (Iterator[RegionValue]) => Iterator[RegionValue] = - // In the first partition, drop elements that should go in the last if necessary - if (index == 0) - if (adjustmentsBuffer.nonEmpty && typ.pkOrd.equiv(min, sortedKeyInfo(adjustmentsBuffer.last.head.index).max)) - _.dropWhile(rv => typ.pkRowOrd.compare(min.region, min.offset, rv) == 0) - else - identity - else - // In every subsequent partition, only take elements that are the max of the last - _.takeWhile(rv => typ.pkRowOrd.compare(max.region, max.offset, rv) == 0) - Adjustment(partitionIndex, f) - } - - adjustmentsBuffer += adjustments - - if (it.hasNext) - rangeBounds += max - } - - val adjSortedness = if (anyOverlaps) - sortedness.min(PartitionKeyInfo.TSORTED) - else - sortedness - - (adjustmentsBuffer, rangeBounds.result(), adjSortedness) - } - - def apply(typ: OrderedRDD2Type, - partitioner: OrderedPartitioner2, - rdd: RDD[RegionValue]): OrderedRDD2 = { - val sc = rdd.sparkContext - - new OrderedRDD2(typ, partitioner, new RegionValueRDD( - rdd.mapPartitionsWithIndex { case (i, it) => - val prev = WritableRegionValue(typ.pkType) - - new Iterator[RegionValue] { - var first = true - - def hasNext: Boolean = it.hasNext - - def next(): RegionValue = { - val rv 
= it.next() - - if (i < partitioner.rangeBounds.length) { - assert(typ.pkRowOrd.compare( - partitioner.region, partitioner.loadElement(i), - rv) >= 0) - } - if (i > 0) - assert(typ.pkRowOrd.compare(partitioner.region, partitioner.loadElement(i - 1), - rv) < 0) - - if (first) - first = false - else - assert(typ.pkRowOrd.compare(prev.value, rv) <= 0) - - prev.setSelect(typ.rowType, typ.pkRowFieldIdx, rv) - - assert(typ.pkRowOrd.compare(prev.value, rv) == 0) - - rv - } - } - }, typ.rowType)) - } -} - -class OrderedRDD2 private( - val typ: OrderedRDD2Type, - @transient val orderedPartitioner: OrderedPartitioner2, - val rvrdd: RegionValueRDD) extends RDD[RegionValue](rvrdd.rdd) { - - val rdd = rvrdd.rdd - - def this(partitionKey: String, key: String, rowType: TStruct, - orderedPartitioner: OrderedPartitioner2, - rvrdd: RegionValueRDD) = this(new OrderedRDD2Type(Array(partitionKey), Array(partitionKey, key), rowType), - orderedPartitioner, rvrdd) - - @transient override val partitioner: Option[Partitioner] = Some(orderedPartitioner) - - override def getPartitions: Array[Partition] = rdd.partitions - - override def compute(split: Partition, context: TaskContext): Iterator[RegionValue] = rdd.iterator(split, context) - - override def getPreferredLocations(split: Partition): Seq[String] = rdd.preferredLocations(split) - - def insert[PC](newContext: () => PC)(typeToInsert: Type, - path: List[String], - // rv argument to add is the entire row - add: (PC, RegionValue, RegionValueBuilder) => Unit): OrderedRDD2 = { - - val (insTyp, inserter) = typ.insert(typeToInsert, path) - OrderedRDD2(insTyp, - orderedPartitioner, - rdd.mapPartitions { it => - val c = newContext() - val rv2b = new RegionValueBuilder() - val rv2 = RegionValue() - - it.map { rv => - val ur = new UnsafeRow(typ.rowType, rv) - rv2b.set(rv.region) - rv2b.start(insTyp.rowType) - inserter(rv.region, rv.offset, rv2b, () => add(c, rv, rv2b)) - rv2.set(rv.region, rv2b.end()) - rv2 - } - }) - } - - def mapPreservesPartitioning(f: (RegionValue) => RegionValue): OrderedRDD2 = - OrderedRDD2(typ, - orderedPartitioner, - rdd.map(f)) - - def mapPartitionsPreservesPartitioning(f: (Iterator[RegionValue]) => Iterator[RegionValue]): OrderedRDD2 = - OrderedRDD2(typ, - orderedPartitioner, - rdd.mapPartitions(f)) - - def mapPartitionsPreservesPartitioning(newTyp: OrderedRDD2Type)(f: (Iterator[RegionValue]) => Iterator[RegionValue]): OrderedRDD2 = - OrderedRDD2(newTyp, - orderedPartitioner, - rdd.mapPartitions(f)) - - override def filter(p: (RegionValue) => Boolean): OrderedRDD2 = - OrderedRDD2(typ, - orderedPartitioner, - rdd.filter(p)) - - override def sample(withReplacement: Boolean, fraction: Double, seed: Long): OrderedRDD2 = - OrderedRDD2(typ, - orderedPartitioner, - rdd.sample(withReplacement, fraction, seed)) - - def getStorageLevel2: StorageLevel = rvrdd.getStorageLevel2 - - def unpersist2() = copy(rvrdd.unpersist2()) - - def persist2(level: StorageLevel): OrderedRDD2 = copy(rvrdd.persist2(level)) - - def orderedJoinDistinct(right: OrderedRDD2, joinType: String): RDD[JoinedRegionValue] = { - val lTyp = typ - val rTyp = right.typ - - if (lTyp.kType != rTyp.kType) - fatal( - s"""Incompatible join keys. Keys must have same length and types, in order: - | Left key type: ${ lTyp.kType.toPrettyString(compact = true) } - | Right key type: ${ rTyp.kType.toPrettyString(compact = true) } - """.stripMargin) - - joinType match { - case "inner" | "left" => new OrderedJoinDistinctRDD2(this, right, joinType) - case _ => fatal(s"Unknown join type `$joinType'. 
Choose from `inner' or `left'.") - } - } - - def partitionSortedUnion(rdd2: OrderedRDD2): OrderedRDD2 = { - assert(typ == rdd2.typ) - assert(orderedPartitioner == rdd2.orderedPartitioner) - - OrderedRDD2(typ, orderedPartitioner, - rdd.zipPartitions(rdd2.rdd) { case (it, it2) => - new Iterator[RegionValue] { - private val bit = it.buffered - private val bit2 = it2.buffered - - def hasNext: Boolean = bit.hasNext || bit2.hasNext - - def next(): RegionValue = { - if (!bit.hasNext) - bit2.next() - else if (!bit2.hasNext) - bit.next() - else { - val c = typ.kInRowOrd.compare(bit.head, bit2.head) - if (c < 0) - bit.next() - else - bit2.next() - } - } - } - }) - } - - private def copy(rvrdd: RegionValueRDD): OrderedRDD2 = new OrderedRDD2(typ, orderedPartitioner, rvrdd) - - def copy(typ: OrderedRDD2Type = typ, - orderedPartitioner: OrderedPartitioner2 = orderedPartitioner, - rdd: RDD[RegionValue] = rvrdd.rdd): OrderedRDD2 = { - OrderedRDD2(typ, orderedPartitioner, rdd) - } - - def naiveCoalesce(maxPartitions: Int): OrderedRDD2 = { - val n = orderedPartitioner.numPartitions - if (maxPartitions >= n) - return this - - val newN = maxPartitions - val newNParts = Array.tabulate(newN)(i => (n - i + newN - 1) / newN) - assert(newNParts.sum == n) - assert(newNParts.forall(_ > 0)) - - val newPartEnd = newNParts.scanLeft(-1)( _ + _ ).tail - assert(newPartEnd.last == n - 1) - - val newRangeBounds = UnsafeIndexedSeq( - TArray(typ.pkType), - newPartEnd.init.map(orderedPartitioner.rangeBounds)) - - OrderedRDD2( - typ, - new OrderedPartitioner2(newN, typ.partitionKey, typ.kType, newRangeBounds), - new BlockedRDD(rdd, newPartEnd)) - } - - override def coalesce(maxPartitions: Int, shuffle: Boolean, partitionCoalescer: Option[PartitionCoalescer]) - (implicit ord: Ordering[RegionValue]): OrderedRDD2 = { - require(maxPartitions > 0, "cannot coalesce to nPartitions <= 0") - val n = rdd.partitions.length - if (!shuffle && maxPartitions >= n) - return this - if (shuffle) { - val shuffled = super.coalesce(maxPartitions, shuffle) - val ranges = OrderedRDD2.calculateKeyRanges(typ, OrderedRDD2.getPartitionKeyInfo(typ, shuffled), shuffled.getNumPartitions) - OrderedRDD2.shuffle(typ, new OrderedPartitioner2(ranges.length + 1, typ.partitionKey, typ.kType, ranges), shuffled) - } else { - - val partSize = rdd.context.runJob(rdd, getIteratorSize _) - log.info(s"partSize = ${ partSize.toSeq }") - - val partCumulativeSize = mapAccumulate[Array, Long, Long, Long](partSize, 0)((s, acc) => (s + acc, s + acc)) - val totalSize = partCumulativeSize.last - - var newPartEnd = (0 until maxPartitions).map { i => - val t = totalSize * (i + 1) / maxPartitions - - /* j largest index not greater than t */ - var j = util.Arrays.binarySearch(partCumulativeSize, t) - if (j < 0) - j = -j - 1 - while (j < partCumulativeSize.length - 1 - && partCumulativeSize(j + 1) == t) - j += 1 - assert(t <= partCumulativeSize(j) && - (j == partCumulativeSize.length - 1 || - t < partCumulativeSize(j + 1))) - j - }.toArray - - newPartEnd = newPartEnd.zipWithIndex.filter { case (end, i) => i == 0 || newPartEnd(i) != newPartEnd(i - 1) } - .map(_._1) - - info(s"newPartEnd = ${ newPartEnd.toSeq }") - - assert(newPartEnd.last == n - 1) - assert(newPartEnd.zip(newPartEnd.tail).forall { case (i, inext) => i < inext }) - - if (newPartEnd.length < maxPartitions) - warn(s"coalesced to ${ newPartEnd.length } ${ plural(newPartEnd.length, "partition") }, less than requested $maxPartitions") - - val newRangeBounds = 
newPartEnd.init.map(orderedPartitioner.rangeBounds).asInstanceOf[UnsafeIndexedSeq] - val partitioner = new OrderedPartitioner2(newRangeBounds.length + 1, typ.partitionKey, typ.kType, newRangeBounds) - OrderedRDD2(typ, partitioner, new BlockedRDD(rdd, newPartEnd)) - } - } -} - -class OrderedDependency2(left: OrderedRDD2, right: OrderedRDD2) extends NarrowDependency[RegionValue](right) { +class OrderedDependency2(left: OrderedRVD, right: OrderedRVD) extends NarrowDependency[RegionValue](right.rdd) { override def getParents(partitionId: Int): Seq[Int] = - OrderedDependency2.getDependencies(left.orderedPartitioner, right.orderedPartitioner)(partitionId) + OrderedDependency2.getDependencies(left.partitioner, right.partitioner)(partitionId) } object OrderedDependency2 { - def getDependencies(p1: OrderedPartitioner2, p2: OrderedPartitioner2)(partitionId: Int): Range = { + def getDependencies(p1: OrderedRVPartitioner, p2: OrderedRVPartitioner)(partitionId: Int): Range = { val lastPartition = if (partitionId == p1.rangeBounds.length) p2.numPartitions - 1 else @@ -830,30 +62,30 @@ object OrderedDependency2 { case class OrderedJoinDistinctRDD2Partition(index: Int, leftPartition: Partition, rightPartitions: Array[Partition]) extends Partition -class OrderedJoinDistinctRDD2(left: OrderedRDD2, right: OrderedRDD2, joinType: String) +class OrderedJoinDistinctRDD2(left: OrderedRVD, right: OrderedRVD, joinType: String) extends RDD[JoinedRegionValue](left.sparkContext, - Seq[Dependency[_]](new OneToOneDependency(left), + Seq[Dependency[_]](new OneToOneDependency(left.rdd), new OrderedDependency2(left, right))) { assert(joinType == "left" || joinType == "inner") - override val partitioner: Option[Partitioner] = left.partitioner + override val partitioner: Option[Partitioner] = Some(left.partitioner) def getPartitions: Array[Partition] = { Array.tabulate[Partition](left.getNumPartitions)(i => OrderedJoinDistinctRDD2Partition(i, left.partitions(i), - OrderedDependency2.getDependencies(left.orderedPartitioner, right.orderedPartitioner)(i) + OrderedDependency2.getDependencies(left.partitioner, right.partitioner)(i) .map(right.partitions) .toArray)) } - override def getPreferredLocations(split: Partition): Seq[String] = left.preferredLocations(split) + override def getPreferredLocations(split: Partition): Seq[String] = left.rdd.preferredLocations(split) override def compute(split: Partition, context: TaskContext): Iterator[JoinedRegionValue] = { val partition = split.asInstanceOf[OrderedJoinDistinctRDD2Partition] - val leftIt = left.iterator(partition.leftPartition, context) + val leftIt = left.rdd.iterator(partition.leftPartition, context) val rightIt = partition.rightPartitions.iterator.flatMap { p => - right.iterator(p, context) + right.rdd.iterator(p, context) } joinType match { @@ -863,91 +95,3 @@ class OrderedJoinDistinctRDD2(left: OrderedRDD2, right: OrderedRDD2, joinType: S } } } - -object OrderedPartitioner2 { - def empty(typ: OrderedRDD2Type): OrderedPartitioner2 = { - new OrderedPartitioner2(0, typ.partitionKey, typ.kType, UnsafeIndexedSeq.empty(TArray(typ.pkType))) - } - - def apply(sc: SparkContext, jv: JValue): OrderedPartitioner2 = { - case class Extract(numPartitions: Int, - partitionKey: Array[String], - kType: String, - rangeBounds: JValue) - val ex = jv.extract[Extract] - - val partitionKey = ex.partitionKey - val kType = Parser.parseType(ex.kType).asInstanceOf[TStruct] - val (pkType, _) = kType.select(partitionKey) - - val rangeBoundsType = TArray(pkType) - new 
OrderedPartitioner2(ex.numPartitions, - ex.partitionKey, - kType, - UnsafeIndexedSeq( - rangeBoundsType, - JSONAnnotationImpex.importAnnotation(ex.rangeBounds, rangeBoundsType).asInstanceOf[IndexedSeq[Annotation]])) - } -} - -class OrderedPartitioner2( - val numPartitions: Int, - val partitionKey: Array[String], val kType: TStruct, - // rangeBounds is partition max, sorted ascending - // rangeBounds: Array[pkType] - val rangeBounds: UnsafeIndexedSeq) extends Partitioner { - require((numPartitions == 0 && rangeBounds.isEmpty) || numPartitions == rangeBounds.length + 1, - s"nPartitions = $numPartitions, ranges = ${ rangeBounds.length }") - - val (pkType, _) = kType.select(partitionKey) - - val pkKFieldIdx: Array[Int] = partitionKey.map(n => kType.fieldIdx(n)) - val pkKOrd: UnsafeOrdering = OrderedRDD2Type.selectUnsafeOrdering(pkType, (0 until pkType.size).toArray, kType, pkKFieldIdx) - - val rangeBoundsType = TArray(pkType) - assert(rangeBoundsType.typeCheck(rangeBounds)) - - val ordering: Ordering[Annotation] = pkType.ordering(missingGreatest = true) - require(rangeBounds.isEmpty || rangeBounds.zip(rangeBounds.tail).forall { case (left, right) => ordering.compare(left, right) < 0 }) - - def region: MemoryBuffer = rangeBounds.region - - def loadElement(i: Int): Long = rangeBoundsType.loadElement(rangeBounds.region, rangeBounds.aoff, rangeBounds.length, i) - - // return the smallest partition for which key <= max - // pk: Annotation[pkType] - def getPartitionPK(pk: Any): Int = { - assert(pkType.typeCheck(pk)) - - val part = BinarySearch.binarySearch(numPartitions, - // key.compare(elem) - i => - if (i == numPartitions - 1) - -1 // key.compare(inf) - else - ordering.compare(pk, rangeBounds(i))) - part - } - - // return the smallest partition for which key <= max - // key: RegionValue[kType] - def getPartition(key: Any): Int = { - val keyrv = key.asInstanceOf[RegionValue] - - val part = BinarySearch.binarySearch(numPartitions, - // key.compare(elem) - i => - if (i == numPartitions - 1) - -1 // key.compare(inf) - else - -pkKOrd.compare(rangeBounds.region, loadElement(i), keyrv)) - part - } - - def toJSON: JValue = - JObject(List( - "numPartitions" -> JInt(numPartitions), - "partitionKey" -> JArray(partitionKey.map(n => JString(n)).toList), - "kType" -> JString(kType.toPrettyString(compact = true)), - "rangeBounds" -> JSONAnnotationImpex.exportAnnotation(rangeBounds, rangeBoundsType))) -} diff --git a/src/main/scala/is/hail/utils/SortedDistinctRowIterator.scala b/src/main/scala/is/hail/utils/SortedDistinctRowIterator.scala index 29d2df434ed..c314cc4536b 100644 --- a/src/main/scala/is/hail/utils/SortedDistinctRowIterator.scala +++ b/src/main/scala/is/hail/utils/SortedDistinctRowIterator.scala @@ -2,14 +2,15 @@ package is.hail.utils import is.hail.expr._ import is.hail.annotations._ +import is.hail.rvd.OrderedRVType import is.hail.sparkextras._ object SortedDistinctRowIterator { - def transformer(ort: OrderedRDD2Type): Iterator[RegionValue] => SortedDistinctRowIterator = + def transformer(ort: OrderedRVType): Iterator[RegionValue] => SortedDistinctRowIterator = new SortedDistinctRowIterator(ort, _) } -class SortedDistinctRowIterator(ort: OrderedRDD2Type, it: Iterator[RegionValue]) extends Iterator[RegionValue] { +class SortedDistinctRowIterator(ort: OrderedRVType, it: Iterator[RegionValue]) extends Iterator[RegionValue] { private val bit = it.buffered private val wrv: WritableRegionValue = WritableRegionValue(ort.rowType) diff --git a/src/main/scala/is/hail/variant/VariantDataset.scala 
b/src/main/scala/is/hail/variant/VariantDataset.scala index a4223b4fc2a..524d6255fb7 100644 --- a/src/main/scala/is/hail/variant/VariantDataset.scala +++ b/src/main/scala/is/hail/variant/VariantDataset.scala @@ -5,14 +5,15 @@ import is.hail.expr.{EvalContext, Parser, TAggregable, TString, TStruct, Type, _ import is.hail.io.plink.ExportBedBimFam import is.hail.keytable.KeyTable import is.hail.methods._ -import is.hail.sparkextras.OrderedRDD2 +import is.hail.rvd.OrderedRVD import is.hail.stats.ComputeRRM import is.hail.utils._ import org.apache.spark.sql.Row import org.apache.spark.storage.StorageLevel -import scala.collection.JavaConverters._ +import scala.collection.JavaConverters._ import scala.language.implicitConversions +import scala.language.existentials object VariantDataset { @@ -108,9 +109,9 @@ class VariantDatasetFunctions(private val vsm: VariantSampleMatrix) extends AnyV val newMatrixType = vsm.matrixType.copy(vaType = newType) val newRowType = newMatrixType.rowType - val newRDD2 = OrderedRDD2( - newMatrixType.orderedRDD2Type, - vsm.rdd2.orderedPartitioner, + val newRDD2 = OrderedRVD( + newMatrixType.orderedRVType, + vsm.rdd2.partitioner, vsm.rdd2.mapPartitions { it => val splitcontext = new SplitMultiPartitionContext(true, localNSamples, localGlobalAnnotation, localRowType, localVAnnotator, localGAnnotator, splitRowType) @@ -430,7 +431,7 @@ g = let newgt = gtIndex(oldToNew[gtj(g.GT)], oldToNew[gtk(g.GT)]) and } else { val localRowType = vsm.rowType vsm.copy2( - rdd2 = vsm.rdd2.mapPreservesPartitioning { rv => + rdd2 = vsm.rdd2.mapPreservesPartitioning(vsm.rdd2.typ) { rv => val ur = new UnsafeRow(localRowType, rv.region, rv.offset) val v = ur.getAs[Variant](1) if (!v.isBiallelic) diff --git a/src/main/scala/is/hail/variant/VariantSampleMatrix.scala b/src/main/scala/is/hail/variant/VariantSampleMatrix.scala index 55d36a7ebcf..851983171f5 100644 --- a/src/main/scala/is/hail/variant/VariantSampleMatrix.scala +++ b/src/main/scala/is/hail/variant/VariantSampleMatrix.scala @@ -11,6 +11,7 @@ import is.hail.keytable.KeyTable import is.hail.methods.Aggregators.SampleFunctions import is.hail.methods._ import is.hail.sparkextras._ +import is.hail.rvd.{OrderedRVD, OrderedRVType} import is.hail.utils._ import is.hail.{HailContext, utils} import org.apache.hadoop @@ -358,7 +359,7 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, def this(hc: HailContext, metadata: VSMMetadata, localValue: VSMLocalValue, - rdd2: OrderedRDD2) = + rdd2: OrderedRVD) = this(hc, metadata, MatrixLiteral( MatrixType(metadata), @@ -860,7 +861,7 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, def nPartitions: Int = rdd2.partitions.length - def annotateVariants2(rightRDD2: OrderedRDD2, newVAType: Type, inserter: Inserter): VariantSampleMatrix = { + def annotateVariants2(rightRDD2: OrderedRVD, newVAType: Type, inserter: Inserter): VariantSampleMatrix = { val leftRowType = rowType val rightRowType = rightRDD2.typ.rowType @@ -869,9 +870,9 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, copy2( vaSignature = newVAType, - rdd2 = OrderedRDD2( - newMatrixType.orderedRDD2Type, - rdd2.orderedPartitioner, + rdd2 = OrderedRVD( + newMatrixType.orderedRVType, + rdd2.partitioner, rdd2.orderedJoinDistinct(rightRDD2, "left") .mapPartitions { it => val rvb = new RegionValueBuilder() @@ -953,7 +954,7 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, def variants: RDD[Annotation] = rdd.keys def deduplicate(): VariantSampleMatrix 
= - copy2(rdd2 = rdd2.mapPartitionsPreservesPartitioning( + copy2(rdd2 = rdd2.mapPartitionsPreservesPartitioning(rdd2.typ)( SortedDistinctRowIterator.transformer(rdd2.typ))) def deleteVA(args: String*): (Type, Deleter) = deleteVA(args.toList) @@ -963,7 +964,7 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, def dropSamples(): VariantSampleMatrix = copyAST(ast = FilterSamples(ast, Const(null, false, TBoolean()))) - def dropVariants(): VariantSampleMatrix = copy2(rdd2 = OrderedRDD2.empty(sparkContext, matrixType.orderedRDD2Type)) + def dropVariants(): VariantSampleMatrix = copy2(rdd2 = OrderedRVD.empty(sparkContext, matrixType.orderedRVType)) def expand(): RDD[(Annotation, Annotation, Annotation)] = mapWithKeys[(Annotation, Annotation, Annotation)]((v, s, g) => (v, s, g)) @@ -995,10 +996,10 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, if (keys == null) None else - keys.iterator.map{ va => + keys.iterator.map { va => region2.clear() rv2b.start(newRowType) - inserter(rv.region, rv.offset, rv2b, {() => + inserter(rv.region, rv.offset, rv2b, { () => rv2b.addAnnotation(keyType, va) }) rv2.setOffset(rv2b.end()) @@ -1007,7 +1008,7 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, } } val newMatrixType = matrixType.copy(vaType = newVAType) - copy2(vaSignature = newVAType, rdd2 = rdd2.copy(typ = newMatrixType.orderedRDD2Type, rdd = explodedRDD)) + copy2(vaSignature = newVAType, rdd2 = rdd2.copy(typ = newMatrixType.orderedRVType, rdd = explodedRDD)) } def explodeSamples(code: String): VariantSampleMatrix = { @@ -1018,14 +1019,15 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, case TSet(e, _) => e case t => fatal(s"Expected annotation of type Array or Set; found $t") } - val keys = sampleAnnotations.map{ sa => { + val keys = sampleAnnotations.map { sa => { val ks = querier(sa).asInstanceOf[IndexedSeq[Any]] if (ks == null) IndexedSeq.empty[Any] else ks - }} - val sampleMap = (0 until nSamples).flatMap {i => keys(i).iterator.map{ _ => i }} + } + } + val sampleMap = (0 until nSamples).flatMap { i => keys(i).iterator.map { _ => i } } val localRowType = rowType val localGSsig = rowType.fieldType(3).asInstanceOf[TArray] @@ -1044,7 +1046,7 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, var i = 0 while (i < 3) { rv2b.addRegionValue(localRowType.fieldType(i), rv.region, localRowType.loadField(rv, i)) - i+= 1 + i += 1 } rv2b.startArray(newSampleIds.length) i = 0 @@ -1455,7 +1457,7 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, copy2(sampleIds = newSampleIds, sampleAnnotations = sampleAnnotations ++ right.sampleAnnotations, - rdd2 = OrderedRDD2(rdd2.typ, rdd2.orderedPartitioner, joined)) + rdd2 = OrderedRVD(rdd2.typ, rdd2.partitioner, joined)) } def makeKT(variantCondition: String, genotypeCondition: String, keyNames: Array[String] = Array.empty, seperator: String = "."): KeyTable = { @@ -2005,7 +2007,7 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, VSMLocalValue(globalAnnotation, sampleIds, sampleAnnotations), rdd) - def copy2(rdd2: OrderedRDD2 = rdd2, + def copy2(rdd2: OrderedRVD = rdd2, sampleIds: IndexedSeq[Annotation] = sampleIds, sampleAnnotations: IndexedSeq[Annotation] = sampleAnnotations, globalAnnotation: Annotation = globalAnnotation, @@ -2043,12 +2045,12 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, Array("s")) } - def storageLevel: String = 
rdd2.getStorageLevel2.toReadableString() + def storageLevel: String = rdd2.storageLevel.toReadableString() def summarize(): SummaryResult = { val localRowType = rowType val localNSamples = nSamples - rdd2.aggregateWithContext( () => + rdd2.aggregateWithContext(() => (HardCallView(localRowType), new RegionValueVariant(localRowType.fieldType(1).asInstanceOf[TVariant])) )(new SummaryCombiner)( @@ -2231,12 +2233,12 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, fatal(s"unknown StorageLevel `$storageLevel'") } - copy2(rdd2 = rdd2.persist2(level)) + copy2(rdd2 = rdd2.persist(level)) } def cache(): VariantSampleMatrix = persist("MEMORY_ONLY") - def unpersist(): VariantSampleMatrix = copy2(rdd2 = rdd2.unpersist2()) + def unpersist(): VariantSampleMatrix = copy2(rdd2 = rdd2.unpersist()) def naiveCoalesce(maxPartitions: Int): VariantSampleMatrix = copy2(rdd2 = rdd2.naiveCoalesce(maxPartitions)) @@ -2284,10 +2286,10 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, writeMetadata(dirname, rdd2.partitions.length) hadoopConf.writeTextFile(dirname + "/partitioner.json.gz") { out => - Serialization.write(rdd2.orderedPartitioner.toJSON, out) + Serialization.write(rdd2.partitioner.toJSON, out) } - rdd2.writeRows(dirname, rowType) + rdd2.rdd.writeRows(dirname, rowType) } def linreg(ysExpr: Array[String], xExpr: String, covExpr: Array[String] = Array.empty[String], root: String = "va.linreg", variantBlockSize: Int = 16): VariantSampleMatrix = { @@ -2383,13 +2385,13 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, val newRDD2 = if (leftAligned) - OrderedRDD2(rdd2.typ, - rdd2.orderedPartitioner, + OrderedRVD(rdd2.typ, + rdd2.partitioner, minRep1(removeLeftAligned = false, removeMoving = false, verifyLeftAligned = true)) else SplitMulti.unionMovedVariants( - OrderedRDD2(rdd2.typ, - rdd2.orderedPartitioner, + OrderedRVD(rdd2.typ, + rdd2.partitioner, minRep1(removeLeftAligned = false, removeMoving = true, verifyLeftAligned = false)), minRep1(removeLeftAligned = true, removeMoving = false, verifyLeftAligned = false)) @@ -2495,7 +2497,7 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, val oldGsType = rowType.fieldType(3).asInstanceOf[TArray] - val newRDD = rdd2.mapPartitionsPreservesPartitioning { it => + val newRDD = rdd2.mapPartitionsPreservesPartitioning(new OrderedRVType(rdd2.typ.partitionKey, rdd2.typ.key, newRowType)) { it => it.map { r => val rvb = new RegionValueBuilder(MemoryBuffer()) rvb.start(newRowType) @@ -2537,7 +2539,7 @@ class VariantSampleMatrix(val hc: HailContext, val metadata: VSMMetadata, rvb.end() rvb.result() } - }.copy(typ = new OrderedRDD2Type(rdd2.typ.partitionKey, rdd2.typ.key, newRowType)) + } copy2(rdd2 = newRDD, sampleIds = kidIds, diff --git a/src/main/scala/org/apache/spark/AggregateWithContext.scala b/src/main/scala/org/apache/spark/AggregateWithContext.scala index 259ce3a6ccc..b7ad296a1f3 100644 --- a/src/main/scala/org/apache/spark/AggregateWithContext.scala +++ b/src/main/scala/org/apache/spark/AggregateWithContext.scala @@ -4,22 +4,20 @@ import org.apache.spark.util.Utils import scala.reflect.ClassTag object AggregateWithContext { - implicit class RichRDD[T](val rdd: RDD[T]) extends AnyVal { - def aggregateWithContext[U: ClassTag, V](context: () => V)(zeroValue: U) - (seqOp: (V, U, T) => U, combOp: (U, U) => U): U = rdd.withScope { - val sc = rdd.sparkContext - // Clone the zero value since we will also be serializing it as part of tasks - var jobResult = 
Utils.clone(zeroValue, sc.env.serializer.newInstance()) - val cleanSeqOp = sc.clean(seqOp) - val cleanCombOp = sc.clean(combOp) - val aggregatePartition = { (it: Iterator[T]) => - val localContext = context() - it.aggregate(zeroValue)(cleanSeqOp(localContext, _, _), cleanCombOp) - } - val cleanAggregatePartition = sc.clean(aggregatePartition) - val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult) - sc.runJob(rdd, cleanAggregatePartition, mergeResult) - jobResult + def aggregateWithContext[T, U: ClassTag, V](rdd: RDD[T])(context: () => V)(zeroValue: U) + (seqOp: (V, U, T) => U, combOp: (U, U) => U): U = rdd.withScope { + val sc = rdd.sparkContext + // Clone the zero value since we will also be serializing it as part of tasks + var jobResult = Utils.clone(zeroValue, sc.env.serializer.newInstance()) + val cleanSeqOp = sc.clean(seqOp) + val cleanCombOp = sc.clean(combOp) + val aggregatePartition = { (it: Iterator[T]) => + val localContext = context() + it.aggregate(zeroValue)(cleanSeqOp(localContext, _, _), cleanCombOp) } + val cleanAggregatePartition = sc.clean(aggregatePartition) + val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult) + sc.runJob(rdd, cleanAggregatePartition, mergeResult) + jobResult } } diff --git a/src/test/scala/is/hail/methods/LDPruneSuite.scala b/src/test/scala/is/hail/methods/LDPruneSuite.scala index b58f220a1e0..dec49f37eb6 100644 --- a/src/test/scala/is/hail/methods/LDPruneSuite.scala +++ b/src/test/scala/is/hail/methods/LDPruneSuite.scala @@ -5,7 +5,7 @@ import is.hail.SparkSuite import is.hail.annotations.{Annotation, MemoryBuffer, RegionValue, RegionValueBuilder} import is.hail.check.Prop._ import is.hail.check.{Gen, Properties} -import is.hail.expr.{TArray, TCall, TLocus, TStruct, TVariant} +import is.hail.expr.{TArray, TStruct} import is.hail.stats.RegressionUtils import is.hail.variant._ import is.hail.utils._
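
The PartitionKeyInfo2 scan deleted from OrderedRDD2.scala above computes, in a single pass over a partition, the min and max partition keys, a sortedness level (KSORTED, TSORTED or UNSORTED), and a fixed-size reservoir sample that is later used to choose range bounds. A minimal standalone sketch of that pass, using (pk, k) pairs of Ints in place of RegionValues; the names PartitionScan and scanPartition are illustrative, not Hail API:

import scala.util.Random

object PartitionScanExample {
  final val UNSORTED = 0
  final val TSORTED = 1 // sorted by partition key, but not by full key
  final val KSORTED = 2 // sorted by full key

  case class PartitionScan(size: Int, min: Int, max: Int, samples: IndexedSeq[Int], sortedness: Int)

  // One pass over (pk, k) records: track min/max pk, classify sortedness,
  // and keep a reservoir sample of partition keys.
  def scanPartition(it: Iterator[(Int, Int)], sampleSize: Int, seed: Int): PartitionScan = {
    require(it.hasNext)
    val rng = new Random(seed)
    val fullKeyOrd = Ordering[(Int, Int)]
    val first = it.next()
    var min = first._1
    var max = first._1
    var prev = first
    var sortedness = KSORTED
    val samples = new Array[Int](sampleSize)
    var n = 1
    if (sampleSize > 0) samples(0) = first._1

    while (it.hasNext) {
      val cur = it.next()
      if (fullKeyOrd.lt(cur, prev)) {
        if (cur._1 < prev._1) sortedness = UNSORTED       // partition keys out of order
        else sortedness = math.min(sortedness, TSORTED)   // pk-sorted, full key not
      }
      if (cur._1 < min) min = cur._1
      if (cur._1 > max) max = cur._1
      prev = cur
      if (n < sampleSize) samples(n) = cur._1             // still filling the reservoir
      else {
        val j = rng.nextInt(n)                            // replace with decreasing probability
        if (j < sampleSize) samples(j) = cur._1
      }
      n += 1
    }
    PartitionScan(n, min, max, samples.take(math.min(n, sampleSize)).toIndexedSeq, sortedness)
  }
}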
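
localKeySort, also removed above, relies on the input iterator already being sorted by partition key: it buffers one run of pk-equal records at a time in a priority queue and emits that run sorted by the full key, so a whole partition never has to be materialized. A self-contained sketch of the same idea over (pk, k) pairs:

import scala.collection.mutable

object LocalKeySortExample {
  // Precondition: `it` is sorted by pk. Output is sorted by the full key (pk, k).
  def localKeySort(it: Iterator[(Int, Int)]): Iterator[(Int, Int)] =
    new Iterator[(Int, Int)] {
      private val bit = it.buffered
      // min-heap on the full key: reverse the default max-heap ordering
      private val q = mutable.PriorityQueue.empty[(Int, Int)](Ordering[(Int, Int)].reverse)

      def hasNext: Boolean = bit.hasNext || q.nonEmpty

      def next(): (Int, Int) = {
        if (q.isEmpty) {
          // pull in the whole run of records sharing the current partition key
          do q.enqueue(bit.next())
          while (bit.hasNext && bit.head._1 == q.head._1)
        }
        q.dequeue()
      }
    }

  def main(args: Array[String]): Unit = {
    val in = Iterator((1, 3), (1, 1), (1, 2), (2, 5), (3, 9), (3, 7))
    println(localKeySort(in).toList) // List((1,1), (1,2), (1,3), (2,5), (3,7), (3,9))
  }
}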
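
calculateKeyRanges turns the pooled per-partition samples into at most nPartitions - 1 ascending range bounds, each bound being a partition maximum for the new partitioner; as the FIXME in the removed code notes, the split is unweighted. The same selection logic on plain Ints (a sketch, not the Hail signature):

object KeyRangesExample {
  // keys0: pooled samples from all partitions; returns the range bounds.
  def calculateKeyRanges(keys0: IndexedSeq[Int], nPartitions: Int): IndexedSeq[Int] = {
    val keys = keys0.sorted
    if (keys.length <= nPartitions)
      keys.init                                        // one bound per sample; the last is implicit +inf
    else {
      val k = keys.length / nPartitions                // stride between chosen samples
      IndexedSeq.tabulate(nPartitions - 1)(i => keys((i + 1) * k))
    }
  }

  def main(args: Array[String]): Unit = {
    // 12 samples, 4 partitions -> a bound at every 3rd sorted sample
    println(calculateKeyRanges(IndexedSeq(5, 1, 9, 3, 7, 2, 8, 4, 6, 0, 11, 10), 4).mkString(", ")) // 3, 6, 9
  }
}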
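
naiveCoalesce never shuffles; it groups adjacent old partitions into maxPartitions contiguous blocks whose sizes differ by at most one and reuses the old range bound at the end of each block. The index arithmetic from the removed code, sketched standalone (newPartEnd here is an illustrative helper name):

object NaiveCoalesceExample {
  // For n old partitions grouped into newN blocks, return the index of the last
  // old partition in each block, ascending and ending at n - 1.
  def newPartEnd(n: Int, newN: Int): Array[Int] = {
    val newNParts = Array.tabulate(newN)(i => (n - i + newN - 1) / newN) // block sizes
    assert(newNParts.sum == n && newNParts.forall(_ > 0))
    newNParts.scanLeft(-1)(_ + _).tail                                   // running block ends
  }

  def main(args: Array[String]): Unit = {
    println(newPartEnd(10, 4).mkString(", ")) // 2, 5, 7, 9  (blocks of sizes 3, 3, 2, 2)
  }
}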
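
BinarySearch.binarySearch (kept in sparkextras) and the getPartition lookup on the old OrderedPartitioner2, deleted above, share one idea: the range bounds are per-partition maxima sorted ascending, and a key maps to the smallest partition whose max is >= the key, with the last partition treated as unbounded. A standalone sketch with Int bounds; the names below are illustrative rather than the actual signatures:

object PartitionLookupExample {
  // Return the smallest i in [0, n) such that compare(i) <= 0,
  // where compare(i) means "key compared to bound(i)" and compare(n - 1) is guaranteed <= 0.
  def binarySearch(n: Int, compare: Int => Int): Int = {
    var lo = 0
    var hi = n - 1
    while (lo < hi) {
      val mid = (lo + hi) >>> 1
      if (compare(mid) <= 0) hi = mid
      else lo = mid + 1
    }
    lo
  }

  // rangeBounds(i) is the maximum partition key of partition i; the last partition is unbounded.
  def getPartition(numPartitions: Int, rangeBounds: IndexedSeq[Int], key: Int): Int =
    binarySearch(numPartitions,
      i => if (i == numPartitions - 1) -1 else Integer.compare(key, rangeBounds(i)))

  def main(args: Array[String]): Unit = {
    val bounds = IndexedSeq(10, 20, 30) // 4 partitions
    println(Seq(-5, 10, 11, 25, 999).map(getPartition(4, bounds, _))) // List(0, 0, 1, 2, 3)
  }
}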
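
The AggregateWithContext change swaps an implicit RichRDD enrichment for a plain function that takes the RDD as an explicit first argument; the behaviour stays the same: an aggregate whose seqOp also receives a context constructed once per partition (the HardCallView / RegionValueVariant pair in summarize() is one such context). A local-collections sketch of that pattern, with Seq[Seq[T]] standing in for an RDD's partitions and all names illustrative:

object AggregateWithContextExample {
  // Aggregate where seqOp also sees a context built once per partition.
  def aggregateWithContext[T, U, V](partitions: Seq[Seq[T]])(context: () => V)(zero: U)
    (seqOp: (V, U, T) => U, combOp: (U, U) => U): U =
    partitions
      .map { part =>
        val ctx = context()                               // one context per partition
        part.foldLeft(zero)((acc, t) => seqOp(ctx, acc, t))
      }
      .foldLeft(zero)(combOp)                             // merge per-partition results

  def main(args: Array[String]): Unit = {
    val n = aggregateWithContext(Seq(Seq(1, 2, 3), Seq(4, 5)))(() => new java.util.Random(0))(0)(
      (rng, acc, x) => acc + 1, // the per-partition context is unused in this toy count
      _ + _)
    println(n) // 5
  }
}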