-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpool.go
127 lines (118 loc) · 2.82 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package main
import (
"net/http"
"sync"
"time"
)
// Task is a command to run on a controller.
type Task struct {
// Cmd is the command handed to the controller's Show method.
Cmd string
// Path is handed to the controller's Show method together with Cmd.
Path Lookup
// Attr, when non-empty, is passed to Select along with the command's
// output; the value Select returns then replaces that output.
Attr []string
}
// Result of one execution in the loop.
type Result struct {
// Data holds the values produced by the execution. Results built by
// Pool.Push carry nil Data whenever Err is set.
Data []interface{}
// Err is the error that stopped the execution, or nil on success.
Err error
}
// Pool of worker gophers running commands in controllers.
type Pool struct {
	// client is the HTTP client handed to every controller created by Push.
	client *http.Client
	// wg tracks the goroutines started by Push; Close waits on it.
	wg sync.WaitGroup
	// delay is the pause inserted between consecutive commands of one run.
	delay time.Duration
	// loop is the pause between repetitions of a pushed task. A value <= 0
	// means the task is executed only once.
	loop time.Duration
	// sem is a counting semaphore: its capacity bounds how many runs may be
	// in progress at the same time.
	sem chan struct{}
	// cancel is closed by Cancel to stop looping tasks.
	cancel chan struct{}
}

// NewPool returns a new Task Pool.
//
// tasks is the maximum number of runs allowed to execute concurrently.
// Values below 1 are treated as 1: a zero-capacity semaphore would make every
// run block forever while acquiring its slot, and a negative capacity would
// panic when the channel is created.
//
// delay is the pause between consecutive commands of a run, loop is the pause
// between repetitions of a task (<= 0 disables repetition), and client is the
// HTTP client shared by all controllers.
func NewPool(tasks int, delay, loop time.Duration, client *http.Client) *Pool {
	if tasks < 1 {
		tasks = 1
	}
	return &Pool{
		client: client,
		delay:  delay,
		loop:   loop,
		sem:    make(chan struct{}, tasks),
		cancel: make(chan struct{}),
	}
}
// Push adds the tasks to the pool.
//
// It builds a controller from md, username, pass and useSSH (all forwarded to
// NewController together with the pool's HTTP client) and starts a goroutine
// that dials the controller and runs commands, then script when it is not
// nil. Every attempt produces one Result on the returned channel; a failed
// Dial is reported through Result.Err.
//
// The goroutine stops after the first attempt when the pool's loop interval
// is <= 0 or when the run reports it is done. Otherwise it repeats after each
// loop interval until Cancel is called. When the goroutine exits, the
// controller is closed and the returned channel is closed.
//
// If Cancel is called while the channel's buffer is still full (the consumer
// stopped reading), the pending Result is dropped so the goroutine can exit
// and Close can return.
func (p *Pool) Push(md, username, pass string, commands []Task, script Script, useSSH bool) chan Result {
	// Leave notice a new thread is running
	p.wg.Add(1)
	controller := NewController(md, username, pass, p.client, useSSH)
	stream := make(chan Result, 1)
	go func() {
		defer p.wg.Done()
		defer controller.Close()
		defer close(stream)
		for {
			var data []interface{}
			var done bool
			// Dial does session caching, will refresh credentials if needed
			err := controller.Dial()
			if err == nil {
				data, done, err = func() ([]interface{}, bool, error) {
					// Do this in a closure to use defer() and make sure
					// we release the lock after running the task, whatever the error
					p.sem <- struct{}{}
					defer func() { <-p.sem }()
					// Iterate on the switches, delivering tasks to the queue
					return p.run(controller, commands, script)
				}()
			}
			res := Result{Data: data, Err: err}
			select {
			case stream <- res:
				// The buffer had room: the result is always delivered,
				// even if the pool has already been cancelled.
			default:
				// The consumer is behind. Wait for it, but give up on
				// Cancel so an abandoned stream cannot block this
				// goroutine (and therefore Close) forever.
				select {
				case stream <- res:
				case <-p.cancel:
					return
				}
			}
			if done || p.loop <= 0 {
				return
			}
			// Use an explicit timer so it can be released right away when
			// the pool is cancelled during the wait.
			timer := time.NewTimer(p.loop)
			select {
			case <-timer.C: // do nothing
			case <-p.cancel:
				timer.Stop()
				return
			}
		}
	}()
	return stream
}
// Cancel stops the loops of all pushed tasks by closing the pool's cancel
// channel. Workers notice it while waiting between repetitions.
//
// Calling Cancel again after it has returned is a no-op; previously the
// second call panicked with "close of closed channel". The guard is not
// atomic, so Cancel must not be called from several goroutines at the same
// time.
func (p *Pool) Cancel() {
	select {
	case <-p.cancel:
		// Already cancelled: closing the channel again would panic.
	default:
		close(p.cancel)
	}
}
// Close tells the pool no more tasks will be pushed. It does not cancel it.
// It blocks until every goroutine started by Push has finished, so looping
// tasks keep it waiting until they stop or Cancel is called.
func (p *Pool) Close() {
p.wg.Wait()
}
// run executes the commands in order on the controller and collects one
// output per command. When the pool has a positive delay, it sleeps for that
// long before every command except the first. A command with a non-empty Attr
// has its output replaced by the value Select returns for it.
//
// Without a script the collected outputs are returned with done == false.
// With a script, the outputs are handed to script.Run and the returned slice
// holds only the script's value, together with the done flag it reported.
// On any error the returned data is nil; done is false unless the error came
// from the script, in which case the script's flag is passed through.
func (p *Pool) run(controller *Controller, commands []Task, script Script) ([]interface{}, bool, error) {
	outputs := make([]interface{}, 0, len(commands))
	for i, task := range commands {
		// Space out consecutive commands; the first one runs immediately.
		if i > 0 && p.delay > 0 {
			time.Sleep(p.delay)
		}
		out, err := controller.Show(task.Cmd, task.Path)
		if err != nil {
			return nil, false, err
		}
		// len of a nil slice is 0, so no separate nil check is needed.
		if len(task.Attr) > 0 {
			picked, err := Select(out, task.Attr)
			if err != nil {
				return nil, false, err
			}
			out = picked
		}
		outputs = append(outputs, out)
	}
	if script != nil {
		value, done, err := script.Run(controller, outputs)
		if err != nil {
			return nil, done, err
		}
		return []interface{}{value}, done, nil
	}
	return outputs, false, nil
}