Skip to content

Commit

Permalink
Add ability to load tasks/handlers from dir
Browse files Browse the repository at this point in the history
Add check for tickscript defined dbrps

Fix error that allowed you to set dbrps twice

Add template and task dbrp via tickscript

Remove dao test file

Fix issue with updating task without dbrp values

Add new template vars structure

Load tasks and templates from directory

Update define-topic-handler subcommand to 1 arg

Add loadHandlers method to load service

Add tests and prevent certain updates to templates

Update parser to define dbrp as reference.referenc

Fix issue where tasks weren't reloaded

Replace correct topic handler

Listen for SIGHUP to reload tasks/templates/handld

Move dbrps type to client package

Unexport internal functions

Address todo items

Make assorted changes suggested in PR

Add test for dbrp

Add tests for node task type and dbrps

Fix comment

Use ServeHTTP instead of making actual http req

Add soft/hard configuration option

Fix typos and remove TODO comments

Don't error if the directories are missing

Append load service

Move util methods into task_store package

Improve error messaging

Clean up kapacitor and kapacitord mains

Remove old TODO comment

Add error check in doDefine

Add load example directory

Address issues from PR

Add diagnostic to load service

Add batch examples and tests

Add reload method to server

Add dbrp type to kapacitor main package

Use explicit specification for dbrp in files

Set sane default for demo config

Skip load if directory doesn't exist

Unwrap error so that correct error type

Move templated tasks to task dir from templates

Remove commented out code

Remove hard error

Add error stat to load service

Delete tasks/templates/hanlders when appropriate

Delete tasks/template/handlers when appropraite

Cleanup example of load directory

Add Error to load diagnostic

Check for nil storage service

Clean-up load functions

Add test for load service

Add test for topic-handlers

Prevent server crash on error

Fix wording in README

Remove error from Reload function

Make additional changes to README

Crash on initial Load

Add more to readme

Add UserAgent and task string constants

Update define-topic-handler completion
  • Loading branch information
