Skip to content

Commit

Permalink
feat(groups): add native groups concept to DataHub
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya-radhakrishnan committed Jul 20, 2022
1 parent ac61bcd commit 1af378f
Show file tree
Hide file tree
Showing 26 changed files with 1,309 additions and 326 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.linkedin.datahub.graphql;

import com.datahub.authentication.AuthenticationConfiguration;
import com.datahub.authentication.group.GroupService;
import com.datahub.authentication.token.StatefulTokenService;
import com.datahub.authentication.user.NativeUserService;
import com.datahub.authorization.AuthorizationConfiguration;
Expand Down Expand Up @@ -84,11 +85,11 @@
import com.linkedin.datahub.graphql.resolvers.config.AppConfigResolver;
import com.linkedin.datahub.graphql.resolvers.container.ContainerEntitiesResolver;
import com.linkedin.datahub.graphql.resolvers.container.ParentContainersResolver;
import com.linkedin.datahub.graphql.resolvers.dashboard.DashboardUsageStatsResolver;
import com.linkedin.datahub.graphql.resolvers.dashboard.DashboardStatsSummaryResolver;
import com.linkedin.datahub.graphql.resolvers.dashboard.DashboardUsageStatsResolver;
import com.linkedin.datahub.graphql.resolvers.dataset.DatasetHealthResolver;
import com.linkedin.datahub.graphql.resolvers.dataset.DatasetUsageStatsResolver;
import com.linkedin.datahub.graphql.resolvers.dataset.DatasetStatsSummaryResolver;
import com.linkedin.datahub.graphql.resolvers.dataset.DatasetUsageStatsResolver;
import com.linkedin.datahub.graphql.resolvers.deprecation.UpdateDeprecationResolver;
import com.linkedin.datahub.graphql.resolvers.domain.CreateDomainResolver;
import com.linkedin.datahub.graphql.resolvers.domain.DeleteDomainResolver;
Expand Down Expand Up @@ -147,9 +148,9 @@
import com.linkedin.datahub.graphql.resolvers.mutate.RemoveTagResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.RemoveTermResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.UpdateDescriptionResolver;
import com.linkedin.datahub.graphql.resolvers.operation.ReportOperationResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.UpdateNameResolver;
import com.linkedin.datahub.graphql.resolvers.mutate.UpdateParentNodeResolver;
import com.linkedin.datahub.graphql.resolvers.operation.ReportOperationResolver;
import com.linkedin.datahub.graphql.resolvers.policy.DeletePolicyResolver;
import com.linkedin.datahub.graphql.resolvers.policy.GetGrantedPrivilegesResolver;
import com.linkedin.datahub.graphql.resolvers.policy.ListPoliciesResolver;
Expand Down Expand Up @@ -217,8 +218,8 @@
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.config.DatahubConfiguration;
import com.linkedin.metadata.config.IngestionConfiguration;
import com.linkedin.metadata.config.VisualConfiguration;
import com.linkedin.metadata.config.TestsConfiguration;
import com.linkedin.metadata.config.VisualConfiguration;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.graph.GraphClient;
import com.linkedin.metadata.graph.SiblingGraphService;
Expand All @@ -235,12 +236,6 @@
import graphql.schema.DataFetchingEnvironment;
import graphql.schema.StaticDataFetcher;
import graphql.schema.idl.RuntimeWiring;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.dataloader.BatchLoaderContextProvider;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderOptions;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
Expand All @@ -253,10 +248,15 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.dataloader.BatchLoaderContextProvider;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderOptions;

import static com.linkedin.datahub.graphql.Constants.*;
import static com.linkedin.metadata.Constants.DATA_PROCESS_INSTANCE_RUN_EVENT_ASPECT_NAME;
import static graphql.Scalars.GraphQLLong;
import static com.linkedin.metadata.Constants.*;
import static graphql.Scalars.*;


