diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java index 060f395386..a401694d00 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/CalculateCBlocksJob.java @@ -62,8 +62,6 @@ public void execute() throws Exception { .collect(Collectors.toList()); Futures.allAsList(futures).get(); - - getProgressReporter().done(); } private CalculationInformationProcessor createInformationProcessor(CalculationInformation info) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java index 49cea30816..149ab9d740 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/ImportJob.java @@ -269,7 +269,7 @@ public void execute() throws JSONException, InterruptedException, IOException { log.info("Remapping Dictionaries {}", sharedDictionaryMappings.values()); - applyDictionaryMappings(sharedDictionaryMappings, container.getStores()); + remapToSharedDictionary(sharedDictionaryMappings, container.getStores()); Import imp = createImport(header, container.getStores(), table.getColumns(), container.size()); @@ -295,14 +295,12 @@ public void execute() throws JSONException, InterruptedException, IOException { workerAssignments.forEach(namespace::addBucketsToWorker); - getProgressReporter().done(); } /** * select, then send buckets. */ - private Map> sendBuckets(Map starts, Map lengths, DictionaryMapping primaryMapping, Import imp, Map> buckets2LocalEntities, ColumnStore[] storesSorted) - throws JsonProcessingException { + private Map> sendBuckets(Map starts, Map lengths, DictionaryMapping primaryMapping, Import imp, Map> buckets2LocalEntities, ColumnStore[] storesSorted) { Map> newWorkerAssignments = new HashMap<>(); @@ -328,8 +326,6 @@ private Map> sendBuckets(Map starts, M subJob.report(1); } - subJob.done(); - return newWorkerAssignments; } @@ -454,7 +450,13 @@ private void distributeWorkerResponsibilities(DictionaryMapping primaryMapping) /** * Apply new positions into incoming shared dictionaries. */ - private void applyDictionaryMappings(Map mappings, Map values) { + private void remapToSharedDictionary(Map mappings, Map values) { + + if (mappings.isEmpty()) { + log.trace("No columns with shared dictionary appear to be in the import."); + return; + } + final ProgressReporter subJob = getProgressReporter().subJob(mappings.size()); for (Map.Entry entry : mappings.entrySet()) { diff --git a/backend/src/main/java/com/bakdata/conquery/models/jobs/JobExecutor.java b/backend/src/main/java/com/bakdata/conquery/models/jobs/JobExecutor.java index 70ed43de74..739fb29adb 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/jobs/JobExecutor.java +++ b/backend/src/main/java/com/bakdata/conquery/models/jobs/JobExecutor.java @@ -130,6 +130,8 @@ public void run() { } finally { ConqueryMDC.setLocation(this.getName()); + job.getProgressReporter().done(); + log.trace("Finished job {} within {}", job, timer.stop()); time.stop(); } diff --git a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java index 8d6a35b398..c421263ad1 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java +++ b/backend/src/main/java/com/bakdata/conquery/models/messages/namespaces/specific/UpdateMatchingStatsMessage.java @@ -52,13 +52,14 @@ public String getLabel() { @Override public void execute() throws Exception { - final ProgressReporter progressReporter = getProgressReporter().subJob(worker.getStorage().getAllConcepts().size()); if (worker.getStorage().getAllCBlocks().isEmpty()) { log.debug("Worker {} is empty, skipping.", worker); - progressReporter.done(); return; } + final ProgressReporter progressReporter = getProgressReporter(); + progressReporter.setMax(worker.getStorage().getAllConcepts().size()); + log.info("BEGIN update Matching stats for {} Concepts", worker.getStorage().getAllConcepts().size()); // SubJobs collect into this Map. @@ -118,7 +119,6 @@ public void execute() throws Exception { worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), messages)); } - progressReporter.done(); } @@ -172,8 +172,6 @@ private void calculateConceptMatches(Concept concept, Map, } } - getProgressReporter().report(1); - log.trace("DONE calculating for `{}`", concept.getId()); } diff --git a/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java b/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java index 3c2531e12c..c6c12cf9e0 100644 --- a/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java +++ b/backend/src/main/java/com/bakdata/conquery/util/progressreporter/ProgressReporterImpl.java @@ -15,7 +15,7 @@ public class ProgressReporterImpl implements ProgressReporter { private long max = 1; private long innerProgress = 0; private long reservedForChildren = 0; - private final List children = new ArrayList(); + private final List children = new ArrayList<>(); @Getter private final long creationTimeMillis; @@ -94,6 +94,10 @@ public String getEstimate() { @Override public void report(int steps) { + if (!isStarted()) { + log.warn("Progress reporter was not started"); + return; + } if (innerProgress + steps > max) { log.warn("Progress({}) + ChildProgressReserve({}) + Steps({}) is bigger than the maximum Progress({}). There might be to many reports in the code.", innerProgress, reservedForChildren, steps, max); return; @@ -104,13 +108,15 @@ public void report(int steps) { @Override public void setMax(long max) { - if (this.max > max) { - log.warn("Max cannot be decreased."); + + if (max <= 0) { + log.warn("Max can not be 0 or less"); return; } - if (max <= 0) { - throw new IllegalArgumentException("Max can not be 0 or less"); + if (this.max > max) { + log.warn("Max cannot be decreased."); + return; } this.max = max; @@ -118,16 +124,14 @@ public void setMax(long max) { @Override public void done() { - if(endTimeMillis > -1) { + if (isDone()) { log.warn("Done was called again for {}", this); return; } endTimeMillis = System.currentTimeMillis(); for (ProgressReporter child : children) { - if (!child.isDone()) { - log.warn("One or more Children are not done yet"); - } + child.done(); } if (getAbsoluteProgress() < getAbsoluteMax()) {