-
Notifications
You must be signed in to change notification settings - Fork 707
/
service_control.go
147 lines (126 loc) · 4.99 KB
/
service_control.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.
package common
import (
"context"
"reflect"
"go.elastic.co/apm/v2"
corev1 "k8s.io/api/core/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/reconciler"
"github.com/elastic/cloud-on-k8s/v2/pkg/controller/common/tracing"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/compare"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/k8s"
"github.com/elastic/cloud-on-k8s/v2/pkg/utils/maps"
)
func ReconcileService(
ctx context.Context,
c k8s.Client,
expected *corev1.Service,
owner client.Object,
) (*corev1.Service, error) {
span, _ := apm.StartSpan(ctx, "reconcile_service", tracing.SpanTypeApp)
defer span.End()
reconciled := &corev1.Service{}
err := reconciler.ReconcileResource(reconciler.Params{
Context: ctx,
Client: c,
Owner: owner,
Expected: expected,
Reconciled: reconciled,
NeedsRecreate: func() bool {
return needsRecreate(expected, reconciled)
},
NeedsUpdate: func() bool {
return needsUpdate(expected, reconciled)
},
UpdateReconciled: func() {
reconciled.Annotations = expected.Annotations
reconciled.Labels = expected.Labels
reconciled.Spec = expected.Spec
},
})
return reconciled, err
}
func needsRecreate(expected, reconciled *corev1.Service) bool {
applyServerSideValues(expected, reconciled)
// IPFamilies is immutable
if expected.Spec.IPFamilies != nil {
if len(expected.Spec.IPFamilies) != len(reconciled.Spec.IPFamilies) {
return true
}
for i := 0; i < len(expected.Spec.IPFamilies); i++ {
if expected.Spec.IPFamilies[i] != reconciled.Spec.IPFamilies[i] {
return true
}
}
}
// ClusterIP is immutable
if expected.Spec.ClusterIP != reconciled.Spec.ClusterIP {
return true
}
return false
}
func needsUpdate(expected *corev1.Service, reconciled *corev1.Service) bool {
applyServerSideValues(expected, reconciled)
// if the specs, labels, or annotations differ, the object should be updated
return !(reflect.DeepEqual(expected.Spec, reconciled.Spec) &&
compare.LabelsAndAnnotationsAreEqual(expected.ObjectMeta, reconciled.ObjectMeta))
}
// applyServerSideValues applies any default that may have been set from the reconciled version.
func applyServerSideValues(expected, reconciled *corev1.Service) {
// Type may be defaulted by the api server
if expected.Spec.Type == "" {
expected.Spec.Type = reconciled.Spec.Type
}
// ClusterIPs might not exist in the expected service,
// but might have been set after creation by k8s on the actual resource.
// In such case, we want to use these values for comparison.
// But only if we are not changing the type of service and the api server has assigned an IP
if expected.Spec.Type == reconciled.Spec.Type {
if expected.Spec.ClusterIP == "" {
expected.Spec.ClusterIP = reconciled.Spec.ClusterIP
}
if len(expected.Spec.ClusterIPs) == 0 {
expected.Spec.ClusterIPs = reconciled.Spec.ClusterIPs
}
}
// SessionAffinity may be defaulted by the api server
if expected.Spec.SessionAffinity == "" {
expected.Spec.SessionAffinity = reconciled.Spec.SessionAffinity
}
// same for the target port and node port
if len(expected.Spec.Ports) == len(reconciled.Spec.Ports) {
for i := range expected.Spec.Ports {
if expected.Spec.Ports[i].TargetPort.IntValue() == 0 {
expected.Spec.Ports[i].TargetPort = reconciled.Spec.Ports[i].TargetPort
}
// check if NodePort makes sense for this service type
if hasNodePort(expected.Spec.Type) && expected.Spec.Ports[i].NodePort == 0 {
expected.Spec.Ports[i].NodePort = reconciled.Spec.Ports[i].NodePort
}
}
}
if expected.Spec.HealthCheckNodePort == 0 {
expected.Spec.HealthCheckNodePort = reconciled.Spec.HealthCheckNodePort
}
expected.Annotations = maps.MergePreservingExistingKeys(expected.Annotations, reconciled.Annotations)
expected.Labels = maps.MergePreservingExistingKeys(expected.Labels, reconciled.Labels)
// IPFamily is immutable and cannot be modified so we should retain the existing value from the server if there's no explicit override.
if expected.Spec.IPFamilies == nil {
expected.Spec.IPFamilies = reconciled.Spec.IPFamilies
}
// IPFamilyPolicy is immutable and cannot be modified so we should retain the existing value from the server if there's no explicit override.
if expected.Spec.IPFamilyPolicy == nil {
expected.Spec.IPFamilyPolicy = reconciled.Spec.IPFamilyPolicy
}
// InternalTrafficPolicy may be defaulted by the api server starting K8S v1.22
if expected.Spec.InternalTrafficPolicy == nil {
expected.Spec.InternalTrafficPolicy = reconciled.Spec.InternalTrafficPolicy
}
}
// hasNodePort returns for a given service type, if the service ports have a NodePort or not.
func hasNodePort(svcType corev1.ServiceType) bool {
return svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer
}