Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add distributed locking to jobs in alerting #1403

Merged
merged 1 commit into from
Mar 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.opensearch.alerting.core.JobSweeper
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsAction
import org.opensearch.alerting.core.action.node.ScheduledJobsStatsTransportAction
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
Expand Down Expand Up @@ -251,6 +252,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
): Collection<Any> {
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
val lockService = LockService(client, clusterService)
alertIndices = AlertIndices(settings, client, threadPool, clusterService)
runner = MonitorRunnerService
.registerClusterService(clusterService)
Expand All @@ -267,6 +269,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerJvmStats(JvmStats.jvmStats())
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerLockService(lockService)
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
Expand All @@ -291,9 +294,9 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
settings
)

DeleteMonitorService.initialize(client)
DeleteMonitorService.initialize(client, lockService)

return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator, lockService)
}

override fun getSettings(): List<Setting<*>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
Expand Down Expand Up @@ -57,4 +58,5 @@ data class MonitorRunnerExecutionContext(
AlertingSettings.DEFAULT_PERCOLATE_QUERY_DOCS_SIZE_MEMORY_PERCENTAGE_LIMIT,
@Volatile var docLevelMonitorShardFetchSize: Int =
AlertingSettings.DEFAULT_DOC_LEVEL_MONITOR_SHARD_FETCH_SIZE,
@Volatile var lockService: LockService? = null
)
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.opensearchapi.retry
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.script.TriggerExecutionContext
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.AlertingSettings.Companion.ALERT_BACKOFF_COUNT
Expand Down Expand Up @@ -221,6 +224,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerLockService(lockService: LockService): MonitorRunnerService {
monitorCtx.lockService = lockService
return this
}

