diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt index 5fac12ac8..0ccc19a97 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/AlertService.kt @@ -641,7 +641,7 @@ class AlertService( if (requestsToRetry.isEmpty()) return // Retry Bulk requests if there was any 429 response retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) { - val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL) val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) } val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed } requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS } diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt index b7067f3b0..616c6338c 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetMonitorAction.kt @@ -5,6 +5,9 @@ package org.opensearch.alerting.transport +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.launch import org.apache.logging.log4j.LogManager import org.apache.lucene.search.join.ScoreMode import org.opensearch.OpenSearchStatusException @@ -20,6 +23,7 @@ import org.opensearch.alerting.action.GetMonitorAction import org.opensearch.alerting.action.GetMonitorRequest import org.opensearch.alerting.action.GetMonitorResponse import org.opensearch.alerting.action.GetMonitorResponse.AssociatedWorkflow +import org.opensearch.alerting.opensearchapi.suspendUntil import org.opensearch.alerting.settings.AlertingSettings import org.opensearch.alerting.util.AlertingException import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH @@ -43,6 +47,7 @@ import org.opensearch.tasks.Task import org.opensearch.transport.TransportService private val log = LogManager.getLogger(TransportGetMonitorAction::class.java) +private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO) class TransportGetMonitorAction @Inject constructor( transportService: TransportService, @@ -118,18 +123,19 @@ class TransportGetMonitorAction @Inject constructor( } } } - - actionListener.onResponse( - GetMonitorResponse( - response.id, - response.version, - response.seqNo, - response.primaryTerm, - RestStatus.OK, - monitor, - getAssociatedWorkflows(response.id) + scope.launch { + actionListener.onResponse( + GetMonitorResponse( + response.id, + response.version, + response.seqNo, + response.primaryTerm, + RestStatus.OK, + monitor, + getAssociatedWorkflows(response.id) + ) ) - ) + } } override fun onFailure(t: Exception) { @@ -140,7 +146,7 @@ class TransportGetMonitorAction @Inject constructor( } } - private fun getAssociatedWorkflows(id: String): List { + private suspend fun getAssociatedWorkflows(id: String): List { try { val associatedWorkflows = mutableListOf() val queryBuilder = QueryBuilders.nestedQuery( @@ -156,7 +162,7 @@ class TransportGetMonitorAction @Inject constructor( val searchRequest = SearchRequest() .indices(ScheduledJob.SCHEDULED_JOBS_INDEX) .source(SearchSourceBuilder().query(queryBuilder).fetchField("_id")) - val response: SearchResponse = client.execute(SearchAction.INSTANCE, searchRequest).get() + val response: SearchResponse = client.suspendUntil { client.execute(SearchAction.INSTANCE, searchRequest, it) } for (hit in response.hits) { XContentType.JSON.xContent().createParser(