Skip to content

Commit

Permalink
[query] fix bad bug in IndexedRVDSpec2 (#14420)
Browse files Browse the repository at this point in the history
CHANGELOG: Fixes a serious, but likely rare, bug in the
Table/MatrixTable reader, which has been present since Sep 2020. It
manifests as many (around half or more) of the rows being dropped. This
could only happen when 1) reading a (matrix)table whose partitioning
metadata allows rows with the same key to be split across neighboring
partitions, and 2) reading it with a different partitioning than it was
written. 1) would likely only happen by reading data keyed by locus and
alleles, and rekeying it to only locus before writing. 2) would likely
only happen by using the `_intervals` or `_n_partitions` arguments to
`read_(matrix)_table`, or possibly `repartition`. Please reach out to us
if you're concerned you may have been affected by this.

This fixes a serious and longstanding bug in `IndexedRVDSpec2`, which
appears to have been around since this code was first added in #9522
almost four years ago. It was reported in this [zulip
thread](https://hail.zulipchat.com/#narrow/stream/123010-Hail-Query-0.2E2-support/topic/Number.20of.20rows.20changing.20with.20partitioning).
I want to do further work to better characterize exactly what it takes
to be affected by this bug, but I think you must have a table or
matrixtable on disk which has duplicate keys, and moreover keys which
span neighboring partitions, and then you must read the data with a
different partitioner.

The root of the issue is an invalid assumption made in the code. To read
data written with partitioner `p1` using new partitioner `p2`, it first
computes the "intersection", or common refinement, of the two. It then
assumes that each partition in the refinement overlaps exactly one
partition of `p1`. But this is only true if the partitions of `p1` are
themselves mutually disjoint, which is usually but not necessarily true.

For example, suppose `p1 = [ [1, 5], [5, 8] ]` is the old partitioner,
and `p2 = [ [1, 4), [4, 8] ]` is the new. Note that the two input
partitions are not disjoint, as the key `5` is allowed in both. The
common refinement would then be `[ [1, 4), [4, 5], [5, 8] ]`. For each
partition in the refinement, we want to read in the corresponding range
from the appropriate input partition, then we want to group the
partitions in the refinement to match the new partitioner. The code
finds "the appropriate input partition" by taking the first input
partition which overlaps the refinement partition, using
`lowerBoundInterval`. That works if there is only one overlapping input
partition, but here fails, since the refinement partition `[5, 8]`
overlaps both input partitions. So the code mistakenly reads from the
input partition `[1, 5]` to produce the refinement partition `[5, 8]`,
and so completely drops all rows in the input `[5, 8]`.

In practice, I think the most likely way to run into this (and the way
it was found by a user) is to have a dataset keyed by `["locus",
"alleles"]`, which has split multi-allelics, so there are multiple rows
with the same locus. Then shorten the key to `["locus"]`, write the
dataset to disk, and read it back with a different partitioning, e.g. by
passing a `_n_partitions` argument to `read_table` or
`read_matrix_table`. For instance, if the partitioning was originally `[
[{1:1, ["A"]}, {1:500, ["G"]}), [{1:500, ["G"]}, {1:1000, ["C"]}] ]`,
then after shortening the key it would be `[ [1:1, 1:500], [1:500,
1:1000] ]`. Notice that even though the original partitioning had no
overlap, it does after shortening the key, because rows with locus
`1:500` with alleles less than `["G"]` are allowed in the first
partition, so we have to make the right endpoint inclusive after
shortening. You would then need to write this rekeyed dataset to disk
and read it back with different partitioning (note that `ds.repartition`
is enough to do this in the batch backend).

I still need to think through what holes in our testing allowed this to
remain undetected for so long, and attempt to plug them. We should also
plan for what to tell a user who is concerned they may have been
affected by this in the past.
  • Loading branch information
patrick-schultz authored and chrisvittal committed Jul 11, 2024
1 parent e583d70 commit 83f3a5d
Show file tree
Hide file tree
Showing 107 changed files with 103 additions and 31 deletions.
94 changes: 65 additions & 29 deletions hail/src/main/scala/is/hail/rvd/AbstractRVDSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ import is.hail.backend.{ExecuteContext, HailStateManager}
import is.hail.compatibility
import is.hail.expr.{ir, JSONAnnotationImpex}
import is.hail.expr.ir.{
IR, Literal, PartitionNativeReader, PartitionZippedIndexedNativeReader,
PartitionZippedNativeReader, ReadPartition, ToStream,
flatMapIR, IR, Literal, PartitionNativeReader, PartitionZippedIndexedNativeReader,
PartitionZippedNativeReader, ReadPartition, Ref, ToStream,
}
import is.hail.expr.ir.lowering.{TableStage, TableStageDependency}
import is.hail.io._
Expand Down Expand Up @@ -484,48 +484,84 @@ case class IndexedRVDSpec2(
): IR => TableStage = newPartitioner match {
case Some(np) =>
val part = partitioner(ctx.stateManager)
/* ensure the old and new partitioners have the same key, and ensure the new partitioner is
* strict */
val extendedNP = np.extendKey(part.kType)
val tmpPartitioner = part.intersect(extendedNP)

assert(key.nonEmpty)

val rSpec = typedCodecSpec
val reader =
ir.PartitionNativeReaderIndexed(rSpec, indexSpec, part.kType.fieldNames, uidFieldName)

val absPath = path
val partPaths = tmpPartitioner.rangeBounds.map(b => partFiles(part.lowerBoundInterval(b)))

val kSize = part.kType.size
absolutePartPaths(path)
assert(tmpPartitioner.rangeBounds.size == partPaths.length)
val contextsValues: IndexedSeq[Row] = tmpPartitioner.rangeBounds.map { interval =>
val partIdx = part.lowerBoundInterval(interval)
val partPath = partFiles(partIdx)
val reader = ir.PartitionNativeReaderIndexed(
typedCodecSpec,
indexSpec,
part.kType.fieldNames,
uidFieldName,
)

def makeCtx(oldPartIdx: Int, newPartIdx: Int): Row = {
val oldInterval = part.rangeBounds(oldPartIdx)
val partFile = partFiles(oldPartIdx)
val intersectionInterval =
extendedNP.rangeBounds(newPartIdx)
.intersect(extendedNP.kord, oldInterval).get
Row(
partIdx.toLong,
s"$absPath/parts/$partPath",
s"$absPath/${indexSpec.relPath}/$partPath.idx",
RVDPartitioner.intervalToIRRepresentation(interval, kSize),
oldPartIdx.toLong,
s"$path/parts/$partFile",
s"$path/${indexSpec.relPath}/$partFile.idx",
RVDPartitioner.intervalToIRRepresentation(intersectionInterval, part.kType.size),
)
}

assert(TArray(reader.contextType).typeCheck(contextsValues))
val (nestedContexts, newPartitioner) = if (filterIntervals) {
/* We want to filter to intervals in newPartitioner, while preserving the old partitioning,
* but dropping any partitions we know would be empty. So we construct a map from old
* partitions to the range of overlapping new partitions, dropping any with an empty range. */
val contextsAndBounds = for {
(oldInterval, oldPartIdx) <- part.rangeBounds.toFastSeq.zipWithIndex
overlapRange = extendedNP.queryInterval(oldInterval)
if overlapRange.nonEmpty
} yield {
val ctxs = overlapRange.map(newPartIdx => makeCtx(oldPartIdx, newPartIdx))
// the interval spanning all overlapping filter intervals
val newInterval = Interval(
extendedNP.rangeBounds(overlapRange.head).left,
extendedNP.rangeBounds(overlapRange.last).right,
)
(
ctxs,
// Shrink oldInterval to the rows filtered to.
// By construction we know oldInterval and newInterval overlap
oldInterval.intersect(extendedNP.kord, newInterval).get,
)
}
val (nestedContexts, newRangeBounds) = contextsAndBounds.unzip

(nestedContexts, new RVDPartitioner(part.sm, part.kType, newRangeBounds))
} else {
/* We want to use newPartitioner as the partitioner, dropping any rows not contained in any
* new partition. So we construct a map from new partitioner to the range of overlapping old
* partitions. */
val nestedContexts =
extendedNP.rangeBounds.toFastSeq.zipWithIndex.map { case (newInterval, newPartIdx) =>
val overlapRange = part.queryInterval(newInterval)
overlapRange.map(oldPartIdx => makeCtx(oldPartIdx, newPartIdx))
}

val contexts = ir.ToStream(ir.Literal(TArray(reader.contextType), contextsValues))
(nestedContexts, extendedNP)
}

val body = (ctx: IR) => ir.ReadPartition(ctx, requestedType.rowType, reader)
assert(TArray(TArray(reader.contextType)).typeCheck(nestedContexts))

{ (globals: IR) =>
val ts = TableStage(
TableStage(
globals,
tmpPartitioner,
newPartitioner,
TableStageDependency.none,
contexts,
body,
contexts = ir.ToStream(ir.Literal(TArray(TArray(reader.contextType)), nestedContexts)),
body = (ctxs: Ref) =>
flatMapIR(ToStream(ctxs, true)) { ctx =>
ir.ReadPartition(ctx, requestedType.rowType, reader)
},
)
if (filterIntervals) ts.repartitionNoShuffle(ctx, part, dropEmptyPartitions = true)
else ts.repartitionNoShuffle(ctx, extendedNP)
}

case None =>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This folder comprises a Hail (www.hail.is) native Table or MatrixTable.
Written with version 0.2.128-705d4033e0c9
Created at 2024/03/27 12:03:10
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This folder comprises a Hail (www.hail.is) native Table or MatrixTable.
Written with version 0.2.128-705d4033e0c9
Created at 2024/03/27 12:03:10
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This folder comprises a Hail (www.hail.is) native Table or MatrixTable.
Written with version 0.2.128-705d4033e0c9
Created at 2024/03/27 12:03:10
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This folder comprises a Hail (www.hail.is) native Table or MatrixTable.
Written with version 0.2.128-705d4033e0c9
Created at 2024/03/27 12:03:10
Empty file.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
This folder comprises a Hail (www.hail.is) native Table or MatrixTable.
Written with version 0.2.128-705d4033e0c9
Created at 2024/03/27 12:03:10
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"file_version":67328,"hail_version":"0.2.128-705d4033e0c9","references_rel_path":"../references","table_type":"Table{global:Struct{},key:[locus],row:Struct{locus:Locus(GRCh37),alleles:Array[String],rsid:String,qual:Float64,filters:Set[String],info:Struct{NEGATIVE_TRAIN_SITE:Boolean,HWP:Float64,AC:Array[Int32],culprit:String,MQ0:Int32,ReadPosRankSum:Float64,AN:Int32,InbreedingCoeff:Float64,AF:Array[Float64],GQ_STDDEV:Float64,FS:Float64,DP:Int32,GQ_MEAN:Float64,POSITIVE_TRAIN_SITE:Boolean,VQSLOD:Float64,ClippingRankSum:Float64,BaseQRankSum:Float64,MLEAF:Array[Float64],MLEAC:Array[Int32],MQ:Float64,QD:Float64,END:Int32,DB:Boolean,HaplotypeScore:Float64,MQRankSum:Float64,CCC:Int32,NCC:Int32,DS:Boolean}}}","components":{"globals":{"name":"RVDComponentSpec","rel_path":"../globals/rows"},"rows":{"name":"RVDComponentSpec","rel_path":"rows"},"partition_counts":{"name":"PartitionCountsComponentSpec","counts":[18,17,17,18,17,17,17,18,17,17,17,18,17,17,17,18,17,17,17,18]},"properties":{"name":"PropertiesSpec","properties":{"distinctlyKeyed":false}}},"name":"TableSpec"}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"name":"IndexedRVDSpec2","_key":["locus"],"_codecSpec":{"name":"TypedCodecSpec","_eType":"+EBaseStruct{locus:EBaseStruct{contig:+EBinary,position:+EInt32},alleles:EArray[EBinary],rsid:EBinary,qual:EFloat64,filters:EArray[EBinary],info:EBaseStruct{NEGATIVE_TRAIN_SITE:EBoolean,HWP:EFloat64,AC:EArray[EInt32],culprit:EBinary,MQ0:EInt32,ReadPosRankSum:EFloat64,AN:EInt32,InbreedingCoeff:EFloat64,AF:EArray[EFloat64],GQ_STDDEV:EFloat64,FS:EFloat64,DP:EInt32,GQ_MEAN:EFloat64,POSITIVE_TRAIN_SITE:EBoolean,VQSLOD:EFloat64,ClippingRankSum:EFloat64,BaseQRankSum:EFloat64,MLEAF:EArray[EFloat64],MLEAC:EArray[EInt32],MQ:EFloat64,QD:EFloat64,END:EInt32,DB:EBoolean,HaplotypeScore:EFloat64,MQRankSum:EFloat64,CCC:EInt32,NCC:EInt32,DS:EBoolean}}","_vType":"Struct{locus:Locus(GRCh37),alleles:Array[String],rsid:String,qual:Float64,filters:Set[String],info:Struct{NEGATIVE_TRAIN_SITE:Boolean,HWP:Float64,AC:Array[Int32],culprit:String,MQ0:Int32,ReadPosRankSum:Float64,AN:Int32,InbreedingCoeff:Float64,AF:Array[Float64],GQ_STDDEV:Float64,FS:Float64,DP:Int32,GQ_MEAN:Float64,POSITIVE_TRAIN_SITE:Boolean,VQSLOD:Float64,ClippingRankSum:Float64,BaseQRankSum:Float64,MLEAF:Array[Float64],MLEAC:Array[Int32],MQ:Float64,QD:Float64,END:Int32,DB:Boolean,HaplotypeScore:Float64,MQRankSum:Float64,CCC:Int32,NCC:Int32,DS:Boolean}}","_bufferSpec":{"name":"LEB128BufferSpec","child":{"name":"BlockingBufferSpec","blockSize":65536,"child":{"name":"ZstdBlockBufferSpec","blockSize":65536,"child":{"name":"StreamBlockBufferSpec"}}}}},"_indexSpec":{"name":"IndexSpec2","_relPath":"../../index","_leafCodec":{"name":"TypedCodecSpec","_eType":"EBaseStruct{first_idx:+EInt64,keys:+EArray[+EBaseStruct{key:+EBaseStruct{locus:EBaseStruct{contig:+EBinary,position:+EInt32}},offset:+EInt64,annotation:+EBaseStruct{entries_offset:EInt64}}]}","_vType":"Struct{first_idx:Int64,keys:Array[Struct{key:Struct{locus:Locus(GRCh37)},offset:Int64,annotation:Struct{entries_offset:Int64}}]}","_bufferSpec":{"name":"LEB128BufferSpec","child":{"name":"BlockingBufferSpec","blockSize":65536,"child":{"name":"ZstdBlockBufferSpec","blockSize":65536,"child":{"name":"StreamBlockBufferSpec"}}}}},"_internalNodeCodec":{"name":"TypedCodecSpec","_eType":"EBaseStruct{children:+EArray[+EBaseStruct{index_file_offset:+EInt64,first_idx:+EInt64,first_key:+EBaseStruct{locus:EBaseStruct{contig:+EBinary,position:+EInt32}},first_record_offset:+EInt64,first_annotation:+EBaseStruct{entries_offset:EInt64}}]}","_vType":"Struct{children:Array[Struct{index_file_offset:Int64,first_idx:Int64,first_key:Struct{locus:Locus(GRCh37)},first_record_offset:Int64,first_annotation:Struct{entries_offset:Int64}}]}","_bufferSpec":{"name":"LEB128BufferSpec","child":{"name":"BlockingBufferSpec","blockSize":65536,"child":{"name":"ZstdBlockBufferSpec","blockSize":65536,"child":{"name":"StreamBlockBufferSpec"}}}}},"_keyType":"Struct{locus:Locus(GRCh37)}","_annotationType":"Struct{entries_offset:Int64}"},"_partFiles":["part-00-cdb826da-6c5c-47b6-945b-3190a87a6a14","part-01-06f6a507-61e2-4bd1-a917-e1809270144c","part-02-881d024c-5baf-4fe6-bc8f-53eda3845bde","part-03-1e085a57-4dcb-4131-bc79-353324ffad47","part-04-d17ed9aa-6b33-4b0b-85d5-578da32f7581","part-05-40d512f8-23ba-485e-aefa-47eced2bfe6d","part-06-9b2dc9c7-c8b1-4ed4-9056-20142b5f6658","part-07-b9a32d97-cb10-4158-aeaa-645dcea68ca7","part-08-c2a0123f-a3d4-4b80-9c21-73cb2bed0b63","part-09-ca197aee-6bfd-4068-b771-e9ca63551a7c","part-10-17048169-a98b-49ee-ae4d-62641023b3ac","part-11-c89858f5-4d78-4739-af31-308a1c257ff4","part-12-3e391e78-782d-495d-a29c-cacc56e1baf8","part-13-62566d28-e496-4538-a325-b567be66accf","part-14-8ab32ab7-15cd-4302-bb45-6b3dc02db5b6","part-15-c4301966-4fd8-4ea0-b439-b49a693bf683","part-16-8d638c2e-b1a5-4507-ba00-337a02e3f431","part-17-0c739863-b5fe-4e33-8f47-3e2751b599df","part-18-35d65ae7-5d1d-43f8-bb21-e6565874975e","part-19-2fd81de2-5d34-43db-809d-2f1fe1e67200"],"_jRangeBounds":[{"start":{"locus":{"contig":"20","position":10019093}},"end":{"locus":{"contig":"20","position":10286773}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":10286773}},"end":{"locus":{"contig":"20","position":10603326}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":10603326}},"end":{"locus":{"contig":"20","position":10625804}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":10625804}},"end":{"locus":{"contig":"20","position":10653469}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":10653469}},"end":{"locus":{"contig":"20","position":13071871}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":13071871}},"end":{"locus":{"contig":"20","position":13260252}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":13260252}},"end":{"locus":{"contig":"20","position":13561632}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":13561632}},"end":{"locus":{"contig":"20","position":13709115}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":13709115}},"end":{"locus":{"contig":"20","position":13798776}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":13798776}},"end":{"locus":{"contig":"20","position":14032627}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":14032627}},"end":{"locus":{"contig":"20","position":15948325}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":15948325}},"end":{"locus":{"contig":"20","position":16347823}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":16347823}},"end":{"locus":{"contig":"20","position":16410559}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":16410559}},"end":{"locus":{"contig":"20","position":17410116}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":17410116}},"end":{"locus":{"contig":"20","position":17475217}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":17475217}},"end":{"locus":{"contig":"20","position":17595540}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":17595540}},"end":{"locus":{"contig":"20","position":17600357}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":17600357}},"end":{"locus":{"contig":"20","position":17608348}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":17608348}},"end":{"locus":{"contig":"20","position":17705709}},"includeStart":true,"includeEnd":true},{"start":{"locus":{"contig":"20","position":17705709}},"end":{"locus":{"contig":"20","position":17970876}},"includeStart":true,"includeEnd":true}],"_attrs":{}}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
23 changes: 21 additions & 2 deletions hail/src/test/scala/is/hail/expr/ir/TableIRSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ import is.hail.annotations.SafeNDArray
import is.hail.expr.Nat
import is.hail.expr.ir.TestUtils._
import is.hail.expr.ir.lowering.{DArrayLowering, LowerTableIR}
import is.hail.methods.ForceCountTable
import is.hail.methods.{ForceCountTable, NPartitionsTable}
import is.hail.rvd.RVDPartitioner
import is.hail.types._
import is.hail.types.virtual._
import is.hail.utils._

