Skip to content

Commit

Permalink
Merge branch 'main' into test67
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 authored Apr 11, 2024
2 parents 49b2374 + 62003ee commit 9d38e26
Show file tree
Hide file tree
Showing 8 changed files with 50 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ import org.opensearch.alerting.script.BucketLevelTriggerExecutionContext
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getBucketKeysHash
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.getCombinedTriggerRunResult
import org.opensearch.alerting.util.printsSampleDocData
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.client.Client
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.LoggingDeprecationHandler
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.model.Alert
Expand Down Expand Up @@ -448,6 +450,9 @@ object BucketLevelMonitorRunner : MonitorRunner() {
queryBuilder.filter(QueryBuilders.termsQuery(fieldName, bucketValues))
sr.source().query(queryBuilder)
}
sr.cancelAfterTimeInterval = TimeValue.timeValueMinutes(
getCancelAfterTimeInterval()
)
val searchResponse: SearchResponse = monitorCtx.client!!.suspendUntil { monitorCtx.client!!.search(sr, it) }
return createFindingPerIndex(searchResponse, monitor, monitorCtx, shouldCreateFinding, executionId)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,24 @@ import org.opensearch.alerting.model.InputRunResults
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.IndexUtils
import org.opensearch.alerting.util.defaultToPerExecutionAction
import org.opensearch.alerting.util.getActionExecutionPolicy
import org.opensearch.alerting.util.getCancelAfterTimeInterval
import org.opensearch.alerting.util.parseSampleDocTags
import org.opensearch.alerting.util.printsSampleDocDat
import org.opensearch.alerting.workflow.WorkflowRunContext
import org.opensearch.cluster.metadata.IndexMetadata
import org.opensearch.cluster.node.DiscoveryNode
import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.unit.TimeValue
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.Monitor
Expand Down Expand Up @@ -75,10 +88,17 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx.alertIndices!!.createOrUpdateInitialFindingHistoryIndex(monitor.dataSources)
} catch (e: Exception) {
val id = if (monitor.id.trim().isEmpty()) "_na_" else monitor.id
logger.error("Error setting up alerts and findings indices for monitor: $id", e)
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
val unwrappedException = ExceptionsHelper.unwrapCause(e)
if (unwrappedException is IllegalArgumentException && unwrappedException.message?.contains("Limit of total fields") == true) {
val errorMessage =
"Monitor [$id] can't process index [$monitor.dataSources] due to field mapping limit"
logger.error("Exception: ${unwrappedException.message}")
monitorResult = monitorResult.copy(error = AlertingException(errorMessage, RestStatus.INTERNAL_SERVER_ERROR, e))
} else {
logger.error("Error setting up alerts and findings indices for monitor: $id", e)
monitorResult = monitorResult.copy(error = AlertingException.wrap(e))
}
}

try {
validate(monitor)
} catch (e: Exception) {
Expand Down Expand Up @@ -517,7 +537,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
shards: List<String>,
index: String,
): Map<String, MutableSet<ShardId>> {

val totalShards = shards.size
val numFanOutNodes = allNodes.size.coerceAtMost((totalShards + 1) / 2)
val totalNodes = monitorCtx.totalNodesFanOut.coerceAtMost(numFanOutNodes)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ data class MonitorRunnerExecutionContext(

@Volatile var maxActionableAlertCount: Long = AlertingSettings.DEFAULT_MAX_ACTIONABLE_ALERT_COUNT,
@Volatile var indexTimeout: TimeValue? = null,
@Volatile var cancelAfterTimeInterval: TimeValue? = null,
@Volatile var findingsIndexBatchSize: Int = AlertingSettings.DEFAULT_FINDINGS_INDEXING_BATCH_SIZE,
@Volatile var fetchOnlyQueryFieldNames: Boolean = true,
@Volatile var percQueryMaxNumDocsInMemory: Int = AlertingSettings.DEFAULT_PERCOLATE_QUERY_NUM_DOCS_IN_MEMORY,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.action.search.TransportSearchAction.SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.action.ExecuteMonitorAction
import org.opensearch.alerting.action.ExecuteMonitorRequest
Expand Down Expand Up @@ -161,6 +162,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
ALERT_BACKOFF_MILLIS.get(monitorCtx.settings),
ALERT_BACKOFF_COUNT.get(monitorCtx.settings)
)

monitorCtx.cancelAfterTimeInterval = SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING.get(monitorCtx.settings)

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALERT_BACKOFF_MILLIS, ALERT_BACKOFF_COUNT) { millis, count ->
monitorCtx.retryPolicy = BackoffPolicy.constantBackoff(millis, count)
}
Expand All @@ -177,6 +181,9 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
monitorCtx.moveAlertsRetryPolicy = BackoffPolicy.exponentialBackoff(millis, count)
}

monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(SEARCH_CANCEL_AFTER_TIME_INTERVAL_SETTING) {
monitorCtx.cancelAfterTimeInterval = it
}
monitorCtx.allowList = ALLOW_LIST.get(monitorCtx.settings)
monitorCtx.clusterService!!.clusterSettings.addSettingsUpdateConsumer(ALLOW_LIST) {
monitorCtx.allowList = it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,6 @@ class TransportGetFindingsSearchAction @Inject constructor(
)
}
searchSourceBuilder.query(queryBuilder).trackTotalHits(true)

client.threadPool().threadContext.stashContext().use {
scope.launch {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
package org.opensearch.alerting.util

import org.apache.logging.log4j.LogManager
import org.opensearch.alerting.AlertService
import org.opensearch.alerting.MonitorRunnerService
import org.opensearch.alerting.model.AlertContext
import org.opensearch.alerting.model.BucketLevelTriggerRunResult
import org.opensearch.alerting.model.destination.Destination
Expand All @@ -25,6 +27,7 @@ import org.opensearch.commons.alerting.model.action.ActionExecutionPolicy
import org.opensearch.commons.alerting.model.action.ActionExecutionScope
import org.opensearch.commons.alerting.util.isBucketLevelMonitor
import org.opensearch.script.Script
import kotlin.math.max

private val logger = LogManager.getLogger("AlertingUtils")

Expand Down Expand Up @@ -172,6 +175,16 @@ inline fun <T : ThreadContext.StoredContext, R> T.use(block: (T) -> R): R {
}
}

fun getCancelAfterTimeInterval(): Long {
// The default value for the cancelAfterTimeInterval is -1 and so, in this case
// we should ignore processing on the value
val givenInterval = MonitorRunnerService.monitorCtx.cancelAfterTimeInterval!!.minutes
if (givenInterval == -1L) {
return givenInterval
}
return max(givenInterval, AlertService.ALERTS_SEARCH_TIMEOUT.minutes)
}

/**
* Closes this [AutoCloseable], suppressing possible exception or error thrown by [AutoCloseable.close] function when
* it's being closed due to some other [cause] exception occurred.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ class AlertServiceTests : OpenSearchTestCase() {
xContentRegistry = Mockito.mock(NamedXContentRegistry::class.java)
threadPool = Mockito.mock(ThreadPool::class.java)
clusterService = Mockito.mock(ClusterService::class.java)

settings = Settings.builder().build()
val settingSet = hashSetOf<Setting<*>>()
settingSet.addAll(ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
Expand Down
2 changes: 1 addition & 1 deletion core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ dependencies {
implementation "org.jetbrains.kotlin:kotlin-stdlib:${kotlin_version}"
api 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1'
api "org.jetbrains.kotlin:kotlin-stdlib-jdk8:${kotlin_version}"
implementation "com.cronutils:cron-utils:9.1.6"
implementation "com.cronutils:cron-utils:9.1.7"
api "org.opensearch.client:opensearch-rest-client:${opensearch_version}"
implementation('com.google.googlejavaformat:google-java-format:1.10.0') {
exclude group: 'com.google.guava'
Expand Down

0 comments on commit 9d38e26

Please sign in to comment.