Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
filter out exceptions which should not be counted in failure stats (#344
Browse files Browse the repository at this point in the history
)
  • Loading branch information
ylwu-amzn authored Dec 29, 2020
1 parent 5d0bcb5 commit 667dd5c
Show file tree
Hide file tree
Showing 16 changed files with 471 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -20,7 +20,14 @@
*/
public class AnomalyDetectionException extends RuntimeException {

private final String anomalyDetectorId;
private String anomalyDetectorId;
// countedInStats will be used to tell whether the exception should be
// counted in failure stats.
private boolean countedInStats = true;

public AnomalyDetectionException(String message) {
super(message);
}

/**
* Constructor with an anomaly detector ID and a message.
Expand All @@ -38,6 +45,10 @@ public AnomalyDetectionException(String adID, String message, Throwable cause) {
this.anomalyDetectorId = adID;
}

public AnomalyDetectionException(Throwable cause) {
super(cause);
}

public AnomalyDetectionException(String adID, Throwable cause) {
super(cause);
this.anomalyDetectorId = adID;
Expand All @@ -51,4 +62,34 @@ public AnomalyDetectionException(String adID, Throwable cause) {
public String getAnomalyDetectorId() {
return this.anomalyDetectorId;
}

/**
* Returns if the exception should be counted in stats.
*
* @return true if should count the exception in stats; otherwise return false
*/
public boolean isCountedInStats() {
return countedInStats;
}

/**
* Set if the exception should be counted in stats.
*
* @param countInStats count the exception in stats
* @return the exception itself
*/
public AnomalyDetectionException countedInStats(boolean countInStats) {
this.countedInStats = countInStats;
return this;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("Anomaly Detector ");
sb.append(anomalyDetectorId);
sb.append(' ');
sb.append(super.toString());
return sb.toString();
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -16,11 +16,14 @@
package com.amazon.opendistroforelasticsearch.ad.common.exception;

/**
* All exception visible to AD transport layer's client is under ClientVisible.
*
* All exception visible to AD transport layer's client is under ClientException.
*/
public class ClientException extends AnomalyDetectionException {

public ClientException(String message) {
super(message);
}

public ClientException(String anomalyDetectorId, String message) {
super(anomalyDetectorId, message);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,11 @@
public class EndRunException extends ClientException {
private boolean endNow;

public EndRunException(String message, boolean endNow) {
super(message);
this.endNow = endNow;
}

public EndRunException(String anomalyDetectorId, String message, boolean endNow) {
super(anomalyDetectorId, message);
this.endNow = endNow;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,5 +28,6 @@ public class LimitExceededException extends EndRunException {
*/
public LimitExceededException(String anomalyDetectorId, String message) {
super(anomalyDetectorId, message, true);
this.countedInStats(false);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
Expand Down Expand Up @@ -28,5 +28,6 @@ public class ResourceNotFoundException extends AnomalyDetectionException {
*/
public ResourceNotFoundException(String detectorId, String message) {
super(detectorId, message);
countedInStats(false);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,5 @@ public class CommonErrorMessages {
public static final String FEATURE_NOT_AVAILABLE_ERR_MSG = "No Feature in current detection window.";
public static final String MEMORY_CIRCUIT_BROKEN_ERR_MSG = "AD memory circuit is broken.";
public static final String DISABLED_ERR_MSG = "AD plugin is disabled. To enable update opendistro.anomaly_detection.enabled to true";
public static final String INVALID_SEARCH_QUERY_MSG = "Invalid search query.";
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import com.amazon.opendistroforelasticsearch.ad.common.exception.EndRunException;
import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector;
import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration;
import com.amazon.opendistroforelasticsearch.ad.util.ClientUtil;
Expand Down Expand Up @@ -192,7 +193,9 @@ private double parseAggregation(Aggregation aggregation) {
result = percentile.next().getValue();
}
}
return Optional.ofNullable(result).orElseThrow(() -> new IllegalStateException("Failed to parse aggregation " + aggregation));
return Optional
.ofNullable(result)
.orElseThrow(() -> new EndRunException("Failed to parse aggregation " + aggregation, true).countedInStats(false));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public AnomalyDetectionIndices(
* @return anomaly detector index mapping
* @throws IOException IOException if mapping file can't be read correctly
*/
private String getAnomalyDetectorMappings() throws IOException {
public static String getAnomalyDetectorMappings() throws IOException {
URL url = AnomalyDetectionIndices.class.getClassLoader().getResource(ANOMALY_DETECTORS_INDEX_MAPPING_FILE);
return Resources.toString(url, Charsets.UTF_8);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,13 @@ private void onGetAnomalyDetectorResponse(GetResponse response) throws IOExcepti
channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "Can't start detector job as no features configured"));
return;
}
if (detector.getEnabledFeatureIds().size() == 0) {
channel
.sendResponse(
new BytesRestResponse(RestStatus.BAD_REQUEST, "Can't start detector job as no enabled features configured")
);
return;
}

IntervalTimeConfiguration interval = (IntervalTimeConfiguration) detector.getDetectionInterval();
Schedule schedule = new IntervalSchedule(Instant.now(), (int) interval.getInterval(), interval.getUnit());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.action.support.IndicesOptions;
Expand All @@ -68,6 +70,7 @@
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper;
import org.elasticsearch.node.NodeClosedException;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.ConnectTransportException;
import org.elasticsearch.transport.ReceiveTimeoutTransportException;
Expand All @@ -76,6 +79,8 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;

import static com.amazon.opendistroforelasticsearch.ad.constant.CommonErrorMessages.INVALID_SEARCH_QUERY_MSG;

public class AnomalyResultTransportAction extends HandledTransportAction<ActionRequest, AnomalyResultResponse> {

private static final Logger LOG = LogManager.getLogger(AnomalyResultTransportAction.class);
Expand All @@ -91,6 +96,7 @@ public class AnomalyResultTransportAction extends HandledTransportAction<ActionR
static final String RESOURCE_NOT_FOUND_EXCEPTION_NAME_UNDERSCORE = ElasticsearchException
.getExceptionName(new ResourceNotFoundException("", ""));
static final String NULL_RESPONSE = "Received null response from";
static final String TROUBLE_QUERYING_ERR_MSG = "Having trouble querying data: ";

private final TransportService transportService;
private final ADStateManager stateManager;
Expand Down Expand Up @@ -202,14 +208,18 @@ protected void doExecute(Task task, ActionRequest actionRequest, ActionListener<
AnomalyResultRequest request = AnomalyResultRequest.fromActionRequest(actionRequest);
ActionListener<AnomalyResultResponse> original = listener;
listener = ActionListener.wrap(original::onResponse, e -> {
adStats.getStat(StatNames.AD_EXECUTE_FAIL_COUNT.getName()).increment();
// If exception is AnomalyDetectionException and it should not be counted in stats,
// we will not count it in failure stats.
if (!(e instanceof AnomalyDetectionException) || ((AnomalyDetectionException) e).isCountedInStats()) {
adStats.getStat(StatNames.AD_EXECUTE_FAIL_COUNT.getName()).increment();
}
original.onFailure(e);
});

String adID = request.getAdID();

if (!EnabledSetting.isADPluginEnabled()) {
throw new EndRunException(adID, CommonErrorMessages.DISABLED_ERR_MSG, true);
throw new EndRunException(adID, CommonErrorMessages.DISABLED_ERR_MSG, true).countedInStats(false);
}

adStats.getStat(StatNames.AD_EXECUTE_REQUEST_COUNT.getName()).increment();
Expand Down Expand Up @@ -237,6 +247,10 @@ private ActionListener<Optional<AnomalyDetector>> onGetDetector(
return;
}
AnomalyDetector anomalyDetector = detector.get();
if (anomalyDetector.getEnabledFeatureIds().size() == 0) {
listener.onFailure(new EndRunException(ALL_FEATURES_DISABLED_ERR_MSG, true).countedInStats(false));
return;
}

String thresholdModelID = modelManager.getThresholdModelId(adID);
Optional<DiscoveryNode> asThresholdNode = hashRing.getOwningNode(thresholdModelID);
Expand Down Expand Up @@ -367,15 +381,7 @@ private ActionListener<SinglePointFeatures> onFeatureResponse(
new ActionListenerResponseHandler<>(rcfListener, RCFResultResponse::new)
);
}
}, exception -> {
if (exception instanceof IndexNotFoundException) {
listener.onFailure(new EndRunException(adID, "Having trouble querying data: " + exception.getMessage(), true));
} else if (exception instanceof IllegalArgumentException && detector.getEnabledFeatureIds().isEmpty()) {
listener.onFailure(new EndRunException(adID, ALL_FEATURES_DISABLED_ERR_MSG, true));
} else {
handleExecuteException(exception, listener, adID);
}
});
}, exception -> { handleFailure(exception, listener, adID); });
}

/**
Expand Down Expand Up @@ -477,17 +483,52 @@ private CombinedRcfResult getCombinedResult(List<RCFResultResponse> rcfResults)
return modelManager.combineRcfResults(rcfResultLib);
}

private void handleFailure(Exception exception, ActionListener<AnomalyResultResponse> listener, String adID) {
if (exception instanceof IndexNotFoundException) {
listener.onFailure(new EndRunException(adID, TROUBLE_QUERYING_ERR_MSG + exception.getMessage(), true).countedInStats(false));
} else if (exception instanceof EndRunException) {
// invalid feature query
listener.onFailure(exception);
} else {
handleExecuteException(exception, listener, adID);
}
}

void handleExecuteException(Exception ex, ActionListener<AnomalyResultResponse> listener, String adID) {
if (ex instanceof ClientException) {
listener.onFailure(ex);
} else if (ex instanceof AnomalyDetectionException) {
listener.onFailure(new InternalFailure((AnomalyDetectionException) ex));
} else if (ex instanceof SearchPhaseExecutionException && invalidQuery((SearchPhaseExecutionException) ex)) {
// This is to catch invalid aggregation on wrong field type. For example,
// sum aggregation on text field. We should end detector run for such case.
listener
.onFailure(
new EndRunException(
adID,
INVALID_SEARCH_QUERY_MSG + ((SearchPhaseExecutionException) ex).getDetailedMessage(),
ex,
true
).countedInStats(false)
);
} else {
Throwable cause = ExceptionsHelper.unwrapCause(ex);
listener.onFailure(new InternalFailure(adID, cause));
}
}

private boolean invalidQuery(SearchPhaseExecutionException ex) {
boolean invalidQuery = true;
// If all shards return bad request and failure cause is IllegalArgumentException, we
// consider the feature query is invalid and will not count the error in failure stats.
for (ShardSearchFailure failure : ex.shardFailures()) {
if (RestStatus.BAD_REQUEST != failure.status() || !(failure.getCause() instanceof IllegalArgumentException)) {
invalidQuery = false;
}
}
return invalidQuery;
}

class RCFActionListener implements ActionListener<RCFResultResponse> {
private List<RCFResultResponse> rcfResults;
private String modelID;
Expand Down
Loading

0 comments on commit 667dd5c

Please sign in to comment.