import is.hail.variant.Locus
import org.apache.spark.sql.Row
import org.scalatest.{Failed, Succeeded}
import org.scalatest.Inspectors.forAll
Expand Down Expand Up @@ -674,6 +674,25 @@ class TableIRSuite extends HailSuite {
assertEvalsTo(TableCount(join), 346L)
}

@Test def testNativeReaderWithOverlappingPartitions(): Unit = {
val path = "src/test/resources/sample.vcf-20-partitions-with-overlap.mt/rows"
// i1 overlaps the first two partitions
val i1 = Interval(Row(Locus("20", 10200000)), Row(Locus("20", 10500000)), true, true)

def test(filterIntervals: Boolean, expectedNParts: Int): Unit = {
val opts = NativeReaderOptions(FastSeq(i1), TLocus("GRCh37"), filterIntervals)
val tr = TableNativeReader(fs, TableNativeReaderParameters(path, Some(opts)))
val tir = TableRead(tr.fullTypeWithoutUIDs, false, tr)
val nParts = TableToValueApply(tir, NPartitionsTable())
val count = TableToValueApply(tir, ForceCountTable())
assertEvalsTo(nParts, expectedNParts)
assertEvalsTo(count, 20L)
}

test(false, 1)
test(true, 2)
}

@Test def testTableKeyBy(): Unit = {
implicit val execStrats = ExecStrategy.interpretOnly
val data = Array(Array("A", 1), Array("A", 2), Array("B", 1))
Expand Down

0 comments on commit 83f3a5d

Please sign in to comment.