From 3ee0f10c0fe63e216695fb834bd0e537fff29917 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Tue, 26 Mar 2024 02:40:48 +0000 Subject: [PATCH] doc-level monitor fan-out approach Signed-off-by: Subhobrata Dey --- .../alerting/DocumentLevelMonitorRunner.kt | 69 +++++++++---------- .../alerting/MonitorRunnerService.kt | 16 ++++- .../model/DocumentLevelTriggerRunResult.kt | 34 ++++++++- .../TransportDocLevelMonitorFanOutAction.kt | 35 ++++++---- .../TransportExecuteWorkflowAction.kt | 8 ++- .../workflow/CompositeWorkflowRunner.kt | 11 ++- .../alerting/DocumentMonitorRunnerIT.kt | 4 +- 7 files changed, 119 insertions(+), 58 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 9b9e8106f..154a6a9e7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -28,7 +28,6 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput import org.opensearch.commons.alerting.model.DocLevelQuery import org.opensearch.commons.alerting.model.Monitor import org.opensearch.core.action.ActionListener -import org.opensearch.core.common.bytes.BytesReference import org.opensearch.core.common.io.stream.Writeable import org.opensearch.core.index.shard.ShardId import org.opensearch.core.rest.RestStatus @@ -249,38 +248,43 @@ class DocumentLevelMonitorRunner : MonitorRunner() { cont.resumeWithException(e) } }, - nodeMap.size + nodeShardAssignments.size ) val responseReader = Writeable.Reader { DocLevelMonitorFanOutResponse(it) } for (node in nodeMap) { - val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( - monitor, - dryrun, - monitorMetadata, - executionId, - indexExecutionContext, - nodeShardAssignments[node.key]!!.toList(), - concreteIndicesSeenSoFar, - workflowRunContext - ) - - transportService.sendRequest( - node.value, - DocLevelMonitorFanOutAction.NAME, - docLevelMonitorFanOutRequest, - TransportRequestOptions.EMPTY, - object : ActionListenerResponseHandler(listener, responseReader) { - override fun handleException(e: TransportException) { - listener.onFailure(e) - } - - override fun handleResponse(response: DocLevelMonitorFanOutResponse) { - listener.onResponse(response) + if (nodeShardAssignments.containsKey(node.key)) { + val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest( + monitor, + dryrun, + monitorMetadata, + executionId, + indexExecutionContext, + nodeShardAssignments[node.key]!!.toList(), + concreteIndicesSeenSoFar, + workflowRunContext + ) + + transportService.sendRequest( + node.value, + DocLevelMonitorFanOutAction.NAME, + docLevelMonitorFanOutRequest, + TransportRequestOptions.EMPTY, + object : ActionListenerResponseHandler( + listener, + responseReader + ) { + override fun handleException(e: TransportException) { + listener.onFailure(e) + } + + override fun handleResponse(response: DocLevelMonitorFanOutResponse) { + listener.onResponse(response) + } } - } - ) + ) + } } } docLevelMonitorFanOutResponses.addAll(responses) @@ -475,15 +479,4 @@ class DocumentLevelMonitorRunner : MonitorRunner() { } return nodeShardAssignments } - - /** - * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name - * and doc source. A list of these POJOs would be passed to percolate query execution logic. - */ - data class TransformedDocDto( - var indexName: String, - var concreteIndexName: String, - var docId: String, - var docSource: BytesReference - ) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index 0a53108c6..4e7bf3796 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -384,11 +384,23 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon } } - suspend fun runJob(workflow: Workflow, periodStart: Instant, periodEnd: Instant, dryrun: Boolean, transportService: TransportService?): WorkflowRunResult { + suspend fun runJob( + workflow: Workflow, + periodStart: Instant, + periodEnd: Instant, + dryrun: Boolean, + transportService: TransportService? + ): WorkflowRunResult { return CompositeWorkflowRunner.runWorkflow(workflow, monitorCtx, periodStart, periodEnd, dryrun, transportService) } - suspend fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant, dryrun: Boolean, transportService: TransportService?): MonitorRunResult<*> { + suspend fun runJob( + job: ScheduledJob, + periodStart: Instant, + periodEnd: Instant, + dryrun: Boolean, + transportService: TransportService? + ): MonitorRunResult<*> { // Updating the scheduled job index at the start of monitor execution runs for when there is an upgrade the the schema mapping // has not been updated. if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) { diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt index 9d98aab42..a89d89da7 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/model/DocumentLevelTriggerRunResult.kt @@ -25,7 +25,7 @@ data class DocumentLevelTriggerRunResult( triggerName = sin.readString(), error = sin.readException(), triggeredDocs = sin.readStringList(), - actionResultsMap = sin.readMap() as MutableMap> + actionResultsMap = readActionResults(sin) ) override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { @@ -39,7 +39,15 @@ data class DocumentLevelTriggerRunResult( override fun writeTo(out: StreamOutput) { super.writeTo(out) out.writeStringCollection(triggeredDocs) - out.writeMap(actionResultsMap as Map) + out.writeInt(actionResultsMap.size) + actionResultsMap.forEach { (alert, actionResults) -> + out.writeString(alert) + out.writeInt(actionResults.size) + actionResults.forEach { (id, result) -> + out.writeString(id) + result.writeTo(out) + } + } } companion object { @@ -48,5 +56,27 @@ data class DocumentLevelTriggerRunResult( fun readFrom(sin: StreamInput): TriggerRunResult { return DocumentLevelTriggerRunResult(sin) } + + @JvmStatic + fun readActionResults(sin: StreamInput): MutableMap> { + val actionResultsMapReconstruct: MutableMap> = mutableMapOf() + val size = sin.readInt() + var idx = 0 + while (idx < size) { + val alert = sin.readString() + val actionResultsSize = sin.readInt() + val actionRunResultElem = mutableMapOf() + var i = 0 + while (i < actionResultsSize) { + val actionId = sin.readString() + val actionResult = ActionRunResult.readFrom(sin) + actionRunResultElem[actionId] = actionResult + ++i + } + actionResultsMapReconstruct[alert] = actionRunResultElem + ++idx + } + return actionResultsMapReconstruct + } } } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index 06eb3807f..71598a7a5 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -228,6 +228,7 @@ class TransportDocLevelMonitorFanOutAction val inputRunResults = mutableMapOf>() val docsToQueries = mutableMapOf>() val indexName = shardIds.first().indexName + val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf() else monitorMetadata.lastRunContext.toMutableMap() as MutableMap> @@ -316,17 +317,20 @@ class TransportDocLevelMonitorFanOutAction } } - // If any error happened during trigger execution, upsert monitor error alert - val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults) - if (errorMessage.isNotEmpty()) { - alertService.upsertMonitorErrorAlert( - monitor = monitor, - errorMessage = errorMessage, - executionId = executionId, - workflowRunContext - ) - } else { - onSuccessfulMonitorRun(monitor) + if (!isTempMonitor) { + // If any error happened during trigger execution, upsert monitor error alert + val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults) + log.info(errorMessage) + if (errorMessage.isNotEmpty()) { + alertService.upsertMonitorErrorAlert( + monitor = monitor, + errorMessage = errorMessage, + executionId = executionId, + workflowRunContext + ) + } else { + onSuccessfulMonitorRun(monitor) + } } listener.onResponse( @@ -605,7 +609,14 @@ class TransportDocLevelMonitorFanOutAction } } } - ActionRunResult(action.id, action.name, actionOutput, false, Instant.ofEpochMilli(client.threadPool().absoluteTimeInMillis()), null) + ActionRunResult( + action.id, + action.name, + actionOutput, + false, + Instant.ofEpochMilli(client.threadPool().absoluteTimeInMillis()), + null + ) } catch (e: Exception) { ActionRunResult(action.id, action.name, mapOf(), false, Instant.ofEpochMilli(client.threadPool().absoluteTimeInMillis()), e) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt index 2045f0696..0d9fe527a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportExecuteWorkflowAction.kt @@ -73,7 +73,13 @@ class TransportExecuteWorkflowAction @Inject constructor( "dryrun: ${execWorkflowRequest.dryrun}" ) val workflowRunResult = - MonitorRunnerService.runJob(workflow, periodStart, periodEnd, execWorkflowRequest.dryrun, transportService = transportService) + MonitorRunnerService.runJob( + workflow, + periodStart, + periodEnd, + execWorkflowRequest.dryrun, + transportService = transportService + ) withContext(Dispatchers.IO, { actionListener.onResponse( ExecuteWorkflowResponse( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index 9288cdd69..f8cd2fefd 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -137,7 +137,16 @@ object CompositeWorkflowRunner : WorkflowRunner() { try { dataSources = delegateMonitor.dataSources val delegateRunResult = - runDelegateMonitor(delegateMonitor, monitorCtx, periodStart, periodEnd, dryRun, workflowRunContext, executionId, transportService) + runDelegateMonitor( + delegateMonitor, + monitorCtx, + periodStart, + periodEnd, + dryRun, + workflowRunContext, + executionId, + transportService + ) resultList.add(delegateRunResult!!) } catch (ex: Exception) { logger.error("Error executing workflow delegate monitor ${delegate.monitorId}", ex) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index f0f897434..ae974b9b0 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2732,7 +2732,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { indexDoc(aliasName, "6", testDoc) indexDoc(aliasName, "7", testDoc) OpenSearchTestCase.waitUntil( - { searchFindings(monitor).size == 1 }, 2, TimeUnit.MINUTES + { searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES ) rolloverDatastream(aliasName) @@ -2743,7 +2743,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { indexDoc(aliasName, "16", testDoc) indexDoc(aliasName, "17", testDoc) OpenSearchTestCase.waitUntil( - { searchFindings(monitor).size == 2 }, 2, TimeUnit.MINUTES + { searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES ) deleteDataStream(aliasName)