Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(mae/mcl): Make ingestAspect produce both MCLs and MAEs #3737

Merged
merged 5 commits into from
Jan 5, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,22 +1,15 @@
package com.linkedin.metadata.dao.producer;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.common.urn.Urn;
import com.linkedin.metadata.EventUtils;
import com.linkedin.metadata.dao.exception.ModelConversionException;
import com.linkedin.metadata.event.EntityEventProducer;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.mxe.MetadataAuditEvent;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.SystemMetadata;
import com.linkedin.mxe.TopicConvention;
import com.linkedin.mxe.TopicConventionImpl;
import com.linkedin.mxe.Topics;
import io.opentelemetry.extension.annotations.WithSpan;
import java.io.IOException;
import java.util.Arrays;
import java.util.Optional;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -64,58 +57,6 @@ public EntityKafkaMetadataEventProducer(@Nonnull final Producer<String, ? extend
_topicConvention = topicConvention;
}

@Override
@WithSpan
public void produceMetadataAuditEvent(@Nonnull Urn urn, @Nullable Snapshot oldSnapshot, @Nonnull Snapshot newSnapshot,
@Nullable SystemMetadata oldSystemMetadata, @Nullable SystemMetadata newSystemMetadata,
MetadataAuditOperation operation) {
final MetadataAuditEvent metadataAuditEvent = new MetadataAuditEvent();
if (newSnapshot != null) {
metadataAuditEvent.setNewSnapshot(newSnapshot);
}
if (oldSnapshot != null) {
metadataAuditEvent.setOldSnapshot(oldSnapshot);
}
if (oldSystemMetadata != null) {
metadataAuditEvent.setOldSystemMetadata(oldSystemMetadata);
}
if (newSystemMetadata != null) {
metadataAuditEvent.setNewSystemMetadata(newSystemMetadata);
}
if (operation != null) {
metadataAuditEvent.setOperation(operation);
}

GenericRecord record;
try {
log.debug(String.format("Converting Pegasus snapshot to Avro snapshot urn %s\nMetadataAuditEvent: %s",
urn,
metadataAuditEvent.toString()));
record = EventUtils.pegasusToAvroMAE(metadataAuditEvent);
} catch (IOException e) {
log.error(String.format("Failed to convert Pegasus MAE to Avro: %s", metadataAuditEvent.toString()), e);
throw new ModelConversionException("Failed to convert Pegasus MAE to Avro", e);
}

if (_callback.isPresent()) {
_producer.send(new ProducerRecord(_topicConvention.getMetadataAuditEventTopicName(), urn.toString(), record),
_callback.get());
} else {
_producer.send(new ProducerRecord(_topicConvention.getMetadataAuditEventTopicName(), urn.toString(), record),
(metadata, e) -> {
if (e != null) {
log.error(String.format("Failed to emit MAE for entity with urn %s", urn), e);
} else {
log.debug(String.format("Successfully emitted MAE for entity with urn %s at offset %s, partition %s, topic %s",
urn,
metadata.offset(),
metadata.partition(),
metadata.topic()));
}
});
}
}

@Override
@WithSpan
public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull AspectSpec aspectSpec,
Expand Down Expand Up @@ -152,9 +93,4 @@ record = EventUtils.pegasusToAvroMCL(metadataChangeLog);
});
}
}

