Skip to content

Commit

Permalink
add queryfieldnames field in findings mapping
Browse files Browse the repository at this point in the history
Signed-off-by: Surya Sashank Nistala <snistala@amazon.com>
  • Loading branch information
eirsep committed Feb 21, 2024
1 parent 5bf9f72 commit 25d722a
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,20 @@ import org.opensearch.search.sort.SortOrder
import java.io.IOException
import java.time.Instant
import java.util.UUID
import java.util.concurrent.atomic.AtomicLong
import java.util.stream.Collectors
import kotlin.math.max

object DocumentLevelMonitorRunner : MonitorRunner() {
class DocumentLevelMonitorRunner : MonitorRunner() {
private val logger = LogManager.getLogger(javaClass)
var nonPercolateSearchesTimeTakenStat = 0L
var percolateQueriesTimeTakenStat = 0L
var totalDocsQueriedStat = 0L
var docTransformTimeTakenStat = 0L
var totalDocsSizeInBytesStat = 0L
var docsSizeOfBatchInBytes = 0L
/* Holds the sources of the docs that are kept in memory until they are submitted to the percolate query against the query index.
* Docs are fetched from the source index per shard and transformed. */
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()

override suspend fun runMonitor(
monitor: Monitor,
Expand All @@ -88,10 +96,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
logger.debug("Document-level-monitor is running ...")
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID
var monitorResult = MonitorRunResult<DocumentLevelTriggerRunResult>(monitor.name, periodStart, periodEnd)
var nonPercolateSearchesTimeTaken = AtomicLong(0)
var percolateQueriesTimeTaken = AtomicLong(0)
var totalDocsQueried = AtomicLong(0)
var docTransformTimeTaken = AtomicLong(0)

try {
monitorCtx.alertIndices!!.createOrUpdateAlertIndex(monitor.dataSources)
monitorCtx.alertIndices!!.createOrUpdateInitialAlertHistoryIndex(monitor.dataSources)
Expand Down Expand Up @@ -160,10 +165,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
// Map of document ids per index when monitor is workflow delegate and has chained findings
val matchingDocIdsPerIndex = workflowRunContext?.matchingDocIdsPerIndex

/* Contains list of docs source that are held in memory to submit to percolate query against query index.
* Docs are fetched from the source index per shard and transformed.*/
val transformedDocs = mutableListOf<Pair<String, TransformedDocDto>>()
val docsSizeInBytes = AtomicLong(0)
val concreteIndicesSeenSoFar = mutableListOf<String>()
val updatedIndexNames = mutableListOf<String>()
docLevelMonitorInput.indices.forEach { indexName ->
Expand Down Expand Up @@ -212,7 +213,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
indexLastRunContext.toMutableMap(),
monitorCtx,
concreteIndexName,
nonPercolateSearchesTimeTaken
) as MutableMap<String, Any>
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
Expand Down Expand Up @@ -250,14 +250,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata,
inputRunResults,
docsToQueries,
transformedDocs,
docsSizeInBytes,
updatedIndexNames,
concreteIndicesSeenSoFar,
nonPercolateSearchesTimeTaken,
percolateQueriesTimeTaken,
totalDocsQueried,
docTransformTimeTaken
)
}
}
Expand All @@ -266,16 +260,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
if (transformedDocs.isNotEmpty()) {
performPercolateQueryAndResetCounters(
monitorCtx,
transformedDocs,
docsSizeInBytes,
monitor,
monitorMetadata,
updatedIndexNames,
concreteIndicesSeenSoFar,
inputRunResults,
docsToQueries,
percolateQueriesTimeTaken,
totalDocsQueried
)
}
monitorResult = monitorResult.copy(inputResults = InputRunResults(listOf(inputRunResults)))
Expand Down Expand Up @@ -351,19 +341,19 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
} finally {
logger.debug(
"PERF_DEBUG_STATS: Monitor ${monitor.id} " +
"Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTaken"
"Time spent on fetching data from shards in millis: $nonPercolateSearchesTimeTakenStat"
)
logger.debug(
"PERF_DEBUG_STATS: Monitor {} Time spent on percolate queries in millis: {}",
monitor.id,
percolateQueriesTimeTaken
percolateQueriesTimeTakenStat
)
logger.debug(
"PERF_DEBUG_STATS: Monitor {} Time spent on transforming doc fields in millis: {}",
monitor.id,
docTransformTimeTaken
docTransformTimeTakenStat
)
logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueried)
logger.debug("PERF_DEBUG_STATS: Monitor {} Num docs queried: {}", monitor.id, totalDocsQueriedStat)
}
}

