From 1c803919b3901e443708bf387f8eecb9afacb7ef Mon Sep 17 00:00:00 2001 From: Nathaniel Cook Date: Fri, 17 Jun 2016 15:05:57 -0600 Subject: [PATCH] add optional to be able to show a task in the context of a replay (#652) --- CHANGELOG.md | 1 + client/API.md | 1 + client/v1/client.go | 2 + client/v1/swagger.yml | 4 ++ cmd/kapacitor/main.go | 20 ++++-- cmd/kapacitord/run/server.go | 10 ++- influxql.go | 9 +++ services/replay/service.go | 8 +++ services/task_store/service.go | 72 ++++++++++--------- task_master.go | 52 +++++++++++--- .../bash-completion/completions/kapacitor | 13 +++- 11 files changed, 140 insertions(+), 52 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6938984cf..fbc10fb2f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ ### Features - [#636](https://github.com/influxdata/kapacitor/pull/636): Change HTTP logs to be in Common Log format. +- [#](https://github.com/influxdata/kapacitor/pull/): Add optional replay ID to the task API so that you can get information about a task inside a running replay. ### Bugfixes diff --git a/client/API.md b/client/API.md index 327a9ba47..61f64b71f 100644 --- a/client/API.md +++ b/client/API.md @@ -256,6 +256,7 @@ To get information about a task make a GET request to the `/kapacitor/v1/tasks/T | --------------- | ------- | ------- | | dot-view | attributes | One of `labels` or `attributes`. Labels is less readable but will correctly render with all the information contained in labels. | | script-format | formatted | One of `formatted` or `raw`. Raw will return the script identical to how it was defined. Formatted will first format the script. | +| replay-id | | Optional ID of a running replay. The returned task information will be in the context of the task for the running replay. | A task has these read only properties in addition to the properties listed [above](#define-task). diff --git a/client/v1/client.go b/client/v1/client.go index 3f81ba272..eb08bf788 100644 --- a/client/v1/client.go +++ b/client/v1/client.go @@ -693,6 +693,7 @@ func (c *Client) UpdateTask(link Link, opt UpdateTaskOptions) (Task, error) { type TaskOptions struct { DotView string ScriptFormat string + ReplayID string } func (o *TaskOptions) Default() { @@ -708,6 +709,7 @@ func (o *TaskOptions) Values() *url.Values { v := &url.Values{} v.Set("dot-view", o.DotView) v.Set("script-format", o.ScriptFormat) + v.Set("replay-id", o.ReplayID) return v } diff --git a/client/v1/swagger.yml b/client/v1/swagger.yml index b994a7d96..4d7384e1e 100644 --- a/client/v1/swagger.yml +++ b/client/v1/swagger.yml @@ -145,6 +145,10 @@ paths: required: true type: string pattern: (formatted|raw) + - name: replay-id + in: query + description: Optional ID of a running replay. The returned task information will be in the context of the task for the running replay. + type: string responses: '200': description: Task information diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go index c9e2bcdf4..33b5b86d9 100644 --- a/cmd/kapacitor/main.go +++ b/cmd/kapacitor/main.go @@ -14,7 +14,7 @@ import ( "strings" "time" - "github.com/dustin/go-humanize" + humanize "github.com/dustin/go-humanize" "github.com/influxdata/influxdb/influxql" "github.com/influxdata/kapacitor/client/v1" "github.com/pkg/errors" @@ -143,7 +143,8 @@ func main() { commandArgs = args commandF = doList case "show": - commandArgs = args + showFlags.Parse(args) + commandArgs = showFlags.Args() commandF = doShow case "show-template": commandArgs = args @@ -177,6 +178,7 @@ func init() { replayFlags.Usage = replayUsage defineFlags.Usage = defineUsage defineTemplateFlags.Usage = defineTemplateUsage + showFlags.Usage = showUsage recordStreamFlags.Usage = recordStreamUsage recordBatchFlags.Usage = recordBatchUsage @@ -1180,13 +1182,20 @@ func doReload(args []string) error { } // Show +var ( + showFlags = flag.NewFlagSet("show", flag.ExitOnError) + sReplayId = showFlags.String("replay", "", "Optional replay ID. If set the task information is in the context of the running replay.") +) func showUsage() { - var u = `Usage: kapacitor show [task ID] + var u = `Usage: kapacitor show [-replay] [task ID] Show details about a specific task. + +Options: ` fmt.Fprintln(os.Stderr, u) + showFlags.PrintDefaults() } func doShow(args []string) error { @@ -1196,7 +1205,10 @@ func doShow(args []string) error { os.Exit(2) } - t, err := cli.Task(cli.TaskLink(args[0]), nil) + t, err := cli.Task( + cli.TaskLink(args[0]), + &client.TaskOptions{ReplayID: *sReplayId}, + ) if err != nil { return err } diff --git a/cmd/kapacitord/run/server.go b/cmd/kapacitord/run/server.go index 5bb574d62..ac906082a 100644 --- a/cmd/kapacitord/run/server.go +++ b/cmd/kapacitord/run/server.go @@ -61,7 +61,8 @@ type Server struct { err chan error - TaskMaster *kapacitor.TaskMaster + TaskMaster *kapacitor.TaskMaster + TaskMasterLookup *kapacitor.TaskMasterLookup LogService logging.Interface HTTPDService *httpd.Service @@ -118,7 +119,9 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (* s.Logger.Printf("I! ClusterID: %s ServerID: %s", s.ClusterID, s.ServerID) // Start Task Master - s.TaskMaster = kapacitor.NewTaskMaster("main", logService) + s.TaskMasterLookup = kapacitor.NewTaskMasterLookup() + s.TaskMaster = kapacitor.NewTaskMaster(kapacitor.MainTaskMaster, logService) + s.TaskMasterLookup.Set(s.TaskMaster) if err := s.TaskMaster.Open(); err != nil { return nil, err } @@ -224,7 +227,7 @@ func (s *Server) appendTaskStoreService(c task_store.Config) { srv := task_store.NewService(c, l) srv.StorageService = s.StorageService srv.HTTPDService = s.HTTPDService - srv.TaskMaster = s.TaskMaster + srv.TaskMasterLookup = s.TaskMasterLookup s.TaskStore = srv s.TaskMaster.TaskStore = srv @@ -239,6 +242,7 @@ func (s *Server) appendReplayService(c replay.Config) { srv.HTTPDService = s.HTTPDService srv.InfluxDBService = s.InfluxDBService srv.TaskMaster = s.TaskMaster + srv.TaskMasterLookup = s.TaskMasterLookup s.ReplayService = srv s.Services = append(s.Services, srv) diff --git a/influxql.go b/influxql.go index 3a438b09d..2e90d03a0 100644 --- a/influxql.go +++ b/influxql.go @@ -72,6 +72,7 @@ func (c *baseReduceContext) Time() time.Time { func (n *InfluxQLNode) runStreamInfluxQL() error { contexts := make(map[models.GroupID]reduceContext) for p, ok := n.ins[0].NextPoint(); ok; { + n.timer.Start() context := contexts[p.Group] // Fisrt point in window if context == nil { @@ -127,6 +128,7 @@ func (n *InfluxQLNode) runStreamInfluxQL() error { // go through loop again to initialize new iterator. } } + n.timer.Stop() } return nil } @@ -134,6 +136,7 @@ func (n *InfluxQLNode) runStreamInfluxQL() error { func (n *InfluxQLNode) runBatchInfluxQL() error { var exampleValue interface{} for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() { + n.timer.Start() // Create new base context c := baseReduceContext{ as: n.n.As, @@ -148,6 +151,7 @@ func (n *InfluxQLNode) runBatchInfluxQL() error { if len(b.Points) == 0 { if !n.n.ReduceCreater.IsEmptyOK { // If the reduce does not handle empty batches continue + n.timer.Stop() continue } if exampleValue == nil { @@ -171,6 +175,7 @@ func (n *InfluxQLNode) runBatchInfluxQL() error { if err != nil { n.logger.Println("E! failed to emit batch:", err) } + n.timer.Stop() } return nil } @@ -194,20 +199,24 @@ func (n *InfluxQLNode) emit(context reduceContext) error { if err != nil { return err } + n.timer.Pause() for _, out := range n.outs { err := out.CollectPoint(p) if err != nil { return err } } + n.timer.Resume() case pipeline.BatchEdge: b := context.EmitBatch() + n.timer.Pause() for _, out := range n.outs { err := out.CollectBatch(b) if err != nil { return err } } + n.timer.Resume() } return nil } diff --git a/services/replay/service.go b/services/replay/service.go index 241fe8c2c..20d90d957 100644 --- a/services/replay/service.go +++ b/services/replay/service.go @@ -73,6 +73,11 @@ type Service struct { NewDefaultClient() (client.Client, error) NewNamedClient(name string) (client.Client, error) } + TaskMasterLookup interface { + Get(string) *kapacitor.TaskMaster + Set(*kapacitor.TaskMaster) + Delete(*kapacitor.TaskMaster) + } TaskMaster interface { NewFork(name string, dbrps []kapacitor.DBRP, measurements []string) (*kapacitor.Edge, error) DelFork(name string) @@ -1165,6 +1170,9 @@ func (r *Service) doLiveQueryReplay(id string, task *kapacitor.Task, clk clock.C func (r *Service) doReplay(id string, task *kapacitor.Task, runReplay func(tm *kapacitor.TaskMaster) error) error { // Create new isolated task master tm := r.TaskMaster.New(id) + r.TaskMasterLookup.Set(tm) + defer r.TaskMasterLookup.Delete(tm) + tm.Open() defer tm.Close() et, err := tm.StartTask(task) diff --git a/services/task_store/service.go b/services/task_store/service.go index 755b2b0c1..f61299fe1 100644 --- a/services/task_store/service.go +++ b/services/task_store/service.go @@ -46,25 +46,11 @@ type Service struct { AddRoutes([]httpd.Route) error DelRoutes([]httpd.Route) } - TaskMaster interface { - NewTask( - name, - script string, - tt kapacitor.TaskType, - dbrps []kapacitor.DBRP, - snapshotInterval time.Duration, - vars map[string]tick.Var, - ) (*kapacitor.Task, error) - NewTemplate( - name, - script string, - tt kapacitor.TaskType, - ) (*kapacitor.Template, error) - StartTask(t *kapacitor.Task) (*kapacitor.ExecutingTask, error) - StopTask(name string) error - IsExecuting(name string) bool - ExecutionStats(name string) (kapacitor.ExecutionStats, error) - ExecutingDot(name string, labels bool) string + TaskMasterLookup interface { + Main() *kapacitor.TaskMaster + Get(string) *kapacitor.TaskMaster + Set(*kapacitor.TaskMaster) + Delete(*kapacitor.TaskMaster) } logger *log.Logger @@ -461,8 +447,21 @@ func (ts *Service) handleTask(w http.ResponseWriter, r *http.Request) { httpd.HttpError(w, fmt.Sprintf("invalid dot-view parameter %q", dotView), true, http.StatusBadRequest) return } + tmID := r.URL.Query().Get("replay-id") + if tmID == "" { + tmID = kapacitor.MainTaskMaster + } + tm := ts.TaskMasterLookup.Get(tmID) + if tm == nil { + httpd.HttpError(w, fmt.Sprintf("no running replay with ID: %s", tmID), true, http.StatusBadRequest) + return + } + if tmID != kapacitor.MainTaskMaster && !tm.IsExecuting(raw.ID) { + httpd.HttpError(w, fmt.Sprintf("replay %s is not for task: %s", tmID, raw.ID), true, http.StatusBadRequest) + return + } - t, err := ts.convertTask(raw, scriptFormat, dotView) + t, err := ts.convertTask(raw, scriptFormat, dotView, tm) if err != nil { httpd.HttpError(w, fmt.Sprintf("invalid task stored in db: %s", err.Error()), true, http.StatusInternalServerError) return @@ -556,9 +555,11 @@ func (ts *Service) handleListTasks(w http.ResponseWriter, r *http.Request) { rawTasks, err := ts.tasks.List(pattern, int(offset), int(limit)) tasks := make([]map[string]interface{}, len(rawTasks)) + tm := ts.TaskMasterLookup.Main() + for i, task := range rawTasks { tasks[i] = make(map[string]interface{}, len(fields)) - executing := ts.TaskMaster.IsExecuting(task.ID) + executing := tm.IsExecuting(task.ID) for _, field := range fields { var value interface{} switch field { @@ -596,7 +597,7 @@ func (ts *Service) handleListTasks(w http.ResponseWriter, r *http.Request) { value = executing case "dot": if executing { - value = ts.TaskMaster.ExecutingDot(task.ID, dotView == "labels") + value = tm.ExecutingDot(task.ID, dotView == "labels") } else { kt, err := ts.newKapacitorTask(task) if err != nil { @@ -606,7 +607,7 @@ func (ts *Service) handleListTasks(w http.ResponseWriter, r *http.Request) { } case "stats": if executing { - s, err := ts.TaskMaster.ExecutionStats(task.ID) + s, err := tm.ExecutionStats(task.ID) if err != nil { ts.logger.Printf("E! failed to retrieve stats for task %s: %v", task.ID, err) } else { @@ -781,7 +782,7 @@ func (ts *Service) handleCreateTask(w http.ResponseWriter, r *http.Request) { } // Return task info - t, err := ts.convertTask(newTask, "formatted", "attributes") + t, err := ts.convertTask(newTask, "formatted", "attributes", ts.TaskMasterLookup.Main()) if err != nil { httpd.HttpError(w, err.Error(), true, http.StatusInternalServerError) return @@ -940,7 +941,7 @@ func (ts *Service) handleUpdateTask(w http.ResponseWriter, r *http.Request) { } } - t, err := ts.convertTask(updated, "formatted", "attributes") + t, err := ts.convertTask(updated, "formatted", "attributes", ts.TaskMasterLookup.Main()) if err != nil { httpd.HttpError(w, err.Error(), true, http.StatusInternalServerError) return @@ -949,7 +950,7 @@ func (ts *Service) handleUpdateTask(w http.ResponseWriter, r *http.Request) { w.Write(httpd.MarshalJSON(t, true)) } -func (ts *Service) convertTask(t Task, scriptFormat, dotView string) (client.Task, error) { +func (ts *Service) convertTask(t Task, scriptFormat, dotView string, tm *kapacitor.TaskMaster) (client.Task, error) { script := t.TICKscript if scriptFormat == "formatted" { // Format TICKscript @@ -961,15 +962,15 @@ func (ts *Service) convertTask(t Task, scriptFormat, dotView string) (client.Tas } } - executing := ts.TaskMaster.IsExecuting(t.ID) + executing := tm.IsExecuting(t.ID) errMsg := t.Error dot := "" stats := client.ExecutionStats{} task, err := ts.newKapacitorTask(t) if err == nil { if executing { - dot = ts.TaskMaster.ExecutingDot(t.ID, dotView == "labels") - s, err := ts.TaskMaster.ExecutionStats(t.ID) + dot = tm.ExecutingDot(t.ID, dotView == "labels") + s, err := tm.ExecutionStats(t.ID) if err != nil { ts.logger.Printf("E! failed to retrieve stats for task %s: %v", t.ID, err) } else { @@ -1794,7 +1795,7 @@ func (ts *Service) newKapacitorTask(task Task) (*kapacitor.Task, error) { if err != nil { return nil, err } - return ts.TaskMaster.NewTask(task.ID, + return ts.TaskMasterLookup.Main().NewTask(task.ID, task.TICKscript, tt, dbrps, @@ -1811,7 +1812,7 @@ func (ts *Service) templateTask(template Template) (*kapacitor.Template, error) case BatchTask: tt = kapacitor.BatchTask } - t, err := ts.TaskMaster.NewTemplate(template.ID, + t, err := ts.TaskMasterLookup.Main().NewTemplate(template.ID, template.TICKscript, tt, ) @@ -1829,8 +1830,9 @@ func (ts *Service) startTask(task Task) error { // Starting task, remove last error ts.saveLastError(t.ID, "") + tm := ts.TaskMasterLookup.Main() // Start the task - et, err := ts.TaskMaster.StartTask(t) + et, err := tm.StartTask(t) if err != nil { ts.saveLastError(t.ID, err.Error()) return err @@ -1841,7 +1843,7 @@ func (ts *Service) startTask(task Task) error { err := et.StartBatching() if err != nil { ts.saveLastError(t.ID, err.Error()) - ts.TaskMaster.StopTask(t.ID) + tm.StopTask(t.ID) return err } } @@ -1853,7 +1855,7 @@ func (ts *Service) startTask(task Task) error { if err != nil { // Stop task - ts.TaskMaster.StopTask(t.ID) + tm.StopTask(t.ID) ts.logger.Printf("E! task %s finished with error: %s", et.Task.ID, err) // Save last error from task. @@ -1867,7 +1869,7 @@ func (ts *Service) startTask(task Task) error { } func (ts *Service) stopTask(id string) { - ts.TaskMaster.StopTask(id) + ts.TaskMasterLookup.Main().StopTask(id) } // Save last error from task. diff --git a/task_master.go b/task_master.go index 1dc0e53cf..bbbdbb0a2 100644 --- a/task_master.go +++ b/task_master.go @@ -21,6 +21,7 @@ import ( const ( statPointsReceived = "points_received" + MainTaskMaster = "main" ) type LogService interface { @@ -38,8 +39,8 @@ var ErrTaskMasterOpen = errors.New("TaskMaster is open") // An execution framework for a set of tasks. type TaskMaster struct { - // Unique name for this task master instance - name string + // Unique id for this task master instance + id string HTTPDService interface { AddRoutes([]httpd.Route) error @@ -147,24 +148,24 @@ type forkKey struct { } // Create a new Executor with a given clock. -func NewTaskMaster(name string, l LogService) *TaskMaster { +func NewTaskMaster(id string, l LogService) *TaskMaster { return &TaskMaster{ - name: name, + id: id, forks: make(map[forkKey]map[string]*Edge), forkStats: make(map[forkKey]*expvar.Int), taskToForkKeys: make(map[string][]forkKey), batches: make(map[string][]BatchCollector), tasks: make(map[string]*ExecutingTask), LogService: l, - logger: l.NewLogger(fmt.Sprintf("[task_master:%s] ", name), log.LstdFlags), + logger: l.NewLogger(fmt.Sprintf("[task_master:%s] ", id), log.LstdFlags), closed: true, TimingService: noOpTimingService{}, } } // Returns a new TaskMaster instance with the same services as the current one. -func (tm *TaskMaster) New(name string) *TaskMaster { - n := NewTaskMaster(name, tm.LogService) +func (tm *TaskMaster) New(id string) *TaskMaster { + n := NewTaskMaster(id, tm.LogService) n.HTTPDService = tm.HTTPDService n.UDFService = tm.UDFService n.DeadmanService = tm.DeadmanService @@ -455,7 +456,7 @@ func (tm *TaskMaster) stream(name string) (StreamCollector, error) { if tm.closed { return nil, ErrTaskMasterClosed } - in := newEdge(fmt.Sprintf("task_master:%s", tm.name), name, "stream", pipeline.StreamEdge, defaultEdgeBufferSize, tm.LogService) + in := newEdge(fmt.Sprintf("task_master:%s", tm.id), name, "stream", pipeline.StreamEdge, defaultEdgeBufferSize, tm.LogService) tm.drained = false tm.wg.Add(1) go tm.runForking(in) @@ -504,7 +505,7 @@ func (tm *TaskMaster) forkPoint(p models.Point) { tm.forkStats[key] = c tags := map[string]string{ - "task_master": tm.name, + "task_master": tm.id, "database": key.Database, "retention_policy": key.RetentionPolicy, "measurement": key.Measurement, @@ -639,3 +640,36 @@ type noOpTimingService struct{} func (noOpTimingService) NewTimer(timer.Setter) timer.Timer { return timer.NewNoOp() } + +type TaskMasterLookup struct { + sync.Mutex + taskMasters map[string]*TaskMaster +} + +func NewTaskMasterLookup() *TaskMasterLookup { + return &TaskMasterLookup{ + taskMasters: make(map[string]*TaskMaster), + } +} + +func (tml *TaskMasterLookup) Get(id string) *TaskMaster { + tml.Lock() + defer tml.Unlock() + return tml.taskMasters[id] +} + +func (tml *TaskMasterLookup) Main() *TaskMaster { + return tml.Get(MainTaskMaster) +} + +func (tml *TaskMasterLookup) Set(tm *TaskMaster) { + tml.Lock() + defer tml.Unlock() + tml.taskMasters[tm.id] = tm +} + +func (tml *TaskMasterLookup) Delete(tm *TaskMaster) { + tml.Lock() + defer tml.Unlock() + delete(tml.taskMasters, tm.id) +} diff --git a/usr/share/bash-completion/completions/kapacitor b/usr/share/bash-completion/completions/kapacitor index 2e8c40bbd..d76252316 100644 --- a/usr/share/bash-completion/completions/kapacitor +++ b/usr/share/bash-completion/completions/kapacitor @@ -123,7 +123,18 @@ _kapacitor() ;; esac ;; - enable|disable|reload|show) + show) + case "$prev" in + -replay) + words=$(_kapacitor_list replays "$cur") + ;; + *) + words=$(_kapacitor_list tasks "$cur") + words+=' -replay' + ;; + esac + ;; + enable|disable|reload) words=$(_kapacitor_list tasks "$cur") ;; delete|list)