Skip to content

Commit

Permalink
Check if indices exist in the presence of empty search results (#495)
Browse files Browse the repository at this point in the history
* Check if indices exist in the presence of empty search results

Previously, CompositeRetriever throws an IllegalArgumentException in the presence of empty results, which gets translated to internal failure and increments AD failure count. When the source index is a regex like blah*, we will get an empty response even if the index does not exist. This PR checks indices exist in the presence of empty search results. If yes, we throw an IndexNotFoundException that ends up being converted to EndRunException; if no, we still throw an IllegalArgumentException.

Testing done:
1. added unit tests
2. reproduced manually and verified the change fixed the issue.

Signed-off-by: Kaituo Li <kaituo@amazon.com>
  • Loading branch information
kaituo authored Apr 11, 2022
1 parent ee057e2 commit 9d44f1b
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 19 deletions.
62 changes: 49 additions & 13 deletions src/main/java/org/opensearch/ad/feature/CompositeRetriever.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import java.io.IOException;
import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
Expand All @@ -25,13 +26,17 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.ad.model.AnomalyDetector;
import org.opensearch.ad.model.Entity;
import org.opensearch.ad.model.Feature;
import org.opensearch.ad.util.ParseUtils;
import org.opensearch.client.Client;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.xcontent.NamedXContentRegistry;
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.query.BoolQueryBuilder;
import org.opensearch.index.query.RangeQueryBuilder;
import org.opensearch.search.aggregations.Aggregation;
Expand Down Expand Up @@ -66,6 +71,8 @@ public class CompositeRetriever extends AbstractRetriever {
private final int pageSize;
private long expirationEpochMs;
private Clock clock;
private IndexNameExpressionResolver indexNameExpressionResolver;
private ClusterService clusterService;

public CompositeRetriever(
long dataStartEpoch,
Expand All @@ -77,7 +84,9 @@ public CompositeRetriever(
Clock clock,
Settings settings,
int maxEntitiesPerInterval,
int pageSize
int pageSize,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService
) {
this.dataStartEpoch = dataStartEpoch;
this.dataEndEpoch = dataEndEpoch;
Expand All @@ -89,6 +98,8 @@ public CompositeRetriever(
this.pageSize = pageSize;
this.expirationEpochMs = expirationEpochMs;
this.clock = clock;
this.indexNameExpressionResolver = indexNameExpressionResolver;
this.clusterService = clusterService;
}

// a constructor that provide default value of clock
Expand All @@ -101,7 +112,9 @@ public CompositeRetriever(
long expirationEpochMs,
Settings settings,
int maxEntitiesPerInterval,
int pageSize
int pageSize,
IndexNameExpressionResolver indexNameExpressionResolver,
ClusterService clusterService
) {
this(
dataStartEpoch,
Expand All @@ -113,7 +126,9 @@ public CompositeRetriever(
Clock.systemUTC(),
settings,
maxEntitiesPerInterval,
pageSize
pageSize,
indexNameExpressionResolver,
clusterService
);
}

Expand Down Expand Up @@ -156,18 +171,22 @@ public PageIterator iterator() throws IOException {
public class PageIterator {
private SearchSourceBuilder source;
// a map from categorical field name to values (type: java.lang.Comparable)
Map<String, Object> afterKey;
private Map<String, Object> afterKey;
// number of iterations so far
private int iterations;

public PageIterator(SearchSourceBuilder source) {
this.source = source;
this.afterKey = null;
this.iterations = 0;
}

/**
* Results are returned using listener
* @param listener Listener to return results
*/
public void next(ActionListener<Page> listener) {
iterations++;
SearchRequest searchRequest = new SearchRequest(anomalyDetector.getIndices().toArray(new String[0]), source);
client.search(searchRequest, new ActionListener<SearchResponse>() {
@Override
Expand All @@ -183,13 +202,13 @@ public void onFailure(Exception e) {
}

private void processResponse(SearchResponse response, Runnable retry, ActionListener<Page> listener) {
if (shouldRetryDueToEmptyPage(response)) {
updateCompositeAfterKey(response, source);
retry.run();
return;
}

try {
if (shouldRetryDueToEmptyPage(response)) {
updateCompositeAfterKey(response, source);
retry.run();
return;
}

Page page = analyzePage(response);
// we can process at most maxEntities entities
if (totalResults <= maxEntities && afterKey != null) {
Expand Down Expand Up @@ -284,11 +303,28 @@ private boolean shouldRetryDueToEmptyPage(SearchResponse response) {
}

Optional<CompositeAggregation> getComposite(SearchResponse response) {
// When the source index is a regex like blah*, we will get empty response like
// the following even if no index starting with blah exists.
// {"took":0,"timed_out":false,"_shards":{"total":0,"successful":0,"skipped":0,"failed":0},"hits":{"max_score":0.0,"hits":[]}}
// Without regex, we will get IndexNotFoundException instead.
// {"error":{"root_cause":[{"type":"index_not_found_exception","reason":"no such
// index
// [blah]","index":"blah","resource.id":"blah","resource.type":"index_or_alias","index_uuid":"_na_"}],"type":"index_not_found_exception","reason":"no
// such index
// [blah]","index":"blah","resource.id":"blah","resource.type":"index_or_alias","index_uuid":"_na_"},"status":404}%
if (response == null || response.getAggregations() == null) {
return Optional.empty();
List<String> sourceIndices = anomalyDetector.getIndices();
String[] concreteIndices = indexNameExpressionResolver
.concreteIndexNames(clusterService.state(), IndicesOptions.lenientExpandOpen(), sourceIndices.toArray(new String[0]));
if (concreteIndices.length == 0) {
throw new IndexNotFoundException(String.join(",", sourceIndices));
} else {
return Optional.empty();
}
}
Aggregation agg = response.getAggregations().get(AGG_NAME_COMP);
if (agg == null) {
// when current interval has no data
return Optional.empty();
}

Expand All @@ -301,12 +337,12 @@ Optional<CompositeAggregation> getComposite(SearchResponse response) {

/**
* Whether next page exists. Conditions are:
* 1) we haven't fetched any page yet (totalResults == 0) or afterKey is not null
* 1) this is the first time we query (iterations == 0) or afterKey is not null
* 2) next detection interval has not started
* @return true if the iteration has more pages.
*/
public boolean hasNext() {
return (totalResults == 0 || (totalResults > 0 && afterKey != null)) && expirationEpochMs > clock.millis();
return (iterations == 0 || (totalResults > 0 && afterKey != null)) && expirationEpochMs > clock.millis();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,9 @@ private void executeAnomalyDetection(
nextDetectionStartTime,
settings,
maxEntitiesPerInterval,
pageSize
pageSize,
indexNameExpressionResolver,
clusterService
);

PageIterator pageIterator = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,10 @@ public void testParseBuckets() throws InstantiationException,
new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
1
);
hllpp.collect(0, BitMixer.mix64(randomIntBetween(1, 100)));
hllpp.collect(0, BitMixer.mix64(randomIntBetween(1, 100)));
long hash1 = BitMixer.mix64(randomIntBetween(1, 100));
long hash2 = BitMixer.mix64(randomIntBetween(1, 100));
hllpp.collect(0, hash1);
hllpp.collect(0, hash2);

Constructor ctor = null;
ctor = InternalCardinality.class.getDeclaredConstructor(String.class, AbstractHyperLogLogPlusPlus.class, Map.class);
Expand All @@ -626,7 +628,8 @@ public void testParseBuckets() throws InstantiationException,
assertTrue(parsedResult.isPresent());
double[] parsedCardinality = parsedResult.get();
assertEquals(1, parsedCardinality.length);
assertEquals(2, parsedCardinality[0], 0.001);
double buckets = hash1 == hash2 ? 1 : 2;
assertEquals(buckets, parsedCardinality[0], 0.001);

// release MockBigArrays; otherwise, test will fail
Releasables.close(hllpp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,9 @@ public void testPageToString() {
clock,
settings,
10000,
1000
1000,
indexNameResolver,
clusterService
);
Map<Entity, double[]> results = new HashMap<>();
Entity entity1 = Entity.createEntityByReordering(attrs1);
Expand All @@ -1122,7 +1124,9 @@ public void testEmptyPageToString() {
clock,
settings,
10000,
1000
1000,
indexNameResolver,
clusterService
);

CompositeRetriever.Page page = retriever.new Page(null);
Expand Down Expand Up @@ -1283,4 +1287,44 @@ public void testSelectHigherExceptionInModelNode() throws InterruptedException,
EndRunException endRunException = (EndRunException) (exceptionCaptor.getValue());
assertTrue(!endRunException.isEndNow());
}

/**
* A missing index will cause the search result to contain null aggregation
* like {"took":0,"timed_out":false,"_shards":{"total":0,"successful":0,"skipped":0,"failed":0},"hits":{"max_score":0.0,"hits":[]}}
*
* The test verifies we can handle such situation and won't throw exceptions
* @throws InterruptedException while waiting for execution gets interruptted
*/
public void testMissingIndex() throws InterruptedException {
final CountDownLatch inProgressLatch = new CountDownLatch(1);

doAnswer(invocation -> {
ActionListener<SearchResponse> listener = invocation.getArgument(1);
listener
.onResponse(
new SearchResponse(
new SearchResponseSections(SearchHits.empty(), null, null, false, null, null, 1),
null,
1,
1,
0,
0,
ShardSearchFailure.EMPTY_ARRAY,
Clusters.EMPTY
)
);
inProgressLatch.countDown();
return null;
}).when(client).search(any(), any());

PlainActionFuture<AnomalyResultResponse> listener = new PlainActionFuture<>();

action.doExecute(null, request, listener);

AnomalyResultResponse response = listener.actionGet(10000L);
assertEquals(Double.NaN, response.getAnomalyGrade(), 0.01);

assertTrue(inProgressLatch.await(10000L, TimeUnit.MILLISECONDS));
verify(stateManager, times(1)).setException(eq(detectorId), any(EndRunException.class));
}
}

0 comments on commit 9d44f1b

Please sign in to comment.