From 258ff1d609609b805117fa9b8292c6c3265f31bd Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" Date: Sat, 31 Aug 2024 00:46:30 +0000 Subject: [PATCH] Fix: Prevent resetting latest flag of real-time analysis when starting historical analysis (#1287) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR addresses a bug where starting a historical analysis after a real-time analysis on the same detector caused the real-time task’s latest flag to be incorrectly reset to false by the historical run. The fix ensures that only the latest flags of the same analysis type are reset: * Real-time analysis will only reset the latest flag of previous real-time analyses. * Historical analysis will only reset the latest flag of previous historical analyses. This PR also updated recencyEmphasis to have a minimum value of 2, aligning with RCF requirements. Testing: - Added an integration test to reproduce the bug and verified the fix. Signed-off-by: Kaituo Li (cherry picked from commit afd5da93011db90122e2b3acd3855e81f50ebb55) Signed-off-by: github-actions[bot] --- build.gradle | 2 + .../org/opensearch/ad/task/ADTaskManager.java | 33 ++---- .../transport/ADHCImputeTransportAction.java | 53 ++++++++- .../forecast/task/ForecastTaskManager.java | 2 +- .../timeseries/ml/RealTimeInferencer.java | 11 -- .../opensearch/timeseries/model/Config.java | 4 +- .../timeseries/task/TaskManager.java | 9 +- .../ad/AbstractADSyntheticDataTest.java | 103 ++++++++++++++++-- .../ad/e2e/AbstractRuleTestCase.java | 2 +- .../java/org/opensearch/ad/e2e/MissingIT.java | 2 +- .../ad/e2e/MixedRealtimeHistoricalIT.java | 74 +++++++++++++ .../ad/model/AnomalyDetectorTests.java | 2 +- .../opensearch/timeseries/TestHelpers.java | 28 +++-- 13 files changed, 263 insertions(+), 62 deletions(-) create mode 100644 src/test/java/org/opensearch/ad/e2e/MixedRealtimeHistoricalIT.java diff --git a/build.gradle b/build.gradle index efcb649f2..b4e2e9f4b 100644 --- a/build.gradle +++ b/build.gradle @@ -732,6 +732,8 @@ List jacocoExclusions = [ 'org.opensearch.ad.task.ADBatchTaskCache', 'org.opensearch.timeseries.ratelimit.RateLimitedRequestWorker', 'org.opensearch.timeseries.util.TimeUtil', + 'org.opensearch.ad.transport.ADHCImputeTransportAction', + 'org.opensearch.timeseries.ml.RealTimeInferencer', ] diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java index e68e9bda5..e6aeadb54 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java @@ -19,7 +19,6 @@ import static org.opensearch.ad.constant.ADCommonName.DETECTION_STATE_INDEX; import static org.opensearch.ad.indices.ADIndexManagement.ALL_AD_RESULTS_INDEX_PATTERN; import static org.opensearch.ad.model.ADTask.DETECTOR_ID_FIELD; -import static org.opensearch.ad.model.ADTaskType.ALL_HISTORICAL_TASK_TYPES; import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES; import static org.opensearch.ad.model.ADTaskType.REALTIME_TASK_TYPES; import static org.opensearch.ad.settings.AnomalyDetectorSettings.AD_REQUEST_TIMEOUT; @@ -1881,21 +1880,6 @@ private void maintainRunningHistoricalTask(ConcurrentLinkedQueue taskQue }, TimeValue.timeValueSeconds(DEFAULT_MAINTAIN_INTERVAL_IN_SECONDS), AD_BATCH_TASK_THREAD_POOL_NAME); } - /** - * Get list of task types. - * 1. If date range is null, will return all realtime task types - * 2. If date range is not null, will return all historical detector level tasks types - * if resetLatestTaskStateFlag is true; otherwise return all historical tasks types include - * HC entity level task type. - * @param dateRange detection date range - * @param resetLatestTaskStateFlag reset latest task state or not - * @return list of AD task types - */ - protected List getTaskTypes(DateRange dateRange, boolean resetLatestTaskStateFlag) { - // AD does not support run once - return getTaskTypes(dateRange, resetLatestTaskStateFlag, false); - } - @Override protected BiCheckedFunction getTaskParser() { return ADTask::parse; @@ -1912,17 +1896,20 @@ public void createRunOnceTaskAndCleanupStaleTasks( throw new UnsupportedOperationException("AD has no run once yet"); } + /** + * Get list of task types. + * 1. If date range is null, will return all realtime task types + * 2. If date range is not null, will return all historical detector level tasks types + * + * @param dateRange detection date range + * @return list of AD task types + */ @Override - public List getTaskTypes(DateRange dateRange, boolean resetLatestTaskStateFlag, boolean runOnce) { + public List getTaskTypes(DateRange dateRange, boolean runOnce) { if (dateRange == null) { return REALTIME_TASK_TYPES; } else { - if (resetLatestTaskStateFlag) { - // return all task types include HC entity task to make sure we can reset all tasks latest flag - return ALL_HISTORICAL_TASK_TYPES; - } else { - return HISTORICAL_DETECTOR_TASK_TYPES; - } + return HISTORICAL_DETECTOR_TASK_TYPES; } } diff --git a/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java b/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java index 32c44d03e..08973296a 100644 --- a/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ADHCImputeTransportAction.java @@ -19,6 +19,7 @@ import org.opensearch.action.support.nodes.TransportNodesAction; import org.opensearch.ad.caching.ADCacheProvider; import org.opensearch.ad.ml.ADRealTimeInferencer; +import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.common.io.stream.StreamInput; @@ -26,6 +27,7 @@ import org.opensearch.timeseries.AnalysisType; import org.opensearch.timeseries.NodeStateManager; import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin; +import org.opensearch.timeseries.cluster.HashRing; import org.opensearch.timeseries.ml.ModelState; import org.opensearch.timeseries.ml.Sample; import org.opensearch.timeseries.model.Config; @@ -69,6 +71,7 @@ public class ADHCImputeTransportAction extends private ADCacheProvider cache; private NodeStateManager nodeStateManager; private ADRealTimeInferencer adInferencer; + private HashRing hashRing; @Inject public ADHCImputeTransportAction( @@ -78,7 +81,8 @@ public ADHCImputeTransportAction( ActionFilters actionFilters, ADCacheProvider priorityCache, NodeStateManager nodeStateManager, - ADRealTimeInferencer adInferencer + ADRealTimeInferencer adInferencer, + HashRing hashRing ) { super( ADHCImputeAction.NAME, @@ -94,6 +98,7 @@ public ADHCImputeTransportAction( this.cache = priorityCache; this.nodeStateManager = nodeStateManager; this.adInferencer = adInferencer; + this.hashRing = hashRing; } @Override @@ -131,9 +136,7 @@ protected ADHCImputeNodeResponse nodeOperation(ADHCImputeNodeRequest nodeRequest long executionEndTime = dataEndMillis + windowDelayMillis; String taskId = nodeRequest.getRequest().getTaskId(); for (ModelState modelState : cache.get().getAllModels(configId)) { - // execution end time (when job starts execution in this interval) >= last used time => the model state is updated in - // previous intervals - if (executionEndTime >= modelState.getLastUsedTime().toEpochMilli()) { + if (shouldProcessModelState(modelState, executionEndTime, clusterService, hashRing)) { double[] nanArray = new double[featureSize]; Arrays.fill(nanArray, Double.NaN); adInferencer @@ -156,4 +159,46 @@ protected ADHCImputeNodeResponse nodeOperation(ADHCImputeNodeRequest nodeRequest } } + /** + * Determines whether the model state should be processed based on various conditions. + * + * Conditions checked: + * - The model's last seen execution end time is not the minimum Instant value. + * - The current execution end time is greater than or equal to the model's last seen execution end time, + * indicating that the model state was updated in previous intervals. + * - The entity associated with the model state is present. + * - The owning node for real-time processing of the entity, with the same local version, is present in the hash ring. + * - The owning node for real-time processing matches the current local node. + * + * This method helps avoid processing model states that were already handled in previous intervals. The conditions + * ensure that only the relevant model states are processed while accounting for scenarios where processing can occur + * concurrently (e.g., during tests when multiple threads may operate quickly). + * + * @param modelState The current state of the model. + * @param executionEndTime The end time of the current execution interval. + * @param clusterService The service providing information about the current cluster node. + * @param hashRing The hash ring used to determine the owning node for real-time processing of entities. + * @return true if the model state should be processed; otherwise, false. + */ + private boolean shouldProcessModelState( + ModelState modelState, + long executionEndTime, + ClusterService clusterService, + HashRing hashRing + ) { + // Get the owning node for the entity in real-time processing from the hash ring + Optional owningNode = modelState.getEntity().isPresent() + ? hashRing.getOwningNodeWithSameLocalVersionForRealtime(modelState.getEntity().get().toString()) + : Optional.empty(); + + // Check if the model state conditions are met for processing + // We cannot use last used time as it will be updated whenever we update its priority in CacheBuffer.update when there is a + // PriorityCache.get. + return modelState.getLastSeenExecutionEndTime() != Instant.MIN + && executionEndTime >= modelState.getLastSeenExecutionEndTime().toEpochMilli() + && modelState.getEntity().isPresent() + && owningNode.isPresent() + && owningNode.get().getId().equals(clusterService.localNode().getId()); + } + } diff --git a/src/main/java/org/opensearch/forecast/task/ForecastTaskManager.java b/src/main/java/org/opensearch/forecast/task/ForecastTaskManager.java index bc2c63002..2f51f544f 100644 --- a/src/main/java/org/opensearch/forecast/task/ForecastTaskManager.java +++ b/src/main/java/org/opensearch/forecast/task/ForecastTaskManager.java @@ -459,7 +459,7 @@ public void createRunOnceTaskAndCleanupStaleTasks( } @Override - public List getTaskTypes(DateRange dateRange, boolean resetLatestTaskStateFlag, boolean runOnce) { + public List getTaskTypes(DateRange dateRange, boolean runOnce) { if (runOnce) { return ForecastTaskType.RUN_ONCE_TASK_TYPES; } else { diff --git a/src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java b/src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java index 30b1a79a7..7a7d11630 100644 --- a/src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java +++ b/src/main/java/org/opensearch/timeseries/ml/RealTimeInferencer.java @@ -5,7 +5,6 @@ package org.opensearch.timeseries.ml; -import java.time.Instant; import java.util.Collections; import java.util.Locale; import java.util.Map; @@ -135,16 +134,6 @@ private boolean processWithTimeout( } private boolean tryProcess(Sample sample, ModelState modelState, Config config, String taskId, long curExecutionEnd) { - // execution end time (when job starts execution in this interval) >= last seen execution end time => the model state is updated in - // previous intervals - // This branch being true can happen while scheduled to waiting some other threads have already scored the same interval - // (e.g., during tests when everything happens fast) - // We cannot use last used time as it will be updated whenever we update its priority in CacheBuffer.update when there is a - // PriorityCache.get. - if (modelState.getLastSeenExecutionEndTime() != Instant.MIN - && curExecutionEnd < modelState.getLastSeenExecutionEndTime().toEpochMilli()) { - return false; - } String modelId = modelState.getModelId(); try { RCFResultType result = modelManager.getResult(sample, modelState, modelId, config, taskId); diff --git a/src/main/java/org/opensearch/timeseries/model/Config.java b/src/main/java/org/opensearch/timeseries/model/Config.java index ace9525f3..8c0586cde 100644 --- a/src/main/java/org/opensearch/timeseries/model/Config.java +++ b/src/main/java/org/opensearch/timeseries/model/Config.java @@ -193,9 +193,9 @@ protected Config( return; } - if (recencyEmphasis != null && (recencyEmphasis <= 0)) { + if (recencyEmphasis != null && recencyEmphasis <= 1) { issueType = ValidationIssueType.RECENCY_EMPHASIS; - errorMessage = "recency emphasis has to be a positive integer"; + errorMessage = "Recency emphasis must be an integer greater than 1."; return; } diff --git a/src/main/java/org/opensearch/timeseries/task/TaskManager.java b/src/main/java/org/opensearch/timeseries/task/TaskManager.java index 7424ffb13..3dbe9a339 100644 --- a/src/main/java/org/opensearch/timeseries/task/TaskManager.java +++ b/src/main/java/org/opensearch/timeseries/task/TaskManager.java @@ -349,7 +349,10 @@ public void updateLatestFlagOfOldTasksAndCreateNewTask( query.filter(new TermQueryBuilder(configIdFieldName, config.getId())); query.filter(new TermQueryBuilder(TimeSeriesTask.IS_LATEST_FIELD, true)); // make sure we reset all latest task as false when user switch from single entity to HC, vice versa. - query.filter(new TermsQueryBuilder(TimeSeriesTask.TASK_TYPE_FIELD, taskTypeToString(getTaskTypes(dateRange, true, runOnce)))); + // Ensures that only the latest flags of the same analysis type are reset: + // Real-time analysis will only reset the latest flag of previous real-time analyses. + // Historical analysis will only reset the latest flag of previous historical analyses. + query.filter(new TermsQueryBuilder(TimeSeriesTask.TASK_TYPE_FIELD, taskTypeToString(getTaskTypes(dateRange, runOnce)))); updateByQueryRequest.setQuery(query); updateByQueryRequest.setRefresh(true); String script = String.format(Locale.ROOT, "ctx._source.%s=%s;", TimeSeriesTask.IS_LATEST_FIELD, false); @@ -432,7 +435,7 @@ public void getAndExecuteOnLatestConfigTask( } public List getTaskTypes(DateRange dateRange) { - return getTaskTypes(dateRange, false, false); + return getTaskTypes(dateRange, false); } /** @@ -1081,5 +1084,5 @@ public abstract void createRunOnceTaskAndCleanupStaleTasks( ActionListener listener ); - public abstract List getTaskTypes(DateRange dateRange, boolean resetLatestTaskStateFlag, boolean runOnce); + public abstract List getTaskTypes(DateRange dateRange, boolean runOnce); } diff --git a/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java b/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java index f840bce73..01a353cf0 100644 --- a/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java +++ b/src/test/java/org/opensearch/ad/AbstractADSyntheticDataTest.java @@ -19,7 +19,9 @@ import java.nio.charset.Charset; import java.time.Duration; import java.time.Instant; +import java.time.ZonedDateTime; import java.time.format.DateTimeFormatter; +import java.time.format.DateTimeParseException; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.List; @@ -56,20 +58,22 @@ protected static class TrainResult { // last data time in data public Instant finalDataTime; - public TrainResult(String detectorId, List data, int rawDataTrainTestSplit, Duration windowDelay, Instant trainTime) { + public TrainResult( + String detectorId, + List data, + int rawDataTrainTestSplit, + Duration windowDelay, + Instant trainTime, + String timeStampField + ) { this.detectorId = detectorId; this.data = data; this.rawDataTrainTestSplit = rawDataTrainTestSplit; this.windowDelay = windowDelay; this.trainTime = trainTime; - this.firstDataTime = getDataTime(0); - this.finalDataTime = getDataTime(data.size() - 1); - } - - private Instant getDataTime(int index) { - String finalTimeStr = data.get(index).get("timestamp").getAsString(); - return Instant.ofEpochMilli(Long.parseLong(finalTimeStr)); + this.firstDataTime = getDataTimeOfEpochMillis(timeStampField, data, 0); + this.finalDataTime = getDataTimeOfEpochMillis(timeStampField, data, data.size() - 1); } } @@ -689,4 +693,87 @@ public static boolean areDoublesEqual(double d1, double d2) { public interface ConditionChecker { boolean checkCondition(JsonArray hits, int expectedSize); } + + protected static Instant getDataTimeOfEpochMillis(String timestampField, List data, int index) { + String finalTimeStr = data.get(index).get(timestampField).getAsString(); + return Instant.ofEpochMilli(Long.parseLong(finalTimeStr)); + } + + protected static Instant getDataTimeofISOFormat(String timestampField, List data, int index) { + String finalTimeStr = data.get(index).get(timestampField).getAsString(); + + try { + // Attempt to parse as an ISO 8601 formatted string (e.g., "2019-11-01T00:00:00Z") + ZonedDateTime zonedDateTime = ZonedDateTime.parse(finalTimeStr, DateTimeFormatter.ISO_DATE_TIME); + return zonedDateTime.toInstant(); + } catch (DateTimeParseException ex) { + throw new IllegalArgumentException("Invalid timestamp format: " + finalTimeStr, ex); + } + } + + protected List getTasks(String detectorId, int size, ConditionChecker checker, RestClient client) + throws InterruptedException { + Request request = new Request("POST", "/_plugins/_anomaly_detection/detectors/tasks/_search"); + + String jsonTemplate = "{\n" + + " \"size\": %d,\n" + + " \"query\": {\n" + + " \"bool\": {\n" + + " \"filter\": [\n" + + " {\n" + + " \"term\": {\n" + + " \"detector_id\": \"%s\"\n" + + " }\n" + + " }\n" + + " ]\n" + + " }\n" + + " }\n" + + "}"; + + // try to get size + 10 results if there are that many + String formattedJson = String.format(Locale.ROOT, jsonTemplate, size + 10, detectorId); + + request.setJsonEntity(formattedJson); + + // wait until results are available + // max wait for 60_000 milliseconds + int maxWaitCycles = 30; + do { + try { + JsonArray hits = getHits(client, request); + if (hits != null && checker.checkCondition(hits, size)) { + List res = new ArrayList<>(); + for (int i = 0; i < hits.size(); i++) { + JsonObject source = hits.get(i).getAsJsonObject().get("_source").getAsJsonObject(); + res.add(source); + } + + return res; + } else { + LOG.info("wait for result, previous result: {}, size: {}", hits, hits.size()); + } + Thread.sleep(2_000 * size); + } catch (Exception e) { + LOG.warn("Exception while waiting for result", e); + Thread.sleep(2_000 * size); + } + } while (maxWaitCycles-- >= 0); + + // leave some debug information before returning empty + try { + String matchAll = "{\n" + " \"size\": 1000,\n" + " \"query\": {\n" + " \"match_all\": {}\n" + " }\n" + "}"; + request.setJsonEntity(matchAll); + JsonArray hits = getHits(client, request); + LOG.info("Query: {}", formattedJson); + LOG.info("match all result: {}", hits); + } catch (Exception e) { + LOG.warn("Exception while waiting for match all result", e); + } + + return new ArrayList<>(); + } + + protected static boolean getLatest(List data, int index) { + return data.get(index).get("is_latest").getAsBoolean(); + } } diff --git a/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java b/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java index bcc2468cd..030f98d20 100644 --- a/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java +++ b/src/test/java/org/opensearch/ad/e2e/AbstractRuleTestCase.java @@ -148,7 +148,7 @@ protected TrainResult ingestTrainData( long windowDelayMinutes = Duration.between(trainTime, Instant.now()).toMinutes(); Duration windowDelay = Duration.ofMinutes(windowDelayMinutes); - return new TrainResult(null, data, rawDataTrainTestSplit, windowDelay, trainTime); + return new TrainResult(null, data, rawDataTrainTestSplit, windowDelay, trainTime, "timestamp"); } public Map>> getAnomalyWindowsMap(String labelFileName) throws Exception { diff --git a/src/test/java/org/opensearch/ad/e2e/MissingIT.java b/src/test/java/org/opensearch/ad/e2e/MissingIT.java index 21f216819..f05afc5ee 100644 --- a/src/test/java/org/opensearch/ad/e2e/MissingIT.java +++ b/src/test/java/org/opensearch/ad/e2e/MissingIT.java @@ -126,7 +126,7 @@ protected TrainResult createDetector( String detectorId = createDetector(client, detector); LOG.info("Created detector {}", detectorId); - return new TrainResult(detectorId, data, trainTestSplit * numberOfEntities, windowDelay, trainTime); + return new TrainResult(detectorId, data, trainTestSplit * numberOfEntities, windowDelay, trainTime, "timestamp"); } protected Duration getWindowDelay(long trainTimeMillis) { diff --git a/src/test/java/org/opensearch/ad/e2e/MixedRealtimeHistoricalIT.java b/src/test/java/org/opensearch/ad/e2e/MixedRealtimeHistoricalIT.java new file mode 100644 index 000000000..dbb049b4e --- /dev/null +++ b/src/test/java/org/opensearch/ad/e2e/MixedRealtimeHistoricalIT.java @@ -0,0 +1,74 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.ad.e2e; + +import java.util.List; +import java.util.Locale; + +import org.opensearch.ad.AbstractADSyntheticDataTest; + +import com.google.gson.JsonObject; + +/** + * Test if real time and historical run together, historical won't reset real time's latest flag + * + */ +public class MixedRealtimeHistoricalIT extends AbstractADSyntheticDataTest { + + public void testMixed() throws Exception { + String datasetName = "synthetic"; + String dataFileName = String.format(Locale.ROOT, "data/%s.data", datasetName); + int intervalsToWait = 3; + + List data = getData(dataFileName); + + String mapping = "{ \"mappings\": { \"properties\": { \"timestamp\": { \"type\": \"date\"}," + + " \"Feature1\": { \"type\": \"double\" }, \"Feature2\": { \"type\": \"double\" } } } }"; + int trainTestSplit = 1500; + // train data plus a few data points for real time inference + int totalDataToIngest = trainTestSplit + intervalsToWait + 3; + bulkIndexTrainData(datasetName, data, totalDataToIngest, client(), mapping); + + long windowDelayMinutes = getWindowDelayMinutes(data, trainTestSplit - 1, "timestamp"); + int intervalMinutes = 1; + + // single-stream detector can use window delay 0 here because we give the run api the actual data time + String detector = String + .format( + Locale.ROOT, + "{ \"name\": \"test\", \"description\": \"test\", \"time_field\": \"timestamp\"" + + ", \"indices\": [\"%s\"], \"feature_attributes\": [{ \"feature_name\": \"feature 1\", \"feature_enabled\": " + + "\"true\", \"aggregation_query\": { \"Feature1\": { \"sum\": { \"field\": \"Feature1\" } } } }, { \"feature_name\"" + + ": \"feature 2\", \"feature_enabled\": \"true\", \"aggregation_query\": { \"Feature2\": { \"sum\": { \"field\": " + + "\"Feature2\" } } } }], \"detection_interval\": { \"period\": { \"interval\": %d, \"unit\": \"Minutes\" } }, " + + "\"window_delay\": { \"period\": {\"interval\": %d, \"unit\": \"MINUTES\"}}," + + "\"schema_version\": 0 }", + datasetName, + intervalMinutes, + windowDelayMinutes + ); + String detectorId = createDetector(client(), detector); + + startDetector(detectorId, client()); + + startHistorical( + detectorId, + getDataTimeofISOFormat("timestamp", data, 0), + getDataTimeofISOFormat("timestamp", data, totalDataToIngest), + client(), + 1 + ); + + int size = 2; + List results = getTasks(detectorId, size, (h, eSize) -> h.size() >= eSize, client()); + + assertEquals(String.format(Locale.ROOT, "Expected %d, but got %d", size, results.size()), size, results.size()); + for (int i = 0; i < size; i++) { + assert (getLatest(results, i)); + } + } + +} diff --git a/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java b/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java index d88558ab3..39e515708 100644 --- a/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java +++ b/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java @@ -605,7 +605,7 @@ public void testInvalidRecency() { null ) ); - assertEquals("recency emphasis has to be a positive integer", exception.getMessage()); + assertEquals("Recency emphasis must be an integer greater than 1.", exception.getMessage()); } public void testInvalidDetectionInterval() { diff --git a/src/test/java/org/opensearch/timeseries/TestHelpers.java b/src/test/java/org/opensearch/timeseries/TestHelpers.java index f44897ed0..2c75febc0 100644 --- a/src/test/java/org/opensearch/timeseries/TestHelpers.java +++ b/src/test/java/org/opensearch/timeseries/TestHelpers.java @@ -331,7 +331,9 @@ public static AnomalyDetector randomAnomalyDetector( user, null, TestHelpers.randomImputationOption(features), - randomIntBetween(1, 10000), + // timeDecay (reverse of recencyEmphasis) should be less than 1. + // so we start with 2. + randomIntBetween(2, 10000), randomIntBetween(1, TimeSeriesSettings.MAX_SHINGLE_SIZE * 2), randomIntBetween(1, 1000), new ArrayList(), @@ -384,7 +386,9 @@ public static AnomalyDetector randomDetector( null, resultIndex, TestHelpers.randomImputationOption(features), - randomIntBetween(1, 10000), + // timeDecay (reverse of recencyEmphasis) should be less than 1. + // so we start with 2. + randomIntBetween(2, 10000), randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2), randomIntBetween(1, 1000), null, @@ -448,7 +452,9 @@ public static AnomalyDetector randomAnomalyDetectorUsingCategoryFields( randomUser(), resultIndex, TestHelpers.randomImputationOption(features), - randomIntBetween(1, 10000), + // timeDecay (reverse of recencyEmphasis) should be less than 1. + // so we start with 2. + randomIntBetween(2, 10000), randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2), randomIntBetween(1, 1000), null, @@ -487,7 +493,9 @@ public static AnomalyDetector randomAnomalyDetector(String timefield, String ind randomUser(), null, TestHelpers.randomImputationOption(features), - randomIntBetween(1, 10000), + // timeDecay (reverse of recencyEmphasis) should be less than 1. + // so we start with 2. + randomIntBetween(2, 10000), randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2), randomIntBetween(1, 1000), null, @@ -518,7 +526,9 @@ public static AnomalyDetector randomAnomalyDetectorWithEmptyFeature() throws IOE randomUser(), null, null, - randomIntBetween(1, 10000), + // timeDecay (reverse of recencyEmphasis) should be less than 1. + // so we start with 2. + randomIntBetween(2, 10000), randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2), randomIntBetween(1, 1000), null, @@ -556,7 +566,9 @@ public static AnomalyDetector randomAnomalyDetectorWithInterval(TimeConfiguratio randomUser(), null, TestHelpers.randomImputationOption(featureList), - randomIntBetween(1, 10000), + // timeDecay (reverse of recencyEmphasis) should be less than 1. + // so we start with 2. + randomIntBetween(2, 10000), randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2), randomIntBetween(1, 1000), null, @@ -769,7 +781,9 @@ public static AnomalyDetector randomAnomalyDetectorWithInterval(TimeConfiguratio randomUser(), null, TestHelpers.randomImputationOption(features), - randomIntBetween(1, 10000), + // timeDecay (reverse of recencyEmphasis) should be less than 1. + // so we start with 2. + randomIntBetween(2, 10000), randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2), randomIntBetween(1, 1000), null,