Qbeast-io#207 Fixes for IndexFile, QbeastBaseRelation, QbeastFileFormat and RangedColumnarBatchIterator
alexeiakimov committed Aug 30, 2023
1 parent 2b5bce4 commit 87e8f94
Showing 7 changed files with 49 additions and 14 deletions.
16 changes: 15 additions & 1 deletion core/src/main/scala/io/qbeast/core/model/IndexFile.scala
@@ -11,8 +11,22 @@ package io.qbeast.core.model
* @param revisionId the revision identifier
* @param blocks the index blocks
*/
final case class IndexFile(file: File, revisionId: Long, blocks: Array[Block]) extends Serializable {
final case class IndexFile(file: File, revisionId: Long, blocks: Array[Block])
extends Serializable {
require(file != null)
require(revisionId >= 0)
require(blocks.nonEmpty)

override def equals(other: Any): Boolean = other match {
case IndexFile(file, revisionId, blocks) =>
this.file == file && this.revisionId == revisionId &&
this.blocks.length == blocks.length &&
this.blocks.indices.forall(i => this.blocks(i) == blocks(i))
case _ => false
}

override def hashCode(): Int = {
val hash = 31 * file.hashCode() + revisionId.hashCode()
blocks.foldLeft(hash)((hash, block) => 31 * hash + block.hashCode())
}

}
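
The hand-written equals and hashCode above are presumably needed because blocks is an Array: case classes derive field-by-field equality, and JVM arrays compare by reference, so two otherwise identical IndexFile instances would not be equal under the synthesized methods. A minimal sketch of the underlying behaviour, using a hypothetical Holder class that is not part of the codebase:

  case class Holder(values: Array[Int])

  val a = Holder(Array(1, 2, 3))
  val b = Holder(Array(1, 2, 3))

  a == b                          // false: the synthesized equals compares the arrays by reference
  a.values.sameElements(b.values) // true: element-wise comparison, which the override restores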
@@ -13,7 +13,7 @@ class IndexFileTest extends AnyFlatSpec with Matchers {
val file = File("path", 1, 2)
val cubeId = CubeId.root(1)
val state = "FLOODED"
val blocks = Seq(
val blocks = Array(
Block(file, RowRange(3, 4), cubeId, state, Weight(5), Weight(6)),
Block(file, RowRange(7, 8), cubeId, state, Weight(9), Weight(10)),
Block(file, RowRange(11, 12), cubeId, state, Weight(13), Weight(14)),
@@ -59,7 +59,7 @@ object QbeastBaseRelation {
val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",")
val cubeSize = revision.desiredCubeSize
val parameters =
Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString())
Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString)

val path = new Path(tableID.id)
val fileIndex = OTreeIndex(spark, path)
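
The parameters map rebuilt here mirrors the options a caller normally passes when writing a Qbeast table. A usage sketch for context (column name, cube size and path are hypothetical examples, not taken from the commit):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder().getOrCreate()
  val df = spark.range(1000).toDF("user_id")

  df.write
    .format("qbeast")
    .option("columnsToIndex", "user_id") // comma-separated list, as in the Map above
    .option("cubeSize", "5000")          // desired cube size, serialized as a string
    .save("/tmp/qbeast-table")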
@@ -27,6 +27,11 @@ private[sources] class QbeastFileFormat extends ParquetFileFormat {
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {
// The inherited reader should not apply filter pushdown otherwise
// the specified row ranges will be incorrect, so get the original setting,
// and disable filter pushdown in the parquet files
val filterPushdown = sparkSession.conf.get("spark.sql.parquet.filterPushdown")
sparkSession.conf.set("spark.sql.parquet.filterPushdown", "false")
val reader = super.buildReaderWithPartitionValues(
sparkSession,
dataSchema,
@@ -35,6 +40,8 @@
filters,
options,
hadoopConf)
// Restore the original filter pushdown setting
sparkSession.conf.set("spark.sql.parquet.filterPushdown", filterPushdown)
fileWithRanges: PartitionedFile => {
val (path, ranges) = PathRangesCodec.decode(fileWithRanges.filePath)
val file = fileWithRanges.copy(filePath = path)
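
The reader construction above saves the current value of spark.sql.parquet.filterPushdown, disables it while the parent reader is built, and then restores it. A generic sketch of that save-and-restore pattern (not the commit's code; withSessionConf and buildReader are hypothetical names):

  import org.apache.spark.sql.SparkSession

  def withSessionConf[T](spark: SparkSession, key: String, value: String)(body: => T): T = {
    val original = spark.conf.get(key)      // remember the caller's setting
    spark.conf.set(key, value)              // apply the temporary override
    try body
    finally spark.conf.set(key, original)   // restore it even if the body throws
  }

  // Hypothetical usage: disable Parquet filter pushdown while building a reader.
  // withSessionConf(spark, "spark.sql.parquet.filterPushdown", "false") { buildReader() }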
@@ -25,6 +25,7 @@ private[sources] class RangedColumnarBatchIterator(
}

private var offset = 0
private var batchToClose: Option[ColumnarBatch] = None
private val ranges_ = Queue(ranges.filterNot(_.isEmpty).sortBy(_.from): _*)

override def hasNext: Boolean = {
@@ -40,10 +41,18 @@
while (batches_.hasNext && offset + batches_.head.numRows() <= ranges_.front.from) {
val batch = batches_.next()
offset += batch.numRows()
batch.close()
batchToClose = Some(batch)
}
// There is a batch intersecting with the first range
batches_.hasNext
// The next batch if exists intersects with the first range
val result = batches_.hasNext
// If there are no more batches, close the used batch to free the resources
// For unknown reasons Spark reuses the returned columnar batch so the close
// operation must be called only for the last batch when there is no more data
// otherwise a closed batch causes NPE in OnHeapColumnVector.
if (!result && batchToClose.isDefined) {
batchToClose.get.close()
}
result
}

override def next(): ColumnarBatch = {
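
The comments above explain why the batch is closed only after iteration is exhausted: Spark reuses the returned ColumnarBatch, so closing it earlier leads to an NPE in OnHeapColumnVector. The deferred-close idea in isolation, as a small generic sketch (CloseOnExhaust is a hypothetical wrapper, not part of the commit):

  class CloseOnExhaust[A](underlying: Iterator[A], release: () => Unit) extends Iterator[A] {

    private var released = false

    override def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !released) { // free the resource exactly once, when no data is left
        release()
        released = true
      }
      more
    }

    override def next(): A = underlying.next()
  }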
@@ -23,7 +23,7 @@ class IndexStatusBuilderTest extends QbeastIntegrationTestSpec {
DeltaQbeastSnapshot(deltaLog.snapshot).loadLatestIndexStatus

indexStatus.revision.revisionID shouldBe 1
indexStatus.cubesStatuses.foreach(_._2.files.size shouldBe 1)
indexStatus.cubesStatuses.foreach(_._2.files.size shouldBe >=(1))
indexStatus.cubesStatuses.foreach { case (cube: CubeId, cubeStatus: CubeStatus) =>
cubeStatus.files.foreach(block => block.cubeId shouldBe cube)
}
@@ -55,7 +55,7 @@ class IndexStatusBuilderTest extends QbeastIntegrationTestSpec {
secondIndexStatus.revision.revisionID shouldBe 1
secondIndexStatus.announcedSet shouldBe Set.empty
secondIndexStatus.replicatedSet shouldBe Set.empty
secondIndexStatus.cubesStatuses.foreach(_._2.files.size shouldBe <=(2))
secondIndexStatus.cubesStatuses.foreach(_._2.files.size shouldBe >=(1))
secondIndexStatus.cubesStatuses.foreach { case (cube: CubeId, cubeStatus: CubeStatus) =>
if (cubeStatus.maxWeight < Weight.MaxValue) {
firstIndexStatus.cubesStatuses.get(cube) shouldBe defined
17 changes: 11 additions & 6 deletions src/test/scala/io/qbeast/spark/index/query/QueryExecutorTest.scala
@@ -8,6 +8,7 @@ import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.functions.{col, expr}
import org.apache.spark.sql.delta.actions.AddFile
import io.qbeast.spark.utils.TagUtils
import io.qbeast.spark.internal.sources.PathRangesCodec

class QueryExecutorTest extends QbeastIntegrationTestSpec with QueryTestSpec {

@@ -53,9 +54,10 @@ class QueryExecutorTest extends QbeastIntegrationTestSpec with QueryTestSpec {
val allDeltaFiles = deltaLog.snapshot.allFiles.collect()
val allFiles = allDeltaFiles.map(_.path)

val matchFiles = queryExecutor.execute().map(_.file.path)
val matchFiles =
queryExecutor.execute().map(_.file.path).map(p => PathRangesCodec.decode(p)._1)

matchFiles.size shouldBe <(allFiles.length)
matchFiles.size shouldBe <=(allFiles.length)
matchFiles.foreach(file => allFiles should contain(file))

})
@@ -75,9 +77,10 @@ class QueryExecutorTest extends QbeastIntegrationTestSpec with QueryTestSpec {
val allDeltaFiles = deltaLog.snapshot.allFiles.collect()
val allFiles = allDeltaFiles.map(_.path)

val matchFiles = queryExecutor.execute().map(_.file.path)
val matchFiles =
queryExecutor.execute().map(_.file.path).map(p => PathRangesCodec.decode(p)._1)

matchFiles.size shouldBe <(allFiles.length)
matchFiles.size shouldBe <=(allFiles.length)
matchFiles.foreach(file => allFiles should contain(file))

})
@@ -175,11 +178,13 @@ class QueryExecutorTest extends QbeastIntegrationTestSpec with QueryTestSpec {
val matchFiles = queryExecutor
.executeRevision(querySpec, faultyIndexStatus)
.map(_.file.path)
.map(p => PathRangesCodec.decode(p)._1)
.toSet

val allFiles = deltaLog.snapshot.allFiles.collect().map(_.path)

val diff = allFiles.toSet -- matchFiles.toSet
diff.size shouldBe 1
matchFiles.size shouldBe <=(allFiles.length)
matchFiles.foreach(file => allFiles should contain(file))
})

it should "find the max value when filtering" in withSparkAndTmpDir((spark, tmpdir) => {
