Skip to content

Commit

Permalink
Adding ExecutionStats to task for finding easily how much the task pr…
Browse files Browse the repository at this point in the history
…ocessed.
  • Loading branch information
yosiat committed Mar 12, 2016
1 parent 4f50575 commit 10939b0
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 5 deletions.
9 changes: 9 additions & 0 deletions node.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ type Node interface {
nodeStatsByGroup() map[models.GroupID]nodeStats

collectedCount() int64

emittedCount() int64
}

//implementation of Node
Expand Down Expand Up @@ -246,6 +248,13 @@ func (n *node) collectedCount() (count int64) {
return
}

func (n *node) emittedCount() (count int64) {
for _, out := range n.outs {
count += out.emittedCount()
}
return
}

// Statistics for a node
type nodeStats struct {
Fields models.Fields
Expand Down
21 changes: 16 additions & 5 deletions services/task_store/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type Service struct {
StartTask(t *kapacitor.Task) (*kapacitor.ExecutingTask, error)
StopTask(name string) error
IsExecuting(name string) bool
ExecutionStats(name string) (kapacitor.ExecutionStats, error)
ExecutingDot(name string, labels bool) string
}

Expand Down Expand Up @@ -684,11 +685,12 @@ func (ts *Service) Disable(name string) error {
}

type taskInfo struct {
Name string
Type kapacitor.TaskType
DBRPs []kapacitor.DBRP
Enabled bool
Executing bool
Name string
Type kapacitor.TaskType
DBRPs []kapacitor.DBRP
Enabled bool
Executing bool
ExecutionStats kapacitor.ExecutionStats
}

func (ts *Service) IsEnabled(name string) (e bool) {
Expand Down Expand Up @@ -725,6 +727,15 @@ func (ts *Service) GetTaskInfo(tasks []string) ([]taskInfo, error) {
Enabled: enabled,
Executing: ts.TaskMaster.IsExecuting(t.Name),
}

if info.Executing {
executionStats, err := ts.TaskMaster.ExecutionStats(t.Name)
if err != nil {
return fmt.Errorf("failed to fetch execution stats. name: %s, err: %s", t.Name, err)
}
info.ExecutionStats = executionStats
}

taskInfos = append(taskInfos, info)
return nil
}
Expand Down
54 changes: 54 additions & 0 deletions task.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,26 @@ func (et *ExecutingTask) rwalk(f func(n Node) error) error {
return nil
}

// find nodes in the pipeline that matches the predicate
func (et *ExecutingTask) findNodes(predicate func(n Node) (bool, error)) ([]Node, error) {
nodes := make([]Node, 0)

err := et.walk(func(n Node) error {
matches, err := predicate(n)
if err != nil {
return err
}

if matches {
nodes = append(nodes, n)
}

return nil
})

return nodes, err
}

// Link all the nodes together based on the task pipeline.
func (et *ExecutingTask) link() error {

Expand Down Expand Up @@ -291,6 +311,40 @@ func (et *ExecutingTask) registerOutput(name string, o Output) {
et.outputs[name] = o
}

type ExecutionStats struct {
// Input nodes and their emitted points
// where input nodes - are batch/stream node
// for example: stream1 -> 12
Inputs map[string]int64
}

func (et *ExecutingTask) ExecutionStats() (ExecutionStats, error) {
executionStats := ExecutionStats{
Inputs: make(map[string]int64),
}

// First we need to find all input nodes
inputNodes, err := et.findNodes(func(n Node) (bool, error) {
switch n.(type) {
case *StreamNode, *BatchNode:
return true, nil
default:
return false, nil
}
})

if err != nil {
return executionStats, err
}

// Build the input nodes map
for _, inputNode := range inputNodes {
executionStats.Inputs[inputNode.Name()] = inputNode.emittedCount()
}

return executionStats, nil
}

// Return a graphviz .dot formatted byte array.
// Label edges with relavant execution information.
func (et *ExecutingTask) EDot(labels bool) []byte {
Expand Down
11 changes: 11 additions & 0 deletions task_master.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,17 @@ func (tm *TaskMaster) IsExecuting(name string) bool {
return executing
}

func (tm *TaskMaster) ExecutionStats(name string) (ExecutionStats, error) {
tm.mu.RLock()
defer tm.mu.RUnlock()
task, executing := tm.tasks[name]
if !executing {
return ExecutionStats{}, nil
}

return task.ExecutionStats()
}

func (tm *TaskMaster) ExecutingDot(name string, labels bool) string {
tm.mu.RLock()
defer tm.mu.RUnlock()
Expand Down

0 comments on commit 10939b0

Please sign in to comment.