From 84d92702e528a4c8e5b8db3afadbef8093f579c8 Mon Sep 17 00:00:00 2001 From: Travis Raines <571832+rainest@users.noreply.github.com> Date: Wed, 23 Mar 2022 12:04:40 -0700 Subject: [PATCH 1/6] chore(*) update version in kubebuilder scaffold --- PROJECT | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/PROJECT b/PROJECT index efb3f9bf62..154b75d6b4 100644 --- a/PROJECT +++ b/PROJECT @@ -3,7 +3,7 @@ layout: - go.kubebuilder.io/v3 multigroup: true projectName: kubernetes-ingress-controller -repo: github.com/kong/kubernetes-ingress-controller +repo: github.com/kong/kubernetes-ingress-controller/v2 resources: - api: crdVersion: v1 From b04322809cf2ef9c052c46fbb08d117971f4a43d Mon Sep 17 00:00:00 2001 From: Travis Raines <571832+rainest@users.noreply.github.com> Date: Wed, 23 Mar 2022 13:24:38 -0700 Subject: [PATCH 2/6] feat(gw) add UDPRoute controller --- PROJECT | 9 + config/rbac/role.yaml | 15 + .../all-in-one-dbless-k4k8s-enterprise.yaml | 15 + deploy/single/all-in-one-dbless.yaml | 15 + .../all-in-one-postgres-enterprise.yaml | 15 + deploy/single/all-in-one-postgres.yaml | 15 + examples/gateway-udproute.yaml | 120 +++++ internal/controllers/gateway/route_utils.go | 2 + .../gateway/udproute_controller.go | 450 ++++++++++++++++++ internal/manager/controllerdef.go | 15 + internal/store/fake_store.go | 15 +- internal/store/fake_store_test.go | 51 ++ internal/store/store.go | 25 + 13 files changed, 759 insertions(+), 3 deletions(-) create mode 100644 examples/gateway-udproute.yaml create mode 100644 internal/controllers/gateway/udproute_controller.go diff --git a/PROJECT b/PROJECT index 154b75d6b4..df781d39e4 100644 --- a/PROJECT +++ b/PROJECT @@ -70,4 +70,13 @@ resources: kind: HTTPRoute path: github.com/kubernetes-sigs/gateway-api/apis/v1alpha2 version: v1alpha2 +- api: + crdVersion: v1 + namespaced: true + controller: true + domain: networking.k8s.io + group: gateway + kind: UDPRoute + path: github.com/kubernetes-sigs/gateway-api/apis/v1alpha2 + version: v1alpha2 version: "3" diff --git a/config/rbac/role.yaml b/config/rbac/role.yaml index 9ae73afb42..95c9f8ce3c 100644 --- a/config/rbac/role.yaml +++ b/config/rbac/role.yaml @@ -232,6 +232,21 @@ rules: verbs: - get - update +- apiGroups: + - gateway.networking.k8s.io + resources: + - udproutes + verbs: + - get + - list + - watch +- apiGroups: + - gateway.networking.k8s.io + resources: + - udproutes/status + verbs: + - get + - update - apiGroups: - networking.internal.knative.dev resources: diff --git a/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml b/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml index 4a0d038c46..d990370c55 100644 --- a/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml +++ b/deploy/single/all-in-one-dbless-k4k8s-enterprise.yaml @@ -1182,6 +1182,21 @@ rules: verbs: - get - update +- apiGroups: + - gateway.networking.k8s.io + resources: + - udproutes + verbs: + - get + - list + - watch +- apiGroups: + - gateway.networking.k8s.io + resources: + - udproutes/status + verbs: + - get + - update - apiGroups: - networking.internal.knative.dev resources: diff --git a/deploy/single/all-in-one-dbless.yaml b/deploy/single/all-in-one-dbless.yaml index 40fe5bc301..0810c7cace 100644 --- a/deploy/single/all-in-one-dbless.yaml +++ b/deploy/single/all-in-one-dbless.yaml @@ -1182,6 +1182,21 @@ rules: verbs: - get - update +- apiGroups: + - gateway.networking.k8s.io + resources: + - udproutes + verbs: + - get + - list + - watch +- apiGroups: + - gateway.networking.k8s.io + resources: + - udproutes/status + verbs: + - get + - update - apiGroups: - networking.internal.knative.dev resources: diff --git a/deploy/single/all-in-one-postgres-enterprise.yaml b/deploy/single/all-in-one-postgres-enterprise.yaml index 4936fdd180..a0a1ffd100 100644 --- a/deploy/single/all-in-one-postgres-enterprise.yaml +++ b/deploy/single/all-in-one-postgres-enterprise.yaml @@ -1182,6 +1182,21 @@ rules: verbs: - get - update +- apiGroups: + - gateway.networking.k8s.io + resources: + - udproutes + verbs: + - get + - list + - watch +- apiGroups: + - gateway.networking.k8s.io + resources: + - udproutes/status + verbs: + - get + - update - apiGroups: - networking.internal.knative.dev resources: diff --git a/deploy/single/all-in-one-postgres.yaml b/deploy/single/all-in-one-postgres.yaml index 15665e56ba..571d298d61 100644 --- a/deploy/single/all-in-one-postgres.yaml +++ b/deploy/single/all-in-one-postgres.yaml @@ -1182,6 +1182,21 @@ rules: verbs: - get - update +- apiGroups: + - gateway.networking.k8s.io + resources: + - udproutes + verbs: + - get + - list + - watch +- apiGroups: + - gateway.networking.k8s.io + resources: + - udproutes/status + verbs: + - get + - update - apiGroups: - networking.internal.knative.dev resources: diff --git a/examples/gateway-udproute.yaml b/examples/gateway-udproute.yaml new file mode 100644 index 0000000000..da9364ef6a --- /dev/null +++ b/examples/gateway-udproute.yaml @@ -0,0 +1,120 @@ +# WARNING: Gateway APIs support is still experimental. Use as your own risk. +# +# NOTE: You need to install the Gateway APIs CRDs before using this example, +# they are external and can be deployed with the following one-liner: +# +# kubectl kustomize https://github.com/kubernetes-sigs/gateway-api.git/config/crd?ref=master | kubectl apply -f - +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: coredns +data: + Corefile: |- + .:53 { + errors + health { + lameduck 5s + } + ready + kubernetes cluster.local in-addr.arpa ip6.arpa { + pods insecure + fallthrough in-addr.arpa ip6.arpa + ttl 30 + } + forward . /etc/resolv.conf { + max_concurrent 1000 + } + cache 30 + loop + reload + loadbalance + } +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: coredns + labels: + app: coredns +spec: + replicas: 1 + selector: + matchLabels: + app: coredns + template: + metadata: + labels: + app: coredns + spec: + containers: + - args: + - -conf + - /etc/coredns/Corefile + image: k8s.gcr.io/coredns/coredns:v1.8.6 + imagePullPolicy: IfNotPresent + name: coredns + ports: + - containerPort: 53 + protocol: UDP + volumeMounts: + - mountPath: /etc/coredns + name: config-volume + volumes: + - configMap: + defaultMode: 420 + items: + - key: Corefile + path: Corefile + name: coredns + name: config-volume +--- +apiVersion: v1 +kind: Service +metadata: + name: coredns +spec: + ports: + - port: 53 + protocol: UDP + targetPort: 53 + selector: + app: coredns + type: ClusterIP +--- +kind: GatewayClass +apiVersion: gateway.networking.k8s.io/v1alpha2 +metadata: + name: kong +spec: + controllerName: konghq.com/kic-gateway-controller +--- +kind: Gateway +apiVersion: gateway.networking.k8s.io/v1alpha2 +metadata: + name: kong +spec: + gatewayClassName: kong + listeners: + - name: http + protocol: HTTP + port: 80 + - name: dns + protocol: UDP + port: 53 +--- +apiVersion: gateway.networking.k8s.io/v1alpha2 +kind: UDPRoute +metadata: + name: coredns +spec: + parentRefs: + - name: kong + rules: + - matches: + - destinationAddresses: + - type: IPAddress + value: 192.0.2.254 + backendRefs: + - name: coredns + port: 53 diff --git a/internal/controllers/gateway/route_utils.go b/internal/controllers/gateway/route_utils.go index 38a899cefd..90cb600ed7 100644 --- a/internal/controllers/gateway/route_utils.go +++ b/internal/controllers/gateway/route_utils.go @@ -22,6 +22,8 @@ func parentRefsForRoute(obj client.Object) ([]gatewayv1alpha2.ParentReference, e switch v := obj.(type) { case *gatewayv1alpha2.HTTPRoute: return v.Spec.ParentRefs, nil + case *gatewayv1alpha2.UDPRoute: + return v.Spec.ParentRefs, nil default: return nil, fmt.Errorf("cant determine parent gateway for unsupported type %s", reflect.TypeOf(obj)) } diff --git a/internal/controllers/gateway/udproute_controller.go b/internal/controllers/gateway/udproute_controller.go new file mode 100644 index 0000000000..3e223751e6 --- /dev/null +++ b/internal/controllers/gateway/udproute_controller.go @@ -0,0 +1,450 @@ +package gateway + +import ( + "context" + "fmt" + "reflect" + + "github.com/go-logr/logr" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/predicate" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane" +) + +// ----------------------------------------------------------------------------- +// UDPRoute Controller - UDPRouteReconciler +// ----------------------------------------------------------------------------- + +// UDPRouteReconciler reconciles an UDPRoute object +type UDPRouteReconciler struct { + client.Client + + Log logr.Logger + Scheme *runtime.Scheme + DataplaneClient *dataplane.KongClient +} + +// SetupWithManager sets up the controller with the Manager. +func (r *UDPRouteReconciler) SetupWithManager(mgr ctrl.Manager) error { + c, err := controller.New("udproute-controller", mgr, controller.Options{ + Reconciler: r, + Log: r.Log, + }) + if err != nil { + return err + } + + // if a GatewayClass updates then we need to enqueue the linked UDPRoutes to + // ensure that any route objects that may have been orphaned by that change get + // removed from data-plane configurations, and any routes that are now supported + // due to that change get added to data-plane configurations. + if err := c.Watch( + &source.Kind{Type: &gatewayv1alpha2.GatewayClass{}}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForGatewayClass), + predicate.Funcs{ + GenericFunc: func(e event.GenericEvent) bool { return false }, // we don't need to enqueue from generic + CreateFunc: func(e event.CreateEvent) bool { return isGatewayClassEventInClass(r.Log, e) }, + UpdateFunc: func(e event.UpdateEvent) bool { return isGatewayClassEventInClass(r.Log, e) }, + DeleteFunc: func(e event.DeleteEvent) bool { return isGatewayClassEventInClass(r.Log, e) }, + }, + ); err != nil { + return err + } + + // if a Gateway updates then we need to enqueue the linked UDPRoutes to + // ensure that any route objects that may have been orphaned by that change get + // removed from data-plane configurations, and any routes that are now supported + // due to that change get added to data-plane configurations. + if err := c.Watch( + &source.Kind{Type: &gatewayv1alpha2.Gateway{}}, + handler.EnqueueRequestsFromMapFunc(r.listUDPRoutesForGateway), + ); err != nil { + return err + } + + // because of the additional burden of having to manage reference data-plane + // configurations for UDPRoute objects in the underlying Kong Gateway, we + // simply reconcile ALL UDPRoute objects. This allows us to drop the backend + // data-plane config for an UDPRoute if it somehow becomes disconnected from + // a supported Gateway and GatewayClass. + return c.Watch( + &source.Kind{Type: &gatewayv1alpha2.UDPRoute{}}, + &handler.EnqueueRequestForObject{}, + ) +} + +// ----------------------------------------------------------------------------- +// UDPRoute Controller - Event Handlers +// ----------------------------------------------------------------------------- + +// listUDPRoutesForGatewayClass is a controller-runtime event.Handler which +// produces a list of UDPRoutes which were bound to a Gateway which is or was +// bound to this GatewayClass. This implementation effectively does a map-reduce +// to determine the UDPRoutes as the relationship has to be discovered entirely +// by object reference. This relies heavily on the inherent performance benefits of +// the cached manager client to avoid API overhead. +func (r *UDPRouteReconciler) listUDPRoutesForGatewayClass(obj client.Object) []reconcile.Request { + // verify that the object is a GatewayClass + gwc, ok := obj.(*gatewayv1alpha2.GatewayClass) + if !ok { + r.Log.Error(fmt.Errorf("invalid type"), "found invalid type in event handlers", "expected", "GatewayClass", "found", reflect.TypeOf(obj)) + return nil + } + + // map all Gateway objects + gatewayList := gatewayv1alpha2.GatewayList{} + if err := r.Client.List(context.Background(), &gatewayList); err != nil { + r.Log.Error(err, "failed to list gateway objects from the cached client") + return nil + } + + // reduce for in-class Gateway objects + gateways := make(map[string]map[string]struct{}) + for _, gateway := range gatewayList.Items { + if string(gateway.Spec.GatewayClassName) == gwc.Name { + _, ok := gateways[gateway.Namespace] + if !ok { + gateways[gateway.Namespace] = make(map[string]struct{}) + } + gateways[gateway.Namespace][gateway.Name] = struct{}{} + } + } + + // if there are no Gateways associated with this GatewayClass we can stop + if len(gateways) == 0 { + return nil + } + + // map all UDPRoute objects + udprouteList := gatewayv1alpha2.UDPRouteList{} + if err := r.Client.List(context.Background(), &udprouteList); err != nil { + r.Log.Error(err, "failed to list udproute objects from the cached client") + return nil + } + + // reduce for UDPRoute objects bound to an in-class Gateway + queue := make([]reconcile.Request, 0) + for _, udproute := range udprouteList.Items { + // check the udproute's parentRefs + for _, parentRef := range udproute.Spec.ParentRefs { + // determine what namespace the parent gateway is in + namespace := udproute.Namespace + if parentRef.Namespace != nil { + namespace = string(*parentRef.Namespace) + } + + // if the gateway matches one of our previously filtered gateways, enqueue the route + if gatewaysForNamespace, ok := gateways[namespace]; ok { + if _, ok := gatewaysForNamespace[string(parentRef.Name)]; ok { + queue = append(queue, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: udproute.Namespace, + Name: udproute.Name, + }, + }) + } + } + } + } + + return queue +} + +// listUDPRoutesForGateway is a controller-runtime event.Handler which enqueues UDPRoute +// objects for changes to Gateway objects. The relationship between UDPRoutes and their +// Gateways (by way of .Spec.ParentRefs) must be discovered by object relation, so this +// implementation effectively does a map reduce to determine inclusion. This relies heavily +// on the inherent performance benefits of the cached manager client to avoid API overhead. +// +// NOTE: due to a race condition where a Gateway and a GatewayClass may be updated at the +// same time and could cause a changed Gateway object to look like it wasn't in-class +// while in reality it may still have active data-plane configurations because it was +// recently in-class, we can't reliably filter Gateway objects based on class as we +// can't verify that didn't change since we received the object. As such the current +// implementation enqueues ALL UDPRoute objects for reconciliation every time a Gateway +// changes. This is not ideal, but after communicating with other members of the +// community this appears to be a standard approach across multiple implementations at +// the moment for v1alpha2. As future releases of Gateway come out we'll need to +// continue iterating on this and perhaps advocating for upstream changes to help avoid +// this kind of problem without having to enqueue extra objects. +func (r *UDPRouteReconciler) listUDPRoutesForGateway(obj client.Object) []reconcile.Request { + // verify that the object is a Gateway + gw, ok := obj.(*gatewayv1alpha2.Gateway) + if !ok { + r.Log.Error(fmt.Errorf("invalid type"), "found invalid type in event handlers", "expected", "Gateway", "found", reflect.TypeOf(obj)) + return nil + } + + // map all UDPRoute objects + udprouteList := gatewayv1alpha2.UDPRouteList{} + if err := r.Client.List(context.Background(), &udprouteList); err != nil { + r.Log.Error(err, "failed to list udproute objects from the cached client") + return nil + } + + // reduce for UDPRoute objects bound to the Gateway + queue := make([]reconcile.Request, 0) + for _, udproute := range udprouteList.Items { + for _, parentRef := range udproute.Spec.ParentRefs { + namespace := udproute.Namespace + if parentRef.Namespace != nil { + namespace = string(*parentRef.Namespace) + } + if namespace == gw.Namespace && string(parentRef.Name) == gw.Name { + queue = append(queue, reconcile.Request{ + NamespacedName: types.NamespacedName{ + Namespace: udproute.Namespace, + Name: udproute.Name, + }, + }) + } + } + } + + return queue +} + +// ----------------------------------------------------------------------------- +// UDPRoute Controller - Reconciliation +// ----------------------------------------------------------------------------- + +//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes,verbs=get;list;watch +//+kubebuilder:rbac:groups=gateway.networking.k8s.io,resources=udproutes/status,verbs=get;update + +// Reconcile is part of the main kubernetes reconciliation loop which aims to +// move the current state of the cluster closer to the desired state. +func (r *UDPRouteReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + log := r.Log.WithValues("NetV1Alpha2UDPRoute", req.NamespacedName) + + udproute := new(gatewayv1alpha2.UDPRoute) + if err := r.Get(ctx, req.NamespacedName, udproute); err != nil { + // if the queued object is no longer present in the proxy cache we need + // to ensure that if it was ever added to the cache, it gets removed. + if errors.IsNotFound(err) { + debug(log, udproute, "object does not exist, ensuring it is not present in the proxy cache") + udproute.Namespace = req.Namespace + udproute.Name = req.Name + return ctrl.Result{}, r.DataplaneClient.DeleteObject(udproute) + } + + // for any error other than 404, requeue + return ctrl.Result{}, err + } + debug(log, udproute, "processing udproute") + + // if there's a present deletion timestamp then we need to update the proxy cache + // to drop all relevant routes from its configuration, regardless of whether or + // not we can find a valid gateway as that gateway may now be deleted but we still + // need to ensure removal of the data-plane configuration. + debug(log, udproute, "checking deletion timestamp") + if udproute.DeletionTimestamp != nil { + debug(log, udproute, "udproute is being deleted, re-configuring data-plane") + if err := r.DataplaneClient.DeleteObject(udproute); err != nil { + debug(log, udproute, "failed to delete object from data-plane, requeuing") + return ctrl.Result{}, err + } + debug(log, udproute, "ensured object was removed from the data-plane (if ever present)") + return ctrl.Result{}, r.DataplaneClient.DeleteObject(udproute) + } + + // we need to pull the Gateway parent objects for the UDPRoute to verify + // routing behavior and ensure compatibility with Gateway configurations. + debug(log, udproute, "retrieving GatewayClass and Gateway for route") + gateways, err := getSupportedGatewayForRoute(ctx, r.Client, udproute) + if err != nil { + if err.Error() == unsupportedGW { + debug(log, udproute, "unsupported route found, processing to verify whether it was ever supported") + // if there's no supported Gateway then this route could have been previously + // supported by this controller. As such we ensure that no supported Gateway + // references exist in the object status any longer. + statusUpdated, err := r.ensureGatewayReferenceStatusRemoved(ctx, udproute) + if err != nil { + // some failure happened so we need to retry to avoid orphaned statuses + return ctrl.Result{}, err + } + if statusUpdated { + // the status did in fact needed to be updated, so no need to requeue + // as the status update will trigger a requeue. + debug(log, udproute, "unsupported route was previously supported, status was updated") + return ctrl.Result{}, nil + } + + // if the route doesn't have a supported Gateway+GatewayClass associated with + // it it's possible it became orphaned after becoming queued. In either case + // ensure that it's removed from the proxy cache to avoid orphaned data-plane + // configurations. + debug(log, udproute, "ensuring that dataplane is updated to remove unsupported route (if applicable)") + return ctrl.Result{}, r.DataplaneClient.DeleteObject(udproute) + } + return ctrl.Result{}, err + } + + // the referenced gateway object(s) for the UDPRoute needs to be ready + // before we'll attempt any configurations of it. If it's not we'll + // requeue the object and wait until all supported gateways are ready. + debug(log, udproute, "checking if the udproute's gateways are ready") + for _, gateway := range gateways { + if !isGatewayReady(gateway) { + debug(log, udproute, "gateway for route was not ready, waiting") + return ctrl.Result{Requeue: true}, nil + } + } + + // if the gateways are ready, and the UDPRoute is destined for them, ensure that + // the object is pushed to the dataplane. + if err := r.DataplaneClient.UpdateObject(udproute); err != nil { + debug(log, udproute, "failed to update object in data-plane, requeueing") + return ctrl.Result{}, err + } + if r.DataplaneClient.AreKubernetesObjectReportsEnabled() { + // if the dataplane client has reporting enabled (this is the default and is + // tied in with status updates being enabled in the controller manager) then + // we will wait until the object is reported as successfully configured before + // moving on to status updates. + if !r.DataplaneClient.KubernetesObjectIsConfigured(udproute) { + return ctrl.Result{Requeue: true}, nil + } + } + + // now that the object has been successfully configured for in the dataplane + // we can update the object status to indicate that it's now properly linked + // to the configured Gateways. + debug(log, udproute, "ensuring status contains Gateway associations") + statusUpdated, err := r.ensureGatewayReferenceStatusAdded(ctx, udproute, gateways...) + if err != nil { + // don't proceed until the statuses can be updated appropriately + return ctrl.Result{}, err + } + if statusUpdated { + // if the status was updated it will trigger a follow-up reconciliation + // so we don't need to do anything further here. + return ctrl.Result{}, nil + } + + // once the data-plane has accepted the UDPRoute object, we're all set. + info(log, udproute, "udproute has been configured on the data-plane") + return ctrl.Result{}, nil +} + +// ----------------------------------------------------------------------------- +// UDPRouteReconciler - Status Helpers +// ----------------------------------------------------------------------------- + +// udprouteParentKind indicates the only object KIND that this UDPRoute +// implementation supports for route object parent references. +var udprouteParentKind = "Gateway" + +// ensureGatewayReferenceStatus takes any number of Gateways that should be +// considered "attached" to a given UDPRoute and ensures that the status +// for the UDPRoute is updated appropriately. +func (r *UDPRouteReconciler) ensureGatewayReferenceStatusAdded(ctx context.Context, udproute *gatewayv1alpha2.UDPRoute, gateways ...*gatewayv1alpha2.Gateway) (bool, error) { + // map the existing parentStatues to avoid duplications + parentStatuses := make(map[string]*gatewayv1alpha2.RouteParentStatus) + for _, existingParent := range udproute.Status.Parents { + namespace := udproute.Namespace + if existingParent.ParentRef.Namespace != nil { + namespace = string(*existingParent.ParentRef.Namespace) + } + existingParentCopy := existingParent + parentStatuses[namespace+string(existingParent.ParentRef.Name)] = &existingParentCopy + } + + // overlay the parent ref statuses for all new gateway references + statusChangesWereMade := false + for _, gateway := range gateways { + // build a new status for the parent Gateway + gatewayParentStatus := &gatewayv1alpha2.RouteParentStatus{ + ParentRef: gatewayv1alpha2.ParentReference{ + Group: (*gatewayv1alpha2.Group)(&gatewayv1alpha2.GroupVersion.Group), + Kind: (*gatewayv1alpha2.Kind)(&udprouteParentKind), + Namespace: (*gatewayv1alpha2.Namespace)(&gateway.Namespace), + Name: gatewayv1alpha2.ObjectName(gateway.Name), + }, + ControllerName: ControllerName, + Conditions: []metav1.Condition{{ + Type: string(gatewayv1alpha2.ConditionRouteAccepted), + Status: metav1.ConditionTrue, + ObservedGeneration: udproute.Generation, + LastTransitionTime: metav1.Now(), + Reason: string(gatewayv1alpha2.GatewayReasonReady), + }}, + } + + // if the reference already exists and doesn't require any changes + // then just leave it alone. + if existingGatewayParentStatus, exists := parentStatuses[gateway.Namespace+gateway.Name]; exists { + // fake the time of the existing status as this wont be equal + for i := range existingGatewayParentStatus.Conditions { + existingGatewayParentStatus.Conditions[i].LastTransitionTime = gatewayParentStatus.Conditions[0].LastTransitionTime + } + + // other than the condition timestamps, check if the statuses are equal + if reflect.DeepEqual(existingGatewayParentStatus, gatewayParentStatus) { + continue + } + } + + // otherwise overlay the new status on top the list of parentStatuses + parentStatuses[gateway.Namespace+gateway.Name] = gatewayParentStatus + statusChangesWereMade = true + } + + // if we didn't have to actually make any changes, no status update is needed + if !statusChangesWereMade { + return false, nil + } + + // update the udproute status with the new status references + udproute.Status.Parents = make([]gatewayv1alpha2.RouteParentStatus, 0, len(parentStatuses)) + for _, parent := range parentStatuses { + udproute.Status.Parents = append(udproute.Status.Parents, *parent) + } + + // update the object status in the API + if err := r.Status().Update(ctx, udproute); err != nil { + return false, err + } + + // the status needed an update and it was updated successfully + return true, nil +} + +// ensureGatewayReferenceStatusRemoved uses the ControllerName provided by the Gateway +// implementation to prune status references to Gateways supported by this controller +// in the provided UDPRoute object. +func (r *UDPRouteReconciler) ensureGatewayReferenceStatusRemoved(ctx context.Context, udproute *gatewayv1alpha2.UDPRoute) (bool, error) { + // drop all status references to supported Gateway objects + newStatuses := make([]gatewayv1alpha2.RouteParentStatus, 0) + for _, status := range udproute.Status.Parents { + if status.ControllerName != ControllerName { + newStatuses = append(newStatuses, status) + } + } + + // if the new list of statuses is the same length as the old + // nothing has changed and we're all done. + if len(newStatuses) == len(udproute.Status.Parents) { + return false, nil + } + + // update the object status in the API + udproute.Status.Parents = newStatuses + if err := r.Status().Update(ctx, udproute); err != nil { + return false, err + } + + // the status needed an update and it was updated successfully + return true, nil +} diff --git a/internal/manager/controllerdef.go b/internal/manager/controllerdef.go index 6e68cc0e4f..63901483b7 100644 --- a/internal/manager/controllerdef.go +++ b/internal/manager/controllerdef.go @@ -284,6 +284,21 @@ func setupControllers( DataplaneClient: dataplaneClient, }, }, + { + Enabled: featureGates[gatewayFeature], + AutoHandler: crdExistsChecker{ + GVR: schema.GroupVersionResource{ + Group: gatewayv1alpha2.SchemeGroupVersion.Group, + Version: gatewayv1alpha2.SchemeGroupVersion.Version, + Resource: "udproutes", + }}.CRDExists, + Controller: &gateway.UDPRouteReconciler{ + Client: mgr.GetClient(), + Log: ctrl.Log.WithName("controllers").WithName("UDPRoute"), + Scheme: mgr.GetScheme(), + DataplaneClient: dataplaneClient, + }, + }, } return controllers, nil diff --git a/internal/store/fake_store.go b/internal/store/fake_store.go index d7d0317cf9..6127a822df 100644 --- a/internal/store/fake_store.go +++ b/internal/store/fake_store.go @@ -33,7 +33,8 @@ type FakeObjects struct { IngressesV1beta1 []*networkingv1beta1.Ingress IngressesV1 []*networkingv1.Ingress IngressClassesV1 []*networkingv1.IngressClass - HTTPRoute []*gatewayv1alpha2.HTTPRoute + HTTPRoutes []*gatewayv1alpha2.HTTPRoute + UDPRoutes []*gatewayv1alpha2.UDPRoute TCPIngresses []*configurationv1beta1.TCPIngress UDPIngresses []*configurationv1beta1.UDPIngress Services []*apiv1.Service @@ -49,7 +50,8 @@ type FakeObjects struct { // NewFakeStore creates a store backed by the objects passed in as arguments. func NewFakeStore( - objects FakeObjects) (Storer, error) { + objects FakeObjects, +) (Storer, error) { var s Storer ingressV1beta1Store := cache.NewStore(keyFunc) @@ -74,11 +76,17 @@ func NewFakeStore( } } httprouteStore := cache.NewStore(keyFunc) - for _, httproute := range objects.HTTPRoute { + for _, httproute := range objects.HTTPRoutes { if err := httprouteStore.Add(httproute); err != nil { return nil, err } } + udprouteStore := cache.NewStore(keyFunc) + for _, udproute := range objects.UDPRoutes { + if err := udprouteStore.Add(udproute); err != nil { + return nil, err + } + } tcpIngressStore := cache.NewStore(keyFunc) for _, ingress := range objects.TCPIngresses { err := tcpIngressStore.Add(ingress) @@ -155,6 +163,7 @@ func NewFakeStore( IngressV1: ingressV1Store, IngressClassV1: ingressClassV1Store, HTTPRoute: httprouteStore, + UDPRoute: udprouteStore, TCPIngress: tcpIngressStore, UDPIngress: udpIngressStore, Service: serviceStore, diff --git a/internal/store/fake_store_test.go b/internal/store/fake_store_test.go index 31b368b39f..8110a8073b 100644 --- a/internal/store/fake_store_test.go +++ b/internal/store/fake_store_test.go @@ -11,6 +11,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" knative "knative.dev/networking/pkg/apis/networking/v1alpha1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" "github.com/kong/kubernetes-ingress-controller/v2/internal/annotations" configurationv1 "github.com/kong/kubernetes-ingress-controller/v2/pkg/apis/configuration/v1" @@ -713,3 +714,53 @@ func TestFakeStore_ListCACerts(t *testing.T) { assert.Nil(err) assert.Len(certs, 2, "expect two secrets as CA certificates") } + +func TestFakeStoreHTTPRoute(t *testing.T) { + assert := assert.New(t) + + classes := []*gatewayv1alpha2.HTTPRoute{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + Spec: gatewayv1alpha2.HTTPRouteSpec{}, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "bar", + }, + Spec: gatewayv1alpha2.HTTPRouteSpec{}, + }, + } + store, err := NewFakeStore(FakeObjects{HTTPRoutes: classes}) + assert.Nil(err) + assert.NotNil(store) + routes, err := store.ListHTTPRoutes() + assert.Nil(err) + assert.Len(routes, 2, "expect two HTTPRoutes") +} + +func TestFakeStoreUDPRoute(t *testing.T) { + assert := assert.New(t) + + classes := []*gatewayv1alpha2.UDPRoute{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + }, + Spec: gatewayv1alpha2.UDPRouteSpec{}, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "bar", + }, + Spec: gatewayv1alpha2.UDPRouteSpec{}, + }, + } + store, err := NewFakeStore(FakeObjects{UDPRoutes: classes}) + assert.Nil(err) + assert.NotNil(store) + routes, err := store.ListUDPRoutes() + assert.Nil(err) + assert.Len(routes, 2, "expect two UDPRoutes") +} diff --git a/internal/store/store.go b/internal/store/store.go index 0a43f04932..71783b8f6e 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -83,6 +83,7 @@ type Storer interface { ListIngressesV1() []*networkingv1.Ingress ListIngressClassesV1() []*networkingv1.IngressClass ListHTTPRoutes() ([]*gatewayv1alpha2.HTTPRoute, error) + ListUDPRoutes() ([]*gatewayv1alpha2.UDPRoute, error) ListTCPIngresses() ([]*kongv1beta1.TCPIngress, error) ListUDPIngresses() ([]*kongv1beta1.UDPIngress, error) ListKnativeIngresses() ([]*knative.Ingress, error) @@ -124,6 +125,7 @@ type CacheStores struct { // Gateway API Stores HTTPRoute cache.Store + UDPRoute cache.Store // Kong Stores Plugin cache.Store @@ -148,6 +150,7 @@ func NewCacheStores() (c CacheStores) { c.IngressClassV1 = cache.NewStore(clusterResourceKeyFunc) c.IngressV1beta1 = cache.NewStore(keyFunc) c.HTTPRoute = cache.NewStore(keyFunc) + c.UDPRoute = cache.NewStore(keyFunc) c.KnativeIngress = cache.NewStore(keyFunc) c.Plugin = cache.NewStore(keyFunc) c.Secret = cache.NewStore(keyFunc) @@ -229,6 +232,8 @@ func (c CacheStores) Get(obj runtime.Object) (item interface{}, exists bool, err // ---------------------------------------------------------------------------- case *gatewayv1alpha2.HTTPRoute: return c.HTTPRoute.Get(obj) + case *gatewayv1alpha2.UDPRoute: + return c.UDPRoute.Get(obj) // ---------------------------------------------------------------------------- // Kong API Support // ---------------------------------------------------------------------------- @@ -282,6 +287,8 @@ func (c CacheStores) Add(obj runtime.Object) error { // ---------------------------------------------------------------------------- case *gatewayv1alpha2.HTTPRoute: return c.HTTPRoute.Add(obj) + case *gatewayv1alpha2.UDPRoute: + return c.UDPRoute.Add(obj) // ---------------------------------------------------------------------------- // Kong API Support // ---------------------------------------------------------------------------- @@ -336,6 +343,8 @@ func (c CacheStores) Delete(obj runtime.Object) error { // ---------------------------------------------------------------------------- case *gatewayv1alpha2.HTTPRoute: return c.HTTPRoute.Delete(obj) + case *gatewayv1alpha2.UDPRoute: + return c.UDPRoute.Delete(obj) // ---------------------------------------------------------------------------- // Kong API Support // ---------------------------------------------------------------------------- @@ -516,6 +525,22 @@ func (s Store) ListHTTPRoutes() ([]*gatewayv1alpha2.HTTPRoute, error) { return httproutes, nil } +// ListUDPRoutes returns the list of UDPRoutes in the UDPRoute cache store. +func (s Store) ListUDPRoutes() ([]*gatewayv1alpha2.UDPRoute, error) { + var udproutes []*gatewayv1alpha2.UDPRoute + if err := cache.ListAll(s.stores.UDPRoute, labels.NewSelector(), + func(ob interface{}) { + udproute, ok := ob.(*gatewayv1alpha2.UDPRoute) + if ok { + udproutes = append(udproutes, udproute) + } + }, + ); err != nil { + return nil, err + } + return udproutes, nil +} + // ListTCPIngresses returns the list of TCP Ingresses from // configuration.konghq.com group. func (s Store) ListTCPIngresses() ([]*kongv1beta1.TCPIngress, error) { From 6a77cdae94be0f25382b0285bd4d3318cf53385b Mon Sep 17 00:00:00 2001 From: Travis Raines <571832+rainest@users.noreply.github.com> Date: Thu, 24 Mar 2022 15:18:26 -0700 Subject: [PATCH 3/6] feat(parser) add UDPRoute support --- CHANGELOG.md | 3 + internal/dataplane/parser/parser.go | 1 + .../dataplane/parser/translate_udproute.go | 187 +++++++++ test/integration/udpingress_test.go | 32 +- test/integration/udproute_test.go | 361 ++++++++++++++++++ 5 files changed, 578 insertions(+), 6 deletions(-) create mode 100644 internal/dataplane/parser/translate_udproute.go create mode 100644 test/integration/udproute_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 91ab81f762..9b5cb849a3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -81,6 +81,9 @@ - Admission webhook certificate files now track updates to the file, and will update when the corresponding Secret has changed. [#2258](https://github.com/Kong/kubernetes-ingress-controller/pull/2258) +- Added support for Gateway API [UDPRoute](https://gateway-api.sigs.k8s.io/v1alpha2/references/spec/#gateway.networking.k8s.io/v1alpha2.UDPRoute) + resources. + [#2363](https://github.com/Kong/kubernetes-ingress-controller/pull/2363) #### Fixed diff --git a/internal/dataplane/parser/parser.go b/internal/dataplane/parser/parser.go index 99eb3094e7..69bf672a96 100644 --- a/internal/dataplane/parser/parser.go +++ b/internal/dataplane/parser/parser.go @@ -68,6 +68,7 @@ func (p *Parser) Build() (*kongstate.KongState, error) { p.ingressRulesFromUDPIngressV1beta1(), p.ingressRulesFromKnativeIngress(), p.ingressRulesFromHTTPRoutes(), + p.ingressRulesFromUDPRoutes(), ) // populate any Kubernetes Service objects relevant objects diff --git a/internal/dataplane/parser/translate_udproute.go b/internal/dataplane/parser/translate_udproute.go new file mode 100644 index 0000000000..cc11f47c09 --- /dev/null +++ b/internal/dataplane/parser/translate_udproute.go @@ -0,0 +1,187 @@ +package parser + +import ( + "fmt" + + "github.com/kong/go-kong/kong" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/dataplane/kongstate" + "github.com/kong/kubernetes-ingress-controller/v2/internal/util" +) + +// ----------------------------------------------------------------------------- +// Translate UDPRoute - IngressRules Translation +// ----------------------------------------------------------------------------- + +// ingressRulesFromUDPRoutes processes a list of UDPRoute objects and translates +// then into Kong configuration objects. +func (p *Parser) ingressRulesFromUDPRoutes() ingressRules { + result := newIngressRules() + + udpRouteList, err := p.storer.ListUDPRoutes() + if err != nil { + p.logger.Errorf("failed to list UDPRoutes: %w", err) + return result + } + + var errs []error + for _, udproute := range udpRouteList { + if err := ingressRulesFromUDPRoute(&result, udproute); err != nil { + err = fmt.Errorf("UDPRoute %s/%s can't be routed: %w", udproute.Namespace, udproute.Name, err) + errs = append(errs, err) + } else { + // at this point the object has been configured and can be + // reported as successfully parsed. + p.ReportKubernetesObjectUpdate(udproute) + } + } + + if len(errs) > 0 { + for _, err := range errs { + p.logger.Errorf(err.Error()) + } + } + + return result +} + +func ingressRulesFromUDPRoute(result *ingressRules, udproute *gatewayv1alpha2.UDPRoute) error { + // first we grab the spec and gather some metdata about the object + spec := udproute.Spec + + // validation for UDPRoutes will happen at a higher layer, but in spite of that we run + // validation at this level as well as a fallback so that if routes are posted which + // are invalid somehow make it past validation (e.g. the webhook is not enabled) we can + // at least try to provide a helpful message about the situation in the manager logs. + if len(spec.Rules) == 0 { + return fmt.Errorf("no rules provided") + } + + // each rule may represent a different set of backend services that will be accepting + // traffic, so we make separate routes and Kong services for every present rule. + for ruleNumber, rule := range spec.Rules { + // TODO: add this to a generic UDPRoute validation, and then we should probably + // simply be calling validation on each udproute object at the begininning + // of the topmost list. + if len(rule.BackendRefs) == 0 { + return fmt.Errorf("missing backendRef in rule") + } + + // TODO: support multiple backend refs + if len(rule.BackendRefs) > 1 { + return fmt.Errorf("multiple backendRefs are not yet supported") + } + + // determine the routes needed to route traffic to services for this rule + routes, err := generateKongRoutesFromUDPRouteRule(udproute, ruleNumber, rule) + if err != nil { + return err + } + + // create a service and attach the routes to it + service := generateKongServiceFromUDPRouteBackendRef(result, udproute, rule.BackendRefs[0]) + service.Routes = append(service.Routes, routes...) + + // cache the service to avoid duplicates in further loop iterations + result.ServiceNameToServices[*service.Service.Name] = service + } + + return nil +} + +// ----------------------------------------------------------------------------- +// Translate UDPRoute - Utils +// ----------------------------------------------------------------------------- + +// generateKongRoutesFromUDPRouteRule converts an UDPRoute rule to one or more +// Kong Route objects to route traffic to services. +func generateKongRoutesFromUDPRouteRule(udproute *gatewayv1alpha2.UDPRoute, ruleNumber int, + rule gatewayv1alpha2.UDPRouteRule) ([]kongstate.Route, error) { + // gather the k8s object information and hostnames from the udproute + objectInfo := util.FromK8sObject(udproute) + + var routes []kongstate.Route + if len(rule.Matches) > 0 { + // As of 2022-03-04, matches are supported only in experimental CRDs. if you apply a UDPRoute with matches against + // the stable CRDs, the matches disappear into the ether (only if doing it via client-go, kubectl rejects them) + // We do not intend to implement these until they are stable per https://github.com/Kong/kubernetes-ingress-controller/issues/2087#issuecomment-1079053290 + return routes, fmt.Errorf("UDPRoute Matches are not yet supported") + } + if len(rule.BackendRefs) > 1 { + // TODO refactor around the solution used for https://github.com/Kong/kubernetes-ingress-controller/issues/2173 + return routes, fmt.Errorf("Support for multiple backends in UDPRoute rules is not yet supported") + } + routeName := kong.String(fmt.Sprintf( + "udproute.%s.%s.%d.%d", + udproute.Namespace, + udproute.Name, + ruleNumber, + 0, + )) + + // for now, UDPRoutes provide no means of specifying a destination port other than the backend target port + // they will once https://gateway-api.sigs.k8s.io/geps/gep-957/ is stable. in the interim, this always uses the + // backend target TODO also needs support for multiple backends + var destinations []*kong.CIDRPort + destinations = append(destinations, &kong.CIDRPort{Port: kong.Int(int(*rule.BackendRefs[0].Port))}) + r := kongstate.Route{ + Ingress: objectInfo, + Route: kong.Route{ + Name: routeName, + Protocols: kong.StringSlice("udp"), + Destinations: destinations, + }, + } + + routes = append(routes, r) + + return routes, nil +} + +// generateKongServiceFromUDPRouteBackendRef converts a provided backendRef for an UDPRoute +// into a kong.Service so that routes for that object can be attached to the Service. +// TODO add a generic backendRef handler for all GW routes. HTTPRoute needs a wrapper because it uses a special wrapped +// type with filters. Deferred til after https://github.com/Kong/kubernetes-ingress-controller/issues/2166 though +// we probably shouldn't see much change to the service (just the upstream it references in Host) +func generateKongServiceFromUDPRouteBackendRef(result *ingressRules, udproute *gatewayv1alpha2.UDPRoute, backendRef gatewayv1alpha2.BackendRef) kongstate.Service { + // determine the service namespace + // TODO: need to add validation to restrict namespaces in backendRefs + namespace := udproute.Namespace + if backendRef.Namespace != nil { + namespace = string(*backendRef.Namespace) + } + + // determine the name of the Service + serviceName := fmt.Sprintf("%s.%s.%d", namespace, backendRef.Name, *backendRef.Port) + + // determine the Service port + port := kongstate.PortDef{ + Mode: kongstate.PortModeByNumber, + Number: int32(*backendRef.Port), + } + + // check if the service is already known, and if not create it + service, ok := result.ServiceNameToServices[serviceName] + if !ok { + service = kongstate.Service{ + Service: kong.Service{ + Name: kong.String(serviceName), + Host: kong.String(fmt.Sprintf("%s.%s.%s.svc", backendRef.Name, namespace, port.CanonicalString())), + Port: kong.Int(int(*backendRef.Port)), + Protocol: kong.String("udp"), + ConnectTimeout: kong.Int(DefaultServiceTimeout), + ReadTimeout: kong.Int(DefaultServiceTimeout), + WriteTimeout: kong.Int(DefaultServiceTimeout), + Retries: kong.Int(DefaultRetries), + }, + Namespace: udproute.Namespace, + Backend: kongstate.ServiceBackend{ + Name: string(backendRef.Name), + Port: port, + }, + } + } + + return service +} diff --git a/test/integration/udpingress_test.go b/test/integration/udpingress_test.go index f8da8314a3..9c724ab6fb 100644 --- a/test/integration/udpingress_test.go +++ b/test/integration/udpingress_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + ktfkong "github.com/kong/kubernetes-testing-framework/pkg/clusters/addons/kong" "github.com/kong/kubernetes-testing-framework/pkg/utils/kubernetes/generators" "github.com/miekg/dns" "github.com/stretchr/testify/assert" @@ -99,7 +100,7 @@ func TestUDPIngressEssentials(t *testing.T) { }, Spec: kongv1beta1.UDPIngressSpec{Rules: []kongv1beta1.UDPIngressRule{ { - Port: 9999, + Port: ktfkong.DefaultUDPServicePort, Backend: kongv1beta1.IngressBackend{ ServiceName: service.Name, ServicePort: int(service.Spec.Ports[0].Port), @@ -128,7 +129,7 @@ func TestUDPIngressEssentials(t *testing.T) { d := net.Dialer{ Timeout: time.Second * 5, } - return d.DialContext(ctx, network, fmt.Sprintf("%s:9999", proxyUDPURL.Hostname())) + return d.DialContext(ctx, network, fmt.Sprintf("%s:%d", proxyUDPURL.Hostname(), ktfkong.DefaultUDPServicePort)) }, } @@ -237,7 +238,7 @@ func TestUDPIngressTCPIngressCollision(t *testing.T) { }, Spec: kongv1beta1.UDPIngressSpec{Rules: []kongv1beta1.UDPIngressRule{ { - Port: 9999, + Port: ktfkong.DefaultUDPServicePort, Backend: kongv1beta1.IngressBackend{ ServiceName: service.Name, ServicePort: int(service.Spec.Ports[0].Port), @@ -286,7 +287,7 @@ func TestUDPIngressTCPIngressCollision(t *testing.T) { t.Logf("checking DNS to resolve via UDPIngress %s", udp.Name) assert.Eventually(t, func() bool { - _, _, err := dnsUDPClient.Exchange(query, fmt.Sprintf("%s:9999", proxyUDPURL.Hostname())) + _, _, err := dnsUDPClient.Exchange(query, fmt.Sprintf("%s:%d", proxyUDPURL.Hostname(), ktfkong.DefaultUDPServicePort)) return err == nil }, ingressWait, waitTick) @@ -347,7 +348,7 @@ func TestUDPIngressTCPIngressCollision(t *testing.T) { t.Logf("checking DNS to resolve via UDPIngress %s still works also", udp.Name) assert.Eventually(t, func() bool { - _, _, err := dnsUDPClient.Exchange(query, fmt.Sprintf("%s:9999", proxyUDPURL.Hostname())) + _, _, err := dnsUDPClient.Exchange(query, fmt.Sprintf("%s:%d", proxyUDPURL.Hostname(), ktfkong.DefaultUDPServicePort)) return err == nil }, ingressWait, waitTick) @@ -355,7 +356,7 @@ func TestUDPIngressTCPIngressCollision(t *testing.T) { t.Logf("tearing down UDPIngress %s and ensuring backends are torn down", udp.Name) assert.NoError(t, c.ConfigurationV1beta1().UDPIngresses(ns.Name).Delete(ctx, udp.Name, metav1.DeleteOptions{})) assert.Eventually(t, func() bool { - _, _, err := dnsUDPClient.Exchange(query, fmt.Sprintf("%s:9999", proxyUDPURL.Hostname())) + _, _, err := dnsUDPClient.Exchange(query, fmt.Sprintf("%s:%d", proxyUDPURL.Hostname(), ktfkong.DefaultUDPServicePort)) if err != nil { if strings.Contains(err.Error(), "i/o timeout") { return true @@ -397,4 +398,23 @@ const corefile = ` reload loadbalance } +.:9999 { + errors + health { + lameduck 5s + } + ready + kubernetes cluster.local in-addr.arpa ip6.arpa { + pods insecure + fallthrough in-addr.arpa ip6.arpa + ttl 30 + } + forward . /etc/resolv.conf { + max_concurrent 1000 + } + cache 30 + loop + reload + loadbalance +} ` diff --git a/test/integration/udproute_test.go b/test/integration/udproute_test.go new file mode 100644 index 0000000000..779c4de6d9 --- /dev/null +++ b/test/integration/udproute_test.go @@ -0,0 +1,361 @@ +//go:build integration_tests +// +build integration_tests + +package integration + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/google/uuid" + ktfkong "github.com/kong/kubernetes-testing-framework/pkg/clusters/addons/kong" + "github.com/kong/kubernetes-testing-framework/pkg/utils/kubernetes/generators" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + gatewayv1alpha2 "sigs.k8s.io/gateway-api/apis/v1alpha2" + gatewayclient "sigs.k8s.io/gateway-api/pkg/client/clientset/versioned" + + "github.com/kong/kubernetes-ingress-controller/v2/internal/controllers/gateway" +) + +func TestUDPRouteEssentials(t *testing.T) { + ns, cleanup := namespace(t) + defer cleanup() + t.Log("locking UDP port") + udpMutex.Lock() + defer func() { + // Free up the UDP port + t.Log("unlocking UDP port") + udpMutex.Unlock() + }() + + // TODO consolidate into suite and use for all GW tests? + t.Log("deploying a supported gatewayclass to the test cluster") + c, err := gatewayclient.NewForConfig(env.Cluster().Config()) + require.NoError(t, err) + gwc := &gatewayv1alpha2.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: uuid.NewString(), + }, + Spec: gatewayv1alpha2.GatewayClassSpec{ + ControllerName: gateway.ControllerName, + }, + } + gwc, err = c.GatewayV1alpha2().GatewayClasses().Create(ctx, gwc, metav1.CreateOptions{}) + require.NoError(t, err) + + defer func() { + t.Log("cleaning up gatewayclasses") + if err := c.GatewayV1alpha2().GatewayClasses().Delete(ctx, gwc.Name, metav1.DeleteOptions{}); err != nil { + if !errors.IsNotFound(err) { + assert.NoError(t, err) + } + } + }() + + t.Log("deploying a gateway to the test cluster using unmanaged gateway mode") + gw := &gatewayv1alpha2.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "kong", + Annotations: map[string]string{ + unmanagedAnnotation: "true", // trigger the unmanaged gateway mode + }, + }, + Spec: gatewayv1alpha2.GatewaySpec{ + GatewayClassName: gatewayv1alpha2.ObjectName(gwc.Name), + Listeners: []gatewayv1alpha2.Listener{{ + Name: "udp", + Protocol: gatewayv1alpha2.UDPProtocolType, + Port: gatewayv1alpha2.PortNumber(ktfkong.DefaultUDPServicePort), + }}, + }, + } + gw, err = c.GatewayV1alpha2().Gateways(ns.Name).Create(ctx, gw, metav1.CreateOptions{}) + require.NoError(t, err) + + defer func() { + t.Log("cleaning up gateways") + if err := c.GatewayV1alpha2().Gateways(ns.Name).Delete(ctx, gw.Name, metav1.DeleteOptions{}); err != nil { + if !errors.IsNotFound(err) { + assert.NoError(t, err) + } + } + }() + + t.Log("configuring coredns corefile") + cfgmap := &corev1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "coredns"}, Data: map[string]string{"Corefile": corefile}} + cfgmap, err = env.Cluster().Client().CoreV1().ConfigMaps(ns.Name).Create(ctx, cfgmap, metav1.CreateOptions{}) + assert.NoError(t, err) + + defer func() { + t.Logf("cleaning up the coredns corefile %s", cfgmap.Name) + assert.NoError(t, env.Cluster().Client().CoreV1().ConfigMaps(ns.Name).Delete(ctx, cfgmap.Name, metav1.DeleteOptions{})) + }() + + t.Log("configuring a coredns deployent to deploy for UDP testing") + container := generators.NewContainer("coredns", coreDNSImage, 9999) + container.Ports[0].Protocol = corev1.ProtocolUDP + container.VolumeMounts = []corev1.VolumeMount{{Name: "config-volume", MountPath: "/etc/coredns"}} + container.Args = []string{"-conf", "/etc/coredns/Corefile"} + deployment := generators.NewDeploymentForContainer(container) + + t.Log("configuring the coredns pod with a custom corefile") + configVolume := corev1.Volume{ + Name: "config-volume", + VolumeSource: corev1.VolumeSource{ConfigMap: &corev1.ConfigMapVolumeSource{ + LocalObjectReference: corev1.LocalObjectReference{Name: cfgmap.Name}, + Items: []corev1.KeyToPath{{Key: "Corefile", Path: "Corefile"}}}}} + deployment.Spec.Template.Spec.Volumes = append(deployment.Spec.Template.Spec.Volumes, configVolume) + + t.Log("deploying coredns") + deployment, err = env.Cluster().Client().AppsV1().Deployments(ns.Name).Create(ctx, deployment, metav1.CreateOptions{}) + assert.NoError(t, err) + + defer func() { + t.Logf("cleaning up deployment %s", deployment.Name) + assert.NoError(t, env.Cluster().Client().AppsV1().Deployments(ns.Name).Delete(ctx, deployment.Name, metav1.DeleteOptions{})) + }() + + t.Logf("exposing deployment %s via service", deployment.Name) + service := generators.NewServiceForDeployment(deployment, corev1.ServiceTypeLoadBalancer) + service, err = env.Cluster().Client().CoreV1().Services(ns.Name).Create(ctx, service, metav1.CreateOptions{}) + assert.NoError(t, err) + + defer func() { + t.Logf("cleaning up the service %s", service.Name) + assert.NoError(t, env.Cluster().Client().CoreV1().Services(ns.Name).Delete(ctx, service.Name, metav1.DeleteOptions{})) + }() + + t.Logf("creating a udproute to access deployment %s via kong", deployment.Name) + udpPortDefault := gatewayv1alpha2.PortNumber(ktfkong.DefaultUDPServicePort) + udproute := &gatewayv1alpha2.UDPRoute{ + ObjectMeta: metav1.ObjectMeta{ + Name: uuid.NewString(), + Annotations: map[string]string{}, + }, + Spec: gatewayv1alpha2.UDPRouteSpec{ + CommonRouteSpec: gatewayv1alpha2.CommonRouteSpec{ + ParentRefs: []gatewayv1alpha2.ParentReference{{ + Name: gatewayv1alpha2.ObjectName(gw.Name), + }}, + }, + Rules: []gatewayv1alpha2.UDPRouteRule{{ + BackendRefs: []gatewayv1alpha2.BackendRef{{ + BackendObjectReference: gatewayv1alpha2.BackendObjectReference{ + Name: gatewayv1alpha2.ObjectName(service.Name), + Port: &udpPortDefault, + }, + }}, + }}, + }, + } + + t.Log("configurating a net.Resolver to resolve DNS via the proxy") + resolver := &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + d := net.Dialer{ + Timeout: time.Second * 5, + } + return d.DialContext(ctx, network, fmt.Sprintf("%s:%d", proxyUDPURL.Hostname(), ktfkong.DefaultUDPServicePort)) + }, + } + + udproute, err = c.GatewayV1alpha2().UDPRoutes(ns.Name).Create(ctx, udproute, metav1.CreateOptions{}) + require.NoError(t, err) + + defer func() { + t.Logf("cleaning up the udproute %s", udproute.Name) + if err := c.GatewayV1alpha2().UDPRoutes(ns.Name).Delete(ctx, udproute.Name, metav1.DeleteOptions{}); err != nil { + if !errors.IsNotFound(err) { + assert.NoError(t, err) + } + } + }() + + t.Log("verifying that the Gateway gets linked to the route via status") + udpeventuallyGatewayIsLinkedInStatus(t, c, ns.Name, udproute.Name) + + t.Logf("checking DNS to resolve via UDPIngress %s", udproute.Name) + assert.Eventually(t, func() bool { + _, err := resolver.LookupHost(ctx, "kernel.org") + return err == nil + }, ingressWait, waitTick) + + t.Log("removing the parentrefs from the UDPRoute") + oldParentRefs := udproute.Spec.ParentRefs + require.Eventually(t, func() bool { + udproute, err = c.GatewayV1alpha2().UDPRoutes(ns.Name).Get(ctx, udproute.Name, metav1.GetOptions{}) + require.NoError(t, err) + udproute.Spec.ParentRefs = nil + udproute, err = c.GatewayV1alpha2().UDPRoutes(ns.Name).Update(ctx, udproute, metav1.UpdateOptions{}) + return err == nil + }, time.Minute, time.Second) + + t.Log("verifying that the Gateway gets unlinked from the route via status") + udpeventuallyGatewayIsUnlinkedInStatus(t, c, ns.Name, udproute.Name) + + t.Log("verifying that the data-plane configuration from the UDPRoute gets dropped with the parentRefs now removed") + // negative checks for these tests check that DNS queries eventually start to fail, presumably because they time + // out. we assume there shouldn't be unrelated failure reasons because they always follow a test that confirm + // resolution was working before. we can't use never here because there may be some delay in deleting the route + assert.Eventually(t, func() bool { + _, err := resolver.LookupHost(ctx, "kernel.org") + return err != nil + }, ingressWait, waitTick) + + t.Log("putting the parentRefs back") + require.Eventually(t, func() bool { + udproute, err = c.GatewayV1alpha2().UDPRoutes(ns.Name).Get(ctx, udproute.Name, metav1.GetOptions{}) + require.NoError(t, err) + udproute.Spec.ParentRefs = oldParentRefs + udproute, err = c.GatewayV1alpha2().UDPRoutes(ns.Name).Update(ctx, udproute, metav1.UpdateOptions{}) + return err == nil + }, time.Minute, time.Second) + + t.Log("verifying that the Gateway gets linked to the route via status") + udpeventuallyGatewayIsLinkedInStatus(t, c, ns.Name, udproute.Name) + + t.Log("verifying that putting the parentRefs back results in the routes becoming available again") + t.Logf("checking DNS to resolve via UDPIngress %s", udproute.Name) + assert.Eventually(t, func() bool { + _, err := resolver.LookupHost(ctx, "kernel.org") + return err == nil + }, ingressWait, waitTick) + + t.Log("deleting the GatewayClass") + oldGWCName := gwc.Name + require.NoError(t, c.GatewayV1alpha2().GatewayClasses().Delete(ctx, gwc.Name, metav1.DeleteOptions{})) + + t.Log("verifying that the Gateway gets unlinked from the route via status") + udpeventuallyGatewayIsUnlinkedInStatus(t, c, ns.Name, udproute.Name) + + t.Log("verifying that the data-plane configuration from the UDPRoute gets dropped with the GatewayClass now removed") + assert.Eventually(t, func() bool { + _, err := resolver.LookupHost(ctx, "kernel.org") + return err != nil + }, ingressWait, waitTick) + + t.Log("putting the GatewayClass back") + gwc = &gatewayv1alpha2.GatewayClass{ + ObjectMeta: metav1.ObjectMeta{ + Name: oldGWCName, + }, + Spec: gatewayv1alpha2.GatewayClassSpec{ + ControllerName: gateway.ControllerName, + }, + } + gwc, err = c.GatewayV1alpha2().GatewayClasses().Create(ctx, gwc, metav1.CreateOptions{}) + require.NoError(t, err) + + t.Log("verifying that the Gateway gets linked to the route via status") + udpeventuallyGatewayIsLinkedInStatus(t, c, ns.Name, udproute.Name) + + t.Log("verifying that creating the GatewayClass again triggers reconciliation of UDPRoutes and the route becomes available again") + t.Logf("checking DNS to resolve via UDPIngress %s", udproute.Name) + assert.Eventually(t, func() bool { + _, err := resolver.LookupHost(ctx, "kernel.org") + return err == nil + }, ingressWait, waitTick) + + t.Log("deleting the Gateway") + oldGWName := gw.Name + require.NoError(t, c.GatewayV1alpha2().Gateways(ns.Name).Delete(ctx, gw.Name, metav1.DeleteOptions{})) + + t.Log("verifying that the Gateway gets unlinked from the route via status") + udpeventuallyGatewayIsUnlinkedInStatus(t, c, ns.Name, udproute.Name) + + t.Log("verifying that the data-plane configuration from the UDPRoute gets dropped with the Gateway now removed") + assert.Eventually(t, func() bool { + _, err := resolver.LookupHost(ctx, "kernel.org") + return err != nil + }, ingressWait, waitTick) + + t.Log("putting the Gateway back") + gw = &gatewayv1alpha2.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: oldGWName, + Annotations: map[string]string{ + unmanagedAnnotation: "true", // trigger the unmanaged gateway mode + }, + }, + Spec: gatewayv1alpha2.GatewaySpec{ + GatewayClassName: gatewayv1alpha2.ObjectName(gwc.Name), + Listeners: []gatewayv1alpha2.Listener{{ + Name: "udp", + Protocol: gatewayv1alpha2.UDPProtocolType, + Port: gatewayv1alpha2.PortNumber(9999), + }}, + }, + } + gw, err = c.GatewayV1alpha2().Gateways(ns.Name).Create(ctx, gw, metav1.CreateOptions{}) + require.NoError(t, err) + + t.Log("verifying that the Gateway gets linked to the route via status") + udpeventuallyGatewayIsLinkedInStatus(t, c, ns.Name, udproute.Name) + + t.Log("verifying that creating the Gateway again triggers reconciliation of UDPRoutes and the route becomes available again") + t.Logf("checking DNS to resolve via UDPIngress %s", udproute.Name) + assert.Eventually(t, func() bool { + _, err := resolver.LookupHost(ctx, "kernel.org") + return err == nil + }, ingressWait, waitTick) + + t.Log("deleting both GatewayClass and Gateway rapidly") + require.NoError(t, c.GatewayV1alpha2().GatewayClasses().Delete(ctx, gwc.Name, metav1.DeleteOptions{})) + require.NoError(t, c.GatewayV1alpha2().Gateways(ns.Name).Delete(ctx, gw.Name, metav1.DeleteOptions{})) + + t.Log("verifying that the Gateway gets unlinked from the route via status") + udpeventuallyGatewayIsUnlinkedInStatus(t, c, ns.Name, udproute.Name) + + t.Log("verifying that the data-plane configuration from the UDPRoute does not get orphaned with the GatewayClass and Gateway gone") + assert.Eventually(t, func() bool { + _, err := resolver.LookupHost(ctx, "kernel.org") + return err != nil + }, ingressWait, waitTick) +} + +// TODO consolidate shared util gateway linked funcs +func udpeventuallyGatewayIsLinkedInStatus(t *testing.T, c *gatewayclient.Clientset, namespace, name string) { + require.Eventually(t, func() bool { + // gather a fresh copy of the UDPRoute + udproute, err := c.GatewayV1alpha2().UDPRoutes(namespace).Get(ctx, name, metav1.GetOptions{}) + require.NoError(t, err) + + // determine if there is a link to a supported Gateway + for _, parentStatus := range udproute.Status.Parents { + if parentStatus.ControllerName == gateway.ControllerName { + // supported Gateway link was found + return true + } + } + + // if no link was found yet retry + return false + }, ingressWait, waitTick) +} + +func udpeventuallyGatewayIsUnlinkedInStatus(t *testing.T, c *gatewayclient.Clientset, namespace, name string) { + require.Eventually(t, func() bool { + // gather a fresh copy of the UDPRoute + udproute, err := c.GatewayV1alpha2().UDPRoutes(namespace).Get(ctx, name, metav1.GetOptions{}) + require.NoError(t, err) + + // determine if there is a link to a supported Gateway + for _, parentStatus := range udproute.Status.Parents { + if parentStatus.ControllerName == gateway.ControllerName { + // a supported Gateway link was found retry + return false + } + } + + // linked gateway is not present, all set + return true + }, ingressWait, waitTick) +} From d3e2a751d7a8311185e2fdb05c595298183c8233 Mon Sep 17 00:00:00 2001 From: Travis Raines <571832+rainest@users.noreply.github.com> Date: Mon, 28 Mar 2022 13:22:22 -0700 Subject: [PATCH 4/6] chore(*) disable unparam linter on utility --- internal/controllers/gateway/utils.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/controllers/gateway/utils.go b/internal/controllers/gateway/utils.go index 5b9938deae..cb420ab918 100644 --- a/internal/controllers/gateway/utils.go +++ b/internal/controllers/gateway/utils.go @@ -21,7 +21,7 @@ func debug(log logr.Logger, obj client.Object, msg string, keysAndValues ...inte } // info is an alias for the longer log.V(util.InfoLevel).Info for convenience -func info(log logr.Logger, obj client.Object, msg string, keysAndValues ...interface{}) { +func info(log logr.Logger, obj client.Object, msg string, keysAndValues ...interface{}) { //nolint:unparam keysAndValues = append([]interface{}{ "namespace", obj.GetNamespace(), "name", obj.GetName(), From 43691c9e3850f8cd5c9684dbbb2ebba4ca0755b0 Mon Sep 17 00:00:00 2001 From: Travis Raines <571832+rainest@users.noreply.github.com> Date: Fri, 1 Apr 2022 09:07:41 -0700 Subject: [PATCH 5/6] tests(examples) add UDPRoute test --- examples/gateway-udproute.yaml | 6 +---- test/integration/examples_test.go | 38 +++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/examples/gateway-udproute.yaml b/examples/gateway-udproute.yaml index da9364ef6a..43d686f89f 100644 --- a/examples/gateway-udproute.yaml +++ b/examples/gateway-udproute.yaml @@ -111,10 +111,6 @@ spec: parentRefs: - name: kong rules: - - matches: - - destinationAddresses: - - type: IPAddress - value: 192.0.2.254 - backendRefs: + - backendRefs: - name: coredns port: 53 diff --git a/test/integration/examples_test.go b/test/integration/examples_test.go index d75d10b690..ba36729df6 100644 --- a/test/integration/examples_test.go +++ b/test/integration/examples_test.go @@ -5,11 +5,14 @@ package integration import ( "bytes" + "context" "fmt" + "net" "net/http" "os" "strings" "testing" + "time" "github.com/kong/kubernetes-testing-framework/pkg/clusters" "github.com/miekg/dns" @@ -75,6 +78,41 @@ func TestHTTPRouteExample(t *testing.T) { }, ingressWait, waitTick) } +var udpRouteExampleManifests = fmt.Sprintf("%s/gateway-udproute.yaml", examplesDIR) + +func TestUDPRouteExample(t *testing.T) { + t.Logf("applying yaml manifest %s", strings.TrimPrefix(udpRouteExampleManifests, examplesDIR)) + b, err := os.ReadFile(udpRouteExampleManifests) + // TODO as of 2022-04-01, UDPRoute does not support using a different inbound port than the outbound + // destination service port. Once parentRef port functionality is stable, we should remove this + s := string(b) + s = strings.ReplaceAll(s, "port: 53", "port: 9999") + b = []byte(s) + require.NoError(t, err) + require.NoError(t, clusters.ApplyYAML(ctx, env.Cluster(), string(b))) + + defer func() { + require.NoError(t, clusters.DeleteYAML(ctx, env.Cluster(), string(b))) + }() + + t.Logf("configuring test and setting up API clients") + resolver := &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + d := net.Dialer{ + Timeout: time.Second * 5, + } + return d.DialContext(ctx, network, fmt.Sprintf("%s:%d", proxyUDPURL.Hostname(), 9999)) + }, + } + + t.Logf("verifying that the UDPRoute becomes routable") + require.Eventually(t, func() bool { + _, err := resolver.LookupHost(ctx, "kernel.org") + return err == nil + }, ingressWait, waitTick) +} + var ingressExampleManifests = fmt.Sprintf("%s/ingress.yaml", examplesDIR) func TestIngressExample(t *testing.T) { From ebcca76be7a0cd71d27eb360ee48e478a995a2eb Mon Sep 17 00:00:00 2001 From: Travis Raines <571832+rainest@users.noreply.github.com> Date: Fri, 1 Apr 2022 12:16:28 -0700 Subject: [PATCH 6/6] feat(translate) handle zero-backend UDPRoutes --- internal/dataplane/parser/translate_udproute.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/dataplane/parser/translate_udproute.go b/internal/dataplane/parser/translate_udproute.go index cc11f47c09..f40e884fa3 100644 --- a/internal/dataplane/parser/translate_udproute.go +++ b/internal/dataplane/parser/translate_udproute.go @@ -108,6 +108,9 @@ func generateKongRoutesFromUDPRouteRule(udproute *gatewayv1alpha2.UDPRoute, rule // We do not intend to implement these until they are stable per https://github.com/Kong/kubernetes-ingress-controller/issues/2087#issuecomment-1079053290 return routes, fmt.Errorf("UDPRoute Matches are not yet supported") } + if len(rule.BackendRefs) == 0 { + return routes, fmt.Errorf("UDPRoute rules must include at least one backendRef") + } if len(rule.BackendRefs) > 1 { // TODO refactor around the solution used for https://github.com/Kong/kubernetes-ingress-controller/issues/2173 return routes, fmt.Errorf("Support for multiple backends in UDPRoute rules is not yet supported")