Skip to content
This repository has been archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
Fixes start API not setting FAILED metadata to RETRY. Also fixes bug …
Browse files Browse the repository at this point in the history
…with releaseLock not correctly releasing an updated lock
  • Loading branch information
dbbaughe committed Nov 19, 2020
1 parent 423a825 commit 27c8cb7
Show file tree
Hide file tree
Showing 3 changed files with 199 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -153,17 +153,12 @@ object RollupRunner : ScheduledJobRunner,
// Check if rollup should be processed before acquiring the lock
// If metadata does not exist, it will either be initialized for the first time or it will be recreated to communicate the failed state
if (rollupSearchService.shouldProcessRollup(job, metadata)) {
// Attempt to acquire lock
val lock = acquireLockForRollupJob(job, context)
if (lock == null) {
logger.debug("Could not acquire lock for ${job.id}")
} else {
runRollupJob(job, context, lock)
// Release lock
val released: Boolean = releaseLockForRollupJob(context, lock)
if (!released) {
logger.debug("Could not release lock for ${job.id}")
}
releaseLockForRollupJob(context, lock)
}
} else if (job.isEnabled) {
// We are doing this outside of ShouldProcess as schedule job interval can be more frequent than rollup and we want to fail
Expand Down Expand Up @@ -202,7 +197,16 @@ object RollupRunner : ScheduledJobRunner,
}

/**
 * Releases [lock] through the lock service of [context] without letting a failure escape to the caller.
 *
 * A release that the lock service reports as unsuccessful is logged at WARN level. An exception thrown
 * while releasing is logged at ERROR level and treated as "not released".
 *
 * @param context execution context whose lock service performs the release
 * @param lock the lock to release
 * @return `true` only when the lock service reported a successful release, `false` otherwise
 */
private suspend fun releaseLockForRollupJob(context: JobExecutionContext, lock: LockModel): Boolean {
    // The former single-line body (`return context.lockService.suspendUntil { ... }`) has been removed:
    // left in place it returned before the handling below could run and let release failures propagate.
    var released = false
    try {
        released = context.lockService.suspendUntil { release(lock, it) }
        if (!released) {
            logger.warn("Could not release lock for ${lock.jobId}")
        }
    } catch (e: Exception) {
        // Releasing is best effort here: log the failure and report it through the return value.
        logger.error("Failed to release lock", e)
    }
    return released
}

