Skip to content

Commit

Permalink
add auditDelegateMonitorAlerts flag (#476) (#477)
Browse files Browse the repository at this point in the history
* add auditDelegateMonitorAlerts flag

* add audit state check in error alert validation

* add test to verify workflow with auditDelegateMonitor flag null

---------

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
(cherry picked from commit 90bea0c)
  • Loading branch information
eirsep authored and github-actions[bot] committed Jul 11, 2023
1 parent 1800974 commit 437eba4
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ class GetWorkflowAlertsRequest : ActionRequest {
val severityLevel: String
val alertState: String
val alertIndex: String?
val associatedAlertsIndex: String?
val monitorIds: List<String>?
val workflowIds: List<String>?
val alertIds: List<String>?
Expand All @@ -22,15 +23,17 @@ class GetWorkflowAlertsRequest : ActionRequest {
severityLevel: String,
alertState: String,
alertIndex: String?,
associatedAlertsIndex: String?,
monitorIds: List<String>? = null,
workflowIds: List<String>? = null,
alertIds: List<String>? = null,
getAssociatedAlerts: Boolean
getAssociatedAlerts: Boolean,
) : super() {
this.table = table
this.severityLevel = severityLevel
this.alertState = alertState
this.alertIndex = alertIndex
this.associatedAlertsIndex = associatedAlertsIndex
this.monitorIds = monitorIds
this.workflowIds = workflowIds
this.alertIds = alertIds
Expand All @@ -43,6 +46,7 @@ class GetWorkflowAlertsRequest : ActionRequest {
severityLevel = sin.readString(),
alertState = sin.readString(),
alertIndex = sin.readOptionalString(),
associatedAlertsIndex = sin.readOptionalString(),
monitorIds = sin.readOptionalStringList(),
workflowIds = sin.readOptionalStringList(),
alertIds = sin.readOptionalStringList(),
Expand All @@ -59,6 +63,7 @@ class GetWorkflowAlertsRequest : ActionRequest {
out.writeString(severityLevel)
out.writeString(alertState)
out.writeOptionalString(alertIndex)
out.writeOptionalString(associatedAlertsIndex)
out.writeOptionalStringCollection(monitorIds)
out.writeOptionalStringCollection(workflowIds)
out.writeOptionalStringCollection(alertIds)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ data class Alert(
) : Writeable, ToXContent {

init {
if (errorMessage != null) require(state == State.DELETED || state == State.ERROR) {
if (errorMessage != null) require(state == State.DELETED || state == State.ERROR || state == State.AUDIT) {
"Attempt to create an alert with an error in state: $state"
}
}
Expand Down Expand Up @@ -421,7 +421,9 @@ data class Alert(
SCHEMA_VERSION_FIELD -> schemaVersion = xcp.intValue()
MONITOR_NAME_FIELD -> monitorName = xcp.text()
MONITOR_VERSION_FIELD -> monitorVersion = xcp.longValue()
MONITOR_USER_FIELD -> monitorUser = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null else User.parse(xcp)
MONITOR_USER_FIELD ->
monitorUser = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) null
else User.parse(xcp)
TRIGGER_ID_FIELD -> triggerId = xcp.text()
FINDING_IDS -> {
ensureExpectedToken(XContentParser.Token.START_ARRAY, xcp.currentToken(), xcp)
Expand Down
18 changes: 14 additions & 4 deletions src/main/kotlin/org/opensearch/commons/alerting/model/Workflow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ data class Workflow(
val schemaVersion: Int = NO_SCHEMA_VERSION,
val inputs: List<WorkflowInput>,
val owner: String? = DEFAULT_OWNER,
val triggers: List<Trigger>
val triggers: List<Trigger>,
val auditDelegateMonitorAlerts: Boolean? = true,
) : ScheduledJob {
override val type = WORKFLOW_TYPE

Expand Down Expand Up @@ -70,7 +71,8 @@ data class Workflow(
schemaVersion = sin.readInt(),
inputs = sin.readList((WorkflowInput)::readFrom),
owner = sin.readOptionalString(),
triggers = sin.readList((Trigger)::readFrom)
triggers = sin.readList((Trigger)::readFrom),
auditDelegateMonitorAlerts = sin.readOptionalBoolean()
)

// This enum classifies different workflows
Expand Down Expand Up @@ -99,7 +101,7 @@ data class Workflow(
private fun createXContentBuilder(
builder: XContentBuilder,
params: ToXContent.Params,
secure: Boolean
secure: Boolean,
): XContentBuilder {
builder.startObject()
if (params.paramAsBoolean("with_type", false)) builder.startObject(type)
Expand All @@ -119,6 +121,9 @@ data class Workflow(
.field(TRIGGERS_FIELD, triggers.toTypedArray())
.optionalTimeField(LAST_UPDATE_TIME_FIELD, lastUpdateTime)
builder.field(OWNER_FIELD, owner)
if (auditDelegateMonitorAlerts != null) {
builder.field(AUDIT_DELEGATE_MONITOR_ALERTS_FIELD, auditDelegateMonitorAlerts)
}
if (params.paramAsBoolean("with_type", false)) builder.endObject()
return builder.endObject()
}
Expand Down Expand Up @@ -159,6 +164,7 @@ data class Workflow(
}
it.writeTo(out)
}
out.writeOptionalBoolean(auditDelegateMonitorAlerts)
}

companion object {
Expand All @@ -177,6 +183,7 @@ data class Workflow(
const val ENABLED_TIME_FIELD = "enabled_time"
const val TRIGGERS_FIELD = "triggers"
const val OWNER_FIELD = "owner"
const val AUDIT_DELEGATE_MONITOR_ALERTS_FIELD = "audit_delegate_monitor_alerts"

// This is defined here instead of in ScheduledJob to avoid having the ScheduledJob class know about all
// the different subclasses and creating circular dependencies
Expand All @@ -201,6 +208,7 @@ data class Workflow(
val inputs: MutableList<WorkflowInput> = mutableListOf()
val triggers: MutableList<Trigger> = mutableListOf()
var owner = DEFAULT_OWNER
var auditDelegateMonitorAlerts = true

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
Expand Down Expand Up @@ -245,6 +253,7 @@ data class Workflow(
}
ENABLED_TIME_FIELD -> enabledTime = xcp.instant()
LAST_UPDATE_TIME_FIELD -> lastUpdateTime = xcp.instant()
AUDIT_DELEGATE_MONITOR_ALERTS_FIELD -> auditDelegateMonitorAlerts = xcp.booleanValue()
OWNER_FIELD -> {
owner = if (xcp.currentToken() == XContentParser.Token.VALUE_NULL) owner else xcp.text()
}
Expand Down Expand Up @@ -272,7 +281,8 @@ data class Workflow(
schemaVersion,
inputs.toList(),
owner,
triggers
triggers,
auditDelegateMonitorAlerts
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ fun randomWorkflow(
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
lastUpdateTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS),
triggers: List<Trigger> = listOf(randomChainedAlertTrigger()),
auditDelegateMonitorAlerts: Boolean? = true
): Workflow {
val delegates = mutableListOf<Delegate>()
if (!monitorIds.isNullOrEmpty()) {
Expand All @@ -195,7 +196,7 @@ fun randomWorkflow(
return Workflow(
name = name, workflowType = Workflow.WorkflowType.COMPOSITE, enabled = enabled, inputs = input,
schedule = schedule, enabledTime = enabledTime, lastUpdateTime = lastUpdateTime, user = user,
triggers = triggers
triggers = triggers, auditDelegateMonitorAlerts = auditDelegateMonitorAlerts
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ internal class GetWorkflowAlertsRequestTests {
workflowIds = listOf("w1", "w2"),
alertIds = emptyList(),
alertIndex = null,
associatedAlertsIndex = null,
monitorIds = emptyList()
)
assertNotNull(req)
Expand All @@ -41,6 +42,42 @@ internal class GetWorkflowAlertsRequestTests {
assertTrue(newReq.alertIds!!.isEmpty())
assertTrue(newReq.monitorIds!!.isEmpty())
assertNull(newReq.alertIndex)
assertNull(newReq.associatedAlertsIndex)
assertTrue(newReq.getAssociatedAlerts)
}

@Test
fun `test get alerts request with custom alerts and associated alerts indices`() {

val table = Table("asc", "sortString", null, 1, 0, "")

val req = GetWorkflowAlertsRequest(
table = table,
severityLevel = "1",
alertState = "active",
getAssociatedAlerts = true,
workflowIds = listOf("w1", "w2"),
alertIds = emptyList(),
alertIndex = "alertIndex",
associatedAlertsIndex = "associatedAlertsIndex",
monitorIds = emptyList()
)
assertNotNull(req)

val out = BytesStreamOutput()
req.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newReq = GetWorkflowAlertsRequest(sin)

assertEquals("1", newReq.severityLevel)
assertEquals("active", newReq.alertState)
assertEquals(table, newReq.table)
assertTrue(newReq.workflowIds!!.contains("w1"))
assertTrue(newReq.workflowIds!!.contains("w2"))
assertTrue(newReq.alertIds!!.isEmpty())
assertTrue(newReq.monitorIds!!.isEmpty())
assertEquals(newReq.alertIndex, "alertIndex")
assertEquals(newReq.associatedAlertsIndex, "associatedAlertsIndex")
assertTrue(newReq.getAssociatedAlerts)
}

Expand All @@ -55,7 +92,8 @@ internal class GetWorkflowAlertsRequestTests {
getAssociatedAlerts = true,
workflowIds = listOf("w1, w2"),
alertIds = emptyList(),
alertIndex = null
alertIndex = null,
associatedAlertsIndex = null
)
assertNotNull(req)
assertNull(req.validate())
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.opensearch.commons.alerting.action

import org.junit.jupiter.api.Assertions
import org.junit.jupiter.api.Test
import org.opensearch.common.io.stream.BytesStreamOutput
import org.opensearch.common.io.stream.StreamInput
import org.opensearch.commons.alerting.model.CompositeInput
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Workflow
import org.opensearch.commons.alerting.randomDelegate
import org.opensearch.commons.alerting.randomUser
import org.opensearch.commons.alerting.randomWorkflow
import org.opensearch.rest.RestStatus
import java.time.Instant
import java.time.temporal.ChronoUnit

class GetWorkflowResponseTests {

@Test
fun testGetWorkflowResponse() {
val workflow = randomWorkflow(auditDelegateMonitorAlerts = false)
val response = GetWorkflowResponse(
id = "id", version = 1, seqNo = 1, primaryTerm = 1, status = RestStatus.OK, workflow = workflow
)
val out = BytesStreamOutput()
response.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newRes = GetWorkflowResponse(sin)
Assertions.assertEquals("id", newRes.id)
Assertions.assertFalse(newRes.workflow!!.auditDelegateMonitorAlerts!!)
Assertions.assertEquals(workflow.name, newRes.workflow!!.name)
Assertions.assertEquals(workflow.owner, newRes.workflow!!.owner)
}

@Test
fun testGetWorkflowResponseWhereAuditDelegateMonitorAlertsFlagIsNotSet() {
val workflow = Workflow(
id = "",
version = Workflow.NO_VERSION,
name = "test",
enabled = true,
schemaVersion = 2,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES),
lastUpdateTime = Instant.now(),
enabledTime = Instant.now(),
workflowType = Workflow.WorkflowType.COMPOSITE,
user = randomUser(),
inputs = listOf(CompositeInput(org.opensearch.commons.alerting.model.Sequence(listOf(randomDelegate())))),
owner = "",
triggers = listOf()
)
val response = GetWorkflowResponse(
id = "id", version = 1, seqNo = 1, primaryTerm = 1, status = RestStatus.OK, workflow = workflow
)
val out = BytesStreamOutput()
response.writeTo(out)
val sin = StreamInput.wrap(out.bytes().toBytesRef().bytes)
val newRes = GetWorkflowResponse(sin)
Assertions.assertEquals("id", newRes.id)
Assertions.assertTrue(newRes.workflow!!.auditDelegateMonitorAlerts!!)
Assertions.assertEquals(workflow.name, newRes.workflow!!.name)
Assertions.assertEquals(workflow.owner, newRes.workflow!!.owner)
Assertions.assertEquals(workflow.auditDelegateMonitorAlerts, newRes.workflow!!.auditDelegateMonitorAlerts)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class IndexWorkflowRequestTests {

val req = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.POST,
randomWorkflow()
randomWorkflow(auditDelegateMonitorAlerts = false)
)
Assertions.assertNotNull(req)

Expand All @@ -42,6 +42,7 @@ class IndexWorkflowRequestTests {
Assertions.assertEquals(2L, newReq.primaryTerm)
Assertions.assertEquals(RestRequest.Method.POST, newReq.method)
Assertions.assertNotNull(newReq.workflow)
Assertions.assertFalse(newReq.workflow.auditDelegateMonitorAlerts!!)
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,14 @@ class XContentTests {
Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work")
}

@Test
fun `test composite workflow parsing with auditDelegateMonitorAlerts flag disabled`() {
val workflow = randomWorkflow(auditDelegateMonitorAlerts = false)
val monitorString = workflow.toJsonStringWithUser()
val parsedMonitor = Workflow.parse(parser(monitorString))
Assertions.assertEquals(workflow, parsedMonitor, "Round tripping BucketLevelMonitor doesn't work")
}

@Test
fun `test query-level trigger parsing`() {
val trigger = randomQueryLevelTrigger()
Expand Down

0 comments on commit 437eba4

Please sign in to comment.