Expand Down Expand Up @@ -605,13 +595,12 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
lastRunContext: Map<String, Any>,
monitorCtx: MonitorRunnerExecutionContext,
index: String,
nonPercolateSearchesTimeTaken: AtomicLong
): Map<String, Any> {
val count: Int = getShardsCount(monitorCtx.clusterService!!, index)
val updatedLastRunContext = lastRunContext.toMutableMap()
for (i: Int in 0 until count) {
val shard = i.toString()
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard, nonPercolateSearchesTimeTaken)
val maxSeqNo: Long = getMaxSeqNo(monitorCtx.client!!, index, shard)
updatedLastRunContext[shard] = maxSeqNo.toString()
}
return updatedLastRunContext
Expand Down Expand Up @@ -648,7 +637,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
* Get the current max seq number of the shard. We find it by searching the last document
* in the primary shard.
*/
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String, nonPercolateSearchesTimeTaken: AtomicLong): Long {
private suspend fun getMaxSeqNo(client: Client, index: String, shard: String): Long {
val request: SearchRequest = SearchRequest()
.indices(index)
.preference("_shards:$shard")
Expand All @@ -664,7 +653,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
if (response.status() !== RestStatus.OK) {
throw IOException("Failed to get max seq no for shard: $shard")
}
nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis)
nonPercolateSearchesTimeTakenStat += response.took.millis
if (response.hits.hits.isEmpty())
return -1L

Expand Down Expand Up @@ -692,14 +681,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata: MonitorMetadata,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
transformedDocs: MutableList<Pair<String, TransformedDocDto>>,
docsSizeInBytes: AtomicLong,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
nonPercolateSearchesTimeTaken: AtomicLong,
percolateQueriesTimeTaken: AtomicLong,
totalDocsQueried: AtomicLong,
docTransformTimeTake: AtomicLong,
) {
val count: Int = docExecutionCtx.updatedLastRunContext["shards_count"] as Int
for (i: Int in 0 until count) {
Expand All @@ -715,8 +698,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
prevSeqNo,
maxSeqNo,
null,
docIds,
nonPercolateSearchesTimeTaken
docIds
)
val startTime = System.currentTimeMillis()
transformedDocs.addAll(
Expand All @@ -726,10 +708,9 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndexName,
monitor.id,
conflictingFields,
docsSizeInBytes
)
)
docTransformTimeTake.getAndAdd(System.currentTimeMillis() - startTime)
docTransformTimeTakenStat += System.currentTimeMillis() - startTime
} catch (e: Exception) {
logger.error(
"Monitor ${monitor.id} :" +
Expand All @@ -739,46 +720,37 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
}
if (
transformedDocs.isNotEmpty() &&
shouldPerformPercolateQueryAndFlushInMemoryDocs(docsSizeInBytes, transformedDocs.size, monitorCtx)
shouldPerformPercolateQueryAndFlushInMemoryDocs(transformedDocs.size, monitorCtx)
) {
performPercolateQueryAndResetCounters(
monitorCtx,
transformedDocs,
docsSizeInBytes,
monitor,
monitorMetadata,
monitorInputIndices,
concreteIndices,
inputRunResults,
docsToQueries,
percolateQueriesTimeTaken,
totalDocsQueried
)
}
}
}

private fun shouldPerformPercolateQueryAndFlushInMemoryDocs(
docsSizeInBytes: AtomicLong,
numDocs: Int,
monitorCtx: MonitorRunnerExecutionContext,
): Boolean {
return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeInBytes.get(), monitorCtx) ||
return isInMemoryDocsSizeExceedingMemoryLimit(docsSizeOfBatchInBytes, monitorCtx) ||
isInMemoryNumDocsExceedingMaxDocsPerPercolateQueryLimit(numDocs, monitorCtx)
}

