diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/AspectStorageValidationUtil.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/AspectStorageValidationUtil.java index f19a570d197739..7804aa2067088b 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/AspectStorageValidationUtil.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/AspectStorageValidationUtil.java @@ -16,8 +16,8 @@ private AspectStorageValidationUtil() { * @return {@code true} if table exists. */ public static boolean checkTableExists(@Nonnull CqlSession session) { - String query = String.format("SELECT columnfamily_name\n " - + "FROM schema_columnfamilies WHERE keyspace_name='%s';", + String query = String.format("SELECT table_name \n " + + "FROM system_schema.tables where table_name = '%s' allow filtering;", CassandraAspect.TABLE_NAME); ResultSet rs = session.execute(query); return rs.all().size() > 0; diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java index 13ce40e8fbb365..a95e8eb62f8516 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/cassandra/CassandraAspectDao.java @@ -7,7 +7,6 @@ import com.datastax.oss.driver.api.core.cql.ResultSet; import com.datastax.oss.driver.api.core.cql.Row; import com.datastax.oss.driver.api.core.cql.SimpleStatement; -import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder; import com.datastax.oss.driver.api.core.paging.OffsetPager; import com.datastax.oss.driver.api.core.paging.OffsetPager.Page; import com.datastax.oss.driver.api.querybuilder.QueryBuilder; @@ -28,10 +27,6 @@ import com.linkedin.metadata.query.ExtraInfo; import com.linkedin.metadata.query.ExtraInfoArray; import 
com.linkedin.metadata.query.ListResultMetadata; -import lombok.extern.slf4j.Slf4j; - -import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.net.URISyntaxException; import java.sql.Timestamp; import java.util.HashMap; @@ -41,13 +36,12 @@ import java.util.Set; import java.util.function.Supplier; import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import lombok.extern.slf4j.Slf4j; -import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom; -import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto; -import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal; -import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom; -import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.update; -import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION; +import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.*; +import static com.linkedin.metadata.Constants.*; @Slf4j public class CassandraAspectDao implements AspectDao, AspectMigrationsDao { @@ -97,10 +91,13 @@ public long countEntities() { SimpleStatement ss = selectFrom(CassandraAspect.TABLE_NAME) .distinct() .column(CassandraAspect.URN_COLUMN) - .countAll() .build(); - return _cqlSession.execute(ss).one().getLong(0); + ResultSet rs = _cqlSession.execute(ss); + // TODO: make sure it doesn't blow up on a large database + // Getting a count of distinct values in a Cassandra query doesn't seem to be feasible, but counting them in the app is dangerous + // The saving grace here is that the only place where this method is used should only run once, when the database is still young + return rs.all().size(); } @Override @@ -110,10 +107,11 @@ public boolean checkIfAspectExists(@Nonnull String aspectName) { .column(CassandraAspect.URN_COLUMN) .whereColumn(CassandraAspect.ASPECT_COLUMN).isEqualTo(literal(aspectName)) .limit(1) + .allowFiltering() 
.build(); ResultSet rs = _cqlSession.execute(ss); - return rs.all().size() > 0; + return rs.one() != null; } private Map getMaxVersions(@Nonnull final String urn, @Nonnull final Set aspectNames) { @@ -451,7 +449,6 @@ public Iterable listAllUrns(int start, int pageSize) { validateConnection(); SimpleStatement ss = selectFrom(CassandraAspect.TABLE_NAME) .column(CassandraAspect.URN_COLUMN) - .orderBy(CassandraAspect.URN_COLUMN, ClusteringOrder.ASC) .build(); ResultSet rs = _cqlSession.execute(ss); diff --git a/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java new file mode 100644 index 00000000000000..e4b23aba7b92c8 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/AspectGenerationUtils.java @@ -0,0 +1,69 @@ +package com.linkedin.metadata; + +import com.linkedin.chart.ChartInfo; +import com.linkedin.common.AuditStamp; +import com.linkedin.common.ChangeAuditStamps; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.data.template.RecordTemplate; +import com.linkedin.identity.CorpUserInfo; +import com.linkedin.metadata.key.CorpUserKey; +import com.linkedin.metadata.utils.EntityKeyUtils; +import com.linkedin.metadata.utils.PegasusUtils; +import com.linkedin.mxe.SystemMetadata; +import javax.annotation.Nonnull; + + +public class AspectGenerationUtils { + + private AspectGenerationUtils() { + } + + @Nonnull + public static AuditStamp createAuditStamp() { + return new AuditStamp().setTime(123L).setActor(UrnUtils.getUrn("urn:li:corpuser:tester")); + } + + @Nonnull + public static SystemMetadata createSystemMetadata() { + return createSystemMetadata(1625792689, "run-123"); + } + + @Nonnull + public static SystemMetadata createSystemMetadata(long lastObserved, @Nonnull String runId) { + SystemMetadata metadata = new SystemMetadata(); + metadata.setLastObserved(lastObserved); + metadata.setRunId(runId); + return 
metadata; + } + + @Nonnull + public static CorpUserKey createCorpUserKey(Urn urn) { + return (CorpUserKey) EntityKeyUtils.convertUrnToEntityKey(urn, new CorpUserKey().schema()); + } + + @Nonnull + public static CorpUserInfo createCorpUserInfo(@Nonnull String email) { + CorpUserInfo corpUserInfo = new CorpUserInfo(); + corpUserInfo.setEmail(email); + corpUserInfo.setActive(true); + return corpUserInfo; + } + + @Nonnull + public static ChartInfo createChartInfo(@Nonnull String title, @Nonnull String description) { + ChartInfo chartInfo = new ChartInfo(); + chartInfo.setTitle(title); + chartInfo.setDescription(description); + ChangeAuditStamps lastModified = new ChangeAuditStamps(); + lastModified.setCreated(createAuditStamp()); + lastModified.setLastModified(createAuditStamp()); + chartInfo.setLastModified(lastModified); + return chartInfo; + } + + @Nonnull + public static String getAspectName(RecordTemplate record) { + return PegasusUtils.getAspectNameFromSchema(record.schema()); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java b/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java new file mode 100644 index 00000000000000..2361bcc22780ad --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/AspectIngestionUtils.java @@ -0,0 +1,75 @@ +package com.linkedin.metadata; + +import com.linkedin.chart.ChartInfo; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.identity.CorpUserInfo; +import com.linkedin.metadata.entity.EntityService; +import com.linkedin.metadata.key.CorpUserKey; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.Nonnull; + + +public class AspectIngestionUtils { + + private AspectIngestionUtils() { + } + + @Nonnull + public static Map ingestCorpUserKeyAspects(EntityService entityService, int aspectCount) { + return ingestCorpUserKeyAspects(entityService, aspectCount, 0); + } + + @Nonnull + public static 
Map ingestCorpUserKeyAspects(EntityService entityService, int aspectCount, int startIndex) { + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserKey()); + Map aspects = new HashMap<>(); + for (int i = startIndex; i < startIndex + aspectCount; i++) { + Urn urn = UrnUtils.getUrn(String.format("urn:li:corpuser:tester%d", i)); + CorpUserKey aspect = AspectGenerationUtils.createCorpUserKey(urn); + aspects.put(urn, aspect); + entityService.ingestAspect(urn, aspectName, aspect, AspectGenerationUtils.createAuditStamp(), AspectGenerationUtils.createSystemMetadata()); + } + return aspects; + } + + @Nonnull + public static Map ingestCorpUserInfoAspects(@Nonnull final EntityService entityService, int aspectCount) { + return ingestCorpUserInfoAspects(entityService, aspectCount, 0); + } + + @Nonnull + public static Map ingestCorpUserInfoAspects(@Nonnull final EntityService entityService, int aspectCount, int startIndex) { + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo()); + Map aspects = new HashMap<>(); + for (int i = startIndex; i < startIndex + aspectCount; i++) { + Urn urn = UrnUtils.getUrn(String.format("urn:li:corpuser:tester%d", i)); + String email = String.format("email%d@test.com", i); + CorpUserInfo aspect = AspectGenerationUtils.createCorpUserInfo(email); + aspects.put(urn, aspect); + entityService.ingestAspect(urn, aspectName, aspect, AspectGenerationUtils.createAuditStamp(), AspectGenerationUtils.createSystemMetadata()); + } + return aspects; + } + + @Nonnull + public static Map ingestChartInfoAspects(@Nonnull final EntityService entityService, int aspectCount) { + return ingestChartInfoAspects(entityService, aspectCount, 0); + } + + @Nonnull + public static Map ingestChartInfoAspects(@Nonnull final EntityService entityService, int aspectCount, int startIndex) { + String aspectName = AspectGenerationUtils.getAspectName(new ChartInfo()); + Map aspects = new HashMap<>(); + for (int i = startIndex; i < startIndex + 
aspectCount; i++) { + Urn urn = UrnUtils.getUrn(String.format("urn:li:chart:(looker,test%d)", i)); + String title = String.format("Test Title %d", i); + String description = String.format("Test description %d", i); + ChartInfo aspect = AspectGenerationUtils.createChartInfo(title, description); + aspects.put(urn, aspect); + entityService.ingestAspect(urn, aspectName, aspect, AspectGenerationUtils.createAuditStamp(), AspectGenerationUtils.createSystemMetadata()); + } + return aspects; + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/AspectMigrationsDaoTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/AspectMigrationsDaoTest.java new file mode 100644 index 00000000000000..e780e1375a445a --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/AspectMigrationsDaoTest.java @@ -0,0 +1,98 @@ +package com.linkedin.metadata.entity; + +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.metadata.AspectIngestionUtils; +import com.linkedin.metadata.event.EventProducer; +import com.linkedin.metadata.key.CorpUserKey; +import com.linkedin.metadata.models.registry.ConfigEntityRegistry; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.models.registry.EntityRegistryException; +import com.linkedin.metadata.models.registry.MergedEntityRegistry; +import com.linkedin.metadata.snapshot.Snapshot; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.testcontainers.shaded.com.google.common.collect.ImmutableList; +import org.testng.annotations.Test; + +import static com.linkedin.metadata.Constants.*; +import static org.testng.Assert.*; + + +abstract public class AspectMigrationsDaoTest { + + protected T _migrationsDao; + + protected final EntityRegistry _snapshotEntityRegistry; + protected final EntityRegistry _configEntityRegistry; + protected final EntityRegistry 
_testEntityRegistry; + protected EventProducer _mockProducer; + + protected EntityService _entityService; + protected RetentionService _retentionService; + + protected AspectMigrationsDaoTest() throws EntityRegistryException { + _snapshotEntityRegistry = new TestEntityRegistry(); + _configEntityRegistry = new ConfigEntityRegistry(Snapshot.class.getClassLoader().getResourceAsStream("entity-registry.yml")); + _testEntityRegistry = new MergedEntityRegistry(_snapshotEntityRegistry).apply(_configEntityRegistry); + } + + @Test + public void testListAllUrns() throws AssertionError { + final int totalAspects = 30; + final int pageSize = 25; + final int lastPageSize = 5; + Map ingestedAspects = AspectIngestionUtils.ingestCorpUserKeyAspects(_entityService, totalAspects); + List ingestedUrns = ingestedAspects.keySet().stream().map(Urn::toString).collect(Collectors.toList()); + List seenUrns = new ArrayList<>(); + + Iterable page1 = _migrationsDao.listAllUrns(0, pageSize); + List page1Urns = ImmutableList.copyOf(page1); + + // validate first page + assertEquals(page1Urns.size(), pageSize); + for (String urn : page1Urns) { + assertNotNull(UrnUtils.getUrn(urn)); + seenUrns.add(urn); + } + + Iterable page2 = _migrationsDao.listAllUrns(pageSize, pageSize); + List page2Urns = ImmutableList.copyOf(page2); + + // validate last page + assertEquals(page2Urns.size(), lastPageSize); + for (String urn : page2Urns) { + assertNotNull(UrnUtils.getUrn(urn)); + seenUrns.add(urn); + } + + // validate all ingested URNs were returned exactly once + for (String urn : ingestedUrns) { + assertEquals(seenUrns.stream().filter(u -> u.equals(urn)).count(), 1); + } + } + + @Test + public void testCountEntities() throws AssertionError { + AspectIngestionUtils.ingestCorpUserInfoAspects(_entityService, 11); + AspectIngestionUtils.ingestChartInfoAspects(_entityService, 22); + final int expected = 33; + + long actual = _migrationsDao.countEntities(); + + assertEquals(actual, expected); + } + + @Test + public 
void testCheckIfAspectExists() throws AssertionError { + boolean actual = _migrationsDao.checkIfAspectExists(CORP_USER_INFO_ASPECT_NAME); + assertFalse(actual); + + AspectIngestionUtils.ingestCorpUserInfoAspects(_entityService, 1); + + actual = _migrationsDao.checkIfAspectExists(CORP_USER_INFO_ASPECT_NAME); + assertTrue(actual); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/CassandraAspectMigrationsDaoTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/CassandraAspectMigrationsDaoTest.java new file mode 100644 index 00000000000000..b3d2679fc3be58 --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/CassandraAspectMigrationsDaoTest.java @@ -0,0 +1,63 @@ +package com.linkedin.metadata.entity; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.linkedin.metadata.CassandraTestUtils; +import com.linkedin.metadata.entity.cassandra.CassandraAspectDao; +import com.linkedin.metadata.entity.cassandra.CassandraRetentionService; +import com.linkedin.metadata.event.EventProducer; +import com.linkedin.metadata.models.registry.EntityRegistryException; +import org.testcontainers.containers.CassandraContainer; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; + + +public class CassandraAspectMigrationsDaoTest extends AspectMigrationsDaoTest { + + private CassandraContainer _cassandraContainer; + + public CassandraAspectMigrationsDaoTest() throws EntityRegistryException { + } + + @BeforeClass + public void setupContainer() { + _cassandraContainer = CassandraTestUtils.setupContainer(); + } + + @AfterClass + public void tearDown() { + _cassandraContainer.stop(); + } + + @BeforeMethod + public void setupTest() { + CassandraTestUtils.purgeData(_cassandraContainer); + configureComponents(); + } + + private void 
configureComponents() { + CqlSession session = CassandraTestUtils.createTestSession(_cassandraContainer); + CassandraAspectDao dao = new CassandraAspectDao(session); + dao.setConnectionValidated(true); + _mockProducer = mock(EventProducer.class); + _entityService = new EntityService(dao, _mockProducer, _testEntityRegistry); + _retentionService = new CassandraRetentionService(_entityService, session, 1000); + _entityService.setRetentionService(_retentionService); + + _migrationsDao = dao; + } + + /** + * Ideally, all tests would be in the base class, so they're reused between all implementations. + * When that's the case - test runner will ignore this class (and its base!) so we keep this dummy test + * to make sure this class will always be discovered. + */ + @Test + public void obligatoryTest() throws AssertionError { + Assert.assertTrue(true); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/CassandraEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/CassandraEntityServiceTest.java index 5dd17567589340..ab716ed7b0f9ad 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/CassandraEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/CassandraEntityServiceTest.java @@ -2,18 +2,23 @@ import com.datastax.oss.driver.api.core.CqlSession; import com.linkedin.common.urn.Urn; -import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; import com.linkedin.identity.CorpUserInfo; +import com.linkedin.metadata.AspectGenerationUtils; +import com.linkedin.metadata.AspectIngestionUtils; import com.linkedin.metadata.CassandraTestUtils; import com.linkedin.metadata.entity.cassandra.CassandraAspectDao; import com.linkedin.metadata.entity.cassandra.CassandraRetentionService; import com.linkedin.metadata.event.EventProducer; import com.linkedin.metadata.key.CorpUserKey; import com.linkedin.metadata.models.registry.EntityRegistryException; 
+import com.linkedin.metadata.query.ExtraInfo; import com.linkedin.metadata.query.ListUrnsResult; -import com.linkedin.metadata.utils.PegasusUtils; -import com.linkedin.mxe.SystemMetadata; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.testcontainers.containers.CassandraContainer; import org.testng.Assert; import org.testng.annotations.AfterClass; @@ -21,9 +26,8 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import static org.mockito.Mockito.mock; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; /** * A class that knows how to configure {@link EntityServiceTest} to run integration tests against a Cassandra database. @@ -71,110 +75,105 @@ private void configureComponents() { * to make sure this class will always be discovered. */ @Test - public void obligatoryTest() throws Exception { + public void obligatoryTest() throws AssertionError { Assert.assertTrue(true); } @Override @Test - public void testIngestListLatestAspects() throws Exception { + public void testIngestListLatestAspects() throws AssertionError { // TODO: If you're modifying this test - match your changes in sibling implementations. // TODO: Move this test into the base class, // If you can find a way for Cassandra and relational databases to share result ordering rules. 
- Urn entityUrn1 = Urn.createFromString("urn:li:corpuser:test1"); - Urn entityUrn2 = Urn.createFromString("urn:li:corpuser:test2"); - Urn entityUrn3 = Urn.createFromString("urn:li:corpuser:test3"); - - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); - - String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserInfo().schema()); - - // Ingest CorpUserInfo Aspect #1 - CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com"); - _entityService.ingestAspect(entityUrn1, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); - - // Ingest CorpUserInfo Aspect #2 - CorpUserInfo writeAspect2 = createCorpUserInfo("email2@test.com"); - _entityService.ingestAspect(entityUrn2, aspectName, writeAspect2, TEST_AUDIT_STAMP, metadata1); - - // Ingest CorpUserInfo Aspect #3 - CorpUserInfo writeAspect3 = createCorpUserInfo("email3@test.com"); - _entityService.ingestAspect(entityUrn3, aspectName, writeAspect3, TEST_AUDIT_STAMP, metadata1); - - // List aspects - ListResult batch1 = _entityService.listLatestAspects(entityUrn1.getEntityType(), aspectName, 0, 2); - - assertEquals(batch1.getNextStart(), 2); - assertEquals(batch1.getPageSize(), 2); - assertEquals(batch1.getTotalCount(), 3); - assertEquals(batch1.getTotalPageCount(), 2); - assertEquals(batch1.getValues().size(), 2); - assertTrue(DataTemplateUtil.areEqual(writeAspect1, batch1.getValues().get(0))); - assertTrue(DataTemplateUtil.areEqual(writeAspect3, batch1.getValues().get(1))); - - ListResult batch2 = _entityService.listLatestAspects(entityUrn1.getEntityType(), aspectName, 2, 2); - assertEquals(batch2.getValues().size(), 1); - assertTrue(DataTemplateUtil.areEqual(writeAspect2, batch2.getValues().get(0))); + final int totalEntities = 100; + final int pageSize = 30; + final int expectedTotalPages = 4; + final int expectedEntitiesInLastPage = 10; + + Map writtenAspects = AspectIngestionUtils.ingestCorpUserInfoAspects(_entityService, 
totalEntities); + Set writtenUrns = writtenAspects.keySet(); + String entity = writtenUrns.stream().findFirst().get().getEntityType(); + String aspect = AspectGenerationUtils.getAspectName(new CorpUserInfo()); + + List readUrns = new ArrayList<>(); + for (int pageNo = 0; pageNo < expectedTotalPages; pageNo++) { + boolean isLastPage = pageNo == expectedTotalPages - 1; + int pageStart = pageNo * pageSize; + int expectedEntityCount = isLastPage ? expectedEntitiesInLastPage : pageSize; + int expectedNextStart = isLastPage ? -1 : pageStart + pageSize; + + ListResult page = _entityService.listLatestAspects(entity, aspect, pageStart, pageSize); + + // Check paging metadata works as expected + assertEquals(page.getNextStart(), expectedNextStart); + assertEquals(page.getPageSize(), pageSize); + assertEquals(page.getTotalCount(), totalEntities); + assertEquals(page.getTotalPageCount(), expectedTotalPages); + assertEquals(page.getValues().size(), expectedEntityCount); + + // Remember all URNs we've seen returned for later assertions + readUrns.addAll(page.getMetadata().getExtraInfos().stream().map(ExtraInfo::getUrn).collect(Collectors.toList())); + } + assertEquals(readUrns.size(), writtenUrns.size()); + + // Check that all URNs we've created were seen in some page or other (also check that none were seen more than once) + // We can't be strict on exact order of items in the responses because Cassandra query limitations get in the way here. + for (Urn wUrn : writtenUrns) { + long matchingUrnCount = readUrns.stream().filter(rUrn -> rUrn.toString().equals(wUrn.toString())).count(); + assertEquals(matchingUrnCount, 1L, String.format("Each URN should appear exactly once. %s appeared %d times.", wUrn, matchingUrnCount)); + } } @Override @Test - public void testIngestListUrns() throws Exception { + public void testIngestListUrns() throws AssertionError { // TODO: If you're modifying this test - match your changes in sibling implementations. 
// TODO: Move this test into the base class, // If you can find a way for Cassandra and relational databases to share result ordering rules. - Urn entityUrn1 = Urn.createFromString("urn:li:corpuser:test1"); - Urn entityUrn2 = Urn.createFromString("urn:li:corpuser:test2"); - Urn entityUrn3 = Urn.createFromString("urn:li:corpuser:test3"); - - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); - - String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserKey().schema()); - - // Ingest CorpUserInfo Aspect #1 - RecordTemplate writeAspect1 = createCorpUserKey(entityUrn1); - _entityService.ingestAspect(entityUrn1, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); - - // Ingest CorpUserInfo Aspect #2 - RecordTemplate writeAspect2 = createCorpUserKey(entityUrn2); - _entityService.ingestAspect(entityUrn2, aspectName, writeAspect2, TEST_AUDIT_STAMP, metadata1); - - // Ingest CorpUserInfo Aspect #3 - RecordTemplate writeAspect3 = createCorpUserKey(entityUrn3); - _entityService.ingestAspect(entityUrn3, aspectName, writeAspect3, TEST_AUDIT_STAMP, metadata1); - - // List aspects urns - ListUrnsResult batch1 = _entityService.listUrns(entityUrn1.getEntityType(), 0, 2); - - assertEquals((int) batch1.getStart(), 0); - assertEquals((int) batch1.getCount(), 2); - assertEquals((int) batch1.getTotal(), 3); - assertEquals(batch1.getEntities().size(), 2); - assertEquals(entityUrn1.toString(), batch1.getEntities().get(0).toString()); - assertEquals(entityUrn3.toString(), batch1.getEntities().get(1).toString()); - - ListUrnsResult batch2 = _entityService.listUrns(entityUrn1.getEntityType(), 2, 2); - - assertEquals((int) batch2.getStart(), 2); - assertEquals((int) batch2.getCount(), 1); - assertEquals((int) batch2.getTotal(), 3); - assertEquals(batch2.getEntities().size(), 1); - assertEquals(entityUrn2.toString(), batch2.getEntities().get(0).toString()); + final int totalEntities = 100; + final int 
pageSize = 30; + final int expectedTotalPages = 4; + final int expectedEntitiesInLastPage = 10; + + Map writtenAspects = AspectIngestionUtils.ingestCorpUserKeyAspects(_entityService, totalEntities); + Set writtenUrns = writtenAspects.keySet(); + String entity = writtenUrns.stream().findFirst().get().getEntityType(); + + List readUrns = new ArrayList<>(); + for (int pageNo = 0; pageNo < expectedTotalPages; pageNo++) { + boolean isLastPage = pageNo == expectedTotalPages - 1; + int pageStart = pageNo * pageSize; + int expectedEntityCount = isLastPage ? expectedEntitiesInLastPage : pageSize; + + ListUrnsResult page = _entityService.listUrns(entity, pageStart, pageSize); + + // Check paging metadata works as expected + assertEquals(page.getStart().intValue(), pageStart); + assertEquals(page.getTotal().intValue(), totalEntities); + assertEquals(page.getEntities().size(), expectedEntityCount); + + // Remember all URNs we've seen returned for later assertions + readUrns.addAll(page.getEntities()); + } + assertEquals(readUrns.size(), writtenUrns.size()); + + // Check that all URNs we've created were seen in some page or other (also check that none were seen more than once) + // We can't be strict on exact order of items in the responses because Cassandra query limitations get in the way here. + for (Urn wUrn : writtenUrns) { + long matchingUrnCount = readUrns.stream().filter(rUrn -> rUrn.toString().equals(wUrn.toString())).count(); + assertEquals(matchingUrnCount, 1L, String.format("Each URN should appear exactly once. %s appeared %d times.", wUrn, matchingUrnCount)); + } } @Override @Test - public void testNestedTransactions() throws Exception { + public void testNestedTransactions() { // Doesn't look like Cassandra can support nested transactions (or nested batching). 
} } diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanAspectMigrationsDaoTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanAspectMigrationsDaoTest.java new file mode 100644 index 00000000000000..a1fe62c0a594de --- /dev/null +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanAspectMigrationsDaoTest.java @@ -0,0 +1,43 @@ +package com.linkedin.metadata.entity; + +import com.linkedin.metadata.EbeanTestUtils; +import com.linkedin.metadata.entity.ebean.EbeanAspectDao; +import com.linkedin.metadata.entity.ebean.EbeanRetentionService; +import com.linkedin.metadata.event.EventProducer; +import com.linkedin.metadata.models.registry.EntityRegistryException; +import io.ebean.EbeanServer; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import static org.mockito.Mockito.*; + + +public class EbeanAspectMigrationsDaoTest extends AspectMigrationsDaoTest { + + public EbeanAspectMigrationsDaoTest() throws EntityRegistryException { + } + + @BeforeMethod + public void setupTest() { + EbeanServer server = EbeanTestUtils.createTestServer(); + _mockProducer = mock(EventProducer.class); + EbeanAspectDao dao = new EbeanAspectDao(server); + dao.setConnectionValidated(true); + _entityService = new EntityService(dao, _mockProducer, _testEntityRegistry); + _retentionService = new EbeanRetentionService(_entityService, server, 1000); + _entityService.setRetentionService(_retentionService); + + _migrationsDao = dao; + } + + /** + * Ideally, all tests would be in the base class, so they're reused between all implementations. + * When that's the case - test runner will ignore this class (and its base!) so we keep this dummy test + * to make sure this class will always be discovered. 
+ */ + @Test + public void obligatoryTest() throws AssertionError { + Assert.assertTrue(true); + } +} diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index cf6a9b997d0868..8fece4ae785119 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -1,9 +1,11 @@ package com.linkedin.metadata.entity; import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.RecordTemplate; import com.linkedin.identity.CorpUserInfo; +import com.linkedin.metadata.AspectGenerationUtils; import com.linkedin.metadata.EbeanTestUtils; import com.linkedin.metadata.entity.ebean.EbeanAspectDao; import com.linkedin.metadata.entity.ebean.EbeanRetentionService; @@ -55,39 +57,37 @@ public void setupTest() { * to make sure this class will always be discovered. */ @Test - public void obligatoryTest() throws Exception { + public void obligatoryTest() throws AssertionError { Assert.assertTrue(true); } @Override @Test - public void testIngestListLatestAspects() throws Exception { + public void testIngestListLatestAspects() throws AssertionError { // TODO: If you're modifying this test - match your changes in sibling implementations. // TODO: Move this test into the base class, // If you can find a way for Cassandra and relational databases to share result ordering rules. 
- Urn entityUrn1 = Urn.createFromString("urn:li:corpuser:test1"); - Urn entityUrn2 = Urn.createFromString("urn:li:corpuser:test2"); - Urn entityUrn3 = Urn.createFromString("urn:li:corpuser:test3"); + Urn entityUrn1 = UrnUtils.getUrn("urn:li:corpuser:test1"); + Urn entityUrn2 = UrnUtils.getUrn("urn:li:corpuser:test2"); + Urn entityUrn3 = UrnUtils.getUrn("urn:li:corpuser:test3"); - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(); String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserInfo().schema()); // Ingest CorpUserInfo Aspect #1 - CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com"); + CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); _entityService.ingestAspect(entityUrn1, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #2 - CorpUserInfo writeAspect2 = createCorpUserInfo("email2@test.com"); + CorpUserInfo writeAspect2 = AspectGenerationUtils.createCorpUserInfo("email2@test.com"); _entityService.ingestAspect(entityUrn2, aspectName, writeAspect2, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #3 - CorpUserInfo writeAspect3 = createCorpUserInfo("email3@test.com"); + CorpUserInfo writeAspect3 = AspectGenerationUtils.createCorpUserInfo("email3@test.com"); _entityService.ingestAspect(entityUrn3, aspectName, writeAspect3, TEST_AUDIT_STAMP, metadata1); // List aspects @@ -108,57 +108,55 @@ public void testIngestListLatestAspects() throws Exception { @Override @Test - public void testIngestListUrns() throws Exception { + public void testIngestListUrns() throws AssertionError { // TODO: If you're modifying this test - match your changes in sibling implementations. // TODO: Move this test into the base class, // If you can find a way for Cassandra and relational databases to share result ordering rules. 
- Urn entityUrn1 = Urn.createFromString("urn:li:corpuser:test1"); - Urn entityUrn2 = Urn.createFromString("urn:li:corpuser:test2"); - Urn entityUrn3 = Urn.createFromString("urn:li:corpuser:test3"); + Urn entityUrn1 = UrnUtils.getUrn("urn:li:corpuser:test1"); + Urn entityUrn2 = UrnUtils.getUrn("urn:li:corpuser:test2"); + Urn entityUrn3 = UrnUtils.getUrn("urn:li:corpuser:test3"); - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(); String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserKey().schema()); // Ingest CorpUserInfo Aspect #1 - RecordTemplate writeAspect1 = createCorpUserKey(entityUrn1); + RecordTemplate writeAspect1 = AspectGenerationUtils.createCorpUserKey(entityUrn1); _entityService.ingestAspect(entityUrn1, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #2 - RecordTemplate writeAspect2 = createCorpUserKey(entityUrn2); + RecordTemplate writeAspect2 = AspectGenerationUtils.createCorpUserKey(entityUrn2); _entityService.ingestAspect(entityUrn2, aspectName, writeAspect2, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #3 - RecordTemplate writeAspect3 = createCorpUserKey(entityUrn3); + RecordTemplate writeAspect3 = AspectGenerationUtils.createCorpUserKey(entityUrn3); _entityService.ingestAspect(entityUrn3, aspectName, writeAspect3, TEST_AUDIT_STAMP, metadata1); // List aspects urns ListUrnsResult batch1 = _entityService.listUrns(entityUrn1.getEntityType(), 0, 2); - assertEquals((int) batch1.getStart(), 0); - assertEquals((int) batch1.getCount(), 2); - assertEquals((int) batch1.getTotal(), 3); + assertEquals(batch1.getStart().intValue(), 0); + assertEquals(batch1.getCount().intValue(), 2); + assertEquals(batch1.getTotal().intValue(), 3); assertEquals(batch1.getEntities().size(), 2); assertEquals(entityUrn1.toString(), 
batch1.getEntities().get(0).toString()); assertEquals(entityUrn2.toString(), batch1.getEntities().get(1).toString()); ListUrnsResult batch2 = _entityService.listUrns(entityUrn1.getEntityType(), 2, 2); - assertEquals((int) batch2.getStart(), 2); - assertEquals((int) batch2.getCount(), 1); - assertEquals((int) batch2.getTotal(), 3); + assertEquals(batch2.getStart().intValue(), 2); + assertEquals(batch2.getCount().intValue(), 1); + assertEquals(batch2.getTotal().intValue(), 3); assertEquals(batch2.getEntities().size(), 1); assertEquals(entityUrn3.toString(), batch2.getEntities().get(0).toString()); } @Override @Test - public void testNestedTransactions() throws Exception { + public void testNestedTransactions() throws AssertionError { EbeanServer server = _aspectDao.getServer(); try (Transaction transaction = server.beginTransaction(TxScope.requiresNew() diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java index d324165ad604e1..e5edcf22439d8c 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EntityServiceTest.java @@ -10,6 +10,7 @@ import com.linkedin.common.VersionedUrn; import com.linkedin.common.urn.CorpuserUrn; import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; import com.linkedin.data.ByteString; import com.linkedin.data.template.DataTemplateUtil; import com.linkedin.data.template.JacksonDataTemplateCodec; @@ -20,6 +21,7 @@ import com.linkedin.entity.EnvelopedAspect; import com.linkedin.events.metadata.ChangeType; import com.linkedin.identity.CorpUserInfo; +import com.linkedin.metadata.AspectGenerationUtils; import com.linkedin.metadata.aspect.Aspect; import com.linkedin.metadata.aspect.CorpUserAspect; import com.linkedin.metadata.aspect.CorpUserAspectArray; @@ -34,8 +36,6 @@ import 
com.linkedin.metadata.run.AspectRowSummary; import com.linkedin.metadata.snapshot.CorpUserSnapshot; import com.linkedin.metadata.snapshot.Snapshot; -import com.linkedin.metadata.utils.EntityKeyUtils; -import com.linkedin.metadata.utils.PegasusUtils; import com.linkedin.mxe.GenericAspect; import com.linkedin.mxe.MetadataAuditOperation; import com.linkedin.mxe.MetadataChangeLog; @@ -45,11 +45,6 @@ import com.linkedin.retention.Retention; import com.linkedin.retention.VersionBasedRetention; import com.linkedin.util.Pair; -import org.mockito.ArgumentCaptor; -import org.mockito.Mockito; -import org.testng.annotations.Test; - -import javax.annotation.Nonnull; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -57,16 +52,14 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import javax.annotation.Nonnull; +import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; +import org.testng.annotations.Test; -import static org.mockito.Mockito.reset; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertNull; -import static org.testng.Assert.assertTrue; +import static com.linkedin.metadata.Constants.*; +import static org.mockito.Mockito.*; +import static org.testng.Assert.*; /** * A class to test {@link EntityService} @@ -88,7 +81,7 @@ abstract public class EntityServiceTest> pairToIngest = new ArrayList<>(); Status writeAspect1 = new Status().setRemoved(false); - String aspectName1 = getAspectName(writeAspect1); + String aspectName1 = AspectGenerationUtils.getAspectName(writeAspect1); pairToIngest.add(getAspectRecordPair(writeAspect1, Status.class)); - CorpUserInfo writeAspect2 = createCorpUserInfo("email@test.com"); - String aspectName2 = 
getAspectName(writeAspect2); + CorpUserInfo writeAspect2 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); + String aspectName2 = AspectGenerationUtils.getAspectName(writeAspect2); pairToIngest.add(getAspectRecordPair(writeAspect2, CorpUserInfo.class)); - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(); _entityService.ingestAspects(entityUrn, pairToIngest, TEST_AUDIT_STAMP, metadata1); @@ -450,7 +422,7 @@ public void testIngestAspectsGetLatestAspects() throws Exception { @Test public void testIngestTimeseriesAspect() throws Exception { - Urn entityUrn = Urn.createFromString("urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)"); + Urn entityUrn = UrnUtils.getUrn("urn:li:dataset:(urn:li:dataPlatform:foo,bar,PROD)"); DatasetProfile datasetProfile = new DatasetProfile(); datasetProfile.setRowCount(1000); datasetProfile.setColumnCount(15); @@ -470,15 +442,15 @@ public void testIngestTimeseriesAspect() throws Exception { } @Test - public void testUpdateGetAspect() throws Exception { + public void testUpdateGetAspect() throws AssertionError { // Test Writing a CorpUser Entity - Urn entityUrn = Urn.createFromString("urn:li:corpuser:test"); + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:test"); - String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserInfo().schema()); + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo()); AspectSpec corpUserInfoSpec = _testEntityRegistry.getEntitySpec("corpuser").getAspectSpec("corpUserInfo"); // Ingest CorpUserInfo Aspect #1 - CorpUserInfo writeAspect = createCorpUserInfo("email@test.com"); + CorpUserInfo writeAspect = AspectGenerationUtils.createCorpUserInfo("email@test.com"); // Validate retrieval of CorpUserInfo Aspect #1 _entityService.updateAspect(entityUrn, "corpuser", aspectName, corpUserInfoSpec, writeAspect, TEST_AUDIT_STAMP, 1, @@ 
-500,15 +472,15 @@ public void testUpdateGetAspect() throws Exception { } @Test - public void testGetAspectAtVersion() throws Exception { + public void testGetAspectAtVersion() throws AssertionError { // Test Writing a CorpUser Entity - Urn entityUrn = Urn.createFromString("urn:li:corpuser:test"); + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:test"); - String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserInfo().schema()); + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo()); AspectSpec corpUserInfoSpec = _testEntityRegistry.getEntitySpec("corpuser").getAspectSpec("corpUserInfo"); // Ingest CorpUserInfo Aspect #1 - CorpUserInfo writeAspect = createCorpUserInfo("email@test.com"); + CorpUserInfo writeAspect = AspectGenerationUtils.createCorpUserInfo("email@test.com"); // Validate retrieval of CorpUserInfo Aspect #1 _entityService.updateAspect(entityUrn, "corpuser", aspectName, corpUserInfoSpec, writeAspect, TEST_AUDIT_STAMP, 1, @@ -533,35 +505,30 @@ public void testGetAspectAtVersion() throws Exception { } @Test - public void testRollbackAspect() throws Exception { - Urn entityUrn1 = Urn.createFromString("urn:li:corpuser:test1"); - Urn entityUrn2 = Urn.createFromString("urn:li:corpuser:test2"); - Urn entityUrn3 = Urn.createFromString("urn:li:corpuser:test3"); + public void testRollbackAspect() throws AssertionError { + Urn entityUrn1 = UrnUtils.getUrn("urn:li:corpuser:test1"); + Urn entityUrn2 = UrnUtils.getUrn("urn:li:corpuser:test2"); + Urn entityUrn3 = UrnUtils.getUrn("urn:li:corpuser:test3"); - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(1625792689, "run-123"); + SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata(1635792689, "run-456"); - SystemMetadata metadata2 = new SystemMetadata(); - metadata2.setLastObserved(1635792689); - 
metadata2.setRunId("run-456"); - - String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserInfo().schema()); + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo()); // Ingest CorpUserInfo Aspect #1 - CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com"); + CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); _entityService.ingestAspect(entityUrn1, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #2 - CorpUserInfo writeAspect2 = createCorpUserInfo("email2@test.com"); + CorpUserInfo writeAspect2 = AspectGenerationUtils.createCorpUserInfo("email2@test.com"); _entityService.ingestAspect(entityUrn2, aspectName, writeAspect2, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #3 - CorpUserInfo writeAspect3 = createCorpUserInfo("email3@test.com"); + CorpUserInfo writeAspect3 = AspectGenerationUtils.createCorpUserInfo("email3@test.com"); _entityService.ingestAspect(entityUrn3, aspectName, writeAspect3, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #1 Overwrite - CorpUserInfo writeAspect1Overwrite = createCorpUserInfo("email1.overwrite@test.com"); + CorpUserInfo writeAspect1Overwrite = AspectGenerationUtils.createCorpUserInfo("email1.overwrite@test.com"); _entityService.ingestAspect(entityUrn1, aspectName, writeAspect1Overwrite, TEST_AUDIT_STAMP, metadata2); // this should no-op since this run has been overwritten @@ -593,29 +560,24 @@ public void testRollbackAspect() throws Exception { } @Test - public void testRollbackKey() throws Exception { - Urn entityUrn1 = Urn.createFromString("urn:li:corpuser:test1"); - - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); + public void testRollbackKey() throws AssertionError { + Urn entityUrn1 = UrnUtils.getUrn("urn:li:corpuser:test1"); - SystemMetadata metadata2 = new SystemMetadata(); - 
metadata2.setLastObserved(1635792689); - metadata2.setRunId("run-456"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(1625792689, "run-123"); + SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata(1635792689, "run-456"); - String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserInfo().schema()); + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo()); String keyAspectName = _entityService.getKeyAspectName(entityUrn1); // Ingest CorpUserInfo Aspect #1 - CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com"); + CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); _entityService.ingestAspect(entityUrn1, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); RecordTemplate writeKey1 = _entityService.buildKeyAspect(entityUrn1); _entityService.ingestAspect(entityUrn1, keyAspectName, writeKey1, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #1 Overwrite - CorpUserInfo writeAspect1Overwrite = createCorpUserInfo("email1.overwrite@test.com"); + CorpUserInfo writeAspect1Overwrite = AspectGenerationUtils.createCorpUserInfo("email1.overwrite@test.com"); _entityService.ingestAspect(entityUrn1, aspectName, writeAspect1Overwrite, TEST_AUDIT_STAMP, metadata2); // this should no-op since the key should have been written in the furst run @@ -647,39 +609,34 @@ public void testRollbackKey() throws Exception { } @Test - public void testRollbackUrn() throws Exception { - Urn entityUrn1 = Urn.createFromString("urn:li:corpuser:test1"); - Urn entityUrn2 = Urn.createFromString("urn:li:corpuser:test2"); - Urn entityUrn3 = Urn.createFromString("urn:li:corpuser:test3"); - - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); + public void testRollbackUrn() throws AssertionError { + Urn entityUrn1 = UrnUtils.getUrn("urn:li:corpuser:test1"); + Urn entityUrn2 = 
UrnUtils.getUrn("urn:li:corpuser:test2"); + Urn entityUrn3 = UrnUtils.getUrn("urn:li:corpuser:test3"); - SystemMetadata metadata2 = new SystemMetadata(); - metadata2.setLastObserved(1635792689); - metadata2.setRunId("run-456"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(1625792689, "run-123"); + SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata(1635792689, "run-456"); - String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserInfo().schema()); + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo()); String keyAspectName = _entityService.getKeyAspectName(entityUrn1); // Ingest CorpUserInfo Aspect #1 - CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com"); + CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); _entityService.ingestAspect(entityUrn1, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); RecordTemplate writeKey1 = _entityService.buildKeyAspect(entityUrn1); _entityService.ingestAspect(entityUrn1, keyAspectName, writeKey1, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #2 - CorpUserInfo writeAspect2 = createCorpUserInfo("email2@test.com"); + CorpUserInfo writeAspect2 = AspectGenerationUtils.createCorpUserInfo("email2@test.com"); _entityService.ingestAspect(entityUrn2, aspectName, writeAspect2, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #3 - CorpUserInfo writeAspect3 = createCorpUserInfo("email3@test.com"); + CorpUserInfo writeAspect3 = AspectGenerationUtils.createCorpUserInfo("email3@test.com"); _entityService.ingestAspect(entityUrn3, aspectName, writeAspect3, TEST_AUDIT_STAMP, metadata1); // Ingest CorpUserInfo Aspect #1 Overwrite - CorpUserInfo writeAspect1Overwrite = createCorpUserInfo("email1.overwrite@test.com"); + CorpUserInfo writeAspect1Overwrite = AspectGenerationUtils.createCorpUserInfo("email1.overwrite@test.com"); _entityService.ingestAspect(entityUrn1, aspectName, 
writeAspect1Overwrite, TEST_AUDIT_STAMP, metadata2); // this should no-op since the key should have been written in the furst run @@ -689,7 +646,7 @@ public void testRollbackUrn() throws Exception { rollbackKeyWithWrongRunId.setUrn(entityUrn1.toString()); // this should delete all related aspects - _entityService.deleteUrn(Urn.createFromString("urn:li:corpuser:test1")); + _entityService.deleteUrn(UrnUtils.getUrn("urn:li:corpuser:test1")); // assert the new most recent aspect is null RecordTemplate readNewRecentAspect = _entityService.getAspect(entityUrn1, aspectName, 0); @@ -700,20 +657,15 @@ public void testRollbackUrn() throws Exception { } @Test - public void testIngestGetLatestAspect() throws Exception { - Urn entityUrn = Urn.createFromString("urn:li:corpuser:test"); + public void testIngestGetLatestAspect() throws AssertionError { + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:test"); // Ingest CorpUserInfo Aspect #1 - CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com"); - String aspectName = PegasusUtils.getAspectNameFromSchema(writeAspect1.schema()); + CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); + String aspectName = AspectGenerationUtils.getAspectName(writeAspect1); - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); - - SystemMetadata metadata2 = new SystemMetadata(); - metadata2.setLastObserved(1635792689); - metadata2.setRunId("run-456"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(1625792689, "run-123"); + SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata(1635792689, "run-456"); // Validate retrieval of CorpUserInfo Aspect #1 _entityService.ingestAspect(entityUrn, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); @@ -736,7 +688,7 @@ public void testIngestGetLatestAspect() throws Exception { reset(_mockProducer); // Ingest CorpUserInfo Aspect #2 - CorpUserInfo 
writeAspect2 = createCorpUserInfo("email2@test.com"); + CorpUserInfo writeAspect2 = AspectGenerationUtils.createCorpUserInfo("email2@test.com"); // Validate retrieval of CorpUserInfo Aspect #2 _entityService.ingestAspect(entityUrn, aspectName, writeAspect2, TEST_AUDIT_STAMP, metadata2); @@ -763,19 +715,14 @@ public void testIngestGetLatestAspect() throws Exception { @Test public void testIngestGetLatestEnvelopedAspect() throws Exception { - Urn entityUrn = Urn.createFromString("urn:li:corpuser:test"); + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:test"); // Ingest CorpUserInfo Aspect #1 - CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com"); - String aspectName = PegasusUtils.getAspectNameFromSchema(writeAspect1.schema()); - - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); + CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); + String aspectName = AspectGenerationUtils.getAspectName(writeAspect1); - SystemMetadata metadata2 = new SystemMetadata(); - metadata2.setLastObserved(1635792689); - metadata2.setRunId("run-456"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(1625792689, "run-123"); + SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata(1635792689, "run-456"); // Validate retrieval of CorpUserInfo Aspect #1 _entityService.ingestAspect(entityUrn, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); @@ -783,7 +730,7 @@ public void testIngestGetLatestEnvelopedAspect() throws Exception { assertTrue(DataTemplateUtil.areEqual(writeAspect1, new CorpUserInfo(readAspect1.getValue().data()))); // Ingest CorpUserInfo Aspect #2 - CorpUserInfo writeAspect2 = createCorpUserInfo("email2@test.com"); + CorpUserInfo writeAspect2 = AspectGenerationUtils.createCorpUserInfo("email2@test.com"); // Validate retrieval of CorpUserInfo Aspect #2 _entityService.ingestAspect(entityUrn, aspectName, writeAspect2, 
TEST_AUDIT_STAMP, metadata2); @@ -808,20 +755,16 @@ public void testIngestGetLatestEnvelopedAspect() throws Exception { } @Test - public void testIngestSameAspect() throws Exception { - Urn entityUrn = Urn.createFromString("urn:li:corpuser:test"); + public void testIngestSameAspect() throws AssertionError { + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:test"); // Ingest CorpUserInfo Aspect #1 - CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com"); - String aspectName = PegasusUtils.getAspectNameFromSchema(writeAspect1.schema()); + CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); + String aspectName = AspectGenerationUtils.getAspectName(writeAspect1); - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); - - SystemMetadata metadata2 = new SystemMetadata(); - metadata2.setLastObserved(1635792689); - metadata2.setRunId("run-456"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(1625792689, "run-123"); + SystemMetadata metadata2 = AspectGenerationUtils.createSystemMetadata(1635792689, "run-456"); + SystemMetadata metadata3 = AspectGenerationUtils.createSystemMetadata(1635792689, "run-123"); // Validate retrieval of CorpUserInfo Aspect #1 _entityService.ingestAspect(entityUrn, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); @@ -844,21 +787,17 @@ public void testIngestSameAspect() throws Exception { reset(_mockProducer); // Ingest CorpUserInfo Aspect #2 - CorpUserInfo writeAspect2 = createCorpUserInfo("email@test.com"); + CorpUserInfo writeAspect2 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); // Validate retrieval of CorpUserInfo Aspect #2 _entityService.ingestAspect(entityUrn, aspectName, writeAspect2, TEST_AUDIT_STAMP, metadata2); RecordTemplate readAspect2 = _entityService.getLatestAspect(entityUrn, aspectName); - EntityAspect readAspectDao2 = _aspectDao.getAspect(entityUrn.toString(), 
aspectName, 0); + EntityAspect readAspectDao2 = _aspectDao.getAspect(entityUrn.toString(), aspectName, ASPECT_LATEST_VERSION); assertTrue(DataTemplateUtil.areEqual(writeAspect2, readAspect2)); assertFalse(DataTemplateUtil.areEqual(EntityUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()), metadata2)); assertFalse(DataTemplateUtil.areEqual(EntityUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()), metadata1)); - SystemMetadata metadata3 = new SystemMetadata(); - metadata3.setLastObserved(1635792689); - metadata3.setRunId("run-123"); - assertTrue(DataTemplateUtil.areEqual(EntityUtils.parseSystemMetadata(readAspectDao2.getSystemMetadata()), metadata3)); verify(_mockProducer, times(0)).produceMetadataChangeLog(Mockito.eq(entityUrn), Mockito.any(), mclCaptor.capture()); @@ -870,24 +809,22 @@ public void testIngestSameAspect() throws Exception { } @Test - public void testRetention() throws Exception { - Urn entityUrn = Urn.createFromString("urn:li:corpuser:test1"); + public void testRetention() throws AssertionError { + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:test1"); - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(); - String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserInfo().schema()); + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo()); // Ingest CorpUserInfo Aspect - CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com"); + CorpUserInfo writeAspect1 = AspectGenerationUtils.createCorpUserInfo("email@test.com"); _entityService.ingestAspect(entityUrn, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); - CorpUserInfo writeAspect1a = createCorpUserInfo("email_a@test.com"); + CorpUserInfo writeAspect1a = AspectGenerationUtils.createCorpUserInfo("email_a@test.com"); _entityService.ingestAspect(entityUrn, aspectName, writeAspect1a, 
TEST_AUDIT_STAMP, metadata1); - CorpUserInfo writeAspect1b = createCorpUserInfo("email_b@test.com"); + CorpUserInfo writeAspect1b = AspectGenerationUtils.createCorpUserInfo("email_b@test.com"); _entityService.ingestAspect(entityUrn, aspectName, writeAspect1b, TEST_AUDIT_STAMP, metadata1); - String aspectName2 = PegasusUtils.getAspectNameFromSchema(new Status().schema()); + String aspectName2 = AspectGenerationUtils.getAspectName(new Status()); // Ingest Status Aspect Status writeAspect2 = new Status().setRemoved(true); _entityService.ingestAspect(entityUrn, aspectName2, writeAspect2, TEST_AUDIT_STAMP, metadata1); @@ -905,7 +842,7 @@ public void testRetention() throws Exception { new Retention().setVersion(new VersionBasedRetention().setMaxVersions(4)))); // Ingest CorpUserInfo Aspect again - CorpUserInfo writeAspect1c = createCorpUserInfo("email_c@test.com"); + CorpUserInfo writeAspect1c = AspectGenerationUtils.createCorpUserInfo("email_c@test.com"); _entityService.ingestAspect(entityUrn, aspectName, writeAspect1c, TEST_AUDIT_STAMP, metadata1); // Ingest Status Aspect again Status writeAspect2c = new Status().setRemoved(false); @@ -925,24 +862,22 @@ public void testRetention() throws Exception { } @Test - public void testIngestAspectIfNotPresent() throws Exception { - Urn entityUrn = Urn.createFromString("urn:li:corpuser:test1"); + public void testIngestAspectIfNotPresent() throws AssertionError { + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:test1"); - SystemMetadata metadata1 = new SystemMetadata(); - metadata1.setLastObserved(1625792689); - metadata1.setRunId("run-123"); + SystemMetadata metadata1 = AspectGenerationUtils.createSystemMetadata(); - String aspectName = PegasusUtils.getAspectNameFromSchema(new CorpUserInfo().schema()); + String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo()); // Ingest CorpUserInfo Aspect - CorpUserInfo writeAspect1 = createCorpUserInfo("email@test.com"); + CorpUserInfo writeAspect1 = 
AspectGenerationUtils.createCorpUserInfo("email@test.com"); _entityService.ingestAspectIfNotPresent(entityUrn, aspectName, writeAspect1, TEST_AUDIT_STAMP, metadata1); - CorpUserInfo writeAspect1a = createCorpUserInfo("email_a@test.com"); + CorpUserInfo writeAspect1a = AspectGenerationUtils.createCorpUserInfo("email_a@test.com"); _entityService.ingestAspectIfNotPresent(entityUrn, aspectName, writeAspect1a, TEST_AUDIT_STAMP, metadata1); - CorpUserInfo writeAspect1b = createCorpUserInfo("email_b@test.com"); + CorpUserInfo writeAspect1b = AspectGenerationUtils.createCorpUserInfo("email_b@test.com"); _entityService.ingestAspectIfNotPresent(entityUrn, aspectName, writeAspect1b, TEST_AUDIT_STAMP, metadata1); - String aspectName2 = PegasusUtils.getAspectNameFromSchema(new Status().schema()); + String aspectName2 = AspectGenerationUtils.getAspectName(new Status()); // Ingest Status Aspect Status writeAspect2 = new Status().setRemoved(true); _entityService.ingestAspectIfNotPresent(entityUrn, aspectName2, writeAspect2, TEST_AUDIT_STAMP, metadata1); @@ -961,14 +896,6 @@ public void testIngestAspectIfNotPresent() throws Exception { assertEquals(_entityService.listLatestAspects(entityUrn.getEntityType(), aspectName2, 0, 10).getTotalCount(), 1); } - protected static AuditStamp createTestAuditStamp() { - try { - return new AuditStamp().setTime(123L).setActor(Urn.createFromString("urn:li:principal:tester")); - } catch (Exception e) { - throw new RuntimeException("Failed to create urn"); - } - } - @Nonnull protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String email) throws Exception { CorpuserUrn corpuserUrn = CorpuserUrn.createFromUrn(entityUrn); @@ -976,7 +903,7 @@ protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String Snapshot snapshot = new Snapshot(); CorpUserSnapshot corpUserSnapshot = new CorpUserSnapshot(); List userAspects = new ArrayList<>(); - userAspects.add(CorpUserAspect.create(createCorpUserInfo(email))); + 
userAspects.add(CorpUserAspect.create(AspectGenerationUtils.createCorpUserInfo(email))); corpUserSnapshot.setAspects(new CorpUserAspectArray(userAspects)); corpUserSnapshot.setUrn(corpuserUrn); snapshot.setCorpUserSnapshot(corpUserSnapshot); @@ -984,28 +911,11 @@ protected com.linkedin.entity.Entity createCorpUserEntity(Urn entityUrn, String return entity; } - @Nonnull - protected RecordTemplate createCorpUserKey(Urn urn) throws Exception { - return EntityKeyUtils.convertUrnToEntityKey(urn, new CorpUserKey().schema()); - } - - @Nonnull - protected CorpUserInfo createCorpUserInfo(String email) throws Exception { - CorpUserInfo corpUserInfo = new CorpUserInfo(); - corpUserInfo.setEmail(email); - corpUserInfo.setActive(true); - return corpUserInfo; - } - - protected String getAspectName(RecordTemplate record) { - return PegasusUtils.getAspectNameFromSchema(record.schema()); - } - protected Pair getAspectRecordPair(T aspect, Class clazz) throws Exception { final ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL); RecordTemplate recordTemplate = RecordUtils.toRecordTemplate(clazz, objectMapper.writeValueAsString(aspect)); - return new Pair<>(getAspectName(aspect), recordTemplate); + return new Pair<>(AspectGenerationUtils.getAspectName(aspect), recordTemplate); } } diff --git a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java index 53b7d241d03f4e..8dfaced5d6a880 100644 --- a/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java +++ b/metadata-service/factories/src/main/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStep.java @@ -17,11 +17,13 @@ import javax.annotation.Nonnull; import java.util.Optional; +import static com.linkedin.metadata.Constants.*; + 
@Slf4j @RequiredArgsConstructor public class IngestDataPlatformInstancesStep implements BootstrapStep { - private static final String PLATFORM_INSTANCE_ASPECT_NAME = "dataPlatformInstance"; + private static final int BATCH_SIZE = 1000; private final EntityService _entityService; @@ -47,7 +49,7 @@ private Optional getDataPlatformInstance(Urn urn) { @Override public void execute() throws Exception { log.info("Checking for DataPlatformInstance"); - if (_migrationsDao.checkIfAspectExists(PLATFORM_INSTANCE_ASPECT_NAME)) { + if (_migrationsDao.checkIfAspectExists(DATA_PLATFORM_INSTANCE_ASPECT_NAME)) { log.info("DataPlatformInstance aspect exists. Skipping step"); return; } @@ -69,7 +71,7 @@ public void execute() throws Exception { final AuditStamp aspectAuditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis()); - _entityService.ingestAspect(urn, PLATFORM_INSTANCE_ASPECT_NAME, dataPlatformInstance.get(), aspectAuditStamp, null); + _entityService.ingestAspect(urn, DATA_PLATFORM_INSTANCE_ASPECT_NAME, dataPlatformInstance.get(), aspectAuditStamp, null); } log.info("Finished ingesting DataPlatformInstance for urn {} to {}", start, start + BATCH_SIZE); start += BATCH_SIZE; diff --git a/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStepTest.java b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStepTest.java new file mode 100644 index 00000000000000..34fee379ed09ba --- /dev/null +++ b/metadata-service/factories/src/test/java/com/linkedin/metadata/boot/steps/IngestDataPlatformInstancesStepTest.java @@ -0,0 +1,149 @@ +package com.linkedin.metadata.boot.steps; + +import com.linkedin.common.DataPlatformInstance; +import com.linkedin.common.urn.Urn; +import com.linkedin.common.urn.UrnUtils; +import com.linkedin.metadata.entity.AspectMigrationsDao; +import com.linkedin.metadata.entity.EntityService; +import 
com.linkedin.metadata.models.AspectSpec; +import com.linkedin.metadata.models.EntitySpec; +import com.linkedin.metadata.models.registry.ConfigEntityRegistry; +import com.linkedin.metadata.models.registry.EntityRegistry; +import com.linkedin.metadata.utils.DataPlatformInstanceUtils; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.jetbrains.annotations.NotNull; +import org.testng.annotations.Test; + +import static com.linkedin.metadata.Constants.*; +import static org.mockito.Mockito.*; + + +/** + * Test the behavior of IngestDataPlatformInstancesStep. + * + * We expect it to check if any data platform instance aspects already exist in the database and if none are found, + * to go through all the stored entities and ingest a data platform instance aspect for any that are compatible with it. + * + * CorpUser is used as an example of an entity that is not compatible with data platform instance and therefore should be ignored. + * Chart is used as an example of an entity that should get adorned with a data platform instance. + * + * See {@link DataPlatformInstanceUtils} for the compatibility rules. 
+ */ +public class IngestDataPlatformInstancesStepTest { + + @Test + public void testExecuteDoesNothingWhenDataPlatformInstanceAspectsAlreadyExists() throws Exception { + final EntityService entityService = mock(EntityService.class); + final AspectMigrationsDao migrationsDao = mock(AspectMigrationsDao.class); + + mockDBWithDataPlatformInstanceAspects(migrationsDao); + + final IngestDataPlatformInstancesStep step = new IngestDataPlatformInstancesStep(entityService, migrationsDao); + step.execute(); + + verify(migrationsDao, times(1)).checkIfAspectExists(anyString()); + verifyNoMoreInteractions(migrationsDao); + verifyZeroInteractions(entityService); + } + + @Test + public void testExecuteCopesWithEmptyDB() throws Exception { + final EntityService entityService = mock(EntityService.class); + final AspectMigrationsDao migrationsDao = mock(AspectMigrationsDao.class); + + mockEmptyDB(migrationsDao); + + final IngestDataPlatformInstancesStep step = new IngestDataPlatformInstancesStep(entityService, migrationsDao); + step.execute(); + + verify(migrationsDao, times(1)).checkIfAspectExists(anyString()); + verify(migrationsDao, times(1)).countEntities(); + verifyNoMoreInteractions(migrationsDao); + verifyZeroInteractions(entityService); + } + + @Test + public void testExecuteChecksKeySpecForAllUrns() throws Exception { + final EntityRegistry entityRegistry = getTestEntityRegistry(); + final EntityService entityService = mock(EntityService.class); + final AspectMigrationsDao migrationsDao = mock(AspectMigrationsDao.class); + final int countOfCorpUserEntities = 2; + final int countOfChartEntities = 4; + final int totalUrnsInDB = countOfCorpUserEntities + countOfChartEntities; + + mockDBWithWorkToDo(entityRegistry, entityService, migrationsDao, countOfCorpUserEntities, countOfChartEntities); + + final IngestDataPlatformInstancesStep step = new IngestDataPlatformInstancesStep(entityService, migrationsDao); + step.execute(); + + verify(entityService, 
times(totalUrnsInDB)).getKeyAspectSpec(any(Urn.class)); + } + + @Test + public void testExecuteWhenSomeEntitiesShouldReceiveDataPlatformInstance() throws Exception { + final EntityRegistry entityRegistry = getTestEntityRegistry(); + final EntityService entityService = mock(EntityService.class); + final AspectMigrationsDao migrationsDao = mock(AspectMigrationsDao.class); + final int countOfCorpUserEntities = 5; + final int countOfChartEntities = 7; + + mockDBWithWorkToDo(entityRegistry, entityService, migrationsDao, countOfCorpUserEntities, countOfChartEntities); + + final IngestDataPlatformInstancesStep step = new IngestDataPlatformInstancesStep(entityService, migrationsDao); + step.execute(); + + verify(entityService, times(countOfChartEntities)) + .ingestAspect( + argThat(arg -> arg.getEntityType().equals("chart")), + eq(DATA_PLATFORM_INSTANCE_ASPECT_NAME), + any(DataPlatformInstance.class), + any(), + any()); + verify(entityService, times(0)) + .ingestAspect(argThat(arg -> !arg.getEntityType().equals("chart")), anyString(), any(), any(), any()); + } + + @NotNull + private ConfigEntityRegistry getTestEntityRegistry() { + return new ConfigEntityRegistry( + IngestDataPlatformInstancesStepTest.class.getClassLoader().getResourceAsStream("test-entity-registry.yaml")); + } + + private void mockDBWithDataPlatformInstanceAspects(AspectMigrationsDao migrationsDao) { + when(migrationsDao.checkIfAspectExists(DATA_PLATFORM_INSTANCE_ASPECT_NAME)).thenReturn(true); + } + + private void mockEmptyDB(AspectMigrationsDao migrationsDao) { + when(migrationsDao.checkIfAspectExists(DATA_PLATFORM_INSTANCE_ASPECT_NAME)).thenReturn(false); + when(migrationsDao.countEntities()).thenReturn(0L); + } + + private void mockDBWithWorkToDo( + EntityRegistry entityRegistry, + EntityService entityService, + AspectMigrationsDao migrationsDao, + int countOfCorpUserEntities, + int countOfChartEntities) { + List corpUserUrns = insertMockEntities(countOfCorpUserEntities, "corpuser", 
"urn:li:corpuser:test%d", entityRegistry, entityService); + List charUrns = insertMockEntities(countOfChartEntities, "chart", "urn:li:chart:(looker,test%d)", entityRegistry, entityService); + List allUrnsInDB = Stream.concat(corpUserUrns.stream(), charUrns.stream()).map(Urn::toString).collect(Collectors.toList()); + when(migrationsDao.checkIfAspectExists(DATA_PLATFORM_INSTANCE_ASPECT_NAME)).thenReturn(false); + when(migrationsDao.countEntities()).thenReturn((long) allUrnsInDB.size()); + when(migrationsDao.listAllUrns(anyInt(), anyInt())).thenReturn(allUrnsInDB); + } + + private List insertMockEntities(int count, String entity, String urnTemplate, EntityRegistry entityRegistry, EntityService entityService) { + EntitySpec entitySpec = entityRegistry.getEntitySpec(entity); + AspectSpec keySpec = entitySpec.getKeyAspectSpec(); + List urns = new ArrayList<>(); + for (int i = 0; i < count; i++) { + Urn urn = UrnUtils.getUrn(String.format(urnTemplate, i)); + urns.add(urn); + when(entityService.getKeyAspectSpec(urn)).thenReturn(keySpec); + } + return urns; + } +} diff --git a/metadata-service/factories/src/test/resources/test-entity-registry.yaml b/metadata-service/factories/src/test/resources/test-entity-registry.yaml new file mode 100644 index 00000000000000..45aa9b9554fb4b --- /dev/null +++ b/metadata-service/factories/src/test/resources/test-entity-registry.yaml @@ -0,0 +1,10 @@ +id: test-registry +entities: + - name: corpuser + keyAspect: corpUserKey + aspects: + - corpUserInfo + - name: chart + keyAspect: chartKey + aspects: + - domains