diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java b/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java index e0cea657f6..7a3e2cde38 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java @@ -2,7 +2,6 @@ import java.util.Collection; import java.util.Objects; -import java.util.OptionalInt; import com.bakdata.conquery.io.storage.xodus.stores.CachedStore; import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore; @@ -30,7 +29,7 @@ public class NamespaceStorage extends NamespacedStorage { protected SingletonStore preview; protected SingletonStore workerToBuckets; - protected CachedStore entity2Bucket; + protected CachedStore entity2Bucket; public NamespaceStorage(StoreFactory storageFactory, String pathName, Validator validator) { super(storageFactory, pathName); @@ -110,20 +109,13 @@ public int getNumberOfEntities() { return entity2Bucket.count(); } - public OptionalInt getEntityBucket(String entity) { - // TODO no longer needed/used - final Integer bucket = entity2Bucket.get(entity); - if(bucket == null){ - return OptionalInt.empty(); - } - - return OptionalInt.of(bucket); + public boolean containsEntity(String entity) { + return entity2Bucket.get(entity) != null; } - public void assignEntityBucket(String entity, int bucket) { - // TODO only needed to track entity count - entity2Bucket.update(entity, bucket); + public void registerEntity(String entity) { + entity2Bucket.update(entity, Boolean.TRUE); } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java index e7fcfc09e6..b539029f6c 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java +++ 
b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java @@ -4,8 +4,6 @@ import java.io.InputStream; import java.util.Collection; import java.util.List; -import java.util.Set; -import java.util.concurrent.atomic.AtomicReference; import com.bakdata.conquery.mode.ImportHandler; import com.bakdata.conquery.models.datasets.Import; @@ -13,13 +11,13 @@ import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.Connector; import com.bakdata.conquery.models.events.Bucket; -import com.bakdata.conquery.models.events.stores.root.ColumnStore; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; import com.bakdata.conquery.models.identifiable.ids.specific.TableId; -import com.bakdata.conquery.models.jobs.ImportJob; +import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; import com.bakdata.conquery.models.jobs.Job; import com.bakdata.conquery.models.messages.namespaces.specific.AddImport; +import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob; import com.bakdata.conquery.models.messages.namespaces.specific.StartCalculateCblocks; import com.bakdata.conquery.models.preproc.PreprocessedData; @@ -29,7 +27,6 @@ import com.bakdata.conquery.models.worker.DistributedNamespace; import com.bakdata.conquery.models.worker.Namespace; import com.bakdata.conquery.models.worker.WorkerInformation; -import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import jakarta.ws.rs.BadRequestException; import jakarta.ws.rs.NotFoundException; import jakarta.ws.rs.WebApplicationException; @@ -53,13 +50,6 @@ public void updateImport(Namespace namespace, InputStream inputStream) { handleImport(namespace, inputStream, true); } - @SneakyThrows - @Override - public void addImport(Namespace namespace, InputStream 
inputStream) { - handleImport(namespace, inputStream, false); - } - - private static void handleImport(Namespace namespace, InputStream inputStream, boolean update) throws IOException { try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) { // We parse semi-manually as the incoming file consist of multiple documents we read progressively: @@ -70,7 +60,7 @@ private static void handleImport(Namespace namespace, InputStream inputStream, b final Table table = validateImportable(((DistributedNamespace) namespace), header, update); - readAndSubmitImportJobs(((DistributedNamespace) namespace), table, header, parser); + readAndDistributeImport(((DistributedNamespace) namespace), table, header, parser); clearDependentConcepts(namespace.getStorage().getAllConcepts(), table); } @@ -92,7 +82,7 @@ private static Table validateImportable(DistributedNamespace namespace, Preproce // Ensure that Import and Table have the same schema final List errors = header.assertMatch(table); - if (!errors.isEmpty()){ + if (!errors.isEmpty()) { final String errorsMessage = String.join("\n - ", errors); log.error("Problems concerning Import `{}`:\n{}", importId, errorsMessage); @@ -117,51 +107,45 @@ else if (processedImport != null) { return table; } - private static void readAndSubmitImportJobs(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader) { + private static void readAndDistributeImport(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader) { final TableId tableId = new TableId(namespace.getDataset().getId(), header.getTable()); final ImportId importId = new ImportId(tableId, header.getName()); log.info("BEGIN importing {} into {}", header.getName(), table); - final AtomicReference imp = new AtomicReference<>(); - - int procesed = 0; - + Import imp = null; for (PreprocessedData container : (Iterable) () -> reader) { - if (imp.get() == null) { + if 
(imp == null) { // We need a container to create a description. - imp.set(header.createImportDescription(table, container.getStores())); + imp = header.createImportDescription(table, container.getStores()); - namespace.getWorkerHandler().sendToAll(new AddImport(imp.get())); - namespace.getStorage().updateImport(imp.get()); + namespace.getWorkerHandler().sendToAll(new AddImport(imp)); + namespace.getStorage().updateImport(imp); } - final int bucketId = container.getBucketId(); + log.trace("DONE reading bucket {}.{}, contains {} entities.", importId, container.getBucketId(), container.size()); - log.trace("DONE reading bucket {}.{}, contains {} entities.", importId, bucketId, container.size()); + final Bucket bucket = Bucket.fromPreprocessed(table, container, imp); - final WorkerInformation assigned = namespace.getWorkerHandler().assignResponsibleWorker(bucketId); + final WorkerInformation responsibleWorker = namespace.getWorkerHandler().assignResponsibleWorker(bucket.getId()); - final ColumnStore[] storesSorted = ImportJob.sortColumns(table, container.getStores()); - - final Bucket bucket = - new Bucket(bucketId, storesSorted, new Object2IntOpenHashMap<>(container.getStarts()), new Object2IntOpenHashMap<>(container.getEnds()), imp.get()); + sendBucket(bucket, responsibleWorker); + // NOTE: I want the bucket to be GC'd as early as possible, so I just store the part(s) I need later. final Collection entities = bucket.entities(); - ImportJob.sendBucket(bucket, namespace); - namespace.getJobManager().addSlowJob( new Job() { @Override public void execute() { + //TODO make this into a job. + + // This task is quite slow, so we delay it as far as possible. 
for (String entity : entities) { - namespace.getStorage().assignEntityBucket(entity, bucketId); + namespace.getStorage().registerEntity(entity); } - - namespace.getWorkerHandler().addBucketsToWorker(assigned.getId(), Set.of(bucket.getId())); } @Override @@ -170,11 +154,12 @@ public String getLabel() { } } ); - - procesed++; } - log.debug("Successfully read {} Buckets for {}", procesed, importId); + log.debug("Successfully read {} Buckets, containing {} entities for {}", header.getNumberOfBuckets(), header.getNumberOfEntities(), importId); + + namespace.getWorkerHandler().sendUpdatedWorkerInformation(); + } private static void clearDependentConcepts(Collection> allConcepts, Table table) { @@ -189,6 +174,24 @@ private static void clearDependentConcepts(Collection> allConcepts, T } } + /** + * Awaits a free job queue on the responsible worker, then sends it the bucket and returns that worker's id. + */ + public static WorkerId sendBucket(Bucket bucket, WorkerInformation responsibleWorker) { + + responsibleWorker.awaitFreeJobQueue(); + + log.trace("Sending Bucket[{}] to {}", bucket.getId(), responsibleWorker.getId()); + responsibleWorker.send(new ImportBucket(bucket.getId().toString(), bucket)); + + return responsibleWorker.getId(); + } + + @SneakyThrows + @Override + public void addImport(Namespace namespace, InputStream inputStream) { + handleImport(namespace, inputStream, false); + } @Override public void deleteImport(Import imp) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/StoreFactory.java b/backend/src/main/java/com/bakdata/conquery/models/config/StoreFactory.java index f79907fa0a..6c7b853f49 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/StoreFactory.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/StoreFactory.java @@ -83,5 +83,5 @@ public interface StoreFactory { SingletonStore createPreviewStore(String pathName, CentralRegistry centralRegistry, ObjectMapper objectMapper); - CachedStore createEntity2BucketStore(String pathName, ObjectMapper objectMapper); + CachedStore 
createEntity2BucketStore(String pathName, ObjectMapper objectMapper); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java b/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java index ae74ba9f23..c5fee449ab 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java +++ b/backend/src/main/java/com/bakdata/conquery/models/config/XodusStoreFactory.java @@ -277,7 +277,7 @@ public SingletonStore createPreviewStore(String pathName, Central } @Override - public CachedStore createEntity2BucketStore(String pathName, ObjectMapper objectMapper) { + public CachedStore createEntity2BucketStore(String pathName, ObjectMapper objectMapper) { return StoreMappings.cached(createStore(findEnvironment(pathName), validator, ENTITY_TO_BUCKET, objectMapper)); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java b/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java index 050e2aead2..b3106f1fed 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java @@ -5,6 +5,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import com.bakdata.conquery.io.jackson.serializer.NsIdRef; import com.bakdata.conquery.models.common.CDateSet; @@ -26,12 +27,14 @@ import com.bakdata.conquery.models.identifiable.IdentifiableImpl; import com.bakdata.conquery.models.identifiable.ids.NamespacedIdentifiable; import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; +import com.bakdata.conquery.models.preproc.PreprocessedData; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonManagedReference; import com.google.common.collect.ImmutableSet; import io.dropwizard.validation.ValidationMethod; import 
it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; import lombok.AccessLevel; @@ -76,6 +79,20 @@ public class Bucket extends IdentifiableImpl implements NamespacedIden @NsIdRef private final Import imp; + private static ColumnStore[] sortColumns(Table table, Map stores) { + return Arrays.stream(table.getColumns()) + .map(Column::getName) + .map(stores::get) + .map(Objects::requireNonNull) + .toArray(ColumnStore[]::new); + } + + public static Bucket fromPreprocessed(Table table, PreprocessedData container, Import imp) { + final ColumnStore[] storesSorted = sortColumns(table, container.getStores()); + + return new Bucket(container.getBucketId(), storesSorted, new Object2IntOpenHashMap<>(container.getStarts()), new Object2IntOpenHashMap<>(container.getEnds()), imp); + } + @JsonIgnore @ToString.Include diff --git a/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java b/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java index 4bbb70ce92..b77034d954 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java +++ b/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java @@ -152,7 +152,7 @@ public String resolve(ExternalId key) { } // Maybe we can find them directly in the dictionary? 
- if (storage.getEntityBucket(key.getId()).isPresent()) { + if (storage.containsEntity(key.getId())) { return key.getId(); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java deleted file mode 100644 index d715a461c5..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java +++ /dev/null @@ -1,116 +0,0 @@ -package com.bakdata.conquery.models.jobs; - -import java.io.IOException; -import java.util.Arrays; -import java.util.Map; -import java.util.Objects; -import java.util.Set; - -import com.bakdata.conquery.models.datasets.Column; -import com.bakdata.conquery.models.datasets.Import; -import com.bakdata.conquery.models.datasets.Table; -import com.bakdata.conquery.models.events.Bucket; -import com.bakdata.conquery.models.events.stores.root.ColumnStore; -import com.bakdata.conquery.models.exceptions.JSONException; -import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; -import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; -import com.bakdata.conquery.models.preproc.PreprocessedData; -import com.bakdata.conquery.models.worker.DistributedNamespace; -import com.bakdata.conquery.models.worker.WorkerInformation; -import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; -import lombok.Data; -import lombok.Getter; -import lombok.extern.slf4j.Slf4j; - -/** - * This is the main routine to load data into Conquery. 
- */ -@Slf4j -@Data -public class ImportJob extends Job { - - private final DistributedNamespace namespace; - @Getter - private final Table table; - private final Import imp; - - private final int bucketId; - private final PreprocessedData container; - - - @Override - public void execute() throws JSONException, InterruptedException, IOException { - - namespace.getWorkerHandler().assignResponsibleWorker(bucketId); - - final ColumnStore[] storesSorted = sortColumns(table, container.getStores()); - - final Bucket bucket = - new Bucket(bucketId, storesSorted, new Object2IntOpenHashMap<>(container.getStarts()), new Object2IntOpenHashMap<>(container.getEnds()), imp); - - log.debug("BEGIN distributing Bucket {}", bucket.getId()); - - for (String entity : bucket.entities()) { - namespace.getStorage().assignEntityBucket(entity, bucketId); - } - - // we use this to track assignment to workers. - final WorkerId workerAssignments = sendBucket(bucket, namespace); - - namespace.getWorkerHandler().addBucketsToWorker(workerAssignments, Set.of(bucket.getId())); - - log.trace("DONE distributing Bucket {}", bucket.getId()); - - } - - public static ColumnStore[] sortColumns(Table table, Map stores) { - final ColumnStore[] storesSorted = - Arrays.stream(table.getColumns()) - .map(Column::getName) - .map(stores::get) - .map(Objects::requireNonNull) - .toArray(ColumnStore[]::new); - return storesSorted; - } - - /** - * select, then send buckets. 
- */ - public static WorkerId sendBucket(Bucket bucket, DistributedNamespace namespace1) { - - log.trace("BEGIN distributing Bucket {}", bucket.getId()); - - final WorkerInformation responsibleWorker = Objects.requireNonNull( - namespace1 - .getWorkerHandler() - .getResponsibleWorkerForBucket(bucket.getBucket()), - () -> "No responsible worker for Bucket#" + bucket.getBucket() - ); - - awaitFreeJobQueue(responsibleWorker); - - log.trace("Sending Bucket[{}] to {}", bucket.getId(), responsibleWorker.getId()); - responsibleWorker.send(new ImportBucket(bucket.getId().toString(), bucket)); - - log.trace("DONE distributing Bucket {}", bucket.getId()); - - - return responsibleWorker.getId(); - } - - private static void awaitFreeJobQueue(WorkerInformation responsibleWorker) { - try { - responsibleWorker.getConnectedShardNode().waitForFreeJobQueue(); - } - catch (InterruptedException e) { - log.error("Interrupted while waiting for worker[{}] to have free space in queue", responsibleWorker, e); - } - } - - - @Override - public String getLabel() { - return "Importing into %s from %s.%s".formatted(table.getName(), imp.getName(), getBucketId()); - } - -} diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java index 2a5e802b35..c6c65dae41 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java @@ -103,13 +103,13 @@ public synchronized void removeBucketAssignmentsForImportFormWorkers(@NonNull Im sendUpdatedWorkerInformation(); } - private synchronized void sendUpdatedWorkerInformation() { + public synchronized void sendUpdatedWorkerInformation() { for (WorkerInformation w : workers.values()) { w.send(new UpdateWorkerBucket(w)); } } - public synchronized void addBucketsToWorker(@NonNull WorkerId id, @NonNull Set bucketIds) { + public synchronized void 
registerBucketForWorker(@NonNull WorkerId id, @NonNull BucketId bucketId) { // Ensure that add and remove are not executed at the same time. // We don't make assumptions about the underlying implementation regarding thread safety WorkerToBucketsMap workerBuckets = storage.getWorkerBuckets(); @@ -118,11 +118,9 @@ public synchronized void addBucketsToWorker(@NonNull WorkerId id, @NonNull Set getBucketsForWorker(WorkerId workerId) { return workerBuckets.getBucketsForWorker(workerId); } - public synchronized WorkerInformation assignResponsibleWorker(int bucket) { - log.debug("Updating bucket assignments."); + public synchronized WorkerInformation assignResponsibleWorker(BucketId bucket) { + WorkerInformation responsibleWorkerForBucket = getResponsibleWorkerForBucket(bucket.getBucket()); - if (getResponsibleWorkerForBucket(bucket) != null) { - return getResponsibleWorkerForBucket(bucket); + if (responsibleWorkerForBucket == null) { + responsibleWorkerForBucket = addResponsibility(bucket.getBucket()); } - return addResponsibility(bucket); + registerBucketForWorker(responsibleWorkerForBucket.getId(), bucket); + + return responsibleWorkerForBucket; } private record PendingReaction(UUID callerId, Set pendingWorkers, ActionReactionMessage parent) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java index def028d002..8eb023e427 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java @@ -12,13 +12,13 @@ import it.unimi.dsi.fastutil.ints.IntArraySet; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; -@Getter -@Setter 
-@EqualsAndHashCode(callSuper = true) +@Data +@NoArgsConstructor +@Slf4j public class WorkerInformation extends NamedImpl implements MessageSender.Transforming { @NotNull private DatasetId dataset; @@ -32,6 +32,20 @@ public class WorkerInformation extends NamedImpl implements MessageSen @Min(0) private int entityBucketSize; + /** + * Waits on the connected shard node via {@code waitForFreeJobQueue()}. + * If the waiting thread is interrupted, the interruption is logged and the thread's interrupt flag is restored for callers. + */ + public void awaitFreeJobQueue() { + try { + getConnectedShardNode().waitForFreeJobQueue(); + } + catch (InterruptedException e) { + log.error("Interrupted while waiting for worker[{}] to have free space in queue", this, e); + Thread.currentThread().interrupt(); + } + } + @Override public WorkerId createId() { diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java index 4f7b03072f..70b98cd832 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java @@ -9,7 +9,6 @@ import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; -import com.google.common.collect.ImmutableSet; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -21,23 +20,23 @@ @Getter @Setter public class WorkerToBucketsMap { - private Map> map = new ConcurrentHashMap<>(); + private Map> map = new ConcurrentHashMap<>(); - public Set getBucketsForWorker(WorkerId workerId) { - // Don't modify the underlying map here - Set buckets = map.get(workerId); - if (buckets != null) { + public Set getBucketsForWorker(WorkerId workerId) { + // Don't modify the underlying map here + Set buckets = map.get(workerId); + if (buckets != null) { // Don't allow modification return Collections.unmodifiableSet(buckets); } - return Collections.emptySet(); - } + return Collections.emptySet(); + } - public void 
addBucketForWorker(@NonNull WorkerId id, @NonNull Set bucketIds) { - map.computeIfAbsent(id, k -> new HashSet<>()).addAll(bucketIds); - } + public void addBucketForWorker(@NonNull WorkerId id, @NonNull BucketId bucketId) { + map.computeIfAbsent(id, k -> new HashSet<>()).add(bucketId); + } - public void removeBucketsOfImport(@NonNull ImportId importId) { - map.values().forEach(set -> set.removeIf(bucketId -> bucketId.getImp().equals(importId))); - } + public void removeBucketsOfImport(@NonNull ImportId importId) { + map.values().forEach(set -> set.removeIf(bucketId -> bucketId.getImp().equals(importId))); + } } diff --git a/backend/src/test/java/com/bakdata/conquery/util/NonPersistentStoreFactory.java b/backend/src/test/java/com/bakdata/conquery/util/NonPersistentStoreFactory.java index c9622d73e1..04972264a5 100644 --- a/backend/src/test/java/com/bakdata/conquery/util/NonPersistentStoreFactory.java +++ b/backend/src/test/java/com/bakdata/conquery/util/NonPersistentStoreFactory.java @@ -103,7 +103,7 @@ public SingletonStore createPreviewStore(String pathName, Central } @Override - public CachedStore createEntity2BucketStore(String pathName, ObjectMapper objectMapper) { + public CachedStore createEntity2BucketStore(String pathName, ObjectMapper objectMapper) { return StoreMappings.cached(entity2Bucket.computeIfAbsent(pathName, ignored -> new NonPersistentStore<>())); }