Skip to content

Commit

Permalink
[Kernel] Ignore non-conforming commit files in Delta log directory
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Aug 13, 2024
1 parent 7122c25 commit 54b2edf
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static DeltaLogFile forCommitOrCheckpoint(FileStatus file) {
} else if (FileNames.isClassicCheckpointFile(fileName)) {
logType = LogType.CHECKPOINT_CLASSIC;
version = FileNames.checkpointVersion(fileName);
} else if (FileNames.isMulitPartCheckpointFile(fileName)) {
} else if (FileNames.isMultiPartCheckpointFile(fileName)) {
logType = LogType.MULTIPART_CHECKPOINT;
version = FileNames.checkpointVersion(fileName);
} else if (FileNames.isV2CheckpointFile(fileName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,30 @@ private FileNames() {}

public static final String SIDECAR_DIRECTORY = "_sidecars";

/**
* The subdirectory in the delta log directory where un-backfilled commits are stored as part of
* the coordinated commits table feature.
*/
public static final String COMMIT_SUBDIR = "_commits";

/**
* Returns the delta (json format) file path for a given version.
*
* @param path The directory under which the delta file lives.
* @param version The version of the delta file.
* @return The path {@code <path>/<version>.json}, with the version zero-padded to 20 digits.
*/
public static String deltaFile(Path path, long version) {
return String.format("%s/%020d.json", path, version);
}

/**
* Returns the un-backfilled uuid formatted delta (json format) path for a given version.
*
* @param logPath The root path of the delta log.
* @param version The version of the delta file.
* @param uuidString The UUID string embedded in the file name. It is always formatted into the
* name, so callers must supply a value.
* @return The path to the un-backfilled delta file: {@code
* <logPath>/_commits/<version>.<uuid>.json}, with the version zero-padded to 20 digits.
*/
public static String unbackfilledDeltaFile(Path logPath, long version, String uuidString) {
// Un-backfilled commits live in the commit subdirectory of the log, not in the log root.
Path commitsPath = commitDirPath(logPath);
return new Path(commitsPath, String.format("%020d.%s.json", version, uuidString)).toString();
}

/** Returns the version for the given delta path. */
public static long deltaVersion(Path path) {
return Long.parseLong(path.getName().split("\\.")[0]);
Expand Down Expand Up @@ -136,18 +155,22 @@ public static boolean isClassicCheckpointFile(String fileName) {
return CLASSIC_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
}

public static boolean isMulitPartCheckpointFile(String fileName) {
/**
* Returns true if the whole of {@code fileName} matches {@code
* MULTI_PART_CHECKPOINT_FILE_PATTERN}.
*/
public static boolean isMultiPartCheckpointFile(String fileName) {
return MULTI_PART_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
}

/** Returns true if the whole of {@code fileName} matches {@code V2_CHECKPOINT_FILE_PATTERN}. */
public static boolean isV2CheckpointFile(String fileName) {
return V2_CHECKPOINT_FILE_PATTERN.matcher(fileName).matches();
}

public static boolean isCommitFile(String fileName) {
String filename = new Path(fileName).getName();
return DELTA_FILE_PATTERN.matcher(filename).matches()
|| UUID_DELTA_FILE_REGEX.matcher(filename).matches();
/**
 * Returns true if the given path refers to a commit (delta) file.
 *
 * <p>A path qualifies when its file name matches {@code DELTA_FILE_PATTERN}, or when the file
 * sits directly inside the {@code _commits} subdirectory and its name matches {@code
 * UUID_DELTA_FILE_REGEX}. A UUID-style name in any other directory is not treated as a commit.
 *
 * @param filePathStr Path of the file to check, as a string.
 * @return Whether the path refers to a commit file.
 */
public static boolean isCommitFile(String filePathStr) {
  Path filePath = new Path(filePathStr);
  String fileName = filePath.getName();
  if (DELTA_FILE_PATTERN.matcher(fileName).matches()) {
    return true;
  }
  // NOTE(review): assumes Path#getParent() returns null when the path has no parent (as Hadoop's
  // Path does for a root path) -- confirm. Such a path cannot be inside the _commits dir, so
  // report "not a commit file" instead of throwing a NullPointerException.
  Path parentPath = filePath.getParent();
  // If parent is _commits dir, then match against un-backfilled commit file name pattern.
  return parentPath != null
      && COMMIT_SUBDIR.equals(parentPath.getName())
      && UUID_DELTA_FILE_REGEX.matcher(fileName).matches();
}

/**
Expand All @@ -168,4 +191,9 @@ public static long getFileVersion(Path path) {
String.format("Unexpected file type found in transaction log: %s", path));
}
}

/**
* Returns the path to the commit directory ({@code _commits}) under the given delta log path.
*
* @param logPath The root path of the delta log.
* @return {@code logPath} joined with {@code COMMIT_SUBDIR}.
*/
public static Path commitDirPath(Path logPath) {
return new Path(logPath, COMMIT_SUBDIR);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
*/
package io.delta.kernel.internal.snapshot

import java.util.{Arrays, Collections, List, Optional}
import java.util.{Arrays, Collections, List, Optional, UUID}
import java.{lang => javaLang}
import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import io.delta.kernel.data.{ColumnarBatch, ColumnVector}
import io.delta.kernel.data.{ColumnVector, ColumnarBatch}
import io.delta.kernel.engine.CommitCoordinatorClientHandler
import io.delta.kernel.engine.coordinatedcommits.{Commit, CommitResponse, GetCommitsResponse}
import io.delta.kernel.exceptions.InvalidTableException
Expand Down Expand Up @@ -910,6 +910,33 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils {
logPath, e = new RuntimeException(errMsg)))
)
}

test("delta log contains commit files with non-conforming names") {
// Listing layout: a checkpoint at version 10, conforming delta files for versions 11 to 14,
// and files for versions 15 to 19 whose names are not in the "%020d.json" format.
val files =
singularCheckpointFileStatuses(Seq(10L)) ++
deltaFileStatuses(11L until 15L) ++
// non-conforming names: these are expected to be left out of the log segment
nonConformingDeltaFileStatuses(15L until 20L)

// try to load the latest snapshot and expect to load the latest valid delta file
// which is version 14.
val logSegment = snapshotManager.getLogSegmentAtOrBeforeVersion(
createMockFSListFromEngine(files),
Optional.of(10L),
Optional.empty() /* start version */,
Optional.empty() /* tableCommitCoordinatorClientHandlerOpt */
)
assert(logSegment.isPresent)
checkLogSegment(
logSegment.get(),
expectedVersion = 14,
expectedDeltas = deltaFileStatuses(11L until 15L),
expectedCheckpoints = singularCheckpointFileStatuses(Seq(10L)),
expectedCheckpointVersion = Some(10),
// mock delta file statuses are created with modTime = version * 10, so version 14 => 140
expectedLastCommitTimestamp = 140L
)
}
}

trait SidecarIteratorProvider extends VectorTestUtils {
Expand Down Expand Up @@ -960,7 +987,13 @@ class MockTableCommitCoordinatorClientHandler(
}
new GetCommitsResponse(
versions
.map(v => new Commit(v, FileStatus.of(FileNames.deltaFile(logPath, v), v, v*10), v)).asJava,
.map(v => new Commit(
v,
FileStatus.of(
FileNames.unbackfilledDeltaFile(logPath, v, UUID.randomUUID().toString),
v, /* size */
v * 10 /* modTime */),
v)).asJava,
if (versions.isEmpty) -1 else versions.last)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,15 @@ trait MockFileSystemClientUtils extends MockEngineUtils {
deltaVersions.map(v => FileStatus.of(FileNames.deltaFile(logPath, v), v, v*10))
}

/**
 * File statuses for delta files with non-conforming names, i.e. names that are not in the
 * "%020d.json" format. Each file is named `<version>.<random uuid>.json` (version zero-padded
 * to 20 digits) and is placed directly under `logPath`. Size = version and modTime = 10*version.
 */
def nonConformingDeltaFileStatuses(deltaVersions: Seq[Long]): Seq[FileStatus] = {
deltaVersions.map(v => FileStatus.of(
new Path(logPath, f"$v%020d.${UUID.randomUUID.toString}.json").toString,
v, /* size */
v*10 /* modTime */
))
}

/** Checkpoint file statuses where the timestamp = 10*version */
def singularCheckpointFileStatuses(checkpointVersions: Seq[Long]): Seq[FileStatus] = {
assert(checkpointVersions.size == checkpointVersions.toSet.size)
Expand Down

0 comments on commit 54b2edf

Please sign in to comment.