From 928c3c705504f1a8056e07d7d15e4b6bfc701e9a Mon Sep 17 00:00:00 2001 From: Joanne Wang Date: Thu, 7 Mar 2024 09:51:29 -0800 Subject: [PATCH 1/2] Add an _exists_ check to document level monitor queries (#1425) * clean up and add integ tests Signed-off-by: Joanne Wang * refactored out common method and renamed test Signed-off-by: Joanne Wang * remove _exists_ flag Signed-off-by: Joanne Wang --------- Signed-off-by: Joanne Wang --- .../alerting/util/DocLevelMonitorQueries.kt | 22 ++ .../alerting/DocumentMonitorRunnerIT.kt | 344 ++++++++++++++++++ 2 files changed, 366 insertions(+) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt index 3e0e91d76..52ef1d26f 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/util/DocLevelMonitorQueries.kt @@ -396,6 +396,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ var query = it.query conflictingPaths.forEach { conflictingPath -> if (query.contains(conflictingPath)) { + query = transformExistsQuery(query, conflictingPath, "", monitorId) query = query.replace("$conflictingPath:", "${conflictingPath}__$monitorId:") filteredConcreteIndices.addAll(conflictingPathToConcreteIndices[conflictingPath]!!) } @@ -418,6 +419,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ var query = it.query flattenPaths.forEach { fieldPath -> if (!conflictingPaths.contains(fieldPath.first)) { + query = transformExistsQuery(query, fieldPath.first, sourceIndex, monitorId) query = query.replace("${fieldPath.first}:", "${fieldPath.first}_${sourceIndex}_$monitorId:") } } @@ -448,6 +450,26 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ } } + /** + * Transforms the query if it includes an _exists_ clause to append the index name and the monitor id to the field value + */ + private fun transformExistsQuery(query: String, conflictingPath: String, indexName: String, monitorId: String): String { + return query + .replace("_exists_: ", "_exists_:") // remove space to read exists query as one string + .split("\\s+".toRegex()) + .joinToString(separator = " ") { segment -> + if (segment.contains("_exists_:")) { + val trimSegement = segment.trim { it == '(' || it == ')' } // remove any delimiters from ends + val (_, value) = trimSegement.split(":", limit = 2) // split into key and value + val newString = if (value == conflictingPath) + segment.replace(conflictingPath, "${conflictingPath}_${indexName}_$monitorId") else segment + newString + } else { + segment + } + } + } + private suspend fun updateQueryIndexMappings( monitor: Monitor, monitorMetadata: MonitorMetadata, diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index f73ec2222..c8021abd7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -1997,6 +1997,350 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { assertEquals(1, output.objectMap("trigger_results").values.size) } + fun `test execute monitor generates alerts and findings with NOT EQUALS query and EXISTS query`() { + val testIndex = createTestIndex() + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val query = "NOT test_field: \"us-east-1\" AND _exists_: test_field" + val docQuery = DocLevelQuery(query = query, name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex, "5", testDoc) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex"))) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + } + + fun `test document-level monitor when index alias contain docs that do match a NOT EQUALS query and EXISTS query`() { + val aliasName = "test-alias" + createIndexAlias( + aliasName, + """ + "properties" : { + "test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" }, + "test_field" : { "type" : "keyword" }, + "number" : { "type" : "keyword" } + } + """.trimIndent() + ) + + val docQuery = DocLevelQuery(query = "NOT test_field:\"us-east-1\" AND _exists_: test_field", name = "3", fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf("$aliasName"), listOf(docQuery)) + + val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id) + val monitor = createMonitor( + randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action))) + ) + ) + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "@timestamp": "$testTime", + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(aliasName, "1", testDoc) + var response = executeMonitor(monitor.id) + var output = entityAsMap(response) + var searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + var matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + rolloverDatastream(aliasName) + indexDoc(aliasName, "2", testDoc) + response = executeMonitor(monitor.id) + output = entityAsMap(response) + searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 1, matchingDocsToQuery.size) + + deleteIndexAlias(aliasName) + } + + fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS and EXISTS query operator`() { + val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}" + val testQueryName = "wildcard-test-query" + val testIndex = createTestIndex("${testIndexPrefix}1") + val testIndex2 = createTestIndex("${testIndexPrefix}2") + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + val query = "NOT test_field:\"us-west-1\" AND _exists_: test_field" + val docQuery = DocLevelQuery(query = query, name = testQueryName, fields = listOf()) + val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]")) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex2, "5", testDoc) + + val response = executeMonitor(monitor.id) + + val output = entityAsMap(response) + + assertEquals(monitor.name, output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult[docQuery.id] as List + assertEquals("Incorrect search result", 2, matchingDocsToQuery.size) + assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex2"))) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") } + assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size) + } + + fun `test execute monitor with indices having fields with same name different field mappings in multiple indices with NOT EQUALS`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source": { + "properties": { + "device": { + "properties": { + "hwd": { + "properties": { + "id": { + "type":"text", + "analyzer":"whitespace" + } + } + } + } + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + + val testIndex2 = createTestIndex( + "test2", + """"properties": { + "test_field" : { + "type":"keyword" + } + } + """.trimIndent() + ) + + val testIndex4 = createTestIndex( + "test4", + """"properties": { + "source": { + "properties": { + "device": { + "properties": { + "hwd": { + "properties": { + "id": { + "type":"text" + } + } + } + } + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + + val testDoc1 = """{ + "source" : {"device" : {"hwd" : {"id" : "123456"}} }, + "nested_field": { "test1": "some text" } + }""" + val testDoc2 = """{ + "nested_field": { "test1": "some text" }, + "test_field": "123456" + }""" + + val docQuery1 = DocLevelQuery( + query = "NOT test_field:\"12345\" AND _exists_: test_field", + name = "4", + fields = listOf() + ) + val docQuery2 = DocLevelQuery( + query = "NOT source.device.hwd.id:\"12345\" AND _exists_: source.device.hwd.id", + name = "5", + fields = listOf() + ) + + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex4, "1", testDoc1) + indexDoc(testIndex2, "1", testDoc2) + indexDoc(testIndex, "1", testDoc1) + indexDoc(testIndex, "2", testDoc2) + + executeMonitor(monitor.id) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 4, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 4, findings.size) + + val request = """{ + "size": 0, + "query": { + "match_all": {} + } + }""" + val httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + + val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.totalHits?.let { assertEquals(5L, it.value) } + } + + fun `test execute monitor with indices having fields with same name but different field mappings with NOT EQUALS`() { + val testIndex = createTestIndex( + "test1", + """"properties": { + "source": { + "properties": { + "id": { + "type":"text", + "analyzer":"whitespace" + } + } + }, + "test_field" : { + "type":"text", + "analyzer":"whitespace" + } + } + """.trimIndent() + ) + + val testIndex2 = createTestIndex( + "test2", + """"properties": { + "source": { + "properties": { + "id": { + "type":"text" + } + } + }, + "test_field" : { + "type":"text" + } + } + """.trimIndent() + ) + val testDoc = """{ + "source" : {"id" : "12345" }, + "nested_field": { "test1": "some text" }, + "test_field": "12345" + }""" + + val docQuery = DocLevelQuery( + query = "(NOT test_field:\"123456\" AND _exists_:test_field) AND source.id:\"12345\"", + name = "5", + fields = listOf() + ) + val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger))) + assertNotNull(monitor.id) + + indexDoc(testIndex, "1", testDoc) + indexDoc(testIndex2, "1", testDoc) + + executeMonitor(monitor.id) + + val alerts = searchAlertsWithFilter(monitor) + assertEquals("Alert saved for test monitor", 2, alerts.size) + + val findings = searchFindings(monitor) + assertEquals("Findings saved for test monitor", 2, findings.size) + + // as mappings of source.id & test_field are different so, both of them expands + val expectedQueries = listOf( + "(NOT test_field_test2_${monitor.id}:\"123456\" AND _exists_:test_field_test2_${monitor.id}) " + + "AND source.id_test2_${monitor.id}:\"12345\"", + "(NOT test_field_test1_${monitor.id}:\"123456\" AND _exists_:test_field_test1_${monitor.id}) " + + "AND source.id_test1_${monitor.id}:\"12345\"" + ) + + val request = """{ + "size": 10, + "query": { + "match_all": {} + } + }""" + var httpResponse = adminClient().makeRequest( + "GET", "/${monitor.dataSources.queryIndex}/_search", + StringEntity(request, ContentType.APPLICATION_JSON) + ) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + searchResponse.hits.forEach { hit -> + val query = ((hit.sourceAsMap["query"] as Map)["query_string"] as Map)["query"] + assertTrue(expectedQueries.contains(query)) + } + } + @Suppress("UNCHECKED_CAST") /** helper that returns a field in a json map whose values are all json objects */ private fun Map.objectMap(key: String): Map> { From 3696eafe311992a2fa504b1d70f3e6de78cf1598 Mon Sep 17 00:00:00 2001 From: Joanne Wang Date: Thu, 7 Mar 2024 17:23:39 -0800 Subject: [PATCH 2/2] fix integ test Signed-off-by: Joanne Wang --- .../kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index c8021abd7..226714075 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2034,8 +2034,8 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { val findings = searchFindings(monitor) assertEquals("Findings saved for test monitor", 2, findings.size) - assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) - assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5")) + assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5")) } fun `test document-level monitor when index alias contain docs that do match a NOT EQUALS query and EXISTS query`() {