Skip to content

Commit

Permalink
doc-level monitor fan-out approach
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Mar 26, 2024
1 parent 2c7486b commit 3ee0f10
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ import org.opensearch.commons.alerting.model.DocLevelMonitorInput
import org.opensearch.commons.alerting.model.DocLevelQuery
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.common.io.stream.Writeable
import org.opensearch.core.index.shard.ShardId
import org.opensearch.core.rest.RestStatus
Expand Down Expand Up @@ -249,38 +248,43 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
cont.resumeWithException(e)
}
},
nodeMap.size
nodeShardAssignments.size
)
val responseReader = Writeable.Reader {
DocLevelMonitorFanOutResponse(it)
}
for (node in nodeMap) {
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
monitor,
dryrun,
monitorMetadata,
executionId,
indexExecutionContext,
nodeShardAssignments[node.key]!!.toList(),
concreteIndicesSeenSoFar,
workflowRunContext
)

transportService.sendRequest(
node.value,
DocLevelMonitorFanOutAction.NAME,
docLevelMonitorFanOutRequest,
TransportRequestOptions.EMPTY,
object : ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(listener, responseReader) {
override fun handleException(e: TransportException) {
listener.onFailure(e)
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
listener.onResponse(response)
if (nodeShardAssignments.containsKey(node.key)) {
val docLevelMonitorFanOutRequest = DocLevelMonitorFanOutRequest(
monitor,
dryrun,
monitorMetadata,
executionId,
indexExecutionContext,
nodeShardAssignments[node.key]!!.toList(),
concreteIndicesSeenSoFar,
workflowRunContext
)

transportService.sendRequest(
node.value,
DocLevelMonitorFanOutAction.NAME,
docLevelMonitorFanOutRequest,
TransportRequestOptions.EMPTY,
object : ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(
listener,
responseReader
) {
override fun handleException(e: TransportException) {
listener.onFailure(e)
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
listener.onResponse(response)
}
}
}
)
)
}
}
}
docLevelMonitorFanOutResponses.addAll(responses)
Expand Down Expand Up @@ -475,15 +479,4 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
}
return nodeShardAssignments
}

/**
* POJO holding information about each doc's concrete index, id, input index pattern/alias/datastream name
* and doc source. A list of these POJOs would be passed to percolate query execution logic.
*/
data class TransformedDocDto(
var indexName: String,
var concreteIndexName: String,
var docId: String,
var docSource: BytesReference
)
}
Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,23 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
}
}

