Skip to content

Commit

Permalink
[Spark] InCommitTimestamp: Use clock.currentTimeMillis() instead of n…
Browse files Browse the repository at this point in the history
…anoTime() in commitLarge (#3111)

## Description
We currently use NANOSECONDS.toMillis(System.nanoTime()) for generating
the ICT when `commitLarge` is called. However, this usage of
System.nanoTime() is not correct as it should only be used for measuring
time difference, not to get an approximate wall clock time. This leads
to scenarios where the ICT becomes very small (e.g. 1 Jan 1970)
sometimes because some systems return a very small number when
System.nanoTime() is called. This PR changes `commitLarge` so that
clock.getTimeMillis() is used instead.

## How was this patch tested?
Added a test case to ensure that `clock.getTimeMillis()` is being
used.
  • Loading branch information
dhruvarya-db authored May 17, 2024
1 parent e15132b commit eca5a7f
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1337,10 +1337,11 @@ trait OptimisticTransactionImpl extends TransactionalWrite

try {
val tags = Map.empty[String, String]
val commitTimestampMs = clock.getTimeMillis()
val commitInfo = CommitInfo(
NANOSECONDS.toMillis(commitStartNano),
commitTimestampMs,
operation = op.name,
generateInCommitTimestampForFirstCommitAttempt(NANOSECONDS.toMillis(commitStartNano)),
generateInCommitTimestampForFirstCommitAttempt(commitTimestampMs),
operationParameters = op.jsonEncodedValues,
context,
readVersion = Some(readVersion),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,44 @@ class InCommitTimestampSuite
}
}

// Checks that the in-commit timestamp (ICT) recorded for a commit is the value supplied by the
// DeltaLog's clock. The test is registered twice: once committing through `commit` and once
// through `commitLarge`.
for (useCommitLarge <- BOOLEAN_DOMAIN)
  test("txn.commit should use clock.currentTimeMillis() for ICT" +
    s" [useCommitLarge: $useCommitLarge]") {
    withTempDir { tempDir =>
      // Create the table (version 0) through the regular write path.
      spark.range(2).write.format("delta").save(tempDir.getAbsolutePath)
      // Clear the DeltaLog cache so that a new DeltaLog is created with the manual clock.
      DeltaLog.clearCache()
      // The manual clock is seeded with this value, so the commit below is expected to carry
      // exactly this timestamp.
      // NOTE(review): presumably ManualClock stays at its seed until explicitly advanced --
      // confirm against the ManualClock implementation.
      val expectedCommit1Time = System.currentTimeMillis()
      val clock = new ManualClock(expectedCommit1Time)
      val deltaLog = DeltaLog.forTable(spark, new Path(tempDir.getCanonicalPath), clock)
      val ver0Snapshot = deltaLog.snapshot
      // Precondition: in-commit timestamps must be enabled in the version-0 metadata.
      assert(DeltaConfigs.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetaData(ver0Snapshot.metadata))
      // Capture usage-log events emitted while committing so the code path can be verified.
      val usageRecords = Log4jUsageLogger.track {
        if (useCommitLarge) {
          deltaLog.startTransaction().commitLarge(
            spark,
            // `.iterator` instead of `.toIterator`, which is deprecated as of Scala 2.13.
            Seq(createTestAddFile("1")).iterator,
            newProtocolOpt = None,
            DeltaOperations.ManualUpdate,
            context = Map.empty,
            metrics = Map.empty)
        } else {
          deltaLog.startTransaction().commit(
            Seq(createTestAddFile("1")),
            DeltaOperations.ManualUpdate,
            tags = Map.empty
          )
        }
      }
      val ver1Snapshot = deltaLog.snapshot
      val retrievedTimestamp = getInCommitTimestamp(deltaLog, version = 1)
      // The snapshot timestamp must match the ICT retrieved for version 1 ...
      assert(ver1Snapshot.timestamp == retrievedTimestamp)
      // ... and that ICT must be the manual clock's time.
      assert(ver1Snapshot.timestamp == expectedCommit1Time)
      // Exactly one usage record of the expected op type confirms which commit path ran.
      val expectedOpType = if (useCommitLarge) "delta.commit.large" else "delta.commit"
      assert(filterUsageRecords(usageRecords, expectedOpType).length == 1)
    }
  }

test("Missing CommitInfo should result in a DELTA_MISSING_COMMIT_INFO exception") {
withTempDir { tempDir =>
spark.range(10).write.format("delta").save(tempDir.getAbsolutePath)
Expand Down

0 comments on commit eca5a7f

Please sign in to comment.