diff --git a/src/main/scala/io/qbeast/spark/QbeastTable.scala b/src/main/scala/io/qbeast/spark/QbeastTable.scala
index fa6053c5c..1e9a40f95 100644
--- a/src/main/scala/io/qbeast/spark/QbeastTable.scala
+++ b/src/main/scala/io/qbeast/spark/QbeastTable.scala
@@ -6,11 +6,10 @@ package io.qbeast.spark
 import io.qbeast.context.QbeastContext
 import io.qbeast.core.model.{QTableID, RevisionID}
 import io.qbeast.spark.delta.DeltaQbeastSnapshot
+import io.qbeast.spark.internal.commands.{AnalyzeTableCommand, OptimizeTableCommand}
 import io.qbeast.spark.table._
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.delta.DeltaLog
-import io.qbeast.spark.internal.commands.AnalyzeTableCommand
-import io.qbeast.spark.internal.commands.OptimizeTableCommand
 
 /**
  * Class for interacting with QbeastTable at a user level
@@ -32,12 +31,12 @@ class QbeastTable private (
 
   private def indexedTable: IndexedTable = indexedTableFactory.getIndexedTable(tableID)
 
-  private def getAvailableRevision(revisionID: Option[RevisionID]): RevisionID = {
-    revisionID match {
-      case Some(id) if qbeastSnapshot.existsRevision(id) =>
-        id
-      case None => qbeastSnapshot.loadLatestRevision.revisionID
-    }
+  private def latestRevisionAvailable = qbeastSnapshot.loadLatestRevision
+
+  private def latestRevisionAvailableID = latestRevisionAvailable.revisionID
+
+  private def getAvailableRevision(revisionID: RevisionID): RevisionID = {
+    if (qbeastSnapshot.existsRevision(revisionID)) revisionID else latestRevisionAvailableID
   }
 
   /**
@@ -46,25 +45,81 @@ class QbeastTable private (
    * @param revisionID the identifier of the revision to optimize.
    *                   If doesn't exist or none is specified, would be the last available
    */
-  def optimize(revisionID: Option[RevisionID] = None): Unit = {
+  def optimize(revisionID: RevisionID): Unit = {
     OptimizeTableCommand(getAvailableRevision(revisionID), indexedTable)
       .run(sparkSession)
+  }
 
+  def optimize(): Unit = {
+    OptimizeTableCommand(latestRevisionAvailableID, indexedTable)
+      .run(sparkSession)
   }
 
   /**
    * The analyze operation should analyze the index structure
    * and find the cubes that need optimization
    * @param revisionID the identifier of the revision to optimize.
-   *                   If doesn't exist or none is specified, would be the last available
+   *                   If it doesn't exist or none is specified, the latest available revision is used
    * @return the sequence of cubes to optimize in string representation
    */
-  def analyze(revisionID: Option[RevisionID] = None): Seq[String] = {
+  def analyze(revisionID: RevisionID): Seq[String] = {
     AnalyzeTableCommand(getAvailableRevision(revisionID), indexedTable)
       .run(sparkSession)
       .map(_.getString(0))
   }
 
+  def analyze(): Seq[String] = {
+    AnalyzeTableCommand(latestRevisionAvailableID, indexedTable)
+      .run(sparkSession)
+      .map(_.getString(0))
+  }
+
+  /**
+   * Outputs the indexed columns of the table
+   * @param revisionID the identifier of the revision.
+   *                   If it doesn't exist or none is specified, the latest available revision is used
+   * @return the names of the indexed columns
+   */
+
+  def indexedColumns(revisionID: RevisionID): Seq[String] = {
+    qbeastSnapshot
+      .loadRevision(getAvailableRevision(revisionID))
+      .columnTransformers
+      .map(_.columnName)
+  }
+
+  def indexedColumns(): Seq[String] = {
+    latestRevisionAvailable.columnTransformers.map(_.columnName)
+  }
+
+  /**
+   * Outputs the cube size of the table
+   * @param revisionID the identifier of the revision.
+   *                   If it doesn't exist or none is specified, the latest available revision is used
+   * @return the desired cube size
+   */
+  def cubeSize(revisionID: RevisionID): Int =
+    qbeastSnapshot.loadRevision(getAvailableRevision(revisionID)).desiredCubeSize
+
+  def cubeSize(): Int =
+    latestRevisionAvailable.desiredCubeSize
+
+  /**
+   * Outputs all the revision identifiers available for the table
+   * @return the sequence of revision identifiers
+   */
+  def revisionsIDs(): Seq[RevisionID] = {
+    qbeastSnapshot.loadAllRevisions.map(_.revisionID)
+  }
+
+  /**
+   * Outputs the identifier of the latest revision available
+   * @return the latest revision identifier
+   */
+  def latestRevisionID(): RevisionID = {
+    latestRevisionAvailableID
+  }
+
 }
 
 object QbeastTable {
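For reviewers, a minimal usage sketch of the reworked user-level API. The table path and the values in the comments are hypothetical; `QbeastTable.forPath` is the accessor exercised by the tests below:

```scala
import io.qbeast.spark.QbeastTable
import org.apache.spark.sql.SparkSession

val spark: SparkSession = SparkSession.active
// Hypothetical path to an existing qbeast-formatted table
val qbeastTable = QbeastTable.forPath(spark, "/tmp/qbeast_table")

// New metadata getters: the no-argument overloads read the latest
// revision; the RevisionID overloads fall back to the latest revision
// when the given identifier does not exist.
val columns: Seq[String] = qbeastTable.indexedColumns() // e.g. Seq("age", "val2")
val size: Int = qbeastTable.cubeSize()                  // e.g. 100
val revisions = qbeastTable.revisionsIDs()              // e.g. Seq(1L, 2L, 3L)
val latest = qbeastTable.latestRevisionID()             // e.g. 3L

// analyze/optimize now come as overloads instead of Option[RevisionID]
val cubes: Seq[String] = qbeastTable.analyze() // cubes to optimize, latest revision
qbeastTable.optimize(latest)                   // optimize a specific revision
```

Note that the fallback behaviour is preserved: `getAvailableRevision` resolves an unknown `RevisionID` to the latest available revision instead of failing, so the `RevisionID` overloads never throw on a stale identifier.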
diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastTableTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastTableTest.scala
new file mode 100644
index 000000000..cfb8e1200
--- /dev/null
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastTableTest.scala
@@ -0,0 +1,99 @@
+package io.qbeast.spark.utils
+
+import io.qbeast.TestClasses.Client3
+import io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.col
+
+class QbeastTableTest extends QbeastIntegrationTestSpec {
+
+  private def createDF(spark: SparkSession) = {
+    val rdd =
+      spark.sparkContext.parallelize(
+        0.to(1000)
+          .map(i => Client3(i * i, s"student-$i", i, (i * 1000 + 123), i * 2567.3432143)))
+    spark.createDataFrame(rdd)
+  }
+
+  "IndexedColumns" should "output the indexed columns" in withQbeastContextSparkAndTmpDir {
+    (spark, tmpDir) =>
+      {
+        val data = createDF(spark)
+        val columnsToIndex = Seq("age", "val2")
+        val cubeSize = 100
+        // WRITE SOME DATA
+        writeTestData(data, columnsToIndex, cubeSize, tmpDir)
+
+        val qbeastTable = QbeastTable.forPath(spark, tmpDir)
+        qbeastTable.indexedColumns() shouldBe columnsToIndex
+      }
+  }
+
+  "CubeSize" should "output the cube size" in withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
+    {
+      val data = createDF(spark)
+      val columnsToIndex = Seq("age", "val2")
+      val cubeSize = 100
+      // WRITE SOME DATA
+      writeTestData(data, columnsToIndex, cubeSize, tmpDir)
+
+      val qbeastTable = QbeastTable.forPath(spark, tmpDir)
+      qbeastTable.cubeSize() shouldBe cubeSize
+    }
+  }
+
+  "Latest revision" should "output the latest revision available" in withQbeastContextSparkAndTmpDir {
+    (spark, tmpDir) =>
+      {
+        val data = createDF(spark)
+        val columnsToIndex = Seq("age", "val2")
+        val cubeSize = 100
+        // WRITE SOME DATA
+        writeTestData(data, columnsToIndex, cubeSize, tmpDir)
+
+        val qbeastTable = QbeastTable.forPath(spark, tmpDir)
+        qbeastTable.latestRevisionID() shouldBe 1L
+      }
+  }
+
+  it should "output the latest revision available within different revisions" in
+    withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
+      {
+        val revision1 = createDF(spark)
+        val columnsToIndex = Seq("age", "val2")
+        val cubeSize = 100
+        // WRITE SOME DATA
+        writeTestData(revision1, columnsToIndex, cubeSize, tmpDir)
+
+        val revision2 = revision1.withColumn("age", col("age") * 2)
+        writeTestData(revision2, columnsToIndex, cubeSize, tmpDir, "append")
+
+        val revision3 = revision1.withColumn("val2", col("val2") * 2)
+        writeTestData(revision3, columnsToIndex, cubeSize, tmpDir, "append")
+
+        val qbeastTable = QbeastTable.forPath(spark, tmpDir)
+        qbeastTable.latestRevisionID() shouldBe 3L
+      }
+    }
+
+  "Revisions" should "output all the revisions available" in withQbeastContextSparkAndTmpDir {
+    (spark, tmpDir) =>
+      {
+        val revision1 = createDF(spark)
+        val columnsToIndex = Seq("age", "val2")
+        val cubeSize = 100
+        // WRITE SOME DATA
+        writeTestData(revision1, columnsToIndex, cubeSize, tmpDir)
+
+        val revision2 = revision1.withColumn("age", col("age") * 2)
+        writeTestData(revision2, columnsToIndex, cubeSize, tmpDir, "append")
+
+        val revision3 = revision1.withColumn("val2", col("val2") * 2)
+        writeTestData(revision3, columnsToIndex, cubeSize, tmpDir, "append")
+
+        val qbeastTable = QbeastTable.forPath(spark, tmpDir)
+        qbeastTable.revisionsIDs().size shouldBe 3
+        qbeastTable.revisionsIDs() shouldBe Seq(1L, 2L, 3L)
+      }
+  }
+}
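The tests above create new revisions by appending data whose values fall outside the space of the current revision (doubling `age`, then `val2`), which is why `latestRevisionID()` and `revisionsIDs()` end up seeing identifiers 1L, 2L and 3L. As a rough sketch of what the `writeTestData` helper from `QbeastIntegrationTestSpec` presumably wraps, using the qbeast data source's `columnsToIndex` and `cubeSize` write options; the helper's real signature lives in the test spec, so the name and defaults here are assumptions:

```scala
import org.apache.spark.sql.DataFrame

// Illustrative only: a writeTestData-like helper built on the qbeast
// data source. Option names follow the qbeast-spark write options;
// this is not the helper shipped with the test spec.
def writeTestDataSketch(
    df: DataFrame,
    columnsToIndex: Seq[String],
    cubeSize: Int,
    path: String,
    mode: String = "overwrite"): Unit = {
  df.write
    .mode(mode)                                           // "overwrite" or "append"
    .format("qbeast")
    .option("columnsToIndex", columnsToIndex.mkString(",")) // e.g. "age,val2"
    .option("cubeSize", cubeSize.toString)                   // e.g. "100"
    .save(path)
}
```

Each such append that does not fit the existing revision's transformations triggers a new revision, so two out-of-range appends after the initial write yield exactly three revisions.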