Skip to content

Commit

Permalink
#3231: Make relevant parts of some workers asynchronous
Browse files Browse the repository at this point in the history
This gives the context the opportunity to batch queries.
  • Loading branch information
gjvoosten committed Nov 16, 2020
1 parent 4c8e090 commit c847701
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 62 deletions.
32 changes: 19 additions & 13 deletions src/main/java/mil/dds/anet/threads/FutureEngagementWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import mil.dds.anet.AnetObjectEngine;
import mil.dds.anet.beans.AnetEmail;
import mil.dds.anet.beans.JobHistory;
Expand Down Expand Up @@ -34,19 +35,24 @@ protected void runInternal(Instant now, JobHistory jobHistory, Map<String, Objec

// update to draft state and send emails to the authors to let them know we updated their
// report.
for (Report r : reports) {
try {
AnetEmail email = new AnetEmail();
FutureEngagementUpdated action = new FutureEngagementUpdated();
action.setReport(r);
email.setAction(action);
email.addToAddress(r.loadAuthor(context).join().getEmailAddress());
AnetEmailWorker.sendEmailAsync(email);
dao.updateToDraftState(r);
} catch (Exception e) {
logger.error("Exception when updating", e);
}
}
final CompletableFuture<?>[] allFutures = reports.stream().map(r -> {
final AnetEmail email = new AnetEmail();
final FutureEngagementUpdated action = new FutureEngagementUpdated();
action.setReport(r);
email.setAction(action);
return r.loadAuthor(context).thenApply(author -> {
try {
email.addToAddress(author.getEmailAddress());
AnetEmailWorker.sendEmailAsync(email);
dao.updateToDraftState(r);
} catch (Exception e) {
logger.error("Exception when updating", e);
}
return true;
});
}).toArray(CompletableFuture<?>[]::new);
// Wait for all our futures to complete before returning
CompletableFuture.allOf(allFutures).join();
}

}
63 changes: 34 additions & 29 deletions src/main/java/mil/dds/anet/threads/ReportApprovalWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import mil.dds.anet.beans.ApprovalStep;
import mil.dds.anet.beans.JobHistory;
import mil.dds.anet.beans.Report;
Expand Down Expand Up @@ -36,40 +37,44 @@ protected void runInternal(Instant now, JobHistory jobHistory, Map<String, Objec
query.setState(Collections.singletonList(ReportState.PENDING_APPROVAL));
query.setSystemSearch(true);
final List<Report> reports = dao.search(query).getList();
for (final Report r : reports) {
final List<ReportAction> workflow = r.loadWorkflow(context).join();
if (workflow.isEmpty()) {
logger.error("Couldn't process report approval for report {}, it has no workflow",
r.getUuid());
} else {
for (int i = workflow.size() - 1; i >= 0; i--) {
final ReportAction reportAction = workflow.get(i);
if (reportAction.getCreatedAt() == null && i > 1) {
// Check previous action
final ReportAction previousAction = workflow.get(i - 1);
if (previousAction.getCreatedAt() != null
&& previousAction.getCreatedAt().isBefore(approvalTimeout)) {
// Approve the report
try {
final ApprovalStep approvalStep = reportAction.getStep();
final int numRows = dao.approve(r, null, approvalStep);
if (numRows == 0) {
logger.error("Couldn't process report approval for report {} step {}",
r.getUuid(), DaoUtils.getUuid(approvalStep));
} else {
AnetAuditLogger.log(
"report {} step {} automatically approved by the ReportApprovalWorker",
r.getUuid(), DaoUtils.getUuid(approvalStep));
final CompletableFuture<?>[] allFutures = reports.stream().map(r -> {
return r.loadWorkflow(context).thenApply(workflow -> {
if (workflow.isEmpty()) {
logger.error("Couldn't process report approval for report {}, it has no workflow",
r.getUuid());
} else {
for (int i = workflow.size() - 1; i >= 0; i--) {
final ReportAction reportAction = workflow.get(i);
if (reportAction.getCreatedAt() == null && i > 1) {
// Check previous action
final ReportAction previousAction = workflow.get(i - 1);
if (previousAction.getCreatedAt() != null
&& previousAction.getCreatedAt().isBefore(approvalTimeout)) {
// Approve the report
try {
final ApprovalStep approvalStep = reportAction.getStep();
final int numRows = dao.approve(r, null, approvalStep);
if (numRows == 0) {
logger.error("Couldn't process report approval for report {} step {}",
r.getUuid(), DaoUtils.getUuid(approvalStep));
} else {
AnetAuditLogger.log(
"report {} step {} automatically approved by the ReportApprovalWorker",
r.getUuid(), DaoUtils.getUuid(approvalStep));
}
} catch (Exception e) {
logger.error("Exception when approving report", e);
}
} catch (Exception e) {
logger.error("Exception when approving report", e);
break;
}
break;
}
}
}
}
}
return true;
});
}).toArray(CompletableFuture<?>[]::new);
// Wait for all our futures to complete before returning
CompletableFuture.allOf(allFutures).join();
}

}
45 changes: 25 additions & 20 deletions src/main/java/mil/dds/anet/threads/ReportPublicationWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import mil.dds.anet.beans.JobHistory;
import mil.dds.anet.beans.Report;
import mil.dds.anet.beans.Report.ReportState;
import mil.dds.anet.beans.ReportAction;
import mil.dds.anet.beans.search.ReportSearchQuery;
import mil.dds.anet.config.AnetConfiguration;
import mil.dds.anet.database.ReportDao;
Expand All @@ -34,28 +34,33 @@ protected void runInternal(Instant now, JobHistory jobHistory, Map<String, Objec
query.setState(Collections.singletonList(ReportState.APPROVED));
query.setSystemSearch(true);
final List<Report> reports = dao.search(query).getList();
for (final Report r : reports) {
final List<ReportAction> workflow = r.loadWorkflow(context).join();
if (workflow.isEmpty()) {
logger.error("Couldn't process report publication for report {}, it has no workflow",
r.getUuid());
} else {
if (workflow.get(workflow.size() - 1).getCreatedAt().isBefore(quarantineApproval)) {
// Publish the report
try {
final int numRows = dao.publish(r, null);
if (numRows == 0) {
logger.error("Couldn't process report publication for report {}", r.getUuid());
} else {
AnetAuditLogger.log(
"report {} automatically published by the ReportPublicationWorker", r.getUuid());
final CompletableFuture<?>[] allFutures = reports.stream().map(r -> {
return r.loadWorkflow(context).thenApply(workflow -> {
if (workflow.isEmpty()) {
logger.error("Couldn't process report publication for report {}, it has no workflow",
r.getUuid());
} else {
if (workflow.get(workflow.size() - 1).getCreatedAt().isBefore(quarantineApproval)) {
// Publish the report
try {
final int numRows = dao.publish(r, null);
if (numRows == 0) {
logger.error("Couldn't process report publication for report {}", r.getUuid());
} else {
AnetAuditLogger.log(
"report {} automatically published by the ReportPublicationWorker",
r.getUuid());
}
} catch (Exception e) {
logger.error("Exception when publishing report", e);
}
} catch (Exception e) {
logger.error("Exception when publishing report", e);
}
}
}
}
return true;
});
}).toArray(CompletableFuture<?>[]::new);
// Wait for all our futures to complete before returning
CompletableFuture.allOf(allFutures).join();
}

}

0 comments on commit c847701

Please sign in to comment.