Skip to content

Commit

Permalink
rework changes for the new alert system
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanielc committed Jan 9, 2017
1 parent 678e66c commit f28aa6e
Show file tree
Hide file tree
Showing 12 changed files with 685 additions and 639 deletions.
387 changes: 20 additions & 367 deletions alert.go

Large diffs are not rendered by default.

11 changes: 11 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,17 @@ default-retention-policy = ""
# meaning alerts will only be sent if the alert state changes.
state-changes-only = false

[snmp]
# Configure an SNMP trap server
enabled = false
# The host:port address of the SNMP trap server
addr = "localhost:162"
# The community to use for traps
community = "kapacitor"
# Number of retries when sending traps
retries = 1


[opsgenie]
# Configure OpsGenie with your API key and default routing key.
enabled = false
Expand Down
267 changes: 134 additions & 133 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"text/template"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/influxdata/influxdb/client"
"github.com/influxdata/influxdb/influxql"
imodels "github.com/influxdata/influxdb/models"
Expand Down Expand Up @@ -46,6 +47,7 @@ import (
"github.com/influxdata/kapacitor/services/smtp"
"github.com/influxdata/kapacitor/services/smtp/smtptest"
"github.com/influxdata/kapacitor/services/snmptrap"
"github.com/influxdata/kapacitor/services/snmptrap/snmptraptest"
"github.com/influxdata/kapacitor/services/storage/storagetest"
"github.com/influxdata/kapacitor/services/talk"
"github.com/influxdata/kapacitor/services/talk/talktest"
Expand Down Expand Up @@ -7348,60 +7350,13 @@ stream
for i := range cmds {
cmdsI[i] = cmds[i]
}
if err := compareListIgnoreOrder(cmdsI, expCmds, func(got, exp interface{}) error {
if err := compareListIgnoreOrder(cmdsI, expCmds, func(got, exp interface{}) bool {
g := got.(*commandtest.Command)
e := exp.(*commandtest.Command)
return e.Compare(g)
return e.Compare(g) == nil
}); err != nil {
t.Error(err)
}

//for i := 0; i < 2; i++ {
// select {
// case cmd := <-cmdC:
// cmd.Lock()
// defer cmd.Unlock()

// var err error
// for j, info := range expInfo {
// if got, exp := cmd.Info, info; reflect.DeepEqual(got, exp) {
// // Found match remove it
// if j == 0 {
// expInfo = expInfo[1:]
// } else {
// expInfo = expInfo[:1]
// }
// err = nil
// break
// } else {
// err = fmt.Errorf("%d unexpected command info:\ngot\n%+v\nexp\n%+v\n", i, got, exp)
// }
// }
// if err != nil {
// t.Error(err)
// }

// if !cmd.Started {
// t.Errorf("%d expected command to have been started", i)
// }
// if !cmd.Waited {
// t.Errorf("%d expected command to have waited", i)
// }
// if cmd.Killed {
// t.Errorf("%d expected command not to have been killed", i)
// }

// ad := alertservice.AlertData{}
// if err := json.Unmarshal(cmd.StdinData, &ad); err != nil {
// t.Fatal(err)
// }
// if got, exp := ad, expAD; !reflect.DeepEqual(got, exp) {
// t.Errorf("%d unexpected alert data sent to command:\ngot\n%+v\nexp\n%+v\n%s", i, got, exp, string(cmd.StdinData))
// }
// default:
// t.Error("expected command to be created")
// }
//}
}

func TestStream_AlertEmail(t *testing.T) {
Expand Down Expand Up @@ -7508,6 +7463,129 @@ Value: 10
}
}

func TestStream_AlertSNMPTrap(t *testing.T) {

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|window()
.period(10s)
.every(10s)
|count('value')
|alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
.warn(lambda: "count" > 7.0)
.crit(lambda: "count" > 8.0)
.snmpTrap('1.1.1')
.data('1.1.1.2', 'c', '1')
.data('1.1.1.2', 's', 'SNMP ALERT')
.data('1.1.1.2', 's', '{{.Message}}')
.snmpTrap('1.1.2')
.data('1.1.2.3', 'i', '10')
.data('1.1.2.3', 'n', '')
.data('1.1.2.3', 't', '20000')
`

expTraps := []interface{}{
snmptraptest.Trap{
Pdu: snmptraptest.Pdu{
Type: snmpgo.SNMPTrapV2,
ErrorStatus: snmpgo.NoError,
VarBinds: snmptraptest.VarBinds{
{
Oid: "1.3.6.1.6.3.1.1.4.1.0",
Value: "1.1.1",
Type: "Oid",
},
{
Oid: "1.1.1.2",
Value: "1",
Type: "Counter64",
},
{
Oid: "1.1.1.2",
Value: "SNMP ALERT",
Type: "OctetString",
},
{
Oid: "1.1.1.2",
Value: "kapacitor/cpu/serverA is CRITICAL",
Type: "OctetString",
},
},
},
},
snmptraptest.Trap{
Pdu: snmptraptest.Pdu{
Type: snmpgo.SNMPTrapV2,
ErrorStatus: snmpgo.NoError,
VarBinds: snmptraptest.VarBinds{
{
Oid: "1.3.6.1.6.3.1.1.4.1.0",
Value: "1.1.2",
Type: "Oid",
},
{
Oid: "1.1.2.3",
Value: "10",
Type: "Integer",
},
{
Oid: "1.1.2.3",
Value: "",
Type: "Null",
},
{
Oid: "1.1.2.3",
Value: "20000",
Type: "TimeTicks",
},
},
},
},
}

snmpServer, err := snmptraptest.NewServer()
if err != nil {
t.Fatal(err)
}
defer snmpServer.Close()

c := snmptrap.NewConfig()
c.Enabled = true
c.Addr = snmpServer.Addr
c.Community = snmpServer.Community
c.Retries = 2
st := snmptrap.NewService(c, logService.NewLogger("[test_snmptrap] ", log.LstdFlags))
if err := st.Open(); err != nil {
t.Fatal(err)
}
defer st.Close()

tmInit := func(tm *kapacitor.TaskMaster) {
tm.SNMPTrapService = st
}

testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit)

// TODO make snmpServer Close gauruntee that all traps have been processed
time.Sleep(10 * time.Millisecond)
snmpServer.Close()

traps := snmpServer.Traps()
got := make([]interface{}, len(traps))
for i, t := range traps {
got[i] = t
}
if err := compareListIgnoreOrder(got, expTraps, nil); err != nil {
t.Error(err)
}
}

func TestStream_AlertSigma(t *testing.T) {
requestCount := int32(0)
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
Expand Down Expand Up @@ -8443,108 +8521,31 @@ func testStreamerWithOutput(
}
}

func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface{}) error) error {
func compareListIgnoreOrder(got, exp []interface{}, cmpF func(got, exp interface{}) bool) error {
if len(got) != len(exp) {
return fmt.Errorf("unexpected count got %d exp %d", len(got), len(exp))
return fmt.Errorf("unequal lists ignoring order:\ngot\n%s\nexp\n%s\n", spew.Sdump(got), spew.Sdump(exp))
}

if cmpF == nil {
cmpF = func(got, exp interface{}) error {
cmpF = func(got, exp interface{}) bool {
if !reflect.DeepEqual(got, exp) {
return fmt.Errorf("\ngot\n%+v\nexp\n%+v\n", got, exp)
return false
}
return nil
return true
}
}

for _, e := range exp {
found := false
var err error
for _, g := range got {
if err = cmpF(g, e); err == nil {
if cmpF(g, e) {
found = true
break
}
}
if !found {
return err
return fmt.Errorf("unequal lists ignoring order:\ngot\n%s\nexp\n%s\n", spew.Sdump(got), spew.Sdump(exp))
}
}
return nil
}

// SNMPTrap

type TrapListener struct {
data string
}

func (l *TrapListener) OnTRAP(trap *snmpgo.TrapRequest) {
l.data = trap.Pdu.String()
}

func NewTrapListener(data *string) *TrapListener {
return &TrapListener{*data}
}

func TestStream_AlertSnmpTrap(t *testing.T) {
trapData := ""
server, err := snmpgo.NewTrapServer(snmpgo.ServerArguments{
LocalAddr: "127.0.0.1:9162",
})
if err != nil {
log.Fatal(err)
}
// V2c
err = server.AddSecurity(&snmpgo.SecurityEntry{
Version: snmpgo.V2c,
Community: "public",
})
if err != nil {
log.Fatal(err)
}
go server.Serve(NewTrapListener(&trapData))

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|window()
.period(10s)
.every(10s)
|count('value')
|alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.info(lambda: "count" > 6.0)
.warn(lambda: "count" > 7.0)
.crit(lambda: "count" > 8.0)
.snmptrap('1.1.1')
.data('1.1.1.2', 's', 'SNMP ALERT')
`

clock, et, replayErr, tm := testStreamer(t, "TestStream_Alert", script, nil)
defer tm.Close()

c := snmptrap.NewConfig()
c.Enabled = true
c.TargetIp = "127.0.0.1"
c.TargetPort = 9162
c.Community = "public"
c.Version = "2c"
st := snmptrap.NewService(c, logService.NewLogger("[test_snmptrap] ", log.LstdFlags))
tm.SnmpTrapService = st

err = fastForwardTask(clock, et, replayErr, tm, 13*time.Second)
if err != nil {
t.Error(err)
}

// Stop server
server.Close()

if trapData == "" {
t.Errorf("No SNMP Trap data received")
}
}
Loading

0 comments on commit f28aa6e

Please sign in to comment.