Skip to content

Commit

Permalink
Issue 274: Fix erroneous idempotent writes (Qbeast-io#304)
Browse files Browse the repository at this point in the history
* Fix comparison when issuing a smaller or equal txnVersion
  • Loading branch information
Jiaweihu08 authored Apr 5, 2024
1 parent b0fa253 commit 6e6b5b4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ private[delta] case class DeltaMetadataWriter(
val oldTransactions = deltaLog.unsafeVolatileSnapshot.setTransactions
// If the transaction was completed before then no operation
for (txn <- oldTransactions; version <- options.txnVersion; appId <- options.txnAppId) {
if (txn.appId == appId && txn.version == version) {
if (txn.appId == appId && version <= txn.version) {
val message = s"Transaction ${version} from application ${appId} is already completed," +
" the requested write is ignored"
logWarning(message)
Expand Down
16 changes: 14 additions & 2 deletions src/test/scala/io/qbeast/spark/utils/QbeastSparkTxnTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,20 @@ class QbeastSparkTxnTest extends QbeastIntegrationTestSpec {
.option(QbeastOptions.COLUMNS_TO_INDEX, "id")
.option(QbeastOptions.CUBE_SIZE, 1)
.option(QbeastOptions.TXN_APP_ID, "test")
.option(QbeastOptions.TXN_VERSION, "1")
.option(QbeastOptions.TXN_VERSION, "2")
.save(tmpDir)
data.write
.format("qbeast")
.option(QbeastOptions.TXN_APP_ID, "test")
.option(QbeastOptions.TXN_VERSION, "1")
.mode(SaveMode.Append)
.save(tmpDir)
data.write
.format("qbeast")
.option(QbeastOptions.TXN_APP_ID, "test")
.option(QbeastOptions.TXN_VERSION, "2")
.mode(SaveMode.Append)
.save(tmpDir)
spark.read.format("qbeast").load(tmpDir).count() shouldBe data.count()
}

Expand All @@ -91,14 +97,20 @@ class QbeastSparkTxnTest extends QbeastIntegrationTestSpec {
.option(QbeastOptions.COLUMNS_TO_INDEX, "id")
.option(QbeastOptions.CUBE_SIZE, 1)
.option(QbeastOptions.TXN_APP_ID, "test")
.option(QbeastOptions.TXN_VERSION, "1")
.option(QbeastOptions.TXN_VERSION, "2")
.save(tmpDir)
data.write
.format("qbeast")
.option(QbeastOptions.TXN_APP_ID, "test")
.option(QbeastOptions.TXN_VERSION, "1")
.mode(SaveMode.Append)
.save(tmpDir)
data.write
.format("qbeast")
.option(QbeastOptions.TXN_APP_ID, "test")
.option(QbeastOptions.TXN_VERSION, "2")
.mode(SaveMode.Append)
.save(tmpDir)
spark.read.format("qbeast").load(tmpDir).count() shouldBe data.count()
}

Expand Down

0 comments on commit 6e6b5b4

Please sign in to comment.