Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds cluster setting to configure index state management jitter #153

Merged
merged 2 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ class IndexManagementPlugin : JobSchedulerExtension, NetworkPlugin, ActionPlugin
ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED,
ManagedIndexSettings.METADATA_SERVICE_ENABLED,
ManagedIndexSettings.AUTO_MANAGE,
ManagedIndexSettings.JITTER,
ManagedIndexSettings.JOB_INTERVAL,
ManagedIndexSettings.SWEEP_PERIOD,
ManagedIndexSettings.COORDINATOR_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndex
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_COUNT
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.COORDINATOR_BACKOFF_MILLIS
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.INDEX_STATE_MANAGEMENT_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JITTER
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.JOB_INTERVAL
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.METADATA_SERVICE_ENABLED
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings.Companion.SWEEP_PERIOD
Expand Down Expand Up @@ -145,6 +146,7 @@ class ManagedIndexCoordinator(
@Volatile private var retryPolicy =
BackoffPolicy.constantBackoff(COORDINATOR_BACKOFF_MILLIS.get(settings), COORDINATOR_BACKOFF_COUNT.get(settings))
@Volatile private var jobInterval = JOB_INTERVAL.get(settings)
@Volatile private var jobJitter = JITTER.get(settings)

@Volatile private var isMaster = false

Expand All @@ -158,6 +160,9 @@ class ManagedIndexCoordinator(
clusterService.clusterSettings.addSettingsUpdateConsumer(JOB_INTERVAL) {
jobInterval = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(JITTER) {
jobJitter = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(INDEX_STATE_MANAGEMENT_ENABLED) {
indexStateManagementEnabled = it
if (!indexStateManagementEnabled) disable() else enable()
Expand Down Expand Up @@ -328,7 +333,8 @@ class ManagedIndexCoordinator(
indexUuid,
policy.id,
jobInterval,
policy
policy,
jobJitter
thalurur marked this conversation as resolved.
Show resolved Hide resolved
)
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ data class ManagedIndexConfig(
val policySeqNo: Long?,
val policyPrimaryTerm: Long?,
val policy: Policy?,
val changePolicy: ChangePolicy?
val changePolicy: ChangePolicy?,
val jobJitter: Double?
) : ScheduledJobParameter {

init {
Expand All @@ -79,6 +80,10 @@ data class ManagedIndexConfig(

override fun getLockDurationSeconds(): Long = 3600L // 1 hour

/**
 * Returns the jitter value held by this config's [jobJitter] property.
 *
 * @return the stored jitter, or `null` when none was set on this config.
 */
override fun getJitter(): Double? = jobJitter
thalurur marked this conversation as resolved.
Show resolved Hide resolved

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder
.startObject()
Expand All @@ -95,6 +100,7 @@ data class ManagedIndexConfig(
.field(POLICY_PRIMARY_TERM_FIELD, policyPrimaryTerm)
.field(POLICY_FIELD, policy, XCONTENT_WITHOUT_TYPE)
.field(CHANGE_POLICY_FIELD, changePolicy)
.field(JITTER, jobJitter)
builder.endObject()
return builder.endObject()
}
Expand All @@ -114,6 +120,7 @@ data class ManagedIndexConfig(
const val POLICY_SEQ_NO_FIELD = "policy_seq_no"
const val POLICY_PRIMARY_TERM_FIELD = "policy_primary_term"
const val CHANGE_POLICY_FIELD = "change_policy"
const val JITTER = "jitter"

@Suppress("ComplexMethod", "LongMethod")
@JvmStatic
Expand All @@ -137,6 +144,7 @@ data class ManagedIndexConfig(
var enabled = true
var policyPrimaryTerm: Long? = SequenceNumbers.UNASSIGNED_PRIMARY_TERM
var policySeqNo: Long? = SequenceNumbers.UNASSIGNED_SEQ_NO
var jitter: Double? = null

ensureExpectedToken(Token.START_OBJECT, xcp.currentToken(), xcp)
while (xcp.nextToken() != Token.END_OBJECT) {
Expand Down Expand Up @@ -164,6 +172,9 @@ data class ManagedIndexConfig(
CHANGE_POLICY_FIELD -> {
changePolicy = if (xcp.currentToken() == Token.VALUE_NULL) null else ChangePolicy.parse(xcp)
}
JITTER -> {
jitter = if (xcp.currentToken() == Token.VALUE_NULL) null else xcp.doubleValue()
}
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in ManagedIndexConfig.")
}
}
Expand Down Expand Up @@ -192,7 +203,8 @@ data class ManagedIndexConfig(
seqNo = policySeqNo ?: SequenceNumbers.UNASSIGNED_SEQ_NO,
primaryTerm = policyPrimaryTerm ?: SequenceNumbers.UNASSIGNED_PRIMARY_TERM
),
changePolicy = changePolicy
changePolicy = changePolicy,
jobJitter = jitter
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ class ManagedIndexSettings {
const val DEFAULT_ISM_ENABLED = true
const val DEFAULT_METADATA_SERVICE_ENABLED = true
const val DEFAULT_JOB_INTERVAL = 5
const val DEFAULT_JITTER = 0.6
private val ALLOW_LIST_ALL = ActionConfig.ActionType.values().toList().map { it.type }
val ALLOW_LIST_NONE = emptyList<String>()
val SNAPSHOT_DENY_LIST_NONE = emptyList<String>()
Expand Down Expand Up @@ -179,5 +180,14 @@ class ManagedIndexSettings {
Setting.Property.NodeScope,
Setting.Property.Dynamic
)

val JITTER: Setting<Double> = Setting.doubleSetting(
"plugins.index_state_management.jitter",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: Do you think it makes sense to set this setting at index_management level so it can be applied to other features of index management in future if needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that if a jitter setting is added to the other features of index management then it would make sense to have the setting feature-scoped by adding it as a setting at the transforms or rollups level, as I don't think a user's setting of jitter for index management would directly relate to how that user might want to configure a potential rollup jitter.

DEFAULT_JITTER,
0.0,
1.0,
Setting.Property.NodeScope,
Setting.Property.Dynamic
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,16 @@ class TransportAddPolicyAction @Inject constructor(
) {

@Volatile private var jobInterval = ManagedIndexSettings.JOB_INTERVAL.get(settings)
@Volatile private var jobJitter = ManagedIndexSettings.JITTER.get(settings)
@Volatile private var filterByEnabled = IndexManagementSettings.FILTER_BY_BACKEND_ROLES.get(settings)

init {
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.JOB_INTERVAL) {
jobInterval = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(ManagedIndexSettings.JITTER) {
jobJitter = it
}
clusterService.clusterSettings.addSettingsUpdateConsumer(IndexManagementSettings.FILTER_BY_BACKEND_ROLES) {
filterByEnabled = it
}
Expand Down Expand Up @@ -329,7 +333,7 @@ class TransportAddPolicyAction @Inject constructor(
val bulkReq = BulkRequest().timeout(TimeValue.timeValueMillis(bulkReqTimeout))
indicesToAdd.forEach { (uuid, name) ->
bulkReq.add(
managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, policy = policy.copy(user = this.user))
managedIndexConfigIndexRequest(name, uuid, request.policyID, jobInterval, policy = policy.copy(user = this.user), jobJitter)
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,14 @@ import java.net.InetAddress
import java.time.Instant
import java.time.temporal.ChronoUnit

@Suppress("LongParameterList")
fun managedIndexConfigIndexRequest(
index: String,
uuid: String,
policyID: String,
jobInterval: Int,
policy: Policy? = null
policy: Policy? = null,
jobJitter: Double?
): IndexRequest {
val managedIndexConfig = ManagedIndexConfig(
jobName = index,
Expand All @@ -95,7 +97,8 @@ fun managedIndexConfigIndexRequest(
policy = policy,
policySeqNo = policy?.seqNo,
policyPrimaryTerm = policy?.primaryTerm,
changePolicy = null
changePolicy = null,
jobJitter = jobJitter
)

return IndexRequest(INDEX_MANAGEMENT_INDEX)
Expand Down
5 changes: 4 additions & 1 deletion src/main/resources/mappings/opendistro-ism-config.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"_meta" : {
"schema_version": 11
"schema_version": 12
},
"dynamic": "strict",
"properties": {
Expand Down Expand Up @@ -614,6 +614,9 @@
}
}
}
},
"jitter": {
"type": "double"
downsrob marked this conversation as resolved.
Show resolved Hide resolved
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import javax.management.remote.JMXServiceURL

abstract class IndexManagementRestTestCase : ODFERestTestCase() {

val configSchemaVersion = 11
val configSchemaVersion = 12
val historySchemaVersion = 3

// Having issues with tests leaking into other tests and mappings being incorrect and they are not caught by any pending task wait check as
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ class IndexManagementSettingsTests : OpenSearchTestCase() {
ManagedIndexSettings.COORDINATOR_BACKOFF_MILLIS,
ManagedIndexSettings.ALLOW_LIST,
ManagedIndexSettings.SNAPSHOT_DENY_LIST,
ManagedIndexSettings.JITTER,
RollupSettings.ROLLUP_INGEST_BACKOFF_COUNT,
RollupSettings.ROLLUP_INGEST_BACKOFF_MILLIS,
RollupSettings.ROLLUP_SEARCH_BACKOFF_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ package org.opensearch.indexmanagement.indexstatemanagement

import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity
import org.junit.Before
import org.opensearch.OpenSearchParseException
import org.opensearch.action.ActionRequest
import org.opensearch.action.ActionResponse
Expand All @@ -54,6 +55,7 @@ import org.opensearch.indexmanagement.indexstatemanagement.model.Policy
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.PolicyRetryInfoMetaData
import org.opensearch.indexmanagement.indexstatemanagement.model.managedindexmetadata.StateMetaData
import org.opensearch.indexmanagement.indexstatemanagement.resthandler.RestExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.settings.ManagedIndexSettings
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.ExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.explain.TransportExplainAction
import org.opensearch.indexmanagement.indexstatemanagement.transport.action.updateindexmetadata.TransportUpdateManagedIndexMetaDataAction
Expand All @@ -73,6 +75,11 @@ import java.time.Duration
import java.time.Instant

abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() {
/**
 * JUnit `@Before` hook that sets the index state management jitter setting to 0.0,
 * because jitter would add a test-breaking delay to the integration tests.
 */
@Before
fun disableIndexStateManagementJitter() {
    updateIndexStateManagementJitterSetting(value = 0.0)
}

protected val isMixedNodeRegressionTest = System.getProperty("cluster.mixed", "false")!!.toBoolean()

Expand Down Expand Up @@ -357,4 +364,8 @@ abstract class IndexStateManagementIntegTestCase : OpenSearchIntegTestCase() {
)
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

/**
 * Updates the cluster setting identified by [ManagedIndexSettings.JITTER]'s key.
 *
 * @param value the jitter to apply. When `null`, a real `null` (not the string "null")
 * is forwarded to `updateClusterSetting`, the same way other settings in this test suite
 * are cleaned up — presumably this resets the setting to its default; confirm against
 * `updateClusterSetting`.
 */
protected fun updateIndexStateManagementJitterSetting(value: Double?) {
    // `value.toString()` on a nullable receiver would yield the literal text "null";
    // the safe call keeps null as null so the helper can tell "unset" from a value.
    updateClusterSetting(ManagedIndexSettings.JITTER.key, value?.toString(), false)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.apache.http.HttpHeaders
import org.apache.http.entity.ContentType.APPLICATION_JSON
import org.apache.http.entity.StringEntity
import org.apache.http.message.BasicHeader
import org.junit.Before
import org.opensearch.OpenSearchParseException
import org.opensearch.action.get.GetResponse
import org.opensearch.action.search.SearchResponse
Expand Down Expand Up @@ -95,6 +96,12 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
val explainResponseOpendistroPolicyIdSetting = "index.opendistro.index_state_management.policy_id"
val explainResponseOpenSearchPolicyIdSetting = "index.plugins.index_state_management.policy_id"

/**
 * JUnit `@Before` hook that sets the index state management jitter setting to 0.0,
 * because jitter would add a test-breaking delay to the integration tests.
 */
@Before
protected fun disableIndexStateManagementJitter() {
    updateIndexStateManagementJitterSetting(value = 0.0)
}

protected fun createPolicy(
policy: Policy,
policyId: String = OpenSearchTestCase.randomAlphaOfLength(10),
Expand Down Expand Up @@ -268,6 +275,10 @@ abstract class IndexStateManagementRestTestCase : IndexManagementRestTestCase()
assertEquals("Request failed", RestStatus.OK, res.restStatus())
}

/**
 * Updates the cluster setting identified by [ManagedIndexSettings.JITTER]'s key.
 *
 * @param value the jitter to apply; it is passed to `updateClusterSetting` in its
 * string form, with `false` as the third argument.
 */
protected fun updateIndexStateManagementJitterSetting(value: Double) {
    val jitterKey = ManagedIndexSettings.JITTER.key
    updateClusterSetting(jitterKey, "$value", false)
}

protected fun updateIndexSetting(
index: String,
key: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class MetadataRegressionIT : IndexStateManagementIntegTestCase() {
fun cleanClusterSetting() {
// need to clean up otherwise will throw error
updateClusterSetting(ManagedIndexSettings.METADATA_SERVICE_ENABLED.key, null, false)
updateIndexStateManagementJitterSetting(null)
}

fun `test move metadata service`() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,8 @@ fun randomManagedIndexConfig(
enabledTime: Instant? = if (enabled) Instant.now().truncatedTo(ChronoUnit.MILLIS) else null,
policyID: String = OpenSearchRestTestCase.randomAlphaOfLength(10),
policy: Policy? = randomPolicy(),
changePolicy: ChangePolicy? = randomChangePolicy()
changePolicy: ChangePolicy? = randomChangePolicy(),
jitter: Double? = 0.0
): ManagedIndexConfig {
return ManagedIndexConfig(
jobName = name,
Expand All @@ -289,7 +290,8 @@ fun randomManagedIndexConfig(
policySeqNo = policy?.seqNo,
policyPrimaryTerm = policy?.primaryTerm,
policy = policy?.copy(seqNo = SequenceNumbers.UNASSIGNED_SEQ_NO, primaryTerm = SequenceNumbers.UNASSIGNED_PRIMARY_TERM),
changePolicy = changePolicy
changePolicy = changePolicy,
jobJitter = jitter
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ class ManagedIndexCoordinatorTests : OpenSearchAllocationTestCase() {
val settingSet = hashSetOf<Setting<*>>()
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
settingSet.add(ManagedIndexSettings.SWEEP_PERIOD)
settingSet.add(ManagedIndexSettings.JITTER)
settingSet.add(ManagedIndexSettings.JOB_INTERVAL)
settingSet.add(ManagedIndexSettings.INDEX_STATE_MANAGEMENT_ENABLED)
settingSet.add(ManagedIndexSettings.METADATA_SERVICE_ENABLED)
Expand Down
Loading