diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index b949eefdf..b92ef076a 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -331,12 +331,12 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon when (job) { is Workflow -> { launch { - var lock: LockModel? = null + var workflowLock: LockModel? = null try { - lock = monitorCtx.client!!.suspendUntil { + workflowLock = monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.acquireLock(job, it) } ?: return@launch - logger.debug("lock ${lock!!.lockId} acquired") + logger.debug("lock ${workflowLock.lockId} acquired") monitorCtx.client!!.suspendUntil { monitorCtx.client!!.execute( @@ -352,19 +352,19 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon ) } } finally { - monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(lock, it) } - logger.debug("lock ${lock?.lockId} released") + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(workflowLock, it) } + logger.debug("lock ${workflowLock?.lockId} released") } } } is Monitor -> { launch { - var lock: LockModel? = null + var monitorLock: LockModel? = null try { - lock = monitorCtx.client!!.suspendUntil { + monitorLock = monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.acquireLock(job, it) } ?: return@launch - logger.debug("lock ${lock!!.lockId} acquired") + logger.debug("lock ${monitorLock.lockId} acquired") logger.debug( "PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " + monitorCtx.clusterService!!.state().nodes().localNode.id @@ -384,8 +384,8 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon ) } } finally { - monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(lock, it) } - logger.debug("lock ${lock?.lockId} released") + monitorCtx.client!!.suspendUntil { monitorCtx.lockService!!.release(monitorLock, it) } + logger.debug("lock ${monitorLock?.lockId} released") } } } diff --git a/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt index 35618e156..03f40d745 100644 --- a/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt +++ b/core/src/main/kotlin/org/opensearch/alerting/core/lock/LockService.kt @@ -16,6 +16,7 @@ import org.opensearch.action.update.UpdateResponse import org.opensearch.client.Client import org.opensearch.cluster.service.ClusterService import org.opensearch.common.settings.Settings +import org.opensearch.common.unit.TimeValue import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentType @@ -29,6 +30,7 @@ import org.opensearch.index.engine.VersionConflictEngineException import org.opensearch.index.seqno.SequenceNumbers import java.io.IOException import java.time.Instant +import java.util.concurrent.TimeUnit private val log = LogManager.getLogger(LockService::class.java) @@ -37,6 +39,7 @@ class LockService(private val client: Client, private val clusterService: Cluste companion object { const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock" + val LOCK_EXPIRED_SECONDS = TimeValue(5, TimeUnit.MINUTES) @JvmStatic fun lockMapping(): String? { @@ -72,13 +75,23 @@ class LockService(private val client: Client, private val clusterService: Cluste object : ActionListener { override fun onResponse(existingLock: LockModel?) { if (existingLock != null) { + val currentTimestamp = getNow() if (isLockReleased(existingLock)) { log.debug("lock is released or expired: {}", existingLock) val updateLock = LockModel(existingLock, getNow(), false) updateLock(updateLock, listener) } else { - log.debug("Lock is NOT released or expired. {}", existingLock) - listener.onResponse(null) + log.debug("Lock is NOT released. {}", existingLock) + if (existingLock.lockTime.epochSecond + LOCK_EXPIRED_SECONDS.seconds + < currentTimestamp.epochSecond + ) { + log.debug("Lock is expired. Renewing Lock {}", existingLock) + val updateLock = LockModel(existingLock, getNow(), false) + updateLock(updateLock, listener) + } else { + log.debug("Lock is NOT expired. {}", existingLock) + listener.onResponse(null) + } } } else { val tempLock = LockModel(scheduledJobId, getNow(), false)