diff --git a/client/v1/client.go b/client/v1/client.go index e63533ee5..2923a92ef 100644 --- a/client/v1/client.go +++ b/client/v1/client.go @@ -12,6 +12,7 @@ import ( "net/url" "path" "strconv" + "strings" "time" "github.com/influxdata/influxdb/influxql" @@ -523,9 +524,9 @@ func (vs *Vars) UnmarshalJSON(b []byte) error { } type Var struct { - Type VarType `json:"type"` - Value interface{} `json:"value"` - Description string `json:"description"` + Type VarType `json:"type" yaml:"type"` + Value interface{} `json:"value" yaml:"value"` + Description string `json:"description" yaml:"description"` } // A Task plus its read-only attributes. @@ -741,7 +742,7 @@ type CreateTaskOptions struct { DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"` TICKscript string `json:"script,omitempty"` Status TaskStatus `json:"status,omitempty"` - Vars Vars `json:"vars,omitempty" yamls:"vars"` + Vars Vars `json:"vars,omitempty" yaml:"vars"` } // Create a new task. @@ -775,7 +776,7 @@ type UpdateTaskOptions struct { DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"` TICKscript string `json:"script,omitempty"` Status TaskStatus `json:"status,omitempty"` - Vars Vars `json:"vars,omitempty" yamls:"vars"` + Vars Vars `json:"vars,omitempty" yaml:"vars"` } // Update an existing task. @@ -2292,3 +2293,107 @@ func (d *Duration) UnmarshalText(data []byte) error { *d = Duration(dur) return nil } + +// TODO: this is also used in kapacitor/main.go +type dbrps []DBRP + +func (d *dbrps) String() string { + return fmt.Sprint(*d) +} + +// Parse string of the form "db"."rp" where the quotes are optional but can include escaped quotes +// within the strings. +func (d *dbrps) Set(value string) error { + dbrp := DBRP{} + if len(value) == 0 { + return errors.New("dbrp cannot be empty") + } + var n int + if value[0] == '"' { + dbrp.Database, n = parseQuotedStr(value) + } else { + n = strings.IndexRune(value, '.') + if n == -1 { + return errors.New("does not contain a '.', it must be in the form \"dbname\".\"rpname\" where the quotes are optional.") + } + dbrp.Database = value[:n] + } + if value[n] != '.' { + return errors.New("dbrp must specify retention policy, do you have a missing or extra '.'?") + } + value = value[n+1:] + if value[0] == '"' { + dbrp.RetentionPolicy, _ = parseQuotedStr(value) + } else { + dbrp.RetentionPolicy = value + } + *d = append(*d, dbrp) + return nil +} + +// parseQuotedStr reads from txt starting with beginning quote until next unescaped quote returning the unescaped string and the number of bytes read. +func parseQuotedStr(txt string) (string, int) { + quote := txt[0] + // Unescape quotes + var buf bytes.Buffer + buf.Grow(len(txt)) + last := 1 + i := 1 + for ; i < len(txt)-1; i++ { + if txt[i] == '\\' && txt[i+1] == quote { + buf.Write([]byte(txt[last:i])) + buf.Write([]byte{quote}) + i += 2 + last = i + } else if txt[i] == quote { + break + } + } + buf.Write([]byte(txt[last:i])) + return buf.String(), i + 1 +} + +type TaskVars struct { + ID string `json:"id,omitempty" yaml:"id"` + TemplateID string `json:"template-id,omitempty" yaml:"template-id"` + DBRPs []string `json:"dbrps,omitempty" yaml:"dbrps"` + Vars Vars `json:"vars,omitempty" yaml:"vars"` +} + +func (t TaskVars) CreateTaskOptions() (CreateTaskOptions, error) { + ds := dbrps{} + o := CreateTaskOptions{ + ID: t.ID, + TemplateID: t.TemplateID, + Vars: t.Vars, + } + + for _, dbrp := range t.DBRPs { + if err := ds.Set(dbrp); err != nil { + return o, err + } + } + + o.DBRPs = ds + + return o, nil +} + +func (t TaskVars) UpdateTaskOptions() (UpdateTaskOptions, error) { + ds := dbrps{} + o := UpdateTaskOptions{ + ID: t.ID, + TemplateID: t.TemplateID, + Vars: t.Vars, + } + + for _, dbrp := range t.DBRPs { + if err := ds.Set(dbrp); err != nil { + return o, err + } + } + + o.DBRPs = ds + + return o, nil +} diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go index f8a06d7ea..595a1b96f 100644 --- a/cmd/kapacitor/main.go +++ b/cmd/kapacitor/main.go @@ -688,7 +688,7 @@ func doDefine(args []string) error { } } - fileVars := TaskVars{} + fileVars := client.TaskVars{} if *dfile != "" { f, err := os.Open(*dfile) if err != nil { @@ -733,6 +733,7 @@ func doDefine(args []string) error { return err } } + fmt.Printf("%#v\n", o) _, err = cli.CreateTask(o) } else { o := client.UpdateTaskOptions{ @@ -2287,48 +2288,3 @@ func doBackup(args []string) error { } return nil } - -type TaskVars struct { - ID string `json:"id,omitempty" yaml:"id"` - TemplateID string `json:"template-id,omitempty" yaml:"template-id"` - DBRPs []string `json:"dbrps,omitempty" yaml:"dbrps"` - Vars client.Vars `json:"vars,omitempty" yamls:"vars"` -} - -func (t TaskVars) CreateTaskOptions() (client.CreateTaskOptions, error) { - ds := dbrps{} - o := client.CreateTaskOptions{ - ID: t.ID, - TemplateID: t.TemplateID, - Vars: t.Vars, - } - - for _, dbrp := range t.DBRPs { - if err := ds.Set(dbrp); err != nil { - return o, err - } - } - - o.DBRPs = ds - - return o, nil -} - -func (t TaskVars) UpdateTaskOptions() (client.UpdateTaskOptions, error) { - ds := dbrps{} - o := client.UpdateTaskOptions{ - ID: t.ID, - TemplateID: t.TemplateID, - Vars: t.Vars, - } - - for _, dbrp := range t.DBRPs { - if err := ds.Set(dbrp); err != nil { - return o, err - } - } - - o.DBRPs = ds - - return o, nil -} diff --git a/services/load/config.go b/services/load/config.go index 065d82295..c27d1c6d2 100644 --- a/services/load/config.go +++ b/services/load/config.go @@ -8,6 +8,7 @@ import ( ) const taskDir = "tasks" +const templateDir = "templates" const handlerDir = "handlers" type Config struct { @@ -46,6 +47,10 @@ func (c Config) Validate() error { return fmt.Errorf("directory %s must be contain subdirectory %s", c.Dir, taskDir) } + if !dirs[templateDir] { + return fmt.Errorf("directory %s must be contain subdirectory %s", c.Dir, templateDir) + } + if !dirs[handlerDir] { return fmt.Errorf("directory %s must be contain subdirectory %s", c.Dir, handlerDir) } @@ -59,6 +64,10 @@ func (c Config) TasksDir() string { return filepath.Join(c.Dir, taskDir) } +func (c Config) TemplatesDir() string { + return filepath.Join(c.Dir, templateDir) +} + func (c Config) HandlersDir() string { return filepath.Join(c.Dir, handlerDir) } diff --git a/services/load/service.go b/services/load/service.go index fbab8665e..b1a56cc0e 100644 --- a/services/load/service.go +++ b/services/load/service.go @@ -1,30 +1,51 @@ package load import ( + "encoding/json" + "fmt" "io/ioutil" "log" + "os" + "path" "path/filepath" + "strings" "sync" + + "github.com/ghodss/yaml" + + "github.com/influxdata/kapacitor/client/v1" + "github.com/pkg/errors" ) +var defaultURL = "http://localhost:9092" + type Service struct { mu sync.Mutex config Config + cli *client.Client logger *log.Logger } -func NewService(c Config, l *log.Logger) *Service { +func NewService(c Config, l *log.Logger) (*Service, error) { + cli, err := client.New(client.Config{ + URL: defaultURL, + }) + + if err != nil { + return nil, fmt.Errorf("failed to create client: %v", err) + } + return &Service{ config: c, logger: l, - } + cli: cli, + }, nil } // TaskFiles gets a slice of all files with the .tick file extension -// and any associated files with .json, .yml, and .yaml file extentions // in the configured task directory. -func (s *Service) TaskFiles() (tickscripts []string, tmplVars []string, err error) { +func (s *Service) TaskFiles() (tickscripts []string, err error) { s.mu.Lock() defer s.mu.Unlock() @@ -32,7 +53,7 @@ func (s *Service) TaskFiles() (tickscripts []string, tmplVars []string, err erro files, err := ioutil.ReadDir(tasksDir) if err != nil { - return nil, nil, err + return nil, err } for _, file := range files { @@ -44,8 +65,39 @@ func (s *Service) TaskFiles() (tickscripts []string, tmplVars []string, err erro switch ext := filepath.Ext(filename); ext { case ".tick": tickscripts = append(tickscripts, filepath.Join(tasksDir, filename)) + default: + continue + } + } + + return +} + +// TemplateFiles gets a slice of all files with the .tick file extension +// and any associated files with .json, .yml, and .yaml file extentions +// in the configured template directory. +func (s *Service) TemplateFiles() (tickscripts []string, tmplVars []string, err error) { + s.mu.Lock() + defer s.mu.Unlock() + + templatesDir := s.config.TemplatesDir() + + files, err := ioutil.ReadDir(templatesDir) + if err != nil { + return nil, nil, err + } + + for _, file := range files { + if file.IsDir() { + continue + } + + filename := file.Name() + switch ext := filepath.Ext(filename); ext { + case ".tick": + tickscripts = append(tickscripts, filepath.Join(templatesDir, filename)) case ".yml", ".json", ".yaml": - tmplVars = append(tmplVars, filepath.Join(tasksDir, filename)) + tmplVars = append(tmplVars, filepath.Join(templatesDir, filename)) default: continue } @@ -86,14 +138,185 @@ func (s *Service) HandlerFiles() ([]string, error) { } func (s *Service) Load() error { + err := s.loadTickscripts() + if err != nil { + return err + } + + err = s.loadTemplates() + if err != nil { + return err + } + return nil } func (s *Service) loadTickscripts() error { + files, err := s.TaskFiles() + if err != nil { + return fmt.Errorf("failed to load tickscripts: %v", err) + } + + for _, f := range files { + if err := s.loadTickscript(f); err != nil { + return err + } + } + return nil } -func (s *Service) loadTemplateVars() error { +func (s *Service) loadTickscript(f string) error { + file, err := os.Open(f) + if err != nil { + return fmt.Errorf("failed to open file %v: %v", f, err) + } + + data, err := ioutil.ReadAll(file) + if err != nil { + return fmt.Errorf("failed to read file %v: %v", f, err) + } + + script := string(data) + id := strings.TrimSuffix(filepath.Base(file.Name()), ".tick") + + l := s.cli.TaskLink(id) + task, _ := s.cli.Task(l, nil) + if task.ID == "" { + o := client.CreateTaskOptions{ + ID: id, + TICKscript: script, + Status: client.Enabled, + } + + if _, err := s.cli.CreateTask(o); err != nil { + return fmt.Errorf("failed to create task: %v", err) + } + } else { + o := client.UpdateTaskOptions{ + ID: id, + TICKscript: script, + Status: client.Enabled, + } + if _, err := s.cli.UpdateTask(l, o); err != nil { + return fmt.Errorf("failed to create task: %v", err) + } + } + return nil +} + +func (s *Service) loadTemplates() error { + files, vars, err := s.TemplateFiles() + if err != nil { + return err + } + + for _, f := range files { + if err := s.loadTemplate(f); err != nil { + return err + } + } + + for _, v := range vars { + if err := s.loadVars(v); err != nil { + return err + } + } + return nil +} + +func (s *Service) loadTemplate(f string) error { + file, err := os.Open(f) + if err != nil { + return fmt.Errorf("failed to open file %v: %v", f, err) + } + + data, err := ioutil.ReadAll(file) + if err != nil { + return fmt.Errorf("failed to read file %v: %v", f, err) + } + + script := string(data) + id := strings.TrimSuffix(filepath.Base(file.Name()), ".tick") + + l := s.cli.TemplateLink(id) + task, _ := s.cli.Template(l, nil) + if task.ID == "" { + o := client.CreateTemplateOptions{ + ID: id, + TICKscript: script, + } + + if _, err := s.cli.CreateTemplate(o); err != nil { + return fmt.Errorf("failed to create template: %v", err) + } + } else { + o := client.UpdateTemplateOptions{ + ID: id, + TICKscript: script, + } + if _, err := s.cli.UpdateTemplate(l, o); err != nil { + return fmt.Errorf("failed to create template: %v", err) + } + } + return nil +} + +func (s *Service) loadVars(f string) error { + file, err := os.Open(f) + defer file.Close() + if err != nil { + return fmt.Errorf("failed to open file %v: %v", f, err) + } + + data, err := ioutil.ReadAll(file) + if err != nil { + return fmt.Errorf("failed to read file %v: %v", f, err) + } + + id := strings.TrimSuffix(filepath.Base(file.Name()), filepath.Ext(file.Name())) + fileVars := client.TaskVars{} + switch ext := path.Ext(f); ext { + case ".yaml", ".yml": + if err := yaml.Unmarshal(data, &fileVars); err != nil { + return errors.Wrapf(err, "failed to unmarshal yaml task vars file %q", f) + } + case ".json": + if err := json.Unmarshal(data, &fileVars); err != nil { + return errors.Wrapf(err, "failed to unmarshal json task vars file %q", f) + } + default: + return errors.New("bad file extension. Must be YAML or JSON") + + } + + l := s.cli.TaskLink(id) + task, _ := s.cli.Task(l, nil) + if task.ID == "" { + var o client.CreateTaskOptions + o, err = fileVars.CreateTaskOptions() + if err != nil { + return fmt.Errorf("failed to initialize create task options: %v", err) + } + + o.ID = id + o.Status = client.Enabled + if _, err := s.cli.CreateTask(o); err != nil { + return fmt.Errorf("failed to create task: %v", err) + } + } else { + var o client.UpdateTaskOptions + o, err := fileVars.UpdateTaskOptions() + if err != nil { + return fmt.Errorf("failed to initialize create task options: %v", err) + } + + o.ID = id + o.Status = client.Enabled + if _, err := s.cli.UpdateTask(l, o); err != nil { + return fmt.Errorf("failed to create task: %v", err) + } + } return nil }