Skip to content

Commit

Permalink
Merge pull request kubernetes-sigs#158 from linux-on-ibm-z/cross-compile
Browse files Browse the repository at this point in the history
Edited Makefile to add cross build support for s390x.

Adding External Metrics Provider
  • Loading branch information
DirectXMan12 authored and john-delivuk committed Feb 10, 2019
2 parents dce283d + a02ca0f commit c2e176b
Show file tree
Hide file tree
Showing 17 changed files with 1,913 additions and 1 deletion.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ ifeq ($(ARCH),ppc64le)
endif
ifeq ($(ARCH),s390x)
BASEIMAGE?=s390x/busybox
GOIMAGE=s390x/golang:1.10
endif

.PHONY: all docker-build push-% push test verify-gofmt gofmt verify build-local-image
Expand Down
36 changes: 36 additions & 0 deletions cmd/adapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
mprom "github.com/directxman12/k8s-prometheus-adapter/pkg/client/metrics"
adaptercfg "github.com/directxman12/k8s-prometheus-adapter/pkg/config"
cmprov "github.com/directxman12/k8s-prometheus-adapter/pkg/custom-provider"
extprov "github.com/directxman12/k8s-prometheus-adapter/pkg/external-provider"
resprov "github.com/directxman12/k8s-prometheus-adapter/pkg/resourceprovider"
)

Expand Down Expand Up @@ -171,6 +172,30 @@ func (cmd *PrometheusAdapter) makeProvider(promClient prom.Client, stopCh <-chan
return cmProvider, nil
}

func (cmd *PrometheusAdapter) makeExternalProvider(promClient prom.Client, stopCh <-chan struct{}) (provider.ExternalMetricsProvider, error) {
if len(cmd.metricsConfig.ExternalRules) == 0 {
return nil, nil
}

// grab the mapper
mapper, err := cmd.RESTMapper()
if err != nil {
return nil, fmt.Errorf("unable to construct RESTMapper: %v", err)
}

// collect series converters for adapter
converters, errs := extprov.ConvertersFromConfig(cmd.metricsConfig, mapper)
if len(errs) > 0 {
return nil, fmt.Errorf("unable to construct naming scheme from metrics rules: %v", errs)
}

// construct the provider and start it
emProvider, runner := extprov.NewExternalPrometheusProvider(mapper, promClient, converters, cmd.MetricsRelistInterval)
runner.RunUntil(stopCh)

return emProvider, nil
}

