Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

new ISM template #383

Merged
merged 26 commits into from
Jan 28, 2021
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
789dd87
start index template API
bowenlan-amzn Nov 18, 2020
c93795b
start to modify put ism API to observe regulation
bowenlan-amzn Dec 9, 2020
964ef61
conform to regulation add template API
bowenlan-amzn Dec 9, 2020
a29eae1
save progress
bowenlan-amzn Dec 9, 2020
5eefbe2
test in progress
bowenlan-amzn Dec 9, 2020
2b1ac28
draft IT
bowenlan-amzn Dec 10, 2020
31889da
simple tests for request response
bowenlan-amzn Dec 10, 2020
80d4519
ktlint
bowenlan-amzn Dec 10, 2020
b9aaa7a
start to clean code
bowenlan-amzn Dec 10, 2020
ff40535
wanna see code cov
bowenlan-amzn Dec 10, 2020
182e3c3
clean up 1
bowenlan-amzn Dec 10, 2020
27b1f6f
try remove seeming not used part in template metadata
bowenlan-amzn Dec 11, 2020
82f368a
clean up
bowenlan-amzn Dec 16, 2020
24a4791
going to clean up
bowenlan-amzn Jan 12, 2021
8c7067c
new implementation
bowenlan-amzn Jan 12, 2021
796fdd5
Merge branch 'master' into ismtemplate2
bowenlan-amzn Jan 21, 2021
5d9fc6b
Merge branch 'ismtemplate2' of github.com:bowenlan-amzn/index-managem…
bowenlan-amzn Jan 22, 2021
b69d224
address Ravi comments
bowenlan-amzn Jan 22, 2021
ead0450
Merge branch 'master' into ismtemplate2
dbbaughe Jan 26, 2021
f3d54e2
Merge branch 'master' into ismtemplate2
bowenlan-amzn Jan 26, 2021
77f9d66
address Drew's comments
bowenlan-amzn Jan 26, 2021
f275a29
suppress detekt complain
bowenlan-amzn Jan 26, 2021
d5578d0
add a test for ISMTemplate Writeable
bowenlan-amzn Jan 27, 2021
31f6452
Merge branch 'master' into ismtemplate2
bowenlan-amzn Jan 28, 2021
f39d371
coordinator consistent
bowenlan-amzn Jan 28, 2021
33b3544
address Mo's comments
bowenlan-amzn Jan 28, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.Policy
import com.amazon.opendistroforelasticsearch.indexmanagement.util.NO_ID
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.utils.LockService
import kotlinx.coroutines.delay
Expand Down Expand Up @@ -70,6 +72,13 @@ fun XContentBuilder.optionalTimeField(name: String, instant: Instant?): XContent
return this.timeField(name, "${name}_in_millis", instant.toEpochMilli())
}

fun XContentBuilder.optionalISMTemplateField(name: String, ismTemplate: ISMTemplate?): XContentBuilder {
if (ismTemplate == null) {
return nullField(name)
}
return this.field(Policy.ISM_TEMPLATE, ismTemplate)
}

