Skip to content

Commit

Permalink
feat(siblings): index sibling aspects for historical dbt metadata (da…
Browse files Browse the repository at this point in the history
…tahub-project#5291)

* fixing dbt platform issues

* starting sibling restore index job work

* finish restore indices

* migrating to list urns

* rename constant

* disaster recovery
  • Loading branch information
gabe-lyons authored and alexey-kravtsov committed Jul 8, 2022
1 parent 82b7c03 commit a0a1cbe
Show file tree
Hide file tree
Showing 2 changed files with 177 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.linkedin.metadata.boot.steps.IngestRetentionPoliciesStep;
import com.linkedin.metadata.boot.steps.IngestRootUserStep;
import com.linkedin.metadata.boot.steps.RemoveClientIdAspectStep;
import com.linkedin.metadata.boot.steps.RestoreDbtSiblingsIndices;
import com.linkedin.metadata.boot.steps.RestoreGlossaryIndices;
import com.linkedin.metadata.entity.AspectMigrationsDao;
import com.linkedin.metadata.entity.EntityService;
Expand Down Expand Up @@ -67,8 +68,10 @@ protected BootstrapManager createInstance() {
final IngestDataPlatformInstancesStep ingestDataPlatformInstancesStep =
new IngestDataPlatformInstancesStep(_entityService, _migrationsDao);
final RestoreGlossaryIndices restoreGlossaryIndicesStep = new RestoreGlossaryIndices(_entityService, _entitySearchService, _entityRegistry);
final RestoreDbtSiblingsIndices
restoreDbtSiblingsIndices = new RestoreDbtSiblingsIndices(_entityService, _entityRegistry);
final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService);
return new BootstrapManager(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestDataPlatformsStep,
ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, removeClientIdAspectStep));
ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, removeClientIdAspectStep, restoreDbtSiblingsIndices));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package com.linkedin.metadata.boot.steps;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.dataset.UpstreamLineage;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubUpgradeKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.query.ListUrnsResult;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.upgrade.DataHubUpgradeRequest;
import com.linkedin.upgrade.DataHubUpgradeResult;
import java.net.URISyntaxException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import javax.annotation.Nonnull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import static com.linkedin.metadata.Constants.*;


/**
 * Asynchronous bootstrap step that re-emits the {@code upstreamLineage} aspect of every dataset
 * as a {@link ChangeType#RESTATE} metadata change log.
 *
 * <p>The step records a {@link DataHubUpgradeRequest} aspect (keyed by {@link #UPGRADE_ID} and
 * stamped with {@link #VERSION}) before it starts and a {@link DataHubUpgradeResult} aspect once it
 * finishes. If a request aspect with the current version is already present, the step is skipped.
 * On failure the upgrade entity is deleted so that the step is attempted again on the next run.
 *
 * <p>NOTE(review): judging by the step name and log messages, the restated change logs are
 * presumably consumed downstream to derive dbt sibling aspects; that consumer is not visible here.
 */
