From 4a4d707ee9cc6a7901a820bff89f94ea69da3e62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20Bry=C5=82a?= <31926560+dabr-grapeup@users.noreply.github.com> Date: Thu, 14 May 2020 21:05:15 +0200 Subject: [PATCH] Snapshot implementation (#135) * Snapshot implementation Action has two steps: attempt_snapshot and wait_for_snapshot * Add timestamp to snapshot * added missing license headers replaced LinkedHashMap in SnapshotAction handle waitForSnapshotStep is not completed added usermetadata added hash at the end of the name set explicit waitForCompletion using CONDITION_NOT_MET * added snapshot name hash * implemented isIdempotent method * Do not fail snapshot if another one is in progress + Restore snapshot name with timestamp * replace wildcard imports and set isIdempotent to false * removed includeGlobalState from Snapshotaction Co-authored-by: Daniel Bryla Co-authored-by: Javier Abrego --- .../action/SnapshotAction.kt | 56 +++++++++ .../model/action/ActionConfig.kt | 4 +- .../model/action/SnapshotActionConfig.kt | 90 ++++++++++++++ .../step/snapshot/AttemptSnapshotStep.kt | 111 ++++++++++++++++++ .../step/snapshot/WaitForSnapshotStep.kt | 79 +++++++++++++ .../IndexStateManagementRestTestCase.kt | 58 +++++++++ .../indexstatemanagement/TestHelpers.kt | 10 ++ .../action/NotificationActionIT.kt | 2 +- .../action/SnapshotActionIT.kt | 70 +++++++++++ .../model/ActionConfigTests.kt | 13 ++ .../model/XContentTests.kt | 9 ++ 11 files changed, 500 insertions(+), 2 deletions(-) create mode 100644 src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotAction.kt create mode 100644 src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/action/SnapshotActionConfig.kt create mode 100644 src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt create mode 100644 src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/snapshot/WaitForSnapshotStep.kt create mode 100644 src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotActionIT.kt diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotAction.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotAction.kt new file mode 100644 index 000000000..6e21f238e --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotAction.kt @@ -0,0 +1,56 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.action + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionConfig.ActionType +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot.AttemptSnapshotStep +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot.WaitForSnapshotStep +import org.elasticsearch.client.Client +import org.elasticsearch.cluster.service.ClusterService + +class SnapshotAction( + clusterService: ClusterService, + client: Client, + managedIndexMetaData: ManagedIndexMetaData, + config: SnapshotActionConfig +) : Action(ActionType.SNAPSHOT, config, managedIndexMetaData) { + private val attemptSnapshotStep = AttemptSnapshotStep(clusterService, client, config, managedIndexMetaData) + private val waitForSnapshotStep = WaitForSnapshotStep(clusterService, client, config, managedIndexMetaData) + + override fun getSteps(): List = listOf(attemptSnapshotStep, waitForSnapshotStep) + + @Suppress("ReturnCount") + override fun getStepToExecute(): Step { + // If stepMetaData is null, return the first step + val stepMetaData = managedIndexMetaData.stepMetaData ?: return attemptSnapshotStep + + // If the current step has completed, return the next step + if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) { + return when (stepMetaData.name) { + AttemptSnapshotStep.name -> waitForSnapshotStep + else -> attemptSnapshotStep + } + } + + return when (stepMetaData.name) { + AttemptSnapshotStep.name -> attemptSnapshotStep + else -> waitForSnapshotStep + } + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/action/ActionConfig.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/action/ActionConfig.kt index f2e7df1f2..6eb0ac00c 100644 --- a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/action/ActionConfig.kt +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/action/ActionConfig.kt @@ -61,7 +61,8 @@ abstract class ActionConfig( READ_WRITE("read_write"), REPLICA_COUNT("replica_count"), FORCE_MERGE("force_merge"), - NOTIFICATION("notification"); + NOTIFICATION("notification"), + SNAPSHOT("snapshot"); override fun toString(): String { return type @@ -94,6 +95,7 @@ abstract class ActionConfig( ActionType.REPLICA_COUNT.type -> actionConfig = ReplicaCountActionConfig.parse(xcp, index) ActionType.FORCE_MERGE.type -> actionConfig = ForceMergeActionConfig.parse(xcp, index) ActionType.NOTIFICATION.type -> actionConfig = NotificationActionConfig.parse(xcp, index) + ActionType.SNAPSHOT.type -> actionConfig = SnapshotActionConfig.parse(xcp, index) else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Action.") } } diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/action/SnapshotActionConfig.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/action/SnapshotActionConfig.kt new file mode 100644 index 000000000..5f7fe0e20 --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/action/SnapshotActionConfig.kt @@ -0,0 +1,90 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.action.Action +import com.amazon.opendistroforelasticsearch.indexstatemanagement.action.SnapshotAction +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import org.elasticsearch.client.Client +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.common.xcontent.ToXContent +import org.elasticsearch.common.xcontent.ToXContentObject +import org.elasticsearch.common.xcontent.XContentBuilder +import org.elasticsearch.common.xcontent.XContentParser +import org.elasticsearch.common.xcontent.XContentParserUtils +import org.elasticsearch.script.ScriptService +import java.io.IOException + +data class SnapshotActionConfig( + val repository: String?, + val snapshot: String?, + val index: Int +) : ToXContentObject, ActionConfig(ActionType.SNAPSHOT, index) { + + init { + require(repository != null) { "SnapshotActionConfig repository must be specified" } + require(snapshot != null) { "SnapshotActionConfig snapshot must be specified" } + } + + override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder { + builder.startObject() + super.toXContent(builder, params) + .startObject(ActionType.SNAPSHOT.type) + if (repository != null) builder.field(REPOSITORY_FIELD, repository) + if (snapshot != null) builder.field(SNAPSHOT_FIELD, snapshot) + return builder.endObject().endObject() + } + + override fun isFragment(): Boolean = super.isFragment() + + override fun toAction( + clusterService: ClusterService, + scriptService: ScriptService, + client: Client, + managedIndexMetaData: ManagedIndexMetaData + ): Action = SnapshotAction(clusterService, client, managedIndexMetaData, this) + + companion object { + const val REPOSITORY_FIELD = "repository" + const val SNAPSHOT_FIELD = "snapshot" + const val INCLUDE_GLOBAL_STATE = "include_global_state" + + @JvmStatic + @Throws(IOException::class) + fun parse(xcp: XContentParser, index: Int): SnapshotActionConfig { + var repository: String? = null + var snapshot: String? = null + + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation) + while (xcp.nextToken() != XContentParser.Token.END_OBJECT) { + val fieldName = xcp.currentName() + xcp.nextToken() + + when (fieldName) { + REPOSITORY_FIELD -> repository = xcp.text() + SNAPSHOT_FIELD -> snapshot = xcp.text() + else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in SnapshotActionConfig.") + } + } + + return SnapshotActionConfig( + repository = repository, + snapshot = snapshot, + index = index + ) + } + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt new file mode 100644 index 000000000..65033dd07 --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/snapshot/AttemptSnapshotStep.kt @@ -0,0 +1,111 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step +import org.apache.logging.log4j.LogManager +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest +import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse +import org.elasticsearch.client.Client +import org.elasticsearch.cluster.service.ClusterService +import org.elasticsearch.rest.RestStatus +import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException +import java.time.LocalDateTime +import java.time.ZoneId +import java.time.format.DateTimeFormatter +import java.util.Locale + +class AttemptSnapshotStep( + val clusterService: ClusterService, + val client: Client, + val config: SnapshotActionConfig, + managedIndexMetaData: ManagedIndexMetaData +) : Step(name, managedIndexMetaData) { + + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + + override fun isIdempotent() = false + + @Suppress("TooGenericExceptionCaught") + override suspend fun execute() { + try { + logger.info("Executing snapshot on ${managedIndexMetaData.index}") + val snapshotName = config + .snapshot + .plus("-") + .plus(LocalDateTime + .now(ZoneId.of("UTC")) + .format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT))) + val mutableInfo = mutableMapOf("snapshotName" to snapshotName) + + val createSnapshotRequest = CreateSnapshotRequest() + .userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management")) + .indices(managedIndexMetaData.index) + .snapshot(snapshotName) + .repository(config.repository) + .waitForCompletion(false) + + val response: CreateSnapshotResponse = client.admin().cluster().suspendUntil { createSnapshot(createSnapshotRequest, it) } + when (response.status()) { + RestStatus.ACCEPTED -> { + stepStatus = StepStatus.COMPLETED + mutableInfo["message"] = "Snapshot creation started and is still in progress for index: ${managedIndexMetaData.index}" + } + RestStatus.OK -> { + stepStatus = StepStatus.COMPLETED + mutableInfo["message"] = "Snapshot created for index: ${managedIndexMetaData.index}" + } + else -> { + stepStatus = StepStatus.FAILED + mutableInfo["message"] = "There was an error during snapshot creation for index: ${managedIndexMetaData.index}" + mutableInfo["cause"] = response.toString() + } + } + info = mutableInfo.toMap() + } catch (e: ConcurrentSnapshotExecutionException) { + val message = "Snapshot creation already in progress." + logger.debug(message, e) + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to message) + } catch (e: Exception) { + val message = "Failed to create snapshot for index: ${managedIndexMetaData.index}" + logger.error(message, e) + stepStatus = StepStatus.FAILED + val mutableInfo = mutableMapOf("message" to message) + val errorMessage = e.message + if (errorMessage != null) mutableInfo["cause"] = errorMessage + info = mutableInfo.toMap() + } + } + + override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { + return currentMetaData.copy( + stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus), + transitionTo = null, + info = info + ) + } + + companion object { + const val name = "attempt_snapshot" + } +} diff --git a/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/snapshot/WaitForSnapshotStep.kt b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/snapshot/WaitForSnapshotStep.kt new file mode 100644 index 000000000..fc042aeee --- /dev/null +++ b/src/main/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/step/snapshot/WaitForSnapshotStep.kt @@ -0,0 +1,79 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData +import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step +import org.apache.logging.log4j.LogManager +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest +import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse +import org.elasticsearch.client.Client +import org.elasticsearch.cluster.service.ClusterService + +class WaitForSnapshotStep( + val clusterService: ClusterService, + val client: Client, + val config: SnapshotActionConfig, + managedIndexMetaData: ManagedIndexMetaData +) : Step(name, managedIndexMetaData) { + private val logger = LogManager.getLogger(javaClass) + private var stepStatus = StepStatus.STARTING + private var info: Map? = null + + override fun isIdempotent() = true + + override suspend fun execute() { + logger.info("Waiting for snapshot to complete...") + val request = SnapshotsStatusRequest() + .snapshots(arrayOf(managedIndexMetaData.info?.get("snapshotName").toString())) + .repository(config.repository) + val response: SnapshotsStatusResponse = client.admin().cluster().suspendUntil { snapshotsStatus(request, it) } + val status: SnapshotStatus? = response + .snapshots + .find { snapshotStatus -> + snapshotStatus.snapshot.snapshotId.name == managedIndexMetaData.info?.get("snapshotName").toString() && + snapshotStatus.snapshot.repository == config.repository + } + if (status != null) { + if (status.state.completed()) { + stepStatus = StepStatus.COMPLETED + info = mapOf("message" to "Snapshot created for index: ${managedIndexMetaData.index}") + } else { + stepStatus = StepStatus.CONDITION_NOT_MET + info = mapOf("message" to "Creating snapshot in progress for index: ${managedIndexMetaData.index}") + } + } else { + stepStatus = StepStatus.FAILED + info = mapOf("message" to "Snapshot doesn't exist for index: ${managedIndexMetaData.index}") + } + } + + override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData { + return currentMetaData.copy( + stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus), + transitionTo = null, + info = info + ) + } + + companion object { + const val name = "wait_for_snapshot" + } +} diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt index 10152efd4..68f5f30e9 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/IndexStateManagementRestTestCase.kt @@ -41,6 +41,7 @@ import org.apache.http.HttpHeaders import org.apache.http.entity.ContentType.APPLICATION_JSON import org.apache.http.entity.StringEntity import org.apache.http.message.BasicHeader +import org.elasticsearch.ElasticsearchParseException import org.elasticsearch.action.search.SearchResponse import org.elasticsearch.client.Request import org.elasticsearch.client.Response @@ -48,6 +49,7 @@ import org.elasticsearch.client.RestClient import org.elasticsearch.cluster.metadata.IndexMetaData import org.elasticsearch.common.settings.Settings import org.elasticsearch.common.unit.TimeValue +import org.elasticsearch.common.xcontent.DeprecationHandler import org.elasticsearch.common.xcontent.LoggingDeprecationHandler import org.elasticsearch.common.xcontent.NamedXContentRegistry import org.elasticsearch.common.xcontent.XContentParser.Token @@ -62,6 +64,7 @@ import org.elasticsearch.test.ESTestCase import org.elasticsearch.test.rest.ESRestTestCase import org.junit.AfterClass import org.junit.rules.DisableOnDebug +import java.io.IOException import java.time.Duration import java.time.Instant import java.util.Locale @@ -417,6 +420,61 @@ abstract class IndexStateManagementRestTestCase : ESRestTestCase() { return metadata } + protected fun createRepository( + repository: String + ) { + val path = getRepoPath() + val response = client() + .makeRequest( + "PUT", + "_snapshot/$repository", + emptyMap(), + StringEntity("{\"type\":\"fs\", \"settings\": {\"location\": \"$path\"}}", APPLICATION_JSON) + ) + assertEquals("Unable to create a new repository", RestStatus.OK, response.restStatus()) + } + + @Suppress("UNCHECKED_CAST") + private fun getRepoPath(): String { + val response = client() + .makeRequest( + "GET", + "_nodes", + emptyMap() + ) + assertEquals("Unable to get a nodes settings", RestStatus.OK, response.restStatus()) + return ((response.asMap()["nodes"] as HashMap>>>).values.first()["settings"]!!["path"]!!["repo"] as List)[0] + } + + private fun getSnapshotsList(repository: String): List { + val response = client() + .makeRequest( + "GET", + "_cat/snapshots/$repository?format=json", + emptyMap() + ) + assertEquals("Unable to get a snapshot", RestStatus.OK, response.restStatus()) + try { + return jsonXContent + .createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, response.entity.content) + .use { parser -> parser.list() } + } catch (e: IOException) { + throw ElasticsearchParseException("Failed to parse content to list", e) + } + } + + @Suppress("UNCHECKED_CAST") + protected fun assertSnapshotExists( + repository: String, + snapshot: String + ) = require(getSnapshotsList(repository).any { element -> (element as Map)["id"]!!.contains(snapshot) }) { "No snapshot found with id: $snapshot" } + + @Suppress("UNCHECKED_CAST") + protected fun assertSnapshotFinishedWithSuccess( + repository: String, + snapshot: String + ) = require(getSnapshotsList(repository).any { element -> (element as Map)["id"]!!.contains(snapshot) && "SUCCESS" == element["status"] }) { "Snapshot didn't finish with success." } + /** * Compares responses returned by APIs such as those defined in [RetryFailedManagedIndexAction] and [RestAddPolicyAction] * diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt index 815ff22ad..a6639a7cd 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/TestHelpers.kt @@ -32,6 +32,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.R import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReadWriteActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ReplicaCountActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.RolloverActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.ClusterStateManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.coordinator.SweptManagedIndexConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.destination.Chime @@ -196,6 +197,10 @@ fun randomTemplateScript( params: Map = emptyMap() ): Script = Script(ScriptType.INLINE, Script.DEFAULT_TEMPLATE_LANG, source, params) +fun randomSnapshotActionConfig(repository: String? = null, snapshot: String? = null): SnapshotActionConfig { + return SnapshotActionConfig(repository, snapshot, index = 0) +} + /** * Helper functions for creating a random Conditions object */ @@ -360,6 +365,11 @@ fun ManagedIndexConfig.toJsonString(): String { return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() } +fun SnapshotActionConfig.toJsonString(): String { + val builder = XContentFactory.jsonBuilder() + return this.toXContent(builder, ToXContent.EMPTY_PARAMS).string() +} + /** * Wrapper for [RestClient.performRequest] which was deprecated in ES 6.5 and is used in tests. This provides * a single place to suppress deprecation warnings. This will probably need further work when the API is removed entirely diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/NotificationActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/NotificationActionIT.kt index 6677e3f9b..b9fc8e4c6 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/NotificationActionIT.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/NotificationActionIT.kt @@ -28,7 +28,7 @@ import org.elasticsearch.script.Script import org.elasticsearch.script.ScriptType import java.time.Instant import java.time.temporal.ChronoUnit -import java.util.* +import java.util.Locale class NotificationActionIT : IndexStateManagementRestTestCase() { diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotActionIT.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotActionIT.kt new file mode 100644 index 000000000..cfaf0e83a --- /dev/null +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/action/SnapshotActionIT.kt @@ -0,0 +1,70 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.indexstatemanagement.action + +import com.amazon.opendistroforelasticsearch.indexstatemanagement.IndexStateManagementRestTestCase +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.Policy +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.State +import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomErrorNotification +import com.amazon.opendistroforelasticsearch.indexstatemanagement.waitFor +import java.time.Instant +import java.time.temporal.ChronoUnit +import java.util.Locale + +class SnapshotActionIT : IndexStateManagementRestTestCase() { + + private val testIndexName = javaClass.simpleName.toLowerCase(Locale.ROOT) + + fun `test basic`() { + val indexName = "${testIndexName}_index_1" + val policyID = "${testIndexName}_testPolicyName_1" + val repository = "repository" + val snapshot = "snapshot" + val actionConfig = SnapshotActionConfig(repository, snapshot, 0) + val states = listOf( + State("Snapshot", listOf(actionConfig), listOf()) + ) + + createRepository(repository) + + val policy = Policy( + id = policyID, + description = "$testIndexName description", + schemaVersion = 1L, + lastUpdatedTime = Instant.now().truncatedTo(ChronoUnit.MILLIS), + errorNotification = randomErrorNotification(), + defaultState = states[0].name, + states = states + ) + createPolicy(policy, policyID) + createIndex(indexName, policyID) + + val managedIndexConfig = getExistingManagedIndexConfig(indexName) + + // Change the start time so the job will trigger in 2 seconds. + updateManagedIndexConfigStartTime(managedIndexConfig) + + waitFor { assertEquals(policyID, getExplainManagedIndexMetaData(indexName).policyID) } + + // Need to wait two cycles for wait for snapshot step + updateManagedIndexConfigStartTime(managedIndexConfig) + Thread.sleep(3000) + + waitFor { assertSnapshotExists(repository, snapshot) } + waitFor { assertSnapshotFinishedWithSuccess(repository, snapshot) } + } +} diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/ActionConfigTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/ActionConfigTests.kt index 81c64bea2..134803032 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/ActionConfigTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/ActionConfigTests.kt @@ -19,6 +19,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.A import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionTimeout import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomForceMergeActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomRolloverActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomSnapshotActionConfig import org.elasticsearch.common.unit.ByteSizeValue import org.elasticsearch.common.unit.TimeValue import org.elasticsearch.test.ESTestCase @@ -55,4 +56,16 @@ class ActionConfigTests : ESTestCase() { randomForceMergeActionConfig(maxNumSegments = 0) } } + + fun `test snapshot action empty snapshot fails`() { + assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException for snapshot equals to null") { + randomSnapshotActionConfig(repository = "repository") + } + } + + fun `test snapshot action empty repository fails`() { + assertFailsWith(IllegalArgumentException::class, "Expected IllegalArgumentException for repository equals to null") { + randomSnapshotActionConfig(snapshot = "snapshot") + } + } } diff --git a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt index d26b450b7..ca52e289d 100644 --- a/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt +++ b/src/test/kotlin/com/amazon/opendistroforelasticsearch/indexstatemanagement/model/XContentTests.kt @@ -27,6 +27,7 @@ import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomReadOnly import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomReadWriteActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomReplicaCountActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomRolloverActionConfig +import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomSnapshotActionConfig import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomState import com.amazon.opendistroforelasticsearch.indexstatemanagement.randomTransition import com.amazon.opendistroforelasticsearch.indexstatemanagement.toJsonString @@ -133,6 +134,14 @@ class XContentTests : ESTestCase() { assertEquals("Round tripping NotificationActionConfig doesn't work", notificationActionConfig, parsedNotificationActionConfig) } + fun `test snapshot action config parsing`() { + val snapshotActionConfig = randomSnapshotActionConfig("repository", "snapshot") + + val snapshotActionConfigString = snapshotActionConfig.toJsonString() + val parsedNotificationActionConfig = ActionConfig.parse(parser(snapshotActionConfigString), 0) + assertEquals("Round tripping SnapshotActionConfig doesn't work", snapshotActionConfig, parsedNotificationActionConfig) + } + fun `test managed index config parsing`() { val config = randomManagedIndexConfig() val configTwo = config.copy(changePolicy = null)