Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add connection to triggers for doc level alerting #316

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput
Expand Down Expand Up @@ -211,7 +212,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
SearchInput.XCONTENT_REGISTRY,
DocLevelMonitorInput.XCONTENT_REGISTRY,
QueryLevelTrigger.XCONTENT_REGISTRY,
BucketLevelTrigger.XCONTENT_REGISTRY
BucketLevelTrigger.XCONTENT_REGISTRY,
DocumentLevelTrigger.XCONTENT_REGISTRY
)
}

Expand Down
166 changes: 117 additions & 49 deletions alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.AlertingConfigAccessor
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.DocumentExecutionContext
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.Finding
import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.Monitor
Expand All @@ -48,6 +50,7 @@ import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.model.docLevelInput.DocLevelMonitorInput
import org.opensearch.alerting.model.docLevelInput.DocLevelQuery
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
Expand Down Expand Up @@ -757,39 +760,133 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
return
}

for (query in queries) {
runForEachQuery(monitor, lastRunContext, index, query)
val count: Int = lastRunContext["shards_count"] as Int
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think it's the time we break MonitorRunner into multiple concrete implementations for different types of alerting to make sure we are segregating responsibilities? This class is growing huge. It will also simplify start adding tests with dedicated scope.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I agree and I have a task for refactoring to help cleanup monitor runner

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we handle change in shard count between the runs? Are there edge scenarios which might need this handling.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I created this github issue regarding this. Also this only happens when there is a reindexing operation, so there might still be issues during executions since the sequence numbers will be changed as well.

val updatedLastRunContext = lastRunContext.toMutableMap()
AWSHurneyt marked this conversation as resolved.
Show resolved Hide resolved
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(index, shard)
updatedLastRunContext[shard] = maxSeqNo.toString()
}

val queryToDocIds = mutableMapOf<DocLevelQuery, Set<String>>()
val docsToQueries = mutableMapOf<String, MutableList<String>>()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docsToQueries -> docIdsToQueries

val docExecutionContext = DocumentExecutionContext(queries, lastRunContext, updatedLastRunContext)
queries.forEach { query ->
val matchingDocIds = runForEachQuery(docExecutionContext, query, index)
queryToDocIds[query] = matchingDocIds
matchingDocIds.forEach {
docsToQueries.putIfAbsent(it, mutableListOf())
docsToQueries[it]?.add(query.id)
}
}

val queryIds = queries.map { it.id }

monitor.triggers.forEach {
runForEachDocTrigger(it as DocumentLevelTrigger, monitor, docsToQueries, queryIds, queryToDocIds)
}

// TODO: Check for race condition against the update monitor api
// This does the update at the end in case of errors and makes sure all the queries are executed
val updatedMonitor = monitor.copy(lastRunContext = updatedLastRunContext)
// note: update has to called in serial for shards of a given index.
// make sure this is just updated for the specific query or at the end of all the queries
updateMonitor(client, xContentRegistry, settings, updatedMonitor)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is there are race due to concurrent updates to monitor from user driven update vs last context updates?
I see a value into segregating the monitor configuration information from state information. Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will remove these comments since these have now been sorted. Yea there has been talks of having another index for additional metadata information and this can help with throttling.

}

