Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to load tasks/handlers from dir #1481

Merged
merged 1 commit into from
Oct 9, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@
- [#1568](https://github.com/influxdata/kapacitor/issues/1568): Add support for custom HTTP Post bodies via a template system.
- [#1569](https://github.com/influxdata/kapacitor/issues/1569): Add support for add the HTTP status code as a field when using httpPost
- [#1535](https://github.com/influxdata/kapacitor/pull/1535): Add logfmt support and refactor logging.
- [#1481](https://github.com/influxdata/kapacitor/pull/1481): Add ability to load tasks/handlers from dir.
TICKscript was extended to be able to describe a task exclusively through a tickscript.
* tasks no longer need to specify their TaskType (Batch, Stream).
* `dbrp` expressions were added to tickscript.
Topic-Handler file format was modified to include the TopicID and HandlerID in the file.
Load service was added; the service can load tasks/handlers from a directory.

### Bugfixes

Expand Down
113 changes: 91 additions & 22 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"path"
"strconv"
Expand Down Expand Up @@ -73,6 +74,10 @@ type Config struct {

// Optional credentials for authenticating with the server.
Credentials *Credentials

// Optional Transport https://golang.org/pkg/net/http/#RoundTripper
// If nil the default transport will be used
Transport http.RoundTripper
}

// AuthenticationMethod defines the type of authentication used.
Expand Down Expand Up @@ -118,6 +123,23 @@ func (c Credentials) Validate() error {
return nil
}

type localTransport struct {
h http.Handler
}

func (l *localTransport) RoundTrip(r *http.Request) (*http.Response, error) {
w := httptest.NewRecorder()
l.h.ServeHTTP(w, r)

return w.Result(), nil
}

func NewLocalTransport(h http.Handler) http.RoundTripper {
return &localTransport{
h: h,
}
}

// Basic HTTP client
type Client struct {
url *url.URL
Expand Down Expand Up @@ -148,21 +170,28 @@ func New(conf Config) (*Client, error) {
}
}

tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
rt := conf.Transport
var tr *http.Transport

if rt == nil {
tr = &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
}

rt = tr
}
return &Client{
url: u,
userAgent: conf.UserAgent,
httpClient: &http.Client{
Timeout: conf.Timeout,
Transport: tr,
Transport: rt,
},
credentials: conf.Credentials,
}, nil
Expand Down Expand Up @@ -203,8 +232,9 @@ type ExecutionStats struct {
type TaskType int

const (
StreamTask TaskType = 1
BatchTask TaskType = 2
InvalidTask TaskType = 0
StreamTask TaskType = 1
BatchTask TaskType = 2
)

func (tt TaskType) MarshalText() ([]byte, error) {
Expand All @@ -213,6 +243,8 @@ func (tt TaskType) MarshalText() ([]byte, error) {
return []byte("stream"), nil
case BatchTask:
return []byte("batch"), nil
case InvalidTask:
return []byte("invalid"), nil
default:
return nil, fmt.Errorf("unknown TaskType %d", tt)
}
Expand All @@ -224,6 +256,8 @@ func (tt *TaskType) UnmarshalText(text []byte) error {
*tt = StreamTask
case "batch":
*tt = BatchTask
case "invalid":
*tt = InvalidTask
default:
return fmt.Errorf("unknown TaskType %s", s)
}
Expand Down Expand Up @@ -513,9 +547,9 @@ func (vs *Vars) UnmarshalJSON(b []byte) error {
}

type Var struct {
Type VarType `json:"type"`
Value interface{} `json:"value"`
Description string `json:"description"`
Type VarType `json:"type" yaml:"type"`
Value interface{} `json:"value" yaml:"value"`
Description string `json:"description" yaml:"description"`
}

// A Task plus its read-only attributes.
Expand Down Expand Up @@ -725,13 +759,13 @@ func (c *Client) StorageLink(name string) Link {
}

type CreateTaskOptions struct {
ID string `json:"id,omitempty"`
TemplateID string `json:"template-id,omitempty"`
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
Type TaskType `json:"type,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
TICKscript string `json:"script,omitempty"`
Status TaskStatus `json:"status,omitempty"`
Vars Vars `json:"vars,omitempty"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

// Create a new task.
Expand Down Expand Up @@ -759,13 +793,13 @@ func (c *Client) CreateTask(opt CreateTaskOptions) (Task, error) {
}

type UpdateTaskOptions struct {
ID string `json:"id,omitempty"`
TemplateID string `json:"template-id,omitempty"`
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
Type TaskType `json:"type,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
TICKscript string `json:"script,omitempty"`
Status TaskStatus `json:"status,omitempty"`
Vars Vars `json:"vars,omitempty"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

// Update an existing task.
Expand Down Expand Up @@ -1963,6 +1997,7 @@ func (c *Client) TopicHandler(link Link) (TopicHandler, error) {
}

type TopicHandlerOptions struct {
Topic string `json:"topic" yaml:"topic"`
ID string `json:"id" yaml:"id"`
Kind string `json:"kind" yaml:"kind"`
Options map[string]interface{} `json:"options" yaml:"options"`
Expand Down Expand Up @@ -2282,3 +2317,37 @@ func (d *Duration) UnmarshalText(data []byte) error {
*d = Duration(dur)
return nil
}

type DBRPs []DBRP

func (d *DBRPs) String() string {
return fmt.Sprint(*d)
}

type TaskVars struct {
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

func (t TaskVars) CreateTaskOptions() (CreateTaskOptions, error) {
o := CreateTaskOptions{
ID: t.ID,
TemplateID: t.TemplateID,
Vars: t.Vars,
DBRPs: t.DBRPs,
}

return o, nil
}

func (t TaskVars) UpdateTaskOptions() (UpdateTaskOptions, error) {
o := UpdateTaskOptions{
ID: t.ID,
TemplateID: t.TemplateID,
Vars: t.Vars,
DBRPs: t.DBRPs,
}
return o, nil
}
108 changes: 81 additions & 27 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ var (
dtype = defineFlags.String("type", "", "The task type (stream|batch)")
dtemplate = defineFlags.String("template", "", "Optional template ID")
dvars = defineFlags.String("vars", "", "Optional path to a JSON vars file")
dfile = defineFlags.String("file", "", "Optional path to a YAML or JSON template task file")
dnoReload = defineFlags.Bool("no-reload", false, "Do not reload the task even if it is enabled")
ddbrp = make(dbrps, 0)
)
Expand Down Expand Up @@ -687,33 +688,89 @@ func doDefine(args []string) error {
}
}

fileVars := client.TaskVars{}
if *dfile != "" {
f, err := os.Open(*dfile)
if err != nil {
return errors.Wrapf(err, "failed to open file %s", *dfile)
}
data, err := ioutil.ReadAll(f)
if err != nil {
return errors.Wrapf(err, "failed to read task vars file %q", *dfile)
}
defer f.Close()
switch ext := path.Ext(*dfile); ext {
case ".yaml", ".yml":
if err := yaml.Unmarshal(data, &fileVars); err != nil {
return errors.Wrapf(err, "failed to unmarshal yaml task vars file %q", *dfile)
}
case ".json":
if err := json.Unmarshal(data, &fileVars); err != nil {
return errors.Wrapf(err, "failed to unmarshal json task vars file %q", *dfile)
}
default:
return errors.New("bad file extension. Must be YAML or JSON")

}
}

l := cli.TaskLink(id)
task, _ := cli.Task(l, nil)
var err error
if task.ID == "" {
_, err = cli.CreateTask(client.CreateTaskOptions{
ID: id,
TemplateID: *dtemplate,
Type: ttype,
DBRPs: ddbrp,
TICKscript: script,
Vars: vars,
Status: client.Disabled,
})
if *dfile != "" {
o, err := fileVars.CreateTaskOptions()
if err != nil {
return err
}
_, err = cli.CreateTask(o)
if err != nil {
return err
}
} else {
o := client.CreateTaskOptions{
ID: id,
TemplateID: *dtemplate,
Type: ttype,
DBRPs: ddbrp,
TICKscript: script,
Vars: vars,
Status: client.Disabled,
}
_, err = cli.CreateTask(o)
if err != nil {
return err
}
}
} else {
_, err = cli.UpdateTask(
l,
client.UpdateTaskOptions{
if *dfile != "" {
o, err := fileVars.UpdateTaskOptions()
if err != nil {
return err
}
_, err = cli.UpdateTask(
l,
o,
)
if err != nil {
return err
}
} else {
o := client.UpdateTaskOptions{
TemplateID: *dtemplate,
Type: ttype,
DBRPs: ddbrp,
TICKscript: script,
Vars: vars,
},
)
}
if err != nil {
return err
}
_, err = cli.UpdateTask(
l,
o,
)
if err != nil {
return err
}
}
}

if !*dnoReload && task.Status == client.Enabled {
Expand Down Expand Up @@ -822,7 +879,7 @@ func doDefineTemplate(args []string) error {
}

func defineTopicHandlerUsage() {
var u = `Usage: kapacitor define-topic-handler <topic id> <handler id> <path to handler spec file>
var u = `Usage: kapacitor define-topic-handler <path to handler spec file>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yay! Now everything is in the handler file.

show-topic-handler and list topic-handlers handlers haven't changed. So they still require passing both the topic ID and handler ID as args correct?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup.


Create or update a handler.

Expand All @@ -832,22 +889,20 @@ For example:

Define a handler using the slack.yaml file:

$ kapacitor define-handler system my_handler slack.yaml
$ kapacitor define-topic-handler slack.yaml

Options:
`
fmt.Fprintln(os.Stderr, u)
}

func doDefineTopicHandler(args []string) error {
if len(args) != 3 {
fmt.Fprintln(os.Stderr, "Must provide a topic ID, a handler ID and a path to a handler file.")
if len(args) != 1 {
fmt.Fprintln(os.Stderr, "Must provide a path to a handler file.")
defineTopicHandlerUsage()
os.Exit(2)
}
topic := args[0]
handlerID := args[1]
p := args[2]
p := args[0]
f, err := os.Open(p)
if err != nil {
return errors.Wrapf(err, "failed to open handler spec file %q", p)
Expand All @@ -870,12 +925,11 @@ func doDefineTopicHandler(args []string) error {
return errors.Wrapf(err, "failed to unmarshal json handler file %q", p)
}
}
ho.ID = handlerID

l := cli.TopicHandlerLink(topic, ho.ID)
l := cli.TopicHandlerLink(ho.Topic, ho.ID)
handler, _ := cli.TopicHandler(l)
if handler.ID == "" {
_, err = cli.CreateTopicHandler(cli.TopicHandlersLink(topic), ho)
_, err = cli.CreateTopicHandler(cli.TopicHandlersLink(ho.Topic), ho)
} else {
_, err = cli.ReplaceTopicHandler(l, ho)
}
Expand Down
Loading