Skip to content

Commit

Permalink
allow for multiple alert handlers of the same type
Browse files Browse the repository at this point in the history
update alert docs and tickdoc for new doc structure
  • Loading branch information
nathanielc committed Dec 23, 2015
1 parent 70f8dcd commit 1c4b4a6
Show file tree
Hide file tree
Showing 7 changed files with 546 additions and 301 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
### Release Notes

### Features
- [#118](https://github.com/influxdb/kapacitor/issues/118): Can now define multiple handlers of the same type on an AlertNode.
- [#107](https://github.com/influxdb/kapacitor/issues/107): Enable TICKscript variables to be defined and then referenced from lambda expressions.
Also fixes various bugs around using regexes.

Expand Down
191 changes: 111 additions & 80 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,40 +96,71 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode) (an *AlertNode, err

// Construct alert handlers
an.handlers = make([]AlertHandler, 0)
if n.Post != "" {
an.handlers = append(an.handlers, an.handlePost)

for _, post := range n.PostHandlers {
post := post
an.handlers = append(an.handlers, func(ad *AlertData) { an.handlePost(post, ad) })
}
if n.Log != "" {
if !path.IsAbs(n.Log) {
return nil, fmt.Errorf("alert log path must be absolute: %s is not absolute", n.Log)
}
an.handlers = append(an.handlers, an.handleLog)

for _, email := range n.EmailHandlers {
email := email
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleEmail(email, ad) })
}
if len(n.Command) > 0 {
an.handlers = append(an.handlers, an.handleExec)
if len(n.EmailHandlers) == 0 && (et.tm.SMTPService != nil && et.tm.SMTPService.Global()) {
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleEmail(&pipeline.EmailHandler{}, ad) })
}
// If email has been configured globally only send state changes.
if et.tm.SMTPService != nil && et.tm.SMTPService.Global() {
n.IsStateChangesOnly = true
}

for _, exec := range n.ExecHandlers {
exec := exec
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleExec(exec, ad) })
}
if n.UseEmail || (et.tm.SMTPService != nil && et.tm.SMTPService.Global()) {
an.handlers = append(an.handlers, an.handleEmail)
// If email has been configured globally only send state changes.
if et.tm.SMTPService != nil && et.tm.SMTPService.Global() {
n.IsStateChangesOnly = true

for _, log := range n.LogHandlers {
log := log
if !path.IsAbs(log.FilePath) {
return nil, fmt.Errorf("alert log path must be absolute: %s is not absolute", log.FilePath)
}
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleLog(log, ad) })
}

for _, vo := range n.VictorOpsHandlers {
vo := vo
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleVictorOps(vo, ad) })
}
if n.UseOpsGenie || (et.tm.OpsGenieService != nil && et.tm.OpsGenieService.Global()) {
an.handlers = append(an.handlers, an.handleOpsGenie)
if len(n.VictorOpsHandlers) == 0 && (et.tm.VictorOpsService != nil && et.tm.VictorOpsService.Global()) {
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleVictorOps(&pipeline.VictorOpsHandler{}, ad) })
}
if n.UseVictorOps || (et.tm.VictorOpsService != nil && et.tm.VictorOpsService.Global()) {
an.handlers = append(an.handlers, an.handleVictorOps)

for _, pd := range n.PagerDutyHandlers {
pd := pd
an.handlers = append(an.handlers, func(ad *AlertData) { an.handlePagerDuty(pd, ad) })
}
if n.UsePagerDuty || (et.tm.PagerDutyService != nil && et.tm.PagerDutyService.Global()) {
an.handlers = append(an.handlers, an.handlePagerDuty)
if len(n.PagerDutyHandlers) == 0 && (et.tm.PagerDutyService != nil && et.tm.PagerDutyService.Global()) {
an.handlers = append(an.handlers, func(ad *AlertData) { an.handlePagerDuty(&pipeline.PagerDutyHandler{}, ad) })
}
if n.UseSlack || (et.tm.SlackService != nil && et.tm.SlackService.Global()) {
an.handlers = append(an.handlers, an.handleSlack)
// If slack has been configured globally only send state changes.
if et.tm.SlackService != nil && et.tm.SlackService.Global() {
n.IsStateChangesOnly = true
}

for _, slack := range n.SlackHandlers {
slack := slack
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleSlack(slack, ad) })
}
if len(n.SlackHandlers) == 0 && (et.tm.SlackService != nil && et.tm.SlackService.Global()) {
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleSlack(&pipeline.SlackHandler{}, ad) })
}
// If slack has been configured globally only send state changes.
if et.tm.SlackService != nil && et.tm.SlackService.Global() {
n.IsStateChangesOnly = true
}

for _, og := range n.OpsGenieHandlers {
og := og
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleOpsGenie(og, ad) })
}
if len(n.OpsGenieHandlers) == 0 && (et.tm.OpsGenieService != nil && et.tm.OpsGenieService.Global()) {
an.handlers = append(an.handlers, func(ad *AlertData) { an.handleOpsGenie(&pipeline.OpsGenieHandler{}, ad) })
}

