Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

start and done reporters centrally, removes to many reports #2583

Merged
merged 4 commits into from
Jul 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ public void execute() throws Exception {
.collect(Collectors.toList());

Futures.allAsList(futures).get();

getProgressReporter().done();
}

private CalculationInformationProcessor createInformationProcessor(CalculationInformation info) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ public void execute() throws JSONException, InterruptedException, IOException {

log.info("Remapping Dictionaries {}", sharedDictionaryMappings.values());

applyDictionaryMappings(sharedDictionaryMappings, container.getStores());
remapToSharedDictionary(sharedDictionaryMappings, container.getStores());


Import imp = createImport(header, container.getStores(), table.getColumns(), container.size());
Expand All @@ -295,14 +295,12 @@ public void execute() throws JSONException, InterruptedException, IOException {

workerAssignments.forEach(namespace::addBucketsToWorker);

getProgressReporter().done();
}

/**
* select, then send buckets.
*/
private Map<WorkerId, Set<BucketId>> sendBuckets(Map<Integer, Integer> starts, Map<Integer, Integer> lengths, DictionaryMapping primaryMapping, Import imp, Map<Integer, List<Integer>> buckets2LocalEntities, ColumnStore[] storesSorted)
throws JsonProcessingException {
private Map<WorkerId, Set<BucketId>> sendBuckets(Map<Integer, Integer> starts, Map<Integer, Integer> lengths, DictionaryMapping primaryMapping, Import imp, Map<Integer, List<Integer>> buckets2LocalEntities, ColumnStore[] storesSorted) {

Map<WorkerId, Set<BucketId>> newWorkerAssignments = new HashMap<>();

Expand All @@ -328,8 +326,6 @@ private Map<WorkerId, Set<BucketId>> sendBuckets(Map<Integer, Integer> starts, M
subJob.report(1);
}

subJob.done();

return newWorkerAssignments;
}

Expand Down Expand Up @@ -454,7 +450,13 @@ private void distributeWorkerResponsibilities(DictionaryMapping primaryMapping)
/**
* Apply new positions into incoming shared dictionaries.
*/
private void applyDictionaryMappings(Map<String, DictionaryMapping> mappings, Map<String, ColumnStore> values) {
private void remapToSharedDictionary(Map<String, DictionaryMapping> mappings, Map<String, ColumnStore> values) {

if (mappings.isEmpty()) {
log.trace("No columns with shared dictionary appear to be in the import.");
return;
}

final ProgressReporter subJob = getProgressReporter().subJob(mappings.size());

for (Map.Entry<String, DictionaryMapping> entry : mappings.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,8 @@ public void run() {
} finally {
ConqueryMDC.setLocation(this.getName());

job.getProgressReporter().done();

log.trace("Finished job {} within {}", job, timer.stop());
time.stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,14 @@ public String getLabel() {

@Override
public void execute() throws Exception {
final ProgressReporter progressReporter = getProgressReporter().subJob(worker.getStorage().getAllConcepts().size());
if (worker.getStorage().getAllCBlocks().isEmpty()) {
log.debug("Worker {} is empty, skipping.", worker);
progressReporter.done();
return;
}

final ProgressReporter progressReporter = getProgressReporter();
progressReporter.setMax(worker.getStorage().getAllConcepts().size());

log.info("BEGIN update Matching stats for {} Concepts", worker.getStorage().getAllConcepts().size());

// SubJobs collect into this Map.
Expand Down Expand Up @@ -118,7 +119,6 @@ public void execute() throws Exception {
worker.send(new UpdateElementMatchingStats(worker.getInfo().getId(), messages));
}

progressReporter.done();
}


Expand Down Expand Up @@ -172,8 +172,6 @@ private void calculateConceptMatches(Concept<?> concept, Map<ConceptElement<?>,
}
}

getProgressReporter().report(1);

log.trace("DONE calculating for `{}`", concept.getId());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public class ProgressReporterImpl implements ProgressReporter {
private long max = 1;
private long innerProgress = 0;
private long reservedForChildren = 0;
private final List<ProgressReporterImpl> children = new ArrayList<ProgressReporterImpl>();
private final List<ProgressReporterImpl> children = new ArrayList<>();

@Getter
private final long creationTimeMillis;
Expand Down Expand Up @@ -94,6 +94,10 @@ public String getEstimate() {

@Override
public void report(int steps) {
if (!isStarted()) {
log.warn("Progress reporter was not started");
return;
}
if (innerProgress + steps > max) {
log.warn("Progress({}) + ChildProgressReserve({}) + Steps({}) is bigger than the maximum Progress({}). There might be to many reports in the code.", innerProgress, reservedForChildren, steps, max);
return;
Expand All @@ -104,30 +108,30 @@ public void report(int steps) {

@Override
public void setMax(long max) {
if (this.max > max) {
log.warn("Max cannot be decreased.");

if (max <= 0) {
log.warn("Max can not be 0 or less");
return;
}

if (max <= 0) {
throw new IllegalArgumentException("Max can not be 0 or less");
if (this.max > max) {
log.warn("Max cannot be decreased.");
return;
}

this.max = max;
}

@Override
public void done() {
if(endTimeMillis > -1) {
if (isDone()) {
log.warn("Done was called again for {}", this);
return;
}
endTimeMillis = System.currentTimeMillis();

for (ProgressReporter child : children) {
if (!child.isDone()) {
log.warn("One or more Children are not done yet");
}
child.done();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, das eigentlich schon richtig so, dass parent ein Problem hat, wenn child noch nicht fertig ist.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ich würde es immer so machen, dass wenn parent done auf ruft auch die children auf done gesetzt werden.
Children sind normalerweise eigene Threads auf die der Parent warten sollte um sicher zugehen, dass der Job abgeschlossen ist. So ist es momentan auch.

}

if (getAbsoluteProgress() < getAbsoluteMax()) {
Expand Down