Skip to content

Commit

Permalink
moves ConceptTreeCache into BucketManager
Browse files Browse the repository at this point in the history
It was always a bit iffy having that embedded directly in the Concept, especially since it isn't persisted.
  • Loading branch information
awildturtok committed Oct 9, 2024
1 parent 720fb46 commit 4b21491
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;

import com.bakdata.conquery.io.cps.CPSType;
import com.bakdata.conquery.io.jackson.Initializing;
import com.bakdata.conquery.models.datasets.Import;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
import com.bakdata.conquery.models.datasets.concepts.SelectHolder;
Expand All @@ -20,8 +20,6 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonManagedReference;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -43,35 +41,28 @@ public class TreeConcept extends Concept<ConceptTreeConnector> implements Concep

@JsonIgnore
private final List<ConceptTreeNode<?>> localIdMap = new ArrayList<>();

@Getter
@Setter
@Valid
private List<ConceptTreeChild> children = Collections.emptyList();

@JsonIgnore
@Getter
@Setter
private int localId;

@NotNull
@Getter
@Setter
@Valid
@JsonManagedReference
private List<UniversalSelect> selects = new ArrayList<>();

@JsonIgnore
private final Map<Import, ConceptTreeCache> caches = new ConcurrentHashMap<>();
private int nChildren = -1;

/**
 * Returns whatever {@code getConcept()} yields for this node.
 */
@Override
public Concept<?> findConcept() {
// Pure delegation: no additional lookup is performed here.
return getConcept();
}

public ConceptTreeCache getCache(Import imp) {
return caches.get(imp);
}

