Skip to content

Commit

Permalink
add optional to be able to show a task in the context of a replay (#652)
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathaniel Cook authored Jun 17, 2016
1 parent ea2f959 commit 1c80391
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 52 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
### Features

- [#636](https://github.com/influxdata/kapacitor/pull/636): Change HTTP logs to be in Common Log format.
- [#](https://github.com/influxdata/kapacitor/pull/): Add optional replay ID to the task API so that you can get information about a task inside a running replay.

### Bugfixes

Expand Down
1 change: 1 addition & 0 deletions client/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,7 @@ To get information about a task make a GET request to the `/kapacitor/v1/tasks/T
| --------------- | ------- | ------- |
| dot-view | attributes | One of `labels` or `attributes`. Labels is less readable but will correctly render with all the information contained in labels. |
| script-format | formatted | One of `formatted` or `raw`. Raw will return the script identical to how it was defined. Formatted will first format the script. |
| replay-id | | Optional ID of a running replay. The returned task information will be in the context of the task for the running replay. |


A task has these read only properties in addition to the properties listed [above](#define-task).
Expand Down
2 changes: 2 additions & 0 deletions client/v1/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,6 +693,7 @@ func (c *Client) UpdateTask(link Link, opt UpdateTaskOptions) (Task, error) {
type TaskOptions struct {
DotView string
ScriptFormat string
ReplayID string
}

func (o *TaskOptions) Default() {
Expand All @@ -708,6 +709,7 @@ func (o *TaskOptions) Values() *url.Values {
v := &url.Values{}
v.Set("dot-view", o.DotView)
v.Set("script-format", o.ScriptFormat)
v.Set("replay-id", o.ReplayID)
return v
}

Expand Down
4 changes: 4 additions & 0 deletions client/v1/swagger.yml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,10 @@ paths:
required: true
type: string
pattern: (formatted|raw)
- name: replay-id
in: query
description: Optional ID of a running replay. The returned task information will be in the context of the task for the running replay.
type: string
responses:
'200':
description: Task information
Expand Down
20 changes: 16 additions & 4 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"strings"
"time"

"github.com/dustin/go-humanize"
humanize "github.com/dustin/go-humanize"
"github.com/influxdata/influxdb/influxql"
"github.com/influxdata/kapacitor/client/v1"
"github.com/pkg/errors"
Expand Down Expand Up @@ -143,7 +143,8 @@ func main() {
commandArgs = args
commandF = doList
case "show":
commandArgs = args
showFlags.Parse(args)
commandArgs = showFlags.Args()
commandF = doShow
case "show-template":
commandArgs = args
Expand Down Expand Up @@ -177,6 +178,7 @@ func init() {
replayFlags.Usage = replayUsage
defineFlags.Usage = defineUsage
defineTemplateFlags.Usage = defineTemplateUsage
showFlags.Usage = showUsage

recordStreamFlags.Usage = recordStreamUsage
recordBatchFlags.Usage = recordBatchUsage
Expand Down Expand Up @@ -1180,13 +1182,20 @@ func doReload(args []string) error {
}

// Show
var (
showFlags = flag.NewFlagSet("show", flag.ExitOnError)
sReplayId = showFlags.String("replay", "", "Optional replay ID. If set the task information is in the context of the running replay.")
)

func showUsage() {
var u = `Usage: kapacitor show [task ID]
var u = `Usage: kapacitor show [-replay] [task ID]
Show details about a specific task.
Options:
`
fmt.Fprintln(os.Stderr, u)
showFlags.PrintDefaults()
}

func doShow(args []string) error {
Expand All @@ -1196,7 +1205,10 @@ func doShow(args []string) error {
os.Exit(2)
}

t, err := cli.Task(cli.TaskLink(args[0]), nil)
t, err := cli.Task(
cli.TaskLink(args[0]),
&client.TaskOptions{ReplayID: *sReplayId},
)
if err != nil {
return err
}
Expand Down
10 changes: 7 additions & 3 deletions cmd/kapacitord/run/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ type Server struct {

err chan error

TaskMaster *kapacitor.TaskMaster
TaskMaster *kapacitor.TaskMaster
TaskMasterLookup *kapacitor.TaskMasterLookup

LogService logging.Interface
HTTPDService *httpd.Service
Expand Down Expand Up @@ -118,7 +119,9 @@ func NewServer(c *Config, buildInfo *BuildInfo, logService logging.Interface) (*
s.Logger.Printf("I! ClusterID: %s ServerID: %s", s.ClusterID, s.ServerID)

// Start Task Master
s.TaskMaster = kapacitor.NewTaskMaster("main", logService)
s.TaskMasterLookup = kapacitor.NewTaskMasterLookup()
s.TaskMaster = kapacitor.NewTaskMaster(kapacitor.MainTaskMaster, logService)
s.TaskMasterLookup.Set(s.TaskMaster)
if err := s.TaskMaster.Open(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -224,7 +227,7 @@ func (s *Server) appendTaskStoreService(c task_store.Config) {
srv := task_store.NewService(c, l)
srv.StorageService = s.StorageService
srv.HTTPDService = s.HTTPDService
srv.TaskMaster = s.TaskMaster
srv.TaskMasterLookup = s.TaskMasterLookup

s.TaskStore = srv
s.TaskMaster.TaskStore = srv
Expand All @@ -239,6 +242,7 @@ func (s *Server) appendReplayService(c replay.Config) {
srv.HTTPDService = s.HTTPDService
srv.InfluxDBService = s.InfluxDBService
srv.TaskMaster = s.TaskMaster
srv.TaskMasterLookup = s.TaskMasterLookup

s.ReplayService = srv
s.Services = append(s.Services, srv)
Expand Down
9 changes: 9 additions & 0 deletions influxql.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ func (c *baseReduceContext) Time() time.Time {
func (n *InfluxQLNode) runStreamInfluxQL() error {
contexts := make(map[models.GroupID]reduceContext)
for p, ok := n.ins[0].NextPoint(); ok; {
n.timer.Start()
context := contexts[p.Group]
// Fisrt point in window
if context == nil {
Expand Down Expand Up @@ -127,13 +128,15 @@ func (n *InfluxQLNode) runStreamInfluxQL() error {
// go through loop again to initialize new iterator.
}
}
n.timer.Stop()
}
return nil
}

func (n *InfluxQLNode) runBatchInfluxQL() error {
var exampleValue interface{}
for b, ok := n.ins[0].NextBatch(); ok; b, ok = n.ins[0].NextBatch() {
n.timer.Start()
// Create new base context
c := baseReduceContext{
as: n.n.As,
Expand All @@ -148,6 +151,7 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
if len(b.Points) == 0 {
if !n.n.ReduceCreater.IsEmptyOK {
// If the reduce does not handle empty batches continue
n.timer.Stop()
continue
}
if exampleValue == nil {
Expand All @@ -171,6 +175,7 @@ func (n *InfluxQLNode) runBatchInfluxQL() error {
if err != nil {
n.logger.Println("E! failed to emit batch:", err)
}
n.timer.Stop()
}
return nil
}
Expand All @@ -194,20 +199,24 @@ func (n *InfluxQLNode) emit(context reduceContext) error {
if err != nil {
return err
}
n.timer.Pause()
for _, out := range n.outs {
err := out.CollectPoint(p)
if err != nil {
return err
}
}
n.timer.Resume()
case pipeline.BatchEdge:
b := context.EmitBatch()
n.timer.Pause()
for _, out := range n.outs {
err := out.CollectBatch(b)
if err != nil {
return err
}
}
n.timer.Resume()
}
return nil
}
8 changes: 8 additions & 0 deletions services/replay/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,11 @@ type Service struct {
NewDefaultClient() (client.Client, error)
NewNamedClient(name string) (client.Client, error)
}
TaskMasterLookup interface {
Get(string) *kapacitor.TaskMaster
Set(*kapacitor.TaskMaster)
Delete(*kapacitor.TaskMaster)
}
TaskMaster interface {
NewFork(name string, dbrps []kapacitor.DBRP, measurements []string) (*kapacitor.Edge, error)
DelFork(name string)
Expand Down Expand Up @@ -1165,6 +1170,9 @@ func (r *Service) doLiveQueryReplay(id string, task *kapacitor.Task, clk clock.C
func (r *Service) doReplay(id string, task *kapacitor.Task, runReplay func(tm *kapacitor.TaskMaster) error) error {
// Create new isolated task master
tm := r.TaskMaster.New(id)
r.TaskMasterLookup.Set(tm)
defer r.TaskMasterLookup.Delete(tm)

tm.Open()
defer tm.Close()
et, err := tm.StartTask(task)
Expand Down
Loading

0 comments on commit 1c80391

Please sign in to comment.