// Updates destination settings when the reload API is called so that new keystore values are visible
fun reloadDestinationSettings(settings: Settings) {
monitorCtx.destinationSettings = loadDestinationSettings(settings)
Expand Down Expand Up @@ -292,20 +300,40 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
when (job) {
is Workflow -> {
launch {
logger.debug(
"PERF_DEBUG: executing workflow ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
var lock: LockModel? = null
try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could consider adding adding some delay between retries on acquiring the lock

logger.debug("lock ${lock!!.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing workflow ${job.id} on node " +
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: I think we should get rid of the PERF_DEBUG prefixes in these log statements. I'm fine if we fix it later though

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
}
}
}
is Monitor -> {
launch {
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
var lock: LockModel? = null
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: this code looks identical to 303-317. We should create a private method to avoid the duplication

try {
lock = monitorCtx.client!!.suspendUntil<Client, LockModel?> {
monitorCtx.lockService!!.acquireLock(job, it)
} ?: return@launch
logger.debug("lock ${lock!!.lockId} acquired")
logger.debug(
"PERF_DEBUG: executing ${job.monitorType} ${job.id} on node " +
monitorCtx.clusterService!!.state().nodes().localNode.id
)
runJob(job, periodStart, periodEnd, false)
} finally {
monitorCtx.client!!.suspendUntil<Client, Boolean> { monitorCtx.lockService!!.release(lock, it) }
logger.debug("lock ${lock!!.lockId} released")
}
}
}
else -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import org.opensearch.action.support.IndicesOptions
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH
Expand Down Expand Up @@ -49,11 +51,14 @@ object DeleteMonitorService :
private val log = LogManager.getLogger(this.javaClass)

private lateinit var client: Client
private lateinit var lockService: LockService

fun initialize(
client: Client,
lockService: LockService
) {
DeleteMonitorService.client = client
DeleteMonitorService.lockService = lockService
}

/**
Expand All @@ -65,6 +70,7 @@ object DeleteMonitorService :
val deleteResponse = deleteMonitor(monitor.id, refreshPolicy)
deleteDocLevelMonitorQueriesAndIndices(monitor)
deleteMetadata(monitor)
deleteLock(monitor)
return DeleteMonitorResponse(deleteResponse.id, deleteResponse.version)
}

Expand Down Expand Up @@ -148,6 +154,10 @@ object DeleteMonitorService :
}
}

private suspend fun deleteLock(monitor: Monitor) {
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(monitor.id), it) }
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

try catch?

}

/**
* Checks if the monitor is part of the workflow
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import org.opensearch.action.search.SearchResponse
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.alerting.core.lock.LockModel
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.model.WorkflowMetadata
import org.opensearch.alerting.opensearchapi.addFilter
Expand Down Expand Up @@ -73,6 +75,7 @@ class TransportDeleteWorkflowAction @Inject constructor(
val clusterService: ClusterService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry,
val lockService: LockService
) : HandledTransportAction<ActionRequest, DeleteWorkflowResponse>(
AlertingActions.DELETE_WORKFLOW_ACTION_NAME, transportService, actionFilters, ::DeleteWorkflowRequest
),
Expand Down Expand Up @@ -180,6 +183,12 @@ class TransportDeleteWorkflowAction @Inject constructor(
} catch (t: Exception) {
log.error("Failed to delete delegate monitor metadata. But proceeding with workflow deletion $workflowId", t)
}
try {
// Delete the workflow lock
client.suspendUntil<Client, Boolean> { lockService.deleteLock(LockModel.generateLockId(workflowId), it) }
} catch (t: Exception) {
log.error("Failed to delete workflow lock for $workflowId")
}
actionListener.onResponse(deleteWorkflowResponse)
} else {
actionListener.onFailure(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import org.apache.hc.core5.http.io.entity.StringEntity
import org.opensearch.action.search.SearchResponse
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_ALERT_INDEX_PATTERN
import org.opensearch.alerting.alerts.AlertIndices.Companion.ALL_FINDING_INDEX_PATTERN
import org.opensearch.alerting.core.lock.LockService
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.client.Response
import org.opensearch.client.ResponseException
Expand All @@ -18,16 +19,20 @@ import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DataSources
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.IntervalSchedule
import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.AlertCategory
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.model.action.PerExecutionActionScope
import org.opensearch.core.rest.RestStatus
import org.opensearch.script.Script
import org.opensearch.test.OpenSearchTestCase
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.time.temporal.ChronoUnit.MILLIS
import java.util.Locale
import java.util.concurrent.TimeUnit

class DocumentMonitorRunnerIT : AlertingRestTestCase() {

Expand Down Expand Up @@ -470,6 +475,86 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
assertTrue("Findings saved for test monitor", findings[1].relatedDocIds.contains("1"))
}

fun `test monitor run generates no error alerts with versionconflictengineexception with locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
Thread.sleep(240000)

val inputMap = HashMap<String, Any>()
inputMap["searchString"] = monitor.name

val responseMap = getAlerts(inputMap).asMap()
val alerts = (responseMap["alerts"] as ArrayList<Map<String, Any>>)
alerts.forEach {
assertTrue(it["error_message"] == null)
}
}

@AwaitsFix(bugUrl = "")
fun `test monitor run generate lock and monitor delete removes lock`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
OpenSearchTestCase.waitUntil({
val response = client().makeRequest("HEAD", LockService.LOCK_INDEX_NAME)
return@waitUntil (response.restStatus().status == 200)
}, 240, TimeUnit.SECONDS)

var response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
var responseMap = entityAsMap(response)
var noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
assertEquals(1, noOfLocks)

deleteMonitor(monitor)
refreshIndex(LockService.LOCK_INDEX_NAME)
response = client().makeRequest("GET", LockService.LOCK_INDEX_NAME + "/_search")
responseMap = entityAsMap(response)
noOfLocks = ((responseMap["hits"] as Map<String, Any>)["hits"] as List<Any>).size
assertEquals(0, noOfLocks)
}

fun `test execute monitor with tag as trigger condition generates alerts and findings`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(MILLIS))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ import org.opensearch.search.builder.SearchSourceBuilder
import org.opensearch.test.OpenSearchTestCase
import org.opensearch.test.junit.annotations.TestLogging
import java.time.Instant
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import java.util.Collections
import java.util.Locale
Expand Down Expand Up @@ -1190,4 +1192,45 @@ class WorkflowRestApiIT : AlertingRestTestCase() {
val findings = searchFindings(monitor.copy(id = monitorResponse.id))
assertEquals("Findings saved for test monitor", 1, findings.size)
}

fun `test workflow run generates no error alerts with versionconflictengineexception with locks`() {
val testIndex = createTestIndex()
val testTime = DateTimeFormatter.ISO_OFFSET_DATE_TIME.format(ZonedDateTime.now().truncatedTo(ChronoUnit.MILLIS))
val testDoc = """{
"message" : "This is an error from IAD region",
"test_strict_date_time" : "$testTime",
"test_field" : "us-west-2"
}"""

val docQuery = DocLevelQuery(query = "test_field:\"us-west-2\"", name = "3", fields = listOf())
val docLevelInput = DocLevelMonitorInput("description", listOf(testIndex), listOf(docQuery))

val trigger = randomDocumentLevelTrigger(condition = ALWAYS_RUN)
val monitor = createMonitor(
randomDocumentLevelMonitor(
name = "__lag-monitor-test__",
inputs = listOf(docLevelInput),
triggers = listOf(trigger),
enabled = false,
schedule = IntervalSchedule(interval = 1, unit = ChronoUnit.MINUTES)
)
)
assertNotNull(monitor.id)
createWorkflow(
randomWorkflow(
monitorIds = listOf(monitor.id),
enabled = true,
schedule = IntervalSchedule(1, ChronoUnit.MINUTES)
)
)

indexDoc(testIndex, "1", testDoc)
indexDoc(testIndex, "5", testDoc)
Thread.sleep(240000)

val alerts = searchAlerts(monitor)
alerts.forEach {
assertTrue(it.errorMessage == null)
}
}
}
Loading
Loading