diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 41a0c6795d..7ac3747b80 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -6,9 +6,11 @@ import ( "sync" storkv1alpha1 "github.com/libopenstorage/stork/pkg/apis/stork/v1alpha1" + "github.com/sirupsen/logrus" corev1 "k8s.io/api/core/v1" storagev1 "k8s.io/api/storage/v1" "k8s.io/client-go/rest" + clientCache "k8s.io/client-go/tools/cache" controllercache "sigs.k8s.io/controller-runtime/pkg/cache" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/manager" @@ -36,6 +38,9 @@ type SharedInformerCache interface { // ListTransformedPods lists the all the Pods from the cache after applying TransformFunc ListTransformedPods() (*corev1.PodList, error) + + // WatchPods registers the pod event handlers with the informer cache + WatchPods(fn func(object interface{})) error } type cache struct { @@ -84,6 +89,8 @@ func CreateSharedInformerCache(mgr manager.Manager) error { } } + currPod.ObjectMeta.DeletionTimestamp = podResource.ObjectMeta.DeletionTimestamp + currPod.Spec.Containers = podResource.Spec.Containers currPod.Spec.NodeName = podResource.Spec.NodeName @@ -204,3 +211,23 @@ func (c *cache) ListTransformedPods() (*corev1.PodList, error) { } return podList, nil } + +// WatchPods uses handlers for different pod events with shared informers. +func (c *cache) WatchPods(fn func(object interface{})) error { + informer, err := c.controllerCache.GetInformer(context.Background(), &corev1.Pod{}) + if err != nil { + logrus.WithError(err).Error("error getting the informer for pods") + return err + } + + informer.AddEventHandler(clientCache.ResourceEventHandlerFuncs{ + AddFunc: fn, + UpdateFunc: func(oldObj, newObj interface{}) { + // Only considering the new pod object + fn(newObj) + }, + DeleteFunc: fn, + }) + + return nil +} diff --git a/pkg/extender/extender.go b/pkg/extender/extender.go index 6a9021251d..cdf090ea71 100644 --- a/pkg/extender/extender.go +++ b/pkg/extender/extender.go @@ -11,6 +11,7 @@ import ( "time" "github.com/libopenstorage/stork/drivers/volume" + storkcache "github.com/libopenstorage/stork/pkg/cache" storklog "github.com/libopenstorage/stork/pkg/log" restore "github.com/libopenstorage/stork/pkg/snapshot/controllers" "github.com/portworx/sched-ops/k8s/core" @@ -483,9 +484,28 @@ func (e *Extender) collectExtenderMetrics() error { return nil } - if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil { - log.Errorf("failed to watch pods due to: %v", err) - return err + podHandler := func(object interface{}) { + pod, ok := object.(*v1.Pod) + if !ok { + log.Errorf("invalid object type on pod watch from cache: %v", object) + } else { + fn(pod) + } + } + + if storkcache.Instance() != nil { + log.Debugf("Shared informer cache has been initialized, using it for extender metrics.") + err := storkcache.Instance().WatchPods(podHandler) + if err != nil { + log.Errorf("failed to watch pods with informer cache for health monitoring, err: %v", err) + } + log.Errorf("failed to watch pods with informer cache for metrics, err: %v", err) + } else { + log.Warnf("Shared informer cache has not been initialized, using watch for extender metrics.") + if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil { + log.Errorf("failed to watch pods for metrics due to: %v", err) + return err + } } return nil } diff --git a/pkg/mock/cache/cache.mock.go b/pkg/mock/cache/cache.mock.go index 0b7b44e227..7bdeccc8dd 100644 --- a/pkg/mock/cache/cache.mock.go +++ b/pkg/mock/cache/cache.mock.go @@ -125,3 +125,17 @@ func (mr *MockSharedInformerCacheMockRecorder) ListTransformedPods() *gomock.Cal mr.mock.ctrl.T.Helper() return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListTransformedPods", reflect.TypeOf((*MockSharedInformerCache)(nil).ListTransformedPods)) } + +// WatchPods mocks base method. +func (m *MockSharedInformerCache) WatchPods(arg0 func(interface{})) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "WatchPods", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// WatchPods indicates an expected call of WatchPods. +func (mr *MockSharedInformerCacheMockRecorder) WatchPods(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WatchPods", reflect.TypeOf((*MockSharedInformerCache)(nil).WatchPods), arg0) +} diff --git a/pkg/monitor/monitor.go b/pkg/monitor/monitor.go index 1ce9873038..668abdd26a 100644 --- a/pkg/monitor/monitor.go +++ b/pkg/monitor/monitor.go @@ -183,9 +183,27 @@ func (m *Monitor) podMonitor() error { return nil } - if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil { - log.Errorf("failed to watch pods due to: %v", err) - return err + podHandler := func(object interface{}) { + pod, ok := object.(*v1.Pod) + if !ok { + log.Errorf("invalid object type on pod watch from cache: %v", object) + } else { + fn(pod) + } + } + + if storkcache.Instance() != nil { + log.Debugf("Shared informer cache has been initialized, using it for pod monitor.") + err := storkcache.Instance().WatchPods(podHandler) + if err != nil { + log.Errorf("failed to watch pods with informer cache for health monitoring, err: %v", err) + } + } else { + log.Warnf("Shared informer cache has not been initialized, using watch for pod monitor.") + if err := core.Instance().WatchPods("", fn, metav1.ListOptions{}); err != nil { + log.Errorf("failed to watch pods for health monitoring due to: %v", err) + return err + } } return nil