-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(schema-history): remove blame language for the schema history feature #5457
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,61 @@ | ||
package com.linkedin.datahub.graphql.resolvers.timeline; | ||
|
||
import com.linkedin.common.urn.Urn; | ||
import com.linkedin.datahub.graphql.generated.GetSchemaVersionListInput; | ||
import com.linkedin.datahub.graphql.generated.GetSchemaVersionListResult; | ||
import com.linkedin.datahub.graphql.types.timeline.mappers.SchemaVersionListMapper; | ||
import com.linkedin.metadata.timeline.TimelineService; | ||
import com.linkedin.metadata.timeline.data.ChangeCategory; | ||
import com.linkedin.metadata.timeline.data.ChangeTransaction; | ||
import graphql.schema.DataFetcher; | ||
import graphql.schema.DataFetchingEnvironment; | ||
import java.net.URISyntaxException; | ||
import java.util.HashSet; | ||
import java.util.List; | ||
import java.util.Set; | ||
import java.util.concurrent.CompletableFuture; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*; | ||
|
||
|
||
/* | ||
GraphQL resolver that returns the list of schema versions for a dataset. | ||
It asks the TimelineService for the dataset's TECHNICAL_SCHEMA change transactions and converts | ||
them to a GetSchemaVersionListResult with SchemaVersionListMapper. | ||
*/ | ||
@Slf4j | ||
public class GetSchemaVersionListResolver implements DataFetcher<CompletableFuture<GetSchemaVersionListResult>> { | ||
// Source of the change transactions; injected through the constructor. | ||
private final TimelineService _timelineService; | ||
|
||
// Stores the TimelineService used by get(). | ||
public GetSchemaVersionListResolver(TimelineService timelineService) { | ||
_timelineService = timelineService; | ||
} | ||
|
||
// Binds the "input" argument to GetSchemaVersionListInput, reads the dataset urn from it and | ||
// resolves the version list asynchronously. The returned future completes with the mapper's | ||
// result, or with null when the urn is invalid or any other exception is caught below. | ||
@Override | ||
public CompletableFuture<GetSchemaVersionListResult> get(final DataFetchingEnvironment environment) throws Exception { | ||
final GetSchemaVersionListInput input = | ||
bindArgument(environment.getArgument("input"), GetSchemaVersionListInput.class); | ||
|
||
final String datasetUrnString = input.getDatasetUrn(); | ||
// Both time bounds are fixed at 0. | ||
// NOTE(review): presumably 0/0 means "no time restriction" for getTimeline - confirm. | ||
final long startTime = 0; | ||
final long endTime = 0; | ||
|
||
// No executor is passed to supplyAsync, so the lookup runs on its default executor. | ||
return CompletableFuture.supplyAsync(() -> { | ||
try { | ||
// Only technical schema changes are requested from the timeline. | ||
final Set<ChangeCategory> changeCategorySet = new HashSet<>(); | ||
changeCategorySet.add(ChangeCategory.TECHNICAL_SCHEMA); | ||
Urn datasetUrn = Urn.createFromString(datasetUrnString); | ||
// NOTE(review): the trailing (null, null, false) arguments are defined by | ||
// TimelineService.getTimeline, which is not visible here - confirm their meaning. | ||
List<ChangeTransaction> changeTransactionList = | ||
_timelineService.getTimeline(datasetUrn, changeCategorySet, startTime, endTime, null, null, false); | ||
return SchemaVersionListMapper.map(changeTransactionList); | ||
} catch (URISyntaxException u) { | ||
// Invalid urn string: log it and complete the future with null instead of failing. | ||
// NOTE(review): the message still says "schema blame data" although this resolver | ||
// lists schema versions. | ||
log.error( | ||
String.format("Failed to list schema blame data, likely due to the Urn %s being invalid", datasetUrnString), | ||
u); | ||
return null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! |
||
} catch (Exception e) { | ||
// Any other failure is logged and also turned into a null result. | ||
log.error("Failed to list schema blame data", e); | ||
return null; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nice! |
||
} | ||
}); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,13 +6,13 @@ | |
import com.linkedin.datahub.graphql.generated.SchemaFieldBlame; | ||
import com.linkedin.datahub.graphql.generated.SchemaFieldChange; | ||
import com.linkedin.datahub.graphql.generated.SemanticVersionStruct; | ||
import com.linkedin.datahub.graphql.types.timeline.utils.TimelineUtils; | ||
import com.linkedin.metadata.key.SchemaFieldKey; | ||
import com.linkedin.metadata.timeline.data.ChangeCategory; | ||
import com.linkedin.metadata.timeline.data.ChangeEvent; | ||
import com.linkedin.metadata.timeline.data.ChangeTransaction; | ||
import com.linkedin.metadata.utils.EntityKeyUtils; | ||
import com.linkedin.util.Pair; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.HashMap; | ||
|
@@ -24,14 +24,17 @@ | |
import lombok.extern.slf4j.Slf4j; | ||
import org.apache.parquet.SemanticVersion; | ||
|
||
import static com.linkedin.datahub.graphql.types.timeline.utils.TimelineUtils.*; | ||
|
||
|
||
// Class for converting ChangeTransactions received from the Timeline API to SchemaFieldBlame structs for every schema | ||
// at every semantic version. | ||
@Slf4j | ||
public class SchemaFieldBlameMapper { | ||
public class SchemaBlameMapper { | ||
|
||
public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransactions, @Nullable String versionCutoff) { | ||
if (changeTransactions.isEmpty()) { | ||
log.debug("Change transactions are empty"); | ||
return null; | ||
} | ||
|
||
|
@@ -40,10 +43,6 @@ public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransaction | |
|
||
String latestSemanticVersionString = | ||
truncateSemanticVersion(changeTransactions.get(changeTransactions.size() - 1).getSemVer()); | ||
long latestSemanticVersionTimestamp = changeTransactions.get(changeTransactions.size() - 1).getTimestamp(); | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Don't need this anymore? |
||
String latestVersionStamp = changeTransactions.get(changeTransactions.size() - 1).getVersionStamp(); | ||
result.setLatestVersion( | ||
new SemanticVersionStruct(latestSemanticVersionString, latestSemanticVersionTimestamp, latestVersionStamp)); | ||
|
||
String semanticVersionFilterString = versionCutoff == null ? latestSemanticVersionString : versionCutoff; | ||
Optional<SemanticVersion> semanticVersionFilterOptional = createSemanticVersion(semanticVersionFilterString); | ||
|
@@ -54,7 +53,7 @@ public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransaction | |
SemanticVersion semanticVersionFilter = semanticVersionFilterOptional.get(); | ||
|
||
List<ChangeTransaction> reversedChangeTransactions = changeTransactions.stream() | ||
.map(SchemaFieldBlameMapper::semanticVersionChangeTransactionPair) | ||
.map(TimelineUtils::semanticVersionChangeTransactionPair) | ||
.filter(Optional::isPresent) | ||
.map(Optional::get) | ||
.filter(semanticVersionChangeTransactionPair -> | ||
|
@@ -69,13 +68,7 @@ public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransaction | |
result.setVersion( | ||
new SemanticVersionStruct(selectedSemanticVersion, selectedSemanticVersionTimestamp, selectedVersionStamp)); | ||
|
||
List<SemanticVersionStruct> semanticVersionStructList = new ArrayList<>(); | ||
for (ChangeTransaction changeTransaction : reversedChangeTransactions) { | ||
SemanticVersionStruct semanticVersionStruct = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Don't need this anymore? |
||
new SemanticVersionStruct(truncateSemanticVersion(changeTransaction.getSemVer()), | ||
changeTransaction.getTimestamp(), changeTransaction.getVersionStamp()); | ||
semanticVersionStructList.add(semanticVersionStruct); | ||
|
||
for (ChangeEvent changeEvent : changeTransaction.getChangeEvents()) { | ||
if (changeEvent.getCategory() != ChangeCategory.TECHNICAL_SCHEMA) { | ||
continue; | ||
|
@@ -115,71 +108,9 @@ public static GetSchemaBlameResult map(List<ChangeTransaction> changeTransaction | |
.getChangeType() | ||
.equals(ChangeOperationType.REMOVE)) | ||
.collect(Collectors.toList())); | ||
result.setSemanticVersionList(semanticVersionStructList); | ||
return result; | ||
} | ||
|
||
private static Optional<Pair<SemanticVersion, ChangeTransaction>> semanticVersionChangeTransactionPair( | ||
ChangeTransaction changeTransaction) { | ||
Optional<SemanticVersion> semanticVersion = createSemanticVersion(changeTransaction.getSemVer()); | ||
return semanticVersion.map(version -> Pair.of(version, changeTransaction)); | ||
} | ||
|
||
private static Optional<SemanticVersion> createSemanticVersion(String semanticVersionString) { | ||
String truncatedSemanticVersion = truncateSemanticVersion(semanticVersionString); | ||
try { | ||
SemanticVersion semanticVersion = SemanticVersion.parse(truncatedSemanticVersion); | ||
return Optional.of(semanticVersion); | ||
} catch (SemanticVersion.SemanticVersionParseException e) { | ||
return Optional.empty(); | ||
} | ||
} | ||
|
||
// The SemanticVersion is currently returned from the ChangeTransactions in the format "x.y.z-computed". This function | ||
// removes the suffix "computed". | ||
private static String truncateSemanticVersion(String semanticVersion) { | ||
String suffix = "-computed"; | ||
return semanticVersion.endsWith(suffix) ? semanticVersion.substring(0, semanticVersion.lastIndexOf(suffix)) | ||
: semanticVersion; | ||
} | ||
|
||
private static SchemaFieldChange getLastSchemaFieldChange(ChangeEvent changeEvent, long timestamp, | ||
String semanticVersion, String versionStamp) { | ||
SchemaFieldChange schemaFieldChange = new SchemaFieldChange(); | ||
schemaFieldChange.setTimestampMillis(timestamp); | ||
schemaFieldChange.setLastSemanticVersion(truncateSemanticVersion(semanticVersion)); | ||
schemaFieldChange.setChangeType( | ||
ChangeOperationType.valueOf(ChangeOperationType.class, changeEvent.getOperation().toString())); | ||
schemaFieldChange.setVersionStamp(versionStamp); | ||
|
||
String translatedChangeOperationType; | ||
switch (changeEvent.getOperation()) { | ||
case ADD: | ||
translatedChangeOperationType = "Added"; | ||
break; | ||
case MODIFY: | ||
translatedChangeOperationType = "Modified"; | ||
break; | ||
case REMOVE: | ||
translatedChangeOperationType = "Removed"; | ||
break; | ||
default: | ||
translatedChangeOperationType = "Unknown change made"; | ||
log.warn(translatedChangeOperationType); | ||
break; | ||
} | ||
|
||
String suffix = "-computed"; | ||
String translatedSemanticVersion = | ||
semanticVersion.endsWith(suffix) ? semanticVersion.substring(0, semanticVersion.lastIndexOf(suffix)) | ||
: semanticVersion; | ||
|
||
String lastSchemaFieldChange = String.format("%s in v%s", translatedChangeOperationType, translatedSemanticVersion); | ||
schemaFieldChange.setLastSchemaFieldChange(lastSchemaFieldChange); | ||
|
||
return schemaFieldChange; | ||
} | ||
|
||
private SchemaFieldBlameMapper() { | ||
private SchemaBlameMapper() { | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
package com.linkedin.datahub.graphql.types.timeline.mappers; | ||
|
||
import com.linkedin.datahub.graphql.generated.GetSchemaVersionListResult; | ||
import com.linkedin.datahub.graphql.generated.SemanticVersionStruct; | ||
import com.linkedin.datahub.graphql.types.timeline.utils.TimelineUtils; | ||
import com.linkedin.metadata.timeline.data.ChangeTransaction; | ||
import com.linkedin.util.Pair; | ||
import java.util.Collections; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import java.util.Optional; | ||
import java.util.stream.Collectors; | ||
import lombok.extern.slf4j.Slf4j; | ||
|
||
import static com.linkedin.datahub.graphql.types.timeline.utils.TimelineUtils.*; | ||
|
||
|
||
// Static-only mapper that converts the ChangeTransactions received from the Timeline API into a | ||
// GetSchemaVersionListResult: the latest version plus the list of all parseable versions. | ||
@Slf4j | ||
public class SchemaVersionListMapper { | ||
|
||
// Builds the version-list result from the given change transactions. | ||
// Returns null (after a debug log) when the list is empty. | ||
// NOTE(review): a null list is not guarded and would throw on isEmpty(). | ||
public static GetSchemaVersionListResult map(List<ChangeTransaction> changeTransactions) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any chance this list is null? |
||
if (changeTransactions.isEmpty()) { | ||
log.debug("Change transactions are empty"); | ||
return null; | ||
} | ||
|
||
GetSchemaVersionListResult result = new GetSchemaVersionListResult(); | ||
|
||
// The latest version is read from the LAST element of the list as received, before any sorting. | ||
// NOTE(review): assumes the incoming list is ordered oldest-to-newest - confirm with the caller. | ||
String latestSemanticVersionString = | ||
truncateSemanticVersion(changeTransactions.get(changeTransactions.size() - 1).getSemVer()); | ||
long latestSemanticVersionTimestamp = changeTransactions.get(changeTransactions.size() - 1).getTimestamp(); | ||
String latestVersionStamp = changeTransactions.get(changeTransactions.size() - 1).getVersionStamp(); | ||
result.setLatestVersion( | ||
new SemanticVersionStruct(latestSemanticVersionString, latestSemanticVersionTimestamp, latestVersionStamp)); | ||
|
||
// Pair every transaction with its semantic version, drop the ones for which the helper returns | ||
// an empty Optional (presumably an unparseable version string), then order the remaining | ||
// transactions from highest to lowest semantic version. | ||
List<ChangeTransaction> reversedChangeTransactions = changeTransactions.stream() | ||
.map(TimelineUtils::semanticVersionChangeTransactionPair) | ||
.filter(Optional::isPresent) | ||
.map(Optional::get) | ||
.sorted(Collections.reverseOrder(Comparator.comparing(Pair::getFirst))) | ||
.map(Pair::getSecond) | ||
.collect(Collectors.toList()); | ||
|
||
// One struct per remaining transaction: truncated semantic version, timestamp, version stamp. | ||
List<SemanticVersionStruct> semanticVersionStructList = reversedChangeTransactions.stream() | ||
.map(changeTransaction -> new SemanticVersionStruct(truncateSemanticVersion(changeTransaction.getSemVer()), | ||
changeTransaction.getTimestamp(), changeTransaction.getVersionStamp())) | ||
.collect(Collectors.toList()); | ||
|
||
result.setSemanticVersionList(semanticVersionStructList); | ||
return result; | ||
} | ||
|
||
// Private constructor: the class only exposes the static map method. | ||
private SchemaVersionListMapper() { | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice - I like how you hide this detail