Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bulk index findings and sequentially invoke auto-correlations #1355

Merged
merged 6 commits into from
Feb 6, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ package org.opensearch.alerting
import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.OpenSearchStatusException
import org.opensearch.action.DocWriteRequest
import org.opensearch.action.bulk.BulkRequest
import org.opensearch.action.bulk.BulkResponse
import org.opensearch.action.index.IndexRequest
import org.opensearch.action.index.IndexResponse
import org.opensearch.action.search.SearchAction
import org.opensearch.action.search.SearchRequest
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -273,10 +275,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// If there are no triggers defined, we still want to generate findings
if (monitor.triggers.isEmpty()) {
if (dryrun == false && monitor.id != Monitor.NO_ID) {
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
createFindings(monitor, monitorCtx, triggeredQueries, it.key, true)
}
createFindings(monitor, monitorCtx, docsToQueries, idQueryMap, true)
}
} else {
monitor.triggers.forEach {
Expand Down Expand Up @@ -365,7 +364,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
trigger: DocumentLevelTrigger,
monitor: Monitor,
idQueryMap: Map<String, DocLevelQuery>,
docsToQueries: Map<String, List<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
queryToDocIds: Map<DocLevelQuery, Set<String>>,
dryrun: Boolean,
workflowRunContext: WorkflowRunContext?,
Expand All @@ -374,35 +373,34 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val triggerCtx = DocumentLevelTriggerExecutionContext(monitor, trigger)
val triggerResult = monitorCtx.triggerService!!.runDocLevelTrigger(monitor, trigger, queryToDocIds)

val findings = mutableListOf<String>()
val findingDocPairs = mutableListOf<Pair<String, String>>()
val triggerFindingDocPairs = mutableListOf<Pair<String, String>>()

// TODO: Implement throttling for findings
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }
val findingId = createFindings(
monitor,
monitorCtx,
triggeredQueries,
it.key,
!dryrun && monitor.id != Monitor.NO_ID,
executionId
)
findings.add(findingId)
val findingToDocPairs = createFindings(
monitor,
monitorCtx,
docsToQueries,
idQueryMap,
!dryrun && monitor.id != Monitor.NO_ID,
executionId
)

if (triggerResult.triggeredDocs.contains(it.key)) {
findingDocPairs.add(Pair(findingId, it.key))
findingToDocPairs.forEach {
// Only pick those entries whose docs have triggers associated with them
if (triggerResult.triggeredDocs.contains(it.second)) {
triggerFindingDocPairs.add(Pair(it.first, it.second))
}
}

val actionCtx = triggerCtx.copy(
triggeredDocs = triggerResult.triggeredDocs,
relatedFindings = findings,
// confirm if this is right or only trigger-able findings should be present in this list
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Please add a TODO/FIXME, and keep the PR in draft if it's not ready to merge.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What was the previous behaviour? Why are we not just refactoring? Is there a behaviour change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change preserves the existing behavior. The comment was left in by mistake; I have removed it.

relatedFindings = findingToDocPairs.map { it.first },
error = monitorResult.error ?: triggerResult.error
)

val alerts = mutableListOf<Alert>()
findingDocPairs.forEach {
triggerFindingDocPairs.forEach {
val alert = monitorCtx.alertService!!.composeDocLevelAlert(
listOf(it.first),
listOf(it.second),
Expand Down Expand Up @@ -461,51 +459,82 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
return triggerResult
}

/**
* 1. Bulk index all findings based on shouldCreateFinding flag
* 2. invoke publishFinding() to kickstart auto-correlations
* 3. Returns a list of pairs for finding id to doc id
*/
private suspend fun createFindings(
monitor: Monitor,
monitorCtx: MonitorRunnerExecutionContext,
docLevelQueries: List<DocLevelQuery>,
matchingDocId: String,
docsToQueries: MutableMap<String, MutableList<String>>,
idQueryMap: Map<String, DocLevelQuery>,
shouldCreateFinding: Boolean,
workflowExecutionId: String? = null,
): String {
// Before the "|" is the doc id and after the "|" is the index
val docIndex = matchingDocId.split("|")
): List<Pair<String, String>> {

val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = listOf(docIndex[0]),
correlatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = docLevelQueries,
timestamp = Instant.now(),
executionId = workflowExecutionId
)
val findingDocPairs = mutableListOf<Pair<String, String>>()
val findings = mutableListOf<Finding>()
val indexRequests = mutableListOf<IndexRequest>()

val findingStr = finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS).string()
logger.debug("Findings: $findingStr")
docsToQueries.forEach {
val triggeredQueries = it.value.map { queryId -> idQueryMap[queryId]!! }

// Before the "|" is the doc id and after the "|" is the index
val docIndex = it.key.split("|")

if (shouldCreateFinding) {
val indexRequest = IndexRequest(monitor.dataSources.findingsIndex)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)
val finding = Finding(
id = UUID.randomUUID().toString(),
relatedDocIds = listOf(docIndex[0]),
correlatedDocIds = listOf(docIndex[0]),
monitorId = monitor.id,
monitorName = monitor.name,
index = docIndex[1],
docLevelQueries = triggeredQueries,
timestamp = Instant.now(),
executionId = workflowExecutionId
)
findingDocPairs.add(Pair(finding.id, it.key))
findings.add(finding)

val findingStr =
finding.toXContent(XContentBuilder.builder(XContentType.JSON.xContent()), ToXContent.EMPTY_PARAMS)
.string()
logger.debug("Findings: $findingStr")

if (shouldCreateFinding) {
indexRequests += IndexRequest(monitor.dataSources.findingsIndex)
.source(findingStr, XContentType.JSON)
.id(finding.id)
.routing(finding.id)
goyamegh marked this conversation as resolved.
Show resolved Hide resolved
.opType(DocWriteRequest.OpType.INDEX)
goyamegh marked this conversation as resolved.
Show resolved Hide resolved
}
}

monitorCtx.client!!.suspendUntil<Client, IndexResponse> {
monitorCtx.client!!.index(indexRequest, it)
if (indexRequests.isNotEmpty()) {
val bulkResponse: BulkResponse = monitorCtx.client!!.suspendUntil {
goyamegh marked this conversation as resolved.
Show resolved Hide resolved
bulk(BulkRequest().add(indexRequests).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE), it)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't refresh on every batch. Refresh the findings index only once, after all batches have been bulk-ingested.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

}
if (bulkResponse.hasFailures()) {
bulkResponse.items.forEach { item ->
if (item.isFailed) {
goyamegh marked this conversation as resolved.
Show resolved Hide resolved
logger.debug("Failed indexing the finding ${item.id} of monitor [${monitor.id}]")
}
}
} else {
logger.debug("[${bulkResponse.items.size}] All findings successfully indexed.")
}
}

try {
publishFinding(monitor, monitorCtx, finding)
findings.forEach { finding ->
publishFinding(monitor, monitorCtx, finding)
}
} catch (e: Exception) {
// suppress exception
logger.error("Optional finding callback failed", e)
}
return finding.id
return findingDocPairs
}

private fun publishFinding(
Expand Down
Loading