Refactoring Index Creation for Improved Code Reuse and Consistency (#932)

This Pull Request significantly refactors the index creation module to promote greater code reusability and consistency. The primary changes are outlined as follows:

1. Code Migration: Common code segments have been moved from AnomalyDetectionIndices to IndexManagement. In addition, component-specific code has been established in ADIndexManagement and ForecastIndexManagement. The bulk of changes are concentrated in these three classes, with the remainder of the modifications relating to reference/test changes.

2. Function Renaming: Several functions have been renamed for broader applicability and consistency:
* AnomalyDetectorFunction has been renamed to ExecutorFunction for potential use in Forecasting.
* AnomalyDetectorSettings.MAX_PRIMARY_SHARDS has been renamed to AnomalyDetectorSettings.AD_MAX_PRIMARY_SHARDS in light of the new ForecastSettings.FORECAST_MAX_PRIMARY_SHARDS.
* doesAnomalyDetectorJobIndexExist() has been renamed to doesJobIndexExist() to allow for job index reusability across Anomaly Detection (AD) and forecasting. Analogous changes have been made to other job index-related functions.
* doesAnomalyDetectorIndexExist() has been renamed to doesConfigIndexExist() to allow for config index reusability across AD and forecasting. Analogous changes have been made to other config index-related functions.
* detectionIndices.doesDetectorStateIndexExist() has been renamed to detectionIndices.doesStateIndexExist(), as the former name was unnecessarily redundant. Similar modifications have been made to the result and checkpoint index.

3. Class Migration: The classes ThrowingSupplierWrapper, ThrowingConsumer, and ThrowingSupplier have been moved from org.opensearch.ad.util to org.opensearch.timeseries.function to promote code reuse.

4. Initial Forecast Indices' Mapping: An initial version of forecast indices' mapping has been added, which can be adjusted as required in the future.

5. Version Update: 2.x has been bumped to 2.9, so the Backward Compatibility (BWC) test version has been incremented to 2.9 as well.

6. Dependency Update: com.google.guava:guava has been updated to v32 for a CVE fix.
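
To illustrate the rename in item 2, here is a minimal sketch of how a no-argument callback type such as ExecutorFunction can be shared between components. The interface body, the method name execute(), and the helper stopJob() are assumptions made for illustration; only the type name and its use as a lambda target appear in this commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.opensearch.timeseries.function.ExecutorFunction.
// The method name execute() is an assumption, not taken from the commit.
@FunctionalInterface
interface ExecutorFunction {
    void execute();
}

class ExecutorFunctionDemo {
    // Records each step so the call order is easy to inspect.
    static final List<String> events = new ArrayList<>();

    // Hypothetical helper shaped like stopAdJob(String, ExecutorFunction):
    // perform the work, then run the caller-supplied follow-up.
    static void stopJob(String configId, ExecutorFunction function) {
        events.add("stopped " + configId);
        function.execute();
    }

    public static void main(String[] args) {
        // The same callback type serves both components, which is the point of the neutral name.
        stopJob("detector-1", () -> events.add("AD follow-up"));
        stopJob("forecaster-1", () -> events.add("forecast follow-up"));
        System.out.println(events);
    }
}
```

Because the interface carries no AD-specific types, a forecasting caller can pass its own lambda without depending on the org.opensearch.ad packages.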

Testing done:
New tests have been added for forecast index creation, and the Gradle build passes successfully.
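
For readers unfamiliar with the helpers moved in item 3, the following is a rough sketch of how a throwing-supplier wrapper can turn a checked exception into an unchecked one, which is how the plugin constructs ADIndexManagement in this commit. The interface and method bodies below are assumptions, and MappingLoader is a hypothetical class standing in for any component whose constructor declares IOException. The real classes in org.opensearch.timeseries.function may differ.

```java
import java.io.IOException;
import java.util.function.Supplier;

// Hypothetical stand-in: a supplier whose get() may throw a checked exception.
@FunctionalInterface
interface ThrowingSupplier<T, E extends Exception> {
    T get() throws E;
}

class ThrowingSupplierWrapper {
    // Adapts a ThrowingSupplier to a plain Supplier and rethrows
    // any exception as an unchecked RuntimeException.
    static <T> Supplier<T> throwingSupplierWrapper(ThrowingSupplier<T, Exception> throwingSupplier) {
        return () -> {
            try {
                return throwingSupplier.get();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };
    }
}

// Hypothetical component whose constructor declares a checked IOException.
class MappingLoader {
    final String mapping;

    MappingLoader(String mapping) throws IOException {
        if (mapping == null) {
            throw new IOException("mapping not found");
        }
        this.mapping = mapping;
    }
}

class ThrowingSupplierDemo {
    public static void main(String[] args) {
        // No try/catch or throws clause is needed at the call site.
        MappingLoader loader = ThrowingSupplierWrapper
            .throwingSupplierWrapper(() -> new MappingLoader("{}"))
            .get();
        System.out.println(loader.mapping);
    }
}
```

The trade-off is that callers lose the compiler's reminder to handle the failure, so this pattern suits places such as plugin start-up where an IOException is unrecoverable anyway.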

Signed-off-by: Kaituo Li <kaituo@amazon.com>
kaituo committed Jun 26, 2023
1 parent aedb781 commit 99749df
Showing 94 changed files with 2,959 additions and 1,259 deletions.
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -34,7 +34,7 @@ buildscript {
js_resource_folder = "src/test/resources/job-scheduler"
common_utils_version = System.getProperty("common_utils.version", opensearch_build)
job_scheduler_version = System.getProperty("job_scheduler.version", opensearch_build)
bwcVersionShort = "2.8.0"
bwcVersionShort = "2.9.0"
bwcVersion = bwcVersionShort + ".0"
bwcOpenSearchADDownload = 'https://ci.opensearch.org/ci/dbc/distribution-build-opensearch/' + bwcVersionShort + '/latest/linux/x64/tar/builds/' +
'opensearch/plugins/opensearch-anomaly-detection-' + bwcVersion + '.zip'
@@ -117,7 +117,7 @@ dependencies {
compileOnly "org.opensearch:opensearch-job-scheduler-spi:${job_scheduler_version}"
implementation "org.opensearch:common-utils:${common_utils_version}"
implementation "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
compileOnly group: 'com.google.guava', name: 'guava', version:'31.0.1-jre'
compileOnly group: 'com.google.guava', name: 'guava', version:'32.0.1-jre'
compileOnly group: 'com.google.guava', name: 'failureaccess', version:'1.0.1'
implementation group: 'org.javassist', name: 'javassist', version:'3.28.0-GA'
implementation group: 'org.apache.commons', name: 'commons-math3', version: '3.6.1'
10 changes: 5 additions & 5 deletions src/main/java/org/opensearch/ad/AnomalyDetectorJobRunner.java
Original file line number Diff line number Diff line change
@@ -31,11 +31,10 @@
import org.opensearch.action.get.GetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.settings.AnomalyDetectorSettings;
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.ad.transport.AnomalyResultAction;
@@ -63,6 +62,7 @@
import org.opensearch.timeseries.common.exception.InternalFailure;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.function.ExecutorFunction;

import com.google.common.base.Throwables;

@@ -77,7 +77,7 @@ public class AnomalyDetectorJobRunner implements ScheduledJobRunner {
private Client client;
private ThreadPool threadPool;
private ConcurrentHashMap<String, Integer> detectorEndRunExceptionCount;
private AnomalyDetectionIndices anomalyDetectionIndices;
private ADIndexManagement anomalyDetectionIndices;
private ADTaskManager adTaskManager;
private NodeStateManager nodeStateManager;
private ExecuteADResultResponseRecorder recorder;
@@ -117,7 +117,7 @@ public void setAdTaskManager(ADTaskManager adTaskManager) {
this.adTaskManager = adTaskManager;
}

public void setAnomalyDetectionIndices(AnomalyDetectionIndices anomalyDetectionIndices) {
public void setAnomalyDetectionIndices(ADIndexManagement anomalyDetectionIndices) {
this.anomalyDetectionIndices = anomalyDetectionIndices;
}

@@ -514,7 +514,7 @@ private void stopAdJobForEndRunException(
);
}

private void stopAdJob(String detectorId, AnomalyDetectorFunction function) {
private void stopAdJob(String detectorId, ExecutorFunction function) {
GetRequest getRequest = new GetRequest(CommonName.JOB_INDEX).id(detectorId);
ActionListener<GetResponse> listener = ActionListener.wrap(response -> {
if (response.isExists()) {
52 changes: 39 additions & 13 deletions src/main/java/org/opensearch/ad/AnomalyDetectorPlugin.java
Original file line number Diff line number Diff line change
@@ -45,7 +45,7 @@
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.feature.FeatureManager;
import org.opensearch.ad.feature.SearchFeatureDao;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.ml.CheckpointDao;
import org.opensearch.ad.ml.EntityColdStarter;
import org.opensearch.ad.ml.HybridThresholdingModel;
@@ -154,7 +154,6 @@
import org.opensearch.ad.transport.handler.AnomalyResultBulkIndexHandler;
import org.opensearch.ad.transport.handler.MultiEntityResultHandler;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.IndexUtils;
import org.opensearch.ad.util.SecurityClientUtil;
import org.opensearch.ad.util.Throttler;
@@ -176,6 +175,7 @@
import org.opensearch.env.Environment;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.forecast.model.Forecaster;
import org.opensearch.forecast.settings.ForecastSettings;
import org.opensearch.jobscheduler.spi.JobSchedulerExtension;
import org.opensearch.jobscheduler.spi.ScheduledJobParser;
import org.opensearch.jobscheduler.spi.ScheduledJobRunner;
@@ -194,7 +194,10 @@
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.dataprocessor.Imputer;
import org.opensearch.timeseries.dataprocessor.LinearUniformImputer;
import org.opensearch.timeseries.function.ThrowingSupplierWrapper;
import org.opensearch.timeseries.settings.TimeSeriesSettings;
import org.opensearch.timeseries.stats.StatNames;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.watcher.ResourceWatcherService;

import com.amazon.randomcutforest.parkservices.state.ThresholdedRandomCutForestMapper;
@@ -226,7 +229,7 @@ public class AnomalyDetectorPlugin extends Plugin implements ActionPlugin, Scrip
public static final String AD_BATCH_TASK_THREAD_POOL_NAME = "ad-batch-task-threadpool";
public static final String AD_JOB_TYPE = "opendistro_anomaly_detector";
private static Gson gson;
private AnomalyDetectionIndices anomalyDetectionIndices;
private ADIndexManagement anomalyDetectionIndices;
private AnomalyDetectorRunner anomalyDetectorRunner;
private Client client;
private ClusterService clusterService;
@@ -333,14 +336,19 @@ public Collection<Object> createComponents(
this.clientUtil = new ClientUtil(settings, client, throttler, threadPool);
this.indexUtils = new IndexUtils(client, clientUtil, clusterService, indexNameExpressionResolver);
this.nodeFilter = new DiscoveryNodeFilterer(clusterService);
this.anomalyDetectionIndices = new AnomalyDetectionIndices(
client,
clusterService,
threadPool,
settings,
nodeFilter,
AnomalyDetectorSettings.MAX_UPDATE_RETRY_TIMES
);
// convert from checked IOException to unchecked RuntimeException
this.anomalyDetectionIndices = ThrowingSupplierWrapper
.throwingSupplierWrapper(
() -> new ADIndexManagement(
client,
clusterService,
threadPool,
settings,
nodeFilter,
TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES
)
)
.get();
this.clusterService = clusterService;

Imputer imputer = new LinearUniformImputer(true);
@@ -853,6 +861,9 @@ public List<Setting<?>> getSettings() {

List<Setting<?>> systemSetting = ImmutableList
.of(
// ======================================
// AD settings
// ======================================
// HCAD cache
LegacyOpenDistroAnomalyDetectorSettings.MAX_CACHE_MISS_HANDLING_PER_SECOND,
AnomalyDetectorSettings.DEDICATED_CACHE_SIZE,
@@ -894,7 +905,7 @@ public List<Setting<?>> getSettings() {
AnomalyDetectorSettings.MAX_MULTI_ENTITY_ANOMALY_DETECTORS,
AnomalyDetectorSettings.AD_INDEX_PRESSURE_SOFT_LIMIT,
AnomalyDetectorSettings.AD_INDEX_PRESSURE_HARD_LIMIT,
AnomalyDetectorSettings.MAX_PRIMARY_SHARDS,
AnomalyDetectorSettings.AD_MAX_PRIMARY_SHARDS,
// Security
LegacyOpenDistroAnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES,
@@ -938,7 +949,22 @@ public List<Setting<?>> getSettings() {
// clean resource
AnomalyDetectorSettings.DELETE_AD_RESULT_WHEN_DELETE_DETECTOR,
// stats/profile API
AnomalyDetectorSettings.MAX_MODEL_SIZE_PER_NODE
AnomalyDetectorSettings.MAX_MODEL_SIZE_PER_NODE,
// ======================================
// Forecast settings
// ======================================
// result index rollover
ForecastSettings.FORECAST_RESULT_HISTORY_MAX_DOCS_PER_SHARD,
ForecastSettings.FORECAST_RESULT_HISTORY_RETENTION_PERIOD,
ForecastSettings.FORECAST_RESULT_HISTORY_ROLLOVER_PERIOD,
// resource usage control
ForecastSettings.FORECAST_MODEL_MAX_SIZE_PERCENTAGE,
// TODO: add validation code
// ForecastSettings.FORECAST_MAX_SINGLE_STREAM_FORECASTERS,
// ForecastSettings.FORECAST_MAX_HC_FORECASTERS,
ForecastSettings.FORECAST_INDEX_PRESSURE_SOFT_LIMIT,
ForecastSettings.FORECAST_INDEX_PRESSURE_HARD_LIMIT,
ForecastSettings.FORECAST_MAX_PRIMARY_SHARDS
);
return unmodifiableList(
Stream
Original file line number Diff line number Diff line change
@@ -49,7 +49,6 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.RCFPollingResponse;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.ad.util.MultiResponsesDelegateActionListener;
import org.opensearch.ad.util.SecurityClientUtil;
@@ -73,6 +72,7 @@
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;
import org.opensearch.transport.TransportService;

public class AnomalyDetectorProfileRunner extends AbstractProfileRunner {
Expand Down
Original file line number Diff line number Diff line change
@@ -25,7 +25,7 @@
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.ad.constant.ADCommonMessages;
import org.opensearch.ad.indices.ADIndex;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyResult;
import org.opensearch.ad.model.DetectorProfileName;
@@ -37,7 +37,6 @@
import org.opensearch.ad.transport.RCFPollingAction;
import org.opensearch.ad.transport.RCFPollingRequest;
import org.opensearch.ad.transport.handler.AnomalyIndexHandler;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
@@ -50,11 +49,12 @@
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.model.FeatureData;
import org.opensearch.timeseries.model.IntervalTimeConfiguration;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

public class ExecuteADResultResponseRecorder {
private static final Logger log = LogManager.getLogger(ExecuteADResultResponseRecorder.class);

private AnomalyDetectionIndices anomalyDetectionIndices;
private ADIndexManagement anomalyDetectionIndices;
private AnomalyIndexHandler<AnomalyResult> anomalyResultHandler;
private ADTaskManager adTaskManager;
private DiscoveryNodeFilterer nodeFilter;
@@ -65,7 +65,7 @@ public class ExecuteADResultResponseRecorder {
private int rcfMinSamples;

public ExecuteADResultResponseRecorder(
AnomalyDetectionIndices anomalyDetectionIndices,
ADIndexManagement anomalyDetectionIndices,
AnomalyIndexHandler<AnomalyResult> anomalyResultHandler,
ADTaskManager adTaskManager,
DiscoveryNodeFilterer nodeFilter,
16 changes: 8 additions & 8 deletions src/main/java/org/opensearch/ad/cluster/ADDataMigrator.java
Original file line number Diff line number Diff line change
@@ -37,14 +37,13 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.WriteRequest;
import org.opensearch.ad.constant.ADCommonName;
import org.opensearch.ad.indices.AnomalyDetectionIndices;
import org.opensearch.ad.indices.ADIndexManagement;
import org.opensearch.ad.model.ADTask;
import org.opensearch.ad.model.ADTaskState;
import org.opensearch.ad.model.ADTaskType;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.AnomalyDetectorJob;
import org.opensearch.ad.model.DetectorInternalState;
import org.opensearch.ad.rest.handler.AnomalyDetectorFunction;
import org.opensearch.ad.util.ExceptionUtil;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
@@ -60,6 +59,7 @@
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.timeseries.common.exception.ResourceNotFoundException;
import org.opensearch.timeseries.constant.CommonName;
import org.opensearch.timeseries.function.ExecutorFunction;

/**
* Migrate AD data to support backward compatibility.
@@ -71,14 +71,14 @@ public class ADDataMigrator {
private final Client client;
private final ClusterService clusterService;
private final NamedXContentRegistry xContentRegistry;
private final AnomalyDetectionIndices detectionIndices;
private final ADIndexManagement detectionIndices;
private final AtomicBoolean dataMigrated;

public ADDataMigrator(
Client client,
ClusterService clusterService,
NamedXContentRegistry xContentRegistry,
AnomalyDetectionIndices detectionIndices
ADIndexManagement detectionIndices
) {
this.client = client;
this.clusterService = clusterService;
@@ -94,12 +94,12 @@ public void migrateData() {
if (!dataMigrated.getAndSet(true)) {
logger.info("Start migrating AD data");

if (!detectionIndices.doesAnomalyDetectorJobIndexExist()) {
if (!detectionIndices.doesJobIndexExist()) {
logger.info("AD job index doesn't exist, no need to migrate");
return;
}

if (detectionIndices.doesDetectorStateIndexExist()) {
if (detectionIndices.doesStateIndexExist()) {
migrateDetectorInternalStateToRealtimeTask();
} else {
// If detection index doesn't exist, create index and backfill realtime task.
@@ -179,7 +179,7 @@ public void backfillRealtimeTask(ConcurrentLinkedQueue<AnomalyDetectorJob> detec
}
String jobId = job.getName();

AnomalyDetectorFunction createRealtimeTaskFunction = () -> {
ExecutorFunction createRealtimeTaskFunction = () -> {
GetRequest getRequest = new GetRequest(DETECTION_STATE_INDEX, jobId);
client.get(getRequest, ActionListener.wrap(r -> {
if (r != null && r.isExists()) {
@@ -204,7 +204,7 @@ public void backfillRealtimeTask(ConcurrentLinkedQueue<AnomalyDetectorJob> detec

private void checkIfRealtimeTaskExistsAndBackfill(
AnomalyDetectorJob job,
AnomalyDetectorFunction createRealtimeTaskFunction,
ExecutorFunction createRealtimeTaskFunction,
ConcurrentLinkedQueue<AnomalyDetectorJob> detectorJobs,
boolean migrateAll
) {
Original file line number Diff line number Diff line change
@@ -18,7 +18,6 @@
import org.opensearch.ad.cluster.diskcleanup.ModelCheckpointIndexRetention;
import org.opensearch.ad.util.ClientUtil;
import org.opensearch.ad.util.DateUtils;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.Client;
import org.opensearch.cluster.LocalNodeClusterManagerListener;
import org.opensearch.cluster.service.ClusterService;
@@ -28,6 +27,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.threadpool.Scheduler.Cancellable;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.google.common.annotations.VisibleForTesting;

2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/cluster/HashRing.java
Original file line number Diff line number Diff line change
@@ -40,7 +40,6 @@
import org.opensearch.action.admin.cluster.node.info.PluginsAndModules;
import org.opensearch.ad.ml.ModelManager;
import org.opensearch.ad.ml.SingleStreamModelIdMapper;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.AdminClient;
import org.opensearch.client.Client;
import org.opensearch.client.ClusterAdminClient;
@@ -54,6 +53,7 @@
import org.opensearch.common.unit.TimeValue;
import org.opensearch.plugins.PluginInfo;
import org.opensearch.timeseries.common.exception.TimeSeriesException;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

import com.google.common.collect.Sets;

2 changes: 1 addition & 1 deletion src/main/java/org/opensearch/ad/cluster/HourlyCron.java
Original file line number Diff line number Diff line change
@@ -17,9 +17,9 @@
import org.opensearch.action.FailedNodeException;
import org.opensearch.ad.transport.CronAction;
import org.opensearch.ad.transport.CronRequest;
import org.opensearch.ad.util.DiscoveryNodeFilterer;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.timeseries.util.DiscoveryNodeFilterer;

public class HourlyCron implements Runnable {
private static final Logger LOG = LogManager.getLogger(HourlyCron.class);