diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java index b92f6421f..d50588b08 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java @@ -11,16 +11,8 @@ package org.opensearch.ad; -import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; - -import java.util.Set; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.logging.log4j.core.util.Throwables; -import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.action.get.GetRequest; -import org.opensearch.ad.constant.ADCommonName; import org.opensearch.ad.indices.ADIndex; import org.opensearch.ad.indices.ADIndexManagement; import org.opensearch.ad.model.ADTask; @@ -32,31 +24,23 @@ import org.opensearch.ad.task.ADTaskCacheManager; import org.opensearch.ad.task.ADTaskManager; import org.opensearch.ad.transport.ADProfileAction; -import org.opensearch.ad.transport.RCFPollingAction; -import org.opensearch.ad.transport.RCFPollingRequest; -import org.opensearch.ad.transport.RCFPollingResponse; import org.opensearch.client.Client; -import org.opensearch.common.xcontent.LoggingDeprecationHandler; -import org.opensearch.common.xcontent.XContentType; -import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; -import org.opensearch.core.xcontent.XContentParser; import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.ProfileRunner; -import org.opensearch.timeseries.common.exception.NotSerializedExceptionName; -import org.opensearch.timeseries.common.exception.ResourceNotFoundException; -import org.opensearch.timeseries.constant.CommonMessages; -import org.opensearch.timeseries.constant.CommonName; -import org.opensearch.timeseries.model.Config; -import org.opensearch.timeseries.model.ConfigState; -import org.opensearch.timeseries.model.Job; import org.opensearch.timeseries.model.ProfileName; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; -import org.opensearch.timeseries.util.ExceptionUtil; -import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener; import org.opensearch.timeseries.util.SecurityClientUtil; import org.opensearch.transport.TransportService; +/** + * Since version 2.15, we have merged the single-stream and HC detector workflows. Consequently, separate logic for profiling is no longer necessary. + * + * During a Blue/Green (B/G) deployment, if an old node communicates with a new node regarding an old model, we will not execute RCFPollingAction to + * determine model updates. However, we have fallback logic that checks for anomaly results. If any results are found, the initialization progress is + * set to 100%. 
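+ *
+ * A rough sketch of that fallback idea (illustrative only: {@code resultIndex}, the {@code detector_id} field name, and the
+ * surrounding variables are assumptions for this example, not necessarily the exact helpers ProfileRunner uses):
+ * <pre>{@code
+ * // look for any existing anomaly result for this detector
+ * SearchRequest searchRequest = new SearchRequest(resultIndex)
+ *     .source(new SearchSourceBuilder().query(QueryBuilders.termQuery("detector_id", config.getId())).size(1));
+ * client.search(searchRequest, ActionListener.wrap(searchResponse -> {
+ *     DetectorProfile.Builder builder = createProfileBuilder();
+ *     if (searchResponse.getHits().getTotalHits().value > 0) {
+ *         // results already exist, so report the detector as running with 100% initialization progress
+ *         createRunningStateAndInitProgress(profilesToCollect, builder);
+ *     }
+ *     listener.onResponse(builder.build());
+ * }, listener::onFailure));
+ * }</pre>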
+ * + */ public class AnomalyDetectorProfileRunner extends ProfileRunner { @@ -95,182 +79,4 @@ public AnomalyDetectorProfileRunner( protected DetectorProfile.Builder createProfileBuilder() { return new DetectorProfile.Builder(); } - - @Override - protected void prepareProfile(Config config, ActionListener listener, Set profilesToCollect) { - boolean isHC = config.isHighCardinality(); - if (isHC) { - super.prepareProfile(config, listener, profilesToCollect); - } else { - String configId = config.getId(); - GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX, configId); - client.get(getRequest, ActionListener.wrap(getResponse -> { - if (getResponse != null && getResponse.isExists()) { - try ( - XContentParser parser = XContentType.JSON - .xContent() - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString()) - ) { - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - Job job = Job.parse(parser); - long enabledTimeMs = job.getEnabledTime().toEpochMilli(); - - int totalResponsesToWait = 0; - if (profilesToCollect.contains(ProfileName.ERROR)) { - totalResponsesToWait++; - } - - // total number of listeners we need to define. Needed by MultiResponsesDelegateActionListener to decide - // when to consolidate results and return to users - - if (profilesToCollect.contains(ProfileName.STATE) || profilesToCollect.contains(ProfileName.INIT_PROGRESS)) { - totalResponsesToWait++; - } - if (profilesToCollect.contains(ProfileName.COORDINATING_NODE) - || profilesToCollect.contains(ProfileName.TOTAL_SIZE_IN_BYTES) - || profilesToCollect.contains(ProfileName.MODELS)) { - totalResponsesToWait++; - } - if (profilesToCollect.contains(ProfileName.AD_TASK)) { - totalResponsesToWait++; - } - - MultiResponsesDelegateActionListener delegateListener = - new MultiResponsesDelegateActionListener( - listener, - totalResponsesToWait, - CommonMessages.FAIL_FETCH_ERR_MSG + configId, - false - ); - if (profilesToCollect.contains(ProfileName.ERROR)) { - taskManager.getAndExecuteOnLatestConfigLevelTask(configId, realTimeTaskTypes, task -> { - DetectorProfile.Builder profileBuilder = createProfileBuilder(); - if (task.isPresent()) { - long lastUpdateTimeMs = task.get().getLastUpdateTime().toEpochMilli(); - - // if state index hasn't been updated, we should not use the error field - // For example, before a detector is enabled, if the error message contains - // the phrase "stopped due to blah", we should not show this when the detector - // is enabled. - if (lastUpdateTimeMs > enabledTimeMs && task.get().getError() != null) { - profileBuilder.error(task.get().getError()); - } - delegateListener.onResponse(profileBuilder.build()); - } else { - // detector state for this detector does not exist - delegateListener.onResponse(profileBuilder.build()); - } - }, transportService, false, delegateListener); - } - - // total number of listeners we need to define. 
Needed by MultiResponsesDelegateActionListener to decide - // when to consolidate results and return to users - - if (profilesToCollect.contains(ProfileName.STATE) || profilesToCollect.contains(ProfileName.INIT_PROGRESS)) { - profileStateRelated(config, delegateListener, job.isEnabled(), profilesToCollect); - } - if (profilesToCollect.contains(ProfileName.COORDINATING_NODE) - || profilesToCollect.contains(ProfileName.TOTAL_SIZE_IN_BYTES) - || profilesToCollect.contains(ProfileName.MODELS)) { - profileModels(config, profilesToCollect, job, delegateListener); - } - if (profilesToCollect.contains(ProfileName.AD_TASK)) { - getLatestHistoricalTaskProfile(configId, transportService, null, delegateListener); - } - - } catch (Exception e) { - logger.error(CommonMessages.FAIL_TO_GET_PROFILE_MSG, e); - listener.onFailure(e); - } - } else { - onGetDetectorForPrepare(configId, listener, profilesToCollect); - } - }, exception -> { - if (ExceptionUtil.isIndexNotAvailable(exception)) { - logger.info(exception.getMessage()); - onGetDetectorForPrepare(configId, listener, profilesToCollect); - } else { - logger.error(CommonMessages.FAIL_TO_GET_PROFILE_MSG + configId); - listener.onFailure(exception); - } - })); - } - } - - /** - * We expect three kinds of states: - * -Disabled: if get ad job api says the job is disabled; - * -Init: if rcf model's total updates is less than required - * -Running: if neither of the above applies and no exceptions. - * @param config config accessor - * @param listener listener to process the returned state or exception - * @param enabled whether the detector job is enabled or not - * @param profilesToCollect target profiles to fetch - */ - private void profileStateRelated( - Config config, - MultiResponsesDelegateActionListener listener, - boolean enabled, - Set profilesToCollect - ) { - if (enabled) { - RCFPollingRequest request = new RCFPollingRequest(config.getId()); - client.execute(RCFPollingAction.INSTANCE, request, onPollRCFUpdates(config, profilesToCollect, listener)); - } else { - DetectorProfile.Builder builder = new DetectorProfile.Builder(); - if (profilesToCollect.contains(ProfileName.STATE)) { - builder.state(ConfigState.DISABLED); - } - listener.onResponse(builder.build()); - } - } - - /** - * Listener for polling rcf updates through transport messaging - * @param detector anomaly detector - * @param profilesToCollect profiles to collect like state - * @param listener delegate listener - * @return Listener for polling rcf updates through transport messaging - */ - private ActionListener onPollRCFUpdates( - Config detector, - Set profilesToCollect, - MultiResponsesDelegateActionListener listener - ) { - return ActionListener.wrap(rcfPollResponse -> { - long totalUpdates = rcfPollResponse.getTotalUpdates(); - if (totalUpdates < requiredSamples) { - processInitResponse(detector, profilesToCollect, totalUpdates, false, new DetectorProfile.Builder(), listener); - } else { - DetectorProfile.Builder builder = createProfileBuilder(); - createRunningStateAndInitProgress(profilesToCollect, builder); - listener.onResponse(builder.build()); - } - }, exception -> { - // we will get an AnomalyDetectionException wrapping the real exception inside - Throwable cause = Throwables.getRootCause(exception); - - // exception can be a RemoteTransportException - Exception causeException = (Exception) cause; - if (ExceptionUtil - .isException( - causeException, - ResourceNotFoundException.class, - NotSerializedExceptionName.RESOURCE_NOT_FOUND_EXCEPTION_NAME_UNDERSCORE.getName() - ) - || 
(ExceptionUtil.isIndexNotAvailable(causeException) - && causeException.getMessage().contains(ADCommonName.CHECKPOINT_INDEX_NAME))) { - // cannot find checkpoint - // We don't want to show the estimated time remaining to initialize - // a detector before cold start finishes, where the actual - // initialization time may be much shorter if sufficient historical - // data exists. - processInitResponse(detector, profilesToCollect, 0L, true, createProfileBuilder(), listener); - } else { - logger.error(new ParameterizedMessage("Fail to get init progress through messaging for {}", detector.getId()), exception); - listener.onFailure(exception); - } - }); - } - } diff --git a/src/main/java/org/opensearch/ad/ml/ADCheckpointDao.java b/src/main/java/org/opensearch/ad/ml/ADCheckpointDao.java index e2edcfdd0..54e133406 100644 --- a/src/main/java/org/opensearch/ad/ml/ADCheckpointDao.java +++ b/src/main/java/org/opensearch/ad/ml/ADCheckpointDao.java @@ -447,6 +447,7 @@ ThresholdedRandomCutForest toTrcf(String checkpoint) { }); trcf = trcfMapper.toModel(state); } catch (RuntimeException e) { + logger.info("checkpoint to restore: " + checkpoint); logger.error("Failed to deserialize TRCF model", e); } } diff --git a/src/main/java/org/opensearch/timeseries/transport/BaseProfileTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/BaseProfileTransportAction.java index d1163fd51..412a95430 100644 --- a/src/main/java/org/opensearch/timeseries/transport/BaseProfileTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/BaseProfileTransportAction.java @@ -122,7 +122,7 @@ protected ProfileNodeResponse nodeOperation(ProfileNodeRequest request) { // state profile requires totalUpdates as well if (profiles.contains(ProfileName.INIT_PROGRESS) || profiles.contains(ProfileName.STATE)) { - totalUpdates = cacheProvider.get().getTotalUpdates(configId);// get toal updates + totalUpdates = cacheProvider.get().getTotalUpdates(configId);// get total updates } if (profiles.contains(ProfileName.TOTAL_SIZE_IN_BYTES)) { modelSize = cacheProvider.get().getModelSize(configId); diff --git a/src/test/java/org/opensearch/ad/AbstractProfileRunnerTests.java b/src/test/java/org/opensearch/ad/AbstractProfileRunnerTests.java index 0972f2990..61ca1b2a1 100644 --- a/src/test/java/org/opensearch/ad/AbstractProfileRunnerTests.java +++ b/src/test/java/org/opensearch/ad/AbstractProfileRunnerTests.java @@ -68,7 +68,7 @@ protected enum ErrorResultStatus { NULL_POINTER_EXCEPTION } - protected AnomalyDetectorProfileRunner runner; + protected OldAnomalyDetectorProfileRunner oldRunner; protected Client client; protected SecurityClientUtil clientUtil; protected DiscoveryNodeFilterer nodeFilter; diff --git a/src/test/java/org/opensearch/ad/AnomalyDetectorProfileRunnerTests.java b/src/test/java/org/opensearch/ad/AnomalyDetectorProfileRunnerTests.java index c418a2c8a..4036e3665 100644 --- a/src/test/java/org/opensearch/ad/AnomalyDetectorProfileRunnerTests.java +++ b/src/test/java/org/opensearch/ad/AnomalyDetectorProfileRunnerTests.java @@ -108,7 +108,7 @@ private void setUpClientGet( return null; }).when(nodeStateManager).getConfig(anyString(), eq(AnalysisType.AD), any(ActionListener.class)); clientUtil = new SecurityClientUtil(nodeStateManager, Settings.EMPTY); - runner = new AnomalyDetectorProfileRunner( + oldRunner = new OldAnomalyDetectorProfileRunner( client, clientUtil, xContentRegistry(), @@ -198,7 +198,7 @@ public void testDetectorNotExist() throws IOException, InterruptedException { 
setUpClientGet(DetectorStatus.INDEX_NOT_EXIST, JobStatus.INDEX_NOT_EXIT, RCFPollingStatus.EMPTY, ErrorResultStatus.NO_ERROR); final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile("x123", ActionListener.wrap(response -> { + oldRunner.profile("x123", ActionListener.wrap(response -> { assertTrue("Should not reach here", false); inProgressLatch.countDown(); }, exception -> { @@ -213,7 +213,7 @@ public void testDisabledJobIndexTemplate(JobStatus status) throws IOException, I ConfigProfile expectedProfile = new DetectorProfile.Builder().state(ConfigState.DISABLED).build(); final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertEquals(expectedProfile, response); inProgressLatch.countDown(); }, exception -> { @@ -237,7 +237,7 @@ public void testInitOrRunningStateTemplate(RCFPollingStatus status, ConfigState DetectorProfile expectedProfile = new DetectorProfile.Builder().state(expectedState).build(); final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertEquals(expectedProfile, response); inProgressLatch.countDown(); }, exception -> { @@ -307,7 +307,7 @@ public void testErrorStateTemplate( DetectorProfile expectedProfile = builder.build(); final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertEquals(expectedProfile, response); inProgressLatch.countDown(); }, exception -> { @@ -516,7 +516,7 @@ public void testProfileModels() throws InterruptedException, IOException { final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(profileResponse -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(profileResponse -> { assertEquals(node1, profileResponse.getCoordinatingNode()); assertEquals(modelSize * 2, profileResponse.getTotalSizeInBytes()); assertEquals(2, profileResponse.getModelProfile().length); @@ -547,7 +547,7 @@ public void testInitProgress() throws IOException, InterruptedException { expectedProfile.setInitProgress(profile); final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertEquals(expectedProfile, response); inProgressLatch.countDown(); }, exception -> { @@ -566,7 +566,7 @@ public void testInitProgressFailImmediately() throws IOException, InterruptedExc expectedProfile.setInitProgress(profile); final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertTrue("Should not reach here ", false); inProgressLatch.countDown(); }, exception -> { @@ -584,7 +584,7 @@ public void testInitNoUpdateNoIndex() throws IOException, InterruptedException { .build(); final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertEquals(expectedProfile, response); inProgressLatch.countDown(); }, exception -> { @@ -606,7 
+606,7 @@ public void testInitNoIndex() throws IOException, InterruptedException { .build(); final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertEquals(expectedProfile, response); inProgressLatch.countDown(); }, exception -> { @@ -640,7 +640,7 @@ public void testFailRCFPolling() throws IOException, InterruptedException { setUpClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, RCFPollingStatus.EXCEPTION, ErrorResultStatus.NO_ERROR); final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertTrue("Should not reach here ", false); inProgressLatch.countDown(); }, exception -> { diff --git a/src/test/java/org/opensearch/ad/OldAnomalyDetectorProfileRunner.java b/src/test/java/org/opensearch/ad/OldAnomalyDetectorProfileRunner.java new file mode 100644 index 000000000..ed9ad4ec0 --- /dev/null +++ b/src/test/java/org/opensearch/ad/OldAnomalyDetectorProfileRunner.java @@ -0,0 +1,282 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.ad; + +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; + +import java.util.Set; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.apache.logging.log4j.core.util.Throwables; +import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.action.get.GetRequest; +import org.opensearch.ad.constant.ADCommonName; +import org.opensearch.ad.indices.ADIndex; +import org.opensearch.ad.indices.ADIndexManagement; +import org.opensearch.ad.model.ADTask; +import org.opensearch.ad.model.ADTaskProfile; +import org.opensearch.ad.model.ADTaskType; +import org.opensearch.ad.model.AnomalyDetector; +import org.opensearch.ad.model.DetectorProfile; +import org.opensearch.ad.settings.ADNumericSetting; +import org.opensearch.ad.task.ADTaskCacheManager; +import org.opensearch.ad.task.ADTaskManager; +import org.opensearch.ad.transport.ADProfileAction; +import org.opensearch.ad.transport.RCFPollingAction; +import org.opensearch.ad.transport.RCFPollingRequest; +import org.opensearch.ad.transport.RCFPollingResponse; +import org.opensearch.client.Client; +import org.opensearch.common.xcontent.LoggingDeprecationHandler; +import org.opensearch.common.xcontent.XContentType; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.NamedXContentRegistry; +import org.opensearch.core.xcontent.XContentParser; +import org.opensearch.timeseries.AnalysisType; +import org.opensearch.timeseries.ProfileRunner; +import org.opensearch.timeseries.common.exception.NotSerializedExceptionName; +import org.opensearch.timeseries.common.exception.ResourceNotFoundException; +import org.opensearch.timeseries.constant.CommonMessages; +import org.opensearch.timeseries.constant.CommonName; +import org.opensearch.timeseries.model.Config; +import org.opensearch.timeseries.model.ConfigState; +import org.opensearch.timeseries.model.Job; +import org.opensearch.timeseries.model.ProfileName; +import 
org.opensearch.timeseries.util.DiscoveryNodeFilterer; +import org.opensearch.timeseries.util.ExceptionUtil; +import org.opensearch.timeseries.util.MultiResponsesDelegateActionListener; +import org.opensearch.timeseries.util.SecurityClientUtil; +import org.opensearch.transport.TransportService; + +/** + * Profile runner for single-stream detectors under the old workflow, which differed from the HC one (e.g., it required RCFPollingAction + * to get model updates). The old code is kept here so that we can write tests against the old logic. We also need to keep the old code paths (e.g., + * RCFPollingAction and ModelManager.getTotalUpdates) to handle requests from old nodes during B/G deployments. + * + */ +public class OldAnomalyDetectorProfileRunner extends + ProfileRunner { + + private final Logger logger = LogManager.getLogger(AnomalyDetectorProfileRunner.class); + + public OldAnomalyDetectorProfileRunner( + Client client, + SecurityClientUtil clientUtil, + NamedXContentRegistry xContentRegistry, + DiscoveryNodeFilterer nodeFilter, + long requiredSamples, + TransportService transportService, + ADTaskManager adTaskManager, + ADTaskProfileRunner taskProfileRunner + ) { + super( + client, + clientUtil, + xContentRegistry, + nodeFilter, + requiredSamples, + transportService, + adTaskManager, + AnalysisType.AD, + ADTaskType.REALTIME_TASK_TYPES, + ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES, + ADNumericSetting.maxCategoricalFields(), + ProfileName.AD_TASK, + ADProfileAction.INSTANCE, + AnomalyDetector::parse, + taskProfileRunner + ); + } + + @Override + protected DetectorProfile.Builder createProfileBuilder() { + return new DetectorProfile.Builder(); + } + + @Override + protected void prepareProfile(Config config, ActionListener listener, Set profilesToCollect) { + boolean isHC = config.isHighCardinality(); + if (isHC) { + super.prepareProfile(config, listener, profilesToCollect); + } else { + String configId = config.getId(); + GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX, configId); + client.get(getRequest, ActionListener.wrap(getResponse -> { + if (getResponse != null && getResponse.isExists()) { + try ( + XContentParser parser = XContentType.JSON + .xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString()) + ) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + Job job = Job.parse(parser); + long enabledTimeMs = job.getEnabledTime().toEpochMilli(); + + int totalResponsesToWait = 0; + if (profilesToCollect.contains(ProfileName.ERROR)) { + totalResponsesToWait++; + } + + // total number of listeners we need to define.
Needed by MultiResponsesDelegateActionListener to decide + // when to consolidate results and return to users + + if (profilesToCollect.contains(ProfileName.STATE) || profilesToCollect.contains(ProfileName.INIT_PROGRESS)) { + totalResponsesToWait++; + } + if (profilesToCollect.contains(ProfileName.COORDINATING_NODE) + || profilesToCollect.contains(ProfileName.TOTAL_SIZE_IN_BYTES) + || profilesToCollect.contains(ProfileName.MODELS)) { + totalResponsesToWait++; + } + if (profilesToCollect.contains(ProfileName.AD_TASK)) { + totalResponsesToWait++; + } + + MultiResponsesDelegateActionListener delegateListener = + new MultiResponsesDelegateActionListener( + listener, + totalResponsesToWait, + CommonMessages.FAIL_FETCH_ERR_MSG + configId, + false + ); + if (profilesToCollect.contains(ProfileName.ERROR)) { + taskManager.getAndExecuteOnLatestConfigLevelTask(configId, realTimeTaskTypes, task -> { + DetectorProfile.Builder profileBuilder = createProfileBuilder(); + if (task.isPresent()) { + long lastUpdateTimeMs = task.get().getLastUpdateTime().toEpochMilli(); + + // if state index hasn't been updated, we should not use the error field + // For example, before a detector is enabled, if the error message contains + // the phrase "stopped due to blah", we should not show this when the detector + // is enabled. + if (lastUpdateTimeMs > enabledTimeMs && task.get().getError() != null) { + profileBuilder.error(task.get().getError()); + } + delegateListener.onResponse(profileBuilder.build()); + } else { + // detector state for this detector does not exist + delegateListener.onResponse(profileBuilder.build()); + } + }, transportService, false, delegateListener); + } + + // total number of listeners we need to define. Needed by MultiResponsesDelegateActionListener to decide + // when to consolidate results and return to users + + if (profilesToCollect.contains(ProfileName.STATE) || profilesToCollect.contains(ProfileName.INIT_PROGRESS)) { + profileStateRelated(config, delegateListener, job.isEnabled(), profilesToCollect); + } + if (profilesToCollect.contains(ProfileName.COORDINATING_NODE) + || profilesToCollect.contains(ProfileName.TOTAL_SIZE_IN_BYTES) + || profilesToCollect.contains(ProfileName.MODELS)) { + profileModels(config, profilesToCollect, job, delegateListener); + } + if (profilesToCollect.contains(ProfileName.AD_TASK)) { + getLatestHistoricalTaskProfile(configId, transportService, null, delegateListener); + } + + } catch (Exception e) { + logger.error(CommonMessages.FAIL_TO_GET_PROFILE_MSG, e); + listener.onFailure(e); + } + } else { + onGetDetectorForPrepare(configId, listener, profilesToCollect); + } + }, exception -> { + if (ExceptionUtil.isIndexNotAvailable(exception)) { + logger.info(exception.getMessage()); + onGetDetectorForPrepare(configId, listener, profilesToCollect); + } else { + logger.error(CommonMessages.FAIL_TO_GET_PROFILE_MSG + configId); + listener.onFailure(exception); + } + })); + } + } + + /** + * We expect three kinds of states: + * -Disabled: if get ad job api says the job is disabled; + * -Init: if rcf model's total updates is less than required + * -Running: if neither of the above applies and no exceptions. 
+ * @param config config accessor + * @param listener listener to process the returned state or exception + * @param enabled whether the detector job is enabled or not + * @param profilesToCollect target profiles to fetch + */ + private void profileStateRelated( + Config config, + MultiResponsesDelegateActionListener listener, + boolean enabled, + Set profilesToCollect + ) { + if (enabled) { + RCFPollingRequest request = new RCFPollingRequest(config.getId()); + client.execute(RCFPollingAction.INSTANCE, request, onPollRCFUpdates(config, profilesToCollect, listener)); + } else { + DetectorProfile.Builder builder = new DetectorProfile.Builder(); + if (profilesToCollect.contains(ProfileName.STATE)) { + builder.state(ConfigState.DISABLED); + } + listener.onResponse(builder.build()); + } + } + + /** + * Listener for polling rcf updates through transport messaging + * @param detector anomaly detector + * @param profilesToCollect profiles to collect like state + * @param listener delegate listener + * @return Listener for polling rcf updates through transport messaging + */ + private ActionListener onPollRCFUpdates( + Config detector, + Set profilesToCollect, + MultiResponsesDelegateActionListener listener + ) { + return ActionListener.wrap(rcfPollResponse -> { + long totalUpdates = rcfPollResponse.getTotalUpdates(); + if (totalUpdates < requiredSamples) { + processInitResponse(detector, profilesToCollect, totalUpdates, false, new DetectorProfile.Builder(), listener); + } else { + DetectorProfile.Builder builder = createProfileBuilder(); + createRunningStateAndInitProgress(profilesToCollect, builder); + listener.onResponse(builder.build()); + } + }, exception -> { + // we will get an AnomalyDetectionException wrapping the real exception inside + Throwable cause = Throwables.getRootCause(exception); + + // exception can be a RemoteTransportException + Exception causeException = (Exception) cause; + if (ExceptionUtil + .isException( + causeException, + ResourceNotFoundException.class, + NotSerializedExceptionName.RESOURCE_NOT_FOUND_EXCEPTION_NAME_UNDERSCORE.getName() + ) + || (ExceptionUtil.isIndexNotAvailable(causeException) + && causeException.getMessage().contains(ADCommonName.CHECKPOINT_INDEX_NAME))) { + // cannot find checkpoint + // We don't want to show the estimated time remaining to initialize + // a detector before cold start finishes, where the actual + // initialization time may be much shorter if sufficient historical + // data exists. 
+ processInitResponse(detector, profilesToCollect, 0L, true, createProfileBuilder(), listener); + } else { + logger.error(new ParameterizedMessage("Fail to get init progress through messaging for {}", detector.getId()), exception); + listener.onFailure(exception); + } + }); + } + +} diff --git a/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityProfileTests.java b/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityProfileTests.java index 0748c23d8..bb37434f3 100644 --- a/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityProfileTests.java +++ b/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityProfileTests.java @@ -30,7 +30,7 @@ import org.opensearch.action.search.SearchResponse; import org.opensearch.ad.ADTaskProfileRunner; import org.opensearch.ad.AbstractProfileRunnerTests; -import org.opensearch.ad.AnomalyDetectorProfileRunner; +import org.opensearch.ad.OldAnomalyDetectorProfileRunner; import org.opensearch.ad.constant.ADCommonName; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.transport.ADProfileAction; @@ -79,7 +79,7 @@ private void setUpMultiEntityClientGet(DetectorStatus detectorStatus, JobStatus return null; }).when(nodeStateManager).getConfig(anyString(), eq(AnalysisType.AD), any(ActionListener.class)); clientUtil = new SecurityClientUtil(nodeStateManager, Settings.EMPTY); - runner = new AnomalyDetectorProfileRunner( + oldRunner = new OldAnomalyDetectorProfileRunner( client, clientUtil, xContentRegistry(), @@ -216,7 +216,7 @@ public void testFailGetEntityStats() throws IOException, InterruptedException { final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertTrue("Should not reach here ", false); inProgressLatch.countDown(); }, exception -> { @@ -238,7 +238,7 @@ public void testNoResultsNoError() throws IOException, InterruptedException { final AtomicInteger called = new AtomicInteger(0); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertTrue(response.getInitProgress() != null); called.getAndIncrement(); }, exception -> { @@ -261,7 +261,7 @@ public void testFailConfirmInitted() throws IOException, InterruptedException { final CountDownLatch inProgressLatch = new CountDownLatch(1); - runner.profile(detector.getId(), ActionListener.wrap(response -> { + oldRunner.profile(detector.getId(), ActionListener.wrap(response -> { assertTrue("Should not reach here ", false); inProgressLatch.countDown(); }, exception -> {