From 6d5b6619de7795ecb9cb21c4c78a2efe4db83461 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Thu, 17 Aug 2023 00:27:21 +0000 Subject: [PATCH] optimize doc-level monitor workflow for index patterns Signed-off-by: Subhobrata Dey --- .../alerting/DocumentLevelMonitorRunner.kt | 131 ++++++++++-------- .../alerting/util/DocLevelMonitorQueries.kt | 121 +++++++++------- .../opensearch/alerting/util/IndexUtils.kt | 5 - .../alerting/DocumentMonitorRunnerIT.kt | 121 ++++++++++++++++ 4 files changed, 264 insertions(+), 114 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index ec836ee47..435c012af 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -51,6 +51,7 @@ import org.opensearch.core.common.bytes.BytesReference import org.opensearch.core.rest.RestStatus import org.opensearch.core.xcontent.ToXContent import org.opensearch.core.xcontent.XContentBuilder +import org.opensearch.index.IndexNotFoundException import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.Operator import org.opensearch.index.query.QueryBuilders @@ -118,11 +119,14 @@ object DocumentLevelMonitorRunner : MonitorRunner() { try { // Resolve all passed indices to concrete indices - val indices = IndexUtils.resolveAllIndices( + val concreteIndices = IndexUtils.resolveAllIndices( docLevelMonitorInput.indices, monitorCtx.clusterService!!, monitorCtx.indexNameExpressionResolver!! ) + if (concreteIndices.isEmpty()) { + throw IndexNotFoundException(docLevelMonitorInput.indices[0]) + } monitorCtx.docLevelMonitorQueries!!.initDocLevelQueryIndex(monitor.dataSources) monitorCtx.docLevelMonitorQueries!!.indexDocLevelQueries( @@ -134,7 +138,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // cleanup old indices that are not monitored anymore from the same monitor for (ind in updatedLastRunContext.keys) { - if (!indices.contains(ind)) { + if (!concreteIndices.contains(ind)) { updatedLastRunContext.remove(ind) } } @@ -142,65 +146,76 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Map of document ids per index when monitor is workflow delegate and has chained findings val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex - indices.forEach { indexName -> - // Prepare lastRunContext for each index - val indexLastRunContext = lastRunContext.getOrPut(indexName) { - val isIndexCreatedRecently = createdRecently( - monitor, - periodStart, - periodEnd, - monitorCtx.clusterService!!.state().metadata.index(indexName) - ) - MonitorMetadataService.createRunContextForIndex(indexName, isIndexCreatedRecently) - } - - // Prepare updatedLastRunContext for each index - val indexUpdatedRunContext = updateLastRunContext( - indexLastRunContext.toMutableMap(), - monitorCtx, - indexName - ) as MutableMap - updatedLastRunContext[indexName] = indexUpdatedRunContext - - val count: Int = indexLastRunContext["shards_count"] as Int - for (i: Int in 0 until count) { - val shard = i.toString() - - // update lastRunContext if its a temp monitor as we only want to view the last bit of data then - // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data - if (isTempMonitor) { - indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) + docLevelMonitorInput.indices.forEach { indexName -> + val concreteIndices = IndexUtils.resolveAllIndices( + listOf(indexName), + monitorCtx.clusterService!!, + monitorCtx.indexNameExpressionResolver!! + ) + val updatedIndexName = indexName.replace("*", "_") + + concreteIndices.forEach { concreteIndexName -> + // Prepare lastRunContext for each index + val indexLastRunContext = lastRunContext.getOrPut(concreteIndexName) { + val isIndexCreatedRecently = createdRecently( + monitor, + periodStart, + periodEnd, + monitorCtx.clusterService!!.state().metadata.index(concreteIndexName) + ) + MonitorMetadataService.createRunContextForIndex(concreteIndexName, isIndexCreatedRecently) } - } - // Prepare DocumentExecutionContext for each index - val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) + // Prepare updatedLastRunContext for each index + val indexUpdatedRunContext = updateLastRunContext( + indexLastRunContext.toMutableMap(), + monitorCtx, + concreteIndexName + ) as MutableMap + updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext + + val count: Int = indexLastRunContext["shards_count"] as Int + for (i: Int in 0 until count) { + val shard = i.toString() + + // update lastRunContext if its a temp monitor as we only want to view the last bit of data then + // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data + if (isTempMonitor) { + indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) + } + } - val matchingDocs = getMatchingDocs( - monitor, - monitorCtx, - docExecutionContext, - indexName, - matchingDocIdsPerIndex?.get(indexName) - ) + // Prepare DocumentExecutionContext for each index + val docExecutionContext = DocumentExecutionContext(queries, indexLastRunContext, indexUpdatedRunContext) - if (matchingDocs.isNotEmpty()) { - val matchedQueriesForDocs = getMatchedQueries( - monitorCtx, - matchingDocs.map { it.second }, + val matchingDocs = getMatchingDocs( monitor, - monitorMetadata, - indexName + monitorCtx, + docExecutionContext, + updatedIndexName, + concreteIndexName, + matchingDocIdsPerIndex?.get(concreteIndexName) ) - matchedQueriesForDocs.forEach { hit -> - val id = hit.id.replace("_${indexName}_${monitor.id}", "") - - val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } - docIndices.forEach { idx -> - val docIndex = "${matchingDocs[idx].first}|$indexName" - inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) - docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) + if (matchingDocs.isNotEmpty()) { + val matchedQueriesForDocs = getMatchedQueries( + monitorCtx, + matchingDocs.map { it.second }, + monitor, + monitorMetadata, + updatedIndexName, + concreteIndexName + ) + + matchedQueriesForDocs.forEach { hit -> + val id = hit.id.replace("_${updatedIndexName}_${monitor.id}", "") + + val docIndices = hit.field("_percolator_document_slot").values.map { it.toString().toInt() } + docIndices.forEach { idx -> + val docIndex = "${matchingDocs[idx].first}|$concreteIndexName" + inputRunResults.getOrPut(id) { mutableSetOf() }.add(docIndex) + docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) + } } } } @@ -554,6 +569,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorCtx: MonitorRunnerExecutionContext, docExecutionCtx: DocumentExecutionContext, index: String, + concreteIndex: String, docIds: List? = null ): List> { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int @@ -566,7 +582,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val hits: SearchHits = searchShard( monitorCtx, - index, + concreteIndex, shard, prevSeqNo, maxSeqNo, @@ -628,7 +644,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docs: List, monitor: Monitor, monitorMetadata: MonitorMetadata, - index: String + index: String, + concreteIndex: String ): SearchHits { val boolQueryBuilder = BoolQueryBuilder().must(QueryBuilders.matchQuery("index", index).operator(Operator.AND)) @@ -641,7 +658,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { val queryIndex = monitorMetadata.sourceToQueryIndexMapping[index + monitor.id] if (queryIndex == null) { val message = "Failed to resolve concrete queryIndex from sourceIndex during monitor execution!" + - " sourceIndex:$index queryIndex:${monitor.dataSources.queryIndex}" + " sourceIndex:$concreteIndex queryIndex:${monitor.dataSources.queryIndex}" logger.error(message) throw AlertingException.wrap( OpenSearchStatusException(message, RestStatus.INTERNAL_SERVER_ERROR) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 42512738e..06c938692 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -201,59 +201,69 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ val docLevelMonitorInput = monitor.inputs[0] as DocLevelMonitorInput val queries: List = docLevelMonitorInput.queries - val indices = IndexUtils.resolveAllIndices( - docLevelMonitorInput.indices, - monitorCtx.clusterService!!, - monitorCtx.indexNameExpressionResolver!! - ) - + val indices = docLevelMonitorInput.indices val clusterState = clusterService.state() // Run through each backing index and apply appropriate mappings to query index - indices?.forEach { indexName -> - if (clusterState.routingTable.hasIndex(indexName)) { - val indexMetadata = clusterState.metadata.index(indexName) - if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { - val properties = ( - (indexMetadata.mapping()?.sourceAsMap?.get("properties")) - as MutableMap - ) - // Node processor function is used to process leaves of index mappings tree - // - val leafNodeProcessor = - fun(fieldName: String, props: MutableMap): Triple> { - val newProps = props.toMutableMap() - if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { - val mappingsByType = monitor.dataSources.queryIndexMappingsByType - if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) { - mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry -> - newProps[iter.key] = iter.value + indices.forEach { indexName -> + val concreteIndices = IndexUtils.resolveAllIndices( + listOf(indexName), + monitorCtx.clusterService!!, + monitorCtx.indexNameExpressionResolver!! + ) + val updatedIndexName = indexName.replace("*", "_") + val updatedProperties = mutableMapOf() + val allFlattenPaths = mutableListOf() + var sourceIndexFieldLimit = 0L + + concreteIndices.forEach { concreteIndexName -> + if (clusterState.routingTable.hasIndex(concreteIndexName)) { + val indexMetadata = clusterState.metadata.index(concreteIndexName) + if (indexMetadata.mapping()?.sourceAsMap?.get("properties") != null) { + val properties = ( + (indexMetadata.mapping()?.sourceAsMap?.get("properties")) + as MutableMap + ) + // Node processor function is used to process leaves of index mappings tree + // + val leafNodeProcessor = + fun(fieldName: String, props: MutableMap): Triple> { + val newProps = props.toMutableMap() + if (monitor.dataSources.queryIndexMappingsByType.isNotEmpty()) { + val mappingsByType = monitor.dataSources.queryIndexMappingsByType + if (props.containsKey("type") && mappingsByType.containsKey(props["type"]!!)) { + mappingsByType[props["type"]]?.entries?.forEach { iter: Map.Entry -> + newProps[iter.key] = iter.value + } } } + if (props.containsKey("path")) { + newProps["path"] = "${props["path"]}_${updatedIndexName}_$monitorId" + } + return Triple(fieldName, "${fieldName}_${updatedIndexName}_$monitorId", newProps) } - if (props.containsKey("path")) { - newProps["path"] = "${props["path"]}_${indexName}_$monitorId" - } - return Triple(fieldName, "${fieldName}_${indexName}_$monitorId", newProps) - } - // Traverse and update index mappings here while extracting flatten field paths - val flattenPaths = mutableListOf() - traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) - // Updated mappings ready to be applied on queryIndex - val updatedProperties = properties - // Updates mappings of concrete queryIndex. This can rollover queryIndex if field mapping limit is reached. - var (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings( - monitor, - monitorMetadata, - indexName, - updatedProperties - ) - - if (updateMappingResponse.isAcknowledged) { - doIndexAllQueries(concreteQueryIndex, indexName, monitorId, queries, flattenPaths, refreshPolicy, indexTimeout) + // Traverse and update index mappings here while extracting flatten field paths + val flattenPaths = mutableListOf() + traverseMappingsAndUpdate(properties, "", leafNodeProcessor, flattenPaths) + allFlattenPaths.addAll(flattenPaths) + // Updated mappings ready to be applied on queryIndex + updatedProperties.putAll(properties) + sourceIndexFieldLimit += checkMaxFieldLimit(concreteIndexName) } } } + // Updates mappings of concrete queryIndex. This can rollover queryIndex if field mapping limit is reached. + var (updateMappingResponse, concreteQueryIndex) = updateQueryIndexMappings( + monitor, + monitorMetadata, + updatedIndexName, + sourceIndexFieldLimit, + updatedProperties + ) + + if (updateMappingResponse.isAcknowledged) { + doIndexAllQueries(concreteQueryIndex, updatedIndexName, monitorId, queries, allFlattenPaths, refreshPolicy, indexTimeout) + } } } @@ -302,6 +312,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ monitor: Monitor, monitorMetadata: MonitorMetadata, sourceIndex: String, + sourceIndexFieldLimit: Long, updatedProperties: MutableMap ): Pair { var targetQueryIndex = monitorMetadata.sourceToQueryIndexMapping[sourceIndex + monitor.id] @@ -324,7 +335,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ var updateMappingResponse = AcknowledgedResponse(false) try { // Adjust max field limit in mappings for query index, if needed. - checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex) + adjustMaxFieldLimitForQueryIndex(sourceIndexFieldLimit, targetQueryIndex) updateMappingResponse = client.suspendUntil { client.admin().indices().putMapping(updateMappingRequest, it) } @@ -338,7 +349,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ // Do queryIndex rollover targetQueryIndex = rolloverQueryIndex(monitor) // Adjust max field limit in mappings for new index. - checkAndAdjustMaxFieldLimit(sourceIndex, targetQueryIndex) + adjustMaxFieldLimitForQueryIndex(sourceIndexFieldLimit, targetQueryIndex) // PUT mappings to newly created index val updateMappingRequest = PutMappingRequest(targetQueryIndex) updateMappingRequest.source(mapOf("properties" to updatedProperties)) @@ -382,21 +393,27 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ * Adjusts max field limit index setting for query index if source index has higher limit. * This will prevent max field limit exception, when source index has more fields then query index limit */ - private suspend fun checkAndAdjustMaxFieldLimit(sourceIndex: String, concreteQueryIndex: String) { + private suspend fun checkMaxFieldLimit(sourceIndex: String): Long { + val getSettingsResponse: GetSettingsResponse = client.suspendUntil { + admin().indices().getSettings(GetSettingsRequest().indices(sourceIndex), it) + } + return getSettingsResponse.getSetting(sourceIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L + } + + private suspend fun adjustMaxFieldLimitForQueryIndex(sourceIndexFieldLimit: Long, concreteQueryIndex: String) { val getSettingsResponse: GetSettingsResponse = client.suspendUntil { - admin().indices().getSettings(GetSettingsRequest().indices(sourceIndex, concreteQueryIndex), it) + admin().indices().getSettings(GetSettingsRequest().indices(concreteQueryIndex), it) } - val sourceIndexLimit = - getSettingsResponse.getSetting(sourceIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L val queryIndexLimit = getSettingsResponse.getSetting(concreteQueryIndex, INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key)?.toLong() ?: 1000L // Our query index initially has 3 fields we defined and 5 more builtin metadata fields in mappings so we have to account for that - if (sourceIndexLimit > (queryIndexLimit - QUERY_INDEX_BASE_FIELDS_COUNT)) { + if (sourceIndexFieldLimit > (queryIndexLimit - QUERY_INDEX_BASE_FIELDS_COUNT)) { val updateSettingsResponse: AcknowledgedResponse = client.suspendUntil { admin().indices().updateSettings( UpdateSettingsRequest(concreteQueryIndex).settings( Settings.builder().put( - INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, sourceIndexLimit + QUERY_INDEX_BASE_FIELDS_COUNT + INDEX_MAPPING_TOTAL_FIELDS_LIMIT_SETTING.key, + sourceIndexFieldLimit + QUERY_INDEX_BASE_FIELDS_COUNT ) ), it diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt index 34afcb11c..ec3bd0031 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/IndexUtils.kt @@ -21,7 +21,6 @@ import org.opensearch.commons.alerting.util.IndexUtils import org.opensearch.core.action.ActionListener import org.opensearch.core.xcontent.NamedXContentRegistry import org.opensearch.core.xcontent.XContentParser -import org.opensearch.index.IndexNotFoundException class IndexUtils { @@ -151,10 +150,6 @@ class IndexUtils { result.addAll(concreteIndices) } - if (result.size == 0) { - throw IndexNotFoundException(indices[0]) - } - return result } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index f29b49a06..20dc4ea84 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -5,10 +5,14 @@ package org.opensearch.alerting +import org.apache.hc.core5.http.ContentType +import org.apache.hc.core5.http.io.entity.StringEntity +import org.opensearch.action.search.SearchResponse import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN import org.opensearch.client.Response import org.opensearch.client.ResponseException +import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.DocLevelMonitorInput @@ -17,6 +21,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy import org.opensearch.commons.alerting.model.action.AlertCategory import org.opensearch.commons.alerting.model.action.PerAlertActionScope import org.opensearch.commons.alerting.model.action.PerExecutionActionScope +import org.opensearch.core.rest.RestStatus import org.opensearch.script.Script import java.time.ZonedDateTime import java.time.format.DateTimeFormatter @@ -495,6 +500,122 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals("Findings saved for test monitor expected 14, 51 and 10", 3, foundFindings.size) } + fun `test no of queries generated for document-level monitor based on wildcard indexes`() { + val testIndex = createTestIndex("test1") + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + executeMonitor(monitor.id) + + val request = """{ + "size": 0, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(1L, it.value) } + + val testIndex2 = createTestIndex("test2") + indexDoc(testIndex2, "1", testDoc) + executeMonitor(monitor.id) + + httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(1L, it.value) } + } + + fun `test execute monitor with new index added after first execution that generates alerts and findings from new query`() { + val testIndex = createTestIndex("test1") + val testIndex2 = createTestIndex("test2") + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val docQuery1 = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") + val docQuery2 = DocLevelQuery(query = "test_field_new:\"us-west-2\"", name = "4") + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex2, "5", testDoc) + executeMonitor(monitor.id) + + var alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + var findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + + var foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") } + assertEquals("Findings saved for test monitor expected 1 and 5", 2, foundFindings.size) + + // clear previous findings and alerts + deleteIndex(ALL_FINDING_INDEX_PATTERN) + deleteIndex(ALL_ALERT_INDEX_PATTERN) + + val testDocNew = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field_new" : "us-west-2" + }""" + + val testIndex3 = createTestIndex("test3") + indexDoc(testIndex3, "10", testDocNew) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery2.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("10|$testIndex3"))) + + alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 1, alerts.size) + + findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 1, findings.size) + + foundFindings = findings.filter { + it.relatedDocIds.contains("10") + } + assertEquals("Findings saved for test monitor expected 10", 1, foundFindings.size) + } + fun `test document-level monitor when alias only has write index with 0 docs`() { // Monitor should execute, but create 0 findings. val alias = createTestAlias(includeWriteIndex = true)