From 6843b299882398ba05522d72aa6d9cc7ca831f3e Mon Sep 17 00:00:00 2001 From: Yaliang <49084640+ylwu-amzn@users.noreply.github.com> Date: Fri, 30 Jul 2021 19:32:36 +0000 Subject: [PATCH] avoid sending back verbose error message and wrong 500 error to user; fix hard code query size of historical analysis (#150) * avoid sending back verbose error message and wrong 500 error to user Signed-off-by: Yaliang Wu * put general error message into constants --- .../ad/AnomalyDetectorProfileRunner.java | 18 ++++++--- .../opensearch/ad/EntityProfileRunner.java | 20 +++++----- .../opensearch/ad/caching/PriorityCache.java | 4 +- .../ad/constant/CommonErrorMessages.java | 18 ++++++++- .../ad/feature/SearchFeatureDao.java | 3 +- .../org/opensearch/ad/ml/CheckpointDao.java | 3 +- .../IndexAnomalyDetectorActionHandler.java | 3 +- .../IndexAnomalyDetectorJobActionHandler.java | 2 +- .../ad/settings/AnomalyDetectorSettings.java | 3 +- .../org/opensearch/ad/task/ADTaskManager.java | 5 ++- .../AnomalyDetectorJobTransportAction.java | 7 +++- .../DeleteAnomalyDetectorTransportAction.java | 5 ++- .../DeleteAnomalyResultsTransportAction.java | 5 ++- .../GetAnomalyDetectorTransportAction.java | 11 +++--- .../IndexAnomalyDetectorTransportAction.java | 7 +++- ...PreviewAnomalyDetectorTransportAction.java | 20 +++++++--- ...rchAnomalyDetectorInfoTransportAction.java | 5 ++- .../StatsAnomalyDetectorTransportAction.java | 6 ++- .../StopDetectorTransportAction.java | 6 ++- .../ad/transport/handler/ADSearchHandler.java | 7 +++- .../org/opensearch/ad/util/ParseUtils.java | 20 ++++++---- .../opensearch/ad/util/RestHandlerUtils.java | 37 +++++++++++++++++++ .../ad/caching/PriorityCacheTests.java | 3 +- .../ad/rest/AnomalyDetectorRestApiIT.java | 5 ++- ...nomalyDetectorJobTransportActionTests.java | 3 +- .../ad/transport/GetAnomalyDetectorTests.java | 6 +-- ...ewAnomalyDetectorTransportActionTests.java | 7 ++-- .../opensearch/ad/util/ParseUtilsTests.java | 2 +- 28 files changed, 175 insertions(+), 66 deletions(-) diff --git a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java index ce01db12d..3a975e404 100644 --- a/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java +++ b/src/main/java/org/opensearch/ad/AnomalyDetectorProfileRunner.java @@ -26,11 +26,14 @@ package org.opensearch.ad; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_PARSE_DETECTOR_MSG; import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; +import static org.opensearch.rest.RestStatus.BAD_REQUEST; +import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR; -import java.security.InvalidParameterException; import java.util.List; import java.util.Map; import java.util.Set; @@ -39,6 +42,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.core.util.Throwables; import org.apache.logging.log4j.message.ParameterizedMessage; +import org.opensearch.OpenSearchStatusException; import org.opensearch.action.ActionListener; import org.opensearch.action.get.GetRequest; import org.opensearch.action.search.SearchRequest; @@ -109,7 +113,7 @@ public AnomalyDetectorProfileRunner( public void profile(String detectorId, ActionListener listener, Set profilesToCollect) { if (profilesToCollect.isEmpty()) { - listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT)); + listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT)); return; } calculateTotalResponsesToWait(detectorId, profilesToCollect, listener); @@ -132,12 +136,16 @@ private void calculateTotalResponsesToWait( AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId); prepareProfile(detector, listener, profilesToCollect); } catch (Exception e) { - listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e)); + logger.error(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, e); + listener.onFailure(new OpenSearchStatusException(FAIL_TO_PARSE_DETECTOR_MSG + detectorId, BAD_REQUEST)); } } else { - listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId)); + listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, BAD_REQUEST)); } - }, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception)))); + }, exception -> { + logger.error(FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception); + listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, INTERNAL_SERVER_ERROR)); + })); } private void prepareProfile( diff --git a/src/main/java/org/opensearch/ad/EntityProfileRunner.java b/src/main/java/org/opensearch/ad/EntityProfileRunner.java index 0355ee1bd..1cdf3e489 100644 --- a/src/main/java/org/opensearch/ad/EntityProfileRunner.java +++ b/src/main/java/org/opensearch/ad/EntityProfileRunner.java @@ -30,7 +30,6 @@ import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; -import java.security.InvalidParameterException; import java.util.List; import java.util.Map; import java.util.Optional; @@ -77,6 +76,7 @@ public class EntityProfileRunner extends AbstractProfileRunner { private final Logger logger = LogManager.getLogger(EntityProfileRunner.class); static final String NOT_HC_DETECTOR_ERR_MSG = "This is not a high cardinality detector"; + static final String EMPTY_ENTITY_ATTRIBUTES = "Empty entity attributes"; static final String NO_ENTITY = "Cannot find entity"; private Client client; private NamedXContentRegistry xContentRegistry; @@ -102,7 +102,7 @@ public void profile( ActionListener listener ) { if (profilesToCollect == null || profilesToCollect.size() == 0) { - listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT)); + listener.onFailure(new IllegalArgumentException(CommonErrorMessages.EMPTY_PROFILES_COLLECT)); return; } GetRequest getDetectorRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId); @@ -119,10 +119,10 @@ public void profile( List categoryFields = detector.getCategoryField(); int maxCategoryFields = NumericSetting.maxCategoricalFields(); if (categoryFields == null || categoryFields.size() == 0) { - listener.onFailure(new InvalidParameterException(NOT_HC_DETECTOR_ERR_MSG)); + listener.onFailure(new IllegalArgumentException(NOT_HC_DETECTOR_ERR_MSG)); } else if (categoryFields.size() > maxCategoryFields) { listener - .onFailure(new InvalidParameterException(CommonErrorMessages.getTooManyCategoricalFieldErr(maxCategoryFields))); + .onFailure(new IllegalArgumentException(CommonErrorMessages.getTooManyCategoricalFieldErr(maxCategoryFields))); } else { validateEntity(entityValue, categoryFields, detectorId, profilesToCollect, detector, listener); } @@ -130,7 +130,7 @@ public void profile( listener.onFailure(t); } } else { - listener.onFailure(new InvalidParameterException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId)); + listener.onFailure(new IllegalArgumentException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId)); } }, listener::onFailure)); } @@ -160,12 +160,12 @@ private void validateEntity( ) { Map attributes = entity.getAttributes(); if (attributes == null || attributes.size() != categoryFields.size()) { - listener.onFailure(new InvalidParameterException("Empty entity attributes")); + listener.onFailure(new IllegalArgumentException(EMPTY_ENTITY_ATTRIBUTES)); return; } for (String field : categoryFields) { if (false == attributes.containsKey(field)) { - listener.onFailure(new InvalidParameterException("Cannot find " + field)); + listener.onFailure(new IllegalArgumentException("Cannot find " + field)); return; } } @@ -184,15 +184,15 @@ private void validateEntity( client.search(searchRequest, ActionListener.wrap(searchResponse -> { try { if (searchResponse.getHits().getHits().length == 0) { - listener.onFailure(new InvalidParameterException(NO_ENTITY)); + listener.onFailure(new IllegalArgumentException(NO_ENTITY)); return; } prepareEntityProfile(listener, detectorId, entity, profilesToCollect, detector, categoryFields.get(0)); } catch (Exception e) { - listener.onFailure(new InvalidParameterException(NO_ENTITY)); + listener.onFailure(new IllegalArgumentException(NO_ENTITY)); return; } - }, e -> listener.onFailure(new InvalidParameterException(NO_ENTITY)))); + }, e -> listener.onFailure(new IllegalArgumentException(NO_ENTITY)))); } diff --git a/src/main/java/org/opensearch/ad/caching/PriorityCache.java b/src/main/java/org/opensearch/ad/caching/PriorityCache.java index 1ad5f266d..83a238a5d 100644 --- a/src/main/java/org/opensearch/ad/caching/PriorityCache.java +++ b/src/main/java/org/opensearch/ad/caching/PriorityCache.java @@ -53,10 +53,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.OpenSearchException; import org.opensearch.ad.AnomalyDetectorPlugin; import org.opensearch.ad.MemoryTracker; import org.opensearch.ad.MemoryTracker.Origin; +import org.opensearch.ad.common.exception.AnomalyDetectionException; import org.opensearch.ad.common.exception.LimitExceededException; import org.opensearch.ad.constant.CommonErrorMessages; import org.opensearch.ad.ml.CheckpointDao; @@ -637,7 +637,7 @@ public void maintenance() { }); } catch (Exception e) { // will be thrown to ES's transport broadcast handler - throw new OpenSearchException("Fail to maintain cache", e); + throw new AnomalyDetectionException("Fail to maintain cache", e); } } diff --git a/src/main/java/org/opensearch/ad/constant/CommonErrorMessages.java b/src/main/java/org/opensearch/ad/constant/CommonErrorMessages.java index f4bb4606e..9dd31aac0 100644 --- a/src/main/java/org/opensearch/ad/constant/CommonErrorMessages.java +++ b/src/main/java/org/opensearch/ad/constant/CommonErrorMessages.java @@ -44,9 +44,13 @@ public class CommonErrorMessages { public static final String ALL_FEATURES_DISABLED_ERR_MSG = "Having trouble querying data because all of your features have been disabled."; public static final String INVALID_TIMESTAMP_ERR_MSG = "timestamp is invalid"; - public static String FAIL_TO_FIND_DETECTOR_MSG = "Fail to find detector with id: "; + public static String FAIL_TO_PARSE_DETECTOR_MSG = "Fail to parse detector with id: "; + // change this error message to make it compatible with old version's integration(nexus) test + public static String FAIL_TO_FIND_DETECTOR_MSG = "Can't find detector with id: "; public static String FAIL_TO_GET_PROFILE_MSG = "Fail to get profile for detector "; public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for detector "; + public static String FAIL_TO_GET_USER_INFO = "Unable to get user information from detector "; + public static String NO_PERMISSION_TO_ACCESS_DETECTOR = "User does not have permissions to access detector: "; public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than "; public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid"; public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for "; @@ -71,4 +75,16 @@ public static String getTooManyCategoricalFieldErr(int limit) { public static String EXCEED_HISTORICAL_ANALYSIS_LIMIT = "Exceed max historical analysis limit per node"; public static String NO_ELIGIBLE_NODE_TO_RUN_DETECTOR = "No eligible node to run detector "; public static String EMPTY_STALE_RUNNING_ENTITIES = "Empty stale running entities"; + + public static String FAIL_TO_GET_DETECTOR = "Fail to get detector"; + public static String FAIL_TO_GET_DETECTOR_INFO = "Fail to get detector info"; + public static String FAIL_TO_CREATE_DETECTOR = "Fail to create detector"; + public static String FAIL_TO_UPDATE_DETECTOR = "Fail to update detector"; + public static String FAIL_TO_PREVIEW_DETECTOR = "Fail to preview detector"; + public static String FAIL_TO_START_DETECTOR = "Fail to start detector"; + public static String FAIL_TO_STOP_DETECTOR = "Fail to stop detector"; + public static String FAIL_TO_DELETE_DETECTOR = "Fail to delete detector"; + public static String FAIL_TO_DELETE_AD_RESULT = "Fail to delete anomaly result"; + public static String FAIL_TO_GET_STATS = "Fail to get stats"; + public static String FAIL_TO_SEARCH = "Fail to search"; } diff --git a/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java b/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java index dc79b1dc6..b0ec61949 100644 --- a/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java +++ b/src/main/java/org/opensearch/ad/feature/SearchFeatureDao.java @@ -55,6 +55,7 @@ import org.opensearch.action.ActionListener; import org.opensearch.action.search.SearchRequest; import org.opensearch.action.search.SearchResponse; +import org.opensearch.ad.common.exception.AnomalyDetectionException; import org.opensearch.ad.constant.CommonName; import org.opensearch.ad.dataprocessor.Interpolator; import org.opensearch.ad.model.AnomalyDetector; @@ -425,7 +426,7 @@ public void onResponse(SearchResponse response) { listener.onResponse(topEntities); } else if (expirationEpochMs < clock.millis()) { if (topEntities.isEmpty()) { - listener.onFailure(new IllegalStateException("timeout to get preview results. Please retry later.")); + listener.onFailure(new AnomalyDetectionException("timeout to get preview results. Please retry later.")); } else { logger.info("timeout to get preview results. Send whatever we have."); listener.onResponse(topEntities); diff --git a/src/main/java/org/opensearch/ad/ml/CheckpointDao.java b/src/main/java/org/opensearch/ad/ml/CheckpointDao.java index 763473c65..861ad8e9a 100644 --- a/src/main/java/org/opensearch/ad/ml/CheckpointDao.java +++ b/src/main/java/org/opensearch/ad/ml/CheckpointDao.java @@ -66,6 +66,7 @@ import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; +import org.opensearch.ad.common.exception.AnomalyDetectionException; import org.opensearch.ad.common.exception.ResourceNotFoundException; import org.opensearch.ad.constant.CommonName; import org.opensearch.ad.indices.ADIndex; @@ -674,7 +675,7 @@ public void batchWrite(BulkRequest request, ActionListener listene clientUtil.execute(BulkAction.INSTANCE, request, listener); } else { // create index failure. Notify callers using listener. - listener.onFailure(new RuntimeException("Creating checkpoint with mappings call not acknowledged.")); + listener.onFailure(new AnomalyDetectionException("Creating checkpoint with mappings call not acknowledged.")); } }, exception -> { if (ExceptionsHelper.unwrapCause(exception) instanceof ResourceAlreadyExistsException) { diff --git a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java index d7bfb7a7a..aaf7dce2d 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorActionHandler.java @@ -26,6 +26,7 @@ package org.opensearch.ad.rest.handler; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG; import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES; import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; import static org.opensearch.ad.util.RestHandlerUtils.XCONTENT_WITH_TYPE; @@ -229,7 +230,7 @@ private void updateAnomalyDetector(String detectorId) { private void onGetAnomalyDetectorResponse(GetResponse response) { if (!response.isExists()) { - listener.onFailure(new OpenSearchStatusException("AnomalyDetector is not found with id: " + detectorId, RestStatus.NOT_FOUND)); + listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG, RestStatus.NOT_FOUND)); return; } try (XContentParser parser = RestHandlerUtils.createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) { diff --git a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java index 3377e3b2d..d0962578a 100644 --- a/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java +++ b/src/main/java/org/opensearch/ad/rest/handler/IndexAnomalyDetectorJobActionHandler.java @@ -233,7 +233,7 @@ private void indexAnomalyDetectorJob(AnomalyDetectorJob job, AnomalyDetectorFunc ); } - private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) throws IOException { + private void onIndexAnomalyDetectorJobResponse(IndexResponse response, AnomalyDetectorFunction function) { if (response == null || (response.getResult() != CREATED && response.getResult() != UPDATED)) { String errorMsg = getShardsFailure(response); listener.onFailure(new OpenSearchStatusException(errorMsg, response.status())); diff --git a/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java b/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java index 217c26458..2642f7e62 100644 --- a/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java +++ b/src/main/java/org/opensearch/ad/settings/AnomalyDetectorSettings.java @@ -424,12 +424,13 @@ private AnomalyDetectorSettings() {} Setting.Property.Dynamic ); + public static final int MAX_BATCH_TASK_PIECE_SIZE = 10_000; public static final Setting BATCH_TASK_PIECE_SIZE = Setting .intSetting( "plugins.anomaly_detection.batch_task_piece_size", LegacyOpenDistroAnomalyDetectorSettings.BATCH_TASK_PIECE_SIZE, 1, - 10_000, + MAX_BATCH_TASK_PIECE_SIZE, Setting.Property.NodeScope, Setting.Property.Dynamic ); diff --git a/src/main/java/org/opensearch/ad/task/ADTaskManager.java b/src/main/java/org/opensearch/ad/task/ADTaskManager.java index 125e8f2c3..497321487 100644 --- a/src/main/java/org/opensearch/ad/task/ADTaskManager.java +++ b/src/main/java/org/opensearch/ad/task/ADTaskManager.java @@ -30,6 +30,7 @@ import static org.opensearch.ad.AnomalyDetectorPlugin.AD_BATCH_TASK_THREAD_POOL_NAME; import static org.opensearch.ad.constant.CommonErrorMessages.DETECTOR_IS_RUNNING; import static org.opensearch.ad.constant.CommonErrorMessages.EXCEED_HISTORICAL_ANALYSIS_LIMIT; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG; import static org.opensearch.ad.constant.CommonErrorMessages.NO_ELIGIBLE_NODE_TO_RUN_DETECTOR; import static org.opensearch.ad.indices.AnomalyDetectionIndices.ALL_AD_RESULTS_INDEX_PATTERN; import static org.opensearch.ad.model.ADTask.DETECTOR_ID_FIELD; @@ -548,7 +549,7 @@ public void getDetector(String detectorId, Consumer consume GetRequest getRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId); client.get(getRequest, ActionListener.wrap(response -> { if (!response.isExists()) { - listener.onFailure(new OpenSearchStatusException("AnomalyDetector is not found", RestStatus.NOT_FOUND)); + listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG, RestStatus.NOT_FOUND)); return; } try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) { @@ -574,7 +575,7 @@ public void getDetector( GetRequest getRequest = new GetRequest(ANOMALY_DETECTORS_INDEX, detectorId); client.get(getRequest, ActionListener.wrap(response -> { if (!response.isExists()) { - listener.onFailure(new OpenSearchStatusException("AnomalyDetector is not found", RestStatus.NOT_FOUND)); + listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG, RestStatus.NOT_FOUND)); return; } try (XContentParser parser = createXContentParserFromRegistry(xContentRegistry, response.getSourceAsBytesRef())) { diff --git a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java index a99909f0e..6e59fd23d 100644 --- a/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportAction.java @@ -26,10 +26,13 @@ package org.opensearch.ad.transport; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_START_DETECTOR; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_STOP_DETECTOR; import static org.opensearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES; import static org.opensearch.ad.settings.AnomalyDetectorSettings.REQUEST_TIMEOUT; import static org.opensearch.ad.util.ParseUtils.getUserContext; import static org.opensearch.ad.util.ParseUtils.resolveUserAndExecute; +import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -88,7 +91,7 @@ public AnomalyDetectorJobTransportAction( } @Override - protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener listener) { + protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionListener actionListener) { String detectorId = request.getDetectorID(); DetectionDateRange detectionDateRange = request.getDetectionDateRange(); boolean historical = request.isHistorical(); @@ -96,6 +99,8 @@ protected void doExecute(Task task, AnomalyDetectorJobRequest request, ActionLis long primaryTerm = request.getPrimaryTerm(); String rawPath = request.getRawPath(); TimeValue requestTimeout = REQUEST_TIMEOUT.get(settings); + String errorMessage = rawPath.endsWith(RestHandlerUtils.START_JOB) ? FAIL_TO_START_DETECTOR : FAIL_TO_STOP_DETECTOR; + ActionListener listener = wrapRestActionListener(actionListener, errorMessage); // By the time request reaches here, the user permissions are validated by Security plugin. User user = getUserContext(client); diff --git a/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java index 6bb5d7e37..6b8de19b1 100644 --- a/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/DeleteAnomalyDetectorTransportAction.java @@ -26,11 +26,13 @@ package org.opensearch.ad.transport; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_DELETE_DETECTOR; import static org.opensearch.ad.model.ADTaskType.HISTORICAL_DETECTOR_TASK_TYPES; import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; import static org.opensearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES; import static org.opensearch.ad.util.ParseUtils.getUserContext; import static org.opensearch.ad.util.ParseUtils.resolveUserAndExecute; +import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import java.io.IOException; @@ -98,10 +100,11 @@ public DeleteAnomalyDetectorTransportAction( } @Override - protected void doExecute(Task task, DeleteAnomalyDetectorRequest request, ActionListener listener) { + protected void doExecute(Task task, DeleteAnomalyDetectorRequest request, ActionListener actionListener) { String detectorId = request.getDetectorID(); LOG.info("Delete anomaly detector job {}", detectorId); User user = getUserContext(client); + ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_DELETE_DETECTOR); // By the time request reaches here, the user permissions are validated by Security plugin. try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { resolveUserAndExecute( diff --git a/src/main/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportAction.java b/src/main/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportAction.java index 0746c9c98..1634170cb 100644 --- a/src/main/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/DeleteAnomalyResultsTransportAction.java @@ -11,9 +11,11 @@ package org.opensearch.ad.transport; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_DELETE_AD_RESULT; import static org.opensearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES; import static org.opensearch.ad.util.ParseUtils.addUserBackendRolesFilter; import static org.opensearch.ad.util.ParseUtils.getUserContext; +import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -53,7 +55,8 @@ public DeleteAnomalyResultsTransportAction( } @Override - protected void doExecute(Task task, DeleteByQueryRequest request, ActionListener listener) { + protected void doExecute(Task task, DeleteByQueryRequest request, ActionListener actionListener) { + ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_DELETE_AD_RESULT); delete(request, listener); } diff --git a/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java index 71db6edf2..ce66f7f41 100644 --- a/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/GetAnomalyDetectorTransportAction.java @@ -26,6 +26,8 @@ package org.opensearch.ad.transport; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_GET_DETECTOR; import static org.opensearch.ad.model.ADTaskType.ALL_DETECTOR_TASK_TYPES; import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; import static org.opensearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; @@ -33,6 +35,7 @@ import static org.opensearch.ad.util.ParseUtils.getUserContext; import static org.opensearch.ad.util.ParseUtils.resolveUserAndExecute; import static org.opensearch.ad.util.RestHandlerUtils.PROFILE; +import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import java.util.Arrays; @@ -140,9 +143,10 @@ public GetAnomalyDetectorTransportAction( } @Override - protected void doExecute(Task task, GetAnomalyDetectorRequest request, ActionListener listener) { + protected void doExecute(Task task, GetAnomalyDetectorRequest request, ActionListener actionListener) { String detectorID = request.getDetectorID(); User user = getUserContext(client); + ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_GET_DETECTOR); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { resolveUserAndExecute( user, @@ -293,10 +297,7 @@ public void onResponse(MultiGetResponse multiGetResponse) { for (MultiGetItemResponse response : responses) { if (ANOMALY_DETECTORS_INDEX.equals(response.getIndex())) { if (response.getResponse() == null || !response.getResponse().isExists()) { - listener - .onFailure( - new OpenSearchStatusException("Can't find detector with id: " + detectorId, RestStatus.NOT_FOUND) - ); + listener.onFailure(new OpenSearchStatusException(FAIL_TO_FIND_DETECTOR_MSG + detectorId, RestStatus.NOT_FOUND)); return; } id = response.getId(); diff --git a/src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorTransportAction.java index 1b9cdf080..db7227d96 100644 --- a/src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/IndexAnomalyDetectorTransportAction.java @@ -26,10 +26,13 @@ package org.opensearch.ad.transport; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_CREATE_DETECTOR; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_UPDATE_DETECTOR; import static org.opensearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES; import static org.opensearch.ad.util.ParseUtils.checkFilterByBackendRoles; import static org.opensearch.ad.util.ParseUtils.getDetector; import static org.opensearch.ad.util.ParseUtils.getUserContext; +import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener; import java.io.IOException; import java.util.List; @@ -95,10 +98,12 @@ public IndexAnomalyDetectorTransportAction( } @Override - protected void doExecute(Task task, IndexAnomalyDetectorRequest request, ActionListener listener) { + protected void doExecute(Task task, IndexAnomalyDetectorRequest request, ActionListener actionListener) { User user = getUserContext(client); String detectorId = request.getDetectorID(); RestRequest.Method method = request.getMethod(); + String errorMessage = method == RestRequest.Method.PUT ? FAIL_TO_UPDATE_DETECTOR : FAIL_TO_CREATE_DETECTOR; + ActionListener listener = wrapRestActionListener(actionListener, errorMessage); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { resolveUserAndExecute(user, detectorId, method, listener, (detector) -> adExecute(request, user, detector, context, listener)); } catch (Exception e) { diff --git a/src/main/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportAction.java index b2dc617f1..40a0eed8a 100644 --- a/src/main/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportAction.java @@ -26,11 +26,13 @@ package org.opensearch.ad.transport; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_PREVIEW_DETECTOR; import static org.opensearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_ANOMALY_FEATURES; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_CONCURRENT_PREVIEW; import static org.opensearch.ad.util.ParseUtils.getUserContext; import static org.opensearch.ad.util.ParseUtils.resolveUserAndExecute; +import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import java.io.IOException; @@ -41,7 +43,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.opensearch.OpenSearchException; +import org.opensearch.OpenSearchStatusException; import org.opensearch.action.ActionListener; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; @@ -49,6 +51,7 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.ad.AnomalyDetectorRunner; import org.opensearch.ad.breaker.ADCircuitBreakerService; +import org.opensearch.ad.common.exception.AnomalyDetectionException; import org.opensearch.ad.common.exception.ClientException; import org.opensearch.ad.common.exception.LimitExceededException; import org.opensearch.ad.constant.CommonErrorMessages; @@ -107,9 +110,14 @@ public PreviewAnomalyDetectorTransportAction( } @Override - protected void doExecute(Task task, PreviewAnomalyDetectorRequest request, ActionListener listener) { + protected void doExecute( + Task task, + PreviewAnomalyDetectorRequest request, + ActionListener actionListener + ) { String detectorId = request.getDetectorId(); User user = getUserContext(client); + ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_PREVIEW_DETECTOR); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { resolveUserAndExecute( user, @@ -152,7 +160,7 @@ void previewExecute( if (detector != null) { String error = validateDetector(detector); if (StringUtils.isNotBlank(error)) { - listener.onFailure(new OpenSearchException(error, RestStatus.BAD_REQUEST)); + listener.onFailure(new OpenSearchStatusException(error, RestStatus.BAD_REQUEST)); lock.release(); return; } @@ -199,7 +207,7 @@ public void accept(List anomalyResult) throws Exception { logger.error("Unexpected error running anomaly detector " + detector.getDetectorId(), exception); listener .onFailure( - new OpenSearchException( + new OpenSearchStatusException( "Unexpected error running anomaly detector " + detector.getDetectorId() + ". " + exception.getMessage(), RestStatus.INTERNAL_SERVER_ERROR ) @@ -236,7 +244,7 @@ public void accept(GetResponse response) throws Exception { if (!response.isExists()) { listener .onFailure( - new OpenSearchException("Can't find anomaly detector with id:" + response.getId(), RestStatus.NOT_FOUND) + new OpenSearchStatusException("Can't find anomaly detector with id:" + response.getId(), RestStatus.NOT_FOUND) ); return; } @@ -253,6 +261,6 @@ public void accept(GetResponse response) throws Exception { listener.onFailure(e); } } - }, exception -> { listener.onFailure(new OpenSearchException("Could not execute get query to find detector")); }); + }, exception -> { listener.onFailure(new AnomalyDetectionException("Could not execute get query to find detector")); }); } } diff --git a/src/main/java/org/opensearch/ad/transport/SearchAnomalyDetectorInfoTransportAction.java b/src/main/java/org/opensearch/ad/transport/SearchAnomalyDetectorInfoTransportAction.java index aa6511fb1..14d390ecb 100644 --- a/src/main/java/org/opensearch/ad/transport/SearchAnomalyDetectorInfoTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/SearchAnomalyDetectorInfoTransportAction.java @@ -26,7 +26,9 @@ package org.opensearch.ad.transport; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_GET_DETECTOR_INFO; import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; +import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -69,10 +71,11 @@ public SearchAnomalyDetectorInfoTransportAction( protected void doExecute( Task task, SearchAnomalyDetectorInfoRequest request, - ActionListener listener + ActionListener actionListener ) { String name = request.getName(); String rawPath = request.getRawPath(); + ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_GET_DETECTOR_INFO); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { SearchRequest searchRequest = new SearchRequest().indices(ANOMALY_DETECTORS_INDEX); if (rawPath.endsWith(RestHandlerUtils.COUNT)) { diff --git a/src/main/java/org/opensearch/ad/transport/StatsAnomalyDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/StatsAnomalyDetectorTransportAction.java index b6d610e66..4910e6aae 100644 --- a/src/main/java/org/opensearch/ad/transport/StatsAnomalyDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/StatsAnomalyDetectorTransportAction.java @@ -26,6 +26,9 @@ package org.opensearch.ad.transport; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_GET_STATS; +import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener; + import java.util.HashMap; import java.util.List; import java.util.Map; @@ -80,7 +83,8 @@ public StatsAnomalyDetectorTransportAction( } @Override - protected void doExecute(Task task, ADStatsRequest request, ActionListener listener) { + protected void doExecute(Task task, ADStatsRequest request, ActionListener actionListener) { + ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_GET_STATS); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { getStats(client, listener, request); } catch (Exception e) { diff --git a/src/main/java/org/opensearch/ad/transport/StopDetectorTransportAction.java b/src/main/java/org/opensearch/ad/transport/StopDetectorTransportAction.java index 54b6d7d4c..593f24e71 100644 --- a/src/main/java/org/opensearch/ad/transport/StopDetectorTransportAction.java +++ b/src/main/java/org/opensearch/ad/transport/StopDetectorTransportAction.java @@ -26,6 +26,8 @@ package org.opensearch.ad.transport; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_STOP_DETECTOR; + import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; @@ -87,9 +89,9 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener< listener.onResponse(new StopDetectorResponse(false)); })); } catch (Exception e) { - LOG.error("Fail to stop detector " + adID, e); + LOG.error(FAIL_TO_STOP_DETECTOR + " " + adID, e); Throwable cause = ExceptionsHelper.unwrapCause(e); - listener.onFailure(new InternalFailure(adID, cause)); + listener.onFailure(new InternalFailure(adID, FAIL_TO_STOP_DETECTOR, cause)); } } } diff --git a/src/main/java/org/opensearch/ad/transport/handler/ADSearchHandler.java b/src/main/java/org/opensearch/ad/transport/handler/ADSearchHandler.java index 6bb15006c..7a96f5bde 100644 --- a/src/main/java/org/opensearch/ad/transport/handler/ADSearchHandler.java +++ b/src/main/java/org/opensearch/ad/transport/handler/ADSearchHandler.java @@ -26,9 +26,11 @@ package org.opensearch.ad.transport.handler; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_SEARCH; import static org.opensearch.ad.settings.AnomalyDetectorSettings.FILTER_BY_BACKEND_ROLES; import static org.opensearch.ad.util.ParseUtils.addUserBackendRolesFilter; import static org.opensearch.ad.util.ParseUtils.getUserContext; +import static org.opensearch.ad.util.RestHandlerUtils.wrapRestActionListener; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -61,10 +63,11 @@ public ADSearchHandler(Settings settings, ClusterService clusterService, Client * and execute search. * * @param request search request - * @param listener action listerner + * @param actionListener action listerner */ - public void search(SearchRequest request, ActionListener listener) { + public void search(SearchRequest request, ActionListener actionListener) { User user = getUserContext(client); + ActionListener listener = wrapRestActionListener(actionListener, FAIL_TO_SEARCH); try (ThreadContext.StoredContext context = client.threadPool().getThreadContext().stashContext()) { validateRole(request, user, listener); } catch (Exception e) { diff --git a/src/main/java/org/opensearch/ad/util/ParseUtils.java b/src/main/java/org/opensearch/ad/util/ParseUtils.java index ac15072ee..b973c25e7 100644 --- a/src/main/java/org/opensearch/ad/util/ParseUtils.java +++ b/src/main/java/org/opensearch/ad/util/ParseUtils.java @@ -26,11 +26,15 @@ package org.opensearch.ad.util; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_GET_USER_INFO; +import static org.opensearch.ad.constant.CommonErrorMessages.NO_PERMISSION_TO_ACCESS_DETECTOR; import static org.opensearch.ad.constant.CommonName.DATE_HISTOGRAM; import static org.opensearch.ad.constant.CommonName.EPOCH_MILLIS_FORMAT; import static org.opensearch.ad.constant.CommonName.FEATURE_AGGS; import static org.opensearch.ad.model.AnomalyDetector.QUERY_PARAM_PERIOD_END; import static org.opensearch.ad.model.AnomalyDetector.QUERY_PARAM_PERIOD_START; +import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PIECE_SIZE; import static org.opensearch.common.xcontent.XContentParserUtils.ensureExpectedToken; import static org.opensearch.search.aggregations.AggregationBuilders.dateRange; import static org.opensearch.search.aggregations.AggregatorFactories.VALID_AGG_NAME; @@ -48,13 +52,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.lucene.search.join.ScoreMode; -import org.opensearch.OpenSearchException; -import org.opensearch.ResourceNotFoundException; import org.opensearch.action.ActionListener; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; import org.opensearch.action.search.SearchResponse; import org.opensearch.ad.common.exception.AnomalyDetectionException; +import org.opensearch.ad.common.exception.ResourceNotFoundException; import org.opensearch.ad.constant.CommonName; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.Entity; @@ -458,7 +461,7 @@ public static SearchSourceBuilder addUserBackendRolesFilter(User user, SearchSou } else if (query instanceof BoolQueryBuilder) { ((BoolQueryBuilder) query).filter(boolQueryBuilder); } else { - throw new OpenSearchException("Search API does not support queries other than BoolQuery"); + throw new AnomalyDetectionException("Search API does not support queries other than BoolQuery"); } return searchSourceBuilder; } @@ -562,13 +565,13 @@ public static void onGetAdResponse( function.accept(detector); } else { logger.debug("User: " + requestUser.getName() + " does not have permissions to access detector: " + detectorId); - listener.onFailure(new OpenSearchException("User does not have permissions to access detector: " + detectorId)); + listener.onFailure(new AnomalyDetectionException(NO_PERMISSION_TO_ACCESS_DETECTOR + detectorId)); } } catch (Exception e) { - listener.onFailure(new OpenSearchException("Unable to get user information from detector " + detectorId)); + listener.onFailure(new AnomalyDetectionException(FAIL_TO_GET_USER_INFO + detectorId)); } } else { - listener.onFailure(new ResourceNotFoundException("AnomalyDetector is not found with id: " + detectorId)); + listener.onFailure(new ResourceNotFoundException(detectorId, FAIL_TO_FIND_DETECTOR_MSG + detectorId)); } } @@ -601,7 +604,7 @@ public static boolean checkFilterByBackendRoles(User requestedUser, ActionListen if (requestedUser.getBackendRoles().isEmpty()) { listener .onFailure( - new OpenSearchException( + new AnomalyDetectionException( "Filter by backend roles is enabled and User " + requestedUser.getName() + " does not have backend roles configured" ) ); @@ -660,7 +663,8 @@ public static SearchSourceBuilder batchFeatureQuery( .fixedInterval(DateHistogramInterval.seconds((int) intervalSeconds)) ); - CompositeAggregationBuilder aggregationBuilder = new CompositeAggregationBuilder(FEATURE_AGGS, sources).size(1000); + CompositeAggregationBuilder aggregationBuilder = new CompositeAggregationBuilder(FEATURE_AGGS, sources) + .size(MAX_BATCH_TASK_PIECE_SIZE); if (detector.getEnabledFeatureIds().size() == 0) { throw new AnomalyDetectionException("No enabled feature configured").countedInStats(false); diff --git a/src/main/java/org/opensearch/ad/util/RestHandlerUtils.java b/src/main/java/org/opensearch/ad/util/RestHandlerUtils.java index 2298b9f94..4711d3d62 100644 --- a/src/main/java/org/opensearch/ad/util/RestHandlerUtils.java +++ b/src/main/java/org/opensearch/ad/util/RestHandlerUtils.java @@ -26,11 +26,18 @@ package org.opensearch.ad.util; +import static org.opensearch.rest.RestStatus.BAD_REQUEST; +import static org.opensearch.rest.RestStatus.INTERNAL_SERVER_ERROR; + import java.io.IOException; import java.util.HashSet; import java.util.List; import java.util.Set; +import org.opensearch.OpenSearchStatusException; +import org.opensearch.action.ActionListener; +import org.opensearch.ad.common.exception.AnomalyDetectionException; +import org.opensearch.ad.common.exception.ResourceNotFoundException; import org.opensearch.ad.model.AnomalyDetector; import org.opensearch.ad.model.Feature; import org.opensearch.common.Strings; @@ -41,8 +48,10 @@ import org.opensearch.common.xcontent.XContentHelper; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.common.xcontent.XContentType; +import org.opensearch.index.IndexNotFoundException; import org.opensearch.rest.RestChannel; import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestStatus; import org.opensearch.search.fetch.subphase.FetchSourceContext; import com.google.common.collect.ImmutableMap; @@ -143,4 +152,32 @@ private static String validateFeatures(List features) { } return errorMsgBuilder.toString(); } + + /** + * Wrap action listener to avoid return verbose error message and wrong 500 error to user. + * Suggestion for exception handling in AD: + * 1. If the error is caused by wrong input, throw IllegalArgumentException exception. + * 2. For other errors, please use AnomalyDetectionException or its subclass, or use + * OpenSearchStatusException. + * + * @param actionListener action listener + * @param generalErrorMessage general error message + * @param action listener response type + * @return wrapped action listener + */ + public static ActionListener wrapRestActionListener(ActionListener actionListener, String generalErrorMessage) { + return ActionListener.wrap(r -> { actionListener.onResponse(r); }, e -> { + if (e instanceof OpenSearchStatusException || e instanceof IndexNotFoundException) { + actionListener.onFailure(e); + } else { + RestStatus status = e instanceof IllegalArgumentException || e instanceof ResourceNotFoundException + ? BAD_REQUEST + : INTERNAL_SERVER_ERROR; + String errorMessage = e instanceof IllegalArgumentException || e instanceof AnomalyDetectionException + ? e.getMessage() + : generalErrorMessage; + actionListener.onFailure(new OpenSearchStatusException(errorMessage, status)); + } + }); + } } diff --git a/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java b/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java index 44e1cd4c2..7e9272247 100644 --- a/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java +++ b/src/test/java/org/opensearch/ad/caching/PriorityCacheTests.java @@ -57,6 +57,7 @@ import org.mockito.ArgumentCaptor; import org.opensearch.OpenSearchException; import org.opensearch.ad.MemoryTracker; +import org.opensearch.ad.common.exception.AnomalyDetectionException; import org.opensearch.ad.common.exception.LimitExceededException; import org.opensearch.ad.ml.CheckpointDao; import org.opensearch.ad.ml.EntityModel; @@ -418,7 +419,7 @@ public void testFailedConcurrentMaintenance() throws InterruptedException { new Thread(new FailedCleanRunnable(scheduledThreadCountDown)).start(); cacheProvider.maintenance(); - } catch (OpenSearchException e) { + } catch (AnomalyDetectionException e) { scheduledThreadCountDown.countDown(); } diff --git a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java index f6db42c04..2d8716bd2 100644 --- a/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java +++ b/src/test/java/org/opensearch/ad/rest/AnomalyDetectorRestApiIT.java @@ -27,6 +27,7 @@ package org.opensearch.ad.rest; import static org.hamcrest.Matchers.containsString; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG; import java.io.IOException; import java.time.Instant; @@ -837,7 +838,7 @@ public void testStartAdJobWithNonexistingDetector() throws Exception { TestHelpers .assertFailWith( ResponseException.class, - "AnomalyDetector is not found", + FAIL_TO_FIND_DETECTOR_MSG, () -> TestHelpers .makeRequest( client(), @@ -939,7 +940,7 @@ public void testStopNonExistingAdJob() throws Exception { TestHelpers .assertFailWith( ResponseException.class, - "AnomalyDetector is not found", + FAIL_TO_FIND_DETECTOR_MSG, () -> TestHelpers .makeRequest( client(), diff --git a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java index 0e978f949..aa37bd395 100644 --- a/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/AnomalyDetectorJobTransportActionTests.java @@ -27,6 +27,7 @@ package org.opensearch.ad.transport; import static org.opensearch.ad.constant.CommonErrorMessages.DETECTOR_IS_RUNNING; +import static org.opensearch.ad.constant.CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG; import static org.opensearch.ad.settings.AnomalyDetectorSettings.BATCH_TASK_PIECE_INTERVAL_SECONDS; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_BATCH_TASK_PER_NODE; import static org.opensearch.ad.settings.AnomalyDetectorSettings.MAX_OLD_AD_TASK_DOCS_PER_DETECTOR; @@ -118,7 +119,7 @@ public void testDetectorNotFound() { OpenSearchStatusException.class, () -> client().execute(AnomalyDetectorJobAction.INSTANCE, request).actionGet(10000) ); - assertTrue(exception.getMessage().contains("AnomalyDetector is not found")); + assertTrue(exception.getMessage().contains(FAIL_TO_FIND_DETECTOR_MSG)); } @Ignore diff --git a/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorTests.java b/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorTests.java index 8b8105aa7..813f52561 100644 --- a/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorTests.java +++ b/src/test/java/org/opensearch/ad/transport/GetAnomalyDetectorTests.java @@ -33,13 +33,13 @@ import static org.opensearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; import java.io.IOException; -import java.security.InvalidParameterException; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import org.junit.AfterClass; import org.junit.BeforeClass; +import org.opensearch.OpenSearchStatusException; import org.opensearch.action.ActionListener; import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.GetResponse; @@ -136,7 +136,7 @@ public void testInvalidRequest() throws IOException { future = new PlainActionFuture<>(); action.doExecute(null, request, future); - assertException(future, InvalidParameterException.class, CommonErrorMessages.EMPTY_PROFILES_COLLECT); + assertException(future, OpenSearchStatusException.class, CommonErrorMessages.EMPTY_PROFILES_COLLECT); } @SuppressWarnings("unchecked") @@ -161,6 +161,6 @@ public void testValidRequest() throws IOException { future = new PlainActionFuture<>(); action.doExecute(null, request, future); - assertException(future, InvalidParameterException.class, CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG); + assertException(future, OpenSearchStatusException.class, CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG); } } diff --git a/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java b/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java index 47bf9b5e2..f885fcf07 100644 --- a/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java +++ b/src/test/java/org/opensearch/ad/transport/PreviewAnomalyDetectorTransportActionTests.java @@ -48,7 +48,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import org.opensearch.OpenSearchException; +import org.opensearch.OpenSearchStatusException; import org.opensearch.Version; import org.opensearch.action.ActionListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; @@ -62,7 +62,6 @@ import org.opensearch.ad.AnomalyDetectorRunner; import org.opensearch.ad.TestHelpers; import org.opensearch.ad.breaker.ADCircuitBreakerService; -import org.opensearch.ad.common.exception.LimitExceededException; import org.opensearch.ad.constant.CommonErrorMessages; import org.opensearch.ad.feature.FeatureManager; import org.opensearch.ad.feature.Features; @@ -351,7 +350,7 @@ public void onResponse(PreviewAnomalyDetectorResponse response) { @Override public void onFailure(Exception e) { - Assert.assertTrue(e.getClass() == OpenSearchException.class); + Assert.assertEquals(OpenSearchStatusException.class, e.getClass()); inProgressLatch.countDown(); } }; @@ -437,7 +436,7 @@ public void onResponse(PreviewAnomalyDetectorResponse response) { @Override public void onFailure(Exception e) { - Assert.assertTrue("actual class: " + e.getClass(), e instanceof LimitExceededException); + Assert.assertTrue("actual class: " + e.getClass(), e instanceof OpenSearchStatusException); Assert.assertTrue(e.getMessage().contains(CommonErrorMessages.MEMORY_CIRCUIT_BROKEN_ERR_MSG)); inProgressLatch.countDown(); } diff --git a/src/test/java/org/opensearch/ad/util/ParseUtilsTests.java b/src/test/java/org/opensearch/ad/util/ParseUtilsTests.java index a8a493fb5..7de299646 100644 --- a/src/test/java/org/opensearch/ad/util/ParseUtilsTests.java +++ b/src/test/java/org/opensearch/ad/util/ParseUtilsTests.java @@ -223,7 +223,7 @@ public void testBatchFeatureQuery() throws IOException { + "\"to\":20,\"include_lower\":true,\"include_upper\":true,\"boost\":1.0}}}],\"should\":[{\"term\":{\"tag\":" + "{\"value\":\"wow\",\"boost\":1.0}}},{\"term\":{\"tag\":{\"value\":\"elasticsearch\",\"boost\":1.0}}}]," + "\"adjust_pure_negative\":true,\"minimum_should_match\":\"1\",\"boost\":1.0}}],\"adjust_pure_negative" - + "\":true,\"boost\":1.0}},\"aggregations\":{\"feature_aggs\":{\"composite\":{\"size\":1000,\"sources\":" + + "\":true,\"boost\":1.0}},\"aggregations\":{\"feature_aggs\":{\"composite\":{\"size\":10000,\"sources\":" + "[{\"date_histogram\":{\"date_histogram\":{\"field\":\"" + detector.getTimeField() + "\",\"missing_bucket\":false,\"order\":\"asc\","