Skip to content

Commit

Permalink
refactor(odiglet): make ebpf instrumentation api for any language (#764)
Browse files Browse the repository at this point in the history
  • Loading branch information
blumamir authored Nov 15, 2023
1 parent d64c112 commit 4fd2503
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 165 deletions.
20 changes: 15 additions & 5 deletions odiglet/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"

"github.com/keyval-dev/odigos/common"
"github.com/keyval-dev/odigos/odiglet/pkg/ebpf"
"github.com/keyval-dev/odigos/odiglet/pkg/env"
"github.com/keyval-dev/odigos/odiglet/pkg/instrumentation"
Expand Down Expand Up @@ -38,22 +39,24 @@ func main() {
os.Exit(-1)
}

ebpfDirector, err := initEbpf()
ebpfDirectors, err := initEbpf()
if err != nil {
log.Logger.Error(err, "Failed to init eBPF director")
os.Exit(-1)
}

go startDeviceManager(clientset)

ctx, err := kube.StartReconciling(ebpfDirector)
ctx, err := kube.StartReconciling(ebpfDirectors)
if err != nil {
log.Logger.Error(err, "Failed to start reconciling")
os.Exit(-1)
}

<-ctx.Done()
ebpfDirector.Shutdown()
for _, director := range ebpfDirectors {
director.Shutdown()
}
}