@VisibleForTesting
static boolean isValidAspectSpecificTopic(@Nonnull String topic) {
return Arrays.stream(Topics.class.getFields()).anyMatch(field -> field.getName().equals(topic));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@
* of a given aspect + 1.
*
* Note that currently, implementations of this interface are responsible for producing Metadata Audit Events on
* ingestion using {@link #produceMetadataAuditEvent(
*Urn, RecordTemplate, RecordTemplate, SystemMetadata, SystemMetadata, MetadataAuditOperation)}.
* ingestion using {@link #produceMetadataChangeLog(Urn, String, String, AspectSpec, RecordTemplate, RecordTemplate,
* SystemMetadata, SystemMetadata, ChangeType)}.
*
* TODO: Consider whether we can abstract away virtual versioning semantics to subclasses of this class.
* TODO: Extract out a nested 'AspectService'.
Expand Down Expand Up @@ -199,6 +199,8 @@ public RecordTemplate ingestAspect(@Nonnull final Urn urn, @Nonnull final String

final RecordTemplate oldValue = result.getOldValue();
final RecordTemplate updatedValue = result.getNewValue();
final SystemMetadata oldSystemMetadata = result.getOldSystemMetadata();
final SystemMetadata updatedSystemMetadata = result.getNewSystemMetadata();

// Apply retention policies asynchronously if there was an update to existing aspect value
if (oldValue != updatedValue && oldValue != null && retentionService != null) {
Expand All @@ -208,15 +210,18 @@ public RecordTemplate ingestAspect(@Nonnull final Urn urn, @Nonnull final String

// Produce MAE after a successful update
if (oldValue != updatedValue || _alwaysEmitAuditEvent) {
log.debug(String.format("Producing MetadataAuditEvent for ingested aspect %s, urn %s", aspectName, urn));
Timer.Context produceMAETimer = MetricUtils.timer(this.getClass(), "produceMAE").time();
if (aspectName.equals(getKeyAspectName(urn))) {
produceMetadataAuditEventForKey(urn, result.getNewSystemMetadata());
} else {
produceMetadataAuditEvent(urn, oldValue, updatedValue, result.getOldSystemMetadata(),
result.getNewSystemMetadata(), MetadataAuditOperation.UPDATE);
log.debug(String.format("Producing MetadataChangeLog for ingested aspect %s, urn %s", aspectName, urn));
String entityName = urnToEntityName(urn);
EntitySpec entitySpec = getEntityRegistry().getEntitySpec(entityName);
AspectSpec aspectSpec = entitySpec.getAspectSpec(aspectName);
if (aspectSpec == null) {
throw new RuntimeException(String.format("Unknown aspect %s for entity %s", aspectName, entityName));
}
produceMAETimer.stop();

Timer.Context produceMCLTimer = MetricUtils.timer(this.getClass(), "produceMCL").time();
produceMetadataChangeLog(urn, entityName, aspectName, aspectSpec, oldValue, updatedValue, oldSystemMetadata,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we produce to BOTH? (And deprecate consumption via MAE processor?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added produce to both and removed MAE consumer

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note the produce calls are async under the hood so no impact on latency

updatedSystemMetadata, ChangeType.UPSERT);
produceMCLTimer.stop();
} else {
log.debug(
String.format("Skipped producing MetadataAuditEvent for ingested aspect %s, urn %s. Aspect has not changed.",
Expand All @@ -235,7 +240,8 @@ public RecordTemplate ingestAspect(@Nonnull final Urn urn, @Nonnull final String
return ingestAspect(urn, aspectName, newValue, auditStamp, generatedSystemMetadata);
}

public IngestProposalResult ingestProposal(@Nonnull MetadataChangeProposal metadataChangeProposal, AuditStamp auditStamp) {
public IngestProposalResult ingestProposal(@Nonnull MetadataChangeProposal metadataChangeProposal,
AuditStamp auditStamp) {

log.debug("entity type = {}", metadataChangeProposal.getEntityType());
EntitySpec entitySpec = getEntityRegistry().getEntitySpec(metadataChangeProposal.getEntityType());
Expand Down Expand Up @@ -388,26 +394,6 @@ public Map<Urn, Entity> getEntities(@Nonnull final Set<Urn> urns, @Nonnull Set<S
.collect(Collectors.toMap(Map.Entry::getKey, entry -> toEntity(entry.getValue())));
}

/**
* Produce metadata audit event and push.
*
* @param urn Urn to push
* @param oldAspectValue Value of aspect before the update.
* @param newAspectValue Value of aspect after the update
*/
public void produceMetadataAuditEvent(@Nonnull final Urn urn, @Nullable final RecordTemplate oldAspectValue,
@Nullable final RecordTemplate newAspectValue, @Nullable final SystemMetadata oldSystemMetadata,
@Nullable final SystemMetadata newSystemMetadata, @Nullable final MetadataAuditOperation operation) {

final Snapshot newSnapshot = buildSnapshot(urn, newAspectValue);
Snapshot oldSnapshot = null;
if (oldAspectValue != null) {
oldSnapshot = buildSnapshot(urn, oldAspectValue);
}

_producer.produceMetadataAuditEvent(urn, oldSnapshot, newSnapshot, oldSystemMetadata, newSystemMetadata, operation);
}

/**
* Produces a {@link com.linkedin.mxe.MetadataChangeLog} from a
* new & previous aspect.
Expand Down Expand Up @@ -445,14 +431,6 @@ public void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull String ent
produceMetadataChangeLog(urn, aspectSpec, metadataChangeLog);
}

public void produceMetadataAuditEventForKey(@Nonnull final Urn urn,
@Nullable final SystemMetadata newSystemMetadata) {

final Snapshot newSnapshot = buildKeySnapshot(urn);

_producer.produceMetadataAuditEvent(urn, null, newSnapshot, null, newSystemMetadata, MetadataAuditOperation.UPDATE);
}

public RecordTemplate getLatestAspect(@Nonnull final Urn urn, @Nonnull final String aspectName) {
log.debug(String.format("Invoked getLatestAspect with urn %s, aspect %s", urn, aspectName));
return getAspect(urn, aspectName, ASPECT_LATEST_VERSION);
Expand Down Expand Up @@ -566,11 +544,6 @@ public Snapshot buildSnapshot(@Nonnull final Urn urn, @Nonnull final RecordTempl
toSnapshotRecord(urn, ImmutableList.of(toAspectUnion(urn, keyAspectValue), toAspectUnion(urn, aspectValue))));
}

protected Snapshot buildKeySnapshot(@Nonnull final Urn urn) {
final RecordTemplate keyAspectValue = buildKeyAspect(urn);
return toSnapshotUnion(toSnapshotRecord(urn, ImmutableList.of(toAspectUnion(urn, keyAspectValue))));
}

protected RecordTemplate buildKeyAspect(@Nonnull final Urn urn) {
final EntitySpec spec = _entityRegistry.getEntitySpec(urnToEntityName(urn));
final AspectSpec keySpec = spec.getKeyAspectSpec();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,16 @@
package com.linkedin.metadata.event;

import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.snapshot.Snapshot;
import com.linkedin.mxe.MetadataChangeLog;
import com.linkedin.mxe.MetadataAuditOperation;
import com.linkedin.mxe.SystemMetadata;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;


/**
* Interface implemented by producers of {@link com.linkedin.mxe.MetadataAuditEvent}s.
*/
public interface EntityEventProducer {

/**
* Produces a {@link com.linkedin.mxe.MetadataAuditEvent} from a
* new & previous Entity {@link Snapshot}.
* @param urn the urn associated with the entity changed
* @param oldSnapshot a {@link RecordTemplate} corresponding to the old snapshot.
* @param newSnapshot a {@link RecordTemplate} corresponding to the new snapshot.
* @param oldSystemMetadata
* @param newSystemMetadata
*/
void produceMetadataAuditEvent(
@Nonnull final Urn urn,
@Nullable final Snapshot oldSnapshot,
@Nonnull final Snapshot newSnapshot,
@Nullable SystemMetadata oldSystemMetadata,
@Nullable SystemMetadata newSystemMetadata,
MetadataAuditOperation operation
);

/**
* Produces a {@link com.linkedin.mxe.MetadataChangeLog} from a
* new & previous aspect.
Expand All @@ -42,9 +19,6 @@ void produceMetadataAuditEvent(
* @param aspectSpec aspect spec of the aspect being updated
* @param metadataChangeLog metadata change log to push into MCL kafka topic
*/
void produceMetadataChangeLog(
@Nonnull final Urn urn,
@Nonnull AspectSpec aspectSpec,
@Nonnull final MetadataChangeLog metadataChangeLog
);
void produceMetadataChangeLog(@Nonnull final Urn urn, @Nonnull AspectSpec aspectSpec,
@Nonnull final MetadataChangeLog metadataChangeLog);
}
Loading