From 0588456e866edf6861fb9ea96136c7f8c0802813 Mon Sep 17 00:00:00 2001 From: Subhobrata Dey Date: Thu, 28 Mar 2024 06:28:46 +0000 Subject: [PATCH] doc-level monitor fan-out approach Signed-off-by: Subhobrata Dey --- .../alerting/DocumentLevelMonitorRunner.kt | 38 +++++++++++++------ 1 file changed, 26 insertions(+), 12 deletions(-) diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 8e7fff0aa..98b438065 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -184,7 +184,11 @@ class DocumentLevelMonitorRunner : MonitorRunner() { if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) ) { - if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) { + if (concreteIndexName == IndexUtils.getWriteIndex( + indexName, + monitorCtx.clusterService!!.state() + ) + ) { updatedLastRunContext.remove(lastWriteIndex) updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext } @@ -199,7 +203,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() { // update lastRunContext if its a temp monitor as we only want to view the last bit of data then // TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data if (isTempMonitor) { - indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) + indexLastRunContext[shard] = + max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10) } } val indexExecutionContext = IndexExecutionContext( @@ -227,13 +232,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { concreteIndexName ) - nodeShardAssignments.forEach { - logger.info(it.key) - it.value.forEach { it1 -> - logger.info(it1.id.toString()) - } - } - val responses: Collection = suspendCoroutine { cont -> val listener = GroupedActionListener( object : ActionListener> { @@ -277,7 +275,25 @@ class DocumentLevelMonitorRunner : MonitorRunner() { responseReader ) { override fun handleException(e: TransportException) { - listener.onFailure(e) + // retry in local node + transportService.sendRequest( + monitorCtx.clusterService!!.localNode(), + DocLevelMonitorFanOutAction.NAME, + docLevelMonitorFanOutRequest, + TransportRequestOptions.EMPTY, + object : ActionListenerResponseHandler( + listener, + responseReader + ) { + override fun handleException(e: TransportException) { + listener.onFailure(e) + } + + override fun handleResponse(response: DocLevelMonitorFanOutResponse) { + listener.onResponse(response) + } + } + ) } override fun handleResponse(response: DocLevelMonitorFanOutResponse) { @@ -353,7 +369,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { val triggerResults = mutableMapOf() val triggerErrorMap = mutableMapOf>() for (res in docLevelMonitorFanOutResponses) { - logger.info("trigger results-${res.triggerResults}") for (triggerId in res.triggerResults.keys) { val documentLevelTriggerRunResult = res.triggerResults[triggerId] if (documentLevelTriggerRunResult != null) { @@ -395,7 +410,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() { triggerResults[triggerId]!!.error = AlertingException.merge(*errorList.toTypedArray()) } } - logger.info("final trigger results-$triggerResults") return triggerResults }