Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
address Drew's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenlan-amzn committed Jan 26, 2021
1 parent f3d54e2 commit c990bcd
Show file tree
Hide file tree
Showing 11 changed files with 148 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,112 +26,107 @@ import org.elasticsearch.common.Strings
import org.elasticsearch.common.ValidationException
import org.elasticsearch.common.regex.Regex

private val log = LogManager.getLogger(ISMTemplateService::class.java)
private val log = LogManager.getLogger("ISMTemplateService")

class ISMTemplateService {
companion object {
/**
* find the matching policy based on ISM template field for the given index
*
* filter out hidden index
* filter out older index than template lastUpdateTime
*
* @param ismTemplates current ISM templates saved in metadata
* @param indexMetadata cluster state index metadata
* @return policyID
*/
@Suppress("ReturnCount")
fun findMatchingPolicy(ismTemplates: Map<String, ISMTemplate>, indexMetadata: IndexMetadata): String? {
if (ismTemplates.isEmpty()) return null

val indexName = indexMetadata.index.name
/**
* find the matching policy based on ISM template field for the given index
*
* filter out hidden index
* filter out older index than template lastUpdateTime
*
* @param ismTemplates current ISM templates saved in metadata
* @param indexMetadata cluster state index metadata
* @return policyID
*/
fun Map<String, ISMTemplate>.findMatchingPolicy(indexMetadata: IndexMetadata): String? {
if (this.isEmpty()) return null

// don't include hidden index
val isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings)
if (isHidden) return null
val indexName = indexMetadata.index.name

// only process indices created after template
// traverse all ism templates for matching ones
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, indexName) }
var matchedPolicy: String? = null
var highestPriority: Int = -1
ismTemplates.filter { (_, template) ->
template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate
}.forEach { (policyID, template) ->
val matched = template.indexPatterns.stream().anyMatch(patternMatchPredicate)
if (matched && highestPriority < template.priority) {
highestPriority = template.priority
matchedPolicy = policyID
}
}
// don't include hidden index
val isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings)
if (isHidden) return null

return matchedPolicy
// only process indices created after template
// traverse all ism templates for matching ones
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, indexName) }
var matchedPolicy: String? = null
var highestPriority: Int = -1
this.filter { (_, template) ->
template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate
}.forEach { (policyID, template) ->
val matched = template.indexPatterns.stream().anyMatch(patternMatchPredicate)
if (matched && highestPriority < template.priority) {
highestPriority = template.priority
matchedPolicy = policyID
}
}

return matchedPolicy
}

