Skip to content

Commit

Permalink
remove blocking call to fix multi-node integ tests
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Jul 25, 2023
1 parent c987589 commit cc8b69d
Showing 1 changed file with 60 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,17 +119,7 @@ class TransportGetMonitorAction @Inject constructor(
}
}

actionListener.onResponse(
GetMonitorResponse(
response.id,
response.version,
response.seqNo,
response.primaryTerm,
RestStatus.OK,
monitor,
getAssociatedWorkflows(response.id)
)
)
getAssociatedWorkflows(response.id, response, monitor, actionListener)
}

override fun onFailure(t: Exception) {
Expand All @@ -140,40 +130,67 @@ class TransportGetMonitorAction @Inject constructor(
}
}

private fun getAssociatedWorkflows(id: String): List<AssociatedWorkflow> {
try {
val associatedWorkflows = mutableListOf<AssociatedWorkflow>()
val queryBuilder = QueryBuilders.nestedQuery(
WORKFLOW_DELEGATE_PATH,
QueryBuilders.boolQuery().must(
QueryBuilders.matchQuery(
WORKFLOW_MONITOR_PATH,
id
)
),
ScoreMode.None
)
val searchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(SearchSourceBuilder().query(queryBuilder).fetchField("_id"))
val response: SearchResponse = client.execute(SearchAction.INSTANCE, searchRequest).get()

for (hit in response.hits) {
XContentType.JSON.xContent().createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
hit.sourceAsString
).use { hitsParser ->
val workflow = ScheduledJob.parse(hitsParser, hit.id, hit.version)
if (workflow is Workflow) {
associatedWorkflows.add(AssociatedWorkflow(hit.id, workflow.name))
private fun getAssociatedWorkflows(id: String, getResponse: GetResponse, monitor: Monitor?, actionListener: ActionListener<GetMonitorResponse>) {
val queryBuilder = QueryBuilders.nestedQuery(
WORKFLOW_DELEGATE_PATH,
QueryBuilders.boolQuery().must(
QueryBuilders.matchQuery(
WORKFLOW_MONITOR_PATH,
id
)
),
ScoreMode.None
)
val searchRequest = SearchRequest()
.indices(ScheduledJob.SCHEDULED_JOBS_INDEX)
.source(SearchSourceBuilder().query(queryBuilder).fetchField("_id"))
client.execute(
SearchAction.INSTANCE,
searchRequest,
object : ActionListener<SearchResponse> {
override fun onResponse(response: SearchResponse) {
val associatedWorkflows = mutableListOf<AssociatedWorkflow>()
for (hit in response.hits) {
XContentType.JSON.xContent().createParser(
xContentRegistry,
LoggingDeprecationHandler.INSTANCE,
hit.sourceAsString
).use { hitsParser ->
val workflow = ScheduledJob.parse(hitsParser, hit.id, hit.version)
if (workflow is Workflow) {
associatedWorkflows.add(AssociatedWorkflow(hit.id, workflow.name))
}
}
}

actionListener.onResponse(
GetMonitorResponse(
getResponse.id,
getResponse.version,
getResponse.seqNo,
getResponse.primaryTerm,
RestStatus.OK,
monitor,
associatedWorkflows
)
)
}

override fun onFailure(e: java.lang.Exception) {
log.error("failed to fetch associated workflows for monitor $id", e)
actionListener.onResponse(
GetMonitorResponse(
getResponse.id,
getResponse.version,
getResponse.seqNo,
getResponse.primaryTerm,
RestStatus.OK,
monitor,
emptyList()
)
)
}
}
return associatedWorkflows
} catch (e: java.lang.Exception) {
log.error("failed to fetch associated workflows for monitor $id", e)
return emptyList()
}
)
}
}

0 comments on commit cc8b69d

Please sign in to comment.