private suspend fun runForEachQuery(monitor: Monitor, lastRunContext: MutableMap<String, Any>, index: String, query: DocLevelQuery) {
val count: Int = lastRunContext["shards_count"] as Int
private fun runForEachDocTrigger(
trigger: DocumentLevelTrigger,
monitor: Monitor,
docsToQueries: Map<String, List<String>>,
queryIds: List<String>,
queryToDocIds: Map<DocLevelQuery, Set<String>>
) {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = triggerService.runDocLevelTrigger(monitor, trigger, triggerCtx, docsToQueries, queryIds)

logger.info("trigger results")
logger.info(triggerResult.triggeredDocs.toString())

val index = (monitor.inputs[0] as DocLevelMonitorInput).indices[0]

queryToDocIds.forEach {
val queryTriggeredDocs = it.value.intersect(triggerResult.triggeredDocs)
if (queryTriggeredDocs.isNotEmpty()) {
val findingId = createFindings(monitor, index, it.key, queryTriggeredDocs, trigger)
// TODO: check if need to create alert, if so create it and point it to FindingId
// TODO: run action as well, but this mat need to be throttled based on Mo's comment for bucket level alerting
Comment on lines +816 to +817
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think its better to cover ToDo in issues (linked under one Meta) and tag them here for more context? Will it ensure we close them out before the release?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Created a github issue and will create one meta github issue later which this will be included in

}
}
}

private fun createFindings(
monitor: Monitor,
index: String,
docLevelQuery: DocLevelQuery,
matchingDocIds: Set<String>,
trigger: DocumentLevelTrigger
): String {
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocId = matchingDocIds.joinToString(","),
monitorId = monitor.id,
monitorName = monitor.name,
index = index,
queryId = docLevelQuery.id,
queryTags = docLevelQuery.tags,
severity = docLevelQuery.severity,
timestamp = Instant.now(),
triggerId = trigger.id,
triggerName = trigger.name
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
// change this to debug.
logger.info("Findings: $findingStr")

// todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management.
val indexRequest = IndexRequest(".opensearch-alerting-findings")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)

client.index(indexRequest).actionGet()
return finding.id
}

private fun runForEachQuery(
docExecutionCtx: DocumentExecutionContext,
query: DocLevelQuery,
index: String
): Set<String> {
val count: Int = docExecutionCtx.lastRunContext["shards_count"] as Int
val matchingDocs = mutableSetOf<String>()
for (i: Int in 0 until count) {
val shard = i.toString()
try {
logger.info("Monitor execution for shard: $shard")

val maxSeqNo: Long = getMaxSeqNo(index, shard)
val maxSeqNo: Long = docExecutionCtx.updatedLastRunContext[shard].toString().toLong()
logger.info("MaxSeqNo of shard_$shard is $maxSeqNo")

// todo: scope to optimize this: in prev seqno and current max seq no are same don't search.
val hits: SearchHits = searchShard(index, shard, lastRunContext[shard].toString().toLongOrNull(), maxSeqNo, query.query)
val hits: SearchHits = searchShard(
index,
shard,
docExecutionCtx.lastRunContext[shard].toString().toLongOrNull(),
maxSeqNo,
query.query
)
logger.info("Search hits for shard_$shard is: ${hits.hits.size}")

if (hits.hits.isNotEmpty()) {
createFindings(monitor, index, query, hits)
logger.info("found matches")
matchingDocs.addAll(getAllDocIds(hits))
}

logger.info("Updating monitor: ${monitor.id}")
lastRunContext[shard] = maxSeqNo.toString()
val updatedMonitor = monitor.copy(lastRunContext = lastRunContext)
// note: update has to called in serial for shards of a given index.
updateMonitor(client, xContentRegistry, settings, updatedMonitor)
} catch (e: Exception) {
logger.info("Failed to run for shard $shard. Error: ${e.message}")
logger.debug("Failed to run for shard $shard", e)
}
}
return matchingDocs
}

// todo: add more validations.
Expand All @@ -810,7 +907,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {

private fun getShardsCount(index: String): Int {
val allShards: List<ShardRouting> = clusterService.state().routingTable().allShards(index)
return allShards.size
return allShards.filter { it.primary() }.size
}

private fun createRunContext(index: String): HashMap<String, Any> {
Expand Down Expand Up @@ -854,6 +951,9 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
}

private fun searchShard(index: String, shard: String, prevSeqNo: Long?, maxSeqNo: Long, query: String): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true) {
return SearchHits.empty()
}
val boolQueryBuilder = BoolQueryBuilder()
boolQueryBuilder.filter(QueryBuilders.rangeQuery("_seq_no").gt(prevSeqNo).lte(maxSeqNo))
boolQueryBuilder.must(QueryBuilders.queryStringQuery(query))
Expand All @@ -875,39 +975,7 @@ object MonitorRunner : JobRunner, CoroutineScope, AbstractLifecycleComponent() {
return response.hits
}

