Skip to content

Commit

Permalink
Ensuring operator updates ScaledObject on all HTTPScaledObject changes
Browse files Browse the repository at this point in the history
Signed-off-by: Aaron Schlesinger <aaron@ecomaz.net>
  • Loading branch information
arschles committed Jan 14, 2022
1 parent eb4e1e9 commit 6942b8e
Show file tree
Hide file tree
Showing 15 changed files with 409 additions and 188 deletions.
5 changes: 3 additions & 2 deletions e2e/e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kelseyhightower/envconfig"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/uuid"
Expand Down Expand Up @@ -68,9 +69,9 @@ func TestE2E(t *testing.T) {

// ensure that the interceptor and XKCD scaledobjects
// exist
_, err = getScaledObject(ctx, cl, ns, "keda-add-ons-http-interceptor")
_, err = k8s.GetScaledObject(ctx, cl, ns, "keda-add-ons-http-interceptor")
r.NoError(err)
_, err = getScaledObject(ctx, cl, ns, "xkcd-app")
_, err = k8s.GetScaledObject(ctx, cl, ns, "xkcd-app")
r.NoError(err)

// issue requests to the XKCD service directly to make
Expand Down
27 changes: 0 additions & 27 deletions e2e/k8s.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package e2e

import (
"context"
"strconv"

"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/magefile/mage/sh"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/client"
clientconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
Expand Down Expand Up @@ -41,27 +38,3 @@ func getPortStrings(svc *corev1.Service) []string {
}
return ret
}

func getScaledObject(
ctx context.Context,
cl client.Client,
ns,
name string,
) (*unstructured.Unstructured, error) {
scaledObject, err := k8s.NewScaledObject(
ns,
name,
"",
"",
"",
1,
2,
)
if err != nil {
return nil, err
}
if err := cl.Get(ctx, k8s.ObjKey(ns, name), scaledObject); err != nil {
return nil, err
}
return scaledObject, nil
}
2 changes: 1 addition & 1 deletion operator/controllers/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func ensureFinalizer(
logger.Info("Adding Finalizer for the ScaledObject")
httpso.SetFinalizers(append(httpso.GetFinalizers(), httpScaledObjectFinalizer))

// Update CR
// Update HTTPScaledObject
err := client.Update(ctx, httpso)
if err != nil {
logger.Error(
Expand Down
12 changes: 9 additions & 3 deletions operator/controllers/httpscaledobject_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,13 +79,19 @@ func (rec *HTTPScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.R
}, err
}

// This is a deleted HTTPScaledObject
if httpso.GetDeletionTimestamp() != nil {
logger.Info("Deletion timestamp found", "httpscaledobject", *httpso)
// if it was marked deleted, delete all the related objects
// and don't schedule for another reconcile. Kubernetes
// will finalize them
// TODO: move this function call into `finalizeScaledObject`
removeErr := rec.removeApplicationResources(ctx, logger, rec.BaseConfig.CurrentNamespace, httpso)
removeErr := rec.removeApplicationResources(
ctx,
logger,
rec.BaseConfig.CurrentNamespace,
httpso,
)
if removeErr != nil {
// if we failed to remove app resources, reschedule a reconcile so we can try
// again
Expand All @@ -107,9 +113,9 @@ func (rec *HTTPScaledObjectReconciler) Reconcile(ctx context.Context, req ctrl.R
logger.Info(
"Reconciling HTTPScaledObject",
"Namespace",
req.Namespace,
httpso.Namespace,
"DeploymentName",
httpso.Name,
httpso.Spec.ScaleTargetRef.Deployment,
)

// Create required app objects for the application defined by the CRD
Expand Down
134 changes: 134 additions & 0 deletions operator/controllers/httpscaledobject_controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
package controllers

import (
"context"
"fmt"
"strings"
"testing"

"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/operator/api/v1alpha1"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/routing"
"github.com/stretchr/testify/require"
"k8s.io/client-go/kubernetes/scheme"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// When an HTTPScaledObject replicas field is updated,
// the corresponding ScaledObject should be updated by
// the reconciler.
func TestUpdatesReplicas(t *testing.T) {
ctx := context.Background()
const (
ns = "testns"
name = "test"
)
namespacedName := k8s.NamespacedName(ns, name)
r := require.New(t)
// Add HTTPScaledObject to scheme
r.NoError(v1alpha1.AddToScheme(scheme.Scheme))
routingTable := routing.NewTable()
infra := newCommonTestInfra(ns, name)

// ensure the routing table config map exists
routingCM := newRoutingConfigMap(ns)
r.NoError(routing.SaveTableToConfigMap(
routingTable,
routingCM,
))
r.NoError(infra.cl.Create(ctx, routingCM))
// make sure the HTTPScaledObject is already created so it
// can be reconciled
r.NoError(infra.cl.Create(ctx, &infra.httpso))
// clear out all create call records, so we can later test
// actual create calls from the Reconcile method
infra.cl.Creates = nil
ctrl := HTTPScaledObjectReconciler{
Client: infra.cl,
Log: logr.Discard(),
RoutingTable: routingTable,
}

_, err := ctrl.Reconcile(ctx, reconcile.Request{
NamespacedName: namespacedName,
})
r.NoError(err)
// expect one get call for the HTTPScaledObject,
// then one for the routing table config map
r.Equal(2, len(infra.cl.Gets))
getCall := infra.cl.Gets[0]
getGVK := getCall.Obj.GetObjectKind().GroupVersionKind()
r.Equal("HTTPScaledObject", getGVK.Kind)
r.Equal(infra.httpso.Name, getCall.Key.Name)
r.Equal(infra.httpso.Namespace, getCall.Key.Namespace)
getCall = infra.cl.Gets[1]
getGVK = getCall.Obj.GetObjectKind().GroupVersionKind()
r.Equal("ConfigMap", getGVK.Kind)
r.Equal(infra.httpso.GetNamespace(), getCall.Key.Namespace)
r.Equal(routing.ConfigMapRoutingTableName, getCall.Key.Name)

// expect a create call for the ScaledObject
r.Equal(1, len(infra.cl.Creates))
createCall := infra.cl.Creates[0]
createGVK := createCall.GetObjectKind().GroupVersionKind()
r.Equal("keda.sh", createGVK.Group)
r.Equal("v1alpha1", createGVK.Version)
r.Equal("ScaledObject", createGVK.Kind)
r.Equal(fmt.Sprintf("%s-app", infra.httpso.Name), createCall.GetName())
r.Equal(infra.httpso.Namespace, createCall.GetNamespace())

// expect a call to patch the ConfigMap
r.Equal(1, len(infra.cl.Patches))
patchCall := infra.cl.Patches[0]
patchGVK := patchCall.GetObjectKind().GroupVersionKind()
r.Equal("ConfigMap", patchGVK.Kind)
r.Equal(infra.httpso.GetNamespace(), patchCall.GetNamespace())
r.Equal(routing.ConfigMapRoutingTableName, patchCall.GetName())

// now change the min and max replicas of the HTTPScaledObject
// and ensure the ScaledObject is updated (there should now be an
// Update call)
updatedHTTPSO := v1alpha1.HTTPScaledObject{}
r.NoError(infra.cl.Get(ctx, namespacedName, &updatedHTTPSO))
updatedHTTPSO.Spec.Replicas.Min++
updatedHTTPSO.Spec.Replicas.Max++
r.NoError(infra.cl.Update(ctx, &updatedHTTPSO))
priorGets := infra.cl.Gets
_, err = ctrl.Reconcile(ctx, reconcile.Request{
NamespacedName: namespacedName,
})
r.NoError(err)
// there should be twice as many get calls, since
// we get the HTTPScaledObject and the routing table
// ConfigMap again
r.Equal(len(priorGets)*2, len(infra.cl.Gets))
latestCreate := infra.cl.Creates[len(infra.cl.Creates)-1]
latestCreateGVK := latestCreate.GetObjectKind().GroupVersionKind()
r.Equal("ScaledObject", latestCreateGVK.Kind)
r.Equal(fmt.Sprintf("%s-app", infra.httpso.Name), latestCreate.GetName())
r.Equal(infra.httpso.Namespace, latestCreate.GetNamespace())
unstructuredSO, err := k8s.GetScaledObject(ctx, infra.cl, ns, name+"-app")
r.NoError(err)
minIface := getNestedField(unstructuredSO.Object, "spec.replicas.min")
maxIface := getNestedField(unstructuredSO.Object, "spec.replicas.max")
r.Equal(updatedHTTPSO.Spec.Replicas.Min, minIface)
r.Equal(updatedHTTPSO.Spec.Replicas.Max, maxIface)
}

// getNestedField returns the value of the field with the given path.
// for example, if you specify "a.b.c", it will return the field at
// m[a][b][c]. This assumes that m[a] is a map[string]interface{},
// m[b] is a map[string]interface{}, and so on.
func getNestedField(m map[string]interface{}, path string) interface{} {
parts := strings.Split(path, ".")
var ret interface{} = m
for _, part := range parts {
m, ok := ret.(map[string]interface{})
if !ok {
return nil
}
ret = m[part]
}
return ret
}
22 changes: 9 additions & 13 deletions operator/controllers/httpscaledobject_logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ import (
"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/operator/api/v1alpha1"
"github.com/kedacore/http-add-on/operator/controllers/config"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/routing"
apierrs "k8s.io/apimachinery/pkg/api/errors"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
)

func (rec *HTTPScaledObjectReconciler) removeApplicationResources(
Expand All @@ -36,17 +35,12 @@ func (rec *HTTPScaledObjectReconciler) removeApplicationResources(
"HTTPScaledObject.namespace",
httpso.Namespace,
)

// Delete App ScaledObject
scaledObject := &unstructured.Unstructured{}
scaledObject.SetNamespace(httpso.Namespace)
scaledObject.SetName(config.AppScaledObjectName(httpso))
scaledObject.SetGroupVersionKind(schema.GroupVersionKind{
Group: "keda.sh",
Kind: "ScaledObject",
Version: "v1alpha1",
})
if err := rec.Client.Delete(ctx, scaledObject); err != nil {
if err := k8s.DeleteScaledObject(
ctx,
httpso.Namespace,
config.AppScaledObjectName(httpso),
rec.Client,
); err != nil {
if apierrs.IsNotFound(err) {
logger.Info("App ScaledObject not found, moving on")
} else {
Expand Down Expand Up @@ -111,6 +105,8 @@ func (rec *HTTPScaledObjectReconciler) createOrUpdateApplicationResources(
ctx,
rec.Client,
logger,
httpso.Namespace,
config.AppScaledObjectName(httpso),
externalScalerConfig.HostName(currentNamespace),
httpso,
); err != nil {
Expand Down
28 changes: 11 additions & 17 deletions operator/controllers/routing_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ import (
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/routing"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

func TestRoutingTable(t *testing.T) {
Expand All @@ -23,21 +21,17 @@ func TestRoutingTable(t *testing.T) {
r := require.New(t)
ctx := context.Background()
cl := k8s.NewFakeRuntimeClient()
cm := &corev1.ConfigMap{
Data: map[string]string{},
}
cm := newRoutingConfigMap(ns)
// save the empty routing table to the config map,
// so that it has valid structure
r.NoError(routing.SaveTableToConfigMap(table, cm))
cl.GetFunc = func() client.Object {
return cm
}
r.NoError(cl.Create(ctx, cm))
target := routing.NewTarget(
ns,
svcName,
8080,
deplName,
1234,
123,
)
r.NoError(addAndUpdateRoutingTable(
ctx,
Expand All @@ -50,10 +44,10 @@ func TestRoutingTable(t *testing.T) {
))
// ensure that the ConfigMap was read and created. no updates
// should occur
r.Equal(1, len(cl.FakeRuntimeClientReader.GetCalls))
r.Equal(1, len(cl.FakeRuntimeClientWriter.Patches))
r.Equal(0, len(cl.FakeRuntimeClientWriter.Updates))
r.Equal(0, len(cl.FakeRuntimeClientWriter.Creates))
r.Equal(1, len(cl.Gets))
r.Equal(1, len(cl.Patches))
r.Equal(0, len(cl.Updates))
r.Equal(0, len(cl.Creates))

retTarget, err := table.Lookup(host)
r.NoError(err)
Expand All @@ -70,10 +64,10 @@ func TestRoutingTable(t *testing.T) {

// ensure that the ConfigMap was read and updated. no additional
// creates should occur.
r.Equal(2, len(cl.FakeRuntimeClientReader.GetCalls))
r.Equal(2, len(cl.FakeRuntimeClientWriter.Patches))
r.Equal(0, len(cl.FakeRuntimeClientWriter.Updates))
r.Equal(0, len(cl.FakeRuntimeClientWriter.Creates))
r.Equal(2, len(cl.Gets))
r.Equal(2, len(cl.Patches))
r.Equal(0, len(cl.Updates))
r.Equal(0, len(cl.Creates))

_, err = table.Lookup(host)
r.Error(err)
Expand Down
Loading

0 comments on commit 6942b8e

Please sign in to comment.