Skip to content

Commit

Permalink
remove blocking call to fix multi-node integ tests
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Jul 26, 2023
1 parent c987589 commit 9607d02
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -641,7 +641,7 @@ class AlertService(
if (requestsToRetry.isEmpty()) return
// Retry Bulk requests if there was any 429 response
retryPolicy.retry(logger, listOf(RestStatus.TOO_MANY_REQUESTS)) {
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
val bulkRequest = BulkRequest().add(requestsToRetry).setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL)
val bulkResponse: BulkResponse = client.suspendUntil { client.bulk(bulkRequest, it) }
val failedResponses = (bulkResponse.items ?: arrayOf()).filter { it.isFailed }
requestsToRetry = failedResponses.filter { it.status() == RestStatus.TOO_MANY_REQUESTS }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,7 @@ class TransportGetMonitorAction @Inject constructor(
}
}

actionListener.onResponse(
GetMonitorResponse(
response.id,
response.version,
response.seqNo,
response.primaryTerm,
RestStatus.OK,
monitor,
getAssociatedWorkflows(response.id)
)
)
getAssociatedWorkflows(response.id, response, monitor, actionListener)
}

override fun onFailure(t: Exception) {
Expand All @@ -140,40 +130,72 @@ class TransportGetMonitorAction @Inject constructor(
}
}

private fun getAssociatedWorkflows(id: String): List<AssociatedWorkflow> {
try {
val associatedWorkflows = mutableListOf<AssociatedWorkflow>()
val queryBuilder = QueryBuilders.nestedQuery(
WORKFLOW_DELEGATE_PATH,
QueryBuilders.boolQuery().must(
QueryBuilders.matchQuery(
WORKFLOW_MONITOR_PATH,
id
)
),
ScoreMode.None
)
val searchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(SearchSourceBuilder().query(queryBuilder).fetchField("_id"))
val response: SearchResponse = client.execute(SearchAction.INSTANCE, searchRequest).get()

for (hit in response.hits) {
XContentType.JSON.xContent().createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
hit.sourceAsString
).use { hitsParser ->
val workflow = ScheduledJob.parse(hitsParser, hit.id, hit.version)
if (workflow is Workflow) {
associatedWorkflows.add(AssociatedWorkflow(hit.id, workflow.name))
private fun getAssociatedWorkflows(
id: String,
getResponse: GetResponse,
monitor: Monitor?,
actionListener: ActionListener<GetMonitorResponse>
) {
val queryBuilder = QueryBuilders.nestedQuery(
WORKFLOW_DELEGATE_PATH,
QueryBuilders.boolQuery().must(
QueryBuilders.matchQuery(
WORKFLOW_MONITOR_PATH,
id
)
),
ScoreMode.None
)
val searchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(SearchSourceBuilder().query(queryBuilder).fetchField("_id"))
client.execute(
SearchAction.INSTANCE,
searchRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
val associatedWorkflows = mutableListOf<AssociatedWorkflow>()
for (hit in response.hits) {
XContentType.JSON.xContent().createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
hit.sourceAsString
).use { hitsParser ->
val workflow = ScheduledJob.parse(hitsParser, hit.id, hit.version)
if (workflow is Workflow) {
associatedWorkflows.add(AssociatedWorkflow(hit.id, workflow.name))
}
}
}

actionListener.onResponse(
GetMonitorResponse(
getResponse.id,
getResponse.version,
getResponse.seqNo,
getResponse.primaryTerm,
RestStatus.OK,
monitor,
associatedWorkflows
)
)
}

override fun onFailure(e: java.lang.Exception) {
log.error("failed to fetch associated workflows for monitor $id", e)
actionListener.onResponse(
GetMonitorResponse(
getResponse.id,
getResponse.version,
getResponse.seqNo,
getResponse.primaryTerm,
RestStatus.OK,
monitor,
emptyList()
)
)
}
}
return associatedWorkflows
} catch (e: java.lang.Exception) {
log.error("failed to fetch associated workflows for monitor $id", e)
return emptyList()
}
)
}
}

0 comments on commit 9607d02

Please sign in to comment.