From e3cb82ba2685b7de65d2dc6b5f65ac1022527afb Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Tue, 22 Sep 2015 11:42:16 -0700 Subject: [PATCH] Adds graceful shutdown handling to tribe --- control/control.go | 2 +- mgmt/tribe/messages.go | 2 +- mgmt/tribe/tribe.go | 26 +++++++++++++++++++++++--- mgmt/tribe/worker.go | 38 ++++++++++++++++++++++---------------- 4 files changed, 47 insertions(+), 21 deletions(-) diff --git a/control/control.go b/control/control.go index 456c2f5d0..09f932ae2 100644 --- a/control/control.go +++ b/control/control.go @@ -175,7 +175,7 @@ func (p *pluginControl) Name() string { } func (p *pluginControl) RegisterEventHandler(name string, h gomit.Handler) error { - return p.eventManager.RegisterHandler("tribe", h) + return p.eventManager.RegisterHandler(name, h) } // Begin handling load, unload, and inventory diff --git a/mgmt/tribe/messages.go b/mgmt/tribe/messages.go index b08453b9f..240721d0c 100644 --- a/mgmt/tribe/messages.go +++ b/mgmt/tribe/messages.go @@ -40,7 +40,7 @@ func (m msgType) String() string { type msg interface { ID() string Time() LTime - GetType() msgType //TODO rename to Type + GetType() msgType Agreement() string String() string } diff --git a/mgmt/tribe/tribe.go b/mgmt/tribe/tribe.go index a83a03cae..81dd4231b 100644 --- a/mgmt/tribe/tribe.go +++ b/mgmt/tribe/tribe.go @@ -88,6 +88,9 @@ type tribe struct { pluginCatalog managesPlugins pluginWorkQueue chan pluginRequest + + workerQuitChan chan interface{} + workerWaitGroup *sync.WaitGroup } type ManagesTribe interface { @@ -154,6 +157,8 @@ func New(c *config) (*tribe, error) { logger: logger.WithField("_name", c.memberlistConfig.Name), tags: map[string]string{RestAPIPort: strconv.Itoa(c.restAPIPort)}, pluginWorkQueue: make(chan pluginRequest, 999), + workerQuitChan: make(chan interface{}), + workerWaitGroup: &sync.WaitGroup{}, } tribe.broadcasts = &memberlist.TransmitLimitedQueue{ @@ -221,8 +226,20 @@ func (t *tribe) Start() error { } func (t *tribe) Stop() { - //TODO send tribe shutdown msg here - //TODO stop workers + err := t.memberlist.Leave(1 * time.Second) + if err != nil { + logger.WithFields(log.Fields{ + "_block": "Stop", + }).Error(err) + } + err = t.memberlist.Shutdown() + if err != nil { + logger.WithFields(log.Fields{ + "_block": "Stop", + }).Error(err) + } + close(t.workerQuitChan) + t.workerWaitGroup.Wait() } func (t *tribe) startDispatcher(nworkers int, cp managesPlugins, mm getsMembers) { @@ -230,7 +247,7 @@ func (t *tribe) startDispatcher(nworkers int, cp managesPlugins, mm getsMembers) for i := 0; i < nworkers; i++ { logger.Infof("Starting tribe plugin worker-%d", i+1) - worker := newPluginWorker(i+1, pluginWorkerQueue, cp, mm) + worker := newPluginWorker(i+1, pluginWorkerQueue, t.workerQuitChan, t.workerWaitGroup, cp, mm) worker.Start() } @@ -245,6 +262,9 @@ func (t *tribe) startDispatcher(nworkers int, cp managesPlugins, mm getsMembers) logger.Debug("Dispatching plugin work request") worker <- work }() + case <-t.workerQuitChan: + logger.Debug("Stopping plugin work dispatcher") + return } } }() diff --git a/mgmt/tribe/worker.go b/mgmt/tribe/worker.go index 3dadf0906..258a2a9b5 100644 --- a/mgmt/tribe/worker.go +++ b/mgmt/tribe/worker.go @@ -7,6 +7,7 @@ import ( "math/rand" "net/http" "os" + "sync" "time" log "github.com/Sirupsen/logrus" @@ -27,15 +28,17 @@ type pluginRequest struct { } // newPluginWorker -func newPluginWorker(id int, workerQueue chan chan pluginRequest, pm managesPlugins, mm getsMembers) pluginWorker { +func newPluginWorker(id int, workerQueue chan chan pluginRequest, quitChan chan interface{}, wg *sync.WaitGroup, pm managesPlugins, mm getsMembers) pluginWorker { // Create, and return the worker. worker := pluginWorker{ pluginManager: pm, memberManager: mm, - ID: id, - Work: make(chan pluginRequest), - WorkerQueue: workerQueue, - QuitChan: make(chan bool)} + id: id, + work: make(chan pluginRequest), + workerQueue: workerQueue, + quitChan: quitChan, + waitGroup: wg, + } return worker } @@ -43,10 +46,11 @@ func newPluginWorker(id int, workerQueue chan chan pluginRequest, pm managesPlug type pluginWorker struct { pluginManager managesPlugins memberManager getsMembers - ID int - Work chan pluginRequest - WorkerQueue chan chan pluginRequest - QuitChan chan bool + id int + work chan pluginRequest + workerQueue chan chan pluginRequest + quitChan chan interface{} + waitGroup *sync.WaitGroup } type getsMembers interface { @@ -55,20 +59,22 @@ type getsMembers interface { // Start "starts" the worker func (w pluginWorker) Start() { + w.waitGroup.Add(1) go func() { + defer w.waitGroup.Done() for { // Add ourselves into the worker queue. - w.WorkerQueue <- w.Work + w.workerQueue <- w.work var done bool select { - case work := <-w.Work: + case work := <-w.work: // Receive a work request. wlogger := workerLogger.WithFields(log.Fields{ "plugin_name": work.Plugin.Name_, "plugin_version": work.Plugin.Version_, "plugin_type": work.Plugin.Type_.String(), - "worker": w.ID, + "worker": w.id, }) workerLogger.Debug("received work") done = false @@ -83,7 +89,7 @@ func (w pluginWorker) Start() { } for _, member := range shuffle(members) { url := fmt.Sprintf("http://%s:%s/v1/plugins/%s/%s/%d", member.Node.Addr, member.Tags[RestAPIPort], work.Plugin.Type_.String(), work.Plugin.Name_, work.Plugin.Version_) - workerLogger.Debugf("worker-%v is trying %v ", w.ID, url) + workerLogger.Debugf("worker-%v is trying %v ", w.id, url) resp, err := http.Get(url) if err != nil { wlogger.Error(err) @@ -123,8 +129,8 @@ func (w pluginWorker) Start() { time.Sleep(200 * time.Millisecond) } - case <-w.QuitChan: - workerLogger.Debugf("Tribe plugin worker-%d is stopping\n", w.ID) + case <-w.quitChan: + workerLogger.Debugf("Tribe plugin worker-%d is stopping\n", w.id) return } } @@ -134,7 +140,7 @@ func (w pluginWorker) Start() { // Stop tells the worker to stop listening func (w pluginWorker) Stop() { go func() { - w.QuitChan <- true + w.quitChan <- true }() }