From 4b214910d7bf60f34dfcdaf59c34b501ce8acd94 Mon Sep 17 00:00:00 2001 From: awildturtok <1553491+awildturtok@users.noreply.github.com> Date: Wed, 9 Oct 2024 16:39:30 +0200 Subject: [PATCH] moves ConceptTreeCache into BucketManager It was always a bit iffy having that embedded directly in the Concept, especially since it isnt persisted. --- .../datasets/concepts/tree/TreeConcept.java | 45 +++++++------------ .../conquery/models/events/BucketManager.java | 13 +++++- .../conquery/models/events/CBlock.java | 34 +++++++------- .../models/jobs/CalculateCBlocksJob.java | 12 ++--- .../conquery/models/SerializationTests.java | 2 +- 5 files changed, 49 insertions(+), 57 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java index 8cbc0d80f45..9e3ae17a4d0 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java @@ -4,12 +4,12 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Stream; +import jakarta.validation.Valid; +import jakarta.validation.constraints.NotNull; import com.bakdata.conquery.io.cps.CPSType; import com.bakdata.conquery.io.jackson.Initializing; -import com.bakdata.conquery.models.datasets.Import; import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.ConceptElement; import com.bakdata.conquery.models.datasets.concepts.SelectHolder; @@ -20,8 +20,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonManagedReference; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import jakarta.validation.Valid; -import jakarta.validation.constraints.NotNull; import lombok.Getter; import lombok.Setter; import lombok.extern.slf4j.Slf4j; @@ -43,35 +41,28 @@ public class TreeConcept extends Concept implements Concep @JsonIgnore private final List> localIdMap = new ArrayList<>(); - @Getter @Setter @Valid private List children = Collections.emptyList(); - @JsonIgnore @Getter @Setter private int localId; - @NotNull @Getter @Setter @Valid @JsonManagedReference private List selects = new ArrayList<>(); - @JsonIgnore - private final Map caches = new ConcurrentHashMap<>(); + private int nChildren = -1; @Override public Concept findConcept() { return getConcept(); } - public ConceptTreeCache getCache(Import imp) { - return caches.get(imp); - } @Override public ConceptTreeNode getParent() { @@ -84,11 +75,17 @@ public void clearMatchingStats() { getAllChildren().forEach(ConceptTreeChild::clearMatchingStats); } + @JsonIgnore + public Stream getAllChildren() { + return localIdMap.stream().filter(ConceptTreeChild.class::isInstance).map(ConceptTreeChild.class::cast); + } + @Override public boolean matchesPrefix(int[] conceptPrefix) { return conceptPrefix != null && conceptPrefix[0] == 0; } + @Override public void init() { setLocalId(0); localIdMap.add(this); @@ -102,7 +99,8 @@ public void init() { try { con.getCondition().init(this); - } catch (ConceptConfigurationException e) { + } + catch (ConceptConfigurationException e) { throw new RuntimeException("Unable to init condition", e); } } @@ -117,7 +115,8 @@ public void init() { ctc.init(); - } catch (Exception e) { + } + catch (Exception e) { throw new RuntimeException("Error trying to consolidate the node " + ctc.getLabel() + " in " + getLabel(), e); } @@ -166,14 +165,6 @@ private ConceptElement findMostSpecificChild(String stringValue, CalculatedValue return best; } - @JsonIgnore - public Stream getAllChildren() { - return localIdMap.stream().filter(ConceptTreeChild.class::isInstance).map(ConceptTreeChild.class::cast); - } - - @JsonIgnore - private int nChildren = -1; - @Override @JsonIgnore public int countElements() { @@ -184,13 +175,6 @@ public int countElements() { return nChildren = 1 + (int) getAllChildren().count(); } - public void initializeIdCache(Import importId) { - caches.computeIfAbsent(importId, id -> new ConceptTreeCache(this)); - } - - public void removeImportCache(Import imp) { - caches.remove(imp); - } /** * Method to get the element of this concept tree that has the specified local ID. @@ -208,5 +192,6 @@ public ConceptTreeNode getElementByLocalId(int localId) { return localIdMap.get(localId); } - public static class TreeConceptInitializer extends Initializing.Converter {} + public static class TreeConceptInitializer extends Initializing.Converter { + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java index d7f2a99f4a9..6377aaf11bb 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java @@ -8,6 +8,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import com.bakdata.conquery.io.storage.WorkerStorage; import com.bakdata.conquery.models.common.CDateSet; @@ -16,9 +17,12 @@ import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.ConceptElement; import com.bakdata.conquery.models.datasets.concepts.Connector; +import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeCache; import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector; import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; import com.bakdata.conquery.models.identifiable.ids.specific.CBlockId; +import com.bakdata.conquery.models.identifiable.ids.specific.ConceptId; +import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; import com.bakdata.conquery.models.jobs.CalculateCBlocksJob; import com.bakdata.conquery.models.jobs.JobManager; import com.bakdata.conquery.models.query.entity.Entity; @@ -64,6 +68,8 @@ public class BucketManager { */ private final Map>> tableToBuckets; + private final Map> conceptTreeCaches = new ConcurrentHashMap<>(); + @Getter private final int entityBucketSize; @@ -157,6 +163,11 @@ public void fullUpdate() { } } + public ConceptTreeCache getConceptTreeCache(ConceptId concept, ImportId importId) { + return conceptTreeCaches.computeIfAbsent(concept, (ignored) -> new ConcurrentHashMap<>()) + .computeIfAbsent(importId, (ignored) -> new ConceptTreeCache((TreeConcept) storage.getConcept(concept))); + } + public boolean hasCBlock(CBlockId id) { return storage.getCBlock(id) != null; } @@ -255,7 +266,7 @@ public void removeImport(Import imp) { continue; } - ((TreeConcept) concept).removeImportCache(imp); + conceptTreeCaches.get(concept).remove(imp.getId()); } storage.removeImport(imp.getId()); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java b/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java index cfc5e0182ba..bb4e5a74f95 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java @@ -4,6 +4,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import jakarta.validation.constraints.NotNull; import com.bakdata.conquery.io.jackson.serializer.CBlockDeserializer; import com.bakdata.conquery.io.jackson.serializer.NsIdRef; @@ -29,7 +30,6 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import jakarta.validation.constraints.NotNull; import lombok.Getter; import lombok.RequiredArgsConstructor; import lombok.Setter; @@ -92,10 +92,10 @@ public static long estimateMemoryBytes(long entities, long entries, double depth ); } - public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket, int bucketSize) { + public static CBlock calculateCBlock(ConceptTreeConnector connector, Bucket bucket, int bucketSize, BucketManager bucketManager) { final int root = bucket.getBucket() * bucketSize; - final int[][] mostSpecificChildren = calculateSpecificChildrenPaths(bucket, connector); + final int[][] mostSpecificChildren = calculateSpecificChildrenPaths(bucket, connector, bucketManager); //TODO Object2LongMap final Map includedConcepts = calculateConceptElementPathBloomFilter(bucketSize, bucket, mostSpecificChildren); final Map entitySpans = calculateEntityDateIndices(bucket); @@ -107,38 +107,34 @@ public static CBlock createCBlock(ConceptTreeConnector connector, Bucket bucket, * Calculates the path for each event from the root of the {@link TreeConcept} to the most specific {@link ConceptTreeChild} * denoted by the individual {@link ConceptTreeChild#getPrefix()}. */ - private static int[][] calculateSpecificChildrenPaths(Bucket bucket, ConceptTreeConnector connector) { + private static int[][] calculateSpecificChildrenPaths(Bucket bucket, ConceptTreeConnector connector, BucketManager bucketManager) { - final Column column; + final TreeConcept concept = connector.getConcept(); + final CTCondition connectorCondition = connector.getCondition(); - final TreeConcept treeConcept = connector.getConcept(); + final Column column; + final ConceptTreeCache cache; // If we have a column, and it is of string-type, we initialize a cache. if (connector.getColumn() != null && bucket.getStore(connector.getColumn()) instanceof StringStore) { - column = connector.getColumn(); - - treeConcept.initializeIdCache(bucket.getImp()); + cache = bucketManager.getConceptTreeCache(concept.getId(), bucket.getImp().getId()); } // No column only possible if we have just one tree element! - else if (treeConcept.countElements() == 1) { + else if (concept.countElements() == 1) { column = null; + cache = null; } else { throw new IllegalStateException(String.format("Cannot build tree over Connector[%s] without Column", connector.getId())); } - final CTCondition connectorCondition = connector.getCondition(); - final int[][] mostSpecificChildren = new int[bucket.getNumberOfEvents()][]; Arrays.fill(mostSpecificChildren, ConceptTreeConnector.NOT_CONTAINED); - final ConceptTreeCache cache = treeConcept.getCache(bucket.getImp()); - for (int event = 0; event < bucket.getNumberOfEvents(); event++) { - try { String stringValue = ""; @@ -161,17 +157,17 @@ else if (treeConcept.countElements() == 1) { // Events without values are assigned to the root if (column != null && !has) { - mostSpecificChildren[event] = treeConcept.getPrefix(); + mostSpecificChildren[event] = concept.getPrefix(); continue; } final ConceptElement child = cache == null - ? treeConcept.findMostSpecificChild(stringValue, rowMap) + ? concept.findMostSpecificChild(stringValue, rowMap) : cache.findMostSpecificChild(stringValue, rowMap); // All unresolved elements resolve to the root. if (child == null) { - mostSpecificChildren[event] = treeConcept.getPrefix(); + mostSpecificChildren[event] = concept.getPrefix(); continue; } @@ -179,7 +175,7 @@ else if (treeConcept.countElements() == 1) { mostSpecificChildren[event] = child.getPrefix(); } catch (ConceptConfigurationException ex) { - log.error("Failed to resolve event {}-{} against concept {}", bucket, event, treeConcept, ex); + log.error("Failed to resolve event {}-{} against concept {}", bucket, event, concept, ex); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java index 91287384b69..ceeaa4d7ef5 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java @@ -63,11 +63,11 @@ public void execute() throws Exception { getProgressReporter().setMax(tasks.size()); - final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(getExecutorService()); + final ListeningExecutorService listenableExecutorService = MoreExecutors.listeningDecorator(executorService); final List> futures = tasks.stream() - .map(executorService::submit) + .map(listenableExecutorService::submit) .peek(fut -> fut.addListener(this::incrementProgressReporter, MoreExecutors.directExecutor())) .collect(Collectors.toList()); @@ -79,10 +79,10 @@ public void execute() throws Exception { all.get(1, TimeUnit.MINUTES); } catch (TimeoutException exception) { - log.debug("submitted={}, pool={}", tasks.size(), getExecutorService()); + log.debug("submitted={}, pool={}", tasks.size(), executorService); - if (log.isTraceEnabled() && getExecutorService() instanceof ThreadPoolExecutor) { - log.trace("Waiting for {}", ((ThreadPoolExecutor) getExecutorService()).getQueue()); + if (log.isTraceEnabled() && executorService instanceof ThreadPoolExecutor) { + log.trace("Waiting for {}", ((ThreadPoolExecutor) executorService).getQueue()); } } } @@ -119,7 +119,7 @@ public void run() { log.trace("BEGIN calculating CBlock for {}", getCBlockId()); - final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucket(), bucketManager.getEntityBucketSize()); + final CBlock cBlock = CBlock.calculateCBlock(getConnector(), getBucket(), bucketManager.getEntityBucketSize(), bucketManager); log.trace("DONE calculating CBlock for {}", getCBlockId()); diff --git a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java index cd0bd04dc0e..0ab76f05c16 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java +++ b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java @@ -585,7 +585,7 @@ public void serialize() throws IOException, JSONException { final Bucket bucket = new Bucket(0, new ColumnStore[0], Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(),0, imp); - final CBlock cBlock = CBlock.createCBlock(connector, bucket, 10); + final CBlock cBlock = CBlock.calculateCBlock(connector, bucket, 10, bucketManager); registry.register(dataset) .register(table)