Skip to content

Commit

Permalink
Merge pull request #186 from d-uzlov/183/disconnect-healing
Browse files Browse the repository at this point in the history
Properly handle disconnects when watching config map updates
  • Loading branch information
denis-tingaikin authored Jan 24, 2023
2 parents 7c206b2 + 5893597 commit 8972f5e
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 10 deletions.
84 changes: 83 additions & 1 deletion internal/prefixcollector/collector_configmap_output_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
k8stest "k8s.io/client-go/testing"
)

const (
Expand Down Expand Up @@ -136,6 +137,38 @@ func (eps *ExcludedPrefixesSuite) TestConfigMapSource() {
eps.testCollectorWithConfigmapOutput(ctx, notifyChan, expectedResult, sources)
}

func (eps *ExcludedPrefixesSuite) TestConfigMapSourceRefresh() {
defer goleak.VerifyNone(eps.T(), goleak.IgnoreCurrent())
expectedResult := []string{
"168.0.0.0/10",
"1.0.0.0/11",
}
notifyChan := make(chan struct{}, 1)

ctx, cancel := context.WithCancel(prefixcollector.WithKubernetesInterface(context.Background(), eps.clientSet))
defer cancel()

clients := eps.clientSet.(*fake.Clientset)
countWatchers, stopAndDisableWatcher := interceptWatcher(clients)

sources := []prefixcollector.PrefixSource{
prefixsource.NewConfigMapPrefixSource(
ctx,
notifyChan,
userConfigMapName,
configMapNamespace,
userConfigMapKey,
),
}

eps.Eventually(func() bool { return countWatchers() == 1 }, time.Second, 10*time.Millisecond)
stopAndDisableWatcher()
eps.Eventually(func() bool { return countWatchers() == 2 }, time.Second, 10*time.Millisecond)

eps.createConfigMap(ctx, configMapNamespace, configMapPath)
eps.testCollectorWithConfigmapOutput(ctx, notifyChan, expectedResult, sources)
}

func (eps *ExcludedPrefixesSuite) TestKubeAdmConfigSource() {
defer goleak.VerifyNone(eps.T(), goleak.IgnoreCurrent())
expectedResult := []string{
Expand All @@ -156,6 +189,32 @@ func (eps *ExcludedPrefixesSuite) TestKubeAdmConfigSource() {
eps.testCollectorWithConfigmapOutput(ctx, notifyChan, expectedResult, sources)
}

func (eps *ExcludedPrefixesSuite) TestKubeAdmConfigSourceRefresh() {
defer goleak.VerifyNone(eps.T(), goleak.IgnoreCurrent())
expectedResult := []string{
"10.244.0.0/16",
"10.96.0.0/12",
}

notifyChan := make(chan struct{}, 1)
ctx, cancel := context.WithCancel(prefixcollector.WithKubernetesInterface(context.Background(), eps.clientSet))
defer cancel()

clients := eps.clientSet.(*fake.Clientset)
countWatchers, stopAndDisableWatcher := interceptWatcher(clients)

sources := []prefixcollector.PrefixSource{
prefixsource.NewKubeAdmPrefixSource(ctx, notifyChan),
}

eps.Eventually(func() bool { return countWatchers() == 1 }, time.Second, 10*time.Millisecond)
stopAndDisableWatcher()
eps.Eventually(func() bool { return countWatchers() == 2 }, time.Second, 10*time.Millisecond)

eps.createConfigMap(ctx, prefixsource.KubeNamespace, kubeConfigMapPath)
eps.testCollectorWithConfigmapOutput(ctx, notifyChan, expectedResult, sources)
}

func (eps *ExcludedPrefixesSuite) TestKubeAdmConfigSourceIPv6() {
defer goleak.VerifyNone(eps.T(), goleak.IgnoreCurrent())
expectedResult := []string{
Expand Down Expand Up @@ -282,7 +341,11 @@ func (eps *ExcludedPrefixesSuite) watchConfigMap(ctx context.Context, maxModifyC
go func() {
for {
select {
case event := <-watcher.ResultChan():
case event, ok := <-watcher.ResultChan():
if !ok {
errorCh <- errors.New("error watching configmap")
return
}
configMap := event.Object.(*v1.ConfigMap)
if event.Type == watch.Error {
errorCh <- errors.New("error watching configmap")
Expand Down Expand Up @@ -319,3 +382,22 @@ func getConfigMap(t *testing.T, filePath string) *v1.ConfigMap {

return &destination
}

func interceptWatcher(clients *fake.Clientset) (getCount func() int, stopAndDisable func()) {
enable := true
count := 0
watcher := watch.NewFake()

var reactionFunc k8stest.WatchReactionFunc = func(action k8stest.Action) (bool, watch.Interface, error) {
count++
return enable, watcher, nil
}

clients.PrependWatchReactor("configmaps", reactionFunc)

return func() int { return count },
func() {
enable = false
watcher.Stop()
}
}
28 changes: 21 additions & 7 deletions internal/prefixcollector/prefixsource/config_map_prefix_source.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
// Copyright (c) 2020-2021 Doc.ai and/or its affiliates.
//
// Copyright (c) 2022 Cisco and/or its affiliates.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -56,7 +58,11 @@ func NewConfigMapPrefixSource(ctx context.Context, notify chan<- struct{}, name,
prefixes: utils.NewSynchronizedPrefixesContainer(),
}

go cmps.watchConfigMap()
go func() {
for cmps.ctx.Err() == nil {
cmps.watchConfigMap()
}
}()
return &cmps
}

Expand All @@ -66,28 +72,35 @@ func (cmps *ConfigMapPrefixSource) Prefixes() []string {
}

func (cmps *ConfigMapPrefixSource) watchConfigMap() {
cmps.checkCurrentConfigMap()
configMapWatch, err := cmps.configMapInterface.Watch(cmps.ctx, metav1.ListOptions{})
if err != nil {
log.FromContext(cmps.ctx).Errorf("Error creating config map watch: %v", err)
return
}

// we should check current state after we create the watcher,
// or else we could miss an update
cmps.checkCurrentConfigMap()

log.FromContext(cmps.ctx).Info("Starting watching configmaps")

for {
select {
case <-cmps.ctx.Done():
log.FromContext(cmps.ctx).Warn("Configmaps watcher context is canceled")
return
case event, ok := <-configMapWatch.ResultChan():
if !ok {
log.FromContext(cmps.ctx).Warn("Configmaps watcher is closed")
return
}

log.FromContext(cmps.ctx).Tracef("Config map event received: %v", event)

if event.Type == watch.Error {
continue
}

log.FromContext(cmps.ctx).Infof("Event received:%v", event)

configMap, ok := event.Object.(*apiV1.ConfigMap)
if !ok || configMap.Name != cmps.configMapName {
continue
Expand All @@ -96,11 +109,12 @@ func (cmps *ConfigMapPrefixSource) watchConfigMap() {
if event.Type == watch.Deleted {
cmps.prefixes.Store([]string(nil))
cmps.notify <- struct{}{}
log.FromContext(cmps.ctx).Info("Prefixes from config map deleted")
continue
}

if err = cmps.setPrefixesFromConfigMap(configMap); err != nil {
log.FromContext(cmps.ctx).Errorf("Error setting prefixes from config map:%s", configMap.Name)
log.FromContext(cmps.ctx).Errorf("Error setting prefixes from config map: %s", configMap.Name)
}
}
}
Expand All @@ -109,12 +123,12 @@ func (cmps *ConfigMapPrefixSource) watchConfigMap() {
func (cmps *ConfigMapPrefixSource) checkCurrentConfigMap() {
configMap, err := cmps.configMapInterface.Get(cmps.ctx, cmps.configMapName, metav1.GetOptions{})
if err != nil {
log.FromContext(cmps.ctx).Errorf("Error getting config map : %v", err)
log.FromContext(cmps.ctx).Errorf("Error getting config map: %v", err)
return
}

if err = cmps.setPrefixesFromConfigMap(configMap); err != nil {
log.FromContext(cmps.ctx).Errorf("Error setting prefixes from config map:%s", configMap.Name)
log.FromContext(cmps.ctx).Errorf("Error setting prefixes from config map: %s", configMap.Name)
}
}

Expand Down
16 changes: 14 additions & 2 deletions internal/prefixcollector/prefixsource/kubeadm_prefix_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,29 +66,40 @@ func NewKubeAdmPrefixSource(ctx context.Context, notify chan<- struct{}) *KubeAd
prefixes: utils.NewSynchronizedPrefixesContainer(),
}

go kaps.watchKubeAdmConfigMap()
go func() {
for kaps.ctx.Err() == nil {
kaps.watchKubeAdmConfigMap()
}
}()
return &kaps
}

func (kaps *KubeAdmPrefixSource) watchKubeAdmConfigMap() {
log.FromContext(kaps.ctx).Infof("Watch kubeadm configMap")

kaps.checkCurrentConfigMap()
configMapWatch, err := kaps.configMapInterface.Watch(kaps.ctx, metav1.ListOptions{})
if err != nil {
log.FromContext(kaps.ctx).Errorf("Error creating config map watch: %v", err)
return
}

// we should check current state after we create the watcher,
// or else we could miss an update
kaps.checkCurrentConfigMap()

for {
select {
case <-kaps.ctx.Done():
log.FromContext(kaps.ctx).Warn("kubeadm configMap context is canceled")
return
case event, ok := <-configMapWatch.ResultChan():
if !ok {
log.FromContext(kaps.ctx).Warn("kubeadm configMap watcher is closed")
return
}

log.FromContext(kaps.ctx).Tracef("kubeadm configMap event received: %v", event)

if event.Type == watch.Error {
continue
}
Expand All @@ -101,6 +112,7 @@ func (kaps *KubeAdmPrefixSource) watchKubeAdmConfigMap() {
if event.Type == watch.Deleted {
kaps.prefixes.Store([]string(nil))
kaps.notify <- struct{}{}
log.FromContext(kaps.ctx).Info("kubeadm configMap deleted")
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func NewKubernetesPrefixSource(ctx context.Context, notify chan<- struct{}) *Kub
}

func (kps *KubernetesPrefixSource) watchSubnets(clientSet kubernetes.Interface) {
log.FromContext(kps.ctx).Infof("KubernetesPrefixSource watch subnets")

podChan, err := watchPodCIDR(kps.ctx, clientSet)
if err != nil {
return
Expand All @@ -78,14 +80,17 @@ func (kps *KubernetesPrefixSource) waitForSubnets(podChan, serviceChan <-chan []
for {
select {
case <-kps.ctx.Done():
log.FromContext(kps.ctx).Warn("Watch kubeadm configMap")
return
case subnet, ok := <-podChan:
if !ok {
log.FromContext(kps.ctx).Warn("podChan watcher closed")
return
}
podPrefixes = subnet
case subnet, ok := <-serviceChan:
if !ok {
log.FromContext(kps.ctx).Warn("serviceChan closed")
return
}
svcPrefixes = subnet
Expand Down
1 change: 1 addition & 0 deletions pkg/imports/imports.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
_ "k8s.io/client-go/kubernetes"
_ "k8s.io/client-go/kubernetes/fake"
_ "k8s.io/client-go/kubernetes/typed/core/v1"
_ "k8s.io/client-go/testing"
_ "math/big"
_ "net"
_ "os"
Expand Down

0 comments on commit 8972f5e

Please sign in to comment.