diff --git a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java index f656b1ab66b0d3..d8808c5b5abeb8 100644 --- a/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java +++ b/metadata-jobs/mae-consumer/src/main/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHook.java @@ -1,7 +1,9 @@ package com.linkedin.metadata.kafka.hook.siblings; +import com.datahub.authentication.Authentication; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import com.linkedin.common.AuditStamp; import com.linkedin.common.Siblings; import com.linkedin.common.SubTypes; @@ -10,21 +12,25 @@ import com.linkedin.common.urn.Urn; import com.linkedin.dataset.UpstreamArray; import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.entity.EntityResponse; +import com.linkedin.entity.client.RestliEntityClient; import com.linkedin.events.metadata.ChangeType; -import com.linkedin.gms.factory.entity.EntityServiceFactory; +import com.linkedin.gms.factory.auth.SystemAuthenticationFactory; +import com.linkedin.gms.factory.entity.RestliEntityClientFactory; import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory; -import com.linkedin.gms.factory.search.SearchServiceFactory; -import com.linkedin.metadata.entity.EntityService; +import com.linkedin.gms.factory.search.EntitySearchServiceFactory; +import com.linkedin.metadata.Constants; import com.linkedin.metadata.kafka.hook.MetadataChangeLogHook; import com.linkedin.metadata.models.EntitySpec; import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.search.SearchResult; -import com.linkedin.metadata.search.SearchService; import com.linkedin.metadata.utils.EntityKeyUtils; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.MetadataChangeProposal; +import com.linkedin.r2.RemoteInvocationException; import java.net.URISyntaxException; import java.util.List; import java.util.stream.Collectors; @@ -53,7 +59,7 @@ @Slf4j @Component @Singleton -@Import({EntityRegistryFactory.class, EntityServiceFactory.class, SearchServiceFactory.class}) +@Import({EntityRegistryFactory.class, RestliEntityClientFactory.class, EntitySearchServiceFactory.class, SystemAuthenticationFactory.class}) public class SiblingAssociationHook implements MetadataChangeLogHook { public static final String SIBLING_ASSOCIATION_SYSTEM_ACTOR = "urn:li:corpuser:__datahub_system_sibling_hook"; @@ -61,18 +67,21 @@ public class SiblingAssociationHook implements MetadataChangeLogHook { public static final String SOURCE_SUBTYPE = "source"; private final EntityRegistry _entityRegistry; - private final EntityService _entityService; - private final SearchService _searchService; + private final RestliEntityClient _entityClient; + private final EntitySearchService _searchService; + private final Authentication _systemAuthentication; @Autowired public SiblingAssociationHook( @Nonnull final EntityRegistry entityRegistry, - @Nonnull final EntityService entityService, - @Nonnull final SearchService searchService + @Nonnull final RestliEntityClient entityClient, + @Nonnull final EntitySearchService searchService, + @Nonnull final Authentication systemAuthentication ) { _entityRegistry = entityRegistry; - _entityService = entityService; + _entityClient = entityClient; _searchService = searchService; + _systemAuthentication = systemAuthentication; } @Value("${siblings.enabled:false}") @@ -123,8 +132,7 @@ private void handleEntityKeyEvent(DatasetUrn datasetUrn) { entitiesWithYouAsSiblingFilter, null, 0, - 10, - null); + 10); // we have a match of an entity with you as a sibling, associate yourself back searchResult.getEntities().forEach(entity -> { @@ -146,21 +154,12 @@ private void handleDbtDatasetEvent(MetadataChangeLog event, DatasetUrn datasetUr if (event.getAspectName().equals(UPSTREAM_LINEAGE_ASPECT_NAME)) { upstreamLineage = getUpstreamLineageFromEvent(event); - subTypesAspectOfEntity = - (SubTypes) _entityService.getLatestAspect( - datasetUrn, - SUB_TYPES_ASPECT_NAME - ); - + subTypesAspectOfEntity = getSubtypesFromEntityClient(datasetUrn); } if (event.getAspectName().equals(SUB_TYPES_ASPECT_NAME)) { subTypesAspectOfEntity = getSubtypesFromEvent(event); - upstreamLineage = - (UpstreamLineage) _entityService.getLatestAspect( - datasetUrn, - UPSTREAM_LINEAGE_ASPECT_NAME - ); + upstreamLineage = getUpstreamLineageFromEntityClient(datasetUrn); } if ( @@ -195,10 +194,8 @@ private void handleSourceDatasetEvent(MetadataChangeLog event, DatasetUrn source } private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) { - Siblings existingDbtSiblingAspect = - (Siblings) _entityService.getLatestAspect(dbtUrn, SIBLINGS_ASPECT_NAME); - Siblings existingSourceSiblingAspect = - (Siblings) _entityService.getLatestAspect(sourceUrn, SIBLINGS_ASPECT_NAME); + Siblings existingDbtSiblingAspect = getSiblingsFromEntityClient(dbtUrn); + Siblings existingSourceSiblingAspect = getSiblingsFromEntityClient(sourceUrn); log.info("Associating {} and {} as siblings.", dbtUrn.toString(), sourceUrn.toString()); @@ -228,7 +225,12 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) { dbtSiblingProposal.setChangeType(ChangeType.UPSERT); dbtSiblingProposal.setEntityUrn(dbtUrn); - _entityService.ingestProposal(dbtSiblingProposal, auditStamp); + try { + _entityClient.ingestProposal(dbtSiblingProposal, _systemAuthentication); + } catch (RemoteInvocationException e) { + log.error("Error while associating {} with {}: {}", dbtUrn.toString(), sourceUrn.toString(), e.toString()); + throw new RuntimeException("Error ingesting sibling proposal. Skipping processing.", e); + } // set dbt as a sibling of source @@ -245,7 +247,14 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) { // clean up any references to stale siblings that have been deleted List filteredNewSiblingsArray = - newSiblingsUrnArray.stream().filter(urn -> _entityService.exists(urn)).collect(Collectors.toList()); + newSiblingsUrnArray.stream().filter(urn -> { + try { + return _entityClient.exists(urn, _systemAuthentication); + } catch (RemoteInvocationException e) { + log.error("Error while checking existence of {}: {}", urn.toString(), e.toString()); + throw new RuntimeException("Error checking existence. Skipping processing.", e); + } + }).collect(Collectors.toList()); sourceSiblingAspect.setSiblings(new UrnArray(filteredNewSiblingsArray)); sourceSiblingAspect.setPrimary(false); @@ -259,7 +268,12 @@ private void setSiblingsAndSoftDeleteSibling(Urn dbtUrn, Urn sourceUrn) { sourceSiblingProposal.setChangeType(ChangeType.UPSERT); sourceSiblingProposal.setEntityUrn(sourceUrn); - _entityService.ingestProposal(sourceSiblingProposal, auditStamp); + try { + _entityClient.ingestProposal(sourceSiblingProposal, _systemAuthentication); + } catch (RemoteInvocationException e) { + log.error("Error while associating {} with {}: {}", dbtUrn.toString(), sourceUrn.toString(), e.toString()); + throw new RuntimeException("Error ingesting sibling proposal. Skipping processing.", e); + } } @@ -362,4 +376,67 @@ private Filter createFilterForEntitiesWithYouAsSibling( return filter; } + private SubTypes getSubtypesFromEntityClient( + final Urn urn + ) { + try { + EntityResponse entityResponse = _entityClient.getV2( + DATASET_ENTITY_NAME, + urn, + ImmutableSet.of(SUB_TYPES_ASPECT_NAME), + _systemAuthentication + ); + + if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.SUB_TYPES_ASPECT_NAME)) { + return new SubTypes(entityResponse.getAspects().get(Constants.SUB_TYPES_ASPECT_NAME).getValue().data()); + } else { + return null; + } + } catch (RemoteInvocationException | URISyntaxException e) { + throw new RuntimeException("Failed to retrieve Subtypes", e); + } + } + + private UpstreamLineage getUpstreamLineageFromEntityClient( + final Urn urn + ) { + try { + EntityResponse entityResponse = _entityClient.getV2( + DATASET_ENTITY_NAME, + urn, + ImmutableSet.of(UPSTREAM_LINEAGE_ASPECT_NAME), + _systemAuthentication + ); + + if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.UPSTREAM_LINEAGE_ASPECT_NAME)) { + return new UpstreamLineage(entityResponse.getAspects().get(Constants.UPSTREAM_LINEAGE_ASPECT_NAME).getValue().data()); + } else { + return null; + } + } catch (RemoteInvocationException | URISyntaxException e) { + throw new RuntimeException("Failed to retrieve UpstreamLineage", e); + } + } + + private Siblings getSiblingsFromEntityClient( + final Urn urn + ) { + try { + EntityResponse entityResponse = _entityClient.getV2( + DATASET_ENTITY_NAME, + urn, + ImmutableSet.of(SIBLINGS_ASPECT_NAME), + _systemAuthentication + ); + + if (entityResponse != null && entityResponse.hasAspects() && entityResponse.getAspects().containsKey(Constants.SIBLINGS_ASPECT_NAME)) { + return new Siblings(entityResponse.getAspects().get(Constants.SIBLINGS_ASPECT_NAME).getValue().data()); + } else { + return null; + } + } catch (RemoteInvocationException | URISyntaxException e) { + throw new RuntimeException("Failed to retrieve UpstreamLineage", e); + } + } + } diff --git a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHookTest.java b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHookTest.java index 190cd922ee15ea..3ab8175115646d 100644 --- a/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHookTest.java +++ b/metadata-jobs/mae-consumer/src/test/java/com/linkedin/metadata/kafka/hook/siblings/SiblingAssociationHookTest.java @@ -1,7 +1,8 @@ package com.linkedin.metadata.kafka.hook.siblings; +import com.datahub.authentication.Authentication; import com.google.common.collect.ImmutableList; -import com.linkedin.common.AuditStamp; +import com.google.common.collect.ImmutableSet; import com.linkedin.common.FabricType; import com.linkedin.common.Siblings; import com.linkedin.common.SubTypes; @@ -14,15 +15,19 @@ import com.linkedin.dataset.Upstream; import com.linkedin.dataset.UpstreamArray; import com.linkedin.dataset.UpstreamLineage; +import com.linkedin.entity.Aspect; +import com.linkedin.entity.EntityResponse; +import com.linkedin.entity.EnvelopedAspect; +import com.linkedin.entity.EnvelopedAspectMap; +import com.linkedin.entity.client.RestliEntityClient; import com.linkedin.events.metadata.ChangeType; -import com.linkedin.metadata.entity.EntityService; import com.linkedin.metadata.key.DatasetKey; import com.linkedin.metadata.models.registry.ConfigEntityRegistry; import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.search.EntitySearchService; import com.linkedin.metadata.search.SearchEntity; import com.linkedin.metadata.search.SearchEntityArray; import com.linkedin.metadata.search.SearchResult; -import com.linkedin.metadata.search.SearchService; import com.linkedin.metadata.utils.GenericRecordUtils; import com.linkedin.mxe.MetadataChangeLog; import com.linkedin.mxe.MetadataChangeProposal; @@ -36,16 +41,18 @@ public class SiblingAssociationHookTest { private SiblingAssociationHook _siblingAssociationHook; - EntityService _mockEntityService; - SearchService _mockSearchService; + RestliEntityClient _mockEntityClient; + EntitySearchService _mockSearchService; + Authentication _mockAuthentication; @BeforeMethod public void setupTest() { EntityRegistry registry = new ConfigEntityRegistry( SiblingAssociationHookTest.class.getClassLoader().getResourceAsStream("test-entity-registry-siblings.yml")); - _mockEntityService = Mockito.mock(EntityService.class); - _mockSearchService = Mockito.mock(SearchService.class); - _siblingAssociationHook = new SiblingAssociationHook(registry, _mockEntityService, _mockSearchService); + _mockEntityClient = Mockito.mock(RestliEntityClient.class); + _mockSearchService = Mockito.mock(EntitySearchService.class); + _mockAuthentication = Mockito.mock(Authentication.class); + _siblingAssociationHook = new SiblingAssociationHook(registry, _mockEntityClient, _mockSearchService, _mockAuthentication); _siblingAssociationHook.setEnabled(true); } @@ -53,14 +60,21 @@ public void setupTest() { public void testInvokeWhenThereIsAPairWithDbtSourceNode() throws Exception { SubTypes mockSourceSubtypesAspect = new SubTypes(); mockSourceSubtypesAspect.setTypeNames(new StringArray(ImmutableList.of("source"))); + EnvelopedAspectMap mockResponseMap = new EnvelopedAspectMap(); + mockResponseMap.put(SUB_TYPES_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(mockSourceSubtypesAspect.data()))); + EntityResponse mockResponse = new EntityResponse(); + mockResponse.setAspects(mockResponseMap); + + Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true); - Mockito.when(_mockEntityService.exists(Mockito.any())).thenReturn(true); Mockito.when( - _mockEntityService.getLatestAspect( + _mockEntityClient.getV2( + DATASET_ENTITY_NAME, Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"), - SUB_TYPES_ASPECT_NAME - )).thenReturn(mockSourceSubtypesAspect); + ImmutableSet.of(SUB_TYPES_ASPECT_NAME), + _mockAuthentication + )).thenReturn(mockResponse); MetadataChangeLog event = new MetadataChangeLog(); event.setEntityType(DATASET_ENTITY_NAME); @@ -90,9 +104,9 @@ public void testInvokeWhenThereIsAPairWithDbtSourceNode() throws Exception { proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect)); proposal.setChangeType(ChangeType.UPSERT); - Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal( + Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal( Mockito.eq(proposal), - Mockito.any(AuditStamp.class) + Mockito.eq(_mockAuthentication) ); final Siblings sourceSiblingsAspect = new Siblings() @@ -106,9 +120,9 @@ public void testInvokeWhenThereIsAPairWithDbtSourceNode() throws Exception { proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal( + Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal( Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) + Mockito.eq(_mockAuthentication) ); } @@ -117,13 +131,23 @@ public void testInvokeWhenThereIsNoPairWithDbtModel() throws Exception { SubTypes mockSourceSubtypesAspect = new SubTypes(); mockSourceSubtypesAspect.setTypeNames(new StringArray(ImmutableList.of("model"))); - Mockito.when(_mockEntityService.exists(Mockito.any())).thenReturn(true); + Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true); + + EnvelopedAspectMap mockResponseMap = new EnvelopedAspectMap(); + mockResponseMap.put(SUB_TYPES_ASPECT_NAME, new EnvelopedAspect().setValue(new Aspect(mockSourceSubtypesAspect.data()))); + EntityResponse mockResponse = new EntityResponse(); + mockResponse.setAspects(mockResponseMap); + + Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true); + Mockito.when( - _mockEntityService.getLatestAspect( + _mockEntityClient.getV2( + DATASET_ENTITY_NAME, Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:dbt,my-proj.jaffle_shop.customers,PROD)"), - SUB_TYPES_ASPECT_NAME - )).thenReturn(mockSourceSubtypesAspect); + ImmutableSet.of(SUB_TYPES_ASPECT_NAME), + _mockAuthentication + )).thenReturn(mockResponse); MetadataChangeLog event = new MetadataChangeLog(); event.setEntityType(DATASET_ENTITY_NAME); @@ -153,15 +177,15 @@ public void testInvokeWhenThereIsNoPairWithDbtModel() throws Exception { proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect)); proposal.setChangeType(ChangeType.UPSERT); - Mockito.verify(_mockEntityService, Mockito.times(0)).ingestProposal( + Mockito.verify(_mockEntityClient, Mockito.times(0)).ingestProposal( Mockito.eq(proposal), - Mockito.any(AuditStamp.class) + Mockito.eq(_mockAuthentication) ); } @Test public void testInvokeWhenThereIsAPairWithBigqueryDownstreamNode() throws Exception { - Mockito.when(_mockEntityService.exists(Mockito.any())).thenReturn(true); + Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true); MetadataChangeLog event = new MetadataChangeLog(); event.setEntityType(DATASET_ENTITY_NAME); @@ -191,9 +215,9 @@ public void testInvokeWhenThereIsAPairWithBigqueryDownstreamNode() throws Except proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect)); proposal.setChangeType(ChangeType.UPSERT); - Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal( + Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal( Mockito.eq(proposal), - Mockito.any(AuditStamp.class) + Mockito.eq(_mockAuthentication) ); final Siblings sourceSiblingsAspect = new Siblings() @@ -207,15 +231,15 @@ public void testInvokeWhenThereIsAPairWithBigqueryDownstreamNode() throws Except proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal( + Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal( Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) + Mockito.eq(_mockAuthentication) ); } @Test public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception { - Mockito.when(_mockEntityService.exists(Mockito.any())).thenReturn(true); + Mockito.when(_mockEntityClient.exists(Mockito.any(), Mockito.any())).thenReturn(true); SearchResult returnSearchResult = new SearchResult(); SearchEntityArray returnEntityArray = new SearchEntityArray(); @@ -229,7 +253,7 @@ public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception { Mockito.when( _mockSearchService.search( - anyString(), anyString(), any(), any(), anyInt(), anyInt(), any() + anyString(), anyString(), any(), any(), anyInt(), anyInt() )).thenReturn(returnSearchResult); MetadataChangeLog event = new MetadataChangeLog(); @@ -256,9 +280,9 @@ public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception { proposal.setAspect(GenericRecordUtils.serializeAspect(dbtSiblingsAspect)); proposal.setChangeType(ChangeType.UPSERT); - Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal( + Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal( Mockito.eq(proposal), - Mockito.any(AuditStamp.class) + Mockito.eq(_mockAuthentication) ); final Siblings sourceSiblingsAspect = new Siblings() @@ -272,9 +296,9 @@ public void testInvokeWhenThereIsAKeyBeingReingested() throws Exception { proposal2.setAspect(GenericRecordUtils.serializeAspect(sourceSiblingsAspect)); proposal2.setChangeType(ChangeType.UPSERT); - Mockito.verify(_mockEntityService, Mockito.times(1)).ingestProposal( + Mockito.verify(_mockEntityClient, Mockito.times(1)).ingestProposal( Mockito.eq(proposal2), - Mockito.any(AuditStamp.class) + Mockito.eq(_mockAuthentication) ); } } diff --git a/metadata-service/restli-api/src/main/idl/com.linkedin.entity.entities.restspec.json b/metadata-service/restli-api/src/main/idl/com.linkedin.entity.entities.restspec.json index 07273487808c9c..9116dd03c8733b 100644 --- a/metadata-service/restli-api/src/main/idl/com.linkedin.entity.entities.restspec.json +++ b/metadata-service/restli-api/src/main/idl/com.linkedin.entity.entities.restspec.json @@ -114,6 +114,13 @@ "optional" : true } ], "returns" : "com.linkedin.metadata.run.DeleteReferencesResponse" + }, { + "name" : "exists", + "parameters" : [ { + "name" : "urn", + "type" : "string" + } ], + "returns" : "boolean" }, { "name" : "filter", "parameters" : [ { diff --git a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json index a1cd7337889ef9..3a7dee437cfc61 100644 --- a/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json +++ b/metadata-service/restli-api/src/main/snapshot/com.linkedin.entity.entities.snapshot.json @@ -5679,6 +5679,13 @@ "optional" : true } ], "returns" : "com.linkedin.metadata.run.DeleteReferencesResponse" + }, { + "name" : "exists", + "parameters" : [ { + "name" : "urn", + "type" : "string" + } ], + "returns" : "boolean" }, { "name" : "filter", "parameters" : [ { diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java index 734bf2d6aadae7..1ebe7fe3674b14 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/EntityClient.java @@ -296,4 +296,6 @@ public DataMap getRawAspect(@Nonnull String urn, @Nonnull String aspect, @Nonnul public void producePlatformEvent(@Nonnull String name, @Nullable String key, @Nonnull PlatformEvent event, @Nonnull Authentication authentication) throws Exception; + + Boolean exists(Urn urn, @Nonnull Authentication authentication) throws RemoteInvocationException; } diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/JavaEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/JavaEntityClient.java index d3587820a79359..d2ca0906569c8a 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/JavaEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/JavaEntityClient.java @@ -463,4 +463,9 @@ public void producePlatformEvent( @Nonnull Authentication authentication) throws Exception { _eventProducer.producePlatformEvent(name, key, event); } + + @Override + public Boolean exists(Urn urn, @Nonnull Authentication authentication) throws RemoteInvocationException { + return _entityService.exists(urn); + } } diff --git a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java index 3f301a020c274b..d580d4f2392da1 100644 --- a/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java +++ b/metadata-service/restli-client/src/main/java/com/linkedin/entity/client/RestliEntityClient.java @@ -20,6 +20,7 @@ import com.linkedin.entity.EntitiesDoBrowseRequestBuilder; import com.linkedin.entity.EntitiesDoDeleteReferencesRequestBuilder; import com.linkedin.entity.EntitiesDoDeleteRequestBuilder; +import com.linkedin.entity.EntitiesDoExistsRequestBuilder; import com.linkedin.entity.EntitiesDoFilterRequestBuilder; import com.linkedin.entity.EntitiesDoGetBrowsePathsRequestBuilder; import com.linkedin.entity.EntitiesDoIngestRequestBuilder; @@ -662,4 +663,11 @@ public void producePlatformEvent(@Nonnull String name, @Nullable String key, @No } sendClientRequest(requestBuilder, authentication); } + + @Override + public Boolean exists(Urn urn, @Nonnull Authentication authentication) throws RemoteInvocationException { + final EntitiesDoExistsRequestBuilder requestBuilder = + ENTITIES_REQUEST_BUILDERS.actionExists().urnParam(urn.toString()); + return sendClientRequest(requestBuilder, authentication).getEntity(); + } } diff --git a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java index ea6dc4f76551a6..c0f2e2200ba49d 100644 --- a/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java +++ b/metadata-service/restli-servlet-impl/src/main/java/com/linkedin/metadata/resources/entity/EntityResource.java @@ -89,6 +89,7 @@ public class EntityResource extends CollectionResourceTaskTemplate filter(@ActionParam(PARAM_ENTITY) @Nonnull String enti return RestliUtil.toTask(() -> _entitySearchService.filter(entityName, filter, sortCriterion, start, count), MetricRegistry.name(this.getClass(), "search")); } + + @Action(name = ACTION_EXISTS) + @Nonnull + @WithSpan + public Task exists(@ActionParam(PARAM_URN) @Nonnull String urnStr) throws URISyntaxException { + log.info("EXISTS for {}", urnStr); + Urn urn = Urn.createFromString(urnStr); + return RestliUtil.toTask(() -> _entityService.exists(urn), + MetricRegistry.name(this.getClass(), "exists")); + } } diff --git a/smoke-test/tests/cypress/example_siblings_to_datahub_rest.yml b/smoke-test/tests/cypress/example_siblings_to_datahub_rest.yml new file mode 100644 index 00000000000000..89b259a9ea3c3c --- /dev/null +++ b/smoke-test/tests/cypress/example_siblings_to_datahub_rest.yml @@ -0,0 +1,11 @@ +# see https://datahubproject.io/docs/generated/ingestion/sources/file for complete documentation +source: + type: "file" + config: + filename: "./cypress_dbt_data.json" + +# see https://datahubproject.io/docs/metadata-ingestion/sink_docs/datahub for complete documentation +sink: + type: "datahub-rest" + config: + server: "http://localhost:8080"