Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(cassandra): fix Cassandra queries used by IngestDataPlatformInstancesStep #5199

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6ba313d
fix query that checks if cassandra table exists
May 31, 2022
d7e28b0
fix problems with cassandra queries used by a bootstrapper
May 31, 2022
d5ce1a5
add an integration test for IngestDataPlatformInstancesStep
Jun 17, 2022
5bcf081
test Cassandra EntityService paging more robustly
Jun 17, 2022
3e3d4b9
make use of AspectGenerationUtils in EntityServiceTest
Jun 17, 2022
cf655e9
fix the test
Jun 17, 2022
7c40a4d
Merge branch 'master' into cassandra-boot-fixes
Jun 17, 2022
45ba92e
Merge branch 'master' into cassandra-boot-fixes
Jun 21, 2022
94c6ba0
add unit tests for IngestDataPlatformInstancesStep
Jun 24, 2022
f13964a
remove integration tests for IngestDataPlatformInstances
Jun 24, 2022
e67f679
add integration tests for AspectMigrationsDao
Jun 24, 2022
5bf2f49
optimize imports
Jun 24, 2022
fd7e24a
Merge branch 'master' into cassandra-boot-fixes
Jun 24, 2022
9d64bb8
fix code style
Jun 24, 2022
0eb00b0
Merge branch 'master' into cassandra-boot-fixes
Jun 26, 2022
f3baa88
Merge branch 'master' into cassandra-boot-fixes
Jun 27, 2022
d7d4321
Merge branch 'master' into cassandra-boot-fixes
Jun 27, 2022
0f44a5f
Merge branch 'master' into cassandra-boot-fixes
Jun 27, 2022
8023697
Merge branch 'master' into cassandra-boot-fixes
Jun 28, 2022
dad2e8f
Merge branch 'master' into cassandra-boot-fixes
Jun 29, 2022
c17bd0a
Merge branch 'master' into cassandra-boot-fixes
Jun 30, 2022
cdff59a
Merge branch 'master' into cassandra-boot-fixes
Jun 30, 2022
170412b
Merge branch 'master' into cassandra-boot-fixes
Jul 1, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ private AspectStorageValidationUtil() {
* @return {@code true} if table exists.
*/
public static boolean checkTableExists(@Nonnull CqlSession session) {
String query = String.format("SELECT columnfamily_name\n "
+ "FROM schema_columnfamilies WHERE keyspace_name='%s';",
String query = String.format("SELECT table_name \n "
+ "FROM system_schema.tables where table_name = '%s' allow filtering;",
CassandraAspect.TABLE_NAME);
ResultSet rs = session.execute(query);
return rs.all().size() > 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.datastax.oss.driver.api.core.paging.OffsetPager;
import com.datastax.oss.driver.api.core.paging.OffsetPager.Page;
import com.datastax.oss.driver.api.querybuilder.QueryBuilder;
Expand All @@ -28,10 +27,6 @@
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.ListResultMetadata;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.net.URISyntaxException;
import java.sql.Timestamp;
import java.util.HashMap;
Expand All @@ -41,13 +36,12 @@
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.deleteFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.insertInto;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.literal;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.selectFrom;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.update;
import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
import static com.datastax.oss.driver.api.querybuilder.QueryBuilder.*;
import static com.linkedin.metadata.Constants.*;

@Slf4j
public class CassandraAspectDao implements AspectDao, AspectMigrationsDao {
Expand Down Expand Up @@ -97,10 +91,13 @@ public long countEntities() {
SimpleStatement ss = selectFrom(CassandraAspect.TABLE_NAME)
.distinct()
Copy link
Collaborator

@RyanHolstien RyanHolstien Jun 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason for switching from distinct to groupBy? They should result in the same query in this case since we're not using aggregates, but it seems like an odd choice.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

QueryBuilder ignored count() when using distinct() and I was getting URNs back instead of a number. That was my workaround.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So strange 🥴 alright, that makes sense, really curious as to why it behaves that way though. Will approve, this was the only thing that seemed off to me.

.column(CassandraAspect.URN_COLUMN)
.countAll()
.build();

return _cqlSession.execute(ss).one().getLong(0);
ResultSet rs = _cqlSession.execute(ss);
// TODO: make sure it doesn't blow up on a large database
// Getting a count of distinct values in a Cassandra query doesn't seem to be feasible, but counting them in the app is dangerous
// The saving grace here is that the only place where this method is used should only run once, what the database is still young
return rs.all().size();
}

@Override
Expand All @@ -110,10 +107,11 @@ public boolean checkIfAspectExists(@Nonnull String aspectName) {
.column(CassandraAspect.URN_COLUMN)
.whereColumn(CassandraAspect.ASPECT_COLUMN).isEqualTo(literal(aspectName))
.limit(1)
.allowFiltering()
.build();

ResultSet rs = _cqlSession.execute(ss);
return rs.all().size() > 0;
return rs.one() != null;
}

private Map<String, Long> getMaxVersions(@Nonnull final String urn, @Nonnull final Set<String> aspectNames) {
Expand Down Expand Up @@ -451,7 +449,6 @@ public Iterable<String> listAllUrns(int start, int pageSize) {
validateConnection();
SimpleStatement ss = selectFrom(CassandraAspect.TABLE_NAME)
.column(CassandraAspect.URN_COLUMN)
.orderBy(CassandraAspect.URN_COLUMN, ClusteringOrder.ASC)
.build();

ResultSet rs = _cqlSession.execute(ss);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package com.linkedin.metadata;

import com.linkedin.chart.ChartInfo;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.ChangeAuditStamps;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.PegasusUtils;
import com.linkedin.mxe.SystemMetadata;
import javax.annotation.Nonnull;


public class AspectGenerationUtils {

private AspectGenerationUtils() {
}

@Nonnull
public static AuditStamp createAuditStamp() {
return new AuditStamp().setTime(123L).setActor(UrnUtils.getUrn("urn:li:corpuser:tester"));
}

@Nonnull
public static SystemMetadata createSystemMetadata() {
return createSystemMetadata(1625792689, "run-123");
}

@Nonnull
public static SystemMetadata createSystemMetadata(long lastObserved, @Nonnull String runId) {
SystemMetadata metadata = new SystemMetadata();
metadata.setLastObserved(lastObserved);
metadata.setRunId(runId);
return metadata;
}

@Nonnull
public static CorpUserKey createCorpUserKey(Urn urn) {
return (CorpUserKey) EntityKeyUtils.convertUrnToEntityKey(urn, new CorpUserKey().schema());
}

@Nonnull
public static CorpUserInfo createCorpUserInfo(@Nonnull String email) {
CorpUserInfo corpUserInfo = new CorpUserInfo();
corpUserInfo.setEmail(email);
corpUserInfo.setActive(true);
return corpUserInfo;
}

@Nonnull
public static ChartInfo createChartInfo(@Nonnull String title, @Nonnull String description) {
ChartInfo chartInfo = new ChartInfo();
chartInfo.setTitle(title);
chartInfo.setDescription(description);
ChangeAuditStamps lastModified = new ChangeAuditStamps();
lastModified.setCreated(createAuditStamp());
lastModified.setLastModified(createAuditStamp());
chartInfo.setLastModified(lastModified);
return chartInfo;
}

@Nonnull
public static String getAspectName(RecordTemplate record) {
return PegasusUtils.getAspectNameFromSchema(record.schema());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.linkedin.metadata;

import com.linkedin.chart.ChartInfo;
import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.identity.CorpUserInfo;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.CorpUserKey;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nonnull;


public class AspectIngestionUtils {

private AspectIngestionUtils() {
}

@Nonnull
public static Map<Urn, CorpUserKey> ingestCorpUserKeyAspects(EntityService entityService, int aspectCount) {
return ingestCorpUserKeyAspects(entityService, aspectCount, 0);
}

@Nonnull
public static Map<Urn, CorpUserKey> ingestCorpUserKeyAspects(EntityService entityService, int aspectCount, int startIndex) {
String aspectName = AspectGenerationUtils.getAspectName(new CorpUserKey());
Map<Urn, CorpUserKey> aspects = new HashMap<>();
for (int i = startIndex; i < startIndex + aspectCount; i++) {
Urn urn = UrnUtils.getUrn(String.format("urn:li:corpuser:tester%d", i));
CorpUserKey aspect = AspectGenerationUtils.createCorpUserKey(urn);
aspects.put(urn, aspect);
entityService.ingestAspect(urn, aspectName, aspect, AspectGenerationUtils.createAuditStamp(), AspectGenerationUtils.createSystemMetadata());
}
return aspects;
}

@Nonnull
public static Map<Urn, CorpUserInfo> ingestCorpUserInfoAspects(@Nonnull final EntityService entityService, int aspectCount) {
return ingestCorpUserInfoAspects(entityService, aspectCount, 0);
}

@Nonnull
public static Map<Urn, CorpUserInfo> ingestCorpUserInfoAspects(@Nonnull final EntityService entityService, int aspectCount, int startIndex) {
String aspectName = AspectGenerationUtils.getAspectName(new CorpUserInfo());
Map<Urn, CorpUserInfo> aspects = new HashMap<>();
for (int i = startIndex; i < startIndex + aspectCount; i++) {
Urn urn = UrnUtils.getUrn(String.format("urn:li:corpuser:tester%d", i));
String email = String.format("email%d@test.com", i);
CorpUserInfo aspect = AspectGenerationUtils.createCorpUserInfo(email);
aspects.put(urn, aspect);
entityService.ingestAspect(urn, aspectName, aspect, AspectGenerationUtils.createAuditStamp(), AspectGenerationUtils.createSystemMetadata());
}
return aspects;
}

@Nonnull
public static Map<Urn, ChartInfo> ingestChartInfoAspects(@Nonnull final EntityService entityService, int aspectCount) {
return ingestChartInfoAspects(entityService, aspectCount, 0);
}

@Nonnull
public static Map<Urn, ChartInfo> ingestChartInfoAspects(@Nonnull final EntityService entityService, int aspectCount, int startIndex) {
String aspectName = AspectGenerationUtils.getAspectName(new ChartInfo());
Map<Urn, ChartInfo> aspects = new HashMap<>();
for (int i = startIndex; i < startIndex + aspectCount; i++) {
Urn urn = UrnUtils.getUrn(String.format("urn:li:chart:(looker,test%d)", i));
String title = String.format("Test Title %d", i);
String description = String.format("Test description %d", i);
ChartInfo aspect = AspectGenerationUtils.createChartInfo(title, description);
aspects.put(urn, aspect);
entityService.ingestAspect(urn, aspectName, aspect, AspectGenerationUtils.createAuditStamp(), AspectGenerationUtils.createSystemMetadata());
}
return aspects;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package com.linkedin.metadata.entity;

import com.linkedin.common.urn.Urn;
import com.linkedin.common.urn.UrnUtils;
import com.linkedin.metadata.AspectIngestionUtils;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.key.CorpUserKey;
import com.linkedin.metadata.models.registry.ConfigEntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import com.linkedin.metadata.models.registry.MergedEntityRegistry;
import com.linkedin.metadata.snapshot.Snapshot;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
import org.testng.annotations.Test;

import static com.linkedin.metadata.Constants.*;
import static org.testng.Assert.*;


abstract public class AspectMigrationsDaoTest<T extends AspectMigrationsDao> {

protected T _migrationsDao;

protected final EntityRegistry _snapshotEntityRegistry;
protected final EntityRegistry _configEntityRegistry;
protected final EntityRegistry _testEntityRegistry;
protected EventProducer _mockProducer;

protected EntityService _entityService;
protected RetentionService _retentionService;

protected AspectMigrationsDaoTest() throws EntityRegistryException {
_snapshotEntityRegistry = new TestEntityRegistry();
_configEntityRegistry = new ConfigEntityRegistry(Snapshot.class.getClassLoader().getResourceAsStream("entity-registry.yml"));
_testEntityRegistry = new MergedEntityRegistry(_snapshotEntityRegistry).apply(_configEntityRegistry);
}

@Test
public void testListAllUrns() throws AssertionError {
final int totalAspects = 30;
final int pageSize = 25;
final int lastPageSize = 5;
Map<Urn, CorpUserKey> ingestedAspects = AspectIngestionUtils.ingestCorpUserKeyAspects(_entityService, totalAspects);
List<String> ingestedUrns = ingestedAspects.keySet().stream().map(Urn::toString).collect(Collectors.toList());
List<String> seenUrns = new ArrayList<>();

Iterable<String> page1 = _migrationsDao.listAllUrns(0, pageSize);
List<String> page1Urns = ImmutableList.copyOf(page1);

// validate first page
assertEquals(page1Urns.size(), pageSize);
for (String urn : page1Urns) {
assertNotNull(UrnUtils.getUrn(urn));
seenUrns.add(urn);
}

Iterable<String> page2 = _migrationsDao.listAllUrns(pageSize, pageSize);
List<String> page2Urns = ImmutableList.copyOf(page2);

// validate last page
assertEquals(page2Urns.size(), lastPageSize);
for (String urn : page2Urns) {
assertNotNull(UrnUtils.getUrn(urn));
seenUrns.add(urn);
}

// validate all ingested URNs were returned exactly once
for (String urn : ingestedUrns) {
assertEquals(seenUrns.stream().filter(u -> u.equals(urn)).count(), 1);
}
}

@Test
public void testCountEntities() throws AssertionError {
AspectIngestionUtils.ingestCorpUserInfoAspects(_entityService, 11);
AspectIngestionUtils.ingestChartInfoAspects(_entityService, 22);
final int expected = 33;

long actual = _migrationsDao.countEntities();

assertEquals(actual, expected);
}

@Test
public void testCheckIfAspectExists() throws AssertionError {
boolean actual = _migrationsDao.checkIfAspectExists(CORP_USER_INFO_ASPECT_NAME);
assertFalse(actual);

AspectIngestionUtils.ingestCorpUserInfoAspects(_entityService, 1);

actual = _migrationsDao.checkIfAspectExists(CORP_USER_INFO_ASPECT_NAME);
assertTrue(actual);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package com.linkedin.metadata.entity;

import com.datastax.oss.driver.api.core.CqlSession;
import com.linkedin.metadata.CassandraTestUtils;
import com.linkedin.metadata.entity.cassandra.CassandraAspectDao;
import com.linkedin.metadata.entity.cassandra.CassandraRetentionService;
import com.linkedin.metadata.event.EventProducer;
import com.linkedin.metadata.models.registry.EntityRegistryException;
import org.testcontainers.containers.CassandraContainer;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import static org.mockito.Mockito.*;


public class CassandraAspectMigrationsDaoTest extends AspectMigrationsDaoTest<CassandraAspectDao> {

private CassandraContainer _cassandraContainer;

public CassandraAspectMigrationsDaoTest() throws EntityRegistryException {
}

@BeforeClass
public void setupContainer() {
_cassandraContainer = CassandraTestUtils.setupContainer();
}

@AfterClass
public void tearDown() {
_cassandraContainer.stop();
}

@BeforeMethod
public void setupTest() {
CassandraTestUtils.purgeData(_cassandraContainer);
configureComponents();
}

private void configureComponents() {
CqlSession session = CassandraTestUtils.createTestSession(_cassandraContainer);
CassandraAspectDao dao = new CassandraAspectDao(session);
dao.setConnectionValidated(true);
_mockProducer = mock(EventProducer.class);
_entityService = new EntityService(dao, _mockProducer, _testEntityRegistry);
_retentionService = new CassandraRetentionService(_entityService, session, 1000);
_entityService.setRetentionService(_retentionService);

_migrationsDao = dao;
}

/**
* Ideally, all tests would be in the base class, so they're reused between all implementations.
* When that's the case - test runner will ignore this class (and its base!) so we keep this dummy test
* to make sure this class will always be discovered.
*/
@Test
public void obligatoryTest() throws AssertionError {
Assert.assertTrue(true);
}
}
Loading