private suspend fun performPercolateQueryAndResetCounters(
monitorCtx: MonitorRunnerExecutionContext,
transformedDocs: MutableList<Pair<String, TransformedDocDto>>,
docsSizeInBytes: AtomicLong,
monitor: Monitor,
monitorMetadata: MonitorMetadata,
monitorInputIndices: List<String>,
concreteIndices: List<String>,
inputRunResults: MutableMap<String, MutableSet<String>>,
docsToQueries: MutableMap<String, MutableList<String>>,
percolateQueriesTimeTaken: AtomicLong,
totalDocsQueried: AtomicLong
) {
try {
val percolateQueryResponseHits = runPercolateQueryOnTransformedDocs(
Expand All @@ -788,7 +760,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata,
concreteIndices,
monitorInputIndices,
percolateQueriesTimeTaken
)

percolateQueryResponseHits.forEach { hit ->
Expand All @@ -802,10 +773,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
docsToQueries.getOrPut(docIndex) { mutableListOf() }.add(id)
}
}
totalDocsQueried.getAndAdd(transformedDocs.size.toLong())
totalDocsQueriedStat += transformedDocs.size.toLong()
} finally {
transformedDocs.clear()
docsSizeInBytes.set(0)
docsSizeOfBatchInBytes = 0
}
}

Expand All @@ -820,7 +791,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
maxSeqNo: Long,
query: String?,
docIds: List<String>? = null,
nonPercolateSearchesTimeTaken: AtomicLong,
): SearchHits {
if (prevSeqNo?.equals(maxSeqNo) == true && maxSeqNo != 0L) {
return SearchHits.empty()
Expand Down Expand Up @@ -851,7 +821,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
logger.error("Failed search shard. Response: $response")
throw IOException("Failed to search shard: [$shard] in index [$index]. Response status is ${response.status()}")
}
nonPercolateSearchesTimeTaken.getAndAdd(response.took.millis)
nonPercolateSearchesTimeTakenStat += response.took.millis
return response.hits
}

Expand All @@ -863,7 +833,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorMetadata: MonitorMetadata,
concreteIndices: List<String>,
monitorInputIndices: List<String>,
percolateQueriesTimeTaken: AtomicLong,
): SearchHits {
val indices = docs.stream().map { it.second.indexName }.distinct().collect(Collectors.toList())
val boolQueryBuilder = BoolQueryBuilder().must(buildShouldClausesOverPerIndexMatchQueries(indices))
Expand Down Expand Up @@ -916,7 +885,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)
}
logger.debug("Monitor ${monitor.id} PERF_DEBUG: Percolate query time taken millis = ${response.took}")
percolateQueriesTimeTaken.getAndAdd(response.took.millis)
percolateQueriesTimeTakenStat += response.took.millis
return response.hits
}
/** We cannot use a terms query because the `index` field's mapping is of type TEXT and not keyword. Refer to doc-level-queries.json. */
Expand All @@ -934,7 +903,6 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndex: String,
monitorId: String,
conflictingFields: List<String>,
docsSizeInBytes: AtomicLong,
): List<Pair<String, TransformedDocDto>> {
return hits.mapNotNull(fun(hit: SearchHit): Pair<String, TransformedDocDto>? {
try {
Expand All @@ -948,7 +916,8 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
)
var xContentBuilder = XContentFactory.jsonBuilder().map(sourceMap)
val sourceRef = BytesReference.bytes(xContentBuilder)
docsSizeInBytes.getAndAdd(sourceRef.ramBytesUsed())
docsSizeOfBatchInBytes += sourceRef.ramBytesUsed()
totalDocsSizeInBytesStat += sourceRef.ramBytesUsed()
return Pair(hit.id, TransformedDocDto(index, concreteIndex, hit.id, sourceRef))
} catch (e: Exception) {
logger.error("Monitor $monitorId: Failed to transform payload $hit for percolate query", e)
Expand Down Expand Up @@ -1031,7 +1000,7 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
* POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name
* and doc source. A list of these POJOs would be passed to percolate query execution logic.
*/
private data class TransformedDocDto(
data class TransformedDocDto(
var indexName: String,
var concreteIndexName: String,
var docId: String,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
val runResult = if (monitor.isBucketLevelMonitor()) {
BucketLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
} else if (monitor.isDocLevelMonitor()) {
DocumentLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
DocumentLevelMonitorRunner().runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
} else {
QueryLevelMonitorRunner.runMonitor(monitor, monitorCtx, periodStart, periodEnd, dryrun, executionId = executionId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ object CompositeWorkflowRunner : WorkflowRunner() {
executionId
)
} else if (delegateMonitor.isDocLevelMonitor()) {
return DocumentLevelMonitorRunner.runMonitor(
return DocumentLevelMonitorRunner().runMonitor(
delegateMonitor,
monitorCtx,
periodStart,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@
},
"fields": {
"type": "text"
},
"query_field_names": {
"type": "keyword"
}
}
},
Expand Down

0 comments on commit 25d722a

Please sign in to comment.