diff --git a/common/utils/names.go b/common/utils/names.go index 30bc96e75..d98ba26d6 100644 --- a/common/utils/names.go +++ b/common/utils/names.go @@ -10,10 +10,29 @@ func GetRuntimeObjectName(name string, kind string) string { } func GetTargetFromRuntimeName(name string) (string, string, error) { - parts := strings.Split(name, "-") - if len(parts) < 2 { + hyphenIndex := strings.Index(name, "-") + if hyphenIndex == -1 { return "", "", errors.New("invalid runtime name") } - return strings.Join(parts[1:], "-"), parts[0], nil + workloadKind, err := kindFromLowercase(name[:hyphenIndex]) + if err != nil { + return "", "", err + } + workloadName := name[hyphenIndex+1:] + + return workloadName, workloadKind, nil +} + +func kindFromLowercase(lowercaseKind string) (string, error) { + switch lowercaseKind { + case "deployment": + return "Deployment", nil + case "statefulset": + return "StatefulSet", nil + case "daemonset": + return "DaemonSet", nil + default: + return "", errors.New("unknown kind") + } } diff --git a/instrumentor/controllers/common.go b/instrumentor/controllers/common.go index 52e69ec2a..c10733fc8 100644 --- a/instrumentor/controllers/common.go +++ b/instrumentor/controllers/common.go @@ -184,11 +184,11 @@ func getPodSpecFromObject(obj client.Object) (*corev1.PodTemplateSpec, error) { func getObjectFromKindString(kind string) (client.Object, error) { switch strings.ToLower(kind) { - case "deployment": + case "Deployment": return &appsv1.Deployment{}, nil - case "statefulset": + case "StatefulSet": return &appsv1.StatefulSet{}, nil - case "daemonset": + case "DaemonSet": return &appsv1.DaemonSet{}, nil default: return nil, errors.New("unknown kind") diff --git a/odiglet/cmd/main.go b/odiglet/cmd/main.go index 81a6e3384..b13c98523 100644 --- a/odiglet/cmd/main.go +++ b/odiglet/cmd/main.go @@ -89,10 +89,8 @@ func startDeviceManager(clientset *kubernetes.Clientset) { } func initEbpf() (map[common.ProgrammingLanguage]ebpf.Director, error) { - goDirector, err := ebpf.NewInstrumentationDirectorGo() - if err != nil { - return nil, err - } + goInstrumentationFactory := ebpf.NewGoInstrumentationFactory() + goDirector := ebpf.NewEbpfDirector(common.GoProgrammingLanguage, goInstrumentationFactory) return map[common.ProgrammingLanguage]ebpf.Director{ common.GoProgrammingLanguage: goDirector, diff --git a/odiglet/pkg/ebpf/director.go b/odiglet/pkg/ebpf/director.go index accc49348..915fc13ee 100644 --- a/odiglet/pkg/ebpf/director.go +++ b/odiglet/pkg/ebpf/director.go @@ -2,14 +2,207 @@ package ebpf import ( "context" + "sync" "github.com/keyval-dev/odigos/common" + "github.com/keyval-dev/odigos/odiglet/pkg/log" "k8s.io/apimachinery/pkg/types" ) +// This interface should be implemented by all ebpf sdks +// for example, the go auto instrumentation sdk implements it +type OtelEbpfSdk interface { + Run(ctx context.Context) error + Close() error +} + +// users can use different eBPF otel SDKs by returning them from this function +type InstrumentationFactory[T OtelEbpfSdk] interface { + CreateEbpfInstrumentation(ctx context.Context, pid int, serviceName string, podWorkload *common.PodWorkload) (T, error) +} + +// Director manages the instrumentation for a specific SDK in a specific language type Director interface { Language() common.ProgrammingLanguage Instrument(ctx context.Context, pid int, podDetails types.NamespacedName, podWorkload *common.PodWorkload, appName string) error Cleanup(podDetails types.NamespacedName) Shutdown() } + +type podDetails struct { + Workload *common.PodWorkload + Pids []int +} + +type EbpfDirector[T OtelEbpfSdk] struct { + mux sync.Mutex + + language common.ProgrammingLanguage + instrumentationFactory InstrumentationFactory[T] + + // this map holds the instrumentation object which is used to close the instrumentation + // the map is filled only after the instrumentation is actually created + // which is an asyn process that might take some time + pidsToInstrumentation map[int]T + + // this map is used to make sure we do not attempt to instrument the same process twice. + // it keeps track of which processes we already attempted to instrument, + // so we can avoid attempting to instrument them again. + pidsAttemptedInstrumentation map[int]struct{} + + // via this map, we can find the workload and pids for a specific pod. + // sometimes we only have the pod name and namespace, so this map is useful. + podsToDetails map[types.NamespacedName]podDetails + + // this map can be used when we only have the workload, and need to find the pods to derive pids. + workloadToPods map[common.PodWorkload]map[types.NamespacedName]struct{} +} + +func NewEbpfDirector[T OtelEbpfSdk](language common.ProgrammingLanguage, instrumentationFactory InstrumentationFactory[T]) *EbpfDirector[T] { + return &EbpfDirector[T]{ + language: language, + instrumentationFactory: instrumentationFactory, + pidsToInstrumentation: make(map[int]T), + pidsAttemptedInstrumentation: make(map[int]struct{}), + podsToDetails: make(map[types.NamespacedName]podDetails), + workloadToPods: make(map[common.PodWorkload]map[types.NamespacedName]struct{}), + } +} + +func (d *EbpfDirector[T]) Instrument(ctx context.Context, pid int, pod types.NamespacedName, podWorkload *common.PodWorkload, appName string) error { + log.Logger.V(0).Info("Instrumenting process", "pid", pid, "workload", podWorkload) + d.mux.Lock() + defer d.mux.Unlock() + if _, exists := d.pidsAttemptedInstrumentation[pid]; exists { + log.Logger.V(5).Info("Process already instrumented", "pid", pid) + return nil + } + + details, exists := d.podsToDetails[pod] + if !exists { + details = podDetails{ + Workload: podWorkload, + Pids: []int{}, + } + d.podsToDetails[pod] = details + } + details.Pids = append(details.Pids, pid) + d.podsToDetails[pod] = details + + d.pidsAttemptedInstrumentation[pid] = struct{}{} + + if _, exists := d.workloadToPods[*podWorkload]; !exists { + d.workloadToPods[*podWorkload] = make(map[types.NamespacedName]struct{}) + } + d.workloadToPods[*podWorkload][pod] = struct{}{} + + go func() { + inst, err := d.instrumentationFactory.CreateEbpfInstrumentation(ctx, pid, appName, podWorkload) + if err != nil { + log.Logger.Error(err, "instrumentation setup failed", "workload", podWorkload, "pod", pod) + return + } + + d.mux.Lock() + _, stillExists := d.pidsAttemptedInstrumentation[pid] + if stillExists { + d.pidsToInstrumentation[pid] = inst + d.mux.Unlock() + } else { + d.mux.Unlock() + // we attempted to instrument this process, but it was already cleaned up + // so we need to clean up the instrumentation we just created + err = inst.Close() + if err != nil { + log.Logger.Error(err, "error cleaning up instrumentation for process", "pid", pid) + } + return + } + + log.Logger.V(0).Info("Running ebpf go instrumentation", "workload", podWorkload, "pod", pod) + + if err := inst.Run(context.Background()); err != nil { + log.Logger.Error(err, "instrumentation crashed after running") + } + }() + + return nil +} + +func (d *EbpfDirector[T]) Language() common.ProgrammingLanguage { + return d.language +} + +func (d *EbpfDirector[T]) Cleanup(pod types.NamespacedName) { + d.mux.Lock() + defer d.mux.Unlock() + details, exists := d.podsToDetails[pod] + if !exists { + log.Logger.V(5).Info("No processes to cleanup for pod", "pod", pod) + return + } + + log.Logger.V(0).Info("Cleaning up ebpf go instrumentation for pod", "pod", pod) + delete(d.podsToDetails, pod) + + // clear the pod from the workloadToPods map + workload := details.Workload + delete(d.workloadToPods[*workload], pod) + if len(d.workloadToPods[*workload]) == 0 { + delete(d.workloadToPods, *workload) + } + + for _, pid := range details.Pids { + delete(d.pidsAttemptedInstrumentation, pid) + + inst, exists := d.pidsToInstrumentation[pid] + if !exists { + log.Logger.V(5).Info("No objects to cleanup for process", "pid", pid) + continue + } + + delete(d.pidsToInstrumentation, pid) + go func() { + err := inst.Close() + if err != nil { + log.Logger.Error(err, "error cleaning up objects for process", "pid", pid) + } + }() + } +} + +func (d *EbpfDirector[T]) Shutdown() { + log.Logger.V(0).Info("Shutting down instrumentation director") + for details := range d.podsToDetails { + d.Cleanup(details) + } +} + +func (d *EbpfDirector[T]) GetWorkloadInstrumentations(workload *common.PodWorkload) []T { + d.mux.Lock() + defer d.mux.Unlock() + + pods, ok := d.workloadToPods[*workload] + if !ok { + return nil + } + + var insts []T + for pod := range pods { + details, ok := d.podsToDetails[pod] + if !ok { + continue + } + + for _, pid := range details.Pids { + inst, ok := d.pidsToInstrumentation[pid] + if !ok { + continue + } + + insts = append(insts, inst) + } + } + + return insts +} diff --git a/odiglet/pkg/ebpf/go.go b/odiglet/pkg/ebpf/go.go index 877a1d2f6..f655a42ff 100644 --- a/odiglet/pkg/ebpf/go.go +++ b/odiglet/pkg/ebpf/go.go @@ -3,7 +3,6 @@ package ebpf import ( "context" "fmt" - "sync" "github.com/keyval-dev/odigos/odiglet/pkg/kube/utils" @@ -13,75 +12,15 @@ import ( "github.com/keyval-dev/odigos/odiglet/pkg/log" "go.opentelemetry.io/auto" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "k8s.io/apimachinery/pkg/types" ) -type podDetails struct { - Workload *common.PodWorkload - Pids []int -} - -type InstrumentationDirectorGo struct { - mux sync.Mutex - - // this map holds the instrumentation object which is used to close the instrumentation - // the map is filled only after the instrumentation is actually created - // which is an asyn process that might take some time - pidsToInstrumentation map[int]*auto.Instrumentation +type GoInstrumentationFactory struct{} - // this map is used to make sure we do not attempt to instrument the same process twice. - // it keeps track of which processes we already attempted to instrument, - // so we can avoid attempting to instrument them again. - pidsAttemptedInstrumentation map[int]struct{} - - // via this map, we can find the workload and pids for a specific pod. - // sometimes we only have the pod name and namespace, so this map is useful. - podsToDetails map[types.NamespacedName]podDetails - - // this map can be used when we only have the workload, and need to find the pods to derive pids. - workloadToPods map[common.PodWorkload]map[types.NamespacedName]struct{} +func NewGoInstrumentationFactory() InstrumentationFactory[*auto.Instrumentation] { + return &GoInstrumentationFactory{} } -func NewInstrumentationDirectorGo() (Director, error) { - return &InstrumentationDirectorGo{ - pidsToInstrumentation: make(map[int]*auto.Instrumentation), - pidsAttemptedInstrumentation: make(map[int]struct{}), - podsToDetails: make(map[types.NamespacedName]podDetails), - workloadToPods: make(map[common.PodWorkload]map[types.NamespacedName]struct{}), - }, nil -} - -func (i *InstrumentationDirectorGo) Language() common.ProgrammingLanguage { - return common.GoProgrammingLanguage -} - -func (i *InstrumentationDirectorGo) Instrument(ctx context.Context, pid int, pod types.NamespacedName, podWorkload *common.PodWorkload, appName string) error { - log.Logger.V(0).Info("Instrumenting process", "pid", pid, "workload", podWorkload) - i.mux.Lock() - defer i.mux.Unlock() - if _, exists := i.pidsAttemptedInstrumentation[pid]; exists { - log.Logger.V(5).Info("Process already instrumented", "pid", pid) - return nil - } - - details, exists := i.podsToDetails[pod] - if !exists { - details = podDetails{ - Workload: podWorkload, - Pids: []int{}, - } - i.podsToDetails[pod] = details - } - details.Pids = append(details.Pids, pid) - i.podsToDetails[pod] = details - - i.pidsAttemptedInstrumentation[pid] = struct{}{} - - if _, exists := i.workloadToPods[*podWorkload]; !exists { - i.workloadToPods[*podWorkload] = make(map[types.NamespacedName]struct{}) - } - i.workloadToPods[*podWorkload][pod] = struct{}{} - +func (g *GoInstrumentationFactory) CreateEbpfInstrumentation(ctx context.Context, pid int, serviceName string, podWorkload *common.PodWorkload) (*auto.Instrumentation, error) { defaultExporter, err := otlptracegrpc.New( ctx, otlptracegrpc.WithInsecure(), @@ -89,118 +28,20 @@ func (i *InstrumentationDirectorGo) Instrument(ctx context.Context, pid int, pod ) if err != nil { log.Logger.Error(err, "failed to create exporter") - return err + return nil, err } - go func() { - inst, err := auto.NewInstrumentation( - ctx, - auto.WithPID(pid), - auto.WithResourceAttributes(utils.GetResourceAttributes(podWorkload)...), - auto.WithServiceName(appName), - auto.WithTraceExporter(defaultExporter), - ) - if err != nil { - log.Logger.Error(err, "instrumentation setup failed", "workload", podWorkload, "pod", pod) - return - } - - i.mux.Lock() - _, stillExists := i.pidsAttemptedInstrumentation[pid] - if stillExists { - i.pidsToInstrumentation[pid] = inst - i.mux.Unlock() - } else { - i.mux.Unlock() - // we attempted to instrument this process, but it was already cleaned up - // so we need to clean up the instrumentation we just created - err = inst.Close() - if err != nil { - log.Logger.Error(err, "error cleaning up instrumentation for process", "pid", pid) - } - return - } - - log.Logger.V(0).Info("Running ebpf go instrumentation", "workload", podWorkload, "pod", pod) - - if err := inst.Run(context.Background()); err != nil { - log.Logger.Error(err, "instrumentation crashed after running") - } - }() - - return nil -} - -func (i *InstrumentationDirectorGo) Cleanup(pod types.NamespacedName) { - i.mux.Lock() - defer i.mux.Unlock() - details, exists := i.podsToDetails[pod] - if !exists { - log.Logger.V(5).Info("No processes to cleanup for pod", "pod", pod) - return - } - - log.Logger.V(0).Info("Cleaning up ebpf go instrumentation for pod", "pod", pod) - delete(i.podsToDetails, pod) - - // clear the pod from the workloadToPods map - workload := details.Workload - delete(i.workloadToPods[*workload], pod) - if len(i.workloadToPods[*workload]) == 0 { - delete(i.workloadToPods, *workload) - } - - for _, pid := range details.Pids { - delete(i.pidsAttemptedInstrumentation, pid) - - inst, exists := i.pidsToInstrumentation[pid] - if !exists { - log.Logger.V(5).Info("No objects to cleanup for process", "pid", pid) - continue - } - - delete(i.pidsToInstrumentation, pid) - go func() { - err := inst.Close() - if err != nil { - log.Logger.Error(err, "error cleaning up objects for process", "pid", pid) - } - }() - } -} - -func (i *InstrumentationDirectorGo) Shutdown() { - log.Logger.V(0).Info("Shutting down instrumentation director") - for details := range i.podsToDetails { - i.Cleanup(details) - } -} - -func (i *InstrumentationDirectorGo) GetWorkloadInstrumentations(workload common.PodWorkload) []*auto.Instrumentation { - i.mux.Lock() - defer i.mux.Unlock() - - pods, ok := i.workloadToPods[workload] - if !ok { - return nil - } - - var insts []*auto.Instrumentation - for pod := range pods { - details, ok := i.podsToDetails[pod] - if !ok { - continue - } - - for _, pid := range details.Pids { - inst, ok := i.pidsToInstrumentation[pid] - if !ok { - continue - } - - insts = append(insts, inst) - } + inst, err := auto.NewInstrumentation( + ctx, + auto.WithPID(pid), + auto.WithResourceAttributes(utils.GetResourceAttributes(podWorkload)...), + auto.WithServiceName(serviceName), + auto.WithTraceExporter(defaultExporter), + ) + if err != nil { + log.Logger.Error(err, "instrumentation setup failed") + return nil, err } - return insts + return inst, nil }