diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java index 702b68dabf..e5e990c500 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/DeltaErrors.java @@ -140,6 +140,14 @@ public static KernelException invalidVersionRange(long startVersion, long endVer } /* ------------------------ PROTOCOL EXCEPTIONS ----------------------------- */ + public static KernelException unsupportedTableFeature(String feature) { + String message = + String.format( + "Unsupported Delta table feature: table requires feature \"%s\" " + + "which is unsupported by this version of Delta Kernel.", + feature); + return new KernelException(message); + } public static KernelException unsupportedReaderProtocol( String tablePath, int tableReaderVersion) { diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java index bcd0512b1a..b6c6e3cb52 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableConfig.java @@ -34,6 +34,77 @@ public class TableConfig { // TableConfigs // ////////////////// + /** + * Whether this Delta table is append-only. Files can't be deleted, or values can't be updated. + */ + public static final TableConfig APPEND_ONLY_ENABLED = + new TableConfig<>( + "delta.appendOnly", + "false", + Boolean::valueOf, + value -> true, + "needs to be a boolean.", + true); + + /** + * Enable change data feed output. When enabled, DELETE, UPDATE, and MERGE INTO operations will + * need to do additional work to output their change data in an efficiently readable format. + */ + public static final TableConfig CHANGE_DATA_FEED_ENABLED = + new TableConfig<>( + "delta.enableChangeDataFeed", + "false", + Boolean::valueOf, + value -> true, + "needs to be a boolean.", + true); + + public static final TableConfig CHECKPOINT_POLICY = + new TableConfig<>( + "delta.checkpointPolicy", + "classic", + v -> v, + value -> value.equals("classic") || value.equals("v2"), + "needs to be a string and one of 'classic' or 'v2'.", + true); + + /** Whether commands modifying this Delta table are allowed to create new deletion vectors. */ + public static final TableConfig DELETION_VECTORS_CREATION_ENABLED = + new TableConfig<>( + "delta.enableDeletionVectors", + "false", + Boolean::valueOf, + value -> true, + "needs to be a boolean.", + true); + + /** + * Whether widening the type of an existing column or field is allowed, either manually using + * ALTER TABLE CHANGE COLUMN or automatically if automatic schema evolution is enabled. + */ + public static final TableConfig TYPE_WIDENING_ENABLED = + new TableConfig<>( + "delta.enableTypeWidening", + "false", + Boolean::valueOf, + value -> true, + "needs to be a boolean.", + true); + + /** + * Indicates whether Row Tracking is enabled on the table. When this flag is turned on, all rows + * are guaranteed to have Row IDs and Row Commit Versions assigned to them, and writers are + * expected to preserve them by materializing them to hidden columns in the data files. + */ + public static final TableConfig ROW_TRACKING_ENABLED = + new TableConfig<>( + "delta.enableRowTracking", + "false", + Boolean::valueOf, + value -> true, + "needs to be a boolean.", + true); + /** * The shortest duration we have to keep logically deleted data files around before deleting them * physically. @@ -175,6 +246,17 @@ public class TableConfig { Collections.unmodifiableMap( new HashMap>() { { + addConfig(this, APPEND_ONLY_ENABLED); + addConfig(this, CHANGE_DATA_FEED_ENABLED); + addConfig(this, CHECKPOINT_POLICY); + addConfig(this, DELETION_VECTORS_CREATION_ENABLED); + addConfig(this, TYPE_WIDENING_ENABLED); + addConfig(this, ROW_TRACKING_ENABLED); + addConfig(this, LOG_RETENTION); + addConfig(this, EXPIRED_LOG_CLEANUP_ENABLED); + addConfig(this, TOMBSTONE_RETENTION); + addConfig(this, CHECKPOINT_INTERVAL); + addConfig(this, IN_COMMIT_TIMESTAMPS_ENABLED); addConfig(this, TOMBSTONE_RETENTION); addConfig(this, CHECKPOINT_INTERVAL); addConfig(this, IN_COMMIT_TIMESTAMPS_ENABLED); diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnabledByMetadata.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnabledByMetadata.java new file mode 100644 index 0000000000..78f478320c --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/FeatureAutoEnabledByMetadata.java @@ -0,0 +1,36 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.internal.tablefeatures; + +import io.delta.kernel.internal.actions.Metadata; +import io.delta.kernel.internal.actions.Protocol; + +/** + * Defines behavior for {@link TableFeature} that can be automatically enabled via a change in a + * table's metadata, e.g., through setting particular values of certain feature-specific table + * properties. When the requirements are satisfied, the feature is automatically enabled. + */ +public interface FeatureAutoEnabledByMetadata { + /** + * Determine whether the feature must be supported and enabled because its metadata requirements + * are satisfied. + * + * @param protocol the protocol of the table for features that are already enabled. + * @param metadata the metadata of the table for properties that can enable the feature. + */ + boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java new file mode 100644 index 0000000000..4767f0f120 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeature.java @@ -0,0 +1,226 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.tablefeatures; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.internal.actions.Metadata; +import java.util.Collections; +import java.util.Set; + +/** + * Base class for table features. + * + *

A feature can be explicitly supported by a table's protocol when the protocol contains + * a feature's `name`. Writers (for writer-only features) or readers and writers (for reader-writer + * features) must recognize supported features and must handle them appropriately. + * + *

A table feature that released before Delta Table Features (reader version 3 and writer version + * 7) is considered as a legacy feature. Legacy features are implicitly + * supported when (a) the protocol does not support table features, i.e., has reader + * version less than 3 or writer version less than 7 and (b) the feature's minimum reader/writer + * version is less than or equal to the current protocol's reader/writer version. + * + *

Separately, a feature can be automatically supported by a table's metadata when certain + * feature-specific table properties are set. For example, `changeDataFeed` is automatically + * supported when there's a table property `delta.enableChangeDataFeed=true`. See {@link + * FeatureAutoEnabledByMetadata} for details on how to define such features. This is independent of + * the table's enabled features. When a feature is supported (explicitly or implicitly) by the table + * protocol but its metadata requirements are not satisfied, then clients still have to understand + * the feature (at least to the extent that they can read and preserve the existing data in the + * table that uses the feature). + */ +public abstract class TableFeature { + + ///////////////////////////////////////////////////////////////////////////////// + /// Instance variables. /// + ///////////////////////////////////////////////////////////////////////////////// + private final String featureName; + private final int minReaderVersion; + private final int minWriterVersion; + + ///////////////////////////////////////////////////////////////////////////////// + /// Public methods. /// + ///////////////////////////////////////////////////////////////////////////////// + /** + * Constructor. Does validations to make sure: + * + *

    + *
  • Feature name is not null or empty and has valid characters + *
  • minReaderVersion is always 0 for writer features + *
  • all legacy features can be auto applied based on the metadata and protocol + * + * @param featureName a globally-unique string indicator to represent the feature. All characters + * must be letters (a-z, A-Z), digits (0-9), '-', or '_'. Words must be in camelCase. + * @param minReaderVersion the minimum reader version this feature requires. For a feature that + * can only be explicitly supported, this is either `0` (i.e writerOnly feature) or `3` (the + * reader protocol version that supports table features), depending on the feature is + * writer-only or reader-writer. For a legacy feature that can be implicitly supported, this + * is the first protocol version which the feature is introduced. + * @param minWriterVersion the minimum writer version this feature requires. For a feature that + * can only be explicitly supported, this is the writer protocol `7` that supports table + * features. For a legacy feature that can be implicitly supported, this is the first protocol + * version which the feature is introduced. + */ + public TableFeature(String featureName, int minReaderVersion, int minWriterVersion) { + this.featureName = requireNonNull(featureName, "name is null"); + checkArgument(!featureName.isEmpty(), "name is empty"); + checkArgument( + featureName.chars().allMatch(c -> Character.isLetterOrDigit(c) || c == '-' || c == '_'), + "name contains invalid characters: " + featureName); + checkArgument(minReaderVersion >= 0, "minReaderVersion is negative"); + checkArgument(minWriterVersion >= 1, "minWriterVersion is less than 1"); + this.minReaderVersion = minReaderVersion; + this.minWriterVersion = minWriterVersion; + + validate(); + } + + /** @return the name of the table feature. */ + public String featureName() { + return featureName; + } + + /** + * @return true if this feature is applicable to both reader and writer, false if it is + * writer-only. + */ + public boolean isReaderWriterFeature() { + return this instanceof ReaderWriterFeatureType; + } + + /** @return the minimum reader version this feature requires */ + public int minReaderVersion() { + return minReaderVersion; + } + + /** @return the minimum writer version that this feature requires. */ + public int minWriterVersion() { + return minWriterVersion; + } + + /** @return if this feature is a legacy feature? */ + public boolean isLegacyFeature() { + return this instanceof LegacyFeatureType; + } + + /** + * Set of table features that this table feature depends on. I.e. the set of features that need to + * be enabled if this table feature is enabled. + * + * @return the set of table features that this table feature depends on. + */ + public Set requiredFeatures() { + return Collections.emptySet(); + } + + /** + * Does Kernel has support to read a table containing this feature? Default implementation returns + * true. Features should override this method if they have special requirements or not supported + * by the Kernel yet. + * + * @return true if Kernel has support to read a table containing this feature. + */ + public boolean hasKernelReadSupport() { + checkArgument(isReaderWriterFeature(), "Should be called only for reader-writer features"); + return true; + } + + /** + * Does Kernel has support to write a table containing this feature? Default implementation + * returns true. Features should override this method if they have special requirements or not + * supported by the Kernel yet. + * + * @param metadata the metadata of the table. Sometimes checking the metadata is necessary to know + * the Kernel can write the table or not. + * @return true if Kernel has support to write a table containing this feature. + */ + public boolean hasKernelWriteSupport(Metadata metadata) { + return true; + } + + ///////////////////////////////////////////////////////////////////////////////// + /// Define the {@link TableFeature}s traits that define behavior/attributes. /// + ///////////////////////////////////////////////////////////////////////////////// + /** + * An interface to indicate a feature is legacy, i.e., released before Table Features. All legacy + * features are auto enabled by metadata. + */ + public interface LegacyFeatureType extends FeatureAutoEnabledByMetadata {} + + /** An interface to indicate a feature applies to readers and writers. */ + public interface ReaderWriterFeatureType {} + + ///////////////////////////////////////////////////////////////////////////////// + /// Base classes for each of the feature category. /// + ///////////////////////////////////////////////////////////////////////////////// + /** A base class for all table legacy writer-only features. */ + public abstract static class LegacyWriterFeature extends TableFeature + implements LegacyFeatureType { + public LegacyWriterFeature(String featureName, int minWriterVersion) { + super(featureName, /* minReaderVersion = */ 0, minWriterVersion); + } + + @Override + public boolean hasKernelReadSupport() { + return true; + } + } + + /** A base class for all table legacy reader-writer features. */ + public abstract static class LegacyReaderWriterFeature extends TableFeature + implements LegacyFeatureType, ReaderWriterFeatureType { + public LegacyReaderWriterFeature( + String featureName, int minReaderVersion, int minWriterVersion) { + super(featureName, minReaderVersion, minWriterVersion); + } + } + + /** A base class for all non-legacy table writer features. */ + public abstract static class WriterFeature extends TableFeature { + public WriterFeature(String featureName, int minWriterVersion) { + super(featureName, /* minReaderVersion = */ 0, minWriterVersion); + } + + @Override + public boolean hasKernelReadSupport() { + return true; + } + } + + /** A base class for all non-legacy table reader-writer features. */ + public abstract static class ReaderWriterFeature extends TableFeature + implements ReaderWriterFeatureType { + public ReaderWriterFeature(String featureName, int minReaderVersion, int minWriterVersion) { + super(featureName, minReaderVersion, minWriterVersion); + } + } + + /** + * Validate the table feature. This method should throw an exception if the table feature + * properties are invalid. Should be called after the object deriving the {@link TableFeature} is + * constructed. + */ + private void validate() { + if (!isReaderWriterFeature()) { + checkArgument(minReaderVersion() == 0, "Writer-only feature must have minReaderVersion=0"); + } + } + + // Important note: uses the default implementation of `equals` and `hashCode` methods. + // We expect that the feature instances are singletons, so we don't need to compare the fields. +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java index 889d5d41f5..314df4d7dc 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/tablefeatures/TableFeatures.java @@ -17,14 +17,20 @@ package io.delta.kernel.internal.tablefeatures; import static io.delta.kernel.internal.DeltaErrors.*; -import static io.delta.kernel.internal.TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED; +import static io.delta.kernel.internal.util.ColumnMapping.ColumnMappingMode.NONE; +import static io.delta.kernel.types.TimestampNTZType.TIMESTAMP_NTZ; +import static io.delta.kernel.types.VariantType.VARIANT; +import io.delta.kernel.exceptions.KernelException; import io.delta.kernel.internal.DeltaErrors; +import io.delta.kernel.internal.TableConfig; import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.actions.Protocol; +import io.delta.kernel.internal.util.CaseInsensitiveMap; import io.delta.kernel.internal.util.SchemaUtils; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.types.DataType; +import io.delta.kernel.types.FieldMetadata; import io.delta.kernel.types.StructType; import java.util.*; import java.util.stream.Collectors; @@ -32,6 +38,356 @@ /** Contains utility methods related to the Delta table feature support in protocol. */ public class TableFeatures { + ///////////////////////////////////////////////////////////////////////////////// + /// START: Define the {@link TableFeature}s /// + /// If feature instance variable ends with /// + /// 1) `_W_FEATURE` it is a writer only feature. /// + /// 2) `_RW_FEATURE` it is a reader-writer feature. /// + ///////////////////////////////////////////////////////////////////////////////// + public static final TableFeature APPEND_ONLY_W_FEATURE = new AppendOnlyFeature(); + + private static class AppendOnlyFeature extends TableFeature.LegacyWriterFeature { + AppendOnlyFeature() { + super(/* featureName = */ "appendOnly", /* minWriterVersion = */ 2); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.APPEND_ONLY_ENABLED.fromMetadata(metadata); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return true; + } + } + + public static final TableFeature INVARIANTS_W_FEATURE = new InvariantsFeature(); + + private static class InvariantsFeature extends TableFeature.LegacyWriterFeature { + InvariantsFeature() { + super(/* featureName = */ "invariants", /* minWriterVersion = */ 2); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return hasInvariants(metadata.getSchema()); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + // If there is no invariant, then the table is supported + return !hasInvariants(metadata.getSchema()); + } + } + + public static final TableFeature CONSTRAINTS_W_FEATURE = new ConstraintsFeature(); + + private static class ConstraintsFeature extends TableFeature.LegacyWriterFeature { + ConstraintsFeature() { + super("checkConstraints", /* minWriterVersion = */ 3); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + // Kernel doesn't support table with constraints. + return !hasCheckConstraints(metadata); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return hasCheckConstraints(metadata); + } + } + + public static final TableFeature CHANGE_DATA_FEED_W_FEATURE = new ChangeDataFeedFeature(); + + private static class ChangeDataFeedFeature extends TableFeature.LegacyWriterFeature { + ChangeDataFeedFeature() { + super("changeDataFeed", /* minWriterVersion = */ 4); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return false; // TODO: yet to be implemented in Kernel + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.CHANGE_DATA_FEED_ENABLED.fromMetadata(metadata); + } + } + + public static final TableFeature COLUMN_MAPPING_RW_FEATURE = new ColumnMappingFeature(); + + private static class ColumnMappingFeature extends TableFeature.LegacyReaderWriterFeature { + ColumnMappingFeature() { + super("columnMapping", /*minReaderVersion = */ 2, /* minWriterVersion = */ 5); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.COLUMN_MAPPING_MODE.fromMetadata(metadata) != NONE; + } + } + + public static final TableFeature GENERATED_COLUMNS_W_FEATURE = new GeneratedColumnsFeature(); + + private static class GeneratedColumnsFeature extends TableFeature.LegacyWriterFeature { + GeneratedColumnsFeature() { + super("generatedColumns", /* minWriterVersion = */ 4); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + // Kernel can write as long as there are no generated columns defined + return !hasGeneratedColumns(metadata); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return hasGeneratedColumns(metadata); + } + } + + public static final TableFeature IDENTITY_COLUMNS_W_FEATURE = new IdentityColumnsFeature(); + + private static class IdentityColumnsFeature extends TableFeature.LegacyWriterFeature { + IdentityColumnsFeature() { + super("identityColumns", /* minWriterVersion = */ 6); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return !hasIdentityColumns(metadata); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return hasIdentityColumns(metadata); + } + } + + public static final TableFeature VARIANT_RW_FEATURE = new VariantTypeTableFeature("variantType"); + public static final TableFeature VARIANT_RW_PREVIEW_FEATURE = + new VariantTypeTableFeature("variantType-preview"); + + private static class VariantTypeTableFeature extends TableFeature.ReaderWriterFeature + implements FeatureAutoEnabledByMetadata { + VariantTypeTableFeature(String featureName) { + super( + /* featureName = */ featureName, /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return hasTypeColumn(metadata.getSchema(), VARIANT); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return false; // TODO: yet to be implemented in Kernel + } + } + + public static final TableFeature DOMAIN_METADATA_W_FEATURE = new DomainMetadataFeature(); + + private static class DomainMetadataFeature extends TableFeature.WriterFeature { + DomainMetadataFeature() { + super("domainMetadata", /* minWriterVersion = */ 7); + } + } + + public static final TableFeature ROW_TRACKING_W_FEATURE = new RowTrackingFeature(); + + private static class RowTrackingFeature extends TableFeature.WriterFeature + implements FeatureAutoEnabledByMetadata { + RowTrackingFeature() { + super("rowTracking", /* minWriterVersion = */ 7); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.ROW_TRACKING_ENABLED.fromMetadata(metadata); + } + + @Override + public Set requiredFeatures() { + return Collections.singleton(DOMAIN_METADATA_W_FEATURE); + } + } + + public static final TableFeature DELETION_VECTORS_RW_FEATURE = new DeletionVectorsTableFeature(); + + /** + * Kernel currently only support blind appends. So we don't need to do anything special for + * writing into a table with deletion vectors enabled (i.e a table feature with DV enabled is both + * readable and writable. + */ + private static class DeletionVectorsTableFeature extends TableFeature.ReaderWriterFeature + implements FeatureAutoEnabledByMetadata { + DeletionVectorsTableFeature() { + super("deletionVectors", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.DELETION_VECTORS_CREATION_ENABLED.fromMetadata(metadata); + } + } + + public static final TableFeature ICEBERG_COMPAT_V2_W_FEATURE = new IcebergCompatV2TableFeature(); + + private static class IcebergCompatV2TableFeature extends TableFeature.WriterFeature + implements FeatureAutoEnabledByMetadata { + IcebergCompatV2TableFeature() { + super("icebergCompatV2", /* minWriterVersion = */ 7); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.ICEBERG_COMPAT_V2_ENABLED.fromMetadata(metadata); + } + + public @Override Set requiredFeatures() { + return Collections.singleton(COLUMN_MAPPING_RW_FEATURE); + } + } + + public static final TableFeature TYPE_WIDENING_RW_FEATURE = + new TypeWideningTableFeature("typeWidening"); + public static final TableFeature TYPE_WIDENING_PREVIEW_TABLE_FEATURE = + new TypeWideningTableFeature("typeWidening-preview"); + + private static class TypeWideningTableFeature extends TableFeature.ReaderWriterFeature + implements FeatureAutoEnabledByMetadata { + TypeWideningTableFeature(String featureName) { + super(featureName, /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.TYPE_WIDENING_ENABLED.fromMetadata(metadata); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return false; // TODO: yet to support it. + } + } + + public static final TableFeature IN_COMMIT_TIMESTAMP_W_FEATURE = + new InCommitTimestampTableFeature(); + + private static class InCommitTimestampTableFeature extends TableFeature.WriterFeature + implements FeatureAutoEnabledByMetadata { + InCommitTimestampTableFeature() { + super("inCommitTimestamp", /* minWriterVersion = */ 7); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata); + } + } + + public static final TableFeature TIMESTAMP_NTZ_RW_FEATURE = new TimestampNtzTableFeature(); + + private static class TimestampNtzTableFeature extends TableFeature.ReaderWriterFeature + implements FeatureAutoEnabledByMetadata { + TimestampNtzTableFeature() { + super("timestampNtz", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean hasKernelWriteSupport(Metadata metadata) { + return false; // TODO: yet to be implemented in Kernel + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + return hasTypeColumn(metadata.getSchema(), TIMESTAMP_NTZ); + } + } + + public static final TableFeature CHECKPOINT_V2_RW_FEATURE = new CheckpointV2TableFeature(); + + /** + * In order to commit, there is no extra work required when v2 checkpoint is enabled. This affects + * the checkpoint format only. When v2 is enabled, writing classic checkpoints is still allowed. + */ + private static class CheckpointV2TableFeature extends TableFeature.ReaderWriterFeature + implements FeatureAutoEnabledByMetadata { + CheckpointV2TableFeature() { + super("v2Checkpoint", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + + @Override + public boolean metadataRequiresFeatureToBeEnabled(Protocol protocol, Metadata metadata) { + // TODO: define an enum for checkpoint policy when we start supporting writing v2 checkpoints + return "v2".equals(TableConfig.CHECKPOINT_POLICY.fromMetadata(metadata)); + } + } + + public static final TableFeature VACUUM_PROTOCOL_CHECK_RW_FEATURE = + new VacuumProtocolCheckTableFeature(); + + private static class VacuumProtocolCheckTableFeature extends TableFeature.ReaderWriterFeature { + VacuumProtocolCheckTableFeature() { + super("vacuumProtocolCheck", /* minReaderVersion = */ 3, /* minWriterVersion = */ 7); + } + } + + ///////////////////////////////////////////////////////////////////////////////// + /// END: Define the {@link TableFeature}s /// + ///////////////////////////////////////////////////////////////////////////////// + + public static final List TABLE_FEATURES = + Collections.unmodifiableList( + Arrays.asList( + APPEND_ONLY_W_FEATURE, + CHECKPOINT_V2_RW_FEATURE, + CHANGE_DATA_FEED_W_FEATURE, + COLUMN_MAPPING_RW_FEATURE, + CONSTRAINTS_W_FEATURE, + DELETION_VECTORS_RW_FEATURE, + GENERATED_COLUMNS_W_FEATURE, + DOMAIN_METADATA_W_FEATURE, + ICEBERG_COMPAT_V2_W_FEATURE, + IDENTITY_COLUMNS_W_FEATURE, + IN_COMMIT_TIMESTAMP_W_FEATURE, + INVARIANTS_W_FEATURE, + ROW_TRACKING_W_FEATURE, + TIMESTAMP_NTZ_RW_FEATURE, + TYPE_WIDENING_PREVIEW_TABLE_FEATURE, + TYPE_WIDENING_RW_FEATURE, + VACUUM_PROTOCOL_CHECK_RW_FEATURE, + VARIANT_RW_FEATURE, + VARIANT_RW_PREVIEW_FEATURE)); + + public static final Map TABLE_FEATURE_MAP = + Collections.unmodifiableMap( + new CaseInsensitiveMap() { + { + for (TableFeature feature : TABLE_FEATURES) { + put(feature.featureName(), feature); + } + } + }); + + /** Get the table feature by name. Case-insensitive lookup. If not found, throws error. */ + public static TableFeature getTableFeature(String featureName) { + TableFeature tableFeature = TABLE_FEATURE_MAP.get(featureName); + if (tableFeature == null) { + throw DeltaErrors.unsupportedTableFeature(featureName); + } + return tableFeature; + } + + ///////////////////////////////////////////////////////////////////////////////// + /// Everything below will be removed once the Kernel upgrades to use the /// + /// above interfaces. /// + ///////////////////////////////////////////////////////////////////////////////// private static final Set SUPPORTED_WRITER_FEATURES = Collections.unmodifiableSet( new HashSet() { @@ -257,7 +613,7 @@ private static boolean metadataRequiresWriterFeatureToBeEnabled( Metadata metadata, String feature) { switch (feature) { case "inCommitTimestamp": - return IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata); + return TableConfig.IN_COMMIT_TIMESTAMPS_ENABLED.fromMetadata(metadata); default: return false; } @@ -278,20 +634,28 @@ private static void validateNoInvariants(StructType tableSchema) { } } - static boolean hasInvariants(StructType tableSchema) { + private static boolean hasInvariants(StructType tableSchema) { return !SchemaUtils.filterRecursively( tableSchema, - /* recurseIntoMapOrArrayElements = */ false, // constraints are not allowed in maps or + /* recurseIntoMapOrArrayElements = */ false, // invariants are not allowed in maps or + // arrays // arrays /* stopOnFirstMatch */ true, /* filter */ field -> field.getMetadata().contains("delta.invariants")) .isEmpty(); } + private static boolean hasCheckConstraints(Metadata metadata) { + return metadata.getConfiguration().entrySet().stream() + .findAny() + .map(entry -> entry.getKey().startsWith("delta.constraints.")) + .orElse(false); + } + /** * Check if the table schema has a column of type. Caution: works only for the primitive types. */ - static boolean hasTypeColumn(StructType tableSchema, DataType type) { + private static boolean hasTypeColumn(StructType tableSchema, DataType type) { return !SchemaUtils.filterRecursively( tableSchema, /* recurseIntoMapOrArrayElements = */ true, @@ -299,4 +663,42 @@ static boolean hasTypeColumn(StructType tableSchema, DataType type) { /* filter */ field -> field.getDataType().equals(type)) .isEmpty(); } + + private static boolean hasIdentityColumns(Metadata metadata) { + return !SchemaUtils.filterRecursively( + metadata.getSchema(), + /* recurseIntoMapOrArrayElements = */ false, // don't expected identity columns in + // nested columns + /* stopOnFirstMatch */ true, + /* filter */ field -> { + FieldMetadata fieldMetadata = field.getMetadata(); + + // Check if the metadata contains the required keys + boolean hasStart = fieldMetadata.contains("delta.identity.start"); + boolean hasStep = fieldMetadata.contains("delta.identity.step"); + boolean hasInsert = fieldMetadata.contains("delta.identity.allowExplicitInsert"); + + // Verify that all or none of the three fields are present + if (!((hasStart == hasStep) && (hasStart == hasInsert))) { + throw new KernelException( + String.format( + "Inconsistent IDENTITY metadata for column %s detected: %s, %s, %s", + field.getName(), hasStart, hasStep, hasInsert)); + } + + // Return true only if all three fields are present + return hasStart && hasStep && hasInsert; + }) + .isEmpty(); + } + + private static boolean hasGeneratedColumns(Metadata metadata) { + return !SchemaUtils.filterRecursively( + metadata.getSchema(), + /* recurseIntoMapOrArrayElements = */ false, // don't expected generated columns in + // nested columns + /* stopOnFirstMatch */ true, + /* filter */ field -> field.getMetadata().contains("delta.generationExpression")) + .isEmpty(); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CaseInsensitiveMap.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CaseInsensitiveMap.java new file mode 100644 index 0000000000..77f947e429 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/util/CaseInsensitiveMap.java @@ -0,0 +1,106 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.util; + +import static io.delta.kernel.internal.util.Preconditions.checkArgument; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Locale; +import java.util.Map; +import java.util.Set; + +/** + * A map that is case-insensitive in its keys. This map is not thread-safe, and key is + * case-insensitive and of string type. + * + * @param + */ +public class CaseInsensitiveMap implements Map { + private final Map innerMap = new HashMap<>(); + + @Override + public V get(Object key) { + return innerMap.get(toLowerCase(key)); + } + + @Override + public V put(String key, V value) { + return innerMap.put(toLowerCase(key), value); + } + + @Override + public void putAll(Map m) { + // behavior of this method is not defined on how to handle duplicates + // don't support this use case, as it is not needed in Kernel + throw new UnsupportedOperationException("putAll"); + } + + @Override + public V remove(Object key) { + return innerMap.remove(toLowerCase(key)); + } + + @Override + public boolean containsKey(Object key) { + return innerMap.containsKey(toLowerCase(key)); + } + + @Override + public boolean containsValue(Object value) { + return innerMap.containsValue(value); + } + + @Override + public Set keySet() { + // no need to convert to lower case here as the inserted keys are already in lower case + return innerMap.keySet(); + } + + @Override + public Set> entrySet() { + // no need to convert to lower case here as the inserted keys are already in lower case + return innerMap.entrySet(); + } + + @Override + public Collection values() { + return innerMap.values(); + } + + @Override + public int size() { + return innerMap.size(); + } + + @Override + public boolean isEmpty() { + return innerMap.isEmpty(); + } + + @Override + public void clear() { + innerMap.clear(); + } + + private String toLowerCase(Object key) { + if (key == null) { + return null; + } + checkArgument(key instanceof String, "Key must be a string"); + return ((String) key).toLowerCase(Locale.ROOT); + } +} diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala index 1410025329..5af3569865 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/tablefeatures/TableFeaturesSuite.scala @@ -18,11 +18,14 @@ package io.delta.kernel.internal.tablefeatures import io.delta.kernel.data.{ArrayValue, ColumnVector, MapValue} import io.delta.kernel.exceptions.KernelException import io.delta.kernel.internal.actions.{Format, Metadata, Protocol} -import io.delta.kernel.internal.tablefeatures.TableFeatures.validateWriteSupportedTable +import io.delta.kernel.internal.tablefeatures.TableFeatures.{TABLE_FEATURES, validateWriteSupportedTable} import io.delta.kernel.internal.util.InternalUtils.singletonStringColumnVector +import io.delta.kernel.internal.util.VectorUtils.stringVector import io.delta.kernel.types._ import org.scalatest.funsuite.AnyFunSuite +import java.util.stream.Collectors +import java.util.stream.Collectors.toList import java.util.{Collections, Optional} import scala.collection.JavaConverters._ @@ -30,6 +33,150 @@ import scala.collection.JavaConverters._ * Suite that tests Kernel throws error when it receives a unsupported protocol and metadata */ class TableFeaturesSuite extends AnyFunSuite { + + ///////////////////////////////////////////////////////////////////////////////////////////////// + // Tests for [[TableFeature]] implementations // + ///////////////////////////////////////////////////////////////////////////////////////////////// + val readerWriterFeatures = Seq("columnMapping", "deletionVectors", "timestampNtz", + "typeWidening", "typeWidening-preview", "v2Checkpoint", "vacuumProtocolCheck", + "variantType", "variantType-preview") + + val writerOnlyFeatures = Seq("appendOnly", "invariants", "checkConstraints", + "generatedColumns", "changeDataFeed", "identityColumns", + "rowTracking", "domainMetadata", "icebergCompatV2", "inCommitTimestamp") + + val legacyFeatures = Seq("appendOnly", "invariants", "checkConstraints", + "generatedColumns", "changeDataFeed", "identityColumns", "columnMapping") + + test("basic properties checks") { + + // Check that all features are correctly classified as reader-writer or writer-only + readerWriterFeatures.foreach { feature => + assert(TableFeatures.getTableFeature(feature).isReaderWriterFeature) + } + writerOnlyFeatures.foreach { feature => + assert(!TableFeatures.getTableFeature(feature).isReaderWriterFeature) + } + + // Check that legacy features are correctly classified as legacy features + (readerWriterFeatures ++ writerOnlyFeatures) foreach { feature => + if (legacyFeatures.contains(feature)) { + assert(TableFeatures.getTableFeature(feature).isLegacyFeature) + } else { + assert(!TableFeatures.getTableFeature(feature).isLegacyFeature) + } + } + + // all expected features are present in list. Just make sure we don't miss any + // adding to the list. This is the list used to iterate over all features + assert( + TableFeatures.TABLE_FEATURES.size() == readerWriterFeatures.size + writerOnlyFeatures.size) + } + + val testProtocol = new Protocol(1, 2, Collections.emptyList(), Collections.emptyList()) + Seq( + // Test feature, metadata, expected result + ("appendOnly", testMetadata(tblProps = Map("delta.appendOnly" -> "true")), true), + ("appendOnly", testMetadata(tblProps = Map("delta.appendOnly" -> "false")), false), + ("invariants", testMetadata(includeInvaraint = true), true), + ("invariants", testMetadata(includeInvaraint = false), false), + ("checkConstraints", testMetadata(tblProps = Map("delta.constraints.a" -> "a = b")), true), + ("checkConstraints", testMetadata(), false), + ("generatedColumns", testMetadata(includeGeneratedColumn = true), true), + ("generatedColumns", testMetadata(includeGeneratedColumn = false), false), + ("changeDataFeed", + testMetadata(tblProps = Map("delta.enableChangeDataFeed" -> "true")), true), + ("changeDataFeed", + testMetadata(tblProps = Map("delta.enableChangeDataFeed" -> "false")), false), + ("identityColumns", testMetadata(includeIdentityColumn = true), true), + ("identityColumns", testMetadata(includeIdentityColumn = false), false), + ("columnMapping", testMetadata(tblProps = Map("delta.columnMapping.mode" -> "id")), true), + ("columnMapping", testMetadata(tblProps = Map("delta.columnMapping.mode" -> "none")), false), + ("typeWidening-preview", + testMetadata(tblProps = Map("delta.enableTypeWidening" -> "true")), true), + ("typeWidening-preview", + testMetadata(tblProps = Map("delta.enableTypeWidening" -> "false")), false), + ("typeWidening", testMetadata(tblProps = Map("delta.enableTypeWidening" -> "true")), true), + ("typeWidening", testMetadata(tblProps = Map("delta.enableTypeWidening" -> "false")), false), + ("rowTracking", testMetadata(tblProps = Map("delta.enableRowTracking" -> "true")), true), + ("rowTracking", testMetadata(tblProps = Map("delta.enableRowTracking" -> "false")), false), + ("deletionVectors", + testMetadata(tblProps = Map("delta.enableDeletionVectors" -> "true")), true), + ("deletionVectors", + testMetadata(tblProps = Map("delta.enableDeletionVectors" -> "false")), false), + ("timestampNtz", testMetadata(includeTimestampNtzTypeCol = true), true), + ("timestampNtz", testMetadata(includeTimestampNtzTypeCol = false), false), + ("v2Checkpoint", testMetadata(tblProps = Map("delta.checkpointPolicy" -> "v2")), true), + ("v2Checkpoint", testMetadata(tblProps = Map("delta.checkpointPolicy" -> "classic")), false), + ("icebergCompatV2", + testMetadata(tblProps = Map("delta.enableIcebergCompatV2" -> "true")), true), + ("icebergCompatV2", + testMetadata(tblProps = Map("delta.enableIcebergCompatV2" -> "false")), false), + ("inCommitTimestamp", + testMetadata(tblProps = Map("delta.enableInCommitTimestamps" -> "true")), true), + ("inCommitTimestamp", + testMetadata(tblProps = Map("delta.enableInCommitTimestamps" -> "false")), false) + ).foreach({ case (feature, metadata, expected) => + test(s"metadataRequiresFeatureToBeEnabled - $feature - $metadata") { + val tableFeature = TableFeatures.getTableFeature(feature) + assert(tableFeature.isInstanceOf[FeatureAutoEnabledByMetadata]) + assert(tableFeature.asInstanceOf[FeatureAutoEnabledByMetadata] + .metadataRequiresFeatureToBeEnabled(testProtocol, metadata) == expected) + } + }) + + Seq("domainMetadata", "vacuumProtocolCheck").foreach { feature => + test(s"doesn't support auto enable by metadata: $feature") { + val tableFeature = TableFeatures.getTableFeature(feature) + assert(!tableFeature.isInstanceOf[FeatureAutoEnabledByMetadata]) + } + } + + test("hasKernelReadSupport expected to be true") { + val results = TABLE_FEATURES.stream() + .filter(_.isReaderWriterFeature) + .filter(_.hasKernelReadSupport()) + .collect(toList()).asScala + + val expected = Seq("columnMapping", "v2Checkpoint", "variantType", + "variantType-preview", "typeWidening", "typeWidening-preview", "deletionVectors", + "timestampNtz", "vacuumProtocolCheck") + + assert(results.map(_.featureName()).toSet == expected.toSet) + } + + test("hasKernelWriteSupport expected to be true") { + val results = TABLE_FEATURES.stream() + .filter(_.hasKernelWriteSupport(testMetadata())) + .collect(toList()).asScala + + // checkConstraints, generatedColumns, identityColumns, invariants are writable + // because the metadata has not been set the info that these features are enabled + val expected = Seq("columnMapping", "v2Checkpoint", "deletionVectors", + "vacuumProtocolCheck", "rowTracking", "domainMetadata", "icebergCompatV2", + "inCommitTimestamp", "appendOnly", "invariants", + "checkConstraints", "generatedColumns", "identityColumns" + ) + + assert(results.map(_.featureName()).toSet == expected.toSet) + } + + Seq( + // Test format: feature, metadata, expected value + ("invariants", testMetadata(includeInvaraint = true), false), + ("checkConstraints", testMetadata(tblProps = Map("delta.constraints.a" -> "a = b")), false), + ("generatedColumns", testMetadata(includeGeneratedColumn = true), false), + ("identityColumns", testMetadata(includeIdentityColumn = true), false) + ).foreach({ case (feature, metadata, expected) => + test(s"hasKernelWriteSupport - $feature has metadata") { + val tableFeature = TableFeatures.getTableFeature(feature) + assert(tableFeature.hasKernelWriteSupport(metadata) == expected) + } + }) + + ///////////////////////////////////////////////////////////////////////////////////////////////// + // Legacy tests (will be modified or deleted in subsequent PRs) // + ///////////////////////////////////////////////////////////////////////////////////////////////// test("validate write supported: protocol 1") { checkSupported(createTestProtocol(minWriterVersion = 1)) } @@ -41,19 +188,19 @@ class TableFeaturesSuite extends AnyFunSuite { test("validateWriteSupported: protocol 2 with appendOnly") { checkSupported( createTestProtocol(minWriterVersion = 2), - metadata = createTestMetadata(withAppendOnly = true)) + metadata = testMetadata(tblProps = Map("delta.appendOnly" -> "true"))) } test("validateWriteSupported: protocol 2 with invariants") { checkUnsupported( createTestProtocol(minWriterVersion = 2), - metadata = createTestMetadata(includeVariant = true)) + metadata = testMetadata(includeInvaraint = true)) } test("validateWriteSupported: protocol 2, with appendOnly and invariants") { checkUnsupported( createTestProtocol(minWriterVersion = 2), - metadata = createTestMetadata(includeVariant = true)) + metadata = testMetadata(includeInvaraint = true)) } Seq(3, 4, 5, 6).foreach { minWriterVersion => @@ -95,19 +242,19 @@ class TableFeaturesSuite extends AnyFunSuite { test("validateWriteSupported: protocol 7 with invariants, schema contains invariants") { checkUnsupported( createTestProtocol(minWriterVersion = 7, "invariants"), - metadata = createTestMetadata(includeVariant = true) + metadata = testMetadata(includeInvaraint = true) ) } def checkSupported( protocol: Protocol, - metadata: Metadata = createTestMetadata()): Unit = { + metadata: Metadata = testMetadata()): Unit = { validateWriteSupportedTable(protocol, metadata, "/test/table") } def checkUnsupported( protocol: Protocol, - metadata: Metadata = createTestMetadata()): Unit = { + metadata: Metadata = testMetadata()): Unit = { intercept[KernelException] { validateWriteSupportedTable(protocol, metadata, "/test/table") } @@ -124,13 +271,19 @@ class TableFeaturesSuite extends AnyFunSuite { ) } - def createTestMetadata( - withAppendOnly: Boolean = false, includeVariant: Boolean = false): Metadata = { - var config: Map[String, String] = Map() - if (withAppendOnly) { - config = Map("delta.appendOnly" -> "true"); - } - val testSchema = createTestSchema(includeVariant); + def testMetadata( + includeInvaraint: Boolean = false, + includeTimestampNtzTypeCol: Boolean = false, + includeVariantTypeCol: Boolean = false, + includeGeneratedColumn: Boolean = false, + includeIdentityColumn: Boolean = false, + tblProps: Map[String, String] = Map.empty): Metadata = { + val testSchema = createTestSchema( + includeInvaraint, + includeTimestampNtzTypeCol, + includeVariantTypeCol, + includeGeneratedColumn, + includeIdentityColumn) new Metadata( "id", Optional.of("name"), @@ -145,18 +298,19 @@ class TableFeaturesSuite extends AnyFunSuite { }, Optional.empty(), new MapValue() { // conf - override def getSize = 1 - - override def getKeys: ColumnVector = singletonStringColumnVector("delta.appendOnly") - - override def getValues: ColumnVector = - singletonStringColumnVector(if (withAppendOnly) "false" else "true") + override def getSize = tblProps.size + override def getKeys: ColumnVector = stringVector(tblProps.toSeq.map(_._1).asJava) + override def getValues: ColumnVector = stringVector(tblProps.toSeq.map(_._2).asJava) } ) } def createTestSchema( - includeInvariant: Boolean = false): StructType = { + includeInvariant: Boolean = false, + includeTimestampNtzTypeCol: Boolean = false, + includeVariantTypeCol: Boolean = false, + includeGeneratedColumn: Boolean = false, + includeIdentityColumn: Boolean = false): StructType = { var structType = new StructType() .add("c1", IntegerType.INTEGER) .add("c2", StringType.STRING) @@ -168,6 +322,31 @@ class TableFeaturesSuite extends AnyFunSuite { .putString("delta.invariants", "{\"expression\": { \"expression\": \"x > 3\"} }") .build()) } + if (includeTimestampNtzTypeCol) { + structType = structType.add("c4", TimestampNTZType.TIMESTAMP_NTZ) + } + if (includeVariantTypeCol) { + structType = structType.add("c5", VariantType.VARIANT) + } + if (includeGeneratedColumn) { + structType = structType.add( + "c6", + IntegerType.INTEGER, + FieldMetadata.builder() + .putString("delta.generationExpression", "{\"expression\": \"c1 + 1\"}") + .build()) + } + if (includeIdentityColumn) { + structType = structType.add( + "c7", + IntegerType.INTEGER, + FieldMetadata.builder() + .putLong("delta.identity.start", 1L) + .putLong("delta.identity.step", 2L) + .putBoolean("delta.identity.allowExplicitInsert", true) + .build()) + } + structType } }