Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
agilelab-tmnd1991 committed Jan 10, 2024
1 parent 7bb1f7e commit 8a80003
Show file tree
Hide file tree
Showing 17 changed files with 751 additions and 323 deletions.
2 changes: 1 addition & 1 deletion protocol/delta-sharing-protocol-api.yml
Original file line number Diff line number Diff line change
Expand Up @@ -787,7 +787,7 @@ components:

# This is not used for the spec but comes handy for autogeneration
TableQueryResponseObject:
anyOf:
oneOf:
- $ref: '#/components/schemas/ParquetTableQueryResponseObject'
- $ref: '#/components/schemas/DeltaTableQueryResponseObject'
ParquetTableQueryResponseObject:
Expand Down
1 change: 1 addition & 0 deletions server/app/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ val serverGeneratorProperties = mapOf(
"dateLibrary" to "java8",
"disallowAdditionalPropertiesIfNotPresent" to "false",
"generateBuilders" to "false",
"legacyDiscriminatorBehavior" to "false",
"generatePom" to "false",
"interfaceOnly" to "true",
"library" to "quarkus",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import io.whitefox.core.*;
import io.whitefox.core.Schema;
import io.whitefox.core.Share;
import io.whitefox.core.services.DeltaSharingCapabilities;
import java.util.*;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -60,44 +61,44 @@ public static TableQueryResponseObject readTableResult2api(ReadTableResult readT

private static ParquetMetadataObject metadata2Api(Metadata metadata) {
return new ParquetMetadataObject()
.metaData(new ParquetMetadataObjectMetaData()
.numFiles(metadata.numFiles().orElse(null))
.version(metadata.version().orElse(null))
.size(metadata.size().orElse(null))
.id(metadata.id())
.name(metadata.name().orElse(null))
.description(metadata.description().orElse(null))
.format(new ParquetFormatObject().provider(metadata.format().provider()))
.schemaString(metadata.tableSchema().structType().toJson())
.partitionColumns(metadata.partitionColumns())
._configuration(metadata.configuration()));
.metaData(new ParquetMetadataObjectMetaData()
.numFiles(metadata.numFiles().orElse(null))
.version(metadata.version())
.size(metadata.size().orElse(null))
.id(metadata.id())
.name(metadata.name().orElse(null))
.description(metadata.description().orElse(null))
.format(new ParquetFormatObject().provider(metadata.format().provider()))
.schemaString(metadata.tableSchema().structType().toJson())
.partitionColumns(metadata.partitionColumns())
._configuration(metadata.configuration()));
}

private static DeltaProtocolObject protocol2Api(Protocol protocol) {
return new DeltaProtocolObject()
.protocol(new DeltaProtocolObjectProtocol()
.deltaProtocol(new DeltaProtocolObjectProtocolDeltaProtocol()
.minReaderVersion(protocol.minReaderVersion().orElse(1))
.minWriterVersion(protocol.minWriterVersion().orElse(1))));
.protocol(new DeltaProtocolObjectProtocol()
.deltaProtocol(new DeltaProtocolObjectProtocolDeltaProtocol()
.minReaderVersion(protocol.minReaderVersion().orElse(1))
.minWriterVersion(protocol.minWriterVersion().orElse(1))));
}

private static DeltaFileObject file2Api(TableFile f) {
return new DeltaFileObject()
.id(f.id())
.version(f.version().orElse(null))
.deletionVectorFileId(null) // TODO
.timestamp(f.timestamp().orElse(null))
.expirationTimestamp(f.expirationTimestamp())
.deltaSingleAction(new DeltaSingleAction()
._file(new DeltaAddFileAction()
.id(f.id())
.url(f.url())
.partitionValues(f.partitionValues())
.size(f.size())
.stats(f.stats().orElse(null))
.version(f.version().orElse(null))
.timestamp(f.timestamp().orElse(null))
.expirationTimestamp(f.expirationTimestamp())));
.id(f.id())
.version(f.version().orElse(null))
.deletionVectorFileId(null) // TODO
.timestamp(f.timestamp().orElse(null))
.expirationTimestamp(f.expirationTimestamp())
.deltaSingleAction(new DeltaSingleAction()
._file(new DeltaAddFileAction()
.id(f.id())
.url(f.url())
.partitionValues(f.partitionValues())
.size(f.size())
.stats(f.stats().orElse(null))
.version(f.version().orElse(null))
.timestamp(f.timestamp().orElse(null))
.expirationTimestamp(f.expirationTimestamp())));
}

public static TableReferenceAndReadRequest api2TableReferenceAndReadRequest(
Expand All @@ -113,28 +114,16 @@ public static io.whitefox.api.deltasharing.model.v1.generated.Table table2api(
.schema(sharedTable.schema());
}

/**
* NOTE: this is an undocumented feature of the reference impl of delta-sharing, it's not part of the
* protocol
* ----
* Return the [[io.whitefox.api.server.DeltaHeaders.DELTA_SHARE_CAPABILITIES_HEADER]] header
* that will be set in the response w/r/t the one received in the request.
* If the request did not contain any, we will return an empty one.
*/
public static Map<String, String> toHeaderCapabilitiesMap(String headerCapabilities) {
if (headerCapabilities == null) {
return Map.of();
}
return Arrays.stream(headerCapabilities.toLowerCase().split(";"))
.map(h -> h.split("="))
.filter(h -> h.length == 2)
.map(splits -> Map.entry(splits[0], splits[1]))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

public static TableMetadataResponseObject toTableResponseMetadata(Metadata m) {
return new TableMetadataResponseObject()
.protocol(new ParquetProtocolObject().protocol(new ParquetProtocolObjectProtocol().minReaderVersion(1)))
.protocol(new ParquetProtocolObject()
.protocol(new ParquetProtocolObjectProtocol().minReaderVersion(1)))
.metadata(metadata2Api(m));
}

public static String toCapabilitiesHeader(DeltaSharingCapabilities deltaSharingCapabilities) {
return deltaSharingCapabilities.values().entrySet().stream()
.map(entry -> entry.getKey() + "=" + String.join(",", entry.getValue()))
.collect(Collectors.joining(";"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.whitefox.api.server.ApiUtils;
import io.whitefox.core.services.ContentAndToken;
import io.whitefox.core.services.DeltaSharesService;
import io.whitefox.core.services.DeltaSharingCapabilities;
import io.whitefox.core.services.ShareService;
import jakarta.inject.Inject;
import jakarta.ws.rs.core.MediaType;
Expand Down Expand Up @@ -60,8 +61,9 @@ public Response getTableChanges(
Integer startingVersion,
Integer endingVersion,
String endingTimestamp,
Boolean includeHistoricalMetadata) {
return Response.ok().build();
Boolean includeHistoricalMetadata,
String deltaSharingCapabilities) {
return Response.status(Response.Status.NOT_IMPLEMENTED).build();
}

@Override
Expand All @@ -72,20 +74,22 @@ public Response getTableMetadata(
String startingTimestamp,
String deltaSharingCapabilities) {
return wrapExceptions(
() -> optionalToNotFound(
deltaSharesService.getTableMetadata(share, schema, table, startingTimestamp),
m -> optionalToNotFound(
deltaSharesService.getTableVersion(share, schema, table, startingTimestamp),
v -> Response.ok(
tableResponseSerializer.serialize(DeltaMappers.toTableResponseMetadata(m)),
ndjsonMediaType)
.status(Response.Status.OK.getStatusCode())
.header(DELTA_TABLE_VERSION_HEADER, String.valueOf(v))
.header(
DELTA_SHARE_CAPABILITIES_HEADER,
getResponseFormatHeader(
DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities)))
.build())),
() -> {
DeltaSharingCapabilities requestCapabilities =
new DeltaSharingCapabilities(deltaSharingCapabilities);
return optionalToNotFound(
deltaSharesService.getTableMetadata(
share, schema, table, startingTimestamp, requestCapabilities),
m -> Response.ok(
tableResponseSerializer.serialize(DeltaMappers.toTableResponseMetadata(m)),
ndjsonMediaType)
.status(Response.Status.OK.getStatusCode())
.header(DELTA_TABLE_VERSION_HEADER, String.valueOf(m.version()))
.header(
DeltaSharingCapabilities.DELTA_SHARE_CAPABILITIES_HEADER,
DeltaMappers.toCapabilitiesHeader(m.tableCapabilities()))
.build());
},
exceptionToResponse);
}

Expand Down Expand Up @@ -193,20 +197,25 @@ public Response queryTable(
return wrapExceptions(
() -> optionalToNotFound(
deltaSharesService.getTableVersion(share, schema, table, startingTimestamp),
version -> Response.ok(
tableQueryResponseSerializer.serialize(
DeltaMappers.readTableResult2api(deltaSharesService.queryTable(
share,
schema,
table,
DeltaMappers.api2ReadTableRequest(queryRequest)))),
ndjsonMediaType)
.header(DELTA_TABLE_VERSION_HEADER, version)
.header(
DELTA_SHARE_CAPABILITIES_HEADER,
getResponseFormatHeader(
DeltaMappers.toHeaderCapabilitiesMap(deltaSharingCapabilities)))
.build()),
version -> {
var capabilities = new DeltaSharingCapabilities(deltaSharingCapabilities);
var readTableResult = deltaSharesService.queryTable(
share,
schema,
table,
DeltaMappers.api2ReadTableRequest(queryRequest),
capabilities);
return Response.ok(
tableQueryResponseSerializer.serialize(
DeltaMappers.readTableResult2api(readTableResult)),
ndjsonMediaType)
.header(DELTA_TABLE_VERSION_HEADER, version)
.header(
DeltaSharingCapabilities.DELTA_SHARE_CAPABILITIES_HEADER,
DeltaMappers.toCapabilitiesHeader(
readTableResult.metadata().tableCapabilities()))
.build();
}),
exceptionToResponse);
}

Expand Down
6 changes: 0 additions & 6 deletions server/app/src/main/java/io/whitefox/api/server/ApiUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,6 @@ default <T> Response optionalToNotFound(Optional<T> opt, Function<T, Response> f
return opt.map(fn).orElse(notFoundResponse());
}

default String getResponseFormatHeader(Map<String, String> deltaSharingCapabilities) {
return String.format(
"%s=%s",
DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT, getResponseFormat(deltaSharingCapabilities));
}

default String getResponseFormat(Map<String, String> deltaSharingCapabilities) {
return deltaSharingCapabilities.getOrDefault(
DeltaHeaders.DELTA_SHARING_RESPONSE_FORMAT,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.whitefox.api.server;

import io.whitefox.core.services.DeltaSharingCapabilities;

public interface DeltaHeaders {
String DELTA_SHARING_RESPONSE_FORMAT = "responseformat";
String DELTA_SHARING_RESPONSE_FORMAT = DeltaSharingCapabilities.DELTA_SHARING_RESPONSE_FORMAT;
String DELTA_TABLE_VERSION_HEADER = "Delta-Table-Version";
String DELTA_SHARE_CAPABILITIES_HEADER = "delta-sharing-capabilities";
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ public static MetastoreType api2MetastoreType(
}
}

// TODO we need to resolve the json schema anyOf/oneOf problem
private static MetadataObject metadata2Api(Metadata metadata) {
return new MetadataObject()
.metaData(new MetadataObjectMetaData()
Expand Down
Loading

0 comments on commit 8a80003

Please sign in to comment.