From f50dc97a311854c4b686fffaf544e68c1abea61c Mon Sep 17 00:00:00 2001 From: yodigos Date: Wed, 17 Jul 2024 14:05:58 +0300 Subject: [PATCH] fix: rate limit node collector restarts on config change (#1357) --- .../controllers/datacollection/configmap.go | 16 +++- .../datacollection/configmap_test.go | 4 +- .../controllers/datacollection/daemonset.go | 79 ++++++++++++++----- autoscaler/controllers/datacollection/root.go | 17 ++-- k8sutils/pkg/consts/consts.go | 3 +- k8sutils/pkg/env/env.go | 17 +++- 6 files changed, 102 insertions(+), 34 deletions(-) diff --git a/autoscaler/controllers/datacollection/configmap.go b/autoscaler/controllers/datacollection/configmap.go index 2e00998aa..46b37dda0 100644 --- a/autoscaler/controllers/datacollection/configmap.go +++ b/autoscaler/controllers/datacollection/configmap.go @@ -12,6 +12,7 @@ import ( odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common" "github.com/odigos-io/odigos/common/config" + constsK8s "github.com/odigos-io/odigos/k8sutils/pkg/consts" "github.com/odigos-io/odigos/k8sutils/pkg/env" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -38,11 +39,11 @@ func syncConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D setTracesLoadBalancer := SamplingExists != nil desired, err := getDesiredConfigMap(apps, dests, processors, datacollection, scheme, setTracesLoadBalancer) - desiredData := desired.Data[configKey] if err != nil { logger.Error(err, "failed to get desired config map") return "", err } + desiredData := desired.Data[configKey] existing := &v1.ConfigMap{} if err := c.Get(ctx, client.ObjectKey{Namespace: datacollection.Namespace, Name: datacollection.Name}, existing); err != nil { @@ -97,7 +98,7 @@ func createConfigMap(desired *v1.ConfigMap, ctx context.Context, c client.Client func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor, datacollection *odigosv1.CollectorsGroup, scheme *runtime.Scheme, setTracesLoadBalancer bool) (*v1.ConfigMap, error) { - cmData, err := getConfigMapData(apps, dests, processors, setTracesLoadBalancer) + cmData, err := calculateConfigMapData(apps, dests, processors, setTracesLoadBalancer) if err != nil { return nil, err } @@ -123,7 +124,7 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig return &desired, nil } -func getConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor, +func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor, setTracesLoadBalancer bool) (string, error) { empty := struct{}{} @@ -276,3 +277,12 @@ func getConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *odigosv return string(data), nil } + +func getConfigMap(ctx context.Context, c client.Client, namespace string) (*v1.ConfigMap, error) { + configMap := &v1.ConfigMap{} + if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: constsK8s.OdigosNodeCollectorConfigMapName}, configMap); err != nil { + return nil, err + } + + return configMap, nil +} diff --git a/autoscaler/controllers/datacollection/configmap_test.go b/autoscaler/controllers/datacollection/configmap_test.go index 044aef7b5..832ed4bfb 100644 --- a/autoscaler/controllers/datacollection/configmap_test.go +++ b/autoscaler/controllers/datacollection/configmap_test.go @@ -112,7 +112,7 @@ func openTestData(t *testing.T, path string) string { return string(want) } -func TestGetConfigMapData(t *testing.T) { +func TestCalculateConfigMapData(t *testing.T) { want := openTestData(t, "testdata/logs_included.yaml") ns := NewMockNamespace("default") @@ -125,7 +125,7 @@ func TestGetConfigMapData(t *testing.T) { *NewMockInstrumentedApplicationWoOwner(NewMockTestDeployment(ns2)), } - got, err := getConfigMapData( + got, err := calculateConfigMapData( &v1alpha1.InstrumentedApplicationList{ Items: items, }, diff --git a/autoscaler/controllers/datacollection/daemonset.go b/autoscaler/controllers/datacollection/daemonset.go index 5158eda56..d41e7a30b 100644 --- a/autoscaler/controllers/datacollection/daemonset.go +++ b/autoscaler/controllers/datacollection/daemonset.go @@ -3,23 +3,22 @@ package datacollection import ( "context" "fmt" - - "github.com/odigos-io/odigos/autoscaler/utils" - "github.com/odigos-io/odigos/k8sutils/pkg/consts" - - "github.com/odigos-io/odigos/autoscaler/controllers/datacollection/custom" - "k8s.io/apimachinery/pkg/util/intstr" - odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" "github.com/odigos-io/odigos/autoscaler/controllers/common" + "github.com/odigos-io/odigos/autoscaler/controllers/datacollection/custom" + "github.com/odigos-io/odigos/autoscaler/utils" + "github.com/odigos-io/odigos/k8sutils/pkg/consts" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/log" + "sync" + "time" ) const ( @@ -38,17 +37,45 @@ var ( } ) -func getOdigletDaemonsetPodSpec(ctx context.Context, c client.Client, namespace string) (*corev1.PodSpec, error) { - odigletDaemonset := &appsv1.DaemonSet{} +type DelayManager struct { + mu sync.Mutex + inProgress bool +} - if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: odigletDaemonSetName}, odigletDaemonset); err != nil { - return nil, err +// RunSyncDaemonSetWithDelayAndSkipNewCalls runs the function with the specified delay and skips new calls until the function execution is finished +func (dm *DelayManager) RunSyncDaemonSetWithDelayAndSkipNewCalls(delay time.Duration, retries int, dests *odigosv1.DestinationList, collection *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, secrets []string, version string) { + dm.mu.Lock() + defer dm.mu.Unlock() + + // Skip new calls if the function is already in progress + if dm.inProgress { + return } - return &odigletDaemonset.Spec.Template.Spec, nil + dm.inProgress = true + + // Finish the function execution after the delay + time.AfterFunc(delay, func() { + dm.mu.Lock() + defer dm.mu.Unlock() + defer dm.finishProgress() + var err error + + for i := 0; i < retries; i++ { + _, err = syncDaemonSet(ctx, dests, collection, c, scheme, secrets, version) + if err == nil { + return + } + } + log.FromContext(ctx).Error(err, "Failed to sync DaemonSet") + }) +} + +func (dm *DelayManager) finishProgress() { + dm.inProgress = false } -func syncDaemonSet(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, datacollection *odigosv1.CollectorsGroup, configData string, ctx context.Context, +func syncDaemonSet(ctx context.Context, dests *odigosv1.DestinationList, datacollection *odigosv1.CollectorsGroup, c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) (*appsv1.DaemonSet, error) { logger := log.FromContext(ctx) @@ -58,7 +85,13 @@ func syncDaemonSet(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D return nil, err } - desiredDs, err := getDesiredDaemonSet(datacollection, configData, scheme, imagePullSecrets, odigosVersion, odigletDaemonsetPodSpec) + configMap, err := getConfigMap(ctx, c, datacollection.Namespace) + if err != nil { + logger.Error(err, "Failed to get Config Map data") + return nil, err + } + + desiredDs, err := getDesiredDaemonSet(datacollection, configMap.String(), scheme, imagePullSecrets, odigosVersion, odigletDaemonsetPodSpec) if err != nil { logger.Error(err, "Failed to get desired DaemonSet") return nil, err @@ -93,6 +126,16 @@ func syncDaemonSet(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D return updated, nil } +func getOdigletDaemonsetPodSpec(ctx context.Context, c client.Client, namespace string) (*corev1.PodSpec, error) { + odigletDaemonset := &appsv1.DaemonSet{} + + if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: odigletDaemonSetName}, odigletDaemonset); err != nil { + return nil, err + } + + return &odigletDaemonset.Spec.Template.Spec, nil +} + func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData string, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string, odigletDaemonsetPodSpec *corev1.PodSpec, @@ -260,6 +303,10 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st return desiredDs, nil } +func boolPtr(b bool) *bool { + return &b +} + func patchDaemonSet(existing *appsv1.DaemonSet, desired *appsv1.DaemonSet, ctx context.Context, c client.Client) (*appsv1.DaemonSet, error) { updated := existing.DeepCopy() if updated.Annotations == nil { @@ -285,7 +332,3 @@ func patchDaemonSet(existing *appsv1.DaemonSet, desired *appsv1.DaemonSet, ctx c return updated, nil } - -func boolPtr(b bool) *bool { - return &b -} diff --git a/autoscaler/controllers/datacollection/root.go b/autoscaler/controllers/datacollection/root.go index 95058094b..5b9044ee9 100644 --- a/autoscaler/controllers/datacollection/root.go +++ b/autoscaler/controllers/datacollection/root.go @@ -2,6 +2,8 @@ package datacollection import ( "context" + "github.com/odigos-io/odigos/k8sutils/pkg/env" + "time" odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1" "k8s.io/apimachinery/pkg/runtime" @@ -9,6 +11,12 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" ) +var dm = &DelayManager{} + +const ( + syncDaemonsetRetry = 3 +) + func Sync(ctx context.Context, c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) error { logger := log.FromContext(ctx) var collectorGroups odigosv1.CollectorsGroupList @@ -57,18 +65,13 @@ func syncDataCollection(instApps *odigosv1.InstrumentedApplicationList, dests *o logger := log.FromContext(ctx) logger.V(0).Info("Syncing data collection") - configData, err := syncConfigMap(instApps, dests, processors, dataCollection, ctx, c, scheme) + _, err := syncConfigMap(instApps, dests, processors, dataCollection, ctx, c, scheme) if err != nil { logger.Error(err, "Failed to sync config map") return err } - _, err = syncDaemonSet(instApps, dests, dataCollection, configData, ctx, c, scheme, imagePullSecrets, odigosVersion) - if err != nil { - logger.Error(err, "Failed to sync daemon set") - return err - } + dm.RunSyncDaemonSetWithDelayAndSkipNewCalls(time.Duration(env.GetSyncDaemonSetDelay())*time.Second, syncDaemonsetRetry, dests, dataCollection, ctx, c, scheme, imagePullSecrets, odigosVersion) return nil } - diff --git a/k8sutils/pkg/consts/consts.go b/k8sutils/pkg/consts/consts.go index b65e708ee..840d54f9f 100644 --- a/k8sutils/pkg/consts/consts.go +++ b/k8sutils/pkg/consts/consts.go @@ -5,4 +5,5 @@ const ( OdigosClusterCollectorConfigMapName = OdigosClusterCollectorDeploymentName OdigosClusterCollectorServiceName = OdigosClusterCollectorDeploymentName OdigosNodeCollectorDaemonSetName = "odigos-data-collection" -) \ No newline at end of file + OdigosNodeCollectorConfigMapName = OdigosNodeCollectorDaemonSetName +) diff --git a/k8sutils/pkg/env/env.go b/k8sutils/pkg/env/env.go index 73f95923b..54ba9df68 100644 --- a/k8sutils/pkg/env/env.go +++ b/k8sutils/pkg/env/env.go @@ -1,16 +1,17 @@ package env import ( + "github.com/odigos-io/odigos/common/consts" "os" "path/filepath" - - "github.com/odigos-io/odigos/common/consts" + "strconv" "k8s.io/client-go/util/homedir" ) const ( - KUBECONFIG = "KUBECONFIG" + KUBECONFIG = "KUBECONFIG" + SYNC_DAEMONSET_DELAY_IN_SECONDS = "SYNC_DAEMONSET_DELAY_IN_SECONDS" ) func getEnvVarOrDefault(envKey string, defaultVal string) string { @@ -37,3 +38,13 @@ func GetDefaultKubeConfigPath() string { } return "" } + +func GetSyncDaemonSetDelay() int { + delay := getEnvVarOrDefault(SYNC_DAEMONSET_DELAY_IN_SECONDS, "5") + delayValue, err := strconv.Atoi(delay) + if err != nil { + return 5 + } + + return delayValue +}