Skip to content

Commit

Permalink
doc-level monitor fan-out approach
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Apr 1, 2024
1 parent 0588456 commit 9250a7d
Show file tree
Hide file tree
Showing 6 changed files with 161 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.Version
import org.opensearch.action.ActionListenerResponseHandler
import org.opensearch.action.get.MultiGetItemResponse
import org.opensearch.action.support.GroupedActionListener
Expand Down Expand Up @@ -34,6 +35,9 @@ import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.IndexNotFoundException
import org.opensearch.index.seqno.SequenceNumbers
import org.opensearch.node.NodeClosedException
import org.opensearch.transport.ConnectTransportException
import org.opensearch.transport.RemoteTransportException
import org.opensearch.transport.TransportException
import org.opensearch.transport.TransportRequestOptions
import org.opensearch.transport.TransportService
Expand Down Expand Up @@ -241,7 +245,7 @@ class DocumentLevelMonitorRunner : MonitorRunner() {

override fun onFailure(e: Exception) {
logger.info("Fan out failed", e)
if (e.cause is Exception) // unwrap remote transport exception
if (e.cause is Exception)
cont.resumeWithException(e.cause as Exception)
else
cont.resumeWithException(e)
Expand Down Expand Up @@ -275,25 +279,33 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
responseReader
) {
override fun handleException(e: TransportException) {
// retry in local node
transportService.sendRequest(
monitorCtx.clusterService!!.localNode(),
DocLevelMonitorFanOutAction.NAME,
docLevelMonitorFanOutRequest,
TransportRequestOptions.EMPTY,
object : ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(
listener,
responseReader
) {
override fun handleException(e: TransportException) {
listener.onFailure(e)
val cause = e.unwrapCause()
if (cause is ConnectTransportException ||
(e is RemoteTransportException && cause is NodeClosedException)
) {
// retry in local node
transportService.sendRequest(
monitorCtx.clusterService!!.localNode(),
DocLevelMonitorFanOutAction.NAME,
docLevelMonitorFanOutRequest,
TransportRequestOptions.EMPTY,
object :
ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(
listener,
responseReader
) {
override fun handleException(e: TransportException) {
listener.onFailure(e)
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
listener.onResponse(response)
}
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
listener.onResponse(response)
}
}
)
)
} else {
listener.onFailure(e)
}
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
Expand Down Expand Up @@ -480,8 +492,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
return allShards.filter { it.primary() }.size
}

private fun getNodes(monitorCtx: MonitorRunnerExecutionContext): MutableMap<String, DiscoveryNode> {
return monitorCtx.clusterService!!.state().nodes.dataNodes
private fun getNodes(monitorCtx: MonitorRunnerExecutionContext): Map<String, DiscoveryNode> {
return monitorCtx.clusterService!!.state().nodes.dataNodes.filter { it.value.version >= Version.CURRENT }
}

private fun distributeShards(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ class TransportDocLevelMonitorFanOutAction
)
)
} catch (e: Exception) {
log.error("${request.monitor.id} Failed to run fan_out on node ${clusterService.localNode().id} due to error")
log.error("${request.monitor.id} Failed to run fan_out on node ${clusterService.localNode().id} due to error $e")
listener.onFailure(AlertingException.wrap(e))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,17 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return response
}

public fun indexDoc(client: RestClient, index: String, doc: String, refresh: Boolean = true): Response {
val requestBody = StringEntity(doc, APPLICATION_JSON)
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client.makeRequest("POST", "$index/_doc?op_type=create", params, requestBody)
assertTrue(
"Unable to index doc: '${doc.take(15)}...' to index: '$index'",
listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus())
)
return response
}

protected fun deleteDoc(index: String, id: String, refresh: Boolean = true): Response {
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client().makeRequest("DELETE", "$index/_doc/$id", params)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2748,10 +2748,4 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {

deleteDataStream(aliasName)
}

@Suppress("UNCHECKED_CAST")
/** helper that returns a field in a json map whose values are all json objects */
private fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
return this[key] as Map<String, Map<String, Any>>
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -802,3 +802,9 @@ fun randomAlertContext(
sampleDocs = sampleDocs
)
}

@Suppress("UNCHECKED_CAST")
/** helper that returns a field in a json map whose values are all json objects */
fun Map<String, Any>.objectMap(key: String): Map<String, Map<String, Any>> {
return this[key] as Map<String, Map<String, Any>>
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,27 @@ package org.opensearch.alerting.bwc
import org.apache.hc.core5.http.ContentType.APPLICATION_JSON
import org.apache.hc.core5.http.io.entity.StringEntity
import org.opensearch.alerting.ALERTING_BASE_URI
import org.opensearch.alerting.ALWAYS_RUN
import org.opensearch.alerting.AlertingRestTestCase
import org.opensearch.alerting.makeRequest
import org.opensearch.alerting.objectMap
import org.opensearch.alerting.randomDocumentLevelMonitor
import org.opensearch.alerting.randomDocumentLevelTrigger
import org.opensearch.client.Node
import org.opensearch.client.Request
import org.opensearch.common.settings.Settings
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.core.rest.RestStatus
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.concurrent.TimeUnit

class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
Expand Down Expand Up @@ -92,6 +104,32 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
}
}

