Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
First implementation of dynamic query support
Browse files Browse the repository at this point in the history
  • Loading branch information
IzabellaRaulin committed Apr 6, 2016
1 parent c86d08c commit 4a20ccc
Show file tree
Hide file tree
Showing 12 changed files with 781 additions and 69 deletions.
44 changes: 35 additions & 9 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
http://www.apache.org/licenses/LICENSE-2.0.txt
Copyright 2015 Intel Corporation
Copyright 2015-2016 Intel Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -60,6 +60,7 @@ var (

// ErrLoadedPluginNotFound - error message when a loaded plugin is not found
ErrLoadedPluginNotFound = errors.New("Loaded plugin not found")

// ErrControllerNotStarted - error message when the Controller was not started
ErrControllerNotStarted = errors.New("Must start Controller before calling Load()")
)
Expand Down Expand Up @@ -109,6 +110,8 @@ type managesPlugins interface {

type catalogsMetrics interface {
Get([]string, int) (*metricType, error)
GetQueriedNamespaces([]string) ([][]string, error)
MatchQuery([]string) ([][]string, error)
Add(*metricType)
AddLoadedMetricType(*loadedPlugin, core.Metric) error
RmUnloadedPluginMetrics(lp *loadedPlugin)
Expand Down Expand Up @@ -445,10 +448,31 @@ func (p *pluginControl) SwapPlugins(in *core.RequestedPlugin, out core.Cataloged
return nil
}

// MatchQueryToNamespaces performs the process of matching the 'ns' with namespaces of all cataloged metrics
func (p *pluginControl) MatchQueryToNamespaces(ns []string) ([][]string, serror.SnapError) {
// carry out the matching process
nss, err := p.metricCatalog.MatchQuery(ns)
if err != nil {
return nil, serror.New(err)
}
return nss, nil
}

// ExpandWildcards returns all matched metrics namespaces with given 'ns'
// as the results of matching query process which has been done
func (p *pluginControl) ExpandWildcards(ns []string) ([][]string, serror.SnapError) {
// retrieve queried namespaces
nss, err := p.metricCatalog.GetQueriedNamespaces(ns)
if err != nil {
return nil, serror.New(err)
}
return nss, nil
}

func (p *pluginControl) ValidateDeps(mts []core.Metric, plugins []core.SubscribedPlugin) []serror.SnapError {
var serrs []serror.SnapError
for _, mt := range mts {
_, errs := p.validateMetricTypeSubscription(mt, mt.Config())
errs := p.validateMetricTypeSubscription(mt, mt.Config())
if len(errs) > 0 {
serrs = append(serrs, errs...)
}
Expand Down Expand Up @@ -506,7 +530,7 @@ func (p *pluginControl) validatePluginSubscription(pl core.SubscribedPlugin) []s
return serrs
}

func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric, cd *cdata.ConfigDataNode) (core.Metric, []serror.SnapError) {
func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric, cd *cdata.ConfigDataNode) []serror.SnapError {
var serrs []serror.SnapError
controlLogger.WithFields(log.Fields{
"_block": "validate-metric-subscription",
Expand All @@ -515,25 +539,26 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric,
}).Info("subscription called on metric")

m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version())

if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
"name": core.JoinNamespace(mt.Namespace()),
"version": mt.Version(),
}))
return nil, serrs
return serrs
}

// No metric found return error.
if m == nil {
serrs = append(serrs, serror.New(fmt.Errorf("no metric found cannot subscribe: (%s) version(%d)", mt.Namespace(), mt.Version())))
return nil, serrs
return serrs
}

m.config = cd

typ, serr := core.ToPluginType(m.Plugin.TypeName())
if serr != nil {
return nil, []serror.SnapError{serror.New(err)}
return []serror.SnapError{serror.New(err)}
}

// merge global plugin config
Expand All @@ -549,19 +574,19 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric,
if m.policy.HasRules() {
if m.Config() == nil {
serrs = append(serrs, serror.New(fmt.Errorf("Policy defined for metric, (%s) version (%d), but no config defined in manifest", mt.Namespace(), mt.Version())))
return nil, serrs
return serrs
}
ncdTable, errs := m.policy.Process(m.Config().Table())
if errs != nil && errs.HasErrors() {
for _, e := range errs.Errors() {
serrs = append(serrs, serror.New(e))
}
return nil, serrs
return serrs
}
m.config = cdata.FromTable(*ncdTable)
}

return m, serrs
return serrs
}

func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]core.Plugin, []serror.SnapError) {
Expand Down Expand Up @@ -836,6 +861,7 @@ func (p *pluginControl) MetricExists(mns []string, ver int) bool {
// of metrics and errors. If an error is encountered no metrics will be
// returned.
func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.Time, taskID string) (metrics []core.Metric, errs []error) {

pluginToMetricMap, err := groupMetricTypesByPlugin(p.metricCatalog, metricTypes)
if err != nil {
errs = append(errs, err)
Expand Down
Loading

0 comments on commit 4a20ccc

Please sign in to comment.