/**
* Retries the given [block] of code as specified by the receiver [BackoffPolicy],
* if [block] throws an [ElasticsearchException] that is retriable (502, 503, 504).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement

import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.filterNotNullValues
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import com.amazon.opendistroforelasticsearch.indexmanagement.util.IndexManagementException
import org.apache.logging.log4j.LogManager
import org.apache.lucene.util.automaton.Operations
import org.elasticsearch.ElasticsearchException
import org.elasticsearch.cluster.metadata.IndexMetadata
import org.elasticsearch.common.Strings
import org.elasticsearch.common.ValidationException
import org.elasticsearch.common.regex.Regex

private val log = LogManager.getLogger(ISMTemplateService::class.java)

class ISMTemplateService {
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
companion object {
/**
* find the matching template for the index
*
* filter out hidden index
* filter out older index than template lastUpdateTime
*
* @param ismTemplates current ISM templates saved in metadata
* @param indexMetadata cluster state index metadata
* @return policyID
*/
@Suppress("ReturnCount")
fun findMatchingISMTemplate(ismTemplates: Map<String, ISMTemplate>, indexMetadata: IndexMetadata): String? {
val indexName = indexMetadata.index.name
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved

// don't include hidden index
val isHidden = IndexMetadata.INDEX_HIDDEN_SETTING.get(indexMetadata.settings)
log.info("index $indexName is hidden $isHidden")
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
if (isHidden) return null

// only process indices created after template
// traverse all ism templates for matching ones
val patternMatchPredicate = { pattern: String -> Regex.simpleMatch(pattern, indexName) }
val matchedTemplates = mutableMapOf<ISMTemplate, String>()
ismTemplates.filter { (_, template) ->
log.info("index create after template? ${template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate}")
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
template.lastUpdatedTime.toEpochMilli() < indexMetadata.creationDate
}.forEach { (templateName, template) ->
val matched = template.indexPatterns.stream().anyMatch(patternMatchPredicate)
if (matched) matchedTemplates[template] = templateName
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
}

if (matchedTemplates.isEmpty()) return null

// sort by template priority
val winner = matchedTemplates.keys.maxBy { it.priority }
log.info("winner with highest priority is $winner")
return matchedTemplates[winner]
}

/**
* validate the template Name and indexPattern provided in the template
* reusing ES validate function in MetadataIndexTemplateService
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
*/
@Suppress("ComplexMethod")
fun validateFormat(indexPatterns: List<String>): ElasticsearchException? {
val validationErrors = mutableListOf<String>()
for (indexPattern in indexPatterns) {
if (indexPattern.contains(" ")) {
validationErrors.add("index_patterns [$indexPattern] must not contain a space")
}
if (indexPattern.contains(",")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a ','")
}
if (indexPattern.contains("#")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a '#'")
}
if (indexPattern.contains(":")) {
validationErrors.add("index_pattern [$indexPattern] must not contain a ':'")
}
if (indexPattern.startsWith("_")) {
validationErrors.add("index_pattern [$indexPattern] must not start with '_'")
}
if (!Strings.validFileNameExcludingAstrix(indexPattern)) {
validationErrors.add("index_pattern [" + indexPattern + "] must not contain the following characters " +
Strings.INVALID_FILENAME_CHARS)
}
}

if (validationErrors.size > 0) {
val validationException = ValidationException()
validationException.addValidationErrors(validationErrors)
return IndexManagementException.wrap(validationException)
}
return null
}

