From 8e42b9e17386a364a3c2661bdcf51c1e193fa2ba Mon Sep 17 00:00:00 2001 From: Ross McDonald Date: Fri, 2 Sep 2016 16:50:23 -0500 Subject: [PATCH] Add a raw TCP handler to alert node. --- CHANGELOG.md | 10 ++++++++++ alert.go | 32 ++++++++++++++++++++++++++++++++ pipeline/alert.go | 25 +++++++++++++++++++++++++ 3 files changed, 67 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index ea51e6402..36ca22e10 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Changelog +## v1.0.1 [unreleased] + +### Release Notes + +### Features + +- [#873](https://github.com/influxdata/kapacitor/pull/873): Add TCP alert handler + +### Bugfixes + ## v1.0.0 [2016-09-02] ### Release Notes diff --git a/alert.go b/alert.go index 89d152f09..2a5689d15 100644 --- a/alert.go +++ b/alert.go @@ -7,6 +7,7 @@ import ( "fmt" html "html/template" "log" + "net" "net/http" "os" "os/exec" @@ -173,6 +174,11 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, l *log.Logger) (an * an.handlers = append(an.handlers, func(ad *AlertData) { an.handlePost(post, ad) }) } + for _, tcp := range n.TcpHandlers { + tcp := tcp + an.handlers = append(an.handlers, func(ad *AlertData) { an.handleTcp(tcp, ad) }) + } + for _, email := range n.EmailHandlers { email := email an.handlers = append(an.handlers, func(ad *AlertData) { an.handleEmail(email, ad) }) @@ -910,6 +916,32 @@ func (a *AlertNode) handlePost(post *pipeline.PostHandler, ad *AlertData) { return } +func (a *AlertNode) handleTcp(tcp *pipeline.TcpHandler, ad *AlertData) { + buf := a.bufPool.Get().(*bytes.Buffer) + defer func() { + buf.Reset() + a.bufPool.Put(buf) + }() + + err := json.NewEncoder(buf).Encode(ad) + if err != nil { + a.logger.Println("E! failed to marshal alert data json", err) + return + } + + conn, err := net.Dial("tcp", tcp.Address) + if err != nil { + a.logger.Println("E! failed to connect", err) + return + } + defer conn.Close() + + buf.WriteByte("\n") + conn.Write(buf.Bytes()) + + return +} + func (a *AlertNode) handleEmail(email *pipeline.EmailHandler, ad *AlertData) { if a.et.tm.SMTPService != nil { err := a.et.tm.SMTPService.SendMail(email.ToList, ad.Message, ad.Details) diff --git a/pipeline/alert.go b/pipeline/alert.go index bec421f69..7be526112 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -37,6 +37,7 @@ const defaultLogFileMode = 0600 // // * log -- log alert data to file. // * post -- HTTP POST data to a specified URL. +// * tcp -- Send data to a specified address via raw TCP. // * email -- Send and email with alert data. // * exec -- Execute a command passing alert data over STDIN. // * HipChat -- Post alert message to HipChat room. @@ -79,6 +80,7 @@ const defaultLogFileMode = 0600 // .crit(lambda: "value" > 30) // .post("http://example.com/api/alert") // .post("http://another.example.com/api/alert") +// .tcp("exampleendpoint.com:5678") // .email('oncall@example.com') // // @@ -283,6 +285,10 @@ type AlertNode struct { // tick:ignore PostHandlers []*PostHandler `tick:"Post"` + // Send the JSON alert data to the specified endpoint via TCP. + // tick:ignore + TcpHandlers []*TcpHandler `tick:"Tcp"` + // Email handlers // tick:ignore EmailHandlers []*EmailHandler `tick:"Email"` @@ -464,6 +470,25 @@ type PostHandler struct { URL string } +// Send JSON alert data to a specified address over TCP. +// tick:property +func (a *AlertNode) Tcp(address string) *TcpHandler { + tcp := &TcpHandler{ + AlertNode: a, + Address: address, + } + a.TcpHandlers = append(a.TcpHandlers, tcp) + return tcp +} + +// tick:embedded:AlertNode.Tcp +type TcpHandler struct { + *AlertNode + + // The endpoint address. + Address string +} + // Email the alert data. // // If the To list is empty, the To addresses from the configuration are used.