Skip to content

Commit

Permalink
[Kernel] Integrate table features utilities into the read and write path
Browse files Browse the repository at this point in the history
  • Loading branch information
vkorukanti committed Feb 14, 2025
1 parent 4e6d5fa commit 4e60a26
Show file tree
Hide file tree
Showing 12 changed files with 148 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public static KernelException unsupportedReaderProtocol(
return new KernelException(message);
}

public static KernelException unsupportedReaderFeature(
public static KernelException unsupportedReaderFeatures(
String tablePath, Set<String> unsupportedFeatures) {
String message =
String.format(
Expand All @@ -179,7 +179,8 @@ public static KernelException unsupportedWriterProtocol(
return new KernelException(message);
}

public static KernelException unsupportedWriterFeature(String tablePath, String writerFeature) {
public static KernelException unsupportedWriterFeatures(
String tablePath, Set<String> writerFeature) {
String message =
String.format(
"Unsupported Delta writer feature: table `%s` requires writer table feature \"%s\" "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public CloseableIterator<ColumnarBatch> getChanges(
for (int rowId = 0; rowId < protocolVector.getSize(); rowId++) {
if (!protocolVector.isNullAt(rowId)) {
Protocol protocol = Protocol.fromColumnVector(protocolVector, rowId);
TableFeatures.validateReadSupportedTable(protocol, getDataPath().toString());
TableFeatures.validateKernelCanReadTheTable(protocol, getDataPath().toString());
}
}
if (shouldDropProtocolColumn) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static io.delta.kernel.internal.util.VectorUtils.stringArrayValue;
import static io.delta.kernel.internal.util.VectorUtils.stringStringMapValue;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;

import io.delta.kernel.*;
import io.delta.kernel.engine.Engine;
Expand All @@ -36,6 +37,7 @@
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
import io.delta.kernel.internal.snapshot.SnapshotHint;
import io.delta.kernel.internal.tablefeatures.TableFeature;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.ColumnMapping;
import io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode;
Expand Down Expand Up @@ -150,16 +152,15 @@ public Transaction build(Engine engine) {
metadata = metadata.withNewConfiguration(newProperties);
}

Set<String> newWriterFeatures =
TableFeatures.extractAutomaticallyEnabledWriterFeatures(metadata, protocol);
Set<TableFeature> newWriterFeatures =
TableFeatures.extractAutomaticallyEnabledFeatures(metadata, protocol);
if (!newWriterFeatures.isEmpty()) {
logger.info("Automatically enabling writer features: {}", newWriterFeatures);
logger.info(
"Automatically enabling writer features: {}",
newWriterFeatures.stream().map(TableFeature::featureName).collect(toSet()));
shouldUpdateProtocol = true;
Set<String> oldWriterFeatures = protocol.getWriterFeatures();
protocol = protocol.withNewWriterFeatures(newWriterFeatures);
Set<String> curWriterFeatures = protocol.getWriterFeatures();
checkArgument(!Objects.equals(oldWriterFeatures, curWriterFeatures));
TableFeatures.validateWriteSupportedTable(protocol, metadata, table.getPath(engine));
protocol = protocol.withFeatures(newWriterFeatures);
TableFeatures.validateKernelCanWriteTheTable(protocol, metadata, table.getPath(engine));
}
}

Expand All @@ -183,7 +184,7 @@ public Transaction build(Engine engine) {
private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable) {
String tablePath = table.getPath(engine);
// Validate the table has no features that Kernel doesn't yet support writing into it.
TableFeatures.validateWriteSupportedTable(
TableFeatures.validateKernelCanWriteTheTable(
snapshot.getProtocol(), snapshot.getMetadata(), tablePath);

if (!isNewTable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ private TransactionCommitResult commitWithRetry(
// If row tracking is supported, assign base row IDs and default row commit versions to any
// AddFile actions that do not yet have them. If the row ID high watermark changes, emit a
// DomainMetadata action to update it.
if (TableFeatures.isRowTrackingSupported(protocol)) {
if (TableFeatures.isRowTrackingEnabled(protocol)) {
domainMetadatas =
RowTracking.updateRowIdHighWatermarkIfNeeded(
readSnapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -381,21 +381,4 @@ public Protocol merge(Protocol... others) {
// in the set with reader version 2 unless the writer version is at least 5.
return mergedProtocol.denormalizedNormalized();
}

/////////////////////////////////////////////////////////////////////////////////////////////////
/// Legacy method which will be removed after the table feature integration is done ///
/////////////////////////////////////////////////////////////////////////////////////////////////
public Protocol withNewWriterFeatures(Set<String> writerFeatures) {
Tuple2<Integer, Integer> newProtocolVersions =
TableFeatures.minProtocolVersionFromAutomaticallyEnabledFeatures(writerFeatures);
Set<String> newWriterFeatures = new HashSet<>(writerFeatures);
if (this.writerFeatures != null) {
newWriterFeatures.addAll(this.writerFeatures);
}
return new Protocol(
newProtocolVersions._1,
newProtocolVersions._2,
this.readerFeatures == null ? null : new HashSet<>(this.readerFeatures),
newWriterFeatures);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static io.delta.kernel.internal.TableConfig.EXPIRED_LOG_CLEANUP_ENABLED;
import static io.delta.kernel.internal.TableConfig.LOG_RETENTION;
import static io.delta.kernel.internal.snapshot.MetadataCleanup.cleanupExpiredLogs;
import static io.delta.kernel.internal.tablefeatures.TableFeatures.validateWriteSupportedTable;
import static io.delta.kernel.internal.util.Utils.singletonCloseableIterator;

import io.delta.kernel.data.ColumnarBatch;
Expand All @@ -32,6 +31,7 @@
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.tablefeatures.TableFeatures;
import io.delta.kernel.internal.util.*;
import io.delta.kernel.utils.CloseableIterator;
import io.delta.kernel.utils.FileStatus;
Expand Down Expand Up @@ -65,7 +65,7 @@ public static void checkpoint(Engine engine, Clock clock, SnapshotImpl snapshot)
logger.info("{}: Starting checkpoint for version: {}", tablePath, version);

// Check if writing to the given table protocol version/features is supported in Kernel
validateWriteSupportedTable(
TableFeatures.validateKernelCanWriteTheTable(
snapshot.getProtocol(), snapshot.getMetadata(), snapshot.getDataPath().toString());

final Path checkpointPath = FileNames.checkpointFileSingular(logPath, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public TransactionRebaseState resolveConflicts(Engine engine) throws ConcurrentW
CloseableIterable<Row> updatedDataActions = attemptDataActions;
List<DomainMetadata> updatedDomainMetadatas = transaction.getDomainMetadatas();

if (TableFeatures.isRowTrackingSupported(transaction.getProtocol())) {
if (TableFeatures.isRowTrackingEnabled(transaction.getProtocol())) {
updatedDomainMetadatas =
RowTracking.updateRowIdHighWatermarkIfNeeded(
snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ protected Tuple2<Protocol, Metadata> loadTableProtocolAndMetadata(

if (protocol != null) {
// Stop since we have found the latest Protocol and Metadata.
TableFeatures.validateReadSupportedTable(protocol, dataPath.toString());
TableFeatures.validateKernelCanReadTheTable(protocol, dataPath.toString());
return new Tuple2<>(protocol, metadata);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public static CloseableIterable<Row> assignBaseRowIdAndDefaultRowCommitVersion(
long currCommitVersion,
CloseableIterable<Row> txnDataActions) {
checkArgument(
TableFeatures.isRowTrackingSupported(txnProtocol),
TableFeatures.isRowTrackingEnabled(txnProtocol),
"Base row ID and default row commit version are assigned "
+ "only when feature 'rowTracking' is supported.");

Expand Down Expand Up @@ -154,7 +154,7 @@ public static List<DomainMetadata> updateRowIdHighWatermarkIfNeeded(
CloseableIterable<Row> txnDataActions,
List<DomainMetadata> txnDomainMetadatas) {
checkArgument(
TableFeatures.isRowTrackingSupported(txnProtocol),
TableFeatures.isRowTrackingEnabled(txnProtocol),
"Row ID high watermark is updated only when feature 'rowTracking' is supported.");

// Filter out existing row tracking domainMetadata action, if any
Expand Down
Loading

0 comments on commit 4e60a26

Please sign in to comment.