Skip to content

Commit

Permalink
#3231: Introduce a batching & caching context for workers
Browse files Browse the repository at this point in the history
  • Loading branch information
gjvoosten committed Nov 18, 2020
1 parent a105402 commit b3cd57e
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 36 deletions.
68 changes: 66 additions & 2 deletions src/main/java/mil/dds/anet/threads/AbstractWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

import java.lang.invoke.MethodHandles;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import mil.dds.anet.AnetObjectEngine;
import mil.dds.anet.beans.JobHistory;
import mil.dds.anet.config.AnetConfiguration;
import mil.dds.anet.database.JobHistoryDao;
import mil.dds.anet.utils.BatchingUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -24,19 +29,78 @@ public AbstractWorker(AnetConfiguration config, String startMessage) {
this.jobHistoryDao = AnetObjectEngine.getInstance().getJobHistoryDao();
}

protected abstract void runInternal(Instant now, JobHistory jobHistory);
protected abstract void runInternal(Instant now, JobHistory jobHistory,
Map<String, Object> context);

@Override
public final void run() {
final String className = this.getClass().getSimpleName();
logger.debug("Starting {}: {}", className, startMessage);
final RunStatus runStatus = new RunStatus();
BatchingUtils batchingUtils = null;
try {
jobHistoryDao.runInTransaction(className, (now, jobHistory) -> runInternal(now, jobHistory));
batchingUtils = startDispatcher(runStatus);
final Map<String, Object> context = new HashMap<>();
context.put("dataLoaderRegistry", batchingUtils.getDataLoaderRegistry());
jobHistoryDao.runInTransaction(className,
(now, jobHistory) -> runInternal(now, jobHistory, context));
} catch (Throwable e) {
// Cannot let this thread die. Otherwise ANET will stop checking.
logger.error("Exception in run()", e);
} finally {
runStatus.setDone(true);
if (batchingUtils != null) {
batchingUtils.updateStats(AnetObjectEngine.getInstance().getMetricRegistry(),
batchingUtils.getDataLoaderRegistry());
batchingUtils.shutdown();
}
}
logger.debug("Ending {}", className);
}

private static class RunStatus {
private boolean done = false;

public boolean isDone() {
return done;
}

public void setDone(boolean done) {
this.done = done;
}

@Override
public String toString() {
return "RunStatus [done=" + done + "]";
}
}

