From 13ef2f51f5ceaaffe041a31d735158d8ace2267c Mon Sep 17 00:00:00 2001 From: Sebastian Borza Date: Tue, 6 Feb 2018 16:28:41 -0600 Subject: [PATCH] initial commit, support for pagerduty API v2 --- CHANGELOG.md | 1 + CONTRIBUTING.md | 2 +- alert.go | 14 + etc/kapacitor/kapacitor.conf | 11 + integrations/streamer_test.go | 108 +++++ pipeline/alert.go | 63 +++ pipeline/alert_test.go | 1 + pipeline/json_test.go | 1 + pipeline/tick/alert.go | 5 + pipeline/tick/alert_test.go | 19 + server/config.go | 36 +- server/server.go | 15 + server/server_test.go | 189 +++++++++ services/alert/service.go | 12 + services/diagnostic/handlers.go | 18 + services/diagnostic/service.go | 6 + services/pagerduty2/config.go | 40 ++ .../pagerduty2test/pagerduty2test.go | 89 ++++ services/pagerduty2/service.go | 379 ++++++++++++++++++ task_master.go | 6 + 20 files changed, 999 insertions(+), 16 deletions(-) create mode 100644 services/pagerduty2/config.go create mode 100644 services/pagerduty2/pagerduty2test/pagerduty2test.go create mode 100644 services/pagerduty2/service.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 49cab94d8..b5618d319 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ alert will auto-recover. - [#1827](https://github.com/influxdata/kapacitor/pull/1827): Fix deadlock in load service when task has an error. +- [#1795](https://github.com/influxdata/kapacitor/pull/1795): Support PagerDuty API v2 ## v1.4.0 [2017-12-08] diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 2f0973d88..95d01a5a1 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -136,7 +136,7 @@ Kapacitor uses the golang [dep](https://github.com/golang/dep) tool. Install the dep tool: ``` -go get -u github.com/golang/dep +go get -v -u github.com/golang/dep/cmd/dep ``` See the dep help for usage and documentation. diff --git a/alert.go b/alert.go index f8b54c728..7bc3d495b 100644 --- a/alert.go +++ b/alert.go @@ -23,6 +23,7 @@ import ( "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/opsgenie2" "github.com/influxdata/kapacitor/services/pagerduty" + "github.com/influxdata/kapacitor/services/pagerduty2" "github.com/influxdata/kapacitor/services/pushover" "github.com/influxdata/kapacitor/services/sensu" "github.com/influxdata/kapacitor/services/slack" @@ -203,6 +204,19 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a an.handlers = append(an.handlers, h) } + for _, pd := range n.PagerDuty2Handlers { + c := pagerduty2.HandlerConfig{ + ServiceKey: pd.ServiceKey, + } + h := et.tm.PagerDuty2Service.Handler(c, ctx...) + an.handlers = append(an.handlers, h) + } + if len(n.PagerDuty2Handlers) == 0 && (et.tm.PagerDuty2Service != nil && et.tm.PagerDuty2Service.Global()) { + c := pagerduty2.HandlerConfig{} + h := et.tm.PagerDuty2Service.Handler(c, ctx...) + an.handlers = append(an.handlers, h) + } + for _, s := range n.SensuHandlers { c := sensu.HandlerConfig{ Source: s.Source, diff --git a/etc/kapacitor/kapacitor.conf b/etc/kapacitor/kapacitor.conf index c36b0cf04..72e6ba2da 100644 --- a/etc/kapacitor/kapacitor.conf +++ b/etc/kapacitor/kapacitor.conf @@ -311,6 +311,17 @@ default-retention-policy = "" # without explicitly marking them in the TICKscript. global = false +[pagerduty2] + # Configure PagerDuty API v2. + enabled = false + # Your PagerDuty Service Key. + service-key = "" + # The PagerDuty API v2 URL should not need to be changed. + url = "https://events.pagerduty.com/v2/enqueue" + # If true the all alerts will be sent to PagerDuty + # without explicitly marking them in the TICKscript. + global = false + [pushover] # Configure Pushover. enabled = false diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 6e6231f1e..02e5f3064 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -52,6 +52,8 @@ import ( "github.com/influxdata/kapacitor/services/opsgenie2/opsgenie2test" "github.com/influxdata/kapacitor/services/pagerduty" "github.com/influxdata/kapacitor/services/pagerduty/pagerdutytest" + "github.com/influxdata/kapacitor/services/pagerduty2" + "github.com/influxdata/kapacitor/services/pagerduty2/pagerduty2test" "github.com/influxdata/kapacitor/services/pushover" "github.com/influxdata/kapacitor/services/pushover/pushovertest" "github.com/influxdata/kapacitor/services/sensu" @@ -9239,6 +9241,112 @@ stream } } +func TestStream_AlertPagerDuty2(t *testing.T) { + ts := pagerduty2test.NewServer() + defer ts.Close() + + detailsTmpl := map[string]interface{}{ + "result": map[string]interface{}{ + "series": []interface{}{ + map[string]interface{}{ + "name": "cpu", + "tags": map[string]interface{}{ + "host": "serverA", + }, + "columns": []interface{}{"time", "count"}, + "values": []interface{}{ + []interface{}{"1971-01-01T00:00:10Z", float64(10)}, + }, + }, + }, + }, + } + + var script = ` +stream + |from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + .groupBy('host') + |window() + .period(10s) + .every(10s) + |count('value') + |alert() + .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') + .message('{{ .Level }} alert for {{ .ID }}') + .info(lambda: "count" > 6.0) + .warn(lambda: "count" > 7.0) + .crit(lambda: "count" > 8.0) + .pagerDuty2() + .pagerDuty2() + .serviceKey('test_override_key') + ` + + var kapacitorURL string + tmInit := func(tm *kapacitor.TaskMaster) { + c := pagerduty2.NewConfig() + c.Enabled = true + c.URL = ts.URL + c.ServiceKey = "service_key" + pd := pagerduty2.NewService(c, diagService.NewPagerDuty2Handler()) + pd.HTTPDService = tm.HTTPDService + tm.PagerDuty2Service = pd + + kapacitorURL = tm.HTTPDService.URL() + } + testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit) + + exp := []interface{}{ + pagerduty2test.Request{ + URL: "/", + PostData: pagerduty2test.PostData{ + Client: "kapacitor", + ClientURL: kapacitorURL, + EventAction: "trigger", + DedupKey: "kapacitor/cpu/serverA", + Payload: &pagerduty2test.PDCEF{ + Summary: "CRITICAL alert for kapacitor/cpu/serverA", + Source: "serverA", + Severity: "critical", + Class: "TestStream_Alert", + CustomDetails: detailsTmpl, + Timestamp: "1971-01-01T00:00:10.000000000Z", + }, + RoutingKey: "service_key", + }, + }, + pagerduty2test.Request{ + URL: "/", + PostData: pagerduty2test.PostData{ + Client: "kapacitor", + ClientURL: kapacitorURL, + EventAction: "trigger", + DedupKey: "kapacitor/cpu/serverA", + Payload: &pagerduty2test.PDCEF{ + Summary: "CRITICAL alert for kapacitor/cpu/serverA", + Source: "serverA", + Severity: "critical", + Class: "TestStream_Alert", + CustomDetails: detailsTmpl, + Timestamp: "1971-01-01T00:00:10.000000000Z", + }, + RoutingKey: "test_override_key", + }, + }, + } + + ts.Close() + var got []interface{} + for _, g := range ts.Requests() { + got = append(got, g) + } + + if err := compareListIgnoreOrder(got, exp, nil); err != nil { + t.Error(err) + } +} + func TestStream_AlertHTTPPost(t *testing.T) { ts := httpposttest.NewAlertServer(nil, false) defer ts.Close() diff --git a/pipeline/alert.go b/pipeline/alert.go index cc08d9f0b..f49f9a0af 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -23,6 +23,7 @@ const defaultMessageTmpl = "{{ .ID }} is {{ .Level }}" // Default template for constructing a details message. const defaultDetailsTmpl = "{{ json . }}" +// AlertNode struct wraps the default AlertNodeData // tick:wraps:AlertNodeData type AlertNode struct{ *AlertNodeData } @@ -325,6 +326,10 @@ type AlertNodeData struct { // tick:ignore PagerDutyHandlers []*PagerDutyHandler `tick:"PagerDuty" json:"pagerDuty"` + // Send alert to PagerDuty API v2. + // tick:ignore + PagerDuty2Handlers []*PagerDuty2Handler `tick:"PagerDuty2" json:"pagerDuty2"` + // Send alert to Pushover. // tick:ignore PushoverHandlers []*PushoverHandler `tick:"Pushover" json:"pushover"` @@ -903,6 +908,64 @@ type PagerDutyHandler struct { ServiceKey string `json:"serviceKey"` } +// Send the alert to PagerDuty API v2. +// To use PagerDuty alerting you must first follow the steps to enable a new 'Generic API' service. +// NOTE: the API v2 endpoint is different and requires a new configuration in order to process/handle alerts +// +// From https://developer.pagerduty.com/documentation/integration/events +// +// 1. In your account, under the Services tab, click "Add New Service". +// 2. Enter a name for the service and select an escalation policy. Then, select "Generic API" for the Service Type. +// 3. Click the "Add Service" button. +// 4. Once the service is created, you'll be taken to the service page. On this page, you'll see the "Service key", which is needed to access the API +// +// Place the 'service key' into the 'pagerduty' section of the Kapacitor configuration as the option 'service-key'. +// +// Example: +// [pagerduty2] +// enabled = true +// service-key = "xxxxxxxxx" +// +// With the correct configuration you can now use PagerDuty in TICKscripts. +// +// Example: +// stream +// |alert() +// .pagerDuty2() +// +// If the 'pagerduty' section in the configuration has the option: global = true +// then all alerts are sent to PagerDuty without the need to explicitly state it +// in the TICKscript. +// +// Example: +// [pagerduty2] +// enabled = true +// service-key = "xxxxxxxxx" +// global = true +// +// Example: +// stream +// |alert() +// +// Send alert to PagerDuty API v2. +// tick:property +func (n *AlertNodeData) PagerDuty2() *PagerDuty2Handler { + pd2 := &PagerDuty2Handler{ + AlertNodeData: n, + } + n.PagerDuty2Handlers = append(n.PagerDuty2Handlers, pd2) + return pd2 +} + +// tick:embedded:AlertNode.PagerDuty +type PagerDuty2Handler struct { + *AlertNodeData `json:"-"` + + // The service key to use for the alert. + // Defaults to the value in the configuration if empty. + ServiceKey string `json:"serviceKey"` +} + // Send the alert to HipChat. // For step-by-step instructions on setting up Kapacitor with HipChat, see the [Event Handler Setup Guide](https://docs.influxdata.com//kapacitor/latest/guides/event-handler-setup/#hipchat-setup). // To allow Kapacitor to post to HipChat, diff --git a/pipeline/alert_test.go b/pipeline/alert_test.go index ccd0da03b..421287027 100644 --- a/pipeline/alert_test.go +++ b/pipeline/alert_test.go @@ -65,6 +65,7 @@ func TestAlertNode_MarshalJSON(t *testing.T) { "log": null, "victorOps": null, "pagerDuty": null, + "pagerDuty2": null, "pushover": null, "sensu": null, "slack": null, diff --git a/pipeline/json_test.go b/pipeline/json_test.go index c330e5ded..51c21f165 100644 --- a/pipeline/json_test.go +++ b/pipeline/json_test.go @@ -246,6 +246,7 @@ func TestPipeline_MarshalJSON(t *testing.T) { "log": null, "victorOps": null, "pagerDuty": null, + "pagerDuty2": null, "pushover": null, "sensu": null, "slack": null, diff --git a/pipeline/tick/alert.go b/pipeline/tick/alert.go index a4c1d7014..1a2956bcf 100644 --- a/pipeline/tick/alert.go +++ b/pipeline/tick/alert.go @@ -109,6 +109,11 @@ func (n *AlertNode) Build(a *pipeline.AlertNode) (ast.Node, error) { Dot("serviceKey", h.ServiceKey) } + for _, h := range a.PagerDuty2Handlers { + n.Dot("pagerDuty2"). + Dot("serviceKey", h.ServiceKey) + } + for _, h := range a.PushoverHandlers { n.Dot("pushover"). Dot("userKey", h.UserKey). diff --git a/pipeline/tick/alert_test.go b/pipeline/tick/alert_test.go index 6a2cd3873..47320acf6 100644 --- a/pipeline/tick/alert_test.go +++ b/pipeline/tick/alert_test.go @@ -193,6 +193,7 @@ func TestAlertTCPJSON(t *testing.T) { "log": null, "victorOps": null, "pagerDuty": null, + "pagerDuty2": null, "pushover": null, "sensu": null, "slack": null, @@ -306,6 +307,24 @@ func TestAlertPagerDuty(t *testing.T) { PipelineTickTestHelper(t, pipe, want) } +func TestAlertPagerDuty2(t *testing.T) { + pipe, _, from := StreamFrom() + handler := from.Alert().PagerDuty2() + handler.ServiceKey = "LeafsNation" + + want := `stream + |from() + |alert() + .id('{{ .Name }}:{{ .Group }}') + .message('{{ .ID }} is {{ .Level }}') + .details('{{ json . }}') + .history(21) + .pagerDuty2() + .serviceKey('LeafsNation') +` + PipelineTickTestHelper(t, pipe, want) +} + func TestAlertPushover(t *testing.T) { pipe, _, from := StreamFrom() handler := from.Alert().Pushover() diff --git a/server/config.go b/server/config.go index 3bae23bd1..f07518f2b 100644 --- a/server/config.go +++ b/server/config.go @@ -34,6 +34,7 @@ import ( "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/opsgenie2" "github.com/influxdata/kapacitor/services/pagerduty" + "github.com/influxdata/kapacitor/services/pagerduty2" "github.com/influxdata/kapacitor/services/pushover" "github.com/influxdata/kapacitor/services/replay" "github.com/influxdata/kapacitor/services/reporting" @@ -79,21 +80,22 @@ type Config struct { UDP []udp.Config `toml:"udp"` // Alert handlers - Alerta alerta.Config `toml:"alerta" override:"alerta"` - HipChat hipchat.Config `toml:"hipchat" override:"hipchat"` - MQTT mqtt.Configs `toml:"mqtt" override:"mqtt,element-key=name"` - OpsGenie opsgenie.Config `toml:"opsgenie" override:"opsgenie"` - OpsGenie2 opsgenie2.Config `toml:"opsgenie2" override:"opsgenie2"` - PagerDuty pagerduty.Config `toml:"pagerduty" override:"pagerduty"` - Pushover pushover.Config `toml:"pushover" override:"pushover"` - HTTPPost httppost.Configs `toml:"httppost" override:"httppost,element-key=endpoint"` - SMTP smtp.Config `toml:"smtp" override:"smtp"` - SNMPTrap snmptrap.Config `toml:"snmptrap" override:"snmptrap"` - Sensu sensu.Config `toml:"sensu" override:"sensu"` - Slack slack.Config `toml:"slack" override:"slack"` - Talk talk.Config `toml:"talk" override:"talk"` - Telegram telegram.Config `toml:"telegram" override:"telegram"` - VictorOps victorops.Config `toml:"victorops" override:"victorops"` + Alerta alerta.Config `toml:"alerta" override:"alerta"` + HipChat hipchat.Config `toml:"hipchat" override:"hipchat"` + MQTT mqtt.Configs `toml:"mqtt" override:"mqtt,element-key=name"` + OpsGenie opsgenie.Config `toml:"opsgenie" override:"opsgenie"` + OpsGenie2 opsgenie2.Config `toml:"opsgenie2" override:"opsgenie2"` + PagerDuty pagerduty.Config `toml:"pagerduty" override:"pagerduty"` + PagerDuty2 pagerduty2.Config `toml:"pagerduty2" override:"pagerduty2"` + Pushover pushover.Config `toml:"pushover" override:"pushover"` + HTTPPost httppost.Configs `toml:"httppost" override:"httppost,element-key=endpoint"` + SMTP smtp.Config `toml:"smtp" override:"smtp"` + SNMPTrap snmptrap.Config `toml:"snmptrap" override:"snmptrap"` + Sensu sensu.Config `toml:"sensu" override:"sensu"` + Slack slack.Config `toml:"slack" override:"slack"` + Talk talk.Config `toml:"talk" override:"talk"` + Telegram telegram.Config `toml:"telegram" override:"telegram"` + VictorOps victorops.Config `toml:"victorops" override:"victorops"` // Discovery for scraping Scraper []scraper.Config `toml:"scraper" override:"scraper,element-key=name"` @@ -150,6 +152,7 @@ func NewConfig() *Config { c.OpsGenie = opsgenie.NewConfig() c.OpsGenie2 = opsgenie2.NewConfig() c.PagerDuty = pagerduty.NewConfig() + c.PagerDuty2 = pagerduty2.NewConfig() c.Pushover = pushover.NewConfig() c.HTTPPost = httppost.Configs{httppost.NewConfig()} c.SMTP = smtp.NewConfig() @@ -272,6 +275,9 @@ func (c *Config) Validate() error { if err := c.PagerDuty.Validate(); err != nil { return errors.Wrap(err, "pagerduty") } + if err := c.PagerDuty2.Validate(); err != nil { + return errors.Wrap(err, "pagerduty2") + } if err := c.Pushover.Validate(); err != nil { return errors.Wrap(err, "pushover") } diff --git a/server/server.go b/server/server.go index 13d3ea44a..dfd538421 100644 --- a/server/server.go +++ b/server/server.go @@ -45,6 +45,7 @@ import ( "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/opsgenie2" "github.com/influxdata/kapacitor/services/pagerduty" + "github.com/influxdata/kapacitor/services/pagerduty2" "github.com/influxdata/kapacitor/services/pushover" "github.com/influxdata/kapacitor/services/replay" "github.com/influxdata/kapacitor/services/reporting" @@ -234,6 +235,7 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv s.appendOpsGenieService() s.appendOpsGenie2Service() s.appendPagerDutyService() + s.appendPagerDuty2Service() s.appendPushoverService() if err := s.appendHTTPPostService(); err != nil { return nil, errors.Wrap(err, "httppost service") @@ -619,6 +621,19 @@ func (s *Server) appendPagerDutyService() { s.AppendService("pagerduty", srv) } +func (s *Server) appendPagerDuty2Service() { + c := s.config.PagerDuty2 + d := s.DiagService.NewPagerDuty2Handler() + srv := pagerduty2.NewService(c, d) + srv.HTTPDService = s.HTTPDService + + s.TaskMaster.PagerDuty2Service = srv + s.AlertService.PagerDuty2Service = srv + + s.SetDynamicService("pagerduty2", srv) + s.AppendService("pagerduty2", srv) +} + func (s *Server) appendPushoverService() { c := s.config.Pushover d := s.DiagService.NewPushoverHandler() diff --git a/server/server_test.go b/server/server_test.go index 6397516ff..0faff321b 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/davecgh/go-spew/spew" "github.com/dgrijalva/jwt-go" iclient "github.com/influxdata/influxdb/client/v2" "github.com/influxdata/influxdb/influxql" @@ -46,6 +47,8 @@ import ( "github.com/influxdata/kapacitor/services/opsgenie2/opsgenie2test" "github.com/influxdata/kapacitor/services/pagerduty" "github.com/influxdata/kapacitor/services/pagerduty/pagerdutytest" + "github.com/influxdata/kapacitor/services/pagerduty2" + "github.com/influxdata/kapacitor/services/pagerduty2/pagerduty2test" "github.com/influxdata/kapacitor/services/pushover/pushovertest" "github.com/influxdata/kapacitor/services/sensu/sensutest" "github.com/influxdata/kapacitor/services/slack/slacktest" @@ -7301,6 +7304,76 @@ func TestServer_UpdateConfig(t *testing.T) { }, }, }, + { + section: "pagerduty2", + setDefaults: func(c *server.Config) { + c.PagerDuty2.ServiceKey = "secret" + }, + expDefaultSection: client.ConfigSection{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/pagerduty2"}, + Elements: []client.ConfigElement{{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/pagerduty2/"}, + Options: map[string]interface{}{ + "enabled": false, + "global": false, + "service-key": true, + "url": pagerduty2.DefaultPagerDuty2APIURL, + }, + Redacted: []string{ + "service-key", + }, + }}, + }, + expDefaultElement: client.ConfigElement{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/pagerduty2/"}, + Options: map[string]interface{}{ + "enabled": false, + "global": false, + "service-key": true, + "url": pagerduty2.DefaultPagerDuty2APIURL, + }, + Redacted: []string{ + "service-key", + }, + }, + updates: []updateAction{ + { + updateAction: client.ConfigUpdateAction{ + Set: map[string]interface{}{ + "service-key": "", + "enabled": true, + }, + }, + expSection: client.ConfigSection{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/pagerduty2"}, + Elements: []client.ConfigElement{{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/pagerduty2/"}, + Options: map[string]interface{}{ + "enabled": true, + "global": false, + "service-key": false, + "url": pagerduty2.DefaultPagerDuty2APIURL, + }, + Redacted: []string{ + "service-key", + }, + }}, + }, + expElement: client.ConfigElement{ + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/config/pagerduty2/"}, + Options: map[string]interface{}{ + "enabled": true, + "global": false, + "service-key": false, + "url": pagerduty2.DefaultPagerDuty2APIURL, + }, + Redacted: []string{ + "service-key", + }, + }, + }, + }, + }, { section: "smtp", setDefaults: func(c *server.Config) { @@ -8221,6 +8294,27 @@ func TestServer_ListServiceTests(t *testing.T) { "details": "", }, }, + { + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/pagerduty2"}, + Name: "pagerduty2", + Options: client.ServiceTestOptions{ + "incident-key": "testIncidentKey", + "description": "test pagerduty2 message", + "level": "CRITICAL", + "event_data": map[string]interface{}{ + "Fields": map[string]interface{}{}, + "Result": map[string]interface{}{ + "series": interface{}(nil), + }, + "Name": "testPagerDuty2", + "TaskName": "", + "Group": "", + "Tags": map[string]interface{}{}, + "Recoverable": false, + }, + "timestamp": "2014-11-12T11:45:26.371Z", + }, + }, { Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/pushover"}, Name: "pushover", @@ -8560,6 +8654,14 @@ func TestServer_DoServiceTest(t *testing.T) { Message: "service is not enabled", }, }, + { + service: "pagerduty2", + options: client.ServiceTestOptions{}, + exp: client.ServiceTestResult{ + Success: false, + Message: "service is not enabled", + }, + }, { service: "pushover", options: client.ServiceTestOptions{}, @@ -9153,6 +9255,64 @@ func TestServer_AlertHandlers(t *testing.T) { return nil }, }, + { + handler: client.TopicHandler{ + Kind: "pagerduty2", + Options: map[string]interface{}{ + "service-key": "service_key", + }, + }, + setup: func(c *server.Config, ha *client.TopicHandler) (context.Context, error) { + ts := pagerduty2test.NewServer() + ctxt := context.WithValue(nil, "server", ts) + + c.PagerDuty2.Enabled = true + c.PagerDuty2.URL = ts.URL + return ctxt, nil + }, + result: func(ctxt context.Context) error { + ts := ctxt.Value("server").(*pagerduty2test.Server) + kapacitorURL := ctxt.Value("kapacitorURL").(string) + ts.Close() + got := ts.Requests() + exp := []pagerduty2test.Request{{ + URL: "/", + PostData: pagerduty2test.PostData{ + Client: "kapacitor", + ClientURL: kapacitorURL, + EventAction: "trigger", + DedupKey: "id", + Payload: &pagerduty2test.PDCEF{ + Summary: "message", + Source: "unknown", + Severity: "critical", + Class: "testAlertHandlers", + CustomDetails: map[string]interface{}{ + "result": map[string]interface{}{ + "series": []interface{}{ + map[string]interface{}{ + "name": "alert", + "columns": []interface{}{"time", "value"}, + "values": []interface{}{ + []interface{}{"1970-01-01T00:00:00Z", float64(1)}, + }, + }, + }, + }, + }, + Timestamp: "1970-01-01T00:00:00.000000000Z", + }, + RoutingKey: "service_key", + }, + }} + + if !reflect.DeepEqual(exp, got) { + return fmt.Errorf("unexpected pagerduty2 request:\nexp\n%+v\ngot\n%+v\n", exp, got) + } + + return nil + }, + }, { handler: client.TopicHandler{ Kind: "post", @@ -11213,3 +11373,32 @@ func TestLogSessions_HeaderGzip(t *testing.T) { } } + +func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface{}) bool) error { + if len(got) != len(exp) { + return fmt.Errorf("unequal lists ignoring order:\ngot\n%s\nexp\n%s\n", spew.Sdump(got), spew.Sdump(exp)) + } + + if cmpF == nil { + cmpF = func(got, exp interface{}) bool { + if !reflect.DeepEqual(got, exp) { + return false + } + return true + } + } + + for _, e := range exp { + found := false + for _, g := range got { + if cmpF(g, e) { + found = true + break + } + } + if !found { + return fmt.Errorf("unequal lists ignoring order:\ngot\n%s\nexp\n%s\n", spew.Sdump(got), spew.Sdump(exp)) + } + } + return nil +} diff --git a/services/alert/service.go b/services/alert/service.go index dd4df612e..e7a170af4 100644 --- a/services/alert/service.go +++ b/services/alert/service.go @@ -20,6 +20,7 @@ import ( "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/opsgenie2" "github.com/influxdata/kapacitor/services/pagerduty" + "github.com/influxdata/kapacitor/services/pagerduty2" "github.com/influxdata/kapacitor/services/pushover" "github.com/influxdata/kapacitor/services/sensu" "github.com/influxdata/kapacitor/services/slack" @@ -93,6 +94,9 @@ type Service struct { PagerDutyService interface { Handler(pagerduty.HandlerConfig, ...keyvalue.T) alert.Handler } + PagerDuty2Service interface { + Handler(pagerduty2.HandlerConfig, ...keyvalue.T) alert.Handler + } PushoverService interface { Handler(pushover.HandlerConfig, ...keyvalue.T) alert.Handler } @@ -823,6 +827,14 @@ func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) { } h = s.PagerDutyService.Handler(c, ctx...) h = newExternalHandler(h) + case "pagerduty2": + c := pagerduty2.HandlerConfig{} + err = decodeOptions(spec.Options, &c) + if err != nil { + return handler{}, err + } + h = s.PagerDuty2Service.Handler(c, ctx...) + h = newExternalHandler(h) case "pushover": c := pushover.HandlerConfig{} err = decodeOptions(spec.Options, &c) diff --git a/services/diagnostic/handlers.go b/services/diagnostic/handlers.go index 8334318dd..79f5d27b8 100644 --- a/services/diagnostic/handlers.go +++ b/services/diagnostic/handlers.go @@ -26,6 +26,7 @@ import ( "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/opsgenie2" "github.com/influxdata/kapacitor/services/pagerduty" + "github.com/influxdata/kapacitor/services/pagerduty2" "github.com/influxdata/kapacitor/services/pushover" "github.com/influxdata/kapacitor/services/sensu" "github.com/influxdata/kapacitor/services/sideload" @@ -537,6 +538,23 @@ func (h *PagerDutyHandler) Error(msg string, err error) { h.l.Error(msg, Error(err)) } +// PagerDuty2 handler +type PagerDuty2Handler struct { + l Logger +} + +func (h *PagerDuty2Handler) WithContext(ctx ...keyvalue.T) pagerduty2.Diagnostic { + fields := logFieldsFromContext(ctx) + + return &PagerDuty2Handler{ + l: h.l.With(fields...), + } +} + +func (h *PagerDuty2Handler) Error(msg string, err error) { + h.l.Error(msg, Error(err)) +} + // Slack Handler type SlackHandler struct { diff --git a/services/diagnostic/service.go b/services/diagnostic/service.go index 079c53672..3cb590131 100644 --- a/services/diagnostic/service.go +++ b/services/diagnostic/service.go @@ -202,6 +202,12 @@ func (s *Service) NewPagerDutyHandler() *PagerDutyHandler { } } +func (s *Service) NewPagerDuty2Handler() *PagerDuty2Handler { + return &PagerDuty2Handler{ + l: s.Logger.With(String("service", "pagerduty2")), + } +} + func (s *Service) NewSMTPHandler() *SMTPHandler { return &SMTPHandler{ l: s.Logger.With(String("service", "smtp")), diff --git a/services/pagerduty2/config.go b/services/pagerduty2/config.go new file mode 100644 index 000000000..b1c7d59c0 --- /dev/null +++ b/services/pagerduty2/config.go @@ -0,0 +1,40 @@ +package pagerduty2 + +import ( + "net/url" + + "github.com/pkg/errors" +) + +// DefaultPagerDuty2APIURL is the default URL for the v2 API +const DefaultPagerDuty2APIURL = "https://events.pagerduty.com/v2/enqueue" + +// Config is the default struct for the PagerDuty v2 plugin +type Config struct { + // Whether PagerDuty integration is enabled. + Enabled bool `toml:"enabled" override:"enabled"` + // The PagerDuty API URL, should not need to be changed. + URL string `toml:"url" override:"url"` + // The PagerDuty service key. + ServiceKey string `toml:"service-key" override:"service-key,redact"` + // Whether every alert should automatically go to PagerDuty + Global bool `toml:"global" override:"global"` +} + +// NewConfig returns a new instance of the primary config struct for PagerDuty +func NewConfig() Config { + return Config{ + URL: DefaultPagerDuty2APIURL, + } +} + +// Validate is a bound method that checks/confirms whether the attached configuration is valid +func (c Config) Validate() error { + if c.URL == "" { + return errors.New("url cannot be empty") + } + if _, err := url.Parse(c.URL); err != nil { + return errors.Wrapf(err, "invalid URL %q", c.URL) + } + return nil +} diff --git a/services/pagerduty2/pagerduty2test/pagerduty2test.go b/services/pagerduty2/pagerduty2test/pagerduty2test.go new file mode 100644 index 000000000..15903ad85 --- /dev/null +++ b/services/pagerduty2/pagerduty2test/pagerduty2test.go @@ -0,0 +1,89 @@ +package pagerduty2test + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "sync" +) + +type Server struct { + mu sync.Mutex + ts *httptest.Server + URL string + requests []Request + closed bool +} + +func NewServer() *Server { + s := new(Server) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + pr := Request{ + URL: r.URL.String(), + } + dec := json.NewDecoder(r.Body) + dec.Decode(&pr.PostData) + s.mu.Lock() + s.requests = append(s.requests, pr) + s.mu.Unlock() + + })) + s.ts = ts + s.URL = ts.URL + return s +} + +func (s *Server) Requests() []Request { + s.mu.Lock() + defer s.mu.Unlock() + return s.requests +} + +func (s *Server) Close() { + if s.closed { + return + } + s.closed = true + s.ts.Close() +} + +type Request struct { + URL string + PostData PostData +} + +type PDCEF struct { + Summary string `json:"summary"` + Source string `json:"source"` + Severity string `json:"severity"` + Timestamp string `json:"timestamp"` + Class string `json:"class"` + Component string `json:"component"` + Group string `json:"group"` + CustomDetails map[string]interface{} `json:"custom_details"` +} + +// Image is the struct of elements for an image in the payload +type Image struct { + Src string `json:"src"` + Href string `json:"href"` + Alt string `json:"alt"` +} + +// Link is the struct of elements for a link in the payload +type Link struct { + Href string `json:"href"` + Text string `json:"text"` +} + +// PostData is the default struct to send an element through to PagerDuty +type PostData struct { + RoutingKey string `json:"routing_key"` + EventAction string `json:"event_action"` + DedupKey string `json:"dedup_key"` + Payload *PDCEF `json:"payload"` + Images []Image `json:"images"` + Links []Link `json:"links"` + Client string `json:"client"` + ClientURL string `json:"client_url"` +} diff --git a/services/pagerduty2/service.go b/services/pagerduty2/service.go new file mode 100644 index 000000000..7a60424d6 --- /dev/null +++ b/services/pagerduty2/service.go @@ -0,0 +1,379 @@ +package pagerduty2 + +import ( + "bytes" + "encoding/json" + "errors" + "fmt" + "io" + "io/ioutil" + "net/http" + "sync/atomic" + "time" + + "github.com/influxdata/kapacitor/alert" + "github.com/influxdata/kapacitor/keyvalue" + "github.com/influxdata/kapacitor/models" +) + +// This example shows how to send a trigger event without a dedup_key. +// In this case, PagerDuty will automatically assign a random and unique key +// and return it in the response object. +// +// You should store this key in case you want to send an acknowledge or resolve +// event to this incident in the future. +// +//{ +// "payload": { +// "summary": "Example alert on host1.example.com", +// "timestamp": "2015-07-17T08:42:58.315+0000", +// "source": "monitoringtool:cloudvendor:central-region-dc-01:852559987:cluster/api-stats-prod-003", +// "severity": "info", +// "component": "postgres", +// "group": "prod-datapipe", +// "class": "deploy", +// "custom_details": { +// "ping time": "1500ms", +// "load avg": 0.75 +// } +// }, +// "routing_key": "samplekeyhere", +// "dedup_key": "samplekeyhere", +// "images": [{ +// "src": "https://www.pagerduty.com/wp-content/uploads/2016/05/pagerduty-logo-green.png", +// "href": "https://example.com/", +// "alt": "Example text" +// }], +// "links": [{ +// "href": "https://example.com/", +// "text": "Link text" +// }], +// "event_action": "trigger", +// "client": "Sample Monitoring Service", +// "client_url": "https://monitoring.example.com" +//} + +// PDCEF is the PagerDuty - Common Event Format (PD-CEF) as outlined in the v2 API +// https://v2.developer.pagerduty.com/docs/events-api-v2 +// https://support.pagerduty.com/docs/pd-cef +// +// API entry point is now https://events.pagerduty.com/v2/enqueue +type PDCEF struct { + Summary string `json:"summary"` + Source string `json:"source"` + Severity string `json:"severity"` + Timestamp string `json:"timestamp"` + Class string `json:"class"` + Component string `json:"component"` + Group string `json:"group"` + CustomDetails map[string]interface{} `json:"custom_details"` +} + +// Image is the struct of elements for an image in the payload +type Image struct { + Src string `json:"src"` + Href string `json:"href"` + Alt string `json:"alt"` +} + +// Link is the struct of elements for a link in the payload +type Link struct { + Href string `json:"href"` + Text string `json:"text"` +} + +// AlertPayload is the default struct to send an element through to PagerDuty +type AlertPayload struct { + RoutingKey string `json:"routing_key"` + EventAction string `json:"event_action"` + DedupKey string `json:"dedup_key"` + Payload *PDCEF `json:"payload"` + Images []Image `json:"images"` + Links []Link `json:"links"` + Client string `json:"client"` + ClientURL string `json:"client_url"` +} + +// Diagnostic defines the interface of a diagnostic event +type Diagnostic interface { + WithContext(ctx ...keyvalue.T) Diagnostic + Error(msg string, err error) +} + +// Service is the default struct for the HTTP service +type Service struct { + configValue atomic.Value + + HTTPDService interface { + URL() string + } + diag Diagnostic +} + +// NewService returns a newly instantiated Service +func NewService(c Config, d Diagnostic) *Service { + s := &Service{ + diag: d, + } + s.configValue.Store(c) + return s +} + +// Open is a bound method of the Service struct +func (s *Service) Open() error { + return nil +} + +// Close is a bound method of the Service struct +func (s *Service) Close() error { + return nil +} + +func (s *Service) config() Config { + return s.configValue.Load().(Config) +} + +// Update is a bound method of the Service struct, handles updates to the existing service +func (s *Service) Update(newConfig []interface{}) error { + if l := len(newConfig); l != 1 { + return fmt.Errorf("expected only one new config object, got %d", l) + } + + c, ok := newConfig[0].(Config) + if !ok { + return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfig[0]) + } + + s.configValue.Store(c) + return nil +} + +// Global is a bound method of the Service struct, returns whether the Service configuration is global +func (s *Service) Global() bool { + c := s.config() + return c.Global +} + +type testOptions struct { + IncidentKey string `json:"incident-key"` + Description string `json:"description"` + // Details string `json:"details"` + Level alert.Level `json:"level"` + Data alert.EventData `json:"event_data"` + Timestamp time.Time `json:"timestamp"` +} + +// TestOptions returns optional values for the test harness +func (s *Service) TestOptions() interface{} { + layout := "2006-01-02T15:04:05.000Z" + str := "2014-11-12T11:45:26.371Z" + t, _ := time.Parse(layout, str) + + return &testOptions{ + IncidentKey: "testIncidentKey", + Description: "test pagerduty2 message", + Level: alert.Critical, + Timestamp: t, + // Details: html.EscapeString(`{"Test": "test_value"}`), + Data: alert.EventData{ + Name: "testPagerDuty2", + Tags: make(map[string]string), + Fields: make(map[string]interface{}), + Result: models.Result{}, + }, + } +} + +// Test is a bound method of the Service struct that handles testing the Alert function +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + c := s.config() + return s.Alert( + c.ServiceKey, + o.IncidentKey, + o.Description, + o.Level, + o.Timestamp, + o.Data, + ) +} + +// Alert is a bound method of the Service struct that processes a given alert to PagerDuty +// +// The req headers are now required with the API v2: +// https://v2.developer.pagerduty.com/docs/migrating-to-api-v2 +func (s *Service) Alert(serviceKey, incidentKey, desc string, level alert.Level, timestamp time.Time, data alert.EventData) error { + url, post, err := s.preparePost(serviceKey, incidentKey, desc, level, timestamp, data) + if err != nil { + return err + } + + req, err := http.NewRequest("POST", url, post) + if err != nil { + return err + } + + req.Header.Add("Content-Type", "application/json") + req.Header.Add("Accept", "application/vnd.pagerduty+json;version=2") + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + if resp.StatusCode != http.StatusAccepted && resp.StatusCode != http.StatusOK { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + fmt.Printf("error parsing error body\n") + return err + } + type response struct { + Message string `json:"message"` + } + r := &response{Message: fmt.Sprintf("failed to understand PagerDuty2 response. code: %d content: %s", resp.StatusCode, string(body))} + b := bytes.NewReader(body) + dec := json.NewDecoder(b) + dec.Decode(r) + return errors.New(r.Message) + } + return nil +} + +func (s *Service) sendResolve(c Config, serviceKey, incidentKey string) (string, io.Reader, error) { + // create a new AlertPayload for us to fire off + type Resolve struct { + RoutingKey string `json:"routing_key"` + DedupKey string `json:"dedup_key"` + EventAction string `json:"event_action"` + } + + ap := Resolve{} + + if serviceKey == "" { + ap.RoutingKey = c.ServiceKey + } else { + ap.RoutingKey = serviceKey + } + + ap.DedupKey = incidentKey + ap.EventAction = "resolve" + + var post bytes.Buffer + enc := json.NewEncoder(&post) + err := enc.Encode(ap) + if err != nil { + return "", nil, err + } + + return c.URL, &post, nil +} + +// preparePost is a helper method that sets up the payload for transmission to PagerDuty +func (s *Service) preparePost(serviceKey, incidentKey, desc string, level alert.Level, timestamp time.Time, data alert.EventData) (string, io.Reader, error) { + c := s.config() + if !c.Enabled { + return "", nil, errors.New("service is not enabled") + } + + var severity string + eventType := "trigger" + + switch level { + case alert.Warning: + severity = "warning" + case alert.Critical: + severity = "critical" + case alert.Info: + severity = "info" + default: + // default is a 'resolve' function + return s.sendResolve(c, serviceKey, incidentKey) + } + + // create a new AlertPayload for us to fire off + ap := &AlertPayload{ + Payload: &PDCEF{}, + } + + if serviceKey == "" { + ap.RoutingKey = c.ServiceKey + } else { + ap.RoutingKey = serviceKey + } + + ap.Client = "kapacitor" + ap.ClientURL = s.HTTPDService.URL() + ap.DedupKey = incidentKey + ap.EventAction = eventType + + ap.Payload.CustomDetails = make(map[string]interface{}) + ap.Payload.CustomDetails["result"] = data.Result + + // The API doesn't explicitly mention a requirement for nanosecond resolution but payloads seem to + // fail if we don't include it (even zeroes). This hack is not graceful, but adds a negligible + // nanosecond resolution to our timestamp + m := timestamp.Format("2006-01-02T15:04:05.000000000Z07:00") + // m := timestamp.Format(time.RFC3339Nano) + // if timestamp.Nanosecond() == 0 { + // m = time.Unix(timestamp.Unix(), 1).In(timestamp.Location()).Format(time.RFC3339Nano) + // } + + ap.Payload.Class = data.TaskName + ap.Payload.Severity = severity + ap.Payload.Source = "unknown" + ap.Payload.Summary = desc + ap.Payload.Timestamp = m + + if _, ok := data.Tags["host"]; ok { + ap.Payload.Source = data.Tags["host"] + } + + // Post data to PagerDuty + var post bytes.Buffer + enc := json.NewEncoder(&post) + err := enc.Encode(ap) + if err != nil { + return "", nil, err + } + + return c.URL, &post, nil +} + +// HandlerConfig defines the high-level struct required to connect to PagerDuty +type HandlerConfig struct { + // The service key to use for the alert. + // Defaults to the value in the configuration if empty. + ServiceKey string `mapstructure:"service-key"` +} + +type handler struct { + s *Service + c HandlerConfig + diag Diagnostic +} + +// Handler is a bound method to the Service struct that returns the appropriate alert handler for PagerDuty +func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) alert.Handler { + return &handler{ + s: s, + c: c, + diag: s.diag.WithContext(ctx...), + } +} + +// Handle is a bound method to the handler that processes a given alert +func (h *handler) Handle(event alert.Event) { + if err := h.s.Alert( + h.c.ServiceKey, + event.State.ID, + event.State.Message, + event.State.Level, + event.State.Time, + event.Data, + ); err != nil { + h.diag.Error("failed to send event to PagerDuty", err) + } +} diff --git a/task_master.go b/task_master.go index 7ae052d7c..160398814 100644 --- a/task_master.go +++ b/task_master.go @@ -28,6 +28,7 @@ import ( "github.com/influxdata/kapacitor/services/opsgenie" "github.com/influxdata/kapacitor/services/opsgenie2" "github.com/influxdata/kapacitor/services/pagerduty" + "github.com/influxdata/kapacitor/services/pagerduty2" "github.com/influxdata/kapacitor/services/pushover" "github.com/influxdata/kapacitor/services/sensu" "github.com/influxdata/kapacitor/services/sideload" @@ -135,6 +136,10 @@ type TaskMaster struct { Global() bool Handler(pagerduty.HandlerConfig, ...keyvalue.T) alert.Handler } + PagerDuty2Service interface { + Global() bool + Handler(pagerduty2.HandlerConfig, ...keyvalue.T) alert.Handler + } PushoverService interface { Handler(pushover.HandlerConfig, ...keyvalue.T) alert.Handler } @@ -266,6 +271,7 @@ func (tm *TaskMaster) New(id string) *TaskMaster { n.OpsGenie2Service = tm.OpsGenie2Service n.VictorOpsService = tm.VictorOpsService n.PagerDutyService = tm.PagerDutyService + n.PagerDuty2Service = tm.PagerDuty2Service n.PushoverService = tm.PushoverService n.HTTPPostService = tm.HTTPPostService n.SlackService = tm.SlackService