Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Merge 33b3544 into 7e598fe
Browse files Browse the repository at this point in the history
  • Loading branch information
bowenlan-amzn committed Jan 28, 2021
2 parents 7e598fe + 33b3544 commit 528fb6c
Show file tree
Hide file tree
Showing 19 changed files with 747 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexmanagement.util.NO_ID
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
import kotlinx.coroutines.delay
Expand Down Expand Up @@ -70,6 +72,13 @@ fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContent
return this.timeField(name, "${name}_in_millis", instant.toEpochMilli())
}

/**
 * Writes an optional ISM template under the field [name].
 *
 * A null [ismTemplate] is emitted as an explicit JSON null; otherwise the template is
 * written as the field value. Both branches use the caller supplied [name] so the
 * field key does not depend on whether the template is present.
 *
 * @param name field name to write
 * @param ismTemplate template to serialize, or null
 * @return this builder, to allow call chaining
 */
fun XContentBuilder.optionalISMTemplateField(name: String, ismTemplate: ISMTemplate?): XContentBuilder {
    if (ismTemplate == null) {
        return nullField(name)
    }
    // Previously hard-coded to Policy.ISM_TEMPLATE, which silently ignored [name] for non-null values.
    return this.field(name, ismTemplate)
}

/**
* Retries the given [block] of code as specified by the receiver [BackoffPolicy],
* if [block] throws an [ElasticsearchException] that is retriable (502, 503, 504).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexManagementException
import org.apache.logging.log4j.LogManager
import org.apache.lucene.util.automaton.Operations
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.common.Strings
import org.elasticsearch.common.ValidationException
import org.elasticsearch.common.regex.Regex

private val log = LogManager.getLogger("ISMTemplateService")

/**
 * Picks the policy whose ISM template best matches the given index.
 *
 * The receiver maps a policy ID to that policy's ISM template. No policy is returned when
 * the receiver is empty or when the index is hidden. Templates whose lastUpdatedTime is not
 * strictly earlier than the index creation date are skipped. Among the remaining templates
 * that have at least one index pattern matching the index name, the one with the highest
 * priority wins; on equal priority the template seen first in iteration order is kept.
 * The running best priority starts at -1, so a template needs a priority of 0 or more to win.
 *
 * @param indexMetadata cluster state index metadata
 * @return policyID of the winning template, or null when nothing matches
 */
@Suppress("ReturnCount")
fun Map<String, ISMTemplate>.findMatchingPolicy(indexMetadata: IndexMetadata): String? {
    if (isEmpty()) return null

    // hidden indices are never auto-managed
    if (IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings)) return null

    val indexName = indexMetadata.index.name
    val createdAt = indexMetadata.creationDate

    var bestPolicy: String? = null
    var bestPriority = -1
    for ((policyID, template) in this) {
        // the template must have been last updated before the index was created
        if (template.lastUpdatedTime.toEpochMilli() >= createdAt) continue
        // only a strictly higher priority can replace the current best
        if (template.priority <= bestPriority) continue

        if (template.indexPatterns.any { pattern -> Regex.simpleMatch(pattern, indexName) }) {
            bestPriority = template.priority
            bestPolicy = policyID
        }
    }

    return bestPolicy
}

/**
 * Checks every index pattern of an ISM template for illegal characters.
 *
 * A pattern is rejected when it contains '#' or ':', starts with '_', or fails
 * [Strings.validFileNameExcludingAstrix]. All violations of all patterns are collected,
 * in pattern order, into a single [ValidationException].
 *
 * The idea comes from the ES validate function in MetadataIndexTemplateService;
 * acknowledging https://github.com/a2lin as the first contributor.
 *
 * @param indexPatterns index patterns to validate
 * @return the wrapped validation exception, or null when every pattern is well formed
 */
