Skip to content

Commit

Permalink
Merge pull request Qbeast-io#252 from alexeiakimov/250-backport-the-t…
Browse files Browse the repository at this point in the history
…xnAppId-and-txnVersion-support

250 backport the txn app id and txn version support
  • Loading branch information
alexeiakimov authored Dec 19, 2023
2 parents 468c86d + eb60c19 commit 3efc2fc
Show file tree
Hide file tree
Showing 19 changed files with 445 additions and 91 deletions.
11 changes: 8 additions & 3 deletions core/src/main/scala/io/qbeast/core/model/MetadataManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@ import io.qbeast.IISeq
* Metadata Manager template
* @tparam DataSchema type of data schema
* @tparam FileDescriptor type of file descriptor
* @tparam QbeastOptions type of Qbeast options
*/
trait MetadataManager[DataSchema, FileDescriptor] {
trait MetadataManager[DataSchema, FileDescriptor, QbeastOptions] {
type Configuration = Map[String, String]

/**
Expand All @@ -28,11 +29,15 @@ trait MetadataManager[DataSchema, FileDescriptor] {
* Writes and updates the metadata by using transaction control
* @param tableID the QTableID
* @param schema the schema of the data
* @param options the Qbeast options
* @param append the append flag
* @param writer the writer code to be executed
*/
def updateWithTransaction(tableID: QTableID, schema: DataSchema, append: Boolean)(
writer: => (TableChanges, IISeq[FileDescriptor])): Unit
def updateWithTransaction(
tableID: QTableID,
schema: DataSchema,
options: QbeastOptions,
append: Boolean)(writer: => (TableChanges, IISeq[FileDescriptor])): Unit

/**
* Updates the table metadata by overwriting the metadata configurations
Expand Down
16 changes: 7 additions & 9 deletions core/src/main/scala/io/qbeast/core/model/QbeastCoreContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import scala.reflect.ClassTag
* @tparam DATA type of the data
* @tparam DataSchema type of the data schema
* @tparam FileDescriptor type of the file descriptor
* @tparam QbeastOptions type of Qbeast options
*/
trait QbeastCoreContext[DATA, DataSchema, FileDescriptor] {
def metadataManager: MetadataManager[DataSchema, FileDescriptor]
trait QbeastCoreContext[DATA, DataSchema, FileDescriptor, QbeastOptions] {
def metadataManager: MetadataManager[DataSchema, FileDescriptor, QbeastOptions]
def dataWriter: DataWriter[DATA, DataSchema, FileDescriptor]
def indexManager: IndexManager[DATA]
def queryManager[QUERY: ClassTag]: QueryManager[QUERY, DATA]
def revisionBuilder: RevisionFactory[DataSchema]
def revisionBuilder: RevisionFactory[DataSchema, QbeastOptions]
def keeper: Keeper

}
Expand All @@ -25,7 +26,7 @@ trait QbeastCoreContext[DATA, DataSchema, FileDescriptor] {
*
* @tparam DataSchema type of the data schema
* @tparam QbeastOptions type of Qbeast options
*/
trait RevisionFactory[DataSchema] {
trait RevisionFactory[DataSchema, QbeastOptions] {

/**
* Create a new revision for a table with given parameters
Expand All @@ -35,10 +36,7 @@ trait RevisionFactory[DataSchema] {
* @param options the Qbeast options
* @return the new revision
*/
def createNewRevision(
qtableID: QTableID,
schema: DataSchema,
options: Map[String, String]): Revision
def createNewRevision(qtableID: QTableID, schema: DataSchema, options: QbeastOptions): Revision

/**
* Create a new revision with given parameters from an old revision
Expand All @@ -51,7 +49,7 @@ trait RevisionFactory[DataSchema] {
def createNextRevision(
qtableID: QTableID,
schema: DataSchema,
options: Map[String, String],
options: QbeastOptions,
oldRevision: RevisionID): Revision

}
36 changes: 36 additions & 0 deletions docs/AdvancedConfiguration.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,42 @@ In a `JSON` string, you can pass the **minimum and maximum values of the columns
}
```

## TxnAppId and TxnVersion

These options are used to make the writes idempotent.

The option `txnAppId` identifies an application writing data to the table. It is
the responsibility of the user to assign unique identifiers to the applications
writing data to the table.

The option `txnVersion` identifies the transaction issued by the application.
The value of this option must be a valid string representation of a positive
long number. Both `txnAppId` and `txnVersion` must be specified together for
the write to be idempotent.

```scala
df.write.format("qbeast")
.option("columnsToIndex", "a")
.option("txnAppId", "ingestionService")
.option("txnVersion", "1")
```

If the table already contains data written by a transaction with the same
`txnAppId` and `txnVersion`, the requested write is ignored.

```scala
// The data is written
df.write.format("qbeast")
.option("columnsToIndex", "a")
.option("txnAppId", "ingestionService")
.option("txnVersion", "1")
...
// The data is ignored
df.write.format("qbeast")
.mode("append")
.option("txnAppId", "ingestionService")
.option("txnVersion", "1")
```

## Indexing Timestamps with ColumnStats

For indexing `Timestamps` or `Dates` with `columnStats` (min and maximum ranges), notice that **the values need to be formatted in a proper way** (following `"yyyy-MM-dd HH:mm:ss.SSSSSS'Z'"` pattern) for Qbeast to be able to parse it.
Expand Down
7 changes: 4 additions & 3 deletions src/main/scala/io/qbeast/context/QbeastContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}

import scala.reflect.ClassTag
import io.qbeast.spark.internal.QbeastOptions

/**
* Qbeast context provides access to internal mechanisms of
Expand Down Expand Up @@ -59,7 +60,7 @@ trait QbeastContext {
*/
object QbeastContext
extends QbeastContext
with QbeastCoreContext[DataFrame, StructType, FileAction] {
with QbeastCoreContext[DataFrame, StructType, FileAction, QbeastOptions] {
private var managedOption: Option[QbeastContext] = None
private var unmanagedOption: Option[QbeastContext] = None

Expand All @@ -78,13 +79,13 @@ object QbeastContext

override def indexManager: IndexManager[DataFrame] = SparkOTreeManager

override def metadataManager: MetadataManager[StructType, FileAction] =
override def metadataManager: MetadataManager[StructType, FileAction, QbeastOptions] =
SparkDeltaMetadataManager

override def dataWriter: DataWriter[DataFrame, StructType, FileAction] =
SparkDeltaDataWriter

override def revisionBuilder: RevisionFactory[StructType] =
override def revisionBuilder: RevisionFactory[StructType, QbeastOptions] =
SparkRevisionFactory

/**
Expand Down
15 changes: 13 additions & 2 deletions src/main/scala/io/qbeast/spark/delta/DeltaMetadataWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,16 +66,27 @@ private[delta] case class DeltaMetadataWriter(
}

def writeWithTransaction(writer: => (TableChanges, Seq[FileAction])): Unit = {
val oldTransactions = deltaLog.unsafeVolatileSnapshot.setTransactions
// Skip the write if a transaction with the same appId and version is already recorded in the snapshot
for (txn <- oldTransactions; version <- options.txnVersion; appId <- options.txnAppId) {
if (txn.appId == appId && txn.version == version) {
return
}
}
deltaLog.withNewTransaction { txn =>
// Register metrics to use in the Commit Info
val statsTrackers = createStatsTrackers(txn)
registerStatsTrackers(statsTrackers)
// Execute write
val (changes, newFiles) = writer
// Update Qbeast Metadata (replicated set, revision..)
val finalActions = updateMetadata(txn, changes, newFiles)
var actions = updateMetadata(txn, changes, newFiles)
// Set transaction identifier if specified
for (txnVersion <- options.txnVersion; txnAppId <- options.txnAppId) {
actions +:= SetTransaction(txnAppId, txnVersion, Some(System.currentTimeMillis()))
}
// Commit the information to the DeltaLog
txn.commit(finalActions, deltaOperation)
txn.commit(actions, deltaOperation)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,34 @@ import org.apache.spark.sql.delta.actions.FileAction
import org.apache.spark.sql.delta.{DeltaLog, DeltaOptions}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{SaveMode, SparkSession}
import io.qbeast.spark.internal.QbeastOptions

/**
* Spark+Delta implementation of the MetadataManager interface
*/
object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction] {
object SparkDeltaMetadataManager extends MetadataManager[StructType, FileAction, QbeastOptions] {

override def updateWithTransaction(tableID: QTableID, schema: StructType, append: Boolean)(
writer: => (TableChanges, IISeq[FileAction])): Unit = {
override def updateWithTransaction(
tableID: QTableID,
schema: StructType,
options: QbeastOptions,
append: Boolean)(writer: => (TableChanges, IISeq[FileAction])): Unit = {

val deltaLog = loadDeltaQbeastLog(tableID).deltaLog
val mode = if (append) SaveMode.Append else SaveMode.Overwrite
val options =
new DeltaOptions(Map("path" -> tableID.id), SparkSession.active.sessionState.conf)
val metadataWriter = DeltaMetadataWriter(tableID, mode, deltaLog, options, schema)
val conf = SparkSession.active.sessionState.conf
val deltaOptions = Map.newBuilder[String, String]
deltaOptions += "path" -> tableID.id
for (txnAppId <- options.txnAppId; txnVersion <- options.txnVersion) {
deltaOptions += DeltaOptions.TXN_APP_ID -> txnAppId
deltaOptions += DeltaOptions.TXN_VERSION -> txnVersion
}
val metadataWriter = DeltaMetadataWriter(
tableID,
mode,
deltaLog,
new DeltaOptions(deltaOptions.result(), conf),
schema)
metadataWriter.writeWithTransaction(writer)
}

Expand Down
26 changes: 23 additions & 3 deletions src/main/scala/io/qbeast/spark/delta/StagingDataManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import org.apache.spark.qbeast.config.STAGING_SIZE_IN_BYTES
import org.apache.spark.sql.delta.actions.{FileAction, RemoveFile}
import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import io.qbeast.spark.internal.QbeastOptions
import org.apache.spark.sql.delta.DeltaOptions

/**
* Access point for staged data
Expand Down Expand Up @@ -79,13 +81,31 @@ private[spark] class StagingDataManager(tableID: QTableID) extends DeltaStagingU
/**
* Stage the data without indexing by writing it in the delta format. If the table
* is not yet a qbeast table, use ConvertToQbeastCommand for conversion after the write.
*
* @param data
* the data to stage
* @param indexStatus
* the index status
* @param options
* the options
* @param append
* whether to append the data (true) or overwrite the existing data (false)
*/
def stageData(data: DataFrame, indexStatus: IndexStatus, append: Boolean): Unit = {
def stageData(
data: DataFrame,
indexStatus: IndexStatus,
options: QbeastOptions,
append: Boolean): Unit = {
// Write data to the staging area in the delta format
data.write
var writer = data.write
.format("delta")
.mode(if (append) SaveMode.Append else SaveMode.Overwrite)
.save(tableID.id)
for (txnVersion <- options.txnVersion; txnAppId <- options.txnAppId) {
writer = writer
.option(DeltaOptions.TXN_VERSION, txnVersion)
.option(DeltaOptions.TXN_APP_ID, txnAppId)
}
writer.save(tableID.id)

// Convert if the table is not yet qbeast
if (isInitial) {
Expand Down
13 changes: 6 additions & 7 deletions src/main/scala/io/qbeast/spark/index/SparkRevisionFactory.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import scala.util.matching.Regex
/**
* Spark implementation of RevisionFactory
*/
object SparkRevisionFactory extends RevisionFactory[StructType] {
object SparkRevisionFactory extends RevisionFactory[StructType, QbeastOptions] {

val SpecExtractor: Regex = "([^:]+):([A-z]+)".r

Expand All @@ -25,11 +25,10 @@ object SparkRevisionFactory extends RevisionFactory[StructType] {
override def createNewRevision(
qtableID: QTableID,
schema: StructType,
options: Map[String, String]): Revision = {
options: QbeastOptions): Revision = {

val qbeastOptions = QbeastOptions(options)
val columnSpecs = qbeastOptions.columnsToIndex
val desiredCubeSize = qbeastOptions.cubeSize
val columnSpecs = options.columnsToIndex
val desiredCubeSize = options.cubeSize

val transformers = columnSpecs.map {
case SpecExtractor(columnName, transformerType) =>
Expand All @@ -40,7 +39,7 @@ object SparkRevisionFactory extends RevisionFactory[StructType] {

}.toVector

qbeastOptions.stats match {
options.stats match {
case None => Revision.firstRevision(qtableID, desiredCubeSize, transformers)
case Some(stats) =>
val columnStats = stats.first()
Expand Down Expand Up @@ -78,7 +77,7 @@ object SparkRevisionFactory extends RevisionFactory[StructType] {
override def createNextRevision(
qtableID: QTableID,
schema: StructType,
options: Map[String, String],
options: QbeastOptions,
oldRevisionID: RevisionID): Revision = {
val revision = createNewRevision(qtableID, schema, options)
revision.copy(revisionID = oldRevisionID + 1)
Expand Down
31 changes: 29 additions & 2 deletions src/main/scala/io/qbeast/spark/internal/QbeastOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,26 @@ import io.qbeast.core.model.QTableID
import io.qbeast.spark.index.ColumnsToIndex
import org.apache.spark.qbeast.config.DEFAULT_CUBE_SIZE
import org.apache.spark.sql.{AnalysisExceptionFactory, DataFrame, SparkSession}
import org.apache.spark.sql.delta.DeltaOptions

/**
* Container for Qbeast options.
*
* @param columnsToIndex value of columnsToIndex option
* @param cubeSize value of cubeSize option
* @param stats
* the stats if available
* @param txnVersion
* the transaction identifier
* @param txnAppId
* the application identifier
*/
case class QbeastOptions(columnsToIndex: Seq[String], cubeSize: Int, stats: Option[DataFrame])
case class QbeastOptions(
columnsToIndex: Seq[String],
cubeSize: Int,
stats: Option[DataFrame],
txnAppId: Option[String],
txnVersion: Option[String])

/**
* Options available when trying to write in qbeast format
Expand All @@ -24,6 +37,8 @@ object QbeastOptions {
val CUBE_SIZE = "cubeSize"
val PATH = "path"
val STATS = "columnStats"
val TXN_APP_ID = DeltaOptions.TXN_APP_ID
val TXN_VERSION = DeltaOptions.TXN_VERSION

/**
* Gets the columns to index from the options
Expand Down Expand Up @@ -77,6 +92,11 @@ object QbeastOptions {
}
}

/**
* Gets the transaction application identifier from the options
* @param options the options map
* @return the value stored under the TXN_APP_ID key if present, None otherwise
*/
private def getTxnAppId(options: Map[String, String]): Option[String] = options.get(TXN_APP_ID)

/**
* Gets the transaction version from the options
* @param options the options map
* @return the raw string value stored under the TXN_VERSION key if present, None otherwise
*/
private def getTxnVersion(options: Map[String, String]): Option[String] =
options.get(TXN_VERSION)

/**
* Create QbeastOptions object from options map
* @param options the options map
Expand All @@ -86,9 +106,16 @@ object QbeastOptions {
val columnsToIndex = getColumnsToIndex(options)
val desiredCubeSize = getDesiredCubeSize(options)
val stats = getStats(options)
QbeastOptions(columnsToIndex, desiredCubeSize, stats)
val txnAppId = getTxnAppId(options)
val txnVersion = getTxnVersion(options)
QbeastOptions(columnsToIndex, desiredCubeSize, stats, txnAppId, txnVersion)
}

/**
* The empty options to be used as a placeholder.
*/
lazy val empty: QbeastOptions = QbeastOptions(Seq.empty, DEFAULT_CUBE_SIZE, None, None, None)

def loadTableIDFromOptions(options: Map[String, String]): QTableID = {
new QTableID(
options.getOrElse(
Expand Down
Loading

0 comments on commit 3efc2fc

Please sign in to comment.