// Parse level expressions
Expand Down Expand Up @@ -407,61 +438,39 @@ func (a *AlertNode) renderMessage(id, name string, group models.GroupID, tags mo
//--------------------------------
// Alert handlers

func (a *AlertNode) handlePost(ad *AlertData) {
func (a *AlertNode) handlePost(post *pipeline.PostHandler, ad *AlertData) {
b, err := json.Marshal(ad)
if err != nil {
a.logger.Println("E! failed to marshal alert data json", err)
return
}
buf := bytes.NewBuffer(b)
_, err = http.Post(a.a.Post, "application/json", buf)
_, err = http.Post(post.URL, "application/json", buf)
if err != nil {
a.logger.Println("E! failed to POST batch", err)
}
}

func (a *AlertNode) handleEmail(ad *AlertData) {
func (a *AlertNode) handleEmail(email *pipeline.EmailHandler, ad *AlertData) {
b, err := json.Marshal(ad)
if err != nil {
a.logger.Println("E! failed to marshal alert data json", err)
return
}
if a.et.tm.SMTPService != nil {
a.et.tm.SMTPService.SendMail(a.a.ToList, ad.Message, string(b))
a.et.tm.SMTPService.SendMail(email.ToList, ad.Message, string(b))
} else {
a.logger.Println("W! smtp service not enabled, cannot send email.")
}
}

func (a *AlertNode) handleLog(ad *AlertData) {
func (a *AlertNode) handleExec(ex *pipeline.ExecHandler, ad *AlertData) {
b, err := json.Marshal(ad)
if err != nil {
a.logger.Println("E! failed to marshal alert data json", err)
return
}
f, err := os.OpenFile(a.a.Log, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
a.logger.Println("E! failed to open file for alert logging", err)
return
}
defer f.Close()
n, err := f.Write(b)
if n != len(b) || err != nil {
a.logger.Println("E! failed to write to file", err)
}
n, err = f.Write([]byte("\n"))
if n != 1 || err != nil {
a.logger.Println("E! failed to write to file", err)
}
}

func (a *AlertNode) handleExec(ad *AlertData) {
b, err := json.Marshal(ad)
if err != nil {
a.logger.Println("E! failed to marshal alert data json", err)
return
}
cmd := exec.Command(a.a.Command[0], a.a.Command[1:]...)
cmd := exec.Command(ex.Command[0], ex.Command[1:]...)
cmd.Stdin = bytes.NewBuffer(b)
var out bytes.Buffer
cmd.Stdout = &out
Expand All @@ -473,35 +482,29 @@ func (a *AlertNode) handleExec(ad *AlertData) {
}
}

func (a *AlertNode) handleOpsGenie(ad *AlertData) {
if a.et.tm.OpsGenieService == nil {
a.logger.Println("E! failed to send OpsGenie alert. OpsGenie is not enabled")
func (a *AlertNode) handleLog(l *pipeline.LogHandler, ad *AlertData) {
b, err := json.Marshal(ad)
if err != nil {
a.logger.Println("E! failed to marshal alert data json", err)
return
}
var messageType string
switch ad.Level {
case OKAlert:
messageType = "RECOVERY"
default:
messageType = ad.Level.String()
}

err := a.et.tm.OpsGenieService.Alert(
a.a.OpsGenieTeams,
a.a.OpsGenieRecipients,
messageType,
ad.Message,
ad.ID,
ad.Time,
ad.Data,
)
f, err := os.OpenFile(l.FilePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0600)
if err != nil {
a.logger.Println("E! failed to send alert data to OpsGenie:", err)
a.logger.Println("E! failed to open file for alert logging", err)
return
}
defer f.Close()
n, err := f.Write(b)
if n != len(b) || err != nil {
a.logger.Println("E! failed to write to file", err)
}
n, err = f.Write([]byte("\n"))
if n != 1 || err != nil {
a.logger.Println("E! failed to write to file", err)
}
}

func (a *AlertNode) handleVictorOps(ad *AlertData) {
func (a *AlertNode) handleVictorOps(vo *pipeline.VictorOpsHandler, ad *AlertData) {
if a.et.tm.VictorOpsService == nil {
a.logger.Println("E! failed to send VictorOps alert. VictorOps is not enabled")
return
Expand All @@ -514,7 +517,7 @@ func (a *AlertNode) handleVictorOps(ad *AlertData) {
messageType = ad.Level.String()
}
err := a.et.tm.VictorOpsService.Alert(
a.a.VictorOpsRoutingKey,
vo.RoutingKey,
messageType,
ad.Message,
ad.ID,
Expand All @@ -527,7 +530,7 @@ func (a *AlertNode) handleVictorOps(ad *AlertData) {
}
}

func (a *AlertNode) handlePagerDuty(ad *AlertData) {
func (a *AlertNode) handlePagerDuty(pd *pipeline.PagerDutyHandler, ad *AlertData) {
if a.et.tm.PagerDutyService == nil {
a.logger.Println("E! failed to send PagerDuty alert. PagerDuty is not enabled")
return
Expand All @@ -543,13 +546,13 @@ func (a *AlertNode) handlePagerDuty(ad *AlertData) {
}
}

func (a *AlertNode) handleSlack(ad *AlertData) {
func (a *AlertNode) handleSlack(slack *pipeline.SlackHandler, ad *AlertData) {
if a.et.tm.SlackService == nil {
a.logger.Println("E! failed to send Slack message. Slack is not enabled")
return
}
err := a.et.tm.SlackService.Alert(
a.a.SlackChannel,
slack.Channel,
ad.Message,
ad.Level,
)
Expand All @@ -558,3 +561,31 @@ func (a *AlertNode) handleSlack(ad *AlertData) {
return
}
}

func (a *AlertNode) handleOpsGenie(og *pipeline.OpsGenieHandler, ad *AlertData) {
if a.et.tm.OpsGenieService == nil {
a.logger.Println("E! failed to send OpsGenie alert. OpsGenie is not enabled")
return
}
var messageType string
switch ad.Level {
case OKAlert:
messageType = "RECOVERY"
default:
messageType = ad.Level.String()
}

err := a.et.tm.OpsGenieService.Alert(
og.TeamsList,
og.RecipientsList,
messageType,
ad.Message,
ad.ID,
ad.Time,
ad.Data,
)
if err != nil {
a.logger.Println("E! failed to send alert data to OpsGenie:", err)
return
}
}
Loading

0 comments on commit 1c4b4a6

Please sign in to comment.