Skip to content

Commit

Permalink
Merge pull request #266 from Jiaweihu08/fix-optimize-file-filtering
Browse files Browse the repository at this point in the history
Convert Seq to Set to improve time complexity
  • Loading branch information
osopardo1 authored Feb 8, 2024
2 parents 03800e6 + d0e372f commit 6a780ea
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions src/main/scala/io/qbeast/spark/table/IndexedTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package io.qbeast.spark.table

import io.qbeast.core.keeper.Keeper
import io.qbeast.core.model._
import io.qbeast.core.model.RevisionFactory
import io.qbeast.spark.delta.StagingDataManager
import io.qbeast.spark.delta.StagingResolution
import io.qbeast.spark.internal.sources.QbeastBaseRelation
Expand Down Expand Up @@ -122,7 +123,7 @@ trait IndexedTableFactory {
* the metadata manager
* @param dataWriter
* the data writer
* @param revisionBuilder
* @param revisionFactory
* the revision factory
*/
final class IndexedTableFactoryImpl(
Expand Down Expand Up @@ -159,8 +160,8 @@ final class IndexedTableFactoryImpl(
* the metadata manager
* @param dataWriter
* the data writer
* @param revisionBuilder
* the revision builder
* @param revisionFactory
* the revision factory
* @param autoIndexer
* the auto indexer
*/
Expand Down Expand Up @@ -388,12 +389,12 @@ private[table] class IndexedTableImpl(
}

override def optimize(revisionID: RevisionID): Unit = {
val files = snapshot.loadIndexFiles(revisionID).map(_.path).toSeq
val files = snapshot.loadIndexFiles(revisionID).map(_.path)
optimize(files)
}

override def optimize(files: Seq[String]): Unit = {
val paths = files.toSeq
val paths = files.toSet
val schema = metadataManager.loadCurrentSchema(tableID)
snapshot.loadAllRevisions.foreach { revision =>
val indexFiles = snapshot
Expand All @@ -402,7 +403,11 @@ private[table] class IndexedTableImpl(
.toIndexedSeq
if (indexFiles.nonEmpty) {
val indexStatus = snapshot.loadIndexStatus(revision.revisionID)
metadataManager.updateWithTransaction(tableID, schema, QbeastOptions.empty, true) {
metadataManager.updateWithTransaction(
tableID,
schema,
QbeastOptions.empty,
append = true) {
val tableChanges = BroadcastedTableChanges(None, indexStatus, Map.empty, Map.empty)
val fileActions =
dataWriter.compact(tableID, schema, revision, indexStatus, indexFiles)
Expand Down

0 comments on commit 6a780ea

Please sign in to comment.