diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/elasticapi/ElasticExtensions.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/elasticapi/ElasticExtensions.kt index d0dab01d2..735ba75ca 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/elasticapi/ElasticExtensions.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/elasticapi/ElasticExtensions.kt @@ -17,6 +17,8 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy import com.amazon.opendistroforelasticsearch.indexmanagement.util.NO_ID import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService import kotlinx.coroutines.delay @@ -70,6 +72,13 @@ fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContent return this.timeField(name, "${name}_in_millis", instant.toEpochMilli()) } +fun XContentBuilder.optionalISMTemplateField(name: String, ismTemplate: ISMTemplate?): XContentBuilder { + if (ismTemplate == null) { + return nullField(name) + } + return this.field(Policy.ISM_TEMPLATE, ismTemplate) +} + /** * Retries the given [block] of code as specified by the receiver [BackoffPolicy], * if [block] throws an [ElasticsearchException] that is retriable (502, 503, 504). diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt new file mode 100644 index 000000000..f27e3b088 --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ISMTemplateService.kt @@ -0,0 +1,127 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement + +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate +import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexManagementException +import org.apache.logging.log4j.LogManager +import org.apache.lucene.util.automaton.Operations +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.cluster.metadata.IndexMetadata +import org.elasticsearch.common.Strings +import org.elasticsearch.common.ValidationException +import org.elasticsearch.common.regex.Regex + +private val log = LogManager.getLogger("ISMTemplateService") + +/** + * find the matching policy based on ISM template field for the given index + * + * filter out hidden index + * filter out older index than template lastUpdateTime + * + * @param ismTemplates current ISM templates saved in metadata + * @param indexMetadata cluster state index metadata + * @return policyID + */ +@Suppress("ReturnCount") +fun Map.findMatchingPolicy(indexMetadata: IndexMetadata): String? { + if (this.isEmpty()) return null + + val indexName = indexMetadata.index.name + + // don't include hidden index + val isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings) + if (isHidden) return null + + // only process indices created after template + // traverse all ism templates for matching ones + val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, indexName) } + var matchedPolicy: String? = null + var highestPriority: Int = -1 + this.filter { (_, template) -> + template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate + }.forEach { (policyID, template) -> + val matched = template.indexPatterns.stream().anyMatch(patternMatchPredicate) + if (matched && highestPriority < template.priority) { + highestPriority = template.priority + matchedPolicy = policyID + } + } + + return matchedPolicy +} + +/** + * validate the template Name and indexPattern provided in the template + * + * get the idea from ES validate function in MetadataIndexTemplateService + * acknowledge https://github.com/a2lin who should be the first contributor + */ +@Suppress("ComplexMethod") +fun validateFormat(indexPatterns: List): ElasticsearchException? { + val indexPatternFormatErrors = mutableListOf() + for (indexPattern in indexPatterns) { + if (indexPattern.contains("#")) { + indexPatternFormatErrors.add("index_pattern [$indexPattern] must not contain a '#'") + } + if (indexPattern.contains(":")) { + indexPatternFormatErrors.add("index_pattern [$indexPattern] must not contain a ':'") + } + if (indexPattern.startsWith("_")) { + indexPatternFormatErrors.add("index_pattern [$indexPattern] must not start with '_'") + } + if (!Strings.validFileNameExcludingAstrix(indexPattern)) { + indexPatternFormatErrors.add("index_pattern [" + indexPattern + "] must not contain the following characters " + + Strings.INVALID_FILENAME_CHARS) + } + } + + if (indexPatternFormatErrors.size > 0) { + val validationException = ValidationException() + validationException.addValidationErrors(indexPatternFormatErrors) + return IndexManagementException.wrap(validationException) + } + return null +} + +/** + * find policy templates whose index patterns overlap with given template + * + * @return map of overlapping template name to its index patterns + */ +@Suppress("SpreadOperator") +fun Map.findConflictingPolicyTemplates( + candidate: String, + indexPatterns: List, + priority: Int +): Map> { + val automaton1 = Regex.simpleMatchToAutomaton(*indexPatterns.toTypedArray()) + val overlappingTemplates = mutableMapOf>() + + // focus on template with same priority + this.filter { it.value.priority == priority } + .forEach { (policyID, template) -> + val automaton2 = Regex.simpleMatchToAutomaton(*template.indexPatterns.toTypedArray()) + if (!Operations.isEmpty(Operations.intersection(automaton1, automaton2))) { + log.info("Existing ism_template for $policyID overlaps candidate $candidate") + overlappingTemplates[policyID] = template.indexPatterns + } + } + overlappingTemplates.remove(candidate) + + return overlappingTemplates +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt index 9b4be1412..910c9e6bb 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexCoordinator.kt @@ -17,15 +17,19 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices +import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getClusterStateManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.retry +import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.filterNotNullValues +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyToTemplateMap import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldCreateManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldDeleteManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldDeleteManagedIndexMetaData -import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData @@ -38,6 +42,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SWEEP_PERIOD import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.ISM_TEMPLATE_FIELD import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest @@ -61,13 +66,16 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse import org.elasticsearch.action.bulk.BackoffPolicy import org.elasticsearch.action.bulk.BulkRequest import org.elasticsearch.action.bulk.BulkResponse +import org.elasticsearch.action.search.SearchPhaseExecutionException +import org.elasticsearch.action.search.SearchRequest import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.action.support.IndicesOptions import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.Client import org.elasticsearch.cluster.ClusterChangedEvent import org.elasticsearch.cluster.ClusterState -import org.elasticsearch.cluster.LocalNodeMasterListener +import org.elasticsearch.cluster.ClusterStateListener +import org.elasticsearch.cluster.block.ClusterBlockException import org.elasticsearch.cluster.service.ClusterService import org.elasticsearch.common.bytes.BytesReference import org.elasticsearch.common.component.LifecycleListener @@ -79,7 +87,10 @@ import org.elasticsearch.common.xcontent.XContentHelper import org.elasticsearch.common.xcontent.XContentParser import org.elasticsearch.common.xcontent.XContentType import org.elasticsearch.index.Index +import org.elasticsearch.index.IndexNotFoundException +import org.elasticsearch.index.query.QueryBuilders import org.elasticsearch.rest.RestStatus +import org.elasticsearch.search.builder.SearchSourceBuilder import org.elasticsearch.threadpool.Scheduler import org.elasticsearch.threadpool.ThreadPool @@ -106,7 +117,7 @@ class ManagedIndexCoordinator( private val clusterService: ClusterService, private val threadPool: ThreadPool, indexManagementIndices: IndexManagementIndices -) : LocalNodeMasterListener, +) : ClusterStateListener, CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")), LifecycleListener() { @@ -122,10 +133,11 @@ class ManagedIndexCoordinator( BackoffPolicy.constantBackoff(COORDINATOR_BACKOFF_MILLIS.get(settings), COORDINATOR_BACKOFF_COUNT.get(settings)) @Volatile private var jobInterval = JOB_INTERVAL.get(settings) + @Volatile private var isMaster = false + init { clusterService.addListener(this) clusterService.addLifecycleListener(this) - clusterService.addLocalNodeMasterListener(this) clusterService.clusterSettings.addSettingsUpdateConsumer(SWEEP_PERIOD) { sweepPeriod = it initBackgroundSweep() @@ -142,18 +154,34 @@ class ManagedIndexCoordinator( } } - override fun onMaster() { + private fun executorName(): String { + return ThreadPool.Names.MANAGEMENT + } + + fun onMaster() { // Init background sweep when promoted to being master initBackgroundSweep() } - override fun offMaster() { + fun offMaster() { // Cancel background sweep when demoted from being master scheduledFullSweep?.cancel() } @Suppress("ReturnCount") override fun clusterChanged(event: ClusterChangedEvent) { + // Instead of using a LocalNodeMasterListener to track master changes, this service will + // track them here to avoid conditions where master listener events run after other + // listeners that depend on what happened in the master listener + if (this.isMaster != event.localNodeMaster()) { + this.isMaster = event.localNodeMaster() + if (this.isMaster) { + onMaster() + } else { + offMaster() + } + } + if (!isIndexStateManagementEnabled()) return if (!event.localNodeMaster()) return @@ -242,12 +270,14 @@ class ManagedIndexCoordinator( * */ var hasCreateRequests = false val updateManagedIndicesRequests = mutableListOf>() + val indicesWithPolicyID = mutableListOf() val indicesToRemoveManagedIndexMetaDataFrom = mutableListOf() event.state().metadata().indices().forEach { val previousIndexMetaData = event.previousState().metadata().index(it.value.index) val policyID = it.value.getPolicyID() val request: DocWriteRequest<*>? = when { it.value.shouldCreateManagedIndexConfig(previousIndexMetaData) && policyID != null -> { + indicesWithPolicyID.add(it.value.index.name) hasCreateRequests = true managedIndexConfigIndexRequest(it.value.index.name, it.value.indexUUID, policyID, jobInterval) } @@ -261,10 +291,68 @@ class ManagedIndexCoordinator( if (it.value.shouldDeleteManagedIndexMetaData()) indicesToRemoveManagedIndexMetaDataFrom.add(it.value.index) } - updateManagedIndices(updateManagedIndicesRequests + indicesDeletedRequests, hasCreateRequests) + // Check if newly created indices matching any ISM templates + var updateMatchingIndexReq = emptyList>() + // filter out indices with policyID, they will be picked up in previous block + val indicesCreated = event.indicesCreated().filter { it !in indicesWithPolicyID } + if (indicesCreated.isNotEmpty()) // only check template match if there are new created indices + updateMatchingIndexReq = getMatchingIndicesUpdateReq(event.state(), indicesCreated) + if (updateMatchingIndexReq.isNotEmpty()) hasCreateRequests = true + + updateManagedIndices(updateManagedIndicesRequests + updateMatchingIndexReq + indicesDeletedRequests, hasCreateRequests) clearManagedIndexMetaData(indicesToRemoveManagedIndexMetaDataFrom) } + /** + * build requests to create jobs for indices matching ISM templates + */ + suspend fun getMatchingIndicesUpdateReq(clusterState: ClusterState, indexNames: List): List> { + val updateManagedIndexReqs = mutableListOf>() + if (indexNames.isEmpty()) return updateManagedIndexReqs + + val indexMetadatas = clusterState.metadata.indices + val templates = getISMTemplates() + + val indexToMatchedPolicy = indexNames.map { indexName -> + indexName to templates.findMatchingPolicy(indexMetadatas[indexName]) + }.toMap() + + indexToMatchedPolicy.filterNotNullValues() + .forEach { (index, policyID) -> + val indexUuid = indexMetadatas[index].indexUUID + if (indexUuid != null) { + logger.info("index [$index] will be managed by policy [$policyID]") + updateManagedIndexReqs.add( + managedIndexConfigIndexRequest(index, indexUuid, policyID, jobInterval)) + } + } + + return updateManagedIndexReqs + } + + suspend fun getISMTemplates(): Map { + val searchRequest = SearchRequest() + .source( + SearchSourceBuilder().query( + QueryBuilders.existsQuery(ISM_TEMPLATE_FIELD))) + .indices(INDEX_MANAGEMENT_INDEX) + + return try { + val response: SearchResponse = client.suspendUntil { search(searchRequest, it) } + getPolicyToTemplateMap(response).filterNotNullValues() + } catch (ex: IndexNotFoundException) { + emptyMap() + } catch (ex: ClusterBlockException) { + emptyMap() + } catch (e: SearchPhaseExecutionException) { + logger.error("Failed to get ISM templates: $e") + emptyMap() + } catch (e: Exception) { + logger.error("Failed to get ISM templates", e) + emptyMap() + } + } + /** * Background sweep process that periodically sweeps for updates to ManagedIndices * @@ -302,7 +390,7 @@ class ManagedIndexCoordinator( } } - scheduledFullSweep = threadPool.scheduleWithFixedDelay(scheduledSweep, sweepPeriod, ThreadPool.Names.SAME) + scheduledFullSweep = threadPool.scheduleWithFixedDelay(scheduledSweep, sweepPeriod, executorName()) } private fun getFullSweepElapsedTime(): TimeValue = @@ -317,6 +405,13 @@ class ManagedIndexCoordinator( @OpenForTesting suspend fun sweep() { val currentManagedIndices = sweepManagedIndexJobs(client, ismIndices.indexManagementIndexExists()) + + // check all un-managed indices, if matches any ism template + val unManagedIndices = clusterService.state().metadata.indices.values().filterNotNull() + .filter { it.value.indexUUID !in currentManagedIndices.keys }.map { it.value.index.name } + val updateMatchingIndicesReq = getMatchingIndicesUpdateReq(clusterService.state(), unManagedIndices) + updateManagedIndices(updateMatchingIndicesReq, updateMatchingIndicesReq.isNotEmpty()) + val clusterStateManagedIndices = sweepClusterState(clusterService.state()) val createManagedIndexRequests = @@ -330,6 +425,7 @@ class ManagedIndexCoordinator( val requests = createManagedIndexRequests + deleteManagedIndexRequests updateManagedIndices(requests, createManagedIndexRequests.isNotEmpty()) clearManagedIndexMetaData(indicesToDeleteManagedIndexMetaDataFrom) + lastFullSweepTimeNano = System.nanoTime() } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt index 7d9104d57..177e51e44 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/ManagedIndexRunner.kt @@ -190,7 +190,7 @@ object ManagedIndexRunner : ScheduledJobRunner, } } - @Suppress("ReturnCount", "ComplexMethod", "LongMethod") + @Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition") private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig) { // doing a check of local cluster health as we do not want to overload master node with potentially a lot of calls if (clusterIsRed()) { diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/elasticapi/ElasticExtensions.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/elasticapi/ElasticExtensions.kt index ab2d32b28..629488264 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/elasticapi/ElasticExtensions.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/elasticapi/ElasticExtensions.kt @@ -17,10 +17,18 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi +import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.cluster.metadata.IndexMetadata +import org.elasticsearch.common.xcontent.LoggingDeprecationHandler +import org.elasticsearch.common.xcontent.NamedXContentRegistry +import org.elasticsearch.common.xcontent.XContentFactory +import org.elasticsearch.common.xcontent.XContentType /** * Compares current and previous IndexMetaData to determine if we should create [ManagedIndexConfig]. @@ -98,3 +106,28 @@ fun IndexMetadata.getManagedIndexMetaData(): ManagedIndexMetaData? { } return null } + +/** + * Do a exists search query to retrieve all policy with ism_template field + * parse search response with this function + * + * @return map of policyID to ISMTemplate in this policy + * @throws [IllegalArgumentException] + */ +@Throws(Exception::class) +fun getPolicyToTemplateMap(response: SearchResponse, xContentRegistry: NamedXContentRegistry = NamedXContentRegistry.EMPTY): + Map { + return response.hits.hits.map { + val id = it.id + val seqNo = it.seqNo + val primaryTerm = it.primaryTerm + val xcp = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, it.sourceAsString) + xcp.parseWithType(id, seqNo, primaryTerm, Policy.Companion::parse) + .copy(id = id, seqNo = seqNo, primaryTerm = primaryTerm) + }.map { it.id to it.ismTemplate }.toMap() +} + +@Suppress("UNCHECKED_CAST") +fun Map.filterNotNullValues(): Map = + filterValues { it != null } as Map diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ISMTemplate.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ISMTemplate.kt new file mode 100644 index 000000000..31382f0af --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ISMTemplate.kt @@ -0,0 +1,106 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model + +import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.instant +import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.optionalTimeField +import org.apache.logging.log4j.LogManager +import org.elasticsearch.common.io.stream.StreamInput +import org.elasticsearch.common.io.stream.StreamOutput +import org.elasticsearch.common.io.stream.Writeable +import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.ToXContentObject +import org.elasticsearch.common.xcontent.XContentBuilder +import org.elasticsearch.common.xcontent.XContentParser +import org.elasticsearch.common.xcontent.XContentParser.Token +import org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken +import java.io.IOException +import java.lang.IllegalArgumentException +import java.time.Instant + +private val log = LogManager.getLogger(ISMTemplate::class.java) + +data class ISMTemplate( + val indexPatterns: List, + val priority: Int, + val lastUpdatedTime: Instant +) : ToXContentObject, Writeable { + + init { + require(priority >= 0) { "Requires priority to be >= 0" } + require(indexPatterns.isNotEmpty()) { "Requires at least one index pattern" } + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + return builder.startObject() + .field(INDEX_PATTERN, indexPatterns) + .field(PRIORITY, priority) + .optionalTimeField(LAST_UPDATED_TIME_FIELD, lastUpdatedTime) + .endObject() + } + + @Throws(IOException::class) + constructor(sin: StreamInput) : this( + sin.readStringList(), + sin.readInt(), + sin.readInstant() + ) + + @Throws(IOException::class) + override fun writeTo(out: StreamOutput) { + out.writeStringCollection(indexPatterns) + out.writeInt(priority) + out.writeInstant(lastUpdatedTime) + } + + companion object { + const val ISM_TEMPLATE_TYPE = "ism_template" + const val INDEX_PATTERN = "index_patterns" + const val PRIORITY = "priority" + const val LAST_UPDATED_TIME_FIELD = "last_updated_time" + + @Suppress("ComplexMethod") + fun parse(xcp: XContentParser): ISMTemplate { + val indexPatterns: MutableList = mutableListOf() + var priority = 0 + var lastUpdatedTime: Instant? = null + + ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + INDEX_PATTERN -> { + ensureExpectedToken(Token.START_ARRAY, xcp.currentToken(), xcp) + while (xcp.nextToken() != Token.END_ARRAY) { + indexPatterns.add(xcp.text()) + } + } + PRIORITY -> priority = if (xcp.currentToken() == Token.VALUE_NULL) 0 else xcp.intValue() + LAST_UPDATED_TIME_FIELD -> lastUpdatedTime = xcp.instant() + else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ISMTemplate.") + } + } + + return ISMTemplate( + indexPatterns = indexPatterns, + priority = priority, + lastUpdatedTime = lastUpdatedTime ?: Instant.now() + ) + } + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/Policy.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/Policy.kt index 7e20203be..7c52044bd 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/Policy.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/Policy.kt @@ -16,6 +16,7 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.instant +import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.optionalISMTemplateField import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.optionalTimeField import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexUtils import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.WITH_TYPE @@ -41,7 +42,8 @@ data class Policy( val lastUpdatedTime: Instant, val errorNotification: ErrorNotification?, val defaultState: String, - val states: List + val states: List, + val ismTemplate: ISMTemplate? = null ) : ToXContentObject, Writeable { init { @@ -72,6 +74,7 @@ data class Policy( .field(ERROR_NOTIFICATION_FIELD, errorNotification) .field(DEFAULT_STATE_FIELD, defaultState) .field(STATES_FIELD, states.toTypedArray()) + .optionalISMTemplateField(ISM_TEMPLATE, ismTemplate) if (params.paramAsBoolean(WITH_TYPE, true)) builder.endObject() return builder.endObject() } @@ -86,7 +89,8 @@ data class Policy( lastUpdatedTime = sin.readInstant(), errorNotification = sin.readOptionalWriteable(::ErrorNotification), defaultState = sin.readString(), - states = sin.readList(::State) + states = sin.readList(::State), + ismTemplate = sin.readOptionalWriteable(::ISMTemplate) ) @Throws(IOException::class) @@ -100,6 +104,7 @@ data class Policy( out.writeOptionalWriteable(errorNotification) out.writeString(defaultState) out.writeList(states) + out.writeOptionalWriteable(ismTemplate) } companion object { @@ -112,6 +117,7 @@ data class Policy( const val ERROR_NOTIFICATION_FIELD = "error_notification" const val DEFAULT_STATE_FIELD = "default_state" const val STATES_FIELD = "states" + const val ISM_TEMPLATE = "ism_template" @Suppress("ComplexMethod") @JvmStatic @@ -129,6 +135,7 @@ data class Policy( var lastUpdatedTime: Instant? = null var schemaVersion: Long = IndexUtils.DEFAULT_SCHEMA_VERSION val states: MutableList = mutableListOf() + var ismTemplate: ISMTemplate? = null ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp) while (xcp.nextToken() != Token.END_OBJECT) { @@ -148,6 +155,7 @@ data class Policy( states.add(State.parse(xcp)) } } + ISM_TEMPLATE -> ismTemplate = if (xcp.currentToken() == Token.VALUE_NULL) null else ISMTemplate.parse(xcp) else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Policy.") } } @@ -161,7 +169,8 @@ data class Policy( lastUpdatedTime = lastUpdatedTime ?: Instant.now(), errorNotification = errorNotification, defaultState = requireNotNull(defaultState) { "$DEFAULT_STATE_FIELD is null" }, - states = states.toList() + states = states.toList(), + ismTemplate = ismTemplate ) } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt index d3d5ede7d..ca0bd5467 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt @@ -147,7 +147,8 @@ class AttemptSnapshotStep( companion object { const val name = "attempt_snapshot" - fun getBlockedMessage(denyList: List, repoName: String, index: String) = "Snapshot repository [$repoName] is blocked in $denyList [index=$index]" + fun getBlockedMessage(denyList: List, repoName: String, index: String) = + "Snapshot repository [$repoName] is blocked in $denyList [index=$index]" fun getFailedMessage(index: String) = "Failed to create snapshot [index=$index]" fun getFailedConcurrentSnapshotMessage(index: String) = "Concurrent snapshot in progress, retrying next execution [index=$index]" fun getSuccessMessage(index: String) = "Successfully started snapshot [index=$index]" diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt index 4a7f62e97..db79063ca 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/transport/action/indexpolicy/TransportIndexPolicyAction.kt @@ -17,6 +17,12 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.filterNotNullValues +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyToTemplateMap +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.findConflictingPolicyTemplates +import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexManagementException +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.ISM_TEMPLATE_FIELD +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.validateFormat import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexUtils import org.apache.logging.log4j.LogManager import org.elasticsearch.ElasticsearchStatusException @@ -24,16 +30,22 @@ import org.elasticsearch.action.ActionListener import org.elasticsearch.action.DocWriteRequest import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.action.index.IndexResponse +import org.elasticsearch.action.search.SearchRequest +import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.action.support.ActionFilters import org.elasticsearch.action.support.HandledTransportAction import org.elasticsearch.action.support.master.AcknowledgedResponse import org.elasticsearch.client.node.NodeClient import org.elasticsearch.common.inject.Inject +import org.elasticsearch.common.xcontent.NamedXContentRegistry import org.elasticsearch.common.xcontent.XContentFactory +import org.elasticsearch.index.query.QueryBuilders import org.elasticsearch.index.seqno.SequenceNumbers import org.elasticsearch.rest.RestStatus +import org.elasticsearch.search.builder.SearchSourceBuilder import org.elasticsearch.tasks.Task import org.elasticsearch.transport.TransportService +import java.util.stream.Collectors private val log = LogManager.getLogger(TransportIndexPolicyAction::class.java) @@ -41,7 +53,8 @@ class TransportIndexPolicyAction @Inject constructor( val client: NodeClient, transportService: TransportService, actionFilters: ActionFilters, - val ismIndices: IndexManagementIndices + val ismIndices: IndexManagementIndices, + val xContentRegistry: NamedXContentRegistry ) : HandledTransportAction( IndexPolicyAction.NAME, transportService, actionFilters, ::IndexPolicyRequest ) { @@ -69,7 +82,12 @@ class TransportIndexPolicyAction @Inject constructor( private fun onCreateMappingsResponse(response: AcknowledgedResponse) { if (response.isAcknowledged) { log.info("Successfully created or updated ${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX} with newest mappings.") - putPolicy() + + // if there is template field, we will check + val reqTemplate = request.policy.ismTemplate + if (reqTemplate != null) { + checkTemplate(reqTemplate.indexPatterns, reqTemplate.priority) + } else putPolicy() } else { log.error("Unable to create or update ${IndexManagementPlugin.INDEX_MANAGEMENT_INDEX} with newest mapping.") @@ -79,6 +97,42 @@ class TransportIndexPolicyAction @Inject constructor( } } + private fun checkTemplate(indexPatterns: List, priority: Int) { + val possibleEx = validateFormat(indexPatterns) + if (possibleEx != null) { + actionListener.onFailure(possibleEx) + return + } + + val searchRequest = SearchRequest() + .source( + SearchSourceBuilder().query( + QueryBuilders.existsQuery(ISM_TEMPLATE_FIELD))) + .indices(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX) + + client.search(searchRequest, object : ActionListener { + override fun onResponse(response: SearchResponse) { + val policyToTemplateMap = getPolicyToTemplateMap(response, xContentRegistry).filterNotNullValues() + val conflictingPolicyTemplates = policyToTemplateMap.findConflictingPolicyTemplates(request.policyID, indexPatterns, priority) + if (conflictingPolicyTemplates.isNotEmpty()) { + val errorMessage = "new policy ${request.policyID} has an ism template with index pattern $indexPatterns " + + "matching existing policy templates ${conflictingPolicyTemplates.entries.stream() + .map { "policy [${it.key}] => ${it.value}" }.collect( + Collectors.joining(","))}," + + " please use a different priority than $priority" + actionListener.onFailure(IndexManagementException.wrap(IllegalArgumentException(errorMessage))) + return + } + + putPolicy() + } + + override fun onFailure(t: Exception) { + actionListener.onFailure(t) + } + }) + } + private fun putPolicy() { request.policy.copy(schemaVersion = IndexUtils.indexManagementConfigSchemaVersion) diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt index 53791da25..569cde5b7 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/util/RestHandlerUtils.kt @@ -35,6 +35,8 @@ const val FAILURES = "failures" const val FAILED_INDICES = "failed_indices" const val UPDATED_INDICES = "updated_indices" +const val ISM_TEMPLATE_FIELD = "policy.ism_template" + const val DEFAULT_PAGINATION_SIZE = 20 const val DEFAULT_PAGINATION_FROM = 0 const val DEFAULT_JOB_SORT_FIELD = "managed_index.index" diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/util/IndexManagementException.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/util/IndexManagementException.kt new file mode 100644 index 000000000..7b9526a48 --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/util/IndexManagementException.kt @@ -0,0 +1,60 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.util + +import org.elasticsearch.ElasticsearchException +import org.elasticsearch.common.Strings +import org.elasticsearch.common.ValidationException +import org.elasticsearch.index.IndexNotFoundException +import org.elasticsearch.rest.RestStatus +import java.lang.IllegalArgumentException + +class IndexManagementException(message: String, val status: RestStatus, ex: Exception) : ElasticsearchException(message, ex) { + + override fun status(): RestStatus { + return status + } + + companion object { + @JvmStatic + fun wrap(ex: Exception): ElasticsearchException { + + var friendlyMsg = ex.message as String + var status = RestStatus.INTERNAL_SERVER_ERROR + when (ex) { + is IndexNotFoundException -> { + status = ex.status() + friendlyMsg = "Configuration index not found" + } + is IllegalArgumentException -> { + status = RestStatus.BAD_REQUEST + friendlyMsg = ex.message as String + } + is ValidationException -> { + status = RestStatus.BAD_REQUEST + friendlyMsg = ex.message as String + } + else -> { + if (!Strings.isNullOrEmpty(ex.message)) { + friendlyMsg = ex.message as String + } + } + } + + return IndexManagementException(friendlyMsg, status, Exception("${ex.javaClass.name}: ${ex.message}")) + } + } +} diff --git a/src/main/resources/mappings/opendistro-ism-config.json b/src/main/resources/mappings/opendistro-ism-config.json index e98c644ef..c664fa39f 100644 --- a/src/main/resources/mappings/opendistro-ism-config.json +++ b/src/main/resources/mappings/opendistro-ism-config.json @@ -442,6 +442,20 @@ } } } + }, + "ism_template": { + "properties": { + "index_patterns": { + "type": "keyword" + }, + "priority": { + "type": "long" + }, + "last_updated_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } } } }, diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt index 81000dbf7..bc4bef980 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -24,6 +24,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlug import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementRestTestCase import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ChangePolicy +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy @@ -717,4 +718,35 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase() } return true } + + protected fun assertPredicatesOnISMTemplatesMap( + templatePredicates: List Boolean>>>>, // response map name: predicate + response: Map + ) { + val templates = response["ism_templates"] as ArrayList> + + templatePredicates.forEachIndexed { ind, (_, predicates) -> + val template = templates[ind] + predicates.forEach { (fieldName, predicate) -> + assertTrue("The key: $fieldName was not found in the response: $template", template.containsKey(fieldName)) + assertTrue("Failed predicate assertion for $fieldName in response=($template) predicate=$predicate", predicate(template[fieldName])) + } + } + } + + protected fun assertISMTemplateEquals(expected: ISMTemplate, actualISMTemplateMap: Any?): Boolean { + actualISMTemplateMap as Map + assertEquals(expected.indexPatterns, actualISMTemplateMap[ISMTemplate.INDEX_PATTERN]) + assertEquals(expected.priority, actualISMTemplateMap[ISMTemplate.PRIORITY]) + return true + } + + protected fun assertISMTemplateEquals(expected: ISMTemplate, actual: ISMTemplate?): Boolean { + assertNotNull(actual) + if (actual != null) { + assertEquals(expected.indexPatterns, actual.indexPatterns) + assertEquals(expected.priority, actual.priority) + } + return true + } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/TestHelpers.kt index b081b9d70..67097dc71 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/TestHelpers.kt @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.string import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ChangePolicy import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Conditions import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ErrorNotification +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy @@ -66,10 +67,11 @@ fun randomPolicy( schemaVersion: Long = ESRestTestCase.randomLong(), lastUpdatedTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS), errorNotification: ErrorNotification? = randomErrorNotification(), - states: List = List(ESRestTestCase.randomIntBetween(1, 10)) { randomState() } + states: List = List(ESRestTestCase.randomIntBetween(1, 10)) { randomState() }, + ismTemplate: ISMTemplate? = null ): Policy { return Policy(id = id, schemaVersion = schemaVersion, lastUpdatedTime = lastUpdatedTime, - errorNotification = errorNotification, defaultState = states[0].name, states = states, description = description) + errorNotification = errorNotification, defaultState = states[0].name, states = states, description = description, ismTemplate = ismTemplate) } fun randomState( @@ -314,6 +316,18 @@ fun randomSweptManagedIndexConfig( ) } +fun randomISMTemplate( + indexPatterns: List = listOf(ESRestTestCase.randomAlphaOfLength(10) + "*"), + priority: Int = ESRestTestCase.randomIntBetween(0, 100), + lastUpdatedTime: Instant = Instant.now().truncatedTo(ChronoUnit.MILLIS) +): ISMTemplate { + return ISMTemplate( + indexPatterns = indexPatterns, + priority = priority, + lastUpdatedTime = lastUpdatedTime + ) +} + fun Policy.toJsonString(): String { val builder = XContentFactory.jsonBuilder() return this.toXContent(builder).string() @@ -403,3 +417,8 @@ fun RollupActionConfig.toJsonString(): String { val builder = XContentFactory.jsonBuilder() return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() } + +fun ISMTemplate.toJsonString(): String { + val builder = XContentFactory.jsonBuilder() + return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() +} diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt index 92dc0030b..b1adc67a1 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/coordinator/ManagedIndexCoordinatorIT.kt @@ -70,6 +70,9 @@ class ManagedIndexCoordinatorIT : IndexStateManagementRestTestCase() { } fun `test creating index with invalid policy_id`() { + wipeAllODFEIndices() + assertIndexDoesNotExist(INDEX_MANAGEMENT_INDEX) + val indexOne = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) val indexTwo = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) val indexThree = randomAlphaOfLength(10).toLowerCase(Locale.ROOT) diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ISMTemplateTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ISMTemplateTests.kt new file mode 100644 index 000000000..06a95ef87 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/model/ISMTemplateTests.kt @@ -0,0 +1,27 @@ +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model + +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomISMTemplate +import org.elasticsearch.common.io.stream.InputStreamStreamInput +import org.elasticsearch.common.io.stream.OutputStreamStreamOutput +import org.elasticsearch.test.ESTestCase +import java.io.ByteArrayInputStream +import java.io.ByteArrayOutputStream + +class ISMTemplateTests : ESTestCase() { + + fun `test basic`() { + val expectedISMTemplate = randomISMTemplate() + + roundTripISMTemplate(expectedISMTemplate) + } + + private fun roundTripISMTemplate(expectedISMTemplate: ISMTemplate) { + val baos = ByteArrayOutputStream() + val osso = OutputStreamStreamOutput(baos) + expectedISMTemplate.writeTo(osso) + val input = InputStreamStreamInput(ByteArrayInputStream(baos.toByteArray())) + + val actualISMTemplate = ISMTemplate(input) + assertEquals(expectedISMTemplate, actualISMTemplate) + } +} \ No newline at end of file diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt new file mode 100644 index 000000000..1241bf898 --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/ISMTemplateRestAPIIT.kt @@ -0,0 +1,123 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.resthandler + +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.IndexStateManagementRestTestCase +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.State +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.action.ReadOnlyActionConfig +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomErrorNotification +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.randomPolicy +import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings +import com.amazon.opendistroforelasticsearch.indexmanagement.randomInstant +import com.amazon.opendistroforelasticsearch.indexmanagement.waitFor +import org.elasticsearch.client.ResponseException +import org.elasticsearch.common.settings.Settings +import org.elasticsearch.rest.RestStatus +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Locale + +class ISMTemplateRestAPIIT : IndexStateManagementRestTestCase() { + + private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) + + private val policyID1 = "t1" + private val policyID2 = "t2" + + fun `test add template with invalid index pattern`() { + try { + val ismTemp = ISMTemplate(listOf(" "), 100, randomInstant()) + createPolicy(randomPolicy(ismTemplate = ismTemp), policyID1) + fail("Expect a failure") + } catch (e: ResponseException) { + assertEquals("Unexpected RestStatus", RestStatus.BAD_REQUEST, e.response.restStatus()) + val actualMessage = e.response.asMap()["error"] as Map + val expectedReason = "Validation Failed: 1: index_pattern [ ] must not contain the following characters [ , \", *, \\, <, |, ,, >, /, ?];" + assertEquals(expectedReason, actualMessage["reason"]) + } + } + + fun `test add template with overlapping index pattern`() { + try { + val ismTemp = ISMTemplate(listOf("log*"), 100, randomInstant()) + val ismTemp2 = ISMTemplate(listOf("lo*"), 100, randomInstant()) + createPolicy(randomPolicy(ismTemplate = ismTemp), policyID1) + createPolicy(randomPolicy(ismTemplate = ismTemp2), policyID2) + fail("Expect a failure") + } catch (e: ResponseException) { + assertEquals("Unexpected RestStatus", RestStatus.BAD_REQUEST, e.response.restStatus()) + val actualMessage = e.response.asMap()["error"] as Map + val expectedReason = "new policy $policyID2 has an ism template with index pattern [lo*] matching existing policy templates policy [$policyID1] => [log*], please use a different priority than 100" + assertEquals(expectedReason, actualMessage["reason"]) + } + } + + fun `test ism template managing index`() { + val indexName1 = "log-000001" + val indexName2 = "log-000002" + val indexName3 = "log-000003" + val policyID = "${testIndexName}_testPolicyName_1" + + // need to specify policyID null, can remove after policyID deprecated + createIndex(indexName1, null) + + val ismTemp = ISMTemplate(listOf("log*"), 100, randomInstant()) + + val actionConfig = ReadOnlyActionConfig(0) + val states = listOf( + State("ReadOnlyState", listOf(actionConfig), listOf()) + ) + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states, + ismTemplate = ismTemp + ) + createPolicy(policy, policyID) + + createIndex(indexName2, null) + createIndex(indexName3, Settings.builder().put("index.hidden", true).build()) + + waitFor { assertNotNull(getManagedIndexConfig(indexName2)) } + + // TODO uncomment in remove policy id + // val managedIndexConfig = getExistingManagedIndexConfig(indexName2) + // updateManagedIndexConfigStartTime(managedIndexConfig) + // waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName2).policyID) } + + // only index create after template can be managed + assertPredicatesOnMetaData( + listOf(indexName1 to listOf(ManagedIndexSettings.POLICY_ID.key to fun(policyID: Any?): Boolean = policyID == null)), + getExplainMap(indexName1), + true + ) + assertNull(getManagedIndexConfig(indexName1)) + + // hidden index will not be manage + assertPredicatesOnMetaData( + listOf(indexName1 to listOf(ManagedIndexSettings.POLICY_ID.key to fun(policyID: Any?): Boolean = policyID == null)), + getExplainMap(indexName1), + true + ) + assertNull(getManagedIndexConfig(indexName3)) + } +} diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt index a1faadf59..e9b8bf8e3 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexmanagement/indexstatemanagement/resthandler/IndexStateManagementRestApiIT.kt @@ -308,6 +308,7 @@ class IndexStateManagementRestApiIT : IndexStateManagementRestTestCase() { "policy_id" to policy.id, "last_updated_time" to policy.lastUpdatedTime.toEpochMilli(), "default_state" to policy.defaultState, + "ism_template" to null, "description" to policy.description, "error_notification" to policy.errorNotification, "states" to policy.states.map { diff --git a/src/test/resources/mappings/cached-opendistro-ism-config.json b/src/test/resources/mappings/cached-opendistro-ism-config.json index e98c644ef..c664fa39f 100644 --- a/src/test/resources/mappings/cached-opendistro-ism-config.json +++ b/src/test/resources/mappings/cached-opendistro-ism-config.json @@ -442,6 +442,20 @@ } } } + }, + "ism_template": { + "properties": { + "index_patterns": { + "type": "keyword" + }, + "priority": { + "type": "long" + }, + "last_updated_time": { + "type": "date", + "format": "strict_date_time||epoch_millis" + } + } } } },