Issue Qbeast-io#405: DataWriter refactoring (Qbeast-io#402)
* Removing the optimize API from DataWriter

* Adding analyzeAppend and analyzeOptimize to the OTreeManager

* Reduced the cube states to ANNOUNCED and FLOODED (see the sketch after the changed-files summary below)
cugni authored Sep 10, 2024
1 parent 75aea24 commit 29cb3ac
Showing 22 changed files with 459 additions and 277 deletions.
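The third bullet collapses the earlier three-way cube-state bookkeeping (REPLICATED, ANNOUNCED, FLOODED) into two effective states, as the BroadcastedTableChanges and BlockWriter diffs below show. A minimal, self-contained sketch of that contract, with illustrative names only (the real type is io.qbeast.spark.model.CubeState, whose definition is not part of this diff view):

// Illustrative sketch: the real CubeState may be defined differently.
object CubeStateSketch extends Enumeration {
  type CubeStateValue = Value
  val FLOODED, ANNOUNCED = Value

  // Mirrors the new TableChanges.cubeState contract: a cube is ANNOUNCED when it
  // belongs to the announced-or-replicated set and FLOODED otherwise.
  def stateOf(cubeId: String, announcedOrReplicated: Set[String]): CubeStateValue =
    if (announcedOrReplicated.contains(cubeId)) ANNOUNCED else FLOODED
}

For example, CubeStateSketch.stateOf("root", Set("root")) yields ANNOUNCED, while any cube outside the set resolves to FLOODED.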
2 changes: 1 addition & 1 deletion build.sbt
@@ -102,7 +102,7 @@ ThisBuild / publishTo := {
}

// Sonatype settings
ThisBuild / publishMavenStyle := true
ThisBuild / publishMavenStyle.withRank(KeyRanks.Invisible) := true
ThisBuild / sonatypeProfileName := "io.qbeast"

ThisBuild / sonatypeProjectHosting := Some(
63 changes: 41 additions & 22 deletions src/main/scala/io/qbeast/core/model/BroadcastedTableChanges.scala
@@ -15,7 +15,8 @@
*/
package io.qbeast.core.model

import io.qbeast.spark.utils.State
import io.qbeast.spark.model.CubeState
import io.qbeast.spark.model.CubeState.CubeStateValue
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

@@ -28,65 +29,83 @@ object BroadcastedTableChanges {
def apply(
revisionChanges: Option[RevisionChange],
supersededIndexStatus: IndexStatus,
deltaNormalizedCubeWeights: Map[CubeId, NormalizedWeight],
deltaCubeDomains: Map[CubeId, Double],
deltaNormalizedCubeWeights: Map[CubeId, Weight],
newBlocksElementCount: Map[CubeId, Long],
deltaReplicatedSet: Set[CubeId] = Set.empty,
deltaAnnouncedSet: Set[CubeId] = Set.empty): TableChanges = {
val sparkContext = SparkSession.active.sparkContext

BroadcastedTableChanges.create(
supersededIndexStatus,
sparkContext.broadcast(deltaNormalizedCubeWeights),
sparkContext.broadcast(newBlocksElementCount),
deltaReplicatedSet,
deltaAnnouncedSet,
revisionChanges)

}

def create(
supersededIndexStatus: IndexStatus,
deltaNormalizedCubeWeightsBroadcast: Broadcast[Map[CubeId, Weight]],
newBlocksElementCountBroadcast: Broadcast[Map[CubeId, Long]],
deltaReplicatedSet: Set[CubeId],
deltaAnnouncedSet: Set[CubeId],
revisionChanges: Option[RevisionChange]): TableChanges = {

val updatedRevision = revisionChanges match {
case Some(newRev) => newRev.createNewRevision
case None => supersededIndexStatus.revision
}
val cubeWeights = deltaNormalizedCubeWeights
.mapValues(NormalizedWeight.toWeight)
.map(identity)

val replicatedSet = if (revisionChanges.isEmpty) {

supersededIndexStatus.replicatedSet ++ deltaReplicatedSet

} else {
deltaReplicatedSet
}

val announcedSet = if (revisionChanges.isEmpty) {

supersededIndexStatus.announcedSet ++ deltaAnnouncedSet

} else {
deltaAnnouncedSet
}

val cubeStates = replicatedSet.map(id => id -> State.REPLICATED) ++
(announcedSet -- replicatedSet).map(id => id -> State.ANNOUNCED)

BroadcastedTableChanges(
isNewRevision = revisionChanges.isDefined,
isOptimizeOperation = deltaReplicatedSet.nonEmpty,
updatedRevision = updatedRevision,
deltaReplicatedSet = deltaReplicatedSet,
announcedOrReplicatedSet = announcedSet ++ replicatedSet,
cubeStatesBroadcast = SparkSession.active.sparkContext.broadcast(cubeStates.toMap),
cubeWeightsBroadcast = SparkSession.active.sparkContext.broadcast(cubeWeights),
cubeDomainsBroadcast = SparkSession.active.sparkContext.broadcast(deltaCubeDomains))
cubeWeightsBroadcast = deltaNormalizedCubeWeightsBroadcast,
newBlockStatsBroadcast = newBlocksElementCountBroadcast)
}

}

case class BroadcastedTableChanges(
case class BroadcastedTableChanges private[model] (
isNewRevision: Boolean,
isOptimizeOperation: Boolean,
updatedRevision: Revision,
deltaReplicatedSet: Set[CubeId],
announcedOrReplicatedSet: Set[CubeId],
cubeStatesBroadcast: Broadcast[Map[CubeId, String]],
// this contains an entry for each cube in the index
cubeWeightsBroadcast: Broadcast[Map[CubeId, Weight]],
cubeDomainsBroadcast: Broadcast[Map[CubeId, Double]])
// this map contains an entry for each new block added in this write operation.
newBlockStatsBroadcast: Broadcast[Map[CubeId, Long]])
extends TableChanges {

override def cubeWeight(cubeId: CubeId): Option[Weight] = cubeWeightsBroadcast.value.get(cubeId)
override def cubeWeight(cubeId: CubeId): Option[Weight] =
cubeWeightsBroadcast.value.get(cubeId)

override def cubeState(cubeId: CubeId): CubeStateValue = {
if (announcedOrReplicatedSet.contains(cubeId)) {
CubeState.ANNOUNCED
} else {
CubeState.FLOODED
}
}

override def cubeState(cubeId: CubeId): Option[String] = cubeStatesBroadcast.value.get(cubeId)
override def deltaBlockElementCount: Map[CubeId, Long] =
newBlockStatsBroadcast.value

override def cubeDomains: Map[CubeId, Double] = cubeDomainsBroadcast.value
}
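The refactored companion object splits construction in two: apply still takes plain per-cube maps and broadcasts them, while the new create accepts maps the caller has already broadcast, so the same broadcast handles can be reused instead of being re-broadcast. A simplified sketch of that split (String, Int, and Long stand in for the real CubeId and Weight types; all names are illustrative):

import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession

final case class ChangesSketch(
    cubeWeights: Broadcast[Map[String, Int]],
    newBlockElementCounts: Broadcast[Map[String, Long]])

object ChangesSketch {

  // Convenience path: broadcast the plain maps once, then delegate to create().
  def apply(weights: Map[String, Int], counts: Map[String, Long]): ChangesSketch = {
    val sc = SparkSession.active.sparkContext
    create(sc.broadcast(weights), sc.broadcast(counts))
  }

  // Callers that already hold Broadcast handles pass them through unchanged.
  def create(
      weights: Broadcast[Map[String, Int]],
      counts: Broadcast[Map[String, Long]]): ChangesSketch =
    ChangesSketch(weights, counts)
}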
23 changes: 0 additions & 23 deletions src/main/scala/io/qbeast/core/model/DataWriter.scala
@@ -16,7 +16,6 @@
package io.qbeast.core.model

import io.qbeast.IISeq
import org.apache.spark.sql.Dataset

/**
* Data Writer template
@@ -48,26 +47,4 @@ trait DataWriter[DATA, DataSchema, FileDescriptor] {
data: DATA,
tableChanges: TableChanges): IISeq[FileDescriptor]

/**
* Optimize the files
* @param tableID
* the table identifier
* @param schema
* the schema of the data
* @param revision
* the revision of the index
* @param indexStatus
* the current index status
* @param indexFiles
* the index files to compact
* @return
* the sequence of files written and deleted
*/
def optimize(
tableID: QTableID,
schema: DataSchema,
revision: Revision,
indexStatus: IndexStatus,
indexFiles: Dataset[IndexFile]): IISeq[FileDescriptor]

}
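With the optimize method gone, DataWriter is reduced to the append path; the analysis work moves to the OTreeManager, per the commit message (analyzeAppend and analyzeOptimize). A sketch of the remaining contract with simplified placeholder types (the real trait uses QTableID, TableChanges, and IISeq):

// Illustrative, trimmed-down view of the write-only contract.
trait DataWriterSketch[DATA, DataSchema, FileDescriptor, TableChanges] {

  // Writes the indexed data and returns descriptors of the files produced.
  def write(
      tableId: String,
      schema: DataSchema,
      data: DATA,
      tableChanges: TableChanges): Seq[FileDescriptor]
}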
5 changes: 4 additions & 1 deletion src/main/scala/io/qbeast/core/model/QbeastSnapshot.scala
@@ -16,6 +16,7 @@
package io.qbeast.core.model

import io.qbeast.IISeq
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset

/**
@@ -73,7 +74,7 @@ trait QbeastSnapshot {
* Loads the index files of the specified revision.
*
* @param revisionId
* the revision identitifier
* the revision identifier
* @return
* the index files of the specified revision
*/
@@ -111,4 +112,6 @@
*/
def loadRevisionAt(timestamp: Long): Revision

def loadDataframeFromIndexFiles(indexFile: Dataset[IndexFile]): DataFrame

}
4 changes: 2 additions & 2 deletions src/main/scala/io/qbeast/core/model/RevisionClasses.scala
@@ -341,7 +341,7 @@ trait TableChanges {
val updatedRevision: Revision
val deltaReplicatedSet: Set[CubeId]
val announcedOrReplicatedSet: Set[CubeId]
def cubeState(cubeId: CubeId): Option[String]
def cubeState(cubeId: CubeId): String
def cubeWeight(cubeId: CubeId): Option[Weight]
def cubeDomains: Map[CubeId, Double]
def deltaBlockElementCount: Map[CubeId, Long]
}
2 changes: 1 addition & 1 deletion src/main/scala/io/qbeast/spark/QbeastTable.scala
@@ -239,7 +239,7 @@ class QbeastTable private (
}

/**
* Gather an dataset containing all the important information about the index structure.
* Gather a dataset containing all the important information about the index structure.
*
* @param revisionID
* optional RevisionID
19 changes: 19 additions & 0 deletions src/main/scala/io/qbeast/spark/delta/DeltaQbeastSnapshot.scala
@@ -19,10 +19,12 @@ import io.qbeast.core.model._
import io.qbeast.spark.utils.MetadataConfig
import io.qbeast.spark.utils.TagColumns
import io.qbeast.IISeq
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.Snapshot
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.AnalysisExceptionFactory
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Dataset

/**
@@ -212,4 +214,21 @@ case class DeltaQbeastSnapshot(protected override val snapshot: Snapshot)
*/
def loadStagingFiles(): Dataset[AddFile] = stagingFiles()

override def loadDataframeFromIndexFiles(indexFile: Dataset[IndexFile]): DataFrame = {
if (snapshot.deletionVectorsSupported) {

// TODO find a cleaner version to get a subset of data from the parquet considering the deleted parts.
throw new UnsupportedOperationException("Deletion vectors are not supported yet")
} else {
import indexFile.sparkSession.implicits._
val rootPath = snapshot.path.getParent
val paths = indexFile.map(ifile => new Path(rootPath, ifile.path).toString).collect()

indexFile.sparkSession.read
.schema(snapshot.schema)
.parquet(paths: _*)

}
}

}
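The new loadDataframeFromIndexFiles resolves each index-file path against the table root and reads exactly those parquet files with the snapshot schema pinned explicitly, bailing out while deletion vectors are unsupported. The same pattern in standalone form (the paths and schema here are hypothetical inputs, not the snapshot's own):

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{DataFrame, SparkSession}

// Reads only the given files, resolved against the table root, with an explicit schema.
def readIndexFilesSketch(
    spark: SparkSession,
    tableRoot: String,
    relativePaths: Seq[String],
    schema: StructType): DataFrame = {
  val root = new Path(tableRoot)
  val absolutePaths = relativePaths.map(p => new Path(root, p).toString)
  spark.read.schema(schema).parquet(absolutePaths: _*)
}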
6 changes: 3 additions & 3 deletions src/main/scala/io/qbeast/spark/delta/writer/BlockWriter.scala
@@ -22,7 +22,7 @@ import io.qbeast.core.model.TableChanges
import io.qbeast.core.model.Weight
import io.qbeast.spark.delta.IndexFiles
import io.qbeast.spark.index.QbeastColumns
import io.qbeast.spark.utils.State
import io.qbeast.spark.model.CubeState
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.TaskAttemptContextImpl
@@ -85,7 +85,7 @@ case class BlockWriter(
rows.foreach { row =>
val cubeId = revision.createCubeId(row.getBinary(qbeastColumns.cubeColumnIndex))

val state = tableChanges.cubeState(cubeId).getOrElse(State.FLOODED)
val state = tableChanges.cubeState(cubeId)
val maxWeight = tableChanges.cubeWeight(cubeId).getOrElse(Weight.MaxValue)
val context = contexts.getOrElseUpdate(cubeId, buildWriter(cubeId, state, maxWeight))

@@ -169,7 +169,7 @@ case class BlockWriter(
.beginBlock()
.setCubeId(cubeId)
.setMaxWeight(maxWeight)
.setReplicated(state == State.ANNOUNCED || state == State.REPLICATED)
.setReplicated(state == CubeState.ANNOUNCED)
blockStatsTracker.foreach(_.newFile(writtenPath.toString)) // Update stats trackers
new BlockContext(builder, writer, writtenPath, blockStatsTracker)
}
8 changes: 4 additions & 4 deletions src/main/scala/io/qbeast/spark/delta/writer/Rollup.scala
@@ -40,7 +40,7 @@ private[writer] class Rollup(limit: Double) {
* @param size
* the size associated with the cube
*/
def populate(cubeId: CubeId, size: Double): Rollup = {
def populate(cubeId: CubeId, size: Long): Rollup = {
val group = groups.getOrElseUpdate(cubeId, Group.empty)
group.add(cubeId, size)
this
@@ -75,9 +75,9 @@ private[writer] class Rollup(limit: Double) {
}.toMap
}

private class Group(val cubeIds: mutable.Set[CubeId], var size: Double) {
private class Group(val cubeIds: mutable.Set[CubeId], var size: Long) {

def add(cubeId: CubeId, size: Double): Unit = {
def add(cubeId: CubeId, size: Long): Unit = {
cubeIds.add(cubeId)
this.size += size
}
@@ -92,7 +92,7 @@ private[writer] class Rollup(limit: Double) {
}

private object Group {
def empty: Group = new Group(mutable.Set.empty, 0.0)
def empty: Group = new Group(mutable.Set.empty, 0L)
}

}
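Rollup now tracks group sizes as Long element counts rather than Double values, which lines up with the per-block element counts carried by the new TableChanges. A toy version of the accumulator after that change (the real Rollup keys groups by CubeId and enforces a size limit; names here are illustrative):

import scala.collection.mutable

// Accumulates whole-number element counts for a group of cubes.
final class GroupSketch(val cubeIds: mutable.Set[String], var size: Long) {

  def add(cubeId: String, elementCount: Long): Unit = {
    cubeIds.add(cubeId)
    size += elementCount
  }
}

object GroupSketch {
  def empty: GroupSketch = new GroupSketch(mutable.Set.empty, 0L)
}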