desa committed Oct 6, 2017
1 parent eacb373 commit 1de435d
Show file tree
Hide file tree
Showing 47 changed files with 2,644 additions and 75 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
- [#1568](https://github.com/influxdata/kapacitor/issues/1568): Add support for custom HTTP Post bodies via a template system.
- [#1569](https://github.com/influxdata/kapacitor/issues/1569): Add support for add the HTTP status code as a field when using httpPost
- [#1535](https://github.com/influxdata/kapacitor/pull/1535): Add logfmt support and refactor logging.
- [#1481](https://github.com/influxdata/kapacitor/pull/1481): Add ability to load tasks/handlers from dir.
TICKscript was extended to be able to describe a task exclusively through a tickscript.
* tasks no longer need to specify their TaskType (Batch, Stream).
* `dbrp` expressions were added to tickscript.
Topic-Handler file format was modified to include the TopicID and HandlerID in the file.
Load service was added; the service can load tasks/handlers from a directory.

### Bugfixes

Expand Down
113 changes: 91 additions & 22 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"path"
"strconv"
Expand Down Expand Up @@ -73,6 +74,10 @@ type Config struct {

// Optional credentials for authenticating with the server.
Credentials *Credentials

// Optional Transport https://golang.org/pkg/net/http/#RoundTripper
// If nil the default transport will be used
Transport http.RoundTripper
}

// AuthenticationMethod defines the type of authentication used.
Expand Down Expand Up @@ -118,6 +123,23 @@ func (c Credentials) Validate() error {
return nil
}

type localTransport struct {
h http.Handler
}

func (l *localTransport) RoundTrip(r *http.Request) (*http.Response, error) {
w := httptest.NewRecorder()
l.h.ServeHTTP(w, r)

return w.Result(), nil
}

func NewLocalTransport(h http.Handler) http.RoundTripper {
return &localTransport{
h: h,
}
}

// Basic HTTP client
type Client struct {
url *url.URL
Expand Down Expand Up @@ -148,21 +170,28 @@ func New(conf Config) (*Client, error) {
}
}

tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
rt := conf.Transport
var tr *http.Transport

if rt == nil {
tr = &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
}

rt = tr
}
return &Client{
url: u,
userAgent: conf.UserAgent,
httpClient: &http.Client{
Timeout: conf.Timeout,
Transport: tr,
Transport: rt,
},
credentials: conf.Credentials,
}, nil
Expand Down Expand Up @@ -203,8 +232,9 @@ type ExecutionStats struct {
type TaskType int

const (
StreamTask TaskType = 1
BatchTask TaskType = 2
InvalidTask TaskType = 0
StreamTask TaskType = 1
BatchTask TaskType = 2
)

func (tt TaskType) MarshalText() ([]byte, error) {
Expand All @@ -213,6 +243,8 @@ func (tt TaskType) MarshalText() ([]byte, error) {
return []byte("stream"), nil
case BatchTask:
return []byte("batch"), nil
case InvalidTask:
return []byte("invalid"), nil
default:
return nil, fmt.Errorf("unknown TaskType %d", tt)
}
Expand All @@ -224,6 +256,8 @@ func (tt *TaskType) UnmarshalText(text []byte) error {
*tt = StreamTask
case "batch":
*tt = BatchTask
case "invalid":
*tt = InvalidTask
default:
return fmt.Errorf("unknown TaskType %s", s)
}
Expand Down Expand Up @@ -513,9 +547,9 @@ func (vs *Vars) UnmarshalJSON(b []byte) error {
}

type Var struct {
Type VarType `json:"type"`
Value interface{} `json:"value"`
Description string `json:"description"`
Type VarType `json:"type" yaml:"type"`
Value interface{} `json:"value" yaml:"value"`
Description string `json:"description" yaml:"description"`
}

// A Task plus its read-only attributes.
Expand Down Expand Up @@ -725,13 +759,13 @@ func (c *Client) StorageLink(name string) Link {
}

type CreateTaskOptions struct {
ID string `json:"id,omitempty"`
TemplateID string `json:"template-id,omitempty"`
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
Type TaskType `json:"type,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
TICKscript string `json:"script,omitempty"`
Status TaskStatus `json:"status,omitempty"`
Vars Vars `json:"vars,omitempty"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

// Create a new task.
Expand Down Expand Up @@ -759,13 +793,13 @@ func (c *Client) CreateTask(opt CreateTaskOptions) (Task, error) {
}

type UpdateTaskOptions struct {
ID string `json:"id,omitempty"`
TemplateID string `json:"template-id,omitempty"`
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
Type TaskType `json:"type,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
TICKscript string `json:"script,omitempty"`
Status TaskStatus `json:"status,omitempty"`
Vars Vars `json:"vars,omitempty"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

// Update an existing task.
Expand Down Expand Up @@ -1963,6 +1997,7 @@ func (c *Client) TopicHandler(link Link) (TopicHandler, error) {
}

type TopicHandlerOptions struct {
Topic string `json:"topic" yaml:"topic"`
ID string `json:"id" yaml:"id"`
Kind string `json:"kind" yaml:"kind"`
Options map[string]interface{} `json:"options" yaml:"options"`
Expand Down Expand Up @@ -2282,3 +2317,37 @@ func (d *Duration) UnmarshalText(data []byte) error {
*d = Duration(dur)
return nil
}

type DBRPs []DBRP

func (d *DBRPs) String() string {
return fmt.Sprint(*d)
}

type TaskVars struct {
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

func (t TaskVars) CreateTaskOptions() (CreateTaskOptions, error) {
o := CreateTaskOptions{
ID: t.ID,
TemplateID: t.TemplateID,
Vars: t.Vars,
DBRPs: t.DBRPs,
}

return o, nil
}

func (t TaskVars) UpdateTaskOptions() (UpdateTaskOptions, error) {
o := UpdateTaskOptions{
ID: t.ID,
TemplateID: t.TemplateID,
Vars: t.Vars,
DBRPs: t.DBRPs,
}
return o, nil
}
108 changes: 81 additions & 27 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ var (
dtype = defineFlags.String("type", "", "The task type (stream|batch)")
dtemplate = defineFlags.String("template", "", "Optional template ID")
dvars = defineFlags.String("vars", "", "Optional path to a JSON vars file")
dfile = defineFlags.String("file", "", "Optional path to a YAML or JSON template task file")
dnoReload = defineFlags.Bool("no-reload", false, "Do not reload the task even if it is enabled")
ddbrp = make(dbrps, 0)
)
Expand Down Expand Up @@ -687,33 +688,89 @@ func doDefine(args []string) error {
}
}

fileVars := client.TaskVars{}
if *dfile != "" {
f, err := os.Open(*dfile)
if err != nil {
return errors.Wrapf(err, "failed to open file %s", *dfile)
}
data, err := ioutil.ReadAll(f)
if err != nil {
return errors.Wrapf(err, "failed to read task vars file %q", *dfile)
}
defer f.Close()
switch ext := path.Ext(*dfile); ext {
case ".yaml", ".yml":
if err := yaml.Unmarshal(data, &fileVars); err != nil {
return errors.Wrapf(err, "failed to unmarshal yaml task vars file %q", *dfile)
}
case ".json":
if err := json.Unmarshal(data, &fileVars); err != nil {
return errors.Wrapf(err, "failed to unmarshal json task vars file %q", *dfile)
}
default:
return errors.New("bad file extension. Must be YAML or JSON")

}
}

l := cli.TaskLink(id)
task, _ := cli.Task(l, nil)
var err error
if task.ID == "" {
_, err = cli.CreateTask(client.CreateTaskOptions{
ID: id,
TemplateID: *dtemplate,
Type: ttype,
DBRPs: ddbrp,
TICKscript: script,
Vars: vars,
Status: client.Disabled,
})
if *dfile != "" {
o, err := fileVars.CreateTaskOptions()
if err != nil {
return err
}
_, err = cli.CreateTask(o)
if err != nil {
return err
}
} else {
o := client.CreateTaskOptions{
ID: id,
TemplateID: *dtemplate,
Type: ttype,
DBRPs: ddbrp,
TICKscript: script,
Vars: vars,
Status: client.Disabled,
}
_, err = cli.CreateTask(o)
if err != nil {
return err
}
}
} else {
_, err = cli.UpdateTask(
l,
client.UpdateTaskOptions{
if *dfile != "" {
o, err := fileVars.UpdateTaskOptions()
if err != nil {
return err
}
_, err = cli.UpdateTask(
l,
o,
)
if err != nil {
return err
}
} else {
o := client.UpdateTaskOptions{
TemplateID: *dtemplate,
Type: ttype,
DBRPs: ddbrp,
TICKscript: script,
Vars: vars,
},
)
}
if err != nil {
return err
}
_, err = cli.UpdateTask(
l,
o,
)
if err != nil {
return err
}
}
}

if !*dnoReload && task.Status == client.Enabled {
Expand Down Expand Up @@ -822,7 +879,7 @@ func doDefineTemplate(args []string) error {
}

func defineTopicHandlerUsage() {
var u = `Usage: kapacitor define-topic-handler <topic id> <handler id> <path to handler spec file>
var u = `Usage: kapacitor define-topic-handler <path to handler spec file>
Create or update a handler.
Expand All @@ -832,22 +889,20 @@ For example:
Define a handler using the slack.yaml file:
$ kapacitor define-handler system my_handler slack.yaml
$ kapacitor define-topic-handler slack.yaml
Options:
`
fmt.Fprintln(os.Stderr, u)
}

func doDefineTopicHandler(args []string) error {
if len(args) != 3 {
fmt.Fprintln(os.Stderr, "Must provide a topic ID, a handler ID and a path to a handler file.")
if len(args) != 1 {
fmt.Fprintln(os.Stderr, "Must provide a path to a handler file.")
defineTopicHandlerUsage()
os.Exit(2)
}
topic := args[0]
handlerID := args[1]
p := args[2]
p := args[0]
f, err := os.Open(p)
if err != nil {
return errors.Wrapf(err, "failed to open handler spec file %q", p)
Expand All @@ -870,12 +925,11 @@ func doDefineTopicHandler(args []string) error {
return errors.Wrapf(err, "failed to unmarshal json handler file %q", p)
}
}
ho.ID = handlerID

l := cli.TopicHandlerLink(topic, ho.ID)
l := cli.TopicHandlerLink(ho.Topic, ho.ID)
handler, _ := cli.TopicHandler(l)
if handler.ID == "" {
_, err = cli.CreateTopicHandler(cli.TopicHandlersLink(topic), ho)
_, err = cli.CreateTopicHandler(cli.TopicHandlersLink(ho.Topic), ho)
} else {
_, err = cli.ReplaceTopicHandler(l, ho)
}
Expand Down
Loading

0 comments on commit 1de435d

Please sign in to comment.