Skip to content

Commit

Permalink
Add ability to load tasks/handlers from dir
Browse files Browse the repository at this point in the history
Add check for tickscript defined dbrps

Fix error that allowed you to set dbrps twice

Add template and task dbrp via tickscript

Remove dao test file

Fix issue with updating task without dbrp values

Add new template vars structure

Load tasks and templates from directory

Update define-topic-handler subcommand to 1 arg

Add loadHandlers method to load service

Add tests and prevent certain updates to templates

Update parser to define dbrp as reference.referenc

Fix issue where tasks weren't reloaded

Replace correct topic handler

Listen for SIGHUP to reload tasks/templates/handld

Move dbrps type to client package

Unexport internal functions

Address todo items

Make assorted changes suggested in PR

Add test for dbrp

Add tests for node task type and dbrps

Fix comment

Use ServeHTTP instead of making actual http req

Add soft/hard configuration option

Fix typos and remove TODO comments

Don't error if the directories are missing

Append load service

Move util methods into task_store package

Improve error messaging

Clean up kapacitor and kapacitord mains

Remove old TODO comment

Add error check in doDefine

Add load example directory
  • Loading branch information
desa committed Aug 7, 2017
1 parent afbe3c5 commit dd5602b
Show file tree
Hide file tree
Showing 31 changed files with 1,898 additions and 128 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
- [#1425](https://github.com/influxdata/kapacitor/pull/1425): BREAKING: Change over internal API to use message passing semantics.
The breaking change is that the Combine and Flatten nodes previously, but erroneously, operated across batch boundaries; this has been fixed.
- [#1497](https://github.com/influxdata/kapacitor/pull/1497): Add support for Docker Swarm autoscaling services.
- [#1481](https://github.com/influxdata/kapacitor/pull/1481): Add ability to load tasks/handlers from dir.
TICKscript was extended to be able to describe a task exclusively through a tickscript.
* tasks no longer need to specify their TaskType (Batch, Stream).
* `dbrp` expressions were added to tickscript.
Topic-Handler file format was modified to include the TopicID and HandlerID in the file.
Load service was added; the service can load tasks/handlers from a directory.

### Bugfixes

Expand Down
183 changes: 161 additions & 22 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ import (
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"net/url"
"path"
"strconv"
"strings"
"time"

"github.com/influxdata/influxdb/influxql"
Expand Down Expand Up @@ -73,6 +75,10 @@ type Config struct {

// Optional credentials for authenticating with the server.
Credentials *Credentials

// Optional Transport https://golang.org/pkg/net/http/#RoundTripper
// If nil the default transport will be used
Transport http.RoundTripper
}

// AuthenticationMethod defines the type of authentication used.
Expand Down Expand Up @@ -118,6 +124,23 @@ func (c Credentials) Validate() error {
return nil
}

type localTransport struct {
h http.Handler
}

func (l *localTransport) RoundTrip(r *http.Request) (*http.Response, error) {
w := httptest.NewRecorder()
l.h.ServeHTTP(w, r)

return w.Result(), nil
}

func NewLocalTransport(h http.Handler) http.RoundTripper {
return &localTransport{
h: h,
}
}

// Basic HTTP client
type Client struct {
url *url.URL
Expand Down Expand Up @@ -148,21 +171,28 @@ func New(conf Config) (*Client, error) {
}
}

tr := &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
rt := conf.Transport
var tr *http.Transport

if rt == nil {
tr = &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: conf.InsecureSkipVerify,
},
}
if conf.TLSConfig != nil {
tr.TLSClientConfig = conf.TLSConfig
}

rt = tr
}
return &Client{
url: u,
userAgent: conf.UserAgent,
httpClient: &http.Client{
Timeout: conf.Timeout,
Transport: tr,
Transport: rt,
},
credentials: conf.Credentials,
}, nil
Expand Down Expand Up @@ -203,8 +233,9 @@ type ExecutionStats struct {
type TaskType int

const (
StreamTask TaskType = 1
BatchTask TaskType = 2
InvalidTask TaskType = 0
StreamTask TaskType = 1
BatchTask TaskType = 2
)

func (tt TaskType) MarshalText() ([]byte, error) {
Expand All @@ -213,6 +244,8 @@ func (tt TaskType) MarshalText() ([]byte, error) {
return []byte("stream"), nil
case BatchTask:
return []byte("batch"), nil
case InvalidTask:
return []byte("invalid"), nil
default:
return nil, fmt.Errorf("unknown TaskType %d", tt)
}
Expand All @@ -224,6 +257,8 @@ func (tt *TaskType) UnmarshalText(text []byte) error {
*tt = StreamTask
case "batch":
*tt = BatchTask
case "invalid":
*tt = InvalidTask
default:
return fmt.Errorf("unknown TaskType %s", s)
}
Expand Down Expand Up @@ -513,9 +548,9 @@ func (vs *Vars) UnmarshalJSON(b []byte) error {
}

type Var struct {
Type VarType `json:"type"`
Value interface{} `json:"value"`
Description string `json:"description"`
Type VarType `json:"type" yaml:"type"`
Value interface{} `json:"value" yaml:"value"`
Description string `json:"description" yaml:"description"`
}

// A Task plus its read-only attributes.
Expand Down Expand Up @@ -725,13 +760,13 @@ func (c *Client) StorageLink(name string) Link {
}

type CreateTaskOptions struct {
ID string `json:"id,omitempty"`
TemplateID string `json:"template-id,omitempty"`
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
Type TaskType `json:"type,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
TICKscript string `json:"script,omitempty"`
Status TaskStatus `json:"status,omitempty"`
Vars Vars `json:"vars,omitempty"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

// Create a new task.
Expand Down Expand Up @@ -759,13 +794,13 @@ func (c *Client) CreateTask(opt CreateTaskOptions) (Task, error) {
}

type UpdateTaskOptions struct {
ID string `json:"id,omitempty"`
TemplateID string `json:"template-id,omitempty"`
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
Type TaskType `json:"type,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty"`
DBRPs []DBRP `json:"dbrps,omitempty" yaml:"dbrps"`
TICKscript string `json:"script,omitempty"`
Status TaskStatus `json:"status,omitempty"`
Vars Vars `json:"vars,omitempty"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

// Update an existing task.
Expand Down Expand Up @@ -1963,6 +1998,7 @@ func (c *Client) TopicHandler(link Link) (TopicHandler, error) {
}

type TopicHandlerOptions struct {
Topic string `json:"topic" yaml:"topic"`
ID string `json:"id" yaml:"id"`
Kind string `json:"kind" yaml:"kind"`
Options map[string]interface{} `json:"options" yaml:"options"`
Expand Down Expand Up @@ -2282,3 +2318,106 @@ func (d *Duration) UnmarshalText(data []byte) error {
*d = Duration(dur)
return nil
}

type DBRPs []DBRP

func (d *DBRPs) String() string {
return fmt.Sprint(*d)
}

// Parse string of the form "db"."rp" where the quotes are optional but can include escaped quotes
// within the strings.
func (d *DBRPs) Set(value string) error {
dbrp := DBRP{}
if len(value) == 0 {
return errors.New("dbrp cannot be empty")
}
var n int
if value[0] == '"' {
dbrp.Database, n = parseQuotedStr(value)
} else {
n = strings.IndexRune(value, '.')
if n == -1 {
return errors.New("does not contain a '.', it must be in the form \"dbname\".\"rpname\" where the quotes are optional.")
}
dbrp.Database = value[:n]
}
if value[n] != '.' {
return errors.New("dbrp must specify retention policy, do you have a missing or extra '.'?")
}
value = value[n+1:]
if value[0] == '"' {
dbrp.RetentionPolicy, _ = parseQuotedStr(value)
} else {
dbrp.RetentionPolicy = value
}
*d = append(*d, dbrp)
return nil
}

// parseQuotedStr reads from txt starting with beginning quote until next unescaped quote returning the unescaped string and the number of bytes read.
func parseQuotedStr(txt string) (string, int) {
quote := txt[0]
// Unescape quotes
var buf bytes.Buffer
buf.Grow(len(txt))
last := 1
i := 1
for ; i < len(txt)-1; i++ {
if txt[i] == '\\' && txt[i+1] == quote {
buf.Write([]byte(txt[last:i]))
buf.Write([]byte{quote})
i += 2
last = i
} else if txt[i] == quote {
break
}
}
buf.Write([]byte(txt[last:i]))
return buf.String(), i + 1
}

type TaskVars struct {
ID string `json:"id,omitempty" yaml:"id"`
TemplateID string `json:"template-id,omitempty" yaml:"template-id"`
DBRPs []string `json:"dbrps,omitempty" yaml:"dbrps"`
Vars Vars `json:"vars,omitempty" yaml:"vars"`
}

func (t TaskVars) CreateTaskOptions() (CreateTaskOptions, error) {
ds := DBRPs{}
o := CreateTaskOptions{
ID: t.ID,
TemplateID: t.TemplateID,
Vars: t.Vars,
}

for _, dbrp := range t.DBRPs {
if err := ds.Set(dbrp); err != nil {
return o, err
}
}

o.DBRPs = ds

return o, nil
}

func (t TaskVars) UpdateTaskOptions() (UpdateTaskOptions, error) {
ds := DBRPs{}
o := UpdateTaskOptions{
ID: t.ID,
TemplateID: t.TemplateID,
Vars: t.Vars,
}

for _, dbrp := range t.DBRPs {
if err := ds.Set(dbrp); err != nil {
return o, err
}
}

o.DBRPs = ds

return o, nil
}
Loading

0 comments on commit dd5602b

Please sign in to comment.