Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Add an arbitrary separator for plugin keys
Browse files Browse the repository at this point in the history
This separator is arbitrary enough to be unused in a plugin name. 🐢
  • Loading branch information
kindermoumoute committed Sep 10, 2016
1 parent dda8bab commit 8ee7d05
Show file tree
Hide file tree
Showing 12 changed files with 62 additions and 53 deletions.
14 changes: 7 additions & 7 deletions control/available_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func newAvailablePlugin(resp plugin.Response, emitter gomit.Emitter, ep executab
lastHitTime: time.Now(),
ePlugin: ep,
}
ap.key = fmt.Sprintf("%s:%s:%d", ap.pluginType.String(), ap.name, ap.version)
ap.key = fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", ap.pluginType.String(), ap.name, ap.version)

listenURL := fmt.Sprintf("http://%v/rpc", resp.ListenAddress)
// Create RPC Client
Expand Down Expand Up @@ -327,7 +327,7 @@ func (ap *availablePlugins) insert(pl *availablePlugin) error {
ap.Lock()
defer ap.Unlock()

key := fmt.Sprintf("%s:%s:%d", pl.TypeName(), pl.name, pl.version)
key := fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pl.TypeName(), pl.name, pl.version)
_, exists := ap.table[key]
if !exists {
p, err := strategy.NewPool(key, pl)
Expand All @@ -348,7 +348,7 @@ func (ap *availablePlugins) getPool(key string) (strategy.Pool, serror.SnapError
defer ap.RUnlock()
pool, ok := ap.table[key]
if !ok {
tnv := strings.Split(key, ":")
tnv := strings.Split(key, core.Separator)
if len(tnv) != 3 {
return nil, serror.New(ErrBadKey, map[string]interface{}{
"key": key,
Expand Down Expand Up @@ -441,7 +441,7 @@ func (ap *availablePlugins) collectMetrics(pluginKey string, metricTypes []core.

func (ap *availablePlugins) publishMetrics(metrics []core.Metric, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) []error {
var errs []error
key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":")
key := strings.Join([]string{plugin.PublisherPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, core.Separator)
pool, serr := ap.getPool(key)
if serr != nil {
errs = append(errs, serr)
Expand Down Expand Up @@ -476,7 +476,7 @@ func (ap *availablePlugins) publishMetrics(metrics []core.Metric, pluginName str

func (ap *availablePlugins) processMetrics(metrics []core.Metric, pluginName string, pluginVersion int, config map[string]ctypes.ConfigValue, taskID string) ([]core.Metric, []error) {
var errs []error
key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, ":")
key := strings.Join([]string{plugin.ProcessorPluginType.String(), pluginName, strconv.Itoa(pluginVersion)}, core.Separator)
pool, serr := ap.getPool(key)
if serr != nil {
errs = append(errs, serr)
Expand Down Expand Up @@ -512,15 +512,15 @@ func (ap *availablePlugins) findLatestPool(pType, name string) (strategy.Pool, s
// see if there exists a pool at all which matches name version.
var latest strategy.Pool
for key, pool := range ap.table {
tnv := strings.Split(key, ":")
tnv := strings.Split(key, core.Separator)
if tnv[0] == pType && tnv[1] == name {
latest = pool
break
}
}
if latest != nil {
for key, pool := range ap.table {
tnv := strings.Split(key, ":")
tnv := strings.Split(key, core.Separator)
if tnv[0] == pType && tnv[1] == name && pool.Version() > latest.Version() {
latest = pool
}
Expand Down
3 changes: 2 additions & 1 deletion control/available_plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"github.com/intelsdi-x/snap/control/fixtures"
"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/core"
. "github.com/smartystreets/goconvey/convey"
)

Expand Down Expand Up @@ -84,7 +85,7 @@ func TestAvailablePlugins(t *testing.T) {
err := aps.insert(ap)
So(err, ShouldBeNil)

pool, err := aps.getPool("collector:test:1")
pool, err := aps.getPool("collector" + core.Separator + "test" + core.Separator + "1")
So(err, ShouldBeNil)
nap, ok := pool.Plugins()[ap.id]
So(ok, ShouldBeTrue)
Expand Down
2 changes: 1 addition & 1 deletion control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ func (p *pluginConfig) deletePluginConfigDataNodeField(pluginType core.PluginTyp

func (p *pluginConfig) getPluginConfigDataNode(pluginType core.PluginType, name string, ver int) *cdata.ConfigDataNode {
// check cache
key := fmt.Sprintf("%d:%s:%d", pluginType, name, ver)
key := fmt.Sprintf("%d"+core.Separator+"%s"+core.Separator+"%d", pluginType, name, ver)
if res, ok := p.pluginCache[key]; ok {
return res
}
Expand Down
54 changes: 27 additions & 27 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func TestMetricConfig(t *testing.T) {
So(errs, ShouldBeNil)
})
Convey("So mock should have name: bob config from defaults", func() {
So(c.Config.Plugins.pluginCache["0:mock:1"].Table()["name"], ShouldResemble, ctypes.ConfigValueStr{Value: "bob"})
So(c.Config.Plugins.pluginCache["0"+core.Separator+"mock"+core.Separator+"1"].Table()["name"], ShouldResemble, ctypes.ConfigValueStr{Value: "bob"})
})

c.Stop()
Expand Down Expand Up @@ -892,10 +892,10 @@ func TestRoutingCachingStrategy(t *testing.T) {
cdt.Add([]string{"intel", "mock"}, node)

Convey("Start the plugins", func() {
lp, err := c.pluginManager.get("collector:mock:2")
lp, err := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "2")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "2")
So(errp, ShouldBeNil)
So(pool, ShouldNotBeNil)
tasks := []string{
Expand Down Expand Up @@ -964,10 +964,10 @@ func TestRoutingCachingStrategy(t *testing.T) {
cdt.Add([]string{"intel", "mock"}, node)

Convey("Start the plugins", func() {
lp, err := c.pluginManager.get("collector:mock:1")
lp, err := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "1")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1")
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "1")
time.Sleep(1 * time.Second)
So(errp, ShouldBeNil)
So(pool, ShouldNotBeNil)
Expand Down Expand Up @@ -1055,10 +1055,10 @@ func TestCollectDynamicMetrics(t *testing.T) {
cdt := cdata.NewTree()

Convey("collects metrics from plugin using native client", func() {
lp, err := c.pluginManager.get("collector:mock:2")
lp, err := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "2")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "2")
So(errp, ShouldBeNil)
So(pool, ShouldNotBeNil)
taskID := uuid.New()
Expand Down Expand Up @@ -1149,12 +1149,12 @@ func TestFailedPlugin(t *testing.T) {
So(serrs, ShouldBeNil)

// retrieve loaded plugin
lp, err := c.pluginManager.get("collector:mock:2")
lp, err := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "2")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)

Convey("create a pool, add subscriptions and start plugins", func() {
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "2")
So(errp, ShouldBeNil)
Convey("collect metrics against a plugin that will panic", func() {
So(pool.Count(), ShouldEqual, 1)
Expand Down Expand Up @@ -1230,7 +1230,7 @@ func TestCollectMetrics(t *testing.T) {
}

// retrieve loaded plugin
lp, err := c.pluginManager.get("collector:mock:1")
lp, err := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "1")
So(err, ShouldBeNil)
So(lp, ShouldNotBeNil)

Expand All @@ -1250,7 +1250,7 @@ func TestCollectMetrics(t *testing.T) {
serrs = c.SubscribeDeps(taskNonHit, r, []core.SubscribedPlugin{subscribedPlugin{typeName: "collector", name: "mock", version: 1}}, cdt)
So(serrs, ShouldBeNil)

pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1")
pool, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "1")
So(errp, ShouldBeNil)

Convey("collect metrics", func() {
Expand Down Expand Up @@ -1427,7 +1427,7 @@ func TestMetricSubscriptionToNewVersion(t *testing.T) {
<-lpe.load
So(err, ShouldBeNil)
So(len(c.pluginManager.all()), ShouldEqual, 1)
lp, err2 := c.pluginManager.get("collector:mock:1")
lp, err2 := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "1")
So(err2, ShouldBeNil)
So(lp.Name(), ShouldResemble, "mock")
//Subscribe deps to create pools.
Expand Down Expand Up @@ -1464,11 +1464,11 @@ func TestMetricSubscriptionToNewVersion(t *testing.T) {
So(false, ShouldEqual, true)
}

pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1")
pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "1")
So(errp, ShouldBeNil)
So(pool1.SubscriptionCount(), ShouldEqual, 0)

pool2, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
pool2, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "2")
So(errp, ShouldBeNil)
So(pool2.SubscriptionCount(), ShouldEqual, 1)

Expand All @@ -1493,7 +1493,7 @@ func TestMetricSubscriptionToOlderVersion(t *testing.T) {
<-lpe.load
So(err, ShouldBeNil)
So(len(c.pluginManager.all()), ShouldEqual, 1)
lp, err2 := c.pluginManager.get("collector:mock:2")
lp, err2 := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "2")
So(err2, ShouldBeNil)
So(lp.Name(), ShouldResemble, "mock")
requestedMetric := fixtures.NewMockRequestedMetric(
Expand Down Expand Up @@ -1539,11 +1539,11 @@ func TestMetricSubscriptionToOlderVersion(t *testing.T) {
var pool1 strategy.Pool
var errp error
ap := c.pluginRunner.AvailablePlugins()
pool1, errp = ap.getOrCreatePool("collector:mock:2")
pool1, errp = ap.getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "2")
So(errp, ShouldBeNil)
So(pool1.SubscriptionCount(), ShouldEqual, 0)

pool2, errp := ap.getOrCreatePool("collector:mock:1")
pool2, errp := ap.getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "1")
So(errp, ShouldBeNil)
So(pool2.SubscriptionCount(), ShouldEqual, 1)

Expand All @@ -1570,7 +1570,7 @@ func TestDynamicMetricSubscriptionLoad(t *testing.T) {
_, err := load(c, path.Join(fixtures.SnapPath, "plugin", "snap-plugin-collector-mock1"))
So(err, ShouldBeNil)
So(len(c.pluginManager.all()), ShouldEqual, 1)
lp, err2 := c.pluginManager.get("collector:mock:1")
lp, err2 := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "1")
So(err2, ShouldBeNil)
So(lp.Name(), ShouldResemble, "mock")
//Subscribe deps to create pools.
Expand Down Expand Up @@ -1599,7 +1599,7 @@ func TestDynamicMetricSubscriptionLoad(t *testing.T) {
<-lpe.load // wait for load event
<-lpe.sub // wait for subscription event

pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1")
pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "1")
So(errp, ShouldBeNil)
So(pool1.SubscriptionCount(), ShouldEqual, 1)

Expand Down Expand Up @@ -1627,7 +1627,7 @@ func TestDynamicMetricSubscriptionUnload(t *testing.T) {
_, err = load(c, path.Join(fixtures.SnapPath, "plugin", "snap-plugin-collector-anothermock1"))
So(err, ShouldBeNil)
So(len(c.pluginManager.all()), ShouldEqual, 2)
lpMock, err2 := c.pluginManager.get("collector:mock:1")
lpMock, err2 := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "1")
So(err2, ShouldBeNil)
So(lpMock.Name(), ShouldResemble, "mock")
lpAMock, err3 := c.pluginManager.get("collector:anothermock:1")
Expand Down Expand Up @@ -1665,7 +1665,7 @@ func TestDynamicMetricSubscriptionUnload(t *testing.T) {
So(errs, ShouldBeNil)
So(len(mts1), ShouldBeGreaterThan, 1)
Convey("Unloading mock plugin should remove its subscriptions", func() {
pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1")
pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "1")
So(errp, ShouldBeNil)
So(pool1.SubscriptionCount(), ShouldEqual, 1)
_, err = c.Unload(lpMock)
Expand Down Expand Up @@ -1703,7 +1703,7 @@ func TestDynamicMetricSubscriptionLoadLessMetrics(t *testing.T) {
_, err := load(c, path.Join(fixtures.SnapPath, "plugin", "snap-plugin-collector-mock1"))
So(err, ShouldBeNil)
So(len(c.pluginManager.all()), ShouldEqual, 1)
lp, err2 := c.pluginManager.get("collector:mock:1")
lp, err2 := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "1")
So(err2, ShouldBeNil)
So(lp.Name(), ShouldResemble, "mock")
//Subscribe deps to create pools.
Expand All @@ -1719,7 +1719,7 @@ func TestDynamicMetricSubscriptionLoadLessMetrics(t *testing.T) {
<-lpe.load // wait for load event
<-lpe.sub // wait for subscription event
So(serr, ShouldBeNil)
lpMock, err2 := c.pluginManager.get("collector:mock:1")
lpMock, err2 := c.pluginManager.get("collector" + core.Separator + "mock" + core.Separator + "1")
So(err2, ShouldBeNil)
So(lpMock.Name(), ShouldResemble, "mock")
// collect metrics as a sanity check that everything is setup correctly
Expand Down Expand Up @@ -1747,11 +1747,11 @@ func TestDynamicMetricSubscriptionLoadLessMetrics(t *testing.T) {
<-lpe.load // wait for load event
<-lpe.sub // wait for subscription event

pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1")
pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "1")
So(errp, ShouldBeNil)
So(pool1.SubscriptionCount(), ShouldEqual, 1)

pool2, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
pool2, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "2")
So(errp, ShouldBeNil)
So(pool2.SubscriptionCount(), ShouldEqual, 1)

Expand All @@ -1778,11 +1778,11 @@ func TestDynamicMetricSubscriptionLoadLessMetrics(t *testing.T) {
So(err, ShouldBeNil)
<-lpe.unsub

pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:1")
pool1, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "1")
So(errp, ShouldBeNil)
So(pool1.SubscriptionCount(), ShouldEqual, 0)

pool2, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector:mock:2")
pool2, errp := c.pluginRunner.AvailablePlugins().getOrCreatePool("collector" + core.Separator + "mock" + core.Separator + "2")
So(errp, ShouldBeNil)
So(pool2.SubscriptionCount(), ShouldEqual, 1)

Expand Down
6 changes: 3 additions & 3 deletions control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ func (l *loadedPlugins) get(key string) (*loadedPlugin, error) {

lp, ok := l.table[key]
if !ok {
tnv := strings.Split(key, ":")
tnv := strings.Split(key, core.Separator)
if len(tnv) != 3 {
return nil, ErrBadKey
}
Expand Down Expand Up @@ -191,7 +191,7 @@ func (lp *loadedPlugin) PluginPath() string {

// Key returns plugin type, name and version
func (lp *loadedPlugin) Key() string {
return fmt.Sprintf("%s:%s:%d", lp.TypeName(), lp.Name(), lp.Version())
return fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", lp.TypeName(), lp.Name(), lp.Version())
}

// Version returns plugin version
Expand Down Expand Up @@ -496,7 +496,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter

// UnloadPlugin unloads a plugin from the LoadedPlugins table
func (p *pluginManager) UnloadPlugin(pl core.Plugin) (*loadedPlugin, serror.SnapError) {
plugin, err := p.loadedPlugins.get(fmt.Sprintf("%s:%s:%d", pl.TypeName(), pl.Name(), pl.Version()))
plugin, err := p.loadedPlugins.get(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pl.TypeName(), pl.Name(), pl.Version()))
if err != nil {
se := serror.New(ErrPluginNotFound, map[string]interface{}{
"plugin-name": pl.Name(),
Expand Down
8 changes: 4 additions & 4 deletions control/plugin_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestLoadedPlugins(t *testing.T) {
Convey("returns an error when index is out of range", func() {
lp := newLoadedPlugins()

_, err := lp.get("not:found:1")
_, err := lp.get("not" + core.Separator + "found" + core.Separator + "1")
So(err, ShouldResemble, errors.New("plugin not found"))

})
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestUnloadPlugin(t *testing.T) {

numPluginsLoaded := len(p.all())
So(numPluginsLoaded, ShouldEqual, 1)
lp, _ := p.get("collector:mock:2")
lp, _ := p.get("collector" + core.Separator + "mock" + core.Separator + "2")
_, err = p.UnloadPlugin(lp)

So(err, ShouldBeNil)
Expand All @@ -196,7 +196,7 @@ func TestUnloadPlugin(t *testing.T) {
p := newPluginManager()
p.SetMetricCatalog(newMetricCatalog())
lp, err := loadPlugin(p, fixtures.PluginPath)
glp, err2 := p.get("collector:mock:2")
glp, err2 := p.get("collector" + core.Separator + "mock" + core.Separator + "2")
So(err2, ShouldBeNil)
glp.State = DetectedState
_, err = p.UnloadPlugin(lp)
Expand All @@ -210,7 +210,7 @@ func TestUnloadPlugin(t *testing.T) {
p.SetMetricCatalog(newMetricCatalog())
_, err := loadPlugin(p, fixtures.PluginPath)

lp, err2 := p.get("collector:mock:2")
lp, err2 := p.get("collector" + core.Separator + "mock" + core.Separator + "2")
So(err2, ShouldBeNil)
_, err = p.UnloadPlugin(lp)

Expand Down
2 changes: 1 addition & 1 deletion control/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ func (r *runner) runPlugin(details *pluginDetails) error {
}

func (r *runner) handleUnsubscription(pType, pName string, pVersion int, taskID string) error {
pool, err := r.availablePlugins.getPool(fmt.Sprintf("%s:%s:%d", pType, pName, pVersion))
pool, err := r.availablePlugins.getPool(fmt.Sprintf("%s"+core.Separator+"%s"+core.Separator+"%d", pType, pName, pVersion))
if err != nil {
runnerLog.WithFields(log.Fields{
"_block": "handle-unsubscription",
Expand Down
3 changes: 2 additions & 1 deletion control/strategy/fixtures/fixtures.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/intelsdi-x/snap/control/plugin"
"github.com/intelsdi-x/snap/core"
)

const (
Expand Down Expand Up @@ -133,7 +134,7 @@ func (m MockAvailablePlugin) LastHit() time.Time {
}

func (m MockAvailablePlugin) String() string {
return strings.Join([]string{m.pluginType.String(), m.pluginName, strconv.Itoa(m.Version())}, ":")
return strings.Join([]string{m.pluginType.String(), m.pluginName, strconv.Itoa(m.Version())}, core.Separator)
}

func (m MockAvailablePlugin) Kill(string) error {
Expand Down
2 changes: 1 addition & 1 deletion control/strategy/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ type pool struct {
}

func NewPool(key string, plugins ...AvailablePlugin) (Pool, error) {
versl := strings.Split(key, ":")
versl := strings.Split(key, core.Separator)
ver, err := strconv.Atoi(versl[len(versl)-1])
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 8ee7d05

Please sign in to comment.