Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External Metrics contd #146

Merged
merged 12 commits into from
Apr 3, 2019
Merged
232 changes: 41 additions & 191 deletions Gopkg.lock

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -99,3 +99,7 @@ version="v1.4.7"
[prune]
go-tests = true
unused-packages = true

[[constraint]]
name = "github.com/stretchr/testify"
version = "1.3.0"
39 changes: 38 additions & 1 deletion cmd/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics"
adaptercfg "github.com/directxman12/k8s-prometheus-adapter/pkg/config"
cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider"
extprov "github.com/directxman12/k8s-prometheus-adapter/pkg/external-provider"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
resprov "github.com/directxman12/k8s-prometheus-adapter/pkg/resourceprovider"
)

Expand Down Expand Up @@ -159,7 +161,7 @@ func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan
}

// extract the namers
namers, err := cmprov.NamersFromConfig(cmd.metricsConfig, mapper)
namers, err := naming.NamersFromConfig(cmd.metricsConfig, mapper)
if err != nil {
return nil, fmt.Errorf("unable to construct naming scheme from metrics rules: %v", err)
}
Expand All @@ -171,6 +173,30 @@ func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan
return cmProvider, nil
}

func (cmd *PrometheusAdapter) makeExternalProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.ExternalMetricsProvider, error) {
if len(cmd.metricsConfig.ExternalRules) == 0 {
return nil, nil
}

// grab the mapper
mapper, err := cmd.RESTMapper()
if err != nil {
return nil, fmt.Errorf("unable to construct RESTMapper: %v", err)
}

// collect series converters for adapter
converters, errs := extprov.ConvertersFromConfig(cmd.metricsConfig, mapper)
if len(errs) > 0 {
return nil, fmt.Errorf("unable to construct naming scheme from metrics rules: %v", errs)
}

// construct the provider and start it
emProvider, runner := extprov.NewExternalPrometheusProvider(mapper, promClient, converters, cmd.MetricsRelistInterval)
runner.RunUntil(stopCh)

return emProvider, nil
}

