Merge pull request #248 from osopardo1/create-external-table
Create a table from an external location without passing the options.
osopardo1 authored Dec 18, 2023
2 parents 304ed4f + 61a7468 commit 468c86d
Showing 10 changed files with 224 additions and 72 deletions.
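The effect of the change, as a sketch (the path and table name are illustrative, and a Qbeast table is assumed to have been written to that location beforehand):

  // The existing Qbeast metadata at the location already records
  // columnsToIndex and cubeSize, so they no longer have to be repeated here.
  spark.sql("""
    CREATE TABLE students
    USING qbeast
    LOCATION '/tmp/qbeast/students'
  """)

Before this change, such a statement failed unless 'columnsToIndex' (and optionally 'cubeSize') were passed again as table properties; now they are recovered from the existing metadata and merged into the catalog entry.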
8 changes: 4 additions & 4 deletions src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
@@ -89,17 +89,17 @@ object QbeastOptions {
QbeastOptions(columnsToIndex, desiredCubeSize, stats)
}

def loadTableIDFromParameters(parameters: Map[String, String]): QTableID = {
def loadTableIDFromOptions(options: Map[String, String]): QTableID = {
new QTableID(
parameters.getOrElse(
options.getOrElse(
PATH, {
throw AnalysisExceptionFactory.create("'path' is not specified")
}))
}

def checkQbeastProperties(parameters: Map[String, String]): Unit = {
def checkQbeastOptions(options: Map[String, String]): Unit = {
require(
parameters.contains("columnsToIndex") || parameters.contains("columnstoindex"),
options.contains("columnsToIndex") || options.contains("columnstoindex"),
throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))
}

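A minimal sketch of how the renamed helpers are called (the option values are illustrative, not part of the commit):

  import io.qbeast.spark.internal.QbeastOptions

  val options = Map(
    "path" -> "/tmp/qbeast/students", // consumed by loadTableIDFromOptions
    "columnsToIndex" -> "age,name") // required by checkQbeastOptions

  QbeastOptions.checkQbeastOptions(options) // throws an AnalysisException if columnsToIndex is missing
  val tableId = QbeastOptions.loadTableIDFromOptions(options) // QTableID("/tmp/qbeast/students")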
src/main/scala/io/qbeast/spark/internal/sources/QbeastBaseRelation.scala
@@ -33,11 +33,11 @@ object QbeastBaseRelation {
*/
def createRelation(
sqlContext: SQLContext,
table: IndexedTable,
indexedTable: IndexedTable,
options: Map[String, String]): BaseRelation = {

val spark = SparkSession.active
val tableID = table.tableID
val tableID = indexedTable.tableID
val snapshot = QbeastContext.metadataManager.loadSnapshot(tableID)
val schema = QbeastContext.metadataManager.loadCurrentSchema(tableID)
if (snapshot.isInitial) {
@@ -52,22 +52,19 @@
new ParquetFileFormat(),
options)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, options, append = !overwrite)
indexedTable.save(data, options, append = !overwrite)
}
}
} else {
// If the table contains data, initialize it
val revision = snapshot.loadLatestRevision
val columnsToIndex = revision.columnTransformers.map(row => row.columnName).mkString(",")
val cubeSize = revision.desiredCubeSize
val parameters =
Map[String, String]("columnsToIndex" -> columnsToIndex, "cubeSize" -> cubeSize.toString())

val path = new Path(tableID.id)
val fileIndex = OTreeIndex(spark, path)
val bucketSpec: Option[BucketSpec] = None
val file = new ParquetFileFormat()

// Verify and merge the options with the existing index properties
val parameters = indexedTable.verifyAndMergeProperties(options)

new HadoopFsRelation(
fileIndex,
partitionSchema = StructType(Seq.empty[StructField]),
@@ -76,7 +73,7 @@
file,
parameters)(spark) with InsertableRelation {
def insert(data: DataFrame, overwrite: Boolean): Unit = {
table.save(data, parameters, append = !overwrite)
indexedTable.save(data, parameters, append = !overwrite)
}
}
}
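With the option handling delegated to IndexedTable.verifyAndMergeProperties, an insert through this relation no longer needs the indexing options restated. A sketch, assuming the external table 'students' from the earlier example is registered and 'students_updates' is an existing view:

  // columnsToIndex and cubeSize travel in the merged parameters of the relation,
  // so a plain SQL append works without re-declaring them.
  spark.sql("INSERT INTO students SELECT * FROM students_updates")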
src/main/scala/io/qbeast/spark/internal/sources/QbeastDataSource.scala
@@ -4,7 +4,6 @@
package io.qbeast.spark.internal.sources

import io.qbeast.context.QbeastContext
import io.qbeast.context.QbeastContext.metadataManager
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
import io.qbeast.spark.table.IndexedTableFactory
@@ -54,15 +53,11 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
schema: StructType,
partitioning: Array[Transform],
properties: util.Map[String, String]): Table = {
val tableId = QbeastOptions.loadTableIDFromParameters(properties.asScala.toMap)
val tableId = QbeastOptions.loadTableIDFromOptions(properties.asScala.toMap)
val indexedTable = tableFactory.getIndexedTable(tableId)
if (indexedTable.exists) {
// If the table exists, we make sure to pass all the properties to QbeastTableImpl
val currentRevision = metadataManager.loadSnapshot(tableId).loadLatestRevision
val indexProperties = Map(
"columnsToIndex" -> currentRevision.columnTransformers.map(_.columnName).mkString(","),
"cubeSize" -> currentRevision.desiredCubeSize.toString)
val tableProperties = properties.asScala.toMap ++ indexProperties
val tableProperties = indexedTable.verifyAndMergeProperties(properties.asScala.toMap)
new QbeastTableImpl(
TableIdentifier(tableId.id),
new Path(tableId.id),
@@ -100,7 +95,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
parameters.contains("columnsToIndex") || mode == SaveMode.Append,
throw AnalysisExceptionFactory.create("'columnsToIndex' is not specified"))

val tableId = QbeastOptions.loadTableIDFromParameters(parameters)
val tableId = QbeastOptions.loadTableIDFromOptions(parameters)
val table = tableFactory.getIndexedTable(tableId)
mode match {
case SaveMode.Append => table.save(data, parameters, append = true)
@@ -116,7 +111,7 @@ class QbeastDataSource private[sources] (private val tableFactory: IndexedTableF
override def createRelation(
sqlContext: SQLContext,
parameters: Map[String, String]): BaseRelation = {
val tableID = QbeastOptions.loadTableIDFromParameters(parameters)
val tableID = QbeastOptions.loadTableIDFromOptions(parameters)
val table = tableFactory.getIndexedTable(tableID)
if (table.exists) {
table.load()
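The SaveMode check above keeps its previous behaviour: the first write must declare columnsToIndex, while later appends may omit it because the options are merged back from the table metadata. A sketch with an illustrative DataFrame df:

  // First write: the index does not exist yet, so the options are mandatory.
  df.write.format("qbeast")
    .option("columnsToIndex", "age,name")
    .option("cubeSize", "10000")
    .save("/tmp/qbeast/students")

  // Subsequent append: columnsToIndex and cubeSize are recovered from the metadata.
  df.write.mode("append").format("qbeast").save("/tmp/qbeast/students")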
src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalog.scala
@@ -4,7 +4,6 @@
package io.qbeast.spark.internal.sources.catalog

import io.qbeast.context.QbeastContext
import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
import io.qbeast.spark.internal.sources.v2.{QbeastStagedTableImpl, QbeastTableImpl}
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -110,7 +109,6 @@ class QbeastCatalog[T <: TableCatalog with SupportsNamespaces with FunctionCatal
properties: util.Map[String, String]): Table = {

if (QbeastCatalogUtils.isQbeastProvider(properties)) {
checkQbeastProperties(properties.asScala.toMap)
// Create the table
QbeastCatalogUtils.createQbeastTable(
ident,
src/main/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogUtils.scala
@@ -3,7 +3,7 @@
*/
package io.qbeast.spark.internal.sources.catalog

import io.qbeast.context.QbeastContext.metadataManager
import io.qbeast.context.QbeastContext.{metadataManager}
import io.qbeast.core.model.QTableID
import io.qbeast.spark.internal.sources.v2.QbeastTableImpl
import io.qbeast.spark.table.IndexedTableFactory
@@ -155,13 +155,14 @@ object QbeastCatalogUtils {
tableFactory: IndexedTableFactory,
existingSessionCatalog: SessionCatalog): Unit = {

val isPathTable = QbeastCatalogUtils.isPathTable(ident)
val isPathTable = this.isPathTable(ident)
val properties = allTableProperties.asScala.toMap

// Get table location
val location = if (isPathTable) {
Option(ident.name())
} else {
Option(allTableProperties.get("location"))
properties.get("location")
}

// Define the table type.
@@ -176,6 +177,10 @@
.orElse(existingTableOpt.flatMap(_.storage.locationUri))
.getOrElse(existingSessionCatalog.defaultTablePath(id))

// Process the parameters/options/configuration sent to the table
val indexedTable = tableFactory.getIndexedTable(QTableID(loc.toString))
val allProperties = indexedTable.verifyAndMergeProperties(properties)

// Initialize the path option
val storage = DataSource
.buildStorageFormatFromOptions(writeOptions)
@@ -198,7 +203,7 @@
provider = Some("qbeast"),
partitionColumnNames = Seq.empty,
bucketSpec = None,
properties = allTableProperties.asScala.toMap,
properties = allProperties,
comment = commentOpt)

// Verify the schema if it's an external table
@@ -210,9 +215,7 @@
// Write data, if any
val append = tableCreationMode.saveMode == SaveMode.Append
dataFrame.map { df =>
tableFactory
.getIndexedTable(QTableID(loc.toString))
.save(df, allTableProperties.asScala.toMap, append)
indexedTable.save(df, allProperties, append)
}

// Update the existing session catalog with the Qbeast table information
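Since the merged properties are computed from the table location before the catalog entry is written, only a genuinely new table (no Qbeast data at the location yet) still has to spell the options out. A sketch in Spark SQL, with illustrative names:

  // New, empty table: there is nothing to recover, so columnsToIndex is required.
  spark.sql("""
    CREATE TABLE students_new (id INT, name STRING, age INT)
    USING qbeast
    OPTIONS ('columnsToIndex' = 'id,age', 'cubeSize' = '10000')
  """)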
src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastStagedTableImpl.scala
@@ -3,7 +3,7 @@
*/
package io.qbeast.spark.internal.sources.v2

import io.qbeast.spark.internal.QbeastOptions.checkQbeastProperties
import io.qbeast.spark.internal.QbeastOptions.checkQbeastOptions
import io.qbeast.spark.internal.sources.catalog.{CreationMode, QbeastCatalogUtils}
import io.qbeast.spark.table.IndexedTableFactory
import org.apache.spark.sql.catalyst.catalog.SessionCatalog
@@ -70,7 +70,7 @@ private[sources] class QbeastStagedTableImpl(
writeOptions.foreach { case (k, v) => props.put(k, v) }

// Check all the Qbeast properties are correctly specified
checkQbeastProperties(props.asScala.toMap)
checkQbeastOptions(props.asScala.toMap)

// Creates the corresponding table on the Catalog and executes
// the writing of the dataFrame (if any)
src/main/scala/io/qbeast/spark/internal/sources/v2/QbeastWriteBuilder.scala
@@ -62,8 +62,6 @@ class QbeastWriteBuilder(
// Passing the options in the query plan plus the properties
// because columnsToIndex needs to be included in the contract
val writeOptions = info.options().toMap ++ properties
// scalastyle:off
println("data schema " + data.schema)
indexedTable.save(data, writeOptions, append)

// TODO: Push this to Apache Spark
102 changes: 68 additions & 34 deletions src/main/scala/io/qbeast/spark/table/IndexedTable.scala
@@ -8,7 +8,7 @@ import io.qbeast.core.model._
import io.qbeast.spark.delta.{CubeDataLoader, StagingDataManager, StagingResolution}
import io.qbeast.spark.index.QbeastColumns
import io.qbeast.spark.internal.QbeastOptions
import io.qbeast.spark.internal.QbeastOptions.{COLUMNS_TO_INDEX, CUBE_SIZE}
import io.qbeast.spark.internal.QbeastOptions._
import io.qbeast.spark.internal.sources.QbeastBaseRelation
import org.apache.spark.qbeast.config.DEFAULT_NUMBER_OF_RETRIES
import org.apache.spark.sql.delta.actions.FileAction
@@ -30,13 +30,26 @@ trait IndexedTable {
*/
def exists: Boolean

/**
* Returns whether the table contains Qbeast metadata
* @return
*/
def hasQbeastMetadata: Boolean

/**
* Returns the table id which identifies the table.
*
* @return the table id
*/
def tableID: QTableID

/**
* Merges the new properties with the current index properties
* @param properties the new properties to merge
* @return
*/
def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String]

/**
* Saves given data in the table and updates the index. The specified columns are
* used to define the index when the table is created or overwritten. The append
@@ -139,11 +152,61 @@ private[table] class IndexedTableImpl(
with StagingUtils {
private var snapshotCache: Option[QbeastSnapshot] = None

/**
* Latest Revision Available
*
* @return
*/
private def latestRevision: Revision = snapshot.loadLatestRevision

override def exists: Boolean = !snapshot.isInitial

private def isNewRevision(qbeastOptions: QbeastOptions, latestRevision: Revision): Boolean = {
override def hasQbeastMetadata: Boolean = try {
snapshot.loadLatestRevision
true
} catch {
case _: Exception => false
}

override def verifyAndMergeProperties(properties: Map[String, String]): Map[String, String] = {
if (!exists) {
// If the table does not exist yet, only validate the new properties
checkQbeastOptions(properties)
properties
} else if (hasQbeastMetadata) {
// If the table has Qbeast metadata, merge the new and current properties
val currentColumnsIndexed =
latestRevision.columnTransformers.map(_.columnName).mkString(",")
val currentCubeSize = latestRevision.desiredCubeSize.toString
val finalProperties = {
(properties.contains(COLUMNS_TO_INDEX), properties.contains(CUBE_SIZE)) match {
case (true, true) => properties
case (false, false) =>
properties + (COLUMNS_TO_INDEX -> currentColumnsIndexed, CUBE_SIZE -> currentCubeSize)
case (true, false) => properties + (CUBE_SIZE -> currentCubeSize)
case (false, true) =>
properties + (COLUMNS_TO_INDEX -> currentColumnsIndexed)
}
}
finalProperties
} else {
throw AnalysisExceptionFactory.create(
s"Table ${tableID.id} exists but does not contain Qbeast metadata. " +
s"Please use ConvertToQbeastCommand to convert the table to Qbeast.")
}
}
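A worked sketch of the merge, for an illustrative table whose latest revision indexes 'age,name' with a desired cube size of 5000 (indexedTable is an IndexedTable handle for it):

  indexedTable.verifyAndMergeProperties(Map.empty[String, String])
  // -> Map("columnsToIndex" -> "age,name", "cubeSize" -> "5000")
  indexedTable.verifyAndMergeProperties(Map("cubeSize" -> "10000"))
  // -> Map("columnsToIndex" -> "age,name", "cubeSize" -> "10000")
  // A table that holds data but no Qbeast metadata throws an AnalysisException
  // pointing to ConvertToQbeastCommand instead.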

private def isNewRevision(qbeastOptions: QbeastOptions): Boolean = {

// TODO feature: columnsToIndex may change between revisions
checkColumnsToMatchSchema(latestRevision)
val columnsToIndex = qbeastOptions.columnsToIndex
val currentColumnsToIndex = latestRevision.columnTransformers.map(_.columnName)
val isNewColumns = !latestRevision.matchColumns(columnsToIndex)
if (isNewColumns) {
throw AnalysisExceptionFactory.create(
s"Columns to index '${columnsToIndex.mkString(",")}' do not match " +
s"existing index ${currentColumnsToIndex.mkString(",")}.")
}
// Checks if the desiredCubeSize is different from the existing one
val isNewCubeSize = latestRevision.desiredCubeSize != qbeastOptions.cubeSize
// Checks if the user-provided column boundaries would trigger the creation of
@@ -169,26 +232,6 @@

}

/**
* Add the required indexing parameters when the SaveMode is Append.
* The user-provided parameters are respected.
* @param latestRevision the latest revision
* @param parameters the parameters required for indexing
*/
private def addRequiredParams(
latestRevision: Revision,
parameters: Map[String, String]): Map[String, String] = {
val columnsToIndex = latestRevision.columnTransformers.map(_.columnName).mkString(",")
val desiredCubeSize = latestRevision.desiredCubeSize.toString
(parameters.contains(COLUMNS_TO_INDEX), parameters.contains(CUBE_SIZE)) match {
case (true, true) => parameters
case (false, false) =>
parameters + (COLUMNS_TO_INDEX -> columnsToIndex, CUBE_SIZE -> desiredCubeSize)
case (true, false) => parameters + (CUBE_SIZE -> desiredCubeSize)
case (false, true) => parameters + (COLUMNS_TO_INDEX -> columnsToIndex)
}
}

override def save(
data: DataFrame,
parameters: Map[String, String],
@@ -197,12 +240,11 @@
if (exists && append) {
// If the table exists and we are appending new data
// 1. Load existing IndexStatus
val latestRevision = snapshot.loadLatestRevision
val updatedParameters = addRequiredParams(latestRevision, parameters)
val updatedParameters = verifyAndMergeProperties(parameters)
if (isStaging(latestRevision)) { // If the existing Revision is Staging
IndexStatus(revisionBuilder.createNewRevision(tableID, data.schema, updatedParameters))
} else {
if (isNewRevision(QbeastOptions(updatedParameters), latestRevision)) {
if (isNewRevision(QbeastOptions(updatedParameters))) {
// If the new parameters generate a new revision, we need to create another one
val newPotentialRevision = revisionBuilder
.createNewRevision(tableID, data.schema, updatedParameters)
@@ -255,14 +297,6 @@
snapshotCache = None
}

private def checkColumnsToMatchSchema(revision: Revision): Unit = {
val columnsToIndex = revision.columnTransformers.map(_.columnName)
if (!snapshot.loadLatestRevision.matchColumns(columnsToIndex)) {
throw AnalysisExceptionFactory.create(
s"Columns to index '$columnsToIndex' do not match existing index.")
}
}

/**
* Creates a QbeastBaseRelation for the given table.
* @return the QbeastBaseRelation
src/test/scala/io/qbeast/spark/internal/sources/catalog/QbeastCatalogTest.scala
@@ -4,7 +4,6 @@ import io.qbeast.spark.QbeastIntegrationTestSpec
import io.qbeast.spark.internal.sources.v2.{QbeastStagedTableImpl, QbeastTableImpl}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.CannotReplaceMissingTableException
import org.apache.spark.sql.connector.catalog.{
CatalogExtension,
CatalogPlugin,
@@ -248,7 +247,7 @@ class QbeastCatalogTest extends QbeastIntegrationTestSpec with CatalogTestSuite

"QbeastCatalogUtils" should "throw an error when trying to replace a non-existing table" in
withQbeastContextSparkAndTmpWarehouse((spark, _) => {
an[CannotReplaceMissingTableException] shouldBe thrownBy(
an[AnalysisException] shouldBe thrownBy(
QbeastCatalogUtils.createQbeastTable(
Identifier.of(defaultNamespace, "students"),
schema,