Skip to content

Commit

Permalink
support list of monitor ids in Chained Monitor Findings (#514)
Browse files Browse the repository at this point in the history
support list of monitor ids in Chained Monitor Findings 

Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep committed Aug 31, 2023
1 parent bae1bee commit 7736e08
Show file tree
Hide file tree
Showing 6 changed files with 143 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -105,19 +105,41 @@ class IndexWorkflowRequest : ActionRequest {
val monitorIdOrderMap: Map<String, Int> = delegates.associate { it.monitorId to it.order }
delegates.forEach {
if (it.chainedMonitorFindings != null) {
if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings.monitorId) == false) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} doesn't exist in sequence",
validationException
)
// Break the flow because next check will generate the NPE
return validationException
}
if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings.monitorId]!!) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}",
validationException
)

if (it.chainedMonitorFindings.monitorId != null) {
if (monitorIdOrderMap.containsKey(it.chainedMonitorFindings.monitorId) == false) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} doesn't exist in sequence",
validationException
)
// Break the flow because next check will generate the NPE
return validationException
}
if (it.order <= monitorIdOrderMap[it.chainedMonitorFindings.monitorId]!!) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}",
validationException
)
}
} else {
for (monitorId in it.chainedMonitorFindings.monitorIds) {
if (!monitorIdOrderMap.containsKey(monitorId)) {
validationException = ValidateActions.addValidationError(
"Chained Findings Monitor $monitorId doesn't exist in sequence",
validationException
)
return validationException
} else {
val order = monitorIdOrderMap.get(monitorId)!!
if (order >= it.order) {
return ValidateActions.addValidationError(
"Chained Findings Monitor ${it.chainedMonitorFindings.monitorId} should be executed before monitor ${it.monitorId}. " +
"Order of monitor being chained [$order] should be smaller than order of monitor using findings as source data [${it.order}] in sequence",
validationException
)
}
}
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,63 +9,89 @@ import org.opensearch.core.xcontent.XContentBuilder
import org.opensearch.core.xcontent.XContentParser
import org.opensearch.core.xcontent.XContentParserUtils
import java.io.IOException
import java.util.Collections

/**
* Context passed in delegate monitor to filter data queried by a monitor based on the findings of the given monitor id.
* Context passed in delegate monitor to filter data matched by a list of monitors based on the findings of the given monitor ids.
*/
// TODO - Remove the class and move the monitorId to Delegate (as a chainedMonitorId property) if this class won't be updated by adding new properties
data class ChainedMonitorFindings(
val monitorId: String
val monitorId: String? = null,
val monitorIds: List<String> = emptyList(), // if monitorId field is non-null it would be given precendence for BWC
) : BaseModel {

init {
validateId(monitorId)
require(!(monitorId.isNullOrBlank() && monitorIds.isEmpty())) {
"at least one of fields, 'monitorIds' and 'monitorId' should be provided"
}
if (monitorId != null && monitorId.isBlank()) {
validateId(monitorId)
} else {
monitorIds.forEach { validateId(it) }
}
}

@Throws(IOException::class)
constructor(sin: StreamInput) : this(
sin.readString(), // monitorId
sin.readOptionalString(), // monitorId
Collections.unmodifiableList(sin.readStringList())
)

@Suppress("UNCHECKED_CAST")
fun asTemplateArg(): Map<String, Any> {
return mapOf(
MONITOR_ID_FIELD to monitorId,
)
MONITOR_IDS_FIELD to monitorIds
) as Map<String, Any>
}

@Throws(IOException::class)
override fun writeTo(out: StreamOutput) {
out.writeString(monitorId)
out.writeOptionalString(monitorId)
out.writeStringCollection(monitorIds)
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
.field(MONITOR_ID_FIELD, monitorId)
.field(MONITOR_IDS_FIELD, monitorIds)
.endObject()
return builder
}

companion object {
const val MONITOR_ID_FIELD = "monitor_id"
const val MONITOR_IDS_FIELD = "monitor_ids"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser): ChainedMonitorFindings {
lateinit var monitorId: String

var monitorId: String? = null
val monitorIds = mutableListOf<String>()
XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
MONITOR_ID_FIELD -> {
monitorId = xcp.text()
validateId(monitorId)
if (!xcp.currentToken().equals(XContentParser.Token.VALUE_NULL))
monitorId = xcp.text()
}

MONITOR_IDS_FIELD -> {
XContentParserUtils.ensureExpectedToken(
XContentParser.Token.START_ARRAY,
xcp.currentToken(),
xcp
)
while (xcp.nextToken() != XContentParser.Token.END_ARRAY) {
monitorIds.add(xcp.text())
}
}
}
}
return ChainedMonitorFindings(monitorId)
return ChainedMonitorFindings(monitorId, monitorIds)
}

