diff --git a/CHANGELOG.md b/CHANGELOG.md index 4cd0c6915..d5c1a89eb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,6 +16,12 @@ - [#1568](https://github.com/influxdata/kapacitor/issues/1568): Add support for custom HTTP Post bodies via a template system. - [#1569](https://github.com/influxdata/kapacitor/issues/1569): Add support for add the HTTP status code as a field when using httpPost - [#1535](https://github.com/influxdata/kapacitor/pull/1535): Add logfmt support and refactor logging. +- [#1481](https://github.com/influxdata/kapacitor/pull/1481): Add ability to load tasks/handlers from dir. + TICKscript was extended to be able to describe a task exclusively through a tickscript. + * tasks no longer need to specify their TaskType (Batch, Stream). + * `dbrp` expressions were added to tickscript. + Topic-Handler file format was modified to include the TopicID and HandlerID in the file. + Load service was added; the service can load tasks/handlers from a directory. ### Bugfixes diff --git a/client/v1/client.go b/client/v1/client.go index 24ee6b4ce..0508bfcdb 100644 --- a/client/v1/client.go +++ b/client/v1/client.go @@ -9,6 +9,7 @@ import ( "io" "io/ioutil" "net/http" + "net/http/httptest" "net/url" "path" "strconv" @@ -73,6 +74,10 @@ type Config struct { // Optional credentials for authenticating with the server. Credentials *Credentials + + // Optional Transport https://golang.org/pkg/net/http/#RoundTripper + // If nil the default transport will be used + Transport http.RoundTripper } // AuthenticationMethod defines the type of authentication used. @@ -118,6 +123,23 @@ func (c Credentials) Validate() error { return nil } +type localTransport struct { + h http.Handler +} + +func (l *localTransport) RoundTrip(r *http.Request) (*http.Response, error) { + w := httptest.NewRecorder() + l.h.ServeHTTP(w, r) + + return w.Result(), nil +} + +func NewLocalTransport(h http.Handler) http.RoundTripper { + return &localTransport{ + h: h, + } +} + // Basic HTTP client type Client struct { url *url.URL @@ -148,21 +170,28 @@ func New(conf Config) (*Client, error) { } } - tr := &http.Transport{ - Proxy: http.ProxyFromEnvironment, - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: conf.InsecureSkipVerify, - }, - } - if conf.TLSConfig != nil { - tr.TLSClientConfig = conf.TLSConfig + rt := conf.Transport + var tr *http.Transport + + if rt == nil { + tr = &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: conf.InsecureSkipVerify, + }, + } + if conf.TLSConfig != nil { + tr.TLSClientConfig = conf.TLSConfig + } + + rt = tr } return &Client{ url: u, userAgent: conf.UserAgent, httpClient: &http.Client{ Timeout: conf.Timeout, - Transport: tr, + Transport: rt, }, credentials: conf.Credentials, }, nil @@ -203,8 +232,9 @@ type ExecutionStats struct { type TaskType int const ( - StreamTask TaskType = 1 - BatchTask TaskType = 2 + InvalidTask TaskType = 0 + StreamTask TaskType = 1 + BatchTask TaskType = 2 ) func (tt TaskType) MarshalText() ([]byte, error) { @@ -213,6 +243,8 @@ func (tt TaskType) MarshalText() ([]byte, error) { return []byte("stream"), nil case BatchTask: return []byte("batch"), nil + case InvalidTask: + return []byte("invalid"), nil default: return nil, fmt.Errorf("unknown TaskType %d", tt) } @@ -224,6 +256,8 @@ func (tt *TaskType) UnmarshalText(text []byte) error { *tt = StreamTask case "batch": *tt = BatchTask + case "invalid": + *tt = InvalidTask default: return fmt.Errorf("unknown TaskType %s", s) } @@ -513,9 +547,9 @@ func (vs *Vars) UnmarshalJSON(b []byte) error { } type Var struct { - Type VarType `json:"type"` - Value interface{} `json:"value"` - Description string `json:"description"` + Type VarType `json:"type" yaml:"type"` + Value interface{} `json:"value" yaml:"value"` + Description string `json:"description" yaml:"description"` } // A Task plus its read-only attributes. @@ -725,13 +759,13 @@ func (c *Client) StorageLink(name string) Link { } type CreateTaskOptions struct { - ID string `json:"id,omitempty"` - TemplateID string `json:"template-id,omitempty"` + ID string `json:"id,omitempty" yaml:"id"` + TemplateID string `json:"template-id,omitempty" yaml:"template-id"` Type TaskType `json:"type,omitempty"` - DBRPs []DBRP `json:"dbrps,omitempty"` + DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"` TICKscript string `json:"script,omitempty"` Status TaskStatus `json:"status,omitempty"` - Vars Vars `json:"vars,omitempty"` + Vars Vars `json:"vars,omitempty" yaml:"vars"` } // Create a new task. @@ -759,13 +793,13 @@ func (c *Client) CreateTask(opt CreateTaskOptions) (Task, error) { } type UpdateTaskOptions struct { - ID string `json:"id,omitempty"` - TemplateID string `json:"template-id,omitempty"` + ID string `json:"id,omitempty" yaml:"id"` + TemplateID string `json:"template-id,omitempty" yaml:"template-id"` Type TaskType `json:"type,omitempty"` - DBRPs []DBRP `json:"dbrps,omitempty"` + DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"` TICKscript string `json:"script,omitempty"` Status TaskStatus `json:"status,omitempty"` - Vars Vars `json:"vars,omitempty"` + Vars Vars `json:"vars,omitempty" yaml:"vars"` } // Update an existing task. @@ -1963,6 +1997,7 @@ func (c *Client) TopicHandler(link Link) (TopicHandler, error) { } type TopicHandlerOptions struct { + Topic string `json:"topic" yaml:"topic"` ID string `json:"id" yaml:"id"` Kind string `json:"kind" yaml:"kind"` Options map[string]interface{} `json:"options" yaml:"options"` @@ -2282,3 +2317,37 @@ func (d *Duration) UnmarshalText(data []byte) error { *d = Duration(dur) return nil } + +type DBRPs []DBRP + +func (d *DBRPs) String() string { + return fmt.Sprint(*d) +} + +type TaskVars struct { + ID string `json:"id,omitempty" yaml:"id"` + TemplateID string `json:"template-id,omitempty" yaml:"template-id"` + DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"` + Vars Vars `json:"vars,omitempty" yaml:"vars"` +} + +func (t TaskVars) CreateTaskOptions() (CreateTaskOptions, error) { + o := CreateTaskOptions{ + ID: t.ID, + TemplateID: t.TemplateID, + Vars: t.Vars, + DBRPs: t.DBRPs, + } + + return o, nil +} + +func (t TaskVars) UpdateTaskOptions() (UpdateTaskOptions, error) { + o := UpdateTaskOptions{ + ID: t.ID, + TemplateID: t.TemplateID, + Vars: t.Vars, + DBRPs: t.DBRPs, + } + return o, nil +} diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go index b4ab9ddb1..4d89438f0 100644 --- a/cmd/kapacitor/main.go +++ b/cmd/kapacitor/main.go @@ -542,6 +542,7 @@ var ( dtype = defineFlags.String("type", "", "The task type (stream|batch)") dtemplate = defineFlags.String("template", "", "Optional template ID") dvars = defineFlags.String("vars", "", "Optional path to a JSON vars file") + dfile = defineFlags.String("file", "", "Optional path to a YAML or JSON template task file") dnoReload = defineFlags.Bool("no-reload", false, "Do not reload the task even if it is enabled") ddbrp = make(dbrps, 0) ) @@ -687,33 +688,89 @@ func doDefine(args []string) error { } } + fileVars := client.TaskVars{} + if *dfile != "" { + f, err := os.Open(*dfile) + if err != nil { + return errors.Wrapf(err, "failed to open file %s", *dfile) + } + data, err := ioutil.ReadAll(f) + if err != nil { + return errors.Wrapf(err, "failed to read task vars file %q", *dfile) + } + defer f.Close() + switch ext := path.Ext(*dfile); ext { + case ".yaml", ".yml": + if err := yaml.Unmarshal(data, &fileVars); err != nil { + return errors.Wrapf(err, "failed to unmarshal yaml task vars file %q", *dfile) + } + case ".json": + if err := json.Unmarshal(data, &fileVars); err != nil { + return errors.Wrapf(err, "failed to unmarshal json task vars file %q", *dfile) + } + default: + return errors.New("bad file extension. Must be YAML or JSON") + + } + } + l := cli.TaskLink(id) task, _ := cli.Task(l, nil) var err error if task.ID == "" { - _, err = cli.CreateTask(client.CreateTaskOptions{ - ID: id, - TemplateID: *dtemplate, - Type: ttype, - DBRPs: ddbrp, - TICKscript: script, - Vars: vars, - Status: client.Disabled, - }) + if *dfile != "" { + o, err := fileVars.CreateTaskOptions() + if err != nil { + return err + } + _, err = cli.CreateTask(o) + if err != nil { + return err + } + } else { + o := client.CreateTaskOptions{ + ID: id, + TemplateID: *dtemplate, + Type: ttype, + DBRPs: ddbrp, + TICKscript: script, + Vars: vars, + Status: client.Disabled, + } + _, err = cli.CreateTask(o) + if err != nil { + return err + } + } } else { - _, err = cli.UpdateTask( - l, - client.UpdateTaskOptions{ + if *dfile != "" { + o, err := fileVars.UpdateTaskOptions() + if err != nil { + return err + } + _, err = cli.UpdateTask( + l, + o, + ) + if err != nil { + return err + } + } else { + o := client.UpdateTaskOptions{ TemplateID: *dtemplate, Type: ttype, DBRPs: ddbrp, TICKscript: script, Vars: vars, - }, - ) - } - if err != nil { - return err + } + _, err = cli.UpdateTask( + l, + o, + ) + if err != nil { + return err + } + } } if !*dnoReload && task.Status == client.Enabled { @@ -822,7 +879,7 @@ func doDefineTemplate(args []string) error { } func defineTopicHandlerUsage() { - var u = `Usage: kapacitor define-topic-handler + var u = `Usage: kapacitor define-topic-handler Create or update a handler. @@ -832,7 +889,7 @@ For example: Define a handler using the slack.yaml file: - $ kapacitor define-handler system my_handler slack.yaml + $ kapacitor define-topic-handler slack.yaml Options: ` @@ -840,14 +897,12 @@ Options: } func doDefineTopicHandler(args []string) error { - if len(args) != 3 { - fmt.Fprintln(os.Stderr, "Must provide a topic ID, a handler ID and a path to a handler file.") + if len(args) != 1 { + fmt.Fprintln(os.Stderr, "Must provide a path to a handler file.") defineTopicHandlerUsage() os.Exit(2) } - topic := args[0] - handlerID := args[1] - p := args[2] + p := args[0] f, err := os.Open(p) if err != nil { return errors.Wrapf(err, "failed to open handler spec file %q", p) @@ -870,12 +925,11 @@ func doDefineTopicHandler(args []string) error { return errors.Wrapf(err, "failed to unmarshal json handler file %q", p) } } - ho.ID = handlerID - l := cli.TopicHandlerLink(topic, ho.ID) + l := cli.TopicHandlerLink(ho.Topic, ho.ID) handler, _ := cli.TopicHandler(l) if handler.ID == "" { - _, err = cli.CreateTopicHandler(cli.TopicHandlersLink(topic), ho) + _, err = cli.CreateTopicHandler(cli.TopicHandlersLink(ho.Topic), ho) } else { _, err = cli.ReplaceTopicHandler(l, ho) } diff --git a/cmd/kapacitord/main.go b/cmd/kapacitord/main.go index 8a25b82ad..9fb41f813 100644 --- a/cmd/kapacitord/main.go +++ b/cmd/kapacitord/main.go @@ -89,16 +89,30 @@ func (m *Main) Run(args ...string) error { } signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM) + signal.Notify(signalCh, os.Interrupt, syscall.SIGTERM, syscall.SIGHUP) m.Diag.Info("listening for signals") - // Block until one of the signals above is received - select { - case <-signalCh: - m.Diag.Info("signal received, initializing clean shutdown...") - go func() { - cmd.Close() - }() + Loop: + for s := range signalCh { + switch s.String() { + case syscall.SIGTERM.String(): + m.Diag.Info("SIGTERM received, initializing clean shutdown...") + go func() { + cmd.Close() + }() + break Loop + + case syscall.SIGHUP.String(): + m.Diag.Info("SIGHUP received, reloading tasks/templates/handlers directory...") + cmd.Server.Reload() + + default: + m.Diag.Info("signal received, initializing clean shutdown...") + go func() { + cmd.Close() + }() + break Loop + } } // Block again until another signal is received, a shutdown timeout elapses, diff --git a/cmd/kapacitord/run/command.go b/cmd/kapacitord/run/command.go index e286807be..6f521e2ae 100644 --- a/cmd/kapacitord/run/command.go +++ b/cmd/kapacitord/run/command.go @@ -130,6 +130,7 @@ func (cmd *Command) Run(args ...string) error { if err := s.Open(); err != nil { return fmt.Errorf("open server: %s", err) } + cmd.Server = s // Begin monitoring the server's error channel. diff --git a/etc/kapacitor/kapacitor.conf b/etc/kapacitor/kapacitor.conf index 5655c097f..b2c0e2457 100644 --- a/etc/kapacitor/kapacitor.conf +++ b/etc/kapacitor/kapacitor.conf @@ -39,6 +39,14 @@ default-retention-policy = "" # DEBUG, INFO, WARN, ERROR, or OFF level = "INFO" +[load] + # Enable/Disable the service for loading tasks/templates/handlers + # from a directory + enabled = true + # Directory where task/template/handler files are set + dir = "/etc/kapacitor/load" + + [replay] # Where to store replay files, aka recordings. dir = "/var/lib/kapacitor/replay" diff --git a/examples/load/README.md b/examples/load/README.md new file mode 100644 index 000000000..831f3688b --- /dev/null +++ b/examples/load/README.md @@ -0,0 +1,177 @@ +# File based task/template/handler definition + +This proposal introduces directory based task, template, and handler definition. + +## Configuration +The load service configuration is specified under the `load` tag of the +kapacitor configuration file. + +``` +[load] + enabled = true + dir="/path/to/directory" +``` + +`dir` specifies the directory where the definition files exist. + +The service will attempt to load data from three subdirectories. + +The `tasks` directory should contain task tickscripts and and the associated templated task +definition files (either yaml or json). + +The `templates` directory should contain templated tickscripts. + +The `handlers` directory will contain will contain topic handler definitions in yaml or json. + +## Tasks + +Task files must be placed in the `tasks` subdirectory of the load service +directory. Defining tasks explicitly will be done according to the following scheme: + +* `id` - the file name without the tick extension +* `type` - determined by introspection of the task (stream, batch) +* `dbrp` - defined using the `dbrp` keyword followed by a specified database and retention policy + +For example, the tickscript + +``` +// /path/to/directory/tasks/my_task.tick +dbrp "telegraf"."autogen" + +stream + |from() + .measurement('cpu') + .groupBy(*) + |alert() + .warn(lambda: "usage_idle" < 20) + .crit(lambda: "usage_idle" < 10) + // Send alerts to the `cpu` topic + .topic('cpu') +``` + +will create a `stream` task named `my_task` for the dbrp `telegraf.autogen`. + + +## Templates + +Template files must be placed in the `templates` subdirectory of the load service +directory. Defining templated tasks is done according to the following scheme: + +* `id` - the file name without the tick extension +* `type` - determined by introspection of the task (stream, batch) +* `dbrp` - defined using the `dbrp` keyword followed by a specified database and retention policy + +For example, the tickscript +``` +// /path/to/directory/templates/my_template.tick +dbrp "telegraf"."autogen" + +var measurement string +var where_filter = lambda: TRUE +var groups = [*] +var field string +var warn lambda +var crit lambda +var window = 5m +var slack_channel = '#alerts' + +stream + |from() + .measurement(measurement) + .where(where_filter) + .groupBy(groups) + |window() + .period(window) + .every(window) + |mean(field) + |alert() + .warn(warn) + .crit(crit) + .slack() + .channel(slack_channel) +``` + +will create a `stream` template named `my_template` for the dbrp `telegaf.autogen`. + +### Templated Tasks + +Templated task files must be placed in the `tasks` subdirectory of the load service +directory. Defining templated tasks will be done according to the following scheme: + +* `id` - filename without the `yaml`, `yml`, or `json` extension +* `dbrps` - required if not specified in template +* `template-id` - required +* `vars` - list of template vars + +For example, the templated task file + +```yaml +# /path/to/directory/tasks/my_templated_tas.tick +dbrps: + - { db: "telegraf", rp: "autogen"} +template-id: my_template +vars: + measurement: + type: string + value: cpu + where_filter: + type: lambda + value: "\"cpu\" == 'cpu-total'" + groups: + type: list + value: + - type: string + value: host + - type: string + value: dc + field: + type: string + value : usage_idle + warn: + type: lambda + value: "\"mean\" < 30.0" + crit: + type: lambda + value: "\"mean\" < 10.0" + window: + type: duration + value : 1m + slack_channel: + type: string + value: "#alerts_testing" +``` +will create a `stream` task named `my_templated_task` for the dbrp `telegraf.autogen`. + +The same task may be created using JSON like so: + +```json +{ + "dbrps": [{"db": "telegraf", "rp": "autogen"}], + "template-id": "my_template", + "vars": { + "measurement": {"type" : "string", "value" : "cpu" }, + "where_filter": {"type": "lambda", "value": "\"cpu\" == 'cpu-total'"}, + "groups": {"type": "list", "value": [{"type":"string", "value":"host"},{"type":"string", "value":"dc"}]}, + "field": {"type" : "string", "value" : "usage_idle" }, + "warn": {"type" : "lambda", "value" : "\"mean\" < 30.0" }, + "crit": {"type" : "lambda", "value" : "\"mean\" < 10.0" }, + "window": {"type" : "duration", "value" : "1m" }, + "slack_channel": {"type" : "string", "value" : "#alerts_testing" } + } +} +``` + +## Handlers + +Topic Handler files must be placed in the `handlers` subdirectory of the load service +directory. + +``` +id: hanlder-id +topic: cpu +kind: slack +match: changed() == TRUE +options: + channel: '#alerts' +``` + diff --git a/examples/load/handlers/example.yaml b/examples/load/handlers/example.yaml new file mode 100644 index 000000000..97dd1b8d4 --- /dev/null +++ b/examples/load/handlers/example.yaml @@ -0,0 +1,6 @@ +topic: cpu +id: example +kind: slack +match: changed() == TRUE +options: + channel: '#alerts' diff --git a/examples/load/handlers/other.yaml b/examples/load/handlers/other.yaml new file mode 100644 index 000000000..986c0605f --- /dev/null +++ b/examples/load/handlers/other.yaml @@ -0,0 +1,6 @@ +topic: cpu +id: other +kind: slack +match: changed() == TRUE +options: + channel: '#alerts' diff --git a/examples/load/tasks/another.yaml b/examples/load/tasks/another.yaml new file mode 100644 index 000000000..092109c0e --- /dev/null +++ b/examples/load/tasks/another.yaml @@ -0,0 +1,11 @@ +template-id: implicit_template +vars: { + "measurement": {"type" : "string", "value" : "cpu" }, + "where_filter": {"type": "lambda", "value": "\"cpu\" == 'cpu-total'"}, + "groups": {"type": "list", "value": [{"type":"string", "value":"host"},{"type":"string", "value":"dc"}]}, + "field": {"type" : "string", "value" : "usage_idle" }, + "warn": {"type" : "lambda", "value" : "\"mean\" < 30.0" }, + "crit": {"type" : "lambda", "value" : "\"mean\" < 10.0" }, + "window": {"type" : "duration", "value" : "1m" }, + "slack_channel": {"type" : "string", "value" : "#alerts_testing" } +} diff --git a/examples/load/tasks/base.yaml b/examples/load/tasks/base.yaml new file mode 100644 index 000000000..9bcfe5c8f --- /dev/null +++ b/examples/load/tasks/base.yaml @@ -0,0 +1,13 @@ +dbrps: + - { db: "telegraf", rp: "autogen"} +template-id: base_template +vars: { + "measurement": {"type" : "string", "value" : "cpu" }, + "where_filter": {"type": "lambda", "value": "\"cpu\" == 'cpu-total'"}, + "groups": {"type": "list", "value": [{"type":"string", "value":"host"},{"type":"string", "value":"dc"}]}, + "field": {"type" : "string", "value" : "usage_idle" }, + "warn": {"type" : "lambda", "value" : "\"mean\" < 30.0" }, + "crit": {"type" : "lambda", "value" : "\"mean\" < 10.0" }, + "window": {"type" : "duration", "value" : "1m" }, + "slack_channel": {"type" : "string", "value" : "#alerts_testing" } +} diff --git a/examples/load/tasks/cpu_alert.tick b/examples/load/tasks/cpu_alert.tick new file mode 100644 index 000000000..dc5443b06 --- /dev/null +++ b/examples/load/tasks/cpu_alert.tick @@ -0,0 +1,12 @@ +dbrp "telegraf"."autogen" +dbrp "telegraf"."autogen_not" + +stream + |from() + .measurement('cpu') + .groupBy(*) + |alert() + .warn(lambda: "usage_idle" < 20) + .crit(lambda: "usage_idle" < 10) + // Send alerts to the `cpu` topic + .topic('cpu') diff --git a/examples/load/tasks/implicit.yaml b/examples/load/tasks/implicit.yaml new file mode 100644 index 000000000..092109c0e --- /dev/null +++ b/examples/load/tasks/implicit.yaml @@ -0,0 +1,11 @@ +template-id: implicit_template +vars: { + "measurement": {"type" : "string", "value" : "cpu" }, + "where_filter": {"type": "lambda", "value": "\"cpu\" == 'cpu-total'"}, + "groups": {"type": "list", "value": [{"type":"string", "value":"host"},{"type":"string", "value":"dc"}]}, + "field": {"type" : "string", "value" : "usage_idle" }, + "warn": {"type" : "lambda", "value" : "\"mean\" < 30.0" }, + "crit": {"type" : "lambda", "value" : "\"mean\" < 10.0" }, + "window": {"type" : "duration", "value" : "1m" }, + "slack_channel": {"type" : "string", "value" : "#alerts_testing" } +} diff --git a/examples/load/tasks/join.tick b/examples/load/tasks/join.tick new file mode 100644 index 000000000..872cc6a08 --- /dev/null +++ b/examples/load/tasks/join.tick @@ -0,0 +1,33 @@ +dbrp "telegraf"."autogen" + +var data = stream + |from() + .measurement('cpu') + .groupBy(*) + |eval() + .keep('usage_user') + |window() + .period(10s) + .every(5s) + +var mean_data = data + |mean('usage_user') + .as('usage_user') + +var max_data = data + |max('usage_user') + .as('usage_user') + +var min_data = data + |min('usage_user') + .as('usage_user') + +mean_data + |join(max_data, min_data) + .as('mean','max','min') + |eval(lambda: "mean.usage_user", lambda: "max.usage_user", lambda: "min.usage_user") + .as('mean_usage','max_usage','min_usage') + |log() + |influxDBOut() + .database('downit') + .measurement('idk') diff --git a/examples/load/tasks/other.json b/examples/load/tasks/other.json new file mode 100644 index 000000000..896fa9703 --- /dev/null +++ b/examples/load/tasks/other.json @@ -0,0 +1,14 @@ +{ + "dbrps": [{"db": "telegraf", "rp": "autogen"}], + "template-id": "base_template", + "vars": { + "measurement": {"type" : "string", "value" : "cpu" }, + "where_filter": {"type": "lambda", "value": "\"cpu\" == 'cpu-total'"}, + "groups": {"type": "list", "value": [{"type":"string", "value":"host"},{"type":"string", "value":"dc"}]}, + "field": {"type" : "string", "value" : "usage_idle" }, + "warn": {"type" : "lambda", "value" : "\"mean\" < 30.0" }, + "crit": {"type" : "lambda", "value" : "\"mean\" < 10.0" }, + "window": {"type" : "duration", "value" : "1m" }, + "slack_channel": {"type" : "string", "value" : "#alerts_testing" } + } +} diff --git a/examples/load/tasks/poll_cpu.tick b/examples/load/tasks/poll_cpu.tick new file mode 100644 index 000000000..cd437c276 --- /dev/null +++ b/examples/load/tasks/poll_cpu.tick @@ -0,0 +1,8 @@ +dbrp "telegraf"."autogen" + +var data = batch + |query('select * from "telegraf".autogen.cpu') + .every(10s) + .period(10s) + |log() + diff --git a/examples/load/tasks/post.tick b/examples/load/tasks/post.tick new file mode 100644 index 000000000..34fd0dc2f --- /dev/null +++ b/examples/load/tasks/post.tick @@ -0,0 +1,9 @@ +dbrp "telegraf"."autogen" + +stream + |from() + .measurement('cpu') + |alert() + .crit(lambda: "usage_idle" < 50.0) + //.post('http://localhost:2345') + .post('http://localhost:5555') diff --git a/examples/load/templates/base_template.tick b/examples/load/templates/base_template.tick new file mode 100644 index 000000000..9f59e5781 --- /dev/null +++ b/examples/load/templates/base_template.tick @@ -0,0 +1,23 @@ +var measurement string +var where_filter = lambda: TRUE +var groups = [*] +var field string +var warn lambda +var crit lambda +var window = 5m +var slack_channel = '#alerts' + +stream + |from() + .measurement(measurement) + .where(where_filter) + .groupBy(groups) + |window() + .period(window) + .every(window) + |mean(field) + |alert() + .warn(warn) + .crit(crit) + .slack() + .channel(slack_channel) diff --git a/examples/load/templates/implicit_template.tick b/examples/load/templates/implicit_template.tick new file mode 100644 index 000000000..c080257f9 --- /dev/null +++ b/examples/load/templates/implicit_template.tick @@ -0,0 +1,25 @@ +dbrp "telegaf"."not_autogen" + +var measurement string +var where_filter = lambda: TRUE +var groups = [*] +var field string +var warn lambda +var crit lambda +var window = 5m +var slack_channel = '#alerts' + +stream + |from() + .measurement(measurement) + .where(where_filter) + .groupBy(groups) + |window() + .period(window) + .every(window) + |mean(field) + |alert() + .warn(warn) + .crit(crit) + .slack() + .channel(slack_channel) diff --git a/server/config.go b/server/config.go index 9396ceaa1..5a179e514 100644 --- a/server/config.go +++ b/server/config.go @@ -27,6 +27,7 @@ import ( "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/influxdb" "github.com/influxdata/kapacitor/services/k8s" + "github.com/influxdata/kapacitor/services/load" "github.com/influxdata/kapacitor/services/marathon" "github.com/influxdata/kapacitor/services/mqtt" "github.com/influxdata/kapacitor/services/nerve" @@ -65,6 +66,7 @@ type Config struct { Replay replay.Config `toml:"replay"` Storage storage.Config `toml:"storage"` Task task_store.Config `toml:"task"` + Load load.Config `toml:"load"` InfluxDB []influxdb.Config `toml:"influxdb" override:"influxdb,element-key=name"` Logging diagnostic.Config `toml:"logging"` ConfigOverride config.Config `toml:"config-override"` @@ -159,6 +161,7 @@ func NewConfig() *Config { c.Stats = stats.NewConfig() c.UDF = udf.NewConfig() c.Deadman = deadman.NewConfig() + c.Load = load.NewConfig() return c } @@ -182,6 +185,7 @@ func NewDemoConfig() (*Config, error) { c.Task.Dir = filepath.Join(homeDir, ".kapacitor", c.Task.Dir) c.Storage.BoltDBPath = filepath.Join(homeDir, ".kapacitor", c.Storage.BoltDBPath) c.DataDir = filepath.Join(homeDir, ".kapacitor", c.DataDir) + c.Load.Dir = filepath.Join(homeDir, ".kapacitor", c.Load.Dir) return c, nil } @@ -206,6 +210,9 @@ func (c *Config) Validate() error { if err := c.Task.Validate(); err != nil { return errors.Wrap(err, "task") } + if err := c.Load.Validate(); err != nil { + return err + } // Validate the set of InfluxDB configs. // All names should be unique. names := make(map[string]bool, len(c.InfluxDB)) diff --git a/server/server.go b/server/server.go index 2044eb8c5..b9c91565c 100644 --- a/server/server.go +++ b/server/server.go @@ -37,6 +37,7 @@ import ( "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/influxdb" "github.com/influxdata/kapacitor/services/k8s" + "github.com/influxdata/kapacitor/services/load" "github.com/influxdata/kapacitor/services/marathon" "github.com/influxdata/kapacitor/services/mqtt" "github.com/influxdata/kapacitor/services/nerve" @@ -104,6 +105,7 @@ type Server struct { TaskMaster *kapacitor.TaskMaster TaskMasterLookup *kapacitor.TaskMasterLookup + LoadService *load.Service AuthService auth.Interface HTTPDService *httpd.Service StorageService *storage.Service @@ -212,6 +214,10 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv return nil, errors.Wrap(err, "influxdb service") } + if err := s.appendLoadService(); err != nil { + return nil, errors.Wrap(err, "load service") + } + // Append Alert integration services s.appendAlertaService() s.appendHipChatService() @@ -364,6 +370,28 @@ func (s *Server) appendSMTPService() { s.AppendService("smtp", srv) } +func (s *Server) appendLoadService() error { + c := s.config.Load + d := s.DiagService.NewLoadHandler() + if s.HTTPDService == nil { + return errors.New("httpd service must be set for load service") + } + if s.HTTPDService.Handler == nil { + return errors.New("httpd service handler must be set for load service") + } + srv, err := load.NewService(c, s.HTTPDService.Handler, d) + if err != nil { + return err + } + + srv.StorageService = s.StorageService + + s.LoadService = srv + s.AppendService("load", srv) + + return nil +} + func (s *Server) appendInfluxDBService() error { c := s.config.InfluxDB d := s.DiagService.NewInfluxDBHandler() @@ -867,6 +895,10 @@ func (s *Server) Open() error { return err } + if err := s.LoadService.Load(); err != nil { + return fmt.Errorf("failed to reload tasks/templates/handlers: %v", err) + } + go s.watchServices() go s.watchConfigUpdates() @@ -1025,6 +1057,12 @@ func (s *Server) writeID(file string, id uuid.UUID) error { return nil } +func (s *Server) Reload() { + if err := s.LoadService.Load(); err != nil { + s.Diag.Error("failed to reload tasks/templates/handlers", err) + } +} + func (s *Server) SetClusterID(clusterID uuid.UUID) error { s.clusterIDMu.Lock() defer s.clusterIDMu.Unlock() diff --git a/server/server_helper_test.go b/server/server_helper_test.go index 9cbb17113..6a53ab53b 100644 --- a/server/server_helper_test.go +++ b/server/server_helper_test.go @@ -10,6 +10,7 @@ import ( "net/http/httptest" "net/url" "os" + "path" "path/filepath" "strings" "testing" @@ -82,6 +83,16 @@ func OpenDefaultServer() (*Server, *client.Client) { return s, client } +func OpenLoadServer() (*Server, *server.Config, *client.Client) { + c := NewConfig() + if err := copyFiles("testdata/load", c.Load.Dir); err != nil { + panic(err) + } + s := OpenServer(c) + client := Client(s) + return s, c, client +} + // OpenServer opens a test server. func OpenServer(c *server.Config) *Server { s := NewServer(c) @@ -217,6 +228,8 @@ func NewConfig() *server.Config { //c.HTTP.BindAddress = "127.0.0.1:9092" //c.HTTP.GZIP = false c.InfluxDB[0].Enabled = false + c.Load.Enabled = true + c.Load.Dir = MustTempDir() return c } @@ -265,3 +278,51 @@ func (i *InfluxDB) URL() string { func (i *InfluxDB) Close() { i.server.Close() } + +func copyFiles(src, dst string) error { + fs, err := ioutil.ReadDir(src) + if err != nil { + return err + } + + for _, f := range fs { + if f.IsDir() { + if err := os.Mkdir(path.Join(dst, f.Name()), os.ModePerm); err != nil { + return err + } + // copy deeper files + if err := copyFiles(path.Join(src, f.Name()), path.Join(dst, f.Name())); err != nil { + return err + } + } else { + copyFile(path.Join(src, f.Name()), path.Join(dst, f.Name())) + } + } + + return nil + +} + +func copyFile(src, dst string) error { + srcFile, err := os.Open(src) + if err != nil { + return err + } + defer srcFile.Close() + + dstFile, err := os.Create(dst) + if err != nil { + return err + } + defer dstFile.Close() + + if _, err := io.Copy(dstFile, srcFile); err != nil { + return err + } + + if err := dstFile.Sync(); err != nil { + return err + } + + return nil +} diff --git a/server/server_test.go b/server/server_test.go index d65987286..e06a81488 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -11,6 +11,7 @@ import ( "net/url" "os" "os/exec" + "path" "path/filepath" "reflect" "runtime" @@ -347,6 +348,259 @@ func TestServer_CreateTask(t *testing.T) { } } +func TestServer_CreateTaskImplicitStream(t *testing.T) { + s, cli := OpenDefaultServer() + defer s.Close() + + id := "testTaskID" + dbrps := []client.DBRP{ + { + Database: "mydb", + RetentionPolicy: "myrp", + }, + { + Database: "otherdb", + RetentionPolicy: "default", + }, + } + tick := `dbrp "mydb"."myrp" + +dbrp "otherdb"."default" + +stream + |from() + .measurement('test') +` + task, err := cli.CreateTask(client.CreateTaskOptions{ + ID: id, + TICKscript: tick, + Status: client.Disabled, + }) + if err != nil { + t.Fatal(err) + } + + ti, err := cli.Task(task.Link, nil) + if err != nil { + t.Fatal(err) + } + + if ti.Error != "" { + t.Fatal(ti.Error) + } + if ti.ID != id { + t.Fatalf("unexpected id got %s exp %s", ti.ID, id) + } + if ti.Type != client.StreamTask { + t.Fatalf("unexpected type got %v exp %v", ti.Type, client.StreamTask) + } + if ti.Status != client.Disabled { + t.Fatalf("unexpected status got %v exp %v", ti.Status, client.Disabled) + } + if !reflect.DeepEqual(ti.DBRPs, dbrps) { + t.Fatalf("unexpected dbrps got %s exp %s", ti.DBRPs, dbrps) + } + if ti.TICKscript != tick { + t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick) + } + dot := "digraph testTaskID {\nstream0 -> from1;\n}" + if ti.Dot != dot { + t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot) + } +} + +func TestServer_CreateTaskBatch(t *testing.T) { + s, cli := OpenDefaultServer() + defer s.Close() + + id := "testTaskID" + dbrps := []client.DBRP{ + { + Database: "mydb", + RetentionPolicy: "myrp", + }, + } + tick := `dbrp "mydb"."myrp" + +batch + |query('SELECT * from mydb.myrp.mymeas') + |log() +` + task, err := cli.CreateTask(client.CreateTaskOptions{ + ID: id, + TICKscript: tick, + Status: client.Disabled, + }) + if err != nil { + t.Fatal(err) + } + + ti, err := cli.Task(task.Link, nil) + if err != nil { + t.Fatal(err) + } + + if ti.Error != "" { + t.Fatal(ti.Error) + } + if ti.ID != id { + t.Fatalf("unexpected id got %s exp %s", ti.ID, id) + } + if ti.Type != client.BatchTask { + t.Fatalf("unexpected type got %v exp %v", ti.Type, client.BatchTask) + } + if ti.Status != client.Disabled { + t.Fatalf("unexpected status got %v exp %v", ti.Status, client.Disabled) + } + if !reflect.DeepEqual(ti.DBRPs, dbrps) { + t.Fatalf("unexpected dbrps got %s exp %s", ti.DBRPs, dbrps) + } + if ti.TICKscript != tick { + t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, tick) + } + dot := "digraph testTaskID {\nquery1 -> log2;\n}" + if ti.Dot != dot { + t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot) + } +} + +func TestServer_CreateTaskImplicitAndExplicit(t *testing.T) { + s, cli := OpenDefaultServer() + defer s.Close() + + id := "testTaskID" + dbrps := []client.DBRP{ + { + Database: "mydb", + RetentionPolicy: "myrp", + }, + } + tick := `dbrp "mydb"."myrp" + +dbrp "otherdb"."default" + +stream + |from() + .measurement('test') +` + _, err := cli.CreateTask(client.CreateTaskOptions{ + ID: id, + DBRPs: dbrps, + TICKscript: tick, + Status: client.Disabled, + }) + + // It is expected that error should be non nil + if err == nil { + t.Fatal("expected task to fail to be created") + } +} + +func TestServer_CreateTaskExplicitUpdateImplicit(t *testing.T) { + s, cli := OpenDefaultServer() + defer s.Close() + + id := "testTaskID" + createDBRPs := []client.DBRP{ + { + Database: "mydb", + RetentionPolicy: "myrp", + }, + { + Database: "otherdb", + RetentionPolicy: "default", + }, + } + createTick := `stream + |from() + .measurement('test') +` + updateDBRPs := []client.DBRP{ + { + Database: "mydb", + RetentionPolicy: "myrp", + }, + } + updateTick := `dbrp "mydb"."myrp" + +stream + |from() + .measurement('test') +` + task, err := cli.CreateTask(client.CreateTaskOptions{ + ID: id, + DBRPs: createDBRPs, + TICKscript: createTick, + Status: client.Disabled, + }) + if err != nil { + t.Fatal(err) + } + + ti, err := cli.Task(task.Link, nil) + if err != nil { + t.Fatal(err) + } + + if ti.Error != "" { + t.Fatal(ti.Error) + } + if ti.ID != id { + t.Fatalf("unexpected id got %s exp %s", ti.ID, id) + } + if ti.Type != client.StreamTask { + t.Fatalf("unexpected type got %v exp %v", ti.Type, client.StreamTask) + } + if ti.Status != client.Disabled { + t.Fatalf("unexpected status got %v exp %v", ti.Status, client.Disabled) + } + if !reflect.DeepEqual(ti.DBRPs, createDBRPs) { + t.Fatalf("unexpected dbrps got %s exp %s", ti.DBRPs, createDBRPs) + } + if ti.TICKscript != createTick { + t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, createTick) + } + dot := "digraph testTaskID {\nstream0 -> from1;\n}" + if ti.Dot != dot { + t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot) + } + + _, err = cli.UpdateTask(task.Link, client.UpdateTaskOptions{ + TICKscript: updateTick, + }) + if err != nil { + t.Fatal(err) + } + + ti, err = cli.Task(task.Link, nil) + if err != nil { + t.Fatal(err) + } + + if ti.Error != "" { + t.Fatal(ti.Error) + } + if ti.ID != id { + t.Fatalf("unexpected id got %s exp %s", ti.ID, id) + } + if ti.Type != client.StreamTask { + t.Fatalf("unexpected type got %v exp %v", ti.Type, client.StreamTask) + } + if ti.Status != client.Disabled { + t.Fatalf("unexpected status got %v exp %v", ti.Status, client.Disabled) + } + if !reflect.DeepEqual(ti.DBRPs, updateDBRPs) { + t.Fatalf("unexpected dbrps got %s exp %s", ti.DBRPs, updateDBRPs) + } + if ti.TICKscript != updateTick { + t.Fatalf("unexpected TICKscript got %s exp %s", ti.TICKscript, updateTick) + } + dot = "digraph testTaskID {\nstream0 -> from1;\n}" + if ti.Dot != dot { + t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot) + } +} + func TestServer_EnableTask(t *testing.T) { s, cli := OpenDefaultServer() defer s.Close() @@ -1016,6 +1270,139 @@ stream t.Fatalf("unexpected vars\ngot\n%s\nexp\n%s\n", ti.Vars, vars) } } + +func TestServer_CreateTemplateImplicitAndUpdateExplicitWithTasks(t *testing.T) { + s, cli := OpenDefaultServer() + defer s.Close() + + id := "testTemplateID" + implicitTick := `dbrp "telegraf"."autogen" + +var x = 5 + +stream + |from() + .measurement('test') +` + template, err := cli.CreateTemplate(client.CreateTemplateOptions{ + ID: id, + TICKscript: implicitTick, + }) + if err != nil { + t.Fatal(err) + } + + ti, err := cli.Template(template.Link, nil) + if err != nil { + t.Fatal(err) + } + + if ti.Error != "" { + t.Fatal(ti.Error) + } + if ti.ID != id { + t.Fatalf("unexpected id got %s exp %s", ti.ID, id) + } + if ti.Type != client.StreamTask { + t.Fatalf("unexpected type got %v exp %v", ti.Type, client.StreamTask) + } + if ti.TICKscript != implicitTick { + t.Fatalf("unexpected TICKscript got\n%s\nexp\n%s\n", ti.TICKscript, implicitTick) + } + dot := "digraph testTemplateID {\nstream0 -> from1;\n}" + if ti.Dot != dot { + t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot) + } + vars := client.Vars{"x": {Value: int64(5), Type: client.VarInt}} + if !reflect.DeepEqual(vars, ti.Vars) { + t.Fatalf("unexpected vars\ngot\n%s\nexp\n%s\n", ti.Vars, vars) + } + + implicitDBRPs := []client.DBRP{ + { + Database: "telegraf", + RetentionPolicy: "autogen", + }, + } + + count := 1 + tasks := make([]client.Task, count) + for i := 0; i < count; i++ { + task, err := cli.CreateTask(client.CreateTaskOptions{ + TemplateID: template.ID, + Status: client.Enabled, + }) + if err != nil { + t.Fatal(err) + } + tasks[i] = task + + ti, err := cli.Task(task.Link, nil) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(ti.DBRPs, implicitDBRPs) { + t.Fatalf("unexpected dbrps got %s exp %s", ti.DBRPs, implicitDBRPs) + } + } + + updateTick := `var x = 5 + + stream + |from() + .measurement('test') + ` + + _, err = cli.UpdateTemplate(template.Link, client.UpdateTemplateOptions{ + ID: id, + TICKscript: updateTick, + }) + // Expects error + if err == nil { + t.Fatal(err) + } + + finalTick := `dbrp "telegraf"."autogen" + + dbrp "telegraf"."not_autogen" + + var x = 5 + + stream + |from() + .measurement('test') + ` + + finalDBRPs := []client.DBRP{ + { + Database: "telegraf", + RetentionPolicy: "autogen", + }, + { + Database: "telegraf", + RetentionPolicy: "not_autogen", + }, + } + template, err = cli.UpdateTemplate(template.Link, client.UpdateTemplateOptions{ + ID: id, + TICKscript: finalTick, + }) + if err != nil { + t.Fatal(err) + } + + for _, task := range tasks { + ti, err := cli.Task(task.Link, nil) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(ti.DBRPs, finalDBRPs) { + t.Fatalf("unexpected dbrps got %s exp %s", ti.DBRPs, finalDBRPs) + } + } +} func TestServer_UpdateTemplateID_WithTasks(t *testing.T) { s, cli := OpenDefaultServer() defer s.Close() @@ -10147,3 +10534,138 @@ func TestStorage_Backup(t *testing.T) { t.Fatalf("unexpected dot\ngot\n%s\nexp\n%s\n", ti.Dot, dot) } } + +func TestLoadService(t *testing.T) { + s, c, cli := OpenLoadServer() + + // If the list of test fixtures changes update this list + tasks := []string{"base", "cpu_alert", "implicit", "join", "other"} + ts, err := cli.ListTasks(nil) + if err != nil { + t.Fatalf("enountered error listing tasks: %v", err) + } + for i, task := range ts { + if exp, got := tasks[i], task.ID; exp != got { + t.Fatalf("expected task ID to be %v, got %v\n", exp, got) + } + } + + // If the list of test fixtures changes update this list + templates := []string{"base_template", "implicit_template"} + tmps, err := cli.ListTemplates(nil) + if err != nil { + t.Fatalf("enountered error listing tasks: %v", err) + } + for i, template := range tmps { + if exp, got := templates[i], template.ID; exp != got { + t.Fatalf("expected template ID to be %v, got %v\n", exp, got) + } + } + + // If the list of test fixtures changes update this list + topicHandlers := []string{"example", "other"} + link := cli.TopicHandlersLink("cpu") + ths, err := cli.ListTopicHandlers(link, nil) + if err != nil { + t.Fatalf("enountered error listing tasks: %v", err) + } + for i, th := range ths.Handlers { + if exp, got := topicHandlers[i], th.ID; exp != got { + t.Fatalf("expected topic-handler ID to be %v, got %v\n", exp, got) + } + } + + // delete task file + err = os.Rename( + path.Join(c.Load.Dir, "tasks", "join.tick"), + path.Join(c.Load.Dir, "tasks", "z.tick"), + ) + if err != nil { + t.Fatalf("failed to rename tickscript: %v", err) + } + + // reload + s.Reload() + + // If the list of test fixtures changes update this list + tasks = []string{"base", "cpu_alert", "implicit", "other", "z"} + ts, err = cli.ListTasks(nil) + if err != nil { + t.Fatalf("enountered error listing tasks: %v", err) + } + for i, task := range ts { + if exp, got := tasks[i], task.ID; exp != got { + t.Fatalf("expected task ID to be %v, got %v\n", exp, got) + } + } + + // rename template file + err = os.Rename( + path.Join(c.Load.Dir, "templates", "base_template.tick"), + path.Join(c.Load.Dir, "templates", "new.tick"), + ) + if err != nil { + t.Fatalf("failed to rename tickscript: %v", err) + } + + // reload + s.Reload() + + // If the list of test fixtures changes update this list + templates = []string{"implicit_template", "new"} + tmps, err = cli.ListTemplates(nil) + if err != nil { + t.Fatalf("enountered error listing templates: %v", err) + } + for i, template := range tmps { + if exp, got := templates[i], template.ID; exp != got { + t.Fatalf("expected template ID to be %v, got %v\n", exp, got) + } + } + // move template file back + err = os.Rename( + path.Join(c.Load.Dir, "templates", "new.tick"), + path.Join(c.Load.Dir, "templates", "base_template.tick"), + ) + + // add a new handler + f, err := os.Create(path.Join(c.Load.Dir, "handlers", "new.tick")) + if err != nil { + t.Fatalf("failed to create new handler file: %v", err) + } + + script := `topic: cpu +id: new +kind: slack +match: changed() == TRUE +options: + channel: '#alerts' +` + + if _, err := f.Write([]byte(script)); err != nil { + t.Fatalf("failed to write handler: %v", err) + } + f.Close() + + // remove handler file back + if err := os.Remove(path.Join(c.Load.Dir, "handlers", "other.yaml")); err != nil { + t.Fatalf("failed to remove handler file: %v", err) + } + + // reload + s.Reload() + + // If the list of test fixtures changes update this list + topicHandlers = []string{"example", "new"} + link = cli.TopicHandlersLink("cpu") + ths, err = cli.ListTopicHandlers(link, nil) + if err != nil { + t.Fatalf("enountered error listing topic-handlers: %v", err) + } + for i, th := range ths.Handlers { + if exp, got := topicHandlers[i], th.ID; exp != got { + t.Fatalf("expected topic-handler ID to be %v, got %v\n", exp, got) + } + } + +} diff --git a/server/testdata/load/handlers/example.yaml b/server/testdata/load/handlers/example.yaml new file mode 100644 index 000000000..97dd1b8d4 --- /dev/null +++ b/server/testdata/load/handlers/example.yaml @@ -0,0 +1,6 @@ +topic: cpu +id: example +kind: slack +match: changed() == TRUE +options: + channel: '#alerts' diff --git a/server/testdata/load/handlers/other.yaml b/server/testdata/load/handlers/other.yaml new file mode 100644 index 000000000..986c0605f --- /dev/null +++ b/server/testdata/load/handlers/other.yaml @@ -0,0 +1,6 @@ +topic: cpu +id: other +kind: slack +match: changed() == TRUE +options: + channel: '#alerts' diff --git a/server/testdata/load/tasks/base.yaml b/server/testdata/load/tasks/base.yaml new file mode 100644 index 000000000..9bcfe5c8f --- /dev/null +++ b/server/testdata/load/tasks/base.yaml @@ -0,0 +1,13 @@ +dbrps: + - { db: "telegraf", rp: "autogen"} +template-id: base_template +vars: { + "measurement": {"type" : "string", "value" : "cpu" }, + "where_filter": {"type": "lambda", "value": "\"cpu\" == 'cpu-total'"}, + "groups": {"type": "list", "value": [{"type":"string", "value":"host"},{"type":"string", "value":"dc"}]}, + "field": {"type" : "string", "value" : "usage_idle" }, + "warn": {"type" : "lambda", "value" : "\"mean\" < 30.0" }, + "crit": {"type" : "lambda", "value" : "\"mean\" < 10.0" }, + "window": {"type" : "duration", "value" : "1m" }, + "slack_channel": {"type" : "string", "value" : "#alerts_testing" } +} diff --git a/server/testdata/load/tasks/cpu_alert.tick b/server/testdata/load/tasks/cpu_alert.tick new file mode 100644 index 000000000..dc5443b06 --- /dev/null +++ b/server/testdata/load/tasks/cpu_alert.tick @@ -0,0 +1,12 @@ +dbrp "telegraf"."autogen" +dbrp "telegraf"."autogen_not" + +stream + |from() + .measurement('cpu') + .groupBy(*) + |alert() + .warn(lambda: "usage_idle" < 20) + .crit(lambda: "usage_idle" < 10) + // Send alerts to the `cpu` topic + .topic('cpu') diff --git a/server/testdata/load/tasks/implicit.yaml b/server/testdata/load/tasks/implicit.yaml new file mode 100644 index 000000000..092109c0e --- /dev/null +++ b/server/testdata/load/tasks/implicit.yaml @@ -0,0 +1,11 @@ +template-id: implicit_template +vars: { + "measurement": {"type" : "string", "value" : "cpu" }, + "where_filter": {"type": "lambda", "value": "\"cpu\" == 'cpu-total'"}, + "groups": {"type": "list", "value": [{"type":"string", "value":"host"},{"type":"string", "value":"dc"}]}, + "field": {"type" : "string", "value" : "usage_idle" }, + "warn": {"type" : "lambda", "value" : "\"mean\" < 30.0" }, + "crit": {"type" : "lambda", "value" : "\"mean\" < 10.0" }, + "window": {"type" : "duration", "value" : "1m" }, + "slack_channel": {"type" : "string", "value" : "#alerts_testing" } +} diff --git a/server/testdata/load/tasks/join.tick b/server/testdata/load/tasks/join.tick new file mode 100644 index 000000000..77377d24f --- /dev/null +++ b/server/testdata/load/tasks/join.tick @@ -0,0 +1,30 @@ +dbrp "telegraf"."autogen" + +var data = stream + |from() + .measurement('cpu') + .groupBy(*) + |eval() + .keep('usage_user') + |window() + .period(10s) + .every(5s) + +var mean_data = data + |mean('usage_user') + .as('usage_user') + +var max_data = data + |max('usage_user') + .as('usage_user') + +var min_data = data + |min('usage_user') + .as('usage_user') + +mean_data + |join(max_data, min_data) + .as('mean','max','min') + |eval(lambda: "mean.usage_user", lambda: "max.usage_user", lambda: "min.usage_user") + .as('mean_usage','max_usage','min_usage') + |log() diff --git a/server/testdata/load/tasks/other.json b/server/testdata/load/tasks/other.json new file mode 100644 index 000000000..896fa9703 --- /dev/null +++ b/server/testdata/load/tasks/other.json @@ -0,0 +1,14 @@ +{ + "dbrps": [{"db": "telegraf", "rp": "autogen"}], + "template-id": "base_template", + "vars": { + "measurement": {"type" : "string", "value" : "cpu" }, + "where_filter": {"type": "lambda", "value": "\"cpu\" == 'cpu-total'"}, + "groups": {"type": "list", "value": [{"type":"string", "value":"host"},{"type":"string", "value":"dc"}]}, + "field": {"type" : "string", "value" : "usage_idle" }, + "warn": {"type" : "lambda", "value" : "\"mean\" < 30.0" }, + "crit": {"type" : "lambda", "value" : "\"mean\" < 10.0" }, + "window": {"type" : "duration", "value" : "1m" }, + "slack_channel": {"type" : "string", "value" : "#alerts_testing" } + } +} diff --git a/server/testdata/load/templates/base_template.tick b/server/testdata/load/templates/base_template.tick new file mode 100644 index 000000000..9f59e5781 --- /dev/null +++ b/server/testdata/load/templates/base_template.tick @@ -0,0 +1,23 @@ +var measurement string +var where_filter = lambda: TRUE +var groups = [*] +var field string +var warn lambda +var crit lambda +var window = 5m +var slack_channel = '#alerts' + +stream + |from() + .measurement(measurement) + .where(where_filter) + .groupBy(groups) + |window() + .period(window) + .every(window) + |mean(field) + |alert() + .warn(warn) + .crit(crit) + .slack() + .channel(slack_channel) diff --git a/server/testdata/load/templates/implicit_template.tick b/server/testdata/load/templates/implicit_template.tick new file mode 100644 index 000000000..c080257f9 --- /dev/null +++ b/server/testdata/load/templates/implicit_template.tick @@ -0,0 +1,25 @@ +dbrp "telegaf"."not_autogen" + +var measurement string +var where_filter = lambda: TRUE +var groups = [*] +var field string +var warn lambda +var crit lambda +var window = 5m +var slack_channel = '#alerts' + +stream + |from() + .measurement(measurement) + .where(where_filter) + .groupBy(groups) + |window() + .period(window) + .every(window) + |mean(field) + |alert() + .warn(warn) + .crit(crit) + .slack() + .channel(slack_channel) diff --git a/services/diagnostic/handlers.go b/services/diagnostic/handlers.go index a3450b227..72af09950 100644 --- a/services/diagnostic/handlers.go +++ b/services/diagnostic/handlers.go @@ -1206,3 +1206,21 @@ func (h *CmdHandler) GoVersion() { func (h *CmdHandler) Info(msg string) { h.l.Info(msg) } + +// Load handler + +type LoadHandler struct { + l *klog.Logger +} + +func (h *LoadHandler) Error(msg string, err error) { + h.l.Error(msg, klog.Error(err)) +} + +func (h *LoadHandler) Debug(msg string) { + h.l.Debug(msg) +} + +func (h *LoadHandler) Loading(el string, file string) { + h.l.Debug("loading object from file", klog.String("object", el), klog.String("file", file)) +} diff --git a/services/diagnostic/service.go b/services/diagnostic/service.go index 9accf238d..e4e61fd88 100644 --- a/services/diagnostic/service.go +++ b/services/diagnostic/service.go @@ -423,3 +423,9 @@ func (s *Service) NewCmdHandler() *CmdHandler { l: s.logger.With(klog.String("service", "run")), } } + +func (s *Service) NewLoadHandler() *LoadHandler { + return &LoadHandler{ + l: s.logger.With(klog.String("service", "load")), + } +} diff --git a/services/load/config.go b/services/load/config.go new file mode 100644 index 000000000..02e6dc163 --- /dev/null +++ b/services/load/config.go @@ -0,0 +1,50 @@ +package load + +import ( + "errors" + "path/filepath" +) + +const taskDir = "tasks" +const templateDir = "templates" +const handlerDir = "handlers" + +type Config struct { + Enabled bool `toml:"enabled"` + Dir string `toml:"dir"` +} + +func NewConfig() Config { + return Config{ + Enabled: false, + Dir: "./load", + } +} + +// Validates verifies that the directory specified is an absolute path +// and that it contains the directories /tasks and /handlers. The directory +// may contain additional files, but must at least contain /tasks and /handlers. +func (c Config) Validate() error { + if !c.Enabled { + return nil + } + + // Verify that the path is absolute + if !filepath.IsAbs(c.Dir) { + return errors.New("dir must be an absolute path") + } + + return nil +} + +func (c Config) tasksDir() string { + return filepath.Join(c.Dir, taskDir) +} + +func (c Config) templatesDir() string { + return filepath.Join(c.Dir, templateDir) +} + +func (c Config) handlersDir() string { + return filepath.Join(c.Dir, handlerDir) +} diff --git a/services/load/dao.go b/services/load/dao.go new file mode 100644 index 000000000..e004850d8 --- /dev/null +++ b/services/load/dao.go @@ -0,0 +1,111 @@ +package load + +import ( + "encoding/json" + "path" + + "github.com/influxdata/kapacitor/services/storage" +) + +// version is the current version of the Item structure. +const version = 1 + +const ( + tasksStr = "tasks" + templatesStr = "templates" + handlersStr = "handlers" +) + +// Data access object for resources loaded from +// a directory +type ItemsDAO interface { + Set(i Item) error + Delete(id string) error + List(prefix string) ([]Item, error) + + Rebuild() error +} + +type Item struct { + ID string `json:"id"` +} + +func newTaskItem(id string) Item { + return Item{ + ID: path.Join(tasksStr, id), + } +} + +func newTemplateItem(id string) Item { + return Item{ + ID: path.Join(templatesStr, id), + } +} + +func newTopicHandlerItem(topic, id string) Item { + return Item{ + ID: path.Join(handlersStr, topic, id), + } +} + +func (i Item) MarshalBinary() ([]byte, error) { + return storage.VersionJSONEncode(version, i) +} +func (i *Item) UnmarshalBinary(data []byte) error { + return storage.VersionJSONDecode(data, func(version int, dec *json.Decoder) error { + dec.UseNumber() + return dec.Decode(i) + }) +} + +func (i Item) ObjectID() string { + return i.ID +} + +// Key/Value store based implementation of ItemsDAO +type itemKV struct { + store *storage.IndexedStore +} + +func newItemKV(store storage.Interface) (*itemKV, error) { + c := storage.DefaultIndexedStoreConfig("load_items", func() storage.BinaryObject { + return new(Item) + }) + + istore, err := storage.NewIndexedStore(store, c) + if err != nil { + return nil, err + } + + return &itemKV{ + store: istore, + }, nil +} + +func (kv *itemKV) Set(i Item) error { + return kv.store.Put(&i) +} + +func (kv *itemKV) Delete(id string) error { + return kv.store.Delete(id) +} + +func (kv *itemKV) List(prefix string) ([]Item, error) { + objects, err := kv.store.List(path.Join(storage.DefaultIDIndex, prefix), "", 0, -1) + if err != nil { + return nil, err + } + items := make([]Item, len(objects)) + for i, object := range objects { + it, ok := object.(*Item) + if !ok { + return nil, storage.ImpossibleTypeErr(i, object) + } + items[i] = *it + } + return items, nil +} + +func (kv *itemKV) Rebuild() error { + return kv.store.Rebuild() +} diff --git a/services/load/service.go b/services/load/service.go new file mode 100644 index 000000000..d42932063 --- /dev/null +++ b/services/load/service.go @@ -0,0 +1,667 @@ +package load + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os" + "path" + "path/filepath" + "strings" + "sync" + + "github.com/ghodss/yaml" + + "github.com/influxdata/kapacitor/client/v1" + kexpvar "github.com/influxdata/kapacitor/expvar" + "github.com/influxdata/kapacitor/server/vars" + "github.com/influxdata/kapacitor/services/storage" + "github.com/pkg/errors" +) + +var defaultURL = "http://localhost:9092" + +const ( + statErrorCount = "errors" +) + +const ( + // Public name of overrides store + loadAPIName = "load" + // The storage namespace for all configuration override data. + loadNamespace = "load_items" +) + +type Diagnostic interface { + Debug(msg string) + Error(msg string, err error) + Loading(thing string, file string) +} + +type Service struct { + mu sync.Mutex + config Config + + cli *client.Client + statsKey string + statMap *kexpvar.Map + errorCount *kexpvar.Int + + items ItemsDAO + + tasks map[string]bool + templates map[string]bool + handlers map[string]bool + + StorageService interface { + Store(namespace string) storage.Interface + Register(name string, store storage.StoreActioner) + } + + diag Diagnostic +} + +func NewService(c Config, h http.Handler, d Diagnostic) (*Service, error) { + cfg := client.Config{ + URL: defaultURL, + UserAgent: "internal-load-service", + } + if h != nil { + cfg.Transport = client.NewLocalTransport(h) + } + cli, err := client.New(cfg) + + if err != nil { + return nil, fmt.Errorf("failed to create client: %v", err) + } + + s := &Service{ + config: c, + diag: d, + cli: cli, + tasks: map[string]bool{}, + templates: map[string]bool{}, + handlers: map[string]bool{}, + } + + s.statsKey, s.statMap = vars.NewStatistic("load", nil) + s.errorCount = &kexpvar.Int{} + s.statMap.Set(statErrorCount, s.errorCount) + + return s, nil +} + +func (s *Service) Open() error { + if s.StorageService == nil { + return errors.New("StorageService cannot be nil") + } + store := s.StorageService.Store(loadNamespace) + items, err := newItemKV(store) + if err != nil { + return err + } + s.items = items + s.StorageService.Register(loadAPIName, s.items) + return nil +} + +func (s *Service) Close() error { + return nil +} + +// taskFiles gets a slice of all files with the .tick file extension +// and any associated files with .json, .yml, and .yaml file extentions +// in the configured task directory. +func (s *Service) taskFiles() (tickscripts []string, taskFiles []string, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + tasksDir := s.config.tasksDir() + + files, err := ioutil.ReadDir(tasksDir) + if err != nil { + return nil, nil, err + } + + for _, file := range files { + if file.IsDir() { + continue + } + + filename := file.Name() + switch ext := filepath.Ext(filename); ext { + case ".tick": + tickscripts = append(tickscripts, filepath.Join(tasksDir, filename)) + case ".yml", ".json", ".yaml": + taskFiles = append(taskFiles, filepath.Join(tasksDir, filename)) + default: + continue + } + } + + return +} + +// templateFiles gets a slice of all files with the .tick file extension +// in the configured template directory. +func (s *Service) templateFiles() (tickscripts []string, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + templatesDir := s.config.templatesDir() + + files, err := ioutil.ReadDir(templatesDir) + if err != nil { + return nil, err + } + + for _, file := range files { + if file.IsDir() { + continue + } + + filename := file.Name() + switch ext := filepath.Ext(filename); ext { + case ".tick": + tickscripts = append(tickscripts, filepath.Join(templatesDir, filename)) + default: + continue + } + } + + return +} + +// HandlerFiles gets a slice of all files with the .json, .yml, and +// .yaml file extentions in the configured handler directory. +func (s *Service) handlerFiles() ([]string, error) { + s.mu.Lock() + defer s.mu.Unlock() + handlers := []string{} + + handlersDir := s.config.handlersDir() + + files, err := ioutil.ReadDir(handlersDir) + if err != nil { + return nil, err + } + + for _, file := range files { + if file.IsDir() { + continue + } + + filename := file.Name() + switch ext := filepath.Ext(filename); ext { + case ".yml", ".json", ".yaml": + handlers = append(handlers, filepath.Join(handlersDir, filename)) + default: + continue + } + } + + return handlers, nil +} + +func (s *Service) Load() error { + s.mu.Lock() + s.tasks = map[string]bool{} + s.templates = map[string]bool{} + s.handlers = map[string]bool{} + s.mu.Unlock() + + if err := s.load(); err != nil { + s.diag.Error("failed to load new files", err) + s.errorCount.Add(1) + return err + } + + if err := s.removeMissing(); err != nil { + s.diag.Error("failed to remove missing", err) + s.errorCount.Add(1) + return err + } + + return nil +} + +func (s *Service) load() error { + if !s.config.Enabled { + return nil + } + + if _, err := ioutil.ReadDir(s.config.Dir); os.IsNotExist(err) { + s.diag.Debug("skipping load... load directory does not exists") + return nil + } + + s.diag.Debug("loading templates") + err := s.loadTemplates() + if err != nil && !os.IsNotExist(err) { + return err + } + + s.diag.Debug("loading tasks") + err = s.loadTasks() + if err != nil && !os.IsNotExist(err) { + return fmt.Errorf("failed to load tasks: %v", err) + } + + s.diag.Debug("loading handlers") + err = s.loadHandlers() + if err != nil && !os.IsNotExist(err) { + return err + } + + return nil +} + +func (s *Service) loadTasks() error { + ticks, templateTasks, err := s.taskFiles() + if err != nil { + return err + } + + for _, f := range ticks { + s.diag.Loading("task", f) + if err := s.loadTask(f); err != nil { + return fmt.Errorf("failed to load file %s: %s", f, err.Error()) + } + } + + for _, v := range templateTasks { + s.diag.Loading("template task", v) + if err := s.loadVars(v); err != nil { + return fmt.Errorf("failed to load file %s: %s", v, err.Error()) + } + } + + return nil +} + +func (s *Service) loadTask(f string) error { + file, err := os.Open(f) + defer file.Close() + if err != nil { + return fmt.Errorf("failed to open file %v: %v", f, err) + } + + data, err := ioutil.ReadAll(file) + if err != nil { + return fmt.Errorf("failed to read file %v: %v", f, err) + } + + script := string(data) + fn := file.Name() + id := strings.TrimSuffix(filepath.Base(fn), filepath.Ext(fn)) + + l := s.cli.TaskLink(id) + task, _ := s.cli.Task(l, nil) + if task.ID == "" { + o := client.CreateTaskOptions{ + ID: id, + TICKscript: script, + Status: client.Enabled, + } + + if _, err := s.cli.CreateTask(o); err != nil { + return fmt.Errorf("failed to create task: %v", err) + } + } else { + o := client.UpdateTaskOptions{ + ID: id, + TICKscript: script, + } + if _, err := s.cli.UpdateTask(l, o); err != nil { + return fmt.Errorf("failed to create task: %v", err) + } + + // do reload + _, err := s.cli.UpdateTask(l, client.UpdateTaskOptions{Status: client.Disabled}) + if err != nil { + return err + } + _, err = s.cli.UpdateTask(l, client.UpdateTaskOptions{Status: client.Enabled}) + if err != nil { + return err + } + + } + + s.mu.Lock() + defer s.mu.Unlock() + if err := s.items.Set(newTaskItem(id)); err != nil { + return err + } + s.tasks[id] = true + + return nil +} + +func (s *Service) loadTemplates() error { + files, err := s.templateFiles() + if err != nil { + return err + } + + for _, f := range files { + s.diag.Loading("template", f) + if err := s.loadTemplate(f); err != nil { + return fmt.Errorf("failed to load file %s: %s", f, err.Error()) + } + } + return nil +} + +func (s *Service) loadTemplate(f string) error { + file, err := os.Open(f) + defer file.Close() + if err != nil { + return fmt.Errorf("failed to open file %v: %v", f, err) + } + + data, err := ioutil.ReadAll(file) + if err != nil { + return fmt.Errorf("failed to read file %v: %v", f, err) + } + + script := string(data) + fn := file.Name() + id := strings.TrimSuffix(filepath.Base(fn), filepath.Ext(fn)) + + l := s.cli.TemplateLink(id) + task, _ := s.cli.Template(l, nil) + if task.ID == "" { + o := client.CreateTemplateOptions{ + ID: id, + TICKscript: script, + } + + if _, err := s.cli.CreateTemplate(o); err != nil { + return fmt.Errorf("failed to create template: %v", err) + } + } else { + o := client.UpdateTemplateOptions{ + ID: id, + TICKscript: script, + } + if _, err := s.cli.UpdateTemplate(l, o); err != nil { + return fmt.Errorf("failed to create template: %v", err) + } + } + + s.mu.Lock() + defer s.mu.Unlock() + if err := s.items.Set(newTemplateItem(id)); err != nil { + return err + } + s.templates[id] = true + + return nil +} + +func (s *Service) loadVars(f string) error { + file, err := os.Open(f) + defer file.Close() + if err != nil { + return fmt.Errorf("failed to open file %v: %v", f, err) + } + + data, err := ioutil.ReadAll(file) + if err != nil { + return fmt.Errorf("failed to read file %v: %v", f, err) + } + + fn := file.Name() + id := strings.TrimSuffix(filepath.Base(fn), filepath.Ext(fn)) + + fileVars := client.TaskVars{} + switch ext := path.Ext(f); ext { + case ".yaml", ".yml": + if err := yaml.Unmarshal(data, &fileVars); err != nil { + return errors.Wrapf(err, "failed to unmarshal yaml task vars file %q", f) + } + case ".json": + if err := json.Unmarshal(data, &fileVars); err != nil { + return errors.Wrapf(err, "failed to unmarshal json task vars file %q", f) + } + default: + return errors.New("bad file extension. Must be YAML or JSON") + } + + l := s.cli.TaskLink(id) + task, _ := s.cli.Task(l, nil) + if task.ID == "" { + var o client.CreateTaskOptions + o, err = fileVars.CreateTaskOptions() + if err != nil { + return fmt.Errorf("failed to initialize create task options: %v", err) + } + + o.ID = id + o.Status = client.Enabled + if _, err := s.cli.CreateTask(o); err != nil { + return fmt.Errorf("failed to create task: %v", err) + } + } else { + var o client.UpdateTaskOptions + o, err := fileVars.UpdateTaskOptions() + if err != nil { + return fmt.Errorf("failed to initialize create task options: %v", err) + } + + o.ID = id + if _, err := s.cli.UpdateTask(l, o); err != nil { + return fmt.Errorf("failed to create task: %v", err) + } + // do reload + _, err = s.cli.UpdateTask(l, client.UpdateTaskOptions{Status: client.Disabled}) + if err != nil { + return err + } + _, err = s.cli.UpdateTask(l, client.UpdateTaskOptions{Status: client.Enabled}) + if err != nil { + return err + } + } + + s.mu.Lock() + defer s.mu.Unlock() + if err := s.items.Set(newTaskItem(id)); err != nil { + return err + } + s.tasks[id] = true + + return nil +} + +func (s *Service) loadHandlers() error { + files, err := s.handlerFiles() + if err != nil { + return err + } + + for _, f := range files { + s.diag.Loading("handler", f) + if err := s.loadHandler(f); err != nil { + return fmt.Errorf("failed to load file %s: %s", f, err.Error()) + } + } + return nil +} + +func (s *Service) loadHandler(f string) error { + file, err := os.Open(f) + defer file.Close() + if err != nil { + return fmt.Errorf("failed to open file %v: %v", f, err) + } + + data, err := ioutil.ReadAll(file) + if err != nil { + return fmt.Errorf("failed to read file %v: %v", f, err) + } + + var o client.TopicHandlerOptions + switch ext := path.Ext(f); ext { + case ".yaml", ".yml": + if err := yaml.Unmarshal(data, &o); err != nil { + return errors.Wrapf(err, "failed to unmarshal yaml task vars file %q", f) + } + case ".json": + if err := json.Unmarshal(data, &o); err != nil { + return errors.Wrapf(err, "failed to unmarshal json task vars file %q", f) + } + default: + return errors.New("bad file extension. Must be YAML or JSON") + } + + l := s.cli.TopicHandlerLink(o.Topic, o.ID) + handler, _ := s.cli.TopicHandler(l) + if handler.ID == "" { + _, err := s.cli.CreateTopicHandler(s.cli.TopicHandlersLink(o.Topic), o) + if err != nil { + return err + } + } else { + _, err := s.cli.ReplaceTopicHandler(l, o) + if err != nil { + return err + } + } + s.mu.Lock() + defer s.mu.Unlock() + s.handlers[path.Join(o.Topic, o.ID)] = true + if err := s.items.Set(newTopicHandlerItem(o.Topic, o.ID)); err != nil { + return err + } + + return nil +} + +func (s *Service) removeMissing() error { + + if err := s.removeTasks(); err != nil { + return err + } + + if err := s.removeTemplates(); err != nil { + return err + } + + if err := s.removeHandlers(); err != nil { + return err + } + + return nil +} + +func (s *Service) removeTasks() error { + s.mu.Lock() + defer s.mu.Unlock() + + loadedTasks, err := s.loadedTasks() + if err != nil { + return err + } + + for _, id := range diff(s.tasks, loadedTasks) { + l := s.cli.TaskLink(id) + if err := s.cli.DeleteTask(l); err != nil { + return err + } + } + + return nil +} + +func (s *Service) removeTemplates() error { + s.mu.Lock() + defer s.mu.Unlock() + + loadedTemplates, err := s.loadedTemplates() + if err != nil { + return err + } + for _, id := range diff(s.templates, loadedTemplates) { + l := s.cli.TemplateLink(id) + if err := s.cli.DeleteTemplate(l); err != nil { + return err + } + } + return nil +} + +func (s *Service) removeHandlers() error { + s.mu.Lock() + defer s.mu.Unlock() + + loadedHandlers, err := s.loadedHandlers() + if err != nil { + return err + } + for _, id := range diff(s.handlers, loadedHandlers) { + pair := strings.Split(id, "/") + if len(pair) != 2 { + return errors.New("expected id to be topicID/handlerID") + } + l := s.cli.TopicHandlerLink(pair[0], pair[1]) + if err := s.cli.DeleteTopicHandler(l); err != nil { + return err + } + } + return nil +} + +func diff(m map[string]bool, xs []string) []string { + diffs := []string{} + + for _, x := range xs { + if m[x] { + continue + } + diffs = append(diffs, x) + } + + return diffs +} + +func (s *Service) loadedTasks() ([]string, error) { + items, err := s.items.List(tasksStr) + if err != nil { + return nil, err + } + tasks := []string{} + for _, item := range items { + tasks = append(tasks, filepath.Base(item.ID)) + } + + return tasks, nil +} + +func (s *Service) loadedTemplates() ([]string, error) { + items, err := s.items.List(templatesStr) + if err != nil { + return nil, err + } + templates := []string{} + for _, item := range items { + templates = append(templates, filepath.Base(item.ID)) + } + + return templates, nil +} + +func (s *Service) loadedHandlers() ([]string, error) { + items, err := s.items.List(handlersStr) + if err != nil { + return nil, err + } + handlers := []string{} + for _, item := range items { + handlers = append(handlers, strings.TrimLeft(item.ID, "handlers/")) + } + + return handlers, nil +} diff --git a/services/task_store/service.go b/services/task_store/service.go index 8e4dce967..4c39437fc 100644 --- a/services/task_store/service.go +++ b/services/task_store/service.go @@ -730,9 +730,6 @@ func (ts *Service) handleCreateTask(w http.ResponseWriter, r *http.Request) { newTask.Type = StreamTask case client.BatchTask: newTask.Type = BatchTask - default: - httpd.HttpError(w, fmt.Sprintf("unknown type %q", task.Type), true, http.StatusBadRequest) - return } // Set tick script @@ -751,10 +748,6 @@ func (ts *Service) handleCreateTask(w http.ResponseWriter, r *http.Request) { RetentionPolicy: dbrp.RetentionPolicy, } } - if len(newTask.DBRPs) == 0 { - httpd.HttpError(w, fmt.Sprintf("must provide at least one database and retention policy."), true, http.StatusBadRequest) - return - } // Set status switch task.Status { @@ -772,6 +765,23 @@ func (ts *Service) handleCreateTask(w http.ResponseWriter, r *http.Request) { httpd.HttpError(w, err.Error(), true, http.StatusBadRequest) return } + // Check for parity between tickscript and dbrp + + pn, err := newProgramNodeFromTickscript(newTask.TICKscript) + if err != nil { + httpd.HttpError(w, err.Error(), true, http.StatusBadRequest) + return + } + + switch tt := taskTypeFromProgram(pn); tt { + case client.StreamTask: + newTask.Type = StreamTask + case client.BatchTask: + newTask.Type = BatchTask + default: + httpd.HttpError(w, fmt.Sprintf("invalid task type: %v", tt), true, http.StatusBadRequest) + return + } // Validate task _, err = ts.newKapacitorTask(newTask) @@ -787,6 +797,28 @@ func (ts *Service) handleCreateTask(w http.ResponseWriter, r *http.Request) { newTask.LastEnabled = now } + dbrps := []DBRP{} + for _, dbrp := range dbrpsFromProgram(pn) { + dbrps = append(dbrps, DBRP{ + Database: dbrp.Database, + RetentionPolicy: dbrp.RetentionPolicy, + }) + } + + if len(dbrps) == 0 && len(newTask.DBRPs) == 0 { + httpd.HttpError(w, "must specify dbrp", true, http.StatusBadRequest) + return + } + + if len(dbrps) > 0 && len(newTask.DBRPs) > 0 { + httpd.HttpError(w, "cannot specify dbrp in both implicitly and explicitly", true, http.StatusBadRequest) + return + } + + if len(dbrps) != 0 { + newTask.DBRPs = dbrps + } + // Save task err = ts.tasks.Create(newTask) if err != nil { @@ -879,14 +911,48 @@ func (ts *Service) handleUpdateTask(w http.ResponseWriter, r *http.Request) { updated.Type = BatchTask } + oldPn, err := newProgramNodeFromTickscript(updated.TICKscript) + if err != nil { + httpd.HttpError(w, err.Error(), true, http.StatusBadRequest) + return + } + // Set tick script if task.TICKscript != "" { updated.TICKscript = task.TICKscript + + newPn, err := newProgramNodeFromTickscript(updated.TICKscript) + if err != nil { + httpd.HttpError(w, err.Error(), true, http.StatusBadRequest) + return + } + + if len(dbrpsFromProgram(oldPn)) > 0 && len(dbrpsFromProgram(newPn)) == 0 && len(task.DBRPs) == 0 { + httpd.HttpError(w, "must specify dbrp", true, http.StatusBadRequest) + return + } } } - // Set dbrps - if len(task.DBRPs) > 0 { + pn, err := newProgramNodeFromTickscript(updated.TICKscript) + if err != nil { + httpd.HttpError(w, err.Error(), true, http.StatusBadRequest) + return + } + + if dbrps := dbrpsFromProgram(pn); len(dbrps) > 0 && len(task.DBRPs) > 0 { + httpd.HttpError(w, "cannot specify dbrp in implicitly and explicitly", true, http.StatusBadRequest) + return + } else if len(dbrps) > 0 { + // make consistent + updated.DBRPs = []DBRP{} + for _, dbrp := range dbrpsFromProgram(pn) { + updated.DBRPs = append(updated.DBRPs, DBRP{ + Database: dbrp.Database, + RetentionPolicy: dbrp.RetentionPolicy, + }) + } + } else if len(task.DBRPs) > 0 { updated.DBRPs = make([]DBRP, len(task.DBRPs)) for i, dbrp := range task.DBRPs { updated.DBRPs[i] = DBRP{ @@ -915,6 +981,17 @@ func (ts *Service) handleUpdateTask(w http.ResponseWriter, r *http.Request) { } } + // set task type from tickscript + switch tt := taskTypeFromProgram(pn); tt { + case client.StreamTask: + updated.Type = StreamTask + case client.BatchTask: + updated.Type = BatchTask + default: + httpd.HttpError(w, fmt.Sprintf("invalid task type: %v", tt), true, http.StatusBadRequest) + return + } + // Validate task _, err = ts.newKapacitorTask(updated) if err != nil { @@ -927,6 +1004,7 @@ func (ts *Service) handleUpdateTask(w http.ResponseWriter, r *http.Request) { if statusChanged && updated.Status == Enabled { updated.LastEnabled = now } + if original.ID != updated.ID { // Task ID changed delete and re-create. if err := ts.tasks.Create(updated); err != nil { @@ -1599,14 +1677,20 @@ func (ts *Service) handleCreateTemplate(w http.ResponseWriter, r *http.Request) return } - // Set template type - switch template.Type { + pn, err := newProgramNodeFromTickscript(template.TICKscript) + if err != nil { + httpd.HttpError(w, err.Error(), true, http.StatusBadRequest) + return + } + + // set task type from tickscript + switch tt := taskTypeFromProgram(pn); tt { case client.StreamTask: newTemplate.Type = StreamTask case client.BatchTask: newTemplate.Type = BatchTask default: - httpd.HttpError(w, fmt.Sprintf("unknown type %q", template.Type), true, http.StatusBadRequest) + httpd.HttpError(w, fmt.Sprintf("invalid task type: %v", tt), true, http.StatusBadRequest) return } @@ -1741,6 +1825,16 @@ func (ts *Service) handleUpdateTemplate(w http.ResponseWriter, r *http.Request) // Rollsback all updated tasks if an error occurs. func (ts *Service) updateAllAssociatedTasks(old, new Template, taskIds []string) error { var i int + oldPn, err := newProgramNodeFromTickscript(old.TICKscript) + if err != nil { + return fmt.Errorf("failed to parse old tickscript: %v", err) + } + + newPn, err := newProgramNodeFromTickscript(new.TICKscript) + if err != nil { + return fmt.Errorf("failed to parse new tickscript: %v", err) + } + // Setup rollback function defer func() { if i == len(taskIds) { @@ -1760,6 +1854,15 @@ func (ts *Service) updateAllAssociatedTasks(old, new Template, taskIds []string) task.TemplateID = old.ID task.TICKscript = old.TICKscript task.Type = old.Type + if len(dbrpsFromProgram(oldPn)) > 0 { + task.DBRPs = []DBRP{} + for _, dbrp := range dbrpsFromProgram(oldPn) { + task.DBRPs = append(task.DBRPs, DBRP{ + Database: dbrp.Database, + RetentionPolicy: dbrp.RetentionPolicy, + }) + } + } if err := ts.tasks.Replace(task); err != nil { ts.diag.Error("error rolling back associated task", err, keyvalue.KV("task", taskId)) } @@ -1772,6 +1875,7 @@ func (ts *Service) updateAllAssociatedTasks(old, new Template, taskIds []string) } } }() + for ; i < len(taskIds); i++ { taskId := taskIds[i] task, err := ts.tasks.Get(taskId) @@ -1788,9 +1892,23 @@ func (ts *Service) updateAllAssociatedTasks(old, new Template, taskIds []string) return fmt.Errorf("error updating task association %s: %s", taskId, err) } } + task.TemplateID = new.ID task.TICKscript = new.TICKscript task.Type = new.Type + + if len(dbrpsFromProgram(oldPn)) > 0 || len(dbrpsFromProgram(newPn)) > 0 { + + task.DBRPs = []DBRP{} + for _, dbrp := range dbrpsFromProgram(newPn) { + task.DBRPs = append(task.DBRPs, DBRP{ + Database: dbrp.Database, + RetentionPolicy: dbrp.RetentionPolicy, + }) + + } + } + if err := ts.tasks.Replace(task); err != nil { return fmt.Errorf("error updating associated task %s: %s", taskId, err) } @@ -1802,6 +1920,7 @@ func (ts *Service) updateAllAssociatedTasks(old, new Template, taskIds []string) } } } + return nil } diff --git a/services/task_store/util.go b/services/task_store/util.go new file mode 100644 index 000000000..fd746bd7f --- /dev/null +++ b/services/task_store/util.go @@ -0,0 +1,102 @@ +package task_store + +import ( + "errors" + "fmt" + + client "github.com/influxdata/kapacitor/client/v1" + "github.com/influxdata/kapacitor/tick/ast" +) + +func newProgramNodeFromTickscript(tickscript string) (*ast.ProgramNode, error) { + p, err := ast.Parse(tickscript) + + if err != nil { + return nil, fmt.Errorf("invalid TICKscript: %v", err) + } + + pn, ok := p.(*ast.ProgramNode) + // This should never happen + if !ok { + return nil, errors.New("invalid TICKscript") + } + + return pn, nil +} + +func dbrpsFromProgram(n *ast.ProgramNode) []client.DBRP { + dbrps := []client.DBRP{} + for _, nn := range n.Nodes { + switch nn.(type) { + case *ast.DBRPNode: + dbrpn := nn.(*ast.DBRPNode) + dbrpc := client.DBRP{ + Database: dbrpn.DB.Reference, + RetentionPolicy: dbrpn.RP.Reference, + } + dbrps = append(dbrps, dbrpc) + default: + continue + } + } + + return dbrps +} + +func taskTypeFromProgram(n *ast.ProgramNode) client.TaskType { + tts := []string{} + for _, nn := range n.Nodes { + switch nn.(type) { + case *ast.DeclarationNode: + if cn, ok := nn.(*ast.DeclarationNode).Right.(*ast.ChainNode); ok { + var n = cn.Left + DeclLoop: + for { + switch n.(type) { + case *ast.ChainNode: + n = n.(*ast.ChainNode).Left + case *ast.IdentifierNode: + if ident := n.(*ast.IdentifierNode).Ident; ident == "batch" || ident == "stream" { + tts = append(tts, ident) + } + break DeclLoop + } + } + } + case *ast.ChainNode: + var n = nn.(*ast.ChainNode).Left + ChainLoop: + for { + switch n.(type) { + case *ast.ChainNode: + n = n.(*ast.ChainNode).Left + case *ast.IdentifierNode: + if ident := n.(*ast.IdentifierNode).Ident; ident == "batch" || ident == "stream" { + tts = append(tts, ident) + } + break ChainLoop + } + } + } + } + + if len(tts) == 0 { + return client.InvalidTask + } + + t := tts[0] + for _, tt := range tts[1:] { + if t != tt { + return client.InvalidTask + } + } + + switch t { + case "batch": + return client.BatchTask + case "stream": + return client.StreamTask + } + + return client.InvalidTask +} diff --git a/services/task_store/util_test.go b/services/task_store/util_test.go new file mode 100644 index 000000000..b903a04b9 --- /dev/null +++ b/services/task_store/util_test.go @@ -0,0 +1,134 @@ +package task_store + +import ( + "reflect" + "testing" + + client "github.com/influxdata/kapacitor/client/v1" +) + +func TestDBRPsFromProgram(t *testing.T) { + type testCase struct { + name string + tickscript string + dbrps []client.DBRP + } + + tt := []testCase{ + { + name: "one dbrp", + tickscript: `dbrp "telegraf"."autogen" + + stream|from().measurement('m') + `, + dbrps: []client.DBRP{ + { + Database: "telegraf", + RetentionPolicy: "auotgen", + }, + }, + }, + { + name: "two dbrp", + tickscript: `dbrp "telegraf"."autogen" + dbrp "telegraf"."not_autogen" + + stream|from().measurement('m') + `, + dbrps: []client.DBRP{ + { + Database: "telegraf", + RetentionPolicy: "auotgen", + }, + { + Database: "telegraf", + RetentionPolicy: "not_autogen", + }, + }, + }, + } + + for _, tst := range tt { + t.Run(tst.name, func(t *testing.T) { + pn, err := newProgramNodeFromTickscript(tst.tickscript) + if err != nil { + t.Fatalf("error parsing tickscript: %v", err) + } + if exp, got := tst.dbrps, dbrpsFromProgram(pn); reflect.DeepEqual(exp, got) { + t.Fatalf("DBRPs do not match:\nexp: %v,\ngot %v", exp, got) + } + }) + } +} + +func TestTaskTypeFromProgram(t *testing.T) { + type testCase struct { + name string + tickscript string + taskType client.TaskType + } + + tt := []testCase{ + { + name: "basic stream", + tickscript: `dbrp "telegraf"."autogen" + + stream|from().measurement('m') + `, + taskType: client.StreamTask, + }, + { + name: "basic batch", + tickscript: `dbrp "telegraf"."autogen" + + batch|query('SELECT * FROM "telegraf"."autogen"."mymeas"') + `, + taskType: client.BatchTask, + }, + { + name: "var stream", + tickscript: `dbrp "telegraf"."autogen" + + var x = stream|from().measurement('m') + `, + taskType: client.StreamTask, + }, + { + name: "var batch", + tickscript: `dbrp "telegraf"."autogen" + + var x = batch|query('SELECT * FROM "telegraf"."autogen"."mymeas"') + `, + taskType: client.BatchTask, + }, + { + name: "mixed type", + tickscript: `dbrp "telegraf"."autogen" + + var x = batch|query('SELECT * FROM "telegraf"."autogen"."mymeas"') + var y = stream|from().measurement('m') + `, + taskType: client.InvalidTask, + }, + { + name: "missing batch or stream", + tickscript: `dbrp "telegraf"."autogen" + + var x = testing|query('SELECT * FROM "telegraf"."autogen"."mymeas"') + `, + taskType: client.InvalidTask, + }, + } + + for _, tst := range tt { + t.Run(tst.name, func(t *testing.T) { + pn, err := newProgramNodeFromTickscript(tst.tickscript) + if err != nil { + t.Fatalf("error parsing tickscript: %v", err) + } + if exp, got := tst.taskType, taskTypeFromProgram(pn); exp != got { + t.Fatalf("TaskTypes do not match:\nexp: %v,\ngot %v", exp, got) + } + }) + } +} diff --git a/task.go b/task.go index 426c9909b..08b299a00 100644 --- a/task.go +++ b/task.go @@ -25,6 +25,7 @@ type TaskType int const ( StreamTask TaskType = iota BatchTask + InvalidTask ) func (t TaskType) String() string { diff --git a/task_master.go b/task_master.go index eff1b668f..86ac1948c 100644 --- a/task_master.go +++ b/task_master.go @@ -450,6 +450,9 @@ func (tm *TaskMaster) StartTask(t *Task) (*ExecutingTask, error) { if tm.closed { return nil, errors.New("task master is closed cannot start a task") } + if len(t.DBRPs) == 0 { + return nil, errors.New("task does contain any dbrps") + } tm.diag.StartingTask(t.ID) et, err := NewExecutingTask(tm, t) if err != nil { diff --git a/tick/ast/lex.go b/tick/ast/lex.go index ba90e396c..b942bd4cf 100644 --- a/tick/ast/lex.go +++ b/tick/ast/lex.go @@ -17,6 +17,7 @@ const ( TokenError TokenType = iota TokenEOF TokenVar + TokenDBRP TokenAsgn TokenDot TokenPipe @@ -109,6 +110,7 @@ const ( KW_True = "TRUE" KW_False = "FALSE" KW_Var = "var" + KW_DBRP = "dbrp" KW_Lambda = "lambda" ) @@ -118,6 +120,7 @@ var keywords = map[string]TokenType{ KW_True: TokenTrue, KW_False: TokenFalse, KW_Var: TokenVar, + KW_DBRP: TokenDBRP, KW_Lambda: TokenLambda, } @@ -137,6 +140,8 @@ func (t TokenType) String() string { return "EOF" case t == TokenVar: return "var" + case t == TokenDBRP: + return "dbrp" case t == TokenIdent: return "identifier" case t == TokenReference: diff --git a/tick/ast/node.go b/tick/ast/node.go index a376f9e9f..ca974ab97 100644 --- a/tick/ast/node.go +++ b/tick/ast/node.go @@ -362,6 +362,48 @@ func (n *BinaryNode) Equal(o interface{}) bool { return false } +type DBRPNode struct { + position + Comment *CommentNode + DB *ReferenceNode + RP *ReferenceNode +} + +func newDBRP(p position, db, rp *ReferenceNode, c *CommentNode) *DBRPNode { + return &DBRPNode{ + position: p, + DB: db, + RP: rp, + Comment: c, + } +} + +func (d *DBRPNode) DBRP() string { + return "\"" + d.DB.Reference + "\"" + "." + "\"" + d.RP.Reference + "\"" +} + +func (d *DBRPNode) Equal(o interface{}) bool { + if on, ok := o.(*DBRPNode); ok { + return d.DB.Equal(on.DB) && d.RP.Equal(on.RP) + } + + return false +} + +func (s *DBRPNode) Format(buf *bytes.Buffer, indent string, onNewLine bool) { + if s.Comment != nil { + s.Comment.Format(buf, indent, onNewLine) + } + buf.WriteString(indent) + buf.WriteString(TokenDBRP.String()) + buf.WriteByte(' ') + buf.WriteString(s.DBRP()) +} + +func (n *DBRPNode) String() string { + return fmt.Sprintf("DBRPNode@%v{%v %v}%v", n.position, n.DB, n.RP, n.Comment) +} + type DeclarationNode struct { position Left *IdentifierNode diff --git a/tick/ast/parser.go b/tick/ast/parser.go index 09e1ec49c..f939ee187 100644 --- a/tick/ast/parser.go +++ b/tick/ast/parser.go @@ -237,11 +237,24 @@ func (p *parser) statement() Node { switch t := p.peek().typ; t { case TokenVar: return p.declaration() + case TokenDBRP: + return p.dbrp() default: return p.expression() } } +//parse a dbrp statement +func (p *parser) dbrp() Node { + dbrpTok := p.expect(TokenDBRP) + dbrpC := p.consumeComment() + db := p.reference() + _ = p.expect(TokenDot) + rp := p.reference() + + return newDBRP(p.position(dbrpTok.pos), db.(*ReferenceNode), rp.(*ReferenceNode), dbrpC) +} + //parse a declaration statement func (p *parser) declaration() Node { varTok := p.expect(TokenVar) diff --git a/tick/ast/parser_test.go b/tick/ast/parser_test.go index 5847af726..f26bca070 100644 --- a/tick/ast/parser_test.go +++ b/tick/ast/parser_test.go @@ -127,6 +127,41 @@ func TestParseStatements(t *testing.T) { Root Node err error }{ + { + script: `dbrp "telegraf"."autogen"`, + Root: &ProgramNode{ + position: position{ + pos: 0, + line: 1, + char: 1, + }, + Nodes: []Node{ + &DBRPNode{ + position: position{ + pos: 0, + line: 1, + char: 1, + }, + DB: &ReferenceNode{ + position: position{ + pos: 5, + line: 1, + char: 6, + }, + Reference: "telegraf", + }, + RP: &ReferenceNode{ + position: position{ + pos: 16, + line: 1, + char: 17, + }, + Reference: "autogen", + }, + }, + }, + }, + }, { script: `var x int`, Root: &ProgramNode{ diff --git a/usr/share/bash-completion/completions/kapacitor b/usr/share/bash-completion/completions/kapacitor index e713c0165..2b52d226b 100644 --- a/usr/share/bash-completion/completions/kapacitor +++ b/usr/share/bash-completion/completions/kapacitor @@ -84,12 +84,7 @@ _kapacitor() fi ;; define-topic-handler) - if [[ -z "${COMP_WORDS[2]}" || ("$cur" = "${COMP_WORDS[2]}" && -z "${COMP_WORDS[3]}") ]] - then - words=$(_kapacitor_list topics "$cur") - else - _kapacitor_json_yaml_files "${cur}" - fi + _kapacitor_json_yaml_files "${cur}" ;; replay) case "$prev" in