diff --git a/control/plugin/execution.go b/control/plugin/execution.go index f3cd67d5c..ce5b9f77e 100644 --- a/control/plugin/execution.go +++ b/control/plugin/execution.go @@ -53,8 +53,20 @@ type commandWrapper struct { } func (cw *commandWrapper) Path() string { return cw.cmd.Path } -func (cw *commandWrapper) Kill() error { return cw.cmd.Process.Kill() } -func (cw *commandWrapper) Start() { cw.cmd.Start() } +func (cw *commandWrapper) Kill() error { + // first, kill the process wrapped up in the commandWrapper + if err := cw.cmd.Process.Kill(); err != nil { + log.WithFields(log.Fields{ + "_block": "Kill", + }).Error(err) + return err + } + // then wait for it to exit (so that we don't have any zombie processes kicking + // around the system) + _, err := cw.cmd.Process.Wait() + return err +} +func (cw *commandWrapper) Start() { cw.cmd.Start() } // Initialize a new ExecutablePlugin from path to executable and daemon mode (true or false) func NewExecutablePlugin(a Arg, path string) (*ExecutablePlugin, error) { diff --git a/control/runner.go b/control/runner.go index 7f1ce3e6b..16be2d496 100644 --- a/control/runner.go +++ b/control/runner.go @@ -286,7 +286,9 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { return } case *control_event.UnloadPluginEvent: - // On plugin unload, find the key and pool info for the plugin being unloaded. + // On plugin unload, first find the key and pool info for the plugin being unloaded + // so that we can determine what, if any, subscriptions need to be moved to another + // version of that plugin (if one is available). r.availablePlugins.RLock() var pool strategy.Pool var k string @@ -301,14 +303,24 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { r.availablePlugins.RUnlock() if pool == nil { + // if we didn't find the named plugin in one of our pools, then simply return return } - // Check for the highest lower version plugin and move subscriptions that - // are not bound to a plugin version to this pool. + // Otherwise, we need to roll any subscriptions that may have existed for the plugin being + // unloaded to a new version (if we can find one). First, check for the highest version of the + // plugin being unloaded that is still available (if there is one) plugin, err := r.pluginManager.get(fmt.Sprintf("%s:%s:%d", core.PluginType(v.Type).String(), v.Name, -1)) if err != nil { + if err == ErrPluginNotFound { + // if we got to here via a "PluginNotFoundError", then we can't find + // another version of this plugin, so we need to kill all instances + // of the plugin from this pool + pool.KillAll("Last verstion of plugin unloaded; no additional versions to roll over to") + } return } + // if we found another version, then move the subscriptions that are not currently bound to a + // plugin version to our pool newPool, err := r.availablePlugins.getOrCreatePool(plugin.Key()) if err != nil { return @@ -324,7 +336,7 @@ func (r *runner) HandleGomitEvent(e gomit.Event) { return } } - // Remove the unloaded plugin from available plugins + // Finally, remove the unloaded plugin from available plugins r.availablePlugins.Lock() delete(r.availablePlugins.table, k) r.availablePlugins.Unlock() diff --git a/control/strategy/pool.go b/control/strategy/pool.go index fd2930bbf..66d88efed 100644 --- a/control/strategy/pool.go +++ b/control/strategy/pool.go @@ -23,6 +23,7 @@ import ( "bytes" "encoding/gob" "errors" + "fmt" "strconv" "strings" "sync" @@ -77,6 +78,7 @@ type Pool interface { Version() int RestartCount() int IncRestartCount() + KillAll(string) } type AvailablePlugin interface { @@ -297,20 +299,43 @@ func (p *pool) Kill(id uint32, reason string) { } } +// Kill all instances of a plugin +func (p *pool) KillAll(reason string) { + for id, rp := range p.plugins { + log.WithFields(log.Fields{ + "_block": "KillAll", + "reason": reason, + }).Debug(fmt.Sprintf("handling 'KillAll' for pool '%v', killing plugin '%v:%v'", p.String(), rp.Name(), rp.Version())) + if err := rp.Stop(reason); err != nil { + log.WithFields(log.Fields{ + "_block": "KillAll", + "reason": reason, + }).Error(err) + } + p.Kill(id, reason) + } +} + // SelectAndKill selects, kills and removes the available plugin from the pool func (p *pool) SelectAndKill(id, reason string) { rp, err := p.Remove(p.plugins.Values(), id) if err != nil { log.WithFields(log.Fields{ - "_block": "selectAndKill", + "_block": "SelectAndKill", + "taskID": id, + "reason": reason, + }).Error(err) + } + if err := rp.Stop(reason); err != nil { + log.WithFields(log.Fields{ + "_block": "SelectAndKill", "taskID": id, "reason": reason, }).Error(err) } - rp.Stop(reason) if err := rp.Kill(reason); err != nil { log.WithFields(log.Fields{ - "_block": "selectAndKill", + "_block": "SelectAndKill", "taskID": id, "reason": reason, }).Error(err)