Skip to content

Commit

Permalink
Merge pull request #1481 from influxdata/md-issue#1313
Browse files Browse the repository at this point in the history
Add ability to load tasks/handlers from dir
  • Loading branch information
desa authored Oct 9, 2017
2 parents eacb373 + 1de435d commit 0a8abec
Show file tree
Hide file tree
Showing 47 changed files with 2,644 additions and 75 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
- [#1568](https://github.com/influxdata/kapacitor/issues/1568): Add support for custom HTTP Post bodies via a template system.
- [#1569](https://github.com/influxdata/kapacitor/issues/1569): Add support for add the HTTP status code as a field when using httpPost
- [#1535](https://github.com/influxdata/kapacitor/pull/1535): Add logfmt support and refactor logging.
- [#1481](https://github.com/influxdata/kapacitor/pull/1481): Add ability to load tasks/handlers from dir.
TICKscript was extended to be able to describe a task exclusively through a tickscript.
* tasks no longer need to specify their TaskType (Batch, Stream).
* `dbrp` expressions were added to tickscript.
Topic-Handler file format was modified to include the TopicID and HandlerID in the file.
Load service was added; the service can load tasks/handlers from a directory.

### Bugfixes

Expand Down
113 changes: 91 additions & 22 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"path"
"strconv"
Expand Down Expand Up @@ -73,6 +74,10 @@ type Config struct {

// Optional credentials for authenticating with the server.
Credentials *Credentials

// Optional Transport https://golang.org/pkg/net/http/#RoundTripper
// If nil the default transport will be used
Transport http.RoundTripper
}

// AuthenticationMethod defines the type of authentication used.
Expand Down Expand Up @@ -118,6 +123,23 @@ func (c Credentials) Validate() error {
return nil
}

type localTransport struct {
h http.Handler
}

func (l *localTransport) RoundTrip(r *http.Request) (*http.Response, error) {
w := httptest.NewRecorder()
l.h.ServeHTTP(w, r)

return w.Result(), nil
}

func NewLocalTransport(h http.Handler) http.RoundTripper {
return &localTransport{
h: h,
}
}

// Basic HTTP client
type Client struct {
url *url.URL
Expand Down Expand Up @@ -148,21 +170,28 @@ func New(conf Config) (*Client, error) {
}
}

tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
rt := conf.Transport
var tr *http.Transport

if rt == nil {
tr = &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
}

rt = tr
}
return &Client{
url: u,
userAgent: conf.UserAgent,
httpClient: &http.Client{
Timeout: conf.Timeout,
Transport: tr,
Transport: rt,
},
credentials: conf.Credentials,
}, nil
Expand Down Expand Up @@ -203,8 +232,9 @@ type ExecutionStats struct {
type TaskType int

const (
StreamTask TaskType = 1
BatchTask TaskType = 2
InvalidTask TaskType = 0
StreamTask TaskType = 1
BatchTask TaskType = 2
)

func (tt TaskType) MarshalText() ([]byte, error) {
Expand All @@ -213,6 +243,8 @@ func (tt TaskType) MarshalText() ([]byte, error) {
return []byte("stream"), nil
case BatchTask:
return []byte("batch"), nil
case InvalidTask:
return []byte("invalid"), nil
default:
return nil, fmt.Errorf("unknown TaskType %d", tt)
}
Expand All @@ -224,6 +256,8 @@ func (tt *TaskType) UnmarshalText(text []byte) error {
*tt = StreamTask
case "batch":
*tt = BatchTask
case "invalid":
*tt = InvalidTask
default:
return fmt.Errorf("unknown TaskType %s", s)
}
Expand Down Expand Up @@ -513,9 +547,9 @@ func (vs *Vars) UnmarshalJSON(b []byte) error {
}

type Var struct {
Type VarType `json:"type"`
Value interface{} `json:"value"`
Description string `json:"description"`
Type VarType `json:"type" yaml:"type"`
Value interface{} `json:"value" yaml:"value"`
Description string `json:"description" yaml:"description"`
}

// A Task plus its read-only attributes.
Expand Down Expand Up @@ -725,13 +759,13 @@ func (c *Client) StorageLink(name string) Link {
}

type CreateTaskOptions struct {
ID string `json:"id,omitempty"`
TemplateID string `json:"template-id,omitempty"`
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
Type TaskType `json:"type,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
TICKscript string `json:"script,omitempty"`
Status TaskStatus `json:"status,omitempty"`
Vars Vars `json:"vars,omitempty"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

// Create a new task.
Expand Down Expand Up @@ -759,13 +793,13 @@ func (c *Client) CreateTask(opt CreateTaskOptions) (Task, error) {
}

type UpdateTaskOptions struct {
ID string `json:"id,omitempty"`
TemplateID string `json:"template-id,omitempty"`
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
Type TaskType `json:"type,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
TICKscript string `json:"script,omitempty"`
Status TaskStatus `json:"status,omitempty"`
Vars Vars `json:"vars,omitempty"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

// Update an existing task.
Expand Down Expand Up @@ -1963,6 +1997,7 @@ func (c *Client) TopicHandler(link Link) (TopicHandler, error) {
}

type TopicHandlerOptions struct {
Topic string `json:"topic" yaml:"topic"`
ID string `json:"id" yaml:"id"`
Kind string `json:"kind" yaml:"kind"`
Options map[string]interface{} `json:"options" yaml:"options"`
Expand Down Expand Up @@ -2282,3 +2317,37 @@ func (d *Duration) UnmarshalText(data []byte) error {
*d = Duration(dur)
return nil
}

type DBRPs []DBRP

func (d *DBRPs) String() string {
return fmt.Sprint(*d)
}

type TaskVars struct {
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

func (t TaskVars) CreateTaskOptions() (CreateTaskOptions, error) {
o := CreateTaskOptions{
ID: t.ID,
TemplateID: t.TemplateID,
Vars: t.Vars,
DBRPs: t.DBRPs,
}

return o, nil
}

func (t TaskVars) UpdateTaskOptions() (UpdateTaskOptions, error) {
o := UpdateTaskOptions{
ID: t.ID,
TemplateID: t.TemplateID,
Vars: t.Vars,
DBRPs: t.DBRPs,
}
return o, nil
}
108 changes: 81 additions & 27 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ var (
dtype = defineFlags.String("type", "", "The task type (stream|batch)")
dtemplate = defineFlags.String("template", "", "Optional template ID")
dvars = defineFlags.String("vars", "", "Optional path to a JSON vars file")
dfile = defineFlags.String("file", "", "Optional path to a YAML or JSON template task file")
dnoReload = defineFlags.Bool("no-reload", false, "Do not reload the task even if it is enabled")
ddbrp = make(dbrps, 0)
)
Expand Down Expand Up @@ -687,33 +688,89 @@ func doDefine(args []string) error {
}
}

fileVars := client.TaskVars{}
if *dfile != "" {
f, err := os.Open(*dfile)
if err != nil {
return errors.Wrapf(err, "failed to open file %s", *dfile)
}
data, err := ioutil.ReadAll(f)
if err != nil {
return errors.Wrapf(err, "failed to read task vars file %q", *dfile)
}
defer f.Close()
switch ext := path.Ext(*dfile); ext {
case ".yaml", ".yml":
if err := yaml.Unmarshal(data, &fileVars); err != nil {
return errors.Wrapf(err, "failed to unmarshal yaml task vars file %q", *dfile)
}
case ".json":
if err := json.Unmarshal(data, &fileVars); err != nil {
return errors.Wrapf(err, "failed to unmarshal json task vars file %q", *dfile)
}
default:
return errors.New("bad file extension. Must be YAML or JSON")

}
}

l := cli.TaskLink(id)
task, _ := cli.Task(l, nil)
var err error
if task.ID == "" {
_, err = cli.CreateTask(client.CreateTaskOptions{
ID: id,
TemplateID: *dtemplate,
Type: ttype,
DBRPs: ddbrp,
TICKscript: script,
Vars: vars,
Status: client.Disabled,
})
if *dfile != "" {
o, err := fileVars.CreateTaskOptions()
if err != nil {
return err
}
_, err = cli.CreateTask(o)
if err != nil {
return err
}
} else {
o := client.CreateTaskOptions{
ID: id,
TemplateID: *dtemplate,
Type: ttype,
DBRPs: ddbrp,
TICKscript: script,
Vars: vars,
Status: client.Disabled,
}
_, err = cli.CreateTask(o)
if err != nil {
return err
}
}
} else {
_, err = cli.UpdateTask(
l,
client.UpdateTaskOptions{
if *dfile != "" {
o, err := fileVars.UpdateTaskOptions()
if err != nil {
return err
}
_, err = cli.UpdateTask(
l,
o,
)
if err != nil {
return err
}
} else {
o := client.UpdateTaskOptions{
TemplateID: *dtemplate,
Type: ttype,
DBRPs: ddbrp,
TICKscript: script,
Vars: vars,
},
)
}
if err != nil {
return err
}
_, err = cli.UpdateTask(
l,
o,
)
if err != nil {
return err
}
}
}

if !*dnoReload && task.Status == client.Enabled {
Expand Down Expand Up @@ -822,7 +879,7 @@ func doDefineTemplate(args []string) error {
}

func defineTopicHandlerUsage() {
var u = `Usage: kapacitor define-topic-handler <topic id> <handler id> <path to handler spec file>
var u = `Usage: kapacitor define-topic-handler <path to handler spec file>
Create or update a handler.
Expand All @@ -832,22 +889,20 @@ For example:
Define a handler using the slack.yaml file:
$ kapacitor define-handler system my_handler slack.yaml
$ kapacitor define-topic-handler slack.yaml
Options:
`
fmt.Fprintln(os.Stderr, u)
}

func doDefineTopicHandler(args []string) error {
if len(args) != 3 {
fmt.Fprintln(os.Stderr, "Must provide a topic ID, a handler ID and a path to a handler file.")
if len(args) != 1 {
fmt.Fprintln(os.Stderr, "Must provide a path to a handler file.")
defineTopicHandlerUsage()
os.Exit(2)
}
topic := args[0]
handlerID := args[1]
p := args[2]
p := args[0]
f, err := os.Open(p)
if err != nil {
return errors.Wrapf(err, "failed to open handler spec file %q", p)
Expand All @@ -870,12 +925,11 @@ func doDefineTopicHandler(args []string) error {
return errors.Wrapf(err, "failed to unmarshal json handler file %q", p)
}
}
ho.ID = handlerID

l := cli.TopicHandlerLink(topic, ho.ID)
l := cli.TopicHandlerLink(ho.Topic, ho.ID)
handler, _ := cli.TopicHandler(l)
if handler.ID == "" {
_, err = cli.CreateTopicHandler(cli.TopicHandlersLink(topic), ho)
_, err = cli.CreateTopicHandler(cli.TopicHandlersLink(ho.Topic), ho)
} else {
_, err = cli.ReplaceTopicHandler(l, ho)
}
Expand Down
Loading

0 comments on commit 0a8abec

Please sign in to comment.