Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix nil check for eBPF instrumentation #1576

Merged
merged 2 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions odiglet/pkg/ebpf/director.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"os"
"reflect"
"sync"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -175,6 +176,11 @@ var IsProcessExists = func(pid int) bool {
return false
}

// Since OtelEbpfSdk is a generic type, we can't simply check it is nil with inst == nil
func isNil[T OtelEbpfSdk](inst T) bool {
return reflect.ValueOf(&inst).Elem().IsZero()
}

func (d *EbpfDirector[T]) periodicCleanup(ctx context.Context) {
ticker := time.NewTicker(CleanupInterval)
defer ticker.Stop()
Expand All @@ -189,7 +195,10 @@ func (d *EbpfDirector[T]) periodicCleanup(ctx context.Context) {
newInstrumentedProcesses := make([]*InstrumentedProcess[T], 0, len(details.InstrumentedProcesses))
for i := range details.InstrumentedProcesses {
ip := details.InstrumentedProcesses[i]
if !IsProcessExists(ip.PID) && any(ip.inst) != nil {
// if the process does not exist, we should make sure we clean the instrumentation resources.
// Also making sure the instrumentation itself is not nil to avoid closing it here.
// This can happen if the process exits while the instrumentation is initializing.
if !IsProcessExists(ip.PID) && !isNil(ip.inst) {
log.Logger.V(0).Info("Instrumented process does not exist, cleaning up", "pid", ip.PID)
d.cleanProcess(ctx, pod, ip)
} else {
Expand Down Expand Up @@ -428,7 +437,7 @@ func (d *EbpfDirector[T]) GetWorkloadInstrumentations(workload *workload.PodWork
}

for _, ip := range details.InstrumentedProcesses {
if any(ip.inst) != nil {
if !isNil(ip.inst) {
insts = append(insts, ip.inst)
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package test
package ebpf

import (
"context"
Expand All @@ -11,7 +11,6 @@ import (
"github.com/odigos-io/odigos/common"
"github.com/odigos-io/odigos/k8sutils/pkg/instrumentation_instance"
"github.com/odigos-io/odigos/k8sutils/pkg/workload"
"github.com/odigos-io/odigos/odiglet/pkg/ebpf"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -32,7 +31,7 @@ type FakeEbpfSdk struct {
}

// compile-time check that FakeEbpfSdk implements ConfigurableOtelEbpfSdk
var _ ebpf.ConfigurableOtelEbpfSdk = (*FakeEbpfSdk)(nil)
var _ ConfigurableOtelEbpfSdk = (*FakeEbpfSdk)(nil)

func (f *FakeEbpfSdk) ApplyConfig(ctx context.Context, config *odigosv1.InstrumentationConfig) error {
return nil
Expand Down Expand Up @@ -63,24 +62,27 @@ func (f *FakeEbpfSdk) Run(ctx context.Context) error {
}

type FakeInstrumentationFactory struct {
timeToSetup time.Duration
kubeclient client.Client
}

func NewFakeInstrumentationFactory(kubeclient client.Client) ebpf.InstrumentationFactory[*FakeEbpfSdk] {
func NewFakeInstrumentationFactory(kubeclient client.Client, setupDuration time.Duration) InstrumentationFactory[*FakeEbpfSdk] {
return &FakeInstrumentationFactory{
kubeclient: kubeclient,
timeToSetup: setupDuration,
}
}

func (f *FakeInstrumentationFactory) CreateEbpfInstrumentation(ctx context.Context, pid int, serviceName string, podWorkload *workload.PodWorkload, containerName string, podName string, loadedIndicator chan struct{}) (*FakeEbpfSdk, error) {
<-time.After(f.timeToSetup)
return &FakeEbpfSdk{
loadedIndicator: loadedIndicator,
pid: pid,
}, nil
}

func newFakeDirector(ctx context.Context, client client.Client) ebpf.Director {
dir := ebpf.NewEbpfDirector(ctx, client, client.Scheme(), common.GoProgrammingLanguage, NewFakeInstrumentationFactory(client))
func newFakeDirector(ctx context.Context, client client.Client, setupDuration time.Duration) Director {
dir := NewEbpfDirector(ctx, client, client.Scheme(), common.GoProgrammingLanguage, NewFakeInstrumentationFactory(client, setupDuration))
return dir
}

Expand Down Expand Up @@ -113,7 +115,7 @@ func assertHealthyInstrumentationInstance(t *testing.T, client client.Client, po
return assert.False(t, *instance.Status.Healthy)
}

func assertInstrumentationInstanceDeleted(t *testing.T, client client.Client, pod types.NamespacedName, pid int) bool {
func assertInstrumentationInstanceNotExisting(t *testing.T, client client.Client, pod types.NamespacedName, pid int) bool {
// instrumentation instance is deleted
return assert.Eventually(t, func() bool {
return getInstrumentationInstance(client, pod, pid) == nil
Expand Down Expand Up @@ -145,15 +147,15 @@ func TestSingleInstrumentation(t *testing.T) {
WithRuntimeObjects(&pod).
Build()

origIsProcessExists := ebpf.IsProcessExists
ebpf.IsProcessExists = func(pid int) bool {
origIsProcessExists := IsProcessExists
IsProcessExists = func(pid int) bool {
return true
}
t.Cleanup(func() { ebpf.IsProcessExists = origIsProcessExists })
t.Cleanup(func() { IsProcessExists = origIsProcessExists })

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
dir := newFakeDirector(ctx, client).(*ebpf.EbpfDirector[*FakeEbpfSdk])
dir := newFakeDirector(ctx, client, time.Millisecond).(*EbpfDirector[*FakeEbpfSdk])
err := dir.Instrument(ctx, 1, pod_id, workload, "test-app", "test-container")
assert.NoError(t, err)

Expand All @@ -171,7 +173,7 @@ func TestSingleInstrumentation(t *testing.T) {
// cleanup
dir.Cleanup(pod_id)
// the instrumentation instance is deleted
if !assertInstrumentationInstanceDeleted(t, client, pod_id, 1) {
if !assertInstrumentationInstanceNotExisting(t, client, pod_id, 1) {
t.FailNow()
}

Expand Down Expand Up @@ -210,18 +212,18 @@ func TestInstrumentNotExistingProcess(t *testing.T) {
WithRuntimeObjects(&pod).
Build()

origIsProcessExists := ebpf.IsProcessExists
ebpf.IsProcessExists = func(pid int) bool {
origIsProcessExists := IsProcessExists
IsProcessExists = func(pid int) bool {
return true
}
t.Cleanup(func() { ebpf.IsProcessExists = origIsProcessExists })
t.Cleanup(func() { IsProcessExists = origIsProcessExists })

// setup the cleanup interval to be very short for the test to be responsive
origCleanupInterval := ebpf.CleanupInterval
ebpf.CleanupInterval = 10 * time.Millisecond
t.Cleanup(func() { ebpf.CleanupInterval = origCleanupInterval })
origCleanupInterval := CleanupInterval
CleanupInterval = 10 * time.Millisecond
t.Cleanup(func() { CleanupInterval = origCleanupInterval })

dir := newFakeDirector(ctx, client).(*ebpf.EbpfDirector[*FakeEbpfSdk])
dir := newFakeDirector(ctx, client, time.Millisecond).(*EbpfDirector[*FakeEbpfSdk])
err := dir.Instrument(ctx, 1, pod_id, workload, "test-app", "test-container")
assert.NoError(t, err)

Expand All @@ -238,11 +240,11 @@ func TestInstrumentNotExistingProcess(t *testing.T) {
assert.False(t, inst.closed)

// "kill" the process
ebpf.IsProcessExists = func(pid int) bool {
IsProcessExists = func(pid int) bool {
return false
}
// the instrumentation instance is deleted
if !assertInstrumentationInstanceDeleted(t, client, pod_id, 1) {
if !assertInstrumentationInstanceNotExisting(t, client, pod_id, 1) {
t.FailNow()
}
// the director stopped tracking the instrumentation
Expand All @@ -254,6 +256,67 @@ func TestInstrumentNotExistingProcess(t *testing.T) {
assert.True(t, inst.closed)
}

func TestInstrumentNotExistingProcessWithSlowInstrumentation(t *testing.T) {
ctx := context.Background()
scheme := runtime.NewScheme()
corev1.AddToScheme(scheme)
odigosv1.AddToScheme(scheme)

workload := &workload.PodWorkload{
Name: "test-workload",
Namespace: "default",
Kind: "Deployment",
}
pod_id := types.NamespacedName{Name: "test", Namespace: "default"}
pod := corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: pod_id.Name,
Namespace: pod_id.Namespace,
},
}

client := fake.
NewClientBuilder().
WithScheme(scheme).
WithStatusSubresource(&odigosv1.InstrumentationInstance{}).
WithRuntimeObjects(&pod).
Build()

origIsProcessExists := IsProcessExists
IsProcessExists = func(pid int) bool {
return true
}
t.Cleanup(func() { IsProcessExists = origIsProcessExists })

// setup the cleanup interval to be very short for the test to be responsive
origCleanupInterval := CleanupInterval
CleanupInterval = 10 * time.Millisecond
t.Cleanup(func() { CleanupInterval = origCleanupInterval })

dir := newFakeDirector(ctx, client, time.Second).(*EbpfDirector[*FakeEbpfSdk])
err := dir.Instrument(ctx, 1, pod_id, workload, "test-app", "test-container")
assert.NoError(t, err)

<-time.After(100 * time.Millisecond)
// "kill" the process while the instrumentation is still setting up
IsProcessExists = func(pid int) bool {
return false
}

// wait for the instrumentation to initialize
<-time.After(1 * time.Second)

// the instrumentation instance is not existing
if !assertInstrumentationInstanceNotExisting(t, client, pod_id, 1) {
t.FailNow()
}
// the director stopped tracking the instrumentation
insts := dir.GetWorkloadInstrumentations(workload)
if !assert.Len(t, insts, 0) {
t.FailNow()
}
}

func TestMultiplePodsInstrumentation(t *testing.T) {
ctx := context.Background()
scheme := runtime.NewScheme()
Expand Down Expand Up @@ -287,13 +350,13 @@ func TestMultiplePodsInstrumentation(t *testing.T) {
WithLists(&podList).
Build()

origIsProcessExists := ebpf.IsProcessExists
ebpf.IsProcessExists = func(pid int) bool {
origIsProcessExists := IsProcessExists
IsProcessExists = func(pid int) bool {
return true
}
t.Cleanup(func() { ebpf.IsProcessExists = origIsProcessExists })
t.Cleanup(func() { IsProcessExists = origIsProcessExists })

dir := newFakeDirector(ctx, client).(*ebpf.EbpfDirector[*FakeEbpfSdk])
dir := newFakeDirector(ctx, client, time.Millisecond).(*EbpfDirector[*FakeEbpfSdk])
for i := 0; i < numOfPods; i++ {
err := dir.Instrument(ctx, i+1, pod_ids[i], workload, "test-app", "test-container")
assert.NoError(t, err)
Expand Down Expand Up @@ -327,7 +390,7 @@ func TestMultiplePodsInstrumentation(t *testing.T) {

// the instrumentation instances are deleted
for i := 0; i < numOfPods - 1; i++ {
if !assertInstrumentationInstanceDeleted(t, client, pod_ids[i], i+1) {
if !assertInstrumentationInstanceNotExisting(t, client, pod_ids[i], i+1) {
t.FailNow()
}
}
Expand All @@ -354,3 +417,14 @@ func TestMultiplePodsInstrumentation(t *testing.T) {
// The last instrumentation is the one returned
assert.Equal(t, insts[0].pid, numOfPods)
}

func TestIsNil(t *testing.T) {
var e OtelEbpfSdk
assert.True(t, isNil(e))

e = &FakeEbpfSdk{}
assert.False(t, isNil(e))

var e2 *FakeEbpfSdk
assert.True(t, isNil(e2))
}
Loading