From 9919e19490397d401dec4c5e5a83c005a626a99b Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 13 Jul 2023 16:00:15 -0700 Subject: [PATCH] add workflow type field exists check in search monitors action to return both workflows and monitors on search (#1026) * add workflow type field exists check in search monitors action to retunr both workflows and monitors on search Signed-off-by: Surya Sashank Nistala * remove .get() invocation on future and replace with suspendUntil call for search Associated monitors Signed-off-by: Surya Sashank Nistala * add workflowIds param in rest get alerts action Signed-off-by: Surya Sashank Nistala --------- Signed-off-by: Surya Sashank Nistala (cherry picked from commit 064e5f5912088bf63a6da049b46abd8b41cede0c) --- .../transport/TransportGetMonitorAction.kt | 40 ++++-- .../alerting/MonitorDataSourcesIT.kt | 131 ++++++++++++++++++ 2 files changed, 156 insertions(+), 15 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt index 470f990fd..ae5b01f33 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt @@ -5,13 +5,15 @@ package org.opensearch.alerting.transport +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.apache.lucene.search.join.ScoreMode import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse -import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters @@ -20,6 +22,7 @@ import org.opensearch.alerting.action.GetMonitorAction import org.opensearch.alerting.action.GetMonitorRequest import org.opensearch.alerting.action.GetMonitorResponse import org.opensearch.alerting.action.GetMonitorResponse.AssociatedWorkflow +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH @@ -42,6 +45,7 @@ import org.opensearch.tasks.Task import org.opensearch.transport.TransportService private val log = LogManager.getLogger(TransportGetMonitorAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) class TransportGetMonitorAction @Inject constructor( transportService: TransportService, @@ -117,18 +121,24 @@ class TransportGetMonitorAction @Inject constructor( } } } - - actionListener.onResponse( - GetMonitorResponse( - response.id, - response.version, - response.seqNo, - response.primaryTerm, - RestStatus.OK, - monitor, - getAssociatedWorkflows(response.id) - ) - ) + try { + scope.launch { + val associatedCompositeMonitors = getAssociatedWorkflows(response.id) + actionListener.onResponse( + GetMonitorResponse( + response.id, + response.version, + response.seqNo, + response.primaryTerm, + RestStatus.OK, + monitor, + associatedCompositeMonitors + ) + ) + } + } catch (e: Exception) { + log.error("Failed to get associate workflows in get monitor action", e) + } } override fun onFailure(t: Exception) { @@ -139,7 +149,7 @@ class TransportGetMonitorAction @Inject constructor( } } - private fun getAssociatedWorkflows(id: String): List { + private suspend fun getAssociatedWorkflows(id: String): List { try { val associatedWorkflows = mutableListOf() val queryBuilder = QueryBuilders.nestedQuery( @@ -155,7 +165,7 @@ class TransportGetMonitorAction @Inject constructor( val searchRequest = SearchRequest() .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) .source(SearchSourceBuilder().query(queryBuilder).fetchField("_id")) - val response: SearchResponse = client.execute(SearchAction.INSTANCE, searchRequest).get() + val response: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } for (hit in response.hits) { XContentType.JSON.xContent().createParser( diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 2650fac91..c19e8b556 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -303,6 +303,65 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals("Findings saved for test monitor", 4, findings.size) } + fun `test execute monitor without triggers`() { + val docQuery = DocLevelQuery(query = "eventType:\"login\"", name = "3") + + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + + val testDoc = """{ + "eventType" : "login" + }""" + indexDoc(index, "1", testDoc) + + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + // Execute dry run first and expect no alerts or findings + var executeMonitorResponse = executeMonitor(monitor, id, true) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 0) + searchAlerts(id) + var table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.isEmpty()) + var findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 0, findings.size) + + // Execute real run - expect findings, but no alerts + executeMonitorResponse = executeMonitor(monitor, id, false) + + searchAlerts(id) + table = Table("asc", "id", null, 1, 0, "") + getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.isEmpty()) + + findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertEquals("Didn't match query", 1, findings[0].docLevelQueries.size) + } + fun `test execute monitor with custom query index`() { val q1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") val q2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") @@ -1393,6 +1452,78 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertEquals(searchMonitorResponse.hits.hits.size, 1) } + fun `test execute pre-existing monitor without triggers`() { + val request = CreateIndexRequest(SCHEDULED_JOBS_INDEX).mapping(ScheduledJobIndices.scheduledJobMappings()) + .settings(Settings.builder().put("index.hidden", true).build()) + client().admin().indices().create(request) + val monitorStringWithoutName = """ + { + "monitor": { + "type": "monitor", + "schema_version": 0, + "name": "UayEuXpZtb", + "monitor_type": "doc_level_monitor", + "user": { + "name": "", + "backend_roles": [], + "roles": [], + "custom_attribute_names": [], + "user_requested_tenant": null + }, + "enabled": true, + "enabled_time": 1662753436791, + "schedule": { + "period": { + "interval": 5, + "unit": "MINUTES" + } + }, + "inputs": [{ + "doc_level_input": { + "description": "description", + "indices": [ + "$index" + ], + "queries": [{ + "id": "63efdcce-b5a1-49f4-a25f-6b5f9496a755", + "name": "3", + "query": "test_field:\"us-west-2\"", + "tags": [] + }] + } + }], + "triggers": [], + "last_update_time": 1662753436791 + } + } + """.trimIndent() + val monitorId = "abc" + indexDoc(SCHEDULED_JOBS_INDEX, monitorId, monitorStringWithoutName) + val getMonitorResponse = getMonitorResponse(monitorId) + Assert.assertNotNull(getMonitorResponse) + Assert.assertNotNull(getMonitorResponse.monitor) + val monitor = getMonitorResponse.monitor + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(index, "1", testDoc) + var executeMonitorResponse = executeMonitor(monitor!!, monitorId, false) + Assert.assertNotNull(executeMonitorResponse) + if (executeMonitorResponse != null) { + Assert.assertNotNull(executeMonitorResponse.monitorRunResult.monitorName) + } + val alerts = searchAlerts(monitorId) + assertEquals(0, alerts.size) + + val findings = searchFindings(monitorId) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + } + fun `test execute monitor with empty source index`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery))