Skip to content

Commit

Permalink
Fixing snapshot bug
Browse files Browse the repository at this point in the history
Signed-off-by: Kshitij Tandon <tandonks@amazon.com>
  • Loading branch information
tandonks committed Sep 14, 2024
1 parent 395317d commit e929e87
Showing 1 changed file with 22 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,16 @@ package org.opensearch.indexmanagement.indexstatemanagement.step.snapshot

import org.apache.logging.log4j.LogManager
import org.opensearch.ExceptionsHelper
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotStatus
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusRequest
import org.opensearch.action.admin.cluster.snapshots.status.SnapshotsStatusResponse
import org.opensearch.cluster.SnapshotsInProgress.State
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsRequest
import org.opensearch.action.admin.cluster.snapshots.get.GetSnapshotsResponse
import org.opensearch.indexmanagement.indexstatemanagement.action.SnapshotAction
import org.opensearch.indexmanagement.opensearchapi.suspendUntil
import org.opensearch.indexmanagement.spi.indexstatemanagement.Step
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ActionProperties
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.ManagedIndexMetaData
import org.opensearch.indexmanagement.spi.indexstatemanagement.model.StepMetaData
import org.opensearch.snapshots.SnapshotInfo
import org.opensearch.snapshots.SnapshotState
import org.opensearch.transport.RemoteTransportException

class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) {
Expand All @@ -33,32 +33,31 @@ class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) {

try {
val snapshotName = getSnapshotName(managedIndexMetadata, indexName) ?: return this
val request =
SnapshotsStatusRequest()
.snapshots(arrayOf(snapshotName))
.repository(repository)
val response: SnapshotsStatusResponse = context.client.admin().cluster().suspendUntil { snapshotsStatus(request, it) }
val status: SnapshotStatus? =
val newRequest = GetSnapshotsRequest()
.snapshots(arrayOf(snapshotName))
.repository(repository)
val response: GetSnapshotsResponse = context.client.admin().cluster().suspendUntil { getSnapshots(newRequest, it) }
val status: SnapshotInfo? =
response
.snapshots
.find { snapshotStatus ->
snapshotStatus.snapshot.snapshotId.name == snapshotName && snapshotStatus.snapshot.repository == repository
.find { snapshotInfo ->
snapshotInfo.snapshotId().name == snapshotName
}
if (status != null) {
when (status.state) {
State.INIT, State.STARTED -> {
when (status.state()) {
SnapshotState.IN_PROGRESS -> {
stepStatus = StepStatus.CONDITION_NOT_MET
info = mapOf("message" to getSnapshotInProgressMessage(indexName), "state" to status.state.name)
info = mapOf("message" to getSnapshotInProgressMessage(indexName), "state" to status.state().toString())
}
State.SUCCESS -> {
SnapshotState.SUCCESS -> {
stepStatus = StepStatus.COMPLETED
info = mapOf("message" to getSuccessMessage(indexName), "state" to status.state.name)
info = mapOf("message" to getSuccessMessage(indexName), "state" to status.state().toString())
}
else -> { // State.FAILED, State.ABORTED
val message = getFailedExistsMessage(indexName)
logger.warn(message)
stepStatus = StepStatus.FAILED
info = mapOf("message" to message, "state" to status.state.name)
info = mapOf("message" to message, "state" to status.stWate().toString())
}
}
} else {
Expand Down Expand Up @@ -98,13 +97,11 @@ class WaitForSnapshotStep(private val action: SnapshotAction) : Step(name) {
return actionProperties.snapshotName
}

override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData {
return currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)
}
override fun getUpdatedManagedIndexMetadata(currentMetadata: ManagedIndexMetaData): ManagedIndexMetaData = currentMetadata.copy(
stepMetaData = StepMetaData(name, getStepStartTime(currentMetadata).toEpochMilli(), stepStatus),
transitionTo = null,
info = info,
)

override fun isIdempotent(): Boolean = true

Expand Down

0 comments on commit e929e87

Please sign in to comment.