diff --git a/protocol/delta-sharing-protocol-api.yml b/protocol/delta-sharing-protocol-api.yml index 2fa890e97..bb2ff4841 100644 --- a/protocol/delta-sharing-protocol-api.yml +++ b/protocol/delta-sharing-protocol-api.yml @@ -787,7 +787,7 @@ components: # This is not used for the spec but comes handy for autogeneration TableQueryResponseObject: - anyOf: + oneOf: - $ref: '#/components/schemas/ParquetTableQueryResponseObject' - $ref: '#/components/schemas/DeltaTableQueryResponseObject' ParquetTableQueryResponseObject: diff --git a/server/app/build.gradle.kts b/server/app/build.gradle.kts index fce88e2d9..5104765cd 100644 --- a/server/app/build.gradle.kts +++ b/server/app/build.gradle.kts @@ -49,6 +49,7 @@ val serverGeneratorProperties = mapOf( "dateLibrary" to "java8", "disallowAdditionalPropertiesIfNotPresent" to "false", "generateBuilders" to "false", + "legacyDiscriminatorBehavior" to "false", "generatePom" to "false", "interfaceOnly" to "true", "library" to "quarkus", diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java b/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java index 04fe8682a..720dfb0a8 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/DeltaMappers.java @@ -5,6 +5,7 @@ import io.whitefox.core.*; import io.whitefox.core.Schema; import io.whitefox.core.Share; +import io.whitefox.core.services.DeltaSharingCapabilities; import java.util.*; import java.util.stream.Collectors; @@ -60,44 +61,44 @@ public static TableQueryResponseObject readTableResult2api(ReadTableResult readT private static ParquetMetadataObject metadata2Api(Metadata metadata) { return new ParquetMetadataObject() - .metaData(new ParquetMetadataObjectMetaData() - .numFiles(metadata.numFiles().orElse(null)) - .version(metadata.version().orElse(null)) - .size(metadata.size().orElse(null)) - .id(metadata.id()) - .name(metadata.name().orElse(null)) - .description(metadata.description().orElse(null)) - .format(new ParquetFormatObject().provider(metadata.format().provider())) - .schemaString(metadata.tableSchema().structType().toJson()) - .partitionColumns(metadata.partitionColumns()) - ._configuration(metadata.configuration())); + .metaData(new ParquetMetadataObjectMetaData() + .numFiles(metadata.numFiles().orElse(null)) + .version(metadata.version()) + .size(metadata.size().orElse(null)) + .id(metadata.id()) + .name(metadata.name().orElse(null)) + .description(metadata.description().orElse(null)) + .format(new ParquetFormatObject().provider(metadata.format().provider())) + .schemaString(metadata.tableSchema().structType().toJson()) + .partitionColumns(metadata.partitionColumns()) + ._configuration(metadata.configuration())); } private static DeltaProtocolObject protocol2Api(Protocol protocol) { return new DeltaProtocolObject() - .protocol(new DeltaProtocolObjectProtocol() - .deltaProtocol(new DeltaProtocolObjectProtocolDeltaProtocol() - .minReaderVersion(protocol.minReaderVersion().orElse(1)) - .minWriterVersion(protocol.minWriterVersion().orElse(1)))); + .protocol(new DeltaProtocolObjectProtocol() + .deltaProtocol(new DeltaProtocolObjectProtocolDeltaProtocol() + .minReaderVersion(protocol.minReaderVersion().orElse(1)) + .minWriterVersion(protocol.minWriterVersion().orElse(1)))); } private static DeltaFileObject file2Api(TableFile f) { return new DeltaFileObject() - .id(f.id()) - .version(f.version().orElse(null)) - .deletionVectorFileId(null) // TODO - .timestamp(f.timestamp().orElse(null)) - .expirationTimestamp(f.expirationTimestamp()) - .deltaSingleAction(new DeltaSingleAction() - ._file(new DeltaAddFileAction() - .id(f.id()) - .url(f.url()) - .partitionValues(f.partitionValues()) - .size(f.size()) - .stats(f.stats().orElse(null)) - .version(f.version().orElse(null)) - .timestamp(f.timestamp().orElse(null)) - .expirationTimestamp(f.expirationTimestamp()))); + .id(f.id()) + .version(f.version().orElse(null)) + .deletionVectorFileId(null) // TODO + .timestamp(f.timestamp().orElse(null)) + .expirationTimestamp(f.expirationTimestamp()) + .deltaSingleAction(new DeltaSingleAction() + ._file(new DeltaAddFileAction() + .id(f.id()) + .url(f.url()) + .partitionValues(f.partitionValues()) + .size(f.size()) + .stats(f.stats().orElse(null)) + .version(f.version().orElse(null)) + .timestamp(f.timestamp().orElse(null)) + .expirationTimestamp(f.expirationTimestamp()))); } public static TableReferenceAndReadRequest api2TableReferenceAndReadRequest( @@ -113,28 +114,16 @@ public static io.whitefox.api.deltasharing.model.v1.generated.Table table2api( .schema(sharedTable.schema()); } - /** - * NOTE: this is an undocumented feature of the reference impl of delta-sharing, it's not part of the - * protocol - * ---- - * Return the [[io.whitefox.api.server.DeltaHeaders.DELTA_SHARE_CAPABILITIES_HEADER]] header - * that will be set in the response w/r/t the one received in the request. - * If the request did not contain any, we will return an empty one. - */ - public static Map toHeaderCapabilitiesMap(String headerCapabilities) { - if (headerCapabilities == null) { - return Map.of(); - } - return Arrays.stream(headerCapabilities.toLowerCase().split(";")) - .map(h -> h.split("=")) - .filter(h -> h.length == 2) - .map(splits -> Map.entry(splits[0], splits[1])) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - public static TableMetadataResponseObject toTableResponseMetadata(Metadata m) { return new TableMetadataResponseObject() - .protocol(new ParquetProtocolObject().protocol(new ParquetProtocolObjectProtocol().minReaderVersion(1))) + .protocol(new ParquetProtocolObject() + .protocol(new ParquetProtocolObjectProtocol().minReaderVersion(1))) .metadata(metadata2Api(m)); } + + public static String toCapabilitiesHeader(DeltaSharingCapabilities deltaSharingCapabilities) { + return deltaSharingCapabilities.values().entrySet().stream() + .map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue())) + .collect(Collectors.joining(";")); + } } diff --git a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java index 8bb0b18ca..4775df8e5 100644 --- a/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java +++ b/server/app/src/main/java/io/whitefox/api/deltasharing/server/DeltaSharesApiImpl.java @@ -14,6 +14,7 @@ import io.whitefox.api.server.ApiUtils; import io.whitefox.core.services.ContentAndToken; import io.whitefox.core.services.DeltaSharesService; +import io.whitefox.core.services.DeltaSharingCapabilities; import io.whitefox.core.services.ShareService; import jakarta.inject.Inject; import jakarta.ws.rs.core.MediaType; @@ -60,8 +61,9 @@ public Response getTableChanges( Integer startingVersion, Integer endingVersion, String endingTimestamp, - Boolean includeHistoricalMetadata) { - return Response.ok().build(); + Boolean includeHistoricalMetadata, + String deltaSharingCapabilities) { + return Response.status(Response.Status.NOT_IMPLEMENTED).build(); } @Override @@ -72,20 +74,22 @@ public Response getTableMetadata( String startingTimestamp, String deltaSharingCapabilities) { return wrapExceptions( - () -> optionalToNotFound( - deltaSharesService.getTableMetadata(share, schema, table, startingTimestamp), - m -> optionalToNotFound( - deltaSharesService.getTableVersion(share, schema, table, startingTimestamp), - v -> Response.ok( - tableResponseSerializer.serialize(DeltaMappers.toTableResponseMetadata(m)), - ndjsonMediaType) - .status(Response.Status.OK.getStatusCode()) - .header(DELTA_TABLE_VERSION_HEADER, String.valueOf(v)) - .header( - DELTA_SHARE_CAPABILITIES_HEADER, - getResponseFormatHeader( - DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities))) - .build())), + () -> { + DeltaSharingCapabilities requestCapabilities = + new DeltaSharingCapabilities(deltaSharingCapabilities); + return optionalToNotFound( + deltaSharesService.getTableMetadata( + share, schema, table, startingTimestamp, requestCapabilities), + m -> Response.ok( + tableResponseSerializer.serialize(DeltaMappers.toTableResponseMetadata(m)), + ndjsonMediaType) + .status(Response.Status.OK.getStatusCode()) + .header(DELTA_TABLE_VERSION_HEADER, String.valueOf(m.version())) + .header( + DeltaSharingCapabilities.DELTA_SHARE_CAPABILITIES_HEADER, + DeltaMappers.toCapabilitiesHeader(m.tableCapabilities())) + .build()); + }, exceptionToResponse); } @@ -193,20 +197,25 @@ public Response queryTable( return wrapExceptions( () -> optionalToNotFound( deltaSharesService.getTableVersion(share, schema, table, startingTimestamp), - version -> Response.ok( - tableQueryResponseSerializer.serialize( - DeltaMappers.readTableResult2api(deltaSharesService.queryTable( - share, - schema, - table, - DeltaMappers.api2ReadTableRequest(queryRequest)))), - ndjsonMediaType) - .header(DELTA_TABLE_VERSION_HEADER, version) - .header( - DELTA_SHARE_CAPABILITIES_HEADER, - getResponseFormatHeader( - DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities))) - .build()), + version -> { + var capabilities = new DeltaSharingCapabilities(deltaSharingCapabilities); + var readTableResult = deltaSharesService.queryTable( + share, + schema, + table, + DeltaMappers.api2ReadTableRequest(queryRequest), + capabilities); + return Response.ok( + tableQueryResponseSerializer.serialize( + DeltaMappers.readTableResult2api(readTableResult)), + ndjsonMediaType) + .header(DELTA_TABLE_VERSION_HEADER, version) + .header( + DeltaSharingCapabilities.DELTA_SHARE_CAPABILITIES_HEADER, + DeltaMappers.toCapabilitiesHeader( + readTableResult.metadata().tableCapabilities())) + .build(); + }), exceptionToResponse); } diff --git a/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java b/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java index b5ece4313..1cb765de5 100644 --- a/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java +++ b/server/app/src/main/java/io/whitefox/api/server/ApiUtils.java @@ -60,12 +60,6 @@ default Response optionalToNotFound(Optional opt, Function f return opt.map(fn).orElse(notFoundResponse()); } - default String getResponseFormatHeader(Map deltaSharingCapabilities) { - return String.format( - "%s=%s", - DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT, getResponseFormat(deltaSharingCapabilities)); - } - default String getResponseFormat(Map deltaSharingCapabilities) { return deltaSharingCapabilities.getOrDefault( DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT, diff --git a/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java b/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java index 6c261d1a0..725f186fe 100644 --- a/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java +++ b/server/app/src/main/java/io/whitefox/api/server/DeltaHeaders.java @@ -1,7 +1,8 @@ package io.whitefox.api.server; +import io.whitefox.core.services.DeltaSharingCapabilities; + public interface DeltaHeaders { - String DELTA_SHARING_RESPONSE_FORMAT = "responseformat"; + String DELTA_SHARING_RESPONSE_FORMAT = DeltaSharingCapabilities.DELTA_SHARING_RESPONSE_FORMAT; String DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version"; - String DELTA_SHARE_CAPABILITIES_HEADER = "delta-sharing-capabilities"; } diff --git a/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java b/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java index ac5dd4d36..8888dee33 100644 --- a/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java +++ b/server/app/src/main/java/io/whitefox/api/server/WhitefoxMappers.java @@ -173,6 +173,7 @@ public static MetastoreType api2MetastoreType( } } + // TODO we need to resolve the json schema anyOf/oneOf problem private static MetadataObject metadata2Api(Metadata metadata) { return new MetadataObject() .metaData(new MetadataObjectMetaData() diff --git a/server/core/src/main/java/io/whitefox/core/Metadata.java b/server/core/src/main/java/io/whitefox/core/Metadata.java index 79c6eaaf7..e24e7e657 100644 --- a/server/core/src/main/java/io/whitefox/core/Metadata.java +++ b/server/core/src/main/java/io/whitefox/core/Metadata.java @@ -1,147 +1,277 @@ package io.whitefox.core; import io.whitefox.annotations.SkipCoverageGenerated; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Optional; - -public class Metadata { - private final String id; - private final Optional name; - private final Optional description; - private final Format format; - private final TableSchema tableSchema; - private final List partitionColumns; - private final Map configuration; - private final Optional version; - private final Optional size; - private final Optional numFiles; - - public enum Format { - PARQUET("parquet"); +import io.whitefox.core.services.DeltaSharingCapabilities; +import io.whitefox.core.types.StructType; +import shadedelta.org.apache.parquet.hadoop.metadata.ParquetMetadata; +import java.util.*; +import java.util.stream.Collectors; + +public abstract class Metadata { + Metadata() { + } + + ParquetMetadata parquet(String id, Optional name, Optional description, ParquetFormat format, StructType schema, List partitionColumns, Map configuration, Optional version, Optional size, Optional numFiles) { + return new ParquetMetadata(id, name, description, format, schema, partitionColumns, configuration, version, size, numFiles); + } + + DeltaMetadata delta(Optional version, Optional size, Optional numFiles, String id, Optional name, Optional description, DeltaFormat format, StructType schema, List partitionColumns, Optional createdTime, Map configuration) { + return new DeltaMetadata(version, size, numFiles, id, name, description, format, schema, partitionColumns, createdTime, configuration); + } + + public static class DeltaFormat { private final String provider; + private final Map options; - private Format(final String provider) { + public DeltaFormat(String provider, Map options) { this.provider = provider; + this.options = Collections.unmodifiableMap(options); } public String provider() { - return this.provider; + return provider; } - } - public Metadata( - String id, - Optional name, - Optional description, - Format format, - TableSchema tableSchema, - List partitionColumns, - Map configuration, - Optional version, - Optional size, - Optional numFiles) { - this.id = id; - this.name = name; - this.description = description; - this.format = format; - this.tableSchema = tableSchema; - this.partitionColumns = partitionColumns; - this.configuration = configuration; - this.version = version; - this.size = size; - this.numFiles = numFiles; - } + public Map options() { + return options; + } - public String id() { - return id; - } + @Override + @SkipCoverageGenerated + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeltaFormat that = (DeltaFormat) o; + return Objects.equals(provider, that.provider) && Objects.equals(options, that.options); + } - public Optional name() { - return name; - } + @Override + @SkipCoverageGenerated + public int hashCode() { + return Objects.hash(provider, options); + } - public Optional description() { - return description; + @Override + @SkipCoverageGenerated + public String toString() { + return "DeltaFormat{" + + "provider='" + provider + '\'' + + ", options=" + options + + '}'; + } } - public Format format() { - return format; - } + public static class DeltaMetadata extends Metadata { + private final Optional version; + private final Optional size; + private final Optional numFiles; + private final String id; + private final Optional name; + private final Optional description; + private final DeltaFormat format; + private final StructType schema; + private final List partitionColumns; + private final Optional createdTime; + private final Map configuration; - public TableSchema tableSchema() { - return tableSchema; - } + DeltaMetadata(Optional version, Optional size, Optional numFiles, String id, Optional name, Optional description, DeltaFormat format, StructType schema, List partitionColumns, Optional createdTime, Map configuration) { + this.version = version; + this.size = size; + this.numFiles = numFiles; + this.id = id; + this.name = name; + this.description = description; + this.format = format; + this.schema = schema; + this.partitionColumns = partitionColumns; + this.createdTime = createdTime; + this.configuration = configuration; + } - public List partitionColumns() { - return partitionColumns; - } + public Optional version() { + return version; + } - public Map configuration() { - return configuration; - } + public Optional size() { + return size; + } - public Optional version() { - return version; - } + public Optional numFiles() { + return numFiles; + } - public Optional size() { - return size; - } + public String id() { + return id; + } - public Optional numFiles() { - return numFiles; - } + public Optional name() { + return name; + } - @Override - @SkipCoverageGenerated - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Metadata metadata = (Metadata) o; - return Objects.equals(id, metadata.id) - && Objects.equals(name, metadata.name) - && Objects.equals(description, metadata.description) - && format == metadata.format - && Objects.equals(tableSchema, metadata.tableSchema) - && Objects.equals(partitionColumns, metadata.partitionColumns) - && Objects.equals(configuration, metadata.configuration) - && Objects.equals(version, metadata.version) - && Objects.equals(size, metadata.size) - && Objects.equals(numFiles, metadata.numFiles); - } + public Optional description() { + return description; + } + + public DeltaFormat format() { + return format; + } + + public StructType schema() { + return schema; + } + + public List partitionColumns() { + return partitionColumns; + } + + public Optional createdTime() { + return createdTime; + } + + public Map configuration() { + return configuration; + } + + @Override + @SkipCoverageGenerated + public String toString() { + return "DeltaMetadata{" + + "version=" + version + + ", size=" + size + + ", numFiles=" + numFiles + + ", id='" + id + '\'' + + ", name=" + name + + ", description=" + description + + ", format=" + format + + ", schema=" + schema + + ", partitionColumns=" + partitionColumns + + ", createdTime=" + createdTime + + ", configuration=" + configuration + + '}'; + } - @Override - @SkipCoverageGenerated - public int hashCode() { - return Objects.hash( - id, - name, - description, - format, - tableSchema, - partitionColumns, - configuration, - version, - size, - numFiles); + @Override + @SkipCoverageGenerated + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeltaMetadata that = (DeltaMetadata) o; + return Objects.equals(version, that.version) && Objects.equals(size, that.size) && Objects.equals(numFiles, that.numFiles) && Objects.equals(id, that.id) && Objects.equals(name, that.name) && Objects.equals(description, that.description) && Objects.equals(format, that.format) && Objects.equals(schema, that.schema) && Objects.equals(partitionColumns, that.partitionColumns) && Objects.equals(createdTime, that.createdTime) && Objects.equals(configuration, that.configuration); + } + + @Override + @SkipCoverageGenerated + public int hashCode() { + return Objects.hash(version, size, numFiles, id, name, description, format, schema, partitionColumns, createdTime, configuration); + } } - @Override - @SkipCoverageGenerated - public String toString() { - return "Metadata{" + "id='" - + id + '\'' + ", name=" - + name + ", description=" - + description + ", format=" - + format + ", tableSchema=" - + tableSchema + ", partitionColumns=" - + partitionColumns + ", configuration=" - + configuration + ", version=" - + version + ", size=" - + size + ", numFiles=" - + numFiles + '}'; + public static class ParquetMetadata extends Metadata { + private final String id; + private final Optional name; + private final Optional description; + private final ParquetFormat format; + private final StructType schema; + private final List partitionColumns; + private final Map configuration; + private final Optional version; + private final Optional size; + private final Optional numFiles; + + ParquetMetadata(String id, Optional name, Optional description, ParquetFormat format, StructType schema, List partitionColumns, Map configuration, Optional version, Optional size, Optional numFiles) { + this.id = id; + this.name = name; + this.description = description; + this.format = format; + this.schema = schema; + this.partitionColumns = partitionColumns; + this.configuration = configuration; + this.version = version; + this.size = size; + this.numFiles = numFiles; + HashSet partitionColumnSet = new HashSet<>(schema.fieldNames()); + for (String partitionColumn : partitionColumns) { + if (!partitionColumnSet.contains(partitionColumn)){ + throw new IllegalArgumentException( + String.format("Partition column %s is not part of schema %s", partitionColumn, schema) + ); + } + } + } + + public String id() { + return id; + } + + public Optional name() { + return name; + } + + public Optional description() { + return description; + } + + public ParquetFormat format() { + return format; + } + + public StructType schema() { + return schema; + } + + public List partitionColumns() { + return partitionColumns; + } + + public Map configuration() { + return configuration; + } + + public Optional version() { + return version; + } + + public Optional size() { + return size; + } + + public Optional numFiles() { + return numFiles; + } + + @Override + @SkipCoverageGenerated + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ParquetMetadata that = (ParquetMetadata) o; + return Objects.equals(id, that.id) && Objects.equals(name, that.name) && Objects.equals(description, that.description) && format == that.format && Objects.equals(schema, that.schema) && Objects.equals(partitionColumns, that.partitionColumns) && Objects.equals(configuration, that.configuration) && Objects.equals(version, that.version) && Objects.equals(size, that.size) && Objects.equals(numFiles, that.numFiles); + } + + @Override + @SkipCoverageGenerated + public int hashCode() { + return Objects.hash(id, name, description, format, schema, partitionColumns, configuration, version, size, numFiles); + } + + @Override + @SkipCoverageGenerated + public String toString() { + return "ParquetMetadata{" + + "id='" + id + '\'' + + ", name=" + name + + ", description=" + description + + ", format=" + format + + ", schema=" + schema + + ", partitionColumns=" + partitionColumns + + ", configuration=" + configuration + + ", version=" + version + + ", size=" + size + + ", numFiles=" + numFiles + + '}'; + } } + + public enum ParquetFormat {PARQUET_FORMAT} } diff --git a/server/core/src/main/java/io/whitefox/core/Protocol.java b/server/core/src/main/java/io/whitefox/core/Protocol.java index 232e16488..2908173cd 100644 --- a/server/core/src/main/java/io/whitefox/core/Protocol.java +++ b/server/core/src/main/java/io/whitefox/core/Protocol.java @@ -1,47 +1,89 @@ package io.whitefox.core; import io.whitefox.annotations.SkipCoverageGenerated; +import io.whitefox.core.services.DeltaSharingCapabilities; + import java.util.Objects; -import java.util.Optional; - -public class Protocol { - private final Optional minReaderVersion; - private final Optional minWriterVersion; - - public Protocol(Optional minReaderVersion, Optional minWriterVersion) { - this.minReaderVersion = minReaderVersion; - this.minWriterVersion = minWriterVersion; - } - - public Optional minReaderVersion() { - return minReaderVersion; - } - - public Optional minWriterVersion() { - return minWriterVersion; - } - - @Override - @SkipCoverageGenerated - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - Protocol protocol = (Protocol) o; - return Objects.equals(minReaderVersion, protocol.minReaderVersion) - && Objects.equals(minWriterVersion, protocol.minWriterVersion); - } - - @Override - @SkipCoverageGenerated - public int hashCode() { - return Objects.hash(minReaderVersion, minWriterVersion); - } - - @Override - @SkipCoverageGenerated - public String toString() { - return "Protocol{" + "minReaderVersion=" - + minReaderVersion + ", minWriterVersion=" - + minWriterVersion + '}'; - } -} +import java.util.Set; + +public abstract class Protocol { + Protocol() { + } + + Protocol delta(int minReaderVersion, int minWriterVersion, Set readerFeatures, Set writerFeatures) { + return new DeltaProtocol(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures); + } + + ParquetProtocol parquet() { + return ParquetProtocol.INSTANCE; + } + + + public static class DeltaProtocol extends Protocol { + private final int minReaderVersion; + private final int minWriterVersion; + + private final Set readerFeatures; + + private final Set writerFeatures; + + DeltaProtocol(int minReaderVersion, + int minWriterVersion, + Set readerFeatures, + Set writerFeatures) { + this.minReaderVersion = minReaderVersion; + this.minWriterVersion = minWriterVersion; + this.readerFeatures = readerFeatures; + this.writerFeatures = writerFeatures; + } + + public int minReaderVersion() { + return minReaderVersion; + } + + public int minWriterVersion() { + return minWriterVersion; + } + + public Set readerFeatures() { + return readerFeatures; + } + + public Set writerFeatures() { + return writerFeatures; + } + + @Override + @SkipCoverageGenerated + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeltaProtocol that = (DeltaProtocol) o; + return minReaderVersion == that.minReaderVersion && minWriterVersion == that.minWriterVersion && Objects.equals(readerFeatures, that.readerFeatures) && Objects.equals(writerFeatures, that.writerFeatures); + } + + @Override + @SkipCoverageGenerated + public int hashCode() { + return Objects.hash(minReaderVersion, minWriterVersion, readerFeatures, writerFeatures); + } + + @Override + @SkipCoverageGenerated + public String toString() { + return "DeltaProtocolObject{" + + "minReaderVersion=" + minReaderVersion + + ", minWriterVersion=" + minWriterVersion + + ", readerFeatures=" + readerFeatures + + ", writerFeatures=" + writerFeatures + + '}'; + } + } + + public static class ParquetProtocol extends Protocol { + private ParquetProtocol(){} + + public static final ParquetProtocol INSTANCE = new ParquetProtocol(); + } + +} \ No newline at end of file diff --git a/server/core/src/main/java/io/whitefox/core/TableFile.java b/server/core/src/main/java/io/whitefox/core/TableFile.java index 42c897abd..37b0a1f0e 100644 --- a/server/core/src/main/java/io/whitefox/core/TableFile.java +++ b/server/core/src/main/java/io/whitefox/core/TableFile.java @@ -6,7 +6,6 @@ import java.util.Optional; public class TableFile { - private final String url; private final String id; private final long size; diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java index 6ca328ef7..99a32506a 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharedTable.java @@ -39,6 +39,7 @@ public static DeltaSharedTable of( var dt = DeltaLog.forTable( hadoopConfigBuilder.buildConfig(sharedTable.internalTable().provider().storage()), dataPath); + if (!dt.tableExists()) { throw new IllegalArgumentException( String.format("Cannot find a delta table at %s", dataPath)); @@ -54,31 +55,52 @@ public static DeltaSharedTable of(SharedTable sharedTable) { return of(sharedTable, TableSchemaConverter.INSTANCE, new HadoopConfigBuilder()); } - public Optional getMetadata(Optional startingTimestamp) { - return getSnapshot(startingTimestamp).map(this::metadataFromSnapshot); + public Optional getMetadata( + Optional startingTimestamp, DeltaSharingCapabilities deltaSharingCapabilities) { + return getSnapshot(startingTimestamp) + .map(snapshot -> metadataFromSnapshot(snapshot, deltaSharingCapabilities)); } - private Metadata metadataFromSnapshot(Snapshot snapshot) { + private Metadata metadataFromSnapshot( + Snapshot snapshot, DeltaSharingCapabilities requestCapabilities) { + return new Metadata( snapshot.getMetadata().getId(), Optional.of(tableDetails.name()), Optional.ofNullable(snapshot.getMetadata().getDescription()), - Metadata.Format.PARQUET, + Metadata.Format.PARQUET, // TODO this depends on the table new TableSchema(tableSchemaConverter.convertDeltaSchemaToWhitefox( snapshot.getMetadata().getSchema())), snapshot.getMetadata().getPartitionColumns(), snapshot.getMetadata().getConfiguration(), - Optional.of(snapshot.getVersion()), + snapshot.getVersion(), Optional.empty(), // size is fine to be empty - Optional.empty() // numFiles is ok to be empty here too + Optional.empty(), // numFiles is ok to be empty here too + compatibleCapabilities( + DeltaSharingCapabilities.defaultValue(), + requestCapabilities) // TODO read it from the table ); } + private DeltaSharingCapabilities compatibleCapabilities( + DeltaSharingCapabilities tableCapabilities, DeltaSharingCapabilities requestCapabilities) { + if (requestCapabilities + .getResponseFormat() + .contains(DeltaSharingCapabilities.DeltaSharingResponseFormat.DELTA)) { + return tableCapabilities.withResponseFormat( + DeltaSharingCapabilities.DeltaSharingResponseFormat.DELTA); + } else { + return tableCapabilities.withResponseFormat( + DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET); + } + } + public Optional getTableVersion(Optional startingTimestamp) { return getSnapshot(startingTimestamp).map(Snapshot::getVersion); } - public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) { + public ReadTableResultToBeSigned queryTable( + ReadTableRequest readTableRequest, DeltaSharingCapabilities requestCapabilities) { Snapshot snapshot; if (readTableRequest instanceof ReadTableRequest.ReadTableCurrentVersion) { snapshot = deltaLog.snapshot(); @@ -92,8 +114,8 @@ public ReadTableResultToBeSigned queryTable(ReadTableRequest readTableRequest) { throw new IllegalArgumentException("Unknown ReadTableRequest type: " + readTableRequest); } return new ReadTableResultToBeSigned( - new Protocol(Optional.of(1), Optional.of(1)), - metadataFromSnapshot(snapshot), + new Protocol(Optional.of(1), Optional.of(1)), // TODO + metadataFromSnapshot(snapshot, requestCapabilities), snapshot.getAllFiles().stream() .map(f -> new TableFileToBeSigned( location() + "/" + f.getPath(), diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java index 82c2baba7..4434707e9 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesService.java @@ -10,27 +10,67 @@ public interface DeltaSharesService { + /** + * @return the table version if it exists, otherwise {@link Optional#empty} + */ Optional getTableVersion( String share, String schema, String table, String startingTimestamp); + /** + * @return a list (up to maxResults size) of {@link Share} and a token to retrieve the next page. + * The listing will start from the token passed as nextPageToken (if any, otherwise from the first). + */ ContentAndToken> listShares( Optional nextPageToken, Optional maxResults); + /** + * @return the table metadata if exists, otherwise {@link Optional#empty}. + * This method will also evaluate the input deltaSharingCapabilities and match + * them with the actual table capabilities. + */ Optional getTableMetadata( - String share, String schema, String table, String startingTimestamp); + String share, + String schema, + String table, + String startingTimestamp, + DeltaSharingCapabilities deltaSharingCapabilities); + /** + * @return a list (up to maxResults size) of {@link Schema} that are part of input share + * and a token to retrieve the next page. The listing will start from the token passed as nextPageToken + * (if any, otherwise from the first). If the share does not exist, it will return {@link Optional#empty()} + */ Optional>> listSchemas( String share, Optional nextPageToken, Optional maxResults); + /** + * @return a list (up to maxResults size) of {@link SharedTable} that are part of input share and schema + * and a token to retrieve the next page. The listing will start from the token passed as nextPageToken + * (if any, otherwise from the first). If the share or the schema does not exist, + * it will return {@link Optional#empty()} + */ Optional>> listTables( String share, String schema, Optional nextPageToken, Optional maxResults); + /** + * @return a list (up to maxResults size) of {@link SharedTable} that are part of input share + * and a token to retrieve the next page. The listing will start from the token passed as nextPageToken + * (if any, otherwise from the first). If the share does not exist, it will return {@link Optional#empty()} + */ Optional>> listTablesOfShare( String share, Optional token, Optional maxResults); + /** + * @return metadata needed to read the table from a delta-sharing client. + * This method will also evaluate the input deltaSharingCapabilities and match them with the actual table capabilities. + */ ReadTableResult queryTable( - String share, String schema, String table, ReadTableRequest queryRequest); + String share, + String schema, + String table, + ReadTableRequest queryRequest, + DeltaSharingCapabilities capabilities); } diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java index 23bde9a77..5fb6eeee0 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharesServiceImpl.java @@ -58,10 +58,14 @@ public ContentAndToken> listShares( @Override public Optional getTableMetadata( - String share, String schema, String table, String startingTimestamp) { - return storageManager - .getSharedTable(share, schema, table) - .flatMap(t -> tableLoader.loadTable(t).getMetadata(Optional.ofNullable(startingTimestamp))); + String share, + String schema, + String table, + String startingTimestamp, + DeltaSharingCapabilities deltaSharingCapabilities) { + return storageManager.getSharedTable(share, schema, table).flatMap(t -> tableLoader + .loadTable(t) + .getMetadata(Optional.ofNullable(startingTimestamp), deltaSharingCapabilities)); } @Override @@ -120,7 +124,11 @@ public Optional>> listTablesOfShare( @Override public ReadTableResult queryTable( - String share, String schema, String tableName, ReadTableRequest queryRequest) { + String share, + String schema, + String tableName, + ReadTableRequest queryRequest, + DeltaSharingCapabilities requestCapabilities) { SharedTable sharedTable = storageManager .getSharedTable(share, schema, tableName) .orElseThrow(() -> new TableNotFound(String.format( @@ -128,7 +136,8 @@ public ReadTableResult queryTable( var fileSigner = fileSignerFactory.newFileSigner(sharedTable.internalTable().provider().storage()); - var readTableResultToBeSigned = tableLoader.loadTable(sharedTable).queryTable(queryRequest); + var readTableResultToBeSigned = + tableLoader.loadTable(sharedTable).queryTable(queryRequest, requestCapabilities); return new ReadTableResult( readTableResultToBeSigned.protocol(), readTableResultToBeSigned.metadata(), diff --git a/server/core/src/main/java/io/whitefox/core/services/DeltaSharingCapabilities.java b/server/core/src/main/java/io/whitefox/core/services/DeltaSharingCapabilities.java index 5d93e44eb..461cc5ed5 100644 --- a/server/core/src/main/java/io/whitefox/core/services/DeltaSharingCapabilities.java +++ b/server/core/src/main/java/io/whitefox/core/services/DeltaSharingCapabilities.java @@ -1,62 +1,153 @@ package io.whitefox.core.services; -import java.util.Arrays; -import java.util.Map; -import java.util.Set; +import io.whitefox.annotations.SkipCoverageGenerated; +import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class DeltaSharingCapabilities { - private final Map values; + private static final Logger log = LoggerFactory.getLogger(DeltaSharingCapabilities.class); + private final Map> values; - public DeltaSharingCapabilities(Map values) { - this.values = Map.copyOf(values); + public Map> values() { + return values; } - public Map values() { - return values; + public static final String DELTA_SHARE_CAPABILITIES_HEADER = "delta-sharing-capabilities"; + + public DeltaSharingCapabilities(Map> values) { + this.values = Collections.unmodifiableMap(values); + } + + public static DeltaSharingCapabilities defaultValue() { + return new DeltaSharingCapabilities((String) null); + } + + public DeltaSharingCapabilities(String s) { + this(parseDeltaSharingCapabilities(s)); + } + + public DeltaSharingCapabilities withResponseFormat( + DeltaSharingCapabilities.DeltaSharingResponseFormat format) { + return new DeltaSharingCapabilities(Stream.concat( + values.entrySet().stream(), + Stream.of(Map.entry(DELTA_SHARING_RESPONSE_FORMAT, Set.of(format.toString())))) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))); + } + + private static Map> parseDeltaSharingCapabilities(String s) { + if (s == null) { + return Map.of(); + } else { + return Arrays.stream(s.split(";", -1)) + .flatMap(entry -> { + if (StringUtils.isBlank(entry)) { + return Stream.empty(); + } + var keyAndValues = entry.split("=", -1); + if (keyAndValues.length != 2) { + throw new IllegalArgumentException(String.format( + "Each %s must be in the format key=value", DELTA_SHARE_CAPABILITIES_HEADER)); + } + var key = keyAndValues[0]; + var values = Arrays.stream(keyAndValues[1].split(",", -1)) + .collect(Collectors.toUnmodifiableSet()); + return Stream.of(Map.entry(key, values)); + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } } - public DeltaSharingResponseFormat getResponseFormat() { + public Set getResponseFormat() { var value = values.get(DELTA_SHARING_RESPONSE_FORMAT); - if (value == null) { - return DeltaSharingResponseFormat.PARQUET; + if (value == null || value.isEmpty()) { + return Set.of(DeltaSharingResponseFormat.PARQUET); } else { - return DeltaSharingResponseFormat.valueOf(value.toUpperCase()); + return value.stream() + .flatMap(s -> { + try { + return Stream.of(DeltaSharingResponseFormat.valueOf(s.toUpperCase())); + } catch (IllegalArgumentException e) { + log.warn("Ignoring unknown {}: {}", DELTA_SHARING_RESPONSE_FORMAT, s); + return Stream.empty(); + } + }) + .collect(Collectors.toUnmodifiableSet()); } } - public Set getReaderFeatures() { + public Set getReaderFeatures() { var value = values.get(DELTA_SHARING_READER_FEATURES); - if (value == null) { + if (value == null || value.isEmpty()) { return Set.of(); } else { - return Arrays.stream(value.split(",")) - .map(String::trim) - .map(DeltaSharingReaderFeatures::fromString) + return value.stream() + .flatMap(s -> { + try { + return Stream.of(DeltaSharingFeatures.fromString(s)); + } catch (IllegalArgumentException e) { + log.warn("Ignoring unknown {}: {}", DELTA_SHARING_READER_FEATURES, s); + return Stream.empty(); + } + }) .collect(Collectors.toSet()); } } + @Override + @SkipCoverageGenerated + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + DeltaSharingCapabilities that = (DeltaSharingCapabilities) o; + return Objects.equals(values, that.values); + } + + @Override + @SkipCoverageGenerated + public int hashCode() { + return Objects.hash(values); + } + + @Override + @SkipCoverageGenerated + public String toString() { + return "DeltaSharingCapabilities{" + "values=" + values + '}'; + } + public static final String DELTA_SHARING_RESPONSE_FORMAT = "responseformat"; - public static final String DELTA_SHARING_READER_FEATURES = "readerfeatures"; public enum DeltaSharingResponseFormat { DELTA, PARQUET } - public enum DeltaSharingReaderFeatures { - DELETION_VECTORS("deletionvectors"), - COLUMN_MAPPING("columnmapping"), - TIMESTAMP_NTZ("timestampntz"), - DOMAIN_METADATA("domainmetadata"), - V2CHECKPOINT("v2checkpoint"), - CHECK_CONSTRAINTS("checkconstraints"), - GENERATED_COLUMNS("generatedcolumns"), - ALLOW_COLUMN_DEFAULTS("allowcolumndefaults"), - IDENTITY_COLUMNS("identitycolumns"); - - DeltaSharingReaderFeatures(String stringRepresentation) { + public static final String DELTA_SHARING_READER_FEATURES = "readerfeatures"; + static final String DELTA_SHARING_READER_FEATURE_DELETION_VECTOR = "deletionvectors"; + static final String DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING = "columnmapping"; + static final String DELTA_SHARING_READER_FEATURE_TIMESTAMP_NTZ = "timestampntz"; + static final String DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA = "domainmetadata"; + static final String DELTA_SHARING_READER_FEATURE_V2CHECKPOINT = "v2checkpoint"; + static final String DELTA_SHARING_READER_FEATURE_CHECK_CONSTRAINTS = "checkconstraints"; + static final String DELTA_SHARING_READER_FEATURE_GENERATED_COLUMNS = "generatedcolumns"; + static final String DELTA_SHARING_READER_FEATURE_ALLOW_COLUMN_DEFAULTS = "allowcolumndefaults"; + static final String DELTA_SHARING_READER_FEATURE_IDENTITY_COLUMNS = "identitycolumns"; + + public enum DeltaSharingFeatures { + DELETION_VECTORS(DELTA_SHARING_READER_FEATURE_DELETION_VECTOR), + COLUMN_MAPPING(DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING), + TIMESTAMP_NTZ(DELTA_SHARING_READER_FEATURE_TIMESTAMP_NTZ), + DOMAIN_METADATA(DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA), + V2CHECKPOINT(DELTA_SHARING_READER_FEATURE_V2CHECKPOINT), + CHECK_CONSTRAINTS(DELTA_SHARING_READER_FEATURE_CHECK_CONSTRAINTS), + GENERATED_COLUMNS(DELTA_SHARING_READER_FEATURE_GENERATED_COLUMNS), + ALLOW_COLUMN_DEFAULTS(DELTA_SHARING_READER_FEATURE_ALLOW_COLUMN_DEFAULTS), + IDENTITY_COLUMNS(DELTA_SHARING_READER_FEATURE_IDENTITY_COLUMNS); + + DeltaSharingFeatures(String stringRepresentation) { this.stringRepresentation = stringRepresentation; } @@ -66,40 +157,29 @@ public String stringRepresentation() { return stringRepresentation; } - public static DeltaSharingReaderFeatures fromString(String s) { - switch (s) { + public static DeltaSharingFeatures fromString(String s) { + switch (s.toLowerCase()) { case DELTA_SHARING_READER_FEATURE_DELETION_VECTOR: - return DeltaSharingReaderFeatures.DELETION_VECTORS; + return DeltaSharingFeatures.DELETION_VECTORS; case DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING: - return DeltaSharingReaderFeatures.COLUMN_MAPPING; + return DeltaSharingFeatures.COLUMN_MAPPING; case DELTA_SHARING_READER_FEATURE_TIMESTAMP_NTZ: - return DeltaSharingReaderFeatures.TIMESTAMP_NTZ; + return DeltaSharingFeatures.TIMESTAMP_NTZ; case DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA: - return DeltaSharingReaderFeatures.DOMAIN_METADATA; + return DeltaSharingFeatures.DOMAIN_METADATA; case DELTA_SHARING_READER_FEATURE_V2CHECKPOINT: - return DeltaSharingReaderFeatures.V2CHECKPOINT; + return DeltaSharingFeatures.V2CHECKPOINT; case DELTA_SHARING_READER_FEATURE_CHECK_CONSTRAINTS: - return DeltaSharingReaderFeatures.CHECK_CONSTRAINTS; + return DeltaSharingFeatures.CHECK_CONSTRAINTS; case DELTA_SHARING_READER_FEATURE_GENERATED_COLUMNS: - return DeltaSharingReaderFeatures.GENERATED_COLUMNS; + return DeltaSharingFeatures.GENERATED_COLUMNS; case DELTA_SHARING_READER_FEATURE_ALLOW_COLUMN_DEFAULTS: - return DeltaSharingReaderFeatures.ALLOW_COLUMN_DEFAULTS; + return DeltaSharingFeatures.ALLOW_COLUMN_DEFAULTS; case DELTA_SHARING_READER_FEATURE_IDENTITY_COLUMNS: - return DeltaSharingReaderFeatures.IDENTITY_COLUMNS; + return DeltaSharingFeatures.IDENTITY_COLUMNS; default: throw new IllegalArgumentException("Unknown reader feature: " + s); } } - - private static final String DELTA_SHARING_READER_FEATURE_DELETION_VECTOR = "deletionvectors"; - private static final String DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING = "columnmapping"; - private static final String DELTA_SHARING_READER_FEATURE_TIMESTAMP_NTZ = "timestampntz"; - private static final String DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA = "domainmetadata"; - private static final String DELTA_SHARING_READER_FEATURE_V2CHECKPOINT = "v2checkpoint"; - private static final String DELTA_SHARING_READER_FEATURE_CHECK_CONSTRAINTS = "checkconstraints"; - private static final String DELTA_SHARING_READER_FEATURE_GENERATED_COLUMNS = "generatedcolumns"; - private static final String DELTA_SHARING_READER_FEATURE_ALLOW_COLUMN_DEFAULTS = - "allowcolumndefaults"; - private static final String DELTA_SHARING_READER_FEATURE_IDENTITY_COLUMNS = "identitycolumns"; } } diff --git a/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java b/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java index af39f1a9b..a26729bf8 100644 --- a/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java +++ b/server/core/src/test/java/io/whitefox/core/services/DeltaShareServiceTest.java @@ -200,7 +200,7 @@ public void getTableMetadata() { StorageManager storageManager = new InMemoryStorageManager(shares); DeltaSharesService deltaSharesService = new DeltaSharesServiceImpl(storageManager, 100, loader, fileSignerFactory); - var tableMetadata = deltaSharesService.getTableMetadata("name", "default", "table1", null); + var tableMetadata = deltaSharesService.getTableMetadata("name", "default", "table1", null, DeltaSharingCapabilities.defaultValue()); Assertions.assertTrue(tableMetadata.isPresent()); Assertions.assertEquals( "56d48189-cdbc-44f2-9b0e-2bded4c79ed7", tableMetadata.get().id()); @@ -221,7 +221,7 @@ public void tableMetadataNotFound() { StorageManager storageManager = new InMemoryStorageManager(shares); DeltaSharesService deltaSharesService = new DeltaSharesServiceImpl(storageManager, 100, loader, fileSignerFactory); - var resultTable = deltaSharesService.getTableMetadata("name", "default", "tableNotFound", null); + var resultTable = deltaSharesService.getTableMetadata("name", "default", "tableNotFound", null, DeltaSharingCapabilities.defaultValue()); Assertions.assertTrue(resultTable.isEmpty()); } } diff --git a/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java b/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java index 2a7a3c644..6b219553c 100644 --- a/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java +++ b/server/core/src/test/java/io/whitefox/core/services/DeltaSharedTableTest.java @@ -28,7 +28,7 @@ void getTableVersion() throws ExecutionException, InterruptedException { void getTableMetadata() { var PTable = new SharedTable("delta-table", "default", "share1", deltaTable("delta-table")); var DTable = DeltaSharedTable.of(PTable); - var metadata = DTable.getMetadata(Optional.empty()); + var metadata = DTable.getMetadata(Optional.empty(), DeltaSharingCapabilities.defaultValue()); assertTrue(metadata.isPresent()); assertEquals("56d48189-cdbc-44f2-9b0e-2bded4c79ed7", metadata.get().id()); } diff --git a/server/core/src/test/java/io/whitefox/core/services/DeltaSharingCapabilitiesTest.java b/server/core/src/test/java/io/whitefox/core/services/DeltaSharingCapabilitiesTest.java new file mode 100644 index 000000000..f7866bb20 --- /dev/null +++ b/server/core/src/test/java/io/whitefox/core/services/DeltaSharingCapabilitiesTest.java @@ -0,0 +1,111 @@ +package io.whitefox.core.services; + +import java.util.Set; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DeltaSharingCapabilitiesTest { + + @Test + void parseSimpleResponseFormatDelta() { + Assertions.assertEquals( + Set.of(DeltaSharingCapabilities.DeltaSharingResponseFormat.DELTA), + new DeltaSharingCapabilities("responseformat=delta").getResponseFormat()); + } + + @Test + void parseSimpleResponseFormatParquet() { + Assertions.assertEquals( + Set.of(DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET), + new DeltaSharingCapabilities("responseformat=PaRquEt").getResponseFormat()); + } + + @Test + void failToParseUnknownResponseFormatAndReturnNothing() { + Assertions.assertEquals( + Set.of(), new DeltaSharingCapabilities("responseformat=iceberg").getResponseFormat()); + } + + @Test + void failToParseUnknownResponseFormatAndReturnOthers() { + Assertions.assertEquals( + Set.of( + DeltaSharingCapabilities.DeltaSharingResponseFormat.DELTA, + DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET), + new DeltaSharingCapabilities("responseformat=iceberg,parquet,delta").getResponseFormat()); + } + + @Test + void noCapabilitiesEqualsDefault() { + Assertions.assertEquals( + Set.of(DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET), + new DeltaSharingCapabilities((String) null).getResponseFormat()); + Assertions.assertEquals( + Set.of(), new DeltaSharingCapabilities((String) null).getReaderFeatures()); + Assertions.assertEquals( + Set.of(DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET), + new DeltaSharingCapabilities("").getResponseFormat()); + Assertions.assertEquals(Set.of(), new DeltaSharingCapabilities("").getReaderFeatures()); + } + + @Test + void parseSimpleReaderFeature() { + Assertions.assertEquals( + Set.of(DeltaSharingCapabilities.DeltaSharingFeatures.DELETION_VECTORS), + new DeltaSharingCapabilities(String.format( + "%s=%s", + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURES, + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURE_DELETION_VECTOR)) + .getReaderFeatures()); + } + + @Test + void failToParseUnknownReaderFeatureAndReturnNothing() { + Assertions.assertEquals( + Set.of(), + new DeltaSharingCapabilities(String.format( + "%s=%s", DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURES, "unknown")) + .getReaderFeatures()); + } + + @Test + void failToParseUnknownReaderFeatureAndReturnOthers() { + Assertions.assertEquals( + Set.of( + DeltaSharingCapabilities.DeltaSharingFeatures.COLUMN_MAPPING, + DeltaSharingCapabilities.DeltaSharingFeatures.DOMAIN_METADATA), + new DeltaSharingCapabilities(String.format( + "%s=%s,%s,%s", + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURES, + "unknown", + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING, + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA)) + .getReaderFeatures()); + } + + @Test + void kitchenSink() { + var readerFeatures = String.format( + "%s=%s,%s,%s", + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURES, + "unknown", + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURE_COLUMN_MAPPING, + DeltaSharingCapabilities.DELTA_SHARING_READER_FEATURE_DOMAIN_METADATA); + var responseFormat = "responseformat=iceberg,parquet,delta"; + var randomStuff = "thisIsAKey=these,some,values,lol"; + var capabilities = new DeltaSharingCapabilities( + String.format("%s;%s;%s", readerFeatures, responseFormat, randomStuff)); + Assertions.assertEquals( + Set.of( + DeltaSharingCapabilities.DeltaSharingResponseFormat.DELTA, + DeltaSharingCapabilities.DeltaSharingResponseFormat.PARQUET), + capabilities.getResponseFormat()); + Assertions.assertEquals( + Set.of( + DeltaSharingCapabilities.DeltaSharingFeatures.COLUMN_MAPPING, + DeltaSharingCapabilities.DeltaSharingFeatures.DOMAIN_METADATA), + capabilities.getReaderFeatures()); + Assertions.assertEquals( + Set.of("these", "some", "values", "lol"), capabilities.values().get("thisIsAKey")); + } +}