Skip to content

Commit

Permalink
add event listener demo
Browse files Browse the repository at this point in the history
  • Loading branch information
sbcd90 committed Sep 25, 2023
1 parent ab4cc0e commit 9eb889c
Show file tree
Hide file tree
Showing 10 changed files with 223 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.opensearch.alerting.core.resthandler.RestScheduledJobStatsHandler
import org.opensearch.alerting.core.schedule.JobScheduler
import org.opensearch.alerting.core.settings.LegacyOpenDistroScheduledJobSettings
import org.opensearch.alerting.core.settings.ScheduledJobSettings
import org.opensearch.alerting.event.listener.AlertingEventListenerModule
import org.opensearch.alerting.resthandler.RestAcknowledgeAlertAction
import org.opensearch.alerting.resthandler.RestAcknowledgeChainedAlertAction
import org.opensearch.alerting.resthandler.RestDeleteMonitorAction
Expand Down Expand Up @@ -51,6 +52,7 @@ import org.opensearch.alerting.settings.LegacyOpenDistroAlertingSettings
import org.opensearch.alerting.settings.LegacyOpenDistroDestinationSettings
import org.opensearch.alerting.transport.TransportAcknowledgeAlertAction
import org.opensearch.alerting.transport.TransportAcknowledgeChainedAlertAction
import org.opensearch.alerting.transport.TransportAddEventListenerAction
import org.opensearch.alerting.transport.TransportDeleteMonitorAction
import org.opensearch.alerting.transport.TransportDeleteWorkflowAction
import org.opensearch.alerting.transport.TransportExecuteMonitorAction
Expand Down Expand Up @@ -164,6 +166,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
lateinit var alertIndices: AlertIndices
lateinit var clusterService: ClusterService
lateinit var destinationMigrationCoordinator: DestinationMigrationCoordinator
lateinit var eventListenerModule: AlertingEventListenerModule

override fun getRestHandlers(
settings: Settings,
Expand Down Expand Up @@ -221,7 +224,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
ActionPlugin.ActionHandler(AlertingActions.INDEX_WORKFLOW_ACTION_TYPE, TransportIndexWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.GET_WORKFLOW_ACTION_TYPE, TransportGetWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.DELETE_WORKFLOW_ACTION_TYPE, TransportDeleteWorkflowAction::class.java),
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java)
ActionPlugin.ActionHandler(ExecuteWorkflowAction.INSTANCE, TransportExecuteWorkflowAction::class.java),
ActionPlugin.ActionHandler(AlertingActions.EVENT_LISTENER_ACTION_TYPE, TransportAddEventListenerAction::class.java)
)
}

Expand Down Expand Up @@ -255,6 +259,8 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
// Need to figure out how to use the OpenSearch DI classes rather than handwiring things here.
val settings = environment.settings()
alertIndices = AlertIndices(settings, client, threadPool, clusterService)

this.eventListenerModule = AlertingEventListenerModule(settings = settings, threadPool = threadPool)
runner = MonitorRunnerService
.registerClusterService(clusterService)
.registerClient(client)
Expand All @@ -269,6 +275,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R
.registerAlertService(AlertService(client, xContentRegistry, alertIndices))
.registerDocLevelMonitorQueries(DocLevelMonitorQueries(client, clusterService))
.registerWorkflowService(WorkflowService(client, xContentRegistry))
.registerEventListenerModule(eventListenerModule)
.registerConsumers()
.registerDestinationSettings()
scheduledJobIndices = ScheduledJobIndices(client.admin(), clusterService)
Expand All @@ -295,7 +302,7 @@ internal class AlertingPlugin : PainlessExtension, ActionPlugin, ScriptPlugin, R

DeleteMonitorService.initialize(client)

return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator)
return listOf(sweeper, scheduler, runner, scheduledJobIndices, docLevelMonitorQueries, destinationMigrationCoordinator, eventListenerModule)
}

