diff --git a/common/consts/consts.go b/common/consts/consts.go index d66635355..e852b4e8f 100644 --- a/common/consts/consts.go +++ b/common/consts/consts.go @@ -13,6 +13,7 @@ const ( InstrumentationDisabled = "disabled" GolangInstrumentationImage = "keyval/otel-go-agent:v0.6.5" OdigosReportedNameAnnotation = "odigos.io/reported-name" + EbpfInstrumentationAnnotation = "instrumentation.odigos.io/ebpf" ) var ( diff --git a/common/device_names.go b/common/device_names.go index 5972d753c..a1a15b80c 100644 --- a/common/device_names.go +++ b/common/device_names.go @@ -5,7 +5,6 @@ type OdigosInstrumentationDevice string const ( JavaDeviceName OdigosInstrumentationDevice = "instrumentation.odigos.io/java" PythonDeviceName OdigosInstrumentationDevice = "instrumentation.odigos.io/python" - GoDeviceName OdigosInstrumentationDevice = "instrumentation.odigos.io/go" DotNetDeviceName OdigosInstrumentationDevice = "instrumentation.odigos.io/dotnet" JavascriptDeviceName OdigosInstrumentationDevice = "instrumentation.odigos.io/javascript" ) @@ -13,7 +12,6 @@ const ( var InstrumentationDevices = []OdigosInstrumentationDevice{ JavaDeviceName, PythonDeviceName, - GoDeviceName, DotNetDeviceName, JavascriptDeviceName, } @@ -24,8 +22,6 @@ func ProgrammingLanguageToInstrumentationDevice(language ProgrammingLanguage) Od return JavaDeviceName case PythonProgrammingLanguage: return PythonDeviceName - case GoProgrammingLanguage: - return GoDeviceName case DotNetProgrammingLanguage: return DotNetDeviceName case JavascriptProgrammingLanguage: diff --git a/instrumentor/controllers/common.go b/instrumentor/controllers/common.go index 672ef03d7..52e69ec2a 100644 --- a/instrumentor/controllers/common.go +++ b/instrumentor/controllers/common.go @@ -7,6 +7,7 @@ import ( "github.com/go-logr/logr" odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1" + "github.com/keyval-dev/odigos/common" "github.com/keyval-dev/odigos/common/consts" "github.com/keyval-dev/odigos/common/utils" "github.com/keyval-dev/odigos/instrumentor/instrumentation" @@ -25,6 +26,36 @@ var ( IgnoredNamespaces map[string]bool ) +// shouldInstrumentWithEbpf returns true if the given runtime details should be delegated to odiglet for ebpf instrumentation +// This is currently hardcoded. In the future we will read this from a config +func shouldInstrumentWithEbpf(runtimeDetails *odigosv1.InstrumentedApplication) bool { + for _, l := range runtimeDetails.Spec.Languages { + if l.Language == common.GoProgrammingLanguage { + return true + } + } + + return false +} + +func setInstrumentationEbpf(obj client.Object) { + annotations := obj.GetAnnotations() + if annotations == nil { + annotations = make(map[string]string) + } + + annotations[consts.EbpfInstrumentationAnnotation] = "true" +} + +func clearInstrumentationEbpf(obj client.Object) { + annotations := obj.GetAnnotations() + if annotations == nil { + return + } + + delete(annotations, consts.EbpfInstrumentationAnnotation) +} + func isDataCollectionReady(ctx context.Context, c client.Client) bool { logger := log.FromContext(ctx) var collectorGroups odigosv1.CollectorsGroupList @@ -50,6 +81,11 @@ func instrument(logger logr.Logger, ctx context.Context, kubeClient client.Clien } result, err := controllerutil.CreateOrPatch(ctx, kubeClient, obj, func() error { + if shouldInstrumentWithEbpf(runtimeDetails) { + setInstrumentationEbpf(obj) + return nil + } + podSpec, err := getPodSpecFromObject(obj) if err != nil { return err @@ -90,6 +126,7 @@ func uninstrument(logger logr.Logger, ctx context.Context, kubeClient client.Cli } result, err := controllerutil.CreateOrPatch(ctx, kubeClient, obj, func() error { + clearInstrumentationEbpf(obj) podSpec, err := getPodSpecFromObject(obj) if err != nil { return err diff --git a/odiglet/cmd/main.go b/odiglet/cmd/main.go index dd9bd7a0a..26fc213dc 100644 --- a/odiglet/cmd/main.go +++ b/odiglet/cmd/main.go @@ -13,6 +13,7 @@ import ( "github.com/kubevirt/device-plugin-manager/pkg/dpm" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" + "sigs.k8s.io/controller-runtime/pkg/manager/signals" ) func main() { @@ -47,9 +48,11 @@ func main() { go startDeviceManager(clientset) - ctx, err := kube.StartReconciling(ebpfDirectors) + ctx := signals.SetupSignalHandler() + + err = kube.StartReconciling(ctx, ebpfDirectors) if err != nil { - log.Logger.Error(err, "Failed to start reconciling") + log.Logger.Error(err, "Failed to start controller-runtime manager") os.Exit(-1) } diff --git a/odiglet/pkg/ebpf/director.go b/odiglet/pkg/ebpf/director.go index 7fc0ba184..fff5694a4 100644 --- a/odiglet/pkg/ebpf/director.go +++ b/odiglet/pkg/ebpf/director.go @@ -2,14 +2,11 @@ package ebpf import ( "context" - "errors" "github.com/keyval-dev/odigos/common" "k8s.io/apimachinery/pkg/types" ) -var ErrProcInstrumented = errors.New("process already instrumented") - type Director interface { Language() common.ProgrammingLanguage Instrument(ctx context.Context, pid int, podDetails types.NamespacedName, appName string) error diff --git a/odiglet/pkg/ebpf/go.go b/odiglet/pkg/ebpf/go.go index 09a63050e..13679f6eb 100644 --- a/odiglet/pkg/ebpf/go.go +++ b/odiglet/pkg/ebpf/go.go @@ -15,15 +15,26 @@ import ( ) type InstrumentationDirectorGo struct { - mux sync.Mutex + mux sync.Mutex + + // this map holds the instrumentation object which is used to close the instrumentation + // the map is filled only after the instrumentation is actually created + // which is an asyn process that might take some time pidsToInstrumentation map[int]*auto.Instrumentation - podDetailsToPids map[types.NamespacedName][]int + + // this map is used to make sure we do not attempt to instrument the same process twice. + // it keeps track of which processes we already attempted to instrument, + // so we can avoid attempting to instrument them again. + pidsAttemptedInstrumentation map[int]struct{} + + podDetailsToPids map[types.NamespacedName][]int } func NewInstrumentationDirectorGo() (Director, error) { return &InstrumentationDirectorGo{ - pidsToInstrumentation: make(map[int]*auto.Instrumentation), - podDetailsToPids: make(map[types.NamespacedName][]int), + pidsToInstrumentation: make(map[int]*auto.Instrumentation), + pidsAttemptedInstrumentation: make(map[int]struct{}), + podDetailsToPids: make(map[types.NamespacedName][]int), }, nil } @@ -35,10 +46,12 @@ func (i *InstrumentationDirectorGo) Instrument(ctx context.Context, pid int, pod log.Logger.V(0).Info("Instrumenting process", "pid", pid) i.mux.Lock() defer i.mux.Unlock() - if _, exists := i.pidsToInstrumentation[pid]; exists { + if _, exists := i.pidsAttemptedInstrumentation[pid]; exists { log.Logger.V(5).Info("Process already instrumented", "pid", pid) - return ErrProcInstrumented + return nil } + i.podDetailsToPids[podDetails] = append(i.podDetailsToPids[podDetails], pid) + i.pidsAttemptedInstrumentation[pid] = struct{}{} defaultExporter, err := otlptracegrpc.New( ctx, @@ -62,8 +75,21 @@ func (i *InstrumentationDirectorGo) Instrument(ctx context.Context, pid int, pod return } - i.pidsToInstrumentation[pid] = inst - i.podDetailsToPids[podDetails] = append(i.podDetailsToPids[podDetails], pid) + i.mux.Lock() + _, stillExists := i.pidsAttemptedInstrumentation[pid] + if stillExists { + i.pidsToInstrumentation[pid] = inst + i.mux.Unlock() + } else { + i.mux.Unlock() + // we attempted to instrument this process, but it was already cleaned up + // so we need to clean up the instrumentation we just created + err = inst.Close() + if err != nil { + log.Logger.Error(err, "error cleaning up instrumentation for process", "pid", pid) + } + return + } if err := inst.Run(context.Background()); err != nil { log.Logger.Error(err, "instrumentation crashed after running") @@ -82,9 +108,11 @@ func (i *InstrumentationDirectorGo) Cleanup(podDetails types.NamespacedName) { return } - log.Logger.V(0).Info("Cleaning up instrumentation for pod", "pod", podDetails) + log.Logger.V(0).Info("Cleaning up ebpf go instrumentation for pod", "pod", podDetails) delete(i.podDetailsToPids, podDetails) for _, pid := range pids { + delete(i.pidsAttemptedInstrumentation, pid) + inst, exists := i.pidsToInstrumentation[pid] if !exists { log.Logger.V(5).Info("No objects to cleanup for process", "pid", pid) diff --git a/odiglet/pkg/kube/instrumentation_ebpf/daemonsets.go b/odiglet/pkg/kube/instrumentation_ebpf/daemonsets.go new file mode 100644 index 000000000..a94c6d410 --- /dev/null +++ b/odiglet/pkg/kube/instrumentation_ebpf/daemonsets.go @@ -0,0 +1,27 @@ +package instrumentation_ebpf + +import ( + "context" + + "github.com/keyval-dev/odigos/common" + "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type DaemonSetsReconciler struct { + client.Client + Scheme *runtime.Scheme + Directors map[common.ProgrammingLanguage]ebpf.Director +} + +func (d *DaemonSetsReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + err := ApplyEbpfToPodWorkload(ctx, d.Client, d.Directors, &PodWorkload{ + Name: request.Name, + Namespace: request.Namespace, + Kind: "DaemonSet", + }) + + return ctrl.Result{}, err +} diff --git a/odiglet/pkg/kube/instrumentation_ebpf/deployments.go b/odiglet/pkg/kube/instrumentation_ebpf/deployments.go new file mode 100644 index 000000000..d367e2105 --- /dev/null +++ b/odiglet/pkg/kube/instrumentation_ebpf/deployments.go @@ -0,0 +1,27 @@ +package instrumentation_ebpf + +import ( + "context" + + "github.com/keyval-dev/odigos/common" + "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type DeploymentsReconciler struct { + client.Client + Scheme *runtime.Scheme + Directors map[common.ProgrammingLanguage]ebpf.Director +} + +func (d *DeploymentsReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + err := ApplyEbpfToPodWorkload(ctx, d.Client, d.Directors, &PodWorkload{ + Name: request.Name, + Namespace: request.Namespace, + Kind: "Deployment", + }) + + return ctrl.Result{}, err +} diff --git a/odiglet/pkg/kube/instrumentation_ebpf/manager.go b/odiglet/pkg/kube/instrumentation_ebpf/manager.go new file mode 100644 index 000000000..2d06dab18 --- /dev/null +++ b/odiglet/pkg/kube/instrumentation_ebpf/manager.go @@ -0,0 +1,128 @@ +package instrumentation_ebpf + +import ( + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/predicate" + + "github.com/keyval-dev/odigos/common" + "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" + "github.com/keyval-dev/odigos/odiglet/pkg/log" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" +) + +type podPredicate struct { + predicate.Funcs +} + +func (i *podPredicate) Create(e event.CreateEvent) bool { + // when it is created, it is not running yet + return false +} + +func (i *podPredicate) Update(e event.UpdateEvent) bool { + // Cast old and new objects to *corev1.Pod + oldPod, oldOk := e.ObjectOld.(*corev1.Pod) + newPod, newOk := e.ObjectNew.(*corev1.Pod) + + // Check if both old and new objects are Pods + if !oldOk || !newOk { + return false + } + + // Check if the Pod status has changed from not running to running + if oldPod.Status.Phase != corev1.PodRunning && newPod.Status.Phase == corev1.PodRunning { + return true + } + + return false +} + +func (i *podPredicate) Delete(e event.DeleteEvent) bool { + return true +} + +func (i *podPredicate) Generic(e event.GenericEvent) bool { + return false +} + +type workloadPredicate struct { + predicate.Funcs +} + +func (i *workloadPredicate) Create(e event.CreateEvent) bool { + return true +} + +func (i *workloadPredicate) Update(e event.UpdateEvent) bool { + return hasEbpfInstrumentationAnnotation(e.ObjectNew) != hasEbpfInstrumentationAnnotation(e.ObjectOld) +} + +func (i *workloadPredicate) Delete(e event.DeleteEvent) bool { + return true +} + +func (i *workloadPredicate) Generic(e event.GenericEvent) bool { + return false +} + +func SetupWithManager(mgr ctrl.Manager, ebpfDirectors map[common.ProgrammingLanguage]ebpf.Director) error { + + log.Logger.V(0).Info("Starting reconcileres for ebpf instrumentation") + + err := builder. + ControllerManagedBy(mgr). + For(&corev1.Pod{}). + WithEventFilter(&podPredicate{}). + Complete(&PodsReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Directors: ebpfDirectors, + }) + if err != nil { + return err + } + + err = builder. + ControllerManagedBy(mgr). + For(&appsv1.Deployment{}). + WithEventFilter(&workloadPredicate{}). + Complete(&DeploymentsReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Directors: ebpfDirectors, + }) + if err != nil { + return err + } + + err = builder. + ControllerManagedBy(mgr). + For(&appsv1.DaemonSet{}). + WithEventFilter(&workloadPredicate{}). + Complete(&DaemonSetsReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Directors: ebpfDirectors, + }) + if err != nil { + return err + } + + err = builder. + ControllerManagedBy(mgr). + For(&appsv1.StatefulSet{}). + WithEventFilter(&workloadPredicate{}). + Complete(&StatefulSetsReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + Directors: ebpfDirectors, + }) + if err != nil { + return err + } + + return nil +} diff --git a/odiglet/pkg/kube/instrumentation_ebpf/pods.go b/odiglet/pkg/kube/instrumentation_ebpf/pods.go new file mode 100644 index 000000000..5fc32542a --- /dev/null +++ b/odiglet/pkg/kube/instrumentation_ebpf/pods.go @@ -0,0 +1,129 @@ +package instrumentation_ebpf + +import ( + "context" + "errors" + + "github.com/keyval-dev/odigos/common" + "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" + kubeutils "github.com/keyval-dev/odigos/odiglet/pkg/kube/utils" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +type PodsReconciler struct { + client.Client + Scheme *runtime.Scheme + Directors map[common.ProgrammingLanguage]ebpf.Director +} + +func (p *PodsReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + logger := log.FromContext(ctx) + var pod corev1.Pod + err := p.Client.Get(ctx, request.NamespacedName, &pod) + if err != nil { + if apierrors.IsNotFound(err) { + cleanupEbpf(p.Directors, request.NamespacedName) + return ctrl.Result{}, nil + } + + logger.Error(err, "error fetching pod object") + return ctrl.Result{}, err + } + + if !kubeutils.IsPodInCurrentNode(&pod) { + return ctrl.Result{}, nil + } + + if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { + logger.Info("pod is not running, removing instrumentation") + cleanupEbpf(p.Directors, request.NamespacedName) + return ctrl.Result{}, nil + } + + podWorkload, err := p.getPodWorkloadObject(ctx, &pod) + if err != nil { + logger.Error(err, "error getting pod workload object") + return ctrl.Result{}, err + } + if podWorkload == nil { + // pod is not managed by a controller + return ctrl.Result{}, nil + } + + shouldBeEbpfInstrumented, _, err := isEbpfInstrumented(ctx, p.Client, podWorkload) + if err != nil { + logger.Error(err, "error checking if pod should be ebpf instrumented") + return ctrl.Result{}, err + } + if !shouldBeEbpfInstrumented { + cleanupEbpf(p.Directors, request.NamespacedName) + return ctrl.Result{}, nil + } + + if pod.Status.Phase == corev1.PodRunning { + err := p.instrumentWithEbpf(ctx, &pod, podWorkload) + if err != nil { + logger.Error(err, "error instrumenting pod") + return ctrl.Result{}, err + } + } + + return ctrl.Result{}, nil +} + +func (p *PodsReconciler) instrumentWithEbpf(ctx context.Context, pod *corev1.Pod, podWorkload *PodWorkload) error { + runtimeDetails, err := getRuntimeDetails(ctx, p.Client, podWorkload) + if err != nil { + if apierrors.IsNotFound(err) { + // Probably shutdown in progress, cleanup will be done as soon as the pod object is deleted + return nil + } + return err + } + + return instrumentPodWithEbpf(ctx, pod, p.Directors, runtimeDetails) +} + +func (p *PodsReconciler) getPodWorkloadObject(ctx context.Context, pod *corev1.Pod) (*PodWorkload, error) { + for _, owner := range pod.OwnerReferences { + if owner.Kind == "ReplicaSet" { + var rs appsv1.ReplicaSet + err := p.Client.Get(ctx, client.ObjectKey{ + Namespace: pod.Namespace, + Name: owner.Name, + }, &rs) + if err != nil { + return nil, err + } + + if rs.OwnerReferences == nil { + return nil, errors.New("replicaset has no owner reference") + } + + for _, rsOwner := range rs.OwnerReferences { + if rsOwner.Kind == "Deployment" || rsOwner.Kind == "DaemonSet" || rsOwner.Kind == "StatefulSet" { + return &PodWorkload{ + Name: rsOwner.Name, + Namespace: pod.Namespace, + Kind: rsOwner.Kind, + }, nil + } + } + } else if owner.Kind == "DaemonSet" || owner.Kind == "Deployment" || owner.Kind == "StatefulSet" { + return &PodWorkload{ + Name: owner.Name, + Namespace: pod.Namespace, + Kind: owner.Kind, + }, nil + } + } + + // Pod does not necessarily have to be managed by a controller + return nil, nil +} diff --git a/odiglet/pkg/kube/instrumentation_ebpf/shared.go b/odiglet/pkg/kube/instrumentation_ebpf/shared.go new file mode 100644 index 000000000..6d9137222 --- /dev/null +++ b/odiglet/pkg/kube/instrumentation_ebpf/shared.go @@ -0,0 +1,167 @@ +package instrumentation_ebpf + +import ( + "context" + "errors" + + odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1" + "github.com/keyval-dev/odigos/common" + "github.com/keyval-dev/odigos/common/utils" + "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" + kubeutils "github.com/keyval-dev/odigos/odiglet/pkg/kube/utils" + "github.com/keyval-dev/odigos/odiglet/pkg/process" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" +) + +// PodWorkload represents the higher-level controller managing a specific Pod within a Kubernetes cluster. +// It contains essential details about the controller such as its Name, Namespace, and Kind. +// 'Kind' refers to the type of controller, which can be a Deployment, StatefulSet, or DaemonSet. +// This struct is useful for identifying and interacting with the overarching entity +// that governs the lifecycle and behavior of a Pod, especially in contexts where +// understanding the relationship between a Pod and its controlling workload is crucial. +type PodWorkload struct { + Name string + Namespace string + Kind string +} + +func ApplyEbpfToPodWorkload(ctx context.Context, kubeClient client.Client, directors map[common.ProgrammingLanguage]ebpf.Director, podWorkload *PodWorkload) error { + logger := log.FromContext(ctx) + ebpfInstrumented, matchLabels, err := isEbpfInstrumented(ctx, kubeClient, podWorkload) + if err != nil { + logger.Error(err, "error checking if pod is ebpf instrumented") + return err + } + + pods, err := kubeutils.GetRunningPods(ctx, matchLabels, podWorkload.Namespace, kubeClient) + if err != nil { + logger.Error(err, "error fetching running pods") + return err + } + if len(pods) == 0 { + return nil + } + + if !ebpfInstrumented { + for _, pod := range pods { + cleanupEbpf(directors, types.NamespacedName{ + Namespace: podWorkload.Namespace, + Name: pod.Name, + }) + } + return nil + } + + runtimeDetails, err := getRuntimeDetails(ctx, kubeClient, podWorkload) + if err != nil { + if apierrors.IsNotFound(err) { + // Probably shutdown in progress, cleanup will be done as soon as the pod object is deleted + return nil + } + return err + } + + logger.Info("instrumenting with ebpf", "kind", podWorkload.Kind, "name", podWorkload.Name, "namespace", podWorkload.Namespace, "numPods", len(pods)) + for _, pod := range pods { + err = instrumentPodWithEbpf(ctx, &pod, directors, runtimeDetails) + if err != nil { + logger.Error(err, "error instrumenting pod") + return err + } + } + + return nil +} + +func cleanupEbpf(directors map[common.ProgrammingLanguage]ebpf.Director, name types.NamespacedName) { + // cleanup using all available directors + // the Cleanup method is idempotent, so no harm in calling it multiple times + for _, director := range directors { + director.Cleanup(name) + } +} + +func instrumentPodWithEbpf(ctx context.Context, pod *corev1.Pod, directors map[common.ProgrammingLanguage]ebpf.Director, runtimeDetails *odigosv1.InstrumentedApplication) error { + logger := log.FromContext(ctx) + podUid := string(pod.UID) + for _, container := range runtimeDetails.Spec.Languages { + + director := directors[container.Language] + if director == nil { + return errors.New("no director found for language " + string(container.Language)) + } + + appName := container.ContainerName + if len(runtimeDetails.Spec.Languages) == 1 && len(runtimeDetails.OwnerReferences) > 0 { + appName = runtimeDetails.OwnerReferences[0].Name + } + + details, err := process.FindAllInContainer(podUid, container.ContainerName) + if err != nil { + logger.Error(err, "error finding processes") + return err + } + + for _, d := range details { + err = director.Instrument(ctx, d.ProcessID, types.NamespacedName{ + Namespace: pod.Namespace, + Name: pod.Name, + }, appName) + + if err != nil { + logger.Error(err, "error instrumenting process", "pid", d.ProcessID) + return err + } + } + } + return nil +} + +func isEbpfInstrumented(ctx context.Context, kubeClient client.Client, podWorkload *PodWorkload) (bool, map[string]string, error) { + // TODO: this is better done with a dynamic client + switch podWorkload.Kind { + case "Deployment": + var dep appsv1.Deployment + err := kubeClient.Get(ctx, client.ObjectKey{ + Namespace: podWorkload.Namespace, + Name: podWorkload.Name, + }, &dep) + return hasEbpfInstrumentationAnnotation(&dep), dep.Spec.Selector.MatchLabels, err + case "DaemonSet": + var ds appsv1.DaemonSet + err := kubeClient.Get(ctx, client.ObjectKey{ + Namespace: podWorkload.Namespace, + Name: podWorkload.Name, + }, &ds) + return hasEbpfInstrumentationAnnotation(&ds), ds.Spec.Selector.MatchLabels, err + case "StatefulSet": + var sts appsv1.StatefulSet + err := kubeClient.Get(ctx, client.ObjectKey{ + Namespace: podWorkload.Namespace, + Name: podWorkload.Name, + }, &sts) + return hasEbpfInstrumentationAnnotation(&sts), sts.Spec.Selector.MatchLabels, err + default: + return false, nil, errors.New("unknown pod workload kind") + } +} + +func getRuntimeDetails(ctx context.Context, kubeClient client.Client, podWorkload *PodWorkload) (*odigosv1.InstrumentedApplication, error) { + instrumentedApplicationName := utils.GetRuntimeObjectName(podWorkload.Name, podWorkload.Kind) + + var runtimeDetails odigosv1.InstrumentedApplication + err := kubeClient.Get(ctx, client.ObjectKey{ + Namespace: podWorkload.Namespace, + Name: instrumentedApplicationName, + }, &runtimeDetails) + if err != nil { + return nil, err + } + + return &runtimeDetails, nil +} diff --git a/odiglet/pkg/kube/instrumentation_ebpf/statefulsets.go b/odiglet/pkg/kube/instrumentation_ebpf/statefulsets.go new file mode 100644 index 000000000..f9fec5b45 --- /dev/null +++ b/odiglet/pkg/kube/instrumentation_ebpf/statefulsets.go @@ -0,0 +1,27 @@ +package instrumentation_ebpf + +import ( + "context" + + "github.com/keyval-dev/odigos/common" + "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" + "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +type StatefulSetsReconciler struct { + client.Client + Scheme *runtime.Scheme + Directors map[common.ProgrammingLanguage]ebpf.Director +} + +func (d *StatefulSetsReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { + err := ApplyEbpfToPodWorkload(ctx, d.Client, d.Directors, &PodWorkload{ + Name: request.Name, + Namespace: request.Namespace, + Kind: "StatefulSet", + }) + + return ctrl.Result{}, err +} diff --git a/odiglet/pkg/kube/instrumentation_ebpf/utils.go b/odiglet/pkg/kube/instrumentation_ebpf/utils.go new file mode 100644 index 000000000..e0416b6a3 --- /dev/null +++ b/odiglet/pkg/kube/instrumentation_ebpf/utils.go @@ -0,0 +1,20 @@ +package instrumentation_ebpf + +import ( + "github.com/keyval-dev/odigos/common/consts" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func hasEbpfInstrumentationAnnotation(obj client.Object) bool { + if obj == nil { + return false + } + + annotations := obj.GetAnnotations() + if annotations == nil { + return false + } + + _, exists := annotations[consts.EbpfInstrumentationAnnotation] + return exists +} diff --git a/odiglet/pkg/kube/manager.go b/odiglet/pkg/kube/manager.go index 7ec73a017..96da0632c 100644 --- a/odiglet/pkg/kube/manager.go +++ b/odiglet/pkg/kube/manager.go @@ -3,25 +3,20 @@ package kube import ( "context" - metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" - - odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1" "github.com/keyval-dev/odigos/common" - "github.com/keyval-dev/odigos/common/consts" "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" + "github.com/keyval-dev/odigos/odiglet/pkg/kube/instrumentation_ebpf" + "github.com/keyval-dev/odigos/odiglet/pkg/kube/runtime_details" "github.com/keyval-dev/odigos/odiglet/pkg/log" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + + odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1" "k8s.io/apimachinery/pkg/runtime" utilruntime "k8s.io/apimachinery/pkg/util/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/builder" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/config" "sigs.k8s.io/controller-runtime/pkg/manager" - "sigs.k8s.io/controller-runtime/pkg/manager/signals" - "sigs.k8s.io/controller-runtime/pkg/predicate" + metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ) var ( @@ -33,8 +28,8 @@ func init() { utilruntime.Must(odigosv1.AddToScheme(scheme)) } -func StartReconciling(ebpfDirectors map[common.ProgrammingLanguage]ebpf.Director) (context.Context, error) { - log.Logger.V(0).Info("Starting reconcileres") +func StartReconciling(ctx context.Context, ebpfDirectors map[common.ProgrammingLanguage]ebpf.Director) error { + log.Logger.V(0).Info("Starting reconcileres for runtime details") ctrl.SetLogger(log.Logger) mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{ Scheme: scheme, @@ -43,113 +38,25 @@ func StartReconciling(ebpfDirectors map[common.ProgrammingLanguage]ebpf.Director }, }) if err != nil { - return nil, err + return err } - err = builder. - ControllerManagedBy(mgr). - For(&appsv1.Deployment{}). - Owns(&odigosv1.InstrumentedApplication{}). - Complete(&DeploymentsReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }) + err = runtime_details.SetupWithManager(mgr) if err != nil { - return nil, err + return err } - err = builder. - ControllerManagedBy(mgr). - For(&appsv1.StatefulSet{}). - Owns(&odigosv1.InstrumentedApplication{}). - Complete(&StatefulSetsReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }) + err = instrumentation_ebpf.SetupWithManager(mgr, ebpfDirectors) if err != nil { - return nil, err + return err } - err = builder. - ControllerManagedBy(mgr). - For(&appsv1.DaemonSet{}). - Owns(&odigosv1.InstrumentedApplication{}). - Complete(&DaemonSetsReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }) - if err != nil { - return nil, err - } - - err = builder. - ControllerManagedBy(mgr). - For(&corev1.Namespace{}). - WithEventFilter(predicate.NewPredicateFuncs(func(obj client.Object) bool { - return isObjectLabeled(obj) - })). - Owns(&odigosv1.InstrumentedApplication{}). - Complete(&NamespacesReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - }) - if err != nil { - return nil, err - } - - err = builder. - ControllerManagedBy(mgr). - For(&corev1.Pod{}). - Complete(&PodsReconciler{ - Client: mgr.GetClient(), - Scheme: mgr.GetScheme(), - Directors: ebpfDirectors, - }) - if err != nil { - return nil, err - } - - ctx := signals.SetupSignalHandler() go func() { err := mgr.Start(ctx) if err != nil { - log.Logger.Error(err, "error starting manager") + log.Logger.Error(err, "error starting kube manager") } }() - return ctx, nil -} - -func isObjectLabeled(obj client.Object) bool { - labels := obj.GetLabels() - if labels != nil { - val, exists := labels[consts.OdigosInstrumentationLabel] - if exists && val == consts.InstrumentationEnabled { - return true - } - } - - return false -} - -func isInstrumentationDisabledExplicitly(obj client.Object) bool { - labels := obj.GetLabels() - if labels != nil { - val, exists := labels[consts.OdigosInstrumentationLabel] - if exists && val == consts.InstrumentationDisabled { - return true - } - } - - return false -} - -func isNamespaceLabeled(ctx context.Context, obj client.Object, c client.Client) bool { - var ns corev1.Namespace - err := c.Get(ctx, client.ObjectKey{Name: obj.GetNamespace()}, &ns) - if err != nil { - log.Logger.Error(err, "error fetching namespace object") - return false - } - return isObjectLabeled(&ns) + return nil } diff --git a/odiglet/pkg/kube/pods.go b/odiglet/pkg/kube/pods.go deleted file mode 100644 index e239e5e5b..000000000 --- a/odiglet/pkg/kube/pods.go +++ /dev/null @@ -1,176 +0,0 @@ -package kube - -import ( - "context" - "errors" - - odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1" - "github.com/keyval-dev/odigos/common" - "github.com/keyval-dev/odigos/common/utils" - "github.com/keyval-dev/odigos/odiglet/pkg/ebpf" - "github.com/keyval-dev/odigos/odiglet/pkg/process" - appsv1 "k8s.io/api/apps/v1" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" -) - -type PodsReconciler struct { - client.Client - Scheme *runtime.Scheme - Directors map[common.ProgrammingLanguage]ebpf.Director -} - -func (p *PodsReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { - logger := log.FromContext(ctx) - var pod corev1.Pod - err := p.Client.Get(ctx, request.NamespacedName, &pod) - if err != nil { - if apierrors.IsNotFound(err) { - p.cleanup(request.NamespacedName) - return ctrl.Result{}, nil - } - - logger.Error(err, "error fetching pod object") - return ctrl.Result{}, err - } - - if !isPodInThisNode(&pod) { - return ctrl.Result{}, nil - } - - if pod.Status.Phase == corev1.PodSucceeded || pod.Status.Phase == corev1.PodFailed { - logger.Info("pod is not running, removing instrumentation") - p.cleanup(request.NamespacedName) - return ctrl.Result{}, nil - } - - if pod.Status.Phase == corev1.PodRunning { - p.attemptEbpfInstrument(ctx, &pod) - } - - return ctrl.Result{}, nil -} - -func (p *PodsReconciler) cleanup(name types.NamespacedName) { - // cleanup using all available directors - // the Cleanup method is idempotent, so no harm in calling it multiple times - for _, director := range p.Directors { - director.Cleanup(name) - } -} - -func (p *PodsReconciler) attemptEbpfInstrument(ctx context.Context, pod *corev1.Pod) error { - logger := log.FromContext(ctx) - runtimeDetails, err := p.getRuntimeDetails(ctx, pod) - if err != nil { - if apierrors.IsNotFound(err) { - // Probably shutdown in progress, cleanup will be done as soon as the pod object is deleted - return nil - } - return err - } - - podUid := string(pod.UID) - for _, container := range runtimeDetails.Spec.Languages { - director := p.Directors[container.Language] - if director == nil { - // this language is not instrumented with eBPF - continue - } - - // this check currently only works for Go. - // it is temporary and will be replaced with annotation in a future PR - if !hasInstrumentationDevice(pod) { - continue - } - - appName := container.ContainerName - if len(runtimeDetails.Spec.Languages) == 1 && len(runtimeDetails.OwnerReferences) > 0 { - appName = runtimeDetails.OwnerReferences[0].Name - } - - details, err := process.FindAllInContainer(podUid, container.ContainerName) - if err != nil { - logger.Error(err, "error finding processes") - return err - } - - for _, d := range details { - err = director.Instrument(ctx, d.ProcessID, types.NamespacedName{ - Namespace: pod.Namespace, - Name: pod.Name, - }, appName) - - if err != nil { - logger.Error(err, "error instrumenting process", "pid", d.ProcessID) - return err - } - } - } - - return nil -} - -func (p *PodsReconciler) getRuntimeDetails(ctx context.Context, pod *corev1.Pod) (*odigosv1.InstrumentedApplication, error) { - name, err := p.getRuntimeDetailsName(ctx, pod) - if err != nil { - return nil, err - } - - var runtimeDetails odigosv1.InstrumentedApplication - err = p.Client.Get(ctx, client.ObjectKey{ - Namespace: pod.Namespace, - Name: name, - }, &runtimeDetails) - if err != nil { - return nil, err - } - - return &runtimeDetails, nil -} - -func (p *PodsReconciler) getRuntimeDetailsName(ctx context.Context, pod *corev1.Pod) (string, error) { - for _, owner := range pod.OwnerReferences { - if owner.Kind == "ReplicaSet" { - var rs appsv1.ReplicaSet - err := p.Client.Get(ctx, client.ObjectKey{ - Namespace: pod.Namespace, - Name: owner.Name, - }, &rs) - if err != nil { - return "", err - } - - if rs.OwnerReferences == nil { - return "", errors.New("replicaset has no owner reference") - } - - for _, rsOwner := range rs.OwnerReferences { - if rsOwner.Kind == "Deployment" || rsOwner.Kind == "DaemonSet" || rsOwner.Kind == "StatefulSet" { - return utils.GetRuntimeObjectName(rsOwner.Name, rsOwner.Kind), nil - } - } - } else if owner.Kind == "DaemonSet" || owner.Kind == "Deployment" || owner.Kind == "StatefulSet" { - return utils.GetRuntimeObjectName(owner.Name, owner.Kind), nil - } - } - - return "", errors.New("pod has no owner reference") -} - -// / hasInstrumentationDevice returns true if the pod has go instrumentation device attached. -func hasInstrumentationDevice(pod *corev1.Pod) bool { - for _, c := range pod.Spec.Containers { - if c.Resources.Limits != nil { - _, exists := c.Resources.Limits[corev1.ResourceName("instrumentation.odigos.io/go")] - return exists - } - } - - return false -} diff --git a/odiglet/pkg/kube/daemonsets.go b/odiglet/pkg/kube/runtime_details/daemonsets.go similarity index 97% rename from odiglet/pkg/kube/daemonsets.go rename to odiglet/pkg/kube/runtime_details/daemonsets.go index fe6742932..a170ec5cc 100644 --- a/odiglet/pkg/kube/daemonsets.go +++ b/odiglet/pkg/kube/runtime_details/daemonsets.go @@ -1,4 +1,4 @@ -package kube +package runtime_details import ( "context" diff --git a/odiglet/pkg/kube/deployments.go b/odiglet/pkg/kube/runtime_details/deployments.go similarity index 97% rename from odiglet/pkg/kube/deployments.go rename to odiglet/pkg/kube/runtime_details/deployments.go index ef2e95018..600eb9b4d 100644 --- a/odiglet/pkg/kube/deployments.go +++ b/odiglet/pkg/kube/runtime_details/deployments.go @@ -1,4 +1,4 @@ -package kube +package runtime_details import ( "context" @@ -18,7 +18,6 @@ type DeploymentsReconciler struct { func (d *DeploymentsReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) { logger := log.FromContext(ctx) - var dep appsv1.Deployment err := d.Client.Get(ctx, request.NamespacedName, &dep) if err != nil { diff --git a/odiglet/pkg/kube/runtime_details/manager.go b/odiglet/pkg/kube/runtime_details/manager.go new file mode 100644 index 000000000..0143dc82a --- /dev/null +++ b/odiglet/pkg/kube/runtime_details/manager.go @@ -0,0 +1,67 @@ +package runtime_details + +import ( + odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/predicate" +) + +func SetupWithManager(mgr ctrl.Manager) error { + + err := builder. + ControllerManagedBy(mgr). + For(&appsv1.Deployment{}). + Owns(&odigosv1.InstrumentedApplication{}). + Complete(&DeploymentsReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }) + if err != nil { + return err + } + + err = builder. + ControllerManagedBy(mgr). + For(&appsv1.StatefulSet{}). + Owns(&odigosv1.InstrumentedApplication{}). + Complete(&StatefulSetsReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }) + if err != nil { + return err + } + + err = builder. + ControllerManagedBy(mgr). + For(&appsv1.DaemonSet{}). + Owns(&odigosv1.InstrumentedApplication{}). + Complete(&DaemonSetsReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }) + if err != nil { + return err + } + + err = builder. + ControllerManagedBy(mgr). + For(&corev1.Namespace{}). + WithEventFilter(predicate.NewPredicateFuncs(func(obj client.Object) bool { + return isObjectLabeled(obj) + })). + Owns(&odigosv1.InstrumentedApplication{}). + Complete(&NamespacesReconciler{ + Client: mgr.GetClient(), + Scheme: mgr.GetScheme(), + }) + if err != nil { + return err + } + + return nil +} diff --git a/odiglet/pkg/kube/namespaces.go b/odiglet/pkg/kube/runtime_details/namespaces.go similarity index 98% rename from odiglet/pkg/kube/namespaces.go rename to odiglet/pkg/kube/runtime_details/namespaces.go index 1bc3737fc..a8643996a 100644 --- a/odiglet/pkg/kube/namespaces.go +++ b/odiglet/pkg/kube/runtime_details/namespaces.go @@ -1,4 +1,4 @@ -package kube +package runtime_details import ( "context" diff --git a/odiglet/pkg/kube/shared.go b/odiglet/pkg/kube/runtime_details/shared.go similarity index 82% rename from odiglet/pkg/kube/shared.go rename to odiglet/pkg/kube/runtime_details/shared.go index e8bf2ec7d..14d9dd62a 100644 --- a/odiglet/pkg/kube/shared.go +++ b/odiglet/pkg/kube/runtime_details/shared.go @@ -1,4 +1,4 @@ -package kube +package runtime_details import ( "context" @@ -8,8 +8,8 @@ import ( "github.com/keyval-dev/odigos/common" "github.com/keyval-dev/odigos/common/consts" "github.com/keyval-dev/odigos/common/utils" - "github.com/keyval-dev/odigos/odiglet/pkg/env" "github.com/keyval-dev/odigos/odiglet/pkg/inspectors" + kubeutils "github.com/keyval-dev/odigos/odiglet/pkg/kube/utils" "github.com/keyval-dev/odigos/odiglet/pkg/log" "github.com/keyval-dev/odigos/odiglet/pkg/process" corev1 "k8s.io/api/core/v1" @@ -22,7 +22,7 @@ import ( func inspectRuntimesOfRunningPods(ctx context.Context, logger *logr.Logger, labels map[string]string, kubeClient client.Client, scheme *runtime.Scheme, object client.Object) (ctrl.Result, error) { - pods, err := getRunningPods(ctx, labels, object.GetNamespace(), kubeClient) + pods, err := kubeutils.GetRunningPods(ctx, labels, object.GetNamespace(), kubeClient) if err != nil { logger.Error(err, "error fetching running pods") return ctrl.Result{}, err @@ -113,25 +113,3 @@ func persistRuntimeResults(ctx context.Context, results []common.LanguageByConta } return nil } - -func getRunningPods(ctx context.Context, labels map[string]string, ns string, kubeClient client.Client) ([]corev1.Pod, error) { - var podList corev1.PodList - err := kubeClient.List(ctx, &podList, client.MatchingLabels(labels), client.InNamespace(ns)) - - var filteredPods []corev1.Pod - for _, pod := range podList.Items { - if isPodInThisNode(&pod) && pod.Status.Phase == corev1.PodRunning { - filteredPods = append(filteredPods, pod) - } - } - - if err != nil { - return nil, err - } - - return filteredPods, nil -} - -func isPodInThisNode(pod *corev1.Pod) bool { - return pod.Spec.NodeName == env.Current.NodeName -} diff --git a/odiglet/pkg/kube/statefulsets.go b/odiglet/pkg/kube/runtime_details/statefulsets.go similarity index 97% rename from odiglet/pkg/kube/statefulsets.go rename to odiglet/pkg/kube/runtime_details/statefulsets.go index 75ef896ae..6e828b408 100644 --- a/odiglet/pkg/kube/statefulsets.go +++ b/odiglet/pkg/kube/runtime_details/statefulsets.go @@ -1,4 +1,4 @@ -package kube +package runtime_details import ( "context" diff --git a/odiglet/pkg/kube/runtime_details/utils.go b/odiglet/pkg/kube/runtime_details/utils.go new file mode 100644 index 000000000..dc5167d35 --- /dev/null +++ b/odiglet/pkg/kube/runtime_details/utils.go @@ -0,0 +1,45 @@ +package runtime_details + +import ( + "context" + + "github.com/keyval-dev/odigos/common/consts" + "github.com/keyval-dev/odigos/odiglet/pkg/log" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func isObjectLabeled(obj client.Object) bool { + labels := obj.GetLabels() + if labels != nil { + val, exists := labels[consts.OdigosInstrumentationLabel] + if exists && val == consts.InstrumentationEnabled { + return true + } + } + + return false +} + +func isInstrumentationDisabledExplicitly(obj client.Object) bool { + labels := obj.GetLabels() + if labels != nil { + val, exists := labels[consts.OdigosInstrumentationLabel] + if exists && val == consts.InstrumentationDisabled { + return true + } + } + + return false +} + +func isNamespaceLabeled(ctx context.Context, obj client.Object, c client.Client) bool { + var ns corev1.Namespace + err := c.Get(ctx, client.ObjectKey{Name: obj.GetNamespace()}, &ns) + if err != nil { + log.Logger.Error(err, "error fetching namespace object") + return false + } + + return isObjectLabeled(&ns) +} diff --git a/odiglet/pkg/kube/utils/utils.go b/odiglet/pkg/kube/utils/utils.go new file mode 100644 index 000000000..6c29edcd3 --- /dev/null +++ b/odiglet/pkg/kube/utils/utils.go @@ -0,0 +1,31 @@ +package utils + +import ( + "context" + + "github.com/keyval-dev/odigos/odiglet/pkg/env" + corev1 "k8s.io/api/core/v1" + "sigs.k8s.io/controller-runtime/pkg/client" +) + +func IsPodInCurrentNode(pod *corev1.Pod) bool { + return pod.Spec.NodeName == env.Current.NodeName +} + +func GetRunningPods(ctx context.Context, labels map[string]string, ns string, kubeClient client.Client) ([]corev1.Pod, error) { + var podList corev1.PodList + err := kubeClient.List(ctx, &podList, client.MatchingLabels(labels), client.InNamespace(ns)) + + var filteredPods []corev1.Pod + for _, pod := range podList.Items { + if IsPodInCurrentNode(&pod) && pod.Status.Phase == corev1.PodRunning { + filteredPods = append(filteredPods, pod) + } + } + + if err != nil { + return nil, err + } + + return filteredPods, nil +}