Skip to content

Commit

Permalink
remove blocking call to fix multi-node integ tests
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Jul 25, 2023
1 parent c987589 commit 66c0c41
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ class AlertService(
if (requestsToRetry.isEmpty()) return
// Retry Bulk requests if there was any 429 response
retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@

package org.opensearch.alerting.transport

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.apache.logging.log4j.LogManager
import org.apache.lucene.search.join.ScoreMode
import org.opensearch.OpenSearchStatusException
Expand All @@ -20,6 +23,7 @@ import org.opensearch.alerting.action.GetMonitorAction
import org.opensearch.alerting.action.GetMonitorRequest
import org.opensearch.alerting.action.GetMonitorResponse
import org.opensearch.alerting.action.GetMonitorResponse.AssociatedWorkflow
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.util.AlertingException
import org.opensearch.alerting.util.ScheduledJobUtils.Companion.WORKFLOW_DELEGATE_PATH
Expand All @@ -43,6 +47,7 @@ import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

private val log = LogManager.getLogger(TransportGetMonitorAction::class.java)
private val scope: CoroutineScope = CoroutineScope(Dispatchers.IO)

class TransportGetMonitorAction @Inject constructor(
transportService: TransportService,
Expand Down Expand Up @@ -118,18 +123,19 @@ class TransportGetMonitorAction @Inject constructor(
}
}
}

actionListener.onResponse(
GetMonitorResponse(
response.id,
response.version,
response.seqNo,
response.primaryTerm,
RestStatus.OK,
monitor,
getAssociatedWorkflows(response.id)
scope.launch {
actionListener.onResponse(
GetMonitorResponse(
response.id,
response.version,
response.seqNo,
response.primaryTerm,
RestStatus.OK,
monitor,
getAssociatedWorkflows(response.id)
)
)
)
}
}

override fun onFailure(t: Exception) {
Expand All @@ -140,7 +146,7 @@ class TransportGetMonitorAction @Inject constructor(
}
}

private fun getAssociatedWorkflows(id: String): List<AssociatedWorkflow> {
private suspend fun getAssociatedWorkflows(id: String): List<AssociatedWorkflow> {
try {
val associatedWorkflows = mutableListOf<AssociatedWorkflow>()
val queryBuilder = QueryBuilders.nestedQuery(
Expand All @@ -156,7 +162,7 @@ class TransportGetMonitorAction @Inject constructor(
val searchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(SearchSourceBuilder().query(queryBuilder).fetchField("_id"))
val response: SearchResponse = client.execute(SearchAction.INSTANCE, searchRequest).get()
val response: SearchResponse = client.suspendUntil { client.execute(SearchAction.INSTANCE, searchRequest, it) }

for (hit in response.hits) {
XContentType.JSON.xContent().createParser(
Expand Down

0 comments on commit 66c0c41

Please sign in to comment.