Skip to content

Commit

Permalink
feat: add configurable Admin API service port names to be used for se…
Browse files Browse the repository at this point in the history
…rvice discovery
  • Loading branch information
pmalek committed Feb 15, 2023
1 parent b9b9256 commit 72602bc
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 30 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ Adding a new version? You'll need three changes:
which accepts a namespaced name of headless kong admin service which should have
Admin API endpoints exposed under a named port called `admin`
[#3421](https://github.com/Kong/kubernetes-ingress-controller/pull/3421)
- Added configurable port names for Admin API service discovery through
`--kong-admin-svc-port-names`. This flag accepts a list of port names that
Admin API Service ports will be matched against.
[#3556](https://github.com/Kong/kubernetes-ingress-controller/pull/3556)
- Added `dataplane` metrics label for `ingress_controller_configuration_push_count`
and `ingress_controller_configuration_push_duration_milliseconds`. This means
that all time series for those metrics will get a new label designating the
Expand Down
15 changes: 10 additions & 5 deletions internal/adminapi/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ import (

// GetURLsForService performs an endpoint lookup, using provided kubeClient
// to list provided Admin API Service EndpointSlices.
func GetURLsForService(ctx context.Context, kubeClient client.Client, service types.NamespacedName) (sets.Set[string], error) {
// The retrieved EndpointSlices' ports are compared with the provided portNames set.
func GetURLsForService(
ctx context.Context,
kubeClient client.Client,
service types.NamespacedName,
portNames sets.Set[string],
) (sets.Set[string], error) {
const (
defaultEndpointSliceListPagingLimit = 100
)
Expand Down Expand Up @@ -42,7 +48,7 @@ func GetURLsForService(ctx context.Context, kubeClient client.Client, service ty
}

for _, es := range endpointsList.Items {
addresses = addresses.Union(AddressesFromEndpointSlice(es))
addresses = addresses.Union(AddressesFromEndpointSlice(es, portNames))
}

if endpointsList.Continue == "" {
Expand All @@ -54,15 +60,14 @@ func GetURLsForService(ctx context.Context, kubeClient client.Client, service ty

// AddressesFromEndpointSlice returns a list of Admin API addresses when given
// an Endpointslice.
func AddressesFromEndpointSlice(endpoints discoveryv1.EndpointSlice) sets.Set[string] {
func AddressesFromEndpointSlice(endpoints discoveryv1.EndpointSlice, portNames sets.Set[string]) sets.Set[string] {
addresses := sets.New[string]()
for _, p := range endpoints.Ports {
if p.Name == nil {
continue
}

// NOTE: consider making this configurable.
if *p.Name != "admin" {
if !portNames.Has(*p.Name) {
continue
}

Expand Down
62 changes: 50 additions & 12 deletions internal/adminapi/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestAddressesFromEndpointSlice(t *testing.T) {
name string
enspoints discoveryv1.EndpointSlice
want sets.Set[string]
portNames sets.Set[string]
}{
{
enspoints: discoveryv1.EndpointSlice{
Expand All @@ -46,8 +47,9 @@ func TestAddressesFromEndpointSlice(t *testing.T) {
},
},
},
name: "basic",
want: sets.New("https://10.0.0.1:8444", "https://10.0.0.2:8444"),
name: "basic",
want: sets.New("https://10.0.0.1:8444", "https://10.0.0.2:8444"),
portNames: sets.New("admin"),
},
{
enspoints: discoveryv1.EndpointSlice{
Expand All @@ -69,8 +71,9 @@ func TestAddressesFromEndpointSlice(t *testing.T) {
},
},
},
name: "not ready endpoints are not returned",
want: sets.New[string](),
name: "not ready endpoints are not returned",
want: sets.New[string](),
portNames: sets.New("admin"),
},
{
enspoints: discoveryv1.EndpointSlice{
Expand All @@ -95,8 +98,9 @@ func TestAddressesFromEndpointSlice(t *testing.T) {
},
},
},
name: "not ready and terminating endpoints are not returned",
want: sets.New[string](),
name: "not ready and terminating endpoints are not returned",
want: sets.New[string](),
portNames: sets.New("admin"),
},
{
enspoints: discoveryv1.EndpointSlice{
Expand Down Expand Up @@ -132,8 +136,9 @@ func TestAddressesFromEndpointSlice(t *testing.T) {
},
},
},
name: "multiple enpoints are concatenated properly",
want: sets.New("https://10.0.0.1:8444", "https://10.0.0.2:8444", "https://10.0.0.3:8444", "https://10.0.1.1:8444", "https://10.0.1.2:8444"),
name: "multiple enpoints are concatenated properly",
want: sets.New("https://10.0.0.1:8444", "https://10.0.0.2:8444", "https://10.0.0.3:8444", "https://10.0.1.1:8444", "https://10.0.1.2:8444"),
portNames: sets.New("admin"),
},
{
enspoints: discoveryv1.EndpointSlice{
Expand Down Expand Up @@ -191,14 +196,46 @@ func TestAddressesFromEndpointSlice(t *testing.T) {
},
},
},
name: "ports without names are not taken into account ",
want: sets.New[string](),
name: "ports without names are not taken into account ",
want: sets.New[string](),
portNames: sets.New("admin"),
},
{
enspoints: discoveryv1.EndpointSlice{
ObjectMeta: endpointsSliceObjectMeta,
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []string{"10.0.0.1", "10.0.0.2", "10.0.0.3"},
Conditions: discoveryv1.EndpointConditions{
Ready: lo.ToPtr(true),
Terminating: lo.ToPtr(false),
},
},
},
Ports: []discoveryv1.EndpointPort{
{
Name: lo.ToPtr("admin-tls"),
Port: lo.ToPtr(int32(8443)),
},
{
Name: lo.ToPtr("admin"),
Port: lo.ToPtr(int32(8444)),
},
},
},
name: "multiple ports names",
want: sets.New(
"https://10.0.0.1:8443", "https://10.0.0.2:8443", "https://10.0.0.3:8443",
"https://10.0.0.1:8444", "https://10.0.0.2:8444", "https://10.0.0.3:8444",
),
portNames: sets.New("admin", "admin-tls"),
},
}
for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
require.Equal(t, tt.want, AddressesFromEndpointSlice(tt.enspoints))
require.Equal(t, tt.want, AddressesFromEndpointSlice(tt.enspoints, tt.portNames))
})
}
}
Expand Down Expand Up @@ -344,7 +381,8 @@ func TestGetURLsForService(t *testing.T) {
WithLists(tt.objects...).
Build()

got, err := GetURLsForService(context.Background(), fakeClient, tt.service)
portNames := sets.New("admin")
got, err := GetURLsForService(context.Background(), fakeClient, tt.service, portNames)
if tt.wantErr {
require.Error(t, err)
return
Expand Down
8 changes: 5 additions & 3 deletions internal/controllers/configuration/kongadminapi_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ type KongAdminAPIServiceReconciler struct {
client.Client

// ServiceNN is the service NamespacedName to watch EndpointSlices for.
ServiceNN types.NamespacedName
ServiceNN types.NamespacedName
// TODO(pmalek)
PortNames sets.Set[string]
Log logr.Logger
CacheSyncTimeout time.Duration
// EndpointsNotifier is used to notify about Admin API endpoints changes.
Expand Down Expand Up @@ -125,14 +127,14 @@ func (r *KongAdminAPIServiceReconciler) Reconcile(ctx context.Context, req ctrl.
if !ok {
// If we don't have an entry for this EndpointSlice then save it and notify
// about the change.
r.Cache[nn] = adminapi.AddressesFromEndpointSlice(endpoints)
r.Cache[nn] = adminapi.AddressesFromEndpointSlice(endpoints, r.PortNames)
r.notify()
return ctrl.Result{}, nil
}

// We do have an entry for this EndpointSlice.
// Let's check if it's the same that we're already aware of...
addresses := adminapi.AddressesFromEndpointSlice(endpoints)
addresses := adminapi.AddressesFromEndpointSlice(endpoints, r.PortNames)
if cached.Equal(addresses) {
// No change, don't notify
return ctrl.Result{}, nil
Expand Down
21 changes: 12 additions & 9 deletions internal/manager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,16 @@ type Config struct {
CacheSyncTimeout time.Duration

// Kong Proxy configurations
APIServerHost string
APIServerQPS int
APIServerBurst int
MetricsAddr string
ProbeAddr string
KongAdminURLs []string
KongAdminSvc types.NamespacedName
ProxySyncSeconds float32
ProxyTimeoutSeconds float32
APIServerHost string
APIServerQPS int
APIServerBurst int
MetricsAddr string
ProbeAddr string
KongAdminURLs []string
KongAdminSvc types.NamespacedName
KondAdminSvcPortNames []string
ProxySyncSeconds float32
ProxyTimeoutSeconds float32

// Kubernetes configurations
KubeconfigPath string
Expand Down Expand Up @@ -151,6 +152,8 @@ func (c *Config) FlagSet() *pflag.FlagSet {
`More than 1 URL can be provided, in such case the flag should be used multiple times or a corresponding env variable should use comma delimited addresses.`)
flagSet.Var(NewValidatedValue(&c.KongAdminSvc, namespacedNameFromFlagValue), "kong-admin-svc",
`Kong Admin API Service namespaced name in "namespace/name" format, to use for Kong Gateway service discovery.`)
flagSet.StringSliceVar(&c.KondAdminSvcPortNames, "kong-admin-svc-port-names", []string{"admin", "admin-tls", "kong-admin", "kong-admin-tls"},
"Names of ports on Kong Admin API service to take into account when doing service discovery.")

// Kong Proxy and Proxy Cache configurations
flagSet.StringVar(&c.APIServerHost, "apiserver-host", "", `The Kubernetes API server URL. If not set, the controller will use cluster config discovery.`)
Expand Down
2 changes: 2 additions & 0 deletions internal/manager/controllerdef.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"reflect"

"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
knativev1alpha1 "knative.dev/networking/pkg/apis/networking/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand Down Expand Up @@ -94,6 +95,7 @@ func setupControllers(
Controller: &configuration.KongAdminAPIServiceReconciler{
Client: mgr.GetClient(),
ServiceNN: c.KongAdminSvc,
PortNames: sets.New(c.KondAdminSvcPortNames...),
Log: ctrl.Log.WithName("controllers").WithName("KongAdminAPIService"),
CacheSyncTimeout: c.CacheSyncTimeout,
EndpointsNotifier: kongAdminAPIEndpointsNotifier,
Expand Down
3 changes: 2 additions & 1 deletion internal/manager/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand Down Expand Up @@ -294,7 +295,7 @@ func (c *Config) getKongClients(ctx context.Context) ([]*adminapi.Client, error)
// instance and without an address and there's no way to initialize the
// configuration validation and sending code.
err = retry.Do(func() error {
s, err := adminapi.GetURLsForService(ctx, kubeClient, c.KongAdminSvc)
s, err := adminapi.GetURLsForService(ctx, kubeClient, c.KongAdminSvc, sets.New(c.KondAdminSvcPortNames...))
if err != nil {
return err
}
Expand Down

0 comments on commit 72602bc

Please sign in to comment.