@Throws(Exception::class)
@Suppress("UNCHECKED_CAST")
fun `test backwards compatibility for doc-level monitors`() {
val uri = getPluginUri()
val responseMap = getAsMap(uri)["nodes"] as Map<String, Map<String, Any>>
for (response in responseMap.values) {
val plugins = response["plugins"] as List<Map<String, Any>>
val pluginNames = plugins.map { plugin -> plugin["name"] }.toSet()
when (CLUSTER_TYPE) {
ClusterType.OLD -> {
assertTrue(pluginNames.contains("opensearch-alerting"))
createDocLevelMonitor()
}
ClusterType.MIXED -> {
assertTrue(pluginNames.contains("opensearch-alerting"))
verifyMonitorExecutionSuccess()
}
ClusterType.UPGRADED -> {
assertTrue(pluginNames.contains("opensearch-alerting"))
verifyMonitorExecutionSuccess()
}
}
break
}
}

private enum class ClusterType {
OLD,
MIXED,
Expand Down Expand Up @@ -179,6 +217,25 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
assertTrue("Create monitor response has incorrect version", createdVersion > 0)
}

private fun createDocLevelMonitor(): Pair<Monitor, String> {
val testIndex = createTestIndex(index = "test-index", settings = Settings.builder().put("number_of_shards", "7").build())
val docQuery = DocLevelQuery(id = "4", query = "test_field:\"us-west-2\"", fields = listOf(), name = "4")
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "test-monitor",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)
return Pair(monitor, testIndex)
}

@Throws(Exception::class)
@Suppress("UNCHECKED_CAST")
private fun verifyMonitorExists(uri: String) {
Expand All @@ -193,7 +250,7 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
val xcp = createParser(XContentType.JSON.xContent(), searchResponse.entity.content)
val hits = xcp.map()["hits"]!! as Map<String, Map<String, Any>>
val numberDocsFound = hits["total"]?.get("value")
assertEquals("Unexpected number of Monitors returned", 1, numberDocsFound)
assertEquals("Unexpected number of Monitors returned", 2, numberDocsFound)
}

@Throws(Exception::class)
Expand All @@ -219,4 +276,56 @@ class AlertingBackwardsCompatibilityIT : AlertingRestTestCase() {
assertEquals("Some nodes in stats response failed", totalNodes, successfulNodes)
assertEquals("Not all nodes are on schedule", totalNodes, nodesOnSchedule)
}

@Suppress("UNCHECKED_CAST")
private fun verifyMonitorExecutionSuccess() {
val nodes = mutableListOf<Node>()
val nodesResponse = client().performRequest(Request("GET", "/_nodes")).asMap()
(nodesResponse["nodes"] as Map<String, Any>).forEach { nodeId, nodeInfo ->
val host = ((nodeInfo as Map<String, Any>)["http"] as Map<String, Any>)["publish_address"]
val version = nodeInfo["version"].toString()
if (nodes.isEmpty() && version >= "3.0.0") {
logger.info("use node-$nodeId")
client().nodes.forEach {
if (it.host.toHostString().contains(host.toString())) {
nodes.add(it)
}
}
}
}
val client = client()
client.setNodes(nodes)
val searchResponse = searchMonitors()
val monitorId = searchResponse.hits.hits.filter { it.sourceAsMap["name"] == "test-monitor" }.first().id
val testIndex = "test-index"

val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

indexDoc(client(), testIndex, testDoc)

var passed = false
OpenSearchTestCase.waitUntil({
try {
val response = executeMonitor(client, monitorId)
val output = entityAsMap(response)
assertEquals("test-monitor", output["monitor_name"])
@Suppress("UNCHECKED_CAST")
val searchResult = (output.objectMap("input_results")["results"] as List<Map<String, Any>>).first()
@Suppress("UNCHECKED_CAST")
val matchingDocsToQuery = searchResult["4"] as List<String>
passed = matchingDocsToQuery.isNotEmpty()
return@waitUntil true
} catch (e: AssertionError) {
return@waitUntil false
}
}, 1, TimeUnit.MINUTES)
if (!passed) {
assertTrue(passed)
}
}
}

0 comments on commit 9250a7d

Please sign in to comment.