Skip to content

Commit

Permalink
Refactor: Reorganize code for Forecasting and AnomalyDetector packages (
Browse files Browse the repository at this point in the history
#925)

* Refactor: Reorganize code for Forecasting and AnomalyDetector packages

This PR includes several refactorings to increase code reuse and improve readability:

1. Entity, FeatureData, and RestHandlerUtils have been moved from the 'ad' package to the 'timeseries' package, enabling their use within the Forecasting code.
2. A new class, DataByFeatureId, has been created to hold shared code previously located in FeatureData. FeatureData now inherits from DataByFeatureId. The standalone usage of DataByFeatureId will be detailed in subsequent PRs.
3. Renamed checkAnomalyDetectorFeaturesSyntax to checkFeaturesSyntax in RestHandlerUtils to make it more generic and usable for Forecasting.
4. Constants AGG_NAME_MAX_TIME, AGG_NAME_MIN_TIME, DATE_HISTOGRAM, and FEATURE_AGGS have been moved from ADCommonName to CommonName to make them accessible for Forecasting.
5. A new method, parseConfig, has been added to the Config class. This method can parse configuration for either AnomalyDetector or Forecaster based on input.

Testing:
The changes have been validated with a successful gradle build.

Signed-off-by: Kaituo Li <kaituo@amazon.com>

* address Amit and Owais's comments

Signed-off-by: Kaituo Li <kaituo@amazon.com>

---------

Signed-off-by: Kaituo Li <kaituo@amazon.com>
  • Loading branch information
kaituo committed Jun 14, 2023
1 parent 02c120f commit fce78c2
Show file tree
Hide file tree
Showing 117 changed files with 474 additions and 259 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@
import static org.opensearch.action.DocWriteResponse.Result.CREATED;
import static org.opensearch.action.DocWriteResponse.Result.UPDATED;
import static org.opensearch.ad.AnomalyDetectorPlugin.AD_THREAD_POOL_NAME;
import static org.opensearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE;

import java.io.IOException;
import java.time.Instant;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/AnomalyDetectorRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.EntityAnomalyResult;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.model.FeatureData;

/**
* Runner to trigger an anomaly detector.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/EntityProfileRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.EntityProfile;
import org.opensearch.ad.model.EntityProfileName;
import org.opensearch.ad.model.EntityState;
Expand All @@ -56,6 +55,7 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.ParseUtils;

Expand Down Expand Up @@ -460,7 +460,7 @@ private SearchRequest createLastSampleTimeRequest(String detectorId, long enable

SearchSourceBuilder source = new SearchSourceBuilder()
.query(boolQueryBuilder)
.aggregation(AggregationBuilders.max(ADCommonName.AGG_NAME_MAX_TIME).field(CommonName.EXECUTION_END_TIME_FIELD))
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(CommonName.EXECUTION_END_TIME_FIELD))
.trackTotalHits(false)
.size(0);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorProfileName;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.ad.task.ADTaskCacheManager;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultResponse;
Expand All @@ -49,6 +48,7 @@
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;

public class ExecuteADResultResponseRecorder {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/caching/EntityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.opensearch.ad.ml.EntityModel;
import org.opensearch.ad.ml.ModelState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.timeseries.model.Entity;

public interface EntityCache extends MaintenanceState, CleanState, DetectorModelSize {
/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/caching/PriorityCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.opensearch.ad.ml.ModelManager.ModelType;
import org.opensearch.ad.ml.ModelState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.ModelProfile;
import org.opensearch.ad.ratelimit.CheckpointMaintainWorker;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
Expand All @@ -62,6 +61,7 @@
import org.opensearch.timeseries.common.exception.LimitExceededException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.settings.TimeSeriesSettings;

import com.google.common.cache.Cache;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@
import static org.opensearch.ad.model.ADTask.TASK_TYPE_FIELD;
import static org.opensearch.ad.model.ADTaskType.taskTypeToString;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_DETECTOR_UPPER_LIMIT;
import static org.opensearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
import static org.opensearch.ad.util.RestHandlerUtils.createXContentParserFromRegistry;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.XCONTENT_WITH_TYPE;
import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry;

import java.io.IOException;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public class ADCommonMessages {
public static String HISTORICAL_ANALYSIS_CANCELLED = "Historical analysis cancelled by user";
public static String HC_DETECTOR_TASK_IS_UPDATING = "HC detector task is updating";
public static String INVALID_TIME_CONFIGURATION_UNITS = "Time unit %s is not supported";
public static String DUPLICATE_FEATURE_AGGREGATION_NAMES = "Detector has duplicate feature aggregation query names: ";
public static String FAIL_TO_GET_DETECTOR = "Fail to get detector";
public static String FAIL_TO_GET_DETECTOR_INFO = "Fail to get detector info";
public static String FAIL_TO_CREATE_DETECTOR = "Fail to create detector";
Expand Down
12 changes: 0 additions & 12 deletions src/main/java/org/opensearch/ad/constant/ADCommonName.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,18 +76,6 @@ public class ADCommonName {
public static final String AD_TASK_REMOTE = "ad_task_remote";
public static final String CANCEL_TASK = "cancel_task";

// ======================================
// Query
// ======================================
// Used in finding the max timestamp
public static final String AGG_NAME_MAX_TIME = "max_timefield";
// Used in finding the min timestamp
public static final String AGG_NAME_MIN_TIME = "min_timefield";
// date histogram aggregation name
public static final String DATE_HISTOGRAM = "date_histogram";
// feature aggregation name
public static final String FEATURE_AGGS = "feature_aggs";

// ======================================
// Used in stats API
// ======================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
Expand All @@ -47,6 +46,7 @@
import org.opensearch.search.aggregations.bucket.composite.CompositeAggregationBuilder;
import org.opensearch.search.aggregations.bucket.composite.TermsValuesSourceBuilder;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.Feature;
import org.opensearch.timeseries.util.ParseUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@
import org.opensearch.action.support.ThreadedActionListener;
import org.opensearch.ad.CleanState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.dataprocessor.Imputer;
import org.opensearch.timeseries.model.Entity;

/**
* A facade managing feature data operations and buffers.
Expand Down
9 changes: 4 additions & 5 deletions src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
package org.opensearch.ad.feature;

import static org.apache.commons.math3.linear.MatrixUtils.createRealMatrix;
import static org.opensearch.ad.constant.ADCommonName.DATE_HISTOGRAM;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_ENTITIES_FOR_PREVIEW;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.PAGE_SIZE;
import static org.opensearch.ad.settings.AnomalyDetectorSettings.PREVIEW_TIMEOUT_IN_MILLIS;
Expand Down Expand Up @@ -40,9 +39,7 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -69,7 +66,9 @@
import org.opensearch.search.sort.FieldSortBuilder;
import org.opensearch.search.sort.SortOrder;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.dataprocessor.Imputer;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.ParseUtils;

Expand Down Expand Up @@ -169,7 +168,7 @@ public SearchFeatureDao(
*/
public void getLatestDataTime(AnomalyDetector detector, ActionListener<Optional<Long>> listener) {
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
.aggregation(AggregationBuilders.max(ADCommonName.AGG_NAME_MAX_TIME).field(detector.getTimeField()))
.aggregation(AggregationBuilders.max(CommonName.AGG_NAME_MAX_TIME).field(detector.getTimeField()))
.size(0);
SearchRequest searchRequest = new SearchRequest().indices(detector.getIndices().toArray(new String[0])).source(searchSourceBuilder);
final ActionListener<SearchResponse> searchResponseListener = ActionListener
Expand Down Expand Up @@ -569,7 +568,7 @@ private Map<Long, Optional<double[]>> parseBucketAggregationResponse(SearchRespo
List<InternalComposite.InternalBucket> buckets = ((InternalComposite) agg).getBuckets();
buckets.forEach(bucket -> {
Optional<double[]> featureData = parseAggregations(Optional.ofNullable(bucket.getAggregations()), featureIds);
dataPoints.put((Long) bucket.getKey().get(DATE_HISTOGRAM), featureData);
dataPoints.put((Long) bucket.getKey().get(CommonName.DATE_HISTOGRAM), featureData);
});
}
return dataPoints;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ml/CheckpointDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.client.Client;
import org.opensearch.index.IndexNotFoundException;
Expand All @@ -69,6 +68,7 @@
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;

import com.amazon.randomcutforest.RandomCutForest;
import com.amazon.randomcutforest.config.Precision;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ml/EntityColdStarter.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.feature.SearchFeatureDao;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.ratelimit.CheckpointWriteWorker;
import org.opensearch.ad.ratelimit.RequestPriority;
import org.opensearch.ad.settings.ADEnabledSetting;
Expand All @@ -55,6 +54,7 @@
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.dataprocessor.Imputer;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.settings.TimeSeriesSettings;

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ml/EntityModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import java.util.Optional;
import java.util.Queue;

import org.opensearch.ad.model.Entity;
import org.opensearch.timeseries.model.Entity;

import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest;

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/ml/ModelManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.settings.TimeSeriesSettings;

import com.amazon.randomcutforest.RandomCutForest;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/org/opensearch/ad/ml/ThresholdingResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.apache.commons.lang.builder.ToStringBuilder;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.FeatureData;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.FeatureData;

/**
* Data object containing thresholding results.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.timeseries.model.Entity;

/**
* HC detector's entity task profile.
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/opensearch/ad/model/ADTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.timeseries.annotation.Generated;
import org.opensearch.timeseries.model.DateRange;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.util.ParseUtils;

import com.google.common.base.Objects;
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/org/opensearch/ad/model/AnomalyResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.opensearch.core.xcontent.XContentParser;
import org.opensearch.timeseries.annotation.Generated;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.util.ParseUtils;

import com.google.common.base.Objects;
Expand Down
1 change: 1 addition & 0 deletions src/main/java/org/opensearch/ad/model/ModelProfile.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.core.xcontent.ToXContentObject;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.Entity;

/**
* Used to show model information in profile API
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
import org.opensearch.ad.ml.ThresholdingResult;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.stats.ADStats;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.cluster.service.ClusterService;
Expand All @@ -56,6 +55,7 @@
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.common.exception.EndRunException;
import org.opensearch.timeseries.constant.CommonMessages;
import org.opensearch.timeseries.model.Entity;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.timeseries.util.ParseUtils;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.opensearch.ad.ratelimit;

import org.opensearch.ad.model.Entity;
import org.opensearch.timeseries.model.Entity;

public class EntityFeatureRequest extends EntityRequest {
private final double[] currentFeature;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@

import java.util.Optional;

import org.opensearch.ad.model.Entity;
import org.opensearch.timeseries.model.Entity;

public class EntityRequest extends QueuedRequest {
private final Entity entity;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@

package org.opensearch.ad.rest;

import static org.opensearch.ad.util.RestHandlerUtils.getSourceContext;
import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS;
import static org.opensearch.timeseries.util.RestHandlerUtils.getSourceContext;

import java.io.IOException;
import java.util.ArrayList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
package org.opensearch.ad.rest;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;
import static org.opensearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static org.opensearch.ad.util.RestHandlerUtils.IF_PRIMARY_TERM;
import static org.opensearch.ad.util.RestHandlerUtils.IF_SEQ_NO;
import static org.opensearch.ad.util.RestHandlerUtils.START_JOB;
import static org.opensearch.ad.util.RestHandlerUtils.STOP_JOB;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.DETECTOR_ID;
import static org.opensearch.timeseries.util.RestHandlerUtils.IF_PRIMARY_TERM;
import static org.opensearch.timeseries.util.RestHandlerUtils.IF_SEQ_NO;
import static org.opensearch.timeseries.util.RestHandlerUtils.START_JOB;
import static org.opensearch.timeseries.util.RestHandlerUtils.STOP_JOB;

import java.io.IOException;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package org.opensearch.ad.rest;

import static org.opensearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static org.opensearch.timeseries.util.RestHandlerUtils.DETECTOR_ID;

import java.io.IOException;
import java.util.List;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
package org.opensearch.ad.rest;

import static org.opensearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT;
import static org.opensearch.ad.util.RestHandlerUtils.DETECTOR_ID;
import static org.opensearch.ad.util.RestHandlerUtils.RUN;
import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken;
import static org.opensearch.timeseries.util.RestHandlerUtils.DETECTOR_ID;
import static org.opensearch.timeseries.util.RestHandlerUtils.RUN;

import java.io.IOException;
import java.util.List;
Expand Down
Loading

0 comments on commit fce78c2

Please sign in to comment.