Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(schema-history): remove blame language for the schema history feature #5457

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,9 @@
import com.linkedin.datahub.graphql.resolvers.mutate.RemoveTagResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.RemoveTermResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.UpdateDescriptionResolver;
import com.linkedin.datahub.graphql.resolvers.operation.ReportOperationResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.UpdateNameResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.UpdateParentNodeResolver;
import com.linkedin.datahub.graphql.resolvers.operation.ReportOperationResolver;
import com.linkedin.datahub.graphql.resolvers.policy.DeletePolicyResolver;
import com.linkedin.datahub.graphql.resolvers.policy.GetGrantedPrivilegesResolver;
import com.linkedin.datahub.graphql.resolvers.policy.ListPoliciesResolver;
Expand All @@ -165,6 +165,7 @@
import com.linkedin.datahub.graphql.resolvers.test.TestResultsResolver;
import com.linkedin.datahub.graphql.resolvers.test.UpdateTestResolver;
import com.linkedin.datahub.graphql.resolvers.timeline.GetSchemaBlameResolver;
import com.linkedin.datahub.graphql.resolvers.timeline.GetSchemaVersionListResolver;
import com.linkedin.datahub.graphql.resolvers.type.AspectInterfaceTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.EntityInterfaceTypeResolver;
import com.linkedin.datahub.graphql.resolvers.type.HyperParameterValueTypeResolver;
Expand Down Expand Up @@ -214,8 +215,8 @@
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.config.DatahubConfiguration;
import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.metadata.config.VisualConfiguration;
import com.linkedin.metadata.config.TestsConfiguration;
import com.linkedin.metadata.config.VisualConfiguration;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
Expand All @@ -232,12 +233,6 @@
import graphql.schema.DataFetchingEnvironment;
import graphql.schema.StaticDataFetcher;
import graphql.schema.idl.RuntimeWiring;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.dataloader.BatchLoaderContextProvider;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderOptions;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
Expand All @@ -250,10 +245,15 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.dataloader.BatchLoaderContextProvider;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderOptions;

import static com.linkedin.datahub.graphql.Constants.*;
import static com.linkedin.metadata.Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME;
import static graphql.Scalars.GraphQLLong;
import static com.linkedin.metadata.Constants.*;
import static graphql.Scalars.*;