@Suppress("ComplexMethod")
fun validateFormat(indexPatterns: List<String>): ElasticsearchException? {
    val formatErrors = indexPatterns.flatMap { pattern ->
        listOfNotNull(
            if (pattern.contains("#")) "index_pattern [$pattern] must not contain a '#'" else null,
            if (pattern.contains(":")) "index_pattern [$pattern] must not contain a ':'" else null,
            if (pattern.startsWith("_")) "index_pattern [$pattern] must not start with '_'" else null,
            if (Strings.validFileNameExcludingAstrix(pattern)) {
                null
            } else {
                "index_pattern [$pattern] must not contain the following characters ${Strings.INVALID_FILENAME_CHARS}"
            }
        )
    }

    if (formatErrors.isEmpty()) return null

    val validationException = ValidationException()
    validationException.addValidationErrors(formatErrors)
    return IndexManagementException.wrap(validationException)
}

/**
 * Finds existing policy templates whose index patterns overlap with the given template.
 *
 * Only templates with the same [priority] are considered. Two templates overlap when the
 * intersection of the automata built from their index patterns is not empty. The entry
 * keyed by [candidate] itself is never reported.
 *
 * @param candidate policy ID of the template being checked; excluded from the result
 * @param indexPatterns index patterns of the candidate template
 * @param priority priority of the candidate template
 * @return map of overlapping policy ID to its index patterns
 */
