Skip to content

Commit

Permalink
fix monitor renew lock issue
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Jul 26, 2024
1 parent e22cd76 commit 93267b3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -331,12 +331,12 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
when (job) {
is Workflow -> {
launch {
var lock: LockModel? = null
var workflowLock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
workflowLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug("lock ${workflowLock.lockId} acquired")

monitorCtx.client!!.suspendUntil<Client, ExecuteWorkflowResponse> {
monitorCtx.client!!.execute(
Expand All @@ -352,19 +352,19 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
)
}
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock?.lockId} released")
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(workflowLock, it) }
logger.debug("lock ${workflowLock?.lockId} released")
}
}
}
is Monitor -> {
launch {
var lock: LockModel? = null
var monitorLock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorLock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug("lock ${monitorLock.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
Expand All @@ -384,8 +384,8 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
)
}
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock?.lockId} released")
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(monitorLock, it) }
logger.debug("lock ${monitorLock?.lockId} released")
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import org.opensearch.action.update.UpdateResponse
import org.opensearch.client.Client
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.settings.Settings
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
Expand All @@ -29,6 +30,7 @@ import org.opensearch.index.engine.VersionConflictEngineException
import org.opensearch.index.seqno.SequenceNumbers
import java.io.IOException
import java.time.Instant
import java.util.concurrent.TimeUnit

private val log = LogManager.getLogger(LockService::class.java)

Expand All @@ -37,6 +39,7 @@ class LockService(private val client: Client, private val clusterService: Cluste

companion object {
const val LOCK_INDEX_NAME = ".opensearch-alerting-config-lock"
val LOCK_EXPIRED_SECONDS = TimeValue(5, TimeUnit.MINUTES)

@JvmStatic
fun lockMapping(): String? {
Expand Down Expand Up @@ -72,13 +75,23 @@ class LockService(private val client: Client, private val clusterService: Cluste
object : ActionListener<LockModel> {
override fun onResponse(existingLock: LockModel?) {
if (existingLock != null) {
val currentTimestamp = getNow()
if (isLockReleased(existingLock)) {
log.debug("lock is released or expired: {}", existingLock)
val updateLock = LockModel(existingLock, getNow(), false)
updateLock(updateLock, listener)
} else {
log.debug("Lock is NOT released or expired. {}", existingLock)
listener.onResponse(null)
log.debug("Lock is NOT released. {}", existingLock)
if (existingLock.lockTime.epochSecond + LOCK_EXPIRED_SECONDS.seconds
< currentTimestamp.epochSecond
) {
log.debug("Lock is expired. Renewing Lock {}", existingLock)
val updateLock = LockModel(existingLock, getNow(), false)
updateLock(updateLock, listener)
} else {
log.debug("Lock is NOT expired. {}", existingLock)
listener.onResponse(null)
}
}
} else {
val tempLock = LockModel(scheduledJobId, getNow(), false)
Expand Down

0 comments on commit 93267b3

Please sign in to comment.