Skip to content

Commit

Permalink
Update RCF to v3.8 and Enable Auto AD with 'Alert Once' Option (#979)
Browse files Browse the repository at this point in the history
* Update RCF to v3.8 and Enable Auto AD with 'Alert Once' Option

This PR added support for automatic Anomaly Detection (AD) and the 'Alert Once' option introduced in RCF 3.8.

Testing done:

1. Deserialization Test:
* Verified model deserialization from 3.0-rc3.
* Ensured consistent scoring using the rc3 checkpoint and rc3 dependency on identical data.

2. Backward Compatibility Test:
* Executed a mixed cluster with versions 2.10 and 3.0.
* Validated that older detectors still produce results without throwing any exceptions in a blue-green simulation scenario.

Signed-off-by: Kaituo Li <kaituo@amazon.com>

* reduce recall since alertOnce reduced recall

Signed-off-by: Kaituo Li <kaituo@amazon.com>

* remove commented out code

Signed-off-by: Kaituo Li <kaituo@amazon.com>

---------

Signed-off-by: Kaituo Li <kaituo@amazon.com>
  • Loading branch information
kaituo committed Aug 15, 2023
1 parent cadc9bb commit 0c5b4b9
Show file tree
Hide file tree
Showing 12 changed files with 380 additions and 24 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ dependencies {
implementation group: 'com.yahoo.datasketches', name: 'memory', version: '0.12.2'
implementation group: 'commons-lang', name: 'commons-lang', version: '2.6'
implementation group: 'org.apache.commons', name: 'commons-pool2', version: '2.10.0'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.0-rc3'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.0-rc3'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.0-rc3'
implementation 'software.amazon.randomcutforest:randomcutforest-serialization:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-parkservices:3.8.0'
implementation 'software.amazon.randomcutforest:randomcutforest-core:3.8.0'

// we inherit jackson-core from opensearch core
implementation "com.fasterxml.jackson.core:jackson-databind:2.14.1"
Expand Down
8 changes: 5 additions & 3 deletions src/main/java/org/opensearch/ad/ml/EntityColdStarter.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.opensearch.timeseries.util.ExceptionUtil;

import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

/**
Expand Down Expand Up @@ -375,17 +376,18 @@ private void trainModelFromDataSegments(
// overlapping x3, x4, and only store x5, x6.
.shingleSize(shingleSize)
.internalShinglingEnabled(true)
.anomalyRate(1 - this.thresholdMinPvalue);
.anomalyRate(1 - this.thresholdMinPvalue)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true);

if (rcfSeed > 0) {
rcfBuilder.randomSeed(rcfSeed);
}
ThresholdedRandomCutForest trcf = new ThresholdedRandomCutForest(rcfBuilder);

while (!dataPoints.isEmpty()) {
trcf.process(dataPoints.poll(), 0);
}

EntityModel model = entityState.getModel();
if (model == null) {
model = new EntityModel(entity, new ArrayDeque<>(), null);
Expand Down
9 changes: 9 additions & 0 deletions src/main/java/org/opensearch/ad/ml/ModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@

import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.AnomalyDescriptor;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

Expand Down Expand Up @@ -532,6 +533,10 @@ private void trainModelForStep(
.boundingBoxCacheFraction(TimeSeriesSettings.REAL_TIME_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(detector.getShingleSize())
.anomalyRate(1 - thresholdMinPvalue)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(false)
.build();
Arrays.stream(dataPoints).forEach(s -> trcf.process(s, 0));

Expand Down Expand Up @@ -622,6 +627,10 @@ public List<ThresholdingResult> getPreviewResults(double[][] dataPoints, int shi
.boundingBoxCacheFraction(AnomalyDetectorSettings.BATCH_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.anomalyRate(1 - this.thresholdMinPvalue)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(false)
.build();
return Arrays.stream(dataPoints).map(point -> {
AnomalyDescriptor descriptor = trcf.process(point, 0);
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/opensearch/ad/task/ADBatchTaskCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.timeseries.model.Entity;

import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

/**
Expand Down Expand Up @@ -80,6 +81,10 @@ protected ADBatchTaskCache(ADTask adTask) {
.boundingBoxCacheFraction(AnomalyDetectorSettings.BATCH_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.anomalyRate(1 - AnomalyDetectorSettings.THRESHOLD_MIN_PVALUE)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.internalShinglingEnabled(false)
.build();

this.thresholdModelTrained = false;
Expand Down
31 changes: 31 additions & 0 deletions src/test/java/org/opensearch/ad/MemoryTrackerTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.opensearch.timeseries.settings.TimeSeriesSettings;

import com.amazon.randomcutforest.config.Precision;
import com.amazon.randomcutforest.config.TransformMethod;
import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

public class MemoryTrackerTests extends OpenSearchTestCase {
Expand Down Expand Up @@ -109,6 +110,9 @@ public void setUp() throws Exception {
.boundingBoxCacheFraction(TimeSeriesSettings.REAL_TIME_BOUNDING_BOX_CACHE_RATIO)
.shingleSize(shingleSize)
.internalShinglingEnabled(true)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();

detector = mock(AnomalyDetector.class);
Expand Down Expand Up @@ -152,6 +156,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(shingleSize)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(603708, tracker.estimateTRCFModelSize(rcf2));
assertTrue(tracker.isHostingAllowed(detectorId, rcf2));
Expand All @@ -171,6 +178,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(false)
// same with dimension for opportunistic memory saving
.shingleSize(1)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(1685208, tracker.estimateTRCFModelSize(rcf3));

Expand All @@ -188,6 +198,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(1)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(521304, tracker.estimateTRCFModelSize(rcf4));

Expand All @@ -205,6 +218,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(2)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(467340, tracker.estimateTRCFModelSize(rcf5));

Expand All @@ -222,6 +238,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(4)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(603676, tracker.estimateTRCFModelSize(rcf6));

Expand All @@ -239,6 +258,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(16)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(401481, tracker.estimateTRCFModelSize(rcf7));

Expand All @@ -256,6 +278,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(32)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(1040432, tracker.estimateTRCFModelSize(rcf8));

Expand All @@ -273,6 +298,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(64)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
assertEquals(1040688, tracker.estimateTRCFModelSize(rcf9));

Expand All @@ -290,6 +318,9 @@ public void testEstimateModelSize() {
.internalShinglingEnabled(true)
// same with dimension for opportunistic memory saving
.shingleSize(65)
.transformMethod(TransformMethod.NORMALIZE)
.alertOnce(true)
.autoAdjust(true)
.build();
expectThrows(IllegalArgumentException.class, () -> tracker.estimateTRCFModelSize(rcf10));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public void testDataset() throws Exception {
// TODO: this test case will run for a much longer time and timeout with security enabled
if (!isHttps()) {
disableResourceNotFoundFaultTolerence();
verifyAnomaly("synthetic", 1, 1500, 8, .4, .9, 10);
verifyAnomaly("synthetic", 1, 1500, 8, .4, .7, 10);
}
}

Expand Down Expand Up @@ -96,7 +96,7 @@ private void verifyTestResults(

// recall = windows containing predicted anomaly points / total anomaly windows
double recall = anomalies.size() > 0 ? positiveAnomalies / anomalies.size() : 1;
assertTrue(recall >= minRecall);
assertTrue(String.format(Locale.ROOT, "recall should be %f but got %f", recall, minRecall), recall >= minRecall);

assertTrue(errors <= maxError);
LOG.info("Precision: {}, Window recall: {}", precision, recall);
Expand Down
Loading

0 comments on commit 0c5b4b9

Please sign in to comment.