override fun getSettings(): List<Setting<*>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,6 @@ import org.opensearch.cluster.routing.ShardRouting
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.xcontent.XContentFactory
import org.opensearch.common.xcontent.XContentType
import org.opensearch.commons.alerting.AlertingPluginInterface
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.alerting.model.ActionExecutionResult
import org.opensearch.commons.alerting.model.Alert
import org.opensearch.commons.alerting.model.DocLevelMonitorInput
Expand All @@ -46,7 +43,6 @@ import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.alerting.model.Monitor
import org.opensearch.commons.alerting.model.action.PerAlertActionScope
import org.opensearch.commons.alerting.util.string
import org.opensearch.core.action.ActionListener
import org.opensearch.core.common.bytes.BytesReference
import org.opensearch.core.rest.RestStatus
import org.opensearch.core.xcontent.ToXContent
Expand Down Expand Up @@ -487,15 +483,10 @@ object DocumentLevelMonitorRunner : MonitorRunner() {
monitorCtx: MonitorRunnerExecutionContext,
finding: Finding
) {
val publishFindingsRequest = PublishFindingsRequest(monitor.id, finding)
AlertingPluginInterface.publishFinding(
monitorCtx.eventListenerModule!!.eventListener().onFindingCreated(
monitorCtx.client!! as NodeClient,
publishFindingsRequest,
object : ActionListener<SubscribeFindingsResponse> {
override fun onResponse(response: SubscribeFindingsResponse) {}

override fun onFailure(e: Exception) {}
}
monitor.id,
finding
)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.alerting

import org.opensearch.action.bulk.BackoffPolicy
import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.event.listener.AlertingEventListenerModule
import org.opensearch.alerting.model.destination.DestinationContextFactory
import org.opensearch.alerting.settings.AlertingSettings
import org.opensearch.alerting.settings.DestinationSettings
Expand Down Expand Up @@ -36,6 +37,7 @@ data class MonitorRunnerExecutionContext(
var alertService: AlertService? = null,
var docLevelMonitorQueries: DocLevelMonitorQueries? = null,
var workflowService: WorkflowService? = null,
var eventListenerModule: AlertingEventListenerModule? = null,

@Volatile var retryPolicy: BackoffPolicy? = null,
@Volatile var moveAlertsRetryPolicy: BackoffPolicy? = null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import org.opensearch.alerting.alerts.AlertIndices
import org.opensearch.alerting.alerts.AlertMover.Companion.moveAlerts
import org.opensearch.alerting.core.JobRunner
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.event.listener.AlertingEventListenerModule
import org.opensearch.alerting.model.MonitorRunResult
import org.opensearch.alerting.model.WorkflowRunResult
import org.opensearch.alerting.model.destination.DestinationContextFactory
Expand Down Expand Up @@ -117,6 +118,11 @@ object MonitorRunnerService : JobRunner, CoroutineScope, AbstractLifecycleCompon
return this
}

fun registerEventListenerModule(eventListenerModule: AlertingEventListenerModule): MonitorRunnerService {
monitorCtx.eventListenerModule = eventListenerModule
return this
}

fun registerAlertService(alertService: AlertService): MonitorRunnerService {
this.monitorCtx.alertService = alertService
return this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,9 +382,9 @@ class AlertIndices(
}

// TODO call getMapping and compare actual mappings here instead of this
if (targetIndex == IndexUtils.lastUpdatedAlertHistoryIndex || targetIndex == IndexUtils.lastUpdatedFindingHistoryIndex) {
/* if (targetIndex == IndexUtils.lastUpdatedAlertHistoryIndex || targetIndex == IndexUtils.lastUpdatedFindingHistoryIndex) {
return
}
}*/

val putMappingRequest: PutMappingRequest = PutMappingRequest(targetIndex)
.source(mapping, XContentType.JSON)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.event.listener

import org.opensearch.client.node.NodeClient
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.authuser.User

interface AlertingEventListener {

suspend fun onAdCallbackCalled(client: NodeClient, monitorId: String, user: User?)

fun onFindingCreated(client: NodeClient, monitorId: String, finding: Finding)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.event.listener

import org.apache.logging.log4j.LogManager
import org.opensearch.common.settings.Settings
import org.opensearch.threadpool.ThreadPool
import java.lang.IllegalStateException
import java.util.concurrent.atomic.AtomicBoolean

private val log = LogManager.getLogger(AlertingEventListenerModule::class.java)

class AlertingEventListenerModule(
val alertingEventListeners: MutableMap<String, MutableSet<String>> = mutableMapOf(),
var frozen: AtomicBoolean = AtomicBoolean(false),
var eventListener: AlertingEventListener? = null,
val settings: Settings,
val threadPool: ThreadPool
) {

public fun addEventListener(listener: String?, topic: String) {
ensureNotFrozen()
if (listener == null) {
throw IllegalArgumentException("listener must not be null")
}
if (alertingEventListeners.contains(topic)) {
if (alertingEventListeners[topic]!!.contains(listener)) {
throw IllegalArgumentException("listener already added for topic")
} else {
alertingEventListeners[topic]!!.add(listener)
}
} else {
alertingEventListeners[topic] = mutableSetOf(listener)
}
}

public fun eventListener(): AlertingEventListener {
if (!isFrozen()) {
ensureNotFrozen()
eventListener = freeze()
}
return eventListener!!
}

private fun freeze(): CompositeAlertingEventListener {
if (this.frozen.compareAndSet(false, true)) {
return CompositeAlertingEventListener(alertingEventListeners, settings, threadPool)
} else {
throw IllegalStateException("already frozen")
}
}

private fun ensureNotFrozen() {
if (this.frozen.get()) {
throw IllegalArgumentException("Can't modify AlertingEventListenerModule once Alerting callbacks are frozen")
}
}

private fun isFrozen(): Boolean {
return frozen.get()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.event.listener

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionType
import org.opensearch.alerting.opensearchapi.InjectorContextElement
import org.opensearch.alerting.opensearchapi.withClosableContext
import org.opensearch.client.node.NodeClient
import org.opensearch.common.settings.Settings
import org.opensearch.commons.alerting.AlertingPluginInterface
import org.opensearch.commons.alerting.action.PublishAdRequest
import org.opensearch.commons.alerting.action.PublishFindingsRequest
import org.opensearch.commons.alerting.action.SubscribeAdResponse
import org.opensearch.commons.alerting.action.SubscribeFindingsResponse
import org.opensearch.commons.alerting.model.Finding
import org.opensearch.commons.authuser.User
import org.opensearch.core.action.ActionListener
import org.opensearch.threadpool.ThreadPool

private val log = LogManager.getLogger(CompositeAlertingEventListener::class.java)

class CompositeAlertingEventListener(
val topics: MutableMap<String, MutableSet<String>>,
val settings: Settings,
val threadPool: ThreadPool
) : AlertingEventListener {

override suspend fun onAdCallbackCalled(client: NodeClient, monitorId: String, user: User?) {
val listeners = topics["onAdCallbackCalled"]
listeners?.forEach { listener ->
val SUBSCRIBE_AD_ACTION_TYPE = ActionType(listener, ::SubscribeAdResponse)
val request = PublishAdRequest(monitorId)

withClosableContext(InjectorContextElement(monitorId, settings, threadPool.threadContext, user!!.roles, user)) {
AlertingPluginInterface.publishAdRequest(
client,
SUBSCRIBE_AD_ACTION_TYPE,
request,
object : ActionListener<SubscribeAdResponse> {
override fun onResponse(response: SubscribeAdResponse) {}

override fun onFailure(e: Exception) {}
}
)
}
}
}

override fun onFindingCreated(client: NodeClient, monitorId: String, finding: Finding) {
val listeners = topics["onFindingCreated"]
listeners?.forEach { listener ->
val SUBSCRIBE_FINDINGS_ACTION_TYPE = ActionType(listener, ::SubscribeFindingsResponse)
val request = PublishFindingsRequest(monitorId, finding)

AlertingPluginInterface.publishFinding(
client,
SUBSCRIBE_FINDINGS_ACTION_TYPE,
request,
object : ActionListener<SubscribeFindingsResponse> {
override fun onResponse(response: SubscribeFindingsResponse) {}

override fun onFailure(e: Exception) {}
}
)
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.alerting.transport

import org.apache.logging.log4j.LogManager
import org.opensearch.action.ActionRequest
import org.opensearch.action.support.ActionFilters
import org.opensearch.action.support.HandledTransportAction
import org.opensearch.alerting.event.listener.AlertingEventListenerModule
import org.opensearch.client.Client
import org.opensearch.common.inject.Inject
import org.opensearch.commons.alerting.action.AlertingActions
import org.opensearch.commons.alerting.action.EventListenerRequest
import org.opensearch.commons.alerting.action.EventListenerResponse
import org.opensearch.commons.utils.recreateObject
import org.opensearch.core.action.ActionListener
import org.opensearch.tasks.Task
import org.opensearch.transport.TransportService

private val log = LogManager.getLogger(TransportAddEventListenerAction::class.java)

class TransportAddEventListenerAction @Inject constructor(
transportService: TransportService,
val client: Client,
actionFilters: ActionFilters,
val eventListenerModule: AlertingEventListenerModule
) : HandledTransportAction<ActionRequest, EventListenerResponse>(
AlertingActions.EVENT_LISTENER_ACTION_NAME, transportService, actionFilters,
::EventListenerRequest
) {

override fun doExecute(task: Task, request: ActionRequest, listener: ActionListener<EventListenerResponse>) {
log.info("hit here- TransportAddEventListenerAction")
val transformedRequest = request as? EventListenerRequest
?: recreateObject(request) { EventListenerRequest(it) }
eventListenerModule.addEventListener(transformedRequest.listener, transformedRequest.topic)
listener.onResponse(EventListenerResponse())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import org.opensearch.action.support.WriteRequest.RefreshPolicy
import org.opensearch.action.support.master.AcknowledgedResponse
import org.opensearch.alerting.MonitorMetadataService
import org.opensearch.alerting.core.ScheduledJobIndices
import org.opensearch.alerting.event.listener.AlertingEventListenerModule
import org.opensearch.alerting.model.MonitorMetadata
import org.opensearch.alerting.opensearchapi.suspendUntil
import org.opensearch.alerting.service.DeleteMonitorService
Expand All @@ -47,6 +48,7 @@ import org.opensearch.alerting.util.addUserBackendRolesFilter
import org.opensearch.alerting.util.getRoleFilterEnabled
import org.opensearch.alerting.util.isADMonitor
import org.opensearch.client.Client
import org.opensearch.client.node.NodeClient
import org.opensearch.cluster.service.ClusterService
import org.opensearch.common.inject.Inject
import org.opensearch.common.settings.Settings
Expand Down Expand Up @@ -94,7 +96,8 @@ class TransportIndexMonitorAction @Inject constructor(
val clusterService: ClusterService,
val settings: Settings,
val xContentRegistry: NamedXContentRegistry,
val namedWriteableRegistry: NamedWriteableRegistry
val namedWriteableRegistry: NamedWriteableRegistry,
val eventListenerModule: AlertingEventListenerModule
) : HandledTransportAction<ActionRequest, IndexMonitorResponse>(
AlertingActions.INDEX_MONITOR_ACTION_NAME,
transportService,
Expand Down Expand Up @@ -532,6 +535,8 @@ class TransportIndexMonitorAction @Inject constructor(
throw t
}

log.info("calling ad callback")
eventListenerModule.eventListener().onAdCallbackCalled(client as NodeClient, indexResponse.id, user)
actionListener.onResponse(
IndexMonitorResponse(
indexResponse.id,
Expand Down

0 comments on commit 9eb889c

Please sign in to comment.