private fun createFindings(monitor: Monitor, index: String, docLevelQuery: DocLevelQuery, hits: SearchHits) {
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocId = getAllDocIds(hits),
monitorId = monitor.id,
monitorName = monitor.name,
index = index,
queryId = docLevelQuery.id,
queryTags = docLevelQuery.tags,
severity = docLevelQuery.severity,
timestamp = Instant.now(),
triggerId = null, // todo: add once integrated with actions/triggers
triggerName = null // todo: add once integrated with actions/triggers
)

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
// change this to debug.
logger.info("Findings: $findingStr")

// todo: below is all hardcoded, temp code and added only to test. replace this with proper Findings index lifecycle management.
val indexRequest = IndexRequest(".opensearch-alerting-findings")
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)

client.index(indexRequest).actionGet()
}

private fun getAllDocIds(hits: SearchHits): String {
var sb = StringBuilder()
for (hit in hits) {
sb.append(hit.id)
sb.append(",")
}
return sb.substring(0, sb.length - 1)
private fun getAllDocIds(hits: SearchHits): List<String> {
return hits.map { hit -> hit.id }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ import org.opensearch.alerting.model.AggregationResultBucket
import org.opensearch.alerting.model.Alert
import org.opensearch.alerting.model.BucketLevelTrigger
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.DocumentLevelTrigger
import org.opensearch.alerting.model.DocumentLevelTriggerRunResult
import org.opensearch.alerting.model.Monitor
import org.opensearch.alerting.model.QueryLevelTrigger
import org.opensearch.alerting.model.QueryLevelTriggerRunResult
import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.script.DocumentLevelTriggerExecutionContext
import org.opensearch.alerting.script.QueryLevelTriggerExecutionContext
import org.opensearch.alerting.script.TriggerScript
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.script.ScriptService
import org.opensearch.search.aggregations.Aggregation
import org.opensearch.search.aggregations.Aggregations
import org.opensearch.search.aggregations.support.AggregationPath
import java.lang.IllegalArgumentException

/** Service that handles executing Triggers */
class TriggerService(val scriptService: ScriptService) {
Expand Down Expand Up @@ -53,6 +55,37 @@ class TriggerService(val scriptService: ScriptService) {
}
}

// TODO: improve performance and support match all and match any
fun runDocLevelTrigger(
monitor: Monitor,
trigger: DocumentLevelTrigger,
ctx: DocumentLevelTriggerExecutionContext,
docsToQueries: Map<String, List<String>>,
queryIds: List<String>
): DocumentLevelTriggerRunResult {
return try {
val triggeredDocs = mutableListOf<String>()

for (doc in docsToQueries.keys) {
val params = trigger.condition.params.toMutableMap()
for (queryId in queryIds) {
params[queryId] = docsToQueries[doc]!!.contains(queryId)
}
val triggered = scriptService.compile(trigger.condition, TriggerScript.CONTEXT)
.newInstance(params)
.execute(ctx)
logger.info("trigger val: $triggered")
if (triggered) triggeredDocs.add(doc)
}

DocumentLevelTriggerRunResult(trigger.name, triggeredDocs, null)
} catch (e: Exception) {
Comment on lines +66 to +82
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of Document -> Query, can we do Query -> Document as number of documents will be fairly large compared to number of rules. Also it will simplify match all cases, since it will boil down to the problem of common ids across multiple lists. thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is still in discussion and we will have a path forward in a future PR.

logger.info("Error running script for monitor ${monitor.id}, trigger: ${trigger.id}", e)
// if the script fails we need to send an alert so set triggered = true
DocumentLevelTriggerRunResult(trigger.name, emptyList(), e)
}
}

@Suppress("UNCHECKED_CAST")
fun runBucketLevelTrigger(
monitor: Monitor,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package org.opensearch.alerting.model

import org.opensearch.alerting.model.docLevelInput.DocLevelQuery

data class DocumentExecutionContext(
val queries: List<DocLevelQuery>,
val lastRunContext: Map<String, Any>,
val updatedLastRunContext: Map<String, Any>
)
Loading