diff --git a/.golangci.yml b/.golangci.yml index d11a00f13b2..08e8687a1dd 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -64,6 +64,11 @@ issues: - path: scale_resolvers_test.go linters: - staticcheck + # Got "sigs.k8s.io/controller-runtime/pkg/client/fake is deprecated: please use pkg/envtest for testing" + # This might not be ideal, see: https://github.com/kubernetes-sigs/controller-runtime/issues/768 + - path: kubernetes_workload_scaler_test.go + linters: + - staticcheck # https://github.com/go-critic/go-critic/issues/926 - linters: - gocritic diff --git a/CHANGELOG.md b/CHANGELOG.md index 9e9019efc84..3a8ac58cd93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -31,6 +31,7 @@ - Add new scaler for Selenium Grid ([#1971](https://github.com/kedacore/keda/pull/1971)) - Support using regex to select the queues in RabbitMQ Scaler ([#1957](https://github.com/kedacore/keda/pull/1957)) - Support custom metric name in RabbitMQ Scaler ([#1976](https://github.com/kedacore/keda/pull/1976)) +- Add new scaler for Kubernetes Workload ([#2010](https://github.com/kedacore/keda/pull/2010)) ### Improvements diff --git a/pkg/scalers/kubernetes_workload_scaler.go b/pkg/scalers/kubernetes_workload_scaler.go new file mode 100644 index 00000000000..d4b6a8c634c --- /dev/null +++ b/pkg/scalers/kubernetes_workload_scaler.go @@ -0,0 +1,138 @@ +package scalers + +import ( + "context" + "fmt" + "strconv" + "strings" + + "k8s.io/api/autoscaling/v2beta2" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/metrics/pkg/apis/external_metrics" + "sigs.k8s.io/controller-runtime/pkg/client" + + kedautil "github.com/kedacore/keda/v2/pkg/util" +) + +type kubernetesWorkloadScaler struct { + metadata *kubernetesWorkloadMetadata + kubeClient client.Client +} + +const ( + kubernetesWorkloadMetricType = "External" + podSelectorKey = "podSelector" + valueKey = "value" +) + +type kubernetesWorkloadMetadata struct { + podSelector labels.Selector + namespace string + value int64 +} + +// NewKubernetesWorkloadScaler creates a new kubernetesWorkloadScaler +func NewKubernetesWorkloadScaler(kubeClient client.Client, config *ScalerConfig) (Scaler, error) { + meta, parseErr := parseWorkloadMetadata(config) + if parseErr != nil { + return nil, fmt.Errorf("error parsing kubernetes workload metadata: %s", parseErr) + } + + return &kubernetesWorkloadScaler{ + metadata: meta, + kubeClient: kubeClient, + }, nil +} + +func parseWorkloadMetadata(config *ScalerConfig) (*kubernetesWorkloadMetadata, error) { + meta := &kubernetesWorkloadMetadata{} + var err error + meta.namespace = config.Namespace + meta.podSelector, err = labels.Parse(config.TriggerMetadata[podSelectorKey]) + if err != nil || meta.podSelector.String() == "" { + return nil, fmt.Errorf("invalid pod selector") + } + meta.value, err = strconv.ParseInt(config.TriggerMetadata[valueKey], 10, 64) + if err != nil || meta.value == 0 { + return nil, fmt.Errorf("value must be an integer greater than 0") + } + return meta, nil +} + +// IsActive determines if we need to scale from zero +func (s *kubernetesWorkloadScaler) IsActive(ctx context.Context) (bool, error) { + pods, err := s.getMetricValue(ctx) + + if err != nil { + return false, err + } + + return pods > 0, nil +} + +// Close no need for kubernetes workload scaler +func (s *kubernetesWorkloadScaler) Close() error { + return nil +} + +// GetMetricSpecForScaling returns the metric spec for the HPA +func (s *kubernetesWorkloadScaler) GetMetricSpecForScaling() []v2beta2.MetricSpec { + targetMetricValue := resource.NewQuantity(s.metadata.value, resource.DecimalSI) + externalMetric := &v2beta2.ExternalMetricSource{ + Metric: v2beta2.MetricIdentifier{ + Name: kedautil.NormalizeString(fmt.Sprintf("%s-%s-%s", "workload", s.metadata.namespace, normalizeSelectorString(s.metadata.podSelector))), + }, + Target: v2beta2.MetricTarget{ + Type: v2beta2.AverageValueMetricType, + AverageValue: targetMetricValue, + }, + } + metricSpec := v2beta2.MetricSpec{External: externalMetric, Type: kubernetesWorkloadMetricType} + return []v2beta2.MetricSpec{metricSpec} +} + +// GetMetrics returns value for a supported metric +func (s *kubernetesWorkloadScaler) GetMetrics(ctx context.Context, metricName string, metricSelector labels.Selector) ([]external_metrics.ExternalMetricValue, error) { + pods, err := s.getMetricValue(ctx) + if err != nil { + return []external_metrics.ExternalMetricValue{}, fmt.Errorf("error inspecting kubernetes workload: %s", err) + } + + metric := external_metrics.ExternalMetricValue{ + MetricName: metricName, + Value: *resource.NewQuantity(int64(pods), resource.DecimalSI), + Timestamp: metav1.Now(), + } + + return append([]external_metrics.ExternalMetricValue{}, metric), nil +} + +func (s *kubernetesWorkloadScaler) getMetricValue(ctx context.Context) (int, error) { + podList := &corev1.PodList{} + listOptions := client.ListOptions{} + listOptions.LabelSelector = s.metadata.podSelector + listOptions.Namespace = s.metadata.namespace + opts := []client.ListOption{ + &listOptions, + } + + err := s.kubeClient.List(ctx, podList, opts...) + if err != nil { + return 0, err + } + + return len(podList.Items), nil +} + +func normalizeSelectorString(selector labels.Selector) string { + s := selector.String() + s = strings.ReplaceAll(s, " ", "") + s = strings.ReplaceAll(s, "(", "-") + s = strings.ReplaceAll(s, ")", "-") + s = strings.ReplaceAll(s, ",", "-") + s = strings.ReplaceAll(s, "!", "-") + return s +} diff --git a/pkg/scalers/kubernetes_workload_scaler_test.go b/pkg/scalers/kubernetes_workload_scaler_test.go new file mode 100644 index 00000000000..e8c5ed750f8 --- /dev/null +++ b/pkg/scalers/kubernetes_workload_scaler_test.go @@ -0,0 +1,138 @@ +package scalers + +import ( + "context" + "fmt" + "testing" + "time" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +type workloadMetadataTestData struct { + metadata map[string]string + namespace string + isError bool +} + +var parseWorkloadMetadataTestDataset = []workloadMetadataTestData{ + {map[string]string{"value": "1", "podSelector": "app=demo"}, "test", false}, + {map[string]string{"value": "1", "podSelector": "app=demo"}, "default", false}, + {map[string]string{"value": "1", "podSelector": "app in (demo1, demo2)"}, "test", false}, + {map[string]string{"value": "1", "podSelector": "app in (demo1, demo2),deploy in (deploy1, deploy2)"}, "test", false}, + {map[string]string{"podSelector": "app=demo"}, "test", true}, + {map[string]string{"podSelector": "app=demo"}, "default", true}, + {map[string]string{"value": "1"}, "test", true}, + {map[string]string{"value": "1"}, "default", true}, + {map[string]string{"value": "a", "podSelector": "app=demo"}, "test", true}, + {map[string]string{"value": "a", "podSelector": "app=demo"}, "default", true}, + {map[string]string{"value": "0", "podSelector": "app=demo"}, "test", true}, + {map[string]string{"value": "0", "podSelector": "app=demo"}, "default", true}, +} + +func TestParseWorkloadMetadata(t *testing.T) { + for _, testData := range parseWorkloadMetadataTestDataset { + _, err := parseWorkloadMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, Namespace: testData.namespace}) + if err != nil && !testData.isError { + t.Error("Expected success but got error", err) + } + if testData.isError && err == nil { + t.Error("Expected error but got success") + } + } +} + +type workloadIsActiveTestData struct { + metadata map[string]string + namespace string + podCount int + active bool +} + +var isActiveWorkloadTestDataset = []workloadIsActiveTestData{ + // "podSelector": "app=demo", "namespace": "test" + {parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 0, false}, + {parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 1, false}, + {parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, 15, false}, + // "podSelector": "app=demo", "namespace": "default" + {parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 0, false}, + {parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 1, true}, + {parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, 15, true}, +} + +func TestWorkloadIsActive(t *testing.T) { + for _, testData := range isActiveWorkloadTestDataset { + s, _ := NewKubernetesWorkloadScaler( + fake.NewFakeClient(createPodlist(testData.podCount)), + &ScalerConfig{ + TriggerMetadata: testData.metadata, + AuthParams: map[string]string{}, + GlobalHTTPTimeout: 1000 * time.Millisecond, + Namespace: testData.namespace, + }, + ) + isActive, _ := s.IsActive(context.TODO()) + if testData.active && !isActive { + t.Error("Expected active but got inactive") + } + if !testData.active && isActive { + t.Error("Expected inactive but got active") + } + } +} + +type workloadGetMetricSpecForScalingTestData struct { + metadata map[string]string + namespace string + name string +} + +var getMetricSpecForScalingTestDataset = []workloadGetMetricSpecForScalingTestData{ + // "podSelector": "app=demo", "namespace": "test" + {parseWorkloadMetadataTestDataset[0].metadata, parseWorkloadMetadataTestDataset[0].namespace, "workload-test-app=demo"}, + // "podSelector": "app=demo", "namespace": "default" + {parseWorkloadMetadataTestDataset[1].metadata, parseWorkloadMetadataTestDataset[1].namespace, "workload-default-app=demo"}, + // "podSelector": "app in (demo1, demo2)", "namespace": "test" + {parseWorkloadMetadataTestDataset[2].metadata, parseWorkloadMetadataTestDataset[2].namespace, "workload-test-appin-demo1-demo2-"}, + // "podSelector": "app in (demo1, demo2),deploy in (deploy1, deploy2)", "namespace": "test" + {parseWorkloadMetadataTestDataset[3].metadata, parseWorkloadMetadataTestDataset[3].namespace, "workload-test-appin-demo1-demo2--deployin-deploy1-deploy2-"}, +} + +func TestWorkloadGetMetricSpecForScaling(t *testing.T) { + for _, testData := range getMetricSpecForScalingTestDataset { + s, _ := NewKubernetesWorkloadScaler( + fake.NewFakeClient(), + &ScalerConfig{ + TriggerMetadata: testData.metadata, + AuthParams: map[string]string{}, + GlobalHTTPTimeout: 1000 * time.Millisecond, + Namespace: testData.namespace, + }, + ) + metric := s.GetMetricSpecForScaling() + + if metric[0].External.Metric.Name != testData.name { + t.Errorf("Expected '%s' as metric name and got '%s'", testData.name, metric[0].External.Metric.Name) + } + } +} + +func createPodlist(count int) *v1.PodList { + list := &v1.PodList{} + for i := 0; i < count; i++ { + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("demo-pod-v%d", i), + Namespace: "default", + Annotations: map[string]string{}, + Labels: map[string]string{ + "app": "demo", + }, + }, + } + list.Items = append(list.Items, *pod) + } + return list +} diff --git a/pkg/scaling/scale_handler.go b/pkg/scaling/scale_handler.go index 36b4f53acf1..8216344a7ad 100644 --- a/pkg/scaling/scale_handler.go +++ b/pkg/scaling/scale_handler.go @@ -373,7 +373,7 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod return []scalers.Scaler{}, err } - scaler, err := buildScaler(trigger.Type, config) + scaler, err := buildScaler(h.client, trigger.Type, config) if err != nil { closeScalers(scalersRes) h.recorder.Event(withTriggers, corev1.EventTypeWarning, eventreason.KEDAScalerFailed, err.Error()) @@ -386,7 +386,7 @@ func (h *scaleHandler) buildScalers(withTriggers *kedav1alpha1.WithTriggers, pod return scalersRes, nil } -func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) { +func buildScaler(client client.Client, triggerType string, config *scalers.ScalerConfig) (scalers.Scaler, error) { // TRIGGERS-START switch triggerType { case "artemis-queue": @@ -429,6 +429,8 @@ func buildScaler(triggerType string, config *scalers.ScalerConfig) (scalers.Scal return scalers.NewInfluxDBScaler(config) case "kafka": return scalers.NewKafkaScaler(config) + case "kubernetes-workload": + return scalers.NewKubernetesWorkloadScaler(client, config) case "liiklus": return scalers.NewLiiklusScaler(config) case "memory": diff --git a/tests/scalers/kubernetes-workload.test.ts b/tests/scalers/kubernetes-workload.test.ts new file mode 100644 index 00000000000..5cb77c876d1 --- /dev/null +++ b/tests/scalers/kubernetes-workload.test.ts @@ -0,0 +1,134 @@ +import * as fs from 'fs' +import * as sh from 'shelljs' +import * as tmp from 'tmp' +import test from 'ava' +import {waitForDeploymentReplicaCount} from "./helpers"; + +const testNamespace = 'kubernetes-workload-test' +const monitoredDeploymentFile = tmp.fileSync() +const sutDeploymentFile = tmp.fileSync() + +test.before(t => { + sh.config.silent = true + sh.exec(`kubectl create namespace ${testNamespace}`) + + fs.writeFileSync(monitoredDeploymentFile.name, monitoredDeploymentYaml) + t.is( + 0, + sh.exec(`kubectl apply -f ${monitoredDeploymentFile.name} --namespace ${testNamespace}`).code, + 'Deploying monitored deployment should work.' + ) + + fs.writeFileSync(sutDeploymentFile.name, sutDeploymentYaml) + t.is( + 0, + sh.exec(`kubectl apply -f ${sutDeploymentFile.name} --namespace ${testNamespace}`).code, + 'Deploying monitored deployment should work.' + ) +}) + +test.serial('Deployment should have 0 replicas on start', t => { + const replicaCount = sh.exec( + `kubectl get deployment.apps/sut-deployment --namespace ${testNamespace} -o jsonpath="{.spec.replicas}"` + ).stdout + t.is(replicaCount, '0', 'replica count should start out as 0') +}) + +test.serial(`Deployment should scale to fit the amount of pods which match the selector`, async t => { + + sh.exec( + `kubectl scale deployment.apps/monitored-deployment --namespace ${testNamespace} --replicas=5` + ) + t.true(await waitForDeploymentReplicaCount(5, 'sut-deployment', testNamespace, 6, 10000), 'Replica count should be 5 after 60 seconds') + + sh.exec( + `kubectl scale deployment.apps/monitored-deployment --namespace ${testNamespace} --replicas=10` + ) + t.true(await waitForDeploymentReplicaCount(10, 'sut-deployment', testNamespace, 6, 10000), 'Replica count should be 10 after 60 seconds') + + sh.exec( + `kubectl scale deployment.apps/monitored-deployment --namespace ${testNamespace} --replicas=5` + ) + t.true(await waitForDeploymentReplicaCount(5, 'sut-deployment', testNamespace, 6, 10000), 'Replica count should be 5 after 60 seconds') + + sh.exec( + `kubectl scale deployment.apps/monitored-deployment --namespace ${testNamespace} --replicas=0` + ) + t.true(await waitForDeploymentReplicaCount(0, 'sut-deployment', testNamespace, 6, 10000), 'Replica count should be 0 after 60 seconds') +}) + +test.after.always.cb('clean up workload test related deployments', t => { + const resources = [ + 'scaledobject.keda.sh/sut-scaledobject', + 'deployment.apps/sut-deployment', + 'deployment.apps/monitored-deployment', + ] + + for (const resource of resources) { + sh.exec(`kubectl delete ${resource} --namespace ${testNamespace}`) + } + sh.exec(`kubectl delete namespace ${testNamespace}`) + t.end() +}) + +const monitoredDeploymentYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: monitored-deployment + labels: + deploy: workload-test +spec: + replicas: 0 + selector: + matchLabels: + pod: workload-test + template: + metadata: + labels: + pod: workload-test + spec: + containers: + - name: nginx + image: 'nginx'` + +const sutDeploymentYaml = `apiVersion: apps/v1 +kind: Deployment +metadata: + name: sut-deployment + labels: + deploy: workload-sut +spec: + replicas: 0 + selector: + matchLabels: + pod: workload-sut + template: + metadata: + labels: + pod: workload-sut + spec: + containers: + - name: nginx + image: 'nginx' +--- +apiVersion: keda.sh/v1alpha1 +kind: ScaledObject +metadata: + name: sut-scaledobject +spec: + scaleTargetRef: + name: sut-deployment + pollingInterval: 5 + cooldownPeriod: 5 + minReplicaCount: 0 + maxReplicaCount: 10 + advanced: + horizontalPodAutoscalerConfig: + behavior: + scaleDown: + stabilizationWindowSeconds: 15 + triggers: + - type: kubernetes-workload + metadata: + podSelector: 'pod=workload-test' + value: '1'`