diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt index f43beda55..0bd88c003 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/DocumentLevelMonitorRunner.kt @@ -69,12 +69,20 @@ import org.opensearch.search.sort.SortOrder import java.io.IOException import java.time.Instant import java.util.UUID -import java.util.concurrent.atomic.AtomicLong import java.util.stream.Collectors import kotlin.math.max -object DocumentLevelMonitorRunner : MonitorRunner() { +class DocumentLevelMonitorRunner : MonitorRunner() { private val logger = LogManager.getLogger(javaClass) + var nonPercolateSearchesTimeTakenStat = 0L + var percolateQueriesTimeTakenStat = 0L + var totalDocsQueriedStat = 0L + var docTransformTimeTakenStat = 0L + var totalDocsSizeInBytesStat = 0L + var docsSizeOfBatchInBytes = 0L + /* Contains list of docs source that are held in memory to submit to percolate query against query index. + * Docs are fetched from the source index per shard and transformed.*/ + val transformedDocs = mutableListOf>() override suspend fun runMonitor( monitor: Monitor, @@ -88,10 +96,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { logger.debug("Document-level-monitor is running ...") val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID var monitorResult = MonitorRunResult(monitor.name, periodStart, periodEnd) - var nonPercolateSearchesTimeTaken = AtomicLong(0) - var percolateQueriesTimeTaken = AtomicLong(0) - var totalDocsQueried = AtomicLong(0) - var docTransformTimeTaken = AtomicLong(0) + try { monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources) monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources) @@ -160,10 +165,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { // Map of document ids per index when monitor is workflow delegate and has chained findings val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex - /* Contains list of docs source that are held in memory to submit to percolate query against query index. - * Docs are fetched from the source index per shard and transformed.*/ - val transformedDocs = mutableListOf>() - val docsSizeInBytes = AtomicLong(0) val concreteIndicesSeenSoFar = mutableListOf() val updatedIndexNames = mutableListOf() docLevelMonitorInput.indices.forEach { indexName -> @@ -212,7 +213,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { indexLastRunContext.toMutableMap(), monitorCtx, concreteIndexName, - nonPercolateSearchesTimeTaken ) as MutableMap if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) || IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state()) @@ -250,14 +250,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata, inputRunResults, docsToQueries, - transformedDocs, - docsSizeInBytes, updatedIndexNames, concreteIndicesSeenSoFar, - nonPercolateSearchesTimeTaken, - percolateQueriesTimeTaken, - totalDocsQueried, - docTransformTimeTaken ) } } @@ -266,16 +260,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (transformedDocs.isNotEmpty()) { performPercolateQueryAndResetCounters( monitorCtx, - transformedDocs, - docsSizeInBytes, monitor, monitorMetadata, updatedIndexNames, concreteIndicesSeenSoFar, inputRunResults, docsToQueries, - percolateQueriesTimeTaken, - totalDocsQueried ) } monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults))) @@ -351,19 +341,19 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } finally { logger.debug( "PERF_DEBUG_STATS: Monitor ${monitor.id} " + - "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken" + "Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTakenStat" ) logger.debug( "PERF_DEBUG_STATS: Monitor {} Time spent on percolate queries in millis: {}", monitor.id, - percolateQueriesTimeTaken + percolateQueriesTimeTakenStat ) logger.debug( "PERF_DEBUG_STATS: Monitor {} Time spent on transforming doc fields in millis: {}", monitor.id, - docTransformTimeTaken + docTransformTimeTakenStat ) - logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueried) + logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueriedStat) } } @@ -605,13 +595,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() { lastRunContext: Map, monitorCtx: MonitorRunnerExecutionContext, index: String, - nonPercolateSearchesTimeTaken: AtomicLong ): Map { val count: Int = getShardsCount(monitorCtx.clusterService!!, index) val updatedLastRunContext = lastRunContext.toMutableMap() for (i: Int in 0 until count) { val shard = i.toString() - val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken) + val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard) updatedLastRunContext[shard] = maxSeqNo.toString() } return updatedLastRunContext @@ -648,7 +637,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { * Get the current max seq number of the shard. We find it by searching the last document * in the primary shard. */ - private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long { + private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long { val request: SearchRequest = SearchRequest() .indices(index) .preference("_shards:$shard") @@ -664,7 +653,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { if (response.status() !== RestStatus.OK) { throw IOException("Failed to get max seq no for shard: $shard") } - nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis) + nonPercolateSearchesTimeTakenStat += response.took.millis if (response.hits.hits.isEmpty()) return -1L @@ -692,14 +681,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, inputRunResults: MutableMap>, docsToQueries: MutableMap>, - transformedDocs: MutableList>, - docsSizeInBytes: AtomicLong, monitorInputIndices: List, concreteIndices: List, - nonPercolateSearchesTimeTaken: AtomicLong, - percolateQueriesTimeTaken: AtomicLong, - totalDocsQueried: AtomicLong, - docTransformTimeTake: AtomicLong, ) { val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int for (i: Int in 0 until count) { @@ -715,8 +698,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { prevSeqNo, maxSeqNo, null, - docIds, - nonPercolateSearchesTimeTaken + docIds ) val startTime = System.currentTimeMillis() transformedDocs.addAll( @@ -726,10 +708,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndexName, monitor.id, conflictingFields, - docsSizeInBytes ) ) - docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime) + docTransformTimeTakenStat += System.currentTimeMillis() - startTime } catch (e: Exception) { logger.error( "Monitor ${monitor.id} :" + @@ -739,46 +720,37 @@ object DocumentLevelMonitorRunner : MonitorRunner() { } if ( transformedDocs.isNotEmpty() && - shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx) + shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx) ) { performPercolateQueryAndResetCounters( monitorCtx, - transformedDocs, - docsSizeInBytes, monitor, monitorMetadata, monitorInputIndices, concreteIndices, inputRunResults, docsToQueries, - percolateQueriesTimeTaken, - totalDocsQueried ) } } } private fun shouldPerformPercolateQueryAndFlushInMemoryDocs( - docsSizeInBytes: AtomicLong, numDocs: Int, monitorCtx: MonitorRunnerExecutionContext, ): Boolean { - return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx) || + return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeOfBatchInBytes, monitorCtx) || isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx) } private suspend fun performPercolateQueryAndResetCounters( monitorCtx: MonitorRunnerExecutionContext, - transformedDocs: MutableList>, - docsSizeInBytes: AtomicLong, monitor: Monitor, monitorMetadata: MonitorMetadata, monitorInputIndices: List, concreteIndices: List, inputRunResults: MutableMap>, docsToQueries: MutableMap>, - percolateQueriesTimeTaken: AtomicLong, - totalDocsQueried: AtomicLong ) { try { val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs( @@ -788,7 +760,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata, concreteIndices, monitorInputIndices, - percolateQueriesTimeTaken ) percolateQueryResponseHits.forEach { hit -> @@ -802,10 +773,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() { docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id) } } - totalDocsQueried.getAndAdd(transformedDocs.size.toLong()) + totalDocsQueriedStat += transformedDocs.size.toLong() } finally { transformedDocs.clear() - docsSizeInBytes.set(0) + docsSizeOfBatchInBytes = 0 } } @@ -820,7 +791,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { maxSeqNo: Long, query: String?, docIds: List? = null, - nonPercolateSearchesTimeTaken: AtomicLong, ): SearchHits { if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) { return SearchHits.empty() @@ -851,7 +821,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { logger.error("Failed search shard. Response: $response") throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}") } - nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis) + nonPercolateSearchesTimeTakenStat += response.took.millis return response.hits } @@ -863,7 +833,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { monitorMetadata: MonitorMetadata, concreteIndices: List, monitorInputIndices: List, - percolateQueriesTimeTaken: AtomicLong, ): SearchHits { val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList()) val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices)) @@ -916,7 +885,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) } logger.debug("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}") - percolateQueriesTimeTaken.getAndAdd(response.took.millis) + percolateQueriesTimeTakenStat += response.took.millis return response.hits } /** we cannot use terms query because `index` field's mapping is of type TEXT and not keyword. Refer doc-level-queries.json*/ @@ -934,7 +903,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() { concreteIndex: String, monitorId: String, conflictingFields: List, - docsSizeInBytes: AtomicLong, ): List> { return hits.mapNotNull(fun(hit: SearchHit): Pair? { try { @@ -948,7 +916,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() { ) var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap) val sourceRef = BytesReference.bytes(xContentBuilder) - docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed()) + docsSizeOfBatchInBytes += sourceRef.ramBytesUsed() + totalDocsSizeInBytesStat += sourceRef.ramBytesUsed() return Pair(hit.id, TransformedDocDto(index, concreteIndex, hit.id, sourceRef)) } catch (e: Exception) { logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e) @@ -1031,7 +1000,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() { * POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name * and doc source. A list of these POJOs would be passed to percolate query execution logic. */ - private data class TransformedDocDto( + data class TransformedDocDto( var indexName: String, var concreteIndexName: String, var docId: String, diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt index fe8e94734..4aa4c07f0 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/MonitorRunnerService.kt @@ -321,7 +321,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon val runResult = if (monitor.isBucketLevelMonitor()) { BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) } else if (monitor.isDocLevelMonitor()) { - DocumentLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) + DocumentLevelMonitorRunner().runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) } else { QueryLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId) } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt index cfed18c89..a22b09bdc 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/workflow/CompositeWorkflowRunner.kt @@ -251,7 +251,7 @@ object CompositeWorkflowRunner : WorkflowRunner() { executionId ) } else if (delegateMonitor.isDocLevelMonitor()) { - return DocumentLevelMonitorRunner.runMonitor( + return DocumentLevelMonitorRunner().runMonitor( delegateMonitor, monitorCtx, periodStart, diff --git a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json index d2ecc0907..1bfea4ebc 100644 --- a/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json +++ b/alerting/src/main/resources/org/opensearch/alerting/alerts/finding_mapping.json @@ -49,6 +49,9 @@ }, "fields": { "type": "text" + }, + "query_field_names": { + "type": "keyword" } } },