Skip to content

Commit

Permalink
Merge pull request #2196 from bakdata/feature/bucket-iterator
Browse files Browse the repository at this point in the history
Implement our own iterator for Bucket.Entry:
  • Loading branch information
awildturtok authored Nov 16, 2021
2 parents 9a70c31 + 87ef537 commit a44c909
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,25 @@
package com.bakdata.conquery.io.jackson.serializer;

import java.io.IOException;
import java.util.Optional;

import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.events.CBlock;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.*;
import com.fasterxml.jackson.databind.BeanDescription;
import com.fasterxml.jackson.databind.BeanProperty;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.deser.ContextualDeserializer;
import com.fasterxml.jackson.databind.deser.ResolvableDeserializer;
import com.fasterxml.jackson.databind.jsontype.TypeDeserializer;
import lombok.AllArgsConstructor;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.Optional;

@Slf4j
@AllArgsConstructor @NoArgsConstructor
public class CBlockDeserializer extends JsonDeserializer<CBlock> implements ContextualDeserializer {
Expand All @@ -27,7 +32,7 @@ public CBlock deserialize(JsonParser p, DeserializationContext ctxt) throws IOEx

TreeConcept concept = block.getConnector().getConcept();

if(concept != null && block.getMostSpecificChildren() != null) {
if(block.getMostSpecificChildren() != null) {

// deduplicate concrete paths after loading from disk.
for (int event = 0; event < block.getMostSpecificChildren().length; event++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,17 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import javax.annotation.CheckForNull;
import javax.validation.Valid;

import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.conditions.CTCondition;
import com.bakdata.conquery.models.datasets.concepts.filters.Filter;
import com.bakdata.conquery.models.datasets.Column;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.BucketEntry;
import com.bakdata.conquery.models.events.CBlock;
import com.bakdata.conquery.models.events.MajorTypeId;
import com.bakdata.conquery.models.events.stores.root.StringStore;
import com.bakdata.conquery.models.exceptions.ConceptConfigurationException;
import com.bakdata.conquery.util.CalculatedValue;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonManagedReference;
import io.dropwizard.validation.ValidationMethod;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.stream.IntStream;

import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
Expand Down Expand Up @@ -94,14 +93,6 @@ public boolean containsEntity(int entity) {
return getEntityStart(entity) != -1;
}

public Iterable<BucketEntry> entries() {
return () -> entities()
.stream()
.flatMap(entity -> IntStream.range(getEntityStart(entity), getEntityEnd(entity))
.mapToObj(e -> new BucketEntry(entity, e))
)
.iterator();
}

public int getEntityStart(int entityId) {
return start[getEntityIndex(entityId)];
Expand Down

This file was deleted.

126 changes: 96 additions & 30 deletions backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
package com.bakdata.conquery.models.events;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import javax.validation.constraints.NotNull;

import com.bakdata.conquery.io.jackson.serializer.CBlockDeserializer;
import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
import com.bakdata.conquery.models.common.daterange.CDateRange;
Expand All @@ -8,7 +14,11 @@
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.datasets.concepts.tree.*;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeCache;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeChild;
import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeConnector;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeChildPrefixIndex;
import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept;
import com.bakdata.conquery.models.events.stores.root.StringStore;
import com.bakdata.conquery.models.exceptions.ConceptConfigurationException;
import com.bakdata.conquery.models.identifiable.IdentifiableImpl;
Expand All @@ -24,11 +34,6 @@
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import javax.validation.constraints.NotNull;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
* Metadata for connection of {@link Bucket} and {@link Concept}
* <p>
Expand All @@ -44,6 +49,7 @@ public class CBlock extends IdentifiableImpl<CBlockId> implements NamespacedIden

/**
* Estimate the memory usage of CBlocks.
*
* @param depthEstimate estimate of depth of mostSpecificChildren
*/
public static long estimateMemoryBytes(long entities, long entries, double depthEstimate) {
Expand Down Expand Up @@ -71,7 +77,7 @@ public static long estimateMemoryBytes(long entities, long entries, double depth
private final int root;

/**
* Crude Bloomfilter for Concept inclusion per Entity: Each set bit denotes that the concept (with localId <= 64) or a descendant of that concept (with localId > 64) is present for the entity in this Bucket.
* Crude Bloomfilter for Concept inclusion per Entity: Each set bit denotes that the concept (with localId <= 64) or a descendant of that concept (with localId > 64) is present for the entity in this Bucket.
*/
private final long[] includedConceptElementsPerEntity;

Expand Down Expand Up @@ -157,7 +163,7 @@ private static int[][] calculateSpecificChildrenPaths(Bucket bucket, ConceptTree
treeConcept.initializeIdCache(stringStore, bucket.getImp());
}
// No column only possible if we have just one tree element!
else if(treeConcept.countElements() == 1){
else if (treeConcept.countElements() == 1) {
stringStore = null;
}
else {
Expand All @@ -173,10 +179,10 @@ else if(treeConcept.countElements() == 1){

final int[] root = treeConcept.getPrefix();

for (BucketEntry entry : bucket.entries()) {
try {
final int event = entry.getEvent();
for (int event = 0; event < bucket.getNumberOfEvents(); event++) {


try {
// Events without values are omitted
// Events can also be filtered, allowing a single table to be used by multiple connectors.
if (column != null && !bucket.has(event, column)) {
Expand All @@ -192,7 +198,9 @@ else if(treeConcept.countElements() == 1){
}

// Lazy evaluation of map to avoid allocations if possible.
final CalculatedValue<Map<String, Object>> rowMap = new CalculatedValue<>(() -> bucket.calculateMap(event));
// Copy event for closure.
final int _event = event;
final CalculatedValue<Map<String, Object>> rowMap = new CalculatedValue<>(() -> bucket.calculateMap(_event));


if ((connector.getCondition() != null && !connector.getCondition().matches(stringValue, rowMap))) {
Expand All @@ -201,8 +209,8 @@ else if(treeConcept.countElements() == 1){
}

ConceptTreeChild child = cache == null
? treeConcept.findMostSpecificChild(stringValue, rowMap)
: cache.findMostSpecificChild(valueIndex, stringValue, rowMap);
? treeConcept.findMostSpecificChild(stringValue, rowMap)
: cache.findMostSpecificChild(valueIndex, stringValue, rowMap);

// All unresolved elements resolve to the root.
if (child == null) {
Expand All @@ -214,7 +222,7 @@ else if(treeConcept.countElements() == 1){
mostSpecificChildren[event] = child.getPrefix();
}
catch (ConceptConfigurationException ex) {
log.error("Failed to resolve event " + bucket + "-" + entry.getEvent() + " against concept " + treeConcept, ex);
log.error("Failed to resolve event {}-{} against concept {}", bucket, event, treeConcept, ex);
}
}

Expand All @@ -239,15 +247,25 @@ else if(treeConcept.countElements() == 1){
*/
private static long[] calculateConceptElementPathBloomFilter(int bucketSize, Bucket bucket, int[][] mostSpecificChildren) {
long[] includedConcepts = new long[bucketSize];
for (BucketEntry entry : bucket.entries()) {
final int[] mostSpecificChild = mostSpecificChildren[entry.getEvent()];

for (int i = 0; i < mostSpecificChild.length; i++) {
for (int entity : bucket.getEntities()) {

final int entityIndex = bucket.getEntityIndex(entity);
final int end = bucket.getEntityEnd(entity);

for (int event = bucket.getEntityStart(entity); event < end; event++) {

final int[] mostSpecificChild = mostSpecificChildren[event];

for (int i = 0; i < mostSpecificChild.length; i++) {

final long mask = calculateBitMask(i, mostSpecificChild);
includedConcepts[entry.getEntity() - bucketSize*bucket.getBucket()] |= mask;
final long mask = calculateBitMask(i, mostSpecificChild);

includedConcepts[entityIndex] |= mask;
}
}
}

return includedConcepts;
}

Expand All @@ -259,40 +277,88 @@ public static long calculateBitMask(int pathIndex, int[] mostSpecificChild) {
if (pathIndex < 0) {
return 0;
}
if (mostSpecificChild[pathIndex] < 64) {
if (mostSpecificChild[pathIndex] < Long.SIZE) {
return 1L << mostSpecificChild[pathIndex];
}
return calculateBitMask(pathIndex-1, mostSpecificChild);
return calculateBitMask(pathIndex - 1, mostSpecificChild);
}



/**
* For every included entity, calculate min and max and store them as statistics in the CBlock.
*
* @implNote This is an unrolled implementation of {@link CDateRange#spanClosed(CDateRange)}.
*/
private static CDateRange[] calculateEntityDateIndices(Bucket bucket, int bucketSize) {
CDateRange[] spans = new CDateRange[bucketSize];

Arrays.fill(spans, CDateRange.all());

// First initialize to an illegal state that's easy on our comparisons

Table table = bucket.getTable();


for (Column column : table.getColumns()) {
if (!column.getType().isDateCompatible()) {
continue;
}

for (BucketEntry entry : bucket.entries()) {
if (!bucket.has(entry.getEvent(), column)) {
continue;
}
for (int entity : bucket.getEntities()) {
final int index = bucket.getEntityIndex(entity);
final int end = bucket.getEntityEnd(entity);

// We unroll spanClosed for the whole bucket/entity, this avoids costly
int max = Integer.MIN_VALUE;
int min = Integer.MAX_VALUE;

CDateRange range = bucket.getAsDateRange(entry.getEvent(), column);

final int index = bucket.getEntityIndex(entry.getEntity());
for (int event = bucket.getEntityStart(entity); event < end; event++) {
if (!bucket.has(event, column)) {
continue;
}

CDateRange range = bucket.getAsDateRange(event, column);

if (range.hasLowerBound()) {
final int minValue = range.getMinValue();

max = Math.max(max, minValue);
min = Math.min(min, minValue);
}

if (range.hasUpperBound()) {
final int maxValue = range.getMaxValue();

max = Math.max(max, maxValue);
min = Math.min(min, maxValue);
}
}

spans[index] = spans[index].spanClosed(range);

spans[index] = createClosed(max, min, spans[index]);
}
}

return spans;
}

/**
* Helper method for calculateEntityDateIndices, swapping {@link Integer#MIN_VALUE}/{@link Integer#MAX_VALUE} for higher performance.
*/
private static CDateRange createClosed(int max, int min, CDateRange in) {
if(max == Integer.MIN_VALUE && min == Integer.MAX_VALUE){
return in;
}

if (max == Integer.MIN_VALUE){
return in.spanClosed(CDateRange.atLeast(min));
}

if (min == Integer.MAX_VALUE) {
return in.spanClosed(CDateRange.atMost(max));
}

return in.spanClosed(CDateRange.of(min, max));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,11 @@
import com.bakdata.conquery.models.events.stores.root.ColumnStore;
import com.bakdata.conquery.models.exceptions.JSONException;
import com.bakdata.conquery.models.identifiable.CentralRegistry;
import lombok.SneakyThrows;
import org.junit.jupiter.api.Test;

class CBlockTest {
@SneakyThrows
@Test
public void serialize() throws IOException, JSONException {
final CentralRegistry registry = new CentralRegistry();
Expand All @@ -40,7 +42,9 @@ public void serialize() throws IOException, JSONException {
final Import imp = new Import(table);
imp.setName("import");

final Bucket bucket = new Bucket(0, 0, 10, new ColumnStore[0], Collections.emptySet(),new int[10], new int[10], imp);
concept.initElements();

final Bucket bucket = new Bucket(0, 0, 10, new ColumnStore[0], Collections.emptySet(), new int[10], new int[10], imp);


final CBlock cBlock = CBlock.createCBlock(connector, bucket, 10);
Expand Down

0 comments on commit a44c909

Please sign in to comment.