@Override
public ConceptTreeNode<?> getParent() {
Expand All @@ -84,11 +75,17 @@ public void clearMatchingStats() {
getAllChildren().forEach(ConceptTreeChild::clearMatchingStats);
}

/**
 * Streams every entry of {@code localIdMap} that is a {@link ConceptTreeChild}.
 * Entries of any other type are filtered out.
 *
 * @return a stream over the registered {@link ConceptTreeChild} instances
 */
@JsonIgnore
public Stream<ConceptTreeChild> getAllChildren() {
    return localIdMap.stream()
                     .filter(node -> node instanceof ConceptTreeChild)
                     .map(node -> (ConceptTreeChild) node);
}

/**
 * Tests whether the given prefix path starts at this node, i.e. whether its first entry is {@code 0}.
 * (Presumably {@code 0} corresponds to the local id assigned in {@code init()} - confirm.)
 *
 * @param conceptPrefix the prefix path to test, may be {@code null} or empty
 * @return {@code true} if the prefix is non-null, non-empty and its first entry is {@code 0};
 *         {@code false} otherwise
 */
@Override
public boolean matchesPrefix(int[] conceptPrefix) {
    // An empty prefix has no first entry: report "no match" instead of failing on index 0.
    if (conceptPrefix == null || conceptPrefix.length == 0) {
        return false;
    }

    return conceptPrefix[0] == 0;
}

@Override
public void init() {
setLocalId(0);
localIdMap.add(this);
Expand All @@ -102,7 +99,8 @@ public void init() {

try {
con.getCondition().init(this);
} catch (ConceptConfigurationException e) {
}
catch (ConceptConfigurationException e) {
throw new RuntimeException("Unable to init condition", e);
}
}
Expand All @@ -117,7 +115,8 @@ public void init() {

ctc.init();

} catch (Exception e) {
}
catch (Exception e) {
throw new RuntimeException("Error trying to consolidate the node " + ctc.getLabel() + " in " + getLabel(), e);
}

Expand Down Expand Up @@ -166,14 +165,6 @@ private ConceptElement findMostSpecificChild(String stringValue, CalculatedValue
return best;
}

@JsonIgnore
public Stream<ConceptTreeChild> getAllChildren() {
return localIdMap.stream().filter(ConceptTreeChild.class::isInstance).map(ConceptTreeChild.class::cast);
}

@JsonIgnore
private int nChildren = -1;

@Override
@JsonIgnore
public int countElements() {
Expand All @@ -184,13 +175,6 @@ public int countElements() {
return nChildren = 1 + (int) getAllChildren().count();
}

public void initializeIdCache(Import importId) {
caches.computeIfAbsent(importId, id -> new ConceptTreeCache(this));
}

public void removeImportCache(Import imp) {
caches.remove(imp);
}

/**
* Method to get the element of this concept tree that has the specified local ID.
Expand All @@ -208,5 +192,6 @@ public ConceptTreeNode<?> getElementByLocalId(int localId) {
return localIdMap.get(localId);
}

public static class TreeConceptInitializer extends Initializing.Converter<TreeConcept> {}
public static class TreeConceptInitializer extends Initializing.Converter<TreeConcept> {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import com.bakdata.conquery.io.storage.WorkerStorage;
import com.bakdata.conquery.models.common.CDateSet;
Expand All @@ -16,9 +17,12 @@
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.ConceptElement;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeCache;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.identifiable.ids.specific.CBlockId;
import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId;
import com.bakdata.conquery.models.identifiable.ids.specific.ImportId;
import com.bakdata.conquery.models.jobs.CalculateCBlocksJob;
import com.bakdata.conquery.models.jobs.JobManager;
import com.bakdata.conquery.models.query.entity.Entity;
Expand Down Expand Up @@ -64,6 +68,8 @@ public class BucketManager {
*/
private final Map<Table, Int2ObjectMap<List<Bucket>>> tableToBuckets;

private final Map<ConceptId, Map<ImportId, ConceptTreeCache>> conceptTreeCaches = new ConcurrentHashMap<>();

@Getter
private final int entityBucketSize;

Expand Down Expand Up @@ -157,6 +163,11 @@ public void fullUpdate() {
}
}

/**
 * Looks up the {@link ConceptTreeCache} for the given concept/import pair, creating it if absent.
 *
 * A missing per-concept map is created as a {@link ConcurrentHashMap}. A missing cache is built
 * from the concept resolved via {@code storage.getConcept(concept)}, cast to {@link TreeConcept}.
 *
 * @param concept  id of the concept the cache belongs to
 * @param importId id of the import the cache belongs to
 * @return the existing or newly created cache for this pair
 */
public ConceptTreeCache getConceptTreeCache(ConceptId concept, ImportId importId) {
    final Map<ImportId, ConceptTreeCache> cachesOfConcept =
            conceptTreeCaches.computeIfAbsent(concept, unusedConceptId -> new ConcurrentHashMap<>());

    return cachesOfConcept.computeIfAbsent(
            importId,
            unusedImportId -> new ConceptTreeCache((TreeConcept) storage.getConcept(concept))
    );
}

/**
 * Reports whether {@code storage} returns a non-null CBlock for the given id.
 *
 * @param id the id of the CBlock to look up
 * @return {@code true} if {@code storage.getCBlock(id)} is not {@code null}
 */
public boolean hasCBlock(CBlockId id) {
    final boolean present = storage.getCBlock(id) != null;
    return present;
}
Expand Down Expand Up @@ -255,7 +266,7 @@ public void removeImport(Import imp) {
continue;
}

((TreeConcept) concept).removeImportCache(imp);
conceptTreeCaches.get(concept).remove(imp.getId());
}
storage.removeImport(imp.getId());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import jakarta.validation.constraints.NotNull;

import com.bakdata.conquery.io.jackson.serializer.CBlockDeserializer;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
Expand All @@ -29,7 +30,6 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import jakarta.validation.constraints.NotNull;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.Setter;
Expand Down Expand Up @@ -92,10 +92,10 @@ public static long estimateMemoryBytes(long entities, long entries, double depth
);
}

