Skip to content

Commit

Permalink
Add more error notification at fail points (opensearch-project#1000)
Browse files Browse the repository at this point in the history
* Add more error notification at fail points

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* Handle exception gracefully

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

* small fix

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>

---------

Signed-off-by: bowenlan-amzn <bowenlan23@gmail.com>
  • Loading branch information
bowenlan-amzn authored and Joshua152 committed Dec 22, 2023
1 parent ebd2fe5 commit 87af02e
Showing 1 changed file with 48 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@ object ManagedIndexRunner :
return
}

val policy = managedIndexConfig.policy
if (managedIndexMetaData.hasDifferentPolicyVersion(managedIndexConfig)) {
val info = mapOf("message" to "There is a version conflict between your previous execution and your managed index")
val result = updateManagedIndexMetaData(
Expand All @@ -301,11 +302,11 @@ object ManagedIndexRunner :
)
if (result.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
publishErrorNotification(policy, managedIndexMetaData)
}
return
}

val policy = managedIndexConfig.policy
val state = policy.getStateToExecute(managedIndexMetaData)
val action: Action? = state?.getActionToExecute(managedIndexMetaData, indexMetadataProvider)
val stepContext = StepContext(
Expand All @@ -328,8 +329,10 @@ object ManagedIndexRunner :
managedIndexMetaData
.copy(actionMetaData = currentActionMetaData?.copy(failed = true), info = info)
)
if (updated.metadataSaved)
if (updated.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
publishErrorNotification(policy, managedIndexMetaData)
}
return
}

Expand All @@ -355,7 +358,10 @@ object ManagedIndexRunner :
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
if (updated.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
publishErrorNotification(policy, managedIndexMetaData)
}
return
}
}
Expand All @@ -369,7 +375,10 @@ object ManagedIndexRunner :
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
if (updated.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
publishErrorNotification(policy, managedIndexMetaData)
}
return
}

Expand All @@ -382,7 +391,10 @@ object ManagedIndexRunner :
policyRetryInfo = PolicyRetryInfoMetaData(true, 0), info = info
)
)
if (updated.metadataSaved) disableManagedIndexConfig(managedIndexConfig)
if (updated.metadataSaved) {
disableManagedIndexConfig(managedIndexConfig)
publishErrorNotification(policy, managedIndexMetaData)
}
return
}

Expand Down Expand Up @@ -424,16 +436,7 @@ object ManagedIndexRunner :
var executedManagedIndexMetaData = startingManagedIndexMetaData.getCompletedManagedIndexMetaData(action, step)

if (executedManagedIndexMetaData.isFailed) {
try {
// if the policy has no error_notification this will do nothing otherwise it will try to send the configured error message
publishErrorNotification(policy, executedManagedIndexMetaData)
} catch (e: Exception) {
logger.error("Failed to publish error notification", e)
val errorMessage = e.message ?: "Failed to publish error notification"
val mutableInfo = executedManagedIndexMetaData.info?.toMutableMap() ?: mutableMapOf()
mutableInfo["errorNotificationFailure"] = errorMessage
executedManagedIndexMetaData = executedManagedIndexMetaData.copy(info = mutableInfo.toMap())
}
executedManagedIndexMetaData = publishErrorNotification(policy, executedManagedIndexMetaData)
}

if (executedManagedIndexMetaData.isSuccessfulDelete) {
Expand Down Expand Up @@ -753,13 +756,37 @@ object ManagedIndexRunner :
}
}

private suspend fun publishErrorNotification(policy: Policy, managedIndexMetaData: ManagedIndexMetaData) {
policy.errorNotification?.run {
errorNotificationRetryPolicy.retry(logger) {
val compiledMessage = compileTemplate(messageTemplate, managedIndexMetaData)
destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client)
channel?.sendNotification(client, ErrorNotification.CHANNEL_TITLE, managedIndexMetaData, compiledMessage, policy.user)
private suspend fun publishErrorNotification(policy: Policy, metadata: ManagedIndexMetaData): ManagedIndexMetaData {
return try {
val errorNotification = policy.errorNotification
if (errorNotification != null) {
policy.errorNotification.run {
errorNotificationRetryPolicy.retry(logger) {
val compiledMessage = compileTemplate(messageTemplate, metadata)
destination?.buildLegacyBaseMessage(null, compiledMessage)?.publishLegacyNotification(client)
channel?.sendNotification(
client,
ErrorNotification.CHANNEL_TITLE,
metadata,
compiledMessage,
policy.user
)
}
}
val message = "Successfully published error notification [index = ${metadata.index}]"
logger.info(message)
val mutableInfo = metadata.info?.toMutableMap() ?: mutableMapOf()
mutableInfo["error_notification"] = message
metadata.copy(info = mutableInfo.toMap())
} else {
return metadata
}
} catch (e: Exception) {
logger.error("Failed to publish error notification", e)
val errorMessage = e.message ?: "Failed to publish error notification"
val mutableInfo = metadata.info?.toMutableMap() ?: mutableMapOf()
mutableInfo["error_notification"] = errorMessage
metadata.copy(info = mutableInfo.toMap())
}
}

Expand Down

0 comments on commit 87af02e

Please sign in to comment.