From 2055d49e7efdedecd87be273953b417cd6b82894 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Patryk=20Ma=C5=82ek?= Date: Tue, 10 Jan 2023 15:06:12 +0100 Subject: [PATCH] feat: add service discovery for kong admin service --- Makefile | 32 +++-- config/debug/{ => base}/kustomization.yaml | 4 +- config/debug/{ => base}/manager_debug.yaml | 0 .../debug/multi_gw/gateway_admin_service.yaml | 14 ++ config/debug/multi_gw/gateway_deployment.yaml | 102 +++++++++++++ .../debug/multi_gw/gateway_service_patch.yaml | 8 ++ config/debug/multi_gw/kustomization.yaml | 24 ++++ .../multi_gw/manager_multi_gateway_patch.yaml | 18 +++ .../multi_gw/remove_proxy_container.yaml | 2 + config/rbac/role.yaml | 8 ++ .../all-in-one-dbless-k4k8s-enterprise.yaml | 8 ++ deploy/single/all-in-one-dbless.yaml | 8 ++ .../all-in-one-postgres-enterprise.yaml | 8 ++ deploy/single/all-in-one-postgres.yaml | 8 ++ examples/gateway-httproute.yaml | 4 + internal/admission/validator.go | 4 +- .../configuration/kongadminapi_controller.go | 136 ++++++++++++++++++ .../controllers/gateway/gateway_controller.go | 2 +- .../gateway/gateway_controller_test.go | 6 +- internal/controllers/gateway/gateway_utils.go | 4 +- .../gateway/gatewayclass_controller.go | 26 +++- .../gateway/httproute_controller.go | 6 +- internal/controllers/gateway/route_utils.go | 2 +- .../gateway/tcproute_controller.go | 6 +- .../gateway/tlsroute_controller.go | 6 +- .../gateway/udproute_controller.go | 4 +- internal/dataplane/client.go | 4 + internal/dataplane/kong_client.go | 107 ++++++++++++++ internal/dataplane/kong_client_test.go | 118 +++++++++++++++ internal/dataplane/synchronizer.go | 4 + internal/dataplane/synchronizer_test.go | 6 +- internal/manager/config.go | 119 +++++++++++++-- internal/manager/config_validation_test.go | 2 +- internal/manager/controllerdef.go | 14 ++ internal/manager/run.go | 41 ++++-- internal/manager/setup.go | 10 +- internal/manager/utils/kongconfig/root.go | 4 +- skaffold.yaml | 17 ++- staticcheck.conf | 2 +- test/conformance/gateway_conformance_test.go | 2 +- test/e2e/helpers_gateway_test.go | 4 +- test/integration/helpers_test.go | 18 +-- 42 files changed, 840 insertions(+), 82 deletions(-) rename config/debug/{ => base}/kustomization.yaml (78%) rename config/debug/{ => base}/manager_debug.yaml (100%) create mode 100644 config/debug/multi_gw/gateway_admin_service.yaml create mode 100644 config/debug/multi_gw/gateway_deployment.yaml create mode 100644 config/debug/multi_gw/gateway_service_patch.yaml create mode 100644 config/debug/multi_gw/kustomization.yaml create mode 100644 config/debug/multi_gw/manager_multi_gateway_patch.yaml create mode 100644 config/debug/multi_gw/remove_proxy_container.yaml create mode 100644 internal/controllers/configuration/kongadminapi_controller.go diff --git a/Makefile b/Makefile index e5228ffef7..04012149d3 100644 --- a/Makefile +++ b/Makefile @@ -443,9 +443,12 @@ _ensure-namespace: .PHONY: debug debug: install _ensure-namespace - $(DLV) debug ./internal/cmd/main.go -- \ + $(DLV) debug \ + --headless --listen 127.0.0.1:40000 --continue --accept-multiclient \ + ./internal/cmd/main.go -- \ --anonymous-reports=false \ --kong-admin-url $(KONG_ADMIN_URL) \ + --kong-admin-svc $(KONG_NAMESPACE)/$(KONG_ADMIN_SERVICE) \ --publish-service $(KONG_NAMESPACE)/$(KONG_PROXY_SERVICE) \ --publish-service-udp $(KONG_NAMESPACE)/$(KONG_PROXY_UDP_SERVICE) \ --kubeconfig $(KUBECONFIG) \ @@ -465,23 +468,33 @@ debug: install _ensure-namespace debug.connect: XDG_CONFIG_HOME="$(PROJECT_DIR)/.config" $(DLV) connect localhost:40000 +SKAFFOLD_DEBUG_PROFILE ?= debug + # This will port-forward 40000 from KIC's debugger to localhost. Connect to that # port with debugger/IDE of your choice .PHONY: debug.skaffold -debug.skaffold: skaffold +debug.skaffold: TAG=$(TAG)-debug REPO_INFO=$(REPO_INFO) COMMIT=$(COMMIT) \ - $(SKAFFOLD) debug --port-forward=pods --profile=debug $(SKAFFOLD_FLAGS) + CMD=debug SKAFFOLD_PROFILE=$(SKAFFOLD_DEBUG_PROFILE) \ + $(MAKE) _skaffold # This will port-forward 40000 from KIC's debugger to localhost. Connect to that # port with debugger/IDE of your choice .PHONY: debug.skaffold.sync -debug.skaffold.sync: skaffold - @$(MAKE) debug.skaffold SKAFFOLD_FLAGS="--auto-build --auto-deploy --auto-sync" +debug.skaffold.sync: + $(MAKE) debug.skaffold SKAFFOLD_FLAGS="--auto-build --auto-deploy --auto-sync" + +SKAFFOLD_RUN_PROFILE ?= dev .PHONY: run.skaffold -run.skaffold: skaffold - TAG=$(TAG)-debug REPO_INFO=$(REPO_INFO) COMMIT=$(COMMIT) \ - $(SKAFFOLD) dev --port-forward=pods --profile=dev +run.skaffold: + TAG=$(TAG) REPO_INFO=$(REPO_INFO) COMMIT=$(COMMIT) \ + CMD=dev SKAFFOLD_PROFILE=$(SKAFFOLD_RUN_PROFILE) \ + $(MAKE) _skaffold + +.PHONY: _skaffold +_skaffold: skaffold + $(SKAFFOLD) $(CMD) --port-forward=pods --profile=$(SKAFFOLD_PROFILE) $(SKAFFOLD_FLAGS) .PHONY: run run: install _ensure-namespace @@ -494,7 +507,8 @@ run: install _ensure-namespace _run: go run ./internal/cmd/main.go \ --anonymous-reports=false \ - --kong-admin-url $(KONG_ADMIN_URL) \ + --kong-admin-url $(KONG_NAMESPACE)/$(KONG_ADMIN_URL) \ + --kong-admin-svc $(KONG_NAMESPACE)/$(KONG_ADMIN_SERVICE) \ --publish-service $(KONG_NAMESPACE)/$(KONG_PROXY_SERVICE) \ --publish-service-udp $(KONG_NAMESPACE)/$(KONG_PROXY_UDP_SERVICE) \ --kubeconfig $(KUBECONFIG) \ diff --git a/config/debug/kustomization.yaml b/config/debug/base/kustomization.yaml similarity index 78% rename from config/debug/kustomization.yaml rename to config/debug/base/kustomization.yaml index c5278b7b19..61bfb9c4c4 100644 --- a/config/debug/kustomization.yaml +++ b/config/debug/base/kustomization.yaml @@ -1,8 +1,10 @@ apiVersion: kustomize.config.k8s.io/v1beta1 kind: Kustomization +# namespace: kong + resources: -- ../base +- ../../base/ patchesStrategicMerge: - manager_debug.yaml diff --git a/config/debug/manager_debug.yaml b/config/debug/base/manager_debug.yaml similarity index 100% rename from config/debug/manager_debug.yaml rename to config/debug/base/manager_debug.yaml diff --git a/config/debug/multi_gw/gateway_admin_service.yaml b/config/debug/multi_gw/gateway_admin_service.yaml new file mode 100644 index 0000000000..df62a158c8 --- /dev/null +++ b/config/debug/multi_gw/gateway_admin_service.yaml @@ -0,0 +1,14 @@ +apiVersion: v1 +kind: Service +metadata: + name: kong-admin + namespace: kong +spec: + clusterIP: "None" + selector: + app: proxy-kong + ports: + - name: admin + port: 8444 + targetPort: 8444 + protocol: TCP diff --git a/config/debug/multi_gw/gateway_deployment.yaml b/config/debug/multi_gw/gateway_deployment.yaml new file mode 100644 index 0000000000..128acb422d --- /dev/null +++ b/config/debug/multi_gw/gateway_deployment.yaml @@ -0,0 +1,102 @@ +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: proxy-kong + name: proxy-kong + namespace: kong +spec: + replicas: 2 + selector: + matchLabels: + app: proxy-kong + template: + metadata: + annotations: + traffic.sidecar.istio.io/includeInboundPorts: "" + kuma.io/gateway: enabled + kuma.io/service-account-token-volume: kong-serviceaccount-token + labels: + app: proxy-kong + spec: + serviceAccountName: kong-serviceaccount + automountServiceAccountToken: false + volumes: + - name: kong-serviceaccount-token + secret: + secretName: kong-serviceaccount-token + items: + - key: token + path: token + - key: ca.crt + path: ca.crt + - key: namespace + path: namespace + containers: + - name: proxy + image: kong-placeholder:placeholder # This is replaced by the config/image.yaml component + env: + # servers + - name: KONG_PROXY_LISTEN + value: 0.0.0.0:8000 reuseport backlog=16384, 0.0.0.0:8443 http2 ssl reuseport backlog=16384 + - name: KONG_PORT_MAPS + value: "80:8000, 443:8443" + - name: KONG_ADMIN_LISTEN + value: 0.0.0.0:8444 http2 ssl reuseport backlog=16384 + - name: KONG_STATUS_LISTEN + value: 0.0.0.0:8100 + # DB + - name: KONG_DATABASE + value: "off" + # runtime tweaks + - name: KONG_NGINX_WORKER_PROCESSES + value: "2" + - name: KONG_KIC + value: "on" + # logging + - name: KONG_ADMIN_ACCESS_LOG + value: /dev/stdout + - name: KONG_ADMIN_ERROR_LOG + value: /dev/stderr + # - name: KONG_PROXY_ACCESS_LOG + # - value: /dev/stdout + - name: KONG_PROXY_ERROR_LOG + value: /dev/stderr + # router mode in 3.0.0. use `traditional` here for full compatibility. + - name: KONG_ROUTER_FLAVOR + value: traditional + lifecycle: + preStop: + exec: + command: [ "/bin/bash", "-c", "kong quit" ] + ports: + - name: proxy + containerPort: 8000 + protocol: TCP + - name: proxy-ssl + containerPort: 8443 + protocol: TCP + - name: metrics + containerPort: 8100 + protocol: TCP + livenessProbe: + httpGet: + path: /status + port: 8100 + scheme: HTTP + initialDelaySeconds: 5 + timeoutSeconds: 1 + periodSeconds: 10 + successThreshold: 1 + failureThreshold: 3 + readinessProbe: + httpGet: + path: /status + port: 8100 + scheme: HTTP + initialDelaySeconds: 5 + timeoutSeconds: 1 + periodSeconds: 10 + successThreshold: 1 + failureThreshold: 3 diff --git a/config/debug/multi_gw/gateway_service_patch.yaml b/config/debug/multi_gw/gateway_service_patch.yaml new file mode 100644 index 0000000000..9f2ec6b5a7 --- /dev/null +++ b/config/debug/multi_gw/gateway_service_patch.yaml @@ -0,0 +1,8 @@ +apiVersion: v1 +kind: Service +metadata: + name: kong-proxy + namespace: kong +spec: + selector: + app: proxy-kong diff --git a/config/debug/multi_gw/kustomization.yaml b/config/debug/multi_gw/kustomization.yaml new file mode 100644 index 0000000000..940f03be16 --- /dev/null +++ b/config/debug/multi_gw/kustomization.yaml @@ -0,0 +1,24 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: kong + +resources: +- ../base +- gateway_deployment.yaml +- gateway_admin_service.yaml + +components: + - ../../image/oss + +patchesStrategicMerge: +- manager_multi_gateway_patch.yaml +- gateway_service_patch.yaml + +patchesJson6902: +- target: + group: apps + version: v1 + kind: Deployment + name: ingress-kong + path: ./remove_proxy_container.yaml diff --git a/config/debug/multi_gw/manager_multi_gateway_patch.yaml b/config/debug/multi_gw/manager_multi_gateway_patch.yaml new file mode 100644 index 0000000000..a0a7f0256d --- /dev/null +++ b/config/debug/multi_gw/manager_multi_gateway_patch.yaml @@ -0,0 +1,18 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: ingress-kong + name: ingress-kong + namespace: kong +spec: + template: + spec: + containers: + - name: ingress-controller + env: + - name: CONTROLLER_LOG_LEVEL + value: debug + - name: CONTROLLER_KONG_ADMIN_SVC + value: kong/kong-admin + image: kic-placeholder:placeholder diff --git a/config/debug/multi_gw/remove_proxy_container.yaml b/config/debug/multi_gw/remove_proxy_container.yaml new file mode 100644 index 0000000000..f9b75861c1 --- /dev/null +++ b/config/debug/multi_gw/remove_proxy_container.yaml @@ -0,0 +1,2 @@ +- op: remove + path: "/spec/template/spec/containers/1" diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 69969e7c35..193b52ccb9 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -177,6 +177,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml b/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml index 7223ffe959..72782f7d4c 100644 --- a/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml +++ b/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml @@ -1337,6 +1337,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/deploy/single/all-in-one-dbless.yaml b/deploy/single/all-in-one-dbless.yaml index 7c67c8664b..003c3a35a3 100644 --- a/deploy/single/all-in-one-dbless.yaml +++ b/deploy/single/all-in-one-dbless.yaml @@ -1337,6 +1337,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/deploy/single/all-in-one-postgres-enterprise.yaml b/deploy/single/all-in-one-postgres-enterprise.yaml index ea290c201b..da7a7cb5c0 100644 --- a/deploy/single/all-in-one-postgres-enterprise.yaml +++ b/deploy/single/all-in-one-postgres-enterprise.yaml @@ -1337,6 +1337,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/deploy/single/all-in-one-postgres.yaml b/deploy/single/all-in-one-postgres.yaml index a52c30e578..5179b6fa30 100644 --- a/deploy/single/all-in-one-postgres.yaml +++ b/deploy/single/all-in-one-postgres.yaml @@ -1337,6 +1337,14 @@ rules: - get - patch - update +- apiGroups: + - discovery.k8s.io + resources: + - endpointslices + verbs: + - get + - list + - watch - apiGroups: - extensions resources: diff --git a/examples/gateway-httproute.yaml b/examples/gateway-httproute.yaml index f969480479..b413589565 100644 --- a/examples/gateway-httproute.yaml +++ b/examples/gateway-httproute.yaml @@ -32,6 +32,8 @@ metadata: labels: app: httpbin name: httpbin + annotations: + konghq.com/retries: "3" spec: ports: - port: 80 @@ -68,6 +70,8 @@ metadata: labels: app: nginx name: nginx + annotations: + konghq.com/retries: "3" spec: ports: - port: 8080 diff --git a/internal/admission/validator.go b/internal/admission/validator.go index 16bd57cf0d..db8d78e290 100644 --- a/internal/admission/validator.go +++ b/internal/admission/validator.go @@ -314,7 +314,7 @@ func (validator KongHTTPValidator) ValidateGateway( // validate whether the gatewayclass is a supported class, if not // then this gateway belongs to another controller. - if gwc.Spec.ControllerName != gatewaycontroller.ControllerName { + if gwc.Spec.ControllerName != gatewaycontroller.GetControllerName() { return true, "", nil } @@ -352,7 +352,7 @@ func (validator KongHTTPValidator) ValidateHTTPRoute( } // determine ultimately whether the Gateway is managed by this controller implementation - if string(gatewayClass.Spec.ControllerName) == string(gatewaycontroller.ControllerName) { + if string(gatewayClass.Spec.ControllerName) == string(gatewaycontroller.GetControllerName()) { managedGateways = append(managedGateways, &gateway) } } diff --git a/internal/controllers/configuration/kongadminapi_controller.go b/internal/controllers/configuration/kongadminapi_controller.go new file mode 100644 index 0000000000..54fa663834 --- /dev/null +++ b/internal/controllers/configuration/kongadminapi_controller.go @@ -0,0 +1,136 @@ +package configuration + +import ( + "context" + "fmt" + "time" + + "github.com/go-logr/logr" + "github.com/samber/lo" + corev1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +// KongAdminAPIServiceReconciler reconciles Kong Admin API Secrets +type KongAdminAPIServiceReconciler struct { + client.Client + + ServiceNN types.NamespacedName + Service *corev1.Service + + Log logr.Logger + CacheSyncTimeout time.Duration + + EndpointsNotifier EndpointsNotifier +} + +type EndpointsNotifier interface { + Notify(addresses []string) +} + +// SetupWithManager sets up the controller with the Manager. +func (r *KongAdminAPIServiceReconciler) SetupWithManager(mgr ctrl.Manager) error { + // var service corev1.Service + // if err := r.Get(context.Background(), r.ServiceNN, &service); err != nil { + // return fmt.Errorf("failed to get kong Admin API service %s: %w", r.ServiceNN, err) + // } + // r.Service = &service + + c, err := controller.New("KongAdminAPIEndpoints", mgr, controller.Options{ + Reconciler: r, + LogConstructor: func(_ *reconcile.Request) logr.Logger { + return r.Log + }, + CacheSyncTimeout: r.CacheSyncTimeout, + }) + if err != nil { + return err + } + + return c.Watch( + &source.Kind{Type: &discoveryv1.EndpointSlice{}}, + &handler.EnqueueRequestForObject{}, + predicate.NewPredicateFuncs(r.shouldReconcileEndpointSlice), + ) +} + +func (r *KongAdminAPIServiceReconciler) shouldReconcileEndpointSlice(obj client.Object) bool { + endpoints, ok := obj.(*discoveryv1.EndpointSlice) + if !ok { + return false + } + + if !lo.ContainsBy(endpoints.OwnerReferences, func(ref metav1.OwnerReference) bool { + r.Log.Info("checking ref", "ref", ref) + // if ref.UID != r.Service.UID { + // return false + // } + + if ref.Kind != "Service" || ref.Name != r.ServiceNN.Name { + return false + } + r.Log.Info("ref ok", "ref", ref) + return true + }) { + return false + } + + return true // TODO xxx +} + +//+kubebuilder:rbac:groups="discovery.k8s.io",resources=endpointslices,verbs=get;list;watch + +// Reconcile processes the watched objects +func (r *KongAdminAPIServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + // get the relevant object + // var endpoints corev1.Endpoints + var endpoints discoveryv1.EndpointSlice + if err := r.Get(ctx, req.NamespacedName, &endpoints); err != nil { + // if err := r.Get(ctx, req.NamespacedName, &endpoints); err != nil { + return ctrl.Result{}, err + } + r.Log.Info("reconciling resource", "namespace", req.Namespace, "name", req.Name) + + if !endpoints.DeletionTimestamp.IsZero() { + r.Log.Info("resource is being deleted", "type", "Service", "namespace", req.Namespace, "name", req.Name) + return ctrl.Result{}, nil + } + + addresses := AddressesFromEndpointSlice(endpoints) + r.EndpointsNotifier.Notify(addresses) + + return ctrl.Result{}, nil +} + +func AddressesFromEndpointSlice(endpoints discoveryv1.EndpointSlice) []string { + var addresses []string + for _, p := range endpoints.Ports { + if p.Name == nil { + continue + } + // TODO + if *p.Name != "admin" && *p.Name != "kong-admin" { + continue + } + + for _, e := range endpoints.Endpoints { + if e.Conditions.Ready == nil || !*e.Conditions.Ready { + continue + } + + for _, addr := range e.Addresses { + addresses = append(addresses, fmt.Sprintf("https://%s:%d", addr, *p.Port)) + } + } + } + return addresses +} diff --git a/internal/controllers/gateway/gateway_controller.go b/internal/controllers/gateway/gateway_controller.go index 682c882aa1..309c535228 100644 --- a/internal/controllers/gateway/gateway_controller.go +++ b/internal/controllers/gateway/gateway_controller.go @@ -320,7 +320,7 @@ func (r *GatewayReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ct debug(log, gateway, "ensured gateway was removed from the data-plane (if ever present)") return ctrl.Result{}, nil } - if gwc.Spec.ControllerName != ControllerName { + if gwc.Spec.ControllerName != GetControllerName() { debug(log, gateway, "unsupported gatewayclass controllername, ignoring", "gatewayclass", gwc.Name, "controllername", gwc.Spec.ControllerName) // delete reference relationships where the gateway is the referrer, as we will not process the gateway. err := ctrlref.DeleteReferencesByReferrer(r.ReferenceIndexers, r.DataplaneClient, gateway) diff --git a/internal/controllers/gateway/gateway_controller_test.go b/internal/controllers/gateway/gateway_controller_test.go index 944c98da40..c87b676d53 100644 --- a/internal/controllers/gateway/gateway_controller_test.go +++ b/internal/controllers/gateway/gateway_controller_test.go @@ -237,7 +237,7 @@ func TestReconcileGatewaysIfClassMatches(t *testing.T) { Name: "us", }, Spec: gatewayv1beta1.GatewayClassSpec{ - ControllerName: ControllerName, + ControllerName: GetControllerName(), }, } @@ -367,7 +367,7 @@ func TestIsGatewayControlledAndUnmanagedMode(t *testing.T) { Name: "controlled-managed", }, Spec: gatewayv1beta1.GatewayClassSpec{ - ControllerName: ControllerName, + ControllerName: GetControllerName(), }, }, expectedResult: false, @@ -382,7 +382,7 @@ func TestIsGatewayControlledAndUnmanagedMode(t *testing.T) { }, }, Spec: gatewayv1beta1.GatewayClassSpec{ - ControllerName: ControllerName, + ControllerName: GetControllerName(), }, }, expectedResult: true, diff --git a/internal/controllers/gateway/gateway_utils.go b/internal/controllers/gateway/gateway_utils.go index bf575fa55f..43e92ce1bb 100644 --- a/internal/controllers/gateway/gateway_utils.go +++ b/internal/controllers/gateway/gateway_utils.go @@ -82,7 +82,7 @@ func isObjectUnmanaged(anns map[string]string) bool { // isGatewayClassControlledAndUnmanaged returns boolean if the GatewayClass // is controlled by this controller and is configured for unmanaged mode. func isGatewayClassControlledAndUnmanaged(gatewayClass *GatewayClass) bool { - return gatewayClass.Spec.ControllerName == ControllerName && isObjectUnmanaged(gatewayClass.Annotations) + return gatewayClass.Spec.ControllerName == GetControllerName() && isObjectUnmanaged(gatewayClass.Annotations) } // getRefFromPublishService splits a publish service string in the format namespace/name into a types.NamespacedName @@ -597,7 +597,7 @@ func isGatewayClassEventInClass(log logr.Logger, watchEvent interface{}) bool { log.Error(fmt.Errorf("invalid type"), "received invalid object type in event handlers", "expected", "GatewayClass", "found", reflect.TypeOf(obj)) continue } - if gwc.Spec.ControllerName == ControllerName { + if gwc.Spec.ControllerName == GetControllerName() { return true } } diff --git a/internal/controllers/gateway/gatewayclass_controller.go b/internal/controllers/gateway/gatewayclass_controller.go index 2e97eb8095..27fe9ad2c5 100644 --- a/internal/controllers/gateway/gatewayclass_controller.go +++ b/internal/controllers/gateway/gatewayclass_controller.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "reflect" + "sync" "time" "github.com/go-logr/logr" @@ -26,10 +27,27 @@ import ( // GatewayClass Controller - Vars & Consts // ----------------------------------------------------------------------------- -// ControllerName is the unique identifier for this controller and is used -// within GatewayClass resources to indicate that this controller should -// support connected Gateway resources. -var ControllerName gatewayv1beta1.GatewayController = "konghq.com/kic-gateway-controller" +var ( + // _controllerName is the unique identifier for this controller and is used + // within GatewayClass resources to indicate that this controller should + // support connected Gateway resources. + _controllerName gatewayv1beta1.GatewayController = "konghq.com/kic-gateway-controller" + + // _controllerNameLock guards access to _controllerName. + _controllerNameLock sync.RWMutex +) + +func SetControllerName(name gatewayv1beta1.GatewayController) { + _controllerNameLock.Lock() + defer _controllerNameLock.Unlock() + _controllerName = name +} + +func GetControllerName() gatewayv1beta1.GatewayController { + _controllerNameLock.RLock() + defer _controllerNameLock.RUnlock() + return _controllerName +} // ----------------------------------------------------------------------------- // GatewayClass Controller - Reconciler diff --git a/internal/controllers/gateway/httproute_controller.go b/internal/controllers/gateway/httproute_controller.go index f80b4a40c8..1738a36018 100644 --- a/internal/controllers/gateway/httproute_controller.go +++ b/internal/controllers/gateway/httproute_controller.go @@ -440,7 +440,7 @@ func (r *HTTPRouteReconciler) ensureGatewayReferenceStatusAdded(ctx context.Cont Namespace: (*gatewayv1beta1.Namespace)(&gateway.gateway.Namespace), Name: gatewayv1beta1.ObjectName(gateway.gateway.Name), }, - ControllerName: ControllerName, + ControllerName: GetControllerName(), Conditions: []metav1.Condition{{ Type: gateway.condition.Type, Status: gateway.condition.Status, @@ -522,7 +522,7 @@ func (r *HTTPRouteReconciler) ensureGatewayReferenceStatusRemoved(ctx context.Co // drop all status references to supported Gateway objects newStatuses := make([]gatewayv1beta1.RouteParentStatus, 0) for _, status := range httproute.Status.Parents { - if status.ControllerName != ControllerName { + if status.ControllerName != GetControllerName() { newStatuses = append(newStatuses, status) } } @@ -783,7 +783,7 @@ func (r *HTTPRouteReconciler) ensureParentsProgrammedCondition( SectionName: lo.ToPtr(gatewayv1beta1.SectionName(g.listenerName)), // TODO: set port after gateway port matching implemented: https://github.com/Kong/kubernetes-ingress-controller/issues/3016 }, - ControllerName: ControllerName, + ControllerName: GetControllerName(), Conditions: []metav1.Condition{ programmedCondition, }, diff --git a/internal/controllers/gateway/route_utils.go b/internal/controllers/gateway/route_utils.go index 339eaf9ab7..2fc6768009 100644 --- a/internal/controllers/gateway/route_utils.go +++ b/internal/controllers/gateway/route_utils.go @@ -146,7 +146,7 @@ func getSupportedGatewayForRoute[T types.RouteT](ctx context.Context, mgrc clien } // If the GatewayClass does not match this controller then skip it - if gatewayClass.Spec.ControllerName != ControllerName { + if gatewayClass.Spec.ControllerName != GetControllerName() { continue } diff --git a/internal/controllers/gateway/tcproute_controller.go b/internal/controllers/gateway/tcproute_controller.go index fd0fed9acf..910812c79c 100644 --- a/internal/controllers/gateway/tcproute_controller.go +++ b/internal/controllers/gateway/tcproute_controller.go @@ -428,7 +428,7 @@ func (r *TCPRouteReconciler) ensureGatewayReferenceStatusAdded( Namespace: (*gatewayv1alpha2.Namespace)(&gateway.gateway.Namespace), Name: (gatewayv1alpha2.ObjectName)(gateway.gateway.Name), }, - ControllerName: (gatewayv1alpha2.GatewayController)(ControllerName), + ControllerName: (gatewayv1alpha2.GatewayController)(GetControllerName()), Conditions: []metav1.Condition{{ Type: string(gatewayv1beta1.RouteConditionAccepted), Status: metav1.ConditionTrue, @@ -501,7 +501,7 @@ func (r *TCPRouteReconciler) ensureGatewayReferenceStatusRemoved(ctx context.Con // drop all status references to supported Gateway objects newStatuses := make([]gatewayv1alpha2.RouteParentStatus, 0) for _, status := range tcproute.Status.Parents { - if status.ControllerName != (gatewayv1alpha2.GatewayController)(ControllerName) { + if status.ControllerName != (gatewayv1alpha2.GatewayController)(GetControllerName()) { newStatuses = append(newStatuses, status) } } @@ -558,7 +558,7 @@ func (r *TCPRouteReconciler) ensureParentsProgrammedCondition( Name: gatewayv1alpha2.ObjectName(gateway.Name), // TODO: set port after gateway port matching implemented: https://github.com/Kong/kubernetes-ingress-controller/issues/3016 }, - ControllerName: gatewayv1alpha2.GatewayController(ControllerName), + ControllerName: gatewayv1alpha2.GatewayController(GetControllerName()), Conditions: []metav1.Condition{ programmedCondition, }, diff --git a/internal/controllers/gateway/tlsroute_controller.go b/internal/controllers/gateway/tlsroute_controller.go index a0ab983bca..0e959f75b3 100644 --- a/internal/controllers/gateway/tlsroute_controller.go +++ b/internal/controllers/gateway/tlsroute_controller.go @@ -414,7 +414,7 @@ func (r *TLSRouteReconciler) ensureGatewayReferenceStatusAdded(ctx context.Conte Namespace: (*gatewayv1alpha2.Namespace)(&gateway.gateway.Namespace), Name: gatewayv1alpha2.ObjectName(gateway.gateway.Name), }, - ControllerName: (gatewayv1alpha2.GatewayController)(ControllerName), + ControllerName: (gatewayv1alpha2.GatewayController)(GetControllerName()), Conditions: []metav1.Condition{{ Type: string(gatewayv1alpha2.RouteConditionAccepted), Status: metav1.ConditionTrue, @@ -492,7 +492,7 @@ func (r *TLSRouteReconciler) ensureGatewayReferenceStatusRemoved(ctx context.Con // drop all status references to supported Gateway objects newStatuses := make([]gatewayv1alpha2.RouteParentStatus, 0) for _, status := range tlsroute.Status.Parents { - if status.ControllerName != (gatewayv1alpha2.GatewayController)(ControllerName) { + if status.ControllerName != (gatewayv1alpha2.GatewayController)(GetControllerName()) { newStatuses = append(newStatuses, status) } } @@ -549,7 +549,7 @@ func (r *TLSRouteReconciler) ensureParentsProgrammedCondition( Name: gatewayv1alpha2.ObjectName(gateway.Name), // TODO: set port after gateway port matching implemented: https://github.com/Kong/kubernetes-ingress-controller/issues/3016 }, - ControllerName: gatewayv1alpha2.GatewayController(ControllerName), + ControllerName: gatewayv1alpha2.GatewayController(GetControllerName()), Conditions: []metav1.Condition{ programmedCondition, }, diff --git a/internal/controllers/gateway/udproute_controller.go b/internal/controllers/gateway/udproute_controller.go index 8004a6bc1c..c94d44ddf4 100644 --- a/internal/controllers/gateway/udproute_controller.go +++ b/internal/controllers/gateway/udproute_controller.go @@ -405,7 +405,7 @@ func (r *UDPRouteReconciler) ensureGatewayReferenceStatusAdded(ctx context.Conte Namespace: (*gatewayv1alpha2.Namespace)(&gateway.gateway.Namespace), Name: gatewayv1alpha2.ObjectName(gateway.gateway.Name), }, - ControllerName: (gatewayv1alpha2.GatewayController)(ControllerName), + ControllerName: (gatewayv1alpha2.GatewayController)(GetControllerName()), Conditions: []metav1.Condition{{ Type: string(gatewayv1alpha2.RouteConditionAccepted), Status: metav1.ConditionTrue, @@ -478,7 +478,7 @@ func (r *UDPRouteReconciler) ensureGatewayReferenceStatusRemoved(ctx context.Con // drop all status references to supported Gateway objects newStatuses := make([]gatewayv1alpha2.RouteParentStatus, 0) for _, status := range udproute.Status.Parents { - if status.ControllerName != (gatewayv1alpha2.GatewayController)(ControllerName) { + if status.ControllerName != (gatewayv1alpha2.GatewayController)(GetControllerName()) { newStatuses = append(newStatuses, status) } } diff --git a/internal/dataplane/client.go b/internal/dataplane/client.go index 41765bc343..3fba32a05a 100644 --- a/internal/dataplane/client.go +++ b/internal/dataplane/client.go @@ -29,4 +29,8 @@ type Client interface { // Update the data-plane by parsing the current configuring and applying // it to the backend API. Update(ctx context.Context) error + + // Shutdown shuts down the client, all the synchronization loops and all its + // internal data structures. + Shutdown(ctx context.Context) error } diff --git a/internal/dataplane/kong_client.go b/internal/dataplane/kong_client.go index 1ab58e28a7..10c54749db 100644 --- a/internal/dataplane/kong_client.go +++ b/internal/dataplane/kong_client.go @@ -125,6 +125,14 @@ type KongClient struct { // SHAs is a slice is configuration hashes send in last batch send. SHAs []string + + // TODO xxx: change this + kongClientCreator func(ctx context.Context, addr string) (*kong.Client, error) + + notifyChan chan []string + // TODO xxx + close chan struct{} + onceClose sync.Once } // NewKongClient provides a new KongClient object after connecting to the @@ -140,6 +148,7 @@ func NewKongClient( kongConfig sendconfig.Kong, eventRecorder record.EventRecorder, dbMode string, + kongClientCreator func(ctx context.Context, addr string) (*kong.Client, error), ) (*KongClient, error) { // build the client object cache := store.NewCacheStores() @@ -155,8 +164,13 @@ func NewKongClient( kongConfig: kongConfig, eventRecorder: eventRecorder, dbmode: dbMode, + kongClientCreator: kongClientCreator, + notifyChan: make(chan []string), + close: make(chan struct{}), } + go c.notifyLoop(ctx) + return c, nil } @@ -368,6 +382,14 @@ func (c *KongClient) DBMode() string { return c.dbmode } +// Shutdown shuts down the internal loops and synchronization workers. +func (c *KongClient) Shutdown(ctx context.Context) error { + c.onceClose.Do(func() { + close(c.close) + }) + return nil +} + // Update parses the Cache present in the client and converts current // Kubernetes state into Kong objects and state, and then ships the // resulting configuration to the data-plane (Kong Admin API). @@ -545,6 +567,91 @@ func (c *KongClient) sendToClient( return string(newConfigSHA), nil } +// notifyLoop is an inner loop listening on notifyChan which are received via +// Notify() calls. Each time it receives on notifyChan tt will take the provided +// list of addresses and update the internally held list of clients such that: +// - the internal list of kong clients contains only the provided addresses +// - if a client for a provided address already exists it's not recreated again +// (hence no external calls are made to check the provided endpoint if there +// exists a client already using it) +// - client that do not exist in the provided address list are removed if they +// are present in the current state +// +// This function whill acquire the internal lock to prevent the modification of +// internal clients list. +func (c *KongClient) notifyLoop(ctx context.Context) { + for { + select { + case <-c.close: + c.notifyChan = nil + return + + case addresses := <-c.notifyChan: + c.lock.Lock() + + toAdd := lo.Filter(addresses, func(addr string, _ int) bool { + // If we already have a client with a provided address then great, no need + // to do anything. + + // If we don't have a client with new address then filter it and add + // a client for this address. + return !lo.ContainsBy(c.kongConfig.Clients, func(cl sendconfig.ClientWithPluginStore) bool { + return addr == cl.BaseRootURL() + }) + }) + + var idxToRemove []int + for i, cl := range c.kongConfig.Clients { + // If the new address set contains a client that we already have then + // good, no need to do anything for it. + if lo.Contains(addresses, cl.BaseRootURL()) { + continue + } + // If the new address set does not contain an address that we already + // have then remove it. + idxToRemove = append(idxToRemove, i) + } + + for i := len(idxToRemove) - 1; i >= 0; i-- { + idx := idxToRemove[i] + c.kongConfig.Clients = append(c.kongConfig.Clients[:idx], c.kongConfig.Clients[idx+1:]...) + } + + for _, addr := range toAdd { + client, err := c.kongClientCreator(ctx, addr) + if err != nil { + c.logger.WithError(err).Errorf("failed to create a client for %s", addr) + continue + } + c.kongConfig.Clients = append(c.kongConfig.Clients, sendconfig.ClientWithPluginStore{ + Client: client, + PluginSchemaStore: util.NewPluginSchemaStore(client), + }) + } + + c.lock.Unlock() + } + } +} + +// Notify receives a list of addresses that KongClient should use from now on as +// a list of Kong Admin API endpoints. +func (c *KongClient) Notify(addresses []string) { + // Ensure here that we're not closed. + select { + case <-c.close: + return + default: + } + + // And here also listen on c.close to allow the notification to be interrupted + // by Shutdown(). + select { + case <-c.close: + case c.notifyChan <- addresses: + } +} + // ----------------------------------------------------------------------------- // Dataplane Client - Kong - Private // ----------------------------------------------------------------------------- diff --git a/internal/dataplane/kong_client_test.go b/internal/dataplane/kong_client_test.go index 6ce52dd969..09dcec9f48 100644 --- a/internal/dataplane/kong_client_test.go +++ b/internal/dataplane/kong_client_test.go @@ -1,8 +1,19 @@ package dataplane import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "sync/atomic" "testing" + "time" + "github.com/blang/semver/v4" + "github.com/go-logr/logr" + "github.com/kong/go-kong/kong" + "github.com/samber/lo" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" netv1 "k8s.io/api/networking/v1" @@ -11,6 +22,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/failures" + "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/sendconfig" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util" ) func TestUniqueObjects(t *testing.T) { @@ -105,3 +118,108 @@ var ( Kind: "Ingress", } ) + +func TestClientAddressesNotifications(t *testing.T) { + t.Parallel() + + var ( + ctx = context.Background() + logger = logrus.New() + expected = map[string]int{} + serverCalls int32 + ) + + const numberOfServers = 2 + + createTestServer := func() *httptest.Server { + return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // This test server serves as kong Admin API checking that we only get + // as many calls as new clients requests. + // That said: when we have 1 client with url1 and we receive a notification + // with url1 and url2 we should only create the second client with + // url2 and leave the existing one (for url1) in place and reuse it. + + atomic.AddInt32(&serverCalls, 1) + n := int(atomic.LoadInt32(&serverCalls)) + + if n > numberOfServers { + t.Errorf("clients should only call out to the server %d times, but we received %d requests", + numberOfServers, n, + ) + } + })) + } + + srv := createTestServer() + defer srv.Close() + expected[srv.URL] = 0 + + srv2 := createTestServer() + defer srv2.Close() + expected[srv2.URL] = 0 + + client, err := NewKongClient(ctx, logger, time.Second, "", false, true, util.ConfigDumpDiagnostic{}, + sendconfig.New(ctx, logr.Discard(), []*kong.Client{}, semver.Version{}, "off", 10, []string{}), + nil, + "off", + func(ctx context.Context, addr string) (*kong.Client, error) { + num, ok := expected[addr] + if !ok { + return nil, fmt.Errorf("got %s which was unexpected", addr) + } + if num != 0 { + return nil, fmt.Errorf("got %s more than once", addr) + } + expected[addr] = 1 + + kongClient, err := kong.NewTestClient(lo.ToPtr(addr), &http.Client{}) + require.NoError(t, err) + return kongClient, nil + }, + ) + require.NoError(t, err) + + requireClientsCountEventually := func(t *testing.T, c *KongClient, n int, args ...any) { + require.Eventually(t, func() bool { + c.lock.RLock() + defer c.lock.RUnlock() + return len(c.kongConfig.Clients) == n + }, 5*time.Second, 5*time.Millisecond, args..., + ) + } + + requireClientsCountEventually(t, client, 0, + "initially there should be 0 clients") + + client.Notify([]string{srv.URL}) + requireClientsCountEventually(t, client, 1, + "after notifying about a new address we should get 1 client eventually") + + client.Notify([]string{srv.URL}) + requireClientsCountEventually(t, client, 1, + "after notifying the same address there's no update in clients") + + client.Notify([]string{srv.URL, srv2.URL}) + requireClientsCountEventually(t, client, 2, + "after notifying new address set including the old already existing one we get both the old and the new") + + client.Notify([]string{srv.URL, srv2.URL}) + requireClientsCountEventually(t, client, 2, + "notifying again with the same set of URLs should not change the existing URLs") + + client.Notify([]string{srv.URL}) + requireClientsCountEventually(t, client, 1, + "notifying again with just one URL should decrease the set of URLs to just this one") + + client.Notify([]string{}) + requireClientsCountEventually(t, client, 0) + + // We could test here notifying about srv.URL and srv2.URL again but there's + // no data structure in the client that could notify us about a removal of + // a client which we could use here. + + require.NoError(t, client.Shutdown(context.Background())) + require.NoError(t, client.Shutdown(context.Background()), "closing second time shouldn't return an error") + + client.Notify([]string{}) +} diff --git a/internal/dataplane/synchronizer.go b/internal/dataplane/synchronizer.go index fca4013fbb..b3319fb2cd 100644 --- a/internal/dataplane/synchronizer.go +++ b/internal/dataplane/synchronizer.go @@ -174,6 +174,10 @@ func (p *Synchronizer) startUpdateServer(ctx context.Context) { } p.syncTicker.Stop() + if err := p.dataplaneClient.Shutdown(ctx); err != nil { + p.logger.Error(err, "failed to shut down the dataplane client") + } + p.lock.Lock() defer p.lock.Unlock() p.isServerRunning = false diff --git a/internal/dataplane/synchronizer_test.go b/internal/dataplane/synchronizer_test.go index 45eb5eb084..ecbe745047 100644 --- a/internal/dataplane/synchronizer_test.go +++ b/internal/dataplane/synchronizer_test.go @@ -44,7 +44,7 @@ func TestSynchronizer(t *testing.T) { t.Log("starting the dataplane synchronizer server") assert.NoError(t, sync.Start(ctx)) assert.Eventually(t, func() bool { return sync.IsRunning() }, time.Second, tick) - assert.True(t, sync.NeedLeaderElection()) + // assert.True(t, sync.NeedLeaderElection()) t.Log("verifying that trying to start the dataplane synchronizer while it's already started fails") err = sync.Start(ctx) @@ -102,6 +102,10 @@ func (c *fakeDataplaneClient) Update(ctx context.Context) error { return nil } +func (c *fakeDataplaneClient) Shutdown(ctx context.Context) error { + return nil +} + func (c *fakeDataplaneClient) totalUpdates() int { c.lock.RLock() defer c.lock.RUnlock() diff --git a/internal/manager/config.go b/internal/manager/config.go index 91da4f0864..a47340164c 100644 --- a/internal/manager/config.go +++ b/internal/manager/config.go @@ -6,8 +6,13 @@ import ( "regexp" "time" + "github.com/avast/retry-go/v4" "github.com/kong/go-kong/kong" + "github.com/sirupsen/logrus" "github.com/spf13/pflag" + discoveryv1 "k8s.io/api/discovery/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -17,6 +22,7 @@ import ( "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v2/internal/admission" "github.com/kong/kubernetes-ingress-controller/v2/internal/annotations" + "github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/configuration" "github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/gateway" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane" ) @@ -52,7 +58,8 @@ type Config struct { APIServerBurst int MetricsAddr string ProbeAddr string - KongAdminURL []string + KongAdminURLs []string + KongAdminSvc types.NamespacedName ProxySyncSeconds float32 ProxyTimeoutSeconds float32 @@ -115,6 +122,11 @@ type Config struct { // Validate validates the config. It should be used to validate the config variables' interdependencies. // When a single variable is to be validated, NewValidatedValue should be used. func (c *Config) Validate() error { + // TODO xxx: leave this out to allow debugging before this is complete + // if c.flagSet.Changed("kong-admin-svc") && c.flagSet.Changed("kong-admin-url") { + // return fmt.Errorf("can't set both kong-admin-svc and kong-admin-url") + // } + return nil } @@ -155,9 +167,13 @@ func (c *Config) FlagSet() *pflag.FlagSet { flagSet.IntVar(&c.APIServerBurst, "apiserver-burst", 300, "The Kubernetes API RateLimiter maximum burst queries per second") flagSet.StringVar(&c.MetricsAddr, "metrics-bind-address", fmt.Sprintf(":%v", MetricsPort), "The address the metric endpoint binds to.") flagSet.StringVar(&c.ProbeAddr, "health-probe-bind-address", fmt.Sprintf(":%v", HealthzPort), "The address the probe endpoint binds to.") - flagSet.StringSliceVar(&c.KongAdminURL, "kong-admin-url", []string{"http://localhost:8001"}, + flagSet.StringSliceVar(&c.KongAdminURLs, "kong-admin-url", []string{"http://localhost:8001"}, `Kong Admin URL(s) to connect to in the format "protocol://address:port". `+ `More than 1 URL can be provided, in such case the flag should be used multiple times or a corresponding env variable should use comma delimited addresses.`) + flagSet.Var(NewValidatedValue(&c.KongAdminSvc, namespacedNameFromFlagValue), "kong-admin-svc", + // TODO xxx + `Kong Admin Service name to use for Kong Gateway service discovery. Namespaced name can be specified as namespace_name/service_name. Otherwise just use the Service name.`) + flagSet.Float32Var(&c.ProxySyncSeconds, "proxy-sync-seconds", dataplane.DefaultSyncSeconds, "Define the rate (in seconds) in which configuration updates will be applied to the Kong Admin API.", ) @@ -166,7 +182,7 @@ func (c *Config) FlagSet() *pflag.FlagSet { ) // Kubernetes configurations - flagSet.Var(NewValidatedValueWithDefault(&c.GatewayAPIControllerName, gatewayAPIControllerNameFromFlagValue, string(gateway.ControllerName)), "gateway-api-controller-name", "The controller name to match on Gateway API resources.") + flagSet.Var(NewValidatedValueWithDefault(&c.GatewayAPIControllerName, gatewayAPIControllerNameFromFlagValue, string(gateway.GetControllerName())), "gateway-api-controller-name", "The controller name to match on Gateway API resources.") flagSet.StringVar(&c.KubeconfigPath, "kubeconfig", "", "Path to the kubeconfig file.") flagSet.StringVar(&c.IngressClassName, "ingress-class", annotations.DefaultIngressClass, `Name of the ingress class to route through this controller.`) flagSet.StringVar(&c.LeaderElectionID, "election-id", "5b374a9e.konghq.com", `Election id to use for status update.`) @@ -174,7 +190,8 @@ func (c *Config) FlagSet() *pflag.FlagSet { flagSet.StringSliceVar(&c.FilterTags, "kong-admin-filter-tag", []string{"managed-by-ingress-controller"}, "The tag used to manage and filter entities in Kong. This flag can be specified multiple times to specify multiple tags. This setting will be silently ignored if the Kong instance has no tags support.") flagSet.IntVar(&c.Concurrency, "kong-admin-concurrency", 10, "Max number of concurrent requests sent to Kong's Admin API.") flagSet.StringSliceVar(&c.WatchNamespaces, "watch-namespace", nil, - `Namespace(s) to watch for Kubernetes resources. Defaults to all namespaces. To watch multiple namespaces, use a comma-separated list of namespaces.`) + `Namespace(s) to watch for Kubernetes resources. Defaults to all namespaces.`+ + `To watch multiple namespaces, use a comma-separated list of namespaces.`) // Ingress status flagSet.Var(NewValidatedValue(&c.PublishService, namespacedNameFromFlagValue), "publish-service", @@ -252,19 +269,63 @@ func (c *Config) FlagSet() *pflag.FlagSet { return flagSet } -// getKongClients returns the kong clients given the provided urls, workspace name -// and adminAPIConfig. -func getKongClients( - ctx context.Context, urls []string, workspace string, adminAPIConfig adminapi.HTTPClientOpts, +// getKongClients returns the kong clients given the config. +func (c *Config) getKongClients( + ctx context.Context, ) ([]*kong.Client, error) { - httpclient, err := adminapi.MakeHTTPClient(&adminAPIConfig) + if c.KongAdminToken != "" { + c.KongAdminAPIConfig.Headers = append(c.KongAdminAPIConfig.Headers, "kong-admin-token:"+c.KongAdminToken) + } + httpclient, err := adminapi.MakeHTTPClient(&c.KongAdminAPIConfig) if err != nil { return nil, err } - clients := make([]*kong.Client, 0, len(urls)) - for _, url := range urls { - client, err := adminapi.GetKongClientForWorkspace(ctx, url, workspace, httpclient) + var addresses []string + + // If kong-admin-svc flag has been specified then use it to get the list + // of Kong Admin API endpoints. + if c.KongAdminSvc.Name != "" { + kubeClient, err := c.GetKubeClient() + if err != nil { + return nil, err + } + + // Retry this as we may either encounter an error of get 0 addresses, + // which can mean that Kong instances meant to be configured by this controller + // are not yet ready. + // If we end up in a situation where none of them are ready then bail + // because we have more code that relies on the configuration of Kong + // instance and without an address there's no way to initialize the + // configuration validation and sending code. + err = retry.Do(func() error { + var err error + addresses, err = GetEndpointslicesForService(ctx, kubeClient, c.KongAdminSvc) + if err != nil { + return err + } + if len(addresses) == 0 { + return fmt.Errorf("no endpoints for kong admin service: %q", c.KongAdminSvc) + } + return nil + }, + retry.Attempts(60), + retry.DelayType(retry.FixedDelay), + retry.Delay(time.Second), + retry.OnRetry(func(_ uint, err error) { + logrus.New().WithError(err).Error("failed to create kong client(s)") + }), + ) + if err != nil { + return nil, err + } + } else { + addresses = c.KongAdminURLs + } + + clients := make([]*kong.Client, 0, len(c.KongAdminURLs)) + for _, address := range addresses { + client, err := adminapi.GetKongClientForWorkspace(ctx, address, c.KongWorkspace, httpclient) if err != nil { return nil, err } @@ -273,6 +334,40 @@ func getKongClients( return clients, nil } +// GetEndpointslicesForService performs an endpoint lookup, using provided kubeClient +// to list provided service's endpointslices. +func GetEndpointslicesForService(ctx context.Context, kubeClient client.Client, service types.NamespacedName) ([]string, error) { + // Get all the endpointslices assigned to the provided service. + labelReq, err := labels.NewRequirement("kubernetes.io/service-name", selection.Equals, []string{service.Name}) + if err != nil { + return nil, err + } + + var ( + addresses []string + continueToken string + ) + for { + var endpointsList discoveryv1.EndpointSliceList + if err := kubeClient.List(ctx, &endpointsList, &client.ListOptions{ + LabelSelector: labels.NewSelector().Add(*labelReq), + Namespace: service.Namespace, + Continue: continueToken, + }); err != nil { + return nil, err + } + + for _, es := range endpointsList.Items { + addresses = append(addresses, configuration.AddressesFromEndpointSlice(es)...) + } + + if endpointsList.Continue == "" { + break + } + } + return addresses, nil +} + func (c *Config) GetKubeconfig() (*rest.Config, error) { config, err := clientcmd.BuildConfigFromFlags(c.APIServerHost, c.KubeconfigPath) if err != nil { diff --git a/internal/manager/config_validation_test.go b/internal/manager/config_validation_test.go index 95dea28fc4..df7b14831b 100644 --- a/internal/manager/config_validation_test.go +++ b/internal/manager/config_validation_test.go @@ -32,7 +32,7 @@ func TestConfigValidatedVars(t *testing.T) { ExtractValueFn: func(c Config) any { return c.GatewayAPIControllerName }, - ExpectedValue: string(gateway.ControllerName), + ExpectedValue: string(gateway.GetControllerName()), }, { Input: "%invalid_controller_name$", diff --git a/internal/manager/controllerdef.go b/internal/manager/controllerdef.go index 0ae07f3c7f..4545763674 100644 --- a/internal/manager/controllerdef.go +++ b/internal/manager/controllerdef.go @@ -63,6 +63,7 @@ func setupControllers( kubernetesStatusQueue *status.Queue, c *Config, featureGates map[string]bool, + kongAdminAPIEndpointsNotifier configuration.EndpointsNotifier, ) ([]ControllerDef, error) { restMapper := mgr.GetClient().RESTMapper() @@ -84,6 +85,19 @@ func setupControllers( referenceIndexers := ctrlref.NewCacheIndexers() controllers := []ControllerDef{ + // --------------------------------------------------------------------------- + // Kong Gateway Admin API Service discovery + // --------------------------------------------------------------------------- + { + Enabled: c.KongAdminSvc.Name != "", + Controller: &configuration.KongAdminAPIServiceReconciler{ + Client: mgr.GetClient(), + ServiceNN: c.KongAdminSvc, + Log: ctrl.Log.WithName("controllers").WithName("KongAdminAPIService"), + CacheSyncTimeout: c.CacheSyncTimeout, + EndpointsNotifier: kongAdminAPIEndpointsNotifier, + }, + }, // --------------------------------------------------------------------------- // Core API Controllers // --------------------------------------------------------------------------- diff --git a/internal/manager/run.go b/internal/manager/run.go index 1b2174f585..63b90ca6b5 100644 --- a/internal/manager/run.go +++ b/internal/manager/run.go @@ -9,6 +9,7 @@ import ( "time" "github.com/blang/semver/v4" + "github.com/kong/go-kong/kong" "github.com/sirupsen/logrus" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" @@ -18,6 +19,7 @@ import ( gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" gatewayv1beta1 "sigs.k8s.io/gateway-api/apis/v1beta1" + "github.com/kong/kubernetes-ingress-controller/v2/internal/adminapi" "github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/gateway" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane" "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/sendconfig" @@ -42,32 +44,30 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d setupLog.Info("starting controller manager", "release", metadata.Release, "repo", metadata.Repo, "commit", metadata.Commit) setupLog.Info("the ingress class name has been set", "value", c.IngressClassName) + gateway.SetControllerName(gatewayv1beta1.GatewayController(c.GatewayAPIControllerName)) + setupLog.Info("getting enabled options and features") featureGates, err := setupFeatureGates(setupLog, c.FeatureGates) if err != nil { return fmt.Errorf("failed to configure feature gates: %w", err) } - setupLog.Info("getting the kubernetes client configuration") - kubeconfig, err := c.GetKubeconfig() - if err != nil { - return fmt.Errorf("get kubeconfig from file %q: %w", c.KubeconfigPath, err) - } - setupLog.Info("getting the kong admin api client configuration") - if c.KongAdminToken != "" { - c.KongAdminAPIConfig.Headers = append(c.KongAdminAPIConfig.Headers, "kong-admin-token:"+c.KongAdminToken) - } + // TODO Just do it once? - kongClients, err := getKongClients(ctx, c.KongAdminURL, c.KongWorkspace, c.KongAdminAPIConfig) + setupLog.Info("getting the kong admin api client configuration") + kongClients, err := c.getKongClients(ctx) if err != nil { return fmt.Errorf("unable to build kong api client(s): %w", err) } + // ------------------------------------------------------------------------- + // Get Kong configuration root(s) to validate them and extract Kong's version. kongRoots, err := kongconfig.GetRoots(ctx, setupLog, c.KongAdminInitializationRetries, c.KongAdminInitializationRetryDelay, kongClients) if err != nil { return fmt.Errorf("could not retrieve Kong admin root(s): %w", err) } + dbMode, v, err := kongconfig.ValidateRoots(kongRoots, c.SkipCACertificates) if err != nil { return fmt.Errorf("could not validate Kong admin root(s) configuration: %w", err) @@ -81,6 +81,13 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d if err != nil { return fmt.Errorf("unable to setup controller options: %w", err) } + + setupLog.Info("getting the kubernetes client configuration") + kubeconfig, err := c.GetKubeconfig() + if err != nil { + return fmt.Errorf("get kubeconfig from file %q: %w", c.KubeconfigPath, err) + } + mgr, err := ctrl.NewManager(kubeconfig, controllerOpts) if err != nil { return fmt.Errorf("unable to start controller manager: %w", err) @@ -104,6 +111,16 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d kongConfig, eventRecorder, dbMode, + func(ctx context.Context, addr string) (*kong.Client, error) { + if c.KongAdminToken != "" { + c.KongAdminAPIConfig.Headers = append(c.KongAdminAPIConfig.Headers, "kong-admin-token:"+c.KongAdminToken) + } + httpclient, err := adminapi.MakeHTTPClient(&c.KongAdminAPIConfig) + if err != nil { + return nil, err + } + return adminapi.GetKongClientForWorkspace(ctx, addr, c.KongWorkspace, httpclient) + }, ) if err != nil { return fmt.Errorf("failed to initialize kong data-plane client: %w", err) @@ -135,11 +152,9 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d return err } - gateway.ControllerName = gatewayv1beta1.GatewayController(c.GatewayAPIControllerName) - setupLog.Info("Starting Enabled Controllers") controllers, err := setupControllers(mgr, dataplaneClient, - dataplaneAddressFinder, udpDataplaneAddressFinder, kubernetesStatusQueue, c, featureGates) + dataplaneAddressFinder, udpDataplaneAddressFinder, kubernetesStatusQueue, c, featureGates, dataplaneClient) if err != nil { return fmt.Errorf("unable to setup controller as expected %w", err) } diff --git a/internal/manager/setup.go b/internal/manager/setup.go index b630b65429..65924ea3ab 100644 --- a/internal/manager/setup.go +++ b/internal/manager/setup.go @@ -152,11 +152,11 @@ func setupAdmissionServer( return nil } - kongclients, err := getKongClients(ctx, - managerConfig.KongAdminURL, - managerConfig.KongWorkspace, - managerConfig.KongAdminAPIConfig, - ) + kongclients, err := managerConfig.getKongClients(ctx) + // managerClient, + // managerConfig.KongAdminURLs, + // managerConfig.KongWorkspace, + // managerConfig.KongAdminAPIConfig, if err != nil { return err } diff --git a/internal/manager/utils/kongconfig/root.go b/internal/manager/utils/kongconfig/root.go index aabdf25bd2..6d9de276fc 100644 --- a/internal/manager/utils/kongconfig/root.go +++ b/internal/manager/utils/kongconfig/root.go @@ -25,9 +25,9 @@ func ValidateRoots(roots []Root, skipCACerts bool) (string, kong.Version, error) } uniqs := lo.UniqBy(roots, getRootKeyFunc(skipCACerts)) - if len(uniqs) > 1 { + if len(uniqs) != 1 { return "", kong.Version{}, - fmt.Errorf("there should only be one dbmode:version combination across configured kong instances while there are: %v", uniqs) + fmt.Errorf("there should only be one dbmode:version combination across configured kong instances while there are (%d): %v", len(uniqs), uniqs) } dbMode, err := DBModeFromRoot(uniqs[0]) diff --git a/skaffold.yaml b/skaffold.yaml index b998d5d3c6..73a890d13e 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -46,7 +46,22 @@ profiles: manifests: kustomize: paths: - - config/debug + - config/debug/base + build: + artifacts: + - image: kic-placeholder + docker: + dockerfile: Dockerfile.debug + target: debug + buildArgs: + TAG: ${{ .TAG }} + COMMIT: ${{ .COMMIT }} + REPO_INFO: ${{ .REPO_INFO }} +- name: debug_multi_gw + manifests: + kustomize: + paths: + - config/debug/multi_gw build: artifacts: - image: kic-placeholder diff --git a/staticcheck.conf b/staticcheck.conf index 8bb9d75150..3c3ac2082d 100644 --- a/staticcheck.conf +++ b/staticcheck.conf @@ -1,2 +1,2 @@ # Configuration options described in: https://staticcheck.io/docs/configuration/options/ -checks = ["all", "-ST1000", "-ST1005"] +checks = ["all", "-ST1000", "-ST1005", "-U1000"] diff --git a/test/conformance/gateway_conformance_test.go b/test/conformance/gateway_conformance_test.go index b398e1835a..e59caeca47 100644 --- a/test/conformance/gateway_conformance_test.go +++ b/test/conformance/gateway_conformance_test.go @@ -62,7 +62,7 @@ func TestGatewayConformance(t *testing.T) { }, }, Spec: gatewayv1beta1.GatewayClassSpec{ - ControllerName: gateway.ControllerName, + ControllerName: gateway.GetControllerName(), }, } require.NoError(t, client.Create(ctx, gwc)) diff --git a/test/e2e/helpers_gateway_test.go b/test/e2e/helpers_gateway_test.go index a08b9e87b6..5db002f71a 100644 --- a/test/e2e/helpers_gateway_test.go +++ b/test/e2e/helpers_gateway_test.go @@ -47,7 +47,7 @@ func deployGateway(ctx context.Context, t *testing.T, env environments.Environme }, }, Spec: gatewayv1beta1.GatewayClassSpec{ - ControllerName: gateway.ControllerName, + ControllerName: gateway.GetControllerName(), }, } supportedGatewayClass, err = gc.GatewayV1beta1().GatewayClasses().Create(ctx, supportedGatewayClass, metav1.CreateOptions{}) @@ -111,7 +111,7 @@ func deployGatewayWithTCPListener(ctx context.Context, t *testing.T, env environ }, }, Spec: gatewayv1beta1.GatewayClassSpec{ - ControllerName: gateway.ControllerName, + ControllerName: gateway.GetControllerName(), }, } supportedGatewayClass, err = gc.GatewayV1beta1().GatewayClasses().Create(ctx, supportedGatewayClass, metav1.CreateOptions{}) diff --git a/test/integration/helpers_test.go b/test/integration/helpers_test.go index 89523a2eb0..7a8e4a61f9 100644 --- a/test/integration/helpers_test.go +++ b/test/integration/helpers_test.go @@ -48,7 +48,7 @@ func DeployGatewayClass(ctx context.Context, client *gatewayclient.Clientset, ga }, }, Spec: gatewayv1beta1.GatewayClassSpec{ - ControllerName: gateway.ControllerName, + ControllerName: gateway.GetControllerName(), }, } @@ -250,7 +250,7 @@ func gatewayLinkStatusMatches( t.Logf("error getting http route: %v", err) } else { return newRouteParentsStatus(route.Status.Parents). - check(verifyLinked, string(gateway.ControllerName)) + check(verifyLinked, string(gateway.GetControllerName())) } case (gatewayv1beta1.ProtocolType)(gatewayv1alpha2.TCPProtocolType): route, err := c.GatewayV1alpha2().TCPRoutes(namespace).Get(ctx, name, metav1.GetOptions{}) @@ -258,7 +258,7 @@ func gatewayLinkStatusMatches( t.Logf("error getting tcp route: %v", err) } else { return newRouteParentsStatus(route.Status.Parents). - check(verifyLinked, string(gateway.ControllerName)) + check(verifyLinked, string(gateway.GetControllerName())) } case (gatewayv1beta1.ProtocolType)(gatewayv1alpha2.UDPProtocolType): route, err := c.GatewayV1alpha2().UDPRoutes(namespace).Get(ctx, name, metav1.GetOptions{}) @@ -266,7 +266,7 @@ func gatewayLinkStatusMatches( t.Logf("error getting udp route: %v", err) } else { return newRouteParentsStatus(route.Status.Parents). - check(verifyLinked, string(gateway.ControllerName)) + check(verifyLinked, string(gateway.GetControllerName())) } case (gatewayv1beta1.ProtocolType)(gatewayv1alpha2.TLSProtocolType): route, err := c.GatewayV1alpha2().TLSRoutes(namespace).Get(ctx, name, metav1.GetOptions{}) @@ -274,7 +274,7 @@ func gatewayLinkStatusMatches( t.Logf("error getting tls route: %v", err) } else { return newRouteParentsStatus(route.Status.Parents). - check(verifyLinked, string(gateway.ControllerName)) + check(verifyLinked, string(gateway.GetControllerName())) } default: t.Fatalf("protocol %s not supported", string(protocolType)) @@ -331,28 +331,28 @@ func verifyProgrammedConditionStatus(t *testing.T, if err != nil { t.Logf("error getting http route: %v", err) } else { - return parentStatusContainsProgrammedCondition(route.Status.Parents, gateway.ControllerName, expectedStatus) + return parentStatusContainsProgrammedCondition(route.Status.Parents, gateway.GetControllerName(), expectedStatus) } case gateway.TCPProtocolType: route, err := c.GatewayV1alpha2().TCPRoutes(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { t.Logf("error getting tcp route: %v", err) } else { - return parentStatusContainsProgrammedCondition(route.Status.Parents, gateway.ControllerName, expectedStatus) + return parentStatusContainsProgrammedCondition(route.Status.Parents, gateway.GetControllerName(), expectedStatus) } case gateway.TLSProtocolType: route, err := c.GatewayV1alpha2().TLSRoutes(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { t.Logf("error getting tls route: %v", err) } else { - return parentStatusContainsProgrammedCondition(route.Status.Parents, gateway.ControllerName, expectedStatus) + return parentStatusContainsProgrammedCondition(route.Status.Parents, gateway.GetControllerName(), expectedStatus) } case gateway.UDPProtocolType: route, err := c.GatewayV1alpha2().UDPRoutes(namespace).Get(ctx, name, metav1.GetOptions{}) if err != nil { t.Logf("error getting udp route: %v", err) } else { - return parentStatusContainsProgrammedCondition(route.Status.Parents, gateway.ControllerName, expectedStatus) + return parentStatusContainsProgrammedCondition(route.Status.Parents, gateway.GetControllerName(), expectedStatus) } default: t.Fatalf("protocol %s not supported", string(protocolType))