Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(odiglet): support workloads for ebpf director #786

Merged
merged 25 commits into from
Nov 28, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0dadd62
feat(odiglet): support workloads for ebpf director
blumamir Nov 22, 2023
f35cc35
docs: instrumentor README
blumamir Nov 23, 2023
0eadb56
fix(odiglet): nil workload map
blumamir Nov 23, 2023
de5caa9
fix: initialize director workload map
blumamir Nov 23, 2023
3b6ad77
feat: more diagnostic in log print for instrumentation failed
blumamir Nov 25, 2023
a2ef14a
chore: logging before ebpf run
blumamir Nov 25, 2023
9fe974f
chore: more log info for ebpf
blumamir Nov 25, 2023
f232644
fix: use owner reference for workload fields
blumamir Nov 25, 2023
d1905e4
Merge remote-tracking branch 'upstream/main' into director-workload
blumamir Nov 26, 2023
57c5e4c
fix: instrumentation configs crd via cli
blumamir Nov 26, 2023
817059e
Merge remote-tracking branch 'upstream/main' into director-workload
blumamir Nov 26, 2023
e31faea
feat: temp debugging
blumamir Nov 27, 2023
7bc39c6
feat: more debug prints
blumamir Nov 27, 2023
d95d684
chore: temp debugging
blumamir Nov 27, 2023
1542636
chore: more debug prints
blumamir Nov 27, 2023
c90b5cf
chore: more debug printing
blumamir Nov 27, 2023
ed8bab8
fix: unused var
blumamir Nov 27, 2023
8e16e46
fix: update map after modifying copy
blumamir Nov 27, 2023
1ff283a
chore: remove temp prints
blumamir Nov 27, 2023
7dadca2
chore: some debug prints
blumamir Nov 27, 2023
28bcbf6
fix: remove log prints
blumamir Nov 27, 2023
c480997
feat: add span kind to crd
blumamir Nov 27, 2023
3ac324c
feat: add span kind to common
blumamir Nov 27, 2023
b661e75
fix: add missing file
blumamir Nov 27, 2023
4785781
Merge branch 'main' into director-workload
blumamir Nov 28, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions api/config/crd/bases/odigos.io_instrumentationconfigs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ spec:
description: OptionValueBoolean is the boolean value of the
option if it is a boolean
type: boolean
spanKind:
description: This option allow to specify the config option
for a specific span kind for example, only to client spans
or only to server spans. it the span kind is not specified,
the option will apply to all spans.
enum:
- client
- server
- producer
- consumer
- internal
type: string
required:
- instrumentationLibraries
- optionKey
Expand Down
5 changes: 5 additions & 0 deletions api/odigos/v1alpha1/instrumentatioconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type WorkloadInstrumentationConfig struct {
// This value is transparent to the CRD and is passed as-is to the SDK.
OptionKey string `json:"optionKey"`

// This option allow to specify the config option for a specific span kind
// for example, only to client spans or only to server spans.
// it the span kind is not specified, the option will apply to all spans.
SpanKind common.SpanKind `json:"spanKind,omitempty"`

// OptionValueBoolean is the boolean value of the option if it is a boolean
OptionValueBoolean bool `json:"optionValueBoolean,omitempty"`

Expand Down
68 changes: 28 additions & 40 deletions cli/cmd/resources/crds/instrumentationconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,63 +31,51 @@ func NewInstrumentationConfig() *apiextensionsv1.CustomResourceDefinition {
Schema: &apiextensionsv1.CustomResourceValidation{
OpenAPIV3Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
Description: "InstrumentationConfig is the Schema for the instrumentation config API",
Description: "InstrumentationConfig is the Schema for the instrumentationconfig API",
Properties: map[string]apiextensionsv1.JSONSchemaProps{
"apiVersion": {Type: "string"},
"kind": {Type: "string"},
"metadata": {Type: "object"},
"spec": {
Type: "object",
Description: "InstrumentationConfigSpec defines the desired state of InstrumentationConfig",
Description: "Config for the OpenTelemetry SDKs that should be applied to a workload. The workload is identified by the owner reference",
Properties: map[string]apiextensionsv1.JSONSchemaProps{
"name": {
Type: "string",
},
"optionKey": {
Type: "string",
},
"optionValueBoolean": {
Type: "boolean",
},
"workloads": {
Type: "array",
Items: &apiextensionsv1.JSONSchemaPropsOrArray{
Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensionsv1.JSONSchemaProps{
"namespace": {Type: "string"},
"kind": {Type: "string"},
"name": {Type: "string"},
},
},
},
},
"instrumentationLibraries": {
Type: "array",
Items: &apiextensionsv1.JSONSchemaPropsOrArray{
Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensionsv1.JSONSchemaProps{
"language": {Type: "string"},
"instrumentationLibraryName": {Type: "string"},
},
},
},
},
"filters": {
"config": {
Type: "array",
Items: &apiextensionsv1.JSONSchemaPropsOrArray{
Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensionsv1.JSONSchemaProps{
"key": {Type: "string"},
"matchType": {Type: "string"},
"matchValue": {Type: "string"},
"instrumentationLibraries": {
Type: "array",
Items: &apiextensionsv1.JSONSchemaPropsOrArray{
Schema: &apiextensionsv1.JSONSchemaProps{
Type: "object",
Properties: map[string]apiextensionsv1.JSONSchemaProps{
"instrumentationLibraryName": {Type: "string"},
"language": {Type: "string"},
},
},
},
},
"optionKey": {Type: "string"},
"spanKind": {
Type: "string",
Enum: []apiextensionsv1.JSON{
{Raw: []byte(`"client"`)},
{Raw: []byte(`"server"`)},
{Raw: []byte(`"producer"`)},
{Raw: []byte(`"consumer"`)},
{Raw: []byte(`"internal"`)},
},
},
"optionValueBoolean": {Type: "boolean"},
},
},
},
},
},
Required: []string{"config"},
},
"status": {Type: "object"},
},
Expand Down
2 changes: 1 addition & 1 deletion common/lang_detection.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ type LanguageByContainer struct {
ProcessName string `json:"processName,omitempty"`
}

//+kubebuilder:validation:Enum=java;python;go;dotnet;javascript
// +kubebuilder:validation:Enum=java;python;go;dotnet;javascript
type ProgrammingLanguage string

const (
Expand Down
33 changes: 33 additions & 0 deletions common/spankind.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package common

import "go.opentelemetry.io/otel/trace"

// SpanKind is already defined in opentelemetry-go as int.
// this value can go into the CRD in which case it will be string for user convenience.
// +kubebuilder:validation:Enum=client;server;producer;consumer;internal
type SpanKind string

const (
ClientSpanKind SpanKind = "client"
ServerSpanKind SpanKind = "server"
ProducerSpanKind SpanKind = "producer"
ConsumerSpanKind SpanKind = "consumer"
InternalSpanKind SpanKind = "internal"
)

func SpanKindOdigosToOtel(kind SpanKind) trace.SpanKind {
switch kind {
case ClientSpanKind:
return trace.SpanKindClient
case ServerSpanKind:
return trace.SpanKindServer
case ProducerSpanKind:
return trace.SpanKindProducer
case ConsumerSpanKind:
return trace.SpanKindConsumer
case InternalSpanKind:
return trace.SpanKindInternal
default:
return trace.SpanKindUnspecified
}
}
30 changes: 30 additions & 0 deletions instrumentor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Instrumentor

The role of this component is to mark pods and workloads for instrumentation by the
odiglet daemonset on each pod's node.

Instrumentation can be done in 2 ways:
1. native SDK - via device manager attached to each pod
2. eBPF SDK - via annotation on the workload object (deployment, daemonset, statefulset).

Instrumentation cue is given on objects if they fulfill the following criteria:
1. The workload object is annotated with `odigos.io/instrument: "true"` or it is not annotated and the namespace is annotated with `odigos.io/instrument: "true"`.
2. There is a runtime details object, set prior by an odiglet that inspected a living pod runtime. The runtime details are crucial for programming language information which is used to determine the correct SDK to use for instrumentation.
3. The collectors are ready to receive telemetry. This is set by the `scheduler` controller.

## Development

To run instrumentor from code:

1. Make sure you have a running k8s cluster with a compatible version of odigos installed.

2. Disable the instrumentor deployment in the cluster:
```sh
$ kubectl scale deployment odigos-instrumentor --replicas=0 -n odigos-system
```

3. Run the instrumentor from code:
```sh
$ go run .
```

90 changes: 74 additions & 16 deletions odiglet/pkg/ebpf/go.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ import (
"k8s.io/apimachinery/pkg/types"
)

type podDetails struct {
Workload *common.PodWorkload
Pids []int
}

type InstrumentationDirectorGo struct {
mux sync.Mutex

Expand All @@ -29,32 +34,54 @@ type InstrumentationDirectorGo struct {
// so we can avoid attempting to instrument them again.
pidsAttemptedInstrumentation map[int]struct{}

podDetailsToPids map[types.NamespacedName][]int
// via this map, we can find the workload and pids for a specific pod.
// sometimes we only have the pod name and namespace, so this map is useful.
podsToDetails map[types.NamespacedName]podDetails

// this map can be used when we only have the workload, and need to find the pods to derive pids.
workloadToPods map[common.PodWorkload]map[types.NamespacedName]struct{}
}

func NewInstrumentationDirectorGo() (Director, error) {
return &InstrumentationDirectorGo{
pidsToInstrumentation: make(map[int]*auto.Instrumentation),
pidsAttemptedInstrumentation: make(map[int]struct{}),
podDetailsToPids: make(map[types.NamespacedName][]int),
podsToDetails: make(map[types.NamespacedName]podDetails),
workloadToPods: make(map[common.PodWorkload]map[types.NamespacedName]struct{}),
}, nil
}

func (i *InstrumentationDirectorGo) Language() common.ProgrammingLanguage {
return common.GoProgrammingLanguage
}

func (i *InstrumentationDirectorGo) Instrument(ctx context.Context, pid int, podDetails types.NamespacedName, podWorkload *common.PodWorkload, appName string) error {
log.Logger.V(0).Info("Instrumenting process", "pid", pid)
func (i *InstrumentationDirectorGo) Instrument(ctx context.Context, pid int, pod types.NamespacedName, podWorkload *common.PodWorkload, appName string) error {
log.Logger.V(0).Info("Instrumenting process", "pid", pid, "workload", podWorkload)
i.mux.Lock()
defer i.mux.Unlock()
if _, exists := i.pidsAttemptedInstrumentation[pid]; exists {
log.Logger.V(5).Info("Process already instrumented", "pid", pid)
return nil
}
i.podDetailsToPids[podDetails] = append(i.podDetailsToPids[podDetails], pid)

details, exists := i.podsToDetails[pod]
if !exists {
details = podDetails{
Workload: podWorkload,
Pids: []int{},
}
i.podsToDetails[pod] = details
}
details.Pids = append(details.Pids, pid)
i.podsToDetails[pod] = details

i.pidsAttemptedInstrumentation[pid] = struct{}{}

if _, exists := i.workloadToPods[*podWorkload]; !exists {
i.workloadToPods[*podWorkload] = make(map[types.NamespacedName]struct{})
}
i.workloadToPods[*podWorkload][pod] = struct{}{}

defaultExporter, err := otlptracegrpc.New(
ctx,
otlptracegrpc.WithInsecure(),
Expand All @@ -74,7 +101,7 @@ func (i *InstrumentationDirectorGo) Instrument(ctx context.Context, pid int, pod
auto.WithTraceExporter(defaultExporter),
)
if err != nil {
log.Logger.Error(err, "instrumentation setup failed")
log.Logger.Error(err, "instrumentation setup failed", "workload", podWorkload, "pod", pod)
return
}

Expand All @@ -94,6 +121,8 @@ func (i *InstrumentationDirectorGo) Instrument(ctx context.Context, pid int, pod
return
}

log.Logger.V(0).Info("Running ebpf go instrumentation", "workload", podWorkload, "pod", pod)

if err := inst.Run(context.Background()); err != nil {
log.Logger.Error(err, "instrumentation crashed after running")
}
Expand All @@ -102,19 +131,26 @@ func (i *InstrumentationDirectorGo) Instrument(ctx context.Context, pid int, pod
return nil
}

func (i *InstrumentationDirectorGo) Cleanup(podDetails types.NamespacedName) {
func (i *InstrumentationDirectorGo) Cleanup(pod types.NamespacedName) {
i.mux.Lock()
defer i.mux.Unlock()
pids, exists := i.podDetailsToPids[podDetails]
details, exists := i.podsToDetails[pod]
if !exists {
log.Logger.V(5).Info("No processes to cleanup for pod", "pod", podDetails)
log.Logger.V(5).Info("No processes to cleanup for pod", "pod", pod)
return
}

log.Logger.V(0).Info("Cleaning up ebpf go instrumentation for pod", "pod", podDetails)
delete(i.podDetailsToPids, podDetails)
log.Logger.V(0).Info("Cleaning up ebpf go instrumentation for pod", "pod", pod)
delete(i.podsToDetails, pod)

// clear the pod from the workloadToPods map
workload := details.Workload
delete(i.workloadToPods[*workload], pod)
if len(i.workloadToPods[*workload]) == 0 {
delete(i.workloadToPods, *workload)
}

for _, pid := range pids {
for _, pid := range details.Pids {
delete(i.pidsAttemptedInstrumentation, pid)

inst, exists := i.pidsToInstrumentation[pid]
Expand All @@ -135,14 +171,36 @@ func (i *InstrumentationDirectorGo) Cleanup(podDetails types.NamespacedName) {

func (i *InstrumentationDirectorGo) Shutdown() {
log.Logger.V(0).Info("Shutting down instrumentation director")
for details := range i.podDetailsToPids {
for details := range i.podsToDetails {
i.Cleanup(details)
}
}

func (i *InstrumentationDirectorGo) GetInstrumentation(pid int) (*auto.Instrumentation, bool) {
func (i *InstrumentationDirectorGo) GetWorkloadInstrumentations(workload common.PodWorkload) []*auto.Instrumentation {
i.mux.Lock()
defer i.mux.Unlock()
inst, ok := i.pidsToInstrumentation[pid]
return inst, ok

pods, ok := i.workloadToPods[workload]
if !ok {
return nil
}

var insts []*auto.Instrumentation
for pod := range pods {
details, ok := i.podsToDetails[pod]
if !ok {
continue
}

for _, pid := range details.Pids {
inst, ok := i.pidsToInstrumentation[pid]
if !ok {
continue
}

insts = append(insts, inst)
}
}

return insts
}
2 changes: 1 addition & 1 deletion odiglet/pkg/kube/instrumentation_ebpf/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (i *workloadPredicate) Delete(e event.DeleteEvent) bool {
}

func (i *workloadPredicate) Generic(e event.GenericEvent) bool {
return false
return true
blumamir marked this conversation as resolved.
Show resolved Hide resolved
}

func SetupWithManager(mgr ctrl.Manager, ebpfDirectors map[common.ProgrammingLanguage]ebpf.Director) error {
Expand Down
4 changes: 2 additions & 2 deletions odiglet/pkg/kube/instrumentation_ebpf/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func instrumentPodWithEbpf(ctx context.Context, pod *corev1.Pod, directors map[c
}

appName := container.ContainerName
if len(runtimeDetails.Spec.Languages) == 1 && len(runtimeDetails.OwnerReferences) > 0 {
if len(runtimeDetails.Spec.Languages) == 1 {
appName = runtimeDetails.OwnerReferences[0].Name
}

Expand All @@ -103,7 +103,7 @@ func instrumentPodWithEbpf(ctx context.Context, pod *corev1.Pod, directors map[c
err = director.Instrument(ctx, d.ProcessID, podDetails, podWorkload, appName)

if err != nil {
logger.Error(err, "error instrumenting process", "pid", d.ProcessID)
logger.Error(err, "error initiating process instrumentation", "pid", d.ProcessID)
return err
}
}
Expand Down