Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Easier information API #72

Merged
merged 5 commits into from
Feb 11, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 47 additions & 8 deletions src/main/scala/io/qbeast/spark/QbeastTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,11 @@ class QbeastTable private (

private def indexedTable: IndexedTable = indexedTableFactory.getIndexedTable(tableID)

private def getAvailableRevision(revisionID: Option[RevisionID]): RevisionID = {
revisionID match {
case Some(id) if qbeastSnapshot.existsRevision(id) =>
id
case None => qbeastSnapshot.loadLatestRevision.revisionID
}
// Identifier of the most recent revision of the table; used as the fallback
// when a caller passes a revision id that does not exist (e.g. the -1L default).
private def latestRevisionAvailable = qbeastSnapshot.loadLatestRevision.revisionID

/**
 * Resolves a revision identifier: returns it unchanged when that revision
 * exists in the snapshot, otherwise falls back to the latest available one.
 * @param revisionID the candidate revision identifier
 * @return an identifier that is guaranteed to exist in the snapshot
 */
private def getAvailableRevision(revisionID: RevisionID): RevisionID =
  revisionID match {
    case id if qbeastSnapshot.existsRevision(id) => id
    case _ => latestRevisionAvailable
  }

/**
Expand All @@ -46,7 +45,7 @@ class QbeastTable private (
* @param revisionID the identifier of the revision to optimize.
* If doesn't exist or none is specified, would be the last available
*/
def optimize(revisionID: Option[RevisionID] = None): Unit = {
def optimize(revisionID: RevisionID = -1L): Unit = {
OptimizeTableCommand(getAvailableRevision(revisionID), indexedTable)
.run(sparkSession)

Expand All @@ -59,12 +58,52 @@ class QbeastTable private (
* If doesn't exist or none is specified, would be the last available
* @return the sequence of cubes to optimize in string representation
*/
def analyze(revisionID: Option[RevisionID] = None): Seq[String] = {
def analyze(revisionID: RevisionID = -1L): Seq[String] = {
  // Resolve the requested revision first (falls back to the latest when unknown),
  // mirroring the structure of indexedColumns/cubeSize for consistency.
  val revID = getAvailableRevision(revisionID)
  val cubesToOptimize = AnalyzeTableCommand(revID, indexedTable).run(sparkSession)
  cubesToOptimize.map(_.getString(0))
}

/**
 * Outputs the indexed columns of the table
 * @param revisionID the identifier of the revision.
 * If doesn't exist or none is specified, would be the last available
 * @return the names of the columns indexed by that revision, in transformer order
 */
def indexedColumns(revisionID: RevisionID = -1L): Seq[String] = {
val revID = getAvailableRevision(revisionID)
qbeastSnapshot.loadRevision(revID).columnTransformers.map(_.columnName)
}

/**
 * Outputs the cubeSize of the table
 * @param revisionID the identifier of the revision.
 * If doesn't exist or none is specified, would be the last available
 * @return the desired cube size configured for that revision
 */
def cubeSize(revisionID: RevisionID = -1L): Int = {
val revID = getAvailableRevision(revisionID)
qbeastSnapshot.loadRevision(revID).desiredCubeSize
}

/**
 * Outputs all the revision identifiers available for the table
 * @return the identifiers of every revision currently present in the snapshot
 */
def revisionsID(): Seq[RevisionID] = {
  val allRevisions = qbeastSnapshot.loadAllRevisions
  allRevisions.map(_.revisionID)
}

/**
 * Outputs the identifier of the latest revision available
 * @return the latest revision identifier
 */
def latestRevisionID(): RevisionID = latestRevisionAvailable

}

object QbeastTable {
Expand Down
99 changes: 99 additions & 0 deletions src/test/scala/io/qbeast/spark/utils/QbeastTableTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package io.qbeast.spark.utils

import io.qbeast.TestClasses.Client3
import io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

class QbeastTableTest extends QbeastIntegrationTestSpec {

  // Shared test configuration: every test indexes the same columns with the same cube size.
  private val columnsToIndex = Seq("age", "val2")
  private val cubeSize = 100

  /** Builds a deterministic DataFrame of 1001 Client3 rows (i = 0..1000) to index. */
  private def createDF(spark: SparkSession) = {
    val rdd =
      spark.sparkContext.parallelize(
        0.to(1000)
          .map(i => Client3(i * i, s"student-$i", i, (i * 1000 + 123), i * 2567.3432143)))
    spark.createDataFrame(rdd)
  }

  /**
   * Writes the base data plus two appended variants with scaled "age"/"val2" values,
   * producing three table revisions (the assertions below confirm three are created).
   */
  private def writeThreeRevisions(spark: SparkSession, tmpDir: String): Unit = {
    val revision1 = createDF(spark)
    writeTestData(revision1, columnsToIndex, cubeSize, tmpDir)

    val revision2 = revision1.withColumn("age", col("age") * 2)
    writeTestData(revision2, columnsToIndex, cubeSize, tmpDir, "append")

    val revision3 = revision1.withColumn("val2", col("val2") * 2)
    writeTestData(revision3, columnsToIndex, cubeSize, tmpDir, "append")
  }

  "IndexedColumns" should "output the indexed columns" in withQbeastContextSparkAndTmpDir {
    (spark, tmpDir) =>
      {
        // WRITE SOME DATA
        writeTestData(createDF(spark), columnsToIndex, cubeSize, tmpDir)

        val qbeastTable = QbeastTable.forPath(spark, tmpDir)
        qbeastTable.indexedColumns() shouldBe columnsToIndex
      }
  }

  "CubeSize" should "output the cube size" in withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
    {
      // WRITE SOME DATA
      writeTestData(createDF(spark), columnsToIndex, cubeSize, tmpDir)

      val qbeastTable = QbeastTable.forPath(spark, tmpDir)
      qbeastTable.cubeSize() shouldBe cubeSize
    }
  }

  "Latest revision" should "output the last revision available" in withQbeastContextSparkAndTmpDir {
    (spark, tmpDir) =>
      {
        // WRITE SOME DATA
        writeTestData(createDF(spark), columnsToIndex, cubeSize, tmpDir)

        val qbeastTable = QbeastTable.forPath(spark, tmpDir)
        qbeastTable.latestRevisionID() shouldBe 1L
      }
  }

  it should "output the last revision available within different revisions" in
    withQbeastContextSparkAndTmpDir { (spark, tmpDir) =>
      {
        writeThreeRevisions(spark, tmpDir)

        val qbeastTable = QbeastTable.forPath(spark, tmpDir)
        qbeastTable.latestRevisionID() shouldBe 3L
      }
    }

  "Revisions" should "output all the revisions available" in withQbeastContextSparkAndTmpDir {
    (spark, tmpDir) =>
      {
        writeThreeRevisions(spark, tmpDir)

        val qbeastTable = QbeastTable.forPath(spark, tmpDir)
        qbeastTable.revisionsID().size shouldBe 3
        qbeastTable.revisionsID() shouldBe Seq(1L, 2L, 3L)
      }
  }
}