Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Adds graceful shutdown handling to tribe
Browse files Browse the repository at this point in the history
  • Loading branch information
jcooklin committed Oct 1, 2015
1 parent 04502bb commit e3cb82b
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 21 deletions.
2 changes: 1 addition & 1 deletion control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (p *pluginControl) Name() string {
}

func (p *pluginControl) RegisterEventHandler(name string, h gomit.Handler) error {
return p.eventManager.RegisterHandler("tribe", h)
return p.eventManager.RegisterHandler(name, h)
}

// Begin handling load, unload, and inventory
Expand Down
2 changes: 1 addition & 1 deletion mgmt/tribe/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func (m msgType) String() string {
type msg interface {
ID() string
Time() LTime
GetType() msgType //TODO rename to Type
GetType() msgType
Agreement() string
String() string
}
Expand Down
26 changes: 23 additions & 3 deletions mgmt/tribe/tribe.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,9 @@ type tribe struct {

pluginCatalog managesPlugins
pluginWorkQueue chan pluginRequest

workerQuitChan chan interface{}
workerWaitGroup *sync.WaitGroup
}

type ManagesTribe interface {
Expand Down Expand Up @@ -154,6 +157,8 @@ func New(c *config) (*tribe, error) {
logger: logger.WithField("_name", c.memberlistConfig.Name),
tags: map[string]string{RestAPIPort: strconv.Itoa(c.restAPIPort)},
pluginWorkQueue: make(chan pluginRequest, 999),
workerQuitChan: make(chan interface{}),
workerWaitGroup: &sync.WaitGroup{},
}

tribe.broadcasts = &memberlist.TransmitLimitedQueue{
Expand Down Expand Up @@ -221,16 +226,28 @@ func (t *tribe) Start() error {
}

func (t *tribe) Stop() {
//TODO send tribe shutdown msg here
//TODO stop workers
err := t.memberlist.Leave(1 * time.Second)
if err != nil {
logger.WithFields(log.Fields{
"_block": "Stop",
}).Error(err)
}
err = t.memberlist.Shutdown()
if err != nil {
logger.WithFields(log.Fields{
"_block": "Stop",
}).Error(err)
}
close(t.workerQuitChan)
t.workerWaitGroup.Wait()
}

func (t *tribe) startDispatcher(nworkers int, cp managesPlugins, mm getsMembers) {
pluginWorkerQueue := make(chan chan pluginRequest, nworkers)

for i := 0; i < nworkers; i++ {
logger.Infof("Starting tribe plugin worker-%d", i+1)
worker := newPluginWorker(i+1, pluginWorkerQueue, cp, mm)
worker := newPluginWorker(i+1, pluginWorkerQueue, t.workerQuitChan, t.workerWaitGroup, cp, mm)
worker.Start()
}

Expand All @@ -245,6 +262,9 @@ func (t *tribe) startDispatcher(nworkers int, cp managesPlugins, mm getsMembers)
logger.Debug("Dispatching plugin work request")
worker <- work
}()
case <-t.workerQuitChan:
logger.Debug("Stopping plugin work dispatcher")
return
}
}
}()
Expand Down
38 changes: 22 additions & 16 deletions mgmt/tribe/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"net/http"
"os"
"sync"
"time"

log "github.com/Sirupsen/logrus"
Expand All @@ -27,26 +28,29 @@ type pluginRequest struct {
}

// newPluginWorker
func newPluginWorker(id int, workerQueue chan chan pluginRequest, pm managesPlugins, mm getsMembers) pluginWorker {
func newPluginWorker(id int, workerQueue chan chan pluginRequest, quitChan chan interface{}, wg *sync.WaitGroup, pm managesPlugins, mm getsMembers) pluginWorker {
// Create, and return the worker.
worker := pluginWorker{
pluginManager: pm,
memberManager: mm,
ID: id,
Work: make(chan pluginRequest),
WorkerQueue: workerQueue,
QuitChan: make(chan bool)}
id: id,
work: make(chan pluginRequest),
workerQueue: workerQueue,
quitChan: quitChan,
waitGroup: wg,
}

return worker
}

type pluginWorker struct {
pluginManager managesPlugins
memberManager getsMembers
ID int
Work chan pluginRequest
WorkerQueue chan chan pluginRequest
QuitChan chan bool
id int
work chan pluginRequest
workerQueue chan chan pluginRequest
quitChan chan interface{}
waitGroup *sync.WaitGroup
}

type getsMembers interface {
Expand All @@ -55,20 +59,22 @@ type getsMembers interface {

// Start "starts" the worker
func (w pluginWorker) Start() {
w.waitGroup.Add(1)
go func() {
defer w.waitGroup.Done()
for {
// Add ourselves into the worker queue.
w.WorkerQueue <- w.Work
w.workerQueue <- w.work

var done bool
select {
case work := <-w.Work:
case work := <-w.work:
// Receive a work request.
wlogger := workerLogger.WithFields(log.Fields{
"plugin_name": work.Plugin.Name_,
"plugin_version": work.Plugin.Version_,
"plugin_type": work.Plugin.Type_.String(),
"worker": w.ID,
"worker": w.id,
})
workerLogger.Debug("received work")
done = false
Expand All @@ -83,7 +89,7 @@ func (w pluginWorker) Start() {
}
for _, member := range shuffle(members) {
url := fmt.Sprintf("http://%s:%s/v1/plugins/%s/%s/%d", member.Node.Addr, member.Tags[RestAPIPort], work.Plugin.Type_.String(), work.Plugin.Name_, work.Plugin.Version_)
workerLogger.Debugf("worker-%v is trying %v ", w.ID, url)
workerLogger.Debugf("worker-%v is trying %v ", w.id, url)
resp, err := http.Get(url)
if err != nil {
wlogger.Error(err)
Expand Down Expand Up @@ -123,8 +129,8 @@ func (w pluginWorker) Start() {
time.Sleep(200 * time.Millisecond)
}

case <-w.QuitChan:
workerLogger.Debugf("Tribe plugin worker-%d is stopping\n", w.ID)
case <-w.quitChan:
workerLogger.Debugf("Tribe plugin worker-%d is stopping\n", w.id)
return
}
}
Expand All @@ -134,7 +140,7 @@ func (w pluginWorker) Start() {
// Stop tells the worker to stop listening
func (w pluginWorker) Stop() {
go func() {
w.QuitChan <- true
w.quitChan <- true
}()
}

Expand Down

0 comments on commit e3cb82b

Please sign in to comment.