Skip to content

Commit

Permalink
feat(ingest) - add audit actor urn to auditStamp (datahub-project#5264)
Browse files Browse the repository at this point in the history
  • Loading branch information
neojunjie authored and PiotrSierkin-Ki committed Jul 26, 2022
1 parent dbcc201 commit 002bbc7
Show file tree
Hide file tree
Showing 8 changed files with 43 additions and 16 deletions.
1 change: 1 addition & 0 deletions metadata-service/openapi-servlet/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ apply plugin: 'java'

dependencies {

compile project(':metadata-service:auth-api')
compile project(':metadata-service:factories')

compile externalDependency.reflections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.Timer;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
Expand Down Expand Up @@ -104,9 +106,12 @@ public ResponseEntity<List<String>> postEntities(
@RequestBody @Nonnull List<UpsertAspectRequest> aspectRequests) {
log.info("INGEST PROPOSAL proposal: {}", aspectRequests);

Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();

List<Pair<String, Boolean>> responses = aspectRequests.stream()
.map(MappingUtil::mapToProposal)
.map(proposal -> MappingUtil.ingestProposal(proposal, _entityService, _objectMapper))
.map(proposal -> MappingUtil.ingestProposal(proposal, actorUrnStr, _entityService, _objectMapper))
.collect(Collectors.toList());
if (responses.stream().anyMatch(Pair::getSecond)) {
return ResponseEntity.status(HttpStatus.CREATED)
Expand Down Expand Up @@ -140,10 +145,14 @@ public ResponseEntity<List<RollbackRunResultDto>> deleteEntities(
List<UpsertAspectRequest> deleteRequests = entityUrns.stream()
.map(entityUrn -> MappingUtil.createStatusRemoval(entityUrn, _entityService))
.collect(Collectors.toList());

Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();

return ResponseEntity.ok(Collections.singletonList(RollbackRunResultDto.builder()
.rowsRolledBack(deleteRequests.stream()
.map(MappingUtil::mapToProposal)
.map(proposal -> MappingUtil.ingestProposal(proposal, _entityService, _objectMapper))
.map(proposal -> MappingUtil.ingestProposal(proposal, actorUrnStr, _entityService, _objectMapper))
.filter(Pair::getSecond)
.map(Pair::getFirst)
.map(urnString -> new AspectRowSummary().urn(urnString))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.datahubproject.openapi.platform.entities;

import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.util.Pair;
Expand Down Expand Up @@ -44,8 +46,11 @@ public ResponseEntity<List<String>> postEntities(
@RequestBody @Nonnull List<MetadataChangeProposal> metadataChangeProposals) {
log.info("INGEST PROPOSAL proposal: {}", metadataChangeProposals);

Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();

List<Pair<String, Boolean>> responses = metadataChangeProposals.stream()
.map(proposal -> MappingUtil.ingestProposal(proposal, _entityService, _objectMapper))
.map(proposal -> MappingUtil.ingestProposal(proposal, actorUrnStr, _entityService, _objectMapper))
.collect(Collectors.toList());
if (responses.stream().anyMatch(Pair::getSecond)) {
return ResponseEntity.status(HttpStatus.CREATED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.Aspect;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.entity.RollbackRunResult;
import com.linkedin.metadata.entity.ValidationException;
Expand Down Expand Up @@ -246,13 +245,13 @@ public static GenericAspect convertGenericAspect(@Nonnull io.datahubproject.open
}
}

public static Pair<String, Boolean> ingestProposal(MetadataChangeProposal metadataChangeProposal, EntityService entityService,
public static Pair<String, Boolean> ingestProposal(MetadataChangeProposal metadataChangeProposal, String actorUrn, EntityService entityService,
ObjectMapper objectMapper) {
// TODO: Use the actor present in the IC.
Timer.Context context = MetricUtils.timer("postEntity").time();
final com.linkedin.common.AuditStamp auditStamp =
new com.linkedin.common.AuditStamp().setTime(System.currentTimeMillis())
.setActor(UrnUtils.getUrn(Constants.UNKNOWN_ACTOR));
.setActor(UrnUtils.getUrn(actorUrn));
io.datahubproject.openapi.generated.KafkaAuditHeader auditHeader = metadataChangeProposal.getAuditHeader();

com.linkedin.mxe.MetadataChangeProposal serviceProposal =
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
package entities;

import com.datahub.authentication.Actor;
import com.datahub.authentication.ActorType;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.event.EventProducer;
Expand Down Expand Up @@ -34,6 +38,7 @@
import org.testng.annotations.Test;

import static com.linkedin.metadata.Constants.*;
import static org.mockito.Mockito.when;


public class EntitiesControllerTest {
Expand All @@ -53,6 +58,9 @@ public void setup()
EventProducer mockEntityEventProducer = Mockito.mock(EventProducer.class);
MockEntityService mockEntityService = new MockEntityService(aspectDao, mockEntityEventProducer, mockEntityRegistry);
_entitiesController = new EntitiesController(mockEntityService, new ObjectMapper());
Authentication authentication = Mockito.mock(Authentication.class);
when(authentication.getActor()).thenReturn(new Actor(ActorType.USER, "datahub"));
AuthenticationContext.setAuthentication(authentication);
}

EntitiesController _entitiesController;
Expand Down
1 change: 1 addition & 0 deletions metadata-service/restli-servlet-impl/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
}

compile project(':metadata-service:restli-api')
compile project(':metadata-service:auth-api')
compile project(path: ':metadata-service:restli-api', configuration: 'dataTemplate')
compile project(':li-utils')
compile project(':metadata-models')
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package com.linkedin.metadata.resources.entity;

import com.codahale.metrics.MetricRegistry;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.linkedin.aspect.GetTimeseriesAspectValuesResponse;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.aspect.EnvelopedAspectArray;
import com.linkedin.metadata.aspect.VersionedAspect;
import com.linkedin.metadata.entity.EntityService;
Expand Down Expand Up @@ -124,9 +125,10 @@ public Task<String> ingestProposal(
@ActionParam(PARAM_PROPOSAL) @Nonnull MetadataChangeProposal metadataChangeProposal) throws URISyntaxException {
log.info("INGEST PROPOSAL proposal: {}", metadataChangeProposal);

// TODO: Use the actor present in the IC.
final AuditStamp auditStamp =
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(Constants.UNKNOWN_ACTOR));
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
final AuditStamp auditStamp = new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr));

final List<MetadataChangeProposal> additionalChanges =
AspectUtils.getAdditionalChanges(metadataChangeProposal, _entityService);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
package com.linkedin.metadata.resources.entity;

import com.codahale.metrics.MetricRegistry;
import com.datahub.authentication.Authentication;
import com.datahub.authentication.AuthenticationContext;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.LongMap;
import com.linkedin.data.template.StringArray;
import com.linkedin.entity.Entity;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.browse.BrowseResult;
import com.linkedin.metadata.entity.DeleteEntityService;
import com.linkedin.metadata.entity.EntityService;
Expand Down Expand Up @@ -196,9 +197,9 @@ public Task<Void> ingest(@ActionParam(PARAM_ENTITY) @Nonnull Entity entity,

SystemMetadata systemMetadata = populateDefaultFieldsIfEmpty(providedSystemMetadata);

// TODO Correctly audit ingestions.
final AuditStamp auditStamp =
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(Constants.UNKNOWN_ACTOR));
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
final AuditStamp auditStamp = new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr));

// variables referenced in lambdas are required to be final
final SystemMetadata finalSystemMetadata = systemMetadata;
Expand All @@ -222,8 +223,9 @@ public Task<Void> batchIngest(@ActionParam(PARAM_ENTITIES) @Nonnull Entity[] ent
}
}

final AuditStamp auditStamp =
new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(Constants.UNKNOWN_ACTOR));
Authentication authentication = AuthenticationContext.getAuthentication();
String actorUrnStr = authentication.getActor().toUrnStr();
final AuditStamp auditStamp = new AuditStamp().setTime(_clock.millis()).setActor(Urn.createFromString(actorUrnStr));

if (systemMetadataList == null) {
systemMetadataList = new SystemMetadata[entities.length];
Expand Down

0 comments on commit 002bbc7

Please sign in to comment.