/**
* find templates whose index patterns overlap with given template
*
* @return map of overlapping template name to its index patterns
*/
@Suppress("SpreadOperator")
fun findConflictingISMTemplates(
candidate: String,
indexPatterns: List<String>,
priority: Int,
ismTemplates: Map<String, ISMTemplate?>
): Map<String, List<String>> {
val automaton1 = Regex.simpleMatchToAutomaton(*indexPatterns.toTypedArray())
val overlappingTemplates = mutableMapOf<String, List<String>>()

// focus on template with same priority
ismTemplates.filterNotNullValues()
.filter { it.value.priority == priority }.forEach { (policyID, template) ->
val automaton2 = Regex.simpleMatchToAutomaton(*template.indexPatterns.toTypedArray())
if (!Operations.isEmpty(Operations.intersection(automaton1, automaton2))) {
log.info("existing ism_template in $policyID overlaps candidate $candidate")
overlappingTemplates[policyID] = template.indexPatterns
}
}
overlappingTemplates.remove(candidate)

return overlappingTemplates
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,20 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanageme

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementIndices
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.ISMTemplateService.Companion.findMatchingISMTemplate
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.getPolicyID
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.retry
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.filterNotNullValues
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.ismTemplatesFromSearchResponse
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldCreateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldDeleteManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.elasticapi.shouldDeleteManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ISMTemplate
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexConfig
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.model.ManagedIndexMetaData
Expand All @@ -38,6 +43,7 @@ import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagemen
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SWEEP_PERIOD
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataAction
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.UpdateManagedIndexMetaDataRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.ISM_TEMPLATE_FIELD
import com.amazon.opendistroforelasticsearch.indexmanagement.util.OpenForTesting
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.managedIndexConfigIndexRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.indexstatemanagement.util.deleteManagedIndexRequest
Expand All @@ -61,13 +67,15 @@ import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse
import org.elasticsearch.action.bulk.BackoffPolicy
import org.elasticsearch.action.bulk.BulkRequest
import org.elasticsearch.action.bulk.BulkResponse
import org.elasticsearch.action.search.SearchRequest
import org.elasticsearch.action.search.SearchResponse
import org.elasticsearch.action.support.IndicesOptions
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.ClusterChangedEvent
import org.elasticsearch.cluster.ClusterState
import org.elasticsearch.cluster.LocalNodeMasterListener
import org.elasticsearch.cluster.ClusterStateListener
import org.elasticsearch.cluster.block.ClusterBlockException
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.bytes.BytesReference
import org.elasticsearch.common.component.LifecycleListener
Expand All @@ -79,7 +87,10 @@ import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.index.Index
import org.elasticsearch.index.IndexNotFoundException
import org.elasticsearch.index.query.QueryBuilders
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.search.builder.SearchSourceBuilder
import org.elasticsearch.threadpool.Scheduler
import org.elasticsearch.threadpool.ThreadPool

Expand All @@ -106,7 +117,7 @@ class ManagedIndexCoordinator(
private val clusterService: ClusterService,
private val threadPool: ThreadPool,
indexManagementIndices: IndexManagementIndices
) : LocalNodeMasterListener,
) : ClusterStateListener,
CoroutineScope by CoroutineScope(SupervisorJob() + Dispatchers.Default + CoroutineName("ManagedIndexCoordinator")),
LifecycleListener() {

Expand All @@ -122,10 +133,11 @@ class ManagedIndexCoordinator(
BackoffPolicy.constantBackoff(COORDINATOR_BACKOFF_MILLIS.get(settings), COORDINATOR_BACKOFF_COUNT.get(settings))
@Volatile private var jobInterval = JOB_INTERVAL.get(settings)

@Volatile private var isMaster = false

init {
clusterService.addListener(this)
clusterService.addLifecycleListener(this)
clusterService.addLocalNodeMasterListener(this)
clusterService.clusterSettings.addSettingsUpdateConsumer(SWEEP_PERIOD) {
sweepPeriod = it
initBackgroundSweep()
Expand All @@ -142,18 +154,34 @@ class ManagedIndexCoordinator(
}
}

override fun onMaster() {
private fun executorName(): String {
return ThreadPool.Names.MANAGEMENT
}

fun onMaster() {
// Init background sweep when promoted to being master
initBackgroundSweep()
}

override fun offMaster() {
fun offMaster() {
// Cancel background sweep when demoted from being master
scheduledFullSweep?.cancel()
}

@Suppress("ReturnCount")
override fun clusterChanged(event: ClusterChangedEvent) {
// Instead of using a LocalNodeMasterListener to track master changes, this service will
// track them here to avoid conditions where master listener events run after other
// listeners that depend on what happened in the master listener
if (this.isMaster != event.localNodeMaster()) {
this.isMaster = event.localNodeMaster()
if (this.isMaster) {
onMaster()
} else {
offMaster()
}
}

if (!isIndexStateManagementEnabled()) return

if (!event.localNodeMaster()) return
Expand Down Expand Up @@ -261,10 +289,56 @@ class ManagedIndexCoordinator(
if (it.value.shouldDeleteManagedIndexMetaData()) indicesToRemoveManagedIndexMetaDataFrom.add(it.value.index)
}

updateManagedIndices(updateManagedIndicesRequests + indicesDeletedRequests, hasCreateRequests)
// check if newly created indices matching any ISM templates
val updateMatchingIndexReqs = getMatchingIndicesUpdateReqs(event.state(), event.indicesCreated())
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
if (updateMatchingIndexReqs.isNotEmpty()) hasCreateRequests = true

updateManagedIndices(updateManagedIndicesRequests + updateMatchingIndexReqs + indicesDeletedRequests, hasCreateRequests)
clearManagedIndexMetaData(indicesToRemoveManagedIndexMetaDataFrom)
}

/**
* build requests to create jobs for indices matching ISM templates
*/
suspend fun getMatchingIndicesUpdateReqs(clusterState: ClusterState, indexNames: List<String>): List<DocWriteRequest<*>> {
val indexMetadatas = clusterState.metadata.indices
val templates = getISMTemplates()
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved

val indexToMatchedPolicy = indexNames.map { indexName ->
indexName to findMatchingISMTemplate(templates, indexMetadatas[indexName])
}.toMap()

val updateManagedIndexReqs = mutableListOf<DocWriteRequest<*>>()
indexToMatchedPolicy.filterNotNullValues()
.forEach { (index, policyID) ->
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
val indexUuid = indexMetadatas[index].indexUUID
if (indexUuid != null) {
logger.info("auto manage index $index to policy $policyID")
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
updateManagedIndexReqs.add(
managedIndexConfigIndexRequest(index, indexUuid, policyID, jobInterval))
}
}

return updateManagedIndexReqs
}

suspend fun getISMTemplates(): Map<String, ISMTemplate> {
val searchRequest = SearchRequest()
.source(
SearchSourceBuilder().query(
QueryBuilders.existsQuery(ISM_TEMPLATE_FIELD)))
.indices(INDEX_MANAGEMENT_INDEX)

return try {
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
val response: SearchResponse = client.suspendUntil { search(searchRequest, it) }
ismTemplatesFromSearchResponse(response).filterNotNullValues()
} catch (ex: IndexNotFoundException) {
emptyMap()
} catch (ex: ClusterBlockException) {
emptyMap()
bowenlan-amzn marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
* Background sweep process that periodically sweeps for updates to ManagedIndices
*
Expand Down Expand Up @@ -302,7 +376,7 @@ class ManagedIndexCoordinator(
}
}

scheduledFullSweep = threadPool.scheduleWithFixedDelay(scheduledSweep, sweepPeriod, ThreadPool.Names.SAME)
scheduledFullSweep = threadPool.scheduleWithFixedDelay(scheduledSweep, sweepPeriod, executorName())
}

private fun getFullSweepElapsedTime(): TimeValue =
Expand All @@ -317,6 +391,13 @@ class ManagedIndexCoordinator(
@OpenForTesting
suspend fun sweep() {
val currentManagedIndices = sweepManagedIndexJobs(client, ismIndices.indexManagementIndexExists())

// check all un-managed indices, if matches any ism template
val unManagedIndices = clusterService.state().metadata.indices.values().filterNotNull()
.filter { it.value.indexUUID !in currentManagedIndices.keys }.map { it.value.index.name }
val updateMatchingIndicesReqs = getMatchingIndicesUpdateReqs(clusterService.state(), unManagedIndices)
updateManagedIndices(updateMatchingIndicesReqs, updateMatchingIndicesReqs.isNotEmpty())

val clusterStateManagedIndices = sweepClusterState(clusterService.state())

val createManagedIndexRequests =
Expand All @@ -330,6 +411,7 @@ class ManagedIndexCoordinator(
val requests = createManagedIndexRequests + deleteManagedIndexRequests
updateManagedIndices(requests, createManagedIndexRequests.isNotEmpty())
clearManagedIndexMetaData(indicesToDeleteManagedIndexMetaDataFrom)

lastFullSweepTimeNano = System.nanoTime()
}

Expand Down
Loading