Merge pull request #120 from osopardo1/append-index-status-builder
Fixed #119
osopardo1 authored Jul 22, 2022
2 parents 9a17df8 + e764c2d commit 0d31627
Showing 2 changed files with 66 additions and 1 deletion.
@@ -59,7 +59,7 @@ private[delta] class IndexStatusBuilder(
     revisionFiles
       .groupBy(TagColumns.cube)
       .agg(
-        weight(min(TagColumns.maxWeight)).as("maxWeight"),
+        min(weight(TagColumns.maxWeight)).as("maxWeight"),
         sum(TagColumns.elementCount).as("elementCount"),
         collect_list(qblock).as("files"))
       .select(
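
Note on the fix: the change swaps the order of min() and weight() in the maxWeight aggregation. A plausible reading (not stated in the diff) is that the maxWeight tag is persisted as a string, so taking min() over the raw column compares values lexicographically, while converting each tag to a Weight first and then taking the minimum compares them numerically. A minimal, self-contained Spark sketch of that difference follows; MaxWeightOrderSketch and the toInt UDF are hypothetical stand-ins, not code from this repository.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{min, udf}

object MaxWeightOrderSketch extends App {

  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // Hypothetical stand-in for the weight() conversion used in the builder
  val toInt = udf((s: String) => s.toInt)

  val tags = Seq("900", "10000").toDF("maxWeight")

  // min over the raw strings is lexicographic: "10000" (since '1' < '9')
  tags.select(min($"maxWeight")).show()

  // min after converting first is numeric: 900, the intended result
  tags.select(min(toInt($"maxWeight"))).show()

  spark.stop()
}

Either way, the new test below checks the observable contract: a cube's maxWeight equals the minimum maxWeight over its files.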
65 changes: 65 additions & 0 deletions src/test/scala/io/qbeast/spark/delta/IndexStatusBuilderTest.scala
@@ -0,0 +1,65 @@
package io.qbeast.spark.delta

import io.qbeast.core.model.{CubeId, CubeStatus, Weight}
import io.qbeast.spark.QbeastIntegrationTestSpec
import org.apache.spark.sql.delta.DeltaLog

class IndexStatusBuilderTest extends QbeastIntegrationTestSpec {

"IndexBuilder" should "build cube information from DeltaLog" in withSparkAndTmpDir(
(spark, tmpDir) => {
import spark.implicits._
val data = 0.to(100000).toDF("id")

// Append data x times
data.write
.format("qbeast")
.option("columnsToIndex", "id")
.option("cubeSize", "10000")
.save(tmpDir)

val deltaLog = DeltaLog.forTable(spark, tmpDir)
val indexStatus =
DeltaQbeastSnapshot(deltaLog.snapshot).loadLatestIndexStatus

indexStatus.revision.revisionID shouldBe 1
indexStatus.cubesStatuses.foreach(_._2.files.size shouldBe 1)
indexStatus.replicatedSet shouldBe Set.empty
indexStatus.announcedSet shouldBe Set.empty
})

  it should "work well when appending to the same revision" in withSparkAndTmpDir((spark, tmpDir) => {

    import spark.implicits._
    val data = 0.to(100000).toDF("id")

    // Write the initial data
    data.write
      .format("qbeast")
      .option("columnsToIndex", "id")
      .option("cubeSize", "10000")
      .save(tmpDir)
    val deltaLog = DeltaLog.forTable(spark, tmpDir)
    val firstIndexStatus = DeltaQbeastSnapshot(deltaLog.snapshot).loadLatestIndexStatus

    // Append the same data again; the write should reuse revision 1
    data.write
      .format("qbeast")
      .mode("append")
      .option("columnsToIndex", "id")
      .option("cubeSize", "10000")
      .save(tmpDir)
    val secondIndexStatus = DeltaQbeastSnapshot(deltaLog.update()).loadLatestIndexStatus

    secondIndexStatus.revision.revisionID shouldBe 1
    secondIndexStatus.announcedSet shouldBe Set.empty
    secondIndexStatus.replicatedSet shouldBe Set.empty
    // After two appends, each cube has at most two files
    secondIndexStatus.cubesStatuses.foreach(_._2.files.size shouldBe <=(2))
    secondIndexStatus.cubesStatuses.foreach { case (cube: CubeId, cubeStatus: CubeStatus) =>
      if (cubeStatus.maxWeight < Weight.MaxValue) {
        firstIndexStatus.cubesStatuses.get(cube) shouldBe defined
        // The cube maxWeight is the minimum over its files, and it can only
        // decrease (or stay equal) after more data is appended
        cubeStatus.maxWeight shouldBe cubeStatus.files.map(_.maxWeight).min
        cubeStatus.maxWeight shouldBe <=(firstIndexStatus.cubesStatuses(cube).maxWeight)
      }
    }
  })

}
