Skip to content

Commit

Permalink
Merge branch 'main' into spatel/NET-1646-add-max-ejection-percent-and…
Browse files Browse the repository at this point in the history
…-base-ejection-time
  • Loading branch information
analogue committed Jan 17, 2023
2 parents 978c3fa + 9ab7bea commit 669d94c
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 33 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ IMPROVEMENTS:
* Add the `balanceInboundConnections` field to the `ServiceDefaults` CRD. [[GH-1823]](https://github.com/hashicorp/consul-k8s/pull/1823)
* Control-Plane
* Add support for the annotation `consul.hashicorp.com/use-proxy-health-check`. [[GH-1824](https://github.com/hashicorp/consul-k8s/pull/1824)]
* Add health check for synced services based on the status of the Kubernetes readiness probe on synced pod. [[GH-1821](https://github.com/hashicorp/consul-k8s/pull/1821)]

BUG FIXES:
* Control Plane
Expand Down
8 changes: 7 additions & 1 deletion acceptance/tests/sync/sync_catalog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/hashicorp/consul-k8s/acceptance/framework/helpers"
"github.com/hashicorp/consul-k8s/acceptance/framework/k8s"
"github.com/hashicorp/consul-k8s/acceptance/framework/logger"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -65,8 +66,13 @@ func TestSyncCatalog(t *testing.T) {

service, _, err := consulClient.Catalog().Service(syncedServiceName, "", nil)
require.NoError(t, err)
require.Equal(t, 1, len(service))
require.Len(t, service, 1)
require.Equal(t, []string{"k8s"}, service[0].ServiceTags)
filter := fmt.Sprintf("ServiceID == %q", service[0].ServiceID)
healthChecks, _, err := consulClient.Health().Checks(syncedServiceName, &api.QueryOptions{Filter: filter})
require.NoError(t, err)
require.Len(t, healthChecks, 1)
require.Equal(t, api.HealthPassing, healthChecks[0].Status)
})
}
}
67 changes: 44 additions & 23 deletions control-plane/catalog/to-consul/resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/hashicorp/consul-k8s/control-plane/namespaces"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
apiv1 "k8s.io/api/core/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
Expand All @@ -33,6 +33,12 @@ const (
ConsulK8SRefKind = "external-k8s-ref-kind"
ConsulK8SRefValue = "external-k8s-ref-name"
ConsulK8SNodeName = "external-k8s-node-name"

// consulKubernetesCheckType is the type of health check in Consul for Kubernetes readiness status.
consulKubernetesCheckType = "kubernetes-readiness"
// consulKubernetesCheckName is the name of health check in Consul for Kubernetes readiness status.
consulKubernetesCheckName = "Kubernetes Readiness Check"
kubernetesSuccessReasonMsg = "Kubernetes health checks passing"
)

type NodePortSyncType string
Expand Down Expand Up @@ -131,11 +137,11 @@ type ServiceResource struct {

// serviceMap holds services we should sync to Consul. Keys are the
// in the form <kube namespace>/<kube svc name>.
serviceMap map[string]*apiv1.Service
serviceMap map[string]*corev1.Service

// endpointsMap uses the same keys as serviceMap but maps to the endpoints
// of each service.
endpointsMap map[string]*apiv1.Endpoints
endpointsMap map[string]*corev1.Endpoints

// consulMap holds the services in Consul that we've registered from kube.
// It's populated via Consul's API and lets us diff what is actually in
Expand All @@ -157,7 +163,7 @@ func (t *ServiceResource) Informer() cache.SharedIndexInformer {
return t.Client.CoreV1().Services(metav1.NamespaceAll).Watch(t.Ctx, options)
},
},
&apiv1.Service{},
&corev1.Service{},
0,
cache.Indexers{},
)
Expand All @@ -166,7 +172,7 @@ func (t *ServiceResource) Informer() cache.SharedIndexInformer {
// Upsert implements the controller.Resource interface.
func (t *ServiceResource) Upsert(key string, raw interface{}) error {
// We expect a Service. If it isn't a service then just ignore it.
service, ok := raw.(*apiv1.Service)
service, ok := raw.(*corev1.Service)
if !ok {
t.Log.Warn("upsert got invalid type", "raw", raw)
return nil
Expand All @@ -176,7 +182,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
defer t.serviceLock.Unlock()

if t.serviceMap == nil {
t.serviceMap = make(map[string]*apiv1.Service)
t.serviceMap = make(map[string]*corev1.Service)
}

if !t.shouldSync(service) {
Expand Down Expand Up @@ -205,7 +211,7 @@ func (t *ServiceResource) Upsert(key string, raw interface{}) error {
"err", err)
} else {
if t.endpointsMap == nil {
t.endpointsMap = make(map[string]*apiv1.Endpoints)
t.endpointsMap = make(map[string]*corev1.Endpoints)
}
t.endpointsMap[key] = endpoints
t.Log.Debug("[ServiceResource.Upsert] adding service's endpoints to endpointsMap", "key", key, "service", service, "endpoints", endpoints)
Expand Down Expand Up @@ -254,7 +260,7 @@ func (t *ServiceResource) Run(ch <-chan struct{}) {
}

// shouldSync returns true if resyncing should be enabled for the given service.
func (t *ServiceResource) shouldSync(svc *apiv1.Service) bool {
func (t *ServiceResource) shouldSync(svc *corev1.Service) bool {
// Namespace logic
// If in deny list, don't sync
if t.DenyK8sNamespacesSet.Contains(svc.Namespace) {
Expand All @@ -269,7 +275,7 @@ func (t *ServiceResource) shouldSync(svc *apiv1.Service) bool {
}

// Ignore ClusterIP services if ClusterIP sync is disabled
if svc.Spec.Type == apiv1.ServiceTypeClusterIP && !t.ClusterIPSync {
if svc.Spec.Type == corev1.ServiceTypeClusterIP && !t.ClusterIPSync {
t.Log.Debug("[shouldSync] ignoring clusterip service", "svc.Namespace", svc.Namespace, "service", svc)
return false
}
Expand Down Expand Up @@ -310,9 +316,9 @@ func (t *ServiceResource) shouldTrackEndpoints(key string) bool {
return false
}

return svc.Spec.Type == apiv1.ServiceTypeNodePort ||
svc.Spec.Type == apiv1.ServiceTypeClusterIP ||
(t.LoadBalancerEndpointsSync && svc.Spec.Type == apiv1.ServiceTypeLoadBalancer)
return svc.Spec.Type == corev1.ServiceTypeNodePort ||
svc.Spec.Type == corev1.ServiceTypeClusterIP ||
(t.LoadBalancerEndpointsSync && svc.Spec.Type == corev1.ServiceTypeLoadBalancer)
}

// generateRegistrations generates the necessary Consul registrations for
Expand Down Expand Up @@ -380,7 +386,7 @@ func (t *ServiceResource) generateRegistrations(key string) {
var overridePortNumber int
if len(svc.Spec.Ports) > 0 {
var port int
isNodePort := svc.Spec.Type == apiv1.ServiceTypeNodePort
isNodePort := svc.Spec.Type == corev1.ServiceTypeNodePort

// If a specific port is specified, then use that port value
portAnnotation, ok := svc.Annotations[annotationServicePort]
Expand Down Expand Up @@ -479,7 +485,7 @@ func (t *ServiceResource) generateRegistrations(key string) {
// each LoadBalancer entry. We only support entries that have an IP
// address assigned (not hostnames).
// If LoadBalancerEndpointsSync is true sync LB endpoints instead of loadbalancer ingress.
case apiv1.ServiceTypeLoadBalancer:
case corev1.ServiceTypeLoadBalancer:
if t.LoadBalancerEndpointsSync {
t.registerServiceInstance(baseNode, baseService, key, overridePortName, overridePortNumber, false)
} else {
Expand Down Expand Up @@ -512,7 +518,7 @@ func (t *ServiceResource) generateRegistrations(key string) {
// endpoint of the service, which corresponds to the nodes the service's
// pods are running on. This way we don't register _every_ K8S
// node as part of the service.
case apiv1.ServiceTypeNodePort:
case corev1.ServiceTypeNodePort:
if t.endpointsMap == nil {
return
}
Expand All @@ -538,11 +544,11 @@ func (t *ServiceResource) generateRegistrations(key string) {
}

// Set the expected node address type
var expectedType apiv1.NodeAddressType
var expectedType corev1.NodeAddressType
if t.NodePortSync == InternalOnly {
expectedType = apiv1.NodeInternalIP
expectedType = corev1.NodeInternalIP
} else {
expectedType = apiv1.NodeExternalIP
expectedType = corev1.NodeExternalIP
}

// Find the ip address for the node and
Expand Down Expand Up @@ -571,7 +577,7 @@ func (t *ServiceResource) generateRegistrations(key string) {
// use an InternalIP
if t.NodePortSync == ExternalFirst && !found {
for _, address := range node.Status.Addresses {
if address.Type == apiv1.NodeInternalIP {
if address.Type == corev1.NodeInternalIP {
r := baseNode
rs := baseService
r.Service = &rs
Expand All @@ -593,7 +599,7 @@ func (t *ServiceResource) generateRegistrations(key string) {

// For ClusterIP services, we register a service instance
// for each endpoint.
case apiv1.ServiceTypeClusterIP:
case corev1.ServiceTypeClusterIP:
t.registerServiceInstance(baseNode, baseService, key, overridePortName, overridePortNumber, true)
}
}
Expand Down Expand Up @@ -674,6 +680,16 @@ func (t *ServiceResource) registerServiceInstance(
r.Service.Meta[ConsulK8SNodeName] = *subsetAddr.NodeName
}

r.Check = &consulapi.AgentCheck{
CheckID: consulHealthCheckID(endpoints.Namespace, serviceID(r.Service.Service, addr)),
Name: consulKubernetesCheckName,
Namespace: baseService.Namespace,
Type: consulKubernetesCheckType,
Status: consulapi.HealthPassing,
ServiceID: serviceID(r.Service.Service, addr),
Output: kubernetesSuccessReasonMsg,
}

t.consulMap[key] = append(t.consulMap[key], &r)
}
}
Expand Down Expand Up @@ -723,15 +739,15 @@ func (t *serviceEndpointsResource) Informer() cache.SharedIndexInformer {
Watch(t.Ctx, options)
},
},
&apiv1.Endpoints{},
&corev1.Endpoints{},
0,
cache.Indexers{},
)
}

func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error {
svc := t.Service
endpoints, ok := raw.(*apiv1.Endpoints)
endpoints, ok := raw.(*corev1.Endpoints)
if !ok {
svc.Log.Warn("upsert got invalid type", "raw", raw)
return nil
Expand All @@ -747,7 +763,7 @@ func (t *serviceEndpointsResource) Upsert(key string, raw interface{}) error {

// We are tracking this service so let's keep track of the endpoints
if svc.endpointsMap == nil {
svc.endpointsMap = make(map[string]*apiv1.Endpoints)
svc.endpointsMap = make(map[string]*corev1.Endpoints)
}
svc.endpointsMap[key] = endpoints

Expand Down Expand Up @@ -788,3 +804,8 @@ func (t *ServiceResource) addPrefixAndK8SNamespace(name, namespace string) strin

return name
}

// consulHealthCheckID deterministically generates a health check ID based on service ID and Kubernetes namespace.
func consulHealthCheckID(k8sNS string, serviceID string) string {
return fmt.Sprintf("%s/%s", k8sNS, serviceID)
}
38 changes: 38 additions & 0 deletions control-plane/catalog/to-consul/resource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

mapset "github.com/deckarep/golang-set"
"github.com/hashicorp/consul-k8s/control-plane/helper/controller"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -1005,6 +1006,43 @@ func TestServiceResource_clusterIP(t *testing.T) {
})
}

// Test that the proper registrations with health checks are generated for a ClusterIP type.
func TestServiceResource_clusterIP_healthCheck(t *testing.T) {
t.Parallel()
client := fake.NewSimpleClientset()
syncer := newTestSyncer()
serviceResource := defaultServiceResource(client, syncer)
serviceResource.ClusterIPSync = true

// Start the controller
closer := controller.TestControllerRun(&serviceResource)
defer closer()

// Insert the service
svc := clusterIPService("foo", metav1.NamespaceDefault)
_, err := client.CoreV1().Services(metav1.NamespaceDefault).Create(context.Background(), svc, metav1.CreateOptions{})
require.NoError(t, err)

// Insert the endpoints
createEndpoints(t, client, "foo", metav1.NamespaceDefault)

// Verify what we got
retry.Run(t, func(r *retry.R) {
syncer.Lock()
defer syncer.Unlock()
actual := syncer.Registrations
require.Len(r, actual, 2)
require.Equal(r, consulKubernetesCheckName, actual[0].Check.Name)
require.Equal(r, consulapi.HealthPassing, actual[0].Check.Status)
require.Equal(r, kubernetesSuccessReasonMsg, actual[0].Check.Output)
require.Equal(r, consulKubernetesCheckType, actual[0].Check.Type)
require.Equal(r, consulKubernetesCheckName, actual[1].Check.Name)
require.Equal(r, consulapi.HealthPassing, actual[1].Check.Status)
require.Equal(r, kubernetesSuccessReasonMsg, actual[1].Check.Output)
require.Equal(r, consulKubernetesCheckType, actual[1].Check.Type)
})
}

// Test clusterIP with prefix.
func TestServiceResource_clusterIPPrefix(t *testing.T) {
t.Parallel()
Expand Down
9 changes: 0 additions & 9 deletions control-plane/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,8 @@ github.com/hashicorp/consul-k8s/control-plane/cni v0.0.0-20220831174802-b8af6526
github.com/hashicorp/consul-server-connection-manager v0.1.0 h1:XCweGvMHzra88rYv2zxwwuUOjBUdcQmNKVrnQmt/muo=
github.com/hashicorp/consul-server-connection-manager v0.1.0/go.mod h1:XVVlO+Yk7aiRpspiHZkrrFVn9BJIiOPnQIzqytPxGaU=
github.com/hashicorp/consul/api v1.1.0/go.mod h1:VmuI/Lkw1nC05EYQWNKwWGbkg+FbDBtguAZLlVdkD9Q=
github.com/hashicorp/consul/api v1.10.1-0.20221220195433-629878a6879c h1:dIy67NF/J5gm0wdGxTA8dCFeQDz8TatuuMEzC7n3aDk=
github.com/hashicorp/consul/api v1.10.1-0.20221220195433-629878a6879c/go.mod h1:c1u8FzGHcavbEtRW/p1YditvfMgn4QsKNgz2rnCDF7c=
github.com/hashicorp/consul/api v1.10.1-0.20230106171340-8d923c178919 h1:8aVegJMSv7PIAAa1zqQQ0CT4TKv+Nf7I4rhE6+uDa1U=
github.com/hashicorp/consul/api v1.10.1-0.20230106171340-8d923c178919/go.mod h1:c1u8FzGHcavbEtRW/p1YditvfMgn4QsKNgz2rnCDF7c=
github.com/hashicorp/consul/api v1.18.0 h1:R7PPNzTCeN6VuQNDwwhZWJvzCtGSrNpJqfb22h3yH9g=
github.com/hashicorp/consul/api v1.18.0/go.mod h1:owRRGJ9M5xReDC5nfT8FTJrNAPbT4NM6p/k+d03q2v4=
github.com/hashicorp/consul/proto-public v0.1.0 h1:O0LSmCqydZi363hsqc6n2v5sMz3usQMXZF6ziK3SzXU=
github.com/hashicorp/consul/proto-public v0.1.0/go.mod h1:vs2KkuWwtjkIgA5ezp4YKPzQp4GitV+q/+PvksrA92k=
github.com/hashicorp/consul/sdk v0.4.1-0.20221021205723-cc843c4be892 h1:jw0NwPmNPr5CxAU04hACdj61JSaJBKZ0FdBo+kwfNp4=
Expand Down Expand Up @@ -384,7 +380,6 @@ github.com/hashicorp/go-rootcerts v1.0.2 h1:jzhAVGtqPKbwpyCPELlgNWhE1znq+qwJtW5O
github.com/hashicorp/go-rootcerts v1.0.2/go.mod h1:pqUvnprVnM5bf7AOirdbb01K4ccR319Vf4pU3K5EGc8=
github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerXegt+ozgdvDeDU=
github.com/hashicorp/go-sockaddr v1.0.2 h1:ztczhD1jLxIRjVejw8gFomI1BQZOe2WoVOu0SyteCQc=
github.com/hashicorp/go-sockaddr v1.0.2/go.mod h1:rB4wwRAUzs07qva3c5SdrY/NEtAUjGlgmH/UkBUC97A=
github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4=
github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
Expand Down Expand Up @@ -454,7 +449,6 @@ github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxv
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down Expand Up @@ -495,7 +489,6 @@ github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXx
github.com/mitchellh/go-homedir v1.0.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/gox v0.4.0/go.mod h1:Sd9lOJ0+aimLBi73mGofS1ycjY8lL3uZM3JPS42BGNg=
github.com/mitchellh/iochan v1.0.0/go.mod h1:JwYml1nuB7xOzsp52dPpHFffvOCDupsG0QubkSMEySY=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
Expand Down Expand Up @@ -595,7 +588,6 @@ github.com/rs/zerolog v1.4.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKk
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/conswriter v0.0.0-20180208195008-f5ae3917a627/go.mod h1:7zjs06qF79/FKAJpBvFx3P8Ww4UTIMAe+lpNXDHziac=
github.com/sean-/pager v0.0.0-20180208200047-666be9bf53b5/go.mod h1:BeybITEsBEg6qbIiqJ6/Bqeq25bCLbL7YFmpaFfJDuM=
Expand Down Expand Up @@ -804,7 +796,6 @@ golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96b
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk=
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
Expand Down

0 comments on commit 669d94c

Please sign in to comment.