Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(odiglet): make ebpf instrumentation api for any language #764

Merged
merged 2 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions odiglet/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"os"

"github.com/keyval-dev/odigos/common"
"github.com/keyval-dev/odigos/odiglet/pkg/ebpf"
"github.com/keyval-dev/odigos/odiglet/pkg/env"
"github.com/keyval-dev/odigos/odiglet/pkg/instrumentation"
Expand Down Expand Up @@ -38,22 +39,24 @@ func main() {
os.Exit(-1)
}

ebpfDirector, err := initEbpf()
ebpfDirectors, err := initEbpf()
if err != nil {
log.Logger.Error(err, "Failed to init eBPF director")
os.Exit(-1)
}

go startDeviceManager(clientset)

ctx, err := kube.StartReconciling(ebpfDirector)
ctx, err := kube.StartReconciling(ebpfDirectors)
if err != nil {
log.Logger.Error(err, "Failed to start reconciling")
os.Exit(-1)
}

<-ctx.Done()
ebpfDirector.Shutdown()
for _, director := range ebpfDirectors {
director.Shutdown()
}
}

func startDeviceManager(clientset *kubernetes.Clientset) {
Expand All @@ -71,6 +74,13 @@ func startDeviceManager(clientset *kubernetes.Clientset) {
manager.Run()
}

func initEbpf() (ebpf.Director, error) {
return ebpf.NewInstrumentationDirector()
func initEbpf() (map[common.ProgrammingLanguage]ebpf.Director, error) {
goDirector, err := ebpf.NewInstrumentationDirectorGo()
if err != nil {
return nil, err
}

return map[common.ProgrammingLanguage]ebpf.Director{
common.GoProgrammingLanguage: goDirector,
}, nil
}
95 changes: 3 additions & 92 deletions odiglet/pkg/ebpf/director.go
Original file line number Diff line number Diff line change
@@ -1,106 +1,17 @@
package ebpf

import (
"context"
"errors"
"fmt"
"os"
"sync"

"github.com/keyval-dev/odigos/odiglet/pkg/env"

"github.com/keyval-dev/odigos/common/consts"

"go.opentelemetry.io/auto"

"github.com/keyval-dev/odigos/odiglet/pkg/log"
"github.com/keyval-dev/odigos/common"
"k8s.io/apimachinery/pkg/types"
)

var ErrProcInstrumented = errors.New("process already instrumented")
var ErrProcInstrumented = errors.New("process already instrumented")

type Director interface {
Language() common.ProgrammingLanguage
Instrument(pid int, podDetails types.NamespacedName, appName string) error
Cleanup(podDetails types.NamespacedName)
Shutdown()
}

type InstrumentationDirector struct {
mux sync.Mutex
pidsToInstrumentation map[int]*auto.Instrumentation
podDetailsToPids map[types.NamespacedName][]int
}

func NewInstrumentationDirector() (*InstrumentationDirector, error) {
err := os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", fmt.Sprintf("http://%s:%d", env.Current.NodeIP, consts.OTLPPort))
if err != nil {
return nil, err
}

return &InstrumentationDirector{
pidsToInstrumentation: make(map[int]*auto.Instrumentation),
podDetailsToPids: make(map[types.NamespacedName][]int),
}, nil
}

func (i *InstrumentationDirector) Instrument(pid int, podDetails types.NamespacedName, appName string) error {
log.Logger.V(0).Info("Instrumenting process", "pid", pid)
i.mux.Lock()
defer i.mux.Unlock()
if _, exists := i.pidsToInstrumentation[pid]; exists {
log.Logger.V(5).Info("Process already instrumented", "pid", pid)
return ErrProcInstrumented
}

go func() {
inst, err := auto.NewInstrumentation(auto.WithPID(pid), auto.WithServiceName(appName))
if err != nil {
log.Logger.Error(err, "instrumentation setup failed")
return
}

i.pidsToInstrumentation[pid] = inst
i.podDetailsToPids[podDetails] = append(i.podDetailsToPids[podDetails], pid)

if err := inst.Run(context.Background()); err != nil {
log.Logger.Error(err, "instrumentation crashed after running")
}
} ()

return nil
}

func (i *InstrumentationDirector) Cleanup(podDetails types.NamespacedName) {
i.mux.Lock()
defer i.mux.Unlock()
pids, exists := i.podDetailsToPids[podDetails]
if !exists {
log.Logger.V(5).Info("No processes to cleanup for pod", "pod", podDetails)
return
}

log.Logger.V(0).Info("Cleaning up instrumentation for pod", "pod", podDetails)
delete(i.podDetailsToPids, podDetails)
for _, pid := range pids {
inst, exists := i.pidsToInstrumentation[pid]
if !exists {
log.Logger.V(5).Info("No objects to cleanup for process", "pid", pid)
continue
}

delete(i.pidsToInstrumentation, pid)
go func() {
err := inst.Close()
if err != nil {
log.Logger.Error(err, "error cleaning up objects for process", "pid", pid)
}
}()
}
}

func (i *InstrumentationDirector) Shutdown() {
log.Logger.V(0).Info("Shutting down instrumentation director")
for details := range i.podDetailsToPids {
i.Cleanup(details)
}
}
99 changes: 99 additions & 0 deletions odiglet/pkg/ebpf/go.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package ebpf

import (
"context"
"fmt"
"os"
"sync"

"github.com/keyval-dev/odigos/common"
"github.com/keyval-dev/odigos/odiglet/pkg/env"
"github.com/keyval-dev/odigos/odiglet/pkg/instrumentation/consts"
"github.com/keyval-dev/odigos/odiglet/pkg/log"
"go.opentelemetry.io/auto"
"k8s.io/apimachinery/pkg/types"
)

type InstrumentationDirectorGo struct {
mux sync.Mutex
pidsToInstrumentation map[int]*auto.Instrumentation
podDetailsToPids map[types.NamespacedName][]int
}

func NewInstrumentationDirectorGo() (Director, error) {
err := os.Setenv("OTEL_EXPORTER_OTLP_ENDPOINT", fmt.Sprintf("http://%s:%d", env.Current.NodeIP, consts.OTLPPort))
if err != nil {
return nil, err
}

return &InstrumentationDirectorGo{
pidsToInstrumentation: make(map[int]*auto.Instrumentation),
podDetailsToPids: make(map[types.NamespacedName][]int),
}, nil
}

func (i *InstrumentationDirectorGo) Language() common.ProgrammingLanguage {
return common.GoProgrammingLanguage
}

func (i *InstrumentationDirectorGo) Instrument(pid int, podDetails types.NamespacedName, appName string) error {
log.Logger.V(0).Info("Instrumenting process", "pid", pid)
i.mux.Lock()
defer i.mux.Unlock()
if _, exists := i.pidsToInstrumentation[pid]; exists {
log.Logger.V(5).Info("Process already instrumented", "pid", pid)
return ErrProcInstrumented
}

go func() {
inst, err := auto.NewInstrumentation(auto.WithPID(pid), auto.WithServiceName(appName))
if err != nil {
log.Logger.Error(err, "instrumentation setup failed")
return
}

i.pidsToInstrumentation[pid] = inst
i.podDetailsToPids[podDetails] = append(i.podDetailsToPids[podDetails], pid)

if err := inst.Run(context.Background()); err != nil {
log.Logger.Error(err, "instrumentation crashed after running")
}
}()

return nil
}

func (i *InstrumentationDirectorGo) Cleanup(podDetails types.NamespacedName) {
i.mux.Lock()
defer i.mux.Unlock()
pids, exists := i.podDetailsToPids[podDetails]
if !exists {
log.Logger.V(5).Info("No processes to cleanup for pod", "pod", podDetails)
return
}

log.Logger.V(0).Info("Cleaning up instrumentation for pod", "pod", podDetails)
delete(i.podDetailsToPids, podDetails)
for _, pid := range pids {
inst, exists := i.pidsToInstrumentation[pid]
if !exists {
log.Logger.V(5).Info("No objects to cleanup for process", "pid", pid)
continue
}

delete(i.pidsToInstrumentation, pid)
go func() {
err := inst.Close()
if err != nil {
log.Logger.Error(err, "error cleaning up objects for process", "pid", pid)
}
}()
}
}

func (i *InstrumentationDirectorGo) Shutdown() {
log.Logger.V(0).Info("Shutting down instrumentation director")
for details := range i.podDetailsToPids {
i.Cleanup(details)
}
}
9 changes: 5 additions & 4 deletions odiglet/pkg/kube/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"

odigosv1 "github.com/keyval-dev/odigos/api/odigos/v1alpha1"
"github.com/keyval-dev/odigos/common"
"github.com/keyval-dev/odigos/common/consts"
"github.com/keyval-dev/odigos/odiglet/pkg/ebpf"
"github.com/keyval-dev/odigos/odiglet/pkg/log"
Expand All @@ -32,7 +33,7 @@ func init() {
utilruntime.Must(odigosv1.AddToScheme(scheme))
}

func StartReconciling(ebpfDirector ebpf.Director) (context.Context, error) {
func StartReconciling(ebpfDirectors map[common.ProgrammingLanguage]ebpf.Director) (context.Context, error) {
log.Logger.V(0).Info("Starting reconcileres")
ctrl.SetLogger(log.Logger)
mgr, err := manager.New(config.GetConfigOrDie(), manager.Options{
Expand Down Expand Up @@ -100,9 +101,9 @@ func StartReconciling(ebpfDirector ebpf.Director) (context.Context, error) {
ControllerManagedBy(mgr).
For(&corev1.Pod{}).
Complete(&PodsReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Director: ebpfDirector,
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Directors: ebpfDirectors,
})
if err != nil {
return nil, err
Expand Down
Loading