Skip to content

Commit

Permalink
[40] Add commit-level metadata and source-target id mapping
Browse files Browse the repository at this point in the history
  • Loading branch information
danielhumanmod committed Feb 16, 2025
1 parent 6816a83 commit 262f685
Show file tree
Hide file tree
Showing 21 changed files with 769 additions and 114 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.List;

import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

import org.apache.xtable.model.storage.PartitionFileGroup;
Expand All @@ -47,4 +48,6 @@ public class InternalSnapshot {
List<PartitionFileGroup> partitionedDataFiles;
// pending commits before latest commit on the table.
@Builder.Default List<Instant> pendingCommits = Collections.emptyList();
// Commit identifier in the source table for this snapshot.
// Marked @NonNull: a value must be supplied when the snapshot is built.
@NonNull String sourceIdentifier;
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.xtable.model;

import lombok.Builder;
import lombok.NonNull;
import lombok.Value;

import org.apache.xtable.model.storage.DataFilesDiff;
Expand All @@ -36,4 +37,7 @@ public class TableChange {

/** The {@link InternalTable} at the commit time to which this table change belongs. */
InternalTable tableAsOfChange;

/**
 * The identifier, in the source table, of the commit to which this table change belongs. Marked
 * {@code @NonNull}, so a value must always be supplied.
 */
@NonNull String sourceIdentifier;
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,30 @@ public class TableSyncMetadata {
Instant lastInstantSynced;
List<Instant> instantsToConsiderForNextSync;
int version;
// Format of the source table. May be null: the deprecated two-argument of(...) factory passes
// null for this field.
String sourceTableFormat;
// Identifier of the source table commit recorded with this metadata. May be null for the same
// reason as sourceTableFormat.
String sourceIdentifier;

/**
 * Creates sync metadata without a source table format or source commit identifier.
 *
 * <p>Delegates to {@link #of(Instant, List, String, String)}, passing {@code null} for both
 * {@code sourceTableFormat} and {@code sourceIdentifier}.
 *
 * @param lastInstantSynced value stored as {@code lastInstantSynced} in the returned metadata
 * @param instantsToConsiderForNextSync value stored as {@code instantsToConsiderForNextSync} in
 *     the returned metadata; passed through unchanged
 * @return the metadata produced by the four-argument factory
 * @deprecated Use {@link #of(Instant, List, String, String)} instead. This method exists for
 *     backward compatibility and will be removed in a future version.
 */
@Deprecated
public static TableSyncMetadata of(
    Instant lastInstantSynced, List<Instant> instantsToConsiderForNextSync) {
  // Legacy callers have no source commit information, so both source fields are left null.
  return TableSyncMetadata.of(lastInstantSynced, instantsToConsiderForNextSync, null, null);
}

/**
 * Creates sync metadata stamped with {@code CURRENT_VERSION}.
 *
 * @param lastInstantSynced value stored as {@code lastInstantSynced}
 * @param instantsToConsiderForNextSync value stored as {@code instantsToConsiderForNextSync};
 *     passed through unchanged
 * @param sourceTableFormat value stored as {@code sourceTableFormat}
 * @param sourceIdentifier value stored as {@code sourceIdentifier}
 * @return a new {@link TableSyncMetadata} holding the supplied values
 */
public static TableSyncMetadata of(
    Instant lastInstantSynced,
    List<Instant> instantsToConsiderForNextSync,
    String sourceTableFormat,
    String sourceIdentifier) {
  // Argument order follows the constructor: instants first, then version, then source fields.
  TableSyncMetadata metadata =
      new TableSyncMetadata(
          lastInstantSynced,
          instantsToConsiderForNextSync,
          CURRENT_VERSION,
          sourceTableFormat,
          sourceIdentifier);
  return metadata;
}

public String toJson() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,4 +78,18 @@ public interface ConversionSource<COMMIT> extends Closeable {
* false.
*/
boolean isIncrementalSyncSafeFrom(Instant instant);

/**
 * Extracts the identifier of the provided commit. What the identifier is depends on the source
 * table format:
 *
 * <ul>
 *   <li>Snapshot ID in Iceberg
 *   <li>Version ID in Delta
 *   <li>Timestamp in Hudi
 * </ul>
 *
 * @param commit the commit whose identifier should be extracted
 * @return the commit identifier rendered as a string
 */
String getCommitIdentifier(COMMIT commit);
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,14 @@ public interface ConversionTarget {

/** Initializes the client with provided configuration */
void init(TargetTable targetTable, Configuration configuration);

/**
 * Retrieves the commit identifier from the target table that corresponds to a given source table
 * commit identifier.
 *
 * @param sourceIdentifier the unique identifier of the source table commit
 * @return an {@link Optional} containing the target commit identifier if a corresponding commit
 *     exists, or an empty {@link Optional} if no match is found
 */
Optional<String> getTargetCommitIdentifier(String sourceIdentifier);
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ public Map<String, SyncResult> syncSnapshot(
internalTable,
target -> target.syncFilesForSnapshot(snapshot.getPartitionedDataFiles()),
startTime,
snapshot.getPendingCommits()));
snapshot.getPendingCommits(),
snapshot.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync snapshot", e);
results.put(
Expand Down Expand Up @@ -121,7 +122,8 @@ public Map<String, List<SyncResult>> syncChanges(
change.getTableAsOfChange(),
target -> target.syncFilesForDiff(change.getFilesDiff()),
startTime,
changes.getPendingCommits()));
changes.getPendingCommits(),
change.getSourceIdentifier()));
} catch (Exception e) {
log.error("Failed to sync table changes", e);
resultsForFormat.add(buildResultForError(SyncMode.INCREMENTAL, startTime, e));
Expand Down Expand Up @@ -149,19 +151,26 @@ private SyncResult getSyncResult(
InternalTable tableState,
SyncFiles fileSyncMethod,
Instant startTime,
List<Instant> pendingCommits) {
List<Instant> pendingCommits,
String sourceIdentifier) {
// initialize the sync
conversionTarget.beginSync(tableState);
// Persist the latest commit time in table properties for incremental syncs
// Syncing metadata must precede the following steps to ensure that the metadata is available
// before committing
TableSyncMetadata latestState =
TableSyncMetadata.of(
tableState.getLatestCommitTime(),
pendingCommits,
tableState.getTableFormat(),
sourceIdentifier);
conversionTarget.syncMetadata(latestState);
// sync schema updates
conversionTarget.syncSchema(tableState.getReadSchema());
// sync partition updates
conversionTarget.syncPartitionSpec(tableState.getPartitioningFields());
// Update the files in the target table
fileSyncMethod.sync(conversionTarget);
// Persist the latest commit time in table properties for incremental syncs.
TableSyncMetadata latestState =
TableSyncMetadata.of(tableState.getLatestCommitTime(), pendingCommits);
conversionTarget.syncMetadata(latestState);
conversionTarget.completeSync();

return SyncResult.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ void jsonRoundTrip(TableSyncMetadata metadata, String expectedJson) {

private static Stream<Arguments> provideMetadataAndJson() {
return Stream.of(
// Old version of metadata and JSON
Arguments.of(
TableSyncMetadata.of(
Instant.parse("2020-07-04T10:15:30.00Z"),
Expand All @@ -56,7 +57,24 @@ private static Stream<Arguments> provideMetadataAndJson() {
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0}"),
Arguments.of(
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"));
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0}"),
// New version of metadata and JSON with `sourceTableFormat` and `sourceIdentifier` fields
Arguments.of(
TableSyncMetadata.of(
Instant.parse("2020-07-04T10:15:30.00Z"),
Arrays.asList(
Instant.parse("2020-08-21T11:15:30.00Z"),
Instant.parse("2024-01-21T12:15:30.00Z")),
"TEST",
"0"),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[\"2020-08-21T11:15:30Z\",\"2024-01-21T12:15:30Z\"],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
Arguments.of(
TableSyncMetadata.of(
Instant.parse("2020-07-04T10:15:30.00Z"), Collections.emptyList(), "TEST", "0"),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"instantsToConsiderForNextSync\":[],\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"),
Arguments.of(
TableSyncMetadata.of(Instant.parse("2020-07-04T10:15:30.00Z"), null, "TEST", "0"),
"{\"lastInstantSynced\":\"2020-07-04T10:15:30Z\",\"version\":0,\"sourceTableFormat\":\"TEST\",\"sourceIdentifier\":\"0\"}"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,11 @@ public void extractSnapshot() {
InternalTable table = InternalTable.builder().latestCommitTime(Instant.now()).build();
List<PartitionFileGroup> dataFiles = Collections.emptyList();
InternalSnapshot internalSnapshot =
InternalSnapshot.builder().table(table).partitionedDataFiles(dataFiles).build();
InternalSnapshot.builder()
.table(table)
.partitionedDataFiles(dataFiles)
.sourceIdentifier("0")
.build();
when(mockConversionSource.getCurrentSnapshot()).thenReturn(internalSnapshot);
assertEquals(internalSnapshot, ExtractFromSource.of(mockConversionSource).extractSnapshot());
}
Expand Down Expand Up @@ -86,6 +90,7 @@ public void extractTableChanges() {
.tableAsOfChange(tableAtFirstInstant)
.filesDiff(
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
.sourceIdentifier("0")
.build();
when(mockConversionSource.getTableChangeForCommit(firstCommitToSync))
.thenReturn(tableChangeToReturnAtFirstInstant);
Expand All @@ -94,6 +99,7 @@ public void extractTableChanges() {
.tableAsOfChange(tableAtFirstInstant)
.filesDiff(
DataFilesDiff.builder().fileAdded(newFile1).fileRemoved(initialFile2).build())
.sourceIdentifier("0")
.build();

// add 2 new files, remove 2 files
Expand All @@ -110,6 +116,7 @@ public void extractTableChanges() {
.filesAdded(Arrays.asList(newFile2, newFile3))
.filesRemoved(Arrays.asList(initialFile3, newFile1))
.build())
.sourceIdentifier("1")
.build();
when(mockConversionSource.getTableChangeForCommit(secondCommitToSync))
.thenReturn(tableChangeToReturnAtSecondInstant);
Expand All @@ -121,6 +128,7 @@ public void extractTableChanges() {
.filesAdded(Arrays.asList(newFile2, newFile3))
.filesRemoved(Arrays.asList(initialFile3, newFile1))
.build())
.sourceIdentifier("1")
.build();

IncrementalTableChanges actual =
Expand Down
Loading

0 comments on commit 262f685

Please sign in to comment.