/**
Expand All @@ -281,6 +281,7 @@ public class GmsGraphQLEngine {
private final TimeseriesAspectService timeseriesAspectService;
private final TimelineService timelineService;
private final NativeUserService nativeUserService;
private final GroupService groupService;

private final IngestionConfiguration ingestionConfiguration;
private final AuthenticationConfiguration authenticationConfiguration;
Expand Down Expand Up @@ -352,19 +353,13 @@ public GmsGraphQLEngine(
final TimeseriesAspectService timeseriesAspectService,
final EntityRegistry entityRegistry,
final SecretService secretService,
final NativeUserService nativeUserService,
final IngestionConfiguration ingestionConfiguration,
final NativeUserService nativeUserService, final IngestionConfiguration ingestionConfiguration,
final AuthenticationConfiguration authenticationConfiguration,
final AuthorizationConfiguration authorizationConfiguration,
final GitVersion gitVersion,
final TimelineService timelineService,
final boolean supportsImpactAnalysis,
final VisualConfiguration visualConfiguration,
final TelemetryConfiguration telemetryConfiguration,
final TestsConfiguration testsConfiguration,
final DatahubConfiguration datahubConfiguration,
final SiblingGraphService siblingGraphService
) {
final AuthorizationConfiguration authorizationConfiguration, final GitVersion gitVersion,
final TimelineService timelineService, final boolean supportsImpactAnalysis,
final VisualConfiguration visualConfiguration, final TelemetryConfiguration telemetryConfiguration,
final TestsConfiguration testsConfiguration, final DatahubConfiguration datahubConfiguration,
final SiblingGraphService siblingGraphService, final GroupService groupService) {

this.entityClient = entityClient;
this.graphClient = graphClient;
Expand All @@ -382,6 +377,7 @@ public GmsGraphQLEngine(
this.timeseriesAspectService = timeseriesAspectService;
this.timelineService = timelineService;
this.nativeUserService = nativeUserService;
this.groupService = groupService;

this.ingestionConfiguration = Objects.requireNonNull(ingestionConfiguration);
this.authenticationConfiguration = Objects.requireNonNull(authenticationConfiguration);
Expand Down Expand Up @@ -699,9 +695,9 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("removeOwner", new RemoveOwnerResolver(entityService))
.dataFetcher("addLink", new AddLinkResolver(entityService))
.dataFetcher("removeLink", new RemoveLinkResolver(entityService))
.dataFetcher("addGroupMembers", new AddGroupMembersResolver(this.entityClient))
.dataFetcher("removeGroupMembers", new RemoveGroupMembersResolver(this.entityClient))
.dataFetcher("createGroup", new CreateGroupResolver(this.entityClient))
.dataFetcher("addGroupMembers", new AddGroupMembersResolver(this.groupService))
.dataFetcher("removeGroupMembers", new RemoveGroupMembersResolver(this.groupService))
.dataFetcher("createGroup", new CreateGroupResolver(this.groupService))
.dataFetcher("removeUser", new RemoveUserResolver(this.entityClient))
.dataFetcher("removeGroup", new RemoveGroupResolver(this.entityClient))
.dataFetcher("updateUserStatus", new UpdateUserStatusResolver(this.entityClient))
Expand All @@ -727,7 +723,8 @@ private void configureMutationResolvers(final RuntimeWiring.Builder builder) {
.dataFetcher("createGlossaryTerm", new CreateGlossaryTermResolver(this.entityClient))
.dataFetcher("createGlossaryNode", new CreateGlossaryNodeResolver(this.entityClient))
.dataFetcher("updateParentNode", new UpdateParentNodeResolver(entityService))
.dataFetcher("deleteGlossaryEntity", new DeleteGlossaryEntityResolver(this.entityClient, this.entityService))
.dataFetcher("deleteGlossaryEntity",
new DeleteGlossaryEntityResolver(this.entityClient, this.entityService))
.dataFetcher("updateName", new UpdateNameResolver(entityService))
.dataFetcher("addRelatedTerms", new AddRelatedTermsResolver(this.entityService))
.dataFetcher("removeRelatedTerms", new RemoveRelatedTermsResolver(this.entityService))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import java.util.Optional;
import javax.annotation.Nonnull;

import static com.linkedin.datahub.graphql.resolvers.AuthUtils.*;
import static com.linkedin.metadata.Constants.*;


public class AuthorizationUtils {

Expand Down Expand Up @@ -91,6 +94,15 @@ public static boolean canManageUserCredentials(@Nonnull QueryContext context) {
return isAuthorized(context, Optional.empty(), PoliciesConfig.MANAGE_USER_CREDENTIALS_PRIVILEGE);
}

public static boolean canEditGroupMembers(@Nonnull String groupUrnStr, @Nonnull QueryContext context) {
final DisjunctivePrivilegeGroup orPrivilegeGroups = new DisjunctivePrivilegeGroup(
ImmutableList.of(ALL_PRIVILEGES_GROUP,
new ConjunctivePrivilegeGroup(ImmutableList.of(PoliciesConfig.EDIT_GROUP_MEMBERS_PRIVILEGE.getType()))));

return AuthorizationUtils.isAuthorized(context.getAuthorizer(), context.getActorUrn(), CORP_GROUP_ENTITY_NAME,
groupUrnStr, orPrivilegeGroups);
}

public static boolean isAuthorized(
@Nonnull QueryContext context,
@Nonnull Optional<ResourceSpec> resourceSpec,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,148 +1,81 @@
package com.linkedin.datahub.graphql.resolvers.group;

import com.google.common.collect.ImmutableList;
import com.linkedin.common.UrnArray;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.group.GroupService;
import com.linkedin.common.Origin;
import com.linkedin.common.OriginType;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.authorization.ConjunctivePrivilegeGroup;
import com.linkedin.datahub.graphql.authorization.DisjunctivePrivilegeGroup;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLErrorCode;
import com.linkedin.datahub.graphql.exception.DataHubGraphQLException;
import com.linkedin.datahub.graphql.generated.AddGroupMembersInput;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.identity.GroupMembership;
import com.linkedin.metadata.authorization.PoliciesConfig;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import static com.linkedin.datahub.graphql.resolvers.AuthUtils.*;
import static com.linkedin.datahub.graphql.authorization.AuthorizationUtils.*;
import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
import static com.linkedin.metadata.Constants.*;


/**
* Resolver that adds a set of members to a group, if the user and group both exist.
* Resolver that adds a set of native members to a group, if the user and group both exist.
*/
public class AddGroupMembersResolver implements DataFetcher<CompletableFuture<Boolean>> {

private final EntityClient _entityClient;
private final GroupService _groupService;

public AddGroupMembersResolver(final EntityClient entityClient) {
_entityClient = entityClient;
public AddGroupMembersResolver(final GroupService groupService) {
_groupService = groupService;
}

@Override
public CompletableFuture<Boolean> get(final DataFetchingEnvironment environment) throws Exception {

final AddGroupMembersInput input = bindArgument(environment.getArgument("input"), AddGroupMembersInput.class);
final String groupUrnStr = input.getGroupUrn();
final QueryContext context = environment.getContext();
final Authentication authentication = context.getAuthentication();
Urn groupUrn = Urn.createFromString(groupUrnStr);

if (isAuthorized(input, context)) {
final String groupUrnStr = input.getGroupUrn();
final List<String> userUrnStrs = input.getUserUrns();

return CompletableFuture.runAsync(() -> {
if (!groupExists(groupUrnStr, context)) {
// The group doesn't exist.
throw new DataHubGraphQLException("Failed to add member to group. Group does not exist.", DataHubGraphQLErrorCode.NOT_FOUND);
}
})
.thenApply(ignored -> CompletableFuture.allOf(
userUrnStrs.stream().map(userUrnStr -> CompletableFuture.runAsync(() ->
addUserToGroup(userUrnStr, groupUrnStr, context)
)).toArray(CompletableFuture[]::new)))
.thenApply((ignored) -> Boolean.TRUE);
if (!canEditGroupMembers(groupUrnStr, context)) {
throw new AuthorizationException(
"Unauthorized to perform this action. Please contact your DataHub administrator.");
}
throw new AuthorizationException("Unauthorized to perform this action. Please contact your DataHub administrator.");
}

private boolean isAuthorized(AddGroupMembersInput input, QueryContext context) {
final DisjunctivePrivilegeGroup orPrivilegeGroups = new DisjunctivePrivilegeGroup(ImmutableList.of(
ALL_PRIVILEGES_GROUP,
new ConjunctivePrivilegeGroup(ImmutableList.of(PoliciesConfig.EDIT_GROUP_MEMBERS_PRIVILEGE.getType()))
));

return AuthorizationUtils.isAuthorized(
context.getAuthorizer(),
context.getActorUrn(),
CORP_GROUP_ENTITY_NAME,
input.getGroupUrn(),
orPrivilegeGroups);
}

private void addUserToGroup(final String userUrnStr, final String groupUrnStr, final QueryContext context) {
try {
// First, fetch user's group membership aspect.
Urn userUrn = Urn.createFromString(userUrnStr);
final EntityResponse entityResponse =
_entityClient.batchGetV2(CORP_USER_ENTITY_NAME, Collections.singleton(userUrn),
Collections.singleton(GROUP_MEMBERSHIP_ASPECT_NAME), context.getAuthentication()).get(userUrn);

GroupMembership groupMembership;
if (entityResponse == null || !entityResponse.getAspects().containsKey(GROUP_MEMBERSHIP_ASPECT_NAME)) {
// Verify the user exists
if (!userExists(userUrnStr, context)) {
throw new DataHubGraphQLException("Failed to add member to group. User does not exist.", DataHubGraphQLErrorCode.NOT_FOUND);
if (!_groupService.groupExists(groupUrn)) {
// The group doesn't exist.
throw new DataHubGraphQLException(
String.format("Failed to add members to group %s. Group does not exist.", groupUrnStr),
DataHubGraphQLErrorCode.NOT_FOUND);
}
return CompletableFuture.supplyAsync(() -> {
Origin groupOrigin = _groupService.getGroupOrigin(groupUrn);
if (groupOrigin == null || !groupOrigin.hasType()) {
try {
_groupService.migrateGroupMembershipToNativeGroupMembership(groupUrn, context.getActorUrn(),
context.getAuthentication());
} catch (Exception e) {
throw new RuntimeException(
String.format("Failed to migrate group membership for group %s when adding group members", groupUrnStr));
}
// If the user doesn't have one, create one.
groupMembership = new GroupMembership();
groupMembership.setGroups(new UrnArray());
} else {
groupMembership = new GroupMembership(entityResponse.getAspects()
.get(GROUP_MEMBERSHIP_ASPECT_NAME).getValue().data());
} else if (groupOrigin.getType() == OriginType.EXTERNAL) {
throw new RuntimeException(String.format(
"Group %s was ingested from an external provider and cannot have members manually added to it",
groupUrnStr));
}
// Handle the duplicate case.
final Urn groupUrn = Urn.createFromString(groupUrnStr);
groupMembership.getGroups().remove(groupUrn);
groupMembership.getGroups().add(groupUrn);

// Finally, create the MetadataChangeProposal.
final MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(Urn.createFromString(userUrnStr));
proposal.setEntityType(CORP_USER_ENTITY_NAME);
proposal.setAspectName(GROUP_MEMBERSHIP_ASPECT_NAME);
proposal.setAspect(GenericRecordUtils.serializeAspect(groupMembership));
proposal.setChangeType(ChangeType.UPSERT);
_entityClient.ingestProposal(proposal, context.getAuthentication());
} catch (Exception e) {
throw new RuntimeException("Failed to add member to group", e);
}
}

private boolean groupExists(final String groupUrnStr, final QueryContext context) {
try {
Urn groupUrn = Urn.createFromString(groupUrnStr);
final EntityResponse entityResponse = _entityClient.batchGetV2(
CORP_GROUP_ENTITY_NAME,
Collections.singleton(groupUrn),
Collections.singleton(CORP_GROUP_KEY_ASPECT_NAME),
context.getAuthentication()).get(groupUrn);
return entityResponse != null && entityResponse.getAspects().containsKey(CORP_GROUP_KEY_ASPECT_NAME);
} catch (Exception e) {
throw new DataHubGraphQLException("Failed to fetch group!", DataHubGraphQLErrorCode.SERVER_ERROR);
}
}

private boolean userExists(final String userUrnStr, final QueryContext context) {
try {
Urn userUrn = Urn.createFromString(userUrnStr);
final EntityResponse entityResponse = _entityClient.batchGetV2(
CORP_USER_ENTITY_NAME,
Collections.singleton(userUrn),
Collections.singleton(CORP_USER_KEY_ASPECT_NAME),
context.getAuthentication()).get(userUrn);
return entityResponse != null && entityResponse.getAspects().containsKey(CORP_USER_KEY_ASPECT_NAME);
} catch (Exception e) {
throw new DataHubGraphQLException("Failed to fetch user!", DataHubGraphQLErrorCode.SERVER_ERROR);
}
try {
// Add each user to the group
final List<Urn> userUrnList = input.getUserUrns().stream().map(UrnUtils::getUrn).collect(Collectors.toList());
userUrnList.forEach(userUrn -> _groupService.addUserToNativeGroup(userUrn, groupUrn, authentication));
return true;
} catch (Exception e) {
throw new RuntimeException(String.format("Failed to add group members to group %s", groupUrnStr));
}
});
}
}
}
Loading

0 comments on commit 1af378f

Please sign in to comment.