Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Simplifiy tribe worker
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Oct 2, 2015
1 parent 331ac33 commit 8e39e8f
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 69 deletions.
4 changes: 3 additions & 1 deletion mgmt/tribe/tribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ type tribe struct {
pluginWorkQueue chan worker.PluginRequest
taskWorkQueue chan worker.TaskRequest

workerQuitChan chan interface{}
workerQuitChan chan struct{}
workerWaitGroup *sync.WaitGroup
}

Expand Down Expand Up @@ -99,6 +99,8 @@ func New(c *config) (*tribe, error) {
tags: map[string]string{agreement.RestAPIPort: strconv.Itoa(c.restAPIPort)},
pluginWorkQueue: make(chan worker.PluginRequest, 999),
taskWorkQueue: make(chan worker.TaskRequest, 999),
workerQuitChan: make(chan struct{}),
workerWaitGroup: &sync.WaitGroup{},
}

tribe.broadcasts = &memberlist.TransmitLimitedQueue{
Expand Down
97 changes: 29 additions & 68 deletions mgmt/tribe/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,88 +71,55 @@ type Member interface {

// newPluginWorker
func newWorker(id int,
pluginWorkerQueue chan chan PluginRequest,
taskWorkerQueue chan chan TaskRequest,
quitChan chan interface{},
pluginQueue chan PluginRequest,
taskQueue chan TaskRequest,
quitChan chan struct{},
wg *sync.WaitGroup,
pm ManagesPlugins,
tm ManagesTasks,
mm getsMembers) worker {
// Create, and return the worker.
worker := worker{
pluginManager: pm,
taskManager: tm,
memberManager: mm,
id: id,
pluginWork: make(chan PluginRequest),
pluginWorkerQueue: pluginWorkerQueue,
taskWork: make(chan TaskRequest),
taskWorkerQueue: taskWorkerQueue,
quitChan: make(chan bool),
pluginManager: pm,
taskManager: tm,
memberManager: mm,
id: id,
pluginWork: pluginQueue,
taskWork: taskQueue,
waitGroup: wg,
quitChan: quitChan,
}

return worker
}

type worker struct {
pluginManager ManagesPlugins
memberManager getsMembers
taskManager ManagesTasks
id int
pluginWork chan PluginRequest
pluginWorkerQueue chan chan PluginRequest
taskWork chan TaskRequest
taskWorkerQueue chan chan TaskRequest
quitChan chan bool
waitGroup *sync.WaitGroup
pluginManager ManagesPlugins
memberManager getsMembers
taskManager ManagesTasks
id int
pluginWork chan PluginRequest
taskWork chan TaskRequest
quitChan chan struct{}
waitGroup *sync.WaitGroup
}

func DispatchWorkers(nworkers int, pluginWorkQueue chan PluginRequest, taskWorkQueue chan TaskRequest, quitChan chan interface{}, workerWaitGroup *sync.WaitGroup, cp ManagesPlugins, tm ManagesTasks, mm getsMembers) {
pluginWorkerQueue := make(chan chan PluginRequest, nworkers)
taskWorkerQueue := make(chan chan TaskRequest, nworkers)
func DispatchWorkers(nworkers int, pluginQueue chan PluginRequest, taskQueue chan TaskRequest, quitChan chan struct{}, workerWaitGroup *sync.WaitGroup, cp ManagesPlugins, tm ManagesTasks, mm getsMembers) {

for i := 0; i < nworkers; i++ {
workerLogger.Infof("Starting tribe worker-%d", i+1)
worker := newWorker(i+1, pluginWorkerQueue, taskWorkerQueue, quitChan, workerWaitGroup, cp, tm, mm)
worker := newWorker(i+1, pluginQueue, taskQueue, quitChan, workerWaitGroup, cp, tm, mm)
worker.start()
}

go func() {
for {
select {
case pluginWork := <-pluginWorkQueue:
workerLogger.Infof("Received plugin work request")
go func() {
pluginWorker := <-pluginWorkerQueue

workerLogger.Infof("Dispatching plugin work request")
pluginWorker <- pluginWork
}()
case taskWork := <-taskWorkQueue:
workerLogger.Infof("Received task work request")
go func() {
workerLogger.Infof("Waiting for free worker")
taskWorker := <-taskWorkerQueue

workerLogger.Infof("Dispatching task work request")
taskWorker <- taskWork
}()
case <-quitChan:
workerLogger.Infof("Stopping plugin work dispatcher")
return
}
}
}()
}

// Start "starts" the workers
func (w worker) start() {
// task worker
w.waitGroup.Add(1)
go func() {
defer w.waitGroup.Done()
workerLogger.Debugf("Starting task worker-%d", w.id)
for {
defer w.waitGroup.Done()
w.taskWorkerQueue <- w.taskWork

select {
case work := <-w.taskWork:
done := false
Expand All @@ -162,6 +129,7 @@ func (w worker) start() {
"worker": w.id,
"_block": "start",
})
wLogger.Error("received task work")
if work.RequestType == TaskCreatedType {
task, _ := w.taskManager.GetTask(work.Task.ID)
if task != nil {
Expand Down Expand Up @@ -222,12 +190,11 @@ func (w worker) start() {
}()

// plugin worker
w.waitGroup.Add(1)
go func() {
defer w.waitGroup.Done()
workerLogger.Debugf("Starting plugin worker-%d", w.id)
for {
// Add ourselves into the worker queue.
w.pluginWorkerQueue <- w.pluginWork

select {
case work := <-w.pluginWork:
// Receive a work request.
Expand All @@ -238,6 +205,7 @@ func (w worker) start() {
"worker": w.id,
"_block": "start",
})
wLogger.Debug("received plugin work")
done := false
for {
if w.isPluginLoaded(work.Plugin.Name(), work.Plugin.TypeName(), work.Plugin.Version()) {
Expand All @@ -249,7 +217,7 @@ func (w worker) start() {
continue
}
for _, member := range shuffle(members) {
url := fmt.Sprintf("http://%s:%s/v1/plugins/%s/%s/%d", member.GetAddr(), member.GetRESTAPIPort(), work.Plugin.TypeName(), work.Plugin.Name(), work.Plugin.Version())
url := fmt.Sprintf("http://%s:%s/v1/plugins/%s/%s/%d?download=true", member.GetAddr(), member.GetRESTAPIPort(), work.Plugin.TypeName(), work.Plugin.Name(), work.Plugin.Version())
resp, err := http.Get(url)
if err != nil {
wLogger.Error(err)
Expand Down Expand Up @@ -296,13 +264,6 @@ func (w worker) start() {
}()
}

// Stop tells the worker to stop listening
func (w worker) Stop() {
go func() {
w.quitChan <- true
}()
}

func shuffle(m []Member) []Member {
result := make([]Member, len(m))
perm := rand.Perm(len(m))
Expand Down

0 comments on commit 8e39e8f

Please sign in to comment.