@Slf4j
@RequiredArgsConstructor
public class RestoreDbtSiblingsIndices implements BootstrapStep {
  private static final String VERSION = "0";
  private static final String UPGRADE_ID = "restore-dbt-siblings-indices";
  private static final Urn SIBLING_UPGRADE_URN =
      EntityKeyUtils.convertEntityKeyToUrn(new DataHubUpgradeKey().setId(UPGRADE_ID), Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
  // Number of dataset urns fetched and restated per page.
  private static final int BATCH_SIZE = 1000;
  // Delay before the step starts doing any work.
  private static final int SLEEP_SECONDS = 120;

  private final EntityService _entityService;
  private final EntityRegistry _entityRegistry;

  /** Returns the simple class name, used as the name of this step. */
  @Override
  public String name() {
    return this.getClass().getSimpleName();
  }

  /** This step is declared as {@link ExecutionMode#ASYNC}. */
  @Nonnull
  @Override
  public ExecutionMode getExecutionMode() {
    return ExecutionMode.ASYNC;
  }

  /**
   * Runs the upgrade unless a request aspect with the current {@link #VERSION} already exists.
   *
   * @throws InterruptedException if the thread is interrupted during the initial wait; the
   *     interrupt flag is restored before the exception propagates
   * @throws RuntimeException if restating the aspects fails; the original failure is the cause
   */
  @Override
  public void execute() throws Exception {
    log.info("Attempting to run RestoreDbtSiblingsIndices upgrade..");
    log.info("Waiting {} seconds..", SLEEP_SECONDS);

    // Sleep to ensure deployment process finishes.
    try {
      Thread.sleep(SLEEP_SECONDS * 1000L);
    } catch (InterruptedException e) {
      // Keep the interrupt visible to whoever owns this thread.
      Thread.currentThread().interrupt();
      throw e;
    }

    if (hasAlreadyRun()) {
      log.info("RestoreDbtSiblingsIndices has run before with this version. Skipping");
      return;
    }

    log.info("Bootstrapping sibling aspects");

    try {
      // Only the total is needed from this call; the page contents are ignored.
      final int rowCount = _entityService.listUrns(DATASET_ENTITY_NAME, 0, 10).getTotal();

      log.info("Found {} dataset entities to attempt to bootstrap", rowCount);

      final AspectSpec datasetAspectSpec =
          _entityRegistry.getEntitySpec(Constants.DATASET_ENTITY_NAME).getAspectSpec(Constants.UPSTREAM_LINEAGE_ASPECT_NAME);
      final AuditStamp auditStamp =
          new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());

      // Record the request first so that the version check above sees it on later runs.
      final DataHubUpgradeRequest upgradeRequest =
          new DataHubUpgradeRequest().setTimestampMs(System.currentTimeMillis()).setVersion(VERSION);
      ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, upgradeRequest, auditStamp);

      for (int indexedCount = 0; indexedCount < rowCount; indexedCount += BATCH_SIZE) {
        getAndRestoreUpstreamLineageIndices(indexedCount, auditStamp, datasetAspectSpec);
      }

      final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis());
      ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, upgradeResult, auditStamp);

      log.info("Successfully restored sibling aspects");
    } catch (Exception e) {
      log.error("Error when running the RestoreDbtSiblingsIndices Bootstrap Step", e);
      // Remove the upgrade marker so the step is retried; a failing cleanup must not hide
      // the original error, so it is attached as a suppressed exception instead.
      try {
        _entityService.deleteUrn(SIBLING_UPGRADE_URN);
      } catch (Exception cleanupFailure) {
        log.error("Failed to delete upgrade urn {} after a failed run", SIBLING_UPGRADE_URN, cleanupFailure);
        e.addSuppressed(cleanupFailure);
      }
      throw new RuntimeException("Error when running the RestoreDbtSiblingsIndices Bootstrap Step", e);
    }
  }

  /**
   * Checks whether the upgrade entity already carries a request aspect whose version equals
   * {@link #VERSION}.
   */
  private boolean hasAlreadyRun() throws Exception {
    final EntityResponse response = _entityService.getEntityV2(
        Constants.DATA_HUB_UPGRADE_ENTITY_NAME, SIBLING_UPGRADE_URN,
        Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)
    );
    if (response == null || !response.getAspects().containsKey(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) {
      return false;
    }
    final DataMap dataMap = response.getAspects().get(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME).getValue().data();
    final DataHubUpgradeRequest request = new DataHubUpgradeRequest(dataMap);
    return request.hasVersion() && request.getVersion().equals(VERSION);
  }

  /**
   * Fetches one page of dataset urns and produces a RESTATE change log for each dataset that has
   * an {@code upstreamLineage} aspect.
   *
   * @param start offset of the first dataset urn in the page
   * @param auditStamp audit stamp attached to every produced change log
   * @param upstreamAspectSpec aspect spec of the {@code upstreamLineage} aspect
   * @throws RuntimeException if fetching the aspects fails with a {@link URISyntaxException}
   */
  private void getAndRestoreUpstreamLineageIndices(int start, AuditStamp auditStamp, AspectSpec upstreamAspectSpec) {
    ListUrnsResult datasetUrnsResult = _entityService.listUrns(DATASET_ENTITY_NAME, start, BATCH_SIZE);
    List<Urn> datasetUrns = datasetUrnsResult.getEntities();
    log.info("Re-indexing upstreamLineage aspect from {} with batch size {}", start, BATCH_SIZE);

    if (datasetUrns.isEmpty()) {
      return;
    }

    final Map<Urn, EntityResponse> upstreamLineageResponse;
    try {
      upstreamLineageResponse =
          _entityService.getEntitiesV2(DATASET_ENTITY_NAME, new HashSet<>(datasetUrns), Collections.singleton(UPSTREAM_LINEAGE_ASPECT_NAME));
    } catch (URISyntaxException e) {
      // Keep the original exception as the cause so its stack trace is not lost.
      throw new RuntimeException(String.format("Error fetching upstream lineage history: %s", e.toString()), e);
    }

    // Loop over datasets and produce changelog
    for (Urn datasetUrn : datasetUrns) {
      EntityResponse response = upstreamLineageResponse.get(datasetUrn);
      if (response == null) {
        log.warn("Dataset not in set of entity responses {}", datasetUrn);
        continue;
      }
      UpstreamLineage upstreamLineage = getUpstreamLineage(response);
      if (upstreamLineage == null) {
        // Dataset has no upstreamLineage aspect; nothing to restate.
        continue;
      }

      _entityService.produceMetadataChangeLog(
          datasetUrn,
          DATASET_ENTITY_NAME,
          UPSTREAM_LINEAGE_ASPECT_NAME,
          upstreamAspectSpec,
          null,
          upstreamLineage,
          null,
          null,
          auditStamp,
          ChangeType.RESTATE);
    }
  }

  /**
   * Extracts the {@code upstreamLineage} aspect from an entity response.
   *
   * @return the aspect, or {@code null} when the response does not contain it
   */
  private UpstreamLineage getUpstreamLineage(EntityResponse entityResponse) {
    EnvelopedAspectMap aspectMap = entityResponse.getAspects();
    if (!aspectMap.containsKey(UPSTREAM_LINEAGE_ASPECT_NAME)) {
      return null;
    }

    return new UpstreamLineage(aspectMap.get(Constants.UPSTREAM_LINEAGE_ASPECT_NAME).getValue().data());
  }

  /**
   * Upserts the given aspect onto the upgrade entity identified by {@link #SIBLING_UPGRADE_URN}.
   *
   * @param aspectName name of the aspect being written
   * @param aspect aspect payload to serialize into the proposal
   * @param auditStamp audit stamp passed along with the proposal
   */
  private void ingestUpgradeAspect(String aspectName, RecordTemplate aspect, AuditStamp auditStamp) {
    final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal();
    upgradeProposal.setEntityUrn(SIBLING_UPGRADE_URN);
    upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
    upgradeProposal.setAspectName(aspectName);
    upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(aspect));
    upgradeProposal.setChangeType(ChangeType.UPSERT);

    _entityService.ingestProposal(upgradeProposal, auditStamp);
  }
}

0 comments on commit a0a1cbe

Please sign in to comment.