/**
* validate the template Name and indexPattern provided in the template
* reusing ES validate function in MetadataIndexTemplateService
*/
@Suppress("ComplexMethod")
fun validateFormat(indexPatterns: List<String>): ElasticsearchException? {
val validationErrors = mutableListOf<String>()
for (indexPattern in indexPatterns) {
if (indexPattern.contains(" ")) {
validationErrors.add("index_patterns [$indexPattern] must not contain a space")
}
if (indexPattern.contains(",")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a ','")
}
if (indexPattern.contains("#")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a '#'")
}
if (indexPattern.contains(":")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a ':'")
}
if (indexPattern.startsWith("_")) {
validationErrors.add("index_pattern [$indexPattern] must not start with '_'")
}
if (!Strings.validFileNameExcludingAstrix(indexPattern)) {
validationErrors.add("index_pattern [" + indexPattern + "] must not contain the following characters " +
Strings.INVALID_FILENAME_CHARS)
}
}

if (validationErrors.size > 0) {
val validationException = ValidationException()
validationException.addValidationErrors(validationErrors)
return IndexManagementException.wrap(validationException)
}
return null
/**
* validate the template Name and indexPattern provided in the template
* reusing ES validate function in MetadataIndexTemplateService
*/
@Suppress("ComplexMethod")
fun validateFormat(indexPatterns: List<String>): ElasticsearchException? {
val validationErrors = mutableListOf<String>()
for (indexPattern in indexPatterns) {
if (indexPattern.contains(" ")) {
validationErrors.add("index_patterns [$indexPattern] must not contain a space")
}
if (indexPattern.contains(",")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a ','")
}
if (indexPattern.contains("#")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a '#'")
}
if (indexPattern.contains(":")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a ':'")
}
if (indexPattern.startsWith("_")) {
validationErrors.add("index_pattern [$indexPattern] must not start with '_'")
}
if (!Strings.validFileNameExcludingAstrix(indexPattern)) {
validationErrors.add("index_pattern [" + indexPattern + "] must not contain the following characters " +
Strings.INVALID_FILENAME_CHARS)
}
}

/**
* find policy templates whose index patterns overlap with given template
*
* @return map of overlapping template name to its index patterns
*/
@Suppress("SpreadOperator")
fun findConflictingPolicyTemplates(
candidate: String,
indexPatterns: List<String>,
priority: Int,
ismTemplates: Map<String, ISMTemplate?>
): Map<String, List<String>> {
val automaton1 = Regex.simpleMatchToAutomaton(*indexPatterns.toTypedArray())
val overlappingTemplates = mutableMapOf<String, List<String>>()
if (validationErrors.size > 0) {
val validationException = ValidationException()
validationException.addValidationErrors(validationErrors)
return IndexManagementException.wrap(validationException)
}
return null
}

// focus on template with same priority
ismTemplates.filterNotNullValues()
.filter { it.value.priority == priority }.forEach { (policyID, template) ->
val automaton2 = Regex.simpleMatchToAutomaton(*template.indexPatterns.toTypedArray())
if (!Operations.isEmpty(Operations.intersection(automaton1, automaton2))) {
log.info("existing ism_template in $policyID overlaps candidate $candidate")
overlappingTemplates[policyID] = template.indexPatterns
}
}
overlappingTemplates.remove(candidate)
/**
* find policy templates whose index patterns overlap with given template
*
* @return map of overlapping template name to its index patterns
*/
@Suppress("SpreadOperator")
fun Map<String, ISMTemplate>.findConflictingPolicyTemplates(
candidate: String,
indexPatterns: List<String>,
priority: Int
): Map<String, List<String>> {
val automaton1 = Regex.simpleMatchToAutomaton(*indexPatterns.toTypedArray())
val overlappingTemplates = mutableMapOf<String, List<String>>()

return overlappingTemplates
// focus on template with same priority
this.filter { it.value.priority == priority }
.forEach { (policyID, template) ->
val automaton2 = Regex.simpleMatchToAutomaton(*template.indexPatterns.toTypedArray())
if (!Operations.isEmpty(Operations.intersection(automaton1, automaton2))) {
log.info("existing ism_template in $policyID overlaps candidate $candidate")
overlappingTemplates[policyID] = template.indexPatterns
}
}
overlappingTemplates.remove(candidate)

return overlappingTemplates
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlug
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ISMTemplateService.Companion.findMatchingPolicy
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
Expand Down Expand Up @@ -270,12 +269,14 @@ class ManagedIndexCoordinator(
* */
var hasCreateRequests = false
val updateManagedIndicesRequests = mutableListOf<DocWriteRequest<*>>()
val indicesWithPolicyID = mutableListOf<String>()
val indicesToRemoveManagedIndexMetaDataFrom = mutableListOf<Index>()
event.state().metadata().indices().forEach {
val previousIndexMetaData = event.previousState().metadata().index(it.value.index)
val policyID = it.value.getPolicyID()
val request: DocWriteRequest<*>? = when {
it.value.shouldCreateManagedIndexConfig(previousIndexMetaData) && policyID != null -> {
indicesWithPolicyID.add(it.value.index.name)
hasCreateRequests = true
managedIndexConfigIndexRequest(it.value.index.name, it.value.indexUUID, policyID, jobInterval)
}
Expand All @@ -289,8 +290,12 @@ class ManagedIndexCoordinator(
if (it.value.shouldDeleteManagedIndexMetaData()) indicesToRemoveManagedIndexMetaDataFrom.add(it.value.index)
}

// check if newly created indices matching any ISM templates
val updateMatchingIndexReq = getMatchingIndicesUpdateReq(event.state(), event.indicesCreated())
// Check if newly created indices matching any ISM templates
var updateMatchingIndexReq = emptyList<DocWriteRequest<*>>()
// filter out indices with policyID, they will be picked up in previous block
val indicesCreated = event.indicesCreated().filter { it !in indicesWithPolicyID }
if (indicesCreated.isNotEmpty()) // only check template match if there are new created indices
updateMatchingIndexReq = getMatchingIndicesUpdateReq(event.state(), indicesCreated)
if (updateMatchingIndexReq.isNotEmpty()) hasCreateRequests = true

updateManagedIndices(updateManagedIndicesRequests + updateMatchingIndexReq + indicesDeletedRequests, hasCreateRequests)
Expand All @@ -305,7 +310,7 @@ class ManagedIndexCoordinator(
val templates = getISMTemplates()

val indexToMatchedPolicy = indexNames.map { indexName ->
indexName to findMatchingPolicy(templates, indexMetadatas[indexName])
indexName to templates.findMatchingPolicy(indexMetadatas[indexName])
}.toMap()

val updateManagedIndexReqs = mutableListOf<DocWriteRequest<*>>()
Expand Down Expand Up @@ -336,6 +341,9 @@ class ManagedIndexCoordinator(
emptyMap()
} catch (ex: ClusterBlockException) {
emptyMap()
} catch (e: Exception) {
logger.error("Failed to get ISM templates", e)
emptyMap()
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,9 @@ fun IndexMetadata.getManagedIndexMetaData(): ManagedIndexMetaData? {
* parse search response with this function
*
* @return map of policyID to ISMTemplate in this policy
* @throws [IllegalArgumentException]
*/
@Throws(Exception::class)
fun getPolicyToTemplateMap(response: SearchResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY):
Map<String, ISMTemplate?> {
return response.hits.hits.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ data class ISMTemplate(
) : ToXContentObject, Writeable {

init {
require(indexPatterns.isNotEmpty()) { "at least give one index pattern" }
require(priority >= 0) { "Requires priority to be >= 0" }
require(indexPatterns.isNotEmpty()) { "Requires at least one index pattern" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ISMTemplateService.Companion.findConflictingPolicyTemplates
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ISMTemplateService.Companion.validateFormat
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.filterNotNullValues
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyToTemplateMap
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.findConflictingPolicyTemplates
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexManagementException
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.ISM_TEMPLATE_FIELD
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.validateFormat
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexUtils
import org.apache.logging.log4j.LogManager
import org.elasticsearch.ElasticsearchStatusException
Expand Down Expand Up @@ -111,14 +112,14 @@ class TransportIndexPolicyAction @Inject constructor(

client.search(searchRequest, object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
val policyToTemplateMap = getPolicyToTemplateMap(response, xContentRegistry)
val overlaps = findConflictingPolicyTemplates(request.policyID, indexPatterns, priority, policyToTemplateMap)
if (overlaps.isNotEmpty()) {
val esg = "new policy ${request.policyID} has an ism template with index pattern $indexPatterns " +
"matching existing policy templates ${overlaps.entries.stream().map { "policy [${it.key}] => ${it.value}" }.collect(
val policyToTemplateMap = getPolicyToTemplateMap(response, xContentRegistry).filterNotNullValues()
val conflictingPolicyTemplates = policyToTemplateMap.findConflictingPolicyTemplates(request.policyID, indexPatterns, priority)
if (conflictingPolicyTemplates.isNotEmpty()) {
val errorMessage = "new policy ${request.policyID} has an ism template with index pattern $indexPatterns " +
"matching existing policy templates ${conflictingPolicyTemplates.entries.stream().map { "policy [${it.key}] => ${it.value}" }.collect(
Collectors.joining(","))}," +
" please use a different priority than $priority"
actionListener.onFailure(IndexManagementException.wrap(IllegalArgumentException(esg)))
actionListener.onFailure(IndexManagementException.wrap(IllegalArgumentException(errorMessage)))
return
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.util

import org.elasticsearch.ElasticsearchException
Expand Down
3 changes: 0 additions & 3 deletions src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,6 @@
},
"ism_template": {
"properties": {
"template_name": {
"type": "keyword"
},
"index_patterns": {
"type": "keyword"
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,6 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
val templates = response["ism_templates"] as ArrayList<Map<String, Any?>>

templatePredicates.forEachIndexed { ind, (_, predicates) ->
// assertTrue("The template: $name was not found in the response: $response", templates.containsKey(name))
val template = templates[ind]
predicates.forEach { (fieldName, predicate) ->
assertTrue("The key: $fieldName was not found in the response: $template", template.containsKey(fieldName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.waitFor
import org.elasticsearch.common.settings.Settings
import org.elasticsearch.common.xcontent.XContentType
import org.junit.Assert
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.Locale
Expand Down Expand Up @@ -70,6 +71,8 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() {
}

fun `test creating index with invalid policy_id`() {
deleteIndex(INDEX_MANAGEMENT_INDEX)
waitFor { assertFalse(indexExists(INDEX_MANAGEMENT_INDEX)) }
val indexOne = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
val indexTwo = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
val indexThree = randomAlphaOfLength(10).toLowerCase(Locale.ROOT)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase
Expand Down
3 changes: 0 additions & 3 deletions src/test/resources/mappings/cached-opendistro-ism-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -445,9 +445,6 @@
},
"ism_template": {
"properties": {
"template_name": {
"type": "keyword"
},
"index_patterns": {
"type": "keyword"
},
Expand Down

0 comments on commit c990bcd

Please sign in to comment.