Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
start plugins synchronously
Browse files Browse the repository at this point in the history
When a task starts it calls SubscribeDeps on control to prepare any
plugins it may need.  Previously, SubscribeDeps emitted events which
were picked up by the runner, who then started the plugins.  This
occasionally resulted in an error starting the task, because the async
process of starting the needed plugins had not completed yet.  This was
especially true with tasks with very small intervals, <250ms for
example.

This commit causes SubscribeDeps to first subscribe, then check that the
pool is eligible.  If it is, then it runs a plugin directly.  The runner
no longer handles `PluginSubscriptionEvent`.
  • Loading branch information
pittma committed Oct 22, 2015
1 parent 0c1c681 commit 706a38d
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 89 deletions.
20 changes: 20 additions & 0 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ type runsPlugins interface {
SetStrategy(RoutingStrategy)
Strategy() RoutingStrategy
Monitor() *monitor
runPlugin(string) error
}

type managesPlugins interface {
Expand Down Expand Up @@ -521,13 +522,32 @@ func (p *pluginControl) SubscribeDeps(taskId string, mts []core.Metric, plugins
return perrs
}
pool.subscribe(taskId, unboundSubscriptionType)
if pool.eligible() {
err = p.pluginRunner.runPlugin(latest.Path)
if err != nil {
perrs = append(perrs, perror.New(err))
return perrs
}
}
} else {
pool, err := p.pluginRunner.AvailablePlugins().getOrCreatePool(fmt.Sprintf("%s:%s:%d", sub.TypeName(), sub.Name(), sub.Version()))
if err != nil {
perrs = append(perrs, perror.New(err))
return perrs
}
pool.subscribe(taskId, boundSubscriptionType)
if pool.eligible() {
pl, err := p.pluginManager.get(fmt.Sprintf("%s:%s:%d", sub.TypeName(), sub.Name(), sub.Version()))
if err != nil {
perrs = append(perrs, perror.New(err))
return perrs
}
err = p.pluginRunner.runPlugin(pl.Path)
if err != nil {
perrs = append(perrs, perror.New(err))
return perrs
}
}
}
perr := p.sendPluginSubscriptionEvent(taskId, sub)
if perr != nil {
Expand Down
24 changes: 5 additions & 19 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -802,7 +802,7 @@ func TestCollectMetrics(t *testing.T) {
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:dummy1:1")
So(errp, ShouldBeNil)
pool.subscribe("1", unboundSubscriptionType)
err = c.sendPluginSubscriptionEvent("1", lp)
err = c.pluginRunner.runPlugin(lp.Path)
So(err, ShouldBeNil)
m = append(m, m1, m2, m3)
time.Sleep(time.Millisecond * 1100)
Expand Down Expand Up @@ -886,18 +886,11 @@ func TestPublishMetrics(t *testing.T) {
Convey("Subscribe to file publisher with good config", func() {
n := cdata.NewNode()
config.Plugins.Publisher.Plugins[lp.Name()] = newPluginConfigItem(optAddPluginConfigItem("file", ctypes.ConfigValueStr{Value: "/tmp/pulse-TestPublishMetrics.out"}))
p := mockPlugin{
name: "file",
pluginType: core.PublisherPluginType,
ver: 1,
config: n,
}
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("publisher:file:1")
So(errp, ShouldBeNil)
pool.subscribe("1", unboundSubscriptionType)
errs := c.sendPluginSubscriptionEvent("1", p)
So(errs, ShouldBeNil)
<-lpe.done
err := c.pluginRunner.runPlugin(lp.Path)
So(err, ShouldBeNil)
time.Sleep(2500 * time.Millisecond)

Convey("Publish to file", func() {
Expand Down Expand Up @@ -944,18 +937,11 @@ func TestProcessMetrics(t *testing.T) {

Convey("Subscribe to passthru processor with good config", func() {
n := cdata.NewNode()
p := mockPlugin{
name: "passthru",
pluginType: core.ProcessorPluginType,
ver: 1,
config: n,
}
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("processor:passthru:1")
So(errp, ShouldBeNil)
pool.subscribe("1", unboundSubscriptionType)
errs := c.sendPluginSubscriptionEvent("1", p)
So(errs, ShouldBeNil)
<-lpe.done
err := c.pluginRunner.runPlugin(lp.Path)
So(err, ShouldBeNil)
time.Sleep(2500 * time.Millisecond)

Convey("process metrics", func() {
Expand Down
74 changes: 4 additions & 70 deletions control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,22 +257,6 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
if pool != nil {
pool.kill(v.Id, "plugin dead")
}
case *control_event.PluginSubscriptionEvent:
runnerLog.WithFields(log.Fields{
"_block": "subscribe-pool",
"event": v.Namespace(),
"plugin-name": v.PluginName,
"plugin-version": v.PluginVersion,
"plugin-type": v.PluginType,
}).Debug("handling plugin subscription event")

r.handleSubscription(
core.PluginType(v.PluginType).String(),
v.PluginName,
v.PluginVersion,
v.TaskId,
subscriptionType(v.SubscriptionType),
)

case *control_event.PluginUnsubscriptionEvent:
runnerLog.WithFields(log.Fields{
Expand Down Expand Up @@ -377,14 +361,15 @@ func (r *runner) HandleGomitEvent(e gomit.Event) {
}
}

func (r *runner) runPlugin(path string) {
func (r *runner) runPlugin(path string) error {
ePlugin, err := plugin.NewExecutablePlugin(r.pluginManager.GenerateArgs(path), path)
if err != nil {
runnerLog.WithFields(log.Fields{
"_block": "run-plugin",
"path": path,
"error": err,
}).Error("error creating executable plugin")
return err
}
_, err = r.startPlugin(ePlugin)
if err != nil {
Expand All @@ -393,62 +378,11 @@ func (r *runner) runPlugin(path string) {
"path": path,
"error": err,
}).Error("error starting new plugin")
return err
}
return nil
}

func (r *runner) handleSubscription(pType, pName string, pVersion int, taskId string, subType subscriptionType) {
pool, err := r.availablePlugins.getPool(fmt.Sprintf("%s:%s:%d", pType, pName, pVersion))
if err != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-subscription",
"plugin-name": pName,
"plugin-version": pVersion,
"plugin-type": pType,
}).Error("error retrieving pool")
return
}
if pool == nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-subscription",
"plugin-name": pName,
"plugin-version": pVersion,
"plugin-type": pType,
}).Error("pool not found")
return
}
runnerLog.WithFields(log.Fields{
"_block": "handle-subscription",
"plugin-name": pName,
"plugin-version": pVersion,
"plugin-type": pType,
}).Debug(fmt.Sprintf("found pool: version %d", pool.version))
runnerLog.WithFields(log.Fields{
"_block": "handle-subscription",
"pool-count": pool.count(),
"pool-subscription-count": pool.subscriptionCount(),
"pool-max": pool.max,
"pool-eligibility": pool.eligible(),
}).Debug("checking is pool is eligible to grow.")
if pool.eligible() {
runnerLog.WithFields(log.Fields{
"_block": "handle-subscription",
"plugin-name": pName,
"plugin-version": pVersion,
"plugin-type": pType,
}).Debug("pool is eligible. starting a new available plugin")
plugin, err := r.pluginManager.get(fmt.Sprintf("%s:%s:%d", pType, pName, pVersion))
if err != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-subscription",
"plugin-name": pName,
"plugin-version": pVersion,
"plugin-type": pType,
}).Error("plugin not found")
return
}
r.runPlugin(plugin.Path)
}
}
func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskId string) error {
pool, err := r.availablePlugins.getPool(fmt.Sprintf("%s:%s:%d", pType, pName, pVersion))
if err != nil {
Expand Down

0 comments on commit 706a38d

Please sign in to comment.