moves ConceptTreeCache into BucketManager #3598

Merged 2 commits on Oct 15, 2024
Changes from all commits
TreeConcept.java

@@ -4,7 +4,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
@@ -21,7 +20,6 @@
import com.bakdata.conquery.models.exceptions.JSONException;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptElementId;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.identifiable.ids.specific.ImportId;
import com.bakdata.conquery.models.query.PrintSettings;
import com.bakdata.conquery.util.CalculatedValue;
import com.fasterxml.jackson.annotation.JsonIgnore;
@@ -49,8 +47,6 @@ public class TreeConcept extends Concept<ConceptTreeConnector> implements Concep

@JsonIgnore
private final List<ConceptTreeNode<?>> localIdMap = new ArrayList<>();
@JsonIgnore
private final Map<ImportId, ConceptTreeCache> caches = new ConcurrentHashMap<>();
@Getter
@Setter
@Valid
@@ -73,10 +69,6 @@ public Concept<?> findConcept() {
return getConcept();
}

public ConceptTreeCache getCache(ImportId imp) {
return caches.get(imp);
}

@Override
public ConceptTreeNode<?> getParent() {
return null;
@@ -144,13 +136,7 @@ private ConceptTreeChild findMostSpecificChild(
return best;
}

public void initializeIdCache(ImportId importId) {
caches.computeIfAbsent(importId, id -> new ConceptTreeCache(this));
}

public void removeImportCache(ImportId imp) {
caches.remove(imp);
}

/**
* Method to get the element of this concept tree that has the specified local ID.
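With these removals, TreeConcept no longer does any per-import cache bookkeeping: the ConcurrentHashMap of caches and the getCache, initializeIdCache and removeImportCache methods are gone. As an illustrative sketch only (the helper method and its parameter names are assumptions, not part of the diff; the types and accessors it uses all appear elsewhere in this PR), a call site that resolves the cache while building CBlocks changes roughly like this:

// Illustrative only: resolving the per-import cache before and after this PR.
private static ConceptTreeCache resolveCache(ConceptTreeConnector connector, Bucket bucket, BucketManager bucketManager) {
	// Before: the concept owned its caches and had to be initialized explicitly per import.
	//   connector.getConcept().initializeIdCache(bucket.getImp());
	//   return connector.getConcept().getCache(bucket.getImp());

	// After: the BucketManager creates and hands out the cache lazily on first access
	// (getConceptTreeCache is added to BucketManager further down in this diff).
	return bucketManager.getConceptTreeCache(connector.getConcept(), bucket.getImp());
}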
BucketManager.java

@@ -8,16 +8,19 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.models.common.CDateSet;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeCache;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.identifiable.ids.specific.BucketId;
import com.bakdata.conquery.models.identifiable.ids.specific.CBlockId;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.identifiable.ids.specific.ConnectorId;
import com.bakdata.conquery.models.identifiable.ids.specific.ImportId;
import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
@@ -69,6 +72,8 @@ public class BucketManager {
@Getter
private final int entityBucketSize;

private final Map<ConceptId, Map<ImportId, ConceptTreeCache>> treeCaches = new ConcurrentHashMap<>();

public static BucketManager create(Worker worker, WorkerStorage storage, int entityBucketSize) {
final Map<ConnectorId, Int2ObjectMap<Map<BucketId, CBlockId>>> connectorCBlocks = new HashMap<>();
final Map<TableId, Int2ObjectMap<List<BucketId>>> tableBuckets = new HashMap<>();
@@ -85,9 +90,7 @@ public static BucketManager create(Worker worker, WorkerStorage storage, int ent
registerBucket(bucket, entity2Bucket, tableBuckets);
});

storage.getAllCBlocks().forEach(cBlock ->
registerCBlock(cBlock, connectorCBlocks)
);
storage.getAllCBlocks().forEach(cBlock -> registerCBlock(cBlock, connectorCBlocks));

return new BucketManager(worker.getJobManager(), storage, worker, entity2Bucket, connectorCBlocks, tableBuckets, entityBucketSize);
}
@@ -107,10 +110,9 @@ private static void registerBucket(Bucket bucket, Object2IntMap<String> entity2B
entity2Bucket.put(entity, bucket.getBucket());
}

tableBuckets
.computeIfAbsent(bucket.getTable(), id -> new Int2ObjectAVLTreeMap<>())
.computeIfAbsent(bucket.getBucket(), n -> new ArrayList<>())
.add(bucket.getId());
tableBuckets.computeIfAbsent(bucket.getTable(), id -> new Int2ObjectAVLTreeMap<>())
.computeIfAbsent(bucket.getBucket(), n -> new ArrayList<>())
.add(bucket.getId());
}

/**
@@ -127,9 +129,7 @@ private static void registerCBlock(CBlock cBlock, Map<ConnectorId, Int2ObjectMap
public void fullUpdate() {
final CalculateCBlocksJob job = new CalculateCBlocksJob(storage, this, worker.getJobsExecutorService());

storage.getAllConcepts()
.filter(TreeConcept.class::isInstance)
.flatMap(concept -> concept.getConnectors().stream().map(ConceptTreeConnector.class::cast))
storage.getAllConcepts().filter(TreeConcept.class::isInstance).flatMap(concept -> concept.getConnectors().stream().map(ConceptTreeConnector.class::cast))

.forEach(connector -> storage.getAllBucketIds().forEach(bucketId -> {

@@ -182,23 +182,16 @@ public void removeTable(TableId table) {

// It's possible no buckets were registered yet
if (removed != null) {
removed.values()
.stream()
.flatMap(List::stream)
.forEach(this::removeBucket);
removed.values().stream().flatMap(List::stream).forEach(this::removeBucket);
}

storage.removeTable(table);
}

public void removeBucket(BucketId bucket) {
storage.getAllCBlockIds()
.filter(cblock -> cblock.getBucket().equals(bucket))
.forEach(this::removeCBlock);
storage.getAllCBlockIds().filter(cblock -> cblock.getBucket().equals(bucket)).forEach(this::removeCBlock);

tableToBuckets.getOrDefault(bucket.getImp().getTable(), Int2ObjectMaps.emptyMap())
.getOrDefault(bucket.getBucket(), Collections.emptyList())
.remove(bucket);
tableToBuckets.getOrDefault(bucket.getImp().getTable(), Int2ObjectMaps.emptyMap()).getOrDefault(bucket.getBucket(), Collections.emptyList()).remove(bucket);

storage.removeBucket(bucket);
}
@@ -221,23 +214,20 @@ public Set<String> getEntities() {
* Remove all buckets comprising the import, which will in turn remove all CBlocks.
*/
public void removeImport(ImportId imp) {
storage.getAllBucketIds()
.filter(bucket -> bucket.getImp().equals(imp))
.forEach(this::removeBucket);
storage.getAllBucketIds().filter(bucket -> bucket.getImp().equals(imp)).forEach(this::removeBucket);


storage.getAllConcepts()
.filter(TreeConcept.class::isInstance)
.forEach(concept -> ((TreeConcept) concept).removeImportCache(imp));
.forEach(concept -> removeConceptTreeCacheByImport(concept.getId(), imp));

storage.removeImport(imp);
}

public List<BucketId> getEntityBucketsForTable(Entity entity, TableId table) {
final int bucketId = getBucket(entity.getId());

return tableToBuckets.getOrDefault(table, Int2ObjectMaps.emptyMap())
.getOrDefault(bucketId, Collections.emptyList());
return tableToBuckets.getOrDefault(table, Int2ObjectMaps.emptyMap()).getOrDefault(bucketId, Collections.emptyList());
}

private int getBucket(String id) {
@@ -276,14 +266,12 @@ public Set<String> getEntitiesWithConcepts(Collection<ConceptElement<?>> concept
public Map<BucketId, CBlockId> getEntityCBlocksForConnector(Entity entity, ConnectorId connector) {
final int bucketId = getBucket(entity.getId());

return connectorToCblocks.getOrDefault(connector, Int2ObjectMaps.emptyMap())
.getOrDefault(bucketId, Collections.emptyMap());
return connectorToCblocks.getOrDefault(connector, Int2ObjectMaps.emptyMap()).getOrDefault(bucketId, Collections.emptyMap());
}

public boolean hasEntityCBlocksForConnector(Entity entity, ConnectorId connector) {
final int bucketId = getBucket(entity.getId());
final Map<BucketId, CBlockId> cblocks = connectorToCblocks.getOrDefault(connector, Int2ObjectMaps.emptyMap())
.getOrDefault(bucketId, Collections.emptyMap());
final Map<BucketId, CBlockId> cblocks = connectorToCblocks.getOrDefault(connector, Int2ObjectMaps.emptyMap()).getOrDefault(bucketId, Collections.emptyMap());

for (BucketId bucket : cblocks.keySet()) {
if (bucket.resolve().containsEntity(entity.getId())) {
Expand Down Expand Up @@ -311,13 +299,12 @@ public void removeConcept(Concept<?> concept) {

// It's possible that no data has been loaded yet
if (removed != null) {
removed.values().stream()
.map(Map::values)
.flatMap(Collection::stream)
.forEach(storage::removeCBlock);
removed.values().stream().map(Map::values).flatMap(Collection::stream).forEach(storage::removeCBlock);
}
}

removeConceptTreeCacheByConcept(concept.getId());

storage.removeConcept(concept.getId());
}

@@ -341,4 +328,16 @@ public void addConcept(Concept<?> concept) {
}


public ConceptTreeCache getConceptTreeCache(TreeConcept concept, ImportId imp) {
return treeCaches.computeIfAbsent(concept.getId(), (ignored) -> new ConcurrentHashMap<>()).computeIfAbsent(imp, (ignored) -> new ConceptTreeCache(concept));
}

public void removeConceptTreeCacheByImport(ConceptId concept, ImportId imp) {
treeCaches.get(concept).remove(imp);
}

public void removeConceptTreeCacheByConcept(ConceptId concept) {
treeCaches.remove(concept);
}

}
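The new treeCaches field and these three methods give BucketManager sole ownership of the ConceptTreeCache lifecycle: getConceptTreeCache lazily creates one cache per (concept, import) pair, removeConceptTreeCacheByImport drops a single import's cache (used by removeImport above), and removeConceptTreeCacheByConcept discards everything for a concept (used by removeConcept). Since both map levels are ConcurrentHashMaps and creation goes through computeIfAbsent, concurrent CBlock jobs touching the same import share a single cache instance. Below is a minimal, self-contained sketch of the same two-level idiom, with String keys and values standing in for the conquery ID and cache types; the null check in the per-import removal is an illustrative hardening for the case where no cache was ever created for the concept, not something this diff adds.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TwoLevelCacheSketch {

	// Outer key stands in for ConceptId, inner key for ImportId, the value for ConceptTreeCache.
	private final Map<String, Map<String, String>> caches = new ConcurrentHashMap<>();

	// Lazily creates the inner map and the cached value; each computeIfAbsent call is atomic
	// per key on a ConcurrentHashMap, so every (concept, import) pair yields exactly one value.
	public String get(String concept, String imp) {
		return caches.computeIfAbsent(concept, ignored -> new ConcurrentHashMap<>())
					 .computeIfAbsent(imp, ignored -> "cache for " + concept + "/" + imp);
	}

	// Null-safe variant of the per-import removal (illustrative hardening):
	// does nothing if no cache was ever created for the concept.
	public void removeByImport(String concept, String imp) {
		final Map<String, String> perImport = caches.get(concept);
		if (perImport != null) {
			perImport.remove(imp);
		}
	}

	// Dropping a concept discards all of its per-import caches at once.
	public void removeByConcept(String concept) {
		caches.remove(concept);
	}

	public static void main(String[] args) {
		TwoLevelCacheSketch sketch = new TwoLevelCacheSketch();
		System.out.println(sketch.get("concept1", "importA")); // created lazily on first access
		System.out.println(sketch.get("concept1", "importA")); // same entry, not re-created
		sketch.removeByImport("concept1", "importA");
		sketch.removeByConcept("concept1");
	}
}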
CBlock.java

@@ -89,10 +89,11 @@ public static long estimateMemoryBytes(long entities, long entries, double depth
);
}

public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket, int bucketSize) {
public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket, BucketManager bucketManager) {
final int bucketSize = bucketManager.getEntityBucketSize();
final int root = bucket.getBucket() * bucketSize;

final int[][] mostSpecificChildren = calculateSpecificChildrenPaths(bucket, connector);
final int[][] mostSpecificChildren = calculateSpecificChildrenPaths(bucket, connector, bucketManager);
//TODO Object2LongMap
final Map<String, Long> includedConcepts = calculateConceptElementPathBloomFilter(bucketSize, bucket, mostSpecificChildren);
final Map<String, CDateRange> entitySpans = calculateEntityDateIndices(bucket);
@@ -105,12 +106,12 @@
* Calculates the path for each event from the root of the {@link TreeConcept} to the most specific {@link ConceptTreeChild}
* denoted by the individual {@link ConceptTreeChild#getPrefix()}.
*/
private static int[][] calculateSpecificChildrenPaths(Bucket bucket, ConceptTreeConnector connector) {
private static int[][] calculateSpecificChildrenPaths(Bucket bucket, ConceptTreeConnector connector, BucketManager bucketManager) {
if (connector.getColumn() == null) {
return calculateSpecificChildrenPathsWithoutColumn(bucket, connector);
}

return calculateSpecificChildrenPathsWithColumn(bucket, connector);
return calculateSpecificChildrenPathsWithColumn(bucket, connector, bucketManager);
}

/**
@@ -244,13 +245,11 @@ private static int[][] calculateSpecificChildrenPathsWithoutColumn(Bucket bucket
* Calculates the path for each event from the root of the {@link TreeConcept} to the most specific {@link ConceptTreeChild}
* denoted by the individual {@link ConceptTreeChild#getPrefix()}.
*/
private static int[][] calculateSpecificChildrenPathsWithColumn(Bucket bucket, ConceptTreeConnector connector) {
private static int[][] calculateSpecificChildrenPathsWithColumn(Bucket bucket, ConceptTreeConnector connector, BucketManager bucketManager) {

final Column column = connector.getColumn().resolve();

connector.getConcept().initializeIdCache(bucket.getImp());

final ConceptTreeCache cache = connector.getConcept().getCache(bucket.getImp());
final ConceptTreeCache cache = bucketManager.getConceptTreeCache(connector.getConcept(), bucket.getImp());
final int[] rootPrefix = connector.getConcept().getPrefix();

final IntFunction<Map<String, Object>> mapCalculator = bucket.mapCalculator();
CalculateCBlocksJob.java

@@ -120,7 +120,7 @@ public void run() {

log.trace("BEGIN calculating CBlock for {}", getCBlockId());

final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucket(), bucketManager.getEntityBucketSize());
final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucket(), bucketManager);

log.trace("DONE calculating CBlock for {}", getCBlockId());

CBlock serialization test

@@ -687,8 +687,7 @@ public void cBlock() throws IOException, JSONException {

workerStorage.addBucket(bucket);

final CBlock cBlock = CBlock.createCBlock(connector, bucket, 10);

final CBlock cBlock = new CBlock(bucket.getId(), connector.getId(), 0, Collections.emptyMap(), Collections.emptyMap(), new int[0][]);

SerializationTestUtil.forType(CBlock.class)
.objectMappers(getShardInternalMapper())