Skip to content

Commit

Permalink
doc-level monitor fan-out approach
Browse files Browse the repository at this point in the history
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
  • Loading branch information
sbcd90 committed Mar 28, 2024
1 parent 116fe90 commit 0588456
Showing 1 changed file with 26 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,11 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
if (IndexUtils.isAlias(indexName, monitorCtx.clusterService!!.state()) ||
IndexUtils.isDataStream(indexName, monitorCtx.clusterService!!.state())
) {
if (concreteIndexName == IndexUtils.getWriteIndex(indexName, monitorCtx.clusterService!!.state())) {
if (concreteIndexName == IndexUtils.getWriteIndex(
indexName,
monitorCtx.clusterService!!.state()
)
) {
updatedLastRunContext.remove(lastWriteIndex)
updatedLastRunContext[concreteIndexName] = indexUpdatedRunContext
}
Expand All @@ -199,7 +203,8 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
// update lastRunContext if its a temp monitor as we only want to view the last bit of data then
// TODO: If dryrun, we should make it so we limit the search as this could still potentially give us lots of data
if (isTempMonitor) {
indexLastRunContext[shard] = max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
indexLastRunContext[shard] =
max(-1, (indexUpdatedRunContext[shard] as String).toInt() - 10)
}
}
val indexExecutionContext = IndexExecutionContext(
Expand Down Expand Up @@ -227,13 +232,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
concreteIndexName
)

nodeShardAssignments.forEach {
logger.info(it.key)
it.value.forEach { it1 ->
logger.info(it1.id.toString())
}
}

val responses: Collection<DocLevelMonitorFanOutResponse> = suspendCoroutine { cont ->
val listener = GroupedActionListener(
object : ActionListener<Collection<DocLevelMonitorFanOutResponse>> {
Expand Down Expand Up @@ -277,7 +275,25 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
responseReader
) {
override fun handleException(e: TransportException) {
listener.onFailure(e)
// retry in local node
transportService.sendRequest(
monitorCtx.clusterService!!.localNode(),
DocLevelMonitorFanOutAction.NAME,
docLevelMonitorFanOutRequest,
TransportRequestOptions.EMPTY,
object : ActionListenerResponseHandler<DocLevelMonitorFanOutResponse>(
listener,
responseReader
) {
override fun handleException(e: TransportException) {
listener.onFailure(e)
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
listener.onResponse(response)
}
}
)
}

override fun handleResponse(response: DocLevelMonitorFanOutResponse) {
Expand Down Expand Up @@ -353,7 +369,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
val triggerResults = mutableMapOf<String, DocumentLevelTriggerRunResult>()
val triggerErrorMap = mutableMapOf<String, MutableList<AlertingException>>()
for (res in docLevelMonitorFanOutResponses) {
logger.info("trigger results-${res.triggerResults}")
for (triggerId in res.triggerResults.keys) {
val documentLevelTriggerRunResult = res.triggerResults[triggerId]
if (documentLevelTriggerRunResult != null) {
Expand Down Expand Up @@ -395,7 +410,6 @@ class DocumentLevelMonitorRunner : MonitorRunner() {
triggerResults[triggerId]!!.error = AlertingException.merge(*errorList.toTypedArray())
}
}
logger.info("final trigger results-$triggerResults")
return triggerResults
}

Expand Down

0 comments on commit 0588456

Please sign in to comment.