/**
Expand Down Expand Up @@ -635,6 +635,7 @@ private void configureQueryResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("ingestionSource", new GetIngestionSourceResolver(this.entityClient))
.dataFetcher("executionRequest", new GetIngestionExecutionRequestResolver(this.entityClient))
.dataFetcher("getSchemaBlame", new GetSchemaBlameResolver(this.timelineService))
.dataFetcher("getSchemaVersionList", new GetSchemaVersionListResolver(this.timelineService))
.dataFetcher("test", getResolver(testType))
.dataFetcher("listTests", new ListTestsResolver(entityClient))
.dataFetcher("getRootGlossaryTerms", new GetRootGlossaryTermsResolver(this.entityClient))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.generated.GetSchemaBlameInput;
import com.linkedin.datahub.graphql.generated.GetSchemaBlameResult;
import com.linkedin.datahub.graphql.types.timeline.mappers.SchemaFieldBlameMapper;
import com.linkedin.datahub.graphql.types.timeline.mappers.SchemaBlameMapper;
import com.linkedin.metadata.timeline.TimelineService;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeTransaction;
Expand Down Expand Up @@ -46,7 +46,7 @@ public CompletableFuture<GetSchemaBlameResult> get(final DataFetchingEnvironment
Urn datasetUrn = Urn.createFromString(datasetUrnString);
List<ChangeTransaction> changeTransactionList =
_timelineService.getTimeline(datasetUrn, changeCategorySet, startTime, endTime, null, null, false);
return SchemaFieldBlameMapper.map(changeTransactionList, version);
return SchemaBlameMapper.map(changeTransactionList, version);
} catch (URISyntaxException u) {
log.error(
String.format("Failed to list schema blame data, likely due to the Urn %s being invalid", datasetUrnString),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.linkedin.datahub.graphql.resolvers.timeline;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.generated.GetSchemaVersionListInput;
import com.linkedin.datahub.graphql.generated.GetSchemaVersionListResult;
import com.linkedin.datahub.graphql.types.timeline.mappers.SchemaVersionListMapper;
import com.linkedin.metadata.timeline.TimelineService;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeTransaction;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;


/*
Returns the most recent changes made to each column in a dataset at each dataset version.
*/
@Slf4j
public class GetSchemaVersionListResolver implements DataFetcher<CompletableFuture<GetSchemaVersionListResult>> {
private final TimelineService _timelineService;

public GetSchemaVersionListResolver(TimelineService timelineService) {
_timelineService = timelineService;
}

@Override
public CompletableFuture<GetSchemaVersionListResult> get(final DataFetchingEnvironment environment) throws Exception {
final GetSchemaVersionListInput input =
bindArgument(environment.getArgument("input"), GetSchemaVersionListInput.class);

final String datasetUrnString = input.getDatasetUrn();
final long startTime = 0;
final long endTime = 0;

return CompletableFuture.supplyAsync(() -> {
try {
final Set<ChangeCategory> changeCategorySet = new HashSet<>();
changeCategorySet.add(ChangeCategory.TECHNICAL_SCHEMA);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice - ilike how you hide this detail

Urn datasetUrn = Urn.createFromString(datasetUrnString);
List<ChangeTransaction> changeTransactionList =
_timelineService.getTimeline(datasetUrn, changeCategorySet, startTime, endTime, null, null, false);
return SchemaVersionListMapper.map(changeTransactionList);
} catch (URISyntaxException u) {
log.error(
String.format("Failed to list schema blame data, likely due to the Urn %s being invalid", datasetUrnString),
u);
return null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

} catch (Exception e) {
log.error("Failed to list schema blame data", e);
return null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

}
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@
import com.linkedin.datahub.graphql.generated.SchemaFieldBlame;
import com.linkedin.datahub.graphql.generated.SchemaFieldChange;
import com.linkedin.datahub.graphql.generated.SemanticVersionStruct;
import com.linkedin.datahub.graphql.types.timeline.utils.TimelineUtils;
import com.linkedin.metadata.key.SchemaFieldKey;
import com.linkedin.metadata.timeline.data.ChangeCategory;
import com.linkedin.metadata.timeline.data.ChangeEvent;
import com.linkedin.metadata.timeline.data.ChangeTransaction;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.util.Pair;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
Expand All @@ -24,14 +24,17 @@
import lombok.extern.slf4j.Slf4j;
import org.apache.parquet.SemanticVersion;

import static com.linkedin.datahub.graphql.types.timeline.utils.TimelineUtils.*;


// Class for converting ChangeTransactions received from the Timeline API to SchemaFieldBlame structs for every schema
// at every semantic version.
@Slf4j
public class SchemaFieldBlameMapper {
public class SchemaBlameMapper {

public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransactions, @Nullable String versionCutoff) {
if (changeTransactions.isEmpty()) {
log.debug("Change transactions are empty");
return null;
}

Expand All @@ -40,10 +43,6 @@ public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransaction

String latestSemanticVersionString =
truncateSemanticVersion(changeTransactions.get(changeTransactions.size() - 1).getSemVer());
long latestSemanticVersionTimestamp = changeTransactions.get(changeTransactions.size() - 1).getTimestamp();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need this any more?

String latestVersionStamp = changeTransactions.get(changeTransactions.size() - 1).getVersionStamp();
result.setLatestVersion(
new SemanticVersionStruct(latestSemanticVersionString, latestSemanticVersionTimestamp, latestVersionStamp));

String semanticVersionFilterString = versionCutoff == null ? latestSemanticVersionString : versionCutoff;
Optional<SemanticVersion> semanticVersionFilterOptional = createSemanticVersion(semanticVersionFilterString);
Expand All @@ -54,7 +53,7 @@ public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransaction
SemanticVersion semanticVersionFilter = semanticVersionFilterOptional.get();

List<ChangeTransaction> reversedChangeTransactions = changeTransactions.stream()
.map(SchemaFieldBlameMapper::semanticVersionChangeTransactionPair)
.map(TimelineUtils::semanticVersionChangeTransactionPair)
.filter(Optional::isPresent)
.map(Optional::get)
.filter(semanticVersionChangeTransactionPair ->
Expand All @@ -69,13 +68,7 @@ public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransaction
result.setVersion(
new SemanticVersionStruct(selectedSemanticVersion, selectedSemanticVersionTimestamp, selectedVersionStamp));

List<SemanticVersionStruct> semanticVersionStructList = new ArrayList<>();
for (ChangeTransaction changeTransaction : reversedChangeTransactions) {
SemanticVersionStruct semanticVersionStruct =
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need this anymore?

new SemanticVersionStruct(truncateSemanticVersion(changeTransaction.getSemVer()),
changeTransaction.getTimestamp(), changeTransaction.getVersionStamp());
semanticVersionStructList.add(semanticVersionStruct);

for (ChangeEvent changeEvent : changeTransaction.getChangeEvents()) {
if (changeEvent.getCategory() != ChangeCategory.TECHNICAL_SCHEMA) {
continue;
Expand Down Expand Up @@ -115,71 +108,9 @@ public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransaction
.getChangeType()
.equals(ChangeOperationType.REMOVE))
.collect(Collectors.toList()));
result.setSemanticVersionList(semanticVersionStructList);
return result;
}

private static Optional<Pair<SemanticVersion, ChangeTransaction>> semanticVersionChangeTransactionPair(
ChangeTransaction changeTransaction) {
Optional<SemanticVersion> semanticVersion = createSemanticVersion(changeTransaction.getSemVer());
return semanticVersion.map(version -> Pair.of(version, changeTransaction));
}

private static Optional<SemanticVersion> createSemanticVersion(String semanticVersionString) {
String truncatedSemanticVersion = truncateSemanticVersion(semanticVersionString);
try {
SemanticVersion semanticVersion = SemanticVersion.parse(truncatedSemanticVersion);
return Optional.of(semanticVersion);
} catch (SemanticVersion.SemanticVersionParseException e) {
return Optional.empty();
}
}

// The SemanticVersion is currently returned from the ChangeTransactions in the format "x.y.z-computed". This function
// removes the suffix "computed".
private static String truncateSemanticVersion(String semanticVersion) {
String suffix = "-computed";
return semanticVersion.endsWith(suffix) ? semanticVersion.substring(0, semanticVersion.lastIndexOf(suffix))
: semanticVersion;
}

private static SchemaFieldChange getLastSchemaFieldChange(ChangeEvent changeEvent, long timestamp,
String semanticVersion, String versionStamp) {
SchemaFieldChange schemaFieldChange = new SchemaFieldChange();
schemaFieldChange.setTimestampMillis(timestamp);
schemaFieldChange.setLastSemanticVersion(truncateSemanticVersion(semanticVersion));
schemaFieldChange.setChangeType(
ChangeOperationType.valueOf(ChangeOperationType.class, changeEvent.getOperation().toString()));
schemaFieldChange.setVersionStamp(versionStamp);

String translatedChangeOperationType;
switch (changeEvent.getOperation()) {
case ADD:
translatedChangeOperationType = "Added";
break;
case MODIFY:
translatedChangeOperationType = "Modified";
break;
case REMOVE:
translatedChangeOperationType = "Removed";
break;
default:
translatedChangeOperationType = "Unknown change made";
log.warn(translatedChangeOperationType);
break;
}

String suffix = "-computed";
String translatedSemanticVersion =
semanticVersion.endsWith(suffix) ? semanticVersion.substring(0, semanticVersion.lastIndexOf(suffix))
: semanticVersion;

String lastSchemaFieldChange = String.format("%s in v%s", translatedChangeOperationType, translatedSemanticVersion);
schemaFieldChange.setLastSchemaFieldChange(lastSchemaFieldChange);

return schemaFieldChange;
}

private SchemaFieldBlameMapper() {
private SchemaBlameMapper() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.datahub.graphql.types.timeline.mappers;

import com.linkedin.datahub.graphql.generated.GetSchemaVersionListResult;
import com.linkedin.datahub.graphql.generated.SemanticVersionStruct;
import com.linkedin.datahub.graphql.types.timeline.utils.TimelineUtils;
import com.linkedin.metadata.timeline.data.ChangeTransaction;
import com.linkedin.util.Pair;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.datahub.graphql.types.timeline.utils.TimelineUtils.*;


// Class for converting ChangeTransactions received from the Timeline API to list of schema versions.
@Slf4j
public class SchemaVersionListMapper {

public static GetSchemaVersionListResult map(List<ChangeTransaction> changeTransactions) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any chance this list is null?

if (changeTransactions.isEmpty()) {
log.debug("Change transactions are empty");
return null;
}

GetSchemaVersionListResult result = new GetSchemaVersionListResult();

String latestSemanticVersionString =
truncateSemanticVersion(changeTransactions.get(changeTransactions.size() - 1).getSemVer());
long latestSemanticVersionTimestamp = changeTransactions.get(changeTransactions.size() - 1).getTimestamp();
String latestVersionStamp = changeTransactions.get(changeTransactions.size() - 1).getVersionStamp();
result.setLatestVersion(
new SemanticVersionStruct(latestSemanticVersionString, latestSemanticVersionTimestamp, latestVersionStamp));

List<ChangeTransaction> reversedChangeTransactions = changeTransactions.stream()
.map(TimelineUtils::semanticVersionChangeTransactionPair)
.filter(Optional::isPresent)
.map(Optional::get)
.sorted(Collections.reverseOrder(Comparator.comparing(Pair::getFirst)))
.map(Pair::getSecond)
.collect(Collectors.toList());

List<SemanticVersionStruct> semanticVersionStructList = reversedChangeTransactions.stream()
.map(changeTransaction -> new SemanticVersionStruct(truncateSemanticVersion(changeTransaction.getSemVer()),
changeTransaction.getTimestamp(), changeTransaction.getVersionStamp()))
.collect(Collectors.toList());

result.setSemanticVersionList(semanticVersionStructList);
return result;
}

private SchemaVersionListMapper() {
}
}
Loading