public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket, int bucketSize) {
public static CBlock calculateCBlock(ConceptTreeConnector connector, Bucket bucket, int bucketSize, BucketManager bucketManager) {
final int root = bucket.getBucket() * bucketSize;

final int[][] mostSpecificChildren = calculateSpecificChildrenPaths(bucket, connector);
final int[][] mostSpecificChildren = calculateSpecificChildrenPaths(bucket, connector, bucketManager);
//TODO Object2LongMap
final Map<String, Long> includedConcepts = calculateConceptElementPathBloomFilter(bucketSize, bucket, mostSpecificChildren);
final Map<String, CDateRange> entitySpans = calculateEntityDateIndices(bucket);
Expand All @@ -107,38 +107,34 @@ public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket,
* Calculates the path for each event from the root of the {@link TreeConcept} to the most specific {@link ConceptTreeChild}
* denoted by the individual {@link ConceptTreeChild#getPrefix()}.
*/
private static int[][] calculateSpecificChildrenPaths(Bucket bucket, ConceptTreeConnector connector) {
private static int[][] calculateSpecificChildrenPaths(Bucket bucket, ConceptTreeConnector connector, BucketManager bucketManager) {

final Column column;
final TreeConcept concept = connector.getConcept();
final CTCondition connectorCondition = connector.getCondition();

final TreeConcept treeConcept = connector.getConcept();
final Column column;
final ConceptTreeCache cache;

// If we have a column, and it is of string-type, we initialize a cache.
if (connector.getColumn() != null && bucket.getStore(connector.getColumn()) instanceof StringStore) {

column = connector.getColumn();

treeConcept.initializeIdCache(bucket.getImp());
cache = bucketManager.getConceptTreeCache(concept.getId(), bucket.getImp().getId());
}
// No column only possible if we have just one tree element!
else if (treeConcept.countElements() == 1) {
else if (concept.countElements() == 1) {
column = null;
cache = null;
}
else {
throw new IllegalStateException(String.format("Cannot build tree over Connector[%s] without Column", connector.getId()));
}

final CTCondition connectorCondition = connector.getCondition();

final int[][] mostSpecificChildren = new int[bucket.getNumberOfEvents()][];

Arrays.fill(mostSpecificChildren, ConceptTreeConnector.NOT_CONTAINED);

final ConceptTreeCache cache = treeConcept.getCache(bucket.getImp());

for (int event = 0; event < bucket.getNumberOfEvents(); event++) {


try {
String stringValue = "";

Expand All @@ -161,25 +157,25 @@ else if (treeConcept.countElements() == 1) {

// Events without values are assigned to the root
if (column != null && !has) {
mostSpecificChildren[event] = treeConcept.getPrefix();
mostSpecificChildren[event] = concept.getPrefix();
continue;
}

final ConceptElement<?> child = cache == null
? treeConcept.findMostSpecificChild(stringValue, rowMap)
? concept.findMostSpecificChild(stringValue, rowMap)
: cache.findMostSpecificChild(stringValue, rowMap);

// All unresolved elements resolve to the root.
if (child == null) {
mostSpecificChildren[event] = treeConcept.getPrefix();
mostSpecificChildren[event] = concept.getPrefix();
continue;
}

// put path into event
mostSpecificChildren[event] = child.getPrefix();
}
catch (ConceptConfigurationException ex) {
log.error("Failed to resolve event {}-{} against concept {}", bucket, event, treeConcept, ex);
log.error("Failed to resolve event {}-{} against concept {}", bucket, event, concept, ex);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,11 @@ public void execute() throws Exception {

getProgressReporter().setMax(tasks.size());

final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(getExecutorService());
final ListeningExecutorService listenableExecutorService = MoreExecutors.listeningDecorator(executorService);

final List<? extends ListenableFuture<?>> futures =
tasks.stream()
.map(executorService::submit)
.map(listenableExecutorService::submit)
.peek(fut -> fut.addListener(this::incrementProgressReporter, MoreExecutors.directExecutor()))
.collect(Collectors.toList());

Expand All @@ -79,10 +79,10 @@ public void execute() throws Exception {
all.get(1, TimeUnit.MINUTES);
}
catch (TimeoutException exception) {
log.debug("submitted={}, pool={}", tasks.size(), getExecutorService());
log.debug("submitted={}, pool={}", tasks.size(), executorService);

if (log.isTraceEnabled() && getExecutorService() instanceof ThreadPoolExecutor) {
log.trace("Waiting for {}", ((ThreadPoolExecutor) getExecutorService()).getQueue());
if (log.isTraceEnabled() && executorService instanceof ThreadPoolExecutor) {
log.trace("Waiting for {}", ((ThreadPoolExecutor) executorService).getQueue());
}
}
}
Expand Down Expand Up @@ -119,7 +119,7 @@ public void run() {

log.trace("BEGIN calculating CBlock for {}", getCBlockId());

final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucket(), bucketManager.getEntityBucketSize());
final CBlock cBlock = CBlock.calculateCBlock(getConnector(), getBucket(), bucketManager.getEntityBucketSize(), bucketManager);

log.trace("DONE calculating CBlock for {}", getCBlockId());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ public void serialize() throws IOException, JSONException {
final Bucket bucket = new Bucket(0, new ColumnStore[0], Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(),0, imp);


final CBlock cBlock = CBlock.createCBlock(connector, bucket, 10);
final CBlock cBlock = CBlock.calculateCBlock(connector, bucket, 10, bucketManager);

registry.register(dataset)
.register(table)
Expand Down

0 comments on commit 4b21491

Please sign in to comment.