-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathBalancer.go
84 lines (69 loc) · 1.65 KB
/
Balancer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
package queue
import (
"fmt"
"log"
"sync"
)
func NewBalancer(name string, size, queueSize int) *Balancer { // {{{
return &Balancer{
name: name,
size: size,
queueSize: queueSize,
shutdown: make(chan bool),
pools: &Pools{},
}
} // }}}
type Balancer struct {
sync.Mutex
name string
size int
queueSize int
shutdown chan bool
workerFactory WorkerFactoryInterface
pools *Pools
}
func (this *Balancer) Name() string { // {{{
return this.name
} // }}}
func (this *Balancer) Size() int { // {{{
return this.size
} // }}}
func (this *Balancer) Info() string { // {{{
return fmt.Sprintf("%sBalancer:%d#%d", this.Name(), this.size, this.queueSize)
} // }}}
func (this *Balancer) SetWorkerFactory(f WorkerFactoryInterface) { // {{{
this.workerFactory = f
} // }}}
func (this *Balancer) WorkerFactory() WorkerFactoryInterface { // {{{
if this.workerFactory == nil {
this.workerFactory = NewWorkerFactory(this.name)
}
return this.workerFactory
} // }}}
func (this *Balancer) Dispatch(job JobInterface) { // {{{
if this.pools.Len() > 0 {
this.pools.Dispatch(job)
} else {
go log.Println("[Balancer:Error] Pools is empty")
}
} // }}}
func (this *Balancer) Run() { // {{{
for i := 1; i <= this.size; i++ {
pool := NewPool(this.name, i, this.queueSize)
pool.SetWorkerFactory(this.WorkerFactory().New())
this.pools.Push(pool)
pool.Start()
}
} // }}}
func (this *Balancer) Close() { // {{{
for this.pools.Len() > 0 {
pool := this.pools.Pop()
pool.Close()
}
} // }}}
func (this *Balancer) Start() { // {{{
go this.Run()
} // }}}
func (this *Balancer) Stop() { // {{{
go this.Close()
} // }}}