From 68d36cc4539f245667e7e405b5ac9f3c3f0f3aa9 Mon Sep 17 00:00:00 2001 From: Amir Blum Date: Wed, 15 Nov 2023 10:47:28 +0200 Subject: [PATCH 1/2] refactor(odiglet): make ebpf instrumentation api for any language --- odiglet/cmd/main.go | 20 ++++++-- odiglet/pkg/ebpf/director.go | 95 ++-------------------------------- odiglet/pkg/ebpf/go.go | 99 ++++++++++++++++++++++++++++++++++++ odiglet/pkg/kube/manager.go | 9 ++-- odiglet/pkg/kube/pods.go | 99 +++++++++++++----------------------- odiglet/pkg/kube/shared.go | 7 ++- 6 files changed, 163 insertions(+), 166 deletions(-) create mode 100644 odiglet/pkg/ebpf/go.go diff --git a/odiglet/cmd/main.go b/odiglet/cmd/main.go index 2108774ea..dd9bd7a0a 100644 --- a/odiglet/cmd/main.go +++ b/odiglet/cmd/main.go @@ -4,6 +4,7 @@ import ( "context" "os" + "github.com/keyval-dev/odigos/common" "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" "github.com/keyval-dev/odigos/odiglet/pkg/env" "github.com/keyval-dev/odigos/odiglet/pkg/instrumentation" @@ -38,7 +39,7 @@ func main() { os.Exit(-1) } - ebpfDirector, err := initEbpf() + ebpfDirectors, err := initEbpf() if err != nil { log.Logger.Error(err, "Failed to init eBPF director") os.Exit(-1) @@ -46,14 +47,16 @@ func main() { go startDeviceManager(clientset) - ctx, err := kube.StartReconciling(ebpfDirector) + ctx, err := kube.StartReconciling(ebpfDirectors) if err != nil { log.Logger.Error(err, "Failed to start reconciling") os.Exit(-1) } <-ctx.Done() - ebpfDirector.Shutdown() + for _, director := range ebpfDirectors { + director.Shutdown() + } } func startDeviceManager(clientset *kubernetes.Clientset) { @@ -71,6 +74,13 @@ func startDeviceManager(clientset *kubernetes.Clientset) { manager.Run() } -func initEbpf() (ebpf.Director, error) { - return ebpf.NewInstrumentationDirector() +func initEbpf() (map[common.ProgrammingLanguage]ebpf.Director, error) { + goDirector, err := ebpf.NewInstrumentationDirectorGo() + if err != nil { + return nil, err + } + + return map[common.ProgrammingLanguage]ebpf.Director{ + common.GoProgrammingLanguage: goDirector, + }, nil } diff --git a/odiglet/pkg/ebpf/director.go b/odiglet/pkg/ebpf/director.go index 71abaf2c6..d0ca50c80 100644 --- a/odiglet/pkg/ebpf/director.go +++ b/odiglet/pkg/ebpf/director.go @@ -1,106 +1,17 @@ package ebpf import ( - "context" "errors" - "fmt" - "os" - "sync" - "github.com/keyval-dev/odigos/odiglet/pkg/env" - - "github.com/keyval-dev/odigos/common/consts" - - "go.opentelemetry.io/auto" - - "github.com/keyval-dev/odigos/odiglet/pkg/log" + "github.com/keyval-dev/odigos/common" "k8s.io/apimachinery/pkg/types" ) -var ErrProcInstrumented = errors.New("process already instrumented") +var ErrProcInstrumented = errors.New("process already instrumented") type Director interface { + Language() common.ProgrammingLanguage Instrument(pid int, podDetails types.NamespacedName, appName string) error Cleanup(podDetails types.NamespacedName) Shutdown() } - -type InstrumentationDirector struct { - mux sync.Mutex - pidsToInstrumentation map[int]*auto.Instrumentation - podDetailsToPids map[types.NamespacedName][]int -} - -func NewInstrumentationDirector() (*InstrumentationDirector, error) { - err := os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", fmt.Sprintf("http://%s:%d", env.Current.NodeIP, consts.OTLPPort)) - if err != nil { - return nil, err - } - - return &InstrumentationDirector{ - pidsToInstrumentation: make(map[int]*auto.Instrumentation), - podDetailsToPids: make(map[types.NamespacedName][]int), - }, nil -} - -func (i *InstrumentationDirector) Instrument(pid int, podDetails types.NamespacedName, appName string) error { - log.Logger.V(0).Info("Instrumenting process", "pid", pid) - i.mux.Lock() - defer i.mux.Unlock() - if _, exists := i.pidsToInstrumentation[pid]; exists { - log.Logger.V(5).Info("Process already instrumented", "pid", pid) - return ErrProcInstrumented - } - - go func() { - inst, err := auto.NewInstrumentation(auto.WithPID(pid), auto.WithServiceName(appName)) - if err != nil { - log.Logger.Error(err, "instrumentation setup failed") - return - } - - i.pidsToInstrumentation[pid] = inst - i.podDetailsToPids[podDetails] = append(i.podDetailsToPids[podDetails], pid) - - if err := inst.Run(context.Background()); err != nil { - log.Logger.Error(err, "instrumentation crashed after running") - } - } () - - return nil -} - -func (i *InstrumentationDirector) Cleanup(podDetails types.NamespacedName) { - i.mux.Lock() - defer i.mux.Unlock() - pids, exists := i.podDetailsToPids[podDetails] - if !exists { - log.Logger.V(5).Info("No processes to cleanup for pod", "pod", podDetails) - return - } - - log.Logger.V(0).Info("Cleaning up instrumentation for pod", "pod", podDetails) - delete(i.podDetailsToPids, podDetails) - for _, pid := range pids { - inst, exists := i.pidsToInstrumentation[pid] - if !exists { - log.Logger.V(5).Info("No objects to cleanup for process", "pid", pid) - continue - } - - delete(i.pidsToInstrumentation, pid) - go func() { - err := inst.Close() - if err != nil { - log.Logger.Error(err, "error cleaning up objects for process", "pid", pid) - } - }() - } -} - -func (i *InstrumentationDirector) Shutdown() { - log.Logger.V(0).Info("Shutting down instrumentation director") - for details := range i.podDetailsToPids { - i.Cleanup(details) - } -} diff --git a/odiglet/pkg/ebpf/go.go b/odiglet/pkg/ebpf/go.go new file mode 100644 index 000000000..33e288923 --- /dev/null +++ b/odiglet/pkg/ebpf/go.go @@ -0,0 +1,99 @@ +package ebpf + +import ( + "context" + "fmt" + "os" + "sync" + + "github.com/keyval-dev/odigos/common" + "github.com/keyval-dev/odigos/odiglet/pkg/env" + "github.com/keyval-dev/odigos/odiglet/pkg/instrumentation/consts" + "github.com/keyval-dev/odigos/odiglet/pkg/log" + "go.opentelemetry.io/auto" + "k8s.io/apimachinery/pkg/types" +) + +type InstrumentationDirectorGo struct { + mux sync.Mutex + pidsToInstrumentation map[int]*auto.Instrumentation + podDetailsToPids map[types.NamespacedName][]int +} + +func NewInstrumentationDirectorGo() (Director, error) { + err := os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", fmt.Sprintf("http://%s:%d", env.Current.NodeIP, consts.OTLPPort)) + if err != nil { + return nil, err + } + + return &InstrumentationDirectorGo{ + pidsToInstrumentation: make(map[int]*auto.Instrumentation), + podDetailsToPids: make(map[types.NamespacedName][]int), + }, nil +} + +func (i *InstrumentationDirectorGo) Language() common.ProgrammingLanguage { + return common.GoProgrammingLanguage +} + +func (i *InstrumentationDirectorGo) Instrument(pid int, podDetails types.NamespacedName, appName string) error { + log.Logger.V(0).Info("Instrumenting process", "pid", pid) + i.mux.Lock() + defer i.mux.Unlock() + if _, exists := i.pidsToInstrumentation[pid]; exists { + log.Logger.V(5).Info("Process already instrumented", "pid", pid) + return ErrProcInstrumented + } + + go func() { + inst, err := auto.NewInstrumentation(auto.WithPID(pid), auto.WithServiceName(appName)) + if err != nil { + log.Logger.Error(err, "instrumentation setup failed") + return + } + + i.pidsToInstrumentation[pid] = inst + i.podDetailsToPids[podDetails] = append(i.podDetailsToPids[podDetails], pid) + + if err := inst.Run(context.Background()); err != nil { + log.Logger.Error(err, "instrumentation crashed after running") + } + }() + + return nil +} + +func (i *InstrumentationDirectorGo) Cleanup(podDetails types.NamespacedName) { + i.mux.Lock() + defer i.mux.Unlock() + pids, exists := i.podDetailsToPids[podDetails] + if !exists { + log.Logger.V(5).Info("No processes to cleanup for pod", "pod", podDetails) + return + } + + log.Logger.V(0).Info("Cleaning up instrumentation for pod", "pod", podDetails) + delete(i.podDetailsToPids, podDetails) + for _, pid := range pids { + inst, exists := i.pidsToInstrumentation[pid] + if !exists { + log.Logger.V(5).Info("No objects to cleanup for process", "pid", pid) + continue + } + + delete(i.pidsToInstrumentation, pid) + go func() { + err := inst.Close() + if err != nil { + log.Logger.Error(err, "error cleaning up objects for process", "pid", pid) + } + }() + } +} + +func (i *InstrumentationDirectorGo) Shutdown() { + log.Logger.V(0).Info("Shutting down instrumentation director") + for details := range i.podDetailsToPids { + i.Cleanup(details) + } +} diff --git a/odiglet/pkg/kube/manager.go b/odiglet/pkg/kube/manager.go index 5098b15a9..7ec73a017 100644 --- a/odiglet/pkg/kube/manager.go +++ b/odiglet/pkg/kube/manager.go @@ -6,6 +6,7 @@ import ( metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1" + "github.com/keyval-dev/odigos/common" "github.com/keyval-dev/odigos/common/consts" "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" "github.com/keyval-dev/odigos/odiglet/pkg/log" @@ -32,7 +33,7 @@ func init() { utilruntime.Must(odigosv1.AddToScheme(scheme)) } -func StartReconciling(ebpfDirector ebpf.Director) (context.Context, error) { +func StartReconciling(ebpfDirectors map[common.ProgrammingLanguage]ebpf.Director) (context.Context, error) { log.Logger.V(0).Info("Starting reconcileres") ctrl.SetLogger(log.Logger) mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{ @@ -100,9 +101,9 @@ func StartReconciling(ebpfDirector ebpf.Director) (context.Context, error) { ControllerManagedBy(mgr). For(&corev1.Pod{}). Complete(&PodsReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Director: ebpfDirector, + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Directors: ebpfDirectors, }) if err != nil { return nil, err diff --git a/odiglet/pkg/kube/pods.go b/odiglet/pkg/kube/pods.go index bf9a52051..bd4c9d090 100644 --- a/odiglet/pkg/kube/pods.go +++ b/odiglet/pkg/kube/pods.go @@ -8,7 +8,6 @@ import ( "github.com/keyval-dev/odigos/common" "github.com/keyval-dev/odigos/common/utils" "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" - "github.com/keyval-dev/odigos/odiglet/pkg/env" "github.com/keyval-dev/odigos/odiglet/pkg/process" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -22,8 +21,8 @@ import ( type PodsReconciler struct { client.Client - Scheme *runtime.Scheme - Director ebpf.Director + Scheme *runtime.Scheme + Directors map[common.ProgrammingLanguage]ebpf.Director } func (p *PodsReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { @@ -32,7 +31,7 @@ func (p *PodsReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c err := p.Client.Get(ctx, request.NamespacedName, &pod) if err != nil { if apierrors.IsNotFound(err) { - p.Director.Cleanup(request.NamespacedName) + p.cleanup(request.NamespacedName) return ctrl.Result{}, nil } @@ -40,103 +39,75 @@ func (p *PodsReconciler) Reconcile(ctx context.Context, request ctrl.Request) (c return ctrl.Result{}, err } - if !p.shouldInstrument(&pod) { + if !isPodInThisNode(&pod) { return ctrl.Result{}, nil } if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { logger.Info("pod is not running, removing instrumentation") - p.Director.Cleanup(request.NamespacedName) + p.cleanup(request.NamespacedName) return ctrl.Result{}, nil } if pod.Status.Phase == corev1.PodRunning { - err = p.instrument(ctx, &pod) - if err != nil { - logger.Error(err, "error instrumenting pod") - return ctrl.Result{}, err - } + p.attemptEbpfInstrument(ctx, &pod) } return ctrl.Result{}, nil } -// shouldInstrument returns true if the pod should be instrumented. -// A pod should be instrumented if: -// - it is running -// - it is scheduled on the same node as the odiglet -// - it has instrumentation.odigos.io/go device attached -func (p *PodsReconciler) shouldInstrument(pod *corev1.Pod) bool { - return pod.Spec.NodeName == env.Current.NodeName && hasInstrumentationDevice(pod) +func (p *PodsReconciler) cleanup(name types.NamespacedName) { + // cleanup using all available directors + // the Cleanup method is idempotent, so no harm in calling it multiple times + for _, director := range p.Directors { + director.Cleanup(name) + } } -func (p *PodsReconciler) instrument(ctx context.Context, pod *corev1.Pod) error { +func (p *PodsReconciler) attemptEbpfInstrument(ctx context.Context, pod *corev1.Pod) error { logger := log.FromContext(ctx) - containers, ownerName, err := p.findAllGoContainers(ctx, pod) + runtimeDetails, err := p.getRuntimeDetails(ctx, pod) if err != nil { if apierrors.IsNotFound(err) { // Probably shutdown in progress, cleanup will be done as soon as the pod object is deleted return nil } - - logger.Error(err, "error finding go containers") return err } - pids := make(map[int]string) - for _, c := range containers { - details, err := process.FindAllInContainer(string(pod.UID), c) - if err != nil { - logger.Error(err, "error finding processes") - return err + podUid := string(pod.UID) + for _, container := range runtimeDetails.Spec.Languages { + director := p.Directors[container.Language] + if director == nil { + // this language is not instrumented with eBPF + continue } - for _, d := range details { - appName := c - if ownerName != "" { - appName = ownerName - } - pids[d.ProcessID] = appName + appName := container.ContainerName + if len(runtimeDetails.Spec.Languages) == 1 && len(runtimeDetails.OwnerReferences) > 0 { + appName = runtimeDetails.OwnerReferences[0].Name } - } - - if len(pids) == 0 { - // Probably shutdown in progress, cleanup will be done as soon as the pod object is deleted - return nil - } - - for pid, appName := range pids { - err = p.Director.Instrument(pid, types.NamespacedName{ - Namespace: pod.Namespace, - Name: pod.Name, - }, appName) + details, err := process.FindAllInContainer(podUid, container.ContainerName) if err != nil { - logger.Error(err, "error instrumenting process", "pid", pid) + logger.Error(err, "error finding processes") return err } - } - return nil -} -func (p *PodsReconciler) findAllGoContainers(ctx context.Context, pod *corev1.Pod) ([]string, string, error) { - runtimeDetails, err := p.getRuntimeDetails(ctx, pod) - if err != nil { - return nil, "", err - } + for _, d := range details { + err = director.Instrument(d.ProcessID, types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Name, + }, appName) - var containers []string - for _, container := range runtimeDetails.Spec.Languages { - if container.Language == common.GoProgrammingLanguage { - containers = append(containers, container.ContainerName) + if err != nil { + logger.Error(err, "error instrumenting process", "pid", d.ProcessID) + return err + } } } - ownerName := "" - if len(runtimeDetails.Spec.Languages) == 1 && len(runtimeDetails.OwnerReferences) > 0 { - ownerName = runtimeDetails.OwnerReferences[0].Name - } - return containers, ownerName, nil + return nil } func (p *PodsReconciler) getRuntimeDetails(ctx context.Context, pod *corev1.Pod) (*odigosv1.InstrumentedApplication, error) { diff --git a/odiglet/pkg/kube/shared.go b/odiglet/pkg/kube/shared.go index 8567ab609..e8bf2ec7d 100644 --- a/odiglet/pkg/kube/shared.go +++ b/odiglet/pkg/kube/shared.go @@ -2,6 +2,7 @@ package kube import ( "context" + "github.com/go-logr/logr" odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1" "github.com/keyval-dev/odigos/common" @@ -119,7 +120,7 @@ func getRunningPods(ctx context.Context, labels map[string]string, ns string, ku var filteredPods []corev1.Pod for _, pod := range podList.Items { - if pod.Spec.NodeName == env.Current.NodeName && pod.Status.Phase == corev1.PodRunning { + if isPodInThisNode(&pod) && pod.Status.Phase == corev1.PodRunning { filteredPods = append(filteredPods, pod) } } @@ -130,3 +131,7 @@ func getRunningPods(ctx context.Context, labels map[string]string, ns string, ku return filteredPods, nil } + +func isPodInThisNode(pod *corev1.Pod) bool { + return pod.Spec.NodeName == env.Current.NodeName +} From 77a9dbc1a913471dd5acf8b7a88bd2eb5455b26e Mon Sep 17 00:00:00 2001 From: Amir Blum Date: Wed, 15 Nov 2023 11:38:45 +0200 Subject: [PATCH 2/2] fix: instrument ebpf only after device is set --- odiglet/pkg/kube/pods.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/odiglet/pkg/kube/pods.go b/odiglet/pkg/kube/pods.go index bd4c9d090..eaf324785 100644 --- a/odiglet/pkg/kube/pods.go +++ b/odiglet/pkg/kube/pods.go @@ -83,6 +83,12 @@ func (p *PodsReconciler) attemptEbpfInstrument(ctx context.Context, pod *corev1. continue } + // this check currently only works for Go. + // it is temporary and will be replaced with annotation in a future PR + if !hasInstrumentationDevice(pod) { + continue + } + appName := container.ContainerName if len(runtimeDetails.Spec.Languages) == 1 && len(runtimeDetails.OwnerReferences) > 0 { appName = runtimeDetails.OwnerReferences[0].Name