diff --git a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsSearchAction.kt b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsSearchAction.kt index f09bc9893..466bbedc9 100644 --- a/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsSearchAction.kt +++ b/alerting/src/main/kotlin/org/opensearch/alerting/transport/TransportGetFindingsSearchAction.kt @@ -6,10 +6,7 @@ package org.opensearch.alerting.transport import org.apache.logging.log4j.LogManager -import org.opensearch.OpenSearchStatusException import org.opensearch.action.ActionListener -import org.opensearch.action.get.GetRequest -import org.opensearch.action.get.GetResponse import org.opensearch.action.search.SearchRequest import org.opensearch.action.search.SearchResponse import org.opensearch.action.support.ActionFilters @@ -30,8 +27,6 @@ import org.opensearch.common.inject.Inject import org.opensearch.common.settings.Settings import org.opensearch.common.xcontent.LoggingDeprecationHandler import org.opensearch.common.xcontent.NamedXContentRegistry -import org.opensearch.common.xcontent.ToXContent -import org.opensearch.common.xcontent.XContentBuilder import org.opensearch.common.xcontent.XContentFactory import org.opensearch.common.xcontent.XContentParser import org.opensearch.common.xcontent.XContentParserUtils @@ -131,6 +126,7 @@ class TransportGetFindingsSearchAction @Inject constructor( object : ActionListener { override fun onResponse(response: SearchResponse) { val totalFindingCount = response.hits.totalHits?.value?.toInt() + val mgetRequest = MultiGetRequest() val findingsWithDocs = mutableListOf() for (hit in response.hits) { val id = hit.id @@ -138,18 +134,15 @@ class TransportGetFindingsSearchAction @Inject constructor( .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, hit.sourceAsString) XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) val finding = Finding.parse(xcp, id) - val doc_ids = finding.relatedDocId.split(",").toTypedArray() - val docs = mutableListOf() - for (doc_id in doc_ids) { - // TODO: Add the document to docs after searching - val findingDocument = searchDocument(doc_id, finding.index, actionListener) + val documentIds = finding.relatedDocId.split(",").toTypedArray() + // Add getRequests to mget request + documentIds.values.forEach { + // TODO: check if we want to add individual get document request, or use documentIds array for a single finding related_docs + docId -> + mgetMetadataReq.add(MultiGetRequest.Item(sourceIndex, docId)) } - val findingWithDoc = FindingWithDocs(finding, docs) - findingsWithDocs.add(findingWithDoc) - // TODO: remove debug log - log.info("findingWithDoc: $findingWithDoc") } - actionListener.onResponse(GetFindingsSearchResponse(RestStatus.OK, totalFindingCount, findingsWithDocs)) + searchDocument(mgetRequest, totalFindingCount, actionListener) } override fun onFailure(t: Exception) { @@ -160,50 +153,32 @@ class TransportGetFindingsSearchAction @Inject constructor( } fun searchDocument( - documentId: String, - sourceIndex: String, + mgetRequest: MultiGetRequest, + totalFindingCount: Int, actionListener: ActionListener ): FindingDocument? { - val getRequest = GetRequest(sourceIndex, documentId) - var findingDocument: FindingDocument? = null - client.threadPool().threadContext.stashContext().use { - client.get( - getRequest, - object : ActionListener { - override fun onResponse(response: GetResponse) { - if (!response.isExists) { - actionListener.onFailure( - AlertingException.wrap( - OpenSearchStatusException( - "Document $documentId not found from source index $sourceIndex.", - RestStatus.NOT_FOUND - ) - ) - ) - return - } - - if (!response.isSourceEmpty) { - val xcp = XContentFactory.xContent(XContentType.JSON) - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.toString()) - XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) - findingDocument = FindingDocument.parse(xcp) - // TODO: remove debug log - log.info("Response not empty") - val docStr = findingDocument?.toXContent( - XContentBuilder.builder(XContentType.JSON.xContent()), - ToXContent.EMPTY_PARAMS - ).string() - log.info("findingDocument: $docStr") - } + client.multiGet( + mgetRequest, + object : ActionListener { + override fun onResponse(response: MultiGetResponse) { + // val findingsWithDocs: Map = + response.responses.associate { + // TODO: REMOVE DEBUG LOG + log.info("response: $response") + val xcp = XContentFactory.xContent(XContentType.JSON) + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, response.toString()) + XContentParserUtils.ensureExpectedToken(XContentParser.Token.START_OBJECT, xcp.nextToken(), xcp) + findingDocument = FindingDocument.parse(xcp) + // TODO: Parse the searched documents and add to map of findings, need to associate original finding id to response } + // TODO: Form the response here with the map/list of findings + actionListener.onResponse(GetFindingsSearchResponse(RestStatus.OK, totalFindingCount, findingsWithDocs)) + } - override fun onFailure(t: Exception) { - actionListener.onFailure(AlertingException.wrap(t)) - } + override fun onFailure(t: Exception) { + actionListener.onFailure(AlertingException.wrap(t)) } - ) - } - return findingDocument + } + ) } }