diff --git a/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java b/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java index ea5f91cd78..686e8200eb 100644 --- a/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java +++ b/backend/src/main/java/com/bakdata/conquery/commands/PreprocessorCommand.java @@ -44,6 +44,7 @@ import net.sourceforge.argparse4j.inf.ArgumentGroup; import net.sourceforge.argparse4j.inf.Namespace; import net.sourceforge.argparse4j.inf.Subparser; +import org.jetbrains.annotations.NotNull; @Slf4j @FieldNameConstants @@ -52,7 +53,7 @@ public class PreprocessorCommand extends ConqueryCommand { private final List failed = Collections.synchronizedList(new ArrayList<>()); private final List success = Collections.synchronizedList(new ArrayList<>()); private ExecutorService pool; - private boolean isFailFast = false; + private boolean isFailFast; private boolean isStrict = true; public PreprocessorCommand() { @@ -71,14 +72,14 @@ public static boolean requiresProcessing(PreprocessingJob preprocessingJob) { log.info("EXISTS ALREADY"); - int currentHash = preprocessingJob.getDescriptor() - .calculateValidityHash(preprocessingJob.getCsvDirectory(), preprocessingJob.getTag()); + final int currentHash = preprocessingJob.getDescriptor() + .calculateValidityHash(preprocessingJob.getCsvDirectory(), preprocessingJob.getTag()); final ObjectMapper om = Jackson.BINARY_MAPPER.copy(); try (final PreprocessedReader parser = new PreprocessedReader(new GZIPInputStream(new FileInputStream(preprocessingJob.getPreprocessedFile())), om)) { - PreprocessedHeader header = parser.readHeader(); + final PreprocessedHeader header = parser.readHeader(); if (header.getValidityHash() == currentHash) { log.info("\tHASH STILL VALID"); @@ -133,13 +134,18 @@ public void configure(Subparser subparser) { group.addArgument("--fast-fail") .action(Arguments.storeTrue()) .setDefault(false) - .help("Stop preprocessing and exit with failure if an error occures that prevents the generation of a cqpp."); + .help("Stop preprocessing and exit with failure if an error occurs that prevents the generation of a cqpp."); group.addArgument("--strict") .type(new BooleanArgumentType()) .setDefault(true) .help("Escalate missing files to errors."); + group.addArgument("--buckets") + .type(Integer.class) + .setDefault(100) + .help("Number of buckets to use for id-hashing. 
This value is required to be a constant per-dataset."); + } @Override @@ -150,41 +156,49 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig // Tag if present is appended to input-file csvs, output-file cqpp and used as id of cqpps + // Seems to be a bug with dropwizard and boolean default-values isFailFast = Optional.ofNullable(namespace.getBoolean("fast-fail")).orElse(false); - isStrict = Optional.ofNullable(namespace.getBoolean("strict")).orElse(true); + isStrict = Optional.ofNullable(namespace.getBoolean("strict")).orElse(false); - final List tags = namespace.getList("tag"); + final List tags = namespace.getList("tag"); final File inDir = namespace.get("in"); final File outDir = namespace.get("out"); - final List descriptionFiles = namespace.getList("desc"); + final List descriptionFilesRoot = namespace.getList("desc"); + final int buckets = namespace.getInt("buckets"); log.info("Preprocessing from command line config."); - final Collection jobs = new ArrayList<>(); + final Collection jobs = collectJobs(descriptionFilesRoot, tags, inDir, outDir, environment); - if (tags == null || tags.isEmpty()) { - for (File desc : descriptionFiles) { - final List descriptions = - findPreprocessingDescriptions(desc, inDir, outDir, Optional.empty(), environment.getValidator()); - jobs.addAll(descriptions); - } + final List broken = validateJobs(jobs, environment); + + jobs.removeIf(Predicate.not(PreprocessorCommand::requiresProcessing)); + + preprocessJobs(jobs, buckets, config); + + + log.info("Successfully Preprocess {} Jobs:", success.size()); + success.forEach(desc -> log.info("\tSucceeded Preprocessing for {}", desc)); + + if (!broken.isEmpty()) { + log.warn("Did not find {} Files", broken.size()); + broken.forEach(desc -> log.warn("\tDid not find file for {}", desc)); } - else { - for (String tag : tags) { - for (File desc : descriptionFiles) { - final List jobDescriptions = - findPreprocessingDescriptions(desc, inDir, outDir, Optional.of(tag), environment.getValidator()); - jobs.addAll(jobDescriptions); - } - } + if (isFailed()) { + log.error("Failed {} Preprocessing Jobs:", failed.size()); + failed.forEach(desc -> log.error("\tFailed Preprocessing for {}", desc)); + doFail(); } + } - List broken = new ArrayList<>(); + @NotNull + private List validateJobs(Collection jobs, Environment environment) { + final List broken = new ArrayList<>(); - for (Iterator iterator = jobs.iterator(); iterator.hasNext(); ) { + for (final Iterator iterator = jobs.iterator(); iterator.hasNext(); ) { final PreprocessingJob job = iterator.next(); try { @@ -213,22 +227,48 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig log.error("FAILED Preprocessing, files are missing or invalid."); doFail(); } + return broken; + } - jobs.removeIf(Predicate.not(PreprocessorCommand::requiresProcessing)); + @NotNull + private Collection collectJobs(List descriptionFiles, List tags, File inDir, File outDir, Environment environment) + throws IOException { + final Collection jobs = new ArrayList<>(); + if (tags == null || tags.isEmpty()) { + for (File desc : descriptionFiles) { + final List descriptions = + findPreprocessingDescriptions(desc, inDir, outDir, Optional.empty(), environment.getValidator()); + jobs.addAll(descriptions); + } + } + else { + for (String tag : tags) { + for (File desc : descriptionFiles) { + final List jobDescriptions = + findPreprocessingDescriptions(desc, inDir, outDir, Optional.of(tag), environment.getValidator()); + + 
jobs.addAll(jobDescriptions); + } + } + } + return jobs; + } + + private void preprocessJobs(Collection jobs, int buckets, ConqueryConfig config) throws InterruptedException { final long totalSize = jobs.stream() .mapToLong(PreprocessingJob::estimateTotalCsvSizeBytes) .sum(); log.info("Required to preprocess {} in total", BinaryByteUnit.format(totalSize)); - ProgressBar totalProgress = new ProgressBar(totalSize, System.out); + final ProgressBar totalProgress = new ProgressBar(totalSize, System.out); for (PreprocessingJob job : jobs) { pool.submit(() -> { ConqueryMDC.setLocation(job.toString()); try { - Preprocessor.preprocess(job, totalProgress, config); + Preprocessor.preprocess(job, totalProgress, config, buckets); success.add(job.toString()); } catch (FileNotFoundException e) { @@ -246,23 +286,6 @@ protected void run(Environment environment, Namespace namespace, ConqueryConfig pool.awaitTermination(24, TimeUnit.HOURS); ConqueryMDC.clearLocation(); - - - if (!success.isEmpty()) { - log.info("Successfully Preprocess {} Jobs:", success.size()); - success.forEach(desc -> log.info("\tSucceeded Preprocessing for {}", desc)); - } - - if (!broken.isEmpty()) { - log.warn("Did not find {} Files", broken.size()); - broken.forEach(desc -> log.warn("\tDid not find file for {}", desc)); - } - - if (isFailed()) { - log.error("Failed {} Preprocessing Jobs:", failed.size()); - failed.forEach(desc -> log.error("\tFailed Preprocessing for {}", desc)); - doFail(); - } } private void addMissing(PreprocessingJob job) { @@ -281,7 +304,7 @@ private void addFailed(PreprocessingJob job) { public List findPreprocessingDescriptions(File descriptionFiles, File inDir, File outputDir, Optional tag, Validator validator) throws IOException { - List out = new ArrayList<>(); + final List out = new ArrayList<>(); final File[] files = descriptionFiles.isFile() ? 
new File[]{descriptionFiles} @@ -302,8 +325,7 @@ private boolean isFailed() { return !failed.isEmpty(); } - private Optional tryExtractDescriptor(Validator validator, Optional tag, File descriptionFile, File outputDir, File csvDir) - throws IOException { + private Optional tryExtractDescriptor(Validator validator, Optional tag, File descriptionFile, File outputDir, File csvDir) { try { final TableImportDescriptor descriptor = diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java b/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java index 7b29656e6d..7de472e7b3 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/NamespaceStorage.java @@ -2,7 +2,6 @@ import java.util.Collection; import java.util.Objects; -import java.util.OptionalInt; import com.bakdata.conquery.io.storage.xodus.stores.CachedStore; import com.bakdata.conquery.io.storage.xodus.stores.SingletonStore; @@ -109,22 +108,13 @@ public int getNumberOfEntities() { return entity2Bucket.count(); } - public OptionalInt getEntityBucket(String entity) { - final Integer bucket = entity2Bucket.get(entity); - if(bucket == null){ - return OptionalInt.empty(); - } - - return OptionalInt.of(bucket); + public boolean containsEntity(String entity) { + return entity2Bucket.get(entity) != null; } - public int assignEntityBucket(String entity, int bucketSize) { - final int bucket = (int) Math.ceil((1d + getNumberOfEntities()) / (double) bucketSize); - - entity2Bucket.add(entity, bucket); - - return bucket; + public void registerEntity(String entity, int bucket) { + entity2Bucket.update(entity, bucket); } diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java b/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java index adfe9b2841..84ff4e90d6 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/WorkerStorage.java @@ -72,7 +72,7 @@ private void decorateCBlockStore(IdentifiableStore baseStoreCreator) { public void addCBlock(CBlock cBlock) { - log.debug("Adding CBlock[{}]", cBlock.getId()); + log.trace("Adding CBlock[{}]", cBlock.getId()); cBlocks.add(cBlock); } @@ -81,7 +81,7 @@ public CBlock getCBlock(CBlockId id) { } public void removeCBlock(CBlockId id) { - log.debug("Removing CBlock[{}]", id); + log.trace("Removing CBlock[{}]", id); cBlocks.remove(id); } @@ -90,7 +90,7 @@ public Collection getAllCBlocks() { } public void addBucket(Bucket bucket) { - log.debug("Adding Bucket[{}]", bucket.getId()); + log.trace("Adding Bucket[{}]", bucket.getId()); buckets.add(bucket); } @@ -99,7 +99,7 @@ public Bucket getBucket(BucketId id) { } public void removeBucket(BucketId id) { - log.debug("Removing Bucket[{}]", id); + log.trace("Removing Bucket[{}]", id); buckets.remove(id); } diff --git a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java index 0840ca471a..bf6588683f 100644 --- a/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java +++ b/backend/src/main/java/com/bakdata/conquery/io/storage/xodus/stores/CachedStore.java @@ -33,8 +33,7 @@ public void add(KEY key, VALUE value) { @Override public VALUE get(KEY key) { - // TODO: 08.01.2020 fk: This assumes that all values have been read at some point! 
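The CachedStore.get() hunk continuing below drops the assumption flagged in the removed TODO and instead falls back to the backing store on a cache miss via computeIfAbsent. A minimal, self-contained sketch of that read-through pattern, with hypothetical key/value types and a Function standing in for the real backing store (not the project's actual Store interface):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Read-through cache sketch: values are loaded lazily from a backing lookup on first access.
class ReadThroughCache<K, V> {
	private final Map<K, V> cache = new ConcurrentHashMap<>();
	private final Function<K, V> backingStore; // stand-in for store::get

	ReadThroughCache(Function<K, V> backingStore) {
		this.backingStore = backingStore;
	}

	V get(K key) {
		// Loads and caches the value only if it is not cached yet.
		return cache.computeIfAbsent(key, backingStore);
	}
}

Note that computeIfAbsent does not record null results, so keys missing from the backing store are re-queried on every access.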
- return cache.get(key); + return cache.computeIfAbsent(key, store::get); } @Override diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java index 991d5efd64..f5a1b5179b 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterImportHandler.java @@ -1,49 +1,155 @@ package com.bakdata.conquery.mode.cluster; +import java.io.IOException; import java.io.InputStream; import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import com.bakdata.conquery.mode.ImportHandler; -import com.bakdata.conquery.models.config.ConqueryConfig; import com.bakdata.conquery.models.datasets.Import; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.Concept; import com.bakdata.conquery.models.datasets.concepts.Connector; +import com.bakdata.conquery.models.events.Bucket; import com.bakdata.conquery.models.identifiable.ids.specific.DatasetId; -import com.bakdata.conquery.models.jobs.ImportJob; +import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; +import com.bakdata.conquery.models.identifiable.ids.specific.TableId; +import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; +import com.bakdata.conquery.models.messages.namespaces.specific.AddImport; +import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob; +import com.bakdata.conquery.models.preproc.PreprocessedData; +import com.bakdata.conquery.models.preproc.PreprocessedHeader; +import com.bakdata.conquery.models.preproc.PreprocessedReader; import com.bakdata.conquery.models.worker.DatasetRegistry; import com.bakdata.conquery.models.worker.DistributedNamespace; import com.bakdata.conquery.models.worker.Namespace; +import com.bakdata.conquery.models.worker.WorkerInformation; +import jakarta.ws.rs.BadRequestException; +import jakarta.ws.rs.NotFoundException; +import jakarta.ws.rs.WebApplicationException; +import jakarta.ws.rs.core.Response; import lombok.AllArgsConstructor; import lombok.SneakyThrows; +import lombok.extern.slf4j.Slf4j; /** * Handler of {@link Import} requests that realizes them both on the manager and the cluster's shards. */ @AllArgsConstructor -public -class ClusterImportHandler implements ImportHandler { +@Slf4j +public class ClusterImportHandler implements ImportHandler { - private final ConqueryConfig config; private final DatasetRegistry datasetRegistry; @SneakyThrows @Override public void updateImport(Namespace namespace, InputStream inputStream) { - ImportJob job = ImportJob.createOrUpdate( - datasetRegistry.get(namespace.getDataset().getId()), - inputStream, - config.getCluster().getEntityBucketSize(), - true - ); + handleImport(namespace, inputStream, true); + } + + private static void handleImport(Namespace namespace, InputStream inputStream, boolean update) throws IOException { + try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) { + // We parse semi-manually as the incoming file consist of multiple documents we read progressively: + // 1) the header to check metadata + // 2...) 
The chunked Buckets + + final PreprocessedHeader header = parser.readHeader(); + + final Table table = validateImportable(((DistributedNamespace) namespace), header, update); + + readAndDistributeImport(((DistributedNamespace) namespace), table, header, parser); + + clearDependentConcepts(namespace.getStorage().getAllConcepts(), table); + } + } + + /** + * Handle validity and update logic. + */ + private static Table validateImportable(DistributedNamespace namespace, PreprocessedHeader header, boolean update) { + final TableId tableId = new TableId(namespace.getDataset().getId(), header.getTable()); + final ImportId importId = new ImportId(tableId, header.getName()); + + final Table table = namespace.getStorage().getTable(tableId); + + if (table == null) { + throw new BadRequestException("Table[%s] does not exist.".formatted(tableId)); + } + + // Ensure that Import and Table have the same schema + final List errors = header.assertMatch(table); + + if (!errors.isEmpty()) { + final String errorsMessage = String.join("\n - ", errors); + + log.error("Problems concerning Import `{}`:\n{}", importId, errorsMessage); + throw new BadRequestException("Headers[%s] do not match Table[%s]:\n%s".formatted(importId, table.getId(), errorsMessage)); + } + + final Import processedImport = namespace.getStorage().getImport(importId); + + if (update) { + if (processedImport == null) { + throw new NotFoundException("Import[%s] is not present.".formatted(importId)); + } + + // before updating the import, make sure that all workers removed the prior import + namespace.getWorkerHandler().sendToAll(new RemoveImportJob(processedImport)); + namespace.getStorage().removeImport(importId); + } + else if (processedImport != null) { + throw new WebApplicationException("Import[%s] is already present.".formatted(importId), Response.Status.CONFLICT); + } + + return table; + } - namespace.getJobManager().addSlowJob(job); + private static void readAndDistributeImport(DistributedNamespace namespace, Table table, PreprocessedHeader header, PreprocessedReader reader) { + final TableId tableId = new TableId(namespace.getDataset().getId(), header.getTable()); + final ImportId importId = new ImportId(tableId, header.getName()); + + log.info("BEGIN importing {} into {}", header.getName(), table); + + Import imp = null; + + final Map> collectedEntities = new HashMap<>(); + + for (PreprocessedData container : (Iterable) () -> reader) { + + if (imp == null) { + // We need a container to create a description. + imp = header.createImportDescription(table, container.getStores()); + + namespace.getWorkerHandler().sendToAll(new AddImport(imp)); + namespace.getStorage().updateImport(imp); + } + + + final Bucket bucket = Bucket.fromPreprocessed(table, container, imp); + + log.trace("DONE reading bucket `{}`, contains {} entities.", bucket.getId(), bucket.entities().size()); + + final WorkerInformation responsibleWorker = namespace.getWorkerHandler().assignResponsibleWorker(bucket.getId()); + + sendBucket(bucket, responsibleWorker); + + // NOTE: I want the bucket to be GC'd as early as possible, so I just store the part(s) I need later. 
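The loop above walks the remaining documents by casting a lambda to Iterable, so the PreprocessedReader (which now iterates bucket payloads) can drive an enhanced for-loop. A small standalone illustration of that idiom, using a plain List iterator in place of the real reader:

import java.util.Iterator;
import java.util.List;

class IterableAdapterDemo {
	public static void main(String[] args) {
		final Iterator<String> reader = List.of("bucket-0", "bucket-1").iterator();

		// Iterable is a functional interface (single abstract method iterator()),
		// so a lambda returning an existing Iterator can back a for-each loop.
		for (String chunk : (Iterable<String>) () -> reader) {
			System.out.println("read " + chunk);
		}
	}
}

Because the lambda hands back the same Iterator every time, the adapted Iterable is single-use, which matches reading a stream-backed reader exactly once.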
+ + collectedEntities.put(bucket.getBucket(), bucket.entities()); + } + + namespace.getJobManager().addSlowJob(new RegisterImportEntities(collectedEntities, namespace, importId)); + + log.debug("Successfully read {} Buckets, containing {} entities for `{}`", header.getNumberOfBuckets(), header.getNumberOfEntities(), importId); + + namespace.getWorkerHandler().sendUpdatedWorkerInformation(); - clearDependentConcepts(namespace.getStorage().getAllConcepts(), job.getTable()); } - private void clearDependentConcepts(Collection> allConcepts, Table table) { + private static void clearDependentConcepts(Collection> allConcepts, Table table) { for (Concept c : allConcepts) { for (Connector con : c.getConnectors()) { if (!con.getTable().equals(table)) { @@ -55,24 +161,29 @@ private void clearDependentConcepts(Collection> allConcepts, Table ta } } + /** + * select, then send buckets. + */ + public static WorkerId sendBucket(Bucket bucket, WorkerInformation responsibleWorker) { + + responsibleWorker.awaitFreeJobQueue(); + + log.trace("Sending Bucket[{}] to {}", bucket.getId(), responsibleWorker.getId()); + responsibleWorker.send(new ImportBucket(bucket.getId().toString(), bucket)); + + return responsibleWorker.getId(); + } + @SneakyThrows @Override public void addImport(Namespace namespace, InputStream inputStream) { - ImportJob job = ImportJob.createOrUpdate( - datasetRegistry.get(namespace.getDataset().getId()), - inputStream, - config.getCluster().getEntityBucketSize(), - false - ); - namespace.getJobManager().addSlowJob(job); - - clearDependentConcepts(namespace.getStorage().getAllConcepts(), job.getTable()); + handleImport(namespace, inputStream, false); } @Override public void deleteImport(Import imp) { - DatasetId id = imp.getTable().getDataset().getId(); + final DatasetId id = imp.getTable().getDataset().getId(); final DistributedNamespace namespace = datasetRegistry.get(id); clearDependentConcepts(namespace.getStorage().getAllConcepts(), imp.getTable()); @@ -83,4 +194,5 @@ public void deleteImport(Import imp) { // Remove bucket assignments for consistency report namespace.getWorkerHandler().removeBucketAssignmentsForImportFormWorkers(imp); } + } diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java index cafc855713..ed682e7112 100644 --- a/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java +++ b/backend/src/main/java/com/bakdata/conquery/mode/cluster/ClusterManagerProvider.java @@ -30,7 +30,7 @@ public ClusterManager provideManager(ConqueryConfig config, Environment environm final ClusterConnectionManager connectionManager = new ClusterConnectionManager(datasetRegistry, jobManager, environment.getValidator(), config, creator, clusterState); - final ImportHandler importHandler = new ClusterImportHandler(config, datasetRegistry); + final ImportHandler importHandler = new ClusterImportHandler(datasetRegistry); final StorageListener extension = new ClusterStorageListener(jobManager, datasetRegistry); final Supplier> nodeProvider = () -> clusterState.getShardNodes().values(); final List adminTasks = List.of(new ReportConsistencyTask(clusterState)); diff --git a/backend/src/main/java/com/bakdata/conquery/mode/cluster/RegisterImportEntities.java b/backend/src/main/java/com/bakdata/conquery/mode/cluster/RegisterImportEntities.java new file mode 100644 index 0000000000..e6c89725c1 --- /dev/null +++ 
b/backend/src/main/java/com/bakdata/conquery/mode/cluster/RegisterImportEntities.java @@ -0,0 +1,42 @@ +package com.bakdata.conquery.mode.cluster; + +import java.util.Collection; +import java.util.Map; + +import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; +import com.bakdata.conquery.models.jobs.Job; +import com.bakdata.conquery.models.worker.DistributedNamespace; +import lombok.Data; + +/** + * This class handles registration of entities. Relevant for counting and resolving entities from external sources. + */ +@Data +class RegisterImportEntities extends Job { + + private final Map> collectedEntities; + + + private final DistributedNamespace namespace; + private final ImportId importId; + + @Override + public void execute() { + // This task is quite slow, so be delay it as far as possible. + for (Map.Entry> bucket2Entities : collectedEntities.entrySet()) { + for (String entity : bucket2Entities.getValue()) { + + if (namespace.getStorage().containsEntity(entity)) { + continue; + } + + namespace.getStorage().registerEntity(entity, bucket2Entities.getKey()); + } + } + } + + @Override + public String getLabel() { + return "Handle Bucket %s assignments.".formatted(importId); + } +} diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/ConceptTreeCache.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/ConceptTreeCache.java index 5875e89264..128dcf61bd 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/ConceptTreeCache.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/ConceptTreeCache.java @@ -4,6 +4,7 @@ import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import com.bakdata.conquery.models.datasets.concepts.ConceptElement; import com.bakdata.conquery.models.exceptions.ConceptConfigurationException; import com.bakdata.conquery.util.CalculatedValue; import com.fasterxml.jackson.annotation.JsonIgnore; @@ -35,7 +36,7 @@ public class ConceptTreeCache { * @implNote ConcurrentHashMap does not allow null values, but we want to have null values in the map. So we wrap the values in Optional. 
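A minimal illustration of the Optional-wrapping described in the javadoc above, independent of the concept-tree types: ConcurrentHashMap rejects null values, so a failed lookup is cached as Optional.empty() instead of null (the lookup method and names here are hypothetical):

import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

class NullableResultCache {
	private final Map<String, Optional<String>> cached = new ConcurrentHashMap<>();

	String resolve(String value) {
		// A cached Optional.empty() records that the lookup already failed once,
		// so the expensive resolution is not repeated for known misses.
		if (cached.containsKey(value)) {
			return cached.get(value).orElse(null);
		}

		final String result = expensiveLookup(value); // may legitimately be null
		cached.put(value, Optional.ofNullable(result));
		return result;
	}

	private String expensiveLookup(String value) {
		return value.startsWith("known-") ? value : null; // stand-in for the real resolution
	}
}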
*/ @JsonIgnore - private final Map> cached = new ConcurrentHashMap<>();; + private final Map>> cached = new ConcurrentHashMap<>();; /** @@ -43,7 +44,7 @@ public class ConceptTreeCache { * * @param value */ - public ConceptTreeChild findMostSpecificChild(String value, CalculatedValue> rowMap) throws ConceptConfigurationException { + public ConceptElement findMostSpecificChild(String value, CalculatedValue> rowMap) throws ConceptConfigurationException { if(cached.containsKey(value)) { hits++; @@ -52,7 +53,7 @@ public ConceptTreeChild findMostSpecificChild(String value, CalculatedValue child = treeConcept.findMostSpecificChild(value, rowMap); if(!rowMap.isCalculated()) { cached.put(value, Optional.ofNullable(child)); diff --git a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java index ab302384e9..0c2c90d8fd 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java +++ b/backend/src/main/java/com/bakdata/conquery/models/datasets/concepts/tree/TreeConcept.java @@ -126,11 +126,11 @@ public void initElements() throws ConfigurationException, JSONException { } } - public ConceptTreeChild findMostSpecificChild(String stringValue, CalculatedValue> rowMap) throws ConceptConfigurationException { + public ConceptElement findMostSpecificChild(String stringValue, CalculatedValue> rowMap) throws ConceptConfigurationException { return findMostSpecificChild(stringValue, rowMap, null, getChildren()); } - private ConceptTreeChild findMostSpecificChild(String stringValue, CalculatedValue> rowMap, ConceptTreeChild best, List currentList) + private ConceptElement findMostSpecificChild(String stringValue, CalculatedValue> rowMap, ConceptElement best, List currentList) throws ConceptConfigurationException { while (currentList != null && !currentList.isEmpty()) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java b/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java index 2257ab8b2c..530aa94187 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/Bucket.java @@ -5,6 +5,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import com.bakdata.conquery.io.jackson.serializer.NsIdRef; import com.bakdata.conquery.models.common.CDateSet; @@ -26,12 +27,14 @@ import com.bakdata.conquery.models.identifiable.IdentifiableImpl; import com.bakdata.conquery.models.identifiable.ids.NamespacedIdentifiable; import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; +import com.bakdata.conquery.models.preproc.PreprocessedData; import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonManagedReference; import com.google.common.collect.ImmutableSet; import io.dropwizard.validation.ValidationMethod; import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; import lombok.AccessLevel; @@ -50,16 +53,16 @@ @FieldNameConstants @Getter @Setter -@ToString(of = {"numberOfEvents", "stores"}, callSuper = true) +@ToString(onlyExplicitlyIncluded = true, callSuper = true) @AllArgsConstructor @RequiredArgsConstructor(onConstructor_ = 
{@JsonCreator}, access = AccessLevel.PROTECTED) + public class Bucket extends IdentifiableImpl implements NamespacedIdentifiable { @Min(0) private final int bucket; - @Min(0) - private final int numberOfEvents; + @ToString.Include @JsonManagedReference @Setter(AccessLevel.PROTECTED) private ColumnStore[] stores; @@ -74,9 +77,25 @@ public class Bucket extends IdentifiableImpl implements NamespacedIden */ private final Object2IntMap ends; + private final int numberOfEvents; + @NsIdRef private final Import imp; + private static ColumnStore[] sortColumns(Table table, Map stores) { + return Arrays.stream(table.getColumns()) + .map(Column::getName) + .map(stores::get) + .map(Objects::requireNonNull) + .toArray(ColumnStore[]::new); + } + + public static Bucket fromPreprocessed(Table table, PreprocessedData container, Import imp) { + final ColumnStore[] storesSorted = sortColumns(table, container.getStores()); + final int numberOfEvents = container.getEnds().values().stream().mapToInt(i -> i).max().orElse(0); + + return new Bucket(container.getBucketId(), storesSorted, new Object2IntOpenHashMap<>(container.getStarts()), new Object2IntOpenHashMap<>(container.getEnds()),numberOfEvents, imp); + } @JsonIgnore @ValidationMethod(message = "Number of events does not match the length of some stores.") diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java index fe91ec2ba8..d7f2a99f4a 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/BucketManager.java @@ -124,7 +124,11 @@ private static void registerCBlock(CBlock cBlock, Map c : storage.getAllConcepts()) { + final Collection> allConcepts = storage.getAllConcepts(); + + log.info("BEGIN full update for {} concepts.", allConcepts.size()); + + for (Concept c : allConcepts) { if (!(c instanceof TreeConcept)) { continue; } @@ -142,7 +146,7 @@ public void fullUpdate() { continue; } - log.warn("CBlock[{}] missing in Storage. Queuing recalculation", cBlockId); + log.trace("CBlock[{}] missing in Storage. Queuing recalculation", cBlockId); job.addCBlock(bucket, con); } } @@ -365,6 +369,11 @@ public void addConcept(Concept concept) { job.addCBlock(bucket, connector); } } + + if(job.isEmpty()){ + return; + } + jobManager.addSlowJob(job); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java b/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java index 424fe60dc1..cfc5e0182b 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/CBlock.java @@ -12,6 +12,7 @@ import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.ConceptElement; import com.bakdata.conquery.models.datasets.concepts.Connector; import com.bakdata.conquery.models.datasets.concepts.conditions.CTCondition; import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeCache; @@ -164,7 +165,7 @@ else if (treeConcept.countElements() == 1) { continue; } - final ConceptTreeChild child = cache == null + final ConceptElement child = cache == null ? 
treeConcept.findMostSpecificChild(stringValue, rowMap) : cache.findMostSpecificChild(stringValue, rowMap); diff --git a/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java b/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java index f0b6f64951..5d1551cb8d 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java +++ b/backend/src/main/java/com/bakdata/conquery/models/events/EmptyBucket.java @@ -20,7 +20,7 @@ public class EmptyBucket extends Bucket { private static final EmptyBucket Instance = new EmptyBucket(); public EmptyBucket() { - super(0, 0, Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(), null); + super(0, Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(), 0, null); this.setStores(new ColumnStore[0]); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/identifiable/InjectingCentralRegistry.java b/backend/src/main/java/com/bakdata/conquery/models/identifiable/InjectingCentralRegistry.java deleted file mode 100644 index 05d4395dde..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/models/identifiable/InjectingCentralRegistry.java +++ /dev/null @@ -1,26 +0,0 @@ -package com.bakdata.conquery.models.identifiable; - -import java.util.Map; - -import com.bakdata.conquery.models.identifiable.ids.Id; -import lombok.Data; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; - -/** - * Central Registry used to wire up incoming ids with already established ids. - */ -@RequiredArgsConstructor -@Data -public class InjectingCentralRegistry extends CentralRegistry{ - /** - * This map is intentionally NOT an IdMap as it allows wiring up mismatched ids. - */ - @NonNull - private final Map, Identifiable> injections; - - @Override - protected > T get(Id name) { - return (T) injections.get(name); - } -} diff --git a/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java b/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java index 52b56dd00c..6bd986ed3d 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java +++ b/backend/src/main/java/com/bakdata/conquery/models/identifiable/mapping/EntityIdMap.java @@ -147,7 +147,7 @@ public String resolve(ExternalId key) { } // Maybe we can find them directly in the dictionary? 
- if (storage.getEntityBucket(key.getId()).isPresent()) { + if (storage.containsEntity(key.getId())) { return key.getId(); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java index afad884d47..91287384b6 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java @@ -3,6 +3,9 @@ import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import com.bakdata.conquery.io.storage.WorkerStorage; @@ -15,9 +18,9 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; -import lombok.Getter; +import lombok.Data; import lombok.RequiredArgsConstructor; -import lombok.Setter; +import lombok.ToString; import lombok.extern.slf4j.Slf4j; /** @@ -27,44 +30,65 @@ */ @RequiredArgsConstructor @Slf4j - +@Data +@ToString(onlyExplicitlyIncluded = true) public class CalculateCBlocksJob extends Job { - private final List infos = new ArrayList<>(); + private final List tasks = new ArrayList<>(); private final WorkerStorage storage; private final BucketManager bucketManager; private final ExecutorService executorService; + @ToString.Include @Override public String getLabel() { - return "Calculate CBlocks[" + infos.size() + "]"; + return "Calculate CBlocks[" + tasks.size() + "]"; } public void addCBlock(Bucket bucket, ConceptTreeConnector connector) { - infos.add(new CalculationInformation(connector, bucket)); + tasks.add(createInformationProcessor(connector, bucket)); + } + + private CalculationInformationProcessor createInformationProcessor(ConceptTreeConnector connector, Bucket bucket) { + return new CalculationInformationProcessor(connector, bucket, bucketManager, storage); } @Override public void execute() throws Exception { - if (infos.isEmpty()) { + if (tasks.isEmpty()) { return; } - getProgressReporter().setMax(infos.size()); + log.info("BEGIN calculate CBlocks for {} entries.", tasks.size()); - final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(this.executorService); + getProgressReporter().setMax(tasks.size()); - final List> futures = infos.stream() - .map(this::createInformationProcessor) - .map(executorService::submit) - .peek(f -> f.addListener(this::incrementProgressReporter, MoreExecutors.directExecutor())) - .collect(Collectors.toList()); + final ListeningExecutorService executorService = MoreExecutors.listeningDecorator(getExecutorService()); - Futures.allAsList(futures).get(); - } + final List> futures = + tasks.stream() + .map(executorService::submit) + .peek(fut -> fut.addListener(this::incrementProgressReporter, MoreExecutors.directExecutor())) + .collect(Collectors.toList()); + + + final ListenableFuture all = Futures.allAsList(futures); + + while (!all.isDone()) { + try { + all.get(1, TimeUnit.MINUTES); + } + catch (TimeoutException exception) { + log.debug("submitted={}, pool={}", tasks.size(), getExecutorService()); + + if (log.isTraceEnabled() && getExecutorService() instanceof ThreadPoolExecutor) { + log.trace("Waiting for {}", ((ThreadPoolExecutor) getExecutorService()).getQueue()); + } + } 
+ } + + log.debug("DONE CalculateCBlocks for {} entries.", tasks.size()); - private CalculationInformationProcessor createInformationProcessor(CalculationInformation info) { - return new CalculationInformationProcessor(info, bucketManager, storage); } private void incrementProgressReporter() { @@ -72,52 +96,45 @@ private void incrementProgressReporter() { } public boolean isEmpty() { - return infos.isEmpty(); + return tasks.isEmpty(); } - @RequiredArgsConstructor - @Getter - @Setter - private static class CalculationInformation { + + @Data + @ToString(onlyExplicitlyIncluded = true) + private static class CalculationInformationProcessor implements Runnable { private final ConceptTreeConnector connector; private final Bucket bucket; - public CBlockId getCBlockId() { - return new CBlockId(getBucket().getId(), getConnector().getId()); - } - } - - - @RequiredArgsConstructor - private static class CalculationInformationProcessor implements Runnable { - private final CalculationInformation info; private final BucketManager bucketManager; private final WorkerStorage storage; @Override public void run() { try { - if (bucketManager.hasCBlock(info.getCBlockId())) { - log.trace("Skipping calculation of CBlock[{}] because its already present in the BucketManager.", info.getCBlockId()); + if (bucketManager.hasCBlock(getCBlockId())) { + log.trace("Skipping calculation of CBlock[{}] because its already present in the BucketManager.", getCBlockId()); return; } - CBlock cBlock = CBlock.createCBlock(info.getConnector(), info.getBucket(), bucketManager.getEntityBucketSize()); + log.trace("BEGIN calculating CBlock for {}", getCBlockId()); + + final CBlock cBlock = CBlock.createCBlock(getConnector(), getBucket(), bucketManager.getEntityBucketSize()); + + log.trace("DONE calculating CBlock for {}", getCBlockId()); bucketManager.addCalculatedCBlock(cBlock); storage.addCBlock(cBlock); } catch (Exception e) { - throw new RuntimeException( - String.format( - "Exception in CalculateCBlocksJob (CBlock=%s, connector=%s)", - info.getCBlockId(), - info.getConnector() - ), - e - ); + throw new RuntimeException("Exception in CalculateCBlocksJob %s".formatted(getCBlockId()), e); } } + @ToString.Include + public CBlockId getCBlockId() { + return new CBlockId(getBucket().getId(), getConnector().getId()); + } + } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java deleted file mode 100644 index 92fbd6ee47..0000000000 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java +++ /dev/null @@ -1,372 +0,0 @@ -package com.bakdata.conquery.models.jobs; - -import java.io.IOException; -import java.io.InputStream; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; - -import com.bakdata.conquery.models.datasets.Column; -import com.bakdata.conquery.models.datasets.Dataset; -import com.bakdata.conquery.models.datasets.Import; -import com.bakdata.conquery.models.datasets.ImportColumn; -import com.bakdata.conquery.models.datasets.Table; -import com.bakdata.conquery.models.events.Bucket; -import com.bakdata.conquery.models.events.MajorTypeId; -import com.bakdata.conquery.models.events.stores.root.ColumnStore; -import com.bakdata.conquery.models.exceptions.JSONException; -import 
com.bakdata.conquery.models.identifiable.ids.specific.BucketId; -import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; -import com.bakdata.conquery.models.identifiable.ids.specific.TableId; -import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; -import com.bakdata.conquery.models.messages.namespaces.specific.AddImport; -import com.bakdata.conquery.models.messages.namespaces.specific.ImportBucket; -import com.bakdata.conquery.models.messages.namespaces.specific.RemoveImportJob; -import com.bakdata.conquery.models.preproc.PPColumn; -import com.bakdata.conquery.models.preproc.PreprocessedData; -import com.bakdata.conquery.models.preproc.PreprocessedHeader; -import com.bakdata.conquery.models.preproc.PreprocessedReader; -import com.bakdata.conquery.models.worker.DistributedNamespace; -import com.bakdata.conquery.models.worker.WorkerHandler; -import com.bakdata.conquery.models.worker.WorkerInformation; -import com.bakdata.conquery.util.progressreporter.ProgressReporter; -import com.google.common.base.Functions; -import it.unimi.dsi.fastutil.ints.IntArrayList; -import it.unimi.dsi.fastutil.ints.IntList; -import it.unimi.dsi.fastutil.objects.Object2IntMap; -import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; -import jakarta.ws.rs.BadRequestException; -import jakarta.ws.rs.WebApplicationException; -import jakarta.ws.rs.core.Response; -import lombok.Getter; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; - -/** - * This is the main routine to load data into Conquery. - */ -@RequiredArgsConstructor -@Slf4j -public class ImportJob extends Job { - - private static final int NUMBER_OF_STEPS = /* directly in execute = */4; - private final DistributedNamespace namespace; - @Getter - private final Table table; - private final int bucketSize; - private final PreprocessedHeader header; - private final PreprocessedData container; - - public static ImportJob createOrUpdate(DistributedNamespace namespace, InputStream inputStream, int entityBucketSize, boolean update) - throws IOException { - - try (PreprocessedReader parser = new PreprocessedReader(inputStream, namespace.getPreprocessMapper())) { - - final Dataset ds = namespace.getDataset(); - - // We parse semi-manually as the incoming file consist of multiple documents we only read progressively: - // 1) the header to check metadata - // 2) The Dictionaries to be imported and transformed - // 3) The ColumnStores themselves which contain references to the previously imported dictionaries. 
- - - final PreprocessedHeader header = parser.readHeader(); - - final TableId tableId = new TableId(ds.getId(), header.getTable()); - final Table table = namespace.getStorage().getTable(tableId); - - if (table == null) { - throw new BadRequestException(String.format("Table[%s] does not exist.", tableId)); - } - - // Ensure that Import and Table have the same schema - final List validationErrors = ensureHeadersMatch(table, header); - - if(!validationErrors.isEmpty()){ - final String errorMessage = String.join("\n -", validationErrors); - - log.error("Problems concerning Import `{}`:{}", header.getName(), errorMessage); - throw new BadRequestException(String.format("Import[%s.%s] does not match Table[%s]:%s", header.getTable(), header.getName(), table.getId(), errorMessage)); - } - - final ImportId importId = new ImportId(table.getId(), header.getName()); - final Import processedImport = namespace.getStorage().getImport(importId); - - if (update) { - if (processedImport == null) { - throw new WebApplicationException(String.format("Import[%s] is not present.", importId), Response.Status.NOT_FOUND); - } - // before updating the import, make sure that all workers removed the last import - namespace.getWorkerHandler().sendToAll(new RemoveImportJob(processedImport)); - namespace.getStorage().removeImport(importId); - } - else if (processedImport != null) { - throw new WebApplicationException(String.format("Import[%s] is already present.", importId), Response.Status.CONFLICT); - } - - - log.trace("Begin reading data."); - - final PreprocessedData container = parser.readData(); - - log.debug("Done reading data. Contains {} Entities.", container.size()); - - log.info("Importing {} into {}", header.getName(), tableId); - - return new ImportJob( - namespace, - table, - entityBucketSize, - header, - container - ); - } - } - - /** - * Verify that the supplied table matches the preprocessed data in shape. - */ - public static List ensureHeadersMatch(Table table, PreprocessedHeader importHeaders) { -// final StringJoiner errors = new StringJoiner("\n - ", "\n - ", ""); - - final List errors = new ArrayList<>(); - - if (table.getColumns().length != importHeaders.getColumns().length) { - errors.add(String.format("Import column count=%d does not match table column count=%d", importHeaders.getColumns().length, table.getColumns().length)); - } - - final Map typesByName = Arrays.stream(importHeaders.getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType)); - - for (PPColumn column : importHeaders.getColumns()) { - if (!typesByName.containsKey(column.getName())) { - errors.add("Column[%s] is missing." 
- .formatted(column.getName())); - } - else if (!typesByName.get(column.getName()).equals(column.getType())) { - errors.add("Column[%s] Types do not match %s != %s" - .formatted(column.getName(), typesByName.get(column.getName()), column.getType())); - } - } - - return errors; - } - - - @Override - public void execute() throws JSONException, InterruptedException, IOException { - - getProgressReporter().setMax(NUMBER_OF_STEPS); - - log.trace("Updating primary dictionary"); - - getProgressReporter().report(1); - - // Distribute the new IDs among workers - distributeWorkerResponsibilities(container.entities()); - - getProgressReporter().report(1); - - final Import imp = createImport(header, container.getStores(), table.getColumns(), container.size()); - - namespace.getStorage().updateImport(imp); - - final Map> buckets2LocalEntities = groupEntitiesByBucket(container.entities(), bucketSize); - - - final ColumnStore[] storesSorted = Arrays.stream(table.getColumns()) - .map(Column::getName) - .map(container.getStores()::get) - .map(Objects::requireNonNull) - .toArray(ColumnStore[]::new); - - - log.info("Start sending {} Buckets", buckets2LocalEntities.size()); - - // we use this to track assignment to workers. - final Map> workerAssignments = - sendBuckets(container.getStarts(), container.getLengths(), imp, buckets2LocalEntities, storesSorted); - - final WorkerHandler handler = namespace.getWorkerHandler(); - workerAssignments.forEach(handler::addBucketsToWorker); - - } - - private void distributeWorkerResponsibilities(Set entities) { - log.debug("Updating bucket assignments."); - - synchronized (namespace) { - for (String entity : entities) { - final int bucket = namespace.getBucket(entity, bucketSize); - - if (namespace.getWorkerHandler().getResponsibleWorkerForBucket(bucket) != null) { - continue; - } - - namespace.getWorkerHandler().addResponsibility(bucket); - } - } - } - - private Import createImport(PreprocessedHeader header, Map stores, Column[] columns, int size) { - final Import imp = new Import(table); - - imp.setName(header.getName()); - imp.setNumberOfEntries(header.getRows()); - imp.setNumberOfEntities(size); - - final ImportColumn[] importColumns = new ImportColumn[columns.length]; - - for (int i = 0; i < columns.length; i++) { - final ColumnStore store = stores.get(columns[i].getName()); - - final ImportColumn col = new ImportColumn(imp, store.createDescription(), store.getLines(), store.estimateMemoryConsumptionBytes()); - - col.setName(columns[i].getName()); - - importColumns[i] = col; - } - - imp.setColumns(importColumns); - - namespace.getWorkerHandler().sendToAll(new AddImport(imp)); - return imp; - } - - /** - * Group entities by their global bucket id. - */ - private Map> groupEntitiesByBucket(Set entities, int bucketSize) { - return entities.stream() - .collect(Collectors.groupingBy(entity -> namespace.getBucket(entity, bucketSize))); - - } - - /** - * select, then send buckets. 
- */ - private Map> sendBuckets(Map starts, Map lengths, Import imp, Map> buckets2LocalEntities, ColumnStore[] storesSorted) { - - final Map> newWorkerAssignments = new HashMap<>(); - - final ProgressReporter subJob = getProgressReporter().subJob(buckets2LocalEntities.size()); - - for (Map.Entry> bucket2entities : buckets2LocalEntities.entrySet()) { - - - final int bucketId = bucket2entities.getKey(); - final List entities = bucket2entities.getValue(); - - final WorkerInformation responsibleWorker = Objects.requireNonNull( - namespace - .getWorkerHandler() - .getResponsibleWorkerForBucket(bucketId), - () -> "No responsible worker for Bucket#" + bucketId - ); - - awaitFreeJobQueue(responsibleWorker); - - final Map bucketStarts = entities.stream() - .filter(starts::containsKey) - .collect(Collectors.toMap(Functions.identity(), starts::get)); - - final Map bucketLengths = entities.stream() - .filter(lengths::containsKey) - .collect(Collectors.toMap(Functions.identity(), lengths::get)); - - - assert !Collections.disjoint(bucketStarts.keySet(), bucketLengths.keySet()); - - - final Bucket bucket = - selectBucket(bucketStarts, bucketLengths, storesSorted, imp, bucketId); - - newWorkerAssignments.computeIfAbsent(responsibleWorker.getId(), (ignored) -> new HashSet<>()) - .add(bucket.getId()); - - log.trace("Sending Bucket[{}] to {}", bucket.getId(), responsibleWorker.getId()); - responsibleWorker.send(ImportBucket.forBucket(bucket)); - - subJob.report(1); - } - - return newWorkerAssignments; - } - - private void awaitFreeJobQueue(WorkerInformation responsibleWorker) { - try { - responsibleWorker.getConnectedShardNode().waitForFreeJobQueue(); - } - catch (InterruptedException e) { - log.error("Interrupted while waiting for worker[{}] to have free space in queue", responsibleWorker, e); - } - } - - /** - * - remap Entity-Ids to global - * - calculate per-Entity regions of Bucklet (start/end) - * - split stores - */ - private Bucket selectBucket(Map localStarts, Map localLengths, ColumnStore[] stores, Import imp, int bucketId) { - - - final IntList selectionStart = new IntArrayList(); - final IntList selectionLength = new IntArrayList(); - - - // First entity of Bucket starts at 0, the following are appended. 
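The offset bookkeeping above survives in the new Preprocessed.selectBucket: bucket-local starts and ends are a running prefix sum over per-entity event counts. A small self-contained version of just that arithmetic, using plain maps instead of the fastutil types:

import java.util.LinkedHashMap;
import java.util.Map;

class EntityOffsetDemo {
	public static void main(String[] args) {
		// Per-entity event counts, in the order they are written into the bucket.
		final Map<String, Integer> lengths = new LinkedHashMap<>();
		lengths.put("entity-a", 3);
		lengths.put("entity-b", 5);
		lengths.put("entity-c", 2);

		final Map<String, Integer> starts = new LinkedHashMap<>();
		final Map<String, Integer> ends = new LinkedHashMap<>();

		int currentStart = 0; // the first entity starts at 0, the rest are appended
		for (Map.Entry<String, Integer> entry : lengths.entrySet()) {
			starts.put(entry.getKey(), currentStart);
			ends.put(entry.getKey(), currentStart + entry.getValue());
			currentStart += entry.getValue();
		}

		System.out.println(starts); // {entity-a=0, entity-b=3, entity-c=8}
		System.out.println(ends);   // {entity-a=3, entity-b=8, entity-c=10}
	}
}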
- final Object2IntMap entityStarts = new Object2IntOpenHashMap<>(); - final Object2IntMap entityEnds = new Object2IntOpenHashMap<>(); - - - int currentStart = 0; - - for (Map.Entry entity2Start : localStarts.entrySet()) { - final String entity = entity2Start.getKey(); - final int start = entity2Start.getValue(); - - final int length = localLengths.get(entity); - - selectionStart.add(start); - - selectionLength.add(length); - - entityStarts.put(entity, currentStart); - entityEnds.put(entity, currentStart + length); - - currentStart += length; - } - - // copy only the parts of the bucket we need - final ColumnStore[] bucketStores = - Arrays.stream(stores) - .map(store -> store.select(selectionStart.toIntArray(), selectionLength.toIntArray())) - .toArray(ColumnStore[]::new); - - return new Bucket( - bucketId, - selectionLength.intStream().sum(), - bucketStores, - entityStarts, - entityEnds, - imp - ); - } - - private Dataset getDataset() { - return namespace.getDataset(); - } - - - @Override - public String getLabel() { - return "Importing into " + table + " from " + header.getName(); - } - -} diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/AddImport.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/AddImport.java index 5470d205c9..659f2a2493 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/AddImport.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/AddImport.java @@ -20,7 +20,7 @@ public class AddImport extends WorkerMessage { @Override public void react(Worker context) throws Exception { - log.info("Received Import[{}], containing {} entries.", imp.getId(), imp.getNumberOfEntries()); + log.trace("Received Import[{}], containing {} entries.", imp.getId(), imp.getNumberOfEntries()); context.addImport(imp); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ImportBucket.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ImportBucket.java index b2b02666d8..def021735d 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ImportBucket.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/ImportBucket.java @@ -22,13 +22,9 @@ public class ImportBucket extends WorkerMessage { private final Bucket bucket; - public static ImportBucket forBucket(Bucket bucket) { - return new ImportBucket(bucket.getId().toString(),bucket); - } - @Override public void react(Worker context) throws Exception { - log.trace("Received {}", bucket.getId()); + log.debug("Received {}, containing {} entities", bucket.getId(), bucket.entities().size()); context.addBucket(bucket); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessed.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessed.java index 3bb69761f2..e32f51b359 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessed.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessed.java @@ -6,7 +6,9 @@ import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.IntSummaryStatistics; import java.util.List; import java.util.Map; @@ -22,11 +24,14 @@ import com.bakdata.conquery.models.preproc.parser.Parser; import 
com.bakdata.conquery.models.preproc.parser.specific.StringParser; import com.fasterxml.jackson.core.JsonGenerator; +import com.google.common.collect.Maps; +import com.google.common.hash.Hashing; import it.unimi.dsi.fastutil.ints.IntArrayList; import it.unimi.dsi.fastutil.ints.IntList; import it.unimi.dsi.fastutil.ints.IntLists; import it.unimi.dsi.fastutil.objects.Object2IntAVLTreeMap; import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; import lombok.Data; import lombok.extern.slf4j.Slf4j; @@ -79,7 +84,7 @@ public Preprocessed(ConqueryConfig config, PreprocessingJob preprocessingJob) th } - public void write(File file) throws IOException { + public void write(File file, int buckets) throws IOException { final Object2IntMap entityStart = new Object2IntAVLTreeMap<>(); final Object2IntMap entityLength = new Object2IntAVLTreeMap<>(); @@ -95,15 +100,23 @@ public void write(File file) throws IOException { log.debug("Writing Headers"); - final int hash = descriptor.calculateValidityHash(job.getCsvDirectory(), job.getTag()); + //TODO this could actually be done at read-time, avoiding large allocations entirely. But in a different smaller PR. + final Map> bucket2Entity = entityStart.keySet().stream() + .collect(Collectors.groupingBy(id -> getEntityBucket(buckets, id))) + .entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> new HashSet<>(entry.getValue()))); - final PreprocessedHeader header = new PreprocessedHeader(descriptor.getName(), descriptor.getTable(), rows, columns, hash); + final int hash = descriptor.calculateValidityHash(job.getCsvDirectory(), job.getTag()); - final PreprocessedData data = new PreprocessedData(entityStart, entityLength, columnStores); + final PreprocessedHeader header = + new PreprocessedHeader(descriptor.getName(), descriptor.getTable(), rows, entityStart.size(), bucket2Entity.size(), columns, hash); + writePreprocessed(file, header, entityStart, entityLength, columnStores, bucket2Entity); + } - writePreprocessed(file, header, data); + public static int getEntityBucket(int buckets, String id) { + return Hashing.consistentHash(id.hashCode(), buckets); } /** @@ -172,7 +185,7 @@ private Map combineStores(Object2IntMap entityStart return columnStores; } - private static void writePreprocessed(File file, PreprocessedHeader header, PreprocessedData data) throws IOException { + private static void writePreprocessed(File file, PreprocessedHeader header, Map globalStarts, Map globalLengths, Map data, Map> bucket2Entities) throws IOException { final OutputStream out = new GZIPOutputStream(new FileOutputStream(file)); try (JsonGenerator generator = Jackson.BINARY_MAPPER.copy().enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET).getFactory().createGenerator(out)) { @@ -180,11 +193,61 @@ private static void writePreprocessed(File file, PreprocessedHeader header, Prep generator.writeObject(header); - log.debug("Writing data"); - generator.writeObject(data); + for (Map.Entry> bucketIds : bucket2Entities.entrySet()) { + final Collection entities = bucketIds.getValue(); + + final Map starts = Maps.filterKeys(globalStarts, entities::contains); + final Map lengths = Maps.filterKeys(globalLengths, entities::contains); + + final PreprocessedData preprocessedData = selectBucket(bucketIds.getKey(), starts, lengths, data); + + generator.writeObject(preprocessedData); + } + } + } + + private static PreprocessedData selectBucket(int bucket, Map localStarts, Map localLengths, Map stores) { + + + final IntList 
selectionStart = new IntArrayList(); + final IntList selectionLength = new IntArrayList(); + + + // First entity of Bucket starts at 0, the following are appended. + final Object2IntMap entityStarts = new Object2IntOpenHashMap<>(); + final Object2IntMap entityEnds = new Object2IntOpenHashMap<>(); + + + int currentStart = 0; + + for (Map.Entry entity2Start : localStarts.entrySet()) { + final String entity = entity2Start.getKey(); + final int start = entity2Start.getValue(); + + final int length = localLengths.get(entity); + + selectionStart.add(start); + + selectionLength.add(length); + + entityStarts.put(entity, currentStart); + entityEnds.put(entity, currentStart + length); + + currentStart += length; } + + final Map selected = new HashMap<>(); + + for (Map.Entry entry : stores.entrySet()) { + final String name = entry.getKey(); + final ColumnStore store = entry.getValue(); + + selected.put(name, store.select(selectionStart.toIntArray(), selectionLength.toIntArray())); + } + + return new PreprocessedData(bucket, entityStarts, entityEnds, selected); } public synchronized String addPrimary(String primary) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedData.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedData.java index 313d44ee09..0df0e7ccc7 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedData.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedData.java @@ -12,9 +12,10 @@ @Data @AllArgsConstructor(onConstructor_ = @JsonCreator) public class PreprocessedData { + private final int bucketId; private final Map starts; - private final Map lengths; + private final Map ends; private final Map stores; diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java index b032dbb8a8..6b33c7abd8 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedHeader.java @@ -1,5 +1,17 @@ package com.bakdata.conquery.models.preproc; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.bakdata.conquery.models.datasets.Column; +import com.bakdata.conquery.models.datasets.Import; +import com.bakdata.conquery.models.datasets.ImportColumn; +import com.bakdata.conquery.models.datasets.Table; +import com.bakdata.conquery.models.events.MajorTypeId; +import com.bakdata.conquery.models.events.stores.root.ColumnStore; import com.fasterxml.jackson.annotation.JsonCreator; import lombok.AllArgsConstructor; import lombok.Data; @@ -33,6 +45,10 @@ public class PreprocessedHeader { * Number of rows in the Preprocessed file. */ private long rows; + private long numberOfEntities; + + //TODO use Set to track actually included buckets,to split phase bucket assignment. + private int numberOfBuckets; /** * The specific columns and their associated MajorType for validation. 
@@ -44,4 +60,60 @@ public class PreprocessedHeader { */ private int validityHash; + public Import createImportDescription(Table table, Map stores) { + final Import imp = new Import(table); + + imp.setName(getName()); + imp.setNumberOfEntries(getRows()); + imp.setNumberOfEntities(getNumberOfEntities()); + + final ImportColumn[] importColumns = new ImportColumn[columns.length]; + + for (int i = 0; i < columns.length; i++) { + final ColumnStore store = stores.get(columns[i].getName()); + + final ImportColumn col = new ImportColumn(imp, store.createDescription(), store.getLines(), numberOfBuckets * store.estimateMemoryConsumptionBytes()); + + col.setName(columns[i].getName()); + + importColumns[i] = col; + } + + imp.setColumns(importColumns); + + return imp; + } + + + /** + * Verify that the supplied table matches the preprocessed' data in shape. + * + * @return + */ + public List assertMatch(Table table) { + final List errors = new ArrayList<>(); + + if (table.getColumns().length != getColumns().length) { + errors.add("Import column count=`%d` does not match table column count=`%d`".formatted(getColumns().length, table.getColumns().length)); + } + + final Map typesByName = Arrays.stream(getColumns()).collect(Collectors.toMap(PPColumn::getName, PPColumn::getType)); + + for (int i = 0; i < Math.min(table.getColumns().length, getColumns().length); i++) { + final Column column = table.getColumns()[i]; + + if (!typesByName.containsKey(column.getName())) { + errors.add("Column[%s] is missing.".formatted(column.getName())); + continue; + } + + if (!typesByName.get(column.getName()).equals(column.getType())) { + errors.add("Column[%s] Types do not match %s != %s".formatted(column.getName(), typesByName.get(column.getName()), column.getType())); + } + } + + return errors; + } + + } diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedReader.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedReader.java index abdd8574d1..340433ae6c 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedReader.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/PreprocessedReader.java @@ -2,13 +2,8 @@ import java.io.IOException; import java.io.InputStream; -import java.util.HashMap; -import java.util.Map; +import java.util.Iterator; -import com.bakdata.conquery.models.identifiable.Identifiable; -import com.bakdata.conquery.models.identifiable.InjectingCentralRegistry; -import com.bakdata.conquery.models.identifiable.ids.Id; -import com.bakdata.conquery.models.worker.SingletonNamespaceCollection; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser; import com.fasterxml.jackson.databind.ObjectMapper; @@ -16,6 +11,7 @@ import lombok.AccessLevel; import lombok.Getter; import lombok.RequiredArgsConstructor; +import lombok.SneakyThrows; import lombok.experimental.Accessors; /** @@ -23,7 +19,7 @@ * Header then Dictionaries then Data. Only this order is possible. 
*/ @RequiredArgsConstructor(access = AccessLevel.PACKAGE) -public class PreprocessedReader implements AutoCloseable { +public class PreprocessedReader implements AutoCloseable, Iterator { @Override public void close() throws IOException { parser.close(); @@ -40,45 +36,39 @@ public enum LastRead { @Getter private LastRead lastRead = LastRead.BEGIN; + private int bucketsRemaining; private final JsonParser parser; - private final Map, Identifiable> replacements = new HashMap<>(); public PreprocessedReader(InputStream inputStream, ObjectMapper objectMapper) throws IOException { - final InjectingCentralRegistry injectingCentralRegistry = new InjectingCentralRegistry(replacements); - final SingletonNamespaceCollection namespaceCollection = new SingletonNamespaceCollection(injectingCentralRegistry); - parser = namespaceCollection.injectIntoNew(objectMapper) - .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) - .getFactory() - .createParser(inputStream); + parser = objectMapper.copy().enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) + .getFactory() + .createParser(inputStream); } - public void addReplacement(Id id, Identifiable replacement) { - this.replacements.put(id, replacement); - } - - public , V extends Identifiable> void addAllReplacements(Map replacements) { - this.replacements.putAll(replacements); - } public PreprocessedHeader readHeader() throws IOException { Preconditions.checkState(lastRead.equals(LastRead.BEGIN)); final PreprocessedHeader header = parser.readValueAs(PreprocessedHeader.class); + bucketsRemaining = header.getNumberOfBuckets(); lastRead = lastRead.next(); return header; } + @Override + public boolean hasNext() { + return bucketsRemaining > 0; + } - public PreprocessedData readData() throws IOException { - Preconditions.checkState(lastRead.equals(LastRead.HEADER)); - - final PreprocessedData dictionaries = parser.readValueAs(PreprocessedData.class); + @SneakyThrows + @Override + public PreprocessedData next() { + bucketsRemaining--; - lastRead = lastRead.next(); - return dictionaries; + return parser.readValueAs(PreprocessedData.class); } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessor.java b/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessor.java index a7908aa324..f3e31e6078 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessor.java +++ b/backend/src/main/java/com/bakdata/conquery/models/preproc/Preprocessor.java @@ -54,7 +54,7 @@ public static File getTaggedVersion(File file, String tag, String extension) { *

* Reads CSV file, per row extracts the primary key, then applies other transformations on each row, then compresses the data with {@link ColumnStore}. */ - public static void preprocess(PreprocessingJob preprocessingJob, ProgressBar totalProgress, ConqueryConfig config) throws IOException { + public static void preprocess(PreprocessingJob preprocessingJob, ProgressBar totalProgress, ConqueryConfig config, int buckets) throws IOException { final File preprocessedFile = preprocessingJob.getPreprocessedFile(); TableImportDescriptor descriptor = preprocessingJob.getDescriptor(); @@ -209,7 +209,7 @@ else if (errors == config.getPreprocessor().getMaximumPrintedErrors()) { exceptions.forEach((clazz, count) -> log.warn("Got {} `{}`", count, clazz.getSimpleName())); } - result.write(tmp); + result.write(tmp, buckets); if (errors > 0) { log.warn("Had {}% faulty lines ({} of ~{} lines)", String.format("%.2f", 100d * (double) errors / (double) lineId), errors, lineId); diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java b/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java index 76a5d4664f..dda525ee97 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/DistributedNamespace.java @@ -54,18 +54,12 @@ public DistributedNamespace( this.workerHandler = workerHandler; } - public int getBucket(String entity, int bucketSize) { - final NamespaceStorage storage = getStorage(); - return storage.getEntityBucket(entity) - .orElseGet(() -> storage.assignEntityBucket(entity, bucketSize)); - } - @Override void updateMatchingStats() { - final Collection> concepts = this.getStorage().getAllConcepts() - .stream() - .filter(concept -> concept.getMatchingStats() == null) - .collect(Collectors.toSet()); + final Collection> concepts = getStorage().getAllConcepts() + .stream() + .filter(concept -> concept.getMatchingStats() == null) + .collect(Collectors.toSet()); getWorkerHandler().sendToAll(new UpdateMatchingStatsMessage(concepts)); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java index fe15ed6524..cc6c780ddf 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/ShardNodeInformation.java @@ -12,6 +12,7 @@ import com.bakdata.conquery.models.messages.network.MessageToShardNode; import com.codahale.metrics.SharedMetricRegistries; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.common.base.Stopwatch; import lombok.Getter; import lombok.ToString; import lombok.extern.slf4j.Slf4j; @@ -102,8 +103,12 @@ public void waitForFreeJobQueue() throws InterruptedException { } synchronized (jobManagerSync) { - log.trace("Have to wait for free JobQueue"); + final Stopwatch waiting = Stopwatch.createStarted(); + log.trace("Shard {}, have to wait for free JobQueue (backpressure={})", session, backpressure); + jobManagerSync.wait(); + + log.debug("Shard {}, Waited {} for free JobQueue", session, waiting.stop()); } } } diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java index 070e546974..c6c65dae41 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java +++ 
b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerHandler.java @@ -103,24 +103,24 @@ public synchronized void removeBucketAssignmentsForImportFormWorkers(@NonNull Im sendUpdatedWorkerInformation(); } - private synchronized void sendUpdatedWorkerInformation() { + public synchronized void sendUpdatedWorkerInformation() { for (WorkerInformation w : workers.values()) { w.send(new UpdateWorkerBucket(w)); } } - public synchronized void addBucketsToWorker(@NonNull WorkerId id, @NonNull Set bucketIds) { + public synchronized void registerBucketForWorker(@NonNull WorkerId id, @NonNull BucketId bucketId) { // Ensure that add and remove are not executed at the same time. // We don't make assumptions about the underlying implementation regarding thread safety WorkerToBucketsMap workerBuckets = storage.getWorkerBuckets(); + if (workerBuckets == null) { workerBuckets = createWorkerBucketsMap(); } - workerBuckets.addBucketForWorker(id, bucketIds); - storage.setWorkerToBucketsMap(workerBuckets); + workerBuckets.addBucketForWorker(id, bucketId); - sendUpdatedWorkerInformation(); + storage.setWorkerToBucketsMap(workerBuckets); } private synchronized WorkerToBucketsMap createWorkerBucketsMap() { @@ -138,11 +138,12 @@ public synchronized WorkerInformation getResponsibleWorkerForBucket(int bucket) } /** + * @return * @implNote Currently the least occupied Worker receives a new Bucket, this can change in later implementations. (For example for * dedicated Workers, or entity weightings) */ - public synchronized void addResponsibility(int bucket) { + public synchronized WorkerInformation addResponsibility(int bucket) { final WorkerInformation smallest = workers .stream() .min(Comparator.comparing(si -> si.getIncludedBuckets().size())) @@ -153,6 +154,8 @@ public synchronized void addResponsibility(int bucket) { bucket2WorkerMap.put(bucket, smallest); smallest.getIncludedBuckets().add(bucket); + + return smallest; } public void register(ShardNodeInformation node, WorkerInformation info) { @@ -192,6 +195,19 @@ public Set getBucketsForWorker(WorkerId workerId) { return workerBuckets.getBucketsForWorker(workerId); } + public synchronized WorkerInformation assignResponsibleWorker(BucketId bucket) { + + WorkerInformation responsibleWorkerForBucket = getResponsibleWorkerForBucket(bucket.getBucket()); + + if (responsibleWorkerForBucket == null) { + responsibleWorkerForBucket = addResponsibility(bucket.getBucket()); + } + + registerBucketForWorker(responsibleWorkerForBucket.getId(), bucket); + + return responsibleWorkerForBucket; + } + private record PendingReaction(UUID callerId, Set pendingWorkers, ActionReactionMessage parent) { /** diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java index def028d002..8eb023e427 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerInformation.java @@ -12,13 +12,13 @@ import it.unimi.dsi.fastutil.ints.IntArraySet; import jakarta.validation.constraints.Min; import jakarta.validation.constraints.NotNull; -import lombok.EqualsAndHashCode; -import lombok.Getter; -import lombok.Setter; +import lombok.Data; +import lombok.NoArgsConstructor; +import lombok.extern.slf4j.Slf4j; -@Getter -@Setter -@EqualsAndHashCode(callSuper = true) +@Data +@NoArgsConstructor +@Slf4j public class WorkerInformation extends NamedImpl implements MessageSender.Transforming { 
@NotNull private DatasetId dataset; @@ -32,6 +32,15 @@ public class WorkerInformation extends NamedImpl implements MessageSen @Min(0) private int entityBucketSize; + public void awaitFreeJobQueue() { + try { + getConnectedShardNode().waitForFreeJobQueue(); + } + catch (InterruptedException e) { + log.error("Interrupted while waiting for worker[{}] to have free space in queue", this, e); + } + } + @Override public WorkerId createId() { diff --git a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java index 4f7b03072f..70b98cd832 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java +++ b/backend/src/main/java/com/bakdata/conquery/models/worker/WorkerToBucketsMap.java @@ -9,7 +9,6 @@ import com.bakdata.conquery.models.identifiable.ids.specific.BucketId; import com.bakdata.conquery.models.identifiable.ids.specific.ImportId; import com.bakdata.conquery.models.identifiable.ids.specific.WorkerId; -import com.google.common.collect.ImmutableSet; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; @@ -21,23 +20,23 @@ @Getter @Setter public class WorkerToBucketsMap { - private Map> map = new ConcurrentHashMap<>(); + private Map> map = new ConcurrentHashMap<>(); - public Set getBucketsForWorker(WorkerId workerId) { - // Don't modify the underlying map here - Set buckets = map.get(workerId); - if (buckets != null) { + public Set getBucketsForWorker(WorkerId workerId) { + // Don't modify the underlying map here + Set buckets = map.get(workerId); + if (buckets != null) { // Don't allow modification return Collections.unmodifiableSet(buckets); } - return Collections.emptySet(); - } + return Collections.emptySet(); + } - public void addBucketForWorker(@NonNull WorkerId id, @NonNull Set bucketIds) { - map.computeIfAbsent(id, k -> new HashSet<>()).addAll(bucketIds); - } + public void addBucketForWorker(@NonNull WorkerId id, @NonNull BucketId bucketId) { + map.computeIfAbsent(id, k -> new HashSet<>()).add(bucketId); + } - public void removeBucketsOfImport(@NonNull ImportId importId) { - map.values().forEach(set -> set.removeIf(bucketId -> bucketId.getImp().equals(importId))); - } + public void removeBucketsOfImport(@NonNull ImportId importId) { + map.values().forEach(set -> set.removeIf(bucketId -> bucketId.getImp().equals(importId))); + } } diff --git a/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java b/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java index 9f643cd18b..908aa69235 100644 --- a/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java +++ b/backend/src/main/java/com/bakdata/conquery/resources/api/ConceptsProcessor.java @@ -26,9 +26,9 @@ import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.PreviewConfig; import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.ConceptElement; import com.bakdata.conquery.models.datasets.concepts.FrontEndConceptBuilder; import com.bakdata.conquery.models.datasets.concepts.filters.specific.SelectFilter; -import com.bakdata.conquery.models.datasets.concepts.tree.ConceptTreeChild; import com.bakdata.conquery.models.datasets.concepts.tree.TreeConcept; import com.bakdata.conquery.models.exceptions.ConceptConfigurationException; import com.bakdata.conquery.models.exceptions.ValidatorHelper; @@ -294,7 
+294,7 @@ public ResolvedConceptsResult resolveConceptElements(TreeConcept concept, List(Collections::emptyMap)); + final ConceptElement child = concept.findMostSpecificChild(conceptCode, new CalculatedValue<>(Collections::emptyMap)); if (child != null) { resolvedCodes.add(child.getId()); diff --git a/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java b/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java index c6c12cf9e0..ce429d4d82 100644 --- a/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java +++ b/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java @@ -2,6 +2,7 @@ import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import com.fasterxml.jackson.annotation.JsonValue; import lombok.Getter; @@ -10,11 +11,13 @@ @Slf4j public class ProgressReporterImpl implements ProgressReporter { + public long getMax() { + return max.get(); + } - @Getter(onMethod_ = @Override) - private long max = 1; - private long innerProgress = 0; - private long reservedForChildren = 0; + private final AtomicLong max = new AtomicLong(1); + private final AtomicLong innerProgress = new AtomicLong(0); + private final AtomicLong reservedForChildren = new AtomicLong(0); private final List children = new ArrayList<>(); @Getter @@ -53,7 +56,7 @@ public double getProgress() { } public long getAbsoluteProgress() { - long absoluteProgress = innerProgress; + long absoluteProgress = innerProgress.get(); for (ProgressReporterImpl child : children) { absoluteProgress += child.getAbsoluteProgress(); @@ -63,7 +66,7 @@ public long getAbsoluteProgress() { } public long getAbsoluteMax() { - long absoluteMax = max; + long absoluteMax = max.get(); for (ProgressReporterImpl child : children) { absoluteMax += child.getAbsoluteMax(); @@ -78,7 +81,7 @@ public ProgressReporter subJob(long steps) { throw new IllegalStateException("You need to start the Progress Reporter before you can add subjobs"); } - reservedForChildren += steps; + reservedForChildren.addAndGet(steps); ProgressReporterImpl childPr = new ProgressReporterImpl(); childPr.start(); @@ -98,12 +101,12 @@ public void report(int steps) { log.warn("Progress reporter was not started"); return; } - if (innerProgress + steps > max) { + if (innerProgress.get() + steps > max.get()) { log.warn("Progress({}) + ChildProgressReserve({}) + Steps({}) is bigger than the maximum Progress({}). There might be to many reports in the code.", innerProgress, reservedForChildren, steps, max); return; } - innerProgress += steps; + innerProgress.addAndGet(steps); } @Override @@ -114,12 +117,7 @@ public void setMax(long max) { return; } - if (this.max > max) { - log.warn("Max cannot be decreased."); - return; - } - - this.max = max; + this.max.set(max); } @Override @@ -138,7 +136,7 @@ public void done() { log.trace("Done was called before all steps were been reported. 
There might be missing reporting steps in the code."); } - innerProgress = max - reservedForChildren; + innerProgress.set(max.get() - reservedForChildren.get()); } diff --git a/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java b/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java index 3b64377522..ef12549871 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/common/LoadingUtil.java @@ -17,11 +17,6 @@ import java.util.Map; import java.util.UUID; -import jakarta.ws.rs.client.Entity; -import jakarta.ws.rs.client.Invocation; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; - import com.bakdata.conquery.ConqueryConstants; import com.bakdata.conquery.apiv1.query.ConceptQuery; import com.bakdata.conquery.apiv1.query.Query; @@ -53,6 +48,10 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import com.univocity.parsers.csv.CsvParser; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.Invocation; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; import lombok.NonNull; import lombok.experimental.UtilityClass; import lombok.extern.slf4j.Slf4j; @@ -132,10 +131,6 @@ private static void uploadTable(StandaloneSupport support, Table table) { } } - public static void importTableContents(StandaloneSupport support, RequiredTable[] tables) throws Exception { - importTableContents(support, Arrays.asList(tables)); - } - public static List generateCqpp(StandaloneSupport support, Collection tables) throws Exception { List preprocessedFiles = new ArrayList<>(); List descriptions = new ArrayList<>(); @@ -177,57 +172,49 @@ public static List generateCqpp(StandaloneSupport support, Collection> entity = Entity.entity(Entity.json(""), MediaType.APPLICATION_JSON_TYPE); final Invocation.Builder request = support.getClient() .target(addImport) .request(MediaType.APPLICATION_JSON); - try (final Response response = request - .post(Entity.entity(null, MediaType.APPLICATION_JSON_TYPE))) { - - assertThat(response.getStatusInfo().getFamily()) - .describedAs(new LazyTextDescription(() -> response.readEntity(String.class))) - .isEqualTo(Response.Status.Family.SUCCESSFUL); - } - } - public static void updateCqppFile(StandaloneSupport support, File cqpp, Response.Status.Family expectedResponseFamily, String expectedReason) { - assertThat(cqpp).exists(); + final Invocation invocation = update ? 
request.buildPut(entity) : request.buildPost(entity); - final URI addImport = HierarchyHelper.hierarchicalPath(support.defaultAdminURIBuilder(), AdminDatasetResource.class, "updateImport") - .queryParam("file", cqpp) - .buildFromMap(Map.of( - ResourceConstants.DATASET, support.getDataset().getId() - )); + log.info("sending CQPP with {}", invocation); - final Invocation.Builder request = support.getClient() - .target(addImport) - .request(MediaType.APPLICATION_JSON); - try (final Response response = request - .put(Entity.entity(Entity.json(""), MediaType.APPLICATION_JSON_TYPE))) { + try (final Response response = invocation.invoke()) { assertThat(response.getStatusInfo().getFamily()) .describedAs(new LazyTextDescription(() -> response.readEntity(String.class))) .isEqualTo(expectedResponseFamily); - assertThat(response.getStatusInfo().getReasonPhrase()) - .describedAs(new LazyTextDescription(() -> response.readEntity(String.class))) - .isEqualTo(expectedReason); } } public static void importCqppFiles(StandaloneSupport support, List cqppFiles) { for (File cqpp : cqppFiles) { - importCqppFile(support, cqpp); + uploadCqpp(support, cqpp, false, Response.Status.Family.SUCCESSFUL); } + + support.waitUntilWorkDone(); + + } public static void importTableContents(StandaloneSupport support, Collection tables) throws Exception { List cqpps = generateCqpp(support, tables); + importCqppFiles(support, cqpps); } @@ -275,6 +262,8 @@ public static void updateConcepts(StandaloneSupport support, ArrayNode rawConcep for (Concept concept : concepts) { updateConcept(support, concept, expectedResponseFamily); } + + } private static void updateConcept(@NonNull StandaloneSupport support, @NonNull Concept concept, @NonNull Response.Status.Family expectedResponseFamily) { diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/ImportUpdateTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/ImportUpdateTest.java index 7e35786d72..3291d3ac4f 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/ImportUpdateTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/ImportUpdateTest.java @@ -6,8 +6,6 @@ import java.io.File; import java.util.List; -import jakarta.ws.rs.core.Response; - import com.bakdata.conquery.ConqueryConstants; import com.bakdata.conquery.apiv1.query.Query; import com.bakdata.conquery.commands.ShardNode; @@ -32,6 +30,7 @@ import com.bakdata.conquery.util.support.StandaloneSupport; import com.bakdata.conquery.util.support.TestConquery; import com.github.powerlibraries.io.In; +import jakarta.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -78,7 +77,9 @@ public void execute(String name, TestConquery testConquery) throws Exception { assertThat(cqpps.size()).isEqualTo(tables.size()); LoadingUtil.importCqppFiles(conquery, List.of(cqpps.get(0))); + conquery.waitUntilWorkDone(); + } final Query query = IntegrationUtils.parseQuery(conquery, test.getRawQuery()); @@ -125,7 +126,7 @@ public void execute(String name, TestConquery testConquery) throws Exception { } //Try to update an import that does not exist should throw a Not-Found Webapplication Exception - LoadingUtil.updateCqppFile(conquery, cqpps.get(1), Response.Status.Family.CLIENT_ERROR, "Not Found"); + LoadingUtil.uploadCqpp(conquery, cqpps.get(1), true, Response.Status.Family.CLIENT_ERROR); conquery.waitUntilWorkDone(); //Load manually new data for import and update the concerned import @@ -170,7 +171,10 @@ public void 
execute(String name, TestConquery testConquery) throws Exception { log.info("updating import"); //correct update of the import - LoadingUtil.updateCqppFile(conquery, newPreprocessedFile, Response.Status.Family.SUCCESSFUL, "No Content"); + LoadingUtil.uploadCqpp(conquery, newPreprocessedFile, true, Response.Status.Family.SUCCESSFUL); + conquery.waitUntilWorkDone(); + + conquery.waitUntilWorkDone(); } diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java index 5b3e8280fc..d1f0360479 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/RestartTest.java @@ -148,6 +148,8 @@ public void execute(String name, TestConquery testConquery) throws Exception { test.executeTest(support); + storage = support.getMetaStorage(); + {// Auth actual tests User userStored = storage.getUser(user.getId()); assertThat(userStored).isEqualTo(user); diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ConceptUpdateAndDeletionTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ConceptUpdateAndDeletionTest.java index 8795093de6..0b1e1ee887 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ConceptUpdateAndDeletionTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ConceptUpdateAndDeletionTest.java @@ -114,9 +114,13 @@ public void execute(String name, TestConquery testConquery) throws Exception { // To perform the update, the old concept will be deleted first and the new concept will be added. That means the deletion of concept is also covered here { log.info("Executing update"); + LoadingUtil.updateConcepts(conquery, test2.getRawConcepts(), Response.Status.Family.SUCCESSFUL); conquery.waitUntilWorkDone(); + log.info("Update executed"); + + } diff --git a/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ImportDeletionTest.java b/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ImportDeletionTest.java index e123998d0c..5536f967cf 100644 --- a/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ImportDeletionTest.java +++ b/backend/src/test/java/com/bakdata/conquery/integration/tests/deletion/ImportDeletionTest.java @@ -10,9 +10,6 @@ import java.util.Map; import java.util.zip.GZIPInputStream; -import jakarta.ws.rs.core.MediaType; -import jakarta.ws.rs.core.Response; - import com.bakdata.conquery.ConqueryConstants; import com.bakdata.conquery.apiv1.query.Query; import com.bakdata.conquery.commands.ShardNode; @@ -42,6 +39,8 @@ import com.bakdata.conquery.util.support.StandaloneSupport; import com.bakdata.conquery.util.support.TestConquery; import com.github.powerlibraries.io.In; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; @@ -242,6 +241,9 @@ public void execute(String name, TestConquery testConquery) throws Exception { //import preprocessedFiles conquery.getDatasetsProcessor().addImport(conquery.getNamespace(), new GZIPInputStream(new FileInputStream(preprocessedFile))); conquery.waitUntilWorkDone(); + + + conquery.waitUntilWorkDone(); } // State after reimport. 
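For context, a minimal usage sketch of the consolidated LoadingUtil.uploadCqpp test helper exercised above, assuming a StandaloneSupport instance and an already generated cqpp file; the class and method names here are illustrative only and not part of this diff. With update=false the helper issues a POST for a fresh import, with update=true it issues a PUT to update the existing import.

import java.io.File;

import com.bakdata.conquery.integration.common.LoadingUtil;
import com.bakdata.conquery.util.support.StandaloneSupport;
import jakarta.ws.rs.core.Response;

// Sketch only (hypothetical helper, not part of this diff): upload a cqpp, then update it.
public final class CqppUploadSketch {

	public static void roundTrip(StandaloneSupport support, File cqpp) {
		// Fresh import: POST the cqpp and expect a successful response family.
		LoadingUtil.uploadCqpp(support, cqpp, false, Response.Status.Family.SUCCESSFUL);
		support.waitUntilWorkDone();

		// Update of the now existing import: same helper, but update=true switches the request to PUT.
		LoadingUtil.uploadCqpp(support, cqpp, true, Response.Status.Family.SUCCESSFUL);
		support.waitUntilWorkDone();
	}
}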
diff --git a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java index 2ae22e13bf..99e0d08580 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java +++ b/backend/src/test/java/com/bakdata/conquery/models/SerializationTests.java @@ -220,7 +220,7 @@ public void bucketCompoundDateRange() throws JSONException, IOException { ColumnStore startStore = new IntegerDateStore(new ShortArrayStore(new short[]{1, 2, 3, 4}, Short.MIN_VALUE)); ColumnStore endStore = new IntegerDateStore(new ShortArrayStore(new short[]{5, 6, 7, 8}, Short.MIN_VALUE)); - Bucket bucket = new Bucket(0, 4, new ColumnStore[]{startStore, endStore, compoundStore}, Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(), imp); + Bucket bucket = new Bucket(0, new ColumnStore[]{startStore, endStore, compoundStore}, Object2IntMaps.singleton("0", 0), Object2IntMaps.singleton("0", 4),4, imp); compoundStore.setParent(bucket); @@ -583,7 +583,7 @@ public void serialize() throws IOException, JSONException { final Import imp = new Import(table); imp.setName("import"); - final Bucket bucket = new Bucket(0, 0, new ColumnStore[0], Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(), imp); + final Bucket bucket = new Bucket(0, new ColumnStore[0], Object2IntMaps.emptyMap(), Object2IntMaps.emptyMap(),0, imp); final CBlock cBlock = CBlock.createCBlock(connector, bucket, 10); diff --git a/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/GroovyIndexedTest.java b/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/GroovyIndexedTest.java index 3afe94bb22..97cef7a5df 100644 --- a/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/GroovyIndexedTest.java +++ b/backend/src/test/java/com/bakdata/conquery/models/datasets/concepts/tree/GroovyIndexedTest.java @@ -8,7 +8,6 @@ import java.util.Random; import java.util.function.Supplier; import java.util.stream.Stream; -import jakarta.validation.Validator; import com.bakdata.conquery.io.jackson.Injectable; import com.bakdata.conquery.io.jackson.Jackson; @@ -16,6 +15,7 @@ import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.datasets.Table; import com.bakdata.conquery.models.datasets.concepts.Concept; +import com.bakdata.conquery.models.datasets.concepts.ConceptElement; import com.bakdata.conquery.models.events.MajorTypeId; import com.bakdata.conquery.models.exceptions.ConfigurationException; import com.bakdata.conquery.models.exceptions.JSONException; @@ -27,6 +27,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.github.powerlibraries.io.In; import io.dropwizard.jersey.validation.Validators; +import jakarta.validation.Validator; import lombok.extern.slf4j.Slf4j; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.parallel.Execution; @@ -111,11 +112,11 @@ public static void init() throws IOException, JSONException, ConfigurationExcept public void basic(String key, CalculatedValue> rowMap) throws JSONException { log.trace("Searching for {}", key); - ConceptTreeChild idxResult = indexedConcept.findMostSpecificChild(key, rowMap); - ConceptTreeChild oldResult = oldConcept.findMostSpecificChild(key, rowMap); + ConceptElement idxResult = indexedConcept.findMostSpecificChild(key, rowMap); + ConceptElement oldResult = oldConcept.findMostSpecificChild(key, rowMap); assertThat(oldResult.getId()).describedAs("%s hierarchical name", 
key).isEqualTo(idxResult.getId()); } -} \ No newline at end of file +} diff --git a/backend/src/test/java/com/bakdata/conquery/util/support/StandaloneSupport.java b/backend/src/test/java/com/bakdata/conquery/util/support/StandaloneSupport.java index 49042dada3..8f3410c085 100644 --- a/backend/src/test/java/com/bakdata/conquery/util/support/StandaloneSupport.java +++ b/backend/src/test/java/com/bakdata/conquery/util/support/StandaloneSupport.java @@ -73,8 +73,10 @@ public void preprocessTmp(File tmpDir, List descriptions) throws Exception Map.of( "in", tmpDir, "out", tmpDir, - "desc", descriptions - + "desc", descriptions, + "buckets", 10, + "strict", true, + "fast-fail", true ) );
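Since a cqpp now stores one PreprocessedData block per bucket after its header, here is a minimal read-side sketch using the iterator-style PreprocessedReader changed above; the class name, method name, input file and the printed fields are assumptions for illustration, not part of this change.

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

import com.bakdata.conquery.io.jackson.Jackson;
import com.bakdata.conquery.models.preproc.PreprocessedData;
import com.bakdata.conquery.models.preproc.PreprocessedHeader;
import com.bakdata.conquery.models.preproc.PreprocessedReader;

// Sketch only (hypothetical class, not part of this diff): iterate the bucket blocks of a preprocessed file.
public final class CqppDumpSketch {

	public static void dumpBuckets(File cqpp) throws IOException {
		try (PreprocessedReader reader = new PreprocessedReader(new GZIPInputStream(new FileInputStream(cqpp)), Jackson.BINARY_MAPPER.copy())) {

			// The header comes first and announces how many bucket blocks follow.
			final PreprocessedHeader header = reader.readHeader();
			System.out.printf("%s: %d rows, %d entities, %d buckets%n",
					header.getName(), header.getRows(), header.getNumberOfEntities(), header.getNumberOfBuckets());

			// Every remaining object in the stream is the PreprocessedData of a single bucket.
			while (reader.hasNext()) {
				final PreprocessedData bucket = reader.next();
				System.out.printf("\tbucket %d holds %d entities%n", bucket.getBucketId(), bucket.getStarts().size());
			}
		}
	}
}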