Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] Add an _exists_ check to document level monitor queries (#1425) #1456

Merged
merged 2 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
var query = it.query
conflictingPaths.forEach { conflictingPath ->
if (query.contains(conflictingPath)) {
query = transformExistsQuery(query, conflictingPath, "<index>", monitorId)
query = query.replace("$conflictingPath:", "${conflictingPath}_<index>_$monitorId:")
filteredConcreteIndices.addAll(conflictingPathToConcreteIndices[conflictingPath]!!)
}
Expand All @@ -418,6 +419,7 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
var query = it.query
flattenPaths.forEach { fieldPath ->
if (!conflictingPaths.contains(fieldPath.first)) {
query = transformExistsQuery(query, fieldPath.first, sourceIndex, monitorId)
query = query.replace("${fieldPath.first}:", "${fieldPath.first}_${sourceIndex}_$monitorId:")
}
}
Expand Down Expand Up @@ -448,6 +450,26 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ
}
}

/**
* Transforms the query if it includes an _exists_ clause to append the index name and the monitor id to the field value
*/
private fun transformExistsQuery(query: String, conflictingPath: String, indexName: String, monitorId: String): String {
return query
.replace("_exists_: ", "_exists_:") // remove space to read exists query as one string
.split("\\s+".toRegex())
.joinToString(separator = " ") { segment ->
if (segment.contains("_exists_:")) {
val trimSegement = segment.trim { it == '(' || it == ')' } // remove any delimiters from ends
val (_, value) = trimSegement.split(":", limit = 2) // split into key and value
val newString = if (value == conflictingPath)
segment.replace(conflictingPath, "${conflictingPath}_${indexName}_$monitorId") else segment
newString
} else {
segment
}
}
}

private suspend fun updateQueryIndexMappings(
monitor: Monitor,
monitorMetadata: MonitorMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1997,6 +1997,350 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertEquals(1, output.objectMap("trigger_results").values.size)
}

fun `test execute monitor generates alerts and findings with NOT EQUALS query and EXISTS query`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val query = "NOT test_field: \"us-east-1\" AND _exists_: test_field"
val docQuery = DocLevelQuery(query = query, name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)

val response = executeMonitor(monitor.id)

val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex")))

val alerts = searchAlertsWithFilter(monitor)
assertEquals("Alert saved for test monitor", 2, alerts.size)

val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 2, findings.size)
assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5"))
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1") || findings[0].relatedDocIds.contains("5"))
}

fun `test document-level monitor when index alias contain docs that do match a NOT EQUALS query and EXISTS query`() {
val aliasName = "test-alias"
createIndexAlias(
aliasName,
"""
"properties" : {
"test_strict_date_time" : { "type" : "date", "format" : "strict_date_time" },
"test_field" : { "type" : "keyword" },
"number" : { "type" : "keyword" }
}
""".trimIndent()
)

val docQuery = DocLevelQuery(query = "NOT test_field:\"us-east-1\" AND _exists_: test_field", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf("$aliasName"), listOf(docQuery))

val action = randomAction(template = randomTemplateScript("Hello {{ctx.monitor.name}}"), destinationId = createDestination().id)
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(randomDocumentLevelTrigger(condition = ALWAYS_RUN, actions = listOf(action)))
)
)

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"@timestamp": "$testTime",
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""
indexDoc(aliasName, "1", testDoc)
var response = executeMonitor(monitor.id)
var output = entityAsMap(response)
var searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
var matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)

rolloverDatastream(aliasName)
indexDoc(aliasName, "2", testDoc)
response = executeMonitor(monitor.id)
output = entityAsMap(response)
searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 1, matchingDocsToQuery.size)

deleteIndexAlias(aliasName)
}

fun `test execute monitor with wildcard index that generates alerts and findings for NOT EQUALS and EXISTS query operator`() {
val testIndexPrefix = "test-index-${randomAlphaOfLength(10).lowercase(Locale.ROOT)}"
val testQueryName = "wildcard-test-query"
val testIndex = createTestIndex("${testIndexPrefix}1")
val testIndex2 = createTestIndex("${testIndexPrefix}2")

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val query = "NOT test_field:\"us-west-1\" AND _exists_: test_field"
val docQuery = DocLevelQuery(query = query, name = testQueryName, fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf("$testIndexPrefix*"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = Script("query[name=$testQueryName]"))
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex2, "5", testDoc)

val response = executeMonitor(monitor.id)

val output = entityAsMap(response)

assertEquals(monitor.name, output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery = searchResult[docQuery.id] as List<String>
assertEquals("Incorrect search result", 2, matchingDocsToQuery.size)
assertTrue("Incorrect search result", matchingDocsToQuery.containsAll(listOf("1|$testIndex", "5|$testIndex2")))

val alerts = searchAlertsWithFilter(monitor)
assertEquals("Alert saved for test monitor", 2, alerts.size)

val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 2, findings.size)
val foundFindings = findings.filter { it.relatedDocIds.contains("1") || it.relatedDocIds.contains("5") }
assertEquals("Didn't find findings for docs 1 and 5", 2, foundFindings.size)
}