func (cmd *PrometheusAdapter) addResourceMetricsAPI(promClient prom.Client) error {
if cmd.metricsConfig.ResourceRules == nil {
// bail if we don't have rules for setting things up
Expand Down Expand Up @@ -247,6 +272,17 @@ func main() {
cmd.WithCustomMetrics(cmProvider)
}

// construct the external provider
emProvider, err := cmd.makeExternalProvider(promClient, wait.NeverStop)
if err != nil {
glog.Fatalf("unable to construct external metrics provider: %v", err)
}

// attach the provider to the server, if it's needed
if emProvider != nil {
cmd.WithExternalMetrics(emProvider)
}

// attach resource metrics support, if it's needed
if err := cmd.addResourceMetricsAPI(promClient); err != nil {
glog.Fatalf("unable to install resource metrics API: %v", err)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type MetricsDiscoveryConfig struct {
// will make only a single API call.
Rules []DiscoveryRule `yaml:"rules"`
ResourceRules *ResourceRules `yaml:"resourceRules,omitempty"`
ExternalRules []DiscoveryRule `yaml:"externalRules,omitempty"`
}

// DiscoveryRule describes a set of rules for transforming Prometheus metrics to/from
Expand Down
165 changes: 165 additions & 0 deletions pkg/external-provider/basic_metric_lister.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package provider

import (
"context"
"fmt"
"time"

"github.com/golang/glog"
pmodel "github.com/prometheus/common/model"

prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)

// Runnable represents something that can be run until told to stop.
type Runnable interface {
// Run runs the runnable forever.
Run()
// RunUntil runs the runnable until the given channel is closed.
RunUntil(stopChan <-chan struct{})
}

// A MetricLister provides a window into all of the metrics that are available within a given
// Prometheus instance, classified as either Custom or External metrics, but presented generically
// so that it can manage both types simultaneously.
type MetricLister interface {
ListAllMetrics() (MetricUpdateResult, error)
}

// A MetricListerWithNotification is a MetricLister that has the ability to notify listeners
// when new metric data is available.
type MetricListerWithNotification interface {
MetricLister
Runnable

// AddNotificationReceiver registers a callback to be invoked when new metric data is available.
AddNotificationReceiver(MetricUpdateCallback)
// UpdateNow forces an immediate refresh from the source data. Primarily for test purposes.
UpdateNow()
}

type basicMetricLister struct {
promClient prom.Client
converters []SeriesConverter
lookback time.Duration
}

// NewBasicMetricLister creates a MetricLister that is capable of interactly directly with Prometheus to list metrics.
func NewBasicMetricLister(promClient prom.Client, converters []SeriesConverter, lookback time.Duration) MetricLister {
lister := basicMetricLister{
promClient: promClient,
converters: converters,
lookback: lookback,
}

return &lister
}

type selectorSeries struct {
selector prom.Selector
series []prom.Series
}

func (l *basicMetricLister) ListAllMetrics() (MetricUpdateResult, error) {
result := MetricUpdateResult{
series: make([][]prom.Series, 0),
converters: make([]SeriesConverter, 0),
}

startTime := pmodel.Now().Add(-1 * l.lookback)

// these can take a while on large clusters, so launch in parallel
// and don't duplicate
selectors := make(map[prom.Selector]struct{})
selectorSeriesChan := make(chan selectorSeries, len(l.converters))
errs := make(chan error, len(l.converters))
for _, converter := range l.converters {
sel := converter.Selector()
if _, ok := selectors[sel]; ok {
errs <- nil
selectorSeriesChan <- selectorSeries{}
continue
}
selectors[sel] = struct{}{}
go func() {
series, err := l.promClient.Series(context.TODO(), pmodel.Interval{startTime, 0}, sel)
if err != nil {
errs <- fmt.Errorf("unable to fetch metrics for query %q: %v", sel, err)
return
}
errs <- nil
// Push into the channel: "this selector produced these series"
selectorSeriesChan <- selectorSeries{
selector: sel,
series: series,
}
}()
}

// don't do duplicate queries when it's just the matchers that change
seriesCacheByQuery := make(map[prom.Selector][]prom.Series)

// iterate through, blocking until we've got all results
// We know that, from above, we should have pushed one item into the channel
// for each converter. So here, we'll assume that we should receive one item per converter.
for range l.converters {
if err := <-errs; err != nil {
return result, fmt.Errorf("unable to update list of all metrics: %v", err)
}
// Receive from the channel: "this selector produced these series"
// We stuff that into this map so that we can collect the data as it arrives
// and then, once we've received it all, we can process it below.
if ss := <-selectorSeriesChan; ss.series != nil {
seriesCacheByQuery[ss.selector] = ss.series
}
}
close(errs)

// Now that we've collected all of the results into `seriesCacheByQuery`
// we can start processing them.
newSeries := make([][]prom.Series, len(l.converters))
for i, converter := range l.converters {
series, cached := seriesCacheByQuery[converter.Selector()]
if !cached {
return result, fmt.Errorf("unable to update list of all metrics: no metrics retrieved for query %q", converter.Selector())
}
// Because converters provide a "post-filtering" option, it's not enough to
// simply take all the series that were produced. We need to further filter them.
newSeries[i] = converter.SeriesFilterer().FilterSeries(series)
}

glog.V(10).Infof("Set available metric list from Prometheus to: %v", newSeries)

result.series = newSeries
result.converters = l.converters
return result, nil
}

// MetricUpdateResult represents the output of a periodic inspection of metrics found to be
// available in Prometheus.
// It includes both the series data the Prometheus exposed, as well as the configurational
// object that led to their discovery.
type MetricUpdateResult struct {
series [][]prom.Series
converters []SeriesConverter
}

// MetricUpdateCallback is a function signature for receiving periodic updates about
// available metrics.
type MetricUpdateCallback func(MetricUpdateResult)
27 changes: 27 additions & 0 deletions pkg/external-provider/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package provider

import "errors"

// NewOperatorNotSupportedByPrometheusError creates an error that represents the fact that we were requested to service a query that
// Prometheus would be unable to support.
func NewOperatorNotSupportedByPrometheusError() error {
return errors.New("operator not supported by prometheus")
}

// NewOperatorRequiresValuesError creates an error that represents the fact that we were requested to service a query
// that was malformed in its operator/value combination.
func NewOperatorRequiresValuesError() error {
return errors.New("operator requires values")
}

// NewOperatorDoesNotSupportValuesError creates an error that represents the fact that we were requested to service a query
// that was malformed in its operator/value combination.
func NewOperatorDoesNotSupportValuesError() error {
return errors.New("operator does not support values")
}

// NewLabelNotSpecifiedError creates an error that represents the fact that we were requested to service a query
// that was malformed in its label specification.
func NewLabelNotSpecifiedError() error {
return errors.New("label not specified")
}
109 changes: 109 additions & 0 deletions pkg/external-provider/external_series_registry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
package provider

import (
"sync"

"github.com/golang/glog"
"github.com/kubernetes-incubator/custom-metrics-apiserver/pkg/provider"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"

prom "github.com/directxman12/k8s-prometheus-adapter/pkg/client"
)

// ExternalSeriesRegistry acts as the top-level converter for transforming Kubernetes requests
// for external metrics into Prometheus queries.
type ExternalSeriesRegistry interface {
// ListAllMetrics lists all metrics known to this registry
ListAllMetrics() []provider.ExternalMetricInfo
QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (prom.Selector, bool, error)
}

// overridableSeriesRegistry is a basic SeriesRegistry
type externalSeriesRegistry struct {
mu sync.RWMutex

// metrics is the list of all known metrics, ready to return from the API
metrics []provider.ExternalMetricInfo
// rawMetrics is a lookup from a metric to SeriesConverter for the sake of generating queries
rawMetrics map[string]SeriesConverter

mapper apimeta.RESTMapper

metricLister MetricListerWithNotification
}

// NewExternalSeriesRegistry creates an ExternalSeriesRegistry driven by the data from the provided MetricLister.
func NewExternalSeriesRegistry(lister MetricListerWithNotification, mapper apimeta.RESTMapper) ExternalSeriesRegistry {
var registry = externalSeriesRegistry{
mapper: mapper,
metricLister: lister,
metrics: make([]provider.ExternalMetricInfo, 0),
rawMetrics: map[string]SeriesConverter{},
}

lister.AddNotificationReceiver(registry.filterAndStoreMetrics)

return &registry
}

func (r *externalSeriesRegistry) filterAndStoreMetrics(result MetricUpdateResult) {

newSeriesSlices := result.series
converters := result.converters

if len(newSeriesSlices) != len(converters) {
glog.Errorf("need one set of series per converter")
}
apiMetricsCache := make([]provider.ExternalMetricInfo, 0)
rawMetricsCache := make(map[string]SeriesConverter)

for i, newSeries := range newSeriesSlices {
converter := converters[i]
for _, series := range newSeries {
identity, err := converter.IdentifySeries(series)

if err != nil {
glog.Errorf("unable to name series %q, skipping: %v", series.String(), err)
continue
}

name := identity.name
rawMetricsCache[name] = converter
}
}

for metricName := range rawMetricsCache {
apiMetricsCache = append(apiMetricsCache, provider.ExternalMetricInfo{
Metric: metricName,
})
}

r.mu.Lock()
defer r.mu.Unlock()

r.metrics = apiMetricsCache
r.rawMetrics = rawMetricsCache
}

func (r *externalSeriesRegistry) ListAllMetrics() []provider.ExternalMetricInfo {
r.mu.RLock()
defer r.mu.RUnlock()

return r.metrics
}

func (r *externalSeriesRegistry) QueryForMetric(namespace string, metricName string, metricSelector labels.Selector) (prom.Selector, bool, error) {
r.mu.RLock()
defer r.mu.RUnlock()

converter, found := r.rawMetrics[metricName]

if !found {
glog.V(10).Infof("external metric %q not found", metricName)
return "", false, nil
}

query, err := converter.QueryForExternalSeries(namespace, metricName, metricSelector)
return query, found, err
}
Loading

0 comments on commit c2e176b

Please sign in to comment.