@Suppress("SpreadOperator")
fun Map<String, ISMTemplate>.findConflictingPolicyTemplates(
    candidate: String,
    indexPatterns: List<String>,
    priority: Int
): Map<String, List<String>> {
    val candidateAutomaton = Regex.simpleMatchToAutomaton(*indexPatterns.toTypedArray())
    val overlappingTemplates = mutableMapOf<String, List<String>>()

    for ((policyID, template) in this) {
        // Skip the candidate up front: comparing it with itself would log a bogus overlap
        // and build an automaton only for the entry to be discarded afterwards.
        if (policyID == candidate) continue
        // focus on template with same priority
        if (template.priority != priority) continue

        val existingAutomaton = Regex.simpleMatchToAutomaton(*template.indexPatterns.toTypedArray())
        if (!Operations.isEmpty(Operations.intersection(candidateAutomaton, existingAutomaton))) {
            log.info("Existing ism_template for $policyID overlaps candidate $candidate")
            overlappingTemplates[policyID] = template.indexPatterns
        }
    }

    return overlappingTemplates
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.filterNotNullValues
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyToTemplateMap
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldCreateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldDeleteManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldDeleteManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -38,6 +42,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SWEEP_PERIOD
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.ISM_TEMPLATE_FIELD
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest
Expand All @@ -61,13 +66,16 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse
import org.elasticsearch.action.bulk.BackoffPolicy
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.search.SearchPhaseExecutionException
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.ClusterChangedEvent
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.LocalNodeMasterListener
import org.elasticsearch.cluster.ClusterStateListener
import org.elasticsearch.cluster.block.ClusterBlockException
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.bytes.BytesReference
import org.elasticsearch.common.component.LifecycleListener
Expand All @@ -79,7 +87,10 @@ import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.Index
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.threadpool.Scheduler
import org.elasticsearch.threadpool.ThreadPool

Expand All @@ -106,7 +117,7 @@ class ManagedIndexCoordinator(
private val clusterService: ClusterService,
private val threadPool: ThreadPool,
indexManagementIndices: IndexManagementIndices
) : LocalNodeMasterListener,
) : ClusterStateListener,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")),
LifecycleListener() {

Expand All @@ -122,10 +133,11 @@ class ManagedIndexCoordinator(
BackoffPolicy.constantBackoff(COORDINATOR_BACKOFF_MILLIS.get(settings), COORDINATOR_BACKOFF_COUNT.get(settings))
@Volatile private var jobInterval = JOB_INTERVAL.get(settings)

@Volatile private var isMaster = false

init {
clusterService.addListener(this)
clusterService.addLifecycleListener(this)
clusterService.addLocalNodeMasterListener(this)
clusterService.clusterSettings.addSettingsUpdateConsumer(SWEEP_PERIOD) {
sweepPeriod = it
initBackgroundSweep()
Expand All @@ -142,18 +154,34 @@ class ManagedIndexCoordinator(
}
}

override fun onMaster() {
/** Name of the thread pool the background sweep is scheduled on. */
private fun executorName(): String = ThreadPool.Names.MANAGEMENT

/**
 * Reacts to the local node becoming master by (re)starting the background sweep.
 *
 * Invoked from clusterChanged when the tracked master flag flips to true.
 */
fun onMaster() {
// Init background sweep when promoted to being master
initBackgroundSweep()
}

override fun offMaster() {
/**
 * Reacts to the local node losing master by cancelling the scheduled background sweep, if any.
 *
 * Invoked from clusterChanged when the tracked master flag flips to false.
 */
fun offMaster() {
// Cancel background sweep when demoted from being master
scheduledFullSweep?.cancel()
}

@Suppress("ReturnCount")
override fun clusterChanged(event: ClusterChangedEvent) {
// Instead of using a LocalNodeMasterListener to track master changes, this service will
// track them here to avoid conditions where master listener events run after other
// listeners that depend on what happened in the master listener
if (this.isMaster != event.localNodeMaster()) {
this.isMaster = event.localNodeMaster()
if (this.isMaster) {
onMaster()
} else {
offMaster()
}
}

if (!isIndexStateManagementEnabled()) return

if (!event.localNodeMaster()) return
Expand Down Expand Up @@ -242,12 +270,14 @@ class ManagedIndexCoordinator(
* */
var hasCreateRequests = false
val updateManagedIndicesRequests = mutableListOf<DocWriteRequest<*>>()
val indicesWithPolicyID = mutableListOf<String>()
val indicesToRemoveManagedIndexMetaDataFrom = mutableListOf<Index>()
event.state().metadata().indices().forEach {
val previousIndexMetaData = event.previousState().metadata().index(it.value.index)
val policyID = it.value.getPolicyID()
val request: DocWriteRequest<*>? = when {
it.value.shouldCreateManagedIndexConfig(previousIndexMetaData) && policyID != null -> {
indicesWithPolicyID.add(it.value.index.name)
hasCreateRequests = true
managedIndexConfigIndexRequest(it.value.index.name, it.value.indexUUID, policyID, jobInterval)
}
Expand All @@ -261,10 +291,68 @@ class ManagedIndexCoordinator(
if (it.value.shouldDeleteManagedIndexMetaData()) indicesToRemoveManagedIndexMetaDataFrom.add(it.value.index)
}

updateManagedIndices(updateManagedIndicesRequests + indicesDeletedRequests, hasCreateRequests)
// Check whether any newly created indices match an ISM template
var updateMatchingIndexReq = emptyList<DocWriteRequest<*>>()
// filter out indices with policyID, they will be picked up in previous block
val indicesCreated = event.indicesCreated().filter { it !in indicesWithPolicyID }
if (indicesCreated.isNotEmpty()) // only check template match if there are new created indices
updateMatchingIndexReq = getMatchingIndicesUpdateReq(event.state(), indicesCreated)
if (updateMatchingIndexReq.isNotEmpty()) hasCreateRequests = true

updateManagedIndices(updateManagedIndicesRequests + updateMatchingIndexReq + indicesDeletedRequests, hasCreateRequests)
clearManagedIndexMetaData(indicesToRemoveManagedIndexMetaDataFrom)
}

/**
 * Builds the requests that create managed index jobs for indices matching an ISM template.
 *
 * Each distinct index name is looked up in [clusterState]; names that are absent from that
 * state are skipped instead of failing the whole batch. For every remaining index the
 * matching policy is resolved via findMatchingPolicy, and a managed index config request
 * is produced when a policy is found.
 *
 * @param clusterState cluster state used to resolve index metadata
 * @param indexNames names of the indices to check against the ISM templates
 * @return one index request per index that matched a template; empty when nothing matched
 */
suspend fun getMatchingIndicesUpdateReq(clusterState: ClusterState, indexNames: List<String>): List<DocWriteRequest<*>> {
    val updateManagedIndexReqs = mutableListOf<DocWriteRequest<*>>()
    if (indexNames.isEmpty()) return updateManagedIndexReqs

    val templates = getISMTemplates()
    // nothing can match without templates, so skip the metadata lookups
    if (templates.isEmpty()) return updateManagedIndexReqs

    val indexMetadatas = clusterState.metadata.indices
    for (indexName in indexNames.distinct()) {
        // The lookup returns null when the index is not part of this cluster state
        // (e.g. it was deleted after the name list was computed); skip it rather than throw.
        val indexMetadata = indexMetadatas[indexName] ?: continue
        val policyID = templates.findMatchingPolicy(indexMetadata) ?: continue
        val indexUuid = indexMetadata.indexUUID ?: continue

        logger.info("index [$indexName] will be managed by policy [$policyID]")
        updateManagedIndexReqs.add(
            managedIndexConfigIndexRequest(indexName, indexUuid, policyID, jobInterval))
    }

    return updateManagedIndexReqs
}

/**
 * Loads all ISM templates stored in the index management index.
 *
 * Searches [INDEX_MANAGEMENT_INDEX] for documents in which [ISM_TEMPLATE_FIELD] exists and
 * converts the response with getPolicyToTemplateMap, dropping entries with a null value.
 * This is best effort: a missing index or a cluster block yields an empty map silently,
 * any other failure is logged and also yields an empty map. Coroutine cancellation is
 * rethrown so that a cancelled caller is not mistaken for "no templates".
 *
 * @return map of policy ID to its ISM template; empty when none exist or the search failed
 */
suspend fun getISMTemplates(): Map<String, ISMTemplate> {
    val searchRequest = SearchRequest()
        .source(
            SearchSourceBuilder().query(
                QueryBuilders.existsQuery(ISM_TEMPLATE_FIELD)))
        .indices(INDEX_MANAGEMENT_INDEX)

    return try {
        val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
        getPolicyToTemplateMap(response).filterNotNullValues()
    } catch (e: kotlinx.coroutines.CancellationException) {
        // never swallow cancellation; the catch-all below would otherwise hide it
        throw e
    } catch (ex: IndexNotFoundException) {
        emptyMap()
    } catch (ex: ClusterBlockException) {
        emptyMap()
    } catch (e: SearchPhaseExecutionException) {
        logger.error("Failed to get ISM templates: $e")
        emptyMap()
    } catch (e: Exception) {
        logger.error("Failed to get ISM templates", e)
        emptyMap()
    }
}

/**
* Background sweep process that periodically sweeps for updates to ManagedIndices
*
Expand Down Expand Up @@ -302,7 +390,7 @@ class ManagedIndexCoordinator(
}
}

scheduledFullSweep = threadPool.scheduleWithFixedDelay(scheduledSweep, sweepPeriod, ThreadPool.Names.SAME)
scheduledFullSweep = threadPool.scheduleWithFixedDelay(scheduledSweep, sweepPeriod, executorName())
}

private fun getFullSweepElapsedTime(): TimeValue =
Expand All @@ -317,6 +405,13 @@ class ManagedIndexCoordinator(
@OpenForTesting
suspend fun sweep() {
val currentManagedIndices = sweepManagedIndexJobs(client, ismIndices.indexManagementIndexExists())

// check whether any un-managed index matches an ISM template
val unManagedIndices = clusterService.state().metadata.indices.values().filterNotNull()
.filter { it.value.indexUUID !in currentManagedIndices.keys }.map { it.value.index.name }
val updateMatchingIndicesReq = getMatchingIndicesUpdateReq(clusterService.state(), unManagedIndices)
updateManagedIndices(updateMatchingIndicesReq, updateMatchingIndicesReq.isNotEmpty())

val clusterStateManagedIndices = sweepClusterState(clusterService.state())

val createManagedIndexRequests =
Expand All @@ -330,6 +425,7 @@ class ManagedIndexCoordinator(
val requests = createManagedIndexRequests + deleteManagedIndexRequests
updateManagedIndices(requests, createManagedIndexRequests.isNotEmpty())
clearManagedIndexMetaData(indicesToDeleteManagedIndexMetaDataFrom)

lastFullSweepTimeNano = System.nanoTime()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ object ManagedIndexRunner : ScheduledJobRunner,
}
}

@Suppress("ReturnCount", "ComplexMethod", "LongMethod")
@Suppress("ReturnCount", "ComplexMethod", "LongMethod", "ComplexCondition")
private suspend fun runManagedIndexConfig(managedIndexConfig: ManagedIndexConfig) {
// doing a check of local cluster health as we do not want to overload master node with potentially a lot of calls
if (clusterIsRed()) {
Expand Down
Loading

0 comments on commit 528fb6c

Please sign in to comment.