Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cache metric names provided by KEDA Metrics Server #2279

Merged
merged 7 commits into from
Nov 23, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
- Improve Redis Scaler, upgrade library, add username and Sentinel support ([#2181](https://github.com/kedacore/keda/pull/2181))
- Add GCP identity authentication when using Pubsub Scaler ([#2225](https://github.com/kedacore/keda/pull/2225))
- Add ScalersCache to reuse scalers unless they need changing ([#2187](https://github.com/kedacore/keda/pull/2187))
- Cache metric names provided by KEDA Metrics Server ([#2279](https://github.com/kedacore/keda/pull/2279))

### Improvements

Expand Down
14 changes: 10 additions & 4 deletions adapter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"os"
"runtime"
"strconv"
"sync"
"time"

"github.com/go-logr/logr"
Expand Down Expand Up @@ -103,6 +104,8 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme, corev1.EventSource{Component: "keda-metrics-adapter"})
handler := scaling.NewScaleHandler(kubeclient, nil, scheme, globalHTTPTimeout, recorder)
externalMetricsInfo := &[]provider.ExternalMetricInfo{}
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
externalMetricsInfoLock := &sync.RWMutex{}

namespace, err := getWatchNamespace()
if err != nil {
Expand All @@ -113,14 +116,14 @@ func (a *Adapter) makeProvider(ctx context.Context, globalHTTPTimeout time.Durat
prometheusServer := &prommetrics.PrometheusMetricServer{}
go func() { prometheusServer.NewServer(fmt.Sprintf(":%v", prometheusMetricsPort), prometheusMetricsPath) }()
stopCh := make(chan struct{})
if err := runScaledObjectController(ctx, scheme, namespace, handler, logger, stopCh); err != nil {
if err := runScaledObjectController(ctx, scheme, namespace, handler, logger, externalMetricsInfo, externalMetricsInfoLock, stopCh); err != nil {
return nil, nil, err
}

return kedaprovider.NewProvider(ctx, logger, handler, kubeclient, namespace), stopCh, nil
return kedaprovider.NewProvider(ctx, logger, handler, kubeclient, namespace, externalMetricsInfo, externalMetricsInfoLock), stopCh, nil
}

func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, stopCh chan<- struct{}) error {
func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, namespace string, scaleHandler scaling.ScaleHandler, logger logr.Logger, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex, stopCh chan<- struct{}) error {
mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Namespace: namespace,
Expand All @@ -130,7 +133,10 @@ func runScaledObjectController(ctx context.Context, scheme *k8sruntime.Scheme, n
}

if err := (&kedacontrollers.MetricsScaledObjectReconciler{
ScaleHandler: scaleHandler,
Client: mgr.GetClient(),
ScaleHandler: scaleHandler,
ExternalMetricsInfo: externalMetricsInfo,
ExternalMetricsInfoLock: externalMetricsInfoLock,
}).SetupWithManager(mgr, controller.Options{}); err != nil {
return err
}
Expand Down
98 changes: 95 additions & 3 deletions controllers/keda/metrics_adapter_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,68 @@ package keda

import (
"context"
"sync"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scaling"
"k8s.io/apimachinery/pkg/api/errors"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/scaling"
)

type MetricsScaledObjectReconciler struct {
ScaleHandler scaling.ScaleHandler
Client client.Client
ScaleHandler scaling.ScaleHandler
ExternalMetricsInfo *[]provider.ExternalMetricInfo
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
ExternalMetricsInfoLock *sync.RWMutex
}

var (
scaledObjectsMetrics = map[string][]string{}
scaledObjectsMetricsLock = &sync.Mutex{}
)

func (r *MetricsScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reqLogger := log.FromContext(ctx)

// Fetch the ScaledObject instance
scaledObject := &kedav1alpha1.ScaledObject{}
err := r.Client.Get(ctx, req.NamespacedName, scaledObject)
if err != nil {
if errors.IsNotFound(err) {
// Request object not found, could have been deleted after reconcile request.
// Owned objects are automatically garbage collected. For additional cleanup logic use finalizers.
// Return and don't requeue

r.removeFromCache(req.NamespacedName.String())
return ctrl.Result{}, nil
}
// Error reading the object - requeue the request.
reqLogger.Error(err, "Failed to get ScaledObject")
return ctrl.Result{}, err
}

// Check if the ScaledObject instance is marked to be deleted, which is
// indicated by the deletion timestamp being set.
if scaledObject.GetDeletionTimestamp() != nil {
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
r.removeFromCache(req.NamespacedName.String())
return ctrl.Result{}, nil
}

reqLogger.V(1).Info("Reconciling ScaledObject", "externalMetricNames", scaledObject.Status.ExternalMetricNames)

// The ScaledObject hasn't yet been properly initialized and ExternalMetricsNames list popoluted => requeue
if scaledObject.Status.ExternalMetricNames == nil || len(scaledObject.Status.ExternalMetricNames) < 1 {
return ctrl.Result{Requeue: true}, nil
}

r.addToMetricsCache(req.NamespacedName.String(), scaledObject.Status.ExternalMetricNames)
r.ScaleHandler.ClearScalersCache(ctx, req.Name, req.Namespace)
return ctrl.Result{}, nil
}
Expand All @@ -43,3 +91,47 @@ func (r *MetricsScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager, optio
WithOptions(options).
Complete(r)
}

func (r *MetricsScaledObjectReconciler) addToMetricsCache(namespacedName string, metrics []string) {
scaledObjectsMetricsLock.Lock()
defer scaledObjectsMetricsLock.Unlock()
scaledObjectsMetrics[namespacedName] = metrics
extMetrics := populateExternalMetrics(scaledObjectsMetrics)

r.ExternalMetricsInfoLock.Lock()
(*r.ExternalMetricsInfo) = extMetrics
r.ExternalMetricsInfoLock.Unlock()
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
}

func (r *MetricsScaledObjectReconciler) removeFromCache(namespacedName string) {
scaledObjectsMetricsLock.Lock()
defer scaledObjectsMetricsLock.Unlock()
delete(scaledObjectsMetrics, namespacedName)
extMetrics := populateExternalMetrics(scaledObjectsMetrics)

// the metric could have been already removed by the previous call
// in this case we don't have to rewrite r.ExternalMetricsInfo
changed := false
r.ExternalMetricsInfoLock.RLock()
if len(*r.ExternalMetricsInfo) != len(extMetrics) {
changed = true
}
r.ExternalMetricsInfoLock.RUnlock()

if changed {
r.ExternalMetricsInfoLock.Lock()
(*r.ExternalMetricsInfo) = extMetrics
r.ExternalMetricsInfoLock.Unlock()
}
}

func populateExternalMetrics(scaledObjectsMetrics map[string][]string) []provider.ExternalMetricInfo {
externalMetrics := []provider.ExternalMetricInfo{}
zroubalik marked this conversation as resolved.
Show resolved Hide resolved
for _, metrics := range scaledObjectsMetrics {
for _, m := range metrics {
externalMetrics = append(externalMetrics, provider.ExternalMetricInfo{Metric: m})
}
}

return externalMetrics
}
3 changes: 0 additions & 3 deletions pkg/provider/fallback_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/metrics/pkg/apis/external_metrics"
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
"sigs.k8s.io/custom-metrics-apiserver/pkg/provider"

kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1"
"github.com/kedacore/keda/v2/pkg/mock/mock_client"
Expand Down Expand Up @@ -64,8 +63,6 @@ var _ = Describe("fallback", func() {
scaleHandler = mock_scaling.NewMockScaleHandler(ctrl)
client = mock_client.NewMockClient(ctrl)
providerUnderTest = &KedaProvider{
values: make(map[provider.CustomMetricInfo]int64),
externalMetrics: make([]externalMetric, 2, 10),
client: client,
scaleHandler: scaleHandler,
watchedNamespace: "",
Expand Down
62 changes: 25 additions & 37 deletions pkg/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"strings"
"sync"

"github.com/go-logr/logr"
apiErrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -37,28 +38,28 @@ import (

// KedaProvider implements External Metrics Provider
type KedaProvider struct {
client client.Client
values map[provider.CustomMetricInfo]int64
externalMetrics []externalMetric
scaleHandler scaling.ScaleHandler
watchedNamespace string
ctx context.Context
client client.Client
scaleHandler scaling.ScaleHandler
watchedNamespace string
ctx context.Context
externalMetricsInfo *[]provider.ExternalMetricInfo
externalMetricsInfoLock *sync.RWMutex
}

type externalMetric struct{}

var logger logr.Logger
var metricsServer prommetrics.PrometheusMetricServer
var (
logger logr.Logger
metricsServer prommetrics.PrometheusMetricServer
)

// NewProvider returns an instance of KedaProvider
func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, watchedNamespace string) provider.MetricsProvider {
func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler scaling.ScaleHandler, client client.Client, watchedNamespace string, externalMetricsInfo *[]provider.ExternalMetricInfo, externalMetricsInfoLock *sync.RWMutex) provider.MetricsProvider {
provider := &KedaProvider{
values: make(map[provider.CustomMetricInfo]int64),
externalMetrics: make([]externalMetric, 2, 10),
client: client,
scaleHandler: scaleHandler,
watchedNamespace: watchedNamespace,
ctx: ctx,
client: client,
scaleHandler: scaleHandler,
watchedNamespace: watchedNamespace,
ctx: ctx,
externalMetricsInfo: externalMetricsInfo,
externalMetricsInfoLock: externalMetricsInfoLock,
}
logger = adapterLogger.WithName("provider")
logger.Info("starting")
Expand All @@ -71,9 +72,9 @@ func NewProvider(ctx context.Context, adapterLogger logr.Logger, scaleHandler sc
// Namespace can be used by the implementation for metric identification, access control or ignored.
func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string, metricSelector labels.Selector, info provider.ExternalMetricInfo) (*external_metrics.ExternalMetricValueList, error) {
// Note:
// metric name and namespace is used to lookup for the CRD which contains configuration to call azure
// metric name and namespace is used to lookup for the CRD which contains configuration
// if not found then ignored and label selector is parsed for all the metrics
logger.V(1).Info("KEDA provider received request for external metrics", "namespace", namespace, "metric name", info.Metric, "metricSelector", metricSelector.String())
logger.V(1).Info("KEDA Metrics Server received request for external metrics", "namespace", namespace, "metric name", info.Metric, "metricSelector", metricSelector.String())
selector, err := labels.ConvertSelectorToLabelsMap(metricSelector.String())
if err != nil {
logger.Error(err, "Error converting Selector to Labels Map")
Expand All @@ -90,7 +91,7 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,
if err != nil {
return nil, err
} else if len(scaledObjects.Items) != 1 {
return nil, fmt.Errorf("exactly one scaled object should match label %s", metricSelector.String())
return nil, fmt.Errorf("exactly one ScaledObject should match label %s", metricSelector.String())
}

scaledObject := &scaledObjects.Items[0]
Expand Down Expand Up @@ -144,25 +145,12 @@ func (p *KedaProvider) GetExternalMetric(ctx context.Context, namespace string,

// ListAllExternalMetrics returns the supported external metrics for this provider
func (p *KedaProvider) ListAllExternalMetrics() []provider.ExternalMetricInfo {
externalMetricsInfo := []provider.ExternalMetricInfo{}
logger.V(1).Info("KEDA Metrics Server received request for list fo all provided external metrics names")
zroubalik marked this conversation as resolved.
Show resolved Hide resolved

// get all ScaledObjects in namespace(s) watched by the operator
scaledObjects := &kedav1alpha1.ScaledObjectList{}
opts := []client.ListOption{
client.InNamespace(p.watchedNamespace),
}
err := p.client.List(p.ctx, scaledObjects, opts...)
if err != nil {
logger.Error(err, "Cannot get list of ScaledObjects", "WatchedNamespace", p.watchedNamespace)
return nil
}
p.externalMetricsInfoLock.RLock()
externalMetricsInfo := *p.externalMetricsInfo
p.externalMetricsInfoLock.RUnlock()

// get metrics from all watched ScaledObjects
for _, scaledObject := range scaledObjects.Items {
for _, metric := range scaledObject.Status.ExternalMetricNames {
externalMetricsInfo = append(externalMetricsInfo, provider.ExternalMetricInfo{Metric: metric})
}
}
return externalMetricsInfo
}

Expand Down