Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport 2.x] optimize doc-level monitor execution workflow for datastreams #1322

Merged
merged 1 commit into from
Dec 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {

try {
// Resolve all passed indices to concrete indices
val concreteIndices = IndexUtils.resolveAllIndices(
val allConcreteIndices = IndexUtils.resolveAllIndices(
docLevelMonitorInput.indices,
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (concreteIndices.isEmpty()) {
if (allConcreteIndices.isEmpty()) {
logger.error("indices not found-${docLevelMonitorInput.indices.joinToString(",")}")
throw IndexNotFoundException(docLevelMonitorInput.indices.joinToString(","))
}
Expand All @@ -141,7 +141,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// cleanup old indices that are not monitored anymore from the same monitor
val runContextKeys = updatedLastRunContext.keys.toMutableSet()
for (ind in runContextKeys) {
if (!concreteIndices.contains(ind)) {
if (!allConcreteIndices.contains(ind)) {
updatedLastRunContext.remove(ind)
}
}
Expand All @@ -150,11 +150,26 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

docLevelMonitorInput.indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
var lastWriteIndex: String? = null
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
lastWriteIndex = concreteIndices.find { lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}
val updatedIndexName = indexName.replace("*", "_")
val conflictingFields = monitorCtx.docLevelMonitorQueries!!.getAllConflictingFields(
monitorCtx.clusterService!!.state(),
Expand All @@ -179,7 +194,16 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx,
concreteIndexName
) as MutableMap<String, Any>
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
updatedLastRunContext.remove(lastWriteIndex)
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
} else {
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}

val count: Int = indexLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -216,11 +217,19 @@ object MonitorMetadataService :
val lastRunContext = existingRunContext?.toMutableMap() ?: mutableMapOf()
try {
if (index == null) return mutableMapOf()
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)

val indices = mutableListOf<String>()
if (IndexUtils.isAlias(index, clusterService.state()) ||
IndexUtils.isDataStream(index, clusterService.state())
) {
IndexUtils.getWriteIndex(index, clusterService.state())?.let { indices.add(it) }
} else {
val getIndexRequest = GetIndexRequest().indices(index)
val getIndexResponse: GetIndexResponse = client.suspendUntil {
client.admin().indices().getIndex(getIndexRequest, it)
}
indices.addAll(getIndexResponse.indices())
}
val indices = getIndexResponse.indices()

indices.forEach { indexName ->
if (!lastRunContext.containsKey(indexName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,11 +207,25 @@ class DocLevelMonitorQueries(private val client: Client, private val clusterServ

// Run through each backing index and apply appropriate mappings to query index
indices.forEach { indexName ->
val concreteIndices = IndexUtils.resolveAllIndices(
var concreteIndices = IndexUtils.resolveAllIndices(
listOf(indexName),
monitorCtx.clusterService!!,
monitorCtx.indexNameExpressionResolver!!
)
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
val lastWriteIndex = concreteIndices.find { monitorMetadata.lastRunContext.containsKey(it) }
if (lastWriteIndex != null) {
val lastWriteIndexCreationDate =
IndexUtils.getCreationDateForIndex(lastWriteIndex, monitorCtx.clusterService!!.state())
concreteIndices = IndexUtils.getNewestIndicesByCreationDate(
concreteIndices,
monitorCtx.clusterService!!.state(),
lastWriteIndexCreationDate
)
}
}
val updatedIndexName = indexName.replace("*", "_")
val updatedProperties = mutableMapOf<String, Any>()
val allFlattenPaths = mutableSetOf<Pair<String, String>>()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.client.IndicesAdminClient
import org.opensearch.cluster.ClusterState
import org.opensearch.cluster.metadata.IndexAbstraction
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.metadata.IndexNameExpressionResolver
import org.opensearch.cluster.service.ClusterService
Expand Down Expand Up @@ -153,5 +154,47 @@ class IndexUtils {

return result
}

@JvmStatic
fun isDataStream(name: String, clusterState: ClusterState): Boolean {
return clusterState.metadata().dataStreams().containsKey(name)
}

@JvmStatic
fun isAlias(name: String, clusterState: ClusterState): Boolean {
return clusterState.metadata().hasAlias(name)
}

@JvmStatic
fun getWriteIndex(index: String, clusterState: ClusterState): String? {
if (isAlias(index, clusterState) || isDataStream(index, clusterState)) {
val metadata = clusterState.metadata.indicesLookup[index]?.writeIndex
if (metadata != null) {
return metadata.index.name
}
}
return null
}

@JvmStatic
fun getNewestIndicesByCreationDate(concreteIndices: List<String>, clusterState: ClusterState, thresholdDate: Long): List<String> {
val filteredIndices = mutableListOf<String>()
val lookup = clusterState.metadata().indicesLookup
concreteIndices.forEach { indexName ->
val index = lookup[indexName]
val indexMetadata = clusterState.metadata.index(indexName)
if (index != null && index.type == IndexAbstraction.Type.CONCRETE_INDEX) {
if (indexMetadata.creationDate >= thresholdDate) {
filteredIndices.add(indexName)
}
}
}
return filteredIndices
}

@JvmStatic
fun getCreationDateForIndex(index: String, clusterState: ClusterState): Long {
return clusterState.metadata.index(index).creationDate
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ import javax.management.MBeanServerInvocationHandler
import javax.management.ObjectName
import javax.management.remote.JMXConnectorFactory
import javax.management.remote.JMXServiceURL
import kotlin.collections.ArrayList
import kotlin.collections.HashMap

/**
* Superclass for tests that interact with an external test cluster using OpenSearch's RestClient
Expand Down Expand Up @@ -911,7 +913,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
private fun indexDoc(client: RestClient, index: String, id: String, doc: String, refresh: Boolean = true): Response {
val requestBody = StringEntity(doc, APPLICATION_JSON)
val params = if (refresh) mapOf("refresh" to "true") else mapOf()
val response = client.makeRequest("PUT", "$index/_doc/$id", params, requestBody)
val response = client.makeRequest("POST", "$index/_doc/$id?op_type=create", params, requestBody)
assertTrue(
"Unable to index doc: '${doc.take(15)}...' to index: '$index'",
listOf(RestStatus.OK, RestStatus.CREATED).contains(response.restStatus())
Expand Down Expand Up @@ -947,6 +949,11 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return index
}

protected fun createTestIndex(index: String, mapping: String?, alias: String): String {
createIndex(index, Settings.EMPTY, mapping?.trimIndent(), alias)
return index
}

protected fun createTestConfigIndex(index: String = "." + randomAlphaOfLength(10).lowercase(Locale.ROOT)): String {
try {
createIndex(
Expand Down Expand Up @@ -983,7 +990,7 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
val indicesMap = mutableMapOf<String, Boolean>()
val indicesJson = jsonBuilder().startObject().startArray("actions")
indices.keys.map {
val indexName = createTestIndex(index = it.lowercase(Locale.ROOT), mapping = "")
val indexName = createTestIndex(index = it, mapping = "")
val isWriteIndex = indices.getOrDefault(indexName, false)
indicesMap[indexName] = isWriteIndex
val indexMap = mapOf(
Expand All @@ -1000,17 +1007,155 @@ abstract class AlertingRestTestCase : ODFERestTestCase() {
return mutableMapOf(alias to indicesMap)
}

protected fun createDataStream(datastream: String, mappings: String?, useComponentTemplate: Boolean) {
val indexPattern = "$datastream*"
var componentTemplateMappings = "\"properties\": {" +
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
"}"
if (mappings != null) {
componentTemplateMappings = mappings
}
if (useComponentTemplate) {
// Setup index_template
createComponentTemplateWithMappings(
"my_ds_component_template-$datastream",
componentTemplateMappings
)
}
createComposableIndexTemplate(
"my_index_template_ds-$datastream",
listOf(indexPattern),
(if (useComponentTemplate) "my_ds_component_template-$datastream" else null),
mappings,
true,
0
)
createDataStream(datastream)
}

protected fun createDataStream(datastream: String? = randomAlphaOfLength(10).lowercase(Locale.ROOT)) {
client().makeRequest("PUT", "_data_stream/$datastream")
}

protected fun deleteDataStream(datastream: String) {
client().makeRequest("DELETE", "_data_stream/$datastream")
}

protected fun createIndexAlias(alias: String, mappings: String?) {
val indexPattern = "$alias*"
var componentTemplateMappings = "\"properties\": {" +
" \"netflow.destination_transport_port\":{ \"type\": \"long\" }," +
" \"netflow.destination_ipv4_address\":{ \"type\": \"ip\" }" +
"}"
if (mappings != null) {
componentTemplateMappings = mappings
}
createComponentTemplateWithMappings(
"my_alias_component_template-$alias",
componentTemplateMappings
)
createComposableIndexTemplate(
"my_index_template_alias-$alias",
listOf(indexPattern),
"my_alias_component_template-$alias",
mappings,
false,
0
)
createTestIndex(
"$alias-000001",
null,
"""
"$alias": {
"is_write_index": true
}
""".trimIndent()
)
}

protected fun deleteIndexAlias(alias: String) {
client().makeRequest("DELETE", "$alias*/_alias/$alias")
}

protected fun createComponentTemplateWithMappings(componentTemplateName: String, mappings: String?) {
val body = """{"template" : { "mappings": {$mappings} }}"""
client().makeRequest(
"PUT",
"_component_template/$componentTemplateName",
emptyMap(),
StringEntity(body, ContentType.APPLICATION_JSON),
BasicHeader("Content-Type", "application/json")
)
}

protected fun createComposableIndexTemplate(
templateName: String,
indexPatterns: List<String>,
componentTemplateName: String?,
mappings: String?,
isDataStream: Boolean,
priority: Int
) {
var body = "{\n"
if (isDataStream) {
body += "\"data_stream\": { },"
}
body += "\"index_patterns\": [" +
indexPatterns.stream().collect(
Collectors.joining(",", "\"", "\"")
) + "],"
if (componentTemplateName == null) {
body += "\"template\": {\"mappings\": {$mappings}},"
}
if (componentTemplateName != null) {
body += "\"composed_of\": [\"$componentTemplateName\"],"
}
body += "\"priority\":$priority}"
client().makeRequest(
"PUT",
"_index_template/$templateName",
emptyMap(),
StringEntity(body, APPLICATION_JSON),
BasicHeader("Content-Type", "application/json")
)
}

protected fun getDatastreamWriteIndex(datastream: String): String {
val response = client().makeRequest("GET", "_data_stream/$datastream", emptyMap(), null)
var respAsMap = responseAsMap(response)
if (respAsMap.containsKey("data_streams")) {
respAsMap = (respAsMap["data_streams"] as ArrayList<HashMap<String, *>>)[0]
val indices = respAsMap["indices"] as List<Map<String, Any>>
val index = indices.last()
return index["index_name"] as String
} else {
respAsMap = respAsMap[datastream] as Map<String, Object>
}
val indices = respAsMap["indices"] as Array<String>
return indices.last()
}

protected fun rolloverDatastream(datastream: String) {
client().makeRequest(
"POST",
datastream + "/_rollover",
emptyMap(),
null
)
}

protected fun randomAliasIndices(
alias: String,
num: Int = randomIntBetween(1, 10),
includeWriteIndex: Boolean = true,
): Map<String, Boolean> {
val indices = mutableMapOf<String, Boolean>()
val writeIndex = randomIntBetween(0, num)
val writeIndex = randomIntBetween(0, num - 1)
for (i: Int in 0 until num) {
var indexName = randomAlphaOfLength(10)
var indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
while (indexName.equals(alias) || indices.containsKey(indexName))
indexName = randomAlphaOfLength(10)
indexName = randomAlphaOfLength(10).lowercase(Locale.ROOT)
indices[indexName] = includeWriteIndex && i == writeIndex
}
return indices
Expand Down
Loading
Loading