Skip to content

Commit

Permalink
Handle more AD exceptions thrown over the wire/network (#157)
Browse files Browse the repository at this point in the history
* Handle more AD exceptions thrown over the wire

OpenSearch restricts the kind of exceptions that can be thrown over the wire (Read OpenSearchException.OpenSearchExceptionHandle). Since we cannot add our exception like ResourceNotFoundException without modifying OpenSearch's code, we must unwrap the NotSerializableExceptionWrapper and check its root cause message. This PR adds checks all AD exceptions in such cases. Previously, we only checked a couple of them.

Previously, we wrap all exceptions thrown in AD using one of AnomalyDetectionExceptions. The wrap brings a complication to NotSerializableExceptionWrapper thrown over the wire as NotSerializableExceptionWrapper won't keep the original AnomalyDetectionExceptions object and only keeps the cause message. Therefore, we won't be able to restore the original exception encapsulated inside AnomalyDetectionExceptions. This PR stops wrapping the original exception inside AnomalyDetectionExceptions.

This PR also adds a null check inside EntityModel in case of a potential null pointer exception.

Testing done:
1. Added unit tests to check if exceptions wrapped inside NotSerializableExceptionWrapper can be decoded.
2. Did e2e testing for basic single-stream and HCAD workflow.
  • Loading branch information
kaituo authored Aug 12, 2021
1 parent 76aa828 commit 06fa595
Show file tree
Hide file tree
Showing 13 changed files with 446 additions and 64 deletions.
6 changes: 3 additions & 3 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.EntityProfileRunner',
'org.opensearch.ad.NodeStateManager',
'org.opensearch.ad.util.BulkUtil',
'org.opensearch.ad.util.ExceptionUtil',
'org.opensearch.ad.ml.EntityModel',

// TODO: unified flow caused coverage drop
'org.opensearch.ad.transport.AnomalyDetectorJobRequest',
Expand All @@ -353,7 +351,9 @@ List<String> jacocoExclusions = [
'org.opensearch.ad.transport.AnomalyDetectorJobTransportAction',
'org.opensearch.ad.transport.CronNodeRequest',
'org.opensearch.ad.transport.DeleteAnomalyResultsTransportAction',
'org.opensearch.ad.transport.GetAnomalyDetectorResponse'
'org.opensearch.ad.transport.GetAnomalyDetectorResponse',
'org.opensearch.ad.transport.ADBatchAnomalyResultRequest',
'org.opensearch.ad.transport.ADBatchTaskRemoteExecutionTransportAction',
]

jacocoTestCoverageVerification {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.ad.common.exception.NotSerializedADExceptionName;
import org.opensearch.ad.common.exception.ResourceNotFoundException;
import org.opensearch.ad.constant.CommonErrorMessages;
import org.opensearch.ad.constant.CommonName;
Expand Down Expand Up @@ -502,7 +503,11 @@ private ActionListener<RCFPollingResponse> onPollRCFUpdates(
// exception can be a RemoteTransportException
Exception causeException = (Exception) cause;
if (ExceptionUtil
.isException(causeException, ResourceNotFoundException.class, ExceptionUtil.RESOURCE_NOT_FOUND_EXCEPTION_NAME_UNDERSCORE)
.isException(
causeException,
ResourceNotFoundException.class,
NotSerializedADExceptionName.RESOURCE_NOT_FOUND_EXCEPTION_NAME_UNDERSCORE.getName()
)
|| (ExceptionUtil.isIndexNotAvailable(causeException)
&& causeException.getMessage().contains(CommonName.CHECKPOINT_INDEX_NAME))) {
// cannot find checkpoint
Expand Down
7 changes: 3 additions & 4 deletions src/main/java/org/opensearch/ad/NodeState.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.time.Instant;
import java.util.Optional;

import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.model.AnomalyDetector;

/**
Expand All @@ -50,7 +49,7 @@ public class NodeState implements ExpiringState {
// to check if the error for a detector has changed or not. If changed, trigger indexing.
private Optional<String> lastDetectionError;
// last error.
private Optional<AnomalyDetectionException> exception;
private Optional<Exception> exception;
// flag indicating whether checkpoint for the detector exists
private boolean checkPointExists;
// clock to get current time
Expand Down Expand Up @@ -150,7 +149,7 @@ public void setLastDetectionError(String lastError) {
*
* @return last exception if any
*/
public Optional<AnomalyDetectionException> getException() {
public Optional<Exception> getException() {
refreshLastUpdateTime();
return exception;
}
Expand All @@ -159,7 +158,7 @@ public Optional<AnomalyDetectionException> getException() {
*
* @param exception exception to record
*/
public void setException(AnomalyDetectionException exception) {
public void setException(Exception exception) {
this.exception = Optional.ofNullable(exception);
refreshLastUpdateTime();
}
Expand Down
15 changes: 4 additions & 11 deletions src/main/java/org/opensearch/ad/NodeStateManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.GetResponse;
import org.opensearch.ad.common.exception.AnomalyDetectionException;
import org.opensearch.ad.common.exception.EndRunException;
import org.opensearch.ad.common.exception.LimitExceededException;
import org.opensearch.ad.constant.CommonErrorMessages;
Expand Down Expand Up @@ -331,13 +330,13 @@ public void setLastDetectionError(String adID, String error) {
* @param adID detector id
* @return the detector's exception
*/
public Optional<AnomalyDetectionException> fetchExceptionAndClear(String adID) {
public Optional<Exception> fetchExceptionAndClear(String adID) {
NodeState state = states.get(adID);
if (state == null) {
return Optional.empty();
}

Optional<AnomalyDetectionException> exception = state.getException();
Optional<Exception> exception = state.getException();
exception.ifPresent(e -> state.setException(null));
return exception;
}
Expand All @@ -363,21 +362,15 @@ public void setException(String detectorId, Exception e) {
return;
}
NodeState state = states.computeIfAbsent(detectorId, d -> new NodeState(detectorId, clock));
Optional<AnomalyDetectionException> exception = state.getException();
Optional<Exception> exception = state.getException();
if (exception.isPresent()) {
Exception higherPriorityException = ExceptionUtil.selectHigherPriorityException(e, exception.get());
if (higherPriorityException != e) {
return;
}
}

AnomalyDetectionException adExep = null;
if (e instanceof AnomalyDetectionException) {
adExep = (AnomalyDetectionException) e;
} else {
adExep = new AnomalyDetectionException(detectorId, e);
}
state.setException(adExep);
state.setException(e);
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
* Modifications Copyright OpenSearch Contributors. See
* GitHub history for details.
*/

package org.opensearch.ad.common.exception;

import java.util.Optional;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.common.io.stream.NotSerializableExceptionWrapper;

/**
* OpenSearch restricts the kind of exceptions that can be thrown over the wire
* (Read OpenSearchException.OpenSearchExceptionHandle https://tinyurl.com/wv6c6t7x).
* Since we cannot add our own exception like ResourceNotFoundException without modifying
* OpenSearch's code, we have to unwrap the NotSerializableExceptionWrapper and
* check its root cause message.
*
*/
public enum NotSerializedADExceptionName {

RESOURCE_NOT_FOUND_EXCEPTION_NAME_UNDERSCORE(OpenSearchException.getExceptionName(new ResourceNotFoundException("", ""))),
LIMIT_EXCEEDED_EXCEPTION_NAME_UNDERSCORE(OpenSearchException.getExceptionName(new LimitExceededException("", "", false))),
END_RUN_EXCEPTION_NAME_UNDERSCORE(OpenSearchException.getExceptionName(new EndRunException("", "", false))),
ANOMALY_DETECTION_EXCEPTION_NAME_UNDERSCORE(OpenSearchException.getExceptionName(new AnomalyDetectionException("", ""))),
INTERNAL_FAILURE_NAME_UNDERSCORE(OpenSearchException.getExceptionName(new InternalFailure("", ""))),
CLIENT_EXCEPTION_NAME_UNDERSCORE(OpenSearchException.getExceptionName(new ClientException("", ""))),
CANCELLATION_EXCEPTION_NAME_UNDERSCORE(OpenSearchException.getExceptionName(new ADTaskCancelledException("", ""))),
DUPLICATE_TASK_EXCEPTION_NAME_UNDERSCORE(OpenSearchException.getExceptionName(new DuplicateTaskException("")));

private static final Logger LOG = LogManager.getLogger(NotSerializedADExceptionName.class);
private final String name;

NotSerializedADExceptionName(String name) {
this.name = name;
}

public String getName() {
return name;
}

/**
* Convert from a NotSerializableExceptionWrapper to an AnomalyDetectionException.
* Since NotSerializableExceptionWrapper does not keep some details we need, we
* initialize the exception with default values.
* @param exception an NotSerializableExceptionWrapper exception.
* @param adID Detector Id.
* @return converted AnomalyDetectionException
*/
public static Optional<AnomalyDetectionException> convertWrappedAnomalyDetectionException(
NotSerializableExceptionWrapper exception,
String adID
) {
String exceptionMsg = exception.getMessage().trim();

AnomalyDetectionException convertedException = null;
for (NotSerializedADExceptionName adException : values()) {
if (exceptionMsg.startsWith(adException.getName())) {
switch (adException) {
case RESOURCE_NOT_FOUND_EXCEPTION_NAME_UNDERSCORE:
convertedException = new ResourceNotFoundException(adID, exceptionMsg);
break;
case LIMIT_EXCEEDED_EXCEPTION_NAME_UNDERSCORE:
convertedException = new LimitExceededException(adID, exceptionMsg, false);
break;
case END_RUN_EXCEPTION_NAME_UNDERSCORE:
convertedException = new EndRunException(adID, exceptionMsg, false);
break;
case ANOMALY_DETECTION_EXCEPTION_NAME_UNDERSCORE:
convertedException = new AnomalyDetectionException(adID, exceptionMsg);
break;
case INTERNAL_FAILURE_NAME_UNDERSCORE:
convertedException = new InternalFailure(adID, exceptionMsg);
break;
case CLIENT_EXCEPTION_NAME_UNDERSCORE:
convertedException = new ClientException(adID, exceptionMsg);
break;
case CANCELLATION_EXCEPTION_NAME_UNDERSCORE:
convertedException = new ADTaskCancelledException(exceptionMsg, "");
break;
case DUPLICATE_TASK_EXCEPTION_NAME_UNDERSCORE:
convertedException = new DuplicateTaskException(exceptionMsg);
break;
default:
LOG.warn(new ParameterizedMessage("Unexpected AD exception {}", adException));
break;
}
}
}
return Optional.ofNullable(convertedException);
}
}
8 changes: 7 additions & 1 deletion src/main/java/org/opensearch/ad/ml/EntityModel.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

package org.opensearch.ad.ml;

import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

Expand Down Expand Up @@ -61,6 +62,9 @@ public Queue<double[]> getSamples() {
}

public void addSample(double[] sample) {
if (this.samples == null) {
this.samples = new ArrayDeque<>();
}
if (sample != null && sample.length != 0) {
this.samples.add(sample);
}
Expand All @@ -83,7 +87,9 @@ public void setThreshold(ThresholdingModel threshold) {
}

public void clear() {
samples.clear();
if (samples != null) {
samples.clear();
}
rcf = null;
threshold = null;
}
Expand Down
Loading

0 comments on commit 06fa595

Please sign in to comment.