Skip to content

Commit

Permalink
Merge pull request #8313 from tomastigera/tomas-bpf-proxy-healthz
Browse files Browse the repository at this point in the history
[BPF] when host IPs change kube-proxy replaces only Syncer
  • Loading branch information
tomastigera authored Dec 14, 2023
2 parents 1e0e9ca + b95dea0 commit 1c29421
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 42 deletions.
75 changes: 36 additions & 39 deletions felix/bpf/proxy/kube-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (
// KubeProxy is a wrapper of Proxy that deals with higher level issue like
// configuration, restarting etc.
type KubeProxy struct {
proxy Proxy
proxy ProxyFrontend
syncer DPSyncer

ipFamily int
Expand Down Expand Up @@ -133,25 +133,42 @@ func (kp *KubeProxy) run(hostIPs []net.IP) error {
return errors.WithMessage(err, "new bpf syncer")
}

proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...)
if err != nil {
return errors.WithMessage(err, "new proxy")
}
kp.proxy.SetSyncer(syncer)

log.Infof("kube-proxy v%d started, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs)
log.Infof("kube-proxy v%d node info updated, hostname=%q hostIPs=%+v", kp.ipFamily, kp.hostname, hostIPs)

kp.proxy = proxy
kp.syncer = syncer

return nil
}

func (kp *KubeProxy) start() error {
var withLocalNP []net.IP
if kp.ipFamily == 4 {
withLocalNP = append(withLocalNP, podNPIP)
} else {
withLocalNP = append(withLocalNP, podNPIPV6)
}

syncer, err := NewSyncer(kp.ipFamily, withLocalNP, kp.frontendMap, kp.backendMap, kp.affinityMap, kp.rt)
if err != nil {
return errors.WithMessage(err, "new bpf syncer")
}

proxy, err := New(kp.k8s, syncer, kp.hostname, kp.opts...)
if err != nil {
return errors.WithMessage(err, "new proxy")
}

kp.lock.Lock()
kp.proxy = proxy
kp.syncer = syncer
kp.lock.Unlock()

// wait for the initial update
hostIPs := <-kp.hostIPUpdates

err := kp.run(hostIPs)
err = kp.run(hostIPs)
if err != nil {
return err
}
Expand All @@ -160,39 +177,19 @@ func (kp *KubeProxy) start() error {
go func() {
defer kp.wg.Done()
for {
hostIPs, ok := <-kp.hostIPUpdates
if !ok {
defer log.Error("kube-proxy stopped since hostIPUpdates closed")
kp.proxy.Stop()
return
}

stopped := make(chan struct{})

go func() {
defer close(stopped)
defer log.Info("kube-proxy stopped to restart with updated host IPs")
kp.proxy.Stop()
}()

waitforstop:
for {
select {
case hostIPs, ok = <-kp.hostIPUpdates:
if !ok {
log.Error("kube-proxy: hostIPUpdates closed")
return
}
case <-kp.exiting:
log.Info("kube-proxy: exiting")
select {
case hostIPs, ok := <-kp.hostIPUpdates:
if !ok {
log.Error("kube-proxy: hostIPUpdates closed")
return
case <-stopped:
err = kp.run(hostIPs)
if err != nil {
log.Panic("kube-proxy failed to start after host IPs update")
}
break waitforstop
}
err = kp.run(hostIPs)
if err != nil {
log.Panic("kube-proxy failed to resync after host IPs update")
}
case <-kp.exiting:
log.Info("kube-proxy: exiting")
return
}
}
}()
Expand Down
34 changes: 33 additions & 1 deletion felix/bpf/proxy/kube-proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package proxy_test

import (
"fmt"
"net"
"net/http"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
Expand All @@ -41,16 +43,20 @@ var _ = Describe("BPF kube-proxy", func() {

var p *proxy.KubeProxy

healthCheckNodePort := 1212

BeforeEach(func() {
testSvc := &v1.Service{
TypeMeta: typeMetaV1("Service"),
ObjectMeta: objectMetaV1("testService"),
Spec: v1.ServiceSpec{
ClusterIP: "10.1.0.1",
Type: v1.ServiceTypeClusterIP,
Type: v1.ServiceTypeLoadBalancer,
Selector: map[string]string{
"app": "test",
},
ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
HealthCheckNodePort: int32(healthCheckNodePort),
Ports: []v1.ServicePort{
{
Protocol: v1.ProtocolTCP,
Expand Down Expand Up @@ -104,6 +110,19 @@ var _ = Describe("BPF kube-proxy", func() {
}).Should(BeTrue())
})

By("checking that the healthCheckNodePort is accessible", func() {
Eventually(func() error {
result, err := http.Get(fmt.Sprintf("http://localhost:%d", healthCheckNodePort))
if err != nil {
return err
}
if result.StatusCode != 503 {
return fmt.Errorf("Unexpected status code %d; expected 503", result.StatusCode)
}
return nil
}, "5s", "200ms").Should(Succeed())
})

By("checking nodeport has the updated IP and not the initial IP", func() {
updatedIP := net.IPv4(2, 2, 2, 2)
p.OnHostIPsUpdate([]net.IP{updatedIP})
Expand All @@ -124,6 +143,19 @@ var _ = Describe("BPF kube-proxy", func() {
}).Should(BeTrue())
})

By("checking that the healthCheckNodePort is still accessible", func() {
Eventually(func() error {
result, err := http.Get(fmt.Sprintf("http://localhost:%d", healthCheckNodePort))
if err != nil {
return err
}
if result.StatusCode != 503 {
return fmt.Errorf("Unexpected status code %d; expected 503", result.StatusCode)
}
return nil
}, "5s", "200ms").Should(Succeed())
})

By("checking nodeport has 2 updated IPs", func() {
ip1 := net.IPv4(3, 3, 3, 3)
ip2 := net.IPv4(4, 4, 4, 4)
Expand Down
24 changes: 22 additions & 2 deletions felix/bpf/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type Proxy interface {
setIpFamily(int)
}

type ProxyFrontend interface {
Proxy
SetSyncer(DPSyncer)
}

// DPSyncerState groups the information passed to the DPSyncer's Apply
type DPSyncerState struct {
SvcMap k8sp.ServicePortMap
Expand Down Expand Up @@ -83,7 +88,8 @@ type proxy struct {
svcMap k8sp.ServicePortMap
epsMap k8sp.EndpointsMap

dpSyncer DPSyncer
dpSyncer DPSyncer
syncerLck sync.Mutex
// executes periodic the dataplane updates
runner *async.BoundedFrequencyRunner
// ensures that only one invocation runs at any time
Expand All @@ -110,7 +116,7 @@ type stoppableRunner interface {
}

// New returns a new Proxy for the given k8s interface
func New(k8s kubernetes.Interface, dp DPSyncer, hostname string, opts ...Option) (Proxy, error) {
func New(k8s kubernetes.Interface, dp DPSyncer, hostname string, opts ...Option) (ProxyFrontend, error) {

if k8s == nil {
return nil, errors.Errorf("no k8s client")
Expand Down Expand Up @@ -214,6 +220,9 @@ func (p *proxy) setIpFamily(ipFamily int) {
func (p *proxy) Stop() {
p.stopOnce.Do(func() {
log.Info("Proxy stopping")
// Pass empty update to close all the health checks.
_ = p.svcHealthServer.SyncServices(map[types.NamespacedName]uint16{})
_ = p.svcHealthServer.SyncEndpoints(map[types.NamespacedName]int{})
p.dpSyncer.Stop()
close(p.stopCh)
p.stopWg.Wait()
Expand Down Expand Up @@ -255,11 +264,13 @@ func (p *proxy) invokeDPSyncer() {
log.WithError(err).Error("Error syncing healthcheck endpoints")
}

p.syncerLck.Lock()
err := p.dpSyncer.Apply(DPSyncerState{
SvcMap: p.svcMap,
EpsMap: p.epsMap,
NodeZone: p.nodeZone,
})
p.syncerLck.Unlock()

if err != nil {
log.WithError(err).Errorf("applying changes failed")
Expand Down Expand Up @@ -314,6 +325,15 @@ func (p *proxy) OnEndpointSlicesSynced() {
p.forceSyncDP()
}

func (p *proxy) SetSyncer(s DPSyncer) {
p.syncerLck.Lock()
p.dpSyncer.Stop()
p.dpSyncer = s
p.syncerLck.Unlock()

p.forceSyncDP()
}

type initState struct {
lck sync.RWMutex
svcsSynced bool
Expand Down

0 comments on commit 1c29421

Please sign in to comment.