From b3cd57ee8b7031adb5ed6fb53392e2ccf6800950 Mon Sep 17 00:00:00 2001 From: Gertjan van Oosten Date: Tue, 27 Oct 2020 13:12:22 +0100 Subject: [PATCH] NCI-Agency/anet#3231: Introduce a batching & caching context for workers --- .../mil/dds/anet/threads/AbstractWorker.java | 68 ++++++++++++++++++- .../threads/AccountDeactivationWorker.java | 3 +- .../mil/dds/anet/threads/AnetEmailWorker.java | 48 ++++++------- .../anet/threads/FutureEngagementWorker.java | 3 +- .../MaterializedViewRefreshWorker.java | 3 +- .../anet/threads/ReportApprovalWorker.java | 4 +- .../anet/threads/ReportPublicationWorker.java | 4 +- 7 files changed, 97 insertions(+), 36 deletions(-) diff --git a/src/main/java/mil/dds/anet/threads/AbstractWorker.java b/src/main/java/mil/dds/anet/threads/AbstractWorker.java index 6a48b1d5b1..285a399b17 100644 --- a/src/main/java/mil/dds/anet/threads/AbstractWorker.java +++ b/src/main/java/mil/dds/anet/threads/AbstractWorker.java @@ -2,10 +2,15 @@ import java.lang.invoke.MethodHandles; import java.time.Instant; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import mil.dds.anet.AnetObjectEngine; import mil.dds.anet.beans.JobHistory; import mil.dds.anet.config.AnetConfiguration; import mil.dds.anet.database.JobHistoryDao; +import mil.dds.anet.utils.BatchingUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,19 +29,78 @@ public AbstractWorker(AnetConfiguration config, String startMessage) { this.jobHistoryDao = AnetObjectEngine.getInstance().getJobHistoryDao(); } - protected abstract void runInternal(Instant now, JobHistory jobHistory); + protected abstract void runInternal(Instant now, JobHistory jobHistory, + Map context); @Override public final void run() { final String className = this.getClass().getSimpleName(); logger.debug("Starting {}: {}", className, startMessage); + final RunStatus runStatus = new RunStatus(); + BatchingUtils batchingUtils = null; try { - jobHistoryDao.runInTransaction(className, (now, jobHistory) -> runInternal(now, jobHistory)); + batchingUtils = startDispatcher(runStatus); + final Map context = new HashMap<>(); + context.put("dataLoaderRegistry", batchingUtils.getDataLoaderRegistry()); + jobHistoryDao.runInTransaction(className, + (now, jobHistory) -> runInternal(now, jobHistory, context)); } catch (Throwable e) { // Cannot let this thread die. Otherwise ANET will stop checking. logger.error("Exception in run()", e); + } finally { + runStatus.setDone(true); + if (batchingUtils != null) { + batchingUtils.updateStats(AnetObjectEngine.getInstance().getMetricRegistry(), + batchingUtils.getDataLoaderRegistry()); + batchingUtils.shutdown(); + } } logger.debug("Ending {}", className); } + private static class RunStatus { + private boolean done = false; + + public boolean isDone() { + return done; + } + + public void setDone(boolean done) { + this.done = done; + } + + @Override + public String toString() { + return "RunStatus [done=" + done + "]"; + } + } + + private BatchingUtils startDispatcher(final RunStatus runStatus) { + final BatchingUtils batchingUtils = + new BatchingUtils(AnetObjectEngine.getInstance(), true, true); + final Runnable dispatcher = () -> { + while (!runStatus.isDone()) { + // Wait a while, giving other threads the chance to do some work + try { + Thread.yield(); + Thread.sleep(50); + } catch (InterruptedException ignored) { + // just retry + } + + // Dispatch all our data loaders until the request is done; + // we have data loaders at various depths (one dependent on another), + // e.g. in {@link Report#loadWorkflow} + final CompletableFuture[] dispatchersWithWork = batchingUtils.getDataLoaderRegistry() + .getDataLoaders().stream().filter(dl -> dl.dispatchDepth() > 0) + .map(dl -> (CompletableFuture) dl.dispatch()).toArray(CompletableFuture[]::new); + if (dispatchersWithWork.length > 0) { + CompletableFuture.allOf(dispatchersWithWork).join(); + } + } + }; + Executors.newSingleThreadExecutor().execute(dispatcher); + return batchingUtils; + } + } diff --git a/src/main/java/mil/dds/anet/threads/AccountDeactivationWorker.java b/src/main/java/mil/dds/anet/threads/AccountDeactivationWorker.java index d841a804b0..aa3fc87029 100644 --- a/src/main/java/mil/dds/anet/threads/AccountDeactivationWorker.java +++ b/src/main/java/mil/dds/anet/threads/AccountDeactivationWorker.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import mil.dds.anet.AnetObjectEngine; import mil.dds.anet.beans.AnetEmail; @@ -33,7 +34,7 @@ public AccountDeactivationWorker(AnetConfiguration config, PersonDao dao, } @Override - protected void runInternal(Instant now, JobHistory jobHistory) { + protected void runInternal(Instant now, JobHistory jobHistory, Map context) { // Make sure the mechanism will be triggered, so account deactivation checking can take place final List ignoredDomainNames = getDomainNamesToIgnore(); final List daysTillEndOfTourWarnings = getDaysTillEndOfTourWarnings(); diff --git a/src/main/java/mil/dds/anet/threads/AnetEmailWorker.java b/src/main/java/mil/dds/anet/threads/AnetEmailWorker.java index c37729ba05..09b8fb25ca 100644 --- a/src/main/java/mil/dds/anet/threads/AnetEmailWorker.java +++ b/src/main/java/mil/dds/anet/threads/AnetEmailWorker.java @@ -68,7 +68,7 @@ public static void setInstance(AnetEmailWorker instance) { } @Override - protected void runInternal(Instant now, JobHistory jobHistory) { + protected void runInternal(Instant now, JobHistory jobHistory, Map context) { @SuppressWarnings("unchecked") final List activeDomainNames = ((List) config.getDictionaryEntry("activeDomainNames")).stream() @@ -80,16 +80,16 @@ protected void runInternal(Instant now, JobHistory jobHistory) { // Send the emails! final List processedEmails = new LinkedList(); for (final AnetEmail email : emails) { - Map context = null; + Map emailContext = null; try { - context = buildContext(email, smtpConfig); - if (context != null) { + emailContext = buildContext(context, email); + if (emailContext != null) { logger.info("{} Sending email to {} re: {}", smtpConfig.isDisabled() ? "[Disabled] " : "", - email.getToAddresses(), email.getAction().getSubject(context)); + email.getToAddresses(), email.getAction().getSubject(emailContext)); if (!smtpConfig.isDisabled()) { - sendEmail(email, context, smtpProps, smtpAuth, activeDomainNames); + sendEmail(email, emailContext, smtpProps, smtpAuth, activeDomainNames); } } processedEmails.add(email.getId()); @@ -104,7 +104,7 @@ protected void runInternal(Instant now, JobHistory jobHistory) { String message = "Purging stale email to "; try { message += email.getToAddresses(); - message += email.getAction().getSubject(context); + message += email.getAction().getSubject(emailContext); } finally { logger.info(message); processedEmails.add(email.getId()); @@ -118,40 +118,40 @@ protected void runInternal(Instant now, JobHistory jobHistory) { dao.deletePendingEmails(processedEmails); } - private Map buildContext(final AnetEmail email, - final SmtpConfiguration smtpConfig) { + private Map buildContext(final Map context, + final AnetEmail email) { AnetObjectEngine engine = AnetObjectEngine.getInstance(); - Map context = new HashMap(); - context.put("context", engine.getContext()); - context.put("serverUrl", serverUrl); - context.put(AdminSettingKeys.SECURITY_BANNER_TEXT.name(), + Map emailContext = new HashMap(); + emailContext.put("context", context); + emailContext.put("serverUrl", serverUrl); + emailContext.put(AdminSettingKeys.SECURITY_BANNER_TEXT.name(), engine.getAdminSetting(AdminSettingKeys.SECURITY_BANNER_TEXT)); - context.put(AdminSettingKeys.SECURITY_BANNER_COLOR.name(), + emailContext.put(AdminSettingKeys.SECURITY_BANNER_COLOR.name(), engine.getAdminSetting(AdminSettingKeys.SECURITY_BANNER_COLOR)); - context.put("SUPPORT_EMAIL_ADDR", config.getDictionaryEntry("SUPPORT_EMAIL_ADDR")); - context.put("dateFormatter", + emailContext.put("SUPPORT_EMAIL_ADDR", config.getDictionaryEntry("SUPPORT_EMAIL_ADDR")); + emailContext.put("dateFormatter", DateTimeFormatter.ofPattern((String) config.getDictionaryEntry("dateFormats.email.date")) .withZone(DaoUtils.getDefaultZoneId())); - context.put("dateTimeFormatter", + emailContext.put("dateTimeFormatter", DateTimeFormatter .ofPattern((String) config.getDictionaryEntry("dateFormats.email.withTime")) .withZone(DaoUtils.getDefaultZoneId())); final boolean engagementsIncludeTimeAndDuration = Boolean.TRUE .equals((Boolean) config.getDictionaryEntry("engagementsIncludeTimeAndDuration")); - context.put("engagementsIncludeTimeAndDuration", engagementsIncludeTimeAndDuration); + emailContext.put("engagementsIncludeTimeAndDuration", engagementsIncludeTimeAndDuration); final String edtfPattern = (String) config .getDictionaryEntry(engagementsIncludeTimeAndDuration ? "dateFormats.email.withTime" : "dateFormats.email.date"); - context.put("engagementDateFormatter", + emailContext.put("engagementDateFormatter", DateTimeFormatter.ofPattern(edtfPattern).withZone(DaoUtils.getDefaultZoneId())); @SuppressWarnings("unchecked") final Map fields = (Map) config.getDictionaryEntry("fields"); - context.put("fields", fields); + emailContext.put("fields", fields); - return email.getAction().buildContext(context); + return email.getAction().buildContext(emailContext); } - private void sendEmail(final AnetEmail email, final Map context, + private void sendEmail(final AnetEmail email, final Map emailContext, final Properties smtpProps, final Authenticator smtpAuth, final List activeDomainNames) throws MessagingException, IOException, TemplateException { @@ -170,11 +170,11 @@ private void sendEmail(final AnetEmail email, final Map context, final Template temp = Utils.getFreemarkerConfig(this.getClass()).getTemplate(email.getAction().getTemplateName()); // scan:ignore — false positive, we know which template we are processing - temp.process(context, writer); + temp.process(emailContext, writer); final Session session = Session.getInstance(smtpProps, smtpAuth); final Email mail = EmailBuilder.startingBlank().from(new InternetAddress(fromAddr)) - .toMultiple(email.getToAddresses()).withSubject(email.getAction().getSubject(context)) + .toMultiple(email.getToAddresses()).withSubject(email.getAction().getSubject(emailContext)) .withHTMLText(writer.toString()).buildEmail(); try { diff --git a/src/main/java/mil/dds/anet/threads/FutureEngagementWorker.java b/src/main/java/mil/dds/anet/threads/FutureEngagementWorker.java index ce6d077875..e538f42819 100644 --- a/src/main/java/mil/dds/anet/threads/FutureEngagementWorker.java +++ b/src/main/java/mil/dds/anet/threads/FutureEngagementWorker.java @@ -22,7 +22,7 @@ public FutureEngagementWorker(AnetConfiguration config, ReportDao dao) { } @Override - protected void runInternal(Instant now, JobHistory jobHistory) { + protected void runInternal(Instant now, JobHistory jobHistory, Map context) { // Get a list of all reports related to upcoming engagements which have just // become past engagements and need to change their report status to draft. // When a report is for an engagement which just moved from future to past @@ -35,7 +35,6 @@ protected void runInternal(Instant now, JobHistory jobHistory) { // update to draft state and send emails to the authors to let them know we updated their // report. - final Map context = AnetObjectEngine.getInstance().getContext(); for (Report r : reports) { try { AnetEmail email = new AnetEmail(); diff --git a/src/main/java/mil/dds/anet/threads/MaterializedViewRefreshWorker.java b/src/main/java/mil/dds/anet/threads/MaterializedViewRefreshWorker.java index 4061a275c5..0b1d0ba113 100644 --- a/src/main/java/mil/dds/anet/threads/MaterializedViewRefreshWorker.java +++ b/src/main/java/mil/dds/anet/threads/MaterializedViewRefreshWorker.java @@ -1,6 +1,7 @@ package mil.dds.anet.threads; import java.time.Instant; +import java.util.Map; import mil.dds.anet.beans.JobHistory; import mil.dds.anet.config.AnetConfiguration; import mil.dds.anet.database.AdminDao; @@ -19,7 +20,7 @@ public MaterializedViewRefreshWorker(AnetConfiguration config, AdminDao dao) { } @Override - protected void runInternal(Instant now, JobHistory jobHistory) { + protected void runInternal(Instant now, JobHistory jobHistory, Map context) { for (final String materializedView : materializedViews) { try { dao.updateMaterializedView(materializedView); diff --git a/src/main/java/mil/dds/anet/threads/ReportApprovalWorker.java b/src/main/java/mil/dds/anet/threads/ReportApprovalWorker.java index 39405a246b..e5f067487c 100644 --- a/src/main/java/mil/dds/anet/threads/ReportApprovalWorker.java +++ b/src/main/java/mil/dds/anet/threads/ReportApprovalWorker.java @@ -5,7 +5,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import mil.dds.anet.AnetObjectEngine; import mil.dds.anet.beans.ApprovalStep; import mil.dds.anet.beans.JobHistory; import mil.dds.anet.beans.Report; @@ -27,7 +26,7 @@ public ReportApprovalWorker(AnetConfiguration config, ReportDao dao) { } @Override - protected void runInternal(Instant now, JobHistory jobHistory) { + protected void runInternal(Instant now, JobHistory jobHistory, Map context) { final Instant approvalTimeout = now.minus((Integer) config.getDictionaryEntry("reportWorkflow.nbOfHoursApprovalTimeout"), ChronoUnit.HOURS); @@ -37,7 +36,6 @@ protected void runInternal(Instant now, JobHistory jobHistory) { query.setState(Collections.singletonList(ReportState.PENDING_APPROVAL)); query.setSystemSearch(true); final List reports = dao.search(query).getList(); - final Map context = AnetObjectEngine.getInstance().getContext(); for (final Report r : reports) { final List workflow = r.loadWorkflow(context).join(); if (workflow.isEmpty()) { diff --git a/src/main/java/mil/dds/anet/threads/ReportPublicationWorker.java b/src/main/java/mil/dds/anet/threads/ReportPublicationWorker.java index 86f65f2ec4..589feadf83 100644 --- a/src/main/java/mil/dds/anet/threads/ReportPublicationWorker.java +++ b/src/main/java/mil/dds/anet/threads/ReportPublicationWorker.java @@ -5,7 +5,6 @@ import java.util.Collections; import java.util.List; import java.util.Map; -import mil.dds.anet.AnetObjectEngine; import mil.dds.anet.beans.JobHistory; import mil.dds.anet.beans.Report; import mil.dds.anet.beans.Report.ReportState; @@ -25,7 +24,7 @@ public ReportPublicationWorker(AnetConfiguration config, ReportDao dao) { } @Override - protected void runInternal(Instant now, JobHistory jobHistory) { + protected void runInternal(Instant now, JobHistory jobHistory, Map context) { final Instant quarantineApproval = now.minus((Integer) config.getDictionaryEntry("reportWorkflow.nbOfHoursQuarantineApproved"), ChronoUnit.HOURS); @@ -35,7 +34,6 @@ protected void runInternal(Instant now, JobHistory jobHistory) { query.setState(Collections.singletonList(ReportState.APPROVED)); query.setSystemSearch(true); final List reports = dao.search(query).getList(); - final Map context = AnetObjectEngine.getInstance().getContext(); for (final Report r : reports) { final List workflow = r.loadWorkflow(context).join(); if (workflow.isEmpty()) {