diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt index b7067f3b0..126b02948 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt @@ -119,17 +119,7 @@ class TransportGetMonitorAction @Inject constructor( } } - actionListener.onResponse( - GetMonitorResponse( - response.id, - response.version, - response.seqNo, - response.primaryTerm, - RestStatus.OK, - monitor, - getAssociatedWorkflows(response.id) - ) - ) + getAssociatedWorkflows(response.id, response, monitor, actionListener) } override fun onFailure(t: Exception) { @@ -140,40 +130,72 @@ class TransportGetMonitorAction @Inject constructor( } } - private fun getAssociatedWorkflows(id: String): List { - try { - val associatedWorkflows = mutableListOf() - val queryBuilder = QueryBuilders.nestedQuery( - WORKFLOW_DELEGATE_PATH, - QueryBuilders.boolQuery().must( - QueryBuilders.matchQuery( - WORKFLOW_MONITOR_PATH, - id - ) - ), - ScoreMode.None - ) - val searchRequest = SearchRequest() - .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) - .source(SearchSourceBuilder().query(queryBuilder).fetchField("_id")) - val response: SearchResponse = client.execute(SearchAction.INSTANCE, searchRequest).get() - - for (hit in response.hits) { - XContentType.JSON.xContent().createParser( - xContentRegistry, - LoggingDeprecationHandler.INSTANCE, - hit.sourceAsString - ).use { hitsParser -> - val workflow = ScheduledJob.parse(hitsParser, hit.id, hit.version) - if (workflow is Workflow) { - associatedWorkflows.add(AssociatedWorkflow(hit.id, workflow.name)) + private fun getAssociatedWorkflows( + id: String, + getResponse: GetResponse, + monitor: Monitor?, + actionListener: ActionListener + ) { + val queryBuilder = QueryBuilders.nestedQuery( + WORKFLOW_DELEGATE_PATH, + QueryBuilders.boolQuery().must( + QueryBuilders.matchQuery( + WORKFLOW_MONITOR_PATH, + id + ) + ), + ScoreMode.None + ) + val searchRequest = SearchRequest() + .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) + .source(SearchSourceBuilder().query(queryBuilder).fetchField("_id")) + client.execute( + SearchAction.INSTANCE, + searchRequest, + object : ActionListener { + override fun onResponse(response: SearchResponse) { + val associatedWorkflows = mutableListOf() + for (hit in response.hits) { + XContentType.JSON.xContent().createParser( + xContentRegistry, + LoggingDeprecationHandler.INSTANCE, + hit.sourceAsString + ).use { hitsParser -> + val workflow = ScheduledJob.parse(hitsParser, hit.id, hit.version) + if (workflow is Workflow) { + associatedWorkflows.add(AssociatedWorkflow(hit.id, workflow.name)) + } + } } + + actionListener.onResponse( + GetMonitorResponse( + getResponse.id, + getResponse.version, + getResponse.seqNo, + getResponse.primaryTerm, + RestStatus.OK, + monitor, + associatedWorkflows + ) + ) + } + + override fun onFailure(e: java.lang.Exception) { + log.error("failed to fetch associated workflows for monitor $id", e) + actionListener.onResponse( + GetMonitorResponse( + getResponse.id, + getResponse.version, + getResponse.seqNo, + getResponse.primaryTerm, + RestStatus.OK, + monitor, + emptyList() + ) + ) } } - return associatedWorkflows - } catch (e: java.lang.Exception) { - log.error("failed to fetch associated workflows for monitor $id", e) - return emptyList() - } + ) } }