diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index 98b438065..3d7deab51 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -7,6 +7,7 @@ package org.opensearch.alerting import org.apache.logging.log4j.LogManager import org.opensearch.ExceptionsHelper +import org.opensearch.Version import org.opensearch.action.ActionListenerResponseHandler import org.opensearch.action.get.MultiGetItemResponse import org.opensearch.action.support.GroupedActionListener @@ -34,6 +35,9 @@ import org.opensearch.core.index.shard.ShardId import org.opensearch.core.rest.RestStatus import org.opensearch.index.IndexNotFoundException import org.opensearch.index.seqno.SequenceNumbers +import org.opensearch.node.NodeClosedException +import org.opensearch.transport.ConnectTransportException +import org.opensearch.transport.RemoteTransportException import org.opensearch.transport.TransportException import org.opensearch.transport.TransportRequestOptions import org.opensearch.transport.TransportService @@ -241,7 +245,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() { override fun onFailure(e: Exception) { logger.info("Fan out failed", e) - if (e.cause is Exception) // unwrap remote transport exception + if (e.cause is Exception) cont.resumeWithException(e.cause as Exception) else cont.resumeWithException(e) @@ -275,25 +279,33 @@ class DocumentLevelMonitorRunner : MonitorRunner() { responseReader ) { override fun handleException(e: TransportException) { - // retry in local node - transportService.sendRequest( - monitorCtx.clusterService!!.localNode(), - DocLevelMonitorFanOutAction.NAME, - docLevelMonitorFanOutRequest, - TransportRequestOptions.EMPTY, - object : ActionListenerResponseHandler( - listener, - responseReader - ) { - override fun handleException(e: TransportException) { - listener.onFailure(e) + val cause = e.unwrapCause() + if (cause is ConnectTransportException || + (e is RemoteTransportException && cause is NodeClosedException) + ) { + // retry in local node + transportService.sendRequest( + monitorCtx.clusterService!!.localNode(), + DocLevelMonitorFanOutAction.NAME, + docLevelMonitorFanOutRequest, + TransportRequestOptions.EMPTY, + object : + ActionListenerResponseHandler( + listener, + responseReader + ) { + override fun handleException(e: TransportException) { + listener.onFailure(e) + } + + override fun handleResponse(response: DocLevelMonitorFanOutResponse) { + listener.onResponse(response) + } } - - override fun handleResponse(response: DocLevelMonitorFanOutResponse) { - listener.onResponse(response) - } - } - ) + ) + } else { + listener.onFailure(e) + } } override fun handleResponse(response: DocLevelMonitorFanOutResponse) { @@ -480,8 +492,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() { return allShards.filter { it.primary() }.size } - private fun getNodes(monitorCtx: MonitorRunnerExecutionContext): MutableMap { - return monitorCtx.clusterService!!.state().nodes.dataNodes + private fun getNodes(monitorCtx: MonitorRunnerExecutionContext): Map { + return monitorCtx.clusterService!!.state().nodes.dataNodes.filter { it.value.version >= Version.CURRENT } } private fun distributeShards( diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt index dae928261..83bdfc6e3 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportDocLevelMonitorFanOutAction.kt @@ -342,7 +342,7 @@ class TransportDocLevelMonitorFanOutAction ) ) } catch (e: Exception) { - log.error("${request.monitor.id} Failed to run fan_out on node ${clusterService.localNode().id} due to error") + log.error("${request.monitor.id} Failed to run fan_out on node ${clusterService.localNode().id} due to error $e") listener.onFailure(AlertingException.wrap(e)) } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt index 8d28ba47d..5a976779a 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/AlertingRestTestCase.kt @@ -903,6 +903,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() { return response } + public fun indexDoc(client: RestClient, index: String, doc: String, refresh: Boolean = true): Response { + val requestBody = StringEntity(doc, APPLICATION_JSON) + val params = if (refresh) mapOf("refresh" to "true") else mapOf() + val response = client.makeRequest("POST", "$index/_doc?op_type=create", params, requestBody) + assertTrue( + "Unable to index doc: '${doc.take(15)}...' to index: '$index'", + listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus()) + ) + return response + } + protected fun deleteDoc(index: String, id: String, refresh: Boolean = true): Response { val params = if (refresh) mapOf("refresh" to "true") else mapOf() val response = client().makeRequest("DELETE", "$index/_doc/$id", params) diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt index ae974b9b0..2bf20008e 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/DocumentMonitorRunnerIT.kt @@ -2748,10 +2748,4 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() { deleteDataStream(aliasName) } - - @Suppress("UNCHECKED_CAST") - /** helper that returns a field in a json map whose values are all json objects */ - private fun Map.objectMap(key: String): Map> { - return this[key] as Map> - } } diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt index e99c9635e..6a66579ef 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/TestHelpers.kt @@ -802,3 +802,9 @@ fun randomAlertContext( sampleDocs = sampleDocs ) } + +@Suppress("UNCHECKED_CAST") +/** helper that returns a field in a json map whose values are all json objects */ +fun Map.objectMap(key: String): Map> { + return this[key] as Map> +} diff --git a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt index 2c77fd480..280074964 100644 --- a/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt +++ b/alerting/src/test/kotlin/org/opensearch/alerting/bwc/AlertingBackwardsCompatibilityIT.kt @@ -8,15 +8,27 @@ package org.opensearch.alerting.bwc import org.apache.hc.core5.http.ContentType.APPLICATION_JSON import org.apache.hc.core5.http.io.entity.StringEntity import org.opensearch.alerting.ALERTING_BASE_URI +import org.opensearch.alerting.ALWAYS_RUN import org.opensearch.alerting.AlertingRestTestCase import org.opensearch.alerting.makeRequest +import org.opensearch.alerting.objectMap +import org.opensearch.alerting.randomDocumentLevelMonitor +import org.opensearch.alerting.randomDocumentLevelTrigger +import org.opensearch.client.Node +import org.opensearch.client.Request import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.XContentType +import org.opensearch.commons.alerting.model.DocLevelMonitorInput +import org.opensearch.commons.alerting.model.DocLevelQuery +import org.opensearch.commons.alerting.model.IntervalSchedule import org.opensearch.commons.alerting.model.Monitor import org.opensearch.core.rest.RestStatus import org.opensearch.index.query.QueryBuilders import org.opensearch.search.builder.SearchSourceBuilder import org.opensearch.test.OpenSearchTestCase +import java.time.ZonedDateTime +import java.time.format.DateTimeFormatter +import java.time.temporal.ChronoUnit import java.util.concurrent.TimeUnit class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { @@ -92,6 +104,32 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { } } + @Throws(Exception::class) + @Suppress("UNCHECKED_CAST") + fun `test backwards compatibility for doc-level monitors`() { + val uri = getPluginUri() + val responseMap = getAsMap(uri)["nodes"] as Map> + for (response in responseMap.values) { + val plugins = response["plugins"] as List> + val pluginNames = plugins.map { plugin -> plugin["name"] }.toSet() + when (CLUSTER_TYPE) { + ClusterType.OLD -> { + assertTrue(pluginNames.contains("opensearch-alerting")) + createDocLevelMonitor() + } + ClusterType.MIXED -> { + assertTrue(pluginNames.contains("opensearch-alerting")) + verifyMonitorExecutionSuccess() + } + ClusterType.UPGRADED -> { + assertTrue(pluginNames.contains("opensearch-alerting")) + verifyMonitorExecutionSuccess() + } + } + break + } + } + private enum class ClusterType { OLD, MIXED, @@ -179,6 +217,25 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { assertTrue("Create monitor response has incorrect version", createdVersion > 0) } + private fun createDocLevelMonitor(): Pair { + val testIndex = createTestIndex(index = "test-index", settings = Settings.builder().put("number_of_shards", "7").build()) + val docQuery = DocLevelQuery(id = "4", query = "test_field:\"us-west-2\"", fields = listOf(), name = "4") + val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery)) + + val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN) + val monitor = createMonitor( + randomDocumentLevelMonitor( + name = "test-monitor", + inputs = listOf(docLevelInput), + triggers = listOf(trigger), + enabled = true, + schedule = IntervalSchedule(1, ChronoUnit.MINUTES) + ) + ) + assertNotNull(monitor.id) + return Pair(monitor, testIndex) + } + @Throws(Exception::class) @Suppress("UNCHECKED_CAST") private fun verifyMonitorExists(uri: String) { @@ -193,7 +250,7 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content) val hits = xcp.map()["hits"]!! as Map> val numberDocsFound = hits["total"]?.get("value") - assertEquals("Unexpected number of Monitors returned", 1, numberDocsFound) + assertEquals("Unexpected number of Monitors returned", 2, numberDocsFound) } @Throws(Exception::class) @@ -219,4 +276,56 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() { assertEquals("Some nodes in stats response failed", totalNodes, successfulNodes) assertEquals("Not all nodes are on schedule", totalNodes, nodesOnSchedule) } + + @Suppress("UNCHECKED_CAST") + private fun verifyMonitorExecutionSuccess() { + val nodes = mutableListOf() + val nodesResponse = client().performRequest(Request("GET", "/_nodes")).asMap() + (nodesResponse["nodes"] as Map).forEach { nodeId, nodeInfo -> + val host = ((nodeInfo as Map)["http"] as Map)["publish_address"] + val version = nodeInfo["version"].toString() + if (nodes.isEmpty() && version >= "3.0.0") { + logger.info("use node-$nodeId") + client().nodes.forEach { + if (it.host.toHostString().contains(host.toString())) { + nodes.add(it) + } + } + } + } + val client = client() + client.setNodes(nodes) + val searchResponse = searchMonitors() + val monitorId = searchResponse.hits.hits.filter { it.sourceAsMap["name"] == "test-monitor" }.first().id + val testIndex = "test-index" + + val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS)) + val testDoc = """{ + "message" : "This is an error from IAD region", + "test_strict_date_time" : "$testTime", + "test_field" : "us-west-2" + }""" + + indexDoc(client(), testIndex, testDoc) + + var passed = false + OpenSearchTestCase.waitUntil({ + try { + val response = executeMonitor(client, monitorId) + val output = entityAsMap(response) + assertEquals("test-monitor", output["monitor_name"]) + @Suppress("UNCHECKED_CAST") + val searchResult = (output.objectMap("input_results")["results"] as List>).first() + @Suppress("UNCHECKED_CAST") + val matchingDocsToQuery = searchResult["4"] as List + passed = matchingDocsToQuery.isNotEmpty() + return@waitUntil true + } catch (e: AssertionError) { + return@waitUntil false + } + }, 1, TimeUnit.MINUTES) + if (!passed) { + assertTrue(passed) + } + } }