// TODO: Clean up runner
Expand All @@ -214,6 +218,7 @@ object RollupRunner : ScheduledJobRunner,
* */
@Suppress("ReturnCount", "NestedBlockDepth", "ComplexMethod", "LongMethod", "ThrowsCount")
private suspend fun runRollupJob(job: Rollup, context: JobExecutionContext, lock: LockModel) {
var updatableLock = lock
try {
when (val jobValidity = isJobValid(job)) {
is RollupJobValidationResult.Invalid -> {
Expand All @@ -238,7 +243,6 @@ object RollupRunner : ScheduledJobRunner,
is MetadataResult.Failure ->
throw RollupMetadataException("Failed to initialize rollup metadata", initMetadataResult.cause)
}

if (metadata.status == RollupMetadata.Status.FAILED) {
logger.info("Metadata status is FAILED, disabling job $metadata")
disableJob(job, metadata)
Expand Down Expand Up @@ -270,7 +274,6 @@ object RollupRunner : ScheduledJobRunner,
else -> {}
}

var updatableLock = lock
while (rollupSearchService.shouldProcessRollup(updatableJob, metadata)) {
do {
try {
Expand Down Expand Up @@ -312,6 +315,7 @@ object RollupRunner : ScheduledJobRunner,
logger.warn("Failed trying to renew lock on $updatableLock", e)
// If we fail to renew the lock it doesn't mean we need to perm fail the job, we can just return early
// and let the next execution try to process the data from where this one left off
releaseLockForRollupJob(context, updatableLock)
return
}
} catch (e: RollupMetadataException) {
Expand All @@ -321,6 +325,7 @@ object RollupRunner : ScheduledJobRunner,
} catch (e: Exception) {
// TODO: Should update metadata and disable job here instead of allowing the rollup to keep going
logger.error("Failed to rollup ", e)
releaseLockForRollupJob(context, updatableLock)
return
}
} while (metadata.afterKey != null)
Expand All @@ -329,14 +334,19 @@ object RollupRunner : ScheduledJobRunner,
if (!updatableJob.continuous) {
if (listOf(RollupMetadata.Status.STOPPED, RollupMetadata.Status.FINISHED, RollupMetadata.Status.FAILED).contains(metadata.status)) {
disableJob(updatableJob, metadata)
return
}
}

// If we have been constantly renewing the lock then the seqNo/primaryTerm will have changed
// and the releaseLock call outside of runRollupJob will fail, so release here with updatableLock
// and outside just in case we returned early at a different point (attempting to release twice won't hurt)
releaseLockForRollupJob(context, updatableLock)
} catch (e: RollupMetadataException) {
// In most scenarios in the runner, the metadata will be used to communicate the result to the user
// If change to the metadata itself fails, there is nothing else to relay state change
// In these cases, the cause of the metadata operation will be logged here and the runner execution will exit
logger.error(e.message, e.cause)
releaseLockForRollupJob(context, updatableLock)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@

package com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.start

import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.INDEX_MANAGEMENT_INDEX
import com.amazon.opendistroforelasticsearch.indexmanagement.elasticapi.parseWithType
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.get.GetRollupAction
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.get.GetRollupRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.action.get.GetRollupResponse
Expand All @@ -26,13 +27,19 @@ import org.elasticsearch.ElasticsearchStatusException
import org.elasticsearch.ExceptionsHelper
import org.elasticsearch.action.ActionListener
import org.elasticsearch.action.DocWriteResponse
import org.elasticsearch.action.get.GetRequest
import org.elasticsearch.action.get.GetResponse
import org.elasticsearch.action.support.ActionFilters
import org.elasticsearch.action.support.HandledTransportAction
import org.elasticsearch.action.support.master.AcknowledgedResponse
import org.elasticsearch.action.update.UpdateRequest
import org.elasticsearch.action.update.UpdateResponse
import org.elasticsearch.client.Client
import org.elasticsearch.common.inject.Inject
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler
import org.elasticsearch.common.xcontent.NamedXContentRegistry
import org.elasticsearch.common.xcontent.XContentHelper
import org.elasticsearch.common.xcontent.XContentType
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.tasks.Task
import org.elasticsearch.transport.TransportService
Expand Down Expand Up @@ -60,8 +67,12 @@ class TransportStartRollupAction @Inject constructor(
}

if (rollup.enabled) {
log.debug("Rollup job is already enabled")
return actionListener.onResponse(AcknowledgedResponse(true))
log.debug("Rollup job is already enabled, checking if metadata needs to be updated")
return if (rollup.metadataID == null) {
actionListener.onResponse(AcknowledgedResponse(true))
} else {
getRollupMetadata(rollup, actionListener)
}
}

updateRollupJob(rollup, request, actionListener)
Expand All @@ -76,15 +87,14 @@ class TransportStartRollupAction @Inject constructor(
// TODO: Should create a transport action to update metadata
private fun updateRollupJob(rollup: Rollup, request: StartRollupRequest, actionListener: ActionListener<AcknowledgedResponse>) {
val now = Instant.now().toEpochMilli()
request.index(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX)
.doc(mapOf(Rollup.ROLLUP_TYPE to mapOf(Rollup.ENABLED_FIELD to true,
request.index(INDEX_MANAGEMENT_INDEX).doc(mapOf(Rollup.ROLLUP_TYPE to mapOf(Rollup.ENABLED_FIELD to true,
Rollup.ENABLED_TIME_FIELD to now, Rollup.LAST_UPDATED_TIME_FIELD to now)))
client.update(request, object : ActionListener<UpdateResponse> {
override fun onResponse(response: UpdateResponse) {
if (response.result == DocWriteResponse.Result.UPDATED) {
// If there is a metadata ID on rollup then we need to set it back to STARTED
// If there is a metadata ID on rollup then we need to set it back to STARTED or RETRY
if (rollup.metadataID != null) {
updateRollupMetadata(rollup, actionListener)
getRollupMetadata(rollup, actionListener)
} else {
actionListener.onResponse(AcknowledgedResponse(true))
}
Expand All @@ -98,10 +108,44 @@ class TransportStartRollupAction @Inject constructor(
})
}

private fun updateRollupMetadata(rollup: Rollup, actionListener: ActionListener<AcknowledgedResponse>) {
/**
 * Loads the metadata document referenced by `rollup.metadataID` (routed by the rollup id) and decides
 * whether it needs a status update.
 *
 * When the document is missing, has an empty source, or yields no parsed metadata, the request is simply
 * acknowledged. Otherwise the parsed metadata is handed to [updateRollupMetadata]. A failed get request is
 * reported to [actionListener] with its unwrapped cause.
 *
 * @param rollup the rollup job whose metadata document should be inspected
 * @param actionListener listener that receives the acknowledgement or the failure
 */
private fun getRollupMetadata(rollup: Rollup, actionListener: ActionListener<AcknowledgedResponse>) {
    val getRequest = GetRequest(INDEX_MANAGEMENT_INDEX, rollup.metadataID).routing(rollup.id)
    client.get(getRequest, object : ActionListener<GetResponse> {
        override fun onResponse(response: GetResponse) {
            // A missing or empty document and an absent source all collapse into "no metadata".
            val metadata = if (!response.isExists || response.isSourceEmpty) {
                null
            } else {
                response.sourceAsBytesRef?.let { source ->
                    val parser = XContentHelper.createParser(
                        NamedXContentRegistry.EMPTY, LoggingDeprecationHandler.INSTANCE, source, XContentType.JSON
                    )
                    parser.parseWithType(response.id, response.seqNo, response.primaryTerm, RollupMetadata.Companion::parse)
                }
            }

            if (metadata == null) {
                // If there is no metadata doc then the runner will instantiate a new one
                // in FAILED status which the user will need to retry from
                actionListener.onResponse(AcknowledgedResponse(true))
            } else {
                updateRollupMetadata(rollup, metadata, actionListener)
            }
        }

        override fun onFailure(e: Exception) {
            actionListener.onFailure(ExceptionsHelper.unwrapCause(e) as Exception)
        }
    })
}

private fun updateRollupMetadata(rollup: Rollup, metadata: RollupMetadata, actionListener: ActionListener<AcknowledgedResponse>) {
val now = Instant.now().toEpochMilli()
val updateRequest = UpdateRequest(IndexManagementPlugin.INDEX_MANAGEMENT_INDEX, rollup.metadataID)
.doc(mapOf(RollupMetadata.ROLLUP_METADATA_TYPE to mapOf(RollupMetadata.STATUS_FIELD to RollupMetadata.Status.STARTED.type,
val updatedStatus = when (metadata.status) {
RollupMetadata.Status.FINISHED, RollupMetadata.Status.STOPPED -> RollupMetadata.Status.STARTED
RollupMetadata.Status.STARTED, RollupMetadata.Status.INIT, RollupMetadata.Status.RETRY -> return actionListener.onResponse(AcknowledgedResponse(true))
RollupMetadata.Status.FAILED -> RollupMetadata.Status.RETRY
}
val updateRequest = UpdateRequest(INDEX_MANAGEMENT_INDEX, rollup.metadataID)
.doc(mapOf(RollupMetadata.ROLLUP_METADATA_TYPE to mapOf(RollupMetadata.STATUS_FIELD to updatedStatus.type,
RollupMetadata.FAILURE_REASON to null, RollupMetadata.LAST_UPDATED_FIELD to now)))
client.update(updateRequest, object : ActionListener<UpdateResponse> {
override fun onResponse(response: UpdateResponse) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,17 @@ package com.amazon.opendistroforelasticsearch.indexmanagement.rollup.resthandler
import com.amazon.opendistroforelasticsearch.indexmanagement.IndexManagementPlugin.Companion.ROLLUP_JOBS_BASE_URI
import com.amazon.opendistroforelasticsearch.indexmanagement.makeRequest
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.RollupRestTestCase
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.Rollup
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.RollupMetadata
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.model.dimension.DateHistogram
import com.amazon.opendistroforelasticsearch.indexmanagement.rollup.randomRollup
import com.amazon.opendistroforelasticsearch.indexmanagement.waitFor
import com.amazon.opendistroforelasticsearch.jobscheduler.spi.schedule.IntervalSchedule
import org.elasticsearch.client.ResponseException
import org.elasticsearch.rest.RestStatus
import org.elasticsearch.test.junit.annotations.TestLogging
import java.time.Instant
import java.time.temporal.ChronoUnit

@TestLogging(value = "level:DEBUG", reason = "Debugging tests")
@Suppress("UNCHECKED_CAST")
Expand Down Expand Up @@ -77,4 +84,122 @@ class RestStartRollupActionIT : RollupRestTestCase() {
assertEquals("Unexpected status", RestStatus.BAD_REQUEST, e.response.restStatus())
}
}

/**
 * Checks that calling `_start` on a rollup whose metadata is FAILED switches the metadata to RETRY,
 * and that the job then runs to FINISHED once its source index exists.
 */
@Throws(Exception::class)
fun `test starting a failed rollup`() {
    val sourceIndexName = "source_restart_failed_rollup"
    val definition = Rollup(
        id = "restart_failed_rollup",
        schemaVersion = 1L,
        enabled = true,
        jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
        jobLastUpdatedTime = Instant.now(),
        jobEnabledTime = Instant.now(),
        description = "basic search test",
        sourceIndex = sourceIndexName,
        targetIndex = "target_restart_failed_rollup",
        metadataID = null,
        roles = emptyList(),
        pageSize = 10,
        delay = 0,
        continuous = false,
        dimensions = listOf(DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h")),
        metrics = emptyList()
    )
    val rollup = createRollup(definition, definition.id)

    // First execution: the source index has not been created, so the job is expected to fail
    updateRollupStartTime(rollup)

    waitFor {
        val failedRollup = getRollup(rollupId = rollup.id)
        assertNotNull("MetadataID on rollup was null", failedRollup.metadataID)
        val failedMetadata = getRollupMetadata(failedRollup.metadataID!!)
        // The missing source index should have left the metadata in FAILED and the job disabled
        assertEquals("Status should be failed", RollupMetadata.Status.FAILED, failedMetadata.status)
        assertFalse("Rollup was not disabled", failedRollup.enabled)
    }

    // Supply the source index that was missing during the first execution
    generateNYCTaxiData(sourceIndexName)

    // Restart the failed job through the _start API
    val startResponse = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_start")
    assertEquals("Start rollup failed", RestStatus.OK, startResponse.restStatus())
    assertEquals(mapOf("acknowledged" to true), startResponse.asMap())

    val restartedRollup = getRollup(rollup.id)
    assertTrue("Rollup was not enabled", restartedRollup.enabled)
    waitFor {
        val retryMetadata = getRollupMetadata(restartedRollup.metadataID!!)
        // _start should have moved the FAILED metadata to RETRY
        assertEquals("Status should be retry", RollupMetadata.Status.RETRY, retryMetadata.status)
    }

    updateRollupStartTime(rollup)

    // The rollup should now be able to finish, with documents actually rolled up
    waitFor {
        val finishedMetadata = getRollupMetadata(restartedRollup.metadataID!!)
        assertEquals("Status should be finished", RollupMetadata.Status.FINISHED, finishedMetadata.status)
        assertEquals("Did not roll up documents", 5000, finishedMetadata.stats.documentsProcessed)
        assertTrue("Did not roll up documents", finishedMetadata.stats.rollupsIndexed > 0)
    }
}

/**
 * Checks that calling `_start` on a rollup whose metadata is FINISHED lets the job run again:
 * after the target index is deleted, a second execution finishes with doubled stats and the
 * target index exists again.
 */
@Throws(Exception::class)
fun `test starting a finished rollup`() {
    val sourceIndexName = "source_restart_finished_rollup"
    val targetIndexName = "target_restart_finished_rollup"

    generateNYCTaxiData(sourceIndexName)
    val definition = Rollup(
        id = "restart_finished_rollup",
        schemaVersion = 1L,
        enabled = true,
        jobSchedule = IntervalSchedule(Instant.now(), 1, ChronoUnit.MINUTES),
        jobLastUpdatedTime = Instant.now(),
        jobEnabledTime = Instant.now(),
        description = "basic search test",
        sourceIndex = sourceIndexName,
        targetIndex = targetIndexName,
        metadataID = null,
        roles = emptyList(),
        pageSize = 10,
        delay = 0,
        continuous = false,
        dimensions = listOf(DateHistogram(sourceField = "tpep_pickup_datetime", fixedInterval = "1h")),
        metrics = emptyList()
    )
    val rollup = createRollup(definition, definition.id)

    // First execution: should finish and roll up all source documents
    updateRollupStartTime(rollup)
    var firstRollupsIndexed = 0L
    waitFor {
        val finishedRollup = getRollup(rollupId = rollup.id)
        val firstRunMetadata = getRollupMetadata(finishedRollup.metadataID!!)
        assertEquals("Status should be finished", RollupMetadata.Status.FINISHED, firstRunMetadata.status)
        assertEquals("Did not roll up documents", 5000, firstRunMetadata.stats.documentsProcessed)
        assertTrue("Did not roll up documents", firstRunMetadata.stats.rollupsIndexed > 0)
        // Remember how many rollup docs the first run produced so the second run can be compared
        firstRollupsIndexed = firstRunMetadata.stats.rollupsIndexed
    }

    deleteIndex(targetIndexName)

    // Call _start on the finished rollup job
    val startResponse = client().makeRequest("POST", "$ROLLUP_JOBS_BASE_URI/${rollup.id}/_start")
    assertEquals("Start rollup failed", RestStatus.OK, startResponse.restStatus())
    assertEquals(mapOf("acknowledged" to true), startResponse.asMap())

    updateRollupStartTime(rollup)

    // The rollup should be able to finish a second time, rolling up the documents again
    waitFor {
        val rerunRollup = getRollup(rollupId = rollup.id)
        val secondRunMetadata = getRollupMetadata(rerunRollup.metadataID!!)
        assertEquals("Status should be finished", RollupMetadata.Status.FINISHED, secondRunMetadata.status)
        // Stats accumulate across runs: 5k documents from the first run plus 5k from the second
        assertEquals("Did not roll up documents", 10000, secondRunMetadata.stats.documentsProcessed)
        // Twice as many rollup docs indexed as after the first run
        assertEquals("Did not index rollup docs", firstRollupsIndexed * 2, secondRunMetadata.stats.rollupsIndexed)
        assertIndexExists(targetIndexName)
    }
}
}

0 comments on commit 27c8cb7

Please sign in to comment.