func startDeviceManager(clientset *kubernetes.Clientset) {
Expand All @@ -71,6 +74,13 @@ func startDeviceManager(clientset *kubernetes.Clientset) {
manager.Run()
}

func initEbpf() (ebpf.Director, error) {
return ebpf.NewInstrumentationDirector()
func initEbpf() (map[common.ProgrammingLanguage]ebpf.Director, error) {
goDirector, err := ebpf.NewInstrumentationDirectorGo()
if err != nil {
return nil, err
}

return map[common.ProgrammingLanguage]ebpf.Director{
common.GoProgrammingLanguage: goDirector,
}, nil
}
95 changes: 3 additions & 92 deletions odiglet/pkg/ebpf/director.go
Original file line number Diff line number Diff line change
@@ -1,106 +1,17 @@
package ebpf

import (
"context"
"errors"
"fmt"
"os"
"sync"

"github.com/keyval-dev/odigos/odiglet/pkg/env"

"github.com/keyval-dev/odigos/common/consts"

"go.opentelemetry.io/auto"

"github.com/keyval-dev/odigos/odiglet/pkg/log"
"github.com/keyval-dev/odigos/common"
"k8s.io/apimachinery/pkg/types"
)

var ErrProcInstrumented = errors.New("process already instrumented")
var ErrProcInstrumented = errors.New("process already instrumented")

type Director interface {
Language() common.ProgrammingLanguage
Instrument(pid int, podDetails types.NamespacedName, appName string) error
Cleanup(podDetails types.NamespacedName)
Shutdown()
}

type InstrumentationDirector struct {
mux sync.Mutex
pidsToInstrumentation map[int]*auto.Instrumentation
podDetailsToPids map[types.NamespacedName][]int
}

func NewInstrumentationDirector() (*InstrumentationDirector, error) {
err := os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", fmt.Sprintf("http://%s:%d", env.Current.NodeIP, consts.OTLPPort))
if err != nil {
return nil, err
}

return &InstrumentationDirector{
pidsToInstrumentation: make(map[int]*auto.Instrumentation),
podDetailsToPids: make(map[types.NamespacedName][]int),
}, nil
}

func (i *InstrumentationDirector) Instrument(pid int, podDetails types.NamespacedName, appName string) error {
log.Logger.V(0).Info("Instrumenting process", "pid", pid)
i.mux.Lock()
defer i.mux.Unlock()
if _, exists := i.pidsToInstrumentation[pid]; exists {
log.Logger.V(5).Info("Process already instrumented", "pid", pid)
return ErrProcInstrumented
}

go func() {
inst, err := auto.NewInstrumentation(auto.WithPID(pid), auto.WithServiceName(appName))
if err != nil {
log.Logger.Error(err, "instrumentation setup failed")
return
}

i.pidsToInstrumentation[pid] = inst
i.podDetailsToPids[podDetails] = append(i.podDetailsToPids[podDetails], pid)

if err := inst.Run(context.Background()); err != nil {
log.Logger.Error(err, "instrumentation crashed after running")
}
} ()

return nil
}

func (i *InstrumentationDirector) Cleanup(podDetails types.NamespacedName) {
i.mux.Lock()
defer i.mux.Unlock()
pids, exists := i.podDetailsToPids[podDetails]
if !exists {
log.Logger.V(5).Info("No processes to cleanup for pod", "pod", podDetails)
return
}

log.Logger.V(0).Info("Cleaning up instrumentation for pod", "pod", podDetails)
delete(i.podDetailsToPids, podDetails)
for _, pid := range pids {
inst, exists := i.pidsToInstrumentation[pid]
if !exists {
log.Logger.V(5).Info("No objects to cleanup for process", "pid", pid)
continue
}

delete(i.pidsToInstrumentation, pid)
go func() {
err := inst.Close()
if err != nil {
log.Logger.Error(err, "error cleaning up objects for process", "pid", pid)
}
}()
}
}

func (i *InstrumentationDirector) Shutdown() {
log.Logger.V(0).Info("Shutting down instrumentation director")
for details := range i.podDetailsToPids {
i.Cleanup(details)
}
}
99 changes: 99 additions & 0 deletions odiglet/pkg/ebpf/go.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package ebpf

import (
"context"
"fmt"
"os"
"sync"

"github.com/keyval-dev/odigos/common"
"github.com/keyval-dev/odigos/odiglet/pkg/env"
"github.com/keyval-dev/odigos/odiglet/pkg/instrumentation/consts"
"github.com/keyval-dev/odigos/odiglet/pkg/log"
"go.opentelemetry.io/auto"
"k8s.io/apimachinery/pkg/types"
)

type InstrumentationDirectorGo struct {
mux sync.Mutex
pidsToInstrumentation map[int]*auto.Instrumentation
podDetailsToPids map[types.NamespacedName][]int
}

func NewInstrumentationDirectorGo() (Director, error) {
err := os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", fmt.Sprintf("http://%s:%d", env.Current.NodeIP, consts.OTLPPort))
if err != nil {
return nil, err
}

return &InstrumentationDirectorGo{
pidsToInstrumentation: make(map[int]*auto.Instrumentation),
podDetailsToPids: make(map[types.NamespacedName][]int),
}, nil
}

func (i *InstrumentationDirectorGo) Language() common.ProgrammingLanguage {
return common.GoProgrammingLanguage
}

func (i *InstrumentationDirectorGo) Instrument(pid int, podDetails types.NamespacedName, appName string) error {
log.Logger.V(0).Info("Instrumenting process", "pid", pid)
i.mux.Lock()
defer i.mux.Unlock()
if _, exists := i.pidsToInstrumentation[pid]; exists {
log.Logger.V(5).Info("Process already instrumented", "pid", pid)
return ErrProcInstrumented
}

go func() {
inst, err := auto.NewInstrumentation(auto.WithPID(pid), auto.WithServiceName(appName))
if err != nil {
log.Logger.Error(err, "instrumentation setup failed")
return
}

i.pidsToInstrumentation[pid] = inst
i.podDetailsToPids[podDetails] = append(i.podDetailsToPids[podDetails], pid)

if err := inst.Run(context.Background()); err != nil {
log.Logger.Error(err, "instrumentation crashed after running")
}
}()

return nil
}

func (i *InstrumentationDirectorGo) Cleanup(podDetails types.NamespacedName) {
i.mux.Lock()
defer i.mux.Unlock()
pids, exists := i.podDetailsToPids[podDetails]
if !exists {
log.Logger.V(5).Info("No processes to cleanup for pod", "pod", podDetails)
return
}

log.Logger.V(0).Info("Cleaning up instrumentation for pod", "pod", podDetails)
delete(i.podDetailsToPids, podDetails)
for _, pid := range pids {
inst, exists := i.pidsToInstrumentation[pid]
if !exists {
log.Logger.V(5).Info("No objects to cleanup for process", "pid", pid)
continue
}

delete(i.pidsToInstrumentation, pid)
go func() {
err := inst.Close()
if err != nil {
log.Logger.Error(err, "error cleaning up objects for process", "pid", pid)
}
}()
}
}

func (i *InstrumentationDirectorGo) Shutdown() {
log.Logger.V(0).Info("Shutting down instrumentation director")
for details := range i.podDetailsToPids {
i.Cleanup(details)
}
}
9 changes: 5 additions & 4 deletions odiglet/pkg/kube/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1"
"github.com/keyval-dev/odigos/common"
"github.com/keyval-dev/odigos/common/consts"
"github.com/keyval-dev/odigos/odiglet/pkg/ebpf"
"github.com/keyval-dev/odigos/odiglet/pkg/log"
Expand All @@ -32,7 +33,7 @@ func init() {
utilruntime.Must(odigosv1.AddToScheme(scheme))
}

func StartReconciling(ebpfDirector ebpf.Director) (context.Context, error) {
func StartReconciling(ebpfDirectors map[common.ProgrammingLanguage]ebpf.Director) (context.Context, error) {
log.Logger.V(0).Info("Starting reconcileres")
ctrl.SetLogger(log.Logger)
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{
Expand Down Expand Up @@ -100,9 +101,9 @@ func StartReconciling(ebpfDirector ebpf.Director) (context.Context, error) {
ControllerManagedBy(mgr).
For(&corev1.Pod{}).
Complete(&PodsReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Director: ebpfDirector,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Directors: ebpfDirectors,
})
if err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 4fd2503

Please sign in to comment.