From b05093cf88a8cde55ad8dbdb761f59cabcf70f66 Mon Sep 17 00:00:00 2001 From: Mateusz Kleina Date: Tue, 11 Jul 2017 14:15:07 +0200 Subject: [PATCH] Fix publishers and processors subscription --- control/available_plugin.go | 4 ++-- control/subscription_group.go | 17 +++++++++++++++-- scheduler/workflow_test.go | 8 +++++++- 3 files changed, 24 insertions(+), 5 deletions(-) diff --git a/control/available_plugin.go b/control/available_plugin.go index 21878a36e..c8464be2b 100644 --- a/control/available_plugin.go +++ b/control/available_plugin.go @@ -625,7 +625,7 @@ func (ap *availablePlugins) findLatestPool(pType, name string) (strategy.Pool, s var latest strategy.Pool for key, pool := range ap.table { tnv := strings.Split(key, core.Separator) - if tnv[0] == pType && tnv[1] == name { + if tnv[0] == pType && tnv[1] == name && pool.Count() > 0 { latest = pool break } @@ -633,7 +633,7 @@ func (ap *availablePlugins) findLatestPool(pType, name string) (strategy.Pool, s if latest != nil { for key, pool := range ap.table { tnv := strings.Split(key, core.Separator) - if tnv[0] == pType && tnv[1] == name && pool.Version() > latest.Version() { + if tnv[0] == pType && tnv[1] == name && pool.Version() > latest.Version() && pool.Count() > 0 { latest = pool } } diff --git a/control/subscription_group.go b/control/subscription_group.go index f60ed041a..b7e157fe2 100644 --- a/control/subscription_group.go +++ b/control/subscription_group.go @@ -433,8 +433,6 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) { // notice that requested plugins contains only processors and publishers for _, plugin := range s.requestedPlugins { - // add processors and publishers to collectors just gathered - plugins = append(plugins, plugin) // add defaults to plugins (exposed in a plugins ConfigPolicy) if lp, err := s.pluginManager.get( fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", @@ -445,6 +443,21 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) { // set defaults to plugin config plugin.Config().ApplyDefaults(policy.Defaults()) } + + // update version info for subscribed processor or publisher + version := plugin.Version() + if version < 1 { + version = lp.Version() + } + s := subscribedPlugin{ + name: plugin.Name(), + typeName: plugin.TypeName(), + version: version, + config: plugin.Config(), + } + + // add processors and publishers to collectors just gathered + plugins = append(plugins, s) } } // calculates those plugins that need to be subscribed and unsubscribed to diff --git a/scheduler/workflow_test.go b/scheduler/workflow_test.go index a30a0f8e7..47d2c7ec8 100644 --- a/scheduler/workflow_test.go +++ b/scheduler/workflow_test.go @@ -143,7 +143,7 @@ func TestCollectPublishWorkflow(t *testing.T) { So(err, ShouldBeNil) rp2, err := core.NewRequestedPlugin(snap_publisher_file_path, c.GetTempDir(), nil) So(err, ShouldBeNil) - _, err = c.Load(rp2) + plugPublisher, err := c.Load(rp2) So(err, ShouldBeNil) rp3, err := core.NewRequestedPlugin(snap_processor_passthru_path, c.GetTempDir(), nil) So(err, ShouldBeNil) @@ -189,6 +189,12 @@ func TestCollectPublishWorkflow(t *testing.T) { So(t.LastFailureMessage(), ShouldBeEmpty) So(t.FailedCount(), ShouldEqual, 0) So(t.HitCount(), ShouldBeGreaterThan, metricsToCollect) + + // check if task fails after unloading publisher + c.Unload(plugPublisher) + <-el.done + So(t.LastFailureMessage(), ShouldNotBeEmpty) + So(t.FailedCount(), ShouldBeGreaterThan, 0) }) }) })