Skip to content

Commit

Permalink
Adding remote index and multi-index checks in validation (#1290) (#1294)
Browse files Browse the repository at this point in the history
* Adding remote index and multi index checks in validation



* adding more tests



---------


(cherry picked from commit ec16d53)

Signed-off-by: Amit Galitzky <amgalitz@amazon.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
1 parent 5428f2b commit 67ac4e1
Show file tree
Hide file tree
Showing 13 changed files with 417 additions and 93 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public static String getTooManyCategoricalFieldErr(int limit) {
"Result index name has invalid character. Valid characters are a-z, 0-9, -(hyphen) and _(underscore)";
public static String FAIL_TO_VALIDATE = "failed to validate";
public static String INVALID_TIMESTAMP = "Timestamp field: (%s) must be of type date";
public static String NON_EXISTENT_TIMESTAMP_IN_INDEX = "Timestamp field: (%s) is not found in the (%s) index mapping";
public static String NON_EXISTENT_TIMESTAMP = "Timestamp field: (%s) is not found in index mapping";
public static String INVALID_NAME = "Valid characters for name are a-z, A-Z, 0-9, -(hyphen), _(underscore) and .(period)";
// change this error message to make it compatible with old version's integration(nexus) test
Expand Down Expand Up @@ -74,6 +75,9 @@ public static String getTooManyCategoricalFieldErr(int limit) {
+ " characters.";
public static final String INDEX_NOT_FOUND = "index does not exist";
public static final String FAIL_TO_GET_MAPPING_MSG = "Fail to get the index mapping of %s";
public static final String FAIL_TO_GET_MAPPING = "Fail to get the index mapping";
public static final String TIMESTAMP_VALIDATION_FAILED = "Validation failed for timefield of %s, ";

public static final String FAIL_TO_GET_CONFIG_MSG = "Fail to get config";

// ======================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ private Boolean validationTypesAreAccepted(String validationType) {
public ValidateConfigRequest prepareRequest(RestRequest request, NodeClient client, String typesStr) throws IOException {
XContentParser parser = request.contentParser();
ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser);

// if type param isn't blank and isn't a part of possible validation types throws exception
if (!StringUtils.isBlank(typesStr)) {
if (!validationTypesAreAccepted(typesStr)) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,7 @@
import static org.opensearch.timeseries.util.ParseUtils.checkFilterByBackendRoles;

import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.*;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -205,7 +202,6 @@ public void validateExecute(
storedContext.restore();
Config config = request.getConfig();
ActionListener<ValidateConfigResponse> validateListener = ActionListener.wrap(response -> {
logger.debug("Result of validation process " + response);
// forcing response to be empty
listener.onResponse(new ValidateConfigResponse((ConfigValidationIssue) null));
}, exception -> {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.timeseries.util;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.service.ClusterService;

public class CrossClusterConfigUtils {
private static final Logger logger = LogManager.getLogger(ParseUtils.class);

/**
* Uses the clusterName to determine whether the target client is the local or a remote client,
* and returns the appropriate client.
* @param clusterName The name of the cluster to evaluate.
* @param client The local {@link NodeClient}.
* @param localClusterName The name of the local cluster.
* @return The local {@link NodeClient} for the local cluster, or a remote client for a remote cluster.
*/
public static Client getClientForCluster(String clusterName, Client client, String localClusterName) {
return clusterName.equals(localClusterName) ? client : client.getRemoteClusterClient(clusterName);
}

/**
* Uses the clusterName to determine whether the target client is the local or a remote client,
* and returns the appropriate client.
* @param clusterName The name of the cluster to evaluate.
* @param client The local {@link NodeClient}.
* @param clusterService Used to retrieve the name of the local cluster.
* @return The local {@link NodeClient} for the local cluster, or a remote client for a remote cluster.
*/
public static Client getClientForCluster(String clusterName, Client client, ClusterService clusterService) {
return getClientForCluster(clusterName, client, clusterService.getClusterName().value());
}

/**
* Parses the list of indexes into a map of cluster_name to List of index names
* @param indexes A list of index names in cluster_name:index_name format.
* Local indexes can also be in index_name format.
* @param clusterService Used to retrieve the name of the local cluster.
* @return A map of cluster_name:index names
*/
public static HashMap<String, List<String>> separateClusterIndexes(List<String> indexes, ClusterService clusterService) {
return separateClusterIndexes(indexes, clusterService.getClusterName().value());
}

/**
* Parses the list of indexes into a map of cluster_name to list of index_name
* @param indexes A list of index names in cluster_name:index_name format.
* @param localClusterName The name of the local cluster.
* @return A map of cluster_name to List index_name
*/
public static HashMap<String, List<String>> separateClusterIndexes(List<String> indexes, String localClusterName) {
HashMap<String, List<String>> output = new HashMap<>();
for (String index : indexes) {
// Use the refactored method to get both cluster and index names in one call
Pair<String, String> clusterAndIndex = parseClusterAndIndexName(index);
String clusterName = clusterAndIndex.getKey();
String indexName = clusterAndIndex.getValue();
logger.info("clusterName: " + clusterName);
logger.info("indexName: " + indexName);

// If the index entry does not have a cluster_name, it indicates the index is on the local cluster.
if (clusterName.isEmpty()) {
clusterName = localClusterName;
}
output.computeIfAbsent(clusterName, k -> new ArrayList<>()).add(indexName);
}
return output;
}

/**
* Parses the cluster and index names from the given input string.
* The input can be in either "cluster_name:index_name" format or just "index_name".
* @param index The name of the index to evaluate.
* @return A Pair where the left is the cluster name (or empty if not present), and the right is the index name.
*/
public static Pair<String, String> parseClusterAndIndexName(String index) {
if (index.contains(":")) {
String[] parts = index.split(":", 2);
String clusterName = parts[0];
String indexName = parts.length > 1 ? parts[1] : "";
return Pair.of(clusterName, indexName);
} else {
return Pair.of("", index);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ public class AbstractForecasterActionHandlerTestCase extends AbstractTimeSeriesT
protected ThreadContext threadContext;
protected SecurityClientUtil clientUtil;
protected String categoricalField;
// @Mock
protected ClusterName clusterName;

@SuppressWarnings("unchecked")
@Override
Expand All @@ -85,6 +87,9 @@ public void setUp() throws Exception {

clusterService = mock(ClusterService.class);
ClusterName clusterName = new ClusterName("test");
clusterName = mock(ClusterName.class);
when(clusterService.getClusterName()).thenReturn(clusterName);
when(clusterName.value()).thenReturn("test");
ClusterState clusterState = ClusterState.builder(clusterName).metadata(Metadata.builder().build()).build();
when(clusterService.state()).thenReturn(clusterState);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public class IndexAnomalyDetectorActionHandlerTests extends AbstractTimeSeriesTe
private RestRequest.Method method;
private ADTaskManager adTaskManager;
private SearchFeatureDao searchFeatureDao;
private ClusterName clusterName;

@BeforeClass
public static void beforeClass() {
Expand Down Expand Up @@ -157,6 +158,10 @@ public void setUp() throws Exception {

searchFeatureDao = mock(SearchFeatureDao.class);

clusterName = mock(ClusterName.class);
when(clusterService.getClusterName()).thenReturn(clusterName);
when(clusterName.value()).thenReturn("test");

handler = new IndexAnomalyDetectorActionHandler(
clusterService,
clientMock,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.opensearch.action.support.replication.ReplicationResponse;
import org.opensearch.action.support.replication.ReplicationResponse.ShardInfo;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.routing.AllocationId;
import org.opensearch.common.settings.Settings;
import org.opensearch.core.action.ActionListener;
Expand Down Expand Up @@ -240,7 +241,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
verify(clientSpy, times(1)).execute(eq(GetAction.INSTANCE), any(), any());
}

public void testFaiToParse() throws InterruptedException {
public void testFailToParse() throws InterruptedException {
NodeClient client = new NodeClient(Settings.EMPTY, threadPool) {
@Override
public <Request extends ActionRequest, Response extends ActionResponse> void doExecute(
Expand Down Expand Up @@ -273,6 +274,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
}
};
NodeClient clientSpy = spy(client);
clusterName = mock(ClusterName.class);
when(clusterService.getClusterName()).thenReturn(clusterName);
when(clusterName.value()).thenReturn("test");

method = RestRequest.Method.PUT;

Expand Down Expand Up @@ -508,6 +512,9 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
}
};
NodeClient clientSpy = spy(client);
clusterName = mock(ClusterName.class);
when(clusterService.getClusterName()).thenReturn(clusterName);
when(clusterName.value()).thenReturn("test");

method = RestRequest.Method.POST;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.opensearch.ad.task.ADTaskManager;
import org.opensearch.client.Client;
import org.opensearch.client.node.NodeClient;
import org.opensearch.cluster.ClusterName;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
Expand Down Expand Up @@ -91,6 +92,7 @@ public class ValidateAnomalyDetectorActionHandlerTests extends AbstractTimeSerie
@Mock
protected ThreadPool threadPool;
protected ThreadContext threadContext;
protected ClusterName mockClusterName;

@SuppressWarnings("unchecked")
@Override
Expand All @@ -106,7 +108,9 @@ public void setUp() throws Exception {

anomalyDetectionIndices = mock(ADIndexManagement.class);
when(anomalyDetectionIndices.doesConfigIndexExist()).thenReturn(true);

mockClusterName = mock(ClusterName.class);
when(clusterService.getClusterName()).thenReturn(mockClusterName);
when(mockClusterName.value()).thenReturn("test");
detectorId = "123";
seqNo = 0L;
primaryTerm = 0L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.forecast.rest.handler.ValidateForecasterActionHandler;
import org.opensearch.timeseries.common.exception.ValidationException;
import org.opensearch.timeseries.model.ValidationAspect;

public class ValidateForecasterActionHandlerTests extends AbstractForecasterActionHandlerTestCase {
Expand Down Expand Up @@ -100,7 +101,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void doE
assertTrue("should not reach here", false);
inProgressLatch.countDown();
}, e -> {
assertTrue(e instanceof IllegalArgumentException);
assertTrue(e instanceof ValidationException);
inProgressLatch.countDown();
}));
assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS));
Expand Down
67 changes: 67 additions & 0 deletions src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -1370,6 +1370,73 @@ public void testValidateAnomalyDetectorWithNoTimeField() throws Exception {
assertEquals("time field missing", CommonMessages.NULL_TIME_FIELD, messageMap.get("time_field").get("message"));
}

public void testValidateAnomalyDetectorWithMultipleIndicesOneNotFound() throws Exception {
TestHelpers.createIndex(client(), "test-index", TestHelpers.toHttpEntity("{\"timestamp\": " + Instant.now().toEpochMilli() + "}"));
Response resp = TestHelpers
.makeRequest(
client(),
"POST",
TestHelpers.AD_BASE_DETECTORS_URI + "/_validate",
ImmutableMap.of(),
TestHelpers
.toHttpEntity(
"{\"name\":\""
+ "test-detector"
+ "\",\"description\":\"Test detector\",\"time_field\":\"timestamp\","
+ "\"indices\":[\"test-index\", \"test-index-2\"],\"feature_attributes\":[{\"feature_name\":\"cpu-sum\",\""
+ "feature_enabled\":true,\"aggregation_query\":{\"total_cpu\":{\"sum\":{\"field\":\"cpu\"}}}},"
+ "{\"feature_name\":\"error-sum\",\"feature_enabled\":true,\"aggregation_query\":"
+ "{\"total_error\":"
+ "{\"sum\":{\"field\":\"error\"}}}}],\"filter_query\":{\"bool\":{\"filter\":[{\"exists\":"
+ "{\"field\":"
+ "\"cpu\",\"boost\":1}}],\"adjust_pure_negative\":true,\"boost\":1}},\"detection_interval\":"
+ "{\"period\":{\"interval\":1,\"unit\":\"Minutes\"}},"
+ "\"window_delay\":{\"period\":{\"interval\":2,\"unit\":\"Minutes\"}},"
+ "\"shingle_size\": 8}"
),
null
);
Map<String, Object> responseMap = entityAsMap(resp);

@SuppressWarnings("unchecked")
Map<String, Map<String, String>> messageMap = (Map<String, Map<String, String>>) XContentMapValues
.extractValue("detector", responseMap);
String errorMessage = "index does not exist";
assertEquals("index does not exist", errorMessage, messageMap.get("indices").get("message"));
}

public void testValidateAnomalyDetectorWithMultipleIndices() throws Exception {
TestHelpers.createIndexWithTimeField(client(), "test-index", TIME_FIELD);
TestHelpers.createIndexWithTimeField(client(), "test-index-2", TIME_FIELD);

Response resp = TestHelpers
.makeRequest(
client(),
"POST",
TestHelpers.AD_BASE_DETECTORS_URI + "/_validate",
ImmutableMap.of(),
TestHelpers
.toHttpEntity(
"{\"name\":\""
+ "test-detector"
+ "\",\"description\":\"Test detector\",\"time_field\":\"timestamp\","
+ "\"indices\":[\"test-index\", \"test-index-2\"],\"feature_attributes\":[{\"feature_name\":\"cpu-sum\",\""
+ "feature_enabled\":true,\"aggregation_query\":{\"total_cpu\":{\"sum\":{\"field\":\"cpu\"}}}},"
+ "{\"feature_name\":\"error-sum\",\"feature_enabled\":true,\"aggregation_query\":"
+ "{\"total_error\":"
+ "{\"sum\":{\"field\":\"error\"}}}}],\"filter_query\":{\"bool\":{\"filter\":[{\"exists\":"
+ "{\"field\":"
+ "\"cpu\",\"boost\":1}}],\"adjust_pure_negative\":true,\"boost\":1}},\"detection_interval\":"
+ "{\"period\":{\"interval\":1,\"unit\":\"Minutes\"}},"
+ "\"window_delay\":{\"period\":{\"interval\":2,\"unit\":\"Minutes\"}},"
+ "\"shingle_size\": 8}"
),
null
);
Map<String, Object> responseMap = entityAsMap(resp);
assertEquals("no issue, empty response body", new HashMap<String, Object>(), responseMap);
}

public void testValidateAnomalyDetectorWithIncorrectShingleSize() throws Exception {
TestHelpers.createIndex(client(), "test-index", TestHelpers.toHttpEntity("{\"timestamp\": " + Instant.now().toEpochMilli() + "}"));
Response resp = TestHelpers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -488,9 +488,8 @@ public void testValidateAnomalyDetectorWithNonExistentTimefield() throws IOExcep
ValidateConfigResponse response = client().execute(ValidateAnomalyDetectorAction.INSTANCE, request).actionGet(5_000);
assertEquals(ValidationIssueType.TIMEFIELD_FIELD, response.getIssue().getType());
assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect());
assertEquals(
String.format(Locale.ROOT, CommonMessages.NON_EXISTENT_TIMESTAMP, anomalyDetector.getTimeField()),
response.getIssue().getMessage()
assertTrue(
response.getIssue().getMessage().contains("Timestamp field: (" + anomalyDetector.getTimeField() + ") is not found in the")
);
}

Expand All @@ -511,9 +510,11 @@ public void testValidateAnomalyDetectorWithNonDateTimeField() throws IOException
ValidateConfigResponse response = client().execute(ValidateAnomalyDetectorAction.INSTANCE, request).actionGet(5_000);
assertEquals(ValidationIssueType.TIMEFIELD_FIELD, response.getIssue().getType());
assertEquals(ValidationAspect.DETECTOR, response.getIssue().getAspect());
assertEquals(
String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField()),
response.getIssue().getMessage()
assertTrue(
response
.getIssue()
.getMessage()
.contains(String.format(Locale.ROOT, CommonMessages.INVALID_TIMESTAMP, anomalyDetector.getTimeField()))
);
}

Expand Down
Loading

0 comments on commit 67ac4e1

Please sign in to comment.