Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Snapshot implementation (#135)
Browse files Browse the repository at this point in the history
* Snapshot implementation
Action has two steps:
attempt_snapshot and wait_for_snapshot

* Add timestamp to snapshot

* added missing license headers
replaced LinkedHashMap in SnapshotAction
handle waitForSnapshotStep is not completed
added usermetadata
added hash at the end of the name
set explicit waitForCompletion
using CONDITION_NOT_MET

* added snapshot name hash

* implemented isIdempotent method

* Do not fail snapshot if another one is in progress
+ Restore snapshot name with timestamp

* replace wildcard imports and set isIdempotent to false

* removed includeGlobalState from Snapshotaction

Co-authored-by: Daniel Bryla <daniel.bryla@hertz.com>
Co-authored-by: Javier Abrego <javier.abrego@externals.adidas.com>
  • Loading branch information
3 people committed May 14, 2020
1 parent a2d607d commit 4a4d707
Show file tree
Hide file tree
Showing 11 changed files with 500 additions and 2 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.ActionConfig.ActionType
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot.AttemptSnapshotStep
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot.WaitForSnapshotStep
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService

class SnapshotAction(
clusterService: ClusterService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData,
config: SnapshotActionConfig
) : Action(ActionType.SNAPSHOT, config, managedIndexMetaData) {
private val attemptSnapshotStep = AttemptSnapshotStep(clusterService, client, config, managedIndexMetaData)
private val waitForSnapshotStep = WaitForSnapshotStep(clusterService, client, config, managedIndexMetaData)

override fun getSteps(): List<Step> = listOf(attemptSnapshotStep, waitForSnapshotStep)

@Suppress("ReturnCount")
override fun getStepToExecute(): Step {
// If stepMetaData is null, return the first step
val stepMetaData = managedIndexMetaData.stepMetaData ?: return attemptSnapshotStep

// If the current step has completed, return the next step
if (stepMetaData.stepStatus == Step.StepStatus.COMPLETED) {
return when (stepMetaData.name) {
AttemptSnapshotStep.name -> waitForSnapshotStep
else -> attemptSnapshotStep
}
}

return when (stepMetaData.name) {
AttemptSnapshotStep.name -> attemptSnapshotStep
else -> waitForSnapshotStep
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ abstract class ActionConfig(
READ_WRITE("read_write"),
REPLICA_COUNT("replica_count"),
FORCE_MERGE("force_merge"),
NOTIFICATION("notification");
NOTIFICATION("notification"),
SNAPSHOT("snapshot");

override fun toString(): String {
return type
Expand Down Expand Up @@ -94,6 +95,7 @@ abstract class ActionConfig(
ActionType.REPLICA_COUNT.type -> actionConfig = ReplicaCountActionConfig.parse(xcp, index)
ActionType.FORCE_MERGE.type -> actionConfig = ForceMergeActionConfig.parse(xcp, index)
ActionType.NOTIFICATION.type -> actionConfig = NotificationActionConfig.parse(xcp, index)
ActionType.SNAPSHOT.type -> actionConfig = SnapshotActionConfig.parse(xcp, index)
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in Action.")
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action

import com.amazon.opendistroforelasticsearch.indexstatemanagement.action.Action
import com.amazon.opendistroforelasticsearch.indexstatemanagement.action.SnapshotAction
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.common.xcontent.ToXContent
import org.elasticsearch.common.xcontent.ToXContentObject
import org.elasticsearch.common.xcontent.XContentBuilder
import org.elasticsearch.common.xcontent.XContentParser
import org.elasticsearch.common.xcontent.XContentParserUtils
import org.elasticsearch.script.ScriptService
import java.io.IOException

data class SnapshotActionConfig(
val repository: String?,
val snapshot: String?,
val index: Int
) : ToXContentObject, ActionConfig(ActionType.SNAPSHOT, index) {

init {
require(repository != null) { "SnapshotActionConfig repository must be specified" }
require(snapshot != null) { "SnapshotActionConfig snapshot must be specified" }
}

override fun toXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
builder.startObject()
super.toXContent(builder, params)
.startObject(ActionType.SNAPSHOT.type)
if (repository != null) builder.field(REPOSITORY_FIELD, repository)
if (snapshot != null) builder.field(SNAPSHOT_FIELD, snapshot)
return builder.endObject().endObject()
}

override fun isFragment(): Boolean = super<ToXContentObject>.isFragment()

override fun toAction(
clusterService: ClusterService,
scriptService: ScriptService,
client: Client,
managedIndexMetaData: ManagedIndexMetaData
): Action = SnapshotAction(clusterService, client, managedIndexMetaData, this)

companion object {
const val REPOSITORY_FIELD = "repository"
const val SNAPSHOT_FIELD = "snapshot"
const val INCLUDE_GLOBAL_STATE = "include_global_state"

@JvmStatic
@Throws(IOException::class)
fun parse(xcp: XContentParser, index: Int): SnapshotActionConfig {
var repository: String? = null
var snapshot: String? = null

XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.currentToken(), xcp::getTokenLocation)
while (xcp.nextToken() != XContentParser.Token.END_OBJECT) {
val fieldName = xcp.currentName()
xcp.nextToken()

when (fieldName) {
REPOSITORY_FIELD -> repository = xcp.text()
SNAPSHOT_FIELD -> snapshot = xcp.text()
else -> throw IllegalArgumentException("Invalid field: [$fieldName] found in SnapshotActionConfig.")
}
}

return SnapshotActionConfig(
repository = repository,
snapshot = snapshot,
index = index
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotRequest
import org.elasticsearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.snapshots.ConcurrentSnapshotExecutionException
import java.time.LocalDateTime
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import java.util.Locale

class AttemptSnapshotStep(
val clusterService: ClusterService,
val client: Client,
val config: SnapshotActionConfig,
managedIndexMetaData: ManagedIndexMetaData
) : Step(name, managedIndexMetaData) {

private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = false

@Suppress("TooGenericExceptionCaught")
override suspend fun execute() {
try {
logger.info("Executing snapshot on ${managedIndexMetaData.index}")
val snapshotName = config
.snapshot
.plus("-")
.plus(LocalDateTime
.now(ZoneId.of("UTC"))
.format(DateTimeFormatter.ofPattern("uuuu.MM.dd-HH:mm:ss.SSS", Locale.ROOT)))
val mutableInfo = mutableMapOf("snapshotName" to snapshotName)

val createSnapshotRequest = CreateSnapshotRequest()
.userMetadata(mapOf("snapshot_created" to "Open Distro for Elasticsearch Index Management"))
.indices(managedIndexMetaData.index)
.snapshot(snapshotName)
.repository(config.repository)
.waitForCompletion(false)

val response: CreateSnapshotResponse = client.admin().cluster().suspendUntil { createSnapshot(createSnapshotRequest, it) }
when (response.status()) {
RestStatus.ACCEPTED -> {
stepStatus = StepStatus.COMPLETED
mutableInfo["message"] = "Snapshot creation started and is still in progress for index: ${managedIndexMetaData.index}"
}
RestStatus.OK -> {
stepStatus = StepStatus.COMPLETED
mutableInfo["message"] = "Snapshot created for index: ${managedIndexMetaData.index}"
}
else -> {
stepStatus = StepStatus.FAILED
mutableInfo["message"] = "There was an error during snapshot creation for index: ${managedIndexMetaData.index}"
mutableInfo["cause"] = response.toString()
}
}
info = mutableInfo.toMap()
} catch (e: ConcurrentSnapshotExecutionException) {
val message = "Snapshot creation already in progress."
logger.debug(message, e)
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to message)
} catch (e: Exception) {
val message = "Failed to create snapshot for index: ${managedIndexMetaData.index}"
logger.error(message, e)
stepStatus = StepStatus.FAILED
val mutableInfo = mutableMapOf("message" to message)
val errorMessage = e.message
if (errorMessage != null) mutableInfo["cause"] = errorMessage
info = mutableInfo.toMap()
}
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

companion object {
const val name = "attempt_snapshot"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amazon.opendistroforelasticsearch.indexstatemanagement.step.snapshot

import com.amazon.opendistroforelasticsearch.indexstatemanagement.elasticapi.suspendUntil
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.ManagedIndexMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.action.SnapshotActionConfig
import com.amazon.opendistroforelasticsearch.indexstatemanagement.model.managedindexmetadata.StepMetaData
import com.amazon.opendistroforelasticsearch.indexstatemanagement.step.Step
import org.apache.logging.log4j.LogManager
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotStatus
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
import org.elasticsearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse
import org.elasticsearch.client.Client
import org.elasticsearch.cluster.service.ClusterService

class WaitForSnapshotStep(
val clusterService: ClusterService,
val client: Client,
val config: SnapshotActionConfig,
managedIndexMetaData: ManagedIndexMetaData
) : Step(name, managedIndexMetaData) {
private val logger = LogManager.getLogger(javaClass)
private var stepStatus = StepStatus.STARTING
private var info: Map<String, Any>? = null

override fun isIdempotent() = true

override suspend fun execute() {
logger.info("Waiting for snapshot to complete...")
val request = SnapshotsStatusRequest()
.snapshots(arrayOf(managedIndexMetaData.info?.get("snapshotName").toString()))
.repository(config.repository)
val response: SnapshotsStatusResponse = client.admin().cluster().suspendUntil { snapshotsStatus(request, it) }
val status: SnapshotStatus? = response
.snapshots
.find { snapshotStatus ->
snapshotStatus.snapshot.snapshotId.name == managedIndexMetaData.info?.get("snapshotName").toString() &&
snapshotStatus.snapshot.repository == config.repository
}
if (status != null) {
if (status.state.completed()) {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to "Snapshot created for index: ${managedIndexMetaData.index}")
} else {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to "Creating snapshot in progress for index: ${managedIndexMetaData.index}")
}
} else {
stepStatus = StepStatus.FAILED
info = mapOf("message" to "Snapshot doesn't exist for index: ${managedIndexMetaData.index}")
}
}

override fun getUpdatedManagedIndexMetaData(currentMetaData: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetaData.copy(
stepMetaData = StepMetaData(name, getStepStartTime().toEpochMilli(), stepStatus),
transitionTo = null,
info = info
)
}

companion object {
const val name = "wait_for_snapshot"
}
}
Loading

0 comments on commit 4a4d707

Please sign in to comment.