Merge branch 'cus2240-lookml-ingestion-fails' of github.com:sid-acryl/datahub-fork into cus2240-lookml-ingestion-fails
sid-acryl committed Jul 27, 2024
2 parents 99b89d2 + f6d97c0 commit 75ede04
Showing 219 changed files with 2,834 additions and 1,046 deletions.
3 changes: 2 additions & 1 deletion build.gradle
@@ -54,7 +54,7 @@ buildscript {
ext.hazelcastVersion = '5.3.6'
ext.ebeanVersion = '12.16.1'
ext.googleJavaFormatVersion = '1.18.1'
ext.openLineageVersion = '1.16.0'
ext.openLineageVersion = '1.19.0'
ext.logbackClassicJava8 = '1.2.12'

ext.docker_registry = 'acryldata'
@@ -111,6 +111,7 @@ project.ext.externalDependency = [
'avroCompiler': 'org.apache.avro:avro-compiler:1.11.3',
'awsGlueSchemaRegistrySerde': 'software.amazon.glue:schema-registry-serde:1.1.17',
'awsMskIamAuth': 'software.amazon.msk:aws-msk-iam-auth:2.0.3',
'awsS3': 'software.amazon.awssdk:s3:2.26.21',
'awsSecretsManagerJdbc': 'com.amazonaws.secretsmanager:aws-secretsmanager-jdbc:1.0.13',
'awsPostgresIamAuth': 'software.amazon.jdbc:aws-advanced-jdbc-wrapper:1.0.2',
'awsRds':'software.amazon.awssdk:rds:2.18.24',
1 change: 1 addition & 0 deletions datahub-graphql-core/build.gradle
@@ -22,6 +22,7 @@ dependencies {
implementation externalDependency.opentelemetryAnnotations

implementation externalDependency.slf4jApi
implementation externalDependency.springContext
compileOnly externalDependency.lombok
annotationProcessor externalDependency.lombok

@@ -1869,7 +1869,9 @@ private void configureCorpGroupResolvers(final RuntimeWiring.Builder builder) {
"CorpGroup",
typeWiring ->
typeWiring
.dataFetcher("relationships", new EntityRelationshipsResultResolver(graphClient))
.dataFetcher(
"relationships",
new EntityRelationshipsResultResolver(graphClient, entityService))
.dataFetcher("privileges", new EntityPrivilegesResolver(entityClient))
.dataFetcher(
"aspects", new WeaklyTypedAspectsResolver(entityClient, entityRegistry))
@@ -25,12 +25,16 @@
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.net.URISyntaxException;
import java.time.DateTimeException;
import java.time.ZoneId;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.support.CronExpression;

/** Creates or updates an ingestion source. Requires the MANAGE_INGESTION privilege. */
@Slf4j
@@ -46,55 +50,51 @@ public UpsertIngestionSourceResolver(final EntityClient entityClient) {
public CompletableFuture<String> get(final DataFetchingEnvironment environment) throws Exception {
final QueryContext context = environment.getContext();

return GraphQLConcurrencyUtils.supplyAsync(
() -> {
if (IngestionAuthUtils.canManageIngestion(context)) {

final Optional<String> ingestionSourceUrn =
Optional.ofNullable(environment.getArgument("urn"));
final UpdateIngestionSourceInput input =
bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);
if (!IngestionAuthUtils.canManageIngestion(context)) {
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
}
final Optional<String> ingestionSourceUrn = Optional.ofNullable(environment.getArgument("urn"));
final UpdateIngestionSourceInput input =
bindArgument(environment.getArgument("input"), UpdateIngestionSourceInput.class);

// Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
final MetadataChangeProposal proposal;
if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source
try {
proposal =
buildMetadataChangeProposalWithUrn(
Urn.createFromString(ingestionSourceUrn.get()),
INGESTION_INFO_ASPECT_NAME,
info);
} catch (URISyntaxException e) {
throw new DataHubGraphQLException(
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
} else {
// Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
final UUID uuid = UUID.randomUUID();
final String uuidStr = uuid.toString();
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr);
proposal =
buildMetadataChangeProposalWithKey(
key, INGESTION_SOURCE_ENTITY_NAME, INGESTION_INFO_ASPECT_NAME, info);
}
// Create the policy info.
final DataHubIngestionSourceInfo info = mapIngestionSourceInfo(input);
final MetadataChangeProposal proposal;
if (ingestionSourceUrn.isPresent()) {
// Update existing ingestion source
try {
proposal =
buildMetadataChangeProposalWithUrn(
Urn.createFromString(ingestionSourceUrn.get()), INGESTION_INFO_ASPECT_NAME, info);
} catch (URISyntaxException e) {
throw new DataHubGraphQLException(
String.format("Malformed urn %s provided.", ingestionSourceUrn.get()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
} else {
// Create new ingestion source
// Since we are creating a new Ingestion Source, we need to generate a unique UUID.
final UUID uuid = UUID.randomUUID();
final String uuidStr = uuid.toString();
final DataHubIngestionSourceKey key = new DataHubIngestionSourceKey();
key.setId(uuidStr);
proposal =
buildMetadataChangeProposalWithKey(
key, INGESTION_SOURCE_ENTITY_NAME, INGESTION_INFO_ASPECT_NAME, info);
}

try {
return _entityClient.ingestProposal(context.getOperationContext(), proposal, false);
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to perform update against ingestion source with urn %s",
input.toString()),
e);
}
return GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
return _entityClient.ingestProposal(context.getOperationContext(), proposal, false);
} catch (Exception e) {
throw new RuntimeException(
String.format(
"Failed to perform update against ingestion source with urn %s",
input.toString()),
e);
}
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
},
this.getClass().getSimpleName(),
"get");
@@ -137,9 +137,38 @@ private DataHubIngestionSourceConfig mapConfig(final UpdateIngestionSourceConfig

private DataHubIngestionSourceSchedule mapSchedule(
final UpdateIngestionSourceScheduleInput input) {

final String modifiedCronInterval = adjustCronInterval(input.getInterval());
try {
CronExpression.parse(modifiedCronInterval);
} catch (IllegalArgumentException e) {
throw new DataHubGraphQLException(
String.format("Invalid cron schedule `%s`: %s", input.getInterval(), e.getMessage()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}
try {
ZoneId.of(input.getTimezone());
} catch (DateTimeException e) {
throw new DataHubGraphQLException(
String.format("Invalid timezone `%s`: %s", input.getTimezone(), e.getMessage()),
DataHubGraphQLErrorCode.BAD_REQUEST);
}

final DataHubIngestionSourceSchedule result = new DataHubIngestionSourceSchedule();
result.setInterval(input.getInterval());
result.setTimezone(input.getTimezone());
return result;
}

// Copied from IngestionScheduler.java
private String adjustCronInterval(final String origCronInterval) {
Objects.requireNonNull(origCronInterval, "origCronInterval must not be null");
// Typically we support 5-character cron. Spring's lib only supports 6 character cron so we make
// an adjustment here.
final String[] originalCronParts = origCronInterval.split(" ");
if (originalCronParts.length == 5) {
return String.format("0 %s", origCronInterval);
}
return origCronInterval;
}
}
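The schedule validation added to mapSchedule above fails fast on a bad cron interval or timezone instead of persisting a broken ingestion source. Below is a minimal, self-contained sketch of the same two checks (Spring's CronExpression for the interval, java.time.ZoneId for the timezone). The ScheduleValidator class name is illustrative, and the DataHubGraphQLException wrapping from the commit is swapped for a plain IllegalArgumentException so the snippet runs on its own:

import java.time.DateTimeException;
import java.time.ZoneId;
import org.springframework.scheduling.support.CronExpression;

public final class ScheduleValidator {

  // Spring only accepts 6-field cron (seconds first), so a 5-field expression
  // gets a leading "0", mirroring adjustCronInterval in the resolver above.
  static String adjustCronInterval(String cron) {
    return cron.trim().split(" ").length == 5 ? "0 " + cron : cron;
  }

  static void validate(String cronInterval, String timezone) {
    try {
      CronExpression.parse(adjustCronInterval(cronInterval));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          String.format("Invalid cron schedule `%s`: %s", cronInterval, e.getMessage()), e);
    }
    try {
      ZoneId.of(timezone);
    } catch (DateTimeException e) {
      throw new IllegalArgumentException(
          String.format("Invalid timezone `%s`: %s", timezone, e.getMessage()), e);
    }
  }

  public static void main(String[] args) {
    validate("0 0 * * *", "UTC"); // passes: becomes "0 0 0 * * *" for Spring
    try {
      validate("* * * * 123", "UTC");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // day-of-week field is out of range
    }
    try {
      validate("0 0 * * *", "America/Los_Angel");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // unknown zone id
    }
  }
}

Validating before building the proposal is exactly what the new tests further below assert: on a bad schedule the resolver throws DataHubGraphQLException and ingestProposal is never invoked.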
@@ -5,18 +5,21 @@

import com.linkedin.common.EntityRelationship;
import com.linkedin.common.EntityRelationships;
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.concurrency.GraphQLConcurrencyUtils;
import com.linkedin.datahub.graphql.generated.Entity;
import com.linkedin.datahub.graphql.generated.EntityRelationshipsResult;
import com.linkedin.datahub.graphql.generated.RelationshipsInput;
import com.linkedin.datahub.graphql.types.common.mappers.AuditStampMapper;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.query.filter.RelationshipDirection;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -29,8 +32,16 @@ public class EntityRelationshipsResultResolver

private final GraphClient _graphClient;

private final EntityService _entityService;

public EntityRelationshipsResultResolver(final GraphClient graphClient) {
this(graphClient, null);
}

public EntityRelationshipsResultResolver(
final GraphClient graphClient, final EntityService entityService) {
_graphClient = graphClient;
_entityService = entityService;
}

@Override
@@ -47,13 +58,16 @@ public CompletableFuture<EntityRelationshipsResult> get(DataFetchingEnvironment
final Integer count = input.getCount(); // Optional!
final RelationshipDirection resolvedDirection =
RelationshipDirection.valueOf(relationshipDirection.toString());
final boolean includeSoftDelete = input.getIncludeSoftDelete();

return GraphQLConcurrencyUtils.supplyAsync(
() ->
mapEntityRelationships(
context,
fetchEntityRelationships(
urn, relationshipTypes, resolvedDirection, start, count, context.getActorUrn()),
resolvedDirection),
resolvedDirection,
includeSoftDelete),
this.getClass().getSimpleName(),
"get");
}
@@ -72,13 +86,28 @@ private EntityRelationships fetchEntityRelationships(
private EntityRelationshipsResult mapEntityRelationships(
@Nullable final QueryContext context,
final EntityRelationships entityRelationships,
final RelationshipDirection relationshipDirection) {
final RelationshipDirection relationshipDirection,
final boolean includeSoftDelete) {
final EntityRelationshipsResult result = new EntityRelationshipsResult();

final Set<Urn> existentUrns;
if (context != null && _entityService != null && !includeSoftDelete) {
Set<Urn> allRelatedUrns =
entityRelationships.getRelationships().stream()
.map(EntityRelationship::getEntity)
.collect(Collectors.toSet());
existentUrns = _entityService.exists(context.getOperationContext(), allRelatedUrns, false);
} else {
existentUrns = null;
}

List<EntityRelationship> viewable =
entityRelationships.getRelationships().stream()
.filter(
rel -> context == null || canView(context.getOperationContext(), rel.getEntity()))
rel ->
(existentUrns == null || existentUrns.contains(rel.getEntity()))
&& (context == null
|| canView(context.getOperationContext(), rel.getEntity())))
.collect(Collectors.toList());

result.setStart(entityRelationships.getStart());
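The change above makes EntityRelationshipsResultResolver drop relationships whose target entity has been soft-deleted, unless the caller opts in. A minimal sketch of just that filtering step, with the resolver plumbing stripped away; SoftDeleteFilter is an illustrative name, and existentUrns stands in for the result of entityService.exists(context.getOperationContext(), allRelatedUrns, false) shown in the diff:

import com.linkedin.common.EntityRelationship;
import com.linkedin.common.urn.Urn;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

final class SoftDeleteFilter {

  // Keeps only relationships whose target URN is in the set of existing
  // (non-soft-deleted) entities. When includeSoftDelete is true, or no existence
  // data is available, every relationship is kept, matching the previous behavior.
  static List<EntityRelationship> filter(
      List<EntityRelationship> relationships,
      boolean includeSoftDelete,
      Set<Urn> existentUrns) {
    if (includeSoftDelete || existentUrns == null) {
      return relationships;
    }
    return relationships.stream()
        .filter(rel -> existentUrns.contains(rel.getEntity()))
        .collect(Collectors.toList());
  }
}

On the API side, callers exclude soft-deleted targets by passing includeSoftDelete: false in RelationshipsInput (the entity.graphql addition below); the schema default of true leaves current clients unaffected.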
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
@@ -1267,6 +1267,11 @@ input RelationshipsInput {
The number of results to be returned
"""
count: Int

"""
Whether to include soft-deleted, related, entities
"""
includeSoftDelete: Boolean = true
}

"""
@@ -7,6 +7,7 @@
import static org.testng.Assert.*;

import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceConfigInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceInput;
import com.linkedin.datahub.graphql.generated.UpdateIngestionSourceScheduleInput;
@@ -22,14 +23,17 @@

public class UpsertIngestionSourceResolverTest {

private static final UpdateIngestionSourceInput TEST_INPUT =
new UpdateIngestionSourceInput(
"Test source",
"mysql",
"Test source description",
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
new UpdateIngestionSourceConfigInput(
"my test recipe", "0.8.18", "executor id", false, null));
private static final UpdateIngestionSourceInput TEST_INPUT = makeInput();

private static UpdateIngestionSourceInput makeInput() {
return new UpdateIngestionSourceInput(
"Test source",
"mysql",
"Test source description",
new UpdateIngestionSourceScheduleInput("* * * * *", "UTC"),
new UpdateIngestionSourceConfigInput(
"my test recipe", "0.8.18", "executor id", false, null));
}

@Test
public void testGetSuccess() throws Exception {
@@ -104,4 +108,54 @@ public void testGetEntityClientException() throws Exception {

assertThrows(RuntimeException.class, () -> resolver.get(mockEnv).join());
}

@Test
public void testUpsertWithInvalidCron() throws Exception {
final UpdateIngestionSourceInput input = makeInput();
input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * 123", "UTC"));

// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);

// Execute resolver
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getArgument(Mockito.eq("urn")))
.thenReturn(TEST_INGESTION_SOURCE_URN.toString());
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());

input.setSchedule(new UpdateIngestionSourceScheduleInput("null", "UTC"));
assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());
}

@Test
public void testUpsertWithInvalidTimezone() throws Exception {
final UpdateIngestionSourceInput input = makeInput();
input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * *", "Invalid"));

// Create resolver
EntityClient mockClient = Mockito.mock(EntityClient.class);
UpsertIngestionSourceResolver resolver = new UpsertIngestionSourceResolver(mockClient);

// Execute resolver
QueryContext mockContext = getMockAllowContext();
DataFetchingEnvironment mockEnv = Mockito.mock(DataFetchingEnvironment.class);
Mockito.when(mockEnv.getArgument(Mockito.eq("urn")))
.thenReturn(TEST_INGESTION_SOURCE_URN.toString());
Mockito.when(mockEnv.getArgument(Mockito.eq("input"))).thenReturn(input);
Mockito.when(mockEnv.getContext()).thenReturn(mockContext);

assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());

input.setSchedule(new UpdateIngestionSourceScheduleInput("* * * * *", "America/Los_Angel"));
assertThrows(DataHubGraphQLException.class, () -> resolver.get(mockEnv).join());
Mockito.verify(mockClient, Mockito.times(0)).ingestProposal(any(), any(), anyBoolean());
}
}