Skip to content

Commit

Permalink
Merge pull request #149 from influxdata/nc-issue#137
Browse files Browse the repository at this point in the history
Add deadman's switch
  • Loading branch information
Nathaniel Cook committed Jan 20, 2016
2 parents 4ee42f4 + a955283 commit af8e166
Show file tree
Hide file tree
Showing 32 changed files with 1,073 additions and 701 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ See [udf/agent/README.md](https://github.com/influxdata/kapacitor/blob/master/ud
With the addition of UDFs it is now possible to run custom anomaly detection alogrithms suited to your needs.
There are simple examples of how to use UDFs in [udf/agent/examples](https://github.com/influxdata/kapacitor/tree/master/udf/agent/examples/).


The version has jumped significantly so that it is inline with other projects in the TICK stack.
This way you can easily tell which versions of Telegraf, InfluxDB, Chronograf and Kapacitor work together.


### Features
- [#137](https://github.com/influxdata/kapacitor/issues/137): Add deadman's switch. Can be setup via TICKscript and globally via configuration.
- [#72](https://github.com/influxdata/kapacitor/issues/72): Add support for User Defined Functions (UDFs).
- [#138](https://github.com/influxdata/kapacitor/issues/138): Change over to influxdata github org.
- [#139](https://github.com/influxdata/kapacitor/issues/139): Alerta.io support thanks! @md14454
Expand Down
17 changes: 11 additions & 6 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,9 @@ type idInfo struct {
// Measurement name
Name string

// Task name
TaskName string

// Concatenation of all group-by tags of the form [key=value,]+.
// If not groupBy is performed equal to literal 'nil'
Group string
Expand All @@ -418,9 +421,10 @@ func (a *AlertNode) renderID(name string, group models.GroupID, tags models.Tags
g = "nil"
}
info := idInfo{
Name: name,
Group: g,
Tags: tags,
Name: name,
TaskName: a.et.Task.Name,
Group: g,
Tags: tags,
}
var id bytes.Buffer
err := a.idTmpl.Execute(&id, info)
Expand All @@ -437,9 +441,10 @@ func (a *AlertNode) renderMessage(id, name string, group models.GroupID, tags mo
}
info := messageInfo{
idInfo: idInfo{
Name: name,
Group: g,
Tags: tags,
Name: name,
TaskName: a.et.Task.Name,
Group: g,
Tags: tags,
},
ID: id,
Fields: fields,
Expand Down
5 changes: 5 additions & 0 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -616,6 +616,11 @@ func reloadUsage() {
}

func doReload(args []string) error {
if len(args) < 1 {
fmt.Fprintln(os.Stderr, "Must pass at least one task name")
reloadUsage()
os.Exit(2)
}
err := doDisable(args)
if err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions cmd/kapacitord/run/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/influxdata/kapacitor/services/alerta"
"github.com/influxdata/kapacitor/services/deadman"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/influxdb"
Expand Down Expand Up @@ -54,6 +55,7 @@ type Config struct {
Reporting reporting.Config `toml:"reporting"`
Stats stats.Config `toml:"stats"`
UDF udf.Config `toml:"udf"`
Deadman deadman.Config `toml:"deadman"`

Hostname string `toml:"hostname"`
DataDir string `toml:"data_dir"`
Expand All @@ -64,6 +66,7 @@ func NewConfig() *Config {
c := &Config{
Hostname: "localhost",
}

c.HTTP = httpd.NewConfig()
c.Replay = replay.NewConfig()
c.Task = task_store.NewConfig()
Expand All @@ -82,6 +85,7 @@ func NewConfig() *Config {
c.Reporting = reporting.NewConfig()
c.Stats = stats.NewConfig()
c.UDF = udf.NewConfig()
c.Deadman = deadman.NewConfig()

return c
}
Expand Down
10 changes: 10 additions & 0 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/influxdata/kapacitor"
"github.com/influxdata/kapacitor/services/alerta"
"github.com/influxdata/kapacitor/services/deadman"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httpd"
"github.com/influxdata/kapacitor/services/influxdb"
Expand Down Expand Up @@ -130,6 +131,7 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*

// Append Kapacitor services.
s.appendUDFService(c.UDF)
s.appendDeadmanService(c.Deadman)
s.appendSMTPService(c.SMTP)
s.appendHTTPDService(c.HTTP)
s.appendInfluxDBService(c.InfluxDB, c.Hostname)
Expand Down Expand Up @@ -223,6 +225,14 @@ func (s *Server) appendReplayStoreService(c replay.Config) {
s.Services = append(s.Services, srv)
}

func (s *Server) appendDeadmanService(c deadman.Config) {
l := s.LogService.NewLogger("[deadman] ", log.LstdFlags)
srv := deadman.NewService(c, l)

s.TaskMaster.DeadmanService = srv
s.Services = append(s.Services, srv)
}

func (s *Server) appendUDFService(c udf.Config) {
l := s.LogService.NewLogger("[udf] ", log.LstdFlags)
srv := udf.NewService(c, l)
Expand Down
24 changes: 3 additions & 21 deletions cmd/kapacitord/run/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"net/http"
"net/url"
"os"
"os/exec"
"path"
"path/filepath"
"reflect"
Expand Down Expand Up @@ -705,33 +704,16 @@ func TestServer_UDFAgents(t *testing.T) {
t.Fatal(err)
}

tmpDir, err := ioutil.TempDir("", "testStreamTaskRecording")
if err != nil {
t.Fatal(err)
}
defer os.RemoveAll(tmpDir)
agents := []struct {
buildFunc func() error
config udf.FunctionConfig
}{
// Go
{
buildFunc: func() error {
cmd := exec.Command(
"go",
"build",
"-o",
filepath.Join(tmpDir, "go-moving_avg"),
filepath.Join(udfDir, "agent/examples/moving_avg.go"),
)
out, err := cmd.CombinedOutput()
if err != nil {
return fmt.Errorf("go build failed: %v: %s", err, string(out))
}
return nil
},
buildFunc: func() error { return nil },
config: udf.FunctionConfig{
Prog: filepath.Join(tmpDir, "go-moving_avg"),
Prog: "go",
Args: []string{"run", filepath.Join(udfDir, "agent/examples/moving_avg.go")},
Timeout: toml.Duration(time.Minute),
},
},
Expand Down
17 changes: 15 additions & 2 deletions edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"expvar"
"fmt"
"log"
"strconv"

"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
Expand Down Expand Up @@ -61,8 +62,20 @@ func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, logSer
return e
}

func (e *Edge) collectedCount() string {
return e.statMap.Get(statCollected).String()
func (e *Edge) emittedCount() int64 {
c, err := strconv.ParseUint(e.statMap.Get(statEmitted).String(), 10, 64)
if err != nil {
panic("emitted count is not an int")
}
return int64(c)
}

func (e *Edge) collectedCount() int64 {
c, err := strconv.ParseUint(e.statMap.Get(statCollected).String(), 10, 64)
if err != nil {
panic("collected count is not an int")
}
return int64(c)
}

// Close the edge, this can only be called after all
Expand Down
15 changes: 15 additions & 0 deletions etc/kapacitor/kapacitor.conf
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,21 @@ data_dir = "/var/lib/kapacitor"
# How often to snapshot running task state.
snapshot-interval = "60s"

[deadman]
# Configure a deadman's switch
# Globally configure deadman's switches on all stream tasks.
# NOTE: for this to be of use you must also globally configure at least one alerting method.
global = false
# Threshold, if globally configured the alert will be triggered if the throughput in points/interval is <= threshold.
threshold = 0.0
# Interval, if globally configured the frequency at which to check the throughput.
interval = "10s"
# Id -- the alert Id, NODE_NAME will be replaced with the name of the node being monitored.
id = "node 'NODE_NAME' in task '{{ .TaskName }}'"
# The message of the alert. INTERVAL will be replaced by the interval.
message = "{{ .ID }} is dead: {{ index .Fields \"collected\" }} points/INTERVAL"


[influxdb]
# Connect to an InfluxDB cluster
# Kapacitor can subscribe, query and write to this cluster.
Expand Down
Loading

0 comments on commit af8e166

Please sign in to comment.