From a95528333f02f10cddac3208a8ae1b89e154998f Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 15 Jan 2016 16:23:04 -0700 Subject: [PATCH] Add deadman's switch --- CHANGELOG.md | 2 +- alert.go | 17 +- cmd/kapacitor/main.go | 5 + cmd/kapacitord/run/config.go | 4 + cmd/kapacitord/run/server.go | 10 + cmd/kapacitord/run/server_test.go | 24 +-- edge.go | 17 +- etc/kapacitor/kapacitor.conf | 15 ++ global_stats.go | 238 +++++++++++++++++++++ integrations/batcher_test.go | 4 +- integrations/helpers_test.go | 15 ++ integrations/streamer_test.go | 4 +- node.go | 16 +- pipeline/alert.go | 2 + pipeline/node.go | 91 ++++++++ pipeline/pipeline.go | 43 +++- pipeline/pipeline_test.go | 40 ++-- pipeline/stats.go | 19 ++ services/deadman/config.go | 35 ++++ services/deadman/service.go | 49 +++++ services/task_store/service.go | 15 +- stats.go | 262 ++++------------------- stream.go | 1 - task.go | 34 +-- task_master.go | 34 +++ tick/eval.go | 8 +- tick/lex.go | 178 ++++++++-------- tick/lex_test.go | 332 +++++++++++++++--------------- tick/node_test.go | 2 +- tick/parser.go | 132 ++++++------ tick/parser_test.go | 68 +++--- tick/stateful_expr.go | 58 +++--- 32 files changed, 1073 insertions(+), 701 deletions(-) create mode 100644 global_stats.go create mode 100644 pipeline/stats.go create mode 100644 services/deadman/config.go create mode 100644 services/deadman/service.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 64dc46275..49a1a7920 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,12 +11,12 @@ See [udf/agent/README.md](https://github.com/influxdata/kapacitor/blob/master/ud With the addition of UDFs it is now possible to run custom anomaly detection alogrithms suited to your needs. There are simple examples of how to use UDFs in [udf/agent/examples](https://github.com/influxdata/kapacitor/tree/master/udf/agent/examples/). - The version has jumped significantly so that it is inline with other projects in the TICK stack. This way you can easily tell which versions of Telegraf, InfluxDB, Chronograf and Kapacitor work together. ### Features +- [#137](https://github.com/influxdata/kapacitor/issues/137): Add deadman's switch. Can be setup via TICKscript and globally via configuration. - [#72](https://github.com/influxdata/kapacitor/issues/72): Add support for User Defined Functions (UDFs). - [#138](https://github.com/influxdata/kapacitor/issues/138): Change over to influxdata github org. - [#139](https://github.com/influxdata/kapacitor/issues/139): Alerta.io support thanks! @md14454 diff --git a/alert.go b/alert.go index 3e3b628b3..a513a9c4f 100644 --- a/alert.go +++ b/alert.go @@ -392,6 +392,9 @@ type idInfo struct { // Measurement name Name string + // Task name + TaskName string + // Concatenation of all group-by tags of the form [key=value,]+. 
// If not groupBy is performed equal to literal 'nil' Group string @@ -418,9 +421,10 @@ func (a *AlertNode) renderID(name string, group models.GroupID, tags models.Tags g = "nil" } info := idInfo{ - Name: name, - Group: g, - Tags: tags, + Name: name, + TaskName: a.et.Task.Name, + Group: g, + Tags: tags, } var id bytes.Buffer err := a.idTmpl.Execute(&id, info) @@ -437,9 +441,10 @@ func (a *AlertNode) renderMessage(id, name string, group models.GroupID, tags mo } info := messageInfo{ idInfo: idInfo{ - Name: name, - Group: g, - Tags: tags, + Name: name, + TaskName: a.et.Task.Name, + Group: g, + Tags: tags, }, ID: id, Fields: fields, diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go index 2fa7f08af..60e2b6d39 100644 --- a/cmd/kapacitor/main.go +++ b/cmd/kapacitor/main.go @@ -616,6 +616,11 @@ func reloadUsage() { } func doReload(args []string) error { + if len(args) < 1 { + fmt.Fprintln(os.Stderr, "Must pass at least one task name") + reloadUsage() + os.Exit(2) + } err := doDisable(args) if err != nil { return err diff --git a/cmd/kapacitord/run/config.go b/cmd/kapacitord/run/config.go index 67ea806a1..a2fc9ebb4 100644 --- a/cmd/kapacitord/run/config.go +++ b/cmd/kapacitord/run/config.go @@ -11,6 +11,7 @@ import ( "time" "github.com/influxdata/kapacitor/services/alerta" + "github.com/influxdata/kapacitor/services/deadman" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" "github.com/influxdata/kapacitor/services/influxdb" @@ -54,6 +55,7 @@ type Config struct { Reporting reporting.Config `toml:"reporting"` Stats stats.Config `toml:"stats"` UDF udf.Config `toml:"udf"` + Deadman deadman.Config `toml:"deadman"` Hostname string `toml:"hostname"` DataDir string `toml:"data_dir"` @@ -64,6 +66,7 @@ func NewConfig() *Config { c := &Config{ Hostname: "localhost", } + c.HTTP = httpd.NewConfig() c.Replay = replay.NewConfig() c.Task = task_store.NewConfig() @@ -82,6 +85,7 @@ func NewConfig() *Config { c.Reporting = reporting.NewConfig() c.Stats = stats.NewConfig() c.UDF = udf.NewConfig() + c.Deadman = deadman.NewConfig() return c } diff --git a/cmd/kapacitord/run/server.go b/cmd/kapacitord/run/server.go index 79704d811..7e2720707 100644 --- a/cmd/kapacitord/run/server.go +++ b/cmd/kapacitord/run/server.go @@ -15,6 +15,7 @@ import ( "github.com/influxdata/kapacitor" "github.com/influxdata/kapacitor/services/alerta" + "github.com/influxdata/kapacitor/services/deadman" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" "github.com/influxdata/kapacitor/services/influxdb" @@ -130,6 +131,7 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (* // Append Kapacitor services. 
s.appendUDFService(c.UDF) + s.appendDeadmanService(c.Deadman) s.appendSMTPService(c.SMTP) s.appendHTTPDService(c.HTTP) s.appendInfluxDBService(c.InfluxDB, c.Hostname) @@ -223,6 +225,14 @@ func (s *Server) appendReplayStoreService(c replay.Config) { s.Services = append(s.Services, srv) } +func (s *Server) appendDeadmanService(c deadman.Config) { + l := s.LogService.NewLogger("[deadman] ", log.LstdFlags) + srv := deadman.NewService(c, l) + + s.TaskMaster.DeadmanService = srv + s.Services = append(s.Services, srv) +} + func (s *Server) appendUDFService(c udf.Config) { l := s.LogService.NewLogger("[udf] ", log.LstdFlags) srv := udf.NewService(c, l) diff --git a/cmd/kapacitord/run/server_test.go b/cmd/kapacitord/run/server_test.go index 91bcfcaac..9fe8bcb1a 100644 --- a/cmd/kapacitord/run/server_test.go +++ b/cmd/kapacitord/run/server_test.go @@ -8,7 +8,6 @@ import ( "net/http" "net/url" "os" - "os/exec" "path" "path/filepath" "reflect" @@ -705,33 +704,16 @@ func TestServer_UDFAgents(t *testing.T) { t.Fatal(err) } - tmpDir, err := ioutil.TempDir("", "testStreamTaskRecording") - if err != nil { - t.Fatal(err) - } - defer os.RemoveAll(tmpDir) agents := []struct { buildFunc func() error config udf.FunctionConfig }{ // Go { - buildFunc: func() error { - cmd := exec.Command( - "go", - "build", - "-o", - filepath.Join(tmpDir, "go-moving_avg"), - filepath.Join(udfDir, "agent/examples/moving_avg.go"), - ) - out, err := cmd.CombinedOutput() - if err != nil { - return fmt.Errorf("go build failed: %v: %s", err, string(out)) - } - return nil - }, + buildFunc: func() error { return nil }, config: udf.FunctionConfig{ - Prog: filepath.Join(tmpDir, "go-moving_avg"), + Prog: "go", + Args: []string{"run", filepath.Join(udfDir, "agent/examples/moving_avg.go")}, Timeout: toml.Duration(time.Minute), }, }, diff --git a/edge.go b/edge.go index 36f67aade..28c2b3cd6 100644 --- a/edge.go +++ b/edge.go @@ -5,6 +5,7 @@ import ( "expvar" "fmt" "log" + "strconv" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" @@ -61,8 +62,20 @@ func newEdge(taskName, parentName, childName string, t pipeline.EdgeType, logSer return e } -func (e *Edge) collectedCount() string { - return e.statMap.Get(statCollected).String() +func (e *Edge) emittedCount() int64 { + c, err := strconv.ParseUint(e.statMap.Get(statEmitted).String(), 10, 64) + if err != nil { + panic("emitted count is not an int") + } + return int64(c) +} + +func (e *Edge) collectedCount() int64 { + c, err := strconv.ParseUint(e.statMap.Get(statCollected).String(), 10, 64) + if err != nil { + panic("collected count is not an int") + } + return int64(c) } // Close the edge, this can only be called after all diff --git a/etc/kapacitor/kapacitor.conf b/etc/kapacitor/kapacitor.conf index 7a24b15f7..5a76c3846 100644 --- a/etc/kapacitor/kapacitor.conf +++ b/etc/kapacitor/kapacitor.conf @@ -36,6 +36,21 @@ data_dir = "/var/lib/kapacitor" # How often to snapshot running task state. snapshot-interval = "60s" +[deadman] + # Configure a deadman's switch + # Globally configure deadman's switches on all stream tasks. + # NOTE: for this to be of use you must also globally configure at least one alerting method. + global = false + # Threshold, if globally configured the alert will be triggered if the throughput in points/interval is <= threshold. + threshold = 0.0 + # Interval, if globally configured the frequency at which to check the throughput. + interval = "10s" + # Id -- the alert Id, NODE_NAME will be replaced with the name of the node being monitored. 
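+  # For example, with the default id template below, a node named 'stream0'
+  # (a hypothetical node name) in a task named 'cpu_alert' renders the id:
+  #   node 'stream0' in task 'cpu_alert'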
+ id = "node 'NODE_NAME' in task '{{ .TaskName }}'" + # The message of the alert. INTERVAL will be replaced by the interval. + message = "{{ .ID }} is dead: {{ index .Fields \"collected\" }} points/INTERVAL" + + [influxdb] # Connect to an InfluxDB cluster # Kapacitor can subscribe, query and write to this cluster. diff --git a/global_stats.go b/global_stats.go new file mode 100644 index 000000000..3927843e1 --- /dev/null +++ b/global_stats.go @@ -0,0 +1,238 @@ +package kapacitor + +import ( + "expvar" + "runtime" + "strconv" + "sync" + "time" + + "github.com/twinj/uuid" +) + +const ( + // List of names for top-level exported vars + ClusterIDVarName = "cluster_id" + ServerIDVarName = "server_id" + HostVarName = "host" + ProductVarName = "product" + VersionVarName = "version" + + NumTasksVarName = "num_tasks" + NumEnabledTasksVarName = "num_enabled_tasks" + NumSubscriptionsVarName = "num_subscriptions" + + UptimeVarName = "uptime" + + // The name of the product + Product = "kapacitor" +) + +var ( + // Global expvars + NumTasks = &expvar.Int{} + NumEnabledTasks = &expvar.Int{} + NumSubscriptions = &expvar.Int{} +) + +var ( + startTime time.Time +) + +func init() { + startTime = time.Now().UTC() + expvar.Publish(NumTasksVarName, NumTasks) + expvar.Publish(NumEnabledTasksVarName, NumEnabledTasks) + expvar.Publish(NumSubscriptionsVarName, NumSubscriptions) +} + +// Gets an exported var and returns its unquoted string contents +func GetStringVar(name string) string { + s, err := strconv.Unquote(expvar.Get(name).String()) + if err != nil { + panic(err) + } + return s +} + +// Gets an exported var and returns its int value +func GetIntVar(name string) int64 { + i, err := strconv.ParseInt(expvar.Get(name).String(), 10, 64) + if err != nil { + panic(err) + } + return i +} + +// Gets an exported var and returns its float value +func GetFloatVar(name string) float64 { + f, err := strconv.ParseFloat(expvar.Get(name).String(), 64) + if err != nil { + panic(err) + } + return f +} + +func Uptime() time.Duration { + return time.Now().Sub(startTime) +} + +var expvarMu sync.Mutex + +// NewStatistics creates an expvar-based map. Within there "name" is the Measurement name, "tags" are the tags, +// and values are placed at the key "values". +// The "values" map is returned so that statistics can be set. +func NewStatistics(name string, tags map[string]string) *expvar.Map { + expvarMu.Lock() + defer expvarMu.Unlock() + + key := uuid.NewV4().String() + + m := &expvar.Map{} + m.Init() + expvar.Publish(key, m) + + // Set the name + nameVar := &expvar.String{} + nameVar.Set(name) + m.Set("name", nameVar) + + // Set the tags + tagsVar := &expvar.Map{} + tagsVar.Init() + for k, v := range tags { + value := &expvar.String{} + value.Set(v) + tagsVar.Set(k, value) + } + m.Set("tags", tagsVar) + + // Create and set the values entry used for actual stats. + statMap := &expvar.Map{} + statMap.Init() + m.Set("values", statMap) + + return statMap +} + +type StatsData struct { + Name string `json:"name"` + Tags map[string]string `json:"tags"` + Values map[string]interface{} `json:"values"` +} + +// Return all stats data from the expvars. 
+func GetStatsData() ([]StatsData, error) { + allData := make([]StatsData, 0) + // Add Global expvars + globalData := StatsData{ + Name: "kapacitor", + Values: make(map[string]interface{}), + } + + allData = append(allData, globalData) + + expvar.Do(func(kv expvar.KeyValue) { + var f interface{} + var err error + switch v := kv.Value.(type) { + case *expvar.Float: + f, err = strconv.ParseFloat(v.String(), 64) + if err == nil { + globalData.Values[kv.Key] = f + } + case *expvar.Int: + f, err = strconv.ParseInt(v.String(), 10, 64) + if err == nil { + globalData.Values[kv.Key] = f + } + case *expvar.Map: + data := StatsData{ + Tags: make(map[string]string), + Values: make(map[string]interface{}), + } + + v.Do(func(subKV expvar.KeyValue) { + switch subKV.Key { + case "name": + // straight to string name. + u, err := strconv.Unquote(subKV.Value.String()) + if err != nil { + return + } + data.Name = u + case "tags": + // string-string tags map. + n := subKV.Value.(*expvar.Map) + n.Do(func(t expvar.KeyValue) { + u, err := strconv.Unquote(t.Value.String()) + if err != nil { + return + } + data.Tags[t.Key] = u + }) + case "values": + // string-interface map. + n := subKV.Value.(*expvar.Map) + n.Do(func(kv expvar.KeyValue) { + var f interface{} + var err error + switch v := kv.Value.(type) { + case *expvar.Float: + f, err = strconv.ParseFloat(v.String(), 64) + if err != nil { + return + } + case *expvar.Int: + f, err = strconv.ParseInt(v.String(), 10, 64) + if err != nil { + return + } + default: + return + } + data.Values[kv.Key] = f + }) + } + }) + + // If no field data, don't include it in the results + if len(data.Values) == 0 { + return + } + + allData = append(allData, data) + } + }) + + // Add uptime to globalData + globalData.Values[UptimeVarName] = Uptime().Seconds() + + // Add Go memstats. 
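+	// Values are cast to int64 so they encode uniformly as integers.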
+ data := StatsData{ + Name: "runtime", + } + + var rt runtime.MemStats + runtime.ReadMemStats(&rt) + data.Values = map[string]interface{}{ + "Alloc": int64(rt.Alloc), + "TotalAlloc": int64(rt.TotalAlloc), + "Sys": int64(rt.Sys), + "Lookups": int64(rt.Lookups), + "Mallocs": int64(rt.Mallocs), + "Frees": int64(rt.Frees), + "HeapAlloc": int64(rt.HeapAlloc), + "HeapSys": int64(rt.HeapSys), + "HeapIdle": int64(rt.HeapIdle), + "HeapInUse": int64(rt.HeapInuse), + "HeapReleased": int64(rt.HeapReleased), + "HeapObjects": int64(rt.HeapObjects), + "PauseTotalNs": int64(rt.PauseTotalNs), + "NumGC": int64(rt.NumGC), + "NumGoroutine": int64(runtime.NumGoroutine()), + } + allData = append(allData, data) + + return allData, nil +} diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index 2134b0d40..a7a3638bf 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -313,11 +313,11 @@ func testBatcher(t *testing.T, name, script string) (clock.Setter, *kapacitor.Ex tm := kapacitor.NewTaskMaster(logService) tm.HTTPDService = httpService tm.TaskStore = taskStore{} + tm.DeadmanService = deadman{} tm.Open() - scope := tm.CreateTICKScope() // Create task - task, err := kapacitor.NewTask(name, script, kapacitor.BatchTask, dbrps, 0, scope) + task, err := tm.NewTask(name, script, kapacitor.BatchTask, dbrps, 0) if err != nil { t.Fatal(err) } diff --git a/integrations/helpers_test.go b/integrations/helpers_test.go index f052e2f1e..3256a07a2 100644 --- a/integrations/helpers_test.go +++ b/integrations/helpers_test.go @@ -9,6 +9,7 @@ import ( "net/url" "os" "reflect" + "time" "github.com/influxdata/kapacitor" "github.com/influxdata/kapacitor/wlog" @@ -126,3 +127,17 @@ func (ts taskStore) HasSnapshot(name string) bool func (ts taskStore) LoadSnapshot(name string) (*kapacitor.TaskSnapshot, error) { return nil, errors.New("not implemented") } + +type deadman struct { + interval time.Duration + threshold float64 + id string + message string + global bool +} + +func (d deadman) Interval() time.Duration { return d.interval } +func (d deadman) Threshold() float64 { return d.threshold } +func (d deadman) Id() string { return d.id } +func (d deadman) Message() string { return d.message } +func (d deadman) Global() bool { return d.global } diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index ea6cd7fe1..fa55111b7 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -2224,11 +2224,11 @@ func testStreamer( tm.HTTPDService = httpService tm.UDFService = udfService tm.TaskStore = taskStore{} + tm.DeadmanService = deadman{} tm.Open() - scope := tm.CreateTICKScope() //Create the task - task, err := kapacitor.NewTask(name, script, kapacitor.StreamTask, dbrps, 0, scope) + task, err := tm.NewTask(name, script, kapacitor.StreamTask, dbrps, 0) if err != nil { t.Fatal(err) } diff --git a/node.go b/node.go index 9f56d01c2..e1d3430a4 100644 --- a/node.go +++ b/node.go @@ -38,6 +38,9 @@ type Node interface { // executing dot edot(buf *bytes.Buffer) + + // the number of points/batches this node has collected + collectedCount() int64 } //implementation of Node @@ -159,7 +162,7 @@ func (n *node) closeChildEdges() { func (n *node) edot(buf *bytes.Buffer) { for i, c := range n.children { buf.Write([]byte( - fmt.Sprintf("%s -> %s [label=\"%s\"];\n", + fmt.Sprintf("%s -> %s [label=\"%d\"];\n", n.Name(), c.Name(), n.outs[i].collectedCount(), @@ -167,3 +170,14 @@ func (n *node) edot(buf *bytes.Buffer) { )) } } + +// Return the number of points/batches 
this node +// has collected. +func (n *node) collectedCount() (c int64) { + + // Count how many points each parent edge has emitted. + for _, in := range n.ins { + c += in.emittedCount() + } + return c +} diff --git a/pipeline/alert.go b/pipeline/alert.go index 99104bb9a..5605d4512 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -81,6 +81,7 @@ type AlertNode struct { // Available template data: // // * Name -- Measurement name. + // * TaskName -- The name of the task // * Group -- Concatenation of all group-by tags of the form [key=value,]+. // If no groupBy is performed equal to literal 'nil'. // * Tags -- Map of tags. Use '{{ index .Tags "key" }}' to get a specific tag value. @@ -118,6 +119,7 @@ type AlertNode struct { // // * ID -- The ID of the alert. // * Name -- Measurement name. + // * TaskName -- The name of the task // * Group -- Concatenation of all group-by tags of the form [key=value,]+. // If no groupBy is performed equal to literal 'nil'. // * Tags -- Map of tags. Use '{{ index .Tags "key" }}' to get a specific tag value. diff --git a/pipeline/node.go b/pipeline/node.go index b420c1aa1..3beeeb02f 100644 --- a/pipeline/node.go +++ b/pipeline/node.go @@ -3,6 +3,8 @@ package pipeline import ( "bytes" "fmt" + "strings" + "time" "github.com/influxdata/kapacitor/tick" ) @@ -68,12 +70,15 @@ type Node interface { setTMark(b bool) pMark() bool setPMark(b bool) + setPipeline(*Pipeline) + pipeline() *Pipeline // Return .dot string to graph DAG dot(buf *bytes.Buffer) } type node struct { + p *Pipeline desc string name string id ID @@ -127,6 +132,8 @@ func (n *node) addParent(c Node) { } func (n *node) linkChild(c Node) { + c.setPipeline(n.p) + n.p.assignID(c) n.children = append(n.children, c) c.addParent(n) } @@ -147,6 +154,13 @@ func (n *node) setPMark(b bool) { n.pm = b } +func (n *node) setPipeline(p *Pipeline) { + n.p = p +} +func (n *node) pipeline() *Pipeline { + return n.p +} + // tick:ignore func (n *node) Wants() EdgeType { return n.wants @@ -163,6 +177,83 @@ func (n *node) dot(buf *bytes.Buffer) { } } +// Create a new stream of data that contains the internal statistics of the node. +// The interval represents how often to emit the statistics based on real time. +// This means the interval time is independent of the times of the data points the source node is receiving. +// +// Each node has these internal statistics: +// +// * collected -- the number of points or batches this node has received. +// +func (n *node) Stats(interval time.Duration) *StatsNode { + stats := newStatsNode(n, interval) + n.pipeline().addSource(stats) + return stats +} + +const nodeNameMarker = "NODE_NAME" +const intervalMarker = "INTERVAL" + +// Helper function for creating an alert on low throughput, aka deadman's switch. +// +// - Threshold -- trigger alert if throughput drops below threshold in points/interval. +// - Interval -- how often to check the throughput. +// +// Example: +// var data = stream.from()... +// // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. +// data.deadman(100.0, 10s) +// //Do normal processing of data +// data.... +// +// The above is equivalent to this +// Example: +// var data = stream.from()... +// // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s. 
+//    data.stats(10s)
+//        .derivative('collected')
+//        .unit(10s)
+//        .nonNegative()
+//        .alert()
+//        .id('node \'stream0\' in task \'{{ .TaskName }}\'')
+//        .message('{{ .ID }} is {{ if eq .Level "OK" }}alive{{ else }}dead{{ end }}: {{ index .Fields "collected" | printf "%0.3f" }} points/10s.')
+//        .crit(lambda: "collected" <= 100.0)
+//    //Do normal processing of data
+//    data....
+//
+// The `id` and `message` alert properties can be configured globally via the 'deadman' configuration section.
+//
+// Since the AlertNode is the last node in the chain it can be further modified as usual.
+// Example:
+//    var data = stream.from()...
+//    // Trigger critical alert if the throughput drops below 100 points per 10s and checked every 10s.
+//    data.deadman(100.0, 10s).slack().channel('#dead_tasks')
+//    //Do normal processing of data
+//    data....
+//
+func (n *node) Deadman(threshold float64, interval time.Duration) *AlertNode {
+	dn := n.Stats(interval).
+		Derivative("collected").NonNegative()
+	dn.Unit = interval
+
+	an := dn.Alert()
+	an.Crit = &tick.BinaryNode{
+		Operator: tick.TokenLessEqual,
+		Left: &tick.ReferenceNode{
+			Reference: "collected",
+		},
+		Right: &tick.NumberNode{
+			IsFloat: true,
+			Float64: threshold,
+		},
+	}
+	// Replace NODE_NAME with the actual name of the node in the Id.
+	an.Id = strings.Replace(n.pipeline().deadman.Id(), nodeNameMarker, n.Name(), 1)
+	// Set the message on the alert node.
+	an.Message = strings.Replace(n.pipeline().deadman.Message(), intervalMarker, interval.String(), 1)
+	return an
+}
+
 // ---------------------------------
 // Chaining methods
 //
diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go
index 71049f33e..9170f8036 100644
--- a/pipeline/pipeline.go
+++ b/pipeline/pipeline.go
@@ -3,21 +3,36 @@ package pipeline
 import (
 	"bytes"
 	"fmt"
+	"time"
 
 	"github.com/influxdata/kapacitor/tick"
 )
 
+// Information relevant to configuring a deadman's switch.
+type DeadmanService interface {
+	Interval() time.Duration
+	Threshold() float64
+	Id() string
+	Message() string
+	Global() bool
+}
+
 // A complete data processing pipeline. Starts with a single source.
 // tick:ignore
 type Pipeline struct {
-	Source Node
-	id     ID
-	sorted []Node
+	sources []Node
+	id      ID
+	sorted  []Node
+
+	deadman DeadmanService
 }
 
 // Create a pipeline from a given script.
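 // The source node type is determined by the edge type, and the deadman
 // service provides the defaults used by any deadman's switches created
 // while the script is evaluated.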
// tick:ignore -func CreatePipeline(script string, sourceEdge EdgeType, scope *tick.Scope) (*Pipeline, error) { +func CreatePipeline(script string, sourceEdge EdgeType, scope *tick.Scope, deadman DeadmanService) (*Pipeline, error) { + p := &Pipeline{ + deadman: deadman, + } var src Node switch sourceEdge { case StreamEdge: @@ -29,17 +44,26 @@ func CreatePipeline(script string, sourceEdge EdgeType, scope *tick.Scope) (*Pip default: return nil, fmt.Errorf("source edge type must be either Stream or Batch not %s", sourceEdge) } + p.addSource(src) + err := tick.Evaluate(script, scope) if err != nil { return nil, err } - p := &Pipeline{Source: src} - p.Walk(p.setID) + if sourceEdge == StreamEdge && deadman.Global() { + src.(*StreamNode).Deadman(deadman.Threshold(), deadman.Interval()) + } return p, nil } -func (p *Pipeline) setID(n Node) error { +func (p *Pipeline) addSource(src Node) { + src.setPipeline(p) + p.assignID(src) + p.sources = append(p.sources, src) +} + +func (p *Pipeline) assignID(n Node) error { n.setID(p.id) p.id++ return nil @@ -62,7 +86,10 @@ func (p *Pipeline) Walk(f func(n Node) error) error { } func (p *Pipeline) sort() { - p.visit(p.Source) + // Iterate the sources in reverse order + for i := len(p.sources) - 1; i >= 0; i-- { + p.visit(p.sources[i]) + } //reverse p.sorted s := p.sorted for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { diff --git a/pipeline/pipeline_test.go b/pipeline/pipeline_test.go index e30afe461..0ac1e3252 100644 --- a/pipeline/pipeline_test.go +++ b/pipeline/pipeline_test.go @@ -8,6 +8,20 @@ import ( "github.com/stretchr/testify/assert" ) +type deadman struct { + interval time.Duration + threshold float64 + id string + message string + global bool +} + +func (d deadman) Interval() time.Duration { return d.interval } +func (d deadman) Threshold() float64 { return d.threshold } +func (d deadman) Id() string { return d.id } +func (d deadman) Message() string { return d.message } +func (d deadman) Global() bool { return d.global } + func TestTICK_To_Pipeline_MultiLine(t *testing.T) { assert := assert.New(t) @@ -17,12 +31,14 @@ w.period(10s) w.every(1s) ` + d := deadman{} + scope := tick.NewScope() - p, err := CreatePipeline(tickScript, StreamEdge, scope) + p, err := CreatePipeline(tickScript, StreamEdge, scope, d) assert.Nil(err) assert.NotNil(p) - assert.Equal(1, len(p.Source.Children())) - w, ok := p.Source.Children()[0].(*WindowNode) + assert.Equal(1, len(p.sources[0].Children())) + w, ok := p.sources[0].Children()[0].(*WindowNode) if assert.True(ok) { assert.Equal(time.Duration(10)*time.Second, w.Period) assert.Equal(time.Duration(1)*time.Second, w.Every) @@ -33,11 +49,15 @@ w.every(1s) func TestPipelineSort(t *testing.T) { assert := assert.New(t) - p1 := &node{} - p2 := &node{} - p3 := &node{} - p4 := &node{} - p5 := &node{} + p := &Pipeline{} + + p1 := &node{p: p} + p2 := &node{p: p} + p3 := &node{p: p} + p4 := &node{p: p} + p5 := &node{p: p} + + p.addSource(p1) p1.linkChild(p2) p2.linkChild(p3) @@ -45,10 +65,6 @@ func TestPipelineSort(t *testing.T) { p4.linkChild(p5) p5.linkChild(p3) - p := &Pipeline{ - Source: p1, - } - p.sort() sorted := []Node{ diff --git a/pipeline/stats.go b/pipeline/stats.go new file mode 100644 index 000000000..3461a5e36 --- /dev/null +++ b/pipeline/stats.go @@ -0,0 +1,19 @@ +package pipeline + +import "time" + +type StatsNode struct { + chainnode + // tick:ignore + SourceNode Node + // tick:ignore + Interval time.Duration +} + +func newStatsNode(n Node, interval time.Duration) *StatsNode { + return &StatsNode{ + chainnode: 
newBasicChainNode("stats", StreamEdge, StreamEdge), + SourceNode: n, + Interval: interval, + } +} diff --git a/services/deadman/config.go b/services/deadman/config.go new file mode 100644 index 000000000..2cda50122 --- /dev/null +++ b/services/deadman/config.go @@ -0,0 +1,35 @@ +package deadman + +import ( + "time" + + "github.com/influxdb/influxdb/toml" +) + +const ( + // Default deadman's switch interval + DefaultInterval = toml.Duration(time.Second * 10) + // Default deadman's switch threshold + DefaultThreshold = float64(0) + // Default deadman's switch id + DefaultId = "node 'NODE_NAME' in task '{{ .TaskName }}'" + // Default deadman's switch message + DefaultMessage = "{{ .ID }} is {{ if eq .Level \"OK\" }}alive{{ else }}dead{{ end }}: {{ index .Fields \"collected\" | printf \"%0.3f\" }} points/INTERVAL." +) + +type Config struct { + Interval toml.Duration `toml:"interval"` + Threshold float64 `toml:"threshold"` + Id string `toml:"id"` + Message string `toml:"message"` + Global bool `toml:"global"` +} + +func NewConfig() Config { + return Config{ + Interval: DefaultInterval, + Threshold: DefaultThreshold, + Id: DefaultId, + Message: DefaultMessage, + } +} diff --git a/services/deadman/service.go b/services/deadman/service.go new file mode 100644 index 000000000..dc10210e5 --- /dev/null +++ b/services/deadman/service.go @@ -0,0 +1,49 @@ +package deadman + +import ( + "log" + "time" +) + +type Service struct { + c Config + logger *log.Logger +} + +func NewService(c Config, l *log.Logger) *Service { + return &Service{ + c: c, + logger: l, + } +} + +func (s *Service) Interval() time.Duration { + return time.Duration(s.c.Interval) +} + +func (s *Service) Threshold() float64 { + return s.c.Threshold +} + +func (s *Service) Id() string { + return s.c.Id +} + +func (s *Service) Message() string { + return s.c.Message +} + +func (s *Service) Global() bool { + return s.c.Global +} + +func (s *Service) Open() error { + if s.Global() { + s.logger.Println("I! 
Deadman's switch is configured globally")
+	}
+	return nil
+}
+
+func (s *Service) Close() error {
+	return nil
+}
diff --git a/services/task_store/service.go b/services/task_store/service.go
index 948fb613b..5e193e10f 100644
--- a/services/task_store/service.go
+++ b/services/task_store/service.go
@@ -17,7 +17,6 @@ import (
 	"github.com/boltdb/bolt"
 	"github.com/influxdata/kapacitor"
 	"github.com/influxdata/kapacitor/services/httpd"
-	"github.com/influxdata/kapacitor/tick"
 	"github.com/influxdb/influxdb/influxql"
 )
 
@@ -39,11 +38,17 @@ type Service struct {
 		DelRoutes([]httpd.Route)
 	}
 	TaskMaster interface {
+		NewTask(
+			name,
+			script string,
+			tt kapacitor.TaskType,
+			dbrps []kapacitor.DBRP,
+			snapshotInterval time.Duration,
+		) (*kapacitor.Task, error)
 		StartTask(t *kapacitor.Task) (*kapacitor.ExecutingTask, error)
 		StopTask(name string) error
 		IsExecuting(name string) bool
 		ExecutingDot(name string) string
-		CreateTICKScope() *tick.Scope
 	}
 
 	logger *log.Logger
@@ -458,12 +463,11 @@ func (ts *Service) handleDisable(w http.ResponseWriter, r *http.Request) {
 
 func (ts *Service) Save(task *rawTask) error {
 	// Validate task
-	_, err := kapacitor.NewTask(task.Name,
+	_, err := ts.TaskMaster.NewTask(task.Name,
 		task.TICKscript,
 		task.Type,
 		task.DBRPs,
 		task.SnapshotInterval,
-		ts.TaskMaster.CreateTICKScope(),
 	)
 	if err != nil {
 		return fmt.Errorf("invalid task: %s", err)
 	}
@@ -555,12 +559,11 @@ func (ts *Service) Load(name string) (*kapacitor.Task, error) {
 	if err != nil {
 		return nil, err
 	}
-	return kapacitor.NewTask(task.Name,
+	return ts.TaskMaster.NewTask(task.Name,
 		task.TICKscript,
 		task.Type,
 		task.DBRPs,
 		task.SnapshotInterval,
-		ts.TaskMaster.CreateTICKScope(),
 	)
 }
diff --git a/stats.go b/stats.go
index 3927843e1..db23b6498 100644
--- a/stats.go
+++ b/stats.go
@@ -1,238 +1,64 @@
 package kapacitor
 
 import (
-	"expvar"
-	"runtime"
-	"strconv"
-	"sync"
+	"fmt"
+	"log"
 	"time"
 
-	"github.com/twinj/uuid"
+	"github.com/influxdata/kapacitor/models"
+	"github.com/influxdata/kapacitor/pipeline"
 )
 
-const (
-	// List of names for top-level exported vars
-	ClusterIDVarName = "cluster_id"
-	ServerIDVarName  = "server_id"
-	HostVarName      = "host"
-	ProductVarName   = "product"
-	VersionVarName   = "version"
-
-	NumTasksVarName         = "num_tasks"
-	NumEnabledTasksVarName  = "num_enabled_tasks"
-	NumSubscriptionsVarName = "num_subscriptions"
-
-	UptimeVarName = "uptime"
-
-	// The name of the product
-	Product = "kapacitor"
-)
-
-var (
-	// Global expvars
-	NumTasks         = &expvar.Int{}
-	NumEnabledTasks  = &expvar.Int{}
-	NumSubscriptions = &expvar.Int{}
-)
-
-var (
-	startTime time.Time
-)
-
-func init() {
-	startTime = time.Now().UTC()
-	expvar.Publish(NumTasksVarName, NumTasks)
-	expvar.Publish(NumEnabledTasksVarName, NumEnabledTasks)
-	expvar.Publish(NumSubscriptionsVarName, NumSubscriptions)
+type StatsNode struct {
+	node
+	s       *pipeline.StatsNode
+	en      Node
+	closing chan struct{}
 }
 
-// Gets an exported var and returns its unquoted string contents
-func GetStringVar(name string) string {
-	s, err := strconv.Unquote(expvar.Get(name).String())
-	if err != nil {
-		panic(err)
+// Create a new StatsNode which emits the internal statistics of another node.
+func newStatsNode(et *ExecutingTask, n *pipeline.StatsNode, l *log.Logger) (*StatsNode, error) {
+	// Lookup the executing node for stats.
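+	// (et.lookup maps pipeline node IDs to their executing counterparts,
+	// so the stats node can read live counters from the node it watches.)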
+ en := et.lookup[n.SourceNode.ID()] + if en == nil { + return nil, fmt.Errorf("no node found for %s", n.SourceNode.Name()) } - return s -} - -// Gets an exported var and returns its int value -func GetIntVar(name string) int64 { - i, err := strconv.ParseInt(expvar.Get(name).String(), 10, 64) - if err != nil { - panic(err) + sn := &StatsNode{ + node: node{Node: n, et: et, logger: l}, + s: n, + en: en, + closing: make(chan struct{}), } - return i + sn.node.runF = sn.runStats + sn.node.stopF = sn.stopStats + return sn, nil } -// Gets an exported var and returns its float value -func GetFloatVar(name string) float64 { - f, err := strconv.ParseFloat(expvar.Get(name).String(), 64) - if err != nil { - panic(err) - } - return f -} - -func Uptime() time.Duration { - return time.Now().Sub(startTime) -} - -var expvarMu sync.Mutex - -// NewStatistics creates an expvar-based map. Within there "name" is the Measurement name, "tags" are the tags, -// and values are placed at the key "values". -// The "values" map is returned so that statistics can be set. -func NewStatistics(name string, tags map[string]string) *expvar.Map { - expvarMu.Lock() - defer expvarMu.Unlock() - - key := uuid.NewV4().String() - - m := &expvar.Map{} - m.Init() - expvar.Publish(key, m) - - // Set the name - nameVar := &expvar.String{} - nameVar.Set(name) - m.Set("name", nameVar) - - // Set the tags - tagsVar := &expvar.Map{} - tagsVar.Init() - for k, v := range tags { - value := &expvar.String{} - value.Set(v) - tagsVar.Set(k, value) +func (s *StatsNode) runStats([]byte) error { + ticker := time.NewTicker(s.s.Interval) + defer ticker.Stop() + point := models.Point{ + Name: "stats", + Tags: map[string]string{"node": s.en.Name()}, } - m.Set("tags", tagsVar) - - // Create and set the values entry used for actual stats. - statMap := &expvar.Map{} - statMap.Init() - m.Set("values", statMap) - - return statMap -} - -type StatsData struct { - Name string `json:"name"` - Tags map[string]string `json:"tags"` - Values map[string]interface{} `json:"values"` -} - -// Return all stats data from the expvars. -func GetStatsData() ([]StatsData, error) { - allData := make([]StatsData, 0) - // Add Global expvars - globalData := StatsData{ - Name: "kapacitor", - Values: make(map[string]interface{}), - } - - allData = append(allData, globalData) - - expvar.Do(func(kv expvar.KeyValue) { - var f interface{} - var err error - switch v := kv.Value.(type) { - case *expvar.Float: - f, err = strconv.ParseFloat(v.String(), 64) - if err == nil { - globalData.Values[kv.Key] = f - } - case *expvar.Int: - f, err = strconv.ParseInt(v.String(), 10, 64) - if err == nil { - globalData.Values[kv.Key] = f - } - case *expvar.Map: - data := StatsData{ - Tags: make(map[string]string), - Values: make(map[string]interface{}), - } - - v.Do(func(subKV expvar.KeyValue) { - switch subKV.Key { - case "name": - // straight to string name. - u, err := strconv.Unquote(subKV.Value.String()) - if err != nil { - return - } - data.Name = u - case "tags": - // string-string tags map. - n := subKV.Value.(*expvar.Map) - n.Do(func(t expvar.KeyValue) { - u, err := strconv.Unquote(t.Value.String()) - if err != nil { - return - } - data.Tags[t.Key] = u - }) - case "values": - // string-interface map. 
- n := subKV.Value.(*expvar.Map) - n.Do(func(kv expvar.KeyValue) { - var f interface{} - var err error - switch v := kv.Value.(type) { - case *expvar.Float: - f, err = strconv.ParseFloat(v.String(), 64) - if err != nil { - return - } - case *expvar.Int: - f, err = strconv.ParseInt(v.String(), 10, 64) - if err != nil { - return - } - default: - return - } - data.Values[kv.Key] = f - }) + for { + select { + case <-s.closing: + return nil + case now := <-ticker.C: + point.Time = now + count := s.en.collectedCount() + point.Fields = models.Fields{"collected": count} + for _, out := range s.outs { + err := out.CollectPoint(point) + if err != nil { + return err } - }) - - // If no field data, don't include it in the results - if len(data.Values) == 0 { - return } - - allData = append(allData, data) } - }) - - // Add uptime to globalData - globalData.Values[UptimeVarName] = Uptime().Seconds() - - // Add Go memstats. - data := StatsData{ - Name: "runtime", } +} - var rt runtime.MemStats - runtime.ReadMemStats(&rt) - data.Values = map[string]interface{}{ - "Alloc": int64(rt.Alloc), - "TotalAlloc": int64(rt.TotalAlloc), - "Sys": int64(rt.Sys), - "Lookups": int64(rt.Lookups), - "Mallocs": int64(rt.Mallocs), - "Frees": int64(rt.Frees), - "HeapAlloc": int64(rt.HeapAlloc), - "HeapSys": int64(rt.HeapSys), - "HeapIdle": int64(rt.HeapIdle), - "HeapInUse": int64(rt.HeapInuse), - "HeapReleased": int64(rt.HeapReleased), - "HeapObjects": int64(rt.HeapObjects), - "PauseTotalNs": int64(rt.PauseTotalNs), - "NumGC": int64(rt.NumGC), - "NumGoroutine": int64(runtime.NumGoroutine()), - } - allData = append(allData, data) - - return allData, nil +func (s *StatsNode) stopStats() { + close(s.closing) } diff --git a/stream.go b/stream.go index 4a9299372..d1a9cfda8 100644 --- a/stream.go +++ b/stream.go @@ -39,7 +39,6 @@ func newStreamNode(et *ExecutingTask, n *pipeline.StreamNode, l *log.Logger) (*S } func (s *StreamNode) runStream([]byte) error { - for pt, ok := s.ins[0].NextPoint(); ok; pt, ok = s.ins[0].NextPoint() { if s.matches(pt) { if s.s.Truncate != 0 { diff --git a/task.go b/task.go index 94dde2e5f..086b9bd5a 100644 --- a/task.go +++ b/task.go @@ -10,7 +10,6 @@ import ( "time" "github.com/influxdata/kapacitor/pipeline" - "github.com/influxdata/kapacitor/tick" ) // The type of a task @@ -58,37 +57,6 @@ type Task struct { SnapshotInterval time.Duration } -func NewTask( - name, - script string, - tt TaskType, - dbrps []DBRP, - snapshotInterval time.Duration, - scope *tick.Scope, -) (*Task, error) { - t := &Task{ - Name: name, - Type: tt, - DBRPs: dbrps, - SnapshotInterval: snapshotInterval, - } - - var srcEdge pipeline.EdgeType - switch tt { - case StreamTask: - srcEdge = pipeline.StreamEdge - case BatchTask: - srcEdge = pipeline.BatchEdge - } - - p, err := pipeline.CreatePipeline(script, srcEdge, scope) - if err != nil { - return nil, err - } - t.Pipeline = p - return t, nil -} - func (t *Task) Dot() []byte { return t.Pipeline.Dot(t.Name) } @@ -369,6 +337,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, l *log.Logger) (Node, error return newDerivativeNode(et, t, l) case *pipeline.UDFNode: return newUDFNode(et, t, l) + case *pipeline.StatsNode: + return newStatsNode(et, t, l) default: return nil, fmt.Errorf("unknown pipeline node type %T", p) } diff --git a/task_master.go b/task_master.go index 689783ab5..35bca8b1f 100644 --- a/task_master.go +++ b/task_master.go @@ -38,6 +38,7 @@ type TaskMaster struct { HasSnapshot(name string) bool LoadSnapshot(name string) (*TaskSnapshot, error) } + DeadmanService 
pipeline.DeadmanService UDFService UDFService @@ -115,6 +116,7 @@ func (tm *TaskMaster) New() *TaskMaster { n := NewTaskMaster(tm.LogService) n.HTTPDService = tm.HTTPDService n.UDFService = tm.UDFService + n.DeadmanService = tm.DeadmanService n.TaskStore = tm.TaskStore n.InfluxDBService = tm.InfluxDBService n.SMTPService = tm.SMTPService @@ -168,6 +170,38 @@ func (tm *TaskMaster) Drain() { } } +// Create a new task in the context of a TaskMaster +func (tm *TaskMaster) NewTask( + name, + script string, + tt TaskType, + dbrps []DBRP, + snapshotInterval time.Duration, +) (*Task, error) { + t := &Task{ + Name: name, + Type: tt, + DBRPs: dbrps, + SnapshotInterval: snapshotInterval, + } + scope := tm.CreateTICKScope() + + var srcEdge pipeline.EdgeType + switch tt { + case StreamTask: + srcEdge = pipeline.StreamEdge + case BatchTask: + srcEdge = pipeline.BatchEdge + } + + p, err := pipeline.CreatePipeline(script, srcEdge, scope, tm.DeadmanService) + if err != nil { + return nil, err + } + t.Pipeline = p + return t, nil +} + func (tm *TaskMaster) waitForForks() { if tm.drained { return diff --git a/tick/eval.go b/tick/eval.go index 7f2410147..e31dc4f95 100644 --- a/tick/eval.go +++ b/tick/eval.go @@ -154,7 +154,7 @@ func eval(n Node, scope *Scope, stck *stack) (err error) { func evalUnary(op tokenType, scope *Scope, stck *stack) error { v := stck.Pop() switch op { - case tokenMinus: + case TokenMinus: switch n := v.(type) { case float64: stck.Push(-1 * n) @@ -163,7 +163,7 @@ func evalUnary(op tokenType, scope *Scope, stck *stack) error { default: return fmt.Errorf("invalid arugument to '-' %v", v) } - case tokenNot: + case TokenNot: if b, ok := v.(bool); ok { stck.Push(!b) } else { @@ -177,10 +177,10 @@ func evalBinary(op tokenType, scope *Scope, stck *stack) error { r := stck.Pop() l := stck.Pop() switch op { - case tokenAsgn: + case TokenAsgn: i := l.(*IdentifierNode) scope.Set(i.Ident, r) - case tokenDot: + case TokenDot: // Resolve identifier if left, ok := l.(*IdentifierNode); ok { var err error diff --git a/tick/lex.go b/tick/lex.go index 1d3ac3d10..9bfc1b8a2 100644 --- a/tick/lex.go +++ b/tick/lex.go @@ -14,24 +14,24 @@ type stateFn func(*lexer) stateFn const eof = -1 const ( - tokenError tokenType = iota - tokenEOF - tokenVar - tokenAsgn - tokenDot - tokenIdent - tokenReference - tokenLambda - tokenNumber - tokenString - tokenDuration - tokenLParen - tokenRParen - tokenComma - tokenNot - tokenTrue - tokenFalse - tokenRegex + TokenError tokenType = iota + TokenEOF + TokenVar + TokenAsgn + TokenDot + TokenIdent + TokenReference + TokenLambda + TokenNumber + TokenString + TokenDuration + TokenLParen + TokenRParen + TokenComma + TokenNot + TokenTrue + TokenFalse + TokenRegex // begin operator tokens begin_tok_operator @@ -39,11 +39,11 @@ const ( //begin mathematical operators begin_tok_operator_math - tokenPlus - tokenMinus - tokenMult - tokenDiv - tokenMod + TokenPlus + TokenMinus + TokenMult + TokenDiv + TokenMod //end mathematical operators end_tok_operator_math @@ -51,16 +51,16 @@ const ( // begin comparison operators begin_tok_operator_comp - tokenAnd - tokenOr - tokenEqual - tokenNotEqual - tokenLess - tokenGreater - tokenLessEqual - tokenGreaterEqual - tokenRegexEqual - tokenRegexNotEqual + TokenAnd + TokenOr + TokenEqual + TokenNotEqual + TokenLess + TokenGreater + TokenLessEqual + TokenGreaterEqual + TokenRegexEqual + TokenRegexNotEqual //end comparison operators end_tok_operator_comp @@ -70,33 +70,33 @@ const ( ) var operatorStr = [...]string{ - tokenNot: "!", - tokenPlus: "+", - 
tokenMinus: "-", - tokenMult: "*", - tokenDiv: "/", - tokenMod: "%", - tokenEqual: "==", - tokenNotEqual: "!=", - tokenLess: "<", - tokenGreater: ">", - tokenLessEqual: "<=", - tokenGreaterEqual: ">=", - tokenRegexEqual: "=~", - tokenRegexNotEqual: "!~", - tokenAnd: "AND", - tokenOr: "OR", + TokenNot: "!", + TokenPlus: "+", + TokenMinus: "-", + TokenMult: "*", + TokenDiv: "/", + TokenMod: "%", + TokenEqual: "==", + TokenNotEqual: "!=", + TokenLess: "<", + TokenGreater: ">", + TokenLessEqual: "<=", + TokenGreaterEqual: ">=", + TokenRegexEqual: "=~", + TokenRegexNotEqual: "!~", + TokenAnd: "AND", + TokenOr: "OR", } var strToOperator map[string]tokenType var keywords = map[string]tokenType{ - "AND": tokenAnd, - "OR": tokenOr, - "TRUE": tokenTrue, - "FALSE": tokenFalse, - "var": tokenVar, - "lambda": tokenLambda, + "AND": TokenAnd, + "OR": TokenOr, + "TRUE": TokenTrue, + "FALSE": TokenFalse, + "var": TokenVar, + "lambda": TokenLambda, } func init() { @@ -109,39 +109,39 @@ func init() { //String representation of an tokenType func (t tokenType) String() string { switch { - case t == tokenError: + case t == TokenError: return "ERR" - case t == tokenEOF: + case t == TokenEOF: return "EOF" - case t == tokenVar: + case t == TokenVar: return "var" - case t == tokenIdent: + case t == TokenIdent: return "identifier" - case t == tokenReference: + case t == TokenReference: return "reference" - case t == tokenDuration: + case t == TokenDuration: return "duration" - case t == tokenNumber: + case t == TokenNumber: return "number" - case t == tokenString: + case t == TokenString: return "string" - case t == tokenRegex: + case t == TokenRegex: return "regex" - case t == tokenDot: + case t == TokenDot: return "." - case t == tokenAsgn: + case t == TokenAsgn: return "=" - case t == tokenLParen: + case t == TokenLParen: return "(" - case t == tokenRParen: + case t == TokenRParen: return ")" - case t == tokenComma: + case t == TokenComma: return "," - case t == tokenNot: + case t == TokenNot: return "!" - case t == tokenTrue: + case t == TokenTrue: return "TRUE" - case t == tokenFalse: + case t == TokenFalse: return "FALSE" case isOperator(t): return operatorStr[t] @@ -235,7 +235,7 @@ func (l *lexer) next() (r rune) { // errorf returns an error token and terminates the scan by passing // back a nil pointer that will be the next state, terminating l.nextToken. 
func (l *lexer) errorf(format string, args ...interface{}) stateFn { - l.tokens <- token{tokenError, l.start, fmt.Sprintf(format, args...)} + l.tokens <- token{TokenError, l.start, fmt.Sprintf(format, args...)} return nil } @@ -297,19 +297,19 @@ func lexToken(l *lexer) stateFn { case isSpace(r): l.ignore() case r == '(': - l.emit(tokenLParen) + l.emit(TokenLParen) return lexToken case r == ')': - l.emit(tokenRParen) + l.emit(TokenRParen) return lexToken case r == '.': - l.emit(tokenDot) + l.emit(TokenDot) return lexToken case r == ',': - l.emit(tokenComma) + l.emit(TokenComma) return lexToken case r == eof: - l.emit(tokenEOF) + l.emit(TokenEOF) return nil default: l.errorf("unknown state") @@ -345,7 +345,7 @@ func lexOperator(l *lexer) stateFn { } op := strToOperator[l.current()] l.emit(op) - if op == tokenRegexNotEqual { + if op == TokenRegexNotEqual { l.ignoreSpace() if l.peek() == '/' { return lexRegex @@ -364,14 +364,14 @@ func lexOperator(l *lexer) stateFn { l.next() op := strToOperator[l.current()] l.emit(op) - if op == tokenRegexEqual { + if op == TokenRegexEqual { l.ignoreSpace() if l.peek() == '/' { return lexRegex } } } else { - l.emit(tokenAsgn) + l.emit(TokenAsgn) l.ignoreSpace() if l.peek() == '/' { return lexRegex @@ -390,12 +390,12 @@ func lexIdentOrKeyword(l *lexer) stateFn { default: l.backup() if t := keywords[l.current()]; t > 0 { - if t == tokenLambda && l.next() != ':' { + if t == TokenLambda && l.next() != ':' { return l.errorf("missing ':' on lambda keyword") } l.emit(t) } else { - l.emit(tokenIdent) + l.emit(TokenIdent) } return lexToken } @@ -433,11 +433,11 @@ func lexNumberOrDuration(l *lexer) stateFn { if r == 'm' && l.peek() == 's' { l.next() } - l.emit(tokenDuration) + l.emit(TokenDuration) return lexToken default: l.backup() - l.emit(tokenNumber) + l.emit(TokenNumber) return lexToken } } @@ -451,7 +451,7 @@ func lexReference(l *lexer) stateFn { l.next() } case '"': - l.emit(tokenReference) + l.emit(TokenReference) return lexToken case eof: return l.errorf("unterminated field reference") @@ -474,7 +474,7 @@ func lexSingleOrTripleString(l *lexer) stateFn { count++ l.next() } else { - l.emit(tokenString) + l.emit(TokenString) return lexToken } } @@ -490,7 +490,7 @@ func lexSingleOrTripleString(l *lexer) stateFn { case r == '\'': count-- if count == 0 { - l.emit(tokenString) + l.emit(TokenString) return lexToken } case r == eof: @@ -517,7 +517,7 @@ func lexRegex(l *lexer) stateFn { l.next() } case r == '/': - l.emit(tokenRegex) + l.emit(TokenRegex) return lexToken default: //absorb diff --git a/tick/lex_test.go b/tick/lex_test.go index e176bba65..25b592291 100644 --- a/tick/lex_test.go +++ b/tick/lex_test.go @@ -36,378 +36,378 @@ func TestLexer(t *testing.T) { { in: "!", tokens: []token{ - token{tokenNot, 0, "!"}, - token{tokenEOF, 1, ""}, + token{TokenNot, 0, "!"}, + token{TokenEOF, 1, ""}, }, }, { in: "+", tokens: []token{ - token{tokenPlus, 0, "+"}, - token{tokenEOF, 1, ""}, + token{TokenPlus, 0, "+"}, + token{TokenEOF, 1, ""}, }, }, { in: "-", tokens: []token{ - token{tokenMinus, 0, "-"}, - token{tokenEOF, 1, ""}, + token{TokenMinus, 0, "-"}, + token{TokenEOF, 1, ""}, }, }, { in: "*", tokens: []token{ - token{tokenMult, 0, "*"}, - token{tokenEOF, 1, ""}, + token{TokenMult, 0, "*"}, + token{TokenEOF, 1, ""}, }, }, { in: "/", tokens: []token{ - token{tokenDiv, 0, "/"}, - token{tokenEOF, 1, ""}, + token{TokenDiv, 0, "/"}, + token{TokenEOF, 1, ""}, }, }, { in: "=", tokens: []token{ - token{tokenAsgn, 0, "="}, - token{tokenEOF, 1, ""}, + token{TokenAsgn, 0, "="}, + 
token{TokenEOF, 1, ""}, }, }, { in: "==", tokens: []token{ - token{tokenEqual, 0, "=="}, - token{tokenEOF, 2, ""}, + token{TokenEqual, 0, "=="}, + token{TokenEOF, 2, ""}, }, }, { in: "!=", tokens: []token{ - token{tokenNotEqual, 0, "!="}, - token{tokenEOF, 2, ""}, + token{TokenNotEqual, 0, "!="}, + token{TokenEOF, 2, ""}, }, }, { in: ">", tokens: []token{ - token{tokenGreater, 0, ">"}, - token{tokenEOF, 1, ""}, + token{TokenGreater, 0, ">"}, + token{TokenEOF, 1, ""}, }, }, { in: ">=", tokens: []token{ - token{tokenGreaterEqual, 0, ">="}, - token{tokenEOF, 2, ""}, + token{TokenGreaterEqual, 0, ">="}, + token{TokenEOF, 2, ""}, }, }, { in: "<", tokens: []token{ - token{tokenLess, 0, "<"}, - token{tokenEOF, 1, ""}, + token{TokenLess, 0, "<"}, + token{TokenEOF, 1, ""}, }, }, { in: "<=", tokens: []token{ - token{tokenLessEqual, 0, "<="}, - token{tokenEOF, 2, ""}, + token{TokenLessEqual, 0, "<="}, + token{TokenEOF, 2, ""}, }, }, { in: "=~", tokens: []token{ - token{tokenRegexEqual, 0, "=~"}, - token{tokenEOF, 2, ""}, + token{TokenRegexEqual, 0, "=~"}, + token{TokenEOF, 2, ""}, }, }, { in: "!~", tokens: []token{ - token{tokenRegexNotEqual, 0, "!~"}, - token{tokenEOF, 2, ""}, + token{TokenRegexNotEqual, 0, "!~"}, + token{TokenEOF, 2, ""}, }, }, { in: "(", tokens: []token{ - token{tokenLParen, 0, "("}, - token{tokenEOF, 1, ""}, + token{TokenLParen, 0, "("}, + token{TokenEOF, 1, ""}, }, }, { in: ")", tokens: []token{ - token{tokenRParen, 0, ")"}, - token{tokenEOF, 1, ""}, + token{TokenRParen, 0, ")"}, + token{TokenEOF, 1, ""}, }, }, { in: ".", tokens: []token{ - token{tokenDot, 0, "."}, - token{tokenEOF, 1, ""}, + token{TokenDot, 0, "."}, + token{TokenEOF, 1, ""}, }, }, // Keywords { in: "AND", tokens: []token{ - token{tokenAnd, 0, "AND"}, - token{tokenEOF, 3, ""}, + token{TokenAnd, 0, "AND"}, + token{TokenEOF, 3, ""}, }, }, { in: "OR", tokens: []token{ - token{tokenOr, 0, "OR"}, - token{tokenEOF, 2, ""}, + token{TokenOr, 0, "OR"}, + token{TokenEOF, 2, ""}, }, }, { in: "TRUE", tokens: []token{ - token{tokenTrue, 0, "TRUE"}, - token{tokenEOF, 4, ""}, + token{TokenTrue, 0, "TRUE"}, + token{TokenEOF, 4, ""}, }, }, { in: "FALSE", tokens: []token{ - token{tokenFalse, 0, "FALSE"}, - token{tokenEOF, 5, ""}, + token{TokenFalse, 0, "FALSE"}, + token{TokenEOF, 5, ""}, }, }, { in: "var", tokens: []token{ - token{tokenVar, 0, "var"}, - token{tokenEOF, 3, ""}, + token{TokenVar, 0, "var"}, + token{TokenEOF, 3, ""}, }, }, //Numbers { in: "42", tokens: []token{ - token{tokenNumber, 0, "42"}, - token{tokenEOF, 2, ""}, + token{TokenNumber, 0, "42"}, + token{TokenEOF, 2, ""}, }, }, { in: "42.21", tokens: []token{ - token{tokenNumber, 0, "42.21"}, - token{tokenEOF, 5, ""}, + token{TokenNumber, 0, "42.21"}, + token{TokenEOF, 5, ""}, }, }, { in: ".421", tokens: []token{ - token{tokenDot, 0, "."}, - token{tokenNumber, 1, "421"}, - token{tokenEOF, 4, ""}, + token{TokenDot, 0, "."}, + token{TokenNumber, 1, "421"}, + token{TokenEOF, 4, ""}, }, }, { in: "0.421", tokens: []token{ - token{tokenNumber, 0, "0.421"}, - token{tokenEOF, 5, ""}, + token{TokenNumber, 0, "0.421"}, + token{TokenEOF, 5, ""}, }, }, //Durations { in: "42s", tokens: []token{ - token{tokenDuration, 0, "42s"}, - token{tokenEOF, 3, ""}, + token{TokenDuration, 0, "42s"}, + token{TokenEOF, 3, ""}, }, }, { in: "42.21m", tokens: []token{ - token{tokenDuration, 0, "42.21m"}, - token{tokenEOF, 6, ""}, + token{TokenDuration, 0, "42.21m"}, + token{TokenEOF, 6, ""}, }, }, { in: ".421h", tokens: []token{ - token{tokenDot, 0, "."}, - token{tokenDuration, 1, "421h"}, - 
token{tokenEOF, 5, ""}, + token{TokenDot, 0, "."}, + token{TokenDuration, 1, "421h"}, + token{TokenEOF, 5, ""}, }, }, { in: "0.421s", tokens: []token{ - token{tokenDuration, 0, "0.421s"}, - token{tokenEOF, 6, ""}, + token{TokenDuration, 0, "0.421s"}, + token{TokenEOF, 6, ""}, }, }, { in: "1u", tokens: []token{ - token{tokenDuration, 0, "1u"}, - token{tokenEOF, 2, ""}, + token{TokenDuration, 0, "1u"}, + token{TokenEOF, 2, ""}, }, }, { in: "1µ", tokens: []token{ - token{tokenDuration, 0, "1µ"}, - token{tokenEOF, 3, ""}, + token{TokenDuration, 0, "1µ"}, + token{TokenEOF, 3, ""}, }, }, { in: "1ms", tokens: []token{ - token{tokenDuration, 0, "1ms"}, - token{tokenEOF, 3, ""}, + token{TokenDuration, 0, "1ms"}, + token{TokenEOF, 3, ""}, }, }, { in: "1h", tokens: []token{ - token{tokenDuration, 0, "1h"}, - token{tokenEOF, 2, ""}, + token{TokenDuration, 0, "1h"}, + token{TokenEOF, 2, ""}, }, }, { in: "1d", tokens: []token{ - token{tokenDuration, 0, "1d"}, - token{tokenEOF, 2, ""}, + token{TokenDuration, 0, "1d"}, + token{TokenEOF, 2, ""}, }, }, { in: "1w", tokens: []token{ - token{tokenDuration, 0, "1w"}, - token{tokenEOF, 2, ""}, + token{TokenDuration, 0, "1w"}, + token{TokenEOF, 2, ""}, }, }, //Identifier { in: "variable", tokens: []token{ - token{tokenIdent, 0, "variable"}, - token{tokenEOF, 8, ""}, + token{TokenIdent, 0, "variable"}, + token{TokenEOF, 8, ""}, }, }, { in: "myVar01", tokens: []token{ - token{tokenIdent, 0, "myVar01"}, - token{tokenEOF, 7, ""}, + token{TokenIdent, 0, "myVar01"}, + token{TokenEOF, 7, ""}, }, }, // References { in: `""`, tokens: []token{ - token{tokenReference, 0, `""`}, - token{tokenEOF, 2, ""}, + token{TokenReference, 0, `""`}, + token{TokenEOF, 2, ""}, }, }, { in: `"ref with spaces"`, tokens: []token{ - token{tokenReference, 0, `"ref with spaces"`}, - token{tokenEOF, 17, ""}, + token{TokenReference, 0, `"ref with spaces"`}, + token{TokenEOF, 17, ""}, }, }, { in: `"ref\""`, tokens: []token{ - token{tokenReference, 0, `"ref\""`}, - token{tokenEOF, 7, ""}, + token{TokenReference, 0, `"ref\""`}, + token{TokenEOF, 7, ""}, }, }, //Strings { in: `''`, tokens: []token{ - token{tokenString, 0, `''`}, - token{tokenEOF, 2, ""}, + token{TokenString, 0, `''`}, + token{TokenEOF, 2, ""}, }, }, { in: `''''''`, tokens: []token{ - token{tokenString, 0, `''''''`}, - token{tokenEOF, 6, ""}, + token{TokenString, 0, `''''''`}, + token{TokenEOF, 6, ""}, }, }, { in: `'str'`, tokens: []token{ - token{tokenString, 0, `'str'`}, - token{tokenEOF, 5, ""}, + token{TokenString, 0, `'str'`}, + token{TokenEOF, 5, ""}, }, }, { in: `'str\''`, tokens: []token{ - token{tokenString, 0, `'str\''`}, - token{tokenEOF, 7, ""}, + token{TokenString, 0, `'str\''`}, + token{TokenEOF, 7, ""}, }, }, { in: `'''s'tr'''`, tokens: []token{ - token{tokenString, 0, `'''s'tr'''`}, - token{tokenEOF, 10, ""}, + token{TokenString, 0, `'''s'tr'''`}, + token{TokenEOF, 10, ""}, }, }, { in: `'''s\'tr'''`, tokens: []token{ - token{tokenString, 0, `'''s\'tr'''`}, - token{tokenEOF, 11, ""}, + token{TokenString, 0, `'''s\'tr'''`}, + token{TokenEOF, 11, ""}, }, }, { in: `'''str'''`, tokens: []token{ - token{tokenString, 0, `'''str'''`}, - token{tokenEOF, 9, ""}, + token{TokenString, 0, `'''str'''`}, + token{TokenEOF, 9, ""}, }, }, // Regex -- can only be lexed within context { in: `=~ //`, tokens: []token{ - token{tokenRegexEqual, 0, "=~"}, - token{tokenRegex, 3, "//"}, - token{tokenEOF, 5, ""}, + token{TokenRegexEqual, 0, "=~"}, + token{TokenRegex, 3, "//"}, + token{TokenEOF, 5, ""}, }, }, { in: `!~ //`, tokens: []token{ - 
token{tokenRegexNotEqual, 0, "!~"}, - token{tokenRegex, 3, "//"}, - token{tokenEOF, 5, ""}, + token{TokenRegexNotEqual, 0, "!~"}, + token{TokenRegex, 3, "//"}, + token{TokenEOF, 5, ""}, }, }, { in: `= //`, tokens: []token{ - token{tokenAsgn, 0, "="}, - token{tokenRegex, 2, "//"}, - token{tokenEOF, 4, ""}, + token{TokenAsgn, 0, "="}, + token{TokenRegex, 2, "//"}, + token{TokenEOF, 4, ""}, }, }, { in: `= /^((.*)[a-z]+\S{0,2})|cat\/\/$/`, tokens: []token{ - token{tokenAsgn, 0, "="}, - token{tokenRegex, 2, `/^((.*)[a-z]+\S{0,2})|cat\/\/$/`}, - token{tokenEOF, 33, ""}, + token{TokenAsgn, 0, "="}, + token{TokenRegex, 2, `/^((.*)[a-z]+\S{0,2})|cat\/\/$/`}, + token{TokenEOF, 33, ""}, }, }, @@ -415,89 +415,89 @@ func TestLexer(t *testing.T) { { in: " ", tokens: []token{ - token{tokenEOF, 1, ""}, + token{TokenEOF, 1, ""}, }, }, { in: " \t\n", tokens: []token{ - token{tokenEOF, 3, ""}, + token{TokenEOF, 3, ""}, }, }, //Combinations { in: "var x = avg()", tokens: []token{ - token{tokenVar, 0, "var"}, - token{tokenIdent, 4, "x"}, - token{tokenAsgn, 6, "="}, - token{tokenIdent, 8, "avg"}, - token{tokenLParen, 11, "("}, - token{tokenRParen, 12, ")"}, - token{tokenEOF, 13, ""}, + token{TokenVar, 0, "var"}, + token{TokenIdent, 4, "x"}, + token{TokenAsgn, 6, "="}, + token{TokenIdent, 8, "avg"}, + token{TokenLParen, 11, "("}, + token{TokenRParen, 12, ")"}, + token{TokenEOF, 13, ""}, }, }, { in: "var x = avg().parallel(4)x.groupby('cpu').window().period(10s)", tokens: []token{ - token{tokenVar, 0, "var"}, - token{tokenIdent, 4, "x"}, - token{tokenAsgn, 6, "="}, - token{tokenIdent, 8, "avg"}, - token{tokenLParen, 11, "("}, - token{tokenRParen, 12, ")"}, - token{tokenDot, 13, "."}, - token{tokenIdent, 14, "parallel"}, - token{tokenLParen, 22, "("}, - token{tokenNumber, 23, "4"}, - token{tokenRParen, 24, ")"}, - token{tokenIdent, 25, "x"}, - token{tokenDot, 26, "."}, - token{tokenIdent, 27, "groupby"}, - token{tokenLParen, 34, "("}, - token{tokenString, 35, "'cpu'"}, - token{tokenRParen, 40, ")"}, - token{tokenDot, 41, "."}, - token{tokenIdent, 42, "window"}, - token{tokenLParen, 48, "("}, - token{tokenRParen, 49, ")"}, - token{tokenDot, 50, "."}, - token{tokenIdent, 51, "period"}, - token{tokenLParen, 57, "("}, - token{tokenDuration, 58, "10s"}, - token{tokenRParen, 61, ")"}, - token{tokenEOF, 62, ""}, + token{TokenVar, 0, "var"}, + token{TokenIdent, 4, "x"}, + token{TokenAsgn, 6, "="}, + token{TokenIdent, 8, "avg"}, + token{TokenLParen, 11, "("}, + token{TokenRParen, 12, ")"}, + token{TokenDot, 13, "."}, + token{TokenIdent, 14, "parallel"}, + token{TokenLParen, 22, "("}, + token{TokenNumber, 23, "4"}, + token{TokenRParen, 24, ")"}, + token{TokenIdent, 25, "x"}, + token{TokenDot, 26, "."}, + token{TokenIdent, 27, "groupby"}, + token{TokenLParen, 34, "("}, + token{TokenString, 35, "'cpu'"}, + token{TokenRParen, 40, ")"}, + token{TokenDot, 41, "."}, + token{TokenIdent, 42, "window"}, + token{TokenLParen, 48, "("}, + token{TokenRParen, 49, ")"}, + token{TokenDot, 50, "."}, + token{TokenIdent, 51, "period"}, + token{TokenLParen, 57, "("}, + token{TokenDuration, 58, "10s"}, + token{TokenRParen, 61, ")"}, + token{TokenEOF, 62, ""}, }, }, //Comments { in: "var x = avg()\n// Comment all of this is ignored\nx.groupby('cpu')", tokens: []token{ - token{tokenVar, 0, "var"}, - token{tokenIdent, 4, "x"}, - token{tokenAsgn, 6, "="}, - token{tokenIdent, 8, "avg"}, - token{tokenLParen, 11, "("}, - token{tokenRParen, 12, ")"}, - token{tokenIdent, 48, "x"}, - token{tokenDot, 49, "."}, - token{tokenIdent, 50, "groupby"}, - 
token{tokenLParen, 57, "("}, - token{tokenString, 58, "'cpu'"}, - token{tokenRParen, 63, ")"}, - token{tokenEOF, 64, ""}, + token{TokenVar, 0, "var"}, + token{TokenIdent, 4, "x"}, + token{TokenAsgn, 6, "="}, + token{TokenIdent, 8, "avg"}, + token{TokenLParen, 11, "("}, + token{TokenRParen, 12, ")"}, + token{TokenIdent, 48, "x"}, + token{TokenDot, 49, "."}, + token{TokenIdent, 50, "groupby"}, + token{TokenLParen, 57, "("}, + token{TokenString, 58, "'cpu'"}, + token{TokenRParen, 63, ")"}, + token{TokenEOF, 64, ""}, }, }, { in: "var x = avg()\n// Comment all of this is ignored", tokens: []token{ - token{tokenVar, 0, "var"}, - token{tokenIdent, 4, "x"}, - token{tokenAsgn, 6, "="}, - token{tokenIdent, 8, "avg"}, - token{tokenLParen, 11, "("}, - token{tokenRParen, 12, ")"}, - token{tokenEOF, 47, ""}, + token{TokenVar, 0, "var"}, + token{TokenIdent, 4, "x"}, + token{TokenAsgn, 6, "="}, + token{TokenIdent, 8, "avg"}, + token{TokenLParen, 11, "("}, + token{TokenRParen, 12, ")"}, + token{TokenEOF, 47, ""}, }, }, } diff --git a/tick/node_test.go b/tick/node_test.go index 092819ebd..4ff6ad160 100644 --- a/tick/node_test.go +++ b/tick/node_test.go @@ -108,7 +108,7 @@ func TestNewBinaryNode(t *testing.T) { Right: nil, Operator: token{ pos: 0, - typ: tokenEqual, + typ: TokenEqual, val: "=", }, }, diff --git a/tick/parser.go b/tick/parser.go index 85c13bbce..5ea37947a 100644 --- a/tick/parser.go +++ b/tick/parser.go @@ -106,7 +106,7 @@ func (p *parser) unexpected(tok token, expected ...tokenType) { } expectedStr := strings.Join(expectedStrs, ",") tokStr := tok.typ.String() - if tok.typ == tokenError { + if tok.typ == TokenError { tokStr = tok.val } p.errorf("unexpected %s line %d char %d in \"%s\". expected: %s", tokStr, line, char, p.Text[start:stop], expectedStr) @@ -147,7 +147,7 @@ func (p *parser) Parse(text string) (err error) { // It runs to EOF. func (p *parser) parse() { p.Root = p.program() - p.expect(tokenEOF) + p.expect(TokenEOF) //if err := t.Root.Check(); err != nil { // t.error(err) //} @@ -158,7 +158,7 @@ func (p *parser) program() Node { l := newList(p.peek().pos) for { switch p.peek().typ { - case tokenEOF: + case TokenEOF: return l default: s := p.statement() @@ -170,7 +170,7 @@ func (p *parser) program() Node { //parse a statement func (p *parser) statement() Node { var n Node - if p.peek().typ == tokenVar { + if p.peek().typ == TokenVar { n = p.declaration() } else { n = p.expression() @@ -181,22 +181,22 @@ func (p *parser) statement() Node { //parse a declaration statement func (p *parser) declaration() Node { v := p.vr() - op := p.expect(tokenAsgn) + op := p.expect(TokenAsgn) b := p.expression() return newBinary(op, v, b) } //parse a 'var ident' expression func (p *parser) vr() Node { - p.expect(tokenVar) - ident := p.expect(tokenIdent) + p.expect(TokenVar) + ident := p.expect(TokenIdent) return newIdent(ident.pos, ident.val) } //parse an expression func (p *parser) expression() Node { switch p.peek().typ { - case tokenIdent: + case TokenIdent: term := p.funcOrIdent() return p.chain(term) default: @@ -207,7 +207,7 @@ func (p *parser) expression() Node { //parse a function or identifier invocation chain // '.' 
operator is left-associative func (p *parser) chain(lhs Node) Node { - for look := p.peek().typ; look == tokenDot; look = p.peek().typ { + for look := p.peek().typ; look == TokenDot; look = p.peek().typ { op := p.next() rhs := p.funcOrIdent() lhs = newBinary(op, lhs, rhs) @@ -217,7 +217,7 @@ func (p *parser) chain(lhs Node) Node { func (p *parser) funcOrIdent() (n Node) { p.next() - if p.peek().typ == tokenLParen { + if p.peek().typ == TokenLParen { p.backup() n = p.function() } else { @@ -229,17 +229,17 @@ func (p *parser) funcOrIdent() (n Node) { //parse an identifier func (p *parser) identifier() Node { - ident := p.expect(tokenIdent) + ident := p.expect(TokenIdent) n := newIdent(ident.pos, ident.val) return n } //parse a function call func (p *parser) function() Node { - ident := p.expect(tokenIdent) - p.expect(tokenLParen) + ident := p.expect(TokenIdent) + p.expect(TokenLParen) args := p.parameters() - p.expect(tokenRParen) + p.expect(TokenRParen) n := newFunc(ident.pos, ident.val, args) return n @@ -248,11 +248,11 @@ func (p *parser) function() Node { //parse a parameter list func (p *parser) parameters() (args []Node) { for { - if p.peek().typ == tokenRParen { + if p.peek().typ == TokenRParen { return } args = append(args, p.parameter()) - if p.next().typ != tokenComma { + if p.next().typ != TokenComma { p.backup() return } @@ -261,9 +261,9 @@ func (p *parser) parameters() (args []Node) { func (p *parser) parameter() (n Node) { switch p.peek().typ { - case tokenIdent: + case TokenIdent: n = p.expression() - case tokenLambda: + case TokenLambda: lambda := p.next() l := p.lambdaExpr() n = newLambda(lambda.pos, l) @@ -280,21 +280,21 @@ func (p *parser) lambdaExpr() Node { // Operator Precedence parsing var precedence = [...]int{ - tokenOr: 0, - tokenAnd: 1, - tokenEqual: 2, - tokenNotEqual: 2, - tokenRegexEqual: 2, - tokenRegexNotEqual: 2, - tokenGreater: 3, - tokenGreaterEqual: 3, - tokenLess: 3, - tokenLessEqual: 3, - tokenPlus: 4, - tokenMinus: 4, - tokenMult: 5, - tokenDiv: 5, - tokenMod: 5, + TokenOr: 0, + TokenAnd: 1, + TokenEqual: 2, + TokenNotEqual: 2, + TokenRegexEqual: 2, + TokenRegexNotEqual: 2, + TokenGreater: 3, + TokenGreaterEqual: 3, + TokenLess: 3, + TokenLessEqual: 3, + TokenPlus: 4, + TokenMinus: 4, + TokenMult: 5, + TokenDiv: 5, + TokenMod: 5, } // parse the expression considering operator precedence. 
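Note for reviewers: the precedence table above gives each binary-operator token (now under its exported Token* name) a binding strength, and the parser.precedence method in the following hunk climbs that table. Below is a minimal runnable sketch of the same precedence-climbing shape; the names here (tokType, tok, parser, climb) are illustrative stand-ins, not tick's actual API, and the "primary" is simplified to a bare number.

package main

import "fmt"

type tokType int

const (
	tokNum tokType = iota
	tokPlus
	tokMinus
	tokMult
	tokEOF
)

// prec plays the role of the table above: larger values bind tighter.
var prec = map[tokType]int{tokPlus: 1, tokMinus: 1, tokMult: 2}

type tok struct {
	typ tokType
	val float64
}

type parser struct {
	toks []tok
	pos  int
}

func (p *parser) peek() tok { return p.toks[p.pos] }
func (p *parser) next() tok { t := p.toks[p.pos]; p.pos++; return t }

// climb consumes operators whose precedence is at least minP and recurses
// with pr+1 for the right-hand side, so operators of equal precedence
// associate to the left.
func (p *parser) climb(lhs float64, minP int) float64 {
	for {
		op := p.peek()
		pr, ok := prec[op.typ]
		if !ok || pr < minP {
			return lhs
		}
		p.next()
		rhs := p.climb(p.next().val, pr+1)
		switch op.typ {
		case tokPlus:
			lhs += rhs
		case tokMinus:
			lhs -= rhs
		case tokMult:
			lhs *= rhs
		}
	}
}

func main() {
	// 1 + 2*3 - 4 evaluates to 3
	p := &parser{toks: []tok{
		{tokNum, 1}, {tokPlus, 0}, {tokNum, 2}, {tokMult, 0},
		{tokNum, 3}, {tokMinus, 0}, {tokNum, 4}, {tokEOF, 0},
	}}
	fmt.Println(p.climb(p.next().val, 0))
}

Recursing on the right-hand side with pr+1 rather than pr is what makes operators of equal precedence, such as a - b - c, group left-associatively as (a - b) - c.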
@@ -317,10 +317,10 @@ func (p *parser) precedence(lhs Node, minP int) Node { //parse a function call in a lambda expr func (p *parser) lfunction() Node { - ident := p.expect(tokenIdent) - p.expect(tokenLParen) + ident := p.expect(TokenIdent) + p.expect(TokenLParen) args := p.lparameters() - p.expect(tokenRParen) + p.expect(TokenRParen) n := newFunc(ident.pos, ident.val, args) return n @@ -329,11 +329,11 @@ func (p *parser) lfunction() Node { //parse a parameter list in a lfunction func (p *parser) lparameters() (args []Node) { for { - if p.peek().typ == tokenRParen { + if p.peek().typ == TokenRParen { return } args = append(args, p.lparameter()) - if p.next().typ != tokenComma { + if p.next().typ != TokenComma { p.backup() return } @@ -351,49 +351,49 @@ func (p *parser) lparameter() (n Node) { // parse a primary expression func (p *parser) primary() Node { switch tok := p.peek(); { - case tok.typ == tokenLParen: + case tok.typ == TokenLParen: p.next() n := p.lambdaExpr() - p.expect(tokenRParen) + p.expect(TokenRParen) return n - case tok.typ == tokenNumber: + case tok.typ == TokenNumber: return p.number() - case tok.typ == tokenString: + case tok.typ == TokenString: return p.string() - case tok.typ == tokenTrue, tok.typ == tokenFalse: + case tok.typ == TokenTrue, tok.typ == TokenFalse: return p.boolean() - case tok.typ == tokenDuration: + case tok.typ == TokenDuration: return p.duration() - case tok.typ == tokenRegex: + case tok.typ == TokenRegex: return p.regex() - case tok.typ == tokenMult: + case tok.typ == TokenMult: return p.star() - case tok.typ == tokenReference: + case tok.typ == TokenReference: return p.reference() - case tok.typ == tokenIdent: + case tok.typ == TokenIdent: p.next() - if p.peek().typ == tokenLParen { + if p.peek().typ == TokenLParen { p.backup() return p.lfunction() } p.backup() return p.identifier() - case tok.typ == tokenMinus, tok.typ == tokenNot: + case tok.typ == TokenMinus, tok.typ == TokenNot: p.next() return newUnary(tok, p.primary()) default: p.unexpected( tok, - tokenNumber, - tokenString, - tokenDuration, - tokenIdent, - tokenTrue, - tokenFalse, - tokenEqual, - tokenLParen, - tokenMinus, - tokenNot, + TokenNumber, + TokenString, + TokenDuration, + TokenIdent, + TokenTrue, + TokenFalse, + TokenEqual, + TokenLParen, + TokenMinus, + TokenNot, ) return nil } @@ -401,7 +401,7 @@ func (p *parser) primary() Node { //parse a duration literal func (p *parser) duration() Node { - token := p.expect(tokenDuration) + token := p.expect(TokenDuration) num, err := newDur(token.pos, token.val) if err != nil { p.error(err) @@ -411,7 +411,7 @@ func (p *parser) duration() Node { //parse a number literal func (p *parser) number() Node { - token := p.expect(tokenNumber) + token := p.expect(TokenNumber) num, err := newNumber(token.pos, token.val) if err != nil { p.error(err) @@ -421,14 +421,14 @@ func (p *parser) number() Node { //parse a string literal func (p *parser) string() Node { - token := p.expect(tokenString) + token := p.expect(TokenString) s := newString(token.pos, token.val) return s } //parse a regex literal func (p *parser) regex() Node { - token := p.expect(tokenRegex) + token := p.expect(TokenRegex) r, err := newRegex(token.pos, token.val) if err != nil { p.error(err) @@ -438,13 +438,13 @@ func (p *parser) regex() Node { // parse '*' literal func (p *parser) star() Node { - tok := p.expect(tokenMult) + tok := p.expect(TokenMult) return newStar(tok.pos) } //parse a reference literal func (p *parser) reference() Node { - token := p.expect(tokenReference) + token 
:= p.expect(TokenReference) r := newReference(token.pos, token.val) return r } diff --git a/tick/parser_test.go b/tick/parser_test.go index 1a84b9efa..01efba366 100644 --- a/tick/parser_test.go +++ b/tick/parser_test.go @@ -15,16 +15,16 @@ func TestParserLookAhead(t *testing.T) { p := &parser{} p.lex = lex("0 1 2 3") - assert.Equal(token{tokenNumber, 0, "0"}, p.next()) - assert.Equal(token{tokenNumber, 2, "1"}, p.peek()) - assert.Equal(token{tokenNumber, 2, "1"}, p.next()) + assert.Equal(token{TokenNumber, 0, "0"}, p.next()) + assert.Equal(token{TokenNumber, 2, "1"}, p.peek()) + assert.Equal(token{TokenNumber, 2, "1"}, p.next()) p.backup() - assert.Equal(token{tokenNumber, 2, "1"}, p.next()) - assert.Equal(token{tokenNumber, 4, "2"}, p.peek()) + assert.Equal(token{TokenNumber, 2, "1"}, p.next()) + assert.Equal(token{TokenNumber, 4, "2"}, p.peek()) p.backup() - assert.Equal(token{tokenNumber, 2, "1"}, p.next()) - assert.Equal(token{tokenNumber, 4, "2"}, p.next()) - assert.Equal(token{tokenNumber, 6, "3"}, p.next()) + assert.Equal(token{TokenNumber, 2, "1"}, p.next()) + assert.Equal(token{TokenNumber, 4, "2"}, p.next()) + assert.Equal(token{TokenNumber, 6, "3"}, p.next()) } func TestParseErrors(t *testing.T) { @@ -133,7 +133,7 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "x", @@ -152,7 +152,7 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "x", @@ -171,14 +171,14 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "x", }, Right: &UnaryNode{ pos: 8, - Operator: tokenNot, + Operator: TokenNot, Node: &BoolNode{ pos: 9, Bool: false, @@ -194,7 +194,7 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "x", @@ -214,14 +214,14 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "x", }, Right: &UnaryNode{ pos: 8, - Operator: tokenMinus, + Operator: TokenMinus, Node: &NumberNode{ pos: 9, IsInt: true, @@ -238,7 +238,7 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "x", @@ -258,14 +258,14 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "x", }, Right: &UnaryNode{ pos: 8, - Operator: tokenMinus, + Operator: TokenMinus, Node: &NumberNode{ pos: 9, IsFloat: true, @@ -282,7 +282,7 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "x", @@ -301,14 +301,14 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "x", }, Right: &UnaryNode{ pos: 8, - Operator: tokenMinus, + Operator: TokenMinus, Node: &DurationNode{ pos: 9, Dur: time.Hour * 5, @@ -324,7 +324,7 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ 
pos: 4, Ident: "x", @@ -343,14 +343,14 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "x", }, Right: &BinaryNode{ pos: 9, - Operator: tokenDot, + Operator: TokenDot, Left: &IdentifierNode{ pos: 8, Ident: "a", @@ -372,7 +372,7 @@ func TestParseStatements(t *testing.T) { Nodes: []Node{ &BinaryNode{ pos: 6, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 4, Ident: "t", @@ -385,7 +385,7 @@ func TestParseStatements(t *testing.T) { }, &BinaryNode{ pos: 20, - Operator: tokenDot, + Operator: TokenDot, Left: &IdentifierNode{ pos: 14, Ident: "stream", @@ -398,7 +398,7 @@ func TestParseStatements(t *testing.T) { pos: 27, Node: &BinaryNode{ pos: 43, - Operator: tokenGreater, + Operator: TokenGreater, Left: &ReferenceNode{ pos: 35, Reference: "value", @@ -426,23 +426,23 @@ var x = stream pos: 1, Nodes: []Node{&BinaryNode{ pos: 7, - Operator: tokenAsgn, + Operator: TokenAsgn, Left: &IdentifierNode{ pos: 5, Ident: "x", }, Right: &BinaryNode{ pos: 57, - Operator: tokenDot, + Operator: TokenDot, Left: &BinaryNode{ pos: 44, - Operator: tokenDot, + Operator: TokenDot, Left: &BinaryNode{ pos: 30, - Operator: tokenDot, + Operator: TokenDot, Left: &BinaryNode{ pos: 18, - Operator: tokenDot, + Operator: TokenDot, Left: &IdentifierNode{ pos: 9, Ident: "stream", @@ -475,10 +475,10 @@ var x = stream Func: "map", Args: []Node{&BinaryNode{ pos: 74, - Operator: tokenDot, + Operator: TokenDot, Left: &BinaryNode{ pos: 70, - Operator: tokenDot, + Operator: TokenDot, Left: &IdentifierNode{ pos: 62, Ident: "influxql", diff --git a/tick/stateful_expr.go b/tick/stateful_expr.go index 4c316ae60..c0908aa57 100644 --- a/tick/stateful_expr.go +++ b/tick/stateful_expr.go @@ -151,7 +151,7 @@ func (s *StatefulExpr) eval(n Node, scope *Scope, stck *stack) (err error) { func (s *StatefulExpr) evalUnary(op tokenType, scope *Scope, stck *stack) error { v := stck.Pop() switch op { - case tokenMinus: + case TokenMinus: switch n := v.(type) { case float64: stck.Push(-1 * n) @@ -160,7 +160,7 @@ func (s *StatefulExpr) evalUnary(op tokenType, scope *Scope, stck *stack) error default: return fmt.Errorf("invalid argument to '-' %v", v) } - case tokenNot: + case TokenNot: if b, ok := v.(bool); ok { stck.Push(!b) } else { @@ -264,15 +264,15 @@ func (s *StatefulExpr) evalBinary(op tokenType, scope *Scope, stck *stack) (err func doIntMath(op tokenType, l, r int64) (v int64, err error) { switch op { - case tokenPlus: + case TokenPlus: v = l + r - case tokenMinus: + case TokenMinus: v = l - r - case tokenMult: + case TokenMult: v = l * r - case tokenDiv: + case TokenDiv: v = l / r - case tokenMod: + case TokenMod: v = l % r default: return 0, fmt.Errorf("invalid integer math operator %v", op) @@ -282,13 +282,13 @@ func doIntMath(op tokenType, l, r int64) (v int64, err error) { func doFloatMath(op tokenType, l, r float64) (v float64, err error) { switch op { - case tokenPlus: + case TokenPlus: v = l + r - case tokenMinus: + case TokenMinus: v = l - r - case tokenMult: + case TokenMult: v = l * r - case tokenDiv: + case TokenDiv: v = l / r default: return math.NaN(), fmt.Errorf("invalid float math operator %v", op) @@ -298,13 +298,13 @@ func doFloatMath(op tokenType, l, r float64) (v float64, err error) { func doBoolComp(op tokenType, l, r bool) (v bool, err error) { switch op { - case tokenEqual: + case TokenEqual: v = l == r - case tokenNotEqual: + case TokenNotEqual: v = l != r - case tokenAnd: + case
TokenAnd: v = l && r - case tokenOr: + case TokenOr: v = l || r default: err = fmt.Errorf("invalid boolean comparison operator %v", op) @@ -314,17 +314,17 @@ func doBoolComp(op tokenType, l, r bool) (v bool, err error) { func doFloatComp(op tokenType, l, r float64) (v bool, err error) { switch op { - case tokenEqual: + case TokenEqual: v = l == r - case tokenNotEqual: + case TokenNotEqual: v = l != r - case tokenLess: + case TokenLess: v = l < r - case tokenGreater: + case TokenGreater: v = l > r - case tokenLessEqual: + case TokenLessEqual: v = l <= r - case tokenGreaterEqual: + case TokenGreaterEqual: v = l >= r default: err = fmt.Errorf("invalid float comparison operator %v", op) @@ -334,17 +334,17 @@ func doFloatComp(op tokenType, l, r float64) (v bool, err error) { func doStringComp(op tokenType, l, r string) (v bool, err error) { switch op { - case tokenEqual: + case TokenEqual: v = l == r - case tokenNotEqual: + case TokenNotEqual: v = l != r - case tokenLess: + case TokenLess: v = l < r - case tokenGreater: + case TokenGreater: v = l > r - case tokenLessEqual: + case TokenLessEqual: v = l <= r - case tokenGreaterEqual: + case TokenGreaterEqual: v = l >= r default: err = fmt.Errorf("invalid string comparison operator %v", op) @@ -354,9 +354,9 @@ func doStringComp(op tokenType, l, r string) (v bool, err error) { func doRegexComp(op tokenType, l string, r *regexp.Regexp) (v bool, err error) { switch op { - case tokenRegexEqual: + case TokenRegexEqual: v = r.MatchString(l) - case tokenRegexNotEqual: + case TokenRegexNotEqual: v = !r.MatchString(l) default: err = fmt.Errorf("invalid regex comparison operator %v", op)
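Like the rest of the tick changes in this patch, the stateful_expr.go hunks only rename the token constants to their exported Token* forms (presumably so code outside the tick package can reference them); the evaluation logic is untouched. That logic is a two-level dispatch: evalBinary resolves the operands' dynamic types, then a per-type helper (doIntMath, doFloatComp, doStringComp, and so on) switches on the operator. Here is a self-contained sketch of that shape under hypothetical names; opType, evalBinary, doInt, and doFloat are stand-ins, and the real code pops its operands off an expression stack and covers many more operators and types.

package main

import "fmt"

type opType int

const (
	opPlus opType = iota
	opLess
)

// evalBinary resolves the operands' dynamic types first, then hands the
// typed pair to an operator switch, mirroring the
// evalBinary -> doIntMath/doFloatComp layering above.
func evalBinary(op opType, l, r interface{}) (interface{}, error) {
	switch lv := l.(type) {
	case int64:
		rv, ok := r.(int64)
		if !ok {
			return nil, fmt.Errorf("mismatched types %T and %T", l, r)
		}
		return doInt(op, lv, rv)
	case float64:
		rv, ok := r.(float64)
		if !ok {
			return nil, fmt.Errorf("mismatched types %T and %T", l, r)
		}
		return doFloat(op, lv, rv)
	}
	return nil, fmt.Errorf("unsupported operand type %T", l)
}

func doInt(op opType, l, r int64) (interface{}, error) {
	switch op {
	case opPlus:
		return l + r, nil
	case opLess:
		return l < r, nil
	}
	return nil, fmt.Errorf("invalid integer operator %v", op)
}

func doFloat(op opType, l, r float64) (interface{}, error) {
	switch op {
	case opPlus:
		return l + r, nil
	case opLess:
		return l < r, nil
	}
	return nil, fmt.Errorf("invalid float operator %v", op)
}

func main() {
	sum, _ := evalBinary(opPlus, int64(2), int64(3))
	lt, _ := evalBinary(opLess, 2.5, 3.0)
	fmt.Println(sum, lt) // 5 true
}

Splitting the type switch from the operator switch keeps each helper total over a single operand type, so unsupported combinations surface as explicit errors rather than panics.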