private BatchingUtils startDispatcher(final RunStatus runStatus) {
final BatchingUtils batchingUtils =
new BatchingUtils(AnetObjectEngine.getInstance(), true, true);
final Runnable dispatcher = () -> {
while (!runStatus.isDone()) {
// Wait a while, giving other threads the chance to do some work
try {
Thread.yield();
Thread.sleep(50);
} catch (InterruptedException ignored) {
// just retry
}

// Dispatch all our data loaders until the request is done;
// we have data loaders at various depths (one dependent on another),
// e.g. in {@link Report#loadWorkflow}
final CompletableFuture<?>[] dispatchersWithWork = batchingUtils.getDataLoaderRegistry()
.getDataLoaders().stream().filter(dl -> dl.dispatchDepth() > 0)
.map(dl -> (CompletableFuture<?>) dl.dispatch()).toArray(CompletableFuture<?>[]::new);
if (dispatchersWithWork.length > 0) {
CompletableFuture.allOf(dispatchersWithWork).join();
}
}
};
Executors.newSingleThreadExecutor().execute(dispatcher);
return batchingUtils;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import mil.dds.anet.AnetObjectEngine;
import mil.dds.anet.beans.AnetEmail;
Expand Down Expand Up @@ -33,7 +34,7 @@ public AccountDeactivationWorker(AnetConfiguration config, PersonDao dao,
}

@Override
protected void runInternal(Instant now, JobHistory jobHistory) {
protected void runInternal(Instant now, JobHistory jobHistory, Map<String, Object> context) {
// Make sure the mechanism will be triggered, so account deactivation checking can take place
final List<String> ignoredDomainNames = getDomainNamesToIgnore();
final List<Integer> daysTillEndOfTourWarnings = getDaysTillEndOfTourWarnings();
Expand Down
48 changes: 24 additions & 24 deletions src/main/java/mil/dds/anet/threads/AnetEmailWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static void setInstance(AnetEmailWorker instance) {
}

@Override
protected void runInternal(Instant now, JobHistory jobHistory) {
protected void runInternal(Instant now, JobHistory jobHistory, Map<String, Object> context) {
@SuppressWarnings("unchecked")
final List<String> activeDomainNames =
((List<String>) config.getDictionaryEntry("activeDomainNames")).stream()
Expand All @@ -80,16 +80,16 @@ protected void runInternal(Instant now, JobHistory jobHistory) {
// Send the emails!
final List<Integer> processedEmails = new LinkedList<Integer>();
for (final AnetEmail email : emails) {
Map<String, Object> context = null;
Map<String, Object> emailContext = null;

try {
context = buildContext(email, smtpConfig);
if (context != null) {
emailContext = buildContext(context, email);
if (emailContext != null) {
logger.info("{} Sending email to {} re: {}", smtpConfig.isDisabled() ? "[Disabled] " : "",
email.getToAddresses(), email.getAction().getSubject(context));
email.getToAddresses(), email.getAction().getSubject(emailContext));

if (!smtpConfig.isDisabled()) {
sendEmail(email, context, smtpProps, smtpAuth, activeDomainNames);
sendEmail(email, emailContext, smtpProps, smtpAuth, activeDomainNames);
}
}
processedEmails.add(email.getId());
Expand All @@ -104,7 +104,7 @@ protected void runInternal(Instant now, JobHistory jobHistory) {
String message = "Purging stale email to ";
try {
message += email.getToAddresses();
message += email.getAction().getSubject(context);
message += email.getAction().getSubject(emailContext);
} finally {
logger.info(message);
processedEmails.add(email.getId());
Expand All @@ -118,40 +118,40 @@ protected void runInternal(Instant now, JobHistory jobHistory) {
dao.deletePendingEmails(processedEmails);
}

private Map<String, Object> buildContext(final AnetEmail email,
final SmtpConfiguration smtpConfig) {
private Map<String, Object> buildContext(final Map<String, Object> context,
final AnetEmail email) {
AnetObjectEngine engine = AnetObjectEngine.getInstance();
Map<String, Object> context = new HashMap<String, Object>();
context.put("context", engine.getContext());
context.put("serverUrl", serverUrl);
context.put(AdminSettingKeys.SECURITY_BANNER_TEXT.name(),
Map<String, Object> emailContext = new HashMap<String, Object>();
emailContext.put("context", context);
emailContext.put("serverUrl", serverUrl);
emailContext.put(AdminSettingKeys.SECURITY_BANNER_TEXT.name(),
engine.getAdminSetting(AdminSettingKeys.SECURITY_BANNER_TEXT));
context.put(AdminSettingKeys.SECURITY_BANNER_COLOR.name(),
emailContext.put(AdminSettingKeys.SECURITY_BANNER_COLOR.name(),
engine.getAdminSetting(AdminSettingKeys.SECURITY_BANNER_COLOR));
context.put("SUPPORT_EMAIL_ADDR", config.getDictionaryEntry("SUPPORT_EMAIL_ADDR"));
context.put("dateFormatter",
emailContext.put("SUPPORT_EMAIL_ADDR", config.getDictionaryEntry("SUPPORT_EMAIL_ADDR"));
emailContext.put("dateFormatter",
DateTimeFormatter.ofPattern((String) config.getDictionaryEntry("dateFormats.email.date"))
.withZone(DaoUtils.getDefaultZoneId()));
context.put("dateTimeFormatter",
emailContext.put("dateTimeFormatter",
DateTimeFormatter
.ofPattern((String) config.getDictionaryEntry("dateFormats.email.withTime"))
.withZone(DaoUtils.getDefaultZoneId()));
final boolean engagementsIncludeTimeAndDuration = Boolean.TRUE
.equals((Boolean) config.getDictionaryEntry("engagementsIncludeTimeAndDuration"));
context.put("engagementsIncludeTimeAndDuration", engagementsIncludeTimeAndDuration);
emailContext.put("engagementsIncludeTimeAndDuration", engagementsIncludeTimeAndDuration);
final String edtfPattern = (String) config
.getDictionaryEntry(engagementsIncludeTimeAndDuration ? "dateFormats.email.withTime"
: "dateFormats.email.date");
context.put("engagementDateFormatter",
emailContext.put("engagementDateFormatter",
DateTimeFormatter.ofPattern(edtfPattern).withZone(DaoUtils.getDefaultZoneId()));
@SuppressWarnings("unchecked")
final Map<String, Object> fields = (Map<String, Object>) config.getDictionaryEntry("fields");
context.put("fields", fields);
emailContext.put("fields", fields);

return email.getAction().buildContext(context);
return email.getAction().buildContext(emailContext);
}

private void sendEmail(final AnetEmail email, final Map<String, Object> context,
private void sendEmail(final AnetEmail email, final Map<String, Object> emailContext,
final Properties smtpProps, final Authenticator smtpAuth,
final List<String> activeDomainNames)
throws MessagingException, IOException, TemplateException {
Expand All @@ -170,11 +170,11 @@ private void sendEmail(final AnetEmail email, final Map<String, Object> context,
final Template temp =
Utils.getFreemarkerConfig(this.getClass()).getTemplate(email.getAction().getTemplateName());
// scan:ignore — false positive, we know which template we are processing
temp.process(context, writer);
temp.process(emailContext, writer);

final Session session = Session.getInstance(smtpProps, smtpAuth);
final Email mail = EmailBuilder.startingBlank().from(new InternetAddress(fromAddr))
.toMultiple(email.getToAddresses()).withSubject(email.getAction().getSubject(context))
.toMultiple(email.getToAddresses()).withSubject(email.getAction().getSubject(emailContext))
.withHTMLText(writer.toString()).buildEmail();

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public FutureEngagementWorker(AnetConfiguration config, ReportDao dao) {
}

@Override
protected void runInternal(Instant now, JobHistory jobHistory) {
protected void runInternal(Instant now, JobHistory jobHistory, Map<String, Object> context) {
// Get a list of all reports related to upcoming engagements which have just
// become past engagements and need to change their report status to draft.
// When a report is for an engagement which just moved from future to past
Expand All @@ -35,7 +35,6 @@ protected void runInternal(Instant now, JobHistory jobHistory) {

// update to draft state and send emails to the authors to let them know we updated their
// report.
final Map<String, Object> context = AnetObjectEngine.getInstance().getContext();
for (Report r : reports) {
try {
AnetEmail email = new AnetEmail();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package mil.dds.anet.threads;

import java.time.Instant;
import java.util.Map;
import mil.dds.anet.beans.JobHistory;
import mil.dds.anet.config.AnetConfiguration;
import mil.dds.anet.database.AdminDao;
Expand All @@ -19,7 +20,7 @@ public MaterializedViewRefreshWorker(AnetConfiguration config, AdminDao dao) {
}

@Override
protected void runInternal(Instant now, JobHistory jobHistory) {
protected void runInternal(Instant now, JobHistory jobHistory, Map<String, Object> context) {
for (final String materializedView : materializedViews) {
try {
dao.updateMaterializedView(materializedView);
Expand Down
4 changes: 1 addition & 3 deletions src/main/java/mil/dds/anet/threads/ReportApprovalWorker.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import mil.dds.anet.AnetObjectEngine;
import mil.dds.anet.beans.ApprovalStep;
import mil.dds.anet.beans.JobHistory;
import mil.dds.anet.beans.Report;
Expand All @@ -27,7 +26,7 @@ public ReportApprovalWorker(AnetConfiguration config, ReportDao dao) {
}

@Override
protected void runInternal(Instant now, JobHistory jobHistory) {
protected void runInternal(Instant now, JobHistory jobHistory, Map<String, Object> context) {
final Instant approvalTimeout =
now.minus((Integer) config.getDictionaryEntry("reportWorkflow.nbOfHoursApprovalTimeout"),
ChronoUnit.HOURS);
Expand All @@ -37,7 +36,6 @@ protected void runInternal(Instant now, JobHistory jobHistory) {
query.setState(Collections.singletonList(ReportState.PENDING_APPROVAL));
query.setSystemSearch(true);
final List<Report> reports = dao.search(query).getList();
final Map<String, Object> context = AnetObjectEngine.getInstance().getContext();
for (final Report r : reports) {
final List<ReportAction> workflow = r.loadWorkflow(context).join();
if (workflow.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import mil.dds.anet.AnetObjectEngine;
import mil.dds.anet.beans.JobHistory;
import mil.dds.anet.beans.Report;
import mil.dds.anet.beans.Report.ReportState;
Expand All @@ -25,7 +24,7 @@ public ReportPublicationWorker(AnetConfiguration config, ReportDao dao) {
}

@Override
protected void runInternal(Instant now, JobHistory jobHistory) {
protected void runInternal(Instant now, JobHistory jobHistory, Map<String, Object> context) {
final Instant quarantineApproval =
now.minus((Integer) config.getDictionaryEntry("reportWorkflow.nbOfHoursQuarantineApproved"),
ChronoUnit.HOURS);
Expand All @@ -35,7 +34,6 @@ protected void runInternal(Instant now, JobHistory jobHistory) {
query.setState(Collections.singletonList(ReportState.APPROVED));
query.setSystemSearch(true);
final List<Report> reports = dao.search(query).getList();
final Map<String, Object> context = AnetObjectEngine.getInstance().getContext();
for (final Report r : reports) {
final List<ReportAction> workflow = r.loadWorkflow(context).join();
if (workflow.isEmpty()) {
Expand Down

0 comments on commit b3cd57e

Please sign in to comment.