Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Metric schema and metadata (#872)
Browse files Browse the repository at this point in the history
Resolves #872 - Metric schema and metadata
  • Loading branch information
jcooklin committed Apr 27, 2016
1 parent d122e47 commit f598bd1
Show file tree
Hide file tree
Showing 43 changed files with 747 additions and 608 deletions.
3 changes: 1 addition & 2 deletions cmd/snapctl/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ func watchTask(ctx *cli.Context) {
}()

w := tabwriter.NewWriter(os.Stdout, 0, 8, 1, '\t', 0)
printFields(w, false, 0, "NAMESPACE", "DATA", "TIMESTAMP", "SOURCE")
printFields(w, false, 0, "NAMESPACE", "DATA", "TIMESTAMP")
// Loop listening to events
for {
select {
Expand All @@ -395,7 +395,6 @@ func watchTask(ctx *cli.Context) {
event.Namespace,
event.Data,
event.Timestamp,
event.Source,
)
}
lines = len(e.Event)
Expand Down
54 changes: 30 additions & 24 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,19 @@ type managesPlugins interface {
}

type catalogsMetrics interface {
Get([]string, int) (*metricType, error)
GetQueriedNamespaces([]string) ([][]string, error)
MatchQuery([]string) ([][]string, error)
Get(core.Namespace, int) (*metricType, error)
GetQueriedNamespaces(core.Namespace) ([]core.Namespace, error)
MatchQuery(core.Namespace) ([]core.Namespace, error)
Add(*metricType)
AddLoadedMetricType(*loadedPlugin, core.Metric) error
RmUnloadedPluginMetrics(lp *loadedPlugin)
GetVersions([]string) ([]*metricType, error)
Fetch([]string) ([]*metricType, error)
GetVersions(core.Namespace) ([]*metricType, error)
Fetch(core.Namespace) ([]*metricType, error)
Item() (string, []*metricType)
Next() bool
Subscribe([]string, int) error
Unsubscribe([]string, int) error
GetPlugin([]string, int) (*loadedPlugin, error)
GetPlugin(core.Namespace, int) (*loadedPlugin, error)
}

type managesSigning interface {
Expand Down Expand Up @@ -449,7 +449,7 @@ func (p *pluginControl) SwapPlugins(in *core.RequestedPlugin, out core.Cataloged
}

// MatchQueryToNamespaces performs the process of matching the 'ns' with namespaces of all cataloged metrics
func (p *pluginControl) MatchQueryToNamespaces(ns []string) ([][]string, serror.SnapError) {
func (p *pluginControl) MatchQueryToNamespaces(ns core.Namespace) ([]core.Namespace, serror.SnapError) {
// carry out the matching process
nss, err := p.metricCatalog.MatchQuery(ns)
if err != nil {
Expand All @@ -460,7 +460,7 @@ func (p *pluginControl) MatchQueryToNamespaces(ns []string) ([][]string, serror.

// ExpandWildcards returns all matched metrics namespaces with given 'ns'
// as the results of matching query process which has been done
func (p *pluginControl) ExpandWildcards(ns []string) ([][]string, serror.SnapError) {
func (p *pluginControl) ExpandWildcards(ns core.Namespace) ([]core.Namespace, serror.SnapError) {
// retrieve queried namespaces
nss, err := p.metricCatalog.GetQueriedNamespaces(ns)
if err != nil {
Expand Down Expand Up @@ -542,7 +542,7 @@ func (p *pluginControl) validateMetricTypeSubscription(mt core.RequestedMetric,

if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
"name": core.JoinNamespace(mt.Namespace()),
"name": mt.Namespace().String(),
"version": mt.Version(),
}))
return serrs
Expand Down Expand Up @@ -611,7 +611,7 @@ func (p *pluginControl) gatherCollectors(mts []core.Metric) ([]gatheredPlugin, [
m, err := p.metricCatalog.Get(mt.Namespace(), mt.Version())
if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
"name": core.JoinNamespace(mt.Namespace()),
"name": mt.Namespace().String(),
"version": mt.Version(),
}))
continue
Expand Down Expand Up @@ -846,12 +846,12 @@ func (p *pluginControl) AvailablePlugins() []core.AvailablePlugin {
// MetricCatalog returns the entire metric catalog
// NOTE: The returned data from this function should be considered constant and read only
func (p *pluginControl) MetricCatalog() ([]core.CatalogedMetric, error) {
return p.FetchMetrics([]string{}, 0)
return p.FetchMetrics(core.Namespace{}, 0)
}

// FetchMetrics returns the metrics which fall under the given namespace
// NOTE: The returned data from this function should be considered constant and read only
func (p *pluginControl) FetchMetrics(ns []string, version int) ([]core.CatalogedMetric, error) {
func (p *pluginControl) FetchMetrics(ns core.Namespace, version int) ([]core.CatalogedMetric, error) {
mts, err := p.metricCatalog.Fetch(ns)
if err != nil {
return nil, err
Expand All @@ -869,11 +869,11 @@ func (p *pluginControl) FetchMetrics(ns []string, version int) ([]core.Cataloged
return cmt, nil
}

func (p *pluginControl) GetMetric(ns []string, ver int) (core.CatalogedMetric, error) {
func (p *pluginControl) GetMetric(ns core.Namespace, ver int) (core.CatalogedMetric, error) {
return p.metricCatalog.Get(ns, ver)
}

func (p *pluginControl) GetMetricVersions(ns []string) ([]core.CatalogedMetric, error) {
func (p *pluginControl) GetMetricVersions(ns core.Namespace) ([]core.CatalogedMetric, error) {
mts, err := p.metricCatalog.GetVersions(ns)
if err != nil {
return nil, err
Expand All @@ -886,7 +886,7 @@ func (p *pluginControl) GetMetricVersions(ns []string) ([]core.CatalogedMetric,
return rmts, nil
}

func (p *pluginControl) MetricExists(mns []string, ver int) bool {
func (p *pluginControl) MetricExists(mns core.Namespace, ver int) bool {
_, err := p.metricCatalog.Get(mns, ver)
if err == nil {
return true
Expand Down Expand Up @@ -932,6 +932,12 @@ func (p *pluginControl) CollectMetrics(metricTypes []core.Metric, deadline time.

go func() {
for m := range cMetrics {
// Reapply standard tags after collection as a precaution. It is common for
// plugin authors to inadvertently overwrite or not pass along the data
// passed to CollectMetrics so we will help them out here.
for i := range m {
m[i] = addStandardTags(m[i])
}
metrics = append(metrics, m...)
wg.Done()
}
Expand Down Expand Up @@ -1035,20 +1041,20 @@ func (r *requestedPlugin) Config() *cdata.ConfigDataNode {
// ------------------- helper struct and function for grouping metrics types ------

// just a tuple of loadedPlugin and metricType slice
type pluginMetricTypes struct {
type metricTypes struct {
plugin *loadedPlugin
metricTypes []core.Metric
}

func (p *pluginMetricTypes) Count() int {
func (p *metricTypes) Count() int {
return len(p.metricTypes)
}

// groupMetricTypesByPlugin groups metricTypes by a plugin.Key() and returns appropriate structure
func groupMetricTypesByPlugin(cat catalogsMetrics, metricTypes []core.Metric) (map[string]pluginMetricTypes, serror.SnapError) {
pmts := make(map[string]pluginMetricTypes)
func groupMetricTypesByPlugin(cat catalogsMetrics, mts []core.Metric) (map[string]metricTypes, serror.SnapError) {
pmts := make(map[string]metricTypes)
// For each plugin type select a matching available plugin to call
for _, incomingmt := range metricTypes {
for _, incomingmt := range mts {
version := incomingmt.Version()
if version == 0 {
// If the version is not provided we will choose the latest
Expand All @@ -1058,17 +1064,17 @@ func groupMetricTypesByPlugin(cat catalogsMetrics, metricTypes []core.Metric) (m
if err != nil {
return nil, serror.New(err)
}
returnedmt := plugin.PluginMetricType{
Namespace_: incomingmt.Namespace(),
returnedmt := plugin.MetricType{
Namespace_: catalogedmt.Namespace(),
LastAdvertisedTime_: catalogedmt.LastAdvertisedTime(),
Version_: incomingmt.Version(),
Tags_: catalogedmt.Tags(),
Labels_: catalogedmt.Labels(),
Config_: incomingmt.Config(),
Unit_: catalogedmt.Unit(),
}
lp := catalogedmt.Plugin
if lp == nil {
return nil, serror.New(errorMetricNotFound(incomingmt.Namespace()))
return nil, serror.New(errorMetricNotFound(incomingmt.Namespace().String()))
}
key := lp.Key()
pmt, _ := pmts[key]
Expand Down
Loading

0 comments on commit f598bd1

Please sign in to comment.