suspend fun runJob(workflow: Workflow, periodStart: Instant, periodEnd: Instant, dryrun: Boolean, transportService: TransportService?): WorkflowRunResult {
suspend fun runJob(
workflow: Workflow,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean,
transportService: TransportService?
): WorkflowRunResult {
return CompositeWorkflowRunner.runWorkflow(workflow, monitorCtx, periodStart, periodEnd, dryrun, transportService)
}

suspend fun runJob(job: ScheduledJob, periodStart: Instant, periodEnd: Instant, dryrun: Boolean, transportService: TransportService?): MonitorRunResult<*> {
suspend fun runJob(
job: ScheduledJob,
periodStart: Instant,
periodEnd: Instant,
dryrun: Boolean,
transportService: TransportService?
): MonitorRunResult<*> {
// Updating the scheduled job index at the start of monitor execution runs for when there is an upgrade the the schema mapping
// has not been updated.
if (!IndexUtils.scheduledJobIndexUpdated && monitorCtx.clusterService != null && monitorCtx.client != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ data class DocumentLevelTriggerRunResult(
triggerName = sin.readString(),
error = sin.readException(),
triggeredDocs = sin.readStringList(),
actionResultsMap = sin.readMap() as MutableMap<String, MutableMap<String, ActionRunResult>>
actionResultsMap = readActionResults(sin)
)

override fun internalXContent(builder: XContentBuilder, params: ToXContent.Params): XContentBuilder {
Expand All @@ -39,7 +39,15 @@ data class DocumentLevelTriggerRunResult(
override fun writeTo(out: StreamOutput) {
super.writeTo(out)
out.writeStringCollection(triggeredDocs)
out.writeMap(actionResultsMap as Map<String, Any>)
out.writeInt(actionResultsMap.size)
actionResultsMap.forEach { (alert, actionResults) ->
out.writeString(alert)
out.writeInt(actionResults.size)
actionResults.forEach { (id, result) ->
out.writeString(id)
result.writeTo(out)
}
}
}

companion object {
Expand All @@ -48,5 +56,27 @@ data class DocumentLevelTriggerRunResult(
fun readFrom(sin: StreamInput): TriggerRunResult {
return DocumentLevelTriggerRunResult(sin)
}

@JvmStatic
fun readActionResults(sin: StreamInput): MutableMap<String, MutableMap<String, ActionRunResult>> {
val actionResultsMapReconstruct: MutableMap<String, MutableMap<String, ActionRunResult>> = mutableMapOf()
val size = sin.readInt()
var idx = 0
while (idx < size) {
val alert = sin.readString()
val actionResultsSize = sin.readInt()
val actionRunResultElem = mutableMapOf<String, ActionRunResult>()
var i = 0
while (i < actionResultsSize) {
val actionId = sin.readString()
val actionResult = ActionRunResult.readFrom(sin)
actionRunResultElem[actionId] = actionResult
++i
}
actionResultsMapReconstruct[alert] = actionRunResultElem
++idx
}
return actionResultsMapReconstruct
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class TransportDocLevelMonitorFanOutAction
val inputRunResults = mutableMapOf<String, MutableSet<String>>()
val docsToQueries = mutableMapOf<String, MutableList<String>>()
val indexName = shardIds.first().indexName
val isTempMonitor = dryrun || monitor.id == Monitor.NO_ID

val lastRunContext = if (monitorMetadata.lastRunContext.isNullOrEmpty()) mutableMapOf()
else monitorMetadata.lastRunContext.toMutableMap() as MutableMap<String, MutableMap<String, Any>>
Expand Down Expand Up @@ -316,17 +317,20 @@ class TransportDocLevelMonitorFanOutAction
}
}

// If any error happened during trigger execution, upsert monitor error alert
val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
if (errorMessage.isNotEmpty()) {
alertService.upsertMonitorErrorAlert(
monitor = monitor,
errorMessage = errorMessage,
executionId = executionId,
workflowRunContext
)
} else {
onSuccessfulMonitorRun(monitor)
if (!isTempMonitor) {
// If any error happened during trigger execution, upsert monitor error alert
val errorMessage = constructErrorMessageFromTriggerResults(triggerResults = triggerResults)
log.info(errorMessage)
if (errorMessage.isNotEmpty()) {
alertService.upsertMonitorErrorAlert(
monitor = monitor,
errorMessage = errorMessage,
executionId = executionId,
workflowRunContext
)
} else {
onSuccessfulMonitorRun(monitor)
}
}

listener.onResponse(
Expand Down Expand Up @@ -605,7 +609,14 @@ class TransportDocLevelMonitorFanOutAction
}
}
}
ActionRunResult(action.id, action.name, actionOutput, false, Instant.ofEpochMilli(client.threadPool().absoluteTimeInMillis()), null)
ActionRunResult(
action.id,
action.name,
actionOutput,
false,
Instant.ofEpochMilli(client.threadPool().absoluteTimeInMillis()),
null
)
} catch (e: Exception) {
ActionRunResult(action.id, action.name, mapOf(), false, Instant.ofEpochMilli(client.threadPool().absoluteTimeInMillis()), e)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,13 @@ class TransportExecuteWorkflowAction @Inject constructor(
"dryrun: ${execWorkflowRequest.dryrun}"
)
val workflowRunResult =
MonitorRunnerService.runJob(workflow, periodStart, periodEnd, execWorkflowRequest.dryrun, transportService = transportService)
MonitorRunnerService.runJob(
workflow,
periodStart,
periodEnd,
execWorkflowRequest.dryrun,
transportService = transportService
)
withContext(Dispatchers.IO, {
actionListener.onResponse(
ExecuteWorkflowResponse(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,16 @@ object CompositeWorkflowRunner : WorkflowRunner() {
try {
dataSources = delegateMonitor.dataSources
val delegateRunResult =
runDelegateMonitor(delegateMonitor, monitorCtx, periodStart, periodEnd, dryRun, workflowRunContext, executionId, transportService)
runDelegateMonitor(
delegateMonitor,
monitorCtx,
periodStart,
periodEnd,
dryRun,
workflowRunContext,
executionId,
transportService
)
resultList.add(delegateRunResult!!)
} catch (ex: Exception) {
logger.error("Error executing workflow delegate monitor ${delegate.monitorId}", ex)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2732,7 +2732,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
indexDoc(aliasName, "6", testDoc)
indexDoc(aliasName, "7", testDoc)
OpenSearchTestCase.waitUntil(
{ searchFindings(monitor).size == 1 }, 2, TimeUnit.MINUTES
{ searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES
)

rolloverDatastream(aliasName)
Expand All @@ -2743,7 +2743,7 @@ class DocumentMonitorRunnerIT : AlertingRestTestCase() {
indexDoc(aliasName, "16", testDoc)
indexDoc(aliasName, "17", testDoc)
OpenSearchTestCase.waitUntil(
{ searchFindings(monitor).size == 2 }, 2, TimeUnit.MINUTES
{ searchFindings(monitor).size == 6 }, 2, TimeUnit.MINUTES
)

deleteDataStream(aliasName)
Expand Down

0 comments on commit 3ee0f10

Please sign in to comment.