From 064e5f5912088bf63a6da049b46abd8b41cede0c Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Thu, 13 Jul 2023 16:00:15 -0700 Subject: [PATCH] add workflow type field exists check in search monitors action to return both workflows and monitors on search (#1026) * add workflow type field exists check in search monitors action to retunr both workflows and monitors on search Signed-off-by: Surya Sashank Nistala * remove .get() invocation on future and replace with suspendUntil call for search Associated monitors Signed-off-by: Surya Sashank Nistala * add workflowIds param in rest get alerts action Signed-off-by: Surya Sashank Nistala --------- Signed-off-by: Surya Sashank Nistala --- .../resthandler/RestGetAlertsAction.kt | 7 +- .../transport/TransportGetMonitorAction.kt | 40 ++-- .../transport/TransportSearchMonitorAction.kt | 5 +- .../alerting/AlertingRestTestCase.kt | 12 ++ .../alerting/MonitorDataSourcesIT.kt | 199 ++++++++++++++++++ .../alerting/resthandler/WorkflowRestApiIT.kt | 24 ++- 6 files changed, 269 insertions(+), 18 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt index 4ba14d59a..cf83cec8c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt @@ -57,6 +57,11 @@ class RestGetAlertsAction : BaseRestHandler() { val severityLevel = request.param("severityLevel", "ALL") val alertState = request.param("alertState", "ALL") val monitorId: String? = request.param("monitorId") + val workflowId: String? = request.param("workflowIds") + val workflowIds = mutableListOf() + if (workflowId.isNullOrEmpty() == false) { + workflowIds.add(workflowId) + } val table = Table( sortOrder, sortString, @@ -66,7 +71,7 @@ class RestGetAlertsAction : BaseRestHandler() { searchString ) - val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId, null) + val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId, null, workflowIds = workflowIds) return RestChannelConsumer { channel -> client.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, getAlertsRequest, RestToXContentListener(channel)) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt index 470f990fd..ae5b01f33 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt @@ -5,13 +5,15 @@ package org.opensearch.alerting.transport +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.apache.lucene.search.join.ScoreMode import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener import org.opensearch.action.get.GetRequest import org.opensearch.action.get.GetResponse -import org.opensearch.action.search.SearchAction import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters @@ -20,6 +22,7 @@ import org.opensearch.alerting.action.GetMonitorAction import org.opensearch.alerting.action.GetMonitorRequest import org.opensearch.alerting.action.GetMonitorResponse import org.opensearch.alerting.action.GetMonitorResponse.AssociatedWorkflow +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH @@ -42,6 +45,7 @@ import org.opensearch.tasks.Task import org.opensearch.transport.TransportService private val log = LogManager.getLogger(TransportGetMonitorAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) class TransportGetMonitorAction @Inject constructor( transportService: TransportService, @@ -117,18 +121,24 @@ class TransportGetMonitorAction @Inject constructor( } } } - - actionListener.onResponse( - GetMonitorResponse( - response.id, - response.version, - response.seqNo, - response.primaryTerm, - RestStatus.OK, - monitor, - getAssociatedWorkflows(response.id) - ) - ) + try { + scope.launch { + val associatedCompositeMonitors = getAssociatedWorkflows(response.id) + actionListener.onResponse( + GetMonitorResponse( + response.id, + response.version, + response.seqNo, + response.primaryTerm, + RestStatus.OK, + monitor, + associatedCompositeMonitors + ) + ) + } + } catch (e: Exception) { + log.error("Failed to get associate workflows in get monitor action", e) + } } override fun onFailure(t: Exception) { @@ -139,7 +149,7 @@ class TransportGetMonitorAction @Inject constructor( } } - private fun getAssociatedWorkflows(id: String): List { + private suspend fun getAssociatedWorkflows(id: String): List { try { val associatedWorkflows = mutableListOf() val queryBuilder = QueryBuilders.nestedQuery( @@ -155,7 +165,7 @@ class TransportGetMonitorAction @Inject constructor( val searchRequest = SearchRequest() .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) .source(SearchSourceBuilder().query(queryBuilder).fetchField("_id")) - val response: SearchResponse = client.execute(SearchAction.INSTANCE, searchRequest).get() + val response: SearchResponse = client.suspendUntil { client.search(searchRequest, it) } for (hit in response.hits) { XContentType.JSON.xContent().createParser( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt index ac8a81a30..8bf55070e 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt @@ -22,6 +22,7 @@ import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.authuser.User import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.ExistsQueryBuilder @@ -62,7 +63,9 @@ class TransportSearchMonitorAction @Inject constructor( // When querying the ALL_ALERT_INDEX_PATTERN, we don't want to check whether the MONITOR_TYPE field exists // because we're querying alert indexes. if (searchMonitorRequest.searchRequest.indices().contains(ScheduledJob.SCHEDULED_JOBS_INDEX)) { - queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE)) + val monitorWorkflowType = QueryBuilders.boolQuery().should(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE)) + .should(QueryBuilders.existsQuery(Workflow.WORKFLOW_TYPE)) + queryBuilder.must(monitorWorkflowType) } searchSourceBuilder.query(queryBuilder) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index dccb67536..a79d424b7 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -883,6 +883,18 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return GetFindingsResponse(response.restStatus(), totalFindings, findings) } + protected fun searchMonitors(): SearchResponse { + var baseEndpoint = "${AlertingPlugin.MONITOR_BASE_URI}/_search?" + val request = """ + { "version" : true, + "query": { "match_all": {} } + } + """.trimIndent() + val httpResponse = adminClient().makeRequest("POST", baseEndpoint, StringEntity(request, APPLICATION_JSON)) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + return SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content)) + } + protected fun indexDoc(index: String, id: String, doc: String, refresh: Boolean = true): Response { return indexDoc(client(), index, id, doc, refresh) } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 3fc8ae4d6..ff3180cd4 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -303,6 +303,65 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { assertEquals("Findings saved for test monitor", 4, findings.size) } + fun `test execute monitor without triggers`() { + val docQuery = DocLevelQuery(query = "eventType:\"login\"", name = "3") + + val docLevelInput = DocLevelMonitorInput( + "description", listOf(index), listOf(docQuery) + ) + val customFindingsIndex = "custom_findings_index" + val customFindingsIndexPattern = "custom_findings_index-1" + val customQueryIndex = "custom_alerts_index" + var monitor = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput), + triggers = listOf(), + dataSources = DataSources( + queryIndex = customQueryIndex, + findingsIndex = customFindingsIndex, + findingsIndexPattern = customFindingsIndexPattern + ) + ) + val monitorResponse = createMonitor(monitor) + assertFalse(monitorResponse?.id.isNullOrEmpty()) + + val testDoc = """{ + "eventType" : "login" + }""" + indexDoc(index, "1", testDoc) + + monitor = monitorResponse!!.monitor + val id = monitorResponse.id + // Execute dry run first and expect no alerts or findings + var executeMonitorResponse = executeMonitor(monitor, id, true) + Assert.assertEquals(executeMonitorResponse!!.monitorRunResult.monitorName, monitor.name) + Assert.assertEquals(executeMonitorResponse.monitorRunResult.triggerResults.size, 0) + searchAlerts(id) + var table = Table("asc", "id", null, 1, 0, "") + var getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.isEmpty()) + var findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 0, findings.size) + + // Execute real run - expect findings, but no alerts + executeMonitorResponse = executeMonitor(monitor, id, false) + + searchAlerts(id) + table = Table("asc", "id", null, 1, 0, "") + getAlertsResponse = client() + .execute(AlertingActions.GET_ALERTS_ACTION_TYPE, GetAlertsRequest(table, "ALL", "ALL", null, null)) + .get() + Assert.assertTrue(getAlertsResponse != null) + Assert.assertTrue(getAlertsResponse.alerts.isEmpty()) + + findings = searchFindings(id, customFindingsIndex) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + assertEquals("Didn't match query", 1, findings[0].docLevelQueries.size) + } + fun `test execute monitor with custom query index`() { val q1 = DocLevelQuery(query = "source.ip.v6.v1:12345", name = "3") val q2 = DocLevelQuery(query = "source.ip.v6.v2:16645", name = "4") @@ -1393,6 +1452,78 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { Assert.assertEquals(searchMonitorResponse.hits.hits.size, 1) } + fun `test execute pre-existing monitor without triggers`() { + val request = CreateIndexRequest(SCHEDULED_JOBS_INDEX).mapping(ScheduledJobIndices.scheduledJobMappings()) + .settings(Settings.builder().put("index.hidden", true).build()) + client().admin().indices().create(request) + val monitorStringWithoutName = """ + { + "monitor": { + "type": "monitor", + "schema_version": 0, + "name": "UayEuXpZtb", + "monitor_type": "doc_level_monitor", + "user": { + "name": "", + "backend_roles": [], + "roles": [], + "custom_attribute_names": [], + "user_requested_tenant": null + }, + "enabled": true, + "enabled_time": 1662753436791, + "schedule": { + "period": { + "interval": 5, + "unit": "MINUTES" + } + }, + "inputs": [{ + "doc_level_input": { + "description": "description", + "indices": [ + "$index" + ], + "queries": [{ + "id": "63efdcce-b5a1-49f4-a25f-6b5f9496a755", + "name": "3", + "query": "test_field:\"us-west-2\"", + "tags": [] + }] + } + }], + "triggers": [], + "last_update_time": 1662753436791 + } + } + """.trimIndent() + val monitorId = "abc" + indexDoc(SCHEDULED_JOBS_INDEX, monitorId, monitorStringWithoutName) + val getMonitorResponse = getMonitorResponse(monitorId) + Assert.assertNotNull(getMonitorResponse) + Assert.assertNotNull(getMonitorResponse.monitor) + val monitor = getMonitorResponse.monitor + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + indexDoc(index, "1", testDoc) + var executeMonitorResponse = executeMonitor(monitor!!, monitorId, false) + Assert.assertNotNull(executeMonitorResponse) + if (executeMonitorResponse != null) { + Assert.assertNotNull(executeMonitorResponse.monitorRunResult.monitorName) + } + val alerts = searchAlerts(monitorId) + assertEquals(0, alerts.size) + + val findings = searchFindings(monitorId) + assertEquals("Findings saved for test monitor", 1, findings.size) + assertTrue("Findings saved for test monitor", findings[0].relatedDocIds.contains("1")) + } + fun `test execute monitor with empty source index`() { val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3") val docLevelInput = DocLevelMonitorInput("description", listOf(index), listOf(docQuery)) @@ -5441,4 +5572,72 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } Assert.assertTrue(alerts.stream().anyMatch { it.state == Alert.State.DELETED && chainedAlerts[0].id == it.id }) } + + fun `test postDelete on workflow deletion`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + var monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + val monitorResponse = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + + val andTrigger = randomChainedAlertTrigger( + name = "1And2", + condition = Script("monitor[id=${monitorResponse.id}] && monitor[id=${monitorResponse2.id}]") + ) + val notTrigger = randomChainedAlertTrigger( + name = "Not1OrNot2", + condition = Script("!monitor[id=${monitorResponse.id}] || !monitor[id=${monitorResponse2.id}]") + ) + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id), + triggers = listOf(andTrigger) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + val workflowId = workflowById!!.id + var executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + var res = getWorkflowAlerts( + workflowId, + ) + var chainedAlerts = res.alerts + Assert.assertTrue(chainedAlerts.size == 1) + val deleteRes = deleteWorkflow(workflowId, false) + logger.info(deleteRes) + OpenSearchTestCase.waitUntil({ + val searchRequest = SearchRequest(AlertIndices.ALERT_HISTORY_ALL) + val sr = client().search(searchRequest).get() + sr.hits.hits.size == 3 + }, 5, TimeUnit.MINUTES) + val searchRequest = SearchRequest(AlertIndices.ALERT_HISTORY_ALL) + val sr = client().search(searchRequest).get() + Assert.assertTrue(sr.hits.hits.size == 3) + val alerts = sr.hits.map { hit -> + val xcp = XContentHelper.createParser( + xContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val alert = Alert.parse(xcp, hit.id, hit.version) + alert + } + Assert.assertTrue(alerts.stream().anyMatch { it.state == Alert.State.DELETED && chainedAlerts[0].id == it.id }) + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index de31fab55..e7cc5eddb 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -1079,7 +1079,18 @@ class WorkflowRestApiIT : AlertingRestTestCase() { "test_value_1" ) ) - + val searchMonitorResponse = searchMonitors() + logger.error(searchMonitorResponse) + val jobsList = searchMonitorResponse.hits.toList() + var numMonitors = 0 + var numWorkflows = 0 + jobsList.forEach { + val map = it.sourceAsMap + if (map["type"] == "workflow") numWorkflows++ + else if (map["type"] == "monitor") numMonitors++ + } + Assert.assertEquals(numMonitors, 2) + Assert.assertEquals(numWorkflows, 1) val response = executeWorkflow(workflowId = workflowId, params = emptyMap()) val executeWorkflowResponse = entityAsMap(response) logger.info(executeWorkflowResponse) @@ -1101,6 +1112,17 @@ class WorkflowRestApiIT : AlertingRestTestCase() { Assert.assertEquals(alerts[0]["monitor_id"], "") val associatedAlerts = getWorkflowAlerts["associatedAlerts"] as List> assertEquals(associatedAlerts.size, 2) + + val getAlertsRes = getAlerts(java.util.Map.of("workflowIds", listOf(workflowId))) + val getAlertsMap = getAlertsRes.asMap() + Assert.assertTrue(getAlertsMap.containsKey("alerts")) + val getAlertsAlerts = getWorkflowAlerts["alerts"] as List> + assertEquals(alerts.size, 1) + Assert.assertEquals(getAlertsAlerts[0]["execution_id"], executionId) + Assert.assertEquals(getAlertsAlerts[0]["workflow_id"], workflowId) + Assert.assertEquals(getAlertsAlerts[0]["monitor_id"], "") + Assert.assertEquals(getAlertsAlerts[0]["id"], alerts[0]["id"]) + val ackRes = acknowledgeChainedAlerts(workflowId, alerts[0]["id"].toString()) val acknowledgeChainedAlertsResponse = entityAsMap(ackRes) val acknowledged = acknowledgeChainedAlertsResponse["success"] as List