From e62003508a78a83d13075ee68f230cb412f23bc5 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 5 Dec 2023 07:46:16 +0100
Subject: [PATCH 01/15] WIP on Create External table

---
 .../io/qbeast/spark/QbeastTableUtils.scala    | 46 +++++++++++++++++++
 .../qbeast/spark/internal/QbeastOptions.scala |  4 ++
 .../sources/catalog/QbeastCatalog.scala       |  2 -
 .../sources/catalog/QbeastCatalogUtils.scala  | 34 +++++++++++++-
 .../utils/QbeastSQLIntegrationTest.scala      | 38 +++++++++++++++
 5 files changed, 120 insertions(+), 4 deletions(-)
 create mode 100644 src/main/scala/io/qbeast/spark/QbeastTableUtils.scala

diff --git a/src/main/scala/io/qbeast/spark/QbeastTableUtils.scala b/src/main/scala/io/qbeast/spark/QbeastTableUtils.scala
new file mode 100644
index 000000000..c410ddeb9
--- /dev/null
+++ b/src/main/scala/io/qbeast/spark/QbeastTableUtils.scala
@@ -0,0 +1,46 @@
+package io.qbeast.spark
+
+import io.qbeast.spark.delta.DeltaQbeastSnapshot
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.delta.DeltaLog
+import org.apache.spark.sql.delta.DeltaTableUtils.isDeltaTable
+
+object QbeastTableUtils {
+
+  /**
+   * Gets the existing configuration of a Qbeast table
+   * @param spark
+   * @param path
+   * @return
+   */
+  def getExistingConf(spark: SparkSession, path: Path): Map[String, String] = {
+    val deltaLog = DeltaLog.forTable(spark, path)
+    val unsafeVolatileSnapshot = deltaLog.update()
+    val qbeastSnapshot = DeltaQbeastSnapshot(unsafeVolatileSnapshot)
+    val revision = qbeastSnapshot.loadLatestRevision
+    Map(
+      "columnsToIndex" -> revision.columnTransformers.map(_.columnName).mkString(","),
+      "cubeSize" -> revision.desiredCubeSize.toString)
+
+  }
+
+  /**
+   * Checks if a table is a Qbeast table
+   * @param path
+   * @return
+   */
+
+  def isQbeastTable(path: Path): Boolean = {
+    val spark = SparkSession.active
+
+    isDeltaTable(spark, path) && {
+      val deltaLog = DeltaLog.forTable(spark, path)
+      val unsafeVolatileSnapshot = deltaLog.update()
+      val qbeastSnapshot = DeltaQbeastSnapshot(unsafeVolatileSnapshot)
+      val isQbeast = qbeastSnapshot.loadAllRevisions.nonEmpty
+      isQbeast
+    }
+  }
+
+}

diff --git a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
index 54597b15d..043371579 100644
--- a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
+++ b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
@@ -97,6 +97,10 @@ object QbeastOptions {
       }))
   }
 
+  def containsQbeastProperties(parameters: Map[String, String]): Boolean = {
+    parameters.contains("columnsToIndex") || parameters.contains("columnstoindex")
+  }
+
   def checkQbeastProperties(parameters: Map[String, String]): Unit = {
     require(
       parameters.contains("columnsToIndex") || parameters.contains("columnstoindex"),

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
index 89e70629c..0cd2bd3d0 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
@@ -4,7 +4,6 @@
 package io.qbeast.spark.internal.sources.catalog
 
 import io.qbeast.context.QbeastContext
-import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
 import io.qbeast.spark.internal.sources.v2.{QbeastStagedTableImpl, QbeastTableImpl}
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -110,7 +109,6 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
       properties: util.Map[String, String]): Table = {
 
     if (QbeastCatalogUtils.isQbeastProvider(properties)) {
-      checkQbeastProperties(properties.asScala.toMap)
       // Create the table
       QbeastCatalogUtils.createQbeastTable(
         ident,

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
index fda04dd32..0b8d8184c 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
@@ -5,6 +5,9 @@ package io.qbeast.spark.internal.sources.catalog
 
 import io.qbeast.context.QbeastContext.metadataManager
 import io.qbeast.core.model.QTableID
+import io.qbeast.spark.QbeastTableUtils
+import io.qbeast.spark.internal.QbeastOptions
+import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
 import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
 import io.qbeast.spark.table.IndexedTableFactory
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -161,7 +164,7 @@ object QbeastCatalogUtils {
     val location = if (isPathTable) {
       Option(ident.name())
     } else {
-      Option(allTableProperties.get("location"))
+      allTableProperties.asScala.get("location")
     }
 
     // Define the table type.
@@ -176,6 +179,33 @@ object QbeastCatalogUtils {
       .orElse(existingTableOpt.flatMap(_.storage.locationUri))
      .getOrElse(existingSessionCatalog.defaultTablePath(id))
 
+    // IF the table is EXTERNAL
+    // AND no qbeast properties are defined
+    // we can load the configuration from the commit log
+    val isExternal = tableType == CatalogTableType.EXTERNAL
+    val qbeastProperties =
+      if (isExternal) {
+        val locPath = new Path(loc)
+        val containsQbeastProperties =
+          QbeastOptions.containsQbeastProperties(allTableProperties.asScala.toMap)
+        if (!containsQbeastProperties && QbeastTableUtils.isQbeastTable(locPath)) {
+          QbeastTableUtils.getExistingConf(spark, locPath)
+        } else if (containsQbeastProperties) {
+          checkQbeastProperties(allTableProperties.asScala.toMap)
+          Map.empty // nothing to add
+        } else {
+          throw AnalysisExceptionFactory.create(
+            "The table you are trying to create is not Qbeast Formatted. " +
+              "Please specify the columns to index and the cube size in the options. " +
+              "You can also use the ConvertToQbeastCommand before creating the table.")
+        }
+      } else {
+        checkQbeastProperties(allTableProperties.asScala.toMap)
+        Map.empty // nothing to add
+      }
+
+    val allProperties = allTableProperties.asScala.toMap ++ qbeastProperties
+
     // Initialize the path option
     val storage = DataSource
       .buildStorageFormatFromOptions(writeOptions)
@@ -198,7 +228,7 @@ object QbeastCatalogUtils {
       provider = Some("qbeast"),
       partitionColumnNames = Seq.empty,
       bucketSpec = None,
-      properties = allTableProperties.asScala.toMap,
+      properties = allProperties,
       comment = commentOpt)
 
     // Verify the schema if it's an external table

diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
index 287151acd..52751b0e0 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
@@ -121,6 +121,44 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
 
     })
 
+  // TODO
+  it should "create EXTERNAL existing table WITHOUT options" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      val location = tmpDir + "/external_student/"
+      val data = createTestData(spark)
+      data.write.format("qbeast").option("columnsToIndex", "id,name").save(location)
+
+      spark.sql(
+        s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+          s"USING qbeast " +
+          s"LOCATION '$location'")
+
+    })
+
+  // TODO
+  it should "throw an error if the table is NOT qbeast" in withQbeastContextSparkAndTmpWarehouse(
+    (spark, tmpDir) => {
+
+      val location = tmpDir + "/external_student/"
+      val data = createTestData(spark)
+      data.write.format("delta").save(location)
+
+      an[AnalysisException] shouldBe thrownBy(
+        spark.sql(
+          s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+            s"USING qbeast " +
+            s"LOCATION '$location'"))
+
+      an[AnalysisException] shouldBe thrownBy(
+        spark.sql(
+          s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+            s"USING qbeast " +
+            "OPTIONS ('columnsToIndex'='id') " +
+            s"LOCATION '$location'"))
+
+    })
+
   it should "throw an error when using different path locations" in
     withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
 

From c21abc0b4ab752dc80b5c9f7520d100b737793ba Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 5 Dec 2023 12:12:18 +0100
Subject: [PATCH 02/15] delete unnecessary classes, simplify code

---
 .../io/qbeast/spark/QbeastTableUtils.scala    | 46 ----------
 .../qbeast/spark/internal/QbeastOptions.scala |  4 -
 .../sources/catalog/QbeastCatalogUtils.scala  | 90 ++++++++++++-------
 .../sources/v2/QbeastWriteBuilder.scala       |  2 -
 .../io/qbeast/spark/table/IndexedTable.scala  | 13 +++
 5 files changed, 69 insertions(+), 86 deletions(-)
 delete mode 100644 src/main/scala/io/qbeast/spark/QbeastTableUtils.scala

diff --git a/src/main/scala/io/qbeast/spark/QbeastTableUtils.scala b/src/main/scala/io/qbeast/spark/QbeastTableUtils.scala
deleted file mode 100644
index c410ddeb9..000000000
--- a/src/main/scala/io/qbeast/spark/QbeastTableUtils.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-package io.qbeast.spark
-
-import io.qbeast.spark.delta.DeltaQbeastSnapshot
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.delta.DeltaLog
-import org.apache.spark.sql.delta.DeltaTableUtils.isDeltaTable
-
-object QbeastTableUtils {
-
-  /**
-   * Gets the existing configuration of a Qbeast table
-   * @param spark
-   * @param path
-   * @return
-   */
-  def getExistingConf(spark: SparkSession, path: Path): Map[String, String] = {
-    val deltaLog = DeltaLog.forTable(spark, path)
-    val unsafeVolatileSnapshot = deltaLog.update()
-    val qbeastSnapshot = DeltaQbeastSnapshot(unsafeVolatileSnapshot)
-    val revision = qbeastSnapshot.loadLatestRevision
-    Map(
-      "columnsToIndex" -> revision.columnTransformers.map(_.columnName).mkString(","),
-      "cubeSize" -> revision.desiredCubeSize.toString)
-
-  }
-
-  /**
-   * Checks if a table is a Qbeast table
-   * @param path
-   * @return
-   */
-
-  def isQbeastTable(path: Path): Boolean = {
-    val spark = SparkSession.active
-
-    isDeltaTable(spark, path) && {
-      val deltaLog = DeltaLog.forTable(spark, path)
-      val unsafeVolatileSnapshot = deltaLog.update()
-      val qbeastSnapshot = DeltaQbeastSnapshot(unsafeVolatileSnapshot)
-      val isQbeast = qbeastSnapshot.loadAllRevisions.nonEmpty
-      isQbeast
-    }
-  }
-
-}

diff --git a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
index 043371579..54597b15d 100644
--- a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
+++ b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
@@ -97,10 +97,6 @@ object QbeastOptions {
       }))
   }
 
-  def containsQbeastProperties(parameters: Map[String, String]): Boolean = {
-    parameters.contains("columnsToIndex") || parameters.contains("columnstoindex")
-  }
-
   def checkQbeastProperties(parameters: Map[String, String]): Unit = {
     require(
       parameters.contains("columnsToIndex") || parameters.contains("columnstoindex"),

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
index 0b8d8184c..df1d84f70 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
@@ -5,11 +5,9 @@ package io.qbeast.spark.internal.sources.catalog
 
 import io.qbeast.context.QbeastContext.metadataManager
 import io.qbeast.core.model.QTableID
-import io.qbeast.spark.QbeastTableUtils
-import io.qbeast.spark.internal.QbeastOptions
-import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
+import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE, checkQbeastProperties}
 import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
-import io.qbeast.spark.table.IndexedTableFactory
+import io.qbeast.spark.table.{IndexedTable, IndexedTableFactory}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
@@ -134,6 +132,53 @@ object QbeastCatalogUtils {
     }
   }
 
+  /**
+   * Loads the Qbeast parameters from:
+   * - Properties
+   * - Existing indexedTable
+   * @param tableType the table type
+   * @param indexedTable the indexed table
+   * @param properties the properties
+   * @return
+   */
+  def loadQbeastRequiredProperties(
+      tableType: CatalogTableType,
+      indexedTable: IndexedTable,
+      properties: Map[String, String]): Map[String, String] = {
+
+    val isExternal = tableType == CatalogTableType.EXTERNAL
+    val containsColumnsToIndex =
+      properties.contains("columnsToIndex") || properties.contains("columnstoindex")
+    if (isExternal && indexedTable.exists) {
+      // IF the table is EXTERNAL and EXISTS physically,
+      // Check and Add Qbeast Properties
+      (containsColumnsToIndex, indexedTable.isConverted) match {
+        case (false, true) =>
+          // If it does NOT contain Qbeast Properties AND is converted, load the latest revision
+          val qbeastSnapshot = metadataManager.loadSnapshot(indexedTable.tableID)
+          val latestRevision = qbeastSnapshot.loadLatestRevision
+          val columnsToIndex = latestRevision.columnTransformers.map(_.columnName).mkString(",")
+          Map(
+            COLUMNS_TO_INDEX -> columnsToIndex,
+            CUBE_SIZE -> latestRevision.desiredCubeSize.toString)
+        case (_, false) =>
+          // If it is NOT converted, throw error
+          throw AnalysisExceptionFactory.create(
+            "The table you are trying to create is not Qbeast Formatted. " +
+              "Please specify the columns to index and the cube size in the options. " +
+              "You can also use the ConvertToQbeastCommand before creating the table.")
+        case _ =>
+          // If it contains Qbeast Properties, check them
+          checkQbeastProperties(properties)
+          Map.empty
+      }
+    } else {
+      // If it's NOT external, check the properties
+      checkQbeastProperties(properties)
+      Map.empty
+    }
+  }
+
   /**
    * Creates a Table on the Catalog
    * @param ident the Identifier of the table
@@ -158,7 +203,7 @@ object QbeastCatalogUtils {
     tableFactory: IndexedTableFactory,
     existingSessionCatalog: SessionCatalog): Unit = {
 
-    val isPathTable = QbeastCatalogUtils.isPathTable(ident)
+    val isPathTable = this.isPathTable(ident)
 
     // Get table location
     val location = if (isPathTable) {
@@ -179,32 +224,11 @@ object QbeastCatalogUtils {
       .orElse(existingTableOpt.flatMap(_.storage.locationUri))
       .getOrElse(existingSessionCatalog.defaultTablePath(id))
 
-    // IF the table is EXTERNAL
-    // AND no qbeast properties are defined
-    // we can load the configuration from the commit log
-    val isExternal = tableType == CatalogTableType.EXTERNAL
-    val qbeastProperties =
-      if (isExternal) {
-        val locPath = new Path(loc)
-        val containsQbeastProperties =
-          QbeastOptions.containsQbeastProperties(allTableProperties.asScala.toMap)
-        if (!containsQbeastProperties && QbeastTableUtils.isQbeastTable(locPath)) {
-          QbeastTableUtils.getExistingConf(spark, locPath)
-        } else if (containsQbeastProperties) {
-          checkQbeastProperties(allTableProperties.asScala.toMap)
-          Map.empty // nothing to add
-        } else {
-          throw AnalysisExceptionFactory.create(
-            "The table you are trying to create is not Qbeast Formatted. " +
-              "Please specify the columns to index and the cube size in the options. " +
-              "You can also use the ConvertToQbeastCommand before creating the table.")
-        }
-      } else {
-        checkQbeastProperties(allTableProperties.asScala.toMap)
-        Map.empty // nothing to add
-      }
-
-    val allProperties = allTableProperties.asScala.toMap ++ qbeastProperties
+    // Process the parameters/options/configuration sent to the table
+    val properties = allTableProperties.asScala.toMap
+    val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
+    val allProperties =
+      properties ++ loadQbeastRequiredProperties(tableType, indexedTable, properties)
 
     // Initialize the path option
     val storage = DataSource

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastWriteBuilder.scala b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastWriteBuilder.scala
index 47aa164e9..61ec278ae 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastWriteBuilder.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastWriteBuilder.scala
@@ -62,8 +62,6 @@ class QbeastWriteBuilder(
     // Passing the options in the query plan plus the properties
     // because columnsToIndex needs to be included in the contract
     val writeOptions = info.options().toMap ++ properties
-    // scalastyle:off
-    println("data schema " + data.schema)
     indexedTable.save(data, writeOptions, append)
 
     // TODO: Push this to Apache Spark

diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
index 4e24065fa..996274184 100644
--- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
+++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -30,6 +30,12 @@ trait IndexedTable {
    */
   def exists: Boolean
 
+  /**
+   * Returns whether the table is converted to Qbeast format.
+   * @return
+   */
+  def isConverted: Boolean
+
   /**
    * Returns the table id which identifies the table.
    *
@@ -141,6 +147,13 @@ private[table] class IndexedTableImpl(
 
   override def exists: Boolean = !snapshot.isInitial
 
+  override def isConverted: Boolean = try {
+    snapshot.loadLatestRevision
+    true
+  } catch {
+    case _: Exception => false
+  }
+
   private def isNewRevision(qbeastOptions: QbeastOptions, latestRevision: Revision): Boolean = {
     // TODO feature: columnsToIndex may change between revisions
     checkColumnsToMatchSchema(latestRevision)

From 9f4ec94f01f6637d4c6627f82d00108f4a051982 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 5 Dec 2023 13:44:05 +0100
Subject: [PATCH 03/15] Change test exception

---
 .../spark/internal/sources/catalog/QbeastCatalogTest.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
index 4ab5d8c3e..b6fffa504 100644
--- a/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
+++ b/src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
@@ -4,7 +4,6 @@ import io.qbeast.spark.QbeastIntegrationTestSpec
 import io.qbeast.spark.internal.sources.v2.{QbeastStagedTableImpl, QbeastTableImpl}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.connector.catalog.{
   CatalogExtension,
   CatalogPlugin,
@@ -248,7 +247,7 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite
 
   "QbeastCatalogUtils" should "throw an error when trying to replace a non-existing table" in
     withQbeastContextSparkAndTmpWarehouse((spark, _) => {
-      an[CannotReplaceMissingTableException] shouldBe thrownBy(
+      an[AnalysisException] shouldBe thrownBy(
         QbeastCatalogUtils.createQbeastTable(
           Identifier.of(defaultNamespace, "students"),
           schema,

From 68252aedd3261eef827055c08585e10dc0643da0 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 5 Dec 2023 13:51:53 +0100
Subject: [PATCH 04/15] Remove TODOs

---
 .../scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala | 2 --
 1 file changed, 2 deletions(-)

diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
index 52751b0e0..9c33c79f6 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
@@ -121,7 +121,6 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
 
     })
 
-  // TODO
   it should "create EXTERNAL existing table WITHOUT options" in
     withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
 
@@ -136,7 +135,6 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
 
     })
 
-  // TODO
   it should "throw an error if the table is NOT qbeast" in withQbeastContextSparkAndTmpWarehouse(
     (spark, tmpDir) => {
 

From 7f015b009eff1f7825529e1bd12261e2bc182797 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 5 Dec 2023 14:29:36 +0100
Subject: [PATCH 05/15] Unify naming

---
 .../qbeast/spark/internal/QbeastOptions.scala | 14 +++++++---
 .../internal/sources/QbeastDataSource.scala   |  6 ++---
 .../sources/catalog/QbeastCatalogUtils.scala  | 27 ++++++++++---------
 .../sources/v2/QbeastStagedTableImpl.scala    |  4 +--
 4 files changed, 30 insertions(+), 21 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
index 54597b15d..aff3d2ab4 100644
--- a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
+++ b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
@@ -25,6 +25,8 @@ object QbeastOptions {
   val PATH = "path"
   val STATS = "columnStats"
 
+  private final val keys = Seq(COLUMNS_TO_INDEX, CUBE_SIZE, PATH, STATS)
+
   /**
    * Gets the columns to index from the options
    * @param options the options passed on the dataframe
@@ -89,17 +91,21 @@ object QbeastOptions {
     QbeastOptions(columnsToIndex, desiredCubeSize, stats)
   }
 
-  def loadTableIDFromParameters(parameters: Map[String, String]): QTableID = {
+  def loadTableIDFromOptions(options: Map[String, String]): QTableID = {
     new QTableID(
-      parameters.getOrElse(
+      options.getOrElse(
         PATH, {
           throw AnalysisExceptionFactory.create("'path' is not specified")
         }))
   }
 
-  def checkQbeastProperties(parameters: Map[String, String]): Unit = {
+  def loadQbeastOptions(options: Map[String, String]): Map[String, String] = {
+    options.filter(keys.contains)
+  }
+
+  def checkQbeastOptions(options: Map[String, String]): Unit = {
     require(
-      parameters.contains("columnsToIndex") || parameters.contains("columnstoindex"),
+      options.contains("columnsToIndex") || options.contains("columnstoindex"),
       throw AnalysisExceptionFactory.create("'columnsToIndex is not specified"))
   }
 
diff --git a/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala b/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala
index 06e9a1e32..7c06e49eb 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala
@@ -54,7 +54,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
       schema: StructType,
       partitioning: Array[Transform],
       properties: util.Map[String, String]): Table = {
-    val tableId = QbeastOptions.loadTableIDFromParameters(properties.asScala.toMap)
+    val tableId = QbeastOptions.loadTableIDFromOptions(properties.asScala.toMap)
     val indexedTable = tableFactory.getIndexedTable(tableId)
     if (indexedTable.exists) {
       // If the table exists, we make sure to pass all the properties to QbeastTableImpl
@@ -100,7 +100,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
       parameters.contains("columnsToIndex") || mode == SaveMode.Append,
       throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))
 
-    val tableId = QbeastOptions.loadTableIDFromParameters(parameters)
+    val tableId = QbeastOptions.loadTableIDFromOptions(parameters)
     val table = tableFactory.getIndexedTable(tableId)
     mode match {
       case SaveMode.Append => table.save(data, parameters, append = true)
@@ -116,7 +116,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
-    val tableID = QbeastOptions.loadTableIDFromParameters(parameters)
+    val tableID = QbeastOptions.loadTableIDFromOptions(parameters)
     val table = tableFactory.getIndexedTable(tableID)
     if (table.exists) {
       table.load()

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
index df1d84f70..3cfd40a6e 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
@@ -5,7 +5,12 @@ package io.qbeast.spark.internal.sources.catalog
 
 import io.qbeast.context.QbeastContext.metadataManager
 import io.qbeast.core.model.QTableID
-import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE, checkQbeastProperties}
+import io.qbeast.spark.internal.QbeastOptions.{
+  COLUMNS_TO_INDEX,
+  CUBE_SIZE,
+  checkQbeastOptions,
+  loadQbeastOptions
+}
 import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
 import io.qbeast.spark.table.{IndexedTable, IndexedTableFactory}
 import org.apache.hadoop.fs.{FileSystem, Path}
@@ -133,15 +138,13 @@ object QbeastCatalogUtils {
   }
 
   /**
-   * Loads the Qbeast parameters from:
-   * - Properties
-   * - Existing indexedTable
+   * Loads the Qbeast options if necessary
    * @param tableType the table type
    * @param indexedTable the indexed table
   * @param properties the properties
    * @return
    */
-  def loadQbeastRequiredProperties(
+  private def loadQbeastRequiredOptions(
       tableType: CatalogTableType,
       indexedTable: IndexedTable,
       properties: Map[String, String]): Map[String, String] = {
@@ -169,13 +172,13 @@ object QbeastCatalogUtils {
             "You can also use the ConvertToQbeastCommand before creating the table.")
         case _ =>
           // If it contains Qbeast Properties, check them
-          checkQbeastProperties(properties)
-          Map.empty
+          checkQbeastOptions(properties)
+          loadQbeastOptions(properties)
       }
     } else {
       // If it's NOT external, check the properties
-      checkQbeastProperties(properties)
-      Map.empty
+      checkQbeastOptions(properties)
+      loadQbeastOptions(properties)
     }
   }
 
@@ -204,12 +207,13 @@ object QbeastCatalogUtils {
     existingSessionCatalog: SessionCatalog): Unit = {
 
     val isPathTable = this.isPathTable(ident)
+    val properties = allTableProperties.asScala.toMap
 
     // Get table location
     val location = if (isPathTable) {
       Option(ident.name())
     } else {
-      allTableProperties.asScala.get("location")
+      properties.get("location")
     }
 
     // Define the table type.
@@ -225,10 +229,9 @@ object QbeastCatalogUtils {
       .getOrElse(existingSessionCatalog.defaultTablePath(id))
 
     // Process the parameters/options/configuration sent to the table
-    val properties = allTableProperties.asScala.toMap
     val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
     val allProperties =
-      properties ++ loadQbeastRequiredProperties(tableType, indexedTable, properties)
+      properties ++ loadQbeastRequiredOptions(tableType, indexedTable, properties)
 
     // Initialize the path option
     val storage = DataSource

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastStagedTableImpl.scala b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastStagedTableImpl.scala
index 8c07b5632..79933c1ed 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastStagedTableImpl.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastStagedTableImpl.scala
@@ -3,7 +3,7 @@
  */
 package io.qbeast.spark.internal.sources.v2
 
-import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
+import io.qbeast.spark.internal.QbeastOptions.checkQbeastOptions
 import io.qbeast.spark.internal.sources.catalog.{CreationMode, QbeastCatalogUtils}
 import io.qbeast.spark.table.IndexedTableFactory
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -70,7 +70,7 @@ private[sources] class QbeastStagedTableImpl(
       writeOptions.foreach { case (k, v) => props.put(k, v) }
 
       // Check all the Qbeast properties are correctly specified
-      checkQbeastProperties(props.asScala.toMap)
+      checkQbeastOptions(props.asScala.toMap)
 
       // Creates the corresponding table on the Catalog and executes
       // the writing of the dataFrame (if any)

From 236946778bca9eb860bbaa6013c258112787381c Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Mon, 11 Dec 2023 09:49:08 +0100
Subject: [PATCH 06/15] Remove unnecessary code, add properties method to IndexedTable

---
 .../qbeast/spark/internal/QbeastOptions.scala |  6 --
 .../sources/catalog/QbeastCatalogUtils.scala  | 69 ++++---------------
 .../io/qbeast/spark/table/IndexedTable.scala  | 22 +++++-
 3 files changed, 36 insertions(+), 61 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
index aff3d2ab4..60218f8e7 100644
--- a/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
+++ b/src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
@@ -25,8 +25,6 @@ object QbeastOptions {
   val PATH = "path"
   val STATS = "columnStats"
 
-  private final val keys = Seq(COLUMNS_TO_INDEX, CUBE_SIZE, PATH, STATS)
-
   /**
    * Gets the columns to index from the options
    * @param options the options passed on the dataframe
@@ -99,10 +97,6 @@ object QbeastOptions {
         }))
   }
 
-  def loadQbeastOptions(options: Map[String, String]): Map[String, String] = {
-    options.filter(keys.contains)
-  }
-
   def checkQbeastOptions(options: Map[String, String]): Unit = {
     require(
       options.contains("columnsToIndex") || options.contains("columnstoindex"),

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
index 3cfd40a6e..5f25a216c 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
@@ -5,14 +5,9 @@ package io.qbeast.spark.internal.sources.catalog
 
 import io.qbeast.context.QbeastContext.metadataManager
 import io.qbeast.core.model.QTableID
-import io.qbeast.spark.internal.QbeastOptions.{
-  COLUMNS_TO_INDEX,
-  CUBE_SIZE,
-  checkQbeastOptions,
-  loadQbeastOptions
-}
+import io.qbeast.spark.internal.QbeastOptions._
 import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
-import io.qbeast.spark.table.{IndexedTable, IndexedTableFactory}
+import io.qbeast.spark.table.IndexedTableFactory
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
@@ -24,6 +19,7 @@ import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{
+  AnalysisException,
   AnalysisExceptionFactory,
   DataFrame,
   SaveMode,
@@ -140,51 +134,6 @@ object QbeastCatalogUtils {
     }
   }
 
-  /**
-   * Loads the Qbeast options if necessary
-   * @param tableType the table type
-   * @param indexedTable the indexed table
-   * @param properties the properties
-   * @return
-   */
-  private def loadQbeastRequiredOptions(
-      tableType: CatalogTableType,
-      indexedTable: IndexedTable,
-      properties: Map[String, String]): Map[String, String] = {
-
-    val isExternal = tableType == CatalogTableType.EXTERNAL
-    val containsColumnsToIndex =
-      properties.contains("columnsToIndex") || properties.contains("columnstoindex")
-    if (isExternal && indexedTable.exists) {
-      // IF the table is EXTERNAL and EXISTS physically,
-      // Check and Add Qbeast Properties
-      (containsColumnsToIndex, indexedTable.isConverted) match {
-        case (false, true) =>
-          // If it does NOT contain Qbeast Properties AND is converted, load the latest revision
-          val qbeastSnapshot = metadataManager.loadSnapshot(indexedTable.tableID)
-          val latestRevision = qbeastSnapshot.loadLatestRevision
-          val columnsToIndex = latestRevision.columnTransformers.map(_.columnName).mkString(",")
-          Map(
-            COLUMNS_TO_INDEX -> columnsToIndex,
-            CUBE_SIZE -> latestRevision.desiredCubeSize.toString)
-        case (_, false) =>
-          // If it is NOT converted, throw error
-          throw AnalysisExceptionFactory.create(
-            "The table you are trying to create is not Qbeast Formatted. " +
-              "Please specify the columns to index and the cube size in the options. " +
-              "You can also use the ConvertToQbeastCommand before creating the table.")
-        case _ =>
-          // If it contains Qbeast Properties, check them
-          checkQbeastOptions(properties)
-          loadQbeastOptions(properties)
-      }
-    } else {
-      // If it's NOT external, check the properties
-      checkQbeastOptions(properties)
-      loadQbeastOptions(properties)
-    }
-  }
-
   /**
    * Creates a Table on the Catalog
    * @param ident the Identifier of the table
@@ -226,8 +179,18 @@ object QbeastCatalogUtils {
 
     // Process the parameters/options/configuration sent to the table
     val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
-    val allProperties =
-      properties ++ loadQbeastRequiredOptions(tableType, indexedTable, properties)
+    val allProperties = {
+      if (tableType == CatalogTableType.EXTERNAL && !indexedTable.isConverted)
+        throw AnalysisExceptionFactory.create("This Table is not converted")
+      else {
+        try {
+          checkQbeastOptions(properties)
+          properties // No options added
+        } catch {
+          case _: AnalysisException => properties ++ indexedTable.properties
+        }
+      }
+    }
 
     // Initialize the path option
     val storage = DataSource

diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
index 996274184..3bff0c313 100644
--- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
+++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -8,7 +8,7 @@ import io.qbeast.core.model._
 import io.qbeast.spark.delta.{CubeDataLoader, StagingDataManager, StagingResolution}
 import io.qbeast.spark.index.QbeastColumns
 import io.qbeast.spark.internal.QbeastOptions
-import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE}
+import io.qbeast.spark.internal.QbeastOptions._
 import io.qbeast.spark.internal.sources.QbeastBaseRelation
 import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES
 import org.apache.spark.sql.delta.actions.FileAction
@@ -43,6 +43,13 @@ trait IndexedTable {
    */
   def tableID: QTableID
 
+  /**
+   * Returns the table properties.
+   *
+   * @return the table properties
+   */
+  def properties: Map[String, String]
+
   /**
    * Saves given data in the table and updates the index. The specified columns are
    * used to define the index when the table is created or overwritten. The append
@@ -154,6 +161,19 @@ private[table] class IndexedTableImpl(
       case _: Exception => false
     }
 
+  override def properties: Map[String, String] = {
+    if (exists && isConverted) {
+      val latestRevision = snapshot.loadLatestRevision
+      Map(
+        COLUMNS_TO_INDEX -> latestRevision.columnTransformers
+          .map(_.columnName)
+          .mkString(","),
+        CUBE_SIZE -> latestRevision.desiredCubeSize.toString)
+    } else {
+      Map.empty
+    }
+  }
+
   private def isNewRevision(qbeastOptions: QbeastOptions, latestRevision: Revision): Boolean = {
     // TODO feature: columnsToIndex may change between revisions
     checkColumnsToMatchSchema(latestRevision)

From 8e5cdd84ae5b6aac6e5090e5fbdaa6432f32240b Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Mon, 11 Dec 2023 09:56:22 +0100
Subject: [PATCH 07/15] braces

---
 .../spark/internal/sources/catalog/QbeastCatalogUtils.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
index 5f25a216c..9c9c368a1 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
@@ -182,9 +182,9 @@ object QbeastCatalogUtils {
     // Process the parameters/options/configuration sent to the table
     val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
     val allProperties = {
-      if (tableType == CatalogTableType.EXTERNAL && !indexedTable.isConverted)
+      if (tableType == CatalogTableType.EXTERNAL && !indexedTable.isConverted) {
         throw AnalysisExceptionFactory.create("This Table is not converted")
-      else {
+      } else {
        try {
           checkQbeastOptions(properties)
           properties // No options added

From 52f731366227fc2037d2d76de702228cbcdd078e Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Mon, 11 Dec 2023 11:30:23 +0100
Subject: [PATCH 08/15] Fix exception

---
 .../sources/catalog/QbeastCatalogUtils.scala  | 21 +++++++++++--------
 1 file changed, 12 insertions(+), 9 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
index 9c9c368a1..566c5d718 100644
--- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
+++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
@@ -181,16 +181,19 @@ object QbeastCatalogUtils {
 
     // Process the parameters/options/configuration sent to the table
     val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
+    val isNewQbeastTable = (tableType == CatalogTableType.EXTERNAL) && !indexedTable.isConverted
     val allProperties = {
-      if (tableType == CatalogTableType.EXTERNAL && !indexedTable.isConverted) {
-        throw AnalysisExceptionFactory.create("This Table is not converted")
-      } else {
-        try {
-          checkQbeastOptions(properties)
-          properties // No options added
-        } catch {
-          case _: AnalysisException => properties ++ indexedTable.properties
-        }
-      }
+      try {
+        checkQbeastOptions(properties)
+        properties // No options added
+      } catch {
+        case _: AnalysisException if (isNewQbeastTable) =>
+          throw AnalysisExceptionFactory.create(
+            "The table you are trying to create is not Qbeast Formatted. " +
+              "Please specify the columns to index and the cube size in the options. " +
" + + "You can also use the ConvertToQbeastCommand before creating the table.") + case _: AnalysisException => // Add existing table properties + properties ++ indexedTable.properties // Add the write options } } From 7be92aa1b88a55de2fbf4236ab5dcad9054ddedb Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Mon, 11 Dec 2023 11:39:19 +0100 Subject: [PATCH 09/15] Added checks --- .../internal/sources/catalog/QbeastCatalogUtils.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala index 566c5d718..326f523de 100644 --- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala +++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala @@ -182,16 +182,16 @@ object QbeastCatalogUtils { // Process the parameters/options/configuration sent to the table val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString)) val isNewQbeastTable = (tableType == CatalogTableType.EXTERNAL) && !indexedTable.isConverted + if (indexedTable.exists && isNewQbeastTable) { + throw AnalysisExceptionFactory.create( + "The table you are trying to create already exists and is NOT Qbeast Formatted. " + + "Please use the ConvertToQbeastCommand before creating the table.") + } val allProperties = { try { checkQbeastOptions(properties) properties // No options added } catch { - case _: AnalysisException if (isNewQbeastTable) => - throw AnalysisExceptionFactory.create( - "The table you are trying to create is not Qbeast Formatted. " + - "Please specify the columns to index and the cube size in the options. " + - "You can also use the ConvertToQbeastCommand before creating the table.") case _: AnalysisException => // Add existing table properties properties ++ indexedTable.properties // Add the write options } From cd03beb5dcf2ad94b875ba847c36364575a9ac3a Mon Sep 17 00:00:00 2001 From: osopardo1 Date: Tue, 12 Dec 2023 14:01:55 +0100 Subject: [PATCH 10/15] Change method name, aggregate code, add verifyAndMergeProperties name --- .../internal/sources/QbeastBaseRelation.scala | 17 +-- .../internal/sources/QbeastDataSource.scala | 7 +- .../sources/catalog/QbeastCatalogUtils.scala | 22 +-- .../io/qbeast/spark/table/IndexedTable.scala | 111 ++++++++------- .../utils/QbeastCreateTableSQLTest.scala | 128 ++++++++++++++++++ .../utils/QbeastSQLIntegrationTest.scala | 38 +----- 6 files changed, 195 insertions(+), 128 deletions(-) create mode 100644 src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala diff --git a/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala b/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala index 9127b8572..5a45cdcc1 100644 --- a/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala +++ b/src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala @@ -33,11 +33,11 @@ object QbeastBaseRelation { */ def createRelation( sqlContext: SQLContext, - table: IndexedTable, + indexedTable: IndexedTable, options: Map[String, String]): BaseRelation = { val spark = SparkSession.active - val tableID = table.tableID + val tableID = indexedTable.tableID val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID) val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID) if (snapshot.isInitial) { @@ -52,22 +52,19 @@ object QbeastBaseRelation { new ParquetFileFormat(), 
options)(spark) with InsertableRelation { def insert(data: DataFrame, overwrite: Boolean): Unit = { - table.save(data, options, append = !overwrite) + indexedTable.save(data, options, append = !overwrite) } } } else { // If the table contains data, initialize it - val revision = snapshot.loadLatestRevision - val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",") - val cubeSize = revision.desiredCubeSize - val parameters = - Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString()) - val path = new Path(tableID.id) val fileIndex = OTreeIndex(spark, path) val bucketSpec: Option[BucketSpec] = None val file = new ParquetFileFormat() + // Verify and Merge options with existing indexed properties + val parameters = indexedTable.verifyAndMergeProperties(options) + new HadoopFsRelation( fileIndex, partitionSchema = StructType(Seq.empty[StructField]), @@ -76,7 +73,7 @@ object QbeastBaseRelation { file, parameters)(spark) with InsertableRelation { def insert(data: DataFrame, overwrite: Boolean): Unit = { - table.save(data, parameters, append = !overwrite) + indexedTable.save(data, parameters, append = !overwrite) } } } diff --git a/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala b/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala index 7c06e49eb..56906b6e6 100644 --- a/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala +++ b/src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala @@ -4,7 +4,6 @@ package io.qbeast.spark.internal.sources import io.qbeast.context.QbeastContext -import io.qbeast.context.QbeastContext.metadataManager import io.qbeast.spark.internal.QbeastOptions import io.qbeast.spark.internal.sources.v2.QbeastTableImpl import io.qbeast.spark.table.IndexedTableFactory @@ -58,11 +57,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF val indexedTable = tableFactory.getIndexedTable(tableId) if (indexedTable.exists) { // If the table exists, we make sure to pass all the properties to QbeastTableImpl - val currentRevision = metadataManager.loadSnapshot(tableId).loadLatestRevision - val indexProperties = Map( - "columnsToIndex" -> currentRevision.columnTransformers.map(_.columnName).mkString(","), - "cubeSize" -> currentRevision.desiredCubeSize.toString) - val tableProperties = properties.asScala.toMap ++ indexProperties + val tableProperties = indexedTable.verifyAndMergeProperties(properties.asScala.toMap) new QbeastTableImpl( TableIdentifier(tableId.id), new Path(tableId.id), diff --git a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala index 326f523de..2d2e7e7d8 100644 --- a/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala +++ b/src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala @@ -3,11 +3,10 @@ */ package io.qbeast.spark.internal.sources.catalog -import io.qbeast.context.QbeastContext.metadataManager +import io.qbeast.context.QbeastContext.{metadataManager} import io.qbeast.core.model.QTableID -import io.qbeast.spark.internal.QbeastOptions._ import io.qbeast.spark.internal.sources.v2.QbeastTableImpl -import io.qbeast.spark.table.{IndexedTableFactory} +import io.qbeast.spark.table.IndexedTableFactory import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.catalyst.TableIdentifier import 
org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException @@ -19,7 +18,6 @@ import org.apache.spark.sql.delta.DeltaLog import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.types.StructType import org.apache.spark.sql.{ - AnalysisException, AnalysisExceptionFactory, DataFrame, SaveMode, @@ -181,21 +179,7 @@ object QbeastCatalogUtils { // Process the parameters/options/configuration sent to the table val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString)) - val isNewQbeastTable = (tableType == CatalogTableType.EXTERNAL) && !indexedTable.isConverted - if (indexedTable.exists && isNewQbeastTable) { - throw AnalysisExceptionFactory.create( - "The table you are trying to create already exists and is NOT Qbeast Formatted. " + - "Please use the ConvertToQbeastCommand before creating the table.") - } - val allProperties = { - try { - checkQbeastOptions(properties) - properties // No options added - } catch { - case _: AnalysisException => // Add existing table properties - properties ++ indexedTable.properties // Add the write options - } - } + val allProperties = indexedTable.verifyAndMergeProperties(properties) // Initialize the path option val storage = DataSource diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala index 3bff0c313..3066858c3 100644 --- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala +++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala @@ -14,7 +14,7 @@ import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES import org.apache.spark.sql.delta.actions.FileAction import org.apache.spark.sql.sources.BaseRelation import org.apache.spark.sql.types.StructType -import org.apache.spark.sql.{AnalysisExceptionFactory, DataFrame} +import org.apache.spark.sql.{AnalysisException, AnalysisExceptionFactory, DataFrame} import java.util.ConcurrentModificationException @@ -31,10 +31,10 @@ trait IndexedTable { def exists: Boolean /** - * Returns whether the table is converted to Qbeast format. + * Returns whether the table contains Qbeast metadata * @return */ - def isConverted: Boolean + def containsQbeastMetadata: Boolean /** * Returns the table id which identifies the table. @@ -44,11 +44,11 @@ trait IndexedTable { def tableID: QTableID /** - * Returns the table properties. - * - * @return the table properties + * Merge new and index current properties + * @param properties the properties you want to merge + * @return */ - def properties: Map[String, String] + def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String] /** * Saves given data in the table and updates the index. 
The specified columns are @@ -152,31 +152,59 @@ private[table] class IndexedTableImpl( with StagingUtils { private var snapshotCache: Option[QbeastSnapshot] = None + /** + * Latest Revision Available + * + * @return + */ + private def latestRevision: Revision = snapshot.loadLatestRevision + override def exists: Boolean = !snapshot.isInitial - override def isConverted: Boolean = try { + override def containsQbeastMetadata: Boolean = try { snapshot.loadLatestRevision true } catch { - case _: Exception => false + case _: AnalysisException => false } - override def properties: Map[String, String] = { - if (exists && isConverted) { - val latestRevision = snapshot.loadLatestRevision - Map( - COLUMNS_TO_INDEX -> latestRevision.columnTransformers - .map(_.columnName) - .mkString(","), - CUBE_SIZE -> latestRevision.desiredCubeSize.toString) + override def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String] = { + if (!exists) { // IF not exists, we should only check new properties + checkQbeastOptions(properties) + properties + } else if (containsQbeastMetadata) { // If exists, we can merge both properties: new and current + val currentColumnsIndexed = + latestRevision.columnTransformers.map(_.columnName).mkString(",") + val currentCubeSize = latestRevision.desiredCubeSize.toString + val finalProperties = { + (properties.contains(COLUMNS_TO_INDEX), properties.contains(CUBE_SIZE)) match { + case (true, true) => properties + case (false, false) => + properties + (COLUMNS_TO_INDEX -> currentColumnsIndexed, CUBE_SIZE -> currentCubeSize) + case (true, false) => properties + (CUBE_SIZE -> currentCubeSize) + case (false, true) => + properties + (COLUMNS_TO_INDEX -> currentColumnsIndexed) + } + } + finalProperties } else { - Map.empty + throw AnalysisExceptionFactory.create( + s"Table ${tableID.id} exists but does not contain Qbeast metadata. " + + s"Please use ConvertToQbeastCommand to convert the table to Qbeast.") } } - private def isNewRevision(qbeastOptions: QbeastOptions, latestRevision: Revision): Boolean = { + private def isNewRevision(qbeastOptions: QbeastOptions): Boolean = { + // TODO feature: columnsToIndex may change between revisions - checkColumnsToMatchSchema(latestRevision) + val columnsToIndex = qbeastOptions.columnsToIndex + val currentColumnsToIndex = latestRevision.columnTransformers.map(_.columnName) + val isNewColumns = !latestRevision.matchColumns(columnsToIndex) + if (isNewColumns) { + throw AnalysisExceptionFactory.create( + s"Columns to index '${columnsToIndex.mkString(",")}' do not match " + + s"existing index ${currentColumnsToIndex.mkString(",")}.") + } // Checks if the desiredCubeSize is different from the existing one val isNewCubeSize = latestRevision.desiredCubeSize != qbeastOptions.cubeSize // Checks if the user-provided column boundaries would trigger the creation of @@ -202,43 +230,22 @@ private[table] class IndexedTableImpl( } - /** - * Add the required indexing parameters when the SaveMode is Append. - * The user-provided parameters are respected. 
- * @param latestRevision the latest revision - * @param parameters the parameters required for indexing - */ - private def addRequiredParams( - latestRevision: Revision, - parameters: Map[String, String]): Map[String, String] = { - val columnsToIndex = latestRevision.columnTransformers.map(_.columnName).mkString(",") - val desiredCubeSize = latestRevision.desiredCubeSize.toString - (parameters.contains(COLUMNS_TO_INDEX), parameters.contains(CUBE_SIZE)) match { - case (true, true) => parameters - case (false, false) => - parameters + (COLUMNS_TO_INDEX -> columnsToIndex, CUBE_SIZE -> desiredCubeSize) - case (true, false) => parameters + (CUBE_SIZE -> desiredCubeSize) - case (false, true) => parameters + (COLUMNS_TO_INDEX -> columnsToIndex) - } - } - override def save( data: DataFrame, parameters: Map[String, String], append: Boolean): BaseRelation = { val indexStatus = if (exists && append) { - // If the table exists and we are appending new data + // If the indexedTable exists and we are appending new data // 1. Load existing IndexStatus - val latestRevision = snapshot.loadLatestRevision - val updatedParameters = addRequiredParams(latestRevision, parameters) + val updatedProperties = verifyAndMergeProperties(parameters) if (isStaging(latestRevision)) { // If the existing Revision is Staging - IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedParameters)) + IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedProperties)) } else { - if (isNewRevision(QbeastOptions(updatedParameters), latestRevision)) { + if (isNewRevision(QbeastOptions(updatedProperties))) { // If the new parameters generate a new revision, we need to create another one val newPotentialRevision = revisionBuilder - .createNewRevision(tableID, data.schema, updatedParameters) + .createNewRevision(tableID, data.schema, updatedProperties) val newRevisionCubeSize = newPotentialRevision.desiredCubeSize // Merge new Revision Transformations with old Revision Transformations val newRevisionTransformations = @@ -288,16 +295,8 @@ private[table] class IndexedTableImpl( snapshotCache = None } - private def checkColumnsToMatchSchema(revision: Revision): Unit = { - val columnsToIndex = revision.columnTransformers.map(_.columnName) - if (!snapshot.loadLatestRevision.matchColumns(columnsToIndex)) { - throw AnalysisExceptionFactory.create( - s"Columns to index '$columnsToIndex' do not match existing index.") - } - } - /** - * Creates a QbeastBaseRelation for the given table. + * Creates a QbeastBaseRelation for the given indexedTable. 
 * @return the QbeastBaseRelation
 */
 private def createQbeastBaseRelation(): BaseRelation = {
@@ -415,7 +414,7 @@ private[table] class IndexedTableImpl(
     val currentIndexStatus = snapshot.loadIndexStatus(revisionID)
 
     metadataManager.updateWithTransaction(tableID, schema, append = true) {
-      // There's no affected table changes on compaction, so we send an empty object
+      // There's no affected indexedTable changes on compaction, so we send an empty object
       val tableChanges = BroadcastedTableChanges(None, currentIndexStatus, Map.empty)
       val fileActions =
         dataWriter.compact(tableID, schema, currentIndexStatus, tableChanges)
diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala
new file mode 100644
index 000000000..a87406847
--- /dev/null
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala
@@ -0,0 +1,128 @@
+package io.qbeast.spark.utils
+
+import io.qbeast.TestClasses.Student
+import io.qbeast.spark.{QbeastIntegrationTestSpec, QbeastTable}
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.AnalysisException
+
+import scala.util.Random
+
+class QbeastCreateTableSQLTest extends QbeastIntegrationTestSpec {
+
+  private val students = 1.to(10).map(i => Student(i, i.toString, Random.nextInt()))
+
+  private def createStudentsTestData(spark: SparkSession): DataFrame = {
+    import spark.implicits._
+    students.toDF()
+  }
+
+  it should "create EXTERNAL existing table WITHOUT options" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      val location = tmpDir + "/external_student/"
+      val data = createStudentsTestData(spark)
+      data.write.format("qbeast").option("columnsToIndex", "id,name").save(location)
+
+      spark.sql(
+        s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+          s"USING qbeast " +
+          s"LOCATION '$location'")
+
+    })
+
+  it should "throw an error if the table is NOT qbeast" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      val location = tmpDir + "/external_student/"
+      val data = createStudentsTestData(spark)
+      data.write.format("delta").save(location)
+
+      an[AnalysisException] shouldBe thrownBy(
+        spark.sql(
+          s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+            s"USING qbeast " +
+            s"LOCATION '$location'"))
+
+      an[AnalysisException] shouldBe thrownBy(
+        spark.sql(
+          s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+            s"USING qbeast " +
+            "OPTIONS ('columnsToIndex'='id') " +
+            s"LOCATION '$location'"))
+
+    })
+
+  it should "NOT overwrite existing columnsToIndex if specified" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      // WRITE INITIAL DATA WITH QBEAST
+      val location = tmpDir + "/external_student/"
+      val data = createStudentsTestData(spark)
+      data.write.format("qbeast").option("columnsToIndex", "id").save(location)
+
+      // COLUMNS TO INDEX ARE CHANGED
+      spark.sql(
+        s"CREATE EXTERNAL TABLE student_column_change (id INT, name STRING, age INT) " +
+          s"USING qbeast " +
+          s"OPTIONS ('columnsToIndex'='id,name') " +
+          s"LOCATION '$location'")
+
+      // COLUMNS TO INDEX CANNOT BE CHANGED
+      an[AnalysisException] shouldBe thrownBy(data.writeTo("student_column_change").append())
+    })
+
+  it should "overwrite existing CUBE SIZE options if specified" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      // WRITE INITIAL DATA WITH QBEAST
+      val location = tmpDir + "/external_student/"
+      val data = createStudentsTestData(spark)
+      data.write
+        .format("qbeast")
+        .option("columnsToIndex", "id")
+        .option("cubeSize", "100")
+        .save(location)
+
+      spark.sql(
+        s"CREATE EXTERNAL TABLE student_cube_change (id INT, name STRING, age INT) " +
+          s"USING qbeast " +
+          s"OPTIONS ('cubeSize'='50') " +
+          s"LOCATION '$location'")
+
+      val qbeastTable = QbeastTable.forPath(spark, location)
+      qbeastTable.cubeSize() shouldBe 100
+
+      data.writeTo("student_cube_change").append()
+
+      spark.sql("SELECT * FROM student_cube_change").count() shouldBe data.count() * 2
+      qbeastTable.cubeSize() shouldBe 50
+
+    })
+
+  it should "create table even if location is not populated" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      val location = tmpDir + "/external_student/"
+
+      spark.sql(
+        s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+          s"USING qbeast " +
+          s"OPTIONS ('columnsToIndex'='id,name') " +
+          s"LOCATION '$location'")
+
+      // SELECT FROM
+      spark.sql("SELECT * FROM student").count() shouldBe 0
+
+      // WRITE TO
+      val data = createStudentsTestData(spark)
+      data.writeTo("student").append()
+
+      // SELECT FROM
+      spark.read.format("qbeast").load(location).count() shouldBe data.count()
+      spark.sql("SELECT * FROM student").count() shouldBe data.count()
+      val qbeastTable = QbeastTable.forPath(spark, location)
+      qbeastTable.indexedColumns() shouldBe Seq("id", "name")
+
+    })
+
+}
diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
index 9c33c79f6..a2712bdf0 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
@@ -50,7 +50,7 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
 
     spark.sql(
       s"CREATE TABLE student (id INT, name STRING, age INT) USING qbeast " +
-        "OPTIONS ('columnsToIndex'='id')")
+        "TBLPROPERTIES ('columnsToIndex'='id')")
 
     spark.sql("INSERT INTO table student SELECT * FROM data")
 
@@ -121,42 +121,6 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
 
     })
 
-  it should "create EXTERNAL existing table WITHOUT options" in
-    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
-
-      val location = tmpDir + "/external_student/"
-      val data = createTestData(spark)
-      data.write.format("qbeast").option("columnsToIndex", "id,name").save(location)
-
-      spark.sql(
-        s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
-          s"USING qbeast " +
-          s"LOCATION '$location'")
-
-    })
-
-  it should "throw an error if the table is NOT qbeast" in withQbeastContextSparkAndTmpWarehouse(
-    (spark, tmpDir) => {
-
-      val location = tmpDir + "/external_student/"
-      val data = createTestData(spark)
-      data.write.format("delta").save(location)
-
-      an[AnalysisException] shouldBe thrownBy(
-        spark.sql(
-          s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
-            s"USING qbeast " +
-            s"LOCATION '$location'"))
-
-      an[AnalysisException] shouldBe thrownBy(
-        spark.sql(
-          s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
-            s"USING qbeast " +
-            "OPTIONS ('columnsToIndex'='id') " +
-            s"LOCATION '$location'"))
-
-    })
-
   it should "throw an error when using different path locations" in
     withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
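The suite above pins down the intended CREATE EXTERNAL TABLE behaviour. For reviewers who want to reproduce it outside the test harness, here is a minimal, self-contained sketch; the object name, sample rows, and path are illustrative, and it assumes a session already wired to the Qbeast catalog (the spark.sql.catalog.spark_catalog setting below is assumed setup, not something this patch adds):

  import org.apache.spark.sql.SparkSession

  object ExternalQbeastTableExample extends App {
    // Assumed session setup for a local run with the qbeast format available.
    val spark = SparkSession.builder()
      .master("local[*]")
      .config(
        "spark.sql.catalog.spark_catalog",
        "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
      .getOrCreate()
    import spark.implicits._

    val location = "/tmp/qbeast/external_student" // illustrative path

    // 1. Write data with qbeast, fixing the index configuration at write time.
    Seq((1, "Alice", 21), (2, "Bob", 23))
      .toDF("id", "name", "age")
      .write
      .format("qbeast")
      .option("columnsToIndex", "id,name")
      .option("cubeSize", "100")
      .save(location)

    // 2. Register the existing files as an EXTERNAL table. No OPTIONS are
    //    needed: columnsToIndex and cubeSize are read back from the commit log.
    spark.sql(
      s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
        s"USING qbeast LOCATION '$location'")

    // 3. Appends through the catalog reuse the recovered configuration.
    spark.sql("INSERT INTO student VALUES (3, 'Carol', 22)")
    spark.sql("SELECT * FROM student").show()
  }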
From b69b2c1a572efdfe6a9cfe293e1d274817691fa2 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 12 Dec 2023 14:02:46 +0100
Subject: [PATCH 11/15] Fix test name

---
 .../scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala
index a87406847..7c5d55f73 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastCreateTableSQLTest.scala
@@ -16,7 +16,7 @@ class QbeastCreateTableSQLTest extends QbeastIntegrationTestSpec {
     students.toDF()
   }
 
-  it should "create EXTERNAL existing table WITHOUT options" in
+  "Qbeast SQL" should "create EXTERNAL existing table WITHOUT options" in
     withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
 
       val location = tmpDir + "/external_student/"
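A note on this one-line change: in ScalaTest's FlatSpec style (which QbeastIntegrationTestSpec appears to build on), the first test of a suite must introduce its subject explicitly; `it` in later tests then refers back to that subject. A minimal sketch of the convention, with placeholder assertions:

  import org.scalatest.flatspec.AnyFlatSpec

  class NamingConventionExample extends AnyFlatSpec {
    // The first test names the subject under test explicitly...
    "Qbeast SQL" should "name its subject in the first test" in { assert(true) }
    // ...and `it` in subsequent tests refers back to that subject.
    it should "reuse the subject introduced above" in { assert(true) }
  }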
From 2e78bad96960af0a42500c99af631d6c2e43b173 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 12 Dec 2023 14:05:47 +0100
Subject: [PATCH 12/15] Rollback some changes

---
 .../io/qbeast/spark/table/IndexedTable.scala | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
index 3066858c3..c9a150cc4 100644
--- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
+++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -165,7 +165,7 @@ private[table] class IndexedTableImpl(
     snapshot.loadLatestRevision
     true
   } catch {
-    case _: AnalysisException => false
+    case _: Exception => false
   }
 
   override def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String] = {
@@ -236,16 +236,16 @@ private[table] class IndexedTableImpl(
       append: Boolean): BaseRelation = {
     val indexStatus =
       if (exists && append) {
-        // If the indexedTable exists and we are appending new data
+        // If the table exists and we are appending new data
         // 1. Load existing IndexStatus
-        val updatedProperties = verifyAndMergeProperties(parameters)
+        val updatedParameters = verifyAndMergeProperties(parameters)
         if (isStaging(latestRevision)) { // If the existing Revision is Staging
-          IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedProperties))
+          IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedParameters))
         } else {
-          if (isNewRevision(QbeastOptions(updatedProperties))) {
+          if (isNewRevision(QbeastOptions(updatedParameters), latestRevision)) {
             // If the new parameters generate a new revision, we need to create another one
             val newPotentialRevision = revisionBuilder
-              .createNewRevision(tableID, data.schema, updatedProperties)
+              .createNewRevision(tableID, data.schema, updatedParameters)
             val newRevisionCubeSize = newPotentialRevision.desiredCubeSize
             // Merge new Revision Transformations with old Revision Transformations
             val newRevisionTransformations =
@@ -296,7 +296,7 @@ private[table] class IndexedTableImpl(
   }
 
   /**
-   * Creates a QbeastBaseRelation for the given indexedTable.
+   * Creates a QbeastBaseRelation for the given table.
    * @return the QbeastBaseRelation
    */
   private def createQbeastBaseRelation(): BaseRelation = {
@@ -414,7 +414,7 @@ private[table] class IndexedTableImpl(
     val currentIndexStatus = snapshot.loadIndexStatus(revisionID)
 
     metadataManager.updateWithTransaction(tableID, schema, append = true) {
-      // There's no affected indexedTable changes on compaction, so we send an empty object
+      // There's no affected table changes on compaction, so we send an empty object
       val tableChanges = BroadcastedTableChanges(None, currentIndexStatus, Map.empty)
       val fileActions =
         dataWriter.compact(tableID, schema, currentIndexStatus, tableChanges)

From 559758c8e174cbab63964f6b3885fddc52173d81 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 12 Dec 2023 14:06:31 +0100
Subject: [PATCH 13/15] Rollback TBLPROPERTIES change in SQL test

---
 .../scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
index a2712bdf0..287151acd 100644
--- a/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
+++ b/src/test/scala/io/qbeast/spark/utils/QbeastSQLIntegrationTest.scala
@@ -50,7 +50,7 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {
 
     spark.sql(
       s"CREATE TABLE student (id INT, name STRING, age INT) USING qbeast " +
-        "TBLPROPERTIES ('columnsToIndex'='id')")
+        "OPTIONS ('columnsToIndex'='id')")
 
     spark.sql("INSERT INTO table student SELECT * FROM data")
 

From ad6e131f938ce16fbcbb1d009da967d78b9f40a5 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 12 Dec 2023 14:07:27 +0100
Subject: [PATCH 14/15] Fix isNewRevision call and unused import

---
 src/main/scala/io/qbeast/spark/table/IndexedTable.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
index c9a150cc4..fbda0306e 100644
--- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
+++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -14,7 +14,7 @@ import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES
 import org.apache.spark.sql.delta.actions.FileAction
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{AnalysisException, AnalysisExceptionFactory, DataFrame}
+import org.apache.spark.sql.{AnalysisExceptionFactory, DataFrame}
 
 import java.util.ConcurrentModificationException
 
@@ -242,7 +242,7 @@ private[table] class IndexedTableImpl(
         if (isStaging(latestRevision)) { // If the existing Revision is Staging
           IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedParameters))
         } else {
-          if (isNewRevision(QbeastOptions(updatedParameters), latestRevision)) {
+          if (isNewRevision(QbeastOptions(updatedParameters))) {
             // If the new parameters generate a new revision, we need to create another one
             val newPotentialRevision = revisionBuilder
              .createNewRevision(tableID, data.schema, updatedParameters)
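The widened catch in patch 12 turns loading the latest revision into a pure existence probe: any failure is read as "no Qbeast metadata". The same guard can be phrased with scala.util.Try; the snapshot types below are illustrative stand-ins, not the project's real API:

  import scala.util.Try

  // Stand-ins for the snapshot types the real method closes over.
  trait Revision
  trait QbeastSnapshot { def loadLatestRevision: Revision }

  // Success means the commit log holds at least one revision; any
  // exception is read as "no Qbeast metadata", as in the patched catch-all.
  def containsQbeastMetadata(snapshot: QbeastSnapshot): Boolean =
    Try(snapshot.loadLatestRevision).isSuccess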
From 61a7468b30a15ff8813b39154aab92640cb20884 Mon Sep 17 00:00:00 2001
From: osopardo1
Date: Tue, 12 Dec 2023 16:09:56 +0100
Subject: [PATCH 15/15] Change method name

---
 .../scala/io/qbeast/spark/table/IndexedTable.scala | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
index fbda0306e..05e69abaa 100644
--- a/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
+++ b/src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -34,7 +34,7 @@ trait IndexedTable {
    * Returns whether the table contains Qbeast metadata
    * @return
    */
-  def containsQbeastMetadata: Boolean
+  def hasQbeastMetadata: Boolean
 
   /**
    * Returns the table id which identifies the table.
@@ -161,7 +161,7 @@ private[table] class IndexedTableImpl(
 
   override def exists: Boolean = !snapshot.isInitial
 
-  override def containsQbeastMetadata: Boolean = try {
+  override def hasQbeastMetadata: Boolean = try {
     snapshot.loadLatestRevision
     true
   } catch {
@@ -169,10 +169,12 @@ private[table] class IndexedTableImpl(
   }
 
   override def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String] = {
-    if (!exists) { // IF not exists, we should only check new properties
+    if (!exists) {
+      // IF the table does not exist, we should only check the new properties
       checkQbeastOptions(properties)
       properties
-    } else if (containsQbeastMetadata) { // If exists, we can merge both properties: new and current
+    } else if (hasQbeastMetadata) {
+      // IF the table has Qbeast metadata, we can merge both sets of properties: new and current
       val currentColumnsIndexed =
         latestRevision.columnTransformers.map(_.columnName).mkString(",")
       val currentCubeSize = latestRevision.desiredCubeSize.toString
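After this final patch, verifyAndMergeProperties is a three-way decision. The condensed model below is a sketch, not the patched method: requireSameColumns is a hypothetical helper standing in for whatever validation makes the earlier tests reject a changed columnsToIndex while letting an incoming cubeSize win, and the last branch is an assumption, since the hunk is truncated before it.

  def mergeProperties(
      exists: Boolean,
      hasQbeastMetadata: Boolean,
      current: Map[String, String], // recovered from the latest revision
      incoming: Map[String, String]): Map[String, String] = {

    // Hypothetical helper: a conflicting columnsToIndex is an error.
    def requireSameColumns(): Unit =
      for {
        cur <- current.get("columnsToIndex")
        inc <- incoming.get("columnsToIndex")
      } require(cur == inc, s"columnsToIndex cannot change from '$cur' to '$inc'")

    if (!exists) {
      incoming // fresh table: only the new properties are checked and kept
    } else if (hasQbeastMetadata) {
      requireSameColumns()
      current ++ incoming // e.g. an incoming cubeSize overrides the current one
    } else {
      incoming // assumed: existing non-Qbeast data, the new properties drive indexing
    }
  }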