diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt index c7742d1c7..dfe3be6f0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/alerts/AlertMover.kt @@ -28,6 +28,7 @@ import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.common.xcontent.XContentType import org.opensearch.commons.alerting.model.Alert import org.opensearch.commons.alerting.model.CompositeInput +import org.opensearch.commons.alerting.model.DataSources import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob import org.opensearch.commons.alerting.model.Workflow @@ -166,9 +167,7 @@ class AlertMover { monitorCtx.xContentRegistry!!, response = getResponse ) - /** check if alert index is initialized **/ - if (monitorCtx.alertIndices!!.isAlertInitialized(monitor.dataSources) == false) - return + alertIndex = monitor.dataSources.alertsIndex alertHistoryIndex = if (monitor.dataSources.alertsHistoryIndex == null) alertHistoryIndex @@ -181,8 +180,12 @@ class AlertMover { } } } + val dataSources = DataSources().copy(alertsHistoryIndex = alertHistoryIndex, alertsIndex = alertIndex) + /** check if alert index is initialized **/ + if (monitorCtx.alertIndices!!.isAlertInitialized(dataSources) == false) + return val boolQuery = QueryBuilders.boolQuery() - .filter(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowId)) + .must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowId)) if (workflow != null) { boolQuery.mustNot(QueryBuilders.termsQuery(Alert.TRIGGER_ID_FIELD, workflow.triggers.map { it.id })) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt index 4ba14d59a..cf83cec8c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/resthandler/RestGetAlertsAction.kt @@ -57,6 +57,11 @@ class RestGetAlertsAction : BaseRestHandler() { val severityLevel = request.param("severityLevel", "ALL") val alertState = request.param("alertState", "ALL") val monitorId: String? = request.param("monitorId") + val workflowId: String? = request.param("workflowIds") + val workflowIds = mutableListOf() + if (workflowId.isNullOrEmpty() == false) { + workflowIds.add(workflowId) + } val table = Table( sortOrder, sortString, @@ -66,7 +71,7 @@ class RestGetAlertsAction : BaseRestHandler() { searchString ) - val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId, null) + val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId, null, workflowIds = workflowIds) return RestChannelConsumer { channel -> client.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, getAlertsRequest, RestToXContentListener(channel)) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt index 7f2a5c88f..1f2e7403c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportSearchMonitorAction.kt @@ -23,6 +23,7 @@ import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.commons.alerting.model.Monitor import org.opensearch.commons.alerting.model.ScheduledJob +import org.opensearch.commons.alerting.model.Workflow import org.opensearch.commons.authuser.User import org.opensearch.index.query.BoolQueryBuilder import org.opensearch.index.query.ExistsQueryBuilder @@ -60,7 +61,9 @@ class TransportSearchMonitorAction @Inject constructor( // When querying the ALL_ALERT_INDEX_PATTERN, we don't want to check whether the MONITOR_TYPE field exists // because we're querying alert indexes. if (searchMonitorRequest.searchRequest.indices().contains(ScheduledJob.SCHEDULED_JOBS_INDEX)) { - queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE)) + val monitorWorkflowType = QueryBuilders.boolQuery().should(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE)) + .should(QueryBuilders.existsQuery(Workflow.WORKFLOW_TYPE)) + queryBuilder.must(monitorWorkflowType) } searchSourceBuilder.query(queryBuilder) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 47250f92b..9adfed367 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -41,7 +41,6 @@ import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentFactory.jsonBuilder import org.opensearch.common.xcontent.XContentParserUtils import org.opensearch.common.xcontent.XContentType -import org.opensearch.common.xcontent.json.JsonXContent import org.opensearch.common.xcontent.json.JsonXContent.jsonXContent import org.opensearch.commons.alerting.action.GetFindingsResponse import org.opensearch.commons.alerting.model.Alert @@ -645,9 +644,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", StringEntity(request, APPLICATION_JSON)) assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) - val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content)) return searchResponse.hits.hits.map { - val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() } Alert.parse(xcp, it.id, it.version) }.filter { alert -> alert.monitorId == monitor.id } } @@ -690,9 +689,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", StringEntity(request, APPLICATION_JSON)) assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) - val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content)) return searchResponse.hits.hits.map { - val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() } Finding.parse(xcp) }.filter { finding -> finding.monitorId == monitor.id } } @@ -715,9 +714,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", searchParams, StringEntity(request, APPLICATION_JSON)) assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) - val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content)) + val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content)) return searchResponse.hits.hits.map { - val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() } + val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() } Alert.parse(xcp, it.id, it.version) } } @@ -865,6 +864,18 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return GetFindingsResponse(response.restStatus(), totalFindings, findings) } + protected fun searchMonitors(): SearchResponse { + var baseEndpoint = "${AlertingPlugin.MONITOR_BASE_URI}/_search?" + val request = """ + { "version" : true, + "query": { "match_all": {} } + } + """.trimIndent() + val httpResponse = adminClient().makeRequest("POST", baseEndpoint, StringEntity(request, APPLICATION_JSON)) + assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus()) + return SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content)) + } + protected fun indexDoc(index: String, id: String, doc: String, refresh: Boolean = true): Response { return indexDoc(client(), index, id, doc, refresh) } @@ -1582,7 +1593,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { ) assertEquals("Unable to create a new monitor", RestStatus.CREATED, response.restStatus()) - val workflowJson = JsonXContent.jsonXContent.createParser( + val workflowJson = jsonXContent.createParser( NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, response.entity.content ).map() diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt index 4fb761544..808839d16 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/MonitorDataSourcesIT.kt @@ -5462,7 +5462,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } fun `test postIndex on workflow update with trigger deletion`() { - val monitorRunnerService = getInstanceFromNode(MonitorRunnerService.javaClass) val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) @@ -5535,4 +5534,72 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() { } Assert.assertTrue(alerts.stream().anyMatch { it.state == Alert.State.DELETED && chainedAlerts[0].id == it.id }) } + + fun `test postDelete on workflow deletion`() { + val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3") + val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1)) + val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + var monitor1 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + var monitor2 = randomDocumentLevelMonitor( + inputs = listOf(docLevelInput1), + triggers = listOf(trigger1) + ) + val monitorResponse = createMonitor(monitor1)!! + val monitorResponse2 = createMonitor(monitor2)!! + + val andTrigger = randomChainedAlertTrigger( + name = "1And2", + condition = Script("monitor[id=${monitorResponse.id}] && monitor[id=${monitorResponse2.id}]") + ) + val notTrigger = randomChainedAlertTrigger( + name = "Not1OrNot2", + condition = Script("!monitor[id=${monitorResponse.id}] || !monitor[id=${monitorResponse2.id}]") + ) + var workflow = randomWorkflow( + monitorIds = listOf(monitorResponse.id, monitorResponse2.id), + triggers = listOf(andTrigger) + ) + val workflowResponse = upsertWorkflow(workflow)!! + val workflowById = searchWorkflow(workflowResponse.id) + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS)) + val testDoc1 = """{ + "message" : "This is an error from IAD region", + "source.ip.v6.v2" : 16644, + "test_strict_date_time" : "$testTime", + "test_field_1" : "us-west-2" + }""" + indexDoc(index, "1", testDoc1) + val workflowId = workflowById!!.id + var executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!! + var res = getWorkflowAlerts( + workflowId, + ) + var chainedAlerts = res.alerts + Assert.assertTrue(chainedAlerts.size == 1) + val deleteRes = deleteWorkflow(workflowId, false) + logger.info(deleteRes) + OpenSearchTestCase.waitUntil({ + val searchRequest = SearchRequest(AlertIndices.ALERT_HISTORY_ALL) + val sr = client().search(searchRequest).get() + sr.hits.hits.size == 3 + }, 5, TimeUnit.MINUTES) + val searchRequest = SearchRequest(AlertIndices.ALERT_HISTORY_ALL) + val sr = client().search(searchRequest).get() + Assert.assertTrue(sr.hits.hits.size == 3) + val alerts = sr.hits.map { hit -> + val xcp = XContentHelper.createParser( + xContentRegistry(), + LoggingDeprecationHandler.INSTANCE, + hit.sourceRef, + XContentType.JSON + ) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + val alert = Alert.parse(xcp, hit.id, hit.version) + alert + } + Assert.assertTrue(alerts.stream().anyMatch { it.state == Alert.State.DELETED && chainedAlerts[0].id == it.id }) + } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt index de31fab55..ec128109f 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/resthandler/WorkflowRestApiIT.kt @@ -1069,7 +1069,7 @@ class WorkflowRestApiIT : AlertingRestTestCase() { owner = "alerting", triggers = listOf(andTrigger) ) - val workflowById = createWorkflow(workflow)!! + val workflowById = createWorkflow(workflow) assertNotNull(workflowById) val workflowId = workflowById.id @@ -1079,7 +1079,18 @@ class WorkflowRestApiIT : AlertingRestTestCase() { "test_value_1" ) ) - + val searchMonitorResponse = searchMonitors() + logger.error(searchMonitorResponse) + val jobsList = searchMonitorResponse.hits.toList() + var numMonitors = 0 + var numWorkflows = 0 + jobsList.forEach { + val map = it.sourceAsMap + if (map["type"] == "workflow") numWorkflows++ + else if (map["type"] == "monitor") numMonitors++ + } + Assert.assertEquals(numMonitors, 2) + Assert.assertEquals(numWorkflows, 1) val response = executeWorkflow(workflowId = workflowId, params = emptyMap()) val executeWorkflowResponse = entityAsMap(response) logger.info(executeWorkflowResponse) @@ -1101,6 +1112,17 @@ class WorkflowRestApiIT : AlertingRestTestCase() { Assert.assertEquals(alerts[0]["monitor_id"], "") val associatedAlerts = getWorkflowAlerts["associatedAlerts"] as List> assertEquals(associatedAlerts.size, 2) + + val getAlertsRes = getAlerts(java.util.Map.of("workflowIds", listOf(workflowId))) + val getAlertsMap = getAlertsRes.asMap() + Assert.assertTrue(getAlertsMap.containsKey("alerts")) + val getAlertsAlerts = getWorkflowAlerts["alerts"] as List> + assertEquals(alerts.size, 1) + Assert.assertEquals(getAlertsAlerts[0]["execution_id"], executionId) + Assert.assertEquals(getAlertsAlerts[0]["workflow_id"], workflowId) + Assert.assertEquals(getAlertsAlerts[0]["monitor_id"], "") + Assert.assertEquals(getAlertsAlerts[0]["id"], alerts[0]["id"]) + val ackRes = acknowledgeChainedAlerts(workflowId, alerts[0]["id"].toString()) val acknowledgeChainedAlertsResponse = entityAsMap(ackRes) val acknowledged = acknowledgeChainedAlertsResponse["success"] as List