Skip to content

Commit

Permalink
Fixes intelsdi-x#1061: ensures that all instances of a running plugin…
Browse files Browse the repository at this point in the history
… are stopped/killed on unload
  • Loading branch information
Tom McSweeney committed Jul 22, 2016
1 parent 5137f6c commit 1ee9a50
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 9 deletions.
16 changes: 14 additions & 2 deletions control/plugin/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,20 @@ type commandWrapper struct {
}

func (cw *commandWrapper) Path() string { return cw.cmd.Path }
func (cw *commandWrapper) Kill() error { return cw.cmd.Process.Kill() }
func (cw *commandWrapper) Start() { cw.cmd.Start() }
func (cw *commandWrapper) Kill() error {
// first, kill the process wrapped up in the commandWrapper
if err := cw.cmd.Process.Kill(); err != nil {
log.WithFields(log.Fields{
"_block": "Kill",
}).Error(err)
return err
}
// then wait for it to exit (so that we don't have any zombie processes kicking
// around the system)
_, err := cw.cmd.Process.Wait()
return err
}
func (cw *commandWrapper) Start() { cw.cmd.Start() }

// Initialize a new ExecutablePlugin from path to executable and daemon mode (true or false)
func NewExecutablePlugin(a Arg, path string) (*ExecutablePlugin, error) {
Expand Down
20 changes: 16 additions & 4 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,9 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
return
}
case *control_event.UnloadPluginEvent:
// On plugin unload, find the key and pool info for the plugin being unloaded.
// On plugin unload, first find the key and pool info for the plugin being unloaded
// so that we can determine what, if any, subscriptions need to be moved to another
// version of that plugin (if one is available).
r.availablePlugins.RLock()
var pool strategy.Pool
var k string
Expand All @@ -301,14 +303,24 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {

r.availablePlugins.RUnlock()
if pool == nil {
// if we didn't find the named plugin in one of our pools, then simply return
return
}
// Check for the highest lower version plugin and move subscriptions that
// are not bound to a plugin version to this pool.
// Otherwise, we need to roll any subscriptions that may have existed for the plugin being
// unloaded to a new version (if we can find one). First, check for the highest version of the
// plugin being unloaded that is still available (if there is one)
plugin, err := r.pluginManager.get(fmt.Sprintf("%s:%s:%d", core.PluginType(v.Type).String(), v.Name, -1))
if err != nil {
if err == ErrPluginNotFound {
// if we got to here via a "PluginNotFoundError", then we can't find
// another version of this plugin, so we need to kill all instances
// of the plugin from this pool
pool.KillAll("Last verstion of plugin unloaded; no additional versions to roll over to")
}
return
}
// if we found another version, then move the subscriptions that are not currently bound to a
// plugin version to our pool
newPool, err := r.availablePlugins.getOrCreatePool(plugin.Key())
if err != nil {
return
Expand All @@ -324,7 +336,7 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
return
}
}
// Remove the unloaded plugin from available plugins
// Finally, remove the unloaded plugin from available plugins
r.availablePlugins.Lock()
delete(r.availablePlugins.table, k)
r.availablePlugins.Unlock()
Expand Down
31 changes: 28 additions & 3 deletions control/strategy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"bytes"
"encoding/gob"
"errors"
"fmt"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -77,6 +78,7 @@ type Pool interface {
Version() int
RestartCount() int
IncRestartCount()
KillAll(string)
}

type AvailablePlugin interface {
Expand Down Expand Up @@ -297,20 +299,43 @@ func (p *pool) Kill(id uint32, reason string) {
}
}

// Kill all instances of a plugin
func (p *pool) KillAll(reason string) {
for id, rp := range p.plugins {
log.WithFields(log.Fields{
"_block": "KillAll",
"reason": reason,
}).Debug(fmt.Sprintf("handling 'KillAll' for pool '%v', killing plugin '%v:%v'", p.String(), rp.Name(), rp.Version()))
if err := rp.Stop(reason); err != nil {
log.WithFields(log.Fields{
"_block": "KillAll",
"reason": reason,
}).Error(err)
}
p.Kill(id, reason)
}
}

// SelectAndKill selects, kills and removes the available plugin from the pool
func (p *pool) SelectAndKill(id, reason string) {
rp, err := p.Remove(p.plugins.Values(), id)
if err != nil {
log.WithFields(log.Fields{
"_block": "selectAndKill",
"_block": "SelectAndKill",
"taskID": id,
"reason": reason,
}).Error(err)
}
if err := rp.Stop(reason); err != nil {
log.WithFields(log.Fields{
"_block": "SelectAndKill",
"taskID": id,
"reason": reason,
}).Error(err)
}
rp.Stop(reason)
if err := rp.Kill(reason); err != nil {
log.WithFields(log.Fields{
"_block": "selectAndKill",
"_block": "SelectAndKill",
"taskID": id,
"reason": reason,
}).Error(err)
Expand Down

0 comments on commit 1ee9a50

Please sign in to comment.