func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) error {
if cmd.metricsConfig.ResourceRules == nil {
// bail if we don't have rules for setting things up
Expand Down Expand Up @@ -247,6 +273,17 @@ func main() {
cmd.WithCustomMetrics(cmProvider)
}

// construct the external provider
emProvider, err := cmd.makeExternalProvider(promClient, wait.NeverStop)
if err != nil {
glog.Fatalf("unable to construct external metrics provider: %v", err)
}

// attach the provider to the server, if it's needed
if emProvider != nil {
cmd.WithExternalMetrics(emProvider)
}

// attach resource metrics support, if it's needed
if err := cmd.addResourceMetricsAPI(promClient); err != nil {
glog.Fatalf("unable to install resource metrics API: %v", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type MetricsDiscoveryConfig struct {
// will make only a single API call.
Rules []DiscoveryRule `yaml:"rules"`
ResourceRules *ResourceRules `yaml:"resourceRules,omitempty"`
ExternalRules []DiscoveryRule `yaml:"externalRules,omitempty"`
}

// DiscoveryRule describes a set of rules for transforming Prometheus metrics to/from
Expand Down
5 changes: 3 additions & 2 deletions pkg/custom-provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"k8s.io/metrics/pkg/apis/custom_metrics"

prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
)

// Runnable represents something that can be run until told to stop.
Expand All @@ -55,7 +56,7 @@ type prometheusProvider struct {
SeriesRegistry
}

func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []MetricNamer, updateInterval time.Duration, maxAge time.Duration) (provider.CustomMetricsProvider, Runnable) {
func NewPrometheusProvider(mapper apimeta.RESTMapper, kubeClient dynamic.Interface, promClient prom.Client, namers []naming.MetricNamer, updateInterval time.Duration, maxAge time.Duration) (provider.CustomMetricsProvider, Runnable) {
lister := &cachingMetricsLister{
updateInterval: updateInterval,
maxAge: maxAge,
Expand Down Expand Up @@ -193,7 +194,7 @@ type cachingMetricsLister struct {
promClient prom.Client
updateInterval time.Duration
maxAge time.Duration
namers []MetricNamer
namers []naming.MetricNamer
}

func (l *cachingMetricsLister) Run() {
Expand Down
3 changes: 2 additions & 1 deletion pkg/custom-provider/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
config "github.com/directxman12/k8s-prometheus-adapter/cmd/config-gen/utils"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
fakeprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/fake"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
pmodel "github.com/prometheus/common/model"
)

Expand All @@ -39,7 +40,7 @@ func setupPrometheusProvider() (provider.CustomMetricsProvider, *fakeprom.FakePr
fakeKubeClient := &fakedyn.FakeDynamicClient{}

cfg := config.DefaultConfig(1*time.Minute, "")
namers, err := NamersFromConfig(cfg, restMapper())
namers, err := naming.NamersFromConfig(cfg, restMapper())
Expect(err).NotTo(HaveOccurred())

prov, _ := NewPrometheusProvider(restMapper(), fakeKubeClient, fakeProm, namers, fakeProviderUpdateInterval, fakeProviderStartDuration)
Expand Down
9 changes: 5 additions & 4 deletions pkg/custom-provider/series_registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
apimeta "k8s.io/apimachinery/pkg/api/meta"

prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
"github.com/golang/glog"
pmodel "github.com/prometheus/common/model"
)
Expand All @@ -45,7 +46,7 @@ const (
type SeriesRegistry interface {
// SetSeries replaces the known series in this registry.
// Each slice in series should correspond to a MetricNamer in namers.
SetSeries(series [][]prom.Series, namers []MetricNamer) error
SetSeries(series [][]prom.Series, namers []naming.MetricNamer) error
// ListAllMetrics lists all metrics known to this registry
ListAllMetrics() []provider.CustomMetricInfo
// SeriesForMetric looks up the minimum required series information to make a query for the given metric
Expand All @@ -60,7 +61,7 @@ type seriesInfo struct {
seriesName string

// namer is the MetricNamer used to name this series
namer MetricNamer
namer naming.MetricNamer
}

// overridableSeriesRegistry is a basic SeriesRegistry
Expand All @@ -75,7 +76,7 @@ type basicSeriesRegistry struct {
mapper apimeta.RESTMapper
}

func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers []MetricNamer) error {
func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers []naming.MetricNamer) error {
if len(newSeriesSlices) != len(namers) {
return fmt.Errorf("need one set of series per namer")
}
Expand All @@ -99,7 +100,7 @@ func (r *basicSeriesRegistry) SetSeries(newSeriesSlices [][]prom.Series, namers
}

// namespace metrics aren't counted as namespaced
if resource == nsGroupResource {
if resource == naming.NsGroupResource {
info.Namespaced = false
}

Expand Down
5 changes: 3 additions & 2 deletions pkg/custom-provider/series_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (

config "github.com/directxman12/k8s-prometheus-adapter/cmd/config-gen/utils"
prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
"github.com/directxman12/k8s-prometheus-adapter/pkg/naming"
)

// restMapper creates a RESTMapper with just the types we need for
Expand All @@ -50,9 +51,9 @@ func restMapper() apimeta.RESTMapper {
return mapper
}

func setupMetricNamer() []MetricNamer {
func setupMetricNamer() []naming.MetricNamer {
cfg := config.DefaultConfig(1*time.Minute, "kube_")
namers, err := NamersFromConfig(cfg, restMapper())
namers, err := naming.NamersFromConfig(cfg, restMapper())
Expect(err).NotTo(HaveOccurred())
return namers
}
Expand Down
165 changes: 165 additions & 0 deletions pkg/external-provider/basic_metric_lister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provider

import (
"context"
"fmt"
"time"

"github.com/golang/glog"
pmodel "github.com/prometheus/common/model"

prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)

// Runnable represents something that can be run until told to stop.
type Runnable interface {
// Run runs the runnable forever.
Run()
// RunUntil runs the runnable until the given channel is closed.
RunUntil(stopChan <-chan struct{})
}

// A MetricLister provides a window into all of the metrics that are available within a given
// Prometheus instance, classified as either Custom or External metrics, but presented generically
// so that it can manage both types simultaneously.
type MetricLister interface {
ListAllMetrics() (MetricUpdateResult, error)
}

// A MetricListerWithNotification is a MetricLister that has the ability to notify listeners
// when new metric data is available.
type MetricListerWithNotification interface {
MetricLister
Runnable

// AddNotificationReceiver registers a callback to be invoked when new metric data is available.
AddNotificationReceiver(MetricUpdateCallback)
// UpdateNow forces an immediate refresh from the source data. Primarily for test purposes.
UpdateNow()
}

type basicMetricLister struct {
promClient prom.Client
converters []SeriesConverter
lookback time.Duration
}

// NewBasicMetricLister creates a MetricLister that is capable of interactly directly with Prometheus to list metrics.
func NewBasicMetricLister(promClient prom.Client, converters []SeriesConverter, lookback time.Duration) MetricLister {
lister := basicMetricLister{
promClient: promClient,
converters: converters,
lookback: lookback,
}

return &lister
}

type selectorSeries struct {
selector prom.Selector
series []prom.Series
}

func (l *basicMetricLister) ListAllMetrics() (MetricUpdateResult, error) {
result := MetricUpdateResult{
series: make([][]prom.Series, 0),
converters: make([]SeriesConverter, 0),
}

startTime := pmodel.Now().Add(-1 * l.lookback)

// these can take a while on large clusters, so launch in parallel
// and don't duplicate
selectors := make(map[prom.Selector]struct{})
selectorSeriesChan := make(chan selectorSeries, len(l.converters))
errs := make(chan error, len(l.converters))
for _, converter := range l.converters {
sel := converter.Selector()
if _, ok := selectors[sel]; ok {
errs <- nil
selectorSeriesChan <- selectorSeries{}
continue
}
selectors[sel] = struct{}{}
go func() {
series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
if err != nil {
errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
return
}
errs <- nil
// Push into the channel: "this selector produced these series"
selectorSeriesChan <- selectorSeries{
selector: sel,
series: series,
}
}()
}

// don't do duplicate queries when it's just the matchers that change
seriesCacheByQuery := make(map[prom.Selector][]prom.Series)

// iterate through, blocking until we've got all results
// We know that, from above, we should have pushed one item into the channel
// for each converter. So here, we'll assume that we should receive one item per converter.
for range l.converters {
if err := <-errs; err != nil {
return result, fmt.Errorf("unable to update list of all metrics: %v", err)
}
// Receive from the channel: "this selector produced these series"
// We stuff that into this map so that we can collect the data as it arrives
// and then, once we've received it all, we can process it below.
if ss := <-selectorSeriesChan; ss.series != nil {
seriesCacheByQuery[ss.selector] = ss.series
}
}
close(errs)

// Now that we've collected all of the results into `seriesCacheByQuery`
// we can start processing them.
newSeries := make([][]prom.Series, len(l.converters))
for i, converter := range l.converters {
series, cached := seriesCacheByQuery[converter.Selector()]
if !cached {
return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", converter.Selector())
}
// Because converters provide a "post-filtering" option, it's not enough to
// simply take all the series that were produced. We need to further filter them.
newSeries[i] = converter.SeriesFilterer().FilterSeries(series)
}

glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)

result.series = newSeries
result.converters = l.converters
return result, nil
}

// MetricUpdateResult represents the output of a periodic inspection of metrics found to be
// available in Prometheus.
// It includes both the series data the Prometheus exposed, as well as the configurational
// object that led to their discovery.
type MetricUpdateResult struct {
series [][]prom.Series
converters []SeriesConverter
}

// MetricUpdateCallback is a function signature for receiving periodic updates about
// available metrics.
type MetricUpdateCallback func(MetricUpdateResult)
34 changes: 34 additions & 0 deletions pkg/external-provider/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provider

import "errors"

var (
// ErrUnsupportedOperator creates an error that represents the fact that we were requested to service a query that
// Prometheus would be unable to support.
ErrUnsupportedOperator = errors.New("operator not supported by prometheus")

// ErrMalformedQuery creates an error that represents the fact that we were requested to service a query
// that was malformed in its operator/value combination.
ErrMalformedQuery = errors.New("operator requires values")

// ErrQueryUnsupportedValues creates an error that represents an unsupported return value from the
// specified query.
ErrQueryUnsupportedValues = errors.New("operator does not support values")

// ErrLabelNotSpecified creates an error that represents the fact that we were requested to service a query
// that was malformed in its label specification.
ErrLabelNotSpecified = errors.New("label not specified")
)
Loading