Commit 92c3561

cleanup

awildturtok committed Jun 24, 2024
1 parent 2afbc9d
Showing 11 changed files with 105 additions and 201 deletions.
@@ -2,7 +2,6 @@

import java.util.Collection;
import java.util.Objects;
import java.util.OptionalInt;

import com.bakdata.conquery.io.storage.xodus.stores.CachedStore;
import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore;
@@ -30,7 +29,7 @@ public class NamespaceStorage extends NamespacedStorage {
protected SingletonStore<PreviewConfig> preview;
protected SingletonStore<WorkerToBucketsMap> workerToBuckets;

protected CachedStore<String, Integer> entity2Bucket;
protected CachedStore<String, Boolean> entity2Bucket;

public NamespaceStorage(StoreFactory storageFactory, String pathName, Validator validator) {
super(storageFactory, pathName);
@@ -110,20 +109,13 @@ public int getNumberOfEntities() {
return entity2Bucket.count();
}

public OptionalInt getEntityBucket(String entity) {
// TODO no longer needed/used
final Integer bucket = entity2Bucket.get(entity);

if(bucket == null){
return OptionalInt.empty();
}

return OptionalInt.of(bucket);
public boolean containsEntity(String entity) {
return entity2Bucket.get(entity) != null;
}

public void assignEntityBucket(String entity, int bucket) {
// TODO only needed to track entity count
entity2Bucket.update(entity, bucket);
public void registerEntity(String entity) {
entity2Bucket.update(entity, Boolean.TRUE);
}


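For orientation: after this change the entity store no longer records a bucket id per entity, it only records that an entity exists. A minimal illustrative sketch of how the reworked NamespaceStorage API reads (the helper class, its method names and the import path are assumptions made for this example; only registerEntity, containsEntity and getNumberOfEntities come from the hunks above):

import com.bakdata.conquery.io.storage.NamespaceStorage; // package path assumed, not shown in this diff

// Illustrative helper; not part of this commit.
final class EntityRegistrationSketch {

    // Replaces the removed assignEntityBucket(entity, bucket).
    static void trackEntities(NamespaceStorage storage, Iterable<String> entities) {
        for (String entity : entities) {
            storage.registerEntity(entity);
        }
    }

    // Replaces getEntityBucket(entity).isPresent().
    static boolean isKnown(NamespaceStorage storage, String entity) {
        return storage.containsEntity(entity);
    }

    // Unchanged; still backed by entity2Bucket.count().
    static int knownEntityCount(NamespaceStorage storage) {
        return storage.getNumberOfEntities();
    }
}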
@@ -4,22 +4,20 @@
import java.io.InputStream;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

import com.bakdata.conquery.mode.ImportHandler;
import com.bakdata.conquery.models.datasets.Import;
import com.bakdata.conquery.models.datasets.Table;
import com.bakdata.conquery.models.datasets.concepts.Concept;
import com.bakdata.conquery.models.datasets.concepts.Connector;
import com.bakdata.conquery.models.events.Bucket;
import com.bakdata.conquery.models.events.stores.root.ColumnStore;
import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId;
import com.bakdata.conquery.models.identifiable.ids.specific.ImportId;
import com.bakdata.conquery.models.identifiable.ids.specific.TableId;
import com.bakdata.conquery.models.jobs.ImportJob;
import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId;
import com.bakdata.conquery.models.jobs.Job;
import com.bakdata.conquery.models.messages.namespaces.specific.AddImport;
import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket;
import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob;
import com.bakdata.conquery.models.messages.namespaces.specific.StartCalculateCblocks;
import com.bakdata.conquery.models.preproc.PreprocessedData;
@@ -29,7 +27,6 @@
import com.bakdata.conquery.models.worker.DistributedNamespace;
import com.bakdata.conquery.models.worker.Namespace;
import com.bakdata.conquery.models.worker.WorkerInformation;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import jakarta.ws.rs.BadRequestException;
import jakarta.ws.rs.NotFoundException;
import jakarta.ws.rs.WebApplicationException;
@@ -53,13 +50,6 @@ public void updateImport(Namespace namespace, InputStream inputStream) {
handleImport(namespace, inputStream, true);
}

@SneakyThrows
@Override
public void addImport(Namespace namespace, InputStream inputStream) {
handleImport(namespace, inputStream, false);
}


private static void handleImport(Namespace namespace, InputStream inputStream, boolean update) throws IOException {
try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) {
// We parse semi-manually as the incoming file consist of multiple documents we read progressively:
@@ -70,7 +60,7 @@ private static void handleImport(Namespace namespace, InputStream inputStream, boolean update) throws IOException {

final Table table = validateImportable(((DistributedNamespace) namespace), header, update);

readAndSubmitImportJobs(((DistributedNamespace) namespace), table, header, parser);
readAndDistributeImport(((DistributedNamespace) namespace), table, header, parser);

clearDependentConcepts(namespace.getStorage().getAllConcepts(), table);
}
@@ -92,7 +82,7 @@ private static Table validateImportable(DistributedNamespace namespace, Preproce
// Ensure that Import and Table have the same schema
final List<String> errors = header.assertMatch(table);

if (!errors.isEmpty()){
if (!errors.isEmpty()) {
final String errorsMessage = String.join("\n - ", errors);

log.error("Problems concerning Import `{}`:\n{}", importId, errorsMessage);
@@ -117,51 +107,45 @@ else if (processedImport != null) {
return table;
}

private static void readAndSubmitImportJobs(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader) {
private static void readAndDistributeImport(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader) {
final TableId tableId = new TableId(namespace.getDataset().getId(), header.getTable());
final ImportId importId = new ImportId(tableId, header.getName());

log.info("BEGIN importing {} into {}", header.getName(), table);

final AtomicReference<Import> imp = new AtomicReference<>();

int procesed = 0;

Import imp = null;

for (PreprocessedData container : (Iterable<? extends PreprocessedData>) () -> reader) {

if (imp.get() == null) {
if (imp == null) {
// We need a container to create a description.
imp.set(header.createImportDescription(table, container.getStores()));
imp = header.createImportDescription(table, container.getStores());

namespace.getWorkerHandler().sendToAll(new AddImport(imp.get()));
namespace.getStorage().updateImport(imp.get());
namespace.getWorkerHandler().sendToAll(new AddImport(imp));
namespace.getStorage().updateImport(imp);
}

final int bucketId = container.getBucketId();
log.trace("DONE reading bucket {}.{}, contains {} entities.", importId, container.getBucketId(), container.size());

log.trace("DONE reading bucket {}.{}, contains {} entities.", importId, bucketId, container.size());
final Bucket bucket = Bucket.fromPreprocessed(table, container, imp);

final WorkerInformation assigned = namespace.getWorkerHandler().assignResponsibleWorker(bucketId);
final WorkerInformation responsibleWorker = namespace.getWorkerHandler().assignResponsibleWorker(bucket.getId());

final ColumnStore[] storesSorted = ImportJob.sortColumns(table, container.getStores());

final Bucket bucket =
new Bucket(bucketId, storesSorted, new Object2IntOpenHashMap<>(container.getStarts()), new Object2IntOpenHashMap<>(container.getEnds()), imp.get());
sendBucket(bucket, responsibleWorker);

// NOTE: I want the bucket to be GC'd as early as possible, so I just store the part(s) I need later.
final Collection<String> entities = bucket.entities();

ImportJob.sendBucket(bucket, namespace);

namespace.getJobManager().addSlowJob(
new Job() {
@Override
public void execute() {
//TODO make this into a job.

// This task is quite slow, so we delay it as far as possible.
for (String entity : entities) {
namespace.getStorage().assignEntityBucket(entity, bucketId);
namespace.getStorage().registerEntity(entity);
}

namespace.getWorkerHandler().addBucketsToWorker(assigned.getId(), Set.of(bucket.getId()));
}

@Override
@@ -170,11 +154,12 @@ public String getLabel() {
}
}
);

procesed++;
}

log.debug("Successfully read {} Buckets for {}", procesed, importId);
log.debug("Successfully read {} Buckets, containing {} entities for {}", header.getNumberOfBuckets(), header.getNumberOfEntities(), importId);

namespace.getWorkerHandler().sendUpdatedWorkerInformation();

}

private static void clearDependentConcepts(Collection<Concept<?>> allConcepts, Table table) {
@@ -189,6 +174,24 @@ private static void clearDependentConcepts(Collection<Concept<?>> allConcepts, Table table) {
}
}

/**
 * Wait until the responsible worker has capacity in its job queue, then send the bucket to it.
 */
public static WorkerId sendBucket(Bucket bucket, WorkerInformation responsibleWorker) {

responsibleWorker.awaitFreeJobQueue();

log.trace("Sending Bucket[{}] to {}", bucket.getId(), responsibleWorker.getId());
responsibleWorker.send(new ImportBucket(bucket.getId().toString(), bucket));

return responsibleWorker.getId();
}

@SneakyThrows
@Override
public void addImport(Namespace namespace, InputStream inputStream) {
handleImport(namespace, inputStream, false);
}

@Override
public void deleteImport(Import imp) {
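Since old and new lines are interleaved in the hunk above, here is a condensed, illustrative sketch of the new per-bucket flow in readAndDistributeImport. It is not the verbatim method body: the method name is a stand-in, the handler's existing imports are assumed, logging and progress counting are omitted, and the job label text is a placeholder.

// Sketch only; condensed from the new lines of the hunk above.
private static void distributeBuckets(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader) {
    Import imp = null;

    for (PreprocessedData container : (Iterable<? extends PreprocessedData>) () -> reader) {

        if (imp == null) {
            // The first container is needed to create the Import description.
            imp = header.createImportDescription(table, container.getStores());
            namespace.getWorkerHandler().sendToAll(new AddImport(imp));
            namespace.getStorage().updateImport(imp);
        }

        // Column sorting and start/end map conversion now live in the Bucket factory.
        final Bucket bucket = Bucket.fromPreprocessed(table, container, imp);

        // Pick the responsible worker, wait for room in its job queue, then ship the bucket.
        final WorkerInformation responsibleWorker = namespace.getWorkerHandler().assignResponsibleWorker(bucket.getId());
        sendBucket(bucket, responsibleWorker);

        // Keep only the entity ids so the bucket itself can be garbage collected early.
        final Collection<String> entities = bucket.entities();

        // Registering entities is slow, so it is deferred to a slow job; only existence is tracked now.
        namespace.getJobManager().addSlowJob(new Job() {
            @Override
            public void execute() {
                for (String entity : entities) {
                    namespace.getStorage().registerEntity(entity);
                }
            }

            @Override
            public String getLabel() {
                return "Register entities of " + bucket.getId(); // placeholder label
            }
        });
    }

    // Workers are informed about the updated assignments once all buckets are read.
    namespace.getWorkerHandler().sendUpdatedWorkerInformation();
}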
@@ -83,5 +83,5 @@ public interface StoreFactory {

SingletonStore<PreviewConfig> createPreviewStore(String pathName, CentralRegistry centralRegistry, ObjectMapper objectMapper);

CachedStore<String, Integer> createEntity2BucketStore(String pathName, ObjectMapper objectMapper);
CachedStore<String, Boolean> createEntity2BucketStore(String pathName, ObjectMapper objectMapper);
}
@@ -277,7 +277,7 @@ public SingletonStore<PreviewConfig> createPreviewStore(String pathName, CentralRegistry centralRegistry, ObjectMapper objectMapper) {
}

@Override
public CachedStore<String, Integer> createEntity2BucketStore(String pathName, ObjectMapper objectMapper) {
public CachedStore<String, Boolean> createEntity2BucketStore(String pathName, ObjectMapper objectMapper) {
return StoreMappings.cached(createStore(findEnvironment(pathName), validator, ENTITY_TO_BUCKET, objectMapper));
}

@@ -5,6 +5,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

import com.bakdata.conquery.io.jackson.serializer.NsIdRef;
import com.bakdata.conquery.models.common.CDateSet;
@@ -26,12 +27,14 @@
import com.bakdata.conquery.models.identifiable.IdentifiableImpl;
import com.bakdata.conquery.models.identifiable.ids.NamespacedIdentifiable;
import com.bakdata.conquery.models.identifiable.ids.specific.BucketId;
import com.bakdata.conquery.models.preproc.PreprocessedData;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonManagedReference;
import com.google.common.collect.ImmutableSet;
import io.dropwizard.validation.ValidationMethod;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotNull;
import lombok.AccessLevel;
@@ -76,6 +79,20 @@ public class Bucket extends IdentifiableImpl<BucketId> implements NamespacedIden
@NsIdRef
private final Import imp;

private static ColumnStore[] sortColumns(Table table, Map<String, ColumnStore> stores) {
return Arrays.stream(table.getColumns())
.map(Column::getName)
.map(stores::get)
.map(Objects::requireNonNull)
.toArray(ColumnStore[]::new);
}

public static Bucket fromPreprocessed(Table table, PreprocessedData container, Import imp) {
final ColumnStore[] storesSorted = sortColumns(table, container.getStores());

return new Bucket(container.getBucketId(), storesSorted, new Object2IntOpenHashMap<>(container.getStarts()), new Object2IntOpenHashMap<>(container.getEnds()), imp);
}


@JsonIgnore
@ToString.Include
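The handler-side effect of the new Bucket.fromPreprocessed factory is easiest to see side by side. Both forms are taken from this commit's hunks; variable names follow the handler, and the AtomicReference indirection around imp in the old code is dropped for brevity:

// Before this commit: the import handler assembled the Bucket by hand.
final ColumnStore[] storesSorted = ImportJob.sortColumns(table, container.getStores());
final Bucket before = new Bucket(container.getBucketId(), storesSorted,
        new Object2IntOpenHashMap<>(container.getStarts()),
        new Object2IntOpenHashMap<>(container.getEnds()),
        imp);

// After this commit: column ordering, map copying and the missing-column check
// (Objects::requireNonNull in sortColumns) are bundled in the factory.
final Bucket after = Bucket.fromPreprocessed(table, container, imp);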
@@ -152,7 +152,7 @@ public String resolve(ExternalId key) {
}

// Maybe we can find them directly in the dictionary?
if (storage.getEntityBucket(key.getId()).isPresent()) {
if (storage.containsEntity(key.getId())) {
return key.getId();
}

(The diffs for the remaining changed files are not shown here.)