@JvmStatic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ fun randomClusterMetricsInput(
return ClusterMetricsInput(path, pathParams, url)
}

fun ChainedMonitorFindings.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string()
}

fun Workflow.toJsonString(): String {
val builder = XContentFactory.jsonBuilder()
return this.toXContentWithUser(builder, ToXContent.EMPTY_PARAMS).string()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import org.opensearch.search.SearchModule
import java.lang.Exception
import java.lang.IllegalArgumentException
import java.util.UUID
import kotlin.test.assertNotNull
import kotlin.test.assertNull
import kotlin.test.assertTrue

class IndexWorkflowRequestTests {

Expand Down Expand Up @@ -196,6 +199,21 @@ class IndexWorkflowRequestTests {
delegates = listOf(
Delegate(1, "monitor-1")
)

// Chained finding list of monitors valid
delegates = listOf(
Delegate(1, "monitor-1"),
Delegate(2, "monitor-2"),
Delegate(3, "monitor-3", ChainedMonitorFindings(null, listOf("monitor-1", "monitor-2"))),

)
val req7 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)))
)
)
assertNull(req7.validate())
try {
IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
Expand All @@ -207,5 +225,21 @@ class IndexWorkflowRequestTests {
Assert.assertTrue(ex is IllegalArgumentException)
Assert.assertTrue(ex.message!!.contains("Workflows can only have 1 search input."))
}

// Chained finding list of monitors invalid order and old field null
delegates = listOf(
Delegate(1, "monitor-1"),
Delegate(3, "monitor-2"),
Delegate(2, "monitor-3", ChainedMonitorFindings(null, listOf("monitor-1", "monitor-2"))),

)
val req8 = IndexWorkflowRequest(
"1234", 1L, 2L, WriteRequest.RefreshPolicy.IMMEDIATE, RestRequest.Method.PUT,
randomWorkflowWithDelegates(
input = listOf(CompositeInput(Sequence(delegates = delegates)))
)
)
assertNotNull(req8.validate())
assertTrue(req8.validate()!!.message!!.contains("should be executed before monitor"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,25 @@ class CompositeInputTests {
}

@Test
fun `test create Chained Findings with illegal monitorId value`() {
fun `test create Chained Findings with illegal monitorId value and empty monitorIds list`() {
try {
ChainedMonitorFindings("")
Assertions.fail("Expecting an illegal argument exception")
} catch (e: IllegalArgumentException) {
e.message?.let {
Assertions.assertTrue(
it.contains("at least one of fields, 'monitorIds' and 'monitorId' should be provided")

)
}
}
}

@Test
fun `test create Chained Findings with null monitorId value and monitorIds list with blank monitorIds`() {
try {
ChainedMonitorFindings("", listOf("", ""))
Assertions.fail("Expecting an illegal argument exception")
} catch (e: IllegalArgumentException) {
e.message?.let {
Assertions.assertTrue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,12 +267,27 @@ class XContentTests {
@Test
fun `test workflow parsing`() {
val workflow = randomWorkflow(monitorIds = listOf("1", "2", "3"))

val monitorString = workflow.toJsonString()
val parsedWorkflow = Workflow.parse(parser(monitorString))
Assertions.assertEquals(workflow, parsedWorkflow, "Round tripping workflow failed")
}

@Test
fun `test chainedMonitorFindings parsing`() {
val cmf1 = ChainedMonitorFindings(monitorId = "m1")
val cmf1String = cmf1.toJsonString()
Assertions.assertEquals(
ChainedMonitorFindings.parse(parser(cmf1String)), cmf1,
"Round tripping chained monitor findings failed"
)
val cmf2 = ChainedMonitorFindings(monitorIds = listOf("m1", "m2"))
val cmf2String = cmf2.toJsonString()
Assertions.assertEquals(
ChainedMonitorFindings.parse(parser(cmf2String)), cmf2,
"Round tripping chained monitor findings failed"
)
}

@Test
fun `test old monitor format parsing`() {
val monitorString = """
Expand Down

0 comments on commit 7736e08

Please sign in to comment.