From a726e24d6fdc126456895318d0ece57e3ca97577 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Tue, 10 Jan 2023 15:06:12 +0100 Subject: [PATCH 1/4] feat: add service discovery for kong admin service --- CHANGELOG.md | 6 +- config/rbac/role.yaml | 8 + .../base/manager_multi_gateway_patch.yaml | 5 +- .../all-in-one-dbless-k4k8s-enterprise.yaml | 8 + deploy/single/all-in-one-dbless-konnect.yaml | 8 + deploy/single/all-in-one-dbless-multi-gw.yaml | 14 +- deploy/single/all-in-one-dbless.yaml | 8 + .../all-in-one-postgres-enterprise.yaml | 8 + deploy/single/all-in-one-postgres.yaml | 8 + internal/adminapi/client.go | 24 +++ internal/adminapi/kong.go | 23 ++- internal/adminapi/kong_test.go | 66 ++++-- .../configuration/kongadminapi_controller.go | 162 +++++++++++++++ internal/dataplane/client.go | 4 + internal/dataplane/kong_client.go | 143 ++++++++++++- internal/dataplane/kong_client_test.go | 189 ++++++++++++++++++ internal/dataplane/synchronizer.go | 4 + internal/dataplane/synchronizer_test.go | 4 + internal/manager/config.go | 13 +- internal/manager/config_validation.go | 6 + internal/manager/controllerdef.go | 14 ++ internal/manager/run.go | 14 +- internal/manager/setup.go | 63 +++++- internal/manager/utils/kongconfig/root.go | 4 +- test/e2e/all_in_one_test.go | 50 +++++ test/e2e/helpers_test.go | 41 +++- test/e2e/utils_test.go | 57 ++++-- test/internal/helpers/teardown.go | 16 +- 28 files changed, 879 insertions(+), 91 deletions(-) create mode 100644 internal/controllers/configuration/kongadminapi_controller.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 814ae1c57e..d39ee2a561 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -96,9 +96,13 @@ Adding a new version? You'll need three changes: [#3507](https://github.com/Kong/kubernetes-ingress-controller/pull/3507) - Enable `ReferenceGrant` if `Gateway` feature gate is turned on (default). [#3519](https://github.com/Kong/kubernetes-ingress-controller/pull/3519) -- Added Konnect client to upload status of KIC instance to Konnect cloud if +- Added Konnect client to upload status of KIC instance to Konnect cloud if flag `--konnect-sync-enabled` is set to `true`. 
[#3469](https://github.com/Kong/kubernetes-ingress-controller/pull/3469) +- Added service discovery for kong admin service configured via `--kong-admin-svc` + which accepts a namespaced name of headless kong admin service which should have + Admin API endpoints exposed under a named port called `admin` + [#3421](https://github.com/Kong/kubernetes-ingress-controller/pull/3421) ### Fixed diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 69969e7c35..193b52ccb9 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -177,6 +177,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/config/variants/multi-gw/base/manager_multi_gateway_patch.yaml b/config/variants/multi-gw/base/manager_multi_gateway_patch.yaml index a0a7f0256d..49bc5dbd5b 100644 --- a/config/variants/multi-gw/base/manager_multi_gateway_patch.yaml +++ b/config/variants/multi-gw/base/manager_multi_gateway_patch.yaml @@ -11,8 +11,7 @@ spec: containers: - name: ingress-controller env: - - name: CONTROLLER_LOG_LEVEL - value: debug - name: CONTROLLER_KONG_ADMIN_SVC value: kong/kong-admin - image: kic-placeholder:placeholder + - name: CONTROLLER_KONG_ADMIN_URL + $patch: delete diff --git a/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml b/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml index 7c75256545..79921ae67e 100644 --- a/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml +++ b/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml @@ -1337,6 +1337,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/deploy/single/all-in-one-dbless-konnect.yaml b/deploy/single/all-in-one-dbless-konnect.yaml index 6422d38372..26b800bd84 100644 --- a/deploy/single/all-in-one-dbless-konnect.yaml +++ b/deploy/single/all-in-one-dbless-konnect.yaml @@ -1337,6 +1337,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/deploy/single/all-in-one-dbless-multi-gw.yaml b/deploy/single/all-in-one-dbless-multi-gw.yaml index e8a3f0d48a..4a3a8748d6 100644 --- a/deploy/single/all-in-one-dbless-multi-gw.yaml +++ b/deploy/single/all-in-one-dbless-multi-gw.yaml @@ -1337,6 +1337,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: @@ -1650,12 +1658,8 @@ spec: automountServiceAccountToken: false containers: - env: - - name: CONTROLLER_LOG_LEVEL - value: debug - name: CONTROLLER_KONG_ADMIN_SVC value: kong/kong-admin - - name: CONTROLLER_KONG_ADMIN_URL - value: https://127.0.0.1:8444 - name: CONTROLLER_KONG_ADMIN_TLS_SKIP_VERIFY value: "true" - name: CONTROLLER_PUBLISH_SERVICE @@ -1670,7 +1674,7 @@ spec: fieldRef: apiVersion: v1 fieldPath: metadata.namespace - image: kic-placeholder:placeholder + image: kong/kubernetes-ingress-controller:2.8.1 imagePullPolicy: IfNotPresent livenessProbe: failureThreshold: 3 diff --git a/deploy/single/all-in-one-dbless.yaml b/deploy/single/all-in-one-dbless.yaml index 62880491ce..b31359c3dd 100644 --- a/deploy/single/all-in-one-dbless.yaml +++ b/deploy/single/all-in-one-dbless.yaml @@ -1337,6 +1337,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + 
- endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/deploy/single/all-in-one-postgres-enterprise.yaml b/deploy/single/all-in-one-postgres-enterprise.yaml index 471b0f1ec8..571e31ad99 100644 --- a/deploy/single/all-in-one-postgres-enterprise.yaml +++ b/deploy/single/all-in-one-postgres-enterprise.yaml @@ -1337,6 +1337,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/deploy/single/all-in-one-postgres.yaml b/deploy/single/all-in-one-postgres.yaml index 53e0ce4989..3c1093ec17 100644 --- a/deploy/single/all-in-one-postgres.yaml +++ b/deploy/single/all-in-one-postgres.yaml @@ -1337,6 +1337,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/internal/adminapi/client.go b/internal/adminapi/client.go index 11b12ba2cc..8b131c111c 100644 --- a/internal/adminapi/client.go +++ b/internal/adminapi/client.go @@ -1,6 +1,8 @@ package adminapi import ( + "context" + "github.com/kong/go-kong/kong" "github.com/kong/kubernetes-ingress-controller/v2/internal/util" @@ -76,3 +78,25 @@ func (c *Client) SetLastConfigSHA(s []byte) { func (c *Client) LastConfigSHA() []byte { return c.lastConfigSHA } + +type ClientFactory struct { + workspace string + httpClientOpts HTTPClientOpts + adminToken string +} + +func NewClientFactoryForWorkspace(workspace string, httpClientOpts HTTPClientOpts, adminToken string) ClientFactory { + return ClientFactory{ + workspace: workspace, + httpClientOpts: httpClientOpts, + adminToken: adminToken, + } +} + +func (cf ClientFactory) CreateAdminAPIClient(ctx context.Context, address string) (Client, error) { + httpclient, err := MakeHTTPClient(&cf.httpClientOpts, cf.adminToken) + if err != nil { + return Client{}, err + } + return NewKongClientForWorkspace(ctx, address, cf.workspace, httpclient) +} diff --git a/internal/adminapi/kong.go b/internal/adminapi/kong.go index 40665cbb3c..67b35ec45a 100644 --- a/internal/adminapi/kong.go +++ b/internal/adminapi/kong.go @@ -8,8 +8,10 @@ import ( "fmt" "net/http" "os" + "strings" "github.com/kong/go-kong/kong" + "github.com/samber/lo" tlsutil "github.com/kong/kubernetes-ingress-controller/v2/internal/util/tls" ) @@ -67,8 +69,12 @@ type HTTPClientOpts struct { TLSClient TLSClientConfig } +const ( + headerNameAdminToken = "Kong-Admin-Token" +) + // MakeHTTPClient returns an HTTP client with the specified mTLS/headers configuration. 
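The kongAdminToken parameter added to MakeHTTPClient below is injected into every request as a Kong-Admin-Token header (via prepareHeaders) unless such a header was already supplied in opts.Headers. A minimal usage sketch, written as if it were another test in this package; the test name and the token value are illustrative only, not part of this change:

	func TestMakeHTTPClientAddsAdminTokenHeaderSketch(t *testing.T) {
		// TLSSkipVerify and the token are placeholders for illustration.
		opts := HTTPClientOpts{TLSSkipVerify: true}
		httpClient, err := MakeHTTPClient(&opts, "example-admin-token")
		if err != nil {
			t.Fatal(err)
		}
		if httpClient == nil {
			t.Fatal("expected a non-nil *http.Client")
		}
		// Requests sent through httpClient now carry
		// "Kong-Admin-Token: example-admin-token".
	}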
-func MakeHTTPClient(opts *HTTPClientOpts) (*http.Client, error) { +func MakeHTTPClient(opts *HTTPClientOpts, kongAdminToken string) (*http.Client, error) { var tlsConfig tls.Config if opts.TLSSkipVerify { @@ -118,8 +124,21 @@ func MakeHTTPClient(opts *HTTPClientOpts) (*http.Client, error) { transport.TLSClientConfig = &tlsConfig return &http.Client{ Transport: &HeaderRoundTripper{ - headers: opts.Headers, + headers: prepareHeaders(opts.Headers, kongAdminToken), rt: transport, }, }, nil } + +func prepareHeaders(headers []string, kongAdminToken string) []string { + if kongAdminToken != "" { + contains := lo.ContainsBy(headers, func(header string) bool { + return strings.HasPrefix(header, headerNameAdminToken+":") + }) + + if !contains { + headers = append(headers, headerNameAdminToken+":"+kongAdminToken) + } + } + return headers +} diff --git a/internal/adminapi/kong_test.go b/internal/adminapi/kong_test.go index e38371f50e..7c907e6ce1 100644 --- a/internal/adminapi/kong_test.go +++ b/internal/adminapi/kong_test.go @@ -19,7 +19,6 @@ import ( "testing" "time" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -30,9 +29,7 @@ func TestMakeHTTPClientWithTLSOpts(t *testing.T) { var err error caPEM, certPEM, certPrivateKeyPEM, err = buildTLS(t) - if err != nil { - t.Errorf("Fail to build TLS certificates - %s", err.Error()) - } + require.NoError(t, err, "Fail to build TLS certificates") opts := HTTPClientOpts{ TLSSkipVerify: true, @@ -46,13 +43,19 @@ func TestMakeHTTPClientWithTLSOpts(t *testing.T) { }, } - httpclient, err := MakeHTTPClient(&opts) - require.NoError(t, err) - - assert.NotNil(t, httpclient) + t.Run("without kong admin token", func(t *testing.T) { + httpclient, err := MakeHTTPClient(&opts, "") + require.NoError(t, err) + require.NotNil(t, httpclient) + require.NoError(t, validate(t, httpclient, caPEM, certPEM, certPrivateKeyPEM, "")) + }) - err = validate(t, httpclient, caPEM, certPEM, certPrivateKeyPEM) - require.NoError(t, err) + t.Run("with kong admin token", func(t *testing.T) { + httpclient, err := MakeHTTPClient(&opts, "my-token") + require.NoError(t, err) + require.NotNil(t, httpclient) + require.NoError(t, validate(t, httpclient, caPEM, certPEM, certPrivateKeyPEM, "my-token")) + }) } func TestMakeHTTPClientWithTLSOptsAndFilePaths(t *testing.T) { @@ -62,9 +65,7 @@ func TestMakeHTTPClientWithTLSOptsAndFilePaths(t *testing.T) { var err error caPEM, certPEM, certPrivateKeyPEM, err = buildTLS(t) - if err != nil { - t.Errorf("Fail to build TLS certificates - %s", err.Error()) - } + require.NoError(t, err, "Fail to build TLS certificates") caFile, err := os.CreateTemp(os.TempDir(), "ca.crt") require.NoError(t, err) @@ -99,13 +100,19 @@ func TestMakeHTTPClientWithTLSOptsAndFilePaths(t *testing.T) { }, } - httpclient, err := MakeHTTPClient(&opts) - require.NoError(t, err) - - assert.NotNil(t, httpclient) + t.Run("without kong admin token", func(t *testing.T) { + httpclient, err := MakeHTTPClient(&opts, "") + require.NoError(t, err) + require.NotNil(t, httpclient) + require.NoError(t, validate(t, httpclient, caPEM, certPEM, certPrivateKeyPEM, "")) + }) - err = validate(t, httpclient, caPEM, certPEM, certPrivateKeyPEM) - require.NoError(t, err) + t.Run("with kong admin token", func(t *testing.T) { + httpclient, err := MakeHTTPClient(&opts, "my-token") + require.NoError(t, err) + require.NotNil(t, httpclient) + require.NoError(t, validate(t, httpclient, caPEM, certPEM, certPrivateKeyPEM, "my-token")) + }) } func buildTLS(t *testing.T) (caPEM *bytes.Buffer, 
certPEM *bytes.Buffer, certPrivateKeyPEM *bytes.Buffer, err error) { @@ -222,6 +229,7 @@ func validate(t *testing.T, caPEM *bytes.Buffer, certPEM *bytes.Buffer, certPrivateKeyPEM *bytes.Buffer, + kongAdminToken string, ) (err error) { serverCert, err := tls.X509KeyPair(certPEM.Bytes(), certPrivateKeyPEM.Bytes()) if err != nil { @@ -242,6 +250,23 @@ func validate(t *testing.T, successMessage := "connection successful" server := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if kongAdminToken != "" { + v, ok := r.Header[http.CanonicalHeaderKey(headerNameAdminToken)] + if !ok { + fmt.Fprintf(w, "%s header not found", headerNameAdminToken) + return + } + if len(v) != 1 { + fmt.Fprintf(w, "%s header expected to contain %s but found %v", + headerNameAdminToken, kongAdminToken, v) + return + } + if v[0] != kongAdminToken { + fmt.Fprintf(w, "%s header expected to contain %s but found %s", + headerNameAdminToken, kongAdminToken, v[0]) + return + } + } fmt.Fprintln(w, successMessage) })) server.TLS = serverTLSConf @@ -264,8 +289,7 @@ func validate(t *testing.T, body := strings.TrimSpace(string(data[:])) if body != successMessage { - t.Errorf("Invalid server response") - return err + return fmt.Errorf("invalid server response: %s", body) } return nil diff --git a/internal/controllers/configuration/kongadminapi_controller.go b/internal/controllers/configuration/kongadminapi_controller.go new file mode 100644 index 0000000000..da40d6949e --- /dev/null +++ b/internal/controllers/configuration/kongadminapi_controller.go @@ -0,0 +1,162 @@ +package configuration + +import ( + "context" + "time" + + "github.com/go-logr/logr" + "github.com/samber/lo" + discoveryv1 "k8s.io/api/discovery/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util" +) + +// KongAdminAPIServiceReconciler reconciles Kong Admin API Service Endpointslices +// and notifies the provided notifier about those. +type KongAdminAPIServiceReconciler struct { + client.Client + + // ServiceNN is the service NamespacedName to watch EndpointSlices for. + ServiceNN types.NamespacedName + Log logr.Logger + CacheSyncTimeout time.Duration + // EndpointsNotifier is used to notify about Admin API endpoints changes. + // We're going to call this only with endpoints when they change. + EndpointsNotifier EndpointsNotifier + + Cache CacheT +} + +type CacheT map[types.NamespacedName]sets.Set[string] + +type EndpointsNotifier interface { + Notify(addresses []string) +} + +// SetupWithManager sets up the controller with the Manager. 
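The reconciler is wired into the controller manager alongside the other controllers; a minimal sketch of that wiring, assuming a controller-runtime manager mgr and a notifier implementing EndpointsNotifier (in this patch that is the dataplane KongClient; the real construction lives in internal/manager/controllerdef.go below):

	reconciler := &KongAdminAPIServiceReconciler{
		Client:            mgr.GetClient(),
		ServiceNN:         types.NamespacedName{Namespace: "kong", Name: "kong-admin"},
		Log:               ctrl.Log.WithName("controllers").WithName("KongAdminAPIService"),
		CacheSyncTimeout:  time.Minute, // placeholder; the manager passes its configured timeout
		EndpointsNotifier: notifier,
	}
	if err := reconciler.SetupWithManager(mgr); err != nil {
		return err
	}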
+func (r *KongAdminAPIServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { + c, err := controller.New("KongAdminAPIEndpoints", mgr, controller.Options{ + Reconciler: r, + LogConstructor: func(_ *reconcile.Request) logr.Logger { + return r.Log + }, + CacheSyncTimeout: r.CacheSyncTimeout, + }) + if err != nil { + return err + } + + if r.Cache == nil { + r.Cache = make(CacheT) + } + + return c.Watch( + &source.Kind{Type: &discoveryv1.EndpointSlice{}}, + &handler.EnqueueRequestForObject{}, + predicate.NewPredicateFuncs(r.shouldReconcileEndpointSlice), + ) +} + +func (r *KongAdminAPIServiceReconciler) shouldReconcileEndpointSlice(obj client.Object) bool { + endpoints, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + return false + } + + if endpoints.Namespace != r.ServiceNN.Namespace { + return false + } + + if !lo.ContainsBy(endpoints.OwnerReferences, func(ref metav1.OwnerReference) bool { + return ref.Kind == "Service" && ref.Name == r.ServiceNN.Name + }) { + return false + } + + return true +} + +//+kubebuilder:rbac:groups="discovery.k8s.io",resources=endpointslices,verbs=get;list;watch + +// Reconcile processes the watched objects. +func (r *KongAdminAPIServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + var endpoints discoveryv1.EndpointSlice + if err := r.Get(ctx, req.NamespacedName, &endpoints); err != nil { + if apierrors.IsNotFound(err) { + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + r.Log.Info("reconciling EndpointSlice", "namespace", req.Namespace, "name", req.Name) + + nn := types.NamespacedName{ + Namespace: req.Namespace, + Name: req.Name, + } + + if !endpoints.DeletionTimestamp.IsZero() { + r.Log.V(util.DebugLevel).Info("EndpointSlice is being deleted", + "type", "EndpointSlice", "namespace", req.Namespace, "name", req.Name, + ) + + // If we have an entry for this EndpointSlice... + if _, ok := r.Cache[nn]; ok { + // ... remove it and notify about the change. + delete(r.Cache, nn) + r.notify() + } + + return ctrl.Result{}, nil + } + + cached, ok := r.Cache[nn] + if !ok { + // If we don't have an entry for this EndpointSlice then save it and notify + // about the change. + r.Cache[nn] = adminapi.AddressesFromEndpointSlice(endpoints) + r.notify() + return ctrl.Result{}, nil + } + + // We do have an entry for this EndpointSlice. + // Let's check if it's the same that we're already aware of... + addresses := adminapi.AddressesFromEndpointSlice(endpoints) + if cached.Equal(addresses) { + // No change, don't notify + return ctrl.Result{}, nil + } + + // ... it's not the same. Store it and notify. + r.Cache[nn] = addresses + r.notify() + + return ctrl.Result{}, nil +} + +func (r *KongAdminAPIServiceReconciler) notify() { + addresses := addressesFromAddressesMap(r.Cache) + + r.Log.V(util.DebugLevel). + Info("notifying about newly detected Admin API addresses", "addresses", addresses) + r.EndpointsNotifier.Notify(addresses) +} + +func addressesFromAddressesMap(cache CacheT) []string { + addresses := []string{} + for _, v := range cache { + addresses = append(addresses, v.UnsortedList()...) + } + return addresses +} diff --git a/internal/dataplane/client.go b/internal/dataplane/client.go index 41765bc343..3fba32a05a 100644 --- a/internal/dataplane/client.go +++ b/internal/dataplane/client.go @@ -29,4 +29,8 @@ type Client interface { // Update the data-plane by parsing the current configuring and applying // it to the backend API. 
Update(ctx context.Context) error + + // Shutdown shuts down the client, all the synchronization loops and all its + // internal data structures. + Shutdown(ctx context.Context) error } diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index 999c2f2fae..e921c58bdd 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -126,11 +126,26 @@ type KongClient struct { // SHAs is a slice is configuration hashes send in last batch send. SHAs []string + + // adminAPIClientFactory is a factory used for creating Admin API clients. + adminAPIClientFactory ClientFactory + + // adminAPIAddressNotifyChan is used for notifications that contain Admin API + // endpoints list that should be used for configuring the dataplane. + adminAPIAddressNotifyChan chan []string + + close chan struct{} + onceClose sync.Once +} + +type ClientFactory interface { + CreateAdminAPIClient(ctx context.Context, address string) (adminapi.Client, error) } // NewKongClient provides a new KongClient object after connecting to the // data-plane API and verifying integrity. func NewKongClient( + ctx context.Context, logger logrus.FieldLogger, timeout time.Duration, ingressClass string, @@ -140,23 +155,29 @@ func NewKongClient( kongConfig sendconfig.Kong, eventRecorder record.EventRecorder, dbMode string, + kongClientFactory ClientFactory, ) (*KongClient, error) { // build the client object cache := store.NewCacheStores() c := &KongClient{ - logger: logger, - ingressClass: ingressClass, - enableReverseSync: enableReverseSync, - skipCACertificates: skipCACertificates, - requestTimeout: timeout, - diagnostic: diagnostic, - prometheusMetrics: metrics.NewCtrlFuncMetrics(), - cache: &cache, - kongConfig: kongConfig, - eventRecorder: eventRecorder, - dbmode: dbMode, + logger: logger, + ingressClass: ingressClass, + enableReverseSync: enableReverseSync, + skipCACertificates: skipCACertificates, + requestTimeout: timeout, + diagnostic: diagnostic, + prometheusMetrics: metrics.NewCtrlFuncMetrics(), + cache: &cache, + kongConfig: kongConfig, + eventRecorder: eventRecorder, + dbmode: dbMode, + adminAPIClientFactory: kongClientFactory, + adminAPIAddressNotifyChan: make(chan []string), + close: make(chan struct{}), } + go c.adminAPIAddressNotifyLoop(ctx) + return c, nil } @@ -368,6 +389,14 @@ func (c *KongClient) DBMode() string { return c.dbmode } +// Shutdown shuts down the internal loops and synchronization workers. +func (c *KongClient) Shutdown(ctx context.Context) error { + c.onceClose.Do(func() { + close(c.close) + }) + return nil +} + // Update parses the Cache present in the client and converts current // Kubernetes state into Kong objects and state, and then ships the // resulting configuration to the data-plane (Kong Admin API). @@ -438,6 +467,7 @@ func (c *KongClient) Update(ctx context.Context) error { func (c *KongClient) sendOutToClients( ctx context.Context, s *kongstate.KongState, formatVersion string, config sendconfig.Config, ) ([]string, error) { + c.logger.Debugf("sending configuration to %d clients", len(c.kongConfig.Clients)) shas, err := iter.MapErr(c.kongConfig.Clients, func(client *adminapi.Client) (string, error) { return c.sendToClient(ctx, client, s, formatVersion, config) }, @@ -499,6 +529,97 @@ func (c *KongClient) sendToClient( return string(newConfigSHA), nil } +// adminAPIAddressNotifyLoop is an inner loop listening on notifyChan which are received via +// Notify() calls. 
Each time it receives on notifyChan it will take the provided
+// list of addresses and update the internally held list of clients such that:
+//   - the internal list of kong clients contains only the provided addresses
+//   - if a client for a provided address already exists it's not recreated again
+//     (hence no external calls are made to check the provided endpoint if there
+//     exists a client already using it)
+//   - clients that do not exist in the provided address list are removed if they
+//     are present in the current state
+//
+// This function will acquire the internal lock to prevent concurrent modification
+// of the internal clients list.
+func (c *KongClient) adminAPIAddressNotifyLoop(ctx context.Context) {
+	for {
+		select {
+		case <-c.close:
+			c.adminAPIAddressNotifyChan = nil
+			return
+
+		case addresses := <-c.adminAPIAddressNotifyChan:
+			// This call will only log errors e.g. during creation of new clients.
+			// If need be we might consider propagating those errors up the stack.
+			c.adjustKongClients(ctx, addresses)
+		}
+	}
+}
+
+// adjustKongClients adjusts the internally stored clients slice based on the provided
+// addresses slice. It consults the BaseRootURLs of already stored clients against each
+// of the addresses and creates only those clients that we don't have.
+func (c *KongClient) adjustKongClients(ctx context.Context, addresses []string) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	toAdd := lo.Filter(addresses, func(addr string, _ int) bool {
+		// If we already have a client with a provided address then great, no need
+		// to do anything.
+
+		// If we don't have a client with the new address then include it and add
+		// a client for this address.
+		return !lo.ContainsBy(c.kongConfig.Clients, func(cl adminapi.Client) bool {
+			return addr == cl.BaseRootURL()
+		})
+	})
+
+	var idxToRemove []int
+	for i, cl := range c.kongConfig.Clients {
+		// If the new address set contains a client that we already have then
+		// good, no need to do anything for it.
+		if lo.Contains(addresses, cl.BaseRootURL()) {
+			continue
+		}
+		// If the new address set does not contain an address that we already
+		// have then remove it.
+		idxToRemove = append(idxToRemove, i)
+	}
+
+	for i := len(idxToRemove) - 1; i >= 0; i-- {
+		idx := idxToRemove[i]
+		c.kongConfig.Clients = append(c.kongConfig.Clients[:idx], c.kongConfig.Clients[idx+1:]...)
+	}
+
+	for _, addr := range toAdd {
+		client, err := c.adminAPIClientFactory.CreateAdminAPIClient(ctx, addr)
+		if err != nil {
+			c.logger.WithError(err).Errorf("failed to create a client for %s", addr)
+			continue
+		}
+
+		c.kongConfig.Clients = append(c.kongConfig.Clients, client)
+	}
+}
+
+// Notify receives a list of addresses that KongClient should use from now on as
+// a list of Kong Admin API endpoints.
+func (c *KongClient) Notify(addresses []string) {
+	// Ensure here that we're not closed.
+	select {
+	case <-c.close:
+		return
+	default:
+	}
+
+	// And here also listen on c.close to allow the notification to be interrupted
+	// by Shutdown().
+ select { + case <-c.close: + case c.adminAPIAddressNotifyChan <- addresses: + } +} + // ----------------------------------------------------------------------------- // Dataplane Client - Kong - Private // ----------------------------------------------------------------------------- diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index 6ce52dd969..1a29b87fd7 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -1,8 +1,18 @@ package dataplane import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "sync/atomic" "testing" + "time" + "github.com/go-logr/logr" + "github.com/kong/go-kong/kong" + "github.com/samber/lo" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" @@ -10,7 +20,10 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "sigs.k8s.io/controller-runtime/pkg/client" + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/failures" + "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/sendconfig" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util" ) func TestUniqueObjects(t *testing.T) { @@ -105,3 +118,179 @@ var ( Kind: "Ingress", } ) + +// clientFactoryWithExpected implements ClientFactory interface and can be used +// in tests to assert which clients have been created and signal failure if: +// - client for an unexpected address gets created +// - client which already got created was tried to be created second time. +type clientFactoryWithExpected struct { + expected map[string]bool + t *testing.T +} + +func (cf clientFactoryWithExpected) CreateAdminAPIClient(ctx context.Context, address string) (adminapi.Client, error) { + num, ok := cf.expected[address] + if !ok { + cf.t.Errorf("got %s which was unexpected", address) + return adminapi.Client{}, fmt.Errorf("got %s which was unexpected", address) + } + if !num { + cf.t.Errorf("got %s more than once", address) + return adminapi.Client{}, fmt.Errorf("got %s more than once", address) + } + cf.expected[address] = false + + kongClient, err := kong.NewTestClient(lo.ToPtr(address), &http.Client{}) + if err != nil { + return adminapi.Client{}, err + } + + return adminapi.NewClient(kongClient), nil +} + +func TestClientAddressesNotifications(t *testing.T) { + var ( + ctx = context.Background() + logger = logrus.New() + expected = map[string]bool{} + serverCalls int32 + ) + + const numberOfServers = 2 + + createTestServer := func() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // This test server serves as kong Admin API checking that we only get + // as many calls as new clients requests. + // That said: when we have 1 client with url1 and we receive a notification + // with url1 and url2 we should only create the second client with + // url2 and leave the existing one (for url1) in place and reuse it. 
+ + atomic.AddInt32(&serverCalls, 1) + n := int(atomic.LoadInt32(&serverCalls)) + + if n > numberOfServers { + t.Errorf("clients should only call out to the server %d times, but we received %d requests", + numberOfServers, n, + ) + } + })) + } + + srv := createTestServer() + defer srv.Close() + expected[srv.URL] = true + + srv2 := createTestServer() + defer srv2.Close() + expected[srv2.URL] = true + + client, err := NewKongClient(ctx, logger, time.Second, "", false, true, util.ConfigDumpDiagnostic{}, + sendconfig.New(ctx, logr.Discard(), []adminapi.Client{}, + sendconfig.Config{ + InMemory: true, + Concurrency: 10, + }, + ), + nil, + "off", + clientFactoryWithExpected{ + expected: expected, + t: t, + }, + ) + require.NoError(t, err) + + requireClientsCountEventually := func(t *testing.T, c *KongClient, n int, args ...any) { + require.Eventually(t, func() bool { + c.lock.RLock() + defer c.lock.RUnlock() + return len(c.kongConfig.Clients) == n + }, time.Second, time.Millisecond, args..., + ) + } + + requireClientsCountEventually(t, client, 0, + "initially there should be 0 clients") + + client.Notify([]string{srv.URL}) + requireClientsCountEventually(t, client, 1, + "after notifying about a new address we should get 1 client eventually") + + client.Notify([]string{srv.URL}) + requireClientsCountEventually(t, client, 1, + "after notifying the same address there's no update in clients") + + client.Notify([]string{srv.URL, srv2.URL}) + requireClientsCountEventually(t, client, 2, + "after notifying new address set including the old already existing one we get both the old and the new") + + client.Notify([]string{srv.URL, srv2.URL}) + requireClientsCountEventually(t, client, 2, + "notifying again with the same set of URLs should not change the existing URLs") + + client.Notify([]string{srv.URL}) + requireClientsCountEventually(t, client, 1, + "notifying again with just one URL should decrease the set of URLs to just this one") + + client.Notify([]string{}) + requireClientsCountEventually(t, client, 0) + + // We could test here notifying about srv.URL and srv2.URL again but there's + // no data structure in the client that could notify us about a removal of + // a client which we could use here. 
+ + require.NoError(t, client.Shutdown(context.Background()), "closing shouldn't return an error") + require.NoError(t, client.Shutdown(context.Background()), "closing second time shouldn't return an error") + + require.NotPanics(t, func() { client.Notify([]string{}) }, "notifying about new clients after client has been shut down shouldn't panic") +} + +func TestClientAdjustInternalClientsAfterNotification(t *testing.T) { + var ( + ctx = context.Background() + logger = logrus.New() + ) + + cf := &clientFactoryWithExpected{ + t: t, + } + client, err := NewKongClient(ctx, logger, time.Second, "", false, true, util.ConfigDumpDiagnostic{}, + sendconfig.New(ctx, logr.Discard(), []adminapi.Client{}, + sendconfig.Config{ + InMemory: true, + Concurrency: 10, + }, + ), + nil, + "off", + cf, + ) + require.NoError(t, err) + require.NotNil(t, client) + + t.Run("2 new clients", func(t *testing.T) { + // Change expected addresses + cf.expected = map[string]bool{"localhost:8080": true, "localhost:8081": true} + + // there are 2 addresses contained in the notification of which 2 are new + // and client creator should be called exactly 2 times + client.adjustKongClients(ctx, []string{"localhost:8080", "localhost:8081"}) + }) + + t.Run("1 addresses, no new client", func(t *testing.T) { + // Change expected addresses + cf.expected = map[string]bool{"localhost:8080": true} + // there is address contained in the notification but a client for that + // address already exists, client creator should not be called + client.adjustKongClients(ctx, []string{"localhost:8080"}) + }) + + t.Run("2 addresses, 1 new client", func(t *testing.T) { + // Change expected addresses + cf.expected = map[string]bool{"localhost:8080": true, "localhost:8081": true} + // there are 2 addresses contained in the notification but only 1 is new + // hence the client creator should be called only once + client.adjustKongClients(ctx, []string{"localhost:8080", "localhost:8081"}) + }) +} diff --git a/internal/dataplane/synchronizer.go b/internal/dataplane/synchronizer.go index fca4013fbb..b3319fb2cd 100644 --- a/internal/dataplane/synchronizer.go +++ b/internal/dataplane/synchronizer.go @@ -174,6 +174,10 @@ func (p *Synchronizer) startUpdateServer(ctx context.Context) { } p.syncTicker.Stop() + if err := p.dataplaneClient.Shutdown(ctx); err != nil { + p.logger.Error(err, "failed to shut down the dataplane client") + } + p.lock.Lock() defer p.lock.Unlock() p.isServerRunning = false diff --git a/internal/dataplane/synchronizer_test.go b/internal/dataplane/synchronizer_test.go index 45eb5eb084..d05f344147 100644 --- a/internal/dataplane/synchronizer_test.go +++ b/internal/dataplane/synchronizer_test.go @@ -102,6 +102,10 @@ func (c *fakeDataplaneClient) Update(ctx context.Context) error { return nil } +func (c *fakeDataplaneClient) Shutdown(ctx context.Context) error { + return nil +} + func (c *fakeDataplaneClient) totalUpdates() int { c.lock.RLock() defer c.lock.RUnlock() diff --git a/internal/manager/config.go b/internal/manager/config.go index 092dd0d2af..0bf1b26559 100644 --- a/internal/manager/config.go +++ b/internal/manager/config.go @@ -49,7 +49,8 @@ type Config struct { APIServerBurst int MetricsAddr string ProbeAddr string - KongAdminURL []string + KongAdminURLs []string + KongAdminSvc types.NamespacedName ProxySyncSeconds float32 ProxyTimeoutSeconds float32 @@ -142,15 +143,19 @@ func (c *Config) FlagSet() *pflag.FlagSet { flagSet.StringVar(&c.KongAdminAPIConfig.TLSClient.Cert, "kong-admin-tls-client-cert", "", "mTLS client certificate 
for authentication.") flagSet.StringVar(&c.KongAdminAPIConfig.TLSClient.Key, "kong-admin-tls-client-key", "", "mTLS client key for authentication.") + // Kong Admin API configuration + flagSet.StringSliceVar(&c.KongAdminURLs, "kong-admin-url", []string{"http://localhost:8001"}, + `Kong Admin URL(s) to connect to in the format "protocol://address:port". `+ + `More than 1 URL can be provided, in such case the flag should be used multiple times or a corresponding env variable should use comma delimited addresses.`) + flagSet.Var(NewValidatedValue(&c.KongAdminSvc, namespacedNameFromFlagValue), "kong-admin-svc", + `Kong Admin API Service namespaced name in "namespace/name" format, to use for Kong Gateway service discovery.`) + // Kong Proxy and Proxy Cache configurations flagSet.StringVar(&c.APIServerHost, "apiserver-host", "", `The Kubernetes API server URL. If not set, the controller will use cluster config discovery.`) flagSet.IntVar(&c.APIServerQPS, "apiserver-qps", 100, "The Kubernetes API RateLimiter maximum queries per second") flagSet.IntVar(&c.APIServerBurst, "apiserver-burst", 300, "The Kubernetes API RateLimiter maximum burst queries per second") flagSet.StringVar(&c.MetricsAddr, "metrics-bind-address", fmt.Sprintf(":%v", MetricsPort), "The address the metric endpoint binds to.") flagSet.StringVar(&c.ProbeAddr, "health-probe-bind-address", fmt.Sprintf(":%v", HealthzPort), "The address the probe endpoint binds to.") - flagSet.StringSliceVar(&c.KongAdminURL, "kong-admin-url", []string{"http://localhost:8001"}, - `Kong Admin URL(s) to connect to in the format "protocol://address:port". `+ - `More than 1 URL can be provided, in such case the flag should be used multiple times or a corresponding env variable should use comma delimited addresses.`) flagSet.Float32Var(&c.ProxySyncSeconds, "proxy-sync-seconds", dataplane.DefaultSyncSeconds, "Define the rate (in seconds) in which configuration updates will be applied to the Kong Admin API.", ) diff --git a/internal/manager/config_validation.go b/internal/manager/config_validation.go index 05ac9d5c9d..12acd2b1ae 100644 --- a/internal/manager/config_validation.go +++ b/internal/manager/config_validation.go @@ -37,6 +37,12 @@ func gatewayAPIControllerNameFromFlagValue(flagValue string) (string, error) { // Validate validates the config. It should be used to validate the config variables' interdependencies. // When a single variable is to be validated, *FromFlagValue function should be implemented. 
func (c *Config) Validate() error { + if c.flagSet != nil { + if c.flagSet.Changed("kong-admin-svc") && c.flagSet.Changed("kong-admin-url") { + return fmt.Errorf("can't set both --kong-admin-svc and --kong-admin-url") + } + } + if err := c.validateKonnect(); err != nil { return fmt.Errorf("invalid konnect configuration: %w", err) } diff --git a/internal/manager/controllerdef.go b/internal/manager/controllerdef.go index 92d08ae2ee..06442e9898 100644 --- a/internal/manager/controllerdef.go +++ b/internal/manager/controllerdef.go @@ -63,6 +63,7 @@ func setupControllers( kubernetesStatusQueue *status.Queue, c *Config, featureGates map[string]bool, + kongAdminAPIEndpointsNotifier configuration.EndpointsNotifier, ) ([]ControllerDef, error) { restMapper := mgr.GetClient().RESTMapper() @@ -84,6 +85,19 @@ func setupControllers( referenceIndexers := ctrlref.NewCacheIndexers() controllers := []ControllerDef{ + // --------------------------------------------------------------------------- + // Kong Gateway Admin API Service discovery + // --------------------------------------------------------------------------- + { + Enabled: c.KongAdminSvc.Name != "", + Controller: &configuration.KongAdminAPIServiceReconciler{ + Client: mgr.GetClient(), + ServiceNN: c.KongAdminSvc, + Log: ctrl.Log.WithName("controllers").WithName("KongAdminAPIService"), + CacheSyncTimeout: c.CacheSyncTimeout, + EndpointsNotifier: kongAdminAPIEndpointsNotifier, + }, + }, // --------------------------------------------------------------------------- // Core API Controllers // --------------------------------------------------------------------------- diff --git a/internal/manager/run.go b/internal/manager/run.go index a6e776a90b..8c2c38f902 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -19,6 +19,7 @@ import ( gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/gateway" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/sendconfig" @@ -58,19 +59,19 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d return fmt.Errorf("get kubeconfig from file %q: %w", c.KubeconfigPath, err) } setupLog.Info("getting the kong admin api client configuration") - if c.KongAdminToken != "" { - c.KongAdminAPIConfig.Headers = append(c.KongAdminAPIConfig.Headers, "kong-admin-token:"+c.KongAdminToken) - } - kongClients, err := getKongClients(ctx, c) + kongClients, err := c.getKongClients(ctx) if err != nil { return fmt.Errorf("unable to build kong api client(s): %w", err) } + // ------------------------------------------------------------------------- + // Get Kong configuration root(s) to validate them and extract Kong's version. 
kongRoots, err := kongconfig.GetRoots(ctx, setupLog, c.KongAdminInitializationRetries, c.KongAdminInitializationRetryDelay, kongClients) if err != nil { return fmt.Errorf("could not retrieve Kong admin root(s): %w", err) } + dbMode, v, err := kongconfig.ValidateRoots(kongRoots, c.SkipCACertificates) if err != nil { return fmt.Errorf("could not validate Kong admin root(s) configuration: %w", err) @@ -91,6 +92,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d if err != nil { return fmt.Errorf("unable to setup controller options: %w", err) } + mgr, err := ctrl.NewManager(kubeconfig, controllerOpts) if err != nil { return fmt.Errorf("unable to start controller manager: %w", err) @@ -104,6 +106,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d setupLog.Info("Initializing Dataplane Client") eventRecorder := mgr.GetEventRecorderFor(KongClientEventRecorderComponentName) dataplaneClient, err := dataplane.NewKongClient( + ctx, deprecatedLogger, time.Duration(c.ProxyTimeoutSeconds*float32(time.Second)), c.IngressClassName, @@ -113,6 +116,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d kongConfig, eventRecorder, dbMode, + adminapi.NewClientFactoryForWorkspace(c.KongWorkspace, c.KongAdminAPIConfig, c.KongAdminToken), ) if err != nil { return fmt.Errorf("failed to initialize kong data-plane client: %w", err) @@ -146,7 +150,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d setupLog.Info("Starting Enabled Controllers") controllers, err := setupControllers(mgr, dataplaneClient, - dataplaneAddressFinder, udpDataplaneAddressFinder, kubernetesStatusQueue, c, featureGates) + dataplaneAddressFinder, udpDataplaneAddressFinder, kubernetesStatusQueue, c, featureGates, dataplaneClient) if err != nil { return fmt.Errorf("unable to setup controller as expected %w", err) } diff --git a/internal/manager/setup.go b/internal/manager/setup.go index d3b274064d..e824e46c9d 100644 --- a/internal/manager/setup.go +++ b/internal/manager/setup.go @@ -7,6 +7,7 @@ import ( "io" "time" + "github.com/avast/retry-go/v4" "github.com/bombsimon/logrusr/v2" "github.com/go-logr/logr" "github.com/kong/deck/cprint" @@ -159,7 +160,7 @@ func setupAdmissionServer( return nil } - kongclients, err := getKongClients(ctx, managerConfig) + kongclients, err := managerConfig.getKongClients(ctx) if err != nil { return err } @@ -259,21 +260,67 @@ func generateAddressFinderGetter(mgrc client.Client, publishServiceNn types.Name } } -// getKongClients returns the kong clients. -func getKongClients(ctx context.Context, cfg *Config) ([]adminapi.Client, error) { - httpclient, err := adminapi.MakeHTTPClient(&cfg.KongAdminAPIConfig) +// getKongClients returns the kong clients given the config. +// When a list of URLs is provided via --kong-admin-url then those are used +// to create the list of clients. +// When a headless service name is provided via --kong-admin-svc then that is used +// to obtain a list of endpoints via EndpointSlice lookup in kubernetes API. 
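For reference, the headless Service that this discovery path expects would look roughly as follows; this is an illustrative manifest (not shipped by this change), with the name, namespace, named admin port, and selector mirroring values used elsewhere in this patch:

	apiVersion: v1
	kind: Service
	metadata:
	  name: kong-admin
	  namespace: kong
	spec:
	  clusterIP: None       # headless, so EndpointSlices carry the Pod IPs directly
	  selector:
	    app: proxy-kong
	  ports:
	  - name: admin         # the controller resolves endpoints of this named port
	    port: 8444
	    protocol: TCP
	    targetPort: 8444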
+func (c *Config) getKongClients(ctx context.Context) ([]adminapi.Client, error) { + httpclient, err := adminapi.MakeHTTPClient(&c.KongAdminAPIConfig, c.KongAdminToken) if err != nil { return nil, err } - clients := make([]adminapi.Client, 0, len(cfg.KongAdminURL)) - for _, url := range cfg.KongAdminURL { - client, err := adminapi.NewKongClientForWorkspace(ctx, url, cfg.KongWorkspace, httpclient) + var addresses []string + + // If kong-admin-svc flag has been specified then use it to get the list + // of Kong Admin API endpoints. + if c.KongAdminSvc.Name != "" { + kubeClient, err := c.GetKubeClient() if err != nil { return nil, err } - clients = append(clients, client) + + // Retry this as we may encounter an error of getting 0 addresses, + // which can mean that Kong instances meant to be configured by this controller + // are not yet ready. + // If we end up in a situation where none of them are ready then bail + // because we have more code that relies on the configuration of Kong + // instance and without an address and there's no way to initialize the + // configuration validation and sending code. + err = retry.Do(func() error { + s, err := adminapi.GetURLsForService(ctx, kubeClient, c.KongAdminSvc) + if err != nil { + return err + } + if s.Len() == 0 { + return fmt.Errorf("no endpoints for kong admin service: %q", c.KongAdminSvc) + } + addresses = s.UnsortedList() + return nil + }, + retry.Attempts(60), + retry.DelayType(retry.FixedDelay), + retry.Delay(time.Second), + retry.OnRetry(func(_ uint, err error) { + logrus.New().WithError(err).Error("failed to create kong client(s)") + }), + ) + if err != nil { + return nil, err + } + } else { + // Otherwise fallback to the list of kong admin URLs. + addresses = c.KongAdminURLs } + clients := make([]adminapi.Client, 0, len(addresses)) + for _, address := range addresses { + client, err := adminapi.NewKongClientForWorkspace(ctx, address, c.KongWorkspace, httpclient) + if err != nil { + return nil, err + } + clients = append(clients, client) + } return clients, nil } diff --git a/internal/manager/utils/kongconfig/root.go b/internal/manager/utils/kongconfig/root.go index 30a6b7977e..7590677406 100644 --- a/internal/manager/utils/kongconfig/root.go +++ b/internal/manager/utils/kongconfig/root.go @@ -27,9 +27,9 @@ func ValidateRoots(roots []Root, skipCACerts bool) (string, kong.Version, error) } uniqs := lo.UniqBy(roots, getRootKeyFunc(skipCACerts)) - if len(uniqs) > 1 { + if len(uniqs) != 1 { return "", kong.Version{}, - fmt.Errorf("there should only be one dbmode:version combination across configured kong instances while there are: %v", uniqs) + fmt.Errorf("there should only be one dbmode:version combination across configured kong instances while there are (%d): %v", len(uniqs), uniqs) } dbMode, err := DBModeFromRoot(uniqs[0]) diff --git a/test/e2e/all_in_one_test.go b/test/e2e/all_in_one_test.go index d11e1a7bf2..8fc09ade00 100644 --- a/test/e2e/all_in_one_test.go +++ b/test/e2e/all_in_one_test.go @@ -6,12 +6,14 @@ package e2e import ( "context" "fmt" + "io" "net/http" "os" "testing" "time" "github.com/kong/kubernetes-testing-framework/pkg/clusters/addons/kong" + "github.com/samber/lo" "github.com/stretchr/testify/require" autoscalingv1 "k8s.io/api/autoscaling/v1" corev1 "k8s.io/api/core/v1" @@ -377,3 +379,51 @@ func TestDeployAllInOneEnterprisePostgres(t *testing.T) { verifyEnterprise(ctx, t, env, adminPassword) verifyEnterpriseWithPostgres(ctx, t, env, adminPassword) } + +func TestDeployAllInOneDBLESSMultiGW(t *testing.T) { + t.Parallel() 
+ + const ( + manifestFileName = "all-in-one-dbless-multi-gw.yaml" + manifestFilePath = "../../deploy/single/" + manifestFileName + ) + + t.Logf("configuring %s manifest test", manifestFileName) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + t.Log("building test cluster and environment") + builder, err := getEnvironmentBuilder(ctx) + require.NoError(t, err) + env, err := builder.Build(ctx) + require.NoError(t, err) + + defer func() { + helpers.TeardownCluster(ctx, t, env.Cluster()) + }() + + t.Log("deploying kong components") + f, err := os.Open(manifestFilePath) + require.NoError(t, err) + defer f.Close() + var manifest io.Reader = f + + manifest, err = patchControllerImageHelper(manifest, manifestFilePath) + require.NoError(t, err) + deployment := deployKong(ctx, t, env, manifest) + + t.Log("running ingress tests to verify all-in-one deployed ingress controller and proxy are functional") + deployIngress(ctx, t, env) + verifyIngress(ctx, t, env) + + gatewayDeployment, err := env.Cluster().Client().AppsV1().Deployments(deployment.Namespace).Get(ctx, "proxy-kong", metav1.GetOptions{}) + require.NoError(t, err) + gatewayDeployment.Spec.Replicas = lo.ToPtr(int32(3)) + _, err = env.Cluster().Client().AppsV1().Deployments(deployment.Namespace).Update(ctx, gatewayDeployment, metav1.UpdateOptions{}) + require.NoError(t, err) + + t.Log("confirming that routes are stil served after scaling out") + verifyIngress(ctx, t, env) + verifyIngress(ctx, t, env) + verifyIngress(ctx, t, env) +} diff --git a/test/e2e/helpers_test.go b/test/e2e/helpers_test.go index f176ea6257..b2ae91bd72 100644 --- a/test/e2e/helpers_test.go +++ b/test/e2e/helpers_test.go @@ -26,6 +26,7 @@ import ( "github.com/kong/kubernetes-testing-framework/pkg/clusters/types/kind" "github.com/kong/kubernetes-testing-framework/pkg/environments" "github.com/kong/kubernetes-testing-framework/pkg/utils/kubernetes/generators" + "github.com/samber/lo" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" @@ -166,6 +167,8 @@ func createGKEBuilder() (*environments.Builder, error) { } func deployKong(ctx context.Context, t *testing.T, env environments.Environment, manifest io.Reader, additionalSecrets ...*corev1.Secret) *appsv1.Deployment { + t.Helper() + t.Log("waiting for testing environment to be ready") require.NoError(t, <-env.WaitForReady(ctx)) @@ -199,11 +202,23 @@ func deployKong(ctx context.Context, t *testing.T, env environments.Environment, return false } return deployment.Status.ReadyReplicas == *deployment.Spec.Replicas - }, kongComponentWait, time.Second) + }, kongComponentWait, time.Second, + func() string { + if deployment == nil { + return "" + } + return fmt.Sprintf( + "deployment %s: ready replicas %d, spec replicas: %d", + deployment.Name, deployment.Status.ReadyReplicas, *deployment.Spec.Replicas, + ) + }(), + ) return deployment } func deployIngress(ctx context.Context, t *testing.T, env environments.Environment) { + t.Helper() + c, err := clientset.NewForConfig(env.Cluster().Config()) assert.NoError(t, err) t.Log("deploying an HTTP service to test the ingress controller and proxy") @@ -217,17 +232,17 @@ func deployIngress(ctx context.Context, t *testing.T, env environments.Environme _, err = env.Cluster().Client().CoreV1().Services(corev1.NamespaceDefault).Create(ctx, service, metav1.CreateOptions{}) require.NoError(t, err) - getString := "GET" + kongIngressName := uuid.NewString() king := &kongv1.KongIngress{ ObjectMeta: metav1.ObjectMeta{ - 
Name: "testki", + Name: kongIngressName, Namespace: corev1.NamespaceDefault, Annotations: map[string]string{ annotations.IngressClassKey: ingressClass, }, }, Route: &kongv1.KongIngressRoute{ - Methods: []*string{&getString}, + Methods: []*string{lo.ToPtr("GET")}, }, } _, err = c.ConfigurationV1().KongIngresses(corev1.NamespaceDefault).Create(ctx, king, metav1.CreateOptions{}) @@ -238,12 +253,14 @@ func deployIngress(ctx context.Context, t *testing.T, env environments.Environme ingress := generators.NewIngressForServiceWithClusterVersion(kubernetesVersion, "/httpbin", map[string]string{ annotations.IngressClassKey: ingressClass, "konghq.com/strip-path": "true", - "konghq.com/override": "testki", + "konghq.com/override": kongIngressName, }, service) require.NoError(t, clusters.DeployIngress(ctx, env.Cluster(), corev1.NamespaceDefault, ingress)) } func verifyIngress(ctx context.Context, t *testing.T, env environments.Environment) { + t.Helper() + t.Log("finding the kong proxy service ip") proxyIP := getKongProxyIP(ctx, t, env) @@ -275,13 +292,15 @@ func verifyIngress(ctx context.Context, t *testing.T, env environments.Environme } defer resp.Body.Close() return resp.StatusCode == http.StatusNotFound - }, ingressWait, time.Second) + }, ingressWait, 100*time.Millisecond) } // verifyEnterprise performs some basic tests of the Kong Admin API in the provided // environment to ensure that the Admin API that responds is in fact the enterprise // version of Kong. func verifyEnterprise(ctx context.Context, t *testing.T, env environments.Environment, adminPassword string) { + t.Helper() + t.Log("finding the ip address for the admin API") service, err := env.Cluster().Client().CoreV1().Services(namespace).Get(ctx, adminServiceName, metav1.GetOptions{}) require.NoError(t, err) @@ -333,6 +352,8 @@ func verifyEnterprise(ctx context.Context, t *testing.T, env environments.Enviro } func verifyEnterpriseWithPostgres(ctx context.Context, t *testing.T, env environments.Environment, adminPassword string) { + t.Helper() + t.Log("finding the ip address for the admin API") service, err := env.Cluster().Client().CoreV1().Services(namespace).Get(ctx, adminServiceName, metav1.GetOptions{}) require.NoError(t, err) @@ -370,6 +391,8 @@ func verifyPostgres(ctx context.Context, t *testing.T, env environments.Environm // killKong kills the Kong container in a given Pod and returns when it has restarted. func killKong(ctx context.Context, t *testing.T, env environments.Environment, pod *corev1.Pod) { + t.Helper() + var orig, after int32 for _, status := range pod.Status.ContainerStatuses { if status.Name == "proxy" { @@ -418,6 +441,8 @@ func buildImageLoadAddons(images ...string) []clusters.Addon { // `kong-enterprise-edition-docker` for kong enterprise image // from env TEST_KONG_PULL_USERNAME and TEST_KONG_PULL_PASSWORD. func createKongImagePullSecret(ctx context.Context, t *testing.T, env environments.Environment) { + t.Helper() + if kongImagePullUsername == "" || kongImagePullPassword == "" { return } @@ -464,6 +489,8 @@ func getEnvValueInContainer(container *corev1.Container, name string) string { // getTemporaryKubeconfig dumps an environment's kubeconfig to a temporary file. 
func getTemporaryKubeconfig(t *testing.T, env environments.Environment) string { + t.Helper() + t.Log("creating a tempfile for kubeconfig") kubeconfig, err := generators.NewKubeConfigForRestConfig(env.Name(), env.Cluster().Config()) require.NoError(t, err) @@ -483,6 +510,8 @@ func getTemporaryKubeconfig(t *testing.T, env environments.Environment) string { } func runOnlyOnKindClusters(t *testing.T) { + t.Helper() + existingClusterIsKind := strings.Split(existingCluster, ":")[0] == string(kind.KindClusterType) clusterProviderIsKind := clusterProvider == "" || clusterProvider == string(kind.KindClusterType) diff --git a/test/e2e/utils_test.go b/test/e2e/utils_test.go index 5a005a0ff7..a1d3d8e8dc 100644 --- a/test/e2e/utils_test.go +++ b/test/e2e/utils_test.go @@ -108,32 +108,19 @@ func exposeAdminAPI(ctx context.Context, t *testing.T, env environments.Environm // returns the modified manifest path. If there is any issue patching the manifest, it will log the issue and return // the original provided path. func getTestManifest(t *testing.T, baseManifestPath string) (io.Reader, error) { - var manifestsReader io.Reader - manifestsReader, err := os.Open(baseManifestPath) + var ( + manifestsReader io.Reader + err error + ) + manifestsReader, err = os.Open(baseManifestPath) if err != nil { return nil, err } - var imageFullname string - if imageLoad != "" { - imageFullname = imageLoad - } else { - imageFullname = imageOverride - } - - if imageFullname != "" { - split := strings.Split(imageFullname, ":") - if len(split) < 2 { - t.Logf("could not parse override image '%v', using default manifest %v", imageFullname, baseManifestPath) - return manifestsReader, nil - } - repo := strings.Join(split[0:len(split)-1], ":") - tag := split[len(split)-1] - manifestsReader, err = patchControllerImage(manifestsReader, repo, tag) - if err != nil { - t.Logf("failed patching override image '%v' (%v), using default manifest %v", imageFullname, err, baseManifestPath) - return manifestsReader, nil - } + manifestsReader, err = patchControllerImageHelper(manifestsReader, baseManifestPath) + if err != nil { + t.Logf("failed patching controller image (%v), using default manifest %v", err, baseManifestPath) + return manifestsReader, nil } var kongImageFullname string @@ -180,6 +167,30 @@ func getTestManifest(t *testing.T, baseManifestPath string) (io.Reader, error) { return manifestsReader, nil } +func patchControllerImageHelper(manifestReader io.Reader, baseManifestPath string) (io.Reader, error) { + var imageFullname string + if imageLoad != "" { + imageFullname = imageLoad + } else { + imageFullname = imageOverride + } + + if imageFullname != "" { + split := strings.Split(imageFullname, ":") + if len(split) < 2 { + return manifestReader, fmt.Errorf("could not parse override image '%v', using default manifest %v", imageFullname, baseManifestPath) + } + repo := strings.Join(split[0:len(split)-1], ":") + tag := split[len(split)-1] + var err error + manifestReader, err = patchControllerImage(manifestReader, repo, tag) + if err != nil { + return manifestReader, fmt.Errorf("failed patching override image '%v' (%w), using default manifest %v", imageFullname, err, baseManifestPath) + } + } + return manifestReader, nil +} + func getCurrentGitTag(path string) (semver.Version, error) { cmd := exec.Command("git", "describe", "--tags") cmd.Dir = path @@ -260,6 +271,8 @@ func getKongProxyLoadBalancerIP(t *testing.T, refreshSvc func() *corev1.Service) } func getKongProxyNodePortIP(ctx context.Context, t *testing.T, env 
environments.Environment, svc *corev1.Service) string { + t.Helper() + var port corev1.ServicePort for _, sport := range svc.Spec.Ports { if sport.Name == "kong-proxy" || sport.Name == "proxy" { diff --git a/test/internal/helpers/teardown.go b/test/internal/helpers/teardown.go index abd23d559c..55b4b72a8a 100644 --- a/test/internal/helpers/teardown.go +++ b/test/internal/helpers/teardown.go @@ -3,9 +3,12 @@ package helpers import ( "context" "testing" + "time" "github.com/kong/kubernetes-testing-framework/pkg/clusters" "github.com/stretchr/testify/assert" + + "github.com/kong/kubernetes-ingress-controller/v2/test/internal/testenv" ) // TeardownCluster dumps the diagnostics from the test cluster if the test failed @@ -13,8 +16,19 @@ import ( func TeardownCluster(ctx context.Context, t *testing.T, cluster clusters.Cluster) { t.Helper() + const ( + environmentCleanupTimeout = 3 * time.Minute + ) + DumpDiagnosticsIfFailed(ctx, t, cluster) - assert.NoError(t, cluster.Cleanup(ctx)) + + if testenv.KeepTestCluster() == "" && testenv.ExistingClusterName() == "" { + ctx, cancel := context.WithTimeout(ctx, environmentCleanupTimeout) + defer cancel() + t.Logf("INFO: cluster %s is being deleted\n", cluster.Name()) + assert.NoError(t, cluster.Cleanup(ctx)) + return + } } // DumpDiagnosticsIfFailed dumps the diagnostics if the test failed. From 0cd5b4d8fdb71f716751af1b91df416dc37a9fff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Thu, 9 Feb 2023 12:37:30 +0100 Subject: [PATCH 2/4] Update test/e2e/utils_test.go MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Grzegorz BurzyƄski --- test/e2e/utils_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/e2e/utils_test.go b/test/e2e/utils_test.go index a1d3d8e8dc..e6399140bd 100644 --- a/test/e2e/utils_test.go +++ b/test/e2e/utils_test.go @@ -167,7 +167,8 @@ func getTestManifest(t *testing.T, baseManifestPath string) (io.Reader, error) { return manifestsReader, nil } -func patchControllerImageHelper(manifestReader io.Reader, baseManifestPath string) (io.Reader, error) { +// patchControllerImageFromEnv will optionally replace a default controller image in manifests with one of `imageLoad` or `imageOverride` if any is set. 
+func patchControllerImageFromEnv(manifestReader io.Reader, baseManifestPath string) (io.Reader, error) { var imageFullname string if imageLoad != "" { imageFullname = imageLoad From f05326c50d324e53282fcbeb50244a1b14bf2223 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Thu, 9 Feb 2023 13:31:30 +0100 Subject: [PATCH 3/4] tests: check that all gateway pods got the config --- test/e2e/all_in_one_test.go | 70 ++++++++++++++++++++++++++++++++++--- test/e2e/utils_test.go | 2 +- 2 files changed, 66 insertions(+), 6 deletions(-) diff --git a/test/e2e/all_in_one_test.go b/test/e2e/all_in_one_test.go index 8fc09ade00..98505aeacb 100644 --- a/test/e2e/all_in_one_test.go +++ b/test/e2e/all_in_one_test.go @@ -5,6 +5,7 @@ package e2e import ( "context" + "crypto/tls" "fmt" "io" "net/http" @@ -12,6 +13,8 @@ import ( "testing" "time" + "github.com/kong/deck/dump" + gokong "github.com/kong/go-kong/kong" "github.com/kong/kubernetes-testing-framework/pkg/clusters/addons/kong" "github.com/samber/lo" "github.com/stretchr/testify/require" @@ -408,7 +411,7 @@ func TestDeployAllInOneDBLESSMultiGW(t *testing.T) { defer f.Close() var manifest io.Reader = f - manifest, err = patchControllerImageHelper(manifest, manifestFilePath) + manifest, err = patchControllerImageFromEnv(manifest, manifestFilePath) require.NoError(t, err) deployment := deployKong(ctx, t, env, manifest) @@ -422,8 +425,65 @@ func TestDeployAllInOneDBLESSMultiGW(t *testing.T) { _, err = env.Cluster().Client().AppsV1().Deployments(deployment.Namespace).Update(ctx, gatewayDeployment, metav1.UpdateOptions{}) require.NoError(t, err) - t.Log("confirming that routes are stil served after scaling out") - verifyIngress(ctx, t, env) - verifyIngress(ctx, t, env) - verifyIngress(ctx, t, env) + var podList *corev1.PodList + + t.Log("waiting all the dataplane instances to be ready") + require.Eventually(t, func() bool { + forDeployment := metav1.ListOptions{ + LabelSelector: "app=proxy-kong", + } + podList, err = env.Cluster().Client().CoreV1().Pods(deployment.Namespace).List(ctx, forDeployment) + require.NoError(t, err) + return len(podList.Items) == 3 + }, time.Minute, time.Second) + + t.Log("confirming that all dataplanes got the config") + for _, pod := range podList.Items { + client := &http.Client{ + Timeout: time.Second * 30, + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{ + InsecureSkipVerify: true, //nolint:gosec + }, + }, + } + + forwardCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + localPort := startPortForwarder(forwardCtx, t, env, deployment.Namespace, pod.Name, "8444") + address := fmt.Sprintf("https://localhost:%d", localPort) + + kongClient, err := gokong.NewClient(lo.ToPtr(address), client) + require.NoError(t, err) + + require.Eventually(t, func() bool { + d, err := dump.Get(ctx, kongClient, dump.Config{}) + if err != nil { + return false + } + if len(d.Services) != 1 { + return false + } + if len(d.Routes) != 1 { + return false + } + + if d.Services[0].ID == nil || + d.Routes[0].Service.ID == nil || + *d.Services[0].ID != *d.Routes[0].Service.ID { + return false + } + + if len(d.Targets) != 1 { + return false + } + + if len(d.Upstreams) != 1 { + return false + } + + return true + }, time.Minute, time.Second, "pod: %s/%s didn't get the config", pod.Namespace, pod.Name) + t.Logf("proxy pod %s/%s: got the config", pod.Namespace, pod.Name) + } } diff --git a/test/e2e/utils_test.go b/test/e2e/utils_test.go index e6399140bd..ddf232e2a1 100644 --- a/test/e2e/utils_test.go +++ 
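The per-pod check in the new multi-gateway test amounts to polling each gateway's Admin API with go-kong and deck's dump package until the expected objects appear. A simplified, self-contained sketch of that polling step, assuming a directly reachable Admin API address (for example a local port-forward to a proxy pod's 8444 port) instead of the test's port-forwarding helpers:

package main

import (
	"context"
	"crypto/tls"
	"fmt"
	"net/http"
	"time"

	"github.com/kong/deck/dump"
	gokong "github.com/kong/go-kong/kong"
	"github.com/samber/lo"
)

// adminHasExpectedConfig polls one Admin API address until it reports exactly one
// service and one route bound to that service, the same condition the test checks per pod.
func adminHasExpectedConfig(ctx context.Context, adminURL string) error {
	httpClient := &http.Client{
		Timeout: 30 * time.Second,
		Transport: &http.Transport{
			// The gateways in the test expose the Admin API over a self-signed certificate.
			TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, //nolint:gosec
		},
	}
	kongClient, err := gokong.NewClient(lo.ToPtr(adminURL), httpClient)
	if err != nil {
		return err
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("config never propagated to %s: %w", adminURL, ctx.Err())
		case <-ticker.C:
			state, err := dump.Get(ctx, kongClient, dump.Config{})
			if err != nil {
				continue // the gateway may not be ready yet
			}
			if len(state.Services) == 1 && len(state.Routes) == 1 &&
				state.Services[0].ID != nil &&
				state.Routes[0].Service != nil && state.Routes[0].Service.ID != nil &&
				*state.Routes[0].Service.ID == *state.Services[0].ID {
				return nil
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	// Assumed address for illustration only, e.g. a local port-forward to a proxy pod.
	fmt.Println(adminHasExpectedConfig(ctx, "https://localhost:8444"))
}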
b/test/e2e/utils_test.go @@ -117,7 +117,7 @@ func getTestManifest(t *testing.T, baseManifestPath string) (io.Reader, error) { return nil, err } - manifestsReader, err = patchControllerImageHelper(manifestsReader, baseManifestPath) + manifestsReader, err = patchControllerImageFromEnv(manifestsReader, baseManifestPath) if err != nil { t.Logf("failed patching controller image (%v), using default manifest %v", err, baseManifestPath) return manifestsReader, nil From 9a400cf864b281db5efdc36d4471d86c925d4eb2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Thu, 9 Feb 2023 14:56:28 +0100 Subject: [PATCH 4/4] chore: add more concrete assertions in dataplane client tests --- internal/dataplane/kong_client_test.go | 45 +++++++++++++++++--------- 1 file changed, 30 insertions(+), 15 deletions(-) diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index 1a29b87fd7..4c32e8959e 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -14,6 +14,7 @@ import ( "github.com/samber/lo" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" + "golang.org/x/exp/slices" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -129,12 +130,12 @@ type clientFactoryWithExpected struct { } func (cf clientFactoryWithExpected) CreateAdminAPIClient(ctx context.Context, address string) (adminapi.Client, error) { - num, ok := cf.expected[address] + stillExpecting, ok := cf.expected[address] if !ok { cf.t.Errorf("got %s which was unexpected", address) return adminapi.Client{}, fmt.Errorf("got %s which was unexpected", address) } - if !num { + if !stillExpecting { cf.t.Errorf("got %s more than once", address) return adminapi.Client{}, fmt.Errorf("got %s more than once", address) } @@ -148,6 +149,14 @@ func (cf clientFactoryWithExpected) CreateAdminAPIClient(ctx context.Context, ad return adminapi.NewClient(kongClient), nil } +func (cf clientFactoryWithExpected) AssertExpectedCalls() { + for addr, stillExpected := range cf.expected { + if stillExpected { + cf.t.Errorf("%s client expected to be called, but wasn't", addr) + } + } +} + func TestClientAddressesNotifications(t *testing.T) { var ( ctx = context.Background() @@ -185,6 +194,10 @@ func TestClientAddressesNotifications(t *testing.T) { defer srv2.Close() expected[srv2.URL] = true + testClientFactoryWithExpected := clientFactoryWithExpected{ + expected: expected, + t: t, + } client, err := NewKongClient(ctx, logger, time.Second, "", false, true, util.ConfigDumpDiagnostic{}, sendconfig.New(ctx, logr.Discard(), []adminapi.Client{}, sendconfig.Config{ @@ -194,47 +207,48 @@ func TestClientAddressesNotifications(t *testing.T) { ), nil, "off", - clientFactoryWithExpected{ - expected: expected, - t: t, - }, + testClientFactoryWithExpected, ) require.NoError(t, err) + defer testClientFactoryWithExpected.AssertExpectedCalls() - requireClientsCountEventually := func(t *testing.T, c *KongClient, n int, args ...any) { + requireClientsCountEventually := func(t *testing.T, c *KongClient, addresses []string, args ...any) { require.Eventually(t, func() bool { c.lock.RLock() defer c.lock.RUnlock() - return len(c.kongConfig.Clients) == n + clientAddresses := lo.Map(c.kongConfig.Clients, func(cl adminapi.Client, _ int) string { + return cl.BaseRootURL() + }) + return slices.Equal(addresses, clientAddresses) }, time.Second, time.Millisecond, args..., ) } - requireClientsCountEventually(t, client, 0, + 
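Comparing the exact set of client addresses, rather than only their count, is what makes the refactored assertion stricter. A small standalone sketch of the lo.Map plus slices.Equal pattern it relies on, with fakeClient standing in for adminapi.Client:

package main

import (
	"fmt"

	"github.com/samber/lo"
	"golang.org/x/exp/slices"
)

// fakeClient stands in for adminapi.Client in this sketch; only the base URL matters here.
type fakeClient struct{ baseURL string }

func (c fakeClient) BaseRootURL() string { return c.baseURL }

// clientAddresses projects a slice of clients onto their base URLs so a test can
// compare the exact addresses instead of just counting clients.
func clientAddresses(clients []fakeClient) []string {
	return lo.Map(clients, func(c fakeClient, _ int) string { return c.BaseRootURL() })
}

func main() {
	clients := []fakeClient{{"https://10.0.0.1:8444"}, {"https://10.0.0.2:8444"}}
	want := []string{"https://10.0.0.1:8444", "https://10.0.0.2:8444"}
	// slices.Equal is order-sensitive, so the assertion also pins the ordering of clients.
	fmt.Println(slices.Equal(want, clientAddresses(clients)))
}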
requireClientsCountEventually(t, client, []string{}, "initially there should be 0 clients") client.Notify([]string{srv.URL}) - requireClientsCountEventually(t, client, 1, + requireClientsCountEventually(t, client, []string{srv.URL}, "after notifying about a new address we should get 1 client eventually") client.Notify([]string{srv.URL}) - requireClientsCountEventually(t, client, 1, + requireClientsCountEventually(t, client, []string{srv.URL}, "after notifying the same address there's no update in clients") client.Notify([]string{srv.URL, srv2.URL}) - requireClientsCountEventually(t, client, 2, + requireClientsCountEventually(t, client, []string{srv.URL, srv2.URL}, "after notifying new address set including the old already existing one we get both the old and the new") client.Notify([]string{srv.URL, srv2.URL}) - requireClientsCountEventually(t, client, 2, + requireClientsCountEventually(t, client, []string{srv.URL, srv2.URL}, "notifying again with the same set of URLs should not change the existing URLs") client.Notify([]string{srv.URL}) - requireClientsCountEventually(t, client, 1, + requireClientsCountEventually(t, client, []string{srv.URL}, "notifying again with just one URL should decrease the set of URLs to just this one") client.Notify([]string{}) - requireClientsCountEventually(t, client, 0) + requireClientsCountEventually(t, client, []string{}) // We could test here notifying about srv.URL and srv2.URL again but there's // no data structure in the client that could notify us about a removal of @@ -255,6 +269,7 @@ func TestClientAdjustInternalClientsAfterNotification(t *testing.T) { cf := &clientFactoryWithExpected{ t: t, } + defer cf.AssertExpectedCalls() client, err := NewKongClient(ctx, logger, time.Second, "", false, true, util.ConfigDumpDiagnostic{}, sendconfig.New(ctx, logr.Discard(), []adminapi.Client{}, sendconfig.Config{
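AssertExpectedCalls turns the factory into a strict test double: every expected address may be requested exactly once, and anything left unrequested at the end of the test is reported. A stripped-down sketch of that bookkeeping, with expectingFactory as a hypothetical stand-in for the test's clientFactoryWithExpected:

package main

import "fmt"

// expectingFactory tracks which addresses a test still expects to be requested.
type expectingFactory struct {
	expected map[string]bool // address -> still expected (not yet requested)
}

// Create fails on addresses that were never expected or that were already requested once.
func (f *expectingFactory) Create(address string) error {
	still, ok := f.expected[address]
	if !ok {
		return fmt.Errorf("got %s which was unexpected", address)
	}
	if !still {
		return fmt.Errorf("got %s more than once", address)
	}
	f.expected[address] = false
	return nil
}

// AssertExpectedCalls returns the addresses that were expected but never requested.
func (f *expectingFactory) AssertExpectedCalls() []string {
	var missed []string
	for addr, still := range f.expected {
		if still {
			missed = append(missed, addr)
		}
	}
	return missed
}

func main() {
	f := &expectingFactory{expected: map[string]bool{"https://10.0.0.1:8444": true}}
	fmt.Println(f.Create("https://10.0.0.1:8444")) // <nil>
	fmt.Println(f.Create("https://10.0.0.1:8444")) // error: requested more than once
	fmt.Println(f.AssertExpectedCalls())           // [] (the expected address was requested)
}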