Skip to content

Commit

Permalink
fix: rate limit node collector restarts on config change (#1357)
Browse files Browse the repository at this point in the history
  • Loading branch information
yodigos authored Jul 17, 2024
1 parent 7e79b36 commit f50dc97
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 34 deletions.
16 changes: 13 additions & 3 deletions autoscaler/controllers/datacollection/configmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
commonconf "github.com/odigos-io/odigos/autoscaler/controllers/common"
"github.com/odigos-io/odigos/common/config"
constsK8s "github.com/odigos-io/odigos/k8sutils/pkg/consts"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -38,11 +39,11 @@ func syncConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D
setTracesLoadBalancer := SamplingExists != nil

desired, err := getDesiredConfigMap(apps, dests, processors, datacollection, scheme, setTracesLoadBalancer)
desiredData := desired.Data[configKey]
if err != nil {
logger.Error(err, "failed to get desired config map")
return "", err
}
desiredData := desired.Data[configKey]

existing := &v1.ConfigMap{}
if err := c.Get(ctx, client.ObjectKey{Namespace: datacollection.Namespace, Name: datacollection.Name}, existing); err != nil {
Expand Down Expand Up @@ -97,7 +98,7 @@ func createConfigMap(desired *v1.ConfigMap, ctx context.Context, c client.Client

func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
datacollection *odigosv1.CollectorsGroup, scheme *runtime.Scheme, setTracesLoadBalancer bool) (*v1.ConfigMap, error) {
cmData, err := getConfigMapData(apps, dests, processors, setTracesLoadBalancer)
cmData, err := calculateConfigMapData(apps, dests, processors, setTracesLoadBalancer)
if err != nil {
return nil, err
}
Expand All @@ -123,7 +124,7 @@ func getDesiredConfigMap(apps *odigosv1.InstrumentedApplicationList, dests *odig
return &desired, nil
}

func getConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
func calculateConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, processors []*odigosv1.Processor,
setTracesLoadBalancer bool) (string, error) {

empty := struct{}{}
Expand Down Expand Up @@ -276,3 +277,12 @@ func getConfigMapData(apps *odigosv1.InstrumentedApplicationList, dests *odigosv

return string(data), nil
}

// getConfigMap fetches the ConfigMap named
// constsK8s.OdigosNodeCollectorConfigMapName from the given namespace.
// It returns the error from the client lookup unchanged when the Get fails.
func getConfigMap(ctx context.Context, c client.Client, namespace string) (*v1.ConfigMap, error) {
	key := client.ObjectKey{
		Namespace: namespace,
		Name:      constsK8s.OdigosNodeCollectorConfigMapName,
	}

	cm := new(v1.ConfigMap)
	err := c.Get(ctx, key, cm)
	if err != nil {
		return nil, err
	}
	return cm, nil
}
4 changes: 2 additions & 2 deletions autoscaler/controllers/datacollection/configmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func openTestData(t *testing.T, path string) string {
return string(want)
}

func TestGetConfigMapData(t *testing.T) {
func TestCalculateConfigMapData(t *testing.T) {
want := openTestData(t, "testdata/logs_included.yaml")

ns := NewMockNamespace("default")
Expand All @@ -125,7 +125,7 @@ func TestGetConfigMapData(t *testing.T) {
*NewMockInstrumentedApplicationWoOwner(NewMockTestDeployment(ns2)),
}

got, err := getConfigMapData(
got, err := calculateConfigMapData(
&v1alpha1.InstrumentedApplicationList{
Items: items,
},
Expand Down
79 changes: 61 additions & 18 deletions autoscaler/controllers/datacollection/daemonset.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,23 +3,22 @@ package datacollection
import (
"context"
"fmt"

"github.com/odigos-io/odigos/autoscaler/utils"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"

"github.com/odigos-io/odigos/autoscaler/controllers/datacollection/custom"
"k8s.io/apimachinery/pkg/util/intstr"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"github.com/odigos-io/odigos/autoscaler/controllers/common"
"github.com/odigos-io/odigos/autoscaler/controllers/datacollection/custom"
"github.com/odigos-io/odigos/autoscaler/utils"
"github.com/odigos-io/odigos/k8sutils/pkg/consts"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/intstr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
"sync"
"time"
)

const (
Expand All @@ -38,17 +37,45 @@ var (
}
)

func getOdigletDaemonsetPodSpec(ctx context.Context, c client.Client, namespace string) (*corev1.PodSpec, error) {
odigletDaemonset := &appsv1.DaemonSet{}
// DelayManager tracks whether a delayed DaemonSet sync is currently scheduled
// or executing, so that additional requests can be dropped in the meantime.
type DelayManager struct {
// mu guards inProgress.
mu sync.Mutex
// inProgress is set to true when a sync is scheduled and reset to false by
// finishProgress once the scheduled run has completed.
inProgress bool
}

if err := c.Get(ctx, client.ObjectKey{Namespace: namespace, Name: odigletDaemonSetName}, odigletDaemonset); err != nil {
return nil, err
// RunSyncDaemonSetWithDelayAndSkipNewCalls schedules one syncDaemonSet run to
// start after delay and returns immediately. Calls made while a run is already
// scheduled or executing are dropped; their arguments are not used.
//
// The scheduled run calls syncDaemonSet up to retries times and stops at the
// first success. If every attempt fails, the last error is logged. With
// retries <= 0 no attempt is made and nothing is logged.
func (dm *DelayManager) RunSyncDaemonSetWithDelayAndSkipNewCalls(delay time.Duration, retries int, dests *odigosv1.DestinationList, collection *odigosv1.CollectorsGroup, ctx context.Context, c client.Client, scheme *runtime.Scheme, secrets []string, version string) {
	dm.mu.Lock()
	defer dm.mu.Unlock()

	// Skip new calls if a run is already scheduled or in progress.
	if dm.inProgress {
		return
	}
	dm.inProgress = true

	time.AfterFunc(delay, func() {
		// Take the mutex only to clear the flag. Holding it for the whole sync
		// would make concurrent callers block on the lock for the duration of
		// the API calls instead of being skipped.
		defer func() {
			dm.mu.Lock()
			defer dm.mu.Unlock()
			dm.finishProgress()
		}()

		var err error
		for i := 0; i < retries; i++ {
			_, err = syncDaemonSet(ctx, dests, collection, c, scheme, secrets, version)
			if err == nil {
				return
			}
		}

		// err is nil when retries <= 0; do not report a failure that never happened.
		if err != nil {
			log.FromContext(ctx).Error(err, "Failed to sync DaemonSet")
		}
	})
}

// finishProgress clears the inProgress flag so that a new delayed sync can be
// scheduled. It does not lock dm.mu itself; the caller is expected to hold it.
func (dm *DelayManager) finishProgress() {
dm.inProgress = false
}

func syncDaemonSet(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.DestinationList, datacollection *odigosv1.CollectorsGroup, configData string, ctx context.Context,
func syncDaemonSet(ctx context.Context, dests *odigosv1.DestinationList, datacollection *odigosv1.CollectorsGroup,
c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) (*appsv1.DaemonSet, error) {
logger := log.FromContext(ctx)

Expand All @@ -58,7 +85,13 @@ func syncDaemonSet(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D
return nil, err
}

desiredDs, err := getDesiredDaemonSet(datacollection, configData, scheme, imagePullSecrets, odigosVersion, odigletDaemonsetPodSpec)
configMap, err := getConfigMap(ctx, c, datacollection.Namespace)
if err != nil {
logger.Error(err, "Failed to get Config Map data")
return nil, err
}

desiredDs, err := getDesiredDaemonSet(datacollection, configMap.String(), scheme, imagePullSecrets, odigosVersion, odigletDaemonsetPodSpec)
if err != nil {
logger.Error(err, "Failed to get desired DaemonSet")
return nil, err
Expand Down Expand Up @@ -93,6 +126,16 @@ func syncDaemonSet(apps *odigosv1.InstrumentedApplicationList, dests *odigosv1.D
return updated, nil
}

// getOdigletDaemonsetPodSpec looks up the DaemonSet named odigletDaemonSetName
// in the given namespace and returns a pointer to its pod template spec.
// The client error is returned unchanged when the lookup fails.
func getOdigletDaemonsetPodSpec(ctx context.Context, c client.Client, namespace string) (*corev1.PodSpec, error) {
	var ds appsv1.DaemonSet
	key := client.ObjectKey{Namespace: namespace, Name: odigletDaemonSetName}

	err := c.Get(ctx, key, &ds)
	if err != nil {
		return nil, err
	}

	podSpec := &ds.Spec.Template.Spec
	return podSpec, nil
}

func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData string,
scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string,
odigletDaemonsetPodSpec *corev1.PodSpec,
Expand Down Expand Up @@ -260,6 +303,10 @@ func getDesiredDaemonSet(datacollection *odigosv1.CollectorsGroup, configData st
return desiredDs, nil
}

// boolPtr returns a pointer to a fresh bool holding the value of b.
func boolPtr(b bool) *bool {
	p := new(bool)
	*p = b
	return p
}

func patchDaemonSet(existing *appsv1.DaemonSet, desired *appsv1.DaemonSet, ctx context.Context, c client.Client) (*appsv1.DaemonSet, error) {
updated := existing.DeepCopy()
if updated.Annotations == nil {
Expand All @@ -285,7 +332,3 @@ func patchDaemonSet(existing *appsv1.DaemonSet, desired *appsv1.DaemonSet, ctx c

return updated, nil
}

func boolPtr(b bool) *bool {
return &b
}
17 changes: 10 additions & 7 deletions autoscaler/controllers/datacollection/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,21 @@ package datacollection

import (
"context"
"github.com/odigos-io/odigos/k8sutils/pkg/env"
"time"

odigosv1 "github.com/odigos-io/odigos/api/odigos/v1alpha1"
"k8s.io/apimachinery/pkg/runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
)

// dm is the package-level DelayManager through which syncDataCollection
// schedules the delayed DaemonSet sync.
var dm = &DelayManager{}

const (
// syncDaemonsetRetry is the retries value passed to
// RunSyncDaemonSetWithDelayAndSkipNewCalls.
syncDaemonsetRetry = 3
)

func Sync(ctx context.Context, c client.Client, scheme *runtime.Scheme, imagePullSecrets []string, odigosVersion string) error {
logger := log.FromContext(ctx)
var collectorGroups odigosv1.CollectorsGroupList
Expand Down Expand Up @@ -57,18 +65,13 @@ func syncDataCollection(instApps *odigosv1.InstrumentedApplicationList, dests *o
logger := log.FromContext(ctx)
logger.V(0).Info("Syncing data collection")

configData, err := syncConfigMap(instApps, dests, processors, dataCollection, ctx, c, scheme)
_, err := syncConfigMap(instApps, dests, processors, dataCollection, ctx, c, scheme)
if err != nil {
logger.Error(err, "Failed to sync config map")
return err
}

_, err = syncDaemonSet(instApps, dests, dataCollection, configData, ctx, c, scheme, imagePullSecrets, odigosVersion)
if err != nil {
logger.Error(err, "Failed to sync daemon set")
return err
}
dm.RunSyncDaemonSetWithDelayAndSkipNewCalls(time.Duration(env.GetSyncDaemonSetDelay())*time.Second, syncDaemonsetRetry, dests, dataCollection, ctx, c, scheme, imagePullSecrets, odigosVersion)

return nil
}

3 changes: 2 additions & 1 deletion k8sutils/pkg/consts/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ const (
OdigosClusterCollectorConfigMapName = OdigosClusterCollectorDeploymentName
OdigosClusterCollectorServiceName = OdigosClusterCollectorDeploymentName
OdigosNodeCollectorDaemonSetName = "odigos-data-collection"
)
OdigosNodeCollectorConfigMapName = OdigosNodeCollectorDaemonSetName
)
17 changes: 14 additions & 3 deletions k8sutils/pkg/env/env.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
package env

import (
"github.com/odigos-io/odigos/common/consts"
"os"
"path/filepath"

"github.com/odigos-io/odigos/common/consts"
"strconv"

"k8s.io/client-go/util/homedir"
)

const (
KUBECONFIG = "KUBECONFIG"
KUBECONFIG = "KUBECONFIG"
SYNC_DAEMONSET_DELAY_IN_SECONDS = "SYNC_DAEMONSET_DELAY_IN_SECONDS"
)

func getEnvVarOrDefault(envKey string, defaultVal string) string {
Expand All @@ -37,3 +38,13 @@ func GetDefaultKubeConfigPath() string {
}
return ""
}

// GetSyncDaemonSetDelay returns the DaemonSet sync delay in seconds, read via
// getEnvVarOrDefault from SYNC_DAEMONSET_DELAY_IN_SECONDS with a default of 5.
// A value that is not a valid integer, or that is negative, yields 5: a
// negative delay would make the timer fire immediately and defeat the rate
// limiting the delay exists for.
func GetSyncDaemonSetDelay() int {
	const defaultDelaySeconds = 5

	raw := getEnvVarOrDefault(SYNC_DAEMONSET_DELAY_IN_SECONDS, strconv.Itoa(defaultDelaySeconds))
	delay, err := strconv.Atoi(raw)
	if err != nil || delay < 0 {
		return defaultDelaySeconds
	}

	return delay
}

0 comments on commit f50dc97

Please sign in to comment.