diff --git a/.github/workflows/e2e-tests.yaml b/.github/workflows/e2e-tests.yaml index 83355110d..90e6f7475 100644 --- a/.github/workflows/e2e-tests.yaml +++ b/.github/workflows/e2e-tests.yaml @@ -51,7 +51,6 @@ jobs: - name: Show Kubernetes version run: | kubectl version - - name: Run e2e test run: | make e2e-test diff --git a/CHANGELOG.md b/CHANGELOG.md index cf1b4d96c..cb41b50e2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -19,6 +19,8 @@ This changelog keeps track of work items that have been completed and are ready ### New +- **General**: Add multi-host support to `HTTPScaledObject` ([#552](https://github.com/kedacore/http-add-on/issues/552)) + ### Improvements - **General**: Automatically tag Docker image with commit SHA ([#567](https://github.com/kedacore/http-add-on/issues/567)) @@ -36,7 +38,7 @@ You can find all deprecations in [this overview](https://github.com/kedacore/htt New deprecation(s): -- TODO +- **General**: `host` field deprecated in favor of `hosts` in `HTTPScaledObject` ([#552](https://github.com/kedacore/http-add-on/issues/552)) Previously announced deprecation(s): diff --git a/config/crd/bases/http.keda.sh_httpscaledobjects.yaml b/config/crd/bases/http.keda.sh_httpscaledobjects.yaml index 6b01f0bc2..447146e6f 100644 --- a/config/crd/bases/http.keda.sh_httpscaledobjects.yaml +++ b/config/crd/bases/http.keda.sh_httpscaledobjects.yaml @@ -60,10 +60,19 @@ spec: description: HTTPScaledObjectSpec defines the desired state of HTTPScaledObject properties: host: - description: The host to route. All requests with this host in the - "Host" header will be routed to the Service and Port specified in - the scaleTargetRef + description: (optional) (deprecated) The host to route. All requests + with these hosts in the "Host" header will be routed to the Service + and Port specified in the scaleTargetRef. The host field is mutually + exclusive of the hosts field type: string + hosts: + description: (optional) The hosts to route. All requests with these + hosts in the "Host" header will be routed to the Service and Port + specified in the scaleTargetRef. The hosts field is mutually exclusive + of the host field. + items: + type: string + type: array replicas: description: (optional) Replica information properties: @@ -107,7 +116,6 @@ spec: format: int32 type: integer required: - - host - scaleTargetRef type: object status: diff --git a/examples/xkcd/templates/httpscaledobject.yaml b/examples/xkcd/templates/httpscaledobject.yaml index 613280ee3..8a4ca4e0a 100644 --- a/examples/xkcd/templates/httpscaledobject.yaml +++ b/examples/xkcd/templates/httpscaledobject.yaml @@ -3,7 +3,10 @@ apiVersion: http.keda.sh/v1alpha1 metadata: name: {{ include "xkcd.fullname" . }} spec: - host: {{ .Values.host }} + {{- with .Values.hosts }} + hosts: + {{- toYaml . | nindent 8 }} + {{- end }} targetPendingRequests: {{ .Values.targetPendingRequests }} scaleTargetRef: deployment: {{ include "xkcd.fullname" . }} diff --git a/examples/xkcd/templates/ingress.yaml b/examples/xkcd/templates/ingress.yaml index 97e564bbf..0e6f6a8ae 100644 --- a/examples/xkcd/templates/ingress.yaml +++ b/examples/xkcd/templates/ingress.yaml @@ -8,7 +8,8 @@ metadata: kubernetes.io/ingress.class: nginx spec: rules: - - host: {{ .Values.host }} + {{- range .Values.hosts }} + - host: {{ . | toString }} http: paths: - path: / @@ -18,3 +19,4 @@ spec: name: keda-add-ons-http-interceptor-proxy port: number: 8080 + {{- end }} diff --git a/examples/xkcd/values.yaml b/examples/xkcd/values.yaml index 716eb9db3..4584960bd 100644 --- a/examples/xkcd/values.yaml +++ b/examples/xkcd/values.yaml @@ -1,5 +1,7 @@ replicaCount: 1 -host: myhost.com +hosts: + - "myhost.com" + - "myhost2.com" targetPendingRequests: 200 # This is the namespace that the ingress should be installed # into. It should be set to the same namespace as the diff --git a/operator/apis/http/v1alpha1/httpscaledobject_types.go b/operator/apis/http/v1alpha1/httpscaledobject_types.go index 5bc5b69a8..850e71013 100644 --- a/operator/apis/http/v1alpha1/httpscaledobject_types.go +++ b/operator/apis/http/v1alpha1/httpscaledobject_types.go @@ -40,9 +40,14 @@ type ReplicaStruct struct { // HTTPScaledObjectSpec defines the desired state of HTTPScaledObject type HTTPScaledObjectSpec struct { - // The host to route. All requests with this host in the "Host" header will - // be routed to the Service and Port specified in the scaleTargetRef - Host string `json:"host"` + // (optional) (deprecated) The host to route. All requests with these hosts in the "Host" header will + // be routed to the Service and Port specified in the scaleTargetRef. The host field is mutually exclusive of the hosts field + // +optional + Host *string `json:"host,omitempty"` + // (optional) The hosts to route. All requests with these hosts in the "Host" header will + // be routed to the Service and Port specified in the scaleTargetRef. The hosts field is mutually exclusive of the host field. + // +optional + Hosts []string `json:"hosts,omitempty"` // The name of the deployment to route HTTP requests to (and to autoscale). // Either this or Image must be set ScaleTargetRef *ScaleTargetRef `json:"scaleTargetRef"` diff --git a/operator/controllers/http/app.go b/operator/controllers/http/app.go index ef1a0f187..f10f42c04 100644 --- a/operator/controllers/http/app.go +++ b/operator/controllers/http/app.go @@ -80,18 +80,14 @@ func removeApplicationResources( v1alpha1.AppScaledObjectTerminated, )) - if err := removeAndUpdateRoutingTable( + return removeAndUpdateRoutingTable( ctx, logger, cl, routingTable, - httpso.Spec.Host, + httpso.Spec.Hosts, baseConfig.CurrentNamespace, - ); err != nil { - return err - } - - return nil + ) } func createOrUpdateApplicationResources( @@ -144,12 +140,12 @@ func createOrUpdateApplicationResources( targetPendingReqs = *tpr } - if err := addAndUpdateRoutingTable( + return addAndUpdateRoutingTable( ctx, logger, cl, routingTable, - httpso.Spec.Host, + httpso.Spec.Hosts, routing.NewTarget( httpso.GetNamespace(), httpso.Spec.ScaleTargetRef.Service, @@ -158,8 +154,5 @@ func createOrUpdateApplicationResources( targetPendingReqs, ), baseConfig.CurrentNamespace, - ); err != nil { - return err - } - return nil + ) } diff --git a/operator/controllers/http/httpscaledobject_controller.go b/operator/controllers/http/httpscaledobject_controller.go index 01381d359..3e413734a 100644 --- a/operator/controllers/http/httpscaledobject_controller.go +++ b/operator/controllers/http/httpscaledobject_controller.go @@ -18,9 +18,11 @@ package http import ( "context" + "errors" "time" - "k8s.io/apimachinery/pkg/api/errors" + "github.com/go-logr/logr" + k8serrors "k8s.io/apimachinery/pkg/api/errors" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ctrl "sigs.k8s.io/controller-runtime" @@ -61,7 +63,7 @@ func (r *HTTPScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Req httpso := &httpv1alpha1.HTTPScaledObject{} if err := r.Client.Get(ctx, req.NamespacedName, httpso); err != nil { - if errors.IsNotFound(err) { + if k8serrors.IsNotFound(err) { // If the HTTPScaledObject wasn't found, it might have // been deleted between the reconcile and the get. // It'll automatically get garbage collected, so don't @@ -108,6 +110,12 @@ func (r *HTTPScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.Req return ctrl.Result{}, err } + // ensure only host or hosts is set and if host is set that + // it is converted to hosts + if err := sanitizeHosts(logger, httpso); err != nil { + return ctrl.Result{}, err + } + // httpso is updated now logger.Info( "Reconciling HTTPScaledObject", @@ -171,3 +179,28 @@ func (r *HTTPScaledObjectReconciler) SetupWithManager(mgr ctrl.Manager) error { For(&httpv1alpha1.HTTPScaledObject{}, builder.WithPredicates(predicate.GenerationChangedPredicate{})). Complete(r) } + +// sanitize hosts by converting the host definition to hosts are erroring +// when both fields are set +func sanitizeHosts( + logger logr.Logger, + httpso *httpv1alpha1.HTTPScaledObject, +) error { + switch { + case httpso.Spec.Hosts != nil && httpso.Spec.Host != nil: + err := errors.New("mutually exclusive fields Error") + logger.Error(err, "Only one of 'hosts' or 'host' field can be defined") + return err + case httpso.Spec.Hosts == nil && httpso.Spec.Host == nil: + err := errors.New("no host specified Error") + logger.Error(err, "At least one of 'hosts' or 'host' field must be defined") + return err + case httpso.Spec.Hosts == nil: + httpso.Spec.Hosts = []string{*httpso.Spec.Host} + httpso.Spec.Host = nil + logger.Info("Using the 'host' field is deprecated. Please consider switching to the 'hosts' field") + return nil + default: + return nil + } +} diff --git a/operator/controllers/http/httpscaledobject_test.go b/operator/controllers/http/httpscaledobject_test.go new file mode 100644 index 000000000..38650191b --- /dev/null +++ b/operator/controllers/http/httpscaledobject_test.go @@ -0,0 +1,65 @@ +package http + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSanitizeHostsWithOnlyHosts(t *testing.T) { + r := require.New(t) + + testInfra := newCommonTestInfra("testns", "testapp") + spec := testInfra.httpso.Spec + + r.NoError(sanitizeHosts( + testInfra.logger, + &testInfra.httpso, + )) + + r.Equal(spec.Hosts, testInfra.httpso.Spec.Hosts) + r.Nil(testInfra.httpso.Spec.Host) +} + +func TestSanitizeHostsWithBothHostAndHosts(t *testing.T) { + r := require.New(t) + + testInfra := newBrokenTestInfra("testns", "testapp") + + err := sanitizeHosts( + testInfra.logger, + &testInfra.httpso, + ) + r.Error(err) +} + +func TestSanitizeHostsWithOnlyHost(t *testing.T) { + r := require.New(t) + + testInfra := newDeprecatedTestInfra("testns", "testapp") + spec := testInfra.httpso.Spec + + r.NoError(sanitizeHosts( + testInfra.logger, + &testInfra.httpso, + )) + + r.NotEqual(spec.Hosts, testInfra.httpso.Spec.Hosts) + r.NotEqual(spec.Host, testInfra.httpso.Spec.Host) + r.Nil(testInfra.httpso.Spec.Host) + r.Equal([]string{*spec.Host}, testInfra.httpso.Spec.Hosts) +} + +func TestSanitizeHostsWithNoHostOrHosts(t *testing.T) { + r := require.New(t) + + testInfra := newEmptyHostTestInfra("testns", "testapp") + + err := sanitizeHosts( + testInfra.logger, + &testInfra.httpso, + ) + r.Error(err) + r.Nil(testInfra.httpso.Spec.Host) + r.Nil(testInfra.httpso.Spec.Hosts) +} diff --git a/operator/controllers/http/routing_table.go b/operator/controllers/http/routing_table.go index e50c16842..128c2a059 100644 --- a/operator/controllers/http/routing_table.go +++ b/operator/controllers/http/routing_table.go @@ -16,17 +16,19 @@ func removeAndUpdateRoutingTable( lggr logr.Logger, cl client.Client, table *routing.Table, - host, + hosts []string, namespace string, ) error { lggr = lggr.WithName("removeAndUpdateRoutingTable") - if err := table.RemoveTarget(host); err != nil { - lggr.Error( - err, - "could not remove host from routing table, progressing anyway", - "host", - host, - ) + for _, host := range hosts { + if err := table.RemoveTarget(host); err != nil { + lggr.Error( + err, + "could not remove host from routing table, progressing anyway", + "host", + host, + ) + } } return updateRoutingMap(ctx, lggr, cl, namespace, table) @@ -37,18 +39,20 @@ func addAndUpdateRoutingTable( lggr logr.Logger, cl client.Client, table *routing.Table, - host string, + hosts []string, target routing.Target, namespace string, ) error { lggr = lggr.WithName("addAndUpdateRoutingTable") - if err := table.AddTarget(host, target); err != nil { - lggr.Error( - err, - "could not add host to routing table, progressing anyway", - "host", - host, - ) + for _, host := range hosts { + if err := table.AddTarget(host, target); err != nil { + lggr.Error( + err, + "could not add host to routing table, progressing anyway", + "host", + host, + ) + } } return updateRoutingMap(ctx, lggr, cl, namespace, table) } diff --git a/operator/controllers/http/routing_table_test.go b/operator/controllers/http/routing_table_test.go index 07d6bf14e..3f754ae38 100644 --- a/operator/controllers/http/routing_table_test.go +++ b/operator/controllers/http/routing_table_test.go @@ -13,14 +13,18 @@ import ( "github.com/kedacore/http-add-on/pkg/routing" ) +func getHosts() []string { + return []string{"myhost.com"} +} + func TestRoutingTable(t *testing.T) { table := routing.NewTable() const ( - host = "myhost.com" ns = "testns" svcName = "testsvc" deplName = "testdepl" ) + hosts := getHosts() r := require.New(t) ctx := context.Background() cl := k8s.NewFakeRuntimeClient() @@ -45,7 +49,7 @@ func TestRoutingTable(t *testing.T) { logr.Discard(), cl, table, - host, + hosts, target, ns, )) @@ -56,16 +60,18 @@ func TestRoutingTable(t *testing.T) { r.Equal(0, len(cl.FakeRuntimeClientWriter.Updates)) r.Equal(0, len(cl.FakeRuntimeClientWriter.Creates)) - retTarget, err := table.Lookup(host) - r.NoError(err) - r.Equal(&target, retTarget) + for _, host := range hosts { + retTarget, err := table.Lookup(host) + r.NoError(err) + r.Equal(target, *retTarget) + } r.NoError(removeAndUpdateRoutingTable( ctx, logr.Discard(), cl, table, - host, + hosts, ns, )) @@ -76,6 +82,8 @@ func TestRoutingTable(t *testing.T) { r.Equal(0, len(cl.FakeRuntimeClientWriter.Updates)) r.Equal(0, len(cl.FakeRuntimeClientWriter.Creates)) - _, err = table.Lookup(host) - r.Error(err) + for _, host := range hosts { + _, err := table.Lookup(host) + r.Error(err) + } } diff --git a/operator/controllers/http/scaled_object.go b/operator/controllers/http/scaled_object.go index afa9aed33..fa4a881bb 100644 --- a/operator/controllers/http/scaled_object.go +++ b/operator/controllers/http/scaled_object.go @@ -39,7 +39,7 @@ func createOrUpdateScaledObject( fmt.Sprintf("%s-app", httpso.GetName()), // HTTPScaledObject name is the same as the ScaledObject name httpso.Spec.ScaleTargetRef.Deployment, externalScalerHostName, - httpso.Spec.Host, + httpso.Spec.Hosts, minReplicaCount, maxReplicaCount, httpso.Spec.CooldownPeriod, diff --git a/operator/controllers/http/scaled_object_test.go b/operator/controllers/http/scaled_object_test.go index 25d583bcd..07280bd20 100644 --- a/operator/controllers/http/scaled_object_test.go +++ b/operator/controllers/http/scaled_object_test.go @@ -72,6 +72,12 @@ func TestCreateOrUpdateScaledObject(t *testing.T) { spec.MaxReplicaCount, ) + // get hosts from spec and ensure all the hosts are there + r.Equal( + 2, + len(testInfra.httpso.Spec.Hosts), + ) + // now update the min and max replicas on the httpso // and call createOrUpdateScaledObject again if spec := &testInfra.httpso.Spec; spec.Replicas == nil { diff --git a/operator/controllers/http/suite_test.go b/operator/controllers/http/suite_test.go index 347068958..7a7f6ec11 100644 --- a/operator/controllers/http/suite_test.go +++ b/operator/controllers/http/suite_test.go @@ -102,6 +102,116 @@ func newCommonTestInfra(namespace, appName string) *commonTestInfra { cl := fake.NewClientBuilder().WithScheme(localScheme).Build() logger := logr.Discard() + httpso := httpv1alpha1.HTTPScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: appName, + }, + Spec: httpv1alpha1.HTTPScaledObjectSpec{ + ScaleTargetRef: &httpv1alpha1.ScaleTargetRef{ + Deployment: appName, + Service: appName, + Port: 8081, + }, + Hosts: []string{"myhost1.com", "myhost2.com"}, + }, + } + + return &commonTestInfra{ + ns: namespace, + appName: appName, + ctx: ctx, + cl: cl, + logger: logger, + httpso: httpso, + } +} + +func newBrokenTestInfra(namespace, appName string) *commonTestInfra { + localScheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(localScheme)) + utilruntime.Must(httpv1alpha1.AddToScheme(localScheme)) + utilruntime.Must(kedav1alpha1.AddToScheme(localScheme)) + + ctx := context.Background() + cl := fake.NewClientBuilder().WithScheme(localScheme).Build() + logger := logr.Discard() + + host := "myhost1.com" + + httpso := httpv1alpha1.HTTPScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: appName, + }, + Spec: httpv1alpha1.HTTPScaledObjectSpec{ + ScaleTargetRef: &httpv1alpha1.ScaleTargetRef{ + Deployment: appName, + Service: appName, + Port: 8081, + }, + Hosts: []string{"myhost1.com", "myhost2.com"}, + Host: &host, + }, + } + + return &commonTestInfra{ + ns: namespace, + appName: appName, + ctx: ctx, + cl: cl, + logger: logger, + httpso: httpso, + } +} + +func newDeprecatedTestInfra(namespace, appName string) *commonTestInfra { + localScheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(localScheme)) + utilruntime.Must(httpv1alpha1.AddToScheme(localScheme)) + utilruntime.Must(kedav1alpha1.AddToScheme(localScheme)) + + ctx := context.Background() + cl := fake.NewClientBuilder().WithScheme(localScheme).Build() + logger := logr.Discard() + + host := "myhost1.com" + + httpso := httpv1alpha1.HTTPScaledObject{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: namespace, + Name: appName, + }, + Spec: httpv1alpha1.HTTPScaledObjectSpec{ + ScaleTargetRef: &httpv1alpha1.ScaleTargetRef{ + Deployment: appName, + Service: appName, + Port: 8081, + }, + Host: &host, + }, + } + + return &commonTestInfra{ + ns: namespace, + appName: appName, + ctx: ctx, + cl: cl, + logger: logger, + httpso: httpso, + } +} + +func newEmptyHostTestInfra(namespace, appName string) *commonTestInfra { + localScheme := runtime.NewScheme() + utilruntime.Must(clientgoscheme.AddToScheme(localScheme)) + utilruntime.Must(httpv1alpha1.AddToScheme(localScheme)) + utilruntime.Must(kedav1alpha1.AddToScheme(localScheme)) + + ctx := context.Background() + cl := fake.NewClientBuilder().WithScheme(localScheme).Build() + logger := logr.Discard() + httpso := httpv1alpha1.HTTPScaledObject{ ObjectMeta: metav1.ObjectMeta{ Namespace: namespace, diff --git a/pkg/k8s/scaledobject.go b/pkg/k8s/scaledobject.go index ddc20b0fe..e185a892f 100644 --- a/pkg/k8s/scaledobject.go +++ b/pkg/k8s/scaledobject.go @@ -1,6 +1,8 @@ package k8s import ( + "strings" + kedav1alpha1 "github.com/kedacore/keda/v2/apis/keda/v1alpha1" appsv1 "k8s.io/api/apps/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -12,7 +14,7 @@ const ( soTriggerType = "external-push" mkScalerAddress = "scalerAddress" - mkHost = "host" + mkHosts = "hosts" ) // NewScaledObject creates a new ScaledObject in memory @@ -21,7 +23,7 @@ func NewScaledObject( name string, deploymentName string, scalerAddress string, - host string, + hosts []string, minReplicas *int32, maxReplicas *int32, cooldownPeriod *int32, @@ -54,7 +56,7 @@ func NewScaledObject( Type: soTriggerType, Metadata: map[string]string{ mkScalerAddress: scalerAddress, - mkHost: host, + mkHosts: strings.Join(hosts, ","), }, }, }, diff --git a/scaler/handlers.go b/scaler/handlers.go index f616fc240..b18d86fb7 100644 --- a/scaler/handlers.go +++ b/scaler/handlers.go @@ -22,7 +22,8 @@ func init() { } const ( - interceptor = "interceptor" + interceptor = "interceptor" + httpRequests = "http-requests" ) type impl struct { @@ -59,32 +60,38 @@ func (e *impl) IsActive( scaledObject *externalscaler.ScaledObjectRef, ) (*externalscaler.IsActiveResponse, error) { lggr := e.lggr.WithName("IsActive") - host, ok := scaledObject.ScalerMetadata["host"] - if !ok { - err := fmt.Errorf("no 'host' field found in ScaledObject metadata") - lggr.Error(err, "returning immediately from IsActive RPC call", "ScaledObject", scaledObject) + + hosts, err := getHostsFromScaledObjectRef(lggr, scaledObject) + if err != nil { return nil, err } - if host == interceptor { - return &externalscaler.IsActiveResponse{ - Result: true, - }, nil - } - hostCount, ok := getHostCount( - host, - e.pinger.counts(), - e.routingTable, - ) - if !ok { - err := fmt.Errorf("host '%s' not found in counts", host) - allCounts := e.pinger.mergeCountsWithRoutingTable( + totalHostCount := 0 + for _, host := range hosts { + if host == interceptor { + return &externalscaler.IsActiveResponse{ + Result: true, + }, nil + } + + hostCount, ok := getHostCount( + host, + e.pinger.counts(), e.routingTable, ) - lggr.Error(err, "Given host was not found in queue count map", "host", host, "allCounts", allCounts) - return nil, err + if !ok { + err := fmt.Errorf("host '%s' not found in counts", host) + allCounts := e.pinger.mergeCountsWithRoutingTable( + e.routingTable, + ) + lggr.Error(err, "Given host was not found in queue count map", "host", host, "allCounts", allCounts) + return nil, err + } + + totalHostCount += hostCount } - active := hostCount > 0 + + active := totalHostCount > 0 return &externalscaler.IsActiveResponse{ Result: active, }, nil @@ -131,13 +138,15 @@ func (e *impl) GetMetricSpec( sor *externalscaler.ScaledObjectRef, ) (*externalscaler.GetMetricSpecResponse, error) { lggr := e.lggr.WithName("GetMetricSpec") - host, ok := sor.ScalerMetadata["host"] - if !ok { - err := fmt.Errorf("'host' not found in ScaledObject metadata") - lggr.Error(err, "no 'host' found in ScaledObject metadata") + + hosts, err := getHostsFromScaledObjectRef(lggr, sor) + if err != nil { return nil, err } + var targetPendingRequests int64 + var host = hosts[0] // We are only interested in the first host to get the targetPendingRequests + if host == interceptor { targetPendingRequests = e.targetMetricInterceptor } else { @@ -151,15 +160,16 @@ func (e *impl) GetMetricSpec( ) return nil, err } + host = httpRequests targetPendingRequests = int64(target.TargetPendingRequests) } + metricSpecs := []*externalscaler.MetricSpec{ { MetricName: host, TargetSize: targetPendingRequests, }, } - return &externalscaler.GetMetricSpecResponse{ MetricSpecs: metricSpecs, }, nil @@ -170,34 +180,40 @@ func (e *impl) GetMetrics( metricRequest *externalscaler.GetMetricsRequest, ) (*externalscaler.GetMetricsResponse, error) { lggr := e.lggr.WithName("GetMetrics") - host, ok := metricRequest.ScaledObjectRef.ScalerMetadata["host"] - if !ok { - err := fmt.Errorf("no 'host' field found in ScaledObject metadata") - lggr.Error(err, "ScaledObjectRef", metricRequest.ScaledObjectRef) + + hosts, err := getHostsFromScaledObjectRef(lggr, metricRequest.ScaledObjectRef) + if err != nil { return nil, err } - hostCount, ok := getHostCount( - host, - e.pinger.counts(), - e.routingTable, - ) - if !ok { - if host == interceptor { + var totalCount int64 + var metricName = httpRequests + for _, host := range hosts { + hostCount, ok := getHostCount( + host, + e.pinger.counts(), + e.routingTable, + ) + if !ok { + if host != interceptor { + err := fmt.Errorf("host '%s' not found in counts", host) + allCounts := e.pinger.mergeCountsWithRoutingTable(e.routingTable) + lggr.Error(err, "allCounts", allCounts) + return nil, err + } hostCount = e.pinger.aggregate() - } else { - err := fmt.Errorf("host '%s' not found in counts", host) - allCounts := e.pinger.mergeCountsWithRoutingTable(e.routingTable) - lggr.Error(err, "allCounts", allCounts) - return nil, err + metricName = interceptor } + totalCount += int64(hostCount) } + metricValues := []*externalscaler.MetricValue{ { - MetricName: host, - MetricValue: int64(hostCount), + MetricName: metricName, + MetricValue: totalCount, }, } + return &externalscaler.GetMetricsResponse{ MetricValues: metricValues, }, nil diff --git a/scaler/handlers_test.go b/scaler/handlers_test.go index 8c0d95457..bc7d3620e 100644 --- a/scaler/handlers_test.go +++ b/scaler/handlers_test.go @@ -31,7 +31,7 @@ func standardTarget() routing.Target { func TestStreamIsActive(t *testing.T) { type testCase struct { name string - host string + hosts string expected bool expectedErr bool setup func(*routing.Table, *queuePinger) @@ -40,7 +40,7 @@ func TestStreamIsActive(t *testing.T) { testCases := []testCase{ { name: "Simple host inactive", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -52,14 +52,14 @@ func TestStreamIsActive(t *testing.T) { }, { name: "Host is 'interceptor'", - host: "interceptor", + hosts: "interceptor", expected: true, expectedErr: false, setup: func(*routing.Table, *queuePinger) {}, }, { name: "Simple host active", - host: t.Name(), + hosts: t.Name(), expected: true, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -69,9 +69,22 @@ func TestStreamIsActive(t *testing.T) { q.allCounts[t.Name()] = 1 }, }, + { + name: "Simple multi host active", + hosts: "host1,host2", + expected: true, + expectedErr: false, + setup: func(table *routing.Table, q *queuePinger) { + r.NoError(table.AddTarget(t.Name(), standardTarget())) + q.pingMut.Lock() + defer q.pingMut.Unlock() + q.allCounts["host1"] = 1 + q.allCounts["host2"] = 1 + }, + }, { name: "No host present, but host in routing table", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -80,7 +93,7 @@ func TestStreamIsActive(t *testing.T) { }, { name: "Host doesn't exist", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: true, setup: func(*routing.Table, *queuePinger) {}, @@ -132,7 +145,7 @@ func TestStreamIsActive(t *testing.T) { testRef := &externalscaler.ScaledObjectRef{ ScalerMetadata: map[string]string{ - "host": tc.host, + "hosts": tc.hosts, }, } @@ -163,7 +176,7 @@ func TestStreamIsActive(t *testing.T) { func TestIsActive(t *testing.T) { type testCase struct { name string - host string + hosts string expected bool expectedErr bool setup func(*routing.Table, *queuePinger) @@ -172,7 +185,7 @@ func TestIsActive(t *testing.T) { testCases := []testCase{ { name: "Simple host inactive", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -184,14 +197,14 @@ func TestIsActive(t *testing.T) { }, { name: "Host is 'interceptor'", - host: "interceptor", + hosts: "interceptor", expected: true, expectedErr: false, setup: func(*routing.Table, *queuePinger) {}, }, { name: "Simple host active", - host: t.Name(), + hosts: t.Name(), expected: true, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -201,9 +214,22 @@ func TestIsActive(t *testing.T) { q.allCounts[t.Name()] = 1 }, }, + { + name: "Simple multi host active", + hosts: "host1,host2", + expected: true, + expectedErr: false, + setup: func(table *routing.Table, q *queuePinger) { + r.NoError(table.AddTarget(t.Name(), standardTarget())) + q.pingMut.Lock() + defer q.pingMut.Unlock() + q.allCounts["host1"] = 1 + q.allCounts["host2"] = 1 + }, + }, { name: "No host present, but host in routing table", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: false, setup: func(table *routing.Table, q *queuePinger) { @@ -212,7 +238,7 @@ func TestIsActive(t *testing.T) { }, { name: "Host doesn't exist", - host: t.Name(), + hosts: t.Name(), expected: false, expectedErr: true, setup: func(*routing.Table, *queuePinger) {}, @@ -236,11 +262,12 @@ func TestIsActive(t *testing.T) { 123, 200, ) + res, err := hdl.IsActive( ctx, &externalscaler.ScaledObjectRef{ ScalerMetadata: map[string]string{ - "host": tc.host, + "hosts": tc.hosts, }, }, ) @@ -270,11 +297,11 @@ func TestGetMetricSpecTable(t *testing.T) { r := require.New(t) cases := []testCase{ { - name: "valid host as host value in scaler metadata", + name: "valid host as single host value in scaler metadata", defaultTargetMetric: 0, defaultTargetMetricInterceptor: 123, scalerMetadata: map[string]string{ - "host": "validHost", + "hosts": "validHost", "targetPendingRequests": "123", }, newRoutingTableFn: func() *routing.Table { @@ -295,7 +322,44 @@ func TestGetMetricSpecTable(t *testing.T) { r.NotNil(res) r.Equal(1, len(res.MetricSpecs)) spec := res.MetricSpecs[0] - r.Equal("validHost", spec.MetricName) + r.Equal(httpRequests, spec.MetricName) + r.Equal(int64(123), spec.TargetSize) + }, + }, + { + name: "valid hosts as multiple hosts value in scaler metadata", + defaultTargetMetric: 0, + defaultTargetMetricInterceptor: 123, + scalerMetadata: map[string]string{ + "hosts": "validHost1,validHost2", + "targetPendingRequests": "123", + }, + newRoutingTableFn: func() *routing.Table { + ret := routing.NewTable() + r.NoError(ret.AddTarget("validHost1", routing.NewTarget( + ns, + "testsrv", + 8080, + "testdepl", + 123, + ))) + r.NoError(ret.AddTarget("validHost2", routing.NewTarget( + ns, + "testsrv", + 8080, + "testdepl", + 123, + ))) + return ret + }, + checker: func(t *testing.T, res *externalscaler.GetMetricSpecResponse, err error) { + t.Helper() + r := require.New(t) + r.NoError(err) + r.NotNil(res) + r.Equal(1, len(res.MetricSpecs)) + spec := res.MetricSpecs[0] + r.Equal(httpRequests, spec.MetricName) r.Equal(int64(123), spec.TargetSize) }, }, @@ -304,7 +368,7 @@ func TestGetMetricSpecTable(t *testing.T) { defaultTargetMetric: 1000, defaultTargetMetricInterceptor: 2000, scalerMetadata: map[string]string{ - "host": "interceptor", + "hosts": interceptor, "targetPendingRequests": "123", }, newRoutingTableFn: func() *routing.Table { @@ -325,7 +389,7 @@ func TestGetMetricSpecTable(t *testing.T) { r.NotNil(res) r.Equal(1, len(res.MetricSpecs)) spec := res.MetricSpecs[0] - r.Equal("interceptor", spec.MetricName) + r.Equal(interceptor, spec.MetricName) r.Equal(int64(2000), spec.TargetSize) }, }, @@ -425,7 +489,7 @@ func TestGetMetrics(t *testing.T) { testCases := []testCase{ { - name: "no 'host' field in the scaler metadata field", + name: "no 'hosts' field in the scaler metadata field", scalerMetadata: map[string]string{}, setupFn: func( ctx context.Context, @@ -445,7 +509,7 @@ func TestGetMetrics(t *testing.T) { r.Nil(res) r.Contains( err.Error(), - "no 'host' field found in ScaledObject metadata", + "no 'hosts' field in the scaler metadata field", ) }, defaultTargetMetric: int64(200), @@ -454,7 +518,7 @@ func TestGetMetrics(t *testing.T) { { name: "missing host value in the queue pinger", scalerMetadata: map[string]string{ - "host": "missingHostInQueue", + "hosts": "missingHostInQueue", }, setupFn: func( ctx context.Context, @@ -481,7 +545,7 @@ func TestGetMetrics(t *testing.T) { { name: "valid host", scalerMetadata: map[string]string{ - "host": "validHost", + "hosts": "validHost", }, setupFn: func( ctx context.Context, @@ -504,7 +568,7 @@ func TestGetMetrics(t *testing.T) { r.NotNil(res) r.Equal(1, len(res.MetricValues)) metricVal := res.MetricValues[0] - r.Equal("validHost", metricVal.MetricName) + r.Equal(httpRequests, metricVal.MetricName) r.Equal(int64(201), metricVal.MetricValue) }, defaultTargetMetric: int64(200), @@ -513,7 +577,7 @@ func TestGetMetrics(t *testing.T) { { name: "'interceptor' as host", scalerMetadata: map[string]string{ - "host": "interceptor", + "hosts": interceptor, }, setupFn: func( ctx context.Context, @@ -536,7 +600,7 @@ func TestGetMetrics(t *testing.T) { r.NotNil(res) r.Equal(1, len(res.MetricValues)) metricVal := res.MetricValues[0] - r.Equal("interceptor", metricVal.MetricName) + r.Equal(interceptor, metricVal.MetricName) // the value here needs to be the same thing as // the sum of the values in the fake queue created // in the setup function @@ -548,7 +612,7 @@ func TestGetMetrics(t *testing.T) { { name: "host in routing table, missing in queue pinger", scalerMetadata: map[string]string{ - "host": "myhost.com", + "hosts": "myhost.com", }, setupFn: func( ctx context.Context, @@ -576,7 +640,7 @@ func TestGetMetrics(t *testing.T) { r.NotNil(res) r.Equal(1, len(res.MetricValues)) metricVal := res.MetricValues[0] - r.Equal("myhost.com", metricVal.MetricName) + r.Equal(httpRequests, metricVal.MetricName) // the value here needs to be the same thing as // the sum of the values in the fake queue created // in the setup function @@ -585,6 +649,42 @@ func TestGetMetrics(t *testing.T) { defaultTargetMetric: int64(200), defaultTargetMetricInterceptor: int64(300), }, + { + name: "multiple validHosts add MetricValues", + scalerMetadata: map[string]string{ + "hosts": "validHost1,validHost2", + }, + setupFn: func( + ctx context.Context, + lggr logr.Logger, + ) (*routing.Table, *queuePinger, func(), error) { + table := routing.NewTable() + pinger, done, err := startFakeInterceptorServer(ctx, lggr, map[string]int{ + "validHost1": 123, + "validHost2": 456, + }, 2*time.Millisecond) + if err != nil { + return nil, nil, nil, err + } + + return table, pinger, done, nil + }, + checkFn: func(t *testing.T, res *externalscaler.GetMetricsResponse, err error) { + t.Helper() + r := require.New(t) + r.NoError(err) + r.NotNil(res) + r.Equal(1, len(res.MetricValues)) + metricVal := res.MetricValues[0] + r.Equal(httpRequests, metricVal.MetricName) + // the value here needs to be the same thing as + // the sum of the values in the fake queue created + // in the setup function + r.Equal(int64(579), metricVal.MetricValue) + }, + defaultTargetMetric: int64(500), + defaultTargetMetricInterceptor: int64(600), + }, } for i, c := range testCases { diff --git a/scaler/host_counts.go b/scaler/host_counts.go deleted file mode 100644 index 0a8874842..000000000 --- a/scaler/host_counts.go +++ /dev/null @@ -1,21 +0,0 @@ -package main - -import ( - "github.com/kedacore/http-add-on/pkg/routing" -) - -// getHostCount gets proper count for given host regardless whether -// host is in counts or only in routerTable -func getHostCount( - host string, - counts map[string]int, - table routing.TableReader, -) (int, bool) { - count, exists := counts[host] - if exists { - return count, exists - } - - exists = table.HasHost(host) - return 0, exists -} diff --git a/scaler/hosts.go b/scaler/hosts.go new file mode 100644 index 000000000..148f51b57 --- /dev/null +++ b/scaler/hosts.go @@ -0,0 +1,38 @@ +package main + +import ( + "fmt" + "strings" + + "github.com/go-logr/logr" + + "github.com/kedacore/http-add-on/pkg/routing" + externalscaler "github.com/kedacore/http-add-on/proto" +) + +// getHostCount gets proper count for given host regardless whether +// host is in counts or only in routerTable +func getHostCount( + host string, + counts map[string]int, + table routing.TableReader, +) (int, bool) { + count, exists := counts[host] + if exists { + return count, exists + } + + exists = table.HasHost(host) + return 0, exists +} + +// gets hosts from scaledobjectref +func getHostsFromScaledObjectRef(lggr logr.Logger, sor *externalscaler.ScaledObjectRef) ([]string, error) { + serializedHosts, ok := sor.ScalerMetadata["hosts"] + if !ok { + err := fmt.Errorf("no 'hosts' field in the scaler metadata field") + lggr.Error(err, "'hosts' not found in the scaler metadata field") + return make([]string, 0), err + } + return strings.Split(serializedHosts, ","), nil +}