fun `test execute monitor with indices having fields with same name different field mappings in multiple indices with NOT EQUALS`() {
val testIndex = createTestIndex(
"test1",
""""properties": {
"source": {
"properties": {
"device": {
"properties": {
"hwd": {
"properties": {
"id": {
"type":"text",
"analyzer":"whitespace"
}
}
}
}
}
}
},
"test_field" : {
"type":"text"
}
}
""".trimIndent()
)

val testIndex2 = createTestIndex(
"test2",
""""properties": {
"test_field" : {
"type":"keyword"
}
}
""".trimIndent()
)

val testIndex4 = createTestIndex(
"test4",
""""properties": {
"source": {
"properties": {
"device": {
"properties": {
"hwd": {
"properties": {
"id": {
"type":"text"
}
}
}
}
}
}
},
"test_field" : {
"type":"text"
}
}
""".trimIndent()
)

val testDoc1 = """{
"source" : {"device" : {"hwd" : {"id" : "123456"}} },
"nested_field": { "test1": "some text" }
}"""
val testDoc2 = """{
"nested_field": { "test1": "some text" },
"test_field": "123456"
}"""

val docQuery1 = DocLevelQuery(
query = "NOT test_field:\"12345\" AND _exists_: test_field",
name = "4",
fields = listOf()
)
val docQuery2 = DocLevelQuery(
query = "NOT source.device.hwd.id:\"12345\" AND _exists_: source.device.hwd.id",
name = "5",
fields = listOf()
)

val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery1, docQuery2))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

indexDoc(testIndex4, "1", testDoc1)
indexDoc(testIndex2, "1", testDoc2)
indexDoc(testIndex, "1", testDoc1)
indexDoc(testIndex, "2", testDoc2)

executeMonitor(monitor.id)

val alerts = searchAlertsWithFilter(monitor)
assertEquals("Alert saved for test monitor", 4, alerts.size)

val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 4, findings.size)

val request = """{
"size": 0,
"query": {
"match_all": {}
}
}"""
val httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())

val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.totalHits?.let { assertEquals(5L, it.value) }
}

fun `test execute monitor with indices having fields with same name but different field mappings with NOT EQUALS`() {
val testIndex = createTestIndex(
"test1",
""""properties": {
"source": {
"properties": {
"id": {
"type":"text",
"analyzer":"whitespace"
}
}
},
"test_field" : {
"type":"text",
"analyzer":"whitespace"
}
}
""".trimIndent()
)

val testIndex2 = createTestIndex(
"test2",
""""properties": {
"source": {
"properties": {
"id": {
"type":"text"
}
}
},
"test_field" : {
"type":"text"
}
}
""".trimIndent()
)
val testDoc = """{
"source" : {"id" : "12345" },
"nested_field": { "test1": "some text" },
"test_field": "12345"
}"""

val docQuery = DocLevelQuery(
query = "(NOT test_field:\"123456\" AND _exists_:test_field) AND source.id:\"12345\"",
name = "5",
fields = listOf()
)
val docLevelInput = DocLevelMonitorInput("description", listOf("test*"), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(randomDocumentLevelMonitor(inputs = listOf(docLevelInput), triggers = listOf(trigger)))
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex2, "1", testDoc)

executeMonitor(monitor.id)

val alerts = searchAlertsWithFilter(monitor)
assertEquals("Alert saved for test monitor", 2, alerts.size)

val findings = searchFindings(monitor)
assertEquals("Findings saved for test monitor", 2, findings.size)

// as mappings of source.id & test_field are different so, both of them expands
val expectedQueries = listOf(
"(NOT test_field_test2_${monitor.id}:\"123456\" AND _exists_:test_field_test2_${monitor.id}) " +
"AND source.id_test2_${monitor.id}:\"12345\"",
"(NOT test_field_test1_${monitor.id}:\"123456\" AND _exists_:test_field_test1_${monitor.id}) " +
"AND source.id_test1_${monitor.id}:\"12345\""
)

val request = """{
"size": 10,
"query": {
"match_all": {}
}
}"""
var httpResponse = adminClient().makeRequest(
"GET", "/${monitor.dataSources.queryIndex}/_search",
StringEntity(request, ContentType.APPLICATION_JSON)
)
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
var searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
searchResponse.hits.forEach { hit ->
val query = ((hit.sourceAsMap["query"] as Map<String, Any>)["query_string"] as Map<String, Any>)["query"]
assertTrue(expectedQueries.contains(query))
}
}

@Suppress("UNCHECKED_CAST")
/** helper that returns a field in a json map whose values are all json objects */
private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
Expand Down
Loading