CREATE EXTERNAL TABLE without OPTIONS #248

Merged · 15 commits · Dec 18, 2023
src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala (4 additions, 4 deletions)

@@ -89,17 +89,17 @@ object QbeastOptions {
     QbeastOptions(columnsToIndex, desiredCubeSize, stats)
   }

-  def loadTableIDFromParameters(parameters: Map[String, String]): QTableID = {
+  def loadTableIDFromOptions(options: Map[String, String]): QTableID = {
     new QTableID(
-      parameters.getOrElse(
+      options.getOrElse(
         PATH, {
           throw AnalysisExceptionFactory.create("'path' is not specified")
         }))
   }

-  def checkQbeastProperties(parameters: Map[String, String]): Unit = {
+  def checkQbeastOptions(options: Map[String, String]): Unit = {
     require(
-      parameters.contains("columnsToIndex") || parameters.contains("columnstoindex"),
+      options.contains("columnsToIndex") || options.contains("columnstoindex"),
       throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))
   }
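For context, a minimal sketch of how the renamed helpers behave, assuming only the QbeastOptions object shown above (the option maps are illustrative):

import io.qbeast.spark.internal.QbeastOptions

// Passes: "columnsToIndex" is present ("columnstoindex" is accepted too).
QbeastOptions.checkQbeastOptions(Map("columnsToIndex" -> "id,name"))

// Throws an AnalysisException: neither spelling of columnsToIndex is set.
QbeastOptions.checkQbeastOptions(Map("cubeSize" -> "5000"))

// Resolves the table identity from the mandatory "path" option;
// throws an AnalysisException when "path" is missing.
val tableId = QbeastOptions.loadTableIDFromOptions(Map("path" -> "/tmp/students"))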
src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala

@@ -54,7 +54,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableFactory)
       schema: StructType,
       partitioning: Array[Transform],
       properties: util.Map[String, String]): Table = {
-    val tableId = QbeastOptions.loadTableIDFromParameters(properties.asScala.toMap)
+    val tableId = QbeastOptions.loadTableIDFromOptions(properties.asScala.toMap)
     val indexedTable = tableFactory.getIndexedTable(tableId)
     if (indexedTable.exists) {
       // If the table exists, we make sure to pass all the properties to QbeastTableImpl

@@ -100,7 +100,7 @@
       parameters.contains("columnsToIndex") || mode == SaveMode.Append,
       throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))

-    val tableId = QbeastOptions.loadTableIDFromParameters(parameters)
+    val tableId = QbeastOptions.loadTableIDFromOptions(parameters)
     val table = tableFactory.getIndexedTable(tableId)
     mode match {
       case SaveMode.Append => table.save(data, parameters, append = true)

@@ -116,7 +116,7 @@
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
-    val tableID = QbeastOptions.loadTableIDFromParameters(parameters)
+    val tableID = QbeastOptions.loadTableIDFromOptions(parameters)
     val table = tableFactory.getIndexedTable(tableID)
     if (table.exists) {
       table.load()
src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala

@@ -4,7 +4,6 @@
 package io.qbeast.spark.internal.sources.catalog

 import io.qbeast.context.QbeastContext
-import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
 import io.qbeast.spark.internal.sources.v2.{QbeastStagedTableImpl, QbeastTableImpl}
 import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.TableIdentifier

@@ -110,7 +109,6 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatalog]
       properties: util.Map[String, String]): Table = {

     if (QbeastCatalogUtils.isQbeastProvider(properties)) {
-      checkQbeastProperties(properties.asScala.toMap)
       // Create the table
       QbeastCatalogUtils.createQbeastTable(
         ident,
src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala

@@ -5,8 +5,9 @@ package io.qbeast.spark.internal.sources.catalog

 import io.qbeast.context.QbeastContext.metadataManager
 import io.qbeast.core.model.QTableID
+import io.qbeast.spark.internal.QbeastOptions._
 import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
-import io.qbeast.spark.table.IndexedTableFactory
+import io.qbeast.spark.table.{IndexedTableFactory}
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException

@@ -18,6 +19,7 @@ import org.apache.spark.sql.delta.DeltaLog
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{
+  AnalysisException,
   AnalysisExceptionFactory,
   DataFrame,
   SaveMode,

@@ -155,13 +157,14 @@ object QbeastCatalogUtils {
       tableFactory: IndexedTableFactory,
       existingSessionCatalog: SessionCatalog): Unit = {

-    val isPathTable = QbeastCatalogUtils.isPathTable(ident)
+    val isPathTable = this.isPathTable(ident)
+    val properties = allTableProperties.asScala.toMap

     // Get table location
     val location = if (isPathTable) {
       Option(ident.name())
     } else {
-      Option(allTableProperties.get("location"))
+      properties.get("location")
     }

     // Define the table type.

@@ -176,6 +179,24 @@
       .orElse(existingTableOpt.flatMap(_.storage.locationUri))
       .getOrElse(existingSessionCatalog.defaultTablePath(id))

+    // Process the parameters/options/configuration sent to the table
+    val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
+    val isNewQbeastTable = (tableType == CatalogTableType.EXTERNAL) && !indexedTable.isConverted
+    if (indexedTable.exists && isNewQbeastTable) {
+      throw AnalysisExceptionFactory.create(
+        "The table you are trying to create already exists and is NOT Qbeast Formatted. " +
+          "Please use the ConvertToQbeastCommand before creating the table.")
+    }
+    val allProperties = {
+      try {
+        checkQbeastOptions(properties)
+        properties // No options added
+      } catch {
+        case _: AnalysisException => // Add existing table properties
+          properties ++ indexedTable.properties // Add the write options
+      }
+    }
+
     // Initialize the path option
     val storage = DataSource
       .buildStorageFormatFromOptions(writeOptions)

@@ -198,7 +219,7 @@
       provider = Some("qbeast"),
       partitionColumnNames = Seq.empty,
       bucketSpec = None,
-      properties = allTableProperties.asScala.toMap,
+      properties = allProperties,
       comment = commentOpt)

     // Verify the schema if it's an external table

@@ -210,9 +231,7 @@
     // Write data, if any
     val append = tableCreationMode.saveMode == SaveMode.Append
     dataFrame.map { df =>
-      tableFactory
-        .getIndexedTable(QTableID(loc.toString))
-        .save(df, allTableProperties.asScala.toMap, append)
+      indexedTable.save(df, allProperties, append)
     }

     // Update the existing session catalog with the Qbeast table information
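The try/catch above is the core of this change: when a CREATE EXTERNAL TABLE statement arrives without OPTIONS, checkQbeastOptions throws, and the catch branch inherits the indexing options from the existing table's metadata. A standalone sketch of the resulting behavior, assuming the table at the given location was written with columnsToIndex = "id,name" (all values are illustrative):

// `ddlOptions` stands in for the options supplied in the DDL; `indexedTable`
// is resolved from the table location, as in createQbeastTable above.
val ddlOptions = Map("location" -> "/tmp/external_student")
val allProperties =
  try {
    checkQbeastOptions(ddlOptions) // throws: no columnsToIndex supplied
    ddlOptions
  } catch {
    case _: AnalysisException =>
      // e.g. adds Map("columnsToIndex" -> "id,name", "cubeSize" -> "5000"),
      // taken from the latest revision of the existing table.
      ddlOptions ++ indexedTable.properties
  }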
src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastStagedTableImpl.scala

@@ -3,7 +3,7 @@
  */
 package io.qbeast.spark.internal.sources.v2

-import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
+import io.qbeast.spark.internal.QbeastOptions.checkQbeastOptions
 import io.qbeast.spark.internal.sources.catalog.{CreationMode, QbeastCatalogUtils}
 import io.qbeast.spark.table.IndexedTableFactory
 import org.apache.spark.sql.catalyst.catalog.SessionCatalog

@@ -70,7 +70,7 @@ private[sources] class QbeastStagedTableImpl(
     writeOptions.foreach { case (k, v) => props.put(k, v) }

     // Check all the Qbeast properties are correctly specified
-    checkQbeastProperties(props.asScala.toMap)
+    checkQbeastOptions(props.asScala.toMap)

     // Creates the corresponding table on the Catalog and executes
     // the writing of the dataFrame (if any)
QbeastWriteBuilder.scala

@@ -62,8 +62,6 @@ class QbeastWriteBuilder(
     // Passing the options in the query plan plus the properties
     // because columnsToIndex needs to be included in the contract
     val writeOptions = info.options().toMap ++ properties
-    // scalastyle:off
-    println("data schema " + data.schema)
     indexedTable.save(data, writeOptions, append)

     // TODO: Push this to Apache Spark
src/main/scala/io/qbeast/spark/table/IndexedTable.scala (34 additions, 1 deletion)

@@ -8,7 +8,7 @@ import io.qbeast.core.model._
 import io.qbeast.spark.delta.{CubeDataLoader, StagingDataManager, StagingResolution}
 import io.qbeast.spark.index.QbeastColumns
 import io.qbeast.spark.internal.QbeastOptions
-import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE}
+import io.qbeast.spark.internal.QbeastOptions._
 import io.qbeast.spark.internal.sources.QbeastBaseRelation
 import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES
 import org.apache.spark.sql.delta.actions.FileAction

@@ -30,13 +30,26 @@ trait IndexedTable {
    */
   def exists: Boolean

+  /**
+   * Returns whether the table is converted to Qbeast format.
+   * @return
+   */
+  def isConverted: Boolean
+
   /**
    * Returns the table id which identifies the table.
    *
    * @return the table id
    */
   def tableID: QTableID

+  /**
+   * Returns the table properties.
+   *
+   * @return the table properties
+   */
+  def properties: Map[String, String]
+
   /**
    * Saves given data in the table and updates the index. The specified columns are
    * used to define the index when the table is created or overwritten. The append

@@ -141,6 +154,26 @@ private[table] class IndexedTableImpl(

   override def exists: Boolean = !snapshot.isInitial

+  override def isConverted: Boolean = try {
+    snapshot.loadLatestRevision
+    true
+  } catch {
+    case _: Exception => false
+  }
+
+  override def properties: Map[String, String] = {
+    if (exists && isConverted) {
+      val latestRevision = snapshot.loadLatestRevision
+      Map(
+        COLUMNS_TO_INDEX -> latestRevision.columnTransformers
+          .map(_.columnName)
+          .mkString(","),
+        CUBE_SIZE -> latestRevision.desiredCubeSize.toString)
+    } else {
+      Map.empty
+    }
+  }
+
   private def isNewRevision(qbeastOptions: QbeastOptions, latestRevision: Revision): Boolean = {
     // TODO feature: columnsToIndex may change between revisions
     checkColumnsToMatchSchema(latestRevision)
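A brief usage sketch of the two new members; the factory lookup and path are hypothetical, and the returned map mirrors what properties assembles from the latest revision:

// Hypothetical resolution through an IndexedTableFactory, as in QbeastCatalogUtils.
val table = tableFactory.getIndexedTable(QTableID("/tmp/external_student"))

if (table.exists && table.isConverted) {
  // e.g. Map("columnsToIndex" -> "id,name", "cubeSize" -> "5000"), built from
  // the latest revision's columnTransformers and desiredCubeSize.
  val inherited: Map[String, String] = table.properties
}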
QbeastCatalogTest.scala

@@ -4,7 +4,6 @@ import io.qbeast.spark.QbeastIntegrationTestSpec
 import io.qbeast.spark.internal.sources.v2.{QbeastStagedTableImpl, QbeastTableImpl}
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
 import org.apache.spark.sql.connector.catalog.{
   CatalogExtension,
   CatalogPlugin,

@@ -248,7 +247,7 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite

   "QbeastCatalogUtils" should "throw an error when trying to replace a non-existing table" in
     withQbeastContextSparkAndTmpWarehouse((spark, _) => {
-      an[CannotReplaceMissingTableException] shouldBe thrownBy(
+      an[AnalysisException] shouldBe thrownBy(
         QbeastCatalogUtils.createQbeastTable(
           Identifier.of(defaultNamespace, "students"),
           schema,
QbeastSQLIntegrationTest.scala

@@ -121,6 +121,42 @@ class QbeastSQLIntegrationTest extends QbeastIntegrationTestSpec {

     })

+  it should "create EXTERNAL existing table WITHOUT options" in
+    withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
+
+      val location = tmpDir + "/external_student/"
+      val data = createTestData(spark)
+      data.write.format("qbeast").option("columnsToIndex", "id,name").save(location)
+
+      spark.sql(
+        s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+          s"USING qbeast " +
+          s"LOCATION '$location'")
+
+    })
+
+  it should "throw an error if the table is NOT qbeast" in withQbeastContextSparkAndTmpWarehouse(
+    (spark, tmpDir) => {
+
+      val location = tmpDir + "/external_student/"
+      val data = createTestData(spark)
+      data.write.format("delta").save(location)
+
+      an[AnalysisException] shouldBe thrownBy(
+        spark.sql(
+          s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+            s"USING qbeast " +
+            s"LOCATION '$location'"))
+
+      an[AnalysisException] shouldBe thrownBy(
+        spark.sql(
+          s"CREATE EXTERNAL TABLE student (id INT, name STRING, age INT) " +
+            s"USING qbeast " +
+            "OPTIONS ('columnsToIndex'='id') " +
+            s"LOCATION '$location'"))
+
+    })
+
   it should "throw an error when using different path locations" in
     withQbeastContextSparkAndTmpWarehouse((spark, tmpDir) => {
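Taken together, the user-facing behavior this PR enables, sketched in Spark Scala (the path is illustrative):

// Data previously written as a Qbeast table:
//   df.write.format("qbeast").option("columnsToIndex", "id,name").save("/tmp/external_student")
spark.sql("""
  CREATE EXTERNAL TABLE student (id INT, name STRING, age INT)
  USING qbeast
  LOCATION '/tmp/external_student'
""") // no OPTIONS clause needed: columnsToIndex and cubeSize come from the table metadata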