Skip to content

Commit

Permalink
Adding ExecutionStats to task for finding node stats and task stats.
Browse files Browse the repository at this point in the history
  • Loading branch information
yosiat committed Mar 18, 2016
1 parent 84a3739 commit 4a31c1b
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 5 deletions.
30 changes: 30 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type Node interface {
nodeStatsByGroup() map[models.GroupID]nodeStats

collectedCount() int64

emittedCount() int64

stats() map[string]interface{}
}

//implementation of Node
Expand Down Expand Up @@ -239,13 +243,39 @@ func (n *node) edot(buf *bytes.Buffer, labels bool) {
}
}

// node collected count is the sum of emitted counts of parent edges
func (n *node) collectedCount() (count int64) {
for _, in := range n.ins {
count += in.emittedCount()
}
return
}

// node emitted count is the sum of collected counts of children edges
func (n *node) emittedCount() (count int64) {
for _, out := range n.outs {
count += out.collectedCount()
}
return
}

func (n *node) stats() map[string]interface{} {
stats := make(map[string]interface{})

n.statMap.Do(func(kv expvar.KeyValue) {
switch v := kv.Value.(type) {
case kexpvar.IntVar:
stats[kv.Key] = v.IntValue()
case kexpvar.FloatVar:
stats[kv.Key] = v.FloatValue()
default:
stats[kv.Key] = v.String()
}
})

return stats
}

// Statistics for a node
type nodeStats struct {
Fields models.Fields
Expand Down
21 changes: 16 additions & 5 deletions services/task_store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Service struct {
StartTask(t *kapacitor.Task) (*kapacitor.ExecutingTask, error)
StopTask(name string) error
IsExecuting(name string) bool
ExecutionStats(name string) (kapacitor.ExecutionStats, error)
ExecutingDot(name string, labels bool) string
}

Expand Down Expand Up @@ -684,11 +685,12 @@ func (ts *Service) Disable(name string) error {
}

type taskInfo struct {
Name string
Type kapacitor.TaskType
DBRPs []kapacitor.DBRP
Enabled bool
Executing bool
Name string
Type kapacitor.TaskType
DBRPs []kapacitor.DBRP
Enabled bool
Executing bool
ExecutionStats kapacitor.ExecutionStats
}

func (ts *Service) IsEnabled(name string) (e bool) {
Expand Down Expand Up @@ -725,6 +727,15 @@ func (ts *Service) GetTaskInfo(tasks []string) ([]taskInfo, error) {
Enabled: enabled,
Executing: ts.TaskMaster.IsExecuting(t.Name),
}

if info.Executing {
executionStats, err := ts.TaskMaster.ExecutionStats(t.Name)
if err != nil {
return fmt.Errorf("failed to fetch execution stats. name: %s, err: %s", t.Name, err)
}
info.ExecutionStats = executionStats
}

taskInfos = append(taskInfos, info)
return nil
}
Expand Down
34 changes: 34 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,40 @@ func (et *ExecutingTask) registerOutput(name string, o Output) {
et.outputs[name] = o
}

type ExecutionStats struct {
TaskStats map[string]interface{}
NodeStats map[string]map[string]interface{}
}

func (et *ExecutingTask) ExecutionStats() (ExecutionStats, error) {
executionStats := ExecutionStats{
TaskStats: make(map[string]interface{}),
NodeStats: make(map[string]map[string]interface{}),
}

// Fill the task stats
executionStats.TaskStats["throughput"] = et.getThroughput()

// Fill the nodes stats
err := et.walk(func(node Node) error {
nodeStats := node.stats()

// Add collected and emitted
nodeStats["collected"] = node.collectedCount()
nodeStats["emitted"] = node.emittedCount()

executionStats.NodeStats[node.Name()] = nodeStats

return nil
})

if err != nil {
return executionStats, err
}

return executionStats, nil
}

// Return a graphviz .dot formatted byte array.
// Label edges with relavant execution information.
func (et *ExecutingTask) EDot(labels bool) []byte {
Expand Down
11 changes: 11 additions & 0 deletions task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,17 @@ func (tm *TaskMaster) IsExecuting(name string) bool {
return executing
}

func (tm *TaskMaster) ExecutionStats(name string) (ExecutionStats, error) {
tm.mu.RLock()
defer tm.mu.RUnlock()
task, executing := tm.tasks[name]
if !executing {
return ExecutionStats{}, nil
}

return task.ExecutionStats()
}

func (tm *TaskMaster) ExecutingDot(name string, labels bool) string {
tm.mu.RLock()
defer tm.mu.RUnlock()
Expand Down

0 comments on commit 4a31c1b

Please sign in to comment.