Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(status) add UDP publish service #3325

Merged
merged 15 commits into from
Jan 24, 2023
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,7 @@ test.istio: gotestsum
KUBECONFIG ?= "${HOME}/.kube/config"
KONG_NAMESPACE ?= kong-system
KONG_PROXY_SERVICE ?= ingress-controller-kong-proxy
KONG_PROXY_UDP_SERVICE ?= ingress-controller-kong-udp-proxy
KONG_ADMIN_SERVICE ?= ingress-controller-kong-admin
KONG_ADMIN_PORT ?= 8001
KONG_ADMIN_URL ?= "http://$(shell kubectl -n $(KONG_NAMESPACE) get service $(KONG_ADMIN_SERVICE) -o=go-template='{{range .status.loadBalancer.ingress}}{{.ip}}{{end}}'):$(KONG_ADMIN_PORT)"
Expand All @@ -481,6 +482,7 @@ debug: install _ensure-namespace
$(DLV) debug ./internal/cmd/main.go -- \
--kong-admin-url $(KONG_ADMIN_URL) \
--publish-service $(KONG_NAMESPACE)/$(KONG_PROXY_SERVICE) \
--publish-service-udp $(KONG_NAMESPACE)/$(KONG_PROXY_UDP_SERVICE) \
--kubeconfig $(KUBECONFIG) \
--feature-gates=$(KONG_CONTROLLER_FEATURE_GATES)

Expand Down Expand Up @@ -526,6 +528,7 @@ _run:
go run ./internal/cmd/main.go \
--kong-admin-url $(KONG_ADMIN_URL) \
--publish-service $(KONG_NAMESPACE)/$(KONG_PROXY_SERVICE) \
--publish-service-udp $(KONG_NAMESPACE)/$(KONG_PROXY_UDP_SERVICE) \
--kubeconfig $(KUBECONFIG) \
--feature-gates=$(KONG_CONTROLLER_FEATURE_GATES)

