diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index df307c5b7..9c5797f1d 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -52,7 +52,7 @@ jobs: -Dtests.seed=B4BA12CCF1D9E825 -Dtests.security.manager=false \ -Dtests.jvm.argline='-XX:TieredStopAtLevel=1 -XX:ReservedCodeCacheSize=64m' \ -Dtests.locale=ar-JO -Dtests.timezone=Asia/Samarkand -Dmodel-benchmark=true \ - -Dtests.timeoutSuite=3600000! -Dtests.logs=true" + -Dtests.timeoutSuite=3600000! -Dtest.logs=true" ;; hc) su `id -un 1000` -c "./gradlew ':test' --tests 'org.opensearch.ad.ml.HCADModelPerfTests' \ diff --git a/src/main/java/org/opensearch/ad/ADJobProcessor.java b/src/main/java/org/opensearch/ad/ADJobProcessor.java index 4492d3708..189a53bb4 100644 --- a/src/main/java/org/opensearch/ad/ADJobProcessor.java +++ b/src/main/java/org/opensearch/ad/ADJobProcessor.java @@ -81,7 +81,7 @@ protected void validateResultIndexAndRunJob( ExecuteADResultResponseRecorder recorder, Config detector ) { - String resultIndex = jobParameter.getCustomResultIndex(); + String resultIndex = jobParameter.getCustomResultIndexOrAlias(); if (resultIndex == null) { runJob(jobParameter, lockService, lock, executionStartTime, executionEndTime, configId, user, roles, recorder, detector); return; diff --git a/src/main/java/org/opensearch/ad/indices/ADIndex.java b/src/main/java/org/opensearch/ad/indices/ADIndex.java index b345ef33e..a598eabb1 100644 --- a/src/main/java/org/opensearch/ad/indices/ADIndex.java +++ b/src/main/java/org/opensearch/ad/indices/ADIndex.java @@ -37,7 +37,8 @@ public enum ADIndex implements TimeSeriesIndex { false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getCheckpointMappings) ), - STATE(ADCommonName.DETECTION_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getStateMappings)); + STATE(ADCommonName.DETECTION_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getStateMappings)), + CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ADIndexManagement::getResultMappings)),; private final String indexName; // whether we use an alias for the index @@ -64,10 +65,4 @@ public boolean isAlias() { public String getMapping() { return mapping; } - - @Override - public boolean isJobIndex() { - return CommonName.JOB_INDEX.equals(indexName); - } - } diff --git a/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java b/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java index d0a40ecd8..c056a7890 100644 --- a/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java +++ b/src/main/java/org/opensearch/ad/indices/ADIndexManagement.java @@ -30,12 +30,14 @@ import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; import org.opensearch.ad.constant.ADCommonName; +import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyResult; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import org.opensearch.common.xcontent.XContentType; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.threadpool.ThreadPool; @@ -64,6 +66,7 @@ public class ADIndexManagement extends IndexManagement { * @param settings OS cluster 
setting * @param nodeFilter Used to filter eligible nodes to host AD indices * @param maxUpdateRunningTimes max number of retries to update index mapping and setting + * @param xContentRegistry registry for json parser * @throws IOException */ public ADIndexManagement( @@ -72,7 +75,8 @@ public ADIndexManagement( ThreadPool threadPool, Settings settings, DiscoveryNodeFilterer nodeFilter, - int maxUpdateRunningTimes + int maxUpdateRunningTimes, + NamedXContentRegistry xContentRegistry ) throws IOException { super( @@ -87,7 +91,10 @@ public ADIndexManagement( AD_RESULT_HISTORY_ROLLOVER_PERIOD.get(settings), AD_RESULT_HISTORY_MAX_DOCS_PER_SHARD.get(settings), AD_RESULT_HISTORY_RETENTION_PERIOD.get(settings), - ADIndex.RESULT.getMapping() + ADIndex.RESULT.getMapping(), + xContentRegistry, + AnomalyDetector::parse, + ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "*" ); this.clusterService.addLocalNodeClusterManagerListener(this); @@ -181,7 +188,7 @@ public void initDefaultResultIndexDirectly(ActionListener a AD_RESULT_HISTORY_INDEX_PATTERN, ADCommonName.ANOMALY_RESULT_INDEX_ALIAS, true, - AD_RESULT_HISTORY_INDEX_PATTERN, + true, ADIndex.RESULT, actionListener ); @@ -270,6 +277,6 @@ protected DeleteRequest createDummyDeleteRequest(String resultIndex) throws IOEx @Override public void initCustomResultIndexDirectly(String resultIndex, ActionListener actionListener) { - initResultIndexDirectly(resultIndex, null, false, AD_RESULT_HISTORY_INDEX_PATTERN, ADIndex.RESULT, actionListener); + initResultIndexDirectly(getCustomResultIndexPattern(resultIndex), resultIndex, false, false, ADIndex.RESULT, actionListener); } } diff --git a/src/main/java/org/opensearch/ad/model/ADTask.java b/src/main/java/org/opensearch/ad/model/ADTask.java index 3070fd8e1..d51d39022 100644 --- a/src/main/java/org/opensearch/ad/model/ADTask.java +++ b/src/main/java/org/opensearch/ad/model/ADTask.java @@ -336,7 +336,7 @@ public static ADTask parse(XContentParser parser, String taskId) throws IOExcept detector.getLastUpdateTime(), detector.getCategoryFields(), detector.getUser(), - detector.getCustomResultIndex(), + detector.getCustomResultIndexOrAlias(), detector.getImputationOption(), detector.getRecencyEmphasis(), detector.getSeasonIntervals(), diff --git a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java index e09c1bf96..32d9b4e2c 100644 --- a/src/main/java/org/opensearch/ad/model/AnomalyDetector.java +++ b/src/main/java/org/opensearch/ad/model/AnomalyDetector.java @@ -265,7 +265,7 @@ public AnomalyDetector(StreamInput input) throws IOException { } else { this.uiMetadata = null; } - customResultIndex = input.readOptionalString(); + customResultIndexOrAlias = input.readOptionalString(); if (input.readBoolean()) { this.imputationOption = new ImputationOption(input); } else { @@ -326,7 +326,7 @@ public void writeTo(StreamOutput output) throws IOException { } else { output.writeBoolean(false); } - output.writeOptionalString(customResultIndex); + output.writeOptionalString(customResultIndexOrAlias); if (imputationOption != null) { output.writeBoolean(true); imputationOption.writeTo(output); diff --git a/src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java b/src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java index aeb265072..d82eab34a 100644 --- a/src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java +++ b/src/main/java/org/opensearch/ad/ratelimit/ADSaveResultStrategy.java @@ -87,7 +87,7 @@ public void 
saveResult(AnomalyResult result, Config config) { config.getId(), result.getAnomalyGrade() > 0 ? RequestPriority.HIGH : RequestPriority.MEDIUM, result, - config.getCustomResultIndex() + config.getCustomResultIndexOrAlias() ) ); } diff --git a/src/main/java/org/opensearch/ad/rest/AbstractADSearchAction.java b/src/main/java/org/opensearch/ad/rest/AbstractADSearchAction.java index ef901f40c..ee72c8850 100644 --- a/src/main/java/org/opensearch/ad/rest/AbstractADSearchAction.java +++ b/src/main/java/org/opensearch/ad/rest/AbstractADSearchAction.java @@ -13,7 +13,7 @@ import org.opensearch.ad.constant.ADCommonMessages; import org.opensearch.ad.settings.ADEnabledSetting; import org.opensearch.core.xcontent.ToXContentObject; -import org.opensearch.timeseries.AbstractSearchAction; +import org.opensearch.timeseries.rest.AbstractSearchAction; public abstract class AbstractADSearchAction extends AbstractSearchAction { diff --git a/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java index e605b1d8b..ef2b7d754 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/AbstractAnomalyDetectorActionHandler.java @@ -231,7 +231,7 @@ protected AnomalyDetector copyConfig(User user, Config config) { Instant.now(), config.getCategoryFields(), user, - config.getCustomResultIndex(), + config.getCustomResultIndexOrAlias(), config.getImputationOption(), config.getRecencyEmphasis(), config.getSeasonIntervals(), diff --git a/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java b/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java index 264fd8944..6ff20287c 100644 --- a/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java +++ b/src/main/java/org/opensearch/ad/task/ADBatchTaskRunner.java @@ -1158,7 +1158,7 @@ private void detectAnomaly( user = adTask.getUser().getName(); roles = adTask.getUser().getRoles(); } - String resultIndex = adTask.getDetector().getCustomResultIndex(); + String resultIndex = adTask.getDetector().getCustomResultIndexOrAlias(); if (resultIndex == null) { // if result index is null, store anomaly result directly diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java index b3c16c290..e68e9bda5 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java @@ -571,10 +571,9 @@ protected void scaleTaskLaneOnCoordinatingNode( TransportService transportService, ActionListener listener ) { - DiscoveryNode coordinatingNode = getCoordinatingNode(adTask); transportService .sendRequest( - coordinatingNode, + getCoordinatingNode(adTask), ForwardADTaskAction.NAME, new ForwardADTaskRequest(adTask, approvedTaskSlot, ADTaskAction.SCALE_ENTITY_TASK_SLOTS), transportRequestOptions, @@ -582,20 +581,56 @@ protected void scaleTaskLaneOnCoordinatingNode( ); } + /** + * Retrieves the coordinating node for the given ADTask. + * + * This method looks for a node in the list of eligible data nodes that matches the coordinating node ID + * and version specified in the ADTask. The matching criteria are: + * 1. The node ID must be equal to the coordinating node ID. + * 2. Both node versions must be either null or equal. 
+ * + * If the coordinating node ID and the local node have different software versions, this method will + * throw a ResourceNotFoundException. + * + * @param adTask the ADTask containing the coordinating node information. + * @return a DiscoveryNode containing the matching DiscoveryNode if found, or throws ResourceNotFoundException if no match is found. + * The caller is supposed to handle the thrown exception. + * @throws ResourceNotFoundException if the coordinating node has a different version than the local node or if the coordinating node is not found. + */ private DiscoveryNode getCoordinatingNode(ADTask adTask) { - String coordinatingNode = adTask.getCoordinatingNode(); - DiscoveryNode[] eligibleDataNodes = nodeFilter.getEligibleDataNodes(); - DiscoveryNode targetNode = null; - for (DiscoveryNode node : eligibleDataNodes) { - if (node.getId().equals(coordinatingNode)) { - targetNode = node; - break; + try { + String coordinatingNodeId = adTask.getCoordinatingNode(); + Version coordinatingNodeVersion = hashRing.getVersion(coordinatingNodeId); + Version localNodeVersion = hashRing.getVersion(clusterService.localNode().getId()); + if (!isSameVersion(coordinatingNodeVersion, localNodeVersion)) { + throw new ResourceNotFoundException( + adTask.getConfigId(), + "AD task coordinating node has different version than local node" + ); } - } - if (targetNode == null) { + + DiscoveryNode[] eligibleDataNodes = nodeFilter.getEligibleDataNodes(); + + for (DiscoveryNode node : eligibleDataNodes) { + String nodeId = node.getId(); + if (nodeId == null) { + continue; + } + + if (nodeId.equals(coordinatingNodeId)) { + return node; + } + } + + throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node not found"); + } catch (Exception e) { + logger.error("Error locating coordinating node", e); throw new ResourceNotFoundException(adTask.getConfigId(), "AD task coordinating node not found"); } - return targetNode; + } + + private boolean isSameVersion(Version version1, Version version2) { + return (version1 == null && version2 == null) || (version1 != null && version2 != null && version1.compareTo(version2) == 0); } @Override @@ -791,7 +826,7 @@ public void cleanConfigCache( } catch (ResourceNotFoundException e) { logger .warn( - "Task coordinating node left cluster, taskId: {}, detectorId: {}, coordinatingNode: {}", + "Task coordinating node left cluster or has different software version, taskId: {}, detectorId: {}, coordinatingNode: {}", taskId, detectorId, coordinatingNode diff --git a/src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java b/src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java index d5442e57c..2de6b07e3 100644 --- a/src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ADResultBulkTransportAction.java @@ -68,7 +68,7 @@ public ADResultBulkTransportAction( @Override protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ADResultBulkRequest request) { BulkRequest bulkRequest = new BulkRequest(); - List results = request.getAnomalyResults(); + List results = request.getResults(); if (indexingPressurePercent <= softLimit) { for (ADResultWriteRequest resultWriteRequest : results) { diff --git a/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java index 5ef180e91..983c089b5 100644 --- 
a/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/ADSingleStreamResultTransportAction.java @@ -80,7 +80,7 @@ public ADResultWriteRequest createResultWriteRequest(Config config, AnomalyResul config.getId(), RequestPriority.MEDIUM, result, - config.getCustomResultIndex() + config.getCustomResultIndexOrAlias() ); } diff --git a/src/main/java/org/opensearch/ad/transport/SearchTopAnomalyResultTransportAction.java b/src/main/java/org/opensearch/ad/transport/SearchTopAnomalyResultTransportAction.java index afe3c4729..d7b05f647 100644 --- a/src/main/java/org/opensearch/ad/transport/SearchTopAnomalyResultTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/SearchTopAnomalyResultTransportAction.java @@ -301,10 +301,10 @@ protected void doExecute(Task task, SearchTopAnomalyResultRequest request, Actio SearchRequest searchRequest = generateSearchRequest(request); // Adding search over any custom result indices - String rawCustomResultIndex = getAdResponse.getDetector().getCustomResultIndex(); - String customResultIndex = rawCustomResultIndex == null ? null : rawCustomResultIndex.trim(); - if (!Strings.isNullOrEmpty(customResultIndex)) { - searchRequest.indices(defaultIndex, customResultIndex); + String rawCustomResultIndexPattern = getAdResponse.getDetector().getCustomResultIndexPattern(); + String customResultIndexPattern = rawCustomResultIndexPattern == null ? null : rawCustomResultIndexPattern.trim(); + if (!Strings.isNullOrEmpty(customResultIndexPattern)) { + searchRequest.indices(defaultIndex, customResultIndexPattern); } // Utilizing the existing search() from SearchHandler to handle security permissions. Both user role @@ -321,7 +321,7 @@ protected void doExecute(Task task, SearchTopAnomalyResultRequest request, Actio clock.millis() + TOP_ANOMALY_RESULT_TIMEOUT_IN_MILLIS, request.getSize(), orderType, - customResultIndex + customResultIndexPattern ) ); diff --git a/src/main/java/org/opensearch/ad/transport/handler/ADIndexMemoryPressureAwareResultHandler.java b/src/main/java/org/opensearch/ad/transport/handler/ADIndexMemoryPressureAwareResultHandler.java index b6a05e19c..1ac134768 100644 --- a/src/main/java/org/opensearch/ad/transport/handler/ADIndexMemoryPressureAwareResultHandler.java +++ b/src/main/java/org/opensearch/ad/transport/handler/ADIndexMemoryPressureAwareResultHandler.java @@ -15,6 +15,8 @@ import org.apache.logging.log4j.Logger; import org.opensearch.ad.indices.ADIndex; import org.opensearch.ad.indices.ADIndexManagement; +import org.opensearch.ad.model.AnomalyResult; +import org.opensearch.ad.ratelimit.ADResultWriteRequest; import org.opensearch.ad.transport.ADResultBulkAction; import org.opensearch.ad.transport.ADResultBulkRequest; import org.opensearch.client.Client; @@ -27,7 +29,7 @@ import org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler; public class ADIndexMemoryPressureAwareResultHandler extends - IndexMemoryPressureAwareResultHandler { + IndexMemoryPressureAwareResultHandler { private static final Logger LOG = LogManager.getLogger(ADIndexMemoryPressureAwareResultHandler.class); @Inject diff --git a/src/main/java/org/opensearch/forecast/ForecastJobProcessor.java b/src/main/java/org/opensearch/forecast/ForecastJobProcessor.java index d6128d030..e863c6583 100644 --- a/src/main/java/org/opensearch/forecast/ForecastJobProcessor.java +++ b/src/main/java/org/opensearch/forecast/ForecastJobProcessor.java @@ -85,7 +85,7 @@ protected void 
validateResultIndexAndRunJob( Exception exception = new EndRunException(configId, e.getMessage(), false); handleException(jobParameter, lockService, lock, executionStartTime, executionEndTime, exception, recorder, detector); }); - String resultIndex = jobParameter.getCustomResultIndex(); + String resultIndex = jobParameter.getCustomResultIndexOrAlias(); if (resultIndex == null) { indexManagement.validateDefaultResultIndexForBackendJob(configId, user, roles, () -> { listener.onResponse(true); diff --git a/src/main/java/org/opensearch/forecast/constant/ForecastCommonMessages.java b/src/main/java/org/opensearch/forecast/constant/ForecastCommonMessages.java index deb31cad7..2494c8e12 100644 --- a/src/main/java/org/opensearch/forecast/constant/ForecastCommonMessages.java +++ b/src/main/java/org/opensearch/forecast/constant/ForecastCommonMessages.java @@ -46,7 +46,6 @@ public class ForecastCommonMessages { // ====================================== // Used for custom forecast result index // ====================================== - public static String CAN_NOT_FIND_RESULT_INDEX = "Can't find result index "; public static String INVALID_RESULT_INDEX_PREFIX = "Result index must start with " + CUSTOM_RESULT_INDEX_PREFIX; // ====================================== diff --git a/src/main/java/org/opensearch/forecast/indices/ForecastIndex.java b/src/main/java/org/opensearch/forecast/indices/ForecastIndex.java index 8e514dd6e..4fe80815e 100644 --- a/src/main/java/org/opensearch/forecast/indices/ForecastIndex.java +++ b/src/main/java/org/opensearch/forecast/indices/ForecastIndex.java @@ -37,7 +37,8 @@ public enum ForecastIndex implements TimeSeriesIndex { ForecastCommonName.FORECAST_STATE_INDEX, false, ThrowingSupplierWrapper.throwingSupplierWrapper(ForecastIndexManagement::getStateMappings) - ); + ), + CUSTOM_RESULT(CUSTOM_RESULT_INDEX, true, ThrowingSupplierWrapper.throwingSupplierWrapper(ForecastIndexManagement::getResultMappings)); private final String indexName; // whether we use an alias for the index @@ -64,9 +65,4 @@ public boolean isAlias() { public String getMapping() { return mapping; } - - @Override - public boolean isJobIndex() { - return CommonName.JOB_INDEX.equals(indexName); - } } diff --git a/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java b/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java index bc2798773..35ade2ef7 100644 --- a/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java +++ b/src/main/java/org/opensearch/forecast/indices/ForecastIndexManagement.java @@ -23,30 +23,27 @@ import java.io.IOException; import java.util.EnumMap; import java.util.List; -import java.util.Map; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; import org.opensearch.ResourceAlreadyExistsException; -import org.opensearch.action.admin.indices.alias.get.GetAliasesRequest; -import org.opensearch.action.admin.indices.alias.get.GetAliasesResponse; import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.create.CreateIndexResponse; import org.opensearch.action.delete.DeleteRequest; import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.support.IndicesOptions; import org.opensearch.client.Client; -import org.opensearch.cluster.metadata.AliasMetadata; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.Settings; import 
org.opensearch.common.xcontent.XContentType; import org.opensearch.commons.InjectSecurity; import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.forecast.constant.ForecastCommonName; import org.opensearch.forecast.model.ForecastResult; +import org.opensearch.forecast.model.Forecaster; import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.common.exception.EndRunException; import org.opensearch.timeseries.common.exception.TimeSeriesException; @@ -72,6 +69,7 @@ public class ForecastIndexManagement extends IndexManagement { * @param settings OS cluster setting * @param nodeFilter Used to filter eligible nodes to host forecast indices * @param maxUpdateRunningTimes max number of retries to update index mapping and setting + * @param xContentRegistry registry for json parser * @throws IOException */ public ForecastIndexManagement( @@ -80,7 +78,8 @@ public ForecastIndexManagement( ThreadPool threadPool, Settings settings, DiscoveryNodeFilterer nodeFilter, - int maxUpdateRunningTimes + int maxUpdateRunningTimes, + NamedXContentRegistry xContentRegistry ) throws IOException { super( @@ -95,7 +94,10 @@ public ForecastIndexManagement( FORECAST_RESULT_HISTORY_ROLLOVER_PERIOD.get(settings), FORECAST_RESULT_HISTORY_MAX_DOCS_PER_SHARD.get(settings), FORECAST_RESULT_HISTORY_RETENTION_PERIOD.get(settings), - ForecastIndex.RESULT.getMapping() + ForecastIndex.RESULT.getMapping(), + xContentRegistry, + Forecaster::parse, + ForecastCommonName.CUSTOM_RESULT_INDEX_PREFIX + "*" ); this.indexStates = new EnumMap(ForecastIndex.class); @@ -268,7 +270,7 @@ public void initDefaultResultIndexDirectly(ActionListener a FORECAST_RESULT_HISTORY_INDEX_PATTERN, ForecastIndex.RESULT.getIndexName(), false, - FORECAST_RESULT_HISTORY_INDEX_PATTERN, + true, ForecastIndex.RESULT, actionListener ); @@ -276,8 +278,7 @@ public void initDefaultResultIndexDirectly(ActionListener a @Override public void initCustomResultIndexDirectly(String resultIndex, ActionListener actionListener) { - // throws IOException { - initResultIndexDirectly(resultIndex, null, false, FORECAST_RESULT_HISTORY_INDEX_PATTERN, ForecastIndex.RESULT, actionListener); + initResultIndexDirectly(getCustomResultIndexPattern(resultIndex), resultIndex, false, false, ForecastIndex.RESULT, actionListener); } public void validateDefaultResultIndexForBackendJob( @@ -288,49 +289,17 @@ public void validateDefaultResultIndexForBackendJob( ActionListener listener ) { if (doesAliasExist(ForecastCommonName.FORECAST_RESULT_INDEX_ALIAS)) { - handleExistingIndex(configId, user, roles, function, listener); + validateResultIndexAndExecute( + ForecastCommonName.FORECAST_RESULT_INDEX_ALIAS, + () -> executeWithSecurityContext(configId, user, roles, function, listener, ForecastCommonName.FORECAST_RESULT_INDEX_ALIAS), + false, + listener + ); } else { initDefaultResultIndex(configId, user, roles, function, listener); } } - private void handleExistingIndex( - String configId, - String user, - List roles, - ExecutorFunction function, - ActionListener listener - ) { - GetAliasesRequest getAliasRequest = new GetAliasesRequest() - .aliases(ForecastCommonName.FORECAST_RESULT_INDEX_ALIAS) - .indicesOptions(IndicesOptions.lenientExpandOpenHidden()); - - adminClient.indices().getAliases(getAliasRequest, ActionListener.wrap(getAliasResponse -> { - String concreteIndex = 
getConcreteIndexFromAlias(getAliasResponse); - if (concreteIndex == null) { - listener.onFailure(new EndRunException("Result index alias mapping is empty", false)); - return; - } - - if (!isValidResultIndexMapping(concreteIndex)) { - listener.onFailure(new EndRunException("Result index mapping is not correct", false)); - return; - } - - executeWithSecurityContext(configId, user, roles, function, listener, concreteIndex); - - }, listener::onFailure)); - } - - private String getConcreteIndexFromAlias(GetAliasesResponse getAliasResponse) { - for (Map.Entry> entry : getAliasResponse.getAliases().entrySet()) { - if (!entry.getValue().isEmpty()) { - return entry.getKey(); - } - } - return null; - } - private void initDefaultResultIndex( String configId, String user, @@ -378,5 +347,4 @@ private void executeWithSecurityContext( listener.onFailure(e); } } - } diff --git a/src/main/java/org/opensearch/forecast/model/ForecastTask.java b/src/main/java/org/opensearch/forecast/model/ForecastTask.java index 61bf826c5..33bd114ab 100644 --- a/src/main/java/org/opensearch/forecast/model/ForecastTask.java +++ b/src/main/java/org/opensearch/forecast/model/ForecastTask.java @@ -334,7 +334,7 @@ public static ForecastTask parse(XContentParser parser, String taskId) throws IO forecaster.getLastUpdateTime(), forecaster.getCategoryFields(), forecaster.getUser(), - forecaster.getCustomResultIndex(), + forecaster.getCustomResultIndexOrAlias(), forecaster.getHorizon(), forecaster.getImputationOption(), forecaster.getRecencyEmphasis(), diff --git a/src/main/java/org/opensearch/forecast/ratelimit/ForecastSaveResultStrategy.java b/src/main/java/org/opensearch/forecast/ratelimit/ForecastSaveResultStrategy.java index 1dc3029a0..10f256278 100644 --- a/src/main/java/org/opensearch/forecast/ratelimit/ForecastSaveResultStrategy.java +++ b/src/main/java/org/opensearch/forecast/ratelimit/ForecastSaveResultStrategy.java @@ -83,7 +83,7 @@ public void saveResult(ForecastResult result, Config config) { config.getId(), RequestPriority.MEDIUM, result, - config.getCustomResultIndex() + config.getCustomResultIndexOrAlias() ) ); } diff --git a/src/main/java/org/opensearch/forecast/rest/AbstractForecastSearchAction.java b/src/main/java/org/opensearch/forecast/rest/AbstractForecastSearchAction.java index 842bc1ca9..8801ac333 100644 --- a/src/main/java/org/opensearch/forecast/rest/AbstractForecastSearchAction.java +++ b/src/main/java/org/opensearch/forecast/rest/AbstractForecastSearchAction.java @@ -13,7 +13,7 @@ import org.opensearch.core.xcontent.ToXContentObject; import org.opensearch.forecast.constant.ForecastCommonMessages; import org.opensearch.forecast.settings.ForecastEnabledSetting; -import org.opensearch.timeseries.AbstractSearchAction; +import org.opensearch.timeseries.rest.AbstractSearchAction; /** * Provides an abstract base class for handling search actions within the forecast module. 
diff --git a/src/main/java/org/opensearch/forecast/rest/handler/AbstractForecasterActionHandler.java b/src/main/java/org/opensearch/forecast/rest/handler/AbstractForecasterActionHandler.java index dafc7123b..92bbf9325 100644 --- a/src/main/java/org/opensearch/forecast/rest/handler/AbstractForecasterActionHandler.java +++ b/src/main/java/org/opensearch/forecast/rest/handler/AbstractForecasterActionHandler.java @@ -249,7 +249,7 @@ protected Config copyConfig(User user, Config config) { Instant.now(), config.getCategoryFields(), user, - config.getCustomResultIndex(), + config.getCustomResultIndexOrAlias(), ((Forecaster) config).getHorizon(), config.getImputationOption(), config.getRecencyEmphasis(), diff --git a/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java b/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java index cea8c4a16..b1635f9d3 100644 --- a/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java +++ b/src/main/java/org/opensearch/forecast/settings/ForecastEnabledSetting.java @@ -27,6 +27,8 @@ public class ForecastEnabledSetting extends DynamicNumericSetting { */ public static final String FORECAST_ENABLED = "plugins.forecast.enabled"; + public static final boolean enabled = false; + public static final Map> settings = unmodifiableMap(new HashMap>() { { /** @@ -53,6 +55,8 @@ public static synchronized ForecastEnabledSetting getInstance() { * @return whether forecasting is enabled. */ public static boolean isForecastEnabled() { - return ForecastEnabledSetting.getInstance().getSettingValue(ForecastEnabledSetting.FORECAST_ENABLED); + // return ForecastEnabledSetting.getInstance().getSettingValue(ForecastEnabledSetting.FORECAST_ENABLED); + // TODO: enable forecasting before released + return enabled; } } diff --git a/src/main/java/org/opensearch/forecast/transport/ForecastResultBulkTransportAction.java b/src/main/java/org/opensearch/forecast/transport/ForecastResultBulkTransportAction.java index 95422a98a..dcb792fba 100644 --- a/src/main/java/org/opensearch/forecast/transport/ForecastResultBulkTransportAction.java +++ b/src/main/java/org/opensearch/forecast/transport/ForecastResultBulkTransportAction.java @@ -60,7 +60,7 @@ public ForecastResultBulkTransportAction( @Override protected BulkRequest prepareBulkRequest(float indexingPressurePercent, ForecastResultBulkRequest request) { BulkRequest bulkRequest = new BulkRequest(); - List results = request.getAnomalyResults(); + List results = request.getResults(); if (indexingPressurePercent <= softLimit) { for (ForecastResultWriteRequest resultWriteRequest : results) { diff --git a/src/main/java/org/opensearch/forecast/transport/ForecastRunOnceTransportAction.java b/src/main/java/org/opensearch/forecast/transport/ForecastRunOnceTransportAction.java index dcd132a05..49eb2b995 100644 --- a/src/main/java/org/opensearch/forecast/transport/ForecastRunOnceTransportAction.java +++ b/src/main/java/org/opensearch/forecast/transport/ForecastRunOnceTransportAction.java @@ -275,8 +275,8 @@ private void checkForecastResults(String forecastID, String taskId, Config confi SearchRequest request = new SearchRequest(ForecastIndexManagement.ALL_FORECAST_RESULTS_INDEX_PATTERN); request.source(source); - if (config.getCustomResultIndex() != null) { - request.indices(config.getCustomResultIndex()); + if (config.getCustomResultIndexOrAlias() != null) { + request.indices(config.getCustomResultIndexPattern()); } client.search(request, ActionListener.wrap(searchResponse -> { diff --git 
a/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java b/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java index 9cde11725..3672a3e43 100644 --- a/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java +++ b/src/main/java/org/opensearch/forecast/transport/ForecastSingleStreamResultTransportAction.java @@ -84,7 +84,7 @@ public ForecastResultWriteRequest createResultWriteRequest(Config config, Foreca config.getId(), RequestPriority.MEDIUM, result, - config.getCustomResultIndex() + config.getCustomResultIndexOrAlias() ); } } diff --git a/src/main/java/org/opensearch/forecast/transport/SearchTopForecastResultTransportAction.java b/src/main/java/org/opensearch/forecast/transport/SearchTopForecastResultTransportAction.java index 069914f86..bd0df0d83 100644 --- a/src/main/java/org/opensearch/forecast/transport/SearchTopForecastResultTransportAction.java +++ b/src/main/java/org/opensearch/forecast/transport/SearchTopForecastResultTransportAction.java @@ -184,8 +184,8 @@ protected void doExecute(Task task, SearchTopForecastResultRequest request, Acti SearchRequest searchRequest = generateQuery(request, forecaster); // Adding search over any custom result indices - if (!Strings.isNullOrEmpty(forecaster.getCustomResultIndex())) { - searchRequest.indices(forecaster.getCustomResultIndex()); + if (!Strings.isNullOrEmpty(forecaster.getCustomResultIndexPattern())) { + searchRequest.indices(forecaster.getCustomResultIndexPattern()); } // Utilizing the existing search() from SearchHandler to handle security // permissions. Both user role @@ -335,7 +335,9 @@ private void findMatchingCategoricalFieldValuePair( SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().query(internalFilterQuery).size(1); - String resultIndex = Strings.isNullOrEmpty(forecaster.getCustomResultIndex()) ? defaultIndex : forecaster.getCustomResultIndex(); + String resultIndex = Strings.isNullOrEmpty(forecaster.getCustomResultIndexOrAlias()) + ? 
defaultIndex + : forecaster.getCustomResultIndexPattern(); SearchRequest searchRequest = new SearchRequest() .indices(resultIndex) .source(searchSourceBuilder) diff --git a/src/main/java/org/opensearch/forecast/transport/handler/ForecastIndexMemoryPressureAwareResultHandler.java b/src/main/java/org/opensearch/forecast/transport/handler/ForecastIndexMemoryPressureAwareResultHandler.java index 3ea3187a0..df5ed807b 100644 --- a/src/main/java/org/opensearch/forecast/transport/handler/ForecastIndexMemoryPressureAwareResultHandler.java +++ b/src/main/java/org/opensearch/forecast/transport/handler/ForecastIndexMemoryPressureAwareResultHandler.java @@ -19,6 +19,8 @@ import org.opensearch.core.action.ActionListener; import org.opensearch.forecast.indices.ForecastIndex; import org.opensearch.forecast.indices.ForecastIndexManagement; +import org.opensearch.forecast.model.ForecastResult; +import org.opensearch.forecast.ratelimit.ForecastResultWriteRequest; import org.opensearch.forecast.transport.ForecastResultBulkAction; import org.opensearch.forecast.transport.ForecastResultBulkRequest; import org.opensearch.timeseries.common.exception.TimeSeriesException; @@ -27,7 +29,7 @@ import org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler; public class ForecastIndexMemoryPressureAwareResultHandler extends - IndexMemoryPressureAwareResultHandler { + IndexMemoryPressureAwareResultHandler { private static final Logger LOG = LogManager.getLogger(ForecastIndexMemoryPressureAwareResultHandler.class); @Inject diff --git a/src/main/java/org/opensearch/timeseries/EntityProfileRunner.java b/src/main/java/org/opensearch/timeseries/EntityProfileRunner.java index 43dbe3cbc..eb6307ddd 100644 --- a/src/main/java/org/opensearch/timeseries/EntityProfileRunner.java +++ b/src/main/java/org/opensearch/timeseries/EntityProfileRunner.java @@ -303,7 +303,7 @@ private void getJob( detectorId, enabledTimeMs, entityValue, - config.getCustomResultIndex() + config.getCustomResultIndexPattern() ); EntityProfile.Builder builder = new EntityProfile.Builder(); diff --git a/src/main/java/org/opensearch/timeseries/ExecuteResultResponseRecorder.java b/src/main/java/org/opensearch/timeseries/ExecuteResultResponseRecorder.java index 028af5416..102eb2f6d 100644 --- a/src/main/java/org/opensearch/timeseries/ExecuteResultResponseRecorder.java +++ b/src/main/java/org/opensearch/timeseries/ExecuteResultResponseRecorder.java @@ -130,7 +130,7 @@ public void indexResult( response.getError() ); - String resultIndex = config.getCustomResultIndex(); + String resultIndex = config.getCustomResultIndexOrAlias(); resultHandler .bulk( resultIndex, @@ -262,13 +262,8 @@ public void indexResultException( User user = config.getUser(); IndexableResultType resultToSave = createErrorResult(configId, dataStartTime, dataEndTime, executeEndTime, errorMessage, user); - String resultIndex = config.getCustomResultIndex(); - if (resultIndex != null && !indexManagement.doesIndexExist(resultIndex)) { - // Set result index as null, will write exception to default result index. 
- resultHandler.index(resultToSave, configId, null); - } else { - resultHandler.index(resultToSave, configId, resultIndex); - } + String resultIndexOrAlias = config.getCustomResultIndexOrAlias(); + resultHandler.index(resultToSave, configId, resultIndexOrAlias); if (errorMessage.contains(ADCommonMessages.NO_MODEL_ERR_MSG) && !config.isHighCardinality()) { // single stream detector raises ResourceNotFoundException containing ADCommonMessages.NO_CHECKPOINT_ERR_MSG diff --git a/src/main/java/org/opensearch/timeseries/ProfileUtil.java b/src/main/java/org/opensearch/timeseries/ProfileUtil.java index b6de04ba7..2711dc392 100644 --- a/src/main/java/org/opensearch/timeseries/ProfileUtil.java +++ b/src/main/java/org/opensearch/timeseries/ProfileUtil.java @@ -94,9 +94,13 @@ public static void confirmRealtimeInitStatus( ) { SearchRequest searchLatestResult = null; if (analysisType.isAD()) { - searchLatestResult = createADRealtimeInittedEverRequest(config.getId(), enabledTime, config.getCustomResultIndex()); + searchLatestResult = createADRealtimeInittedEverRequest(config.getId(), enabledTime, config.getCustomResultIndexPattern()); } else if (analysisType.isForecast()) { - searchLatestResult = createForecastRealtimeInittedEverRequest(config.getId(), enabledTime, config.getCustomResultIndex()); + searchLatestResult = createForecastRealtimeInittedEverRequest( + config.getId(), + enabledTime, + config.getCustomResultIndexPattern() + ); } else { throw new IllegalArgumentException("Analysis type is not supported, type: : " + analysisType); } diff --git a/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java b/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java index cde068b7c..29df5742d 100644 --- a/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java +++ b/src/main/java/org/opensearch/timeseries/TimeSeriesAnalyticsPlugin.java @@ -574,7 +574,8 @@ public PooledObject wrap(LinkedBuffer obj) { threadPool, settings, nodeFilter, - TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES + TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES, + xContentRegistry ) ) .get(); @@ -973,7 +974,8 @@ public PooledObject wrap(LinkedBuffer obj) { threadPool, settings, nodeFilter, - ForecastSettings.FORECAST_MAX_UPDATE_RETRY_TIMES + ForecastSettings.FORECAST_MAX_UPDATE_RETRY_TIMES, + xContentRegistry ) ) .get(); diff --git a/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java b/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java index 2f9785db4..9d2076488 100644 --- a/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java +++ b/src/main/java/org/opensearch/timeseries/indices/IndexManagement.java @@ -11,7 +11,8 @@ package org.opensearch.timeseries.indices; -import static org.opensearch.timeseries.constant.CommonMessages.CAN_NOT_FIND_RESULT_INDEX; +import static org.opensearch.core.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.timeseries.util.RestHandlerUtils.createXContentParserFromRegistry; import java.io.IOException; import java.net.URL; @@ -20,8 +21,10 @@ import java.util.Arrays; import java.util.EnumMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -45,9 +48,9 @@ import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest; import org.opensearch.action.delete.DeleteRequest; import 
org.opensearch.action.index.IndexRequest; +import org.opensearch.action.search.SearchRequest; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.IndicesOptions; -import org.opensearch.ad.indices.ADIndex; import org.opensearch.client.AdminClient; import org.opensearch.client.Client; import org.opensearch.cluster.LocalNodeClusterManagerListener; @@ -67,13 +70,21 @@ import org.opensearch.core.xcontent.XContentParser; import org.opensearch.core.xcontent.XContentParser.Token; import org.opensearch.index.IndexNotFoundException; +import org.opensearch.index.query.BoolQueryBuilder; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.search.SearchHit; +import org.opensearch.search.builder.SearchSourceBuilder; import org.opensearch.threadpool.Scheduler; import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.common.exception.EndRunException; +import org.opensearch.timeseries.common.exception.ResourceNotFoundException; +import org.opensearch.timeseries.common.exception.TimeSeriesException; import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.constant.CommonName; import org.opensearch.timeseries.constant.CommonValue; +import org.opensearch.timeseries.function.BiCheckedFunction; import org.opensearch.timeseries.function.ExecutorFunction; +import org.opensearch.timeseries.model.Config; import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.opensearch.timeseries.util.DiscoveryNodeFilterer; @@ -120,6 +131,9 @@ public abstract class IndexManagement & TimeSe // result index mapping to valida custom index private Map RESULT_FIELD_CONFIGS; private String resultMapping; + private NamedXContentRegistry xContentRegistry; + protected BiCheckedFunction configParser; + protected String customResultIndexRegex; protected class IndexState { // keep track of whether the mapping version is up-to-date @@ -148,7 +162,10 @@ protected IndexManagement( TimeValue historyRolloverPeriod, Long historyMaxDocs, TimeValue historyRetentionPeriod, - String resultMapping + String resultMapping, + NamedXContentRegistry xContentRegistry, + BiCheckedFunction configParser, + String customResultIndexRegex ) throws IOException { this.client = client; @@ -170,6 +187,9 @@ protected IndexManagement( this.updateRunning = new AtomicBoolean(false); this.updateRunningTimes = 0; this.resultMapping = resultMapping; + this.xContentRegistry = xContentRegistry; + this.configParser = configParser; + this.customResultIndexRegex = customResultIndexRegex; } /** @@ -360,7 +380,7 @@ protected void updateJobIndexSettingIfNecessary(String indexName, IndexState job // if the auto expand setting is already there, return immediately if (autoExpandReplica != null) { jobIndexState.settingUpToDate = true; - logger.info(new ParameterizedMessage("Mark [{}]'s mapping up-to-date", indexName)); + logger.info(new ParameterizedMessage("Mark [{}]'s setting up-to-date", indexName)); listener.onResponse(null); return; } @@ -392,14 +412,14 @@ protected void updateJobIndexSettingIfNecessary(String indexName, IndexState job final UpdateSettingsRequest updateSettingsRequest = new UpdateSettingsRequest(indexName).settings(updatedSettings); client.admin().indices().updateSettings(updateSettingsRequest, ActionListener.wrap(response -> { jobIndexState.settingUpToDate = true; - logger.info(new ParameterizedMessage("Mark [{}]'s mapping up-to-date", indexName)); + logger.info(new ParameterizedMessage("Mark [{}]'s setting up-to-date", 
indexName)); listener.onResponse(null); }, listener::onFailure)); }, e -> { if (e instanceof IndexNotFoundException) { // new index will be created with auto expand replica setting jobIndexState.settingUpToDate = true; - logger.info(new ParameterizedMessage("Mark [{}]'s mapping up-to-date", indexName)); + logger.info(new ParameterizedMessage("Mark [{}]'s setting up-to-date", indexName)); listener.onResponse(null); } else { listener.onFailure(e); @@ -521,7 +541,7 @@ public void initJobIndex(ActionListener actionListener) { *

* * @param The type of the action listener's response. - * @param resultIndex The custom result index to validate. + * @param resultIndexOrAlias The custom result index to validate. * @param function The function to be executed if validation is successful. * @param mappingValidated Indicates whether the mapping for the result index has been previously validated. * @param listener The listener to be notified of the success or failure of the operation. @@ -529,39 +549,52 @@ public void initJobIndex(ActionListener actionListener) { * @throws IllegalArgumentException If the result index mapping is found to be invalid. */ public void validateResultIndexAndExecute( - String resultIndex, + String resultIndexOrAlias, ExecutorFunction function, boolean mappingValidated, ActionListener listener ) { - try { - if (!mappingValidated && !isValidResultIndexMapping(resultIndex)) { - logger.warn("Can't create analysis with custom result index {} as its mapping is invalid", resultIndex); - listener.onFailure(new IllegalArgumentException(CommonMessages.INVALID_RESULT_INDEX_MAPPING + resultIndex)); - return; + if (!mappingValidated) { + validateResultIndexMapping(resultIndexOrAlias, ActionListener.wrap(validMapping -> { + if (validMapping) { + executeAfterValidateResultIndexMapping(resultIndexOrAlias, function, listener); + } else { + logger.warn("Can't create analysis with custom result index {} as its mapping is invalid", resultIndexOrAlias); + listener.onFailure(new IllegalArgumentException(CommonMessages.INVALID_RESULT_INDEX_MAPPING + resultIndexOrAlias)); + } + }, listener::onFailure)); + } else { + try { + executeAfterValidateResultIndexMapping(resultIndexOrAlias, function, listener); + } catch (Exception e) { + logger.error("Failed to validate custom result index " + resultIndexOrAlias, e); + listener.onFailure(e); } + } + } - IndexRequest indexRequest = createDummyIndexRequest(resultIndex); - - // User may have no write permission on custom result index. Talked with security plugin team, seems no easy way to verify - // if user has write permission. So just tried to write and delete a dummy forecast result to verify. - client.index(indexRequest, ActionListener.wrap(response -> { - logger.debug("Successfully wrote dummy result to result index {}", resultIndex); - client.delete(createDummyDeleteRequest(resultIndex), ActionListener.wrap(deleteResponse -> { - logger.debug("Successfully deleted dummy result from result index {}", resultIndex); - function.execute(); - }, ex -> { - logger.error("Failed to delete dummy result from result index " + resultIndex, ex); - listener.onFailure(ex); - })); - }, exception -> { - logger.error("Failed to write dummy result to result index " + resultIndex, exception); - listener.onFailure(exception); + private void executeAfterValidateResultIndexMapping( + String resultIndexOrAlias, + ExecutorFunction function, + ActionListener listener + ) throws IOException { + IndexRequest indexRequest = createDummyIndexRequest(resultIndexOrAlias); + + // User may have no write permission on custom result index. Talked with security plugin team, seems no easy way to verify + // if user has write permission. So just tried to write and delete a dummy result to verify. 
+ client.index(indexRequest, ActionListener.wrap(response -> { + logger.debug("Successfully wrote dummy result to result index {}", resultIndexOrAlias); + client.delete(createDummyDeleteRequest(resultIndexOrAlias), ActionListener.wrap(deleteResponse -> { + logger.debug("Successfully deleted dummy result from result index {}", resultIndexOrAlias); + function.execute(); + }, ex -> { + logger.error("Failed to delete dummy result from result index " + resultIndexOrAlias, ex); + listener.onFailure(ex); })); - } catch (Exception e) { - logger.error("Failed to validate custom result index " + resultIndex, e); - listener.onFailure(e); - } + }, exception -> { + logger.error("Failed to write dummy result to result index " + resultIndexOrAlias, exception); + listener.onFailure(exception); + })); } public void update() { @@ -609,7 +642,7 @@ private void updateSettingIfNecessary(GroupedActionListener delegateListen final GroupedActionListener conglomerateListeneer = new GroupedActionListener<>( ActionListener.wrap(r -> delegateListeneer.onResponse(null), exception -> { delegateListeneer.onResponse(null); - logger.error("Fail to update time series indices' mappings", exception); + logger.error("Fail to update time series indices' settings", exception); }), updates.size() ); @@ -617,7 +650,7 @@ private void updateSettingIfNecessary(GroupedActionListener delegateListen logger.info(new ParameterizedMessage("Check [{}]'s setting", timeseriesIndex.getIndexName())); if (timeseriesIndex.isJobIndex() && doesIndexExist(timeseriesIndex.getIndexName())) { updateJobIndexSettingIfNecessary( - ADIndex.JOB.getIndexName(), + timeseriesIndex.getIndexName(), indexStates.computeIfAbsent(timeseriesIndex, k -> new IndexState(k.getMapping())), conglomerateListeneer ); @@ -662,51 +695,208 @@ private void updateMappingIfNecessary(GroupedActionListener delegateListen ); for (IndexType index : updates) { - logger.info(new ParameterizedMessage("Check [{}]'s mapping", index.getIndexName())); - shouldUpdateIndex(index, ActionListener.wrap(shouldUpdate -> { - if (shouldUpdate) { - adminClient - .indices() - .putMapping( - new PutMappingRequest().indices(index.getIndexName()).source(index.getMapping(), XContentType.JSON), - ActionListener.wrap(putMappingResponse -> { - if (putMappingResponse.isAcknowledged()) { - logger.info(new ParameterizedMessage("Succeeded in updating [{}]'s mapping", index.getIndexName())); - markMappingUpdated(index); - } else { - logger.error(new ParameterizedMessage("Fail to update [{}]'s mapping", index.getIndexName())); - } - conglomerateListeneer.onResponse(null); - }, exception -> { - logger - .error( - new ParameterizedMessage( - "Fail to update [{}]'s mapping due to [{}]", - index.getIndexName(), - exception.getMessage() - ) - ); - conglomerateListeneer.onFailure(exception); - }) + if (index.isCustomResultIndex()) { + updateCustomResultIndexMapping(index, conglomerateListeneer); + } else { + logger.info(new ParameterizedMessage("Check [{}]'s mapping", index.getIndexName())); + shouldUpdateIndex(index, ActionListener.wrap(shouldUpdate -> { + if (shouldUpdate) { + adminClient + .indices() + .putMapping( + new PutMappingRequest().indices(index.getIndexName()).source(index.getMapping(), XContentType.JSON), + ActionListener.wrap(putMappingResponse -> { + if (putMappingResponse.isAcknowledged()) { + logger.info(new ParameterizedMessage("Succeeded in updating [{}]'s mapping", index.getIndexName())); + markMappingUpdated(index); + } else { + logger.error(new ParameterizedMessage("Fail to update [{}]'s mapping", 
index.getIndexName())); + } + conglomerateListeneer.onResponse(null); + }, exception -> { + logger + .error( + new ParameterizedMessage( + "Fail to update [{}]'s mapping due to [{}]", + index.getIndexName(), + exception.getMessage() + ) + ); + conglomerateListeneer.onFailure(exception); + }) + ); + } else { + // index does not exist or the version is already up-to-date. + // When creating index, new mappings will be used. + // We don't need to update it. + logger.info(new ParameterizedMessage("We don't need to update [{}]'s mapping", index.getIndexName())); + markMappingUpdated(index); + conglomerateListeneer.onResponse(null); + } + }, exception -> { + logger + .error( + new ParameterizedMessage("Fail to check whether we should update [{}]'s mapping", index.getIndexName()), + exception ); - } else { - // index does not exist or the version is already up-to-date. - // When creating index, new mappings will be used. - // We don't need to update it. - logger.info(new ParameterizedMessage("We don't need to update [{}]'s mapping", index.getIndexName())); - markMappingUpdated(index); - conglomerateListeneer.onResponse(null); + conglomerateListeneer.onFailure(exception); + })); + } + } + } + + private void updateCustomResultIndexMapping(IndexType customIndex, GroupedActionListener delegateListeneer) { + getConfigsWithCustomResultIndexAlias(ActionListener.wrap(candidateResultAliases -> { + if (candidateResultAliases.size() == 0) { + logger.info("candidate custom result indices are empty."); + markMappingUpdated(customIndex); + delegateListeneer.onResponse(null); + return; + } + + final GroupedActionListener customIndexMappingUpdateListener = new GroupedActionListener<>( + ActionListener.wrap(mappingUpdateResponse -> { + markMappingUpdated(customIndex); + delegateListeneer.onResponse(null); + }, exception -> { + delegateListeneer.onResponse(null); + logger.error("Fail to update result indices' mappings", exception); + }), + candidateResultAliases.size() + ); + + processResultIndexMappingIteration( + 0, + getSchemaVersion(customIndex), + customIndex.getMapping(), + candidateResultAliases, + customIndexMappingUpdateListener + ); + }, e -> delegateListeneer.onFailure(new TimeSeriesException("Fail to update custom result indices' mapping.", e)))); + } + + private void getConfigsWithCustomResultIndexAlias(ActionListener> listener) { + IndexType configIndex = null; + for (IndexType timeseriesIndex : indexType.getEnumConstants()) { + if (timeseriesIndex.isConfigIndex() && doesIndexExist(timeseriesIndex.getIndexName())) { + configIndex = timeseriesIndex; + break; + } + } + if (configIndex == null || configIndex.getIndexName() == null) { + listener.onFailure(new ResourceNotFoundException("fail to find config index")); + return; + } + BoolQueryBuilder boolQuery = QueryBuilders.boolQuery(); + BoolQueryBuilder shouldQueries = new BoolQueryBuilder(); + shouldQueries.should(QueryBuilders.wildcardQuery(Config.RESULT_INDEX_FIELD, customResultIndexRegex)); + if (shouldQueries.should().isEmpty() == false) { + boolQuery.filter(shouldQueries); + } + + SearchRequest searchRequest = new SearchRequest() + .indices(new String[] { configIndex.getIndexName() }) + .source(new SearchSourceBuilder().size(10000).query(boolQuery)); + client.search(searchRequest, ActionListener.wrap(r -> { + if (r == null || r.getHits().getTotalHits() == null || r.getHits().getTotalHits().value == 0) { + logger.info("no config available."); + listener.onResponse(null); + return; + } + Iterator iterator = r.getHits().iterator(); + + List 
candidateConfigs = new ArrayList<>(); + while (iterator.hasNext()) { + SearchHit searchHit = iterator.next(); + try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, searchHit.getSourceRef())) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + Config config = configParser.apply(parser, searchHit.getId()); + String indexOrAlias = config.getCustomResultIndexOrAlias(); + + // old custom index is an index, new custom index is an alias. We will only deal with new result index for simplicity. + if (doesAliasExist(indexOrAlias)) { + candidateConfigs.add(config); + } + } catch (Exception e) { + logger.error("failed to parse config " + searchHit.getId(), e); } - }, exception -> { - logger - .error( - new ParameterizedMessage("Fail to check whether we should update [{}]'s mapping", index.getIndexName()), - exception - ); - conglomerateListeneer.onFailure(exception); - })); + } + listener.onResponse(candidateConfigs); + }, e -> listener.onFailure(new TimeSeriesException("Fail to update custom result indices' mapping.", e)))); + } + private void processResultIndexMappingIteration( + int indexPos, + Integer newestSchemaVersion, + String mappingSource, + List candidateResultIndices, + GroupedActionListener conglomerateListeneer + ) { + if (indexPos >= candidateResultIndices.size()) { + return; } + String index = candidateResultIndices.get(indexPos).getCustomResultIndexOrAlias(); + logger.info(new ParameterizedMessage("Check [{}]'s mapping", index)); + shouldUpdateIndex(index, true, newestSchemaVersion, ActionListener.wrap(shouldUpdate -> { + if (shouldUpdate) { + adminClient + .indices() + .putMapping( + new PutMappingRequest().indices(index).source(mappingSource, XContentType.JSON), + ActionListener.wrap(putMappingResponse -> { + if (putMappingResponse.isAcknowledged()) { + logger.info(new ParameterizedMessage("Succeeded in updating [{}]'s mapping", index)); + } else { + logger.error(new ParameterizedMessage("Fail to update [{}]'s mapping", index)); + } + conglomerateListeneer.onResponse(null); + processResultIndexMappingIteration( + indexPos + 1, + newestSchemaVersion, + mappingSource, + candidateResultIndices, + conglomerateListeneer + ); + }, exception -> { + logger + .error( + new ParameterizedMessage("Fail to update [{}]'s mapping due to [{}]", index, exception.getMessage()) + ); + conglomerateListeneer.onFailure(exception); + processResultIndexMappingIteration( + indexPos + 1, + newestSchemaVersion, + mappingSource, + candidateResultIndices, + conglomerateListeneer + ); + }) + ); + } else { + // index does not exist or the version is already up-to-date. + // When creating index, new mappings will be used. + // We don't need to update it. 
+ logger.info(new ParameterizedMessage("We don't need to update [{}]'s mapping", index)); + conglomerateListeneer.onResponse(null); + processResultIndexMappingIteration( + indexPos + 1, + newestSchemaVersion, + mappingSource, + candidateResultIndices, + conglomerateListeneer + ); + } + }, exception -> { + logger.error(new ParameterizedMessage("Fail to check whether we should update [{}]'s mapping", index), exception); + conglomerateListeneer.onFailure(exception); + processResultIndexMappingIteration( + indexPos + 1, + newestSchemaVersion, + mappingSource, + candidateResultIndices, + conglomerateListeneer + ); + })); } private void markMappingUpdated(IndexType adIndex) { @@ -718,21 +908,25 @@ private void markMappingUpdated(IndexType adIndex) { } private void shouldUpdateIndex(IndexType index, ActionListener thenDo) { + Integer newVersion = indexStates.computeIfAbsent(index, k -> new IndexState(k.getMapping())).schemaVersion; + shouldUpdateIndex(index.getIndexName(), index.isAlias(), newVersion, thenDo); + } + + private void shouldUpdateIndex(String indexOrAliasName, boolean isAlias, Integer newVersion, ActionListener thenDo) { boolean exists = false; - if (index.isAlias()) { - exists = doesAliasExist(index.getIndexName()); + if (isAlias) { + exists = doesAliasExist(indexOrAliasName); } else { - exists = doesIndexExist(index.getIndexName()); + exists = doesIndexExist(indexOrAliasName); } if (false == exists) { thenDo.onResponse(Boolean.FALSE); return; } - Integer newVersion = indexStates.computeIfAbsent(index, k -> new IndexState(k.getMapping())).schemaVersion; - if (index.isAlias()) { + if (isAlias) { GetAliasesRequest getAliasRequest = new GetAliasesRequest() - .aliases(index.getIndexName()) + .aliases(indexOrAliasName) .indicesOptions(IndicesOptions.lenientExpandOpenHidden()); adminClient.indices().getAliases(getAliasRequest, ActionListener.wrap(getAliasResponse -> { String concreteIndex = null; @@ -748,9 +942,31 @@ private void shouldUpdateIndex(IndexType index, ActionListener thenDo) return; } shouldUpdateConcreteIndex(concreteIndex, newVersion, thenDo); - }, exception -> logger.error(new ParameterizedMessage("Fail to get [{}]'s alias", index.getIndexName()), exception))); + }, exception -> logger.error(new ParameterizedMessage("Fail to get [{}]'s alias", indexOrAliasName), exception))); + } else { + shouldUpdateConcreteIndex(indexOrAliasName, newVersion, thenDo); + } + } + + protected void getConcreteIndex(String indexOrAliasName, ActionListener thenDo) { + if (doesAliasExist(indexOrAliasName)) { + GetAliasesRequest getAliasRequest = new GetAliasesRequest() + .aliases(indexOrAliasName) + .indicesOptions(IndicesOptions.lenientExpandOpenHidden()); + adminClient.indices().getAliases(getAliasRequest, ActionListener.wrap(getAliasResponse -> { + String concreteIndex = null; + for (Map.Entry> entry : getAliasResponse.getAliases().entrySet()) { + if (false == entry.getValue().isEmpty()) { + // we assume the alias map to one concrete index, thus we can return after finding one + concreteIndex = entry.getKey(); + break; + } + } + thenDo.onResponse(concreteIndex); + }, exception -> logger.error(new ParameterizedMessage("Fail to get [{}]'s alias", indexOrAliasName), exception))); } else { - shouldUpdateConcreteIndex(index.getIndexName(), newVersion, thenDo); + // if this is not an alias or the index does not exist yet, return indexOrAliasName + thenDo.onResponse(indexOrAliasName); } } @@ -764,54 +980,82 @@ public int getSchemaVersion(IndexType index) { return indexState.schemaVersion; } - public 
void initCustomResultIndexAndExecute(String resultIndex, ExecutorFunction function, ActionListener listener) { - if (!doesIndexExist(resultIndex)) { - initCustomResultIndexDirectly(resultIndex, ActionListener.wrap(response -> { + public void initCustomResultIndexAndExecute(String resultIndexOrAlias, ExecutorFunction function, ActionListener listener) { + if (!doesIndexExist(resultIndexOrAlias) && !doesAliasExist(resultIndexOrAlias)) { + initCustomResultIndexDirectly(resultIndexOrAlias, ActionListener.wrap(response -> { if (response.isAcknowledged()) { - logger.info("Successfully created result index {}", resultIndex); - validateResultIndexAndExecute(resultIndex, function, false, listener); + logger.info("Successfully created result index {}", resultIndexOrAlias); + validateResultIndexAndExecute(resultIndexOrAlias, function, false, listener); } else { - String error = "Creating result index with mappings call not acknowledged: " + resultIndex; + String error = "Creating result index with mappings call not acknowledged: " + resultIndexOrAlias; logger.error(error); listener.onFailure(new EndRunException(error, false)); } }, exception -> { if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { // It is possible the index has been created while we sending the create request - validateResultIndexAndExecute(resultIndex, function, false, listener); + validateResultIndexAndExecute(resultIndexOrAlias, function, false, listener); } else { - logger.error("Failed to create result index " + resultIndex, exception); + logger.error("Failed to create result index " + resultIndexOrAlias, exception); listener.onFailure(exception); } })); } else { - validateResultIndexAndExecute(resultIndex, function, false, listener); + validateResultIndexAndExecute(resultIndexOrAlias, function, false, listener); } } public void validateCustomIndexForBackendJob( - String resultIndex, + String resultIndexOrAlias, String securityLogId, String user, List roles, ExecutorFunction function, ActionListener listener ) { - if (!doesIndexExist(resultIndex)) { - listener.onFailure(new EndRunException(CAN_NOT_FIND_RESULT_INDEX + resultIndex, true)); - return; - } - if (!isValidResultIndexMapping(resultIndex)) { - listener.onFailure(new EndRunException("Result index mapping is not correct", true)); - return; + if (!doesIndexExist(resultIndexOrAlias) && !doesAliasExist(resultIndexOrAlias)) { + initCustomResultIndexDirectly(resultIndexOrAlias, ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + executeOnCustomIndex(resultIndexOrAlias, securityLogId, user, roles, function, listener); + } else { + String error = "Creating custom result index with mappings call not acknowledged"; + logger.error(error); + listener.onFailure(new TimeSeriesException(error)); + } + }, exception -> { + if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { + // It is possible the index has been created while we sending the create request + executeOnCustomIndex(resultIndexOrAlias, securityLogId, user, roles, function, listener); + } else { + listener.onFailure(exception); + } + })); + } else { + validateResultIndexMapping(resultIndexOrAlias, ActionListener.wrap(validMapping -> { + if (validMapping) { + executeOnCustomIndex(resultIndexOrAlias, securityLogId, user, roles, function, listener); + } else { + listener.onFailure(new EndRunException("Result index mapping is not correct", true)); + } + }, listener::onFailure)); } + } + + private void executeOnCustomIndex( + String 
resultIndexOrAlias, + String securityLogId, + String user, + List roles, + ExecutorFunction function, + ActionListener listener + ) { try (InjectSecurity injectSecurity = new InjectSecurity(securityLogId, settings, client.threadPool().getThreadContext())) { injectSecurity.inject(user, roles); ActionListener wrappedListener = ActionListener.wrap(r -> { listener.onResponse(r); }, e -> { injectSecurity.close(); listener.onFailure(e); }); - validateResultIndexAndExecute(resultIndex, () -> { + validateResultIndexAndExecute(resultIndexOrAlias, () -> { injectSecurity.close(); function.execute(); }, true, wrappedListener); @@ -878,48 +1122,87 @@ private void initResultMapping() throws IOException { /** * Check if custom result index has correct index mapping. - * @param resultIndex result index - * @return true if result index mapping is valid + * @param resultIndexOrAlias result index name or alias + * @param thenDo listener returns true if result index mapping is valid. + * */ - public boolean isValidResultIndexMapping(String resultIndex) { - try { - initResultMapping(); - if (RESULT_FIELD_CONFIGS == null) { - // failed to populate the field - return false; - } - IndexMetadata indexMetadata = clusterService.state().metadata().index(resultIndex); - Map indexMapping = indexMetadata.mapping().sourceAsMap(); - String propertyName = CommonName.PROPERTIES; - if (!indexMapping.containsKey(propertyName) || !(indexMapping.get(propertyName) instanceof LinkedHashMap)) { - return false; - } - LinkedHashMap mapping = (LinkedHashMap) indexMapping.get(propertyName); - boolean correctResultIndexMapping = true; - - for (String fieldName : RESULT_FIELD_CONFIGS.keySet()) { - Object defaultSchema = RESULT_FIELD_CONFIGS.get(fieldName); - // the field might be a map or map of map - // example: map: {type=date, format=strict_date_time||epoch_millis} - // map of map: {type=nested, properties={likelihood={type=double}, value_list={type=nested, properties={data={type=double}, - // feature_id={type=keyword}}}}} - // if it is a map of map, Object.equals can compare them regardless of order - if (!mapping.containsKey(fieldName) || !defaultSchema.equals(mapping.get(fieldName))) { - logger.warn("mapping mismatch due to {}", fieldName); - correctResultIndexMapping = false; - break; + public void validateResultIndexMapping(String resultIndexOrAlias, ActionListener thenDo) { + getConcreteIndex(resultIndexOrAlias, ActionListener.wrap(concreteIndex -> { + try { + initResultMapping(); + if (RESULT_FIELD_CONFIGS == null) { + // failed to populate the field + thenDo.onResponse(false); + } + IndexMetadata indexMetadata = clusterService.state().metadata().index(concreteIndex); + Map indexMapping = indexMetadata.mapping().sourceAsMap(); + String propertyName = CommonName.PROPERTIES; + if (!indexMapping.containsKey(propertyName) || !(indexMapping.get(propertyName) instanceof LinkedHashMap)) { + thenDo.onResponse(false); + } + LinkedHashMap mapping = (LinkedHashMap) indexMapping.get(propertyName); + boolean correctResultIndexMapping = true; + + for (String fieldName : RESULT_FIELD_CONFIGS.keySet()) { + Object defaultSchema = RESULT_FIELD_CONFIGS.get(fieldName); + // the field might be a map or map of map + // example: map: {type=date, format=strict_date_time||epoch_millis} + // map of map: {type=nested, properties={likelihood={type=double}, value_list={type=nested, + // properties={data={type=double}, + // feature_id={type=keyword}}}}} + // if it is a map of map, Object.equals can compare them regardless of order + if 
(!mapping.containsKey(fieldName)) { + logger.warn("mapping mismatch due to missing {}", fieldName); + correctResultIndexMapping = false; + break; + } + Object actualSchema = mapping.get(fieldName); + if (!isSchemaSuperset(actualSchema, defaultSchema)) { + logger.warn("mapping mismatch due to {}", fieldName); + correctResultIndexMapping = false; + break; + } } + thenDo.onResponse(correctResultIndexMapping); + } catch (Exception e) { + logger.error("Failed to validate result index mapping for index " + concreteIndex, e); + thenDo.onResponse(false); } - return correctResultIndexMapping; - } catch (Exception e) { - logger.error("Failed to validate result index mapping for index " + resultIndex, e); + }, thenDo::onFailure)); + } + + /** + * Recursively checks if schema1 is a superset of schema2. + * @param schema1 the potential superset schema object + * @param schema2 the subset schema object + * @return true if schema1 is a superset of schema2 + */ + private boolean isSchemaSuperset(Object schema1, Object schema2) { + if (schema1 == schema2) { + return true; + } + if (schema1 == null || schema2 == null) { return false; } - + if (schema1 instanceof Map && schema2 instanceof Map) { + Map map1 = (Map) schema1; + Map map2 = (Map) schema2; + for (Map.Entry entry : map2.entrySet()) { + Object key = entry.getKey(); + if (!map1.containsKey(key)) { + return false; + } + if (!isSchemaSuperset(map1.get(key), entry.getValue())) { + return false; + } + } + return true; + } + return schema1.equals(schema2); } /** - * Create forecast result index if not exist. + * Create result index if not exist. * * @param actionListener action called after create index */ @@ -974,14 +1257,19 @@ protected void rolloverAndDeleteHistoryIndex( logger.info("{} rolled over. Conditions were: {}", resultIndexAlias, response.getConditionStatus()); deleteOldHistoryIndices(allResultIndicesPattern, historyRetentionPeriod); } - }, exception -> { logger.error("Fail to roll over result index", exception); })); + }, exception -> { + // e.g., we may roll over too often. Since the index pattern is opensearch-ad-plugin-result-d-history-{now/d}-000001, + // we cannot roll over twice in the same day as the index with the same name exists. We will get + // resource_already_exists_exception. 
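For context on the comment above: the history indices created behind a result alias use OpenSearch date-math names (see getCustomResultIndexPattern below), so two rollover attempts on the same calendar day resolve to the same concrete index name and the second create fails. A minimal standalone sketch of the assumed name resolution; the alias value and class name here are hypothetical, and the default date-math format (UTC, yyyy.MM.dd) is an assumption:

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

public class DateMathNameSketch {
    public static void main(String[] args) {
        // "<alias-history-{now/d}-1>" is assumed to resolve against the current UTC date,
        // e.g. "opensearch-ad-plugin-result-foo-history-2024.05.01-1".
        String alias = "opensearch-ad-plugin-result-foo"; // hypothetical custom result alias
        String resolved = String
            .format(Locale.ROOT, "%s-history-%s-1", alias, LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy.MM.dd")));
        System.out.println(resolved);
        // A second rollover on the same day resolves to this same concrete name, which is why
        // the failure handler below may see resource_already_exists_exception.
    }
}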
+ logger.error("Fail to roll over result index", exception); + })); } protected void initResultIndexDirectly( String resultIndexName, String alias, boolean hiddenIndex, - String resultIndexPattern, + boolean defaultResultIndex, IndexType resultIndex, ActionListener actionListener ) { @@ -989,14 +1277,24 @@ protected void initResultIndexDirectly( if (alias != null) { request.alias(new Alias(alias)); } + + // make index hidden if default result index is true choosePrimaryShards(request, hiddenIndex); - if (resultIndexPattern.equals(resultIndexName)) { + if (defaultResultIndex) { adminClient.indices().create(request, markMappingUpToDate(resultIndex, actionListener)); } else { adminClient.indices().create(request, actionListener); } } + protected String getCustomResultIndexPattern(String customResultIndexAlias) { + return String.format(Locale.ROOT, "<%s-history-{now/d}-1>", customResultIndexAlias); + } + + public static String getAllCustomResultIndexPattern(String customResultIndexAlias) { + return String.format(Locale.ROOT, "%s*", customResultIndexAlias); + } + public abstract boolean doesCheckpointIndexExist(); public abstract void initCheckpointIndex(ActionListener actionListener); diff --git a/src/main/java/org/opensearch/timeseries/indices/TimeSeriesIndex.java b/src/main/java/org/opensearch/timeseries/indices/TimeSeriesIndex.java index e7364ed32..090501adf 100644 --- a/src/main/java/org/opensearch/timeseries/indices/TimeSeriesIndex.java +++ b/src/main/java/org/opensearch/timeseries/indices/TimeSeriesIndex.java @@ -11,12 +11,26 @@ package org.opensearch.timeseries.indices; +import org.opensearch.timeseries.constant.CommonName; + public interface TimeSeriesIndex { + String CUSTOM_RESULT_INDEX = "custom_result_index"; + public String getIndexName(); public boolean isAlias(); public String getMapping(); - public boolean isJobIndex(); + public default boolean isJobIndex() { + return CommonName.JOB_INDEX.equals(getIndexName()); + } + + public default boolean isCustomResultIndex() { + return getIndexName() == CUSTOM_RESULT_INDEX; + } + + public default boolean isConfigIndex() { + return CommonName.CONFIG_INDEX.equals(getIndexName()); + } } diff --git a/src/main/java/org/opensearch/timeseries/model/Config.java b/src/main/java/org/opensearch/timeseries/model/Config.java index 8e6009c60..1dfdfb54c 100644 --- a/src/main/java/org/opensearch/timeseries/model/Config.java +++ b/src/main/java/org/opensearch/timeseries/model/Config.java @@ -39,6 +39,7 @@ import org.opensearch.timeseries.constant.CommonName; import org.opensearch.timeseries.dataprocessor.ImputationMethod; import org.opensearch.timeseries.dataprocessor.ImputationOption; +import org.opensearch.timeseries.indices.IndexManagement; import org.opensearch.timeseries.settings.TimeSeriesSettings; import org.owasp.encoder.Encode; @@ -90,7 +91,7 @@ public abstract class Config implements Writeable, ToXContentObject { protected TimeConfiguration interval; protected TimeConfiguration windowDelay; protected Integer shingleSize; - protected String customResultIndex; + protected String customResultIndexOrAlias; protected Map uiMetadata; protected Integer schemaVersion; protected Instant lastUpdateTime; @@ -255,7 +256,7 @@ protected Config( this.lastUpdateTime = lastUpdateTime; this.categoryFields = categoryFields; this.user = user; - this.customResultIndex = Strings.trimToNull(resultIndex); + this.customResultIndexOrAlias = Strings.trimToNull(resultIndex); this.imputationOption = imputationOption; this.issueType = null; this.errorMessage = null; @@ 
-297,14 +298,14 @@ public Config(StreamInput input) throws IOException { } else { this.uiMetadata = null; } - customResultIndex = input.readOptionalString(); + customResultIndexOrAlias = input.readOptionalString(); if (input.readBoolean()) { this.imputationOption = new ImputationOption(input); } else { this.imputationOption = null; } this.recencyEmphasis = input.readInt(); - this.seasonIntervals = input.readInt(); + this.seasonIntervals = input.readOptionalInt(); this.historyIntervals = input.readInt(); this.customResultIndexMinSize = input.readOptionalInt(); this.customResultIndexMinAge = input.readOptionalInt(); @@ -348,7 +349,7 @@ public void writeTo(StreamOutput output) throws IOException { } else { output.writeBoolean(false); } - output.writeOptionalString(customResultIndex); + output.writeOptionalString(customResultIndexOrAlias); if (imputationOption != null) { output.writeBoolean(true); imputationOption.writeTo(output); @@ -356,7 +357,7 @@ public void writeTo(StreamOutput output) throws IOException { output.writeBoolean(false); } output.writeInt(recencyEmphasis); - output.writeInt(seasonIntervals); + output.writeOptionalInt(seasonIntervals); output.writeInt(historyIntervals); output.writeOptionalInt(customResultIndexMinSize); output.writeOptionalInt(customResultIndexMinAge); @@ -410,7 +411,7 @@ public boolean equals(Object o) { && Objects.equal(shingleSize, config.shingleSize) && Objects.equal(categoryFields, config.categoryFields) && Objects.equal(user, config.user) - && Objects.equal(customResultIndex, config.customResultIndex) + && Objects.equal(customResultIndexOrAlias, config.customResultIndexOrAlias) && Objects.equal(imputationOption, config.imputationOption) && Objects.equal(recencyEmphasis, config.recencyEmphasis) && Objects.equal(seasonIntervals, config.seasonIntervals) @@ -437,7 +438,7 @@ public int hashCode() { categoryFields, schemaVersion, user, - customResultIndex, + customResultIndexOrAlias, imputationOption, recencyEmphasis, seasonIntervals, @@ -475,8 +476,8 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (user != null) { builder.field(USER_FIELD, user); } - if (customResultIndex != null) { - builder.field(RESULT_INDEX_FIELD, customResultIndex); + if (customResultIndexOrAlias != null) { + builder.field(RESULT_INDEX_FIELD, customResultIndexOrAlias); } if (imputationOption != null) { builder.field(IMPUTATION_OPTION_FIELD, imputationOption); @@ -593,8 +594,16 @@ public void setUser(User user) { this.user = user; } - public String getCustomResultIndex() { - return customResultIndex; + /** + * Since 2.15, custom result index is changed to an alias to ease rollover as rollover target can only be an alias or data stream. + * @return custom result index name or alias + */ + public String getCustomResultIndexOrAlias() { + return customResultIndexOrAlias; + } + + public String getCustomResultIndexPattern() { + return Strings.isEmpty(customResultIndexOrAlias) ? 
null : IndexManagement.getAllCustomResultIndexPattern(customResultIndexOrAlias); } public boolean isHighCardinality() { @@ -739,7 +748,7 @@ public String toString() { .append("categoryFields", categoryFields) .append("schemaVersion", schemaVersion) .append("user", user) - .append("customResultIndex", customResultIndex) + .append("customResultIndex", customResultIndexOrAlias) .append("imputationOption", imputationOption) .append("recencyEmphasis", recencyEmphasis) .append("seasonIntervals", seasonIntervals) diff --git a/src/main/java/org/opensearch/timeseries/model/Job.java b/src/main/java/org/opensearch/timeseries/model/Job.java index 140a2f673..492bf9dc7 100644 --- a/src/main/java/org/opensearch/timeseries/model/Job.java +++ b/src/main/java/org/opensearch/timeseries/model/Job.java @@ -267,7 +267,7 @@ public boolean equals(Object o) { && Objects.equal(getDisabledTime(), that.getDisabledTime()) && Objects.equal(getLastUpdateTime(), that.getLastUpdateTime()) && Objects.equal(getLockDurationSeconds(), that.getLockDurationSeconds()) - && Objects.equal(getCustomResultIndex(), that.getCustomResultIndex()) + && Objects.equal(getCustomResultIndexOrAlias(), that.getCustomResultIndexOrAlias()) && Objects.equal(getAnalysisType(), that.getAnalysisType()); } @@ -318,7 +318,7 @@ public User getUser() { return user; } - public String getCustomResultIndex() { + public String getCustomResultIndexOrAlias() { return resultIndex; } diff --git a/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteWorker.java b/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteWorker.java index faaf7852e..772062fef 100644 --- a/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteWorker.java +++ b/src/main/java/org/opensearch/timeseries/ratelimit/ResultWriteWorker.java @@ -41,7 +41,7 @@ import org.opensearch.timeseries.transport.handler.IndexMemoryPressureAwareResultHandler; import org.opensearch.timeseries.util.ExceptionUtil; -public abstract class ResultWriteWorker, BatchRequestType extends ResultBulkRequest, IndexType extends Enum & TimeSeriesIndex, IndexManagementType extends IndexManagement, ResultHandlerType extends IndexMemoryPressureAwareResultHandler> +public abstract class ResultWriteWorker, BatchRequestType extends ResultBulkRequest, IndexType extends Enum & TimeSeriesIndex, IndexManagementType extends IndexManagement, ResultHandlerType extends IndexMemoryPressureAwareResultHandler> extends BatchWorker { private static final Logger LOG = LogManager.getLogger(ResultWriteWorker.class); protected final ResultHandlerType resultHandler; @@ -199,7 +199,7 @@ private ActionListener> onGetConfig( id, resultToRetry.isHighPriority() ? RequestPriority.HIGH : RequestPriority.MEDIUM, resultToRetry, - config.getCustomResultIndex() + config.getCustomResultIndexOrAlias() ) ); diff --git a/src/main/java/org/opensearch/timeseries/AbstractSearchAction.java b/src/main/java/org/opensearch/timeseries/rest/AbstractSearchAction.java similarity index 96% rename from src/main/java/org/opensearch/timeseries/AbstractSearchAction.java rename to src/main/java/org/opensearch/timeseries/rest/AbstractSearchAction.java index 43681e78f..65a16a8d3 100644 --- a/src/main/java/org/opensearch/timeseries/AbstractSearchAction.java +++ b/src/main/java/org/opensearch/timeseries/rest/AbstractSearchAction.java @@ -9,7 +9,7 @@ * GitHub history for details. 
*/ -package org.opensearch.timeseries; +package org.opensearch.timeseries.rest; import static org.opensearch.core.xcontent.ToXContent.EMPTY_PARAMS; import static org.opensearch.timeseries.util.RestHandlerUtils.getSourceContext; @@ -47,7 +47,7 @@ public abstract class AbstractSearchAction extends B protected final List urlPaths; protected final List> deprecatedPaths; protected final ActionType actionType; - protected final Supplier adEnabledSupplier; + protected final Supplier enabledSupplier; protected final String disabledMsg; private final Logger logger = LogManager.getLogger(AbstractSearchAction.class); @@ -66,13 +66,13 @@ public AbstractSearchAction( this.urlPaths = urlPaths; this.deprecatedPaths = deprecatedPaths; this.actionType = actionType; - this.adEnabledSupplier = adEnabledSupplier; + this.enabledSupplier = adEnabledSupplier; this.disabledMsg = disabledMsg; } @Override protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { - if (!adEnabledSupplier.get()) { + if (!enabledSupplier.get()) { throw new IllegalStateException(disabledMsg); } try { diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java index 0aead14b1..1adafb16a 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/AbstractTimeSeriesActionHandler.java @@ -235,18 +235,18 @@ public AbstractTimeSeriesActionHandler( */ @Override public void start(ActionListener listener) { - String resultIndex = config.getCustomResultIndex(); + String resultIndexOrAlias = config.getCustomResultIndexOrAlias(); // use default detector result index which is system index - if (resultIndex == null) { + if (resultIndexOrAlias == null) { createOrUpdateConfig(listener); return; } if (this.isDryRun) { - if (timeSeriesIndices.doesIndexExist(resultIndex)) { + if (timeSeriesIndices.doesIndexExist(resultIndexOrAlias) || timeSeriesIndices.doesAliasExist(resultIndexOrAlias)) { timeSeriesIndices .validateResultIndexAndExecute( - resultIndex, + resultIndexOrAlias, () -> createOrUpdateConfig(listener), false, ActionListener.wrap(r -> createOrUpdateConfig(listener), ex -> { @@ -262,7 +262,7 @@ public void start(ActionListener listener) { } } // use custom result index if not validating and resultIndex not null - timeSeriesIndices.initCustomResultIndexAndExecute(resultIndex, () -> createOrUpdateConfig(listener), listener); + timeSeriesIndices.initCustomResultIndexAndExecute(resultIndexOrAlias, () -> createOrUpdateConfig(listener), listener); } // if isDryRun is true then this method is being executed through Validation API meaning actual @@ -418,7 +418,7 @@ private void onGetConfigResponse(GetResponse response, boolean indexingDryRun, S listener.onFailure(new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_CATEGORY_FIELD, RestStatus.BAD_REQUEST)); return; } - if (!Objects.equals(existingConfig.getCustomResultIndex(), config.getCustomResultIndex())) { + if (!Objects.equals(existingConfig.getCustomResultIndexOrAlias(), config.getCustomResultIndexOrAlias())) { listener .onFailure( new OpenSearchStatusException(CommonMessages.CAN_NOT_CHANGE_CUSTOM_RESULT_INDEX, RestStatus.BAD_REQUEST) diff --git a/src/main/java/org/opensearch/timeseries/rest/handler/IndexJobActionHandler.java b/src/main/java/org/opensearch/timeseries/rest/handler/IndexJobActionHandler.java index 
9dfdb5039..cf60e5978 100644 --- a/src/main/java/org/opensearch/timeseries/rest/handler/IndexJobActionHandler.java +++ b/src/main/java/org/opensearch/timeseries/rest/handler/IndexJobActionHandler.java @@ -222,7 +222,7 @@ private void createJob(Config config, TransportService transportService, ActionL Instant.now(), duration.getSeconds(), config.getUser(), - config.getCustomResultIndex(), + config.getCustomResultIndexOrAlias(), analysisType ); @@ -276,7 +276,7 @@ private void onGetJobForWrite( Instant.now(), job.getLockDurationSeconds(), job.getUser(), - job.getCustomResultIndex(), + job.getCustomResultIndexOrAlias(), job.getAnalysisType() ); // Get latest realtime task and check its state before index job. Will reset running realtime task @@ -429,7 +429,7 @@ public void stopJob(String configId, TransportService transportService, ActionLi Instant.now(), job.getLockDurationSeconds(), job.getUser(), - job.getCustomResultIndex(), + job.getCustomResultIndexOrAlias(), job.getAnalysisType() ); indexJob( @@ -538,7 +538,7 @@ public void startConfig( listener.onFailure(new OpenSearchStatusException(errorMessage, RestStatus.BAD_REQUEST)); return; } - String resultIndex = config.get().getCustomResultIndex(); + String resultIndex = config.get().getCustomResultIndexOrAlias(); if (resultIndex == null) { startRealtimeOrHistoricalAnalysis(dateRange, user, transportService, listener, config); return; diff --git a/src/main/java/org/opensearch/timeseries/transport/AbstractSingleStreamResultTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/AbstractSingleStreamResultTransportAction.java index bc9093ba7..b5f08785a 100644 --- a/src/main/java/org/opensearch/timeseries/transport/AbstractSingleStreamResultTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/AbstractSingleStreamResultTransportAction.java @@ -54,7 +54,7 @@ import com.amazon.randomcutforest.parkservices.ThresholdedRandomCutForest; -public abstract class AbstractSingleStreamResultTransportAction & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointDaoType extends CheckpointDao, CheckpointWriterType extends CheckpointWriteWorker, CheckpointMaintainerType extends CheckpointMaintainWorker, CacheBufferType extends CacheBuffer, PriorityCacheType extends PriorityCache, CacheProviderType extends CacheProvider, ResultType extends IndexableResult, RCFResultType extends IntermediateResult, ColdStarterType extends ModelColdStart, ModelManagerType extends ModelManager, CacheType extends TimeSeriesCache, SaveResultStrategyType extends SaveResultStrategy, ColdStartWorkerType extends ColdStartWorker, CheckpointReadWorkerType extends CheckpointReadWorker, ResultWriteRequestType extends ResultWriteRequest, BatchRequestType extends ResultBulkRequest, ResultHandlerType extends IndexMemoryPressureAwareResultHandler, ResultWriteWorkerType extends ResultWriteWorker> +public abstract class AbstractSingleStreamResultTransportAction & TimeSeriesIndex, IndexManagementType extends IndexManagement, CheckpointDaoType extends CheckpointDao, CheckpointWriterType extends CheckpointWriteWorker, CheckpointMaintainerType extends CheckpointMaintainWorker, CacheBufferType extends CacheBuffer, PriorityCacheType extends PriorityCache, CacheProviderType extends CacheProvider, ResultType extends IndexableResult, RCFResultType extends IntermediateResult, ColdStarterType extends ModelColdStart, ModelManagerType extends ModelManager, CacheType extends TimeSeriesCache, SaveResultStrategyType extends SaveResultStrategy, 
ColdStartWorkerType extends ColdStartWorker, CheckpointReadWorkerType extends CheckpointReadWorker, ResultWriteRequestType extends ResultWriteRequest, BatchRequestType extends ResultBulkRequest, ResultHandlerType extends IndexMemoryPressureAwareResultHandler, ResultWriteWorkerType extends ResultWriteWorker> extends HandledTransportAction { private static final Logger LOG = LogManager.getLogger(AbstractSingleStreamResultTransportAction.class); protected CircuitBreakerService circuitBreakerService; diff --git a/src/main/java/org/opensearch/timeseries/transport/ResultBulkRequest.java b/src/main/java/org/opensearch/timeseries/transport/ResultBulkRequest.java index cd8efc9de..74a6d7bbd 100644 --- a/src/main/java/org/opensearch/timeseries/transport/ResultBulkRequest.java +++ b/src/main/java/org/opensearch/timeseries/transport/ResultBulkRequest.java @@ -60,13 +60,14 @@ public void writeTo(StreamOutput out) throws IOException { for (ResultWriteRequestType result : results) { result.writeTo(out); } + } /** * * @return all of the results to send */ - public List getAnomalyResults() { + public List getResults() { return results; } diff --git a/src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java b/src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java index f070c38c6..20b3e9fba 100644 --- a/src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java +++ b/src/main/java/org/opensearch/timeseries/transport/ResultBulkTransportAction.java @@ -86,7 +86,7 @@ protected void doExecute(Task task, ResultBulkRequestType request, ActionListene long totalBytes = indexingPressure.getCurrentCombinedCoordinatingAndPrimaryBytes() + indexingPressure.getCurrentReplicaBytes(); float indexingPressurePercent = (float) totalBytes / primaryAndCoordinatingLimits; @SuppressWarnings("rawtypes") - List results = request.getAnomalyResults(); + List results = request.getResults(); if (results == null || results.size() < 1) { listener.onResponse(new ResultBulkResponse()); diff --git a/src/main/java/org/opensearch/timeseries/transport/handler/IndexMemoryPressureAwareResultHandler.java b/src/main/java/org/opensearch/timeseries/transport/handler/IndexMemoryPressureAwareResultHandler.java index 1d107cd63..a5f914f4e 100644 --- a/src/main/java/org/opensearch/timeseries/transport/handler/IndexMemoryPressureAwareResultHandler.java +++ b/src/main/java/org/opensearch/timeseries/transport/handler/IndexMemoryPressureAwareResultHandler.java @@ -11,6 +11,11 @@ package org.opensearch.timeseries.transport.handler; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.ExceptionsHelper; @@ -24,6 +29,9 @@ import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.indices.IndexManagement; import org.opensearch.timeseries.indices.TimeSeriesIndex; +import org.opensearch.timeseries.model.IndexableResult; +import org.opensearch.timeseries.ratelimit.ResultWriteRequest; +import org.opensearch.timeseries.transport.ResultBulkRequest; /** * Different from ResultIndexingHandler and ResultBulkIndexingHandler, this class uses @@ -36,7 +44,7 @@ * @param forecasting or AD result index * @param Index management class */ -public abstract class IndexMemoryPressureAwareResultHandler & TimeSeriesIndex, IndexManagementType extends IndexManagement> { +public abstract class IndexMemoryPressureAwareResultHandler, 
BatchRequestType extends ResultBulkRequest, BatchResponseType, IndexType extends Enum & TimeSeriesIndex, IndexManagementType extends IndexManagement> { private static final Logger LOG = LogManager.getLogger(IndexMemoryPressureAwareResultHandler.class); protected final Client client; @@ -62,33 +70,76 @@ public void flush(BatchRequestType currentBulkRequest, ActionListener { - if (initResponse.isAcknowledged()) { - bulk(currentBulkRequest, listener); - } else { - LOG.warn("Creating result index with mappings call not acknowledged."); - listener.onFailure(new TimeSeriesException("", "Creating result index with mappings call not acknowledged.")); - } - }, exception -> { - if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { - // It is possible the index has been created while we sending the create request - bulk(currentBulkRequest, listener); - } else { - LOG.warn("Unexpected error creating result index", exception); - listener.onFailure(exception); - } - })); - } else { - bulk(currentBulkRequest, listener); + Set customResultIndexOrAlias = new HashSet<>(); + for (ResultWriteRequestType result : currentBulkRequest.getResults()) { + if (result.getResultIndex() != null) { + customResultIndexOrAlias.add(result.getResultIndex()); } - } catch (Exception e) { - LOG.warn("Error in bulking results", e); - listener.onFailure(e); + } + List customResultIndexOrAliasList = new ArrayList<>(customResultIndexOrAlias); + + // We create custom result index when creating a detector. Custom result index can be rolled over and thus we may need to create a + // new one. + if (!timeSeriesIndices.doesDefaultResultIndexExist()) { + timeSeriesIndices.initDefaultResultIndexDirectly(ActionListener.wrap(initResponse -> { + if (initResponse.isAcknowledged()) { + initCustomIndices(currentBulkRequest, customResultIndexOrAliasList, listener); + } else { + LOG.warn("Creating result index with mappings call not acknowledged."); + listener.onFailure(new TimeSeriesException("", "Creating result index with mappings call not acknowledged.")); + } + }, exception -> { + if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { + // It is possible the index has been created while we sending the create request + initCustomIndices(currentBulkRequest, customResultIndexOrAliasList, listener); + } else { + LOG.warn("Unexpected error creating result index", exception); + listener.onFailure(exception); + } + })); + } else { + initCustomIndices(currentBulkRequest, customResultIndexOrAliasList, listener); + } + } + + private void initCustomIndices( + BatchRequestType currentBulkRequest, + List customResultIndexOrAlias, + ActionListener listener + ) { + initCustomIndicesIteration(0, currentBulkRequest, customResultIndexOrAlias, listener); + } + + private void initCustomIndicesIteration( + int i, + BatchRequestType currentBulkRequest, + List customResultIndexOrAlias, + ActionListener listener + ) { + if (i >= customResultIndexOrAlias.size()) { + bulk(currentBulkRequest, listener); + return; + } + String indexOrAliasName = customResultIndexOrAlias.get(i); + if (!timeSeriesIndices.doesIndexExist(indexOrAliasName) && !timeSeriesIndices.doesAliasExist(indexOrAliasName)) { + timeSeriesIndices.initCustomResultIndexDirectly(indexOrAliasName, ActionListener.wrap(initResponse -> { + if (initResponse.isAcknowledged()) { + initCustomIndicesIteration(i + 1, currentBulkRequest, customResultIndexOrAlias, listener); + } else { + LOG.warn("Creating result index {} with mappings call not 
acknowledged.", indexOrAliasName); + initCustomIndicesIteration(i + 1, currentBulkRequest, customResultIndexOrAlias, listener); + } + }, exception -> { + if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { + // It is possible the index has been created while we sending the create request + initCustomIndicesIteration(i + 1, currentBulkRequest, customResultIndexOrAlias, listener); + } else { + LOG.warn("Unexpected error creating result index", exception); + initCustomIndicesIteration(i + 1, currentBulkRequest, customResultIndexOrAlias, listener); + } + })); + } else { + bulk(currentBulkRequest, listener); } } diff --git a/src/main/java/org/opensearch/timeseries/transport/handler/ResultBulkIndexingHandler.java b/src/main/java/org/opensearch/timeseries/transport/handler/ResultBulkIndexingHandler.java index 4cec1c127..5a4c94a5c 100644 --- a/src/main/java/org/opensearch/timeseries/transport/handler/ResultBulkIndexingHandler.java +++ b/src/main/java/org/opensearch/timeseries/transport/handler/ResultBulkIndexingHandler.java @@ -32,7 +32,6 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.common.exception.EndRunException; import org.opensearch.timeseries.common.exception.TimeSeriesException; -import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.indices.IndexManagement; import org.opensearch.timeseries.indices.TimeSeriesIndex; import org.opensearch.timeseries.model.IndexableResult; @@ -79,32 +78,49 @@ public ResultBulkIndexingHandler( /** * Bulk index results. Create result index first if it doesn't exist. * - * @param resultIndex result index + * @param resultIndexOrAlias result index * @param results results to save * @param configId Config Id * @param listener action listener */ - public void bulk(String resultIndex, List results, String configId, ActionListener listener) { + public void bulk(String resultIndexOrAlias, List results, String configId, ActionListener listener) { if (results == null || results.size() == 0) { listener.onResponse(null); return; } try { - if (resultIndex != null) { - // Only create custom result index when creating detector, won’t recreate custom AD result index in realtime - // job and historical analysis later if it’s deleted. If user delete the custom AD result index, and AD plugin - // recreate it, that may bring confusion. - if (!timeSeriesIndices.doesIndexExist(resultIndex)) { - throw new EndRunException(configId, CommonMessages.CAN_NOT_FIND_RESULT_INDEX + resultIndex, true); + if (resultIndexOrAlias != null) { + // We create custom result index when creating a detector. Custom result index can be rolled over and thus we may need to + // create a new one. 
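As an aside on the flush path above: because index creation is asynchronous, initCustomIndicesIteration walks the list of distinct custom result indices by recursing from each callback rather than looping, and only runs the bulk write once the walk finishes. A minimal standalone sketch of that pattern, using a plain java.util.function callback in place of ActionListener; all names here are illustrative:

import java.util.List;
import java.util.function.Consumer;

public class SequentialInitSketch {

    // Stand-in for an async "create index" call: invokes the callback when creation completes.
    static void createIndexAsync(String name, Consumer<Boolean> onDone) {
        System.out.println("creating " + name);
        onDone.accept(true); // pretend the creation was acknowledged
    }

    // Initialize indices one at a time; the final action runs only after the last callback fires.
    static void initIteration(int i, List<String> indices, Runnable thenBulk) {
        if (i >= indices.size()) {
            thenBulk.run();
            return;
        }
        createIndexAsync(indices.get(i), acked -> initIteration(i + 1, indices, thenBulk));
    }

    public static void main(String[] args) {
        initIteration(0, List.of("result-alias-a", "result-alias-b"),
            () -> System.out.println("all custom result indices ready, sending bulk"));
    }
}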
+ if (!timeSeriesIndices.doesIndexExist(resultIndexOrAlias) && !timeSeriesIndices.doesAliasExist(resultIndexOrAlias)) { + timeSeriesIndices.initCustomResultIndexDirectly(resultIndexOrAlias, ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + bulk(resultIndexOrAlias, results, listener); + } else { + String error = "Creating custom result index with mappings call not acknowledged"; + LOG.error(error); + listener.onFailure(new TimeSeriesException(error)); + } + }, exception -> { + if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { + // It is possible the index has been created while we sending the create request + bulk(resultIndexOrAlias, results, listener); + } else { + listener.onFailure(exception); + } + })); + } else { + timeSeriesIndices.validateResultIndexMapping(resultIndexOrAlias, ActionListener.wrap(valid -> { + if (!valid) { + throw new EndRunException(configId, "wrong index mapping of custom result index", true); + } else { + bulk(resultIndexOrAlias, results, listener); + } + }, listener::onFailure)); } - if (!timeSeriesIndices.isValidResultIndexMapping(resultIndex)) { - throw new EndRunException(configId, "wrong index mapping of custom result index", true); - } - bulk(resultIndex, results, listener); return; - } - if (!timeSeriesIndices.doesDefaultResultIndexExist()) { + } else if (!timeSeriesIndices.doesDefaultResultIndexExist()) { timeSeriesIndices.initDefaultResultIndexDirectly(ActionListener.wrap(response -> { if (response.isAcknowledged()) { bulk(results, listener); diff --git a/src/main/java/org/opensearch/timeseries/transport/handler/ResultIndexingHandler.java b/src/main/java/org/opensearch/timeseries/transport/handler/ResultIndexingHandler.java index 87e59b116..d7beb64a8 100644 --- a/src/main/java/org/opensearch/timeseries/transport/handler/ResultIndexingHandler.java +++ b/src/main/java/org/opensearch/timeseries/transport/handler/ResultIndexingHandler.java @@ -36,7 +36,6 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.timeseries.common.exception.EndRunException; import org.opensearch.timeseries.common.exception.TimeSeriesException; -import org.opensearch.timeseries.constant.CommonMessages; import org.opensearch.timeseries.indices.IndexManagement; import org.opensearch.timeseries.indices.TimeSeriesIndex; import org.opensearch.timeseries.model.IndexableResult; @@ -110,48 +109,81 @@ public void setFixedDoc(boolean fixedDoc) { } // TODO: check if user has permission to index. - public void index(ResultType toSave, String detectorId, String customIndexName) { + public void index(ResultType toSave, String detectorId, String indexOrAliasName) { try { - if (customIndexName != null) { - if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, customIndexName)) { + if (indexOrAliasName != null) { + if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, indexOrAliasName)) { LOG.warn(String.format(Locale.ROOT, CANNOT_SAVE_ERR_MSG, detectorId)); return; } - // Only create custom AD result index when create detector, won’t recreate custom AD result index in realtime - // job and historical analysis later if it’s deleted. If user delete the custom AD result index, and AD plugin - // recreate it, that may bring confusion. - if (!timeSeriesIndices.doesIndexExist(customIndexName)) { - throw new EndRunException(detectorId, CommonMessages.CAN_NOT_FIND_RESULT_INDEX + customIndexName, true); + // We create custom result index when creating a detector. 
Custom result index can be rolled over and thus we may need to + // create a new one. + if (!timeSeriesIndices.doesIndexExist(indexOrAliasName) && !timeSeriesIndices.doesAliasExist(indexOrAliasName)) { + timeSeriesIndices.initCustomResultIndexDirectly(indexOrAliasName, ActionListener.wrap(response -> { + if (response.isAcknowledged()) { + save(toSave, detectorId, indexOrAliasName); + } else { + throw new TimeSeriesException( + detectorId, + String + .format( + Locale.ROOT, + "Creating custom result index %s with mappings call not acknowledged", + indexOrAliasName + ) + ); + } + }, exception -> { + if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { + // It is possible the index has been created while we sending the create request + save(toSave, detectorId, indexOrAliasName); + } else { + throw new TimeSeriesException( + detectorId, + String.format(Locale.ROOT, "cannot create result index %s", indexOrAliasName), + exception + ); + } + })); + } else { + timeSeriesIndices.validateResultIndexMapping(indexOrAliasName, ActionListener.wrap(valid -> { + if (!valid) { + throw new EndRunException(detectorId, "wrong index mapping of custom AD result index", true); + } else { + save(toSave, detectorId, indexOrAliasName); + } + }, exception -> { + throw new TimeSeriesException( + detectorId, + String.format(Locale.ROOT, "cannot validate result index %s", indexOrAliasName), + exception + ); + })); } - if (!timeSeriesIndices.isValidResultIndexMapping(customIndexName)) { - throw new EndRunException(detectorId, "wrong index mapping of custom AD result index", true); - } - save(toSave, detectorId, customIndexName); - return; - } - - if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, this.defaultResultIndexName)) { - LOG.warn(String.format(Locale.ROOT, CANNOT_SAVE_ERR_MSG, detectorId)); - return; - } - if (!timeSeriesIndices.doesDefaultResultIndexExist()) { - timeSeriesIndices - .initDefaultResultIndexDirectly( - ActionListener.wrap(initResponse -> onCreateIndexResponse(initResponse, toSave, detectorId), exception -> { - if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { - // It is possible the index has been created while we sending the create request - save(toSave, detectorId); - } else { - throw new TimeSeriesException( - detectorId, - String.format(Locale.ROOT, "Unexpected error creating index %s", defaultResultIndexName), - exception - ); - } - }) - ); } else { - save(toSave, detectorId); + if (indexUtils.checkIndicesBlocked(clusterService.state(), ClusterBlockLevel.WRITE, this.defaultResultIndexName)) { + LOG.warn(String.format(Locale.ROOT, CANNOT_SAVE_ERR_MSG, detectorId)); + return; + } + if (!timeSeriesIndices.doesDefaultResultIndexExist()) { + timeSeriesIndices + .initDefaultResultIndexDirectly( + ActionListener.wrap(initResponse -> onCreateIndexResponse(initResponse, toSave, detectorId), exception -> { + if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { + // It is possible the index has been created while we sending the create request + save(toSave, detectorId); + } else { + throw new TimeSeriesException( + detectorId, + String.format(Locale.ROOT, "Unexpected error creating index %s", defaultResultIndexName), + exception + ); + } + }) + ); + } else { + save(toSave, detectorId); + } } } catch (Exception e) { throw new TimeSeriesException( diff --git a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java 
b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java index e11402485..0db8b9ef5 100644 --- a/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java +++ b/src/test/java/org/opensearch/ad/AnomalyDetectorJobRunnerTests.java @@ -461,7 +461,7 @@ private void testRunAdJobWithEndRunExceptionNowAndStopAdJob(boolean jobExists, b Instant.now(), 60L, TestHelpers.randomUser(), - jobParameter.getCustomResultIndex(), + jobParameter.getCustomResultIndexOrAlias(), AnalysisType.AD ).toXContent(TestHelpers.builder(), ToXContent.EMPTY_PARAMS) ), diff --git a/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java b/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java index 59b786782..74505be01 100644 --- a/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java +++ b/src/test/java/org/opensearch/ad/AnomalyDetectorRestTestCase.java @@ -15,14 +15,26 @@ import java.io.IOException; import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.time.Instant; import java.util.ArrayList; import java.util.Collections; import java.util.Locale; import java.util.Map; +import javax.management.MBeanServerInvocationHandler; +import javax.management.MalformedObjectNameException; +import javax.management.ObjectName; +import javax.management.remote.JMXConnector; +import javax.management.remote.JMXConnectorFactory; +import javax.management.remote.JMXServiceURL; + import org.apache.http.HttpHeaders; import org.apache.http.message.BasicHeader; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.core.Logger; +import org.junit.AfterClass; import org.opensearch.ad.model.ADTask; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.AnomalyDetectorExecutionInput; @@ -51,6 +63,7 @@ import com.google.gson.JsonArray; public abstract class AnomalyDetectorRestTestCase extends ODFERestTestCase { + private static final Logger LOG = (Logger) LogManager.getLogger(AnomalyDetectorRestTestCase.class); public static final int MAX_RETRY_TIMES = 10; @@ -305,7 +318,7 @@ public ToXContentObject[] getConfig(String detectorId, BasicHeader header, boole detector.getLastUpdateTime(), null, detector.getUser(), - detector.getCustomResultIndex(), + detector.getCustomResultIndexOrAlias(), detector.getImputationOption(), detector.getRecencyEmphasis(), detector.getSeasonIntervals(), @@ -670,4 +683,46 @@ protected Response validateAnomalyDetector(AnomalyDetector detector, RestClient ); } + /** + * We need to be able to dump the jacoco coverage before cluster is shut down. + * The new internal testing framework removed some of the gradle tasks we were listening to + * to choose a good time to do it. This will dump the executionData to file after each test. + * TODO: This is also currently just overwriting integTest.exec with the updated execData without + * resetting after writing each time. 
This can be improved to either write an exec file per test + * or by letting jacoco append to the file + */ + public interface IProxy { + byte[] getExecutionData(boolean reset); + + void dump(boolean reset); + + void reset(); + } + + @AfterClass + public static void dumpCoverage() throws IOException, MalformedObjectNameException { + // jacoco.dir is set in esplugin-coverage.gradle, if it doesn't exist we don't + // want to collect coverage so we can return early + String jacocoBuildPath = System.getProperty("jacoco.dir"); + if (org.opensearch.core.common.Strings.isNullOrEmpty(jacocoBuildPath)) { + return; + } + + String serverUrl = System.getProperty("jmx.serviceUrl"); + if (serverUrl == null) { + LOG.error("Failed to dump coverage because JMX Service URL is null"); + throw new IllegalArgumentException("JMX Service URL is null"); + } + + try (JMXConnector connector = JMXConnectorFactory.connect(new JMXServiceURL(serverUrl))) { + IProxy proxy = MBeanServerInvocationHandler + .newProxyInstance(connector.getMBeanServerConnection(), new ObjectName("org.jacoco:type=Runtime"), IProxy.class, false); + + Path path = Path.of(Path.of(jacocoBuildPath, "integTest.exec").toFile().getCanonicalPath()); + Files.write(path, proxy.getExecutionData(false)); + } catch (Exception ex) { + LOG.error("Failed to dump coverage: ", ex); + throw new RuntimeException("Failed to dump coverage: " + ex); + } + } } diff --git a/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java b/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java index 6318fc29a..f9cb1d018 100644 --- a/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java +++ b/src/test/java/org/opensearch/ad/e2e/RuleModelPerfIT.java @@ -45,7 +45,7 @@ public void testRule() throws Exception { minPrecision.put("Scottsdale", 0.5); Map minRecall = new HashMap<>(); minRecall.put("Phoenix", 0.9); - minRecall.put("Scottsdale", 0.9); + minRecall.put("Scottsdale", 0.6); verifyRule("rule", 10, minPrecision.size(), 1500, minPrecision, minRecall, 20); } } diff --git a/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java b/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java index eb68bbe7f..83672a63c 100644 --- a/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java +++ b/src/test/java/org/opensearch/ad/indices/AnomalyDetectionIndicesTests.java @@ -19,6 +19,7 @@ import org.opensearch.ad.constant.ADCommonName; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.plugins.Plugin; import org.opensearch.timeseries.TestHelpers; import org.opensearch.timeseries.TimeSeriesAnalyticsPlugin; @@ -59,7 +60,8 @@ public void setup() throws IOException { client().threadPool(), settings, nodeFilter, - TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES + TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES, + NamedXContentRegistry.EMPTY ); } @@ -135,7 +137,7 @@ public void testValidateCustomIndexForBackendJobInvalidMapping() { validateCustomIndexForBackendJobInvalidMapping(indices); } - public void testValidateCustomIndexForBackendJobNoIndex() { + public void testValidateCustomIndexForBackendJobNoIndex() throws InterruptedException { validateCustomIndexForBackendJobNoIndex(indices); } } diff --git a/src/test/java/org/opensearch/ad/indices/CustomIndexTests.java b/src/test/java/org/opensearch/ad/indices/CustomIndexTests.java index da3d049b8..637733067 100644 --- a/src/test/java/org/opensearch/ad/indices/CustomIndexTests.java 
+++ b/src/test/java/org/opensearch/ad/indices/CustomIndexTests.java @@ -20,6 +20,8 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.opensearch.Version; import org.opensearch.ad.model.AnomalyResult; @@ -33,6 +35,8 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.timeseries.AbstractTimeSeriesTest; import org.opensearch.timeseries.constant.CommonName; import org.opensearch.timeseries.settings.TimeSeriesSettings; @@ -88,7 +92,8 @@ public void setUp() throws Exception { threadPool, settings, nodeFilter, - TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES + TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES, + NamedXContentRegistry.EMPTY ); } @@ -222,7 +227,7 @@ private Map createMapping() { return mappings; } - public void testCorrectMapping() throws IOException { + public void testCorrectMapping() throws IOException, InterruptedException { Map mappings = createMapping(); IndexMetadata indexMetadata1 = new IndexMetadata.Builder(customIndexName) @@ -238,15 +243,24 @@ public void testCorrectMapping() throws IOException { when(clusterService.state()) .thenReturn(ClusterState.builder(clusterName).metadata(Metadata.builder().put(indexMetadata1, true).build()).build()); - assertTrue(adIndices.isValidResultIndexMapping(customIndexName)); + CountDownLatch countDown = new CountDownLatch(1); + adIndices.validateResultIndexMapping(customIndexName, ActionListener.wrap(valid -> { + assertTrue(valid); + countDown.countDown(); + }, exception -> { + assertTrue(exception.getMessage(), false); + countDown.countDown(); + })); + countDown.await(60, TimeUnit.SECONDS); } /** * Test that the mapping returned by get mapping request returns the same mapping * but with different order * @throws IOException when MappingMetadata constructor throws errors + * @throws InterruptedException */ - public void testCorrectReordered() throws IOException { + public void testCorrectReordered() throws IOException, InterruptedException { Map mappings = createMapping(); Map feature_mapping = new HashMap<>(); @@ -272,15 +286,24 @@ public void testCorrectReordered() throws IOException { when(clusterService.state()) .thenReturn(ClusterState.builder(clusterName).metadata(Metadata.builder().put(indexMetadata1, true).build()).build()); - assertTrue(adIndices.isValidResultIndexMapping(customIndexName)); + CountDownLatch countDown = new CountDownLatch(1); + adIndices.validateResultIndexMapping(customIndexName, ActionListener.wrap(valid -> { + assertTrue(valid); + countDown.countDown(); + }, exception -> { + assertTrue(exception.getMessage(), false); + countDown.countDown(); + })); + countDown.await(60, TimeUnit.SECONDS); } /** * Test that the mapping returned by get mapping request returns a super set * of result index mapping * @throws IOException when MappingMetadata constructor throws errors + * @throws InterruptedException */ - public void testSuperset() throws IOException { + public void testSuperset() throws IOException, InterruptedException { Map mappings = createMapping(); Map feature_mapping = new HashMap<>(); @@ -305,10 +328,18 @@ public void testSuperset() throws IOException { when(clusterService.state()) 
             .thenReturn(ClusterState.builder(clusterName).metadata(Metadata.builder().put(indexMetadata1, true).build()).build());
-        assertTrue(adIndices.isValidResultIndexMapping(customIndexName));
+        CountDownLatch countDown = new CountDownLatch(1);
+        adIndices.validateResultIndexMapping(customIndexName, ActionListener.wrap(valid -> {
+            assertTrue(valid);
+            countDown.countDown();
+        }, exception -> {
+            assertTrue(exception.getMessage(), false);
+            countDown.countDown();
+        }));
+        countDown.await(60, TimeUnit.SECONDS);
     }
-    public void testInCorrectMapping() throws IOException {
+    public void testInCorrectMapping() throws IOException, InterruptedException {
         Map mappings = new HashMap<>();
         Map past_mapping = new HashMap<>();
@@ -340,7 +371,15 @@ public void testInCorrectMapping() throws IOException {
         when(clusterService.state())
             .thenReturn(ClusterState.builder(clusterName).metadata(Metadata.builder().put(indexMetadata1, true).build()).build());
-        assertTrue(!adIndices.isValidResultIndexMapping(customIndexName));
+        CountDownLatch countDown = new CountDownLatch(1);
+        adIndices.validateResultIndexMapping(customIndexName, ActionListener.wrap(valid -> {
+            assertTrue("Should be invalid mapping", !valid);
+            countDown.countDown();
+        }, exception -> {
+            assertTrue("should not reach here.", false);
+            countDown.countDown();
+        }));
+        assertTrue(countDown.await(60, TimeUnit.SECONDS));
     }
 }
diff --git a/src/test/java/org/opensearch/ad/indices/InitAnomalyDetectionIndicesTests.java b/src/test/java/org/opensearch/ad/indices/InitAnomalyDetectionIndicesTests.java
index edb932d4c..ec541a294 100644
--- a/src/test/java/org/opensearch/ad/indices/InitAnomalyDetectionIndicesTests.java
+++ b/src/test/java/org/opensearch/ad/indices/InitAnomalyDetectionIndicesTests.java
@@ -40,6 +40,7 @@
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.timeseries.AbstractTimeSeriesTest;
 import org.opensearch.timeseries.constant.CommonName;
@@ -103,7 +104,8 @@ public void setUp() throws Exception {
             threadPool,
             settings,
             nodeFilter,
-            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES
+            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES,
+            NamedXContentRegistry.EMPTY
         );
     }
diff --git a/src/test/java/org/opensearch/ad/indices/RolloverTests.java b/src/test/java/org/opensearch/ad/indices/RolloverTests.java
index 34cd44c20..e021f5976 100644
--- a/src/test/java/org/opensearch/ad/indices/RolloverTests.java
+++ b/src/test/java/org/opensearch/ad/indices/RolloverTests.java
@@ -46,6 +46,7 @@
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.timeseries.AbstractTimeSeriesTest;
 import org.opensearch.timeseries.settings.TimeSeriesSettings;
@@ -103,7 +104,8 @@ public void setUp() throws Exception {
             threadPool,
             settings,
             nodeFilter,
-            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES
+            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES,
+            NamedXContentRegistry.EMPTY
         );
         clusterAdminClient = mock(ClusterAdminClient.class);
diff --git a/src/test/java/org/opensearch/ad/indices/UpdateMappingTests.java b/src/test/java/org/opensearch/ad/indices/UpdateMappingTests.java
index 9be285d5c..e86a26e55 100644
--- a/src/test/java/org/opensearch/ad/indices/UpdateMappingTests.java
+++ b/src/test/java/org/opensearch/ad/indices/UpdateMappingTests.java
@@ -39,6 +39,7 @@
 import org.opensearch.action.admin.indices.settings.get.GetSettingsAction;
 import org.opensearch.action.admin.indices.settings.get.GetSettingsResponse;
 import org.opensearch.action.admin.indices.settings.put.UpdateSettingsRequest;
+import org.opensearch.action.search.SearchResponse;
 import org.opensearch.action.support.master.AcknowledgedResponse;
 import org.opensearch.ad.settings.AnomalyDetectorSettings;
 import org.opensearch.client.AdminClient;
@@ -54,6 +55,7 @@
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.index.IndexNotFoundException;
 import org.opensearch.timeseries.AbstractTimeSeriesTest;
 import org.opensearch.timeseries.settings.TimeSeriesSettings;
@@ -129,19 +131,28 @@ public void setUp() throws Exception {
             threadPool,
             settings,
             nodeFilter,
-            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES
+            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES,
+            NamedXContentRegistry.EMPTY
         );
+
+        // simulate search config index for custom result index
+        doAnswer(invocation -> {
+            ActionListener listener = (ActionListener) invocation.getArgument(1);
+
+            listener.onResponse(null);
+            return null;
+        }).when(client).search(any(), any());
     }
     public void testNoIndexToUpdate() {
         adIndices.update();
         verify(indicesAdminClient, never()).putMapping(any(), any());
         // for an index, we may check doesAliasExists/doesIndexExists for both mapping and setting
-        // 5 indices * mapping/setting checks = 10
-        verify(clusterService, times(10)).state();
+        // 5 indices * mapping/setting checks + 1 doesIndexExist in updateCustomResultIndexMapping = 11
+        verify(clusterService, times(11)).state();
         adIndices.update();
         // we will not trigger new check since we have checked all indices before
-        verify(clusterService, times(10)).state();
+        verify(clusterService, times(11)).state();
     }
     @SuppressWarnings({ "serial", "unchecked" })
@@ -279,10 +290,10 @@ public void testJobIndexNotFound() {
     }
     @SuppressWarnings("unchecked")
-    public void testFailtoUpdateJobSetting() {
+    public void testFailtoUpdateJobSetting() throws InterruptedException {
         setUpSuccessfulGetJobSetting();
         doAnswer(invocation -> {
-            ActionListener listener = (ActionListener) invocation.getArgument(2);
+            ActionListener listener = (ActionListener) invocation.getArgument(1);
             listener.onFailure(new RuntimeException(ADIndex.JOB.getIndexName()));
             return null;
@@ -308,7 +319,7 @@ public void testTooManyUpdate() throws IOException {
             return null;
         }).when(indicesAdminClient).updateSettings(any(), any());
-        adIndices = new ADIndexManagement(client, clusterService, threadPool, settings, nodeFilter, 1);
+        adIndices = new ADIndexManagement(client, clusterService, threadPool, settings, nodeFilter, 1, NamedXContentRegistry.EMPTY);
         adIndices.update();
         adIndices.update();
diff --git a/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java b/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java
index bfdb2a84c..422c1be94 100644
--- a/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java
+++ b/src/test/java/org/opensearch/ad/model/AnomalyDetectorTests.java
@@ -79,7 +79,7 @@ public void testParseAnomalyDetectorWithCustomIndex() throws IOException {
         detectorString = detectorString
             .replaceFirst("\\{", String.format(Locale.ROOT, "{\"%s\":\"%s\",", randomAlphaOfLength(5), randomAlphaOfLength(5)));
         AnomalyDetector parsedDetector = AnomalyDetector.parse(TestHelpers.parser(detectorString));
-        assertEquals("Parsing result index doesn't work", resultIndex, parsedDetector.getCustomResultIndex());
+        assertEquals("Parsing result index doesn't work", resultIndex, parsedDetector.getCustomResultIndexOrAlias());
         assertEquals("Parsing anomaly detector doesn't work", detector, parsedDetector);
     }
diff --git a/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java b/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java
index aadb8156e..7b1625395 100644
--- a/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java
+++ b/src/test/java/org/opensearch/ad/rest/HistoricalAnalysisRestApiIT.java
@@ -317,7 +317,7 @@ private AnomalyDetector randomAnomalyDetector(AnomalyDetector detector) {
             detector.getLastUpdateTime(),
             detector.getCategoryFields(),
             detector.getUser(),
-            detector.getCustomResultIndex(),
+            detector.getCustomResultIndexOrAlias(),
             detector.getImputationOption(),
             randomIntBetween(1, 10000),
             randomInt(TimeSeriesSettings.MAX_SHINGLE_SIZE / 2),
diff --git a/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java b/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java
index 7acf98273..63bfcc92c 100644
--- a/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java
+++ b/src/test/java/org/opensearch/ad/rest/SecureADRestIT.java
@@ -415,7 +415,7 @@ public void testCreateAnomalyDetectorWithCustomResultIndex() throws IOException
         resultIndex = ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "test2";
         TestHelpers.createIndexWithTimeField(client(), anomalyDetector.getIndices().get(0), anomalyDetector.getTimeField());
         AnomalyDetector detectorOfCat = createAnomalyDetector(cloneDetector(anomalyDetector, resultIndex), true, catClient);
-        assertEquals(resultIndex, detectorOfCat.getCustomResultIndex());
+        assertEquals(resultIndex, detectorOfCat.getCustomResultIndexOrAlias());
     }
     public void testPreviewAnomalyDetectorWithWriteAccess() throws IOException {
diff --git a/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java b/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java
index 25b3c2bb2..5ba75b9bc 100644
--- a/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java
+++ b/src/test/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandlerTests.java
@@ -356,7 +356,7 @@ public void testIndexException() throws IOException {
         );
         when(anomalyDetectionIndices.doesIndexExist(anyString())).thenReturn(false);
         handler.startJob(detector, transportService, listener);
-        verify(anomalyResultHandler, times(1)).index(any(), any(), eq(null));
+        verify(anomalyResultHandler, times(1)).index(any(), any(), eq(ADCommonName.CUSTOM_RESULT_INDEX_PREFIX + "index"));
         verify(threadPool, times(1)).schedule(any(), any(), any());
     }
 }
diff --git a/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java b/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java
index de206fcf5..6202f0fc9 100644
--- a/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java
+++ b/src/test/java/org/opensearch/ad/task/ADTaskManagerTests.java
@@ -89,7 +89,10 @@
 import org.opensearch.ad.transport.ForwardADTaskRequest;
 import org.opensearch.client.Client;
 import org.opensearch.cluster.ClusterName;
+import org.opensearch.cluster.ClusterState;
+import org.opensearch.cluster.block.ClusterBlocks;
 import org.opensearch.cluster.node.DiscoveryNode;
+import org.opensearch.cluster.node.DiscoveryNodes;
 import org.opensearch.cluster.service.ClusterService;
 import org.opensearch.common.settings.ClusterSettings;
 import org.opensearch.common.settings.Settings;
@@ -112,6 +115,7 @@
 import org.opensearch.search.SearchHits;
 import org.opensearch.search.aggregations.InternalAggregations;
 import org.opensearch.search.internal.InternalSearchResponse;
+import org.opensearch.test.ClusterServiceUtils;
 import org.opensearch.threadpool.ThreadPool;
 import org.opensearch.timeseries.AbstractTimeSeriesTest;
 import org.opensearch.timeseries.AnalysisType;
@@ -298,6 +302,12 @@ public void onFailure(Exception e) {}
             emptySet(),
             Version.CURRENT
         );
+        ClusterState initialClusterState = ClusterState
+            .builder(new ClusterName(ClusterServiceUtils.class.getSimpleName()))
+            .nodes(DiscoveryNodes.builder().add(node1).localNodeId(node1.getId()).masterNodeId(node1.getId()))
+            .blocks(ClusterBlocks.EMPTY_CLUSTER_BLOCK)
+            .build();
+        clusterService.getClusterApplierService().setInitialState(initialClusterState);
         maxRunningEntities = MAX_RUNNING_ENTITIES_PER_DETECTOR_FOR_HISTORICAL_ANALYSIS.get(settings).intValue();
     }
diff --git a/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandlerTests.java b/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandlerTests.java
index df7176005..32e9e14e9 100644
--- a/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandlerTests.java
+++ b/src/test/java/org/opensearch/ad/transport/handler/AnomalyResultBulkIndexHandlerTests.java
@@ -14,6 +14,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.spy;
@@ -104,17 +105,22 @@ public void testNullAnomalyResults() {
     public void testAnomalyResultBulkIndexHandler_IndexNotExist() {
         when(anomalyDetectionIndices.doesIndexExist("testIndex")).thenReturn(false);
+        when(anomalyDetectionIndices.doesAliasExist("testIndex")).thenReturn(false);
         AnomalyResult anomalyResult = mock(AnomalyResult.class);
         when(anomalyResult.getConfigId()).thenReturn(configId);
         bulkIndexHandler.bulk("testIndex", ImmutableList.of(anomalyResult), configId, listener);
-        verify(listener, times(1)).onFailure(exceptionCaptor.capture());
-        assertEquals("Can't find result index testIndex", exceptionCaptor.getValue().getMessage());
+        verify(anomalyDetectionIndices, times(1)).initCustomResultIndexDirectly(eq("testIndex"), any());
     }
     public void testAnomalyResultBulkIndexHandler_InValidResultIndexMapping() {
         when(anomalyDetectionIndices.doesIndexExist("testIndex")).thenReturn(true);
-        when(anomalyDetectionIndices.isValidResultIndexMapping("testIndex")).thenReturn(false);
+        doAnswer(invocation -> {
+            ActionListener listener = invocation.getArgument(1);
+            listener.onResponse(false);
+            return null;
+        }).when(anomalyDetectionIndices).validateResultIndexMapping(eq("testIndex"), any());
+
         AnomalyResult anomalyResult = mock(AnomalyResult.class);
         when(anomalyResult.getConfigId()).thenReturn(configId);
@@ -126,7 +132,11 @@ public void testAnomalyResultBulkIndexHandler_InValidResultIndexMapping() {
     public void testAnomalyResultBulkIndexHandler_FailBulkIndexAnomaly() throws IOException {
         when(anomalyDetectionIndices.doesIndexExist("testIndex")).thenReturn(true);
-        when(anomalyDetectionIndices.isValidResultIndexMapping("testIndex")).thenReturn(true);
+        doAnswer(invocation -> {
+            ActionListener listener = invocation.getArgument(1);
+            listener.onResponse(true);
+            return null;
+        }).when(anomalyDetectionIndices).validateResultIndexMapping(eq("testIndex"), any());
         AnomalyResult anomalyResult = mock(AnomalyResult.class);
         when(anomalyResult.getConfigId()).thenReturn(configId);
         when(anomalyResult.toXContent(any(), any())).thenThrow(new RuntimeException());
diff --git a/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java b/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java
index cdc19a4ac..0b3edf521 100644
--- a/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java
+++ b/src/test/java/org/opensearch/forecast/indices/ForecastIndexManagementTests.java
@@ -28,6 +28,7 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.common.unit.TimeValue;
 import org.opensearch.core.action.ActionListener;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.index.IndexNotFoundException;
 import org.opensearch.plugins.Plugin;
 import org.opensearch.test.OpenSearchIntegTestCase;
@@ -78,7 +79,8 @@ public void setup() throws IOException {
             client().threadPool(),
             settings,
             nodeFilter,
-            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES
+            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES,
+            NamedXContentRegistry.EMPTY
         );
     }
@@ -181,7 +183,7 @@ public void testJobIndexExists() throws IOException {
         assertTrue(indices.doesJobIndexExist());
     }
-    public void testValidateCustomIndexForBackendJobNoIndex() {
+    public void testValidateCustomIndexForBackendJobNoIndex() throws InterruptedException {
         validateCustomIndexForBackendJobNoIndex(indices);
     }
@@ -221,7 +223,8 @@ public void testRollOver() throws IOException, InterruptedException {
             client().threadPool(),
             settings,
             nodeFilter,
-            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES
+            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES,
+            NamedXContentRegistry.EMPTY
         );
         indices.rolloverAndDeleteHistoryIndex();
diff --git a/src/test/java/org/opensearch/forecast/indices/ForecastResultIndexTests.java b/src/test/java/org/opensearch/forecast/indices/ForecastResultIndexTests.java
index 8cbe42d00..5e2f0ec84 100644
--- a/src/test/java/org/opensearch/forecast/indices/ForecastResultIndexTests.java
+++ b/src/test/java/org/opensearch/forecast/indices/ForecastResultIndexTests.java
@@ -40,6 +40,7 @@
 import org.opensearch.common.settings.Settings;
 import org.opensearch.core.action.ActionListener;
 import org.opensearch.core.index.Index;
+import org.opensearch.core.xcontent.NamedXContentRegistry;
 import org.opensearch.env.Environment;
 import org.opensearch.forecast.settings.ForecastSettings;
 import org.opensearch.threadpool.ThreadPool;
@@ -103,7 +104,8 @@ public void setUp() throws Exception {
             threadPool,
             settings,
             nodeFilter,
-            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES
+            TimeSeriesSettings.MAX_UPDATE_RETRY_TIMES,
+            NamedXContentRegistry.EMPTY
         );
         clusterAdminClient = mock(ClusterAdminClient.class);
diff --git a/src/test/java/org/opensearch/forecast/model/ForecasterTests.java b/src/test/java/org/opensearch/forecast/model/ForecasterTests.java
index 5dd5927d5..66af227ee 100644
--- a/src/test/java/org/opensearch/forecast/model/ForecasterTests.java
+++ b/src/test/java/org/opensearch/forecast/model/ForecasterTests.java
@@ -107,7 +107,7 @@ public void testForecasterConstructor() {
         assertEquals(lastUpdateTime, forecaster.getLastUpdateTime());
         assertEquals(categoryFields, forecaster.getCategoryFields());
         assertEquals(user, forecaster.getUser());
-        assertEquals(resultIndex, forecaster.getCustomResultIndex());
+        assertEquals(resultIndex, forecaster.getCustomResultIndexOrAlias());
         assertEquals(horizon, forecaster.getHorizon());
         assertEquals(imputationOption, forecaster.getImputationOption());
     }
@@ -333,7 +333,7 @@ public void testValidCustomResultIndex() {
             customResultIndexTTL
         );
-        assertEquals(resultIndex, forecaster.getCustomResultIndex());
+        assertEquals(resultIndex, forecaster.getCustomResultIndexOrAlias());
     }
     public void testInvalidHorizon() {
diff --git a/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java b/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java
index 57f56eda8..880ca5a02 100644
--- a/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java
+++ b/src/test/java/org/opensearch/forecast/settings/ForecastEnabledSettingTests.java
@@ -13,7 +13,9 @@ public void testIsForecastEnabled() {
         boolean original = ForecastEnabledSetting.isForecastEnabled();
         try {
             ForecastEnabledSetting.getInstance().setSettingValue(ForecastEnabledSetting.FORECAST_ENABLED, true);
-            assertTrue(ForecastEnabledSetting.isForecastEnabled());
+            // TODO: currently, we disable forecasting
+            // assertTrue(ForecastEnabledSetting.isForecastEnabled());
+            assertTrue(!ForecastEnabledSetting.isForecastEnabled());
             ForecastEnabledSetting.getInstance().setSettingValue(ForecastEnabledSetting.FORECAST_ENABLED, false);
             assertTrue(!ForecastEnabledSetting.isForecastEnabled());
         } finally {
diff --git a/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityProfileTests.java b/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityProfileTests.java
index f2d746e32..0748c23d8 100644
--- a/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityProfileTests.java
+++ b/src/test/java/org/opensearch/search/aggregations/metrics/CardinalityProfileTests.java
@@ -242,7 +242,8 @@ public void testNoResultsNoError() throws IOException, InterruptedException {
             assertTrue(response.getInitProgress() != null);
             called.getAndIncrement();
         }, exception -> {
-            assertTrue("Should not reach here ", false);
+            LOG.error(exception);
+            assertTrue("Should not reach here", false);
             called.getAndIncrement();
         }), totalInitProgress);
diff --git a/src/test/java/org/opensearch/timeseries/TestHelpers.java b/src/test/java/org/opensearch/timeseries/TestHelpers.java
index 2ae2a1498..d2217e3fe 100644
--- a/src/test/java/org/opensearch/timeseries/TestHelpers.java
+++ b/src/test/java/org/opensearch/timeseries/TestHelpers.java
@@ -715,7 +715,11 @@ public AnomalyDetector build() {
                 imputationOption,
                 randomIntBetween(1, 10000),
                 randomIntBetween(1, TimeSeriesSettings.MAX_SHINGLE_SIZE * 2),
-                randomIntBetween(1, 1000),
+                // make history intervals at least TimeSeriesSettings.NUM_MIN_SAMPLES.
+                // Otherwise, tests like EntityColdStarterTests.testTwoSegments may fail
+                // as ModelColdStart.selectNumberOfSamples will select the smaller of
+                // 32 and historical intervals.
+                randomIntBetween(TimeSeriesSettings.NUM_MIN_SAMPLES, 1000),
                 null,
                 null,
                 null,
diff --git a/src/test/java/org/opensearch/timeseries/indices/IndexManagementIntegTestCase.java b/src/test/java/org/opensearch/timeseries/indices/IndexManagementIntegTestCase.java
index 56a7ef0c8..cf70c5cf5 100644
--- a/src/test/java/org/opensearch/timeseries/indices/IndexManagementIntegTestCase.java
+++ b/src/test/java/org/opensearch/timeseries/indices/IndexManagementIntegTestCase.java
@@ -9,6 +9,7 @@
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import java.io.IOException;
@@ -26,7 +27,6 @@
 import org.opensearch.core.common.bytes.BytesArray;
 import org.opensearch.test.OpenSearchIntegTestCase;
 import org.opensearch.timeseries.common.exception.EndRunException;
-import org.opensearch.timeseries.constant.CommonMessages;
 import org.opensearch.timeseries.function.ExecutorFunction;
 public abstract class IndexManagementIntegTestCase & TimeSeriesIndex, ISMType extends IndexManagement>
@@ -87,18 +87,22 @@ public void validateCustomIndexForBackendJobInvalidMapping(ISMType indices) {
         assertEquals("Result index mapping is not correct", exceptionCaptor.getValue().getMessage());
     }
-    public void validateCustomIndexForBackendJobNoIndex(ISMType indices) {
-        String resultIndex = "testIndex";
+    public void validateCustomIndexForBackendJobNoIndex(ISMType indices) throws InterruptedException {
+        String resultIndex = "testindex";
         String securityLogId = "logId";
         String user = "testUser";
         List roles = Arrays.asList("role1", "role2");
+        CountDownLatch countDown = new CountDownLatch(1);
         ExecutorFunction function = mock(ExecutorFunction.class);
         ActionListener listener = mock(ActionListener.class);
         indices.validateCustomIndexForBackendJob(resultIndex, securityLogId, user, roles, function, listener);
-
-        ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(EndRunException.class);
-        verify(listener).onFailure(exceptionCaptor.capture());
-        assertEquals(CommonMessages.CAN_NOT_FIND_RESULT_INDEX + resultIndex, exceptionCaptor.getValue().getMessage());
+        // function.execute should be executed after indices are created
+        doAnswer(invocation -> {
+            countDown.countDown();
+            return null;
+        }).when(function).execute();
+        assertTrue(countDown.await(60, TimeUnit.SECONDS));
+        verify(function, times(1)).execute();
     }
 }
diff --git a/src/test/java/org/opensearch/timeseries/transport/DeleteAnomalyDetectorTests.java b/src/test/java/org/opensearch/timeseries/transport/DeleteAnomalyDetectorTests.java
index 3015b92e5..49e0484c8 100644
--- a/src/test/java/org/opensearch/timeseries/transport/DeleteAnomalyDetectorTests.java
+++ b/src/test/java/org/opensearch/timeseries/transport/DeleteAnomalyDetectorTests.java
@@ -306,7 +306,7 @@ private void setupMocks(
                     Instant.now(),
                     60L,
                     TestHelpers.randomUser(),
-                    jobParameter.getCustomResultIndex(),
+                    jobParameter.getCustomResultIndexOrAlias(),
                     AnalysisType.AD
                 ).toXContent(TestHelpers.builder(), ToXContent.EMPTY_PARAMS)
             ),