Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Convert Seq to Set to improve time complexity #266

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/main/scala/io/qbeast/spark/table/IndexedTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package io.qbeast.spark.table

import io.qbeast.core.keeper.Keeper
import io.qbeast.core.model._
import io.qbeast.core.model.RevisionFactory
import io.qbeast.spark.delta.StagingDataManager
import io.qbeast.spark.delta.StagingResolution
import io.qbeast.spark.internal.sources.QbeastBaseRelation
Expand Down Expand Up @@ -122,7 +123,7 @@ trait IndexedTableFactory {
* the metadata manager
* @param dataWriter
* the data writer
* @param revisionBuilder
* @param revisionFactory
* the revision builder
*/
final class IndexedTableFactoryImpl(
Expand Down Expand Up @@ -159,8 +160,8 @@ final class IndexedTableFactoryImpl(
* the metadata manager
* @param dataWriter
* the data writer
* @param revisionBuilder
* the revision builder
* @param revisionFactory
* the revision factory
* @param autoIndexer
* the auto indexer
*/
Expand Down Expand Up @@ -388,12 +389,12 @@ private[table] class IndexedTableImpl(
}

override def optimize(revisionID: RevisionID): Unit = {
val files = snapshot.loadIndexFiles(revisionID).map(_.path).toSeq
val files = snapshot.loadIndexFiles(revisionID).map(_.path)
optimize(files)
}

override def optimize(files: Seq[String]): Unit = {
val paths = files.toSeq
val paths = files.toSet
val schema = metadataManager.loadCurrentSchema(tableID)
snapshot.loadAllRevisions.foreach { revision =>
val indexFiles = snapshot
Expand All @@ -402,7 +403,11 @@ private[table] class IndexedTableImpl(
.toIndexedSeq
if (indexFiles.nonEmpty) {
val indexStatus = snapshot.loadIndexStatus(revision.revisionID)
metadataManager.updateWithTransaction(tableID, schema, QbeastOptions.empty, true) {
metadataManager.updateWithTransaction(
tableID,
schema,
QbeastOptions.empty,
append = true) {
val tableChanges = BroadcastedTableChanges(None, indexStatus, Map.empty, Map.empty)
val fileActions =
dataWriter.compact(tableID, schema, revision, indexStatus, indexFiles)
Expand Down
Loading