Expand Down
6 changes: 6 additions & 0 deletions hack/generators/controllers/networking/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ var inputControllersNeeded = &typesNeeded{
Package: kongv1beta1,
Plural: "udpingresses",
CacheType: "UDPIngress",
IsUDP: true,
NeedsStatusPermissions: true,
CapableOfStatusUpdates: true,
AcceptsIngressClassNameAnnotation: true,
Expand Down Expand Up @@ -333,6 +334,7 @@ type typeNeeded struct {
Plural string
CacheType string
RBACVerbs []string
IsUDP bool

// AcceptsIngressClassNameAnnotation indicates that the object accepts (and the controller will listen to)
// the "kubernetes.io/ingress.class" annotation to decide whether or not the object is supported.
Expand Down Expand Up @@ -627,7 +629,11 @@ func (r *{{.PackageAlias}}{{.Kind}}Reconciler) Reconcile(ctx context.Context, re
}

log.V(util.DebugLevel).Info("determining gateway addresses for object status updates", "namespace", req.Namespace, "name", req.Name)
{{- if .IsUDP }}
addrs, err := r.DataplaneAddressFinder.GetUDPLoadBalancerAddresses()
{{- else }}
addrs, err := r.DataplaneAddressFinder.GetLoadBalancerAddresses()
{{- end }}
if err != nil {
return ctrl.Result{}, err
}
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

51 changes: 49 additions & 2 deletions internal/dataplane/address_finder.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ type AddressGetter func() ([]string, error)
// AddressFinder is a threadsafe metadata object which can provide the current
// live addresses in use by the dataplane at any point in time.
type AddressFinder struct {
overrideAddresses []string
addressGetter AddressGetter
overrideAddresses []string
overrideAddressesUDP []string
addressGetter AddressGetter

lock sync.RWMutex
}
Expand Down Expand Up @@ -54,6 +55,15 @@ func (a *AddressFinder) SetOverrides(addrs []string) {
a.overrideAddresses = addrs
}

// SetUDPOverrides hard codes a specific list of addresses to be the UDP addresses
// that this finder produces for the data-plane. To disable overrides, call
// this method again with an empty list.
func (a *AddressFinder) SetUDPOverrides(addrs []string) {
a.lock.Lock()
defer a.lock.Unlock()
a.overrideAddressesUDP = addrs
}

// GetAddresses provides a list of the addresses which the data-plane is
// listening on for ingress network traffic. Addresses can either be IP
// addresses or hostnames.
Expand All @@ -72,6 +82,28 @@ func (a *AddressFinder) GetAddresses() ([]string, error) {
return nil, fmt.Errorf("data-plane addresses can't be retrieved: no valid method available")
}

// GetUDPAddresses provides a list of the UDP addresses which the data-plane is
// listening on for ingress network traffic. Addresses can either be IP
// addresses or hostnames. If UDP settings are not configured, falls back to GetAddresses().
func (a *AddressFinder) GetUDPAddresses() ([]string, error) {
a.lock.RLock()
defer a.lock.RUnlock()

if len(a.overrideAddressesUDP) > 0 {
return a.overrideAddressesUDP, nil
}

if len(a.overrideAddresses) > 0 && a.addressGetter == nil {
rainest marked this conversation as resolved.
Show resolved Hide resolved
return a.overrideAddresses, nil
}

if a.addressGetter != nil {
return a.addressGetter()
}

return a.GetAddresses()
rainest marked this conversation as resolved.
Show resolved Hide resolved
}

// GetLoadBalancerAddresses provides a list of the addresses which the
// data-plane is listening on for ingress network traffic, but provides the
// addresses in Kubernetes corev1.LoadBalancerIngress format. Addresses can be
Expand All @@ -81,7 +113,10 @@ func (a *AddressFinder) GetLoadBalancerAddresses() ([]netv1.IngressLoadBalancerI
if err != nil {
return nil, err
}
return getAddressHelper(addrs)
}

func getAddressHelper(addrs []string) ([]netv1.IngressLoadBalancerIngress, error) {
rainest marked this conversation as resolved.
Show resolved Hide resolved
var loadBalancerAddresses []netv1.IngressLoadBalancerIngress
for _, addr := range addrs {
ing := netv1.IngressLoadBalancerIngress{}
Expand All @@ -99,6 +134,18 @@ func (a *AddressFinder) GetLoadBalancerAddresses() ([]netv1.IngressLoadBalancerI
return loadBalancerAddresses, nil
}

// GetUDPLoadBalancerAddresses provides a list of the addresses which the
// data-plane is listening on for UDP network traffic, but provides the
// addresses in Kubernetes corev1.LoadBalancerIngress format. Addresses can be
// IP addresses or hostnames.
func (a *AddressFinder) GetUDPLoadBalancerAddresses() ([]netv1.IngressLoadBalancerIngress, error) {
rainest marked this conversation as resolved.
Show resolved Hide resolved
addrs, err := a.GetUDPAddresses()
if err != nil {
return nil, err
}
return getAddressHelper(addrs)
}

// -----------------------------------------------------------------------------
//
// -----------------------------------------------------------------------------
Expand Down
38 changes: 38 additions & 0 deletions internal/dataplane/address_finder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,3 +74,41 @@ func TestAddressFinder(t *testing.T) {
require.Empty(t, lbs)
require.Equal(t, fmt.Sprintf("%s is not a valid DNS hostname", invalidDNSAddrs[0]), err.Error())
}

func TestUDPAddressFinder(t *testing.T) {
t.Log("generating a new AddressFinder")
finder := NewAddressFinder()
require.NotNil(t, finder)
require.Nil(t, finder.addressGetter)

t.Log("generating fake AddressGetters")
defaultAddrs := []string{"127.0.0.1", "127.0.0.2"}
overrideAddrs := []string{"192.168.1.1", "192.168.1.2", "192.168.1.3"}
fakeGetter := func() ([]string, error) { return defaultAddrs, nil }

defaultUDPAddrs := []string{"127.1.0.1", "127.1.0.2"}
overrideUDPAddrs := []string{"192.168.2.1", "192.168.2.2", "192.168.2.3"}
fakeUDPGetter := func() ([]string, error) { return defaultUDPAddrs, nil }

t.Log("verifying getting a list of addresses from the finder after a getter function is provided")
finder.SetGetter(fakeGetter)
addrs, err := finder.GetUDPAddresses()
require.NoError(t, err)
require.Equal(t, defaultAddrs, addrs)

t.Log("verifying that overrides take precedent over the getter")
finder.SetOverrides(overrideAddrs)
addrs, err = finder.GetAddresses()
require.NoError(t, err)
require.Equal(t, overrideAddrs, addrs)

finder.SetGetter(fakeUDPGetter)
addrs, err = finder.GetUDPAddresses()
require.NoError(t, err)
require.Equal(t, defaultUDPAddrs, addrs)

finder.SetUDPOverrides(overrideUDPAddrs)
addrs, err = finder.GetUDPAddresses()
require.NoError(t, err)
require.Equal(t, overrideUDPAddrs, addrs)
}
23 changes: 15 additions & 8 deletions internal/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,11 @@ type Config struct {
GatewayAPIControllerName string

// Ingress status
PublishService string
PublishStatusAddress []string
UpdateStatus bool
PublishService string
PublishServiceUDP string
PublishStatusAddress []string
PublishStatusAddressUDP []string
UpdateStatus bool

// Kubernetes API toggling
IngressExtV1beta1Enabled bool
Expand Down Expand Up @@ -165,11 +167,16 @@ func (c *Config) FlagSet() *pflag.FlagSet {
`To watch multiple namespaces, use a comma-separated list of namespaces.`)

// Ingress status
flagSet.StringVar(&c.PublishService, "publish-service", "",
`Service fronting Ingress resources in "namespace/name" format. The controller will update Ingress status information with this Service's endpoints.`)
flagSet.StringSliceVar(&c.PublishStatusAddress, "publish-status-address", []string{},
`User-provided addresses in comma-separated string format, for use in lieu of "publish-service" `+
`when that Service lacks useful address information (for example, in bare-metal environments).`)
flagSet.StringVar(&c.PublishService, "publish-service", "", `Service fronting routing resources in "namespace/name"
format. The controller will update route resource status information with this Service's endpoints.`)
flagSet.StringSliceVar(&c.PublishStatusAddress, "publish-status-address", []string{}, `User-provided address CSV.
For use in lieu of "publish-service" when that Service lacks useful address information (for example,
in bare-metal environments).`)
flagSet.StringVar(&c.PublishServiceUDP, "publish-service-udp", "", `Service fronting UDP routing resources in
"namespace/name" format. The controller will update UDP route status information with this Service's
endpoints. If omitted, the same Service will be used for both TCP and UDP routes.`)
flagSet.StringSliceVar(&c.PublishStatusAddressUDP, "publish-status-address-udp", []string{}, `User-provided
address CSV, for use in lieu of "publish-service-udp" when that Service lacks useful address information.`)
flagSet.BoolVar(&c.UpdateStatus, "update-status", true,
`Indicates if the ingress controller should update the status of resources (e.g. IP/Hostname for v1.Ingress, e.t.c.)`)

Expand Down
3 changes: 2 additions & 1 deletion internal/manager/controllerdef.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func setupControllers(
mgr manager.Manager,
dataplaneClient *dataplane.KongClient,
dataplaneAddressFinder *dataplane.AddressFinder,
udpDataplaneAddressFinder *dataplane.AddressFinder,
kubernetesStatusQueue *status.Queue,
c *Config,
featureGates map[string]bool,
Expand Down Expand Up @@ -197,7 +198,7 @@ func setupControllers(
IngressClassName: c.IngressClassName,
DisableIngressClassLookups: !c.IngressClassNetV1Enabled,
StatusQueue: kubernetesStatusQueue,
DataplaneAddressFinder: dataplaneAddressFinder,
DataplaneAddressFinder: udpDataplaneAddressFinder,
rainest marked this conversation as resolved.
Show resolved Hide resolved
CacheSyncTimeout: c.CacheSyncTimeout,
},
},
Expand Down
5 changes: 3 additions & 2 deletions internal/manager/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d
}

setupLog.Info("Initializing Dataplane Address Discovery")
dataplaneAddressFinder, err := setupDataplaneAddressFinder(ctx, mgr.GetClient(), c)
dataplaneAddressFinder, udpDataplaneAddressFinder, err := setupDataplaneAddressFinder(ctx, mgr.GetClient(), c)
if err != nil {
return err
}
Expand All @@ -190,7 +190,8 @@ func Run(ctx context.Context, c *Config, diagnostic util.ConfigDumpDiagnostic, d
gateway.ControllerName = gatewayv1beta1.GatewayController(c.GatewayAPIControllerName)

setupLog.Info("Starting Enabled Controllers")
controllers, err := setupControllers(mgr, dataplaneClient, dataplaneAddressFinder, kubernetesStatusQueue, c, featureGates)
controllers, err := setupControllers(mgr, dataplaneClient,
dataplaneAddressFinder, udpDataplaneAddressFinder, kubernetesStatusQueue, c, featureGates)
if err != nil {
return fmt.Errorf("unable to setup controller as expected %w", err)
}
Expand Down
95 changes: 66 additions & 29 deletions internal/manager/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,51 +200,88 @@ func setupAdmissionServer(
return nil
}

func setupDataplaneAddressFinder(ctx context.Context, mgrc client.Client, c *Config) (*dataplane.AddressFinder, error) {
// setupDataplaneAddressFinder returns a default and UDP address finder. These finders return the override addresses if
// set or the publish service addresses if no overrides are set. If no UDP overrides or UDP publish service are set,
// the UDP finder will also return the default addresses. If no override or publish service is set, this function
// returns nil finders and an error.
func setupDataplaneAddressFinder(
ctx context.Context,
rainest marked this conversation as resolved.
Show resolved Hide resolved
mgrc client.Client,
c *Config,
) (*dataplane.AddressFinder, *dataplane.AddressFinder, error) {
dataplaneAddressFinder := dataplane.NewAddressFinder()
udpDataplaneAddressFinder := dataplane.NewAddressFinder()
var getter func() ([]string, error)
if c.UpdateStatus {
// Default
if overrideAddrs := c.PublishStatusAddress; len(overrideAddrs) > 0 {
dataplaneAddressFinder.SetOverrides(overrideAddrs)
} else if c.PublishService != "" {
parts := strings.Split(c.PublishService, "/")
if len(parts) != 2 {
return nil, fmt.Errorf("publish service %s is invalid, expecting <namespace>/<name>", c.PublishService)
return nil, nil, fmt.Errorf("publish service %s is invalid, expecting <namespace>/<name>", c.PublishService)
}
nsn := types.NamespacedName{
Namespace: parts[0],
Name: parts[1],
}
dataplaneAddressFinder.SetGetter(func() ([]string, error) {
svc := new(corev1.Service)
if err := mgrc.Get(ctx, nsn, svc); err != nil {
return nil, err
}
getter = generateAddressFinderGetter(ctx, mgrc, nsn)
dataplaneAddressFinder.SetGetter(getter)
} else {
return nil, nil, fmt.Errorf("status updates enabled but no method to determine data-plane addresses, need either --publish-service or --publish-status-address")
}

var addrs []string
switch svc.Spec.Type { //nolint:exhaustive
case corev1.ServiceTypeLoadBalancer:
for _, lbaddr := range svc.Status.LoadBalancer.Ingress {
if lbaddr.IP != "" {
addrs = append(addrs, lbaddr.IP)
}
if lbaddr.Hostname != "" {
addrs = append(addrs, lbaddr.Hostname)
}
}
default:
addrs = append(addrs, svc.Spec.ClusterIPs...)
}
// UDP. falls back to default if not configured
if udpOverrideAddrs := c.PublishStatusAddressUDP; len(udpOverrideAddrs) > 0 {
udpDataplaneAddressFinder.SetUDPOverrides(udpOverrideAddrs)
} else if c.PublishServiceUDP != "" {
parts := strings.Split(c.PublishServiceUDP, "/")
if len(parts) != 2 {
return nil, nil, fmt.Errorf("UDP publish service %s is invalid, expecting <namespace>/<name>", c.PublishService)
}
nsn := types.NamespacedName{
Namespace: parts[0],
Name: parts[1],
}
udpDataplaneAddressFinder.SetGetter(generateAddressFinderGetter(ctx, mgrc, nsn))
} else {
udpDataplaneAddressFinder.SetGetter(getter)
}
}

return dataplaneAddressFinder, udpDataplaneAddressFinder, nil
}

func generateAddressFinderGetter(
ctx context.Context,
mgrc client.Client,
nsn types.NamespacedName,
) func() ([]string, error) {
rainest marked this conversation as resolved.
Show resolved Hide resolved
return func() ([]string, error) {
svc := new(corev1.Service)
if err := mgrc.Get(ctx, nsn, svc); err != nil {
return nil, err
}

if len(addrs) == 0 {
return nil, fmt.Errorf("waiting for addresses to be provisioned for publish service %s/%s", nsn.Namespace, nsn.Name)
var addrs []string
switch svc.Spec.Type { //nolint:exhaustive
case corev1.ServiceTypeLoadBalancer:
for _, lbaddr := range svc.Status.LoadBalancer.Ingress {
if lbaddr.IP != "" {
addrs = append(addrs, lbaddr.IP)
}
if lbaddr.Hostname != "" {
addrs = append(addrs, lbaddr.Hostname)
}
}
default:
addrs = append(addrs, svc.Spec.ClusterIPs...)
}

return addrs, nil
})
} else {
return nil, fmt.Errorf("status updates enabled but no method to determine data-plane addresses, need either --publish-service or --publish-status-address")
if len(addrs) == 0 {
return nil, fmt.Errorf("waiting for addresses to be provisioned for publish service %s/%s", nsn.Namespace, nsn.Name)
}
}

return dataplaneAddressFinder, nil
return addrs, nil
}
}
1 change: 1 addition & 0 deletions internal/util/test/controller_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ func DeployControllerManagerForCluster(
fmt.Sprintf("--kubeconfig=%s", kubeconfig.Name()),
"--election-id=integrationtests.konghq.com",
"--publish-service=kong-system/ingress-controller-kong-proxy",
"--publish-service-udp=kong-system/ingress-controller-kong-udp-proxy",
"--log-format=text",
}
controllerManagerFlags = append(controllerManagerFlags, additionalFlags...)
Expand Down
Loading