diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java index 7129998dce0b40..2537b48af833fa 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/factories/BootstrapManagerFactory.java @@ -12,6 +12,7 @@ import com.linkedin.metadata.boot.steps.IngestRetentionPoliciesStep; import com.linkedin.metadata.boot.steps.IngestRootUserStep; import com.linkedin.metadata.boot.steps.RemoveClientIdAspectStep; +import com.linkedin.metadata.boot.steps.RestoreDbtSiblingsIndices; import com.linkedin.metadata.boot.steps.RestoreGlossaryIndices; import com.linkedin.metadata.entity.AspectMigrationsDao; import com.linkedin.metadata.entity.EntityService; @@ -67,8 +68,10 @@ protected BootstrapManager createInstance() { final IngestDataPlatformInstancesStep ingestDataPlatformInstancesStep = new IngestDataPlatformInstancesStep(_entityService, _migrationsDao); final RestoreGlossaryIndices restoreGlossaryIndicesStep = new RestoreGlossaryIndices(_entityService, _entitySearchService, _entityRegistry); + final RestoreDbtSiblingsIndices + restoreDbtSiblingsIndices = new RestoreDbtSiblingsIndices(_entityService, _entityRegistry); final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService); return new BootstrapManager(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestDataPlatformsStep, - ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, removeClientIdAspectStep)); + ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, removeClientIdAspectStep, restoreDbtSiblingsIndices)); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java new file mode 100644 index 00000000000000..374ff716ec2ccc --- /dev/null +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/RestoreDbtSiblingsIndices.java @@ -0,0 +1,173 @@ +package com.linkedin.metadata.boot.steps; + +import com.linkedin.common.AuditStamp; +import com.linkedin.common.urn.Urn; +import com.linkedin.data.DataMap; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.entity.EntityResponse; +import com.linkedin.entity.EnvelopedAspectMap; +import com.linkedin.events.metadata.ChangeType; +import com.linkedin.metadata.Constants; +import com.linkedin.metadata.boot.BootstrapStep; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.key.DataHubUpgradeKey; +import com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.query.ListUrnsResult; +import com.linkedin.metadata.utils.EntityKeyUtils; +import com.linkedin.metadata.utils.GenericRecordUtils; +import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.upgrade.DataHubUpgradeRequest; +import com.linkedin.upgrade.DataHubUpgradeResult; +import java.net.URISyntaxException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import javax.annotation.Nonnull; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import static com.linkedin.metadata.Constants.*; + + +@Slf4j +@RequiredArgsConstructor +public class RestoreDbtSiblingsIndices implements BootstrapStep { + private static final String VERSION = "0"; + private static final String UPGRADE_ID = "restore-dbt-siblings-indices"; + private static final Urn SIBLING_UPGRADE_URN = + EntityKeyUtils.convertEntityKeyToUrn(new DataHubUpgradeKey().setId(UPGRADE_ID), Constants.DATA_HUB_UPGRADE_ENTITY_NAME); + private static final Integer BATCH_SIZE = 1000; + private static final Integer SLEEP_SECONDS = 120; + + private final EntityService _entityService; + private final EntityRegistry _entityRegistry; + + @Override + public String name() { + return this.getClass().getSimpleName(); + } + + @Nonnull + @Override + public ExecutionMode getExecutionMode() { + return ExecutionMode.ASYNC; + } + + @Override + public void execute() throws Exception { + log.info("Attempting to run RestoreDbtSiblingsIndices upgrade.."); + log.info(String.format("Waiting %s seconds..", SLEEP_SECONDS)); + + // Sleep to ensure deployment process finishes. + Thread.sleep(SLEEP_SECONDS * 1000); + + EntityResponse response = _entityService.getEntityV2( + Constants.DATA_HUB_UPGRADE_ENTITY_NAME, SIBLING_UPGRADE_URN, + Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME) + ); + if (response != null && response.getAspects().containsKey(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) { + DataMap dataMap = response.getAspects().get(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME).getValue().data(); + DataHubUpgradeRequest request = new DataHubUpgradeRequest(dataMap); + if (request.hasVersion() && request.getVersion().equals(VERSION)) { + log.info("RestoreDbtSiblingsIndices has run before with this version. Skipping"); + return; + } + } + + log.info("Bootstrapping sibling aspects"); + + try { + final int rowCount = _entityService.listUrns(DATASET_ENTITY_NAME, 0, 10).getTotal(); + + log.info("Found {} dataset entities to attempt to bootstrap", rowCount); + + final AspectSpec datasetAspectSpec = + _entityRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME).getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME); + final AuditStamp auditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); + + final DataHubUpgradeRequest upgradeRequest = new DataHubUpgradeRequest().setTimestampMs(System.currentTimeMillis()).setVersion(VERSION); + ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, upgradeRequest, auditStamp); + + int indexedCount = 0; + while (indexedCount < rowCount) { + getAndRestoreUpstreamLineageIndices(indexedCount, auditStamp, datasetAspectSpec); + indexedCount += BATCH_SIZE; + } + + final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis()); + ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, upgradeResult, auditStamp); + + log.info("Successfully restored sibling aspects"); + } catch (Exception e) { + log.error("Error when running the RestoreDbtSiblingsIndices Bootstrap Step", e); + _entityService.deleteUrn(SIBLING_UPGRADE_URN); + throw new RuntimeException("Error when running the RestoreDbtSiblingsIndices Bootstrap Step", e); + } + } + + private void getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp, AspectSpec upstreamAspectSpec) { + ListUrnsResult datasetUrnsResult = _entityService.listUrns(DATASET_ENTITY_NAME, start, BATCH_SIZE); + List datasetUrns = datasetUrnsResult.getEntities(); + log.info("Re-indexing upstreamLineage aspect from {} with batch size {}", start, BATCH_SIZE); + + if (datasetUrns.size() == 0) { + return; + } + + final Map upstreamLineageResponse; + try { + upstreamLineageResponse = + _entityService.getEntitiesV2(DATASET_ENTITY_NAME, new HashSet<>(datasetUrns), Collections.singleton(UPSTREAM_LINEAGE_ASPECT_NAME)); + } catch (URISyntaxException e) { + throw new RuntimeException(String.format("Error fetching upstream lineage history: %s", e.toString())); + } + + // Loop over datasets and produce changelog + for (Urn datasetUrn : datasetUrns) { + EntityResponse response = upstreamLineageResponse.get(datasetUrn); + if (response == null) { + log.warn("Dataset not in set of entity responses {}", datasetUrn); + continue; + } + UpstreamLineage upstreamLineage = getUpstreamLineage(response); + if (upstreamLineage == null) { + continue; + } + + _entityService.produceMetadataChangeLog( + datasetUrn, + DATASET_ENTITY_NAME, + UPSTREAM_LINEAGE_ASPECT_NAME, + upstreamAspectSpec, + null, + upstreamLineage, + null, + null, + auditStamp, + ChangeType.RESTATE); + } + } + + private UpstreamLineage getUpstreamLineage(EntityResponse entityResponse) { + EnvelopedAspectMap aspectMap = entityResponse.getAspects(); + if (!aspectMap.containsKey(UPSTREAM_LINEAGE_ASPECT_NAME)) { + return null; + } + + return new UpstreamLineage(aspectMap.get(Constants.UPSTREAM_LINEAGE_ASPECT_NAME).getValue().data()); + } + + private void ingestUpgradeAspect(String aspectName, RecordTemplate aspect, AuditStamp auditStamp) { + final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal(); + upgradeProposal.setEntityUrn(SIBLING_UPGRADE_URN); + upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME); + upgradeProposal.setAspectName(aspectName); + upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(aspect)); + upgradeProposal.setChangeType(ChangeType.UPSERT); + + _entityService.ingestProposal(upgradeProposal, auditStamp); + } +}