Add workflowIds field in getAlerts API (#1014)
* add alert mover test for verifying workflow post-delete cleanup

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* add workflowIds field in getAlerts API

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

* add support for fetching workflows in search monitors API

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>

---------

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
eirsep authored Jul 12, 2023
1 parent dc22baa commit 2430d81
Showing 6 changed files with 128 additions and 17 deletions.
@@ -28,6 +28,7 @@ import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
@@ -166,9 +167,7 @@ class AlertMover {
monitorCtx.xContentRegistry!!,
response = getResponse
)
/** check if alert index is initialized **/
if (monitorCtx.alertIndices!!.isAlertInitialized(monitor.dataSources) == false)
return

alertIndex = monitor.dataSources.alertsIndex
alertHistoryIndex =
if (monitor.dataSources.alertsHistoryIndex == null) alertHistoryIndex
@@ -181,8 +180,12 @@
}
}
}
val dataSources = DataSources().copy(alertsHistoryIndex = alertHistoryIndex, alertsIndex = alertIndex)
/** check if alert index is initialized **/
if (monitorCtx.alertIndices!!.isAlertInitialized(dataSources) == false)
return
val boolQuery = QueryBuilders.boolQuery()
.filter(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowId))
.must(QueryBuilders.termQuery(Alert.WORKFLOW_ID_FIELD, workflowId))

if (workflow != null) {
boolQuery.mustNot(QueryBuilders.termsQuery(Alert.TRIGGER_ID_FIELD, workflow.triggers.map { it.id }))
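For context, a minimal sketch (not part of this commit) of the alert-lookup query AlertMover now builds when cleaning up a workflow's chained alerts; the field names "workflow_id" and "trigger_id" and the helper name below are assumed for illustration:

import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.QueryBuilders

// Illustrative sketch: a chained alert must carry the workflow's id; when the workflow still
// exists, alerts belonging to its remaining triggers are excluded so only orphaned alerts move.
fun workflowAlertsQuery(workflowId: String, remainingTriggerIds: List<String>): BoolQueryBuilder =
    QueryBuilders.boolQuery()
        .must(QueryBuilders.termQuery("workflow_id", workflowId)) // assumed Alert.WORKFLOW_ID_FIELD
        .mustNot(QueryBuilders.termsQuery("trigger_id", remainingTriggerIds)) // assumed Alert.TRIGGER_ID_FIELD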
@@ -57,6 +57,11 @@ class RestGetAlertsAction : BaseRestHandler() {
val severityLevel = request.param("severityLevel", "ALL")
val alertState = request.param("alertState", "ALL")
val monitorId: String? = request.param("monitorId")
val workflowId: String? = request.param("workflowIds")
val workflowIds = mutableListOf<String>()
if (workflowId.isNullOrEmpty() == false) {
workflowIds.add(workflowId)
}
val table = Table(
sortOrder,
sortString,
@@ -66,7 +71,7 @@
searchString
)

val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId, null)
val getAlertsRequest = GetAlertsRequest(table, severityLevel, alertState, monitorId, null, workflowIds = workflowIds)
return RestChannelConsumer {
channel ->
client.execute(AlertingActions.GET_ALERTS_ACTION_TYPE, getAlertsRequest, RestToXContentListener(channel))
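A minimal usage sketch of the new workflowIds query parameter, written in the style of the REST test helpers in this repository; the endpoint path under AlertingPlugin.MONITOR_BASE_URI, the makeRequest/restStatus helpers, and the workflowId variable are assumed here rather than taken from the diff:

// Illustrative sketch: fetch only the alerts generated by a given workflow.
val getAlertsResponse = client().makeRequest(
    "GET",
    "${AlertingPlugin.MONITOR_BASE_URI}/alerts?workflowIds=$workflowId"
)
assertEquals("Get alerts failed", RestStatus.OK, getAlertsResponse.restStatus())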
@@ -23,6 +23,7 @@ import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.ScheduledJob
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.authuser.User
import org.opensearch.index.query.BoolQueryBuilder
import org.opensearch.index.query.ExistsQueryBuilder
@@ -60,7 +61,9 @@ class TransportSearchMonitorAction @Inject constructor(
// When querying the ALL_ALERT_INDEX_PATTERN, we don't want to check whether the MONITOR_TYPE field exists
// because we're querying alert indexes.
if (searchMonitorRequest.searchRequest.indices().contains(ScheduledJob.SCHEDULED_JOBS_INDEX)) {
queryBuilder.filter(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
val monitorWorkflowType = QueryBuilders.boolQuery().should(QueryBuilders.existsQuery(Monitor.MONITOR_TYPE))
.should(QueryBuilders.existsQuery(Workflow.WORKFLOW_TYPE))
queryBuilder.must(monitorWorkflowType)
}

searchSourceBuilder.query(queryBuilder)
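For reference, a rough sketch of the filter shape the updated TransportSearchMonitorAction appends when the search targets the scheduled jobs index, so that both monitor and workflow documents match; the on-disk field names "monitor" and "workflow" are assumed from the commons models:

// Illustrative sketch: the previous "monitor field must exist" filter becomes
// "either a monitor or a workflow document matches".
val jobTypeFilter = """
    {
      "bool": {
        "should": [
          { "exists": { "field": "monitor" } },
          { "exists": { "field": "workflow" } }
        ]
      }
    }
""".trimIndent()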
@@ -41,7 +41,6 @@ import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentFactory.jsonBuilder
import org.opensearch.common.xcontent.XContentParserUtils
import org.opensearch.common.xcontent.XContentType
import org.opensearch.common.xcontent.json.JsonXContent
import org.opensearch.common.xcontent.json.JsonXContent.jsonXContent
import org.opensearch.commons.alerting.action.GetFindingsResponse
import org.opensearch.commons.alerting.model.Alert
@@ -645,9 +644,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", StringEntity(request, APPLICATION_JSON))
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())

val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content))
return searchResponse.hits.hits.map {
val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() }
val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() }
Alert.parse(xcp, it.id, it.version)
}.filter { alert -> alert.monitorId == monitor.id }
}
@@ -690,9 +689,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", StringEntity(request, APPLICATION_JSON))
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())

val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content))
return searchResponse.hits.hits.map {
val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() }
val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() }
Finding.parse(xcp)
}.filter { finding -> finding.monitorId == monitor.id }
}
@@ -715,9 +714,9 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val httpResponse = adminClient().makeRequest("GET", "/$indices/_search", searchParams, StringEntity(request, APPLICATION_JSON))
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())

val searchResponse = SearchResponse.fromXContent(createParser(JsonXContent.jsonXContent, httpResponse.entity.content))
val searchResponse = SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content))
return searchResponse.hits.hits.map {
val xcp = createParser(JsonXContent.jsonXContent, it.sourceRef).also { it.nextToken() }
val xcp = createParser(jsonXContent, it.sourceRef).also { it.nextToken() }
Alert.parse(xcp, it.id, it.version)
}
}
@@ -865,6 +864,18 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return GetFindingsResponse(response.restStatus(), totalFindings, findings)
}

protected fun searchMonitors(): SearchResponse {
var baseEndpoint = "${AlertingPlugin.MONITOR_BASE_URI}/_search?"
val request = """
{ "version" : true,
"query": { "match_all": {} }
}
""".trimIndent()
val httpResponse = adminClient().makeRequest("POST", baseEndpoint, StringEntity(request, APPLICATION_JSON))
assertEquals("Search failed", RestStatus.OK, httpResponse.restStatus())
return SearchResponse.fromXContent(createParser(jsonXContent, httpResponse.entity.content))
}

protected fun indexDoc(index: String, id: String, doc: String, refresh: Boolean = true): Response {
return indexDoc(client(), index, id, doc, refresh)
}
@@ -1582,7 +1593,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
)
assertEquals("Unable to create a new monitor", RestStatus.CREATED, response.restStatus())

val workflowJson = JsonXContent.jsonXContent.createParser(
val workflowJson = jsonXContent.createParser(
NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE,
response.entity.content
).map()
@@ -5462,7 +5462,6 @@ class MonitorDataSourcesIT : AlertingSingleNodeTestCase() {
}

fun `test postIndex on workflow update with trigger deletion`() {
val monitorRunnerService = getInstanceFromNode(MonitorRunnerService.javaClass)
val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3")
val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
@@ -5535,4 +5534,72 @@
}
Assert.assertTrue(alerts.stream().anyMatch { it.state == Alert.State.DELETED && chainedAlerts[0].id == it.id })
}

fun `test postDelete on workflow deletion`() {
val docQuery1 = DocLevelQuery(query = "test_field_1:\"us-west-2\"", name = "3")
val docLevelInput1 = DocLevelMonitorInput("description", listOf(index), listOf(docQuery1))
val trigger1 = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
var monitor1 = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput1),
triggers = listOf(trigger1)
)
var monitor2 = randomDocumentLevelMonitor(
inputs = listOf(docLevelInput1),
triggers = listOf(trigger1)
)
val monitorResponse = createMonitor(monitor1)!!
val monitorResponse2 = createMonitor(monitor2)!!

val andTrigger = randomChainedAlertTrigger(
name = "1And2",
condition = Script("monitor[id=${monitorResponse.id}] && monitor[id=${monitorResponse2.id}]")
)
val notTrigger = randomChainedAlertTrigger(
name = "Not1OrNot2",
condition = Script("!monitor[id=${monitorResponse.id}] || !monitor[id=${monitorResponse2.id}]")
)
var workflow = randomWorkflow(
monitorIds = listOf(monitorResponse.id, monitorResponse2.id),
triggers = listOf(andTrigger)
)
val workflowResponse = upsertWorkflow(workflow)!!
val workflowById = searchWorkflow(workflowResponse.id)
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc1 = """{
"message" : "This is an error from IAD region",
"source.ip.v6.v2" : 16644,
"test_strict_date_time" : "$testTime",
"test_field_1" : "us-west-2"
}"""
indexDoc(index, "1", testDoc1)
val workflowId = workflowById!!.id
var executeWorkflowResponse = executeWorkflow(workflowById, workflowId, false)!!
var res = getWorkflowAlerts(
workflowId,
)
var chainedAlerts = res.alerts
Assert.assertTrue(chainedAlerts.size == 1)
val deleteRes = deleteWorkflow(workflowId, false)
logger.info(deleteRes)
OpenSearchTestCase.waitUntil({
val searchRequest = SearchRequest(AlertIndices.ALERT_HISTORY_ALL)
val sr = client().search(searchRequest).get()
sr.hits.hits.size == 3
}, 5, TimeUnit.MINUTES)
val searchRequest = SearchRequest(AlertIndices.ALERT_HISTORY_ALL)
val sr = client().search(searchRequest).get()
Assert.assertTrue(sr.hits.hits.size == 3)
val alerts = sr.hits.map { hit ->
val xcp = XContentHelper.createParser(
xContentRegistry(),
LoggingDeprecationHandler.INSTANCE,
hit.sourceRef,
XContentType.JSON
)
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp)
val alert = Alert.parse(xcp, hit.id, hit.version)
alert
}
Assert.assertTrue(alerts.stream().anyMatch { it.state == Alert.State.DELETED && chainedAlerts[0].id == it.id })
}
}
@@ -1069,7 +1069,7 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
owner = "alerting",
triggers = listOf(andTrigger)
)
val workflowById = createWorkflow(workflow)!!
val workflowById = createWorkflow(workflow)
assertNotNull(workflowById)
val workflowId = workflowById.id

@@ -1079,7 +1079,18 @@
"test_value_1"
)
)

val searchMonitorResponse = searchMonitors()
logger.error(searchMonitorResponse)
val jobsList = searchMonitorResponse.hits.toList()
var numMonitors = 0
var numWorkflows = 0
jobsList.forEach {
val map = it.sourceAsMap
if (map["type"] == "workflow") numWorkflows++
else if (map["type"] == "monitor") numMonitors++
}
Assert.assertEquals(numMonitors, 2)
Assert.assertEquals(numWorkflows, 1)
val response = executeWorkflow(workflowId = workflowId, params = emptyMap())
val executeWorkflowResponse = entityAsMap(response)
logger.info(executeWorkflowResponse)
@@ -1101,6 +1112,17 @@
Assert.assertEquals(alerts[0]["monitor_id"], "")
val associatedAlerts = getWorkflowAlerts["associatedAlerts"] as List<HashMap<String, Any>>
assertEquals(associatedAlerts.size, 2)

val getAlertsRes = getAlerts(java.util.Map.of("workflowIds", listOf(workflowId)))
val getAlertsMap = getAlertsRes.asMap()
Assert.assertTrue(getAlertsMap.containsKey("alerts"))
val getAlertsAlerts = getAlertsMap["alerts"] as List<HashMap<String, Any>>
assertEquals(getAlertsAlerts.size, 1)
Assert.assertEquals(getAlertsAlerts[0]["execution_id"], executionId)
Assert.assertEquals(getAlertsAlerts[0]["workflow_id"], workflowId)
Assert.assertEquals(getAlertsAlerts[0]["monitor_id"], "")
Assert.assertEquals(getAlertsAlerts[0]["id"], alerts[0]["id"])

val ackRes = acknowledgeChainedAlerts(workflowId, alerts[0]["id"].toString())
val acknowledgeChainedAlertsResponse = entityAsMap(ackRes)
val acknowledged = acknowledgeChainedAlertsResponse["success"] as List<String>
