Skip to content

Commit

Permalink
fix(cassandra): fix Cassandra queries used by IngestDataPlatformInsta…
Browse files Browse the repository at this point in the history
…ncesStep (datahub-project#5199)
  • Loading branch information
Justin Marozas authored and maggiehays committed Aug 1, 2022
1 parent 7cbaac9 commit 6751365
Show file tree
Hide file tree
Showing 13 changed files with 747 additions and 334 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ private AspectStorageValidationUtil() {
* @return {@code true} if table exists.
*/
public static boolean checkTableExists(@Nonnull CqlSession session) {
String query = String.format("SELECT columnfamily_name\n "
+ "FROM schema_columnfamilies WHERE keyspace_name='%s';",
String query = String.format("SELECT table_name \n "
+ "FROM system_schema.tables where table_name = '%s' allow filtering;",
CassandraAspect.TABLE_NAME);
ResultSet rs = session.execute(query);
return rs.all().size() > 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.paging.OffsetPager;
import com.datastax.oss.driver.api.core.paging.OffsetPager.Page;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
Expand All @@ -28,10 +27,6 @@
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.ListResultMetadata;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.util.HashMap;
Expand All @@ -41,13 +36,12 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.update;
import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.*;
import static com.linkedin.metadata.Constants.*;

@Slf4j
public class CassandraAspectDao implements AspectDao, AspectMigrationsDao {
Expand Down Expand Up @@ -97,10 +91,13 @@ public long countEntities() {
SimpleStatement ss = selectFrom(CassandraAspect.TABLE_NAME)
.distinct()
.column(CassandraAspect.URN_COLUMN)
.countAll()
.build();

return _cqlSession.execute(ss).one().getLong(0);
ResultSet rs = _cqlSession.execute(ss);
// TODO: make sure it doesn't blow up on a large database
// Getting a count of distinct values in a Cassandra query doesn't seem to be feasible, but counting them in the app is dangerous
// The saving grace here is that the only place where this method is used should only run once, while the database is still young
return rs.all().size();
}

@Override
Expand All @@ -110,10 +107,11 @@ public boolean checkIfAspectExists(@Nonnull String aspectName) {
.column(CassandraAspect.URN_COLUMN)
.whereColumn(CassandraAspect.ASPECT_COLUMN).isEqualTo(literal(aspectName))
.limit(1)
.allowFiltering()
.build();

ResultSet rs = _cqlSession.execute(ss);
return rs.all().size() > 0;
return rs.one() != null;
}

private Map<String, Long> getMaxVersions(@Nonnull final String urn, @Nonnull final Set<String> aspectNames) {
Expand Down Expand Up @@ -451,7 +449,6 @@ public Iterable<String> listAllUrns(int start, int pageSize) {
validateConnection();
SimpleStatement ss = selectFrom(CassandraAspect.TABLE_NAME)
.column(CassandraAspect.URN_COLUMN)
.orderBy(CassandraAspect.URN_COLUMN, ClusteringOrder.ASC)
.build();

ResultSet rs = _cqlSession.execute(ss);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.linkedin.metadata;

import com.linkedin.chart.ChartInfo;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.ChangeAuditStamps;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.mxe.SystemMetadata;
import javax.annotation.Nonnull;


/**
 * Static factory helpers that build small aspect and metadata records with fixed or caller-supplied content.
 *
 * <p>The class is not instantiable; all members are static.
 */
public final class AspectGenerationUtils {

  private AspectGenerationUtils() {
  }

  /**
   * Builds an audit stamp with a fixed time of {@code 123} and the actor {@code urn:li:corpuser:tester}.
   *
   * @return a new {@link AuditStamp} instance
   */
  @Nonnull
  public static AuditStamp createAuditStamp() {
    return new AuditStamp().setTime(123L).setActor(UrnUtils.getUrn("urn:li:corpuser:tester"));
  }

  /**
   * Builds system metadata using the fixed values {@code 1625792689} (last observed) and {@code "run-123"} (run id).
   *
   * @return a new {@link SystemMetadata} instance
   */
  @Nonnull
  public static SystemMetadata createSystemMetadata() {
    // Explicit long literal: the delegate's first parameter is a long.
    return createSystemMetadata(1625792689L, "run-123");
  }

  /**
   * Builds system metadata from the supplied values.
   *
   * @param lastObserved value stored as the record's last-observed field
   * @param runId value stored as the record's run id
   * @return a new {@link SystemMetadata} instance
   */
  @Nonnull
  public static SystemMetadata createSystemMetadata(long lastObserved, @Nonnull String runId) {
    SystemMetadata metadata = new SystemMetadata();
    metadata.setLastObserved(lastObserved);
    metadata.setRunId(runId);
    return metadata;
  }

  /**
   * Converts a URN into a {@link CorpUserKey} by delegating to
   * {@link EntityKeyUtils#convertUrnToEntityKey} with the {@link CorpUserKey} schema and casting the result.
   *
   * @param urn the URN to convert
   * @return the key record produced for {@code urn}
   */
  @Nonnull
  public static CorpUserKey createCorpUserKey(@Nonnull Urn urn) {
    return (CorpUserKey) EntityKeyUtils.convertUrnToEntityKey(urn, new CorpUserKey().schema());
  }

  /**
   * Builds a {@link CorpUserInfo} with the given email and {@code active} set to {@code true}.
   *
   * @param email the email to store on the record
   * @return a new {@link CorpUserInfo} instance
   */
  @Nonnull
  public static CorpUserInfo createCorpUserInfo(@Nonnull String email) {
    CorpUserInfo corpUserInfo = new CorpUserInfo();
    corpUserInfo.setEmail(email);
    corpUserInfo.setActive(true);
    return corpUserInfo;
  }

  /**
   * Builds a {@link ChartInfo} with the given title and description. Its {@code lastModified} field is a
   * {@link ChangeAuditStamps} whose created and last-modified stamps both come from {@link #createAuditStamp()}.
   *
   * @param title the chart title
   * @param description the chart description
   * @return a new {@link ChartInfo} instance
   */
  @Nonnull
  public static ChartInfo createChartInfo(@Nonnull String title, @Nonnull String description) {
    ChartInfo chartInfo = new ChartInfo();
    chartInfo.setTitle(title);
    chartInfo.setDescription(description);
    ChangeAuditStamps lastModified = new ChangeAuditStamps();
    lastModified.setCreated(createAuditStamp());
    lastModified.setLastModified(createAuditStamp());
    chartInfo.setLastModified(lastModified);
    return chartInfo;
  }

  /**
   * Resolves the aspect name for a record by passing its schema to
   * {@link PegasusUtils#getAspectNameFromSchema}.
   *
   * @param record the record whose schema is inspected
   * @return the aspect name reported for the record's schema
   */
  @Nonnull
  public static String getAspectName(@Nonnull RecordTemplate record) {
    return PegasusUtils.getAspectNameFromSchema(record.schema());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.linkedin.metadata;

import com.linkedin.chart.ChartInfo;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.CorpUserKey;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;


/**
 * Static helpers that generate a run of sample aspects and ingest each one through an {@link EntityService}.
 *
 * <p>Every ingestion call uses the audit stamp and system metadata produced by {@link AspectGenerationUtils}.
 * The class is not instantiable; all members are static.
 */
public final class AspectIngestionUtils {

  private AspectIngestionUtils() {
  }

  /**
   * Ingests {@code aspectCount} {@link CorpUserKey} aspects, using indices starting at {@code 0}.
   *
   * @param entityService service that receives each aspect
   * @param aspectCount number of aspects to generate and ingest
   * @return the ingested aspects keyed by their URN
   */
  @Nonnull
  public static Map<Urn, CorpUserKey> ingestCorpUserKeyAspects(@Nonnull final EntityService entityService, int aspectCount) {
    return ingestCorpUserKeyAspects(entityService, aspectCount, 0);
  }

  /**
   * Ingests {@link CorpUserKey} aspects for the indices {@code startIndex} (inclusive) to
   * {@code startIndex + aspectCount} (exclusive). Each URN has the form {@code urn:li:corpuser:tester<index>}.
   *
   * @param entityService service that receives each aspect
   * @param aspectCount number of aspects to generate and ingest
   * @param startIndex first index used when generating URNs
   * @return the ingested aspects keyed by their URN
   */
  @Nonnull
  public static Map<Urn, CorpUserKey> ingestCorpUserKeyAspects(@Nonnull final EntityService entityService, int aspectCount,
      int startIndex) {
    String aspectName = AspectGenerationUtils.getAspectName(new CorpUserKey());
    Map<Urn, CorpUserKey> aspects = new HashMap<>();
    for (int i = startIndex; i < startIndex + aspectCount; i++) {
      Urn urn = createCorpUserUrn(i);
      CorpUserKey aspect = AspectGenerationUtils.createCorpUserKey(urn);
      aspects.put(urn, aspect);
      entityService.ingestAspect(urn, aspectName, aspect, AspectGenerationUtils.createAuditStamp(),
          AspectGenerationUtils.createSystemMetadata());
    }
    return aspects;
  }

  /**
   * Ingests {@code aspectCount} {@link CorpUserInfo} aspects, using indices starting at {@code 0}.
   *
   * @param entityService service that receives each aspect
   * @param aspectCount number of aspects to generate and ingest
   * @return the ingested aspects keyed by their URN
   */
  @Nonnull
  public static Map<Urn, CorpUserInfo> ingestCorpUserInfoAspects(@Nonnull final EntityService entityService, int aspectCount) {
    return ingestCorpUserInfoAspects(entityService, aspectCount, 0);
  }

  /**
   * Ingests {@link CorpUserInfo} aspects for the indices {@code startIndex} (inclusive) to
   * {@code startIndex + aspectCount} (exclusive). Each URN has the form {@code urn:li:corpuser:tester<index>}
   * and each aspect carries the email {@code email<index>@test.com}.
   *
   * @param entityService service that receives each aspect
   * @param aspectCount number of aspects to generate and ingest
   * @param startIndex first index used when generating URNs and emails
   * @return the ingested aspects keyed by their URN
   */
  @Nonnull
  public static Map<Urn, CorpUserInfo> ingestCorpUserInfoAspects(@Nonnull final EntityService entityService, int aspectCount,
      int startIndex) {
    String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo());
    Map<Urn, CorpUserInfo> aspects = new HashMap<>();
    for (int i = startIndex; i < startIndex + aspectCount; i++) {
      Urn urn = createCorpUserUrn(i);
      String email = String.format("email%d@test.com", i);
      CorpUserInfo aspect = AspectGenerationUtils.createCorpUserInfo(email);
      aspects.put(urn, aspect);
      entityService.ingestAspect(urn, aspectName, aspect, AspectGenerationUtils.createAuditStamp(),
          AspectGenerationUtils.createSystemMetadata());
    }
    return aspects;
  }

  /**
   * Ingests {@code aspectCount} {@link ChartInfo} aspects, using indices starting at {@code 0}.
   *
   * @param entityService service that receives each aspect
   * @param aspectCount number of aspects to generate and ingest
   * @return the ingested aspects keyed by their URN
   */
  @Nonnull
  public static Map<Urn, ChartInfo> ingestChartInfoAspects(@Nonnull final EntityService entityService, int aspectCount) {
    return ingestChartInfoAspects(entityService, aspectCount, 0);
  }

  /**
   * Ingests {@link ChartInfo} aspects for the indices {@code startIndex} (inclusive) to
   * {@code startIndex + aspectCount} (exclusive). Each URN has the form {@code urn:li:chart:(looker,test<index>)};
   * titles and descriptions embed the same index.
   *
   * @param entityService service that receives each aspect
   * @param aspectCount number of aspects to generate and ingest
   * @param startIndex first index used when generating URNs, titles and descriptions
   * @return the ingested aspects keyed by their URN
   */
  @Nonnull
  public static Map<Urn, ChartInfo> ingestChartInfoAspects(@Nonnull final EntityService entityService, int aspectCount,
      int startIndex) {
    String aspectName = AspectGenerationUtils.getAspectName(new ChartInfo());
    Map<Urn, ChartInfo> aspects = new HashMap<>();
    for (int i = startIndex; i < startIndex + aspectCount; i++) {
      Urn urn = UrnUtils.getUrn(String.format("urn:li:chart:(looker,test%d)", i));
      String title = String.format("Test Title %d", i);
      String description = String.format("Test description %d", i);
      ChartInfo aspect = AspectGenerationUtils.createChartInfo(title, description);
      aspects.put(urn, aspect);
      entityService.ingestAspect(urn, aspectName, aspect, AspectGenerationUtils.createAuditStamp(),
          AspectGenerationUtils.createSystemMetadata());
    }
    return aspects;
  }

  /** Builds the corp-user URN {@code urn:li:corpuser:tester<index>} shared by the key and info generators. */
  @Nonnull
  private static Urn createCorpUserUrn(int index) {
    return UrnUtils.getUrn(String.format("urn:li:corpuser:tester%d", index));
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.linkedin.metadata.entity;

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.AspectIngestionUtils;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import com.linkedin.metadata.models.registry.MergedEntityRegistry;
import com.linkedin.metadata.snapshot.Snapshot;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import org.testng.annotations.Test;

import static com.linkedin.metadata.Constants.*;
import static org.testng.Assert.*;


/**
 * Base test suite for {@link AspectMigrationsDao} implementations.
 *
 * <p>The constructor builds the entity registries. The DAO under test, the entity service, the retention service
 * and the mock producer are not initialised here; a concrete subclass is expected to assign them before each test.
 *
 * @param <T> the {@link AspectMigrationsDao} implementation under test
 */
public abstract class AspectMigrationsDaoTest<T extends AspectMigrationsDao> {

  protected T _migrationsDao;

  protected final EntityRegistry _snapshotEntityRegistry;
  protected final EntityRegistry _configEntityRegistry;
  protected final EntityRegistry _testEntityRegistry;
  protected EventProducer _mockProducer;

  protected EntityService _entityService;
  protected RetentionService _retentionService;

  /**
   * Builds the snapshot registry, the config registry loaded from the {@code entity-registry.yml} resource,
   * and a merged registry combining the two.
   *
   * @throws EntityRegistryException declared because registry construction may fail
   */
  protected AspectMigrationsDaoTest() throws EntityRegistryException {
    _snapshotEntityRegistry = new TestEntityRegistry();
    _configEntityRegistry = new ConfigEntityRegistry(Snapshot.class.getClassLoader().getResourceAsStream("entity-registry.yml"));
    _testEntityRegistry = new MergedEntityRegistry(_snapshotEntityRegistry).apply(_configEntityRegistry);
  }

  /**
   * Ingests 30 key aspects, reads them back as a page of 25 followed by a page of 5, and checks that every
   * ingested URN appears exactly once across both pages.
   */
  @Test
  public void testListAllUrns() throws AssertionError {
    final int totalAspects = 30;
    final int pageSize = 25;
    final int lastPageSize = 5;
    Map<Urn, CorpUserKey> ingestedAspects = AspectIngestionUtils.ingestCorpUserKeyAspects(_entityService, totalAspects);
    List<String> ingestedUrns = ingestedAspects.keySet().stream().map(Urn::toString).collect(Collectors.toList());
    List<String> seenUrns = new ArrayList<>();

    // validate first page
    seenUrns.addAll(collectValidatedPage(_migrationsDao.listAllUrns(0, pageSize), pageSize));

    // validate last page
    seenUrns.addAll(collectValidatedPage(_migrationsDao.listAllUrns(pageSize, pageSize), lastPageSize));

    // validate all ingested URNs were returned exactly once
    for (String urn : ingestedUrns) {
      assertEquals(seenUrns.stream().filter(u -> u.equals(urn)).count(), 1);
    }
  }

  /**
   * Ingests 11 corp-user info aspects and 22 chart info aspects and expects
   * {@link AspectMigrationsDao#countEntities()} to report 33.
   */
  @Test
  public void testCountEntities() throws AssertionError {
    AspectIngestionUtils.ingestCorpUserInfoAspects(_entityService, 11);
    AspectIngestionUtils.ingestChartInfoAspects(_entityService, 22);
    final int expected = 33;

    long actual = _migrationsDao.countEntities();

    assertEquals(actual, expected);
  }

  /**
   * Expects {@link AspectMigrationsDao#checkIfAspectExists(String)} to return {@code false} before any
   * corp-user info aspect is ingested and {@code true} after one has been ingested.
   */
  @Test
  public void testCheckIfAspectExists() throws AssertionError {
    boolean actual = _migrationsDao.checkIfAspectExists(CORP_USER_INFO_ASPECT_NAME);
    assertFalse(actual);

    AspectIngestionUtils.ingestCorpUserInfoAspects(_entityService, 1);

    actual = _migrationsDao.checkIfAspectExists(CORP_USER_INFO_ASPECT_NAME);
    assertTrue(actual);
  }

  /**
   * Materialises one page of URN strings, asserts that it holds exactly {@code expectedSize} entries and that
   * each entry converts to a non-null {@link Urn}, and returns the entries in iteration order.
   */
  private static List<String> collectValidatedPage(Iterable<String> page, int expectedSize) {
    List<String> pageUrns = ImmutableList.copyOf(page);
    assertEquals(pageUrns.size(), expectedSize);
    for (String urn : pageUrns) {
      assertNotNull(UrnUtils.getUrn(urn));
    }
    return pageUrns;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.linkedin.metadata.entity;

import com.datastax.oss.driver.api.core.CqlSession;
import com.linkedin.metadata.CassandraTestUtils;
import com.linkedin.metadata.entity.cassandra.CassandraAspectDao;
import com.linkedin.metadata.entity.cassandra.CassandraRetentionService;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import org.testcontainers.containers.CassandraContainer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;


/**
 * Runs the inherited {@link AspectMigrationsDaoTest} cases against a {@link CassandraAspectDao} that talks to a
 * {@link CassandraContainer}.
 */
public class CassandraAspectMigrationsDaoTest extends AspectMigrationsDaoTest<CassandraAspectDao> {

  private CassandraContainer _cassandraContainer;

  public CassandraAspectMigrationsDaoTest() throws EntityRegistryException {
  }

  /** Obtains the Cassandra container once for the whole class. */
  @BeforeClass
  public void setupContainer() {
    _cassandraContainer = CassandraTestUtils.setupContainer();
  }

  /** Stops the container after the last test of the class. */
  @AfterClass
  public void tearDown() {
    _cassandraContainer.stop();
  }

  /**
   * Purges the container's data, then rebuilds the DAO, mock producer, entity service and retention service
   * on a new session before every test method.
   */
  @BeforeMethod
  public void setupTest() {
    CassandraTestUtils.purgeData(_cassandraContainer);

    final CqlSession cqlSession = CassandraTestUtils.createTestSession(_cassandraContainer);
    final CassandraAspectDao aspectDao = new CassandraAspectDao(cqlSession);
    aspectDao.setConnectionValidated(true);

    _mockProducer = mock(EventProducer.class);
    _entityService = new EntityService(aspectDao, _mockProducer, _testEntityRegistry);
    _retentionService = new CassandraRetentionService(_entityService, cqlSession, 1000);
    _entityService.setRetentionService(_retentionService);

    // The same DAO instance backs the entity service and is the object under test.
    _migrationsDao = aspectDao;
  }

  /**
   * Placeholder test. When every real test lives in the base class, the test runner skips this class
   * (and with it the base class); this always-passing test keeps the class discoverable.
   */
  @Test
  public void obligatoryTest() throws AssertionError {
    Assert.assertTrue(true);
  }
}
Loading

0 comments on commit 6751365

Please sign in to comment.