From 6761c2f23daa9e83a3bcd5f91574f6c0d65bcafc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Guillermo=20Juli=C3=A1n?= Date: Wed, 17 Jan 2024 14:37:46 -0500 Subject: [PATCH] [EBPF-357] Add GenericMap supporting batch lookup (#21738) [EBPF-357] Add GenericMap supporting batch lookup Co-authored-by: brycekahle --- go.mod | 2 +- go.sum | 8 +- pkg/ebpf/generic_map.go | 341 +++++++++++++++++++++++ pkg/ebpf/generic_map_test.go | 520 +++++++++++++++++++++++++++++++++++ pkg/ebpf/map_cleaner.go | 4 +- 5 files changed, 868 insertions(+), 7 deletions(-) create mode 100644 pkg/ebpf/generic_map.go create mode 100644 pkg/ebpf/generic_map_test.go diff --git a/go.mod b/go.mod index 45d112016c83a..db3d000c29d70 100644 --- a/go.mod +++ b/go.mod @@ -126,7 +126,7 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/cenkalti/backoff/v4 v4.2.1 github.com/cihub/seelog v0.0.0-20170130134532-f561c5e57575 - github.com/cilium/ebpf v0.12.3 + github.com/cilium/ebpf v0.12.4-0.20231215144645-5ab77468412f github.com/clbanning/mxj v1.8.4 github.com/containerd/containerd v1.7.11 github.com/containernetworking/cni v1.1.2 diff --git a/go.sum b/go.sum index aa8c5cba2d8e6..4b3d3951e8782 100644 --- a/go.sum +++ b/go.sum @@ -438,8 +438,8 @@ github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMn github.com/cihub/seelog v0.0.0-20151216151435-d2c6e5aa9fbf h1:XI2tOTCBqEnMyN2j1yPBI07yQHeywUSCEf8YWqf0oKw= github.com/cihub/seelog v0.0.0-20151216151435-d2c6e5aa9fbf/go.mod h1:9d6lWj8KzO/fd/NrVaLscBKmPigpZpn5YawRPw+e3Yo= github.com/cilium/ebpf v0.7.0/go.mod h1:/oI2+1shJiTGAMgl6/RgJr36Eo1jzrRcAWbcXO2usCA= -github.com/cilium/ebpf v0.12.3 h1:8ht6F9MquybnY97at+VDZb3eQQr8ev79RueWeVaEcG4= -github.com/cilium/ebpf v0.12.3/go.mod h1:TctK1ivibvI3znr66ljgi4hqOT8EYQjz1KWBfb1UVgM= +github.com/cilium/ebpf v0.12.4-0.20231215144645-5ab77468412f h1:GeKfnUyhxHPIoVJNrot/mU0lf9TWsLf2YnaTT4Tfg/M= +github.com/cilium/ebpf v0.12.4-0.20231215144645-5ab77468412f/go.mod h1:9BszLnmZR7oucpa/kBbVVf1ts3BoUSpltcnNp1hQkVw= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= github.com/clbanning/mxj v1.8.4 h1:HuhwZtbyvyOw+3Z1AowPkU87JkJUSv751ELWaiTpj8I= @@ -600,8 +600,6 @@ github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHqu github.com/foxcpp/go-mockdns v1.0.0 h1:7jBqxd3WDWwi/6WhDvacvH1XsN3rOLXyHM1uhvIx6FI= github.com/foxcpp/go-mockdns v1.0.0/go.mod h1:lgRN6+KxQBawyIghpnl5CezHFGS9VLzvtVlwxvzXTQ4= github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= -github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= -github.com/frankban/quicktest v1.14.5/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/freddierice/go-losetup v0.0.0-20170407175016-fc9adea44124 h1:TVfi5xMshZAXzVXozESk8bi0JSTPwHkx7qtLOkkcu/c= github.com/freddierice/go-losetup v0.0.0-20170407175016-fc9adea44124/go.mod h1:zAk7fcFx45euzK9Az14j6Hd9n8Cwhnjp/NBfhSIAmFg= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= @@ -710,6 +708,8 @@ github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+ github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl+lu/H90nyDXpg0fqeB/AQUGNTVA= github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= github.com/go-playground/validator/v10 v10.10.1/go.mod h1:i+3WkQ1FvaUjjxh1kSvIA4dMGDBiPU55YFDl0WbKdWU= +github.com/go-quicktest/qt v1.101.0 h1:O1K29Txy5P2OK0dGo59b7b0LR6wKfIhttaAhHUyn7eI= +github.com/go-quicktest/qt v1.101.0/go.mod h1:14Bz/f7NwaXPtdYEgzsx46kqSxVwTbzVZsDC26tQJow= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= github.com/go-redis/redis/v9 v9.0.0-rc.2 h1:IN1eI8AvJJeWHjMW/hlFAv2sAfvTun2DVksDDJ3a6a0= diff --git a/pkg/ebpf/generic_map.go b/pkg/ebpf/generic_map.go new file mode 100644 index 0000000000000..2e97f4a746d58 --- /dev/null +++ b/pkg/ebpf/generic_map.go @@ -0,0 +1,341 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build linux_bpf + +package ebpf + +import ( + "errors" + "fmt" + "reflect" + "unsafe" + + manager "github.com/DataDog/ebpf-manager" + "github.com/cilium/ebpf" + + "github.com/DataDog/datadog-agent/pkg/util/funcs" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +const defaultBatchSize = 100 + +// BatchAPISupported returns true if the kernel supports the batch API for maps +var BatchAPISupported = funcs.MemoizeNoError(func() bool { + // Do feature detection directly instead of based on kernel versions for more accuracy + m, err := ebpf.NewMap(&ebpf.MapSpec{ + Type: ebpf.Hash, + KeySize: 4, + ValueSize: 4, + MaxEntries: 10, + }) + if err != nil { + log.Warnf("Failed to create map for batch API test: %v, will mark batch API as unsupported", err) + return false + } + defer m.Close() + + keys := make([]uint32, 1) + values := make([]uint32, 1) + + // Do a batch update, check the result. + // We do an update instead of a lookup because it's more reliable for detection + _, err = m.BatchUpdate(keys, values, &ebpf.BatchOptions{ElemFlags: uint64(ebpf.UpdateAny)}) + if err != nil && !errors.Is(err, ebpf.ErrNotSupported) { + log.Warnf("Unexpected error while testing batch API support: %v", err) + } + return err == nil +}) + +// GenericMap is a wrapper around ebpf.Map that allows to use generic types. +// Also includes support for batch iterations +type GenericMap[K any, V any] struct { + m *ebpf.Map +} + +// NewGenericMap creates a new GenericMap with the given spec. Key and Value sizes are automatically +// inferred from the types of K and V. +// Important: if the map is a per-cpu map, V must be a slice type +func NewGenericMap[K any, V any](spec *ebpf.MapSpec) (*GenericMap[K, V], error) { + // Automatic inference of sizes. We assume that K/V are simple types that + // can be instantiated with no arguments + var kval K + var vval V + + err := validateValueTypeForMapType[V](spec.Type) + if err != nil { + return nil, err + } + + spec.KeySize = uint32(unsafe.Sizeof(kval)) + + if isPerCPU(spec.Type) { + spec.ValueSize = uint32(reflect.TypeOf(vval).Elem().Size()) + } else { + spec.ValueSize = uint32(unsafe.Sizeof(vval)) + } + + m, err := ebpf.NewMap(spec) + if err != nil { + return nil, err + } + + return &GenericMap[K, V]{ + m: m, + }, nil +} + +// validateValueTypeForMapType checks that the type of values (V) is valid for the given map type, returning +// an error if it's not valid. +// +// For now it ensures that per-cpu maps use a slice type for the value. +// Separate function to allow using it in the different constructors/converters +func validateValueTypeForMapType[V any](t ebpf.MapType) error { + var vval V + if isPerCPU(t) && reflect.TypeOf(vval).Kind() != reflect.Slice { + return fmt.Errorf("per-cpu maps require a slice type for the value, instead got %T", vval) + } + return nil +} + +// Map creates a new GenericMap from an existing ebpf.Map +func Map[K any, V any](m *ebpf.Map) (*GenericMap[K, V], error) { + if err := validateValueTypeForMapType[V](m.Type()); err != nil { + return nil, err + } + + return &GenericMap[K, V]{ + m: m, + }, nil +} + +// GetMap gets the generic map with the given name from the manager +func GetMap[K any, V any](mgr *manager.Manager, name string) (*GenericMap[K, V], error) { + m, _, err := mgr.GetMap(name) + if err != nil { + return nil, err + } + if m == nil { + return nil, fmt.Errorf("map %q not found", name) + } + gm, err := Map[K, V](m) + if err != nil { + return nil, err + } + return gm, nil +} + +// Map returns the underlying ebpf.Map +func (g *GenericMap[K, V]) Map() *ebpf.Map { + return g.m +} + +// IteratorOptions are options for the Iterate method +type IteratorOptions struct { + BatchSize int // Number of items to fetch per batch. If 0, use default value (100) + ForceSingleItem bool // Force the use of the single item iterator even if the batch API is supported +} + +// Put inserts a new key/value pair in the map. If the key already exists, the value is updated +func (g *GenericMap[K, V]) Put(key *K, value *V) error { + if g.isPerCPU() { + return g.m.Put(unsafe.Pointer(key), *value) + } + + return g.m.Put(unsafe.Pointer(key), unsafe.Pointer(value)) +} + +// Update updates the value of an existing key in the map. +func (g *GenericMap[K, V]) Update(key *K, value *V, flags ebpf.MapUpdateFlags) error { + return g.m.Update(unsafe.Pointer(key), unsafe.Pointer(value), flags) +} + +// Lookup looks up a key in the map and returns the value. If the key doesn't exist, it returns ErrKeyNotExist +func (g *GenericMap[K, V]) Lookup(key *K, valueOut *V) error { + if g.isPerCPU() { + return g.m.Lookup(unsafe.Pointer(key), *valueOut) + } + + return g.m.Lookup(unsafe.Pointer(key), unsafe.Pointer(valueOut)) +} + +// Delete deletes a key from the map. If the key doesn't exist, it returns ErrKeyNotExist +func (g *GenericMap[K, V]) Delete(key *K) error { + return g.m.Delete(unsafe.Pointer(key)) +} + +// BatchDelete deletes a batch of keys from the map. Returns the number of deleted items +func (g *GenericMap[K, V]) BatchDelete(keys []K) (int, error) { + return g.m.BatchDelete(keys, nil) +} + +// BatchUpdate updates a batch of keys in the map +func (g *GenericMap[K, V]) BatchUpdate(keys []K, values []V, opts *ebpf.BatchOptions) (int, error) { + return g.m.BatchUpdate(keys, values, opts) +} + +// GenericMapIterator is an interface for iterating over a GenericMap +type GenericMapIterator[K any, V any] interface { + // Next fills K and V with the next key/value pair in the map. It returns false if there are no more elements + Next(key *K, value *V) bool + + // Err returns the last error that happened during iteration. + Err() error +} + +func isPerCPU(t ebpf.MapType) bool { + switch t { + case ebpf.PerCPUHash, ebpf.PerCPUArray, ebpf.LRUCPUHash: + return true + } + return false +} + +func (g *GenericMap[K, V]) isPerCPU() bool { + return isPerCPU(g.m.Type()) +} + +// Iterate returns an iterator for the map, which transparently chooses between batch and single item if the +// batch API is not available. Defaults to item-by-item iterator. +func (g *GenericMap[K, V]) Iterate() GenericMapIterator[K, V] { + return g.IterateWithBatchSize(1) // Iterate defaults +} + +func (g *GenericMap[K, V]) valueTypeCanUseUnsafePointer() bool { + // Simple test for now, but we probably will need to add more cases, + // as I am not 100% sure of the behavior of structs with maps + return !g.isPerCPU() // PerCPU maps use slices, so we need to pass them directly +} + +// IterateWithBatchSize returns an iterator for the map, which transparently chooses between batch and single item +// iterations. This version allows choosing the batch size. Setting the batch size to 1 will force the use of the +// single item iterator +func (g *GenericMap[K, V]) IterateWithBatchSize(batchSize int) GenericMapIterator[K, V] { + if batchSize == 0 { + batchSize = defaultBatchSize // Default value for batch sizes. Possibly needs more testing to find an optimal default + } + if batchSize > int(g.m.MaxEntries()) { + batchSize = int(g.m.MaxEntries()) + } + + if BatchAPISupported() && !g.isPerCPU() && batchSize > 1 { + it := &genericMapBatchIterator[K, V]{ + m: g.m, + batchSize: batchSize, + keys: make([]K, batchSize), + values: make([]V, batchSize), + valueTypeCanUseUnsafePointer: g.valueTypeCanUseUnsafePointer(), + } + + return it + } + + return &genericMapItemIterator[K, V]{ + it: g.m.Iterate(), + valueTypeCanUseUnsafePointer: g.valueTypeCanUseUnsafePointer(), + } +} + +// genericMapItemIterator is an iterator for a map that returns a single item at a time +type genericMapItemIterator[K any, V any] struct { + it *ebpf.MapIterator + valueTypeCanUseUnsafePointer bool +} + +// Next fills K and V with the next key/value pair in the map. It returns false if there are no more elements +func (g *genericMapItemIterator[K, V]) Next(key *K, value *V) bool { + // we resort to unsafe.Pointers because by doing so the underlying eBPF + // library avoids marshaling the key/value variables while traversing the map + // However, in some cases (slices, structs) we need to pass the variable directly + // so that the library detects the type correctly + if g.valueTypeCanUseUnsafePointer { + return g.it.Next(unsafe.Pointer(key), unsafe.Pointer(value)) + } + + return g.it.Next(unsafe.Pointer(key), value) +} + +// Err returns the last error that happened during iteration. Should be checked +// after completing the iteration, as it can report issues such as wrong types +// being passed to the Next() method or aborted iterations, which would be perceived +// as empty/partial map iterations. +func (g *genericMapItemIterator[K, V]) Err() error { + return g.it.Err() +} + +// genericMapBatchIterator is an iterator for a map that, under the hood, uses BatchLookup to reduce +// the number of syscalls +type genericMapBatchIterator[K any, V any] struct { + m *ebpf.Map // Map to iterate + batchSize int // Number of items to fetch per batch + cursor ebpf.BatchCursor // Cursor that maintains the state of the iteration + keys []K // Buffer for storing the keys of the current batch + values []V // Buffer for storing the values of the current batch + currentBatchSize int // Number of elements in the current batch, as returned by BatchLookup + inBatchIndex int // Index of the next element to return in the current batch + err error // Last error that happened during iteration + totalCount int // Total number of elements returned so far + lastBatch bool // True if this is the last batch, used to avoid extra calls to BatchLookup + valueTypeCanUseUnsafePointer bool // True if the value type can be passed as an unsafe.Pointer or not. Helps avoid allocations +} + +// Next fills K and V with the next key/value pair in the map. It returns false if there are no more elements +func (g *genericMapBatchIterator[K, V]) Next(key *K, value *V) bool { + // Safety check to avoid an infinite loop + if g.totalCount >= int(g.m.MaxEntries()) { + return false + } + + // We have finished all the values in the current batch (or there wasn't any batch + // to begin with, with g.currentBatchSize == 0), so we need to fetch the next batch + if g.inBatchIndex >= g.currentBatchSize { + if g.lastBatch { + return false + } + + g.currentBatchSize, g.err = g.m.BatchLookup(&g.cursor, g.keys, g.values, nil) + g.inBatchIndex = 0 + if g.err != nil && errors.Is(g.err, ebpf.ErrKeyNotExist) { + // The lookup API returns ErrKeyNotExist when this is the last batch, + // even when partial results are returned. We need to mark this so that + // we don't try to fetch another batch when this one is finished + g.lastBatch = true + + // Also fix the error, because in some instances BatchLookup sets ErrKeyNotExist + // as the error, which is just an indicator that there are no more batches, but it's not + // an actual error. + g.err = nil + } else if g.err != nil { + return false + } + + // After error processing we should check that we actually got a batch + if g.currentBatchSize == 0 { + return false + } + } + + // At this point we know for sure that keys/values are populated with values + // from a previous call to BatchLookup. + *key = g.keys[g.inBatchIndex] + *value = g.values[g.inBatchIndex] + g.inBatchIndex++ + g.totalCount++ + + return true +} + +// Err returns the last error that happened during iteration. Should be checked +// after completing the iteration, as it can report issues such as wrong types +// being passed to the Next() method or aborted iterations, which would be perceived +// as empty/partial map iterations. +func (g *genericMapBatchIterator[K, V]) Err() error { + return g.err +} + +// String returns a string representation of the map. Delegated to the underlying ebpf.Map method +func (g *GenericMap[K, V]) String() string { + return g.m.String() +} diff --git a/pkg/ebpf/generic_map_test.go b/pkg/ebpf/generic_map_test.go new file mode 100644 index 0000000000000..be80f838e2a90 --- /dev/null +++ b/pkg/ebpf/generic_map_test.go @@ -0,0 +1,520 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build linux_bpf + +package ebpf + +import ( + "fmt" + "testing" + + "github.com/cilium/ebpf" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + ebpfkernel "github.com/DataDog/datadog-agent/pkg/security/ebpf/kernel" + "github.com/DataDog/datadog-agent/pkg/util/kernel" +) + +func TestBatchAPISupported(t *testing.T) { + // Batch API is supported on kernels >= 5.6, so make sure that in those cases + // it returns true + kernelVersion, err := ebpfkernel.NewKernelVersion() + require.NoError(t, err) + + if kernelVersion.IsRH7Kernel() || kernelVersion.IsRH8Kernel() { + // Some of those kernels have backported the batch API, I don't want + // to include those specifics in unit tests that are only meant to ensure + // that the checks are correct in at least the basic cases. + t.Skip("Unknown support for batch API on RHEL kernels") + } + + require.Equal(t, kernelVersion.Code >= ebpfkernel.Kernel5_6, BatchAPISupported()) +} + +func TestSingleItemIter(t *testing.T) { + m, err := NewGenericMap[uint32, uint32](&ebpf.MapSpec{ + Type: ebpf.Hash, + MaxEntries: 10, + }) + require.NoError(t, err) + + numsToPut := []uint32{1, 2, 3, 4, 5} + for _, num := range numsToPut { + require.NoError(t, m.Put(&num, &num)) + } + + var k uint32 + var v uint32 + numElements := 0 + foundElements := make(map[uint32]bool) + + it := m.IterateWithBatchSize(1) + require.NotNil(t, it) + require.IsType(t, &genericMapItemIterator[uint32, uint32]{}, it) + for it.Next(&k, &v) { + numElements++ + foundElements[k] = true + } + require.NoError(t, it.Err()) + + require.Equal(t, len(numsToPut), numElements) + for _, num := range numsToPut { + require.True(t, foundElements[num]) + } +} + +func TestBatchIter(t *testing.T) { + if !BatchAPISupported() { + t.Skip("Batch API not supported") + } + + m, err := NewGenericMap[uint32, uint32](&ebpf.MapSpec{ + Type: ebpf.Hash, + MaxEntries: 100, + }) + require.NoError(t, err) + + numsToPut := uint32(50) + expectedNumbers := make([]uint32, numsToPut) + for i := uint32(0); i < numsToPut; i++ { + require.NoError(t, m.Put(&i, &i)) + expectedNumbers[i] = i + } + + var k uint32 + var v uint32 + actualNumbers := make([]uint32, numsToPut) + + it := m.IterateWithBatchSize(10) + require.NotNil(t, it) + require.IsType(t, &genericMapBatchIterator[uint32, uint32]{}, it) + for it.Next(&k, &v) { + actualNumbers[k] = v + } + + require.NoError(t, it.Err()) + require.Equal(t, expectedNumbers, actualNumbers) +} + +func TestBatchIterArray(t *testing.T) { + if !BatchAPISupported() { + t.Skip("Batch API not supported") + } + + m, err := NewGenericMap[uint32, uint32](&ebpf.MapSpec{ + Type: ebpf.Array, + MaxEntries: 100, + }) + require.NoError(t, err) + + numsToPut := uint32(50) + expectedNumbers := make([]uint32, m.Map().MaxEntries()) + for i := uint32(0); i < numsToPut; i++ { + val := i + 200 // To distinguish from unset values + require.NoError(t, m.Put(&i, &val)) + expectedNumbers[i] = val + } + + var k uint32 + var v uint32 + numElements := uint32(0) + actualNumbers := make([]uint32, m.Map().MaxEntries()) + + it := m.IterateWithBatchSize(10) + require.NotNil(t, it) + require.IsType(t, &genericMapBatchIterator[uint32, uint32]{}, it) + for it.Next(&k, &v) { + actualNumbers[k] = v + numElements++ + } + require.NoError(t, it.Err()) + + // Array maps will return all values on iterations, even if they are unset + require.Equal(t, m.Map().MaxEntries(), numElements) + require.Equal(t, expectedNumbers, actualNumbers) +} + +func TestBatchIterLessItemsThanBatchSize(t *testing.T) { + if !BatchAPISupported() { + t.Skip("Batch API not supported") + } + + m, err := NewGenericMap[uint32, uint32](&ebpf.MapSpec{ + Type: ebpf.Hash, + MaxEntries: 100, + }) + require.NoError(t, err) + + numsToPut := uint32(5) + expectedNumbers := make([]uint32, numsToPut) + for i := uint32(0); i < numsToPut; i++ { + require.NoError(t, m.Put(&i, &i)) + expectedNumbers[i] = i + } + + var k uint32 + var v uint32 + actualNumbers := make([]uint32, numsToPut) + + it := m.IterateWithBatchSize(10) + require.NotNil(t, it) + require.IsType(t, &genericMapBatchIterator[uint32, uint32]{}, it) + for it.Next(&k, &v) { + actualNumbers[k] = v + } + + require.NoError(t, it.Err()) + require.Equal(t, expectedNumbers, actualNumbers) +} + +func TestBatchIterWhileUpdated(t *testing.T) { + if !BatchAPISupported() { + t.Skip("Batch API not supported") + } + + maxEntries := 50 + m, err := NewGenericMap[uint32, uint32](&ebpf.MapSpec{ + Type: ebpf.Hash, + MaxEntries: uint32(maxEntries), + }) + require.NoError(t, err) + + numsToPut := uint32(50) + for i := uint32(0); i < numsToPut; i++ { + require.NoError(t, m.Put(&i, &i)) + } + + var k uint32 + var v uint32 + numElements := uint32(0) + foundElements := make(map[uint32]bool) + updateEachElements := 25 + updatesDone := 0 + + it := m.IterateWithBatchSize(10) + require.NotNil(t, it) + require.IsType(t, &genericMapBatchIterator[uint32, uint32]{}, it) + for it.Next(&k, &v) { + numElements++ + foundElements[k] = true + + // Not recommended! But helps us simulate the case where the map is updated + // as we iterate over it. We are not concerned with correctness here but we + // want to make sure that the iterator doesn't crash or run into an infinite + // loop + if numElements%uint32(updateEachElements) == 0 { + for i := uint32(0); i < numsToPut; i++ { + oldKey := i + uint32(updatesDone)*10 + newKey := i + uint32(updatesDone+1)*10 + require.NoError(t, m.Delete(&oldKey)) + require.NoError(t, m.Put(&newKey, &newKey)) + } + + updatesDone++ + } + + require.LessOrEqual(t, numElements, uint32(maxEntries)) + } + + // Again, just concerned with exiting the loop and not correctness, we don't even + // check for no error on end of iteration. + require.LessOrEqual(t, numElements, uint32(maxEntries)) +} + +func TestIteratePerCPUMaps(t *testing.T) { + kernelVersion, err := kernel.HostVersion() + require.NoError(t, err) + + if kernelVersion < kernel.VersionCode(4, 6, 0) { + t.Skip("Per CPU maps not supported on this kernel version") + } + + m, err := NewGenericMap[uint32, []uint32](&ebpf.MapSpec{ + Type: ebpf.PerCPUHash, + MaxEntries: 10, + }) + require.NoError(t, err) + + nbCpus, err := kernel.PossibleCPUs() + require.NoError(t, err) + + numsToPut := []uint32{0, 100, 200, 300, 400, 500} + for _, num := range numsToPut { + entries := make([]uint32, nbCpus) + for i := 0; i < nbCpus; i++ { + entries[i] = num + uint32(i) + } + require.NoError(t, m.Put(&num, &entries)) + } + + var k uint32 + entries := make([]uint32, nbCpus) + numElements := 0 + foundElements := make(map[uint32]bool) + + it := m.Iterate() + require.NotNil(t, it) + require.IsType(t, &genericMapItemIterator[uint32, []uint32]{}, it) + for it.Next(&k, &entries) { + numElements++ + foundElements[k] = true + + for i := 0; i < nbCpus; i++ { + require.Equal(t, k+uint32(i), entries[i]) + } + } + require.NoError(t, it.Err()) + + require.Equal(t, len(numsToPut), numElements) + for _, num := range numsToPut { + require.True(t, foundElements[num]) + } +} + +type ValueStruct struct { + A uint32 + B uint32 +} + +func TestIterateWithValueStructs(t *testing.T) { + doTest := func(t *testing.T, batchSize int, oneItem bool) { + singleItem := batchSize == 1 + if !singleItem && !BatchAPISupported() { + t.Skip("Batch API not supported") + } + + m, err := NewGenericMap[uint32, ValueStruct](&ebpf.MapSpec{ + Type: ebpf.Hash, + MaxEntries: 10, + }) + require.NoError(t, err) + + var numsToPut []uint32 + if oneItem { + numsToPut = []uint32{10} + } else { + numsToPut = []uint32{0, 100, 200, 300, 400, 500} + } + for _, num := range numsToPut { + v := ValueStruct{A: num, B: num + 1} + require.NoError(t, m.Put(&num, &v)) + } + + var k uint32 + var v ValueStruct + numElements := 0 + foundElements := make(map[uint32]bool) + + it := m.IterateWithBatchSize(batchSize) + require.NotNil(t, it) + if singleItem { + require.IsType(t, &genericMapItemIterator[uint32, ValueStruct]{}, it) + } else { + require.IsType(t, &genericMapBatchIterator[uint32, ValueStruct]{}, it) + } + + for it.Next(&k, &v) { + numElements++ + foundElements[k] = true + + require.Equal(t, k, v.A) + require.Equal(t, k+1, v.B) + } + + require.Equal(t, len(numsToPut), numElements) + for _, num := range numsToPut { + require.True(t, foundElements[num]) + } + } + + t.Run("SingleItem", func(t *testing.T) { + doTest(t, 1, false) + }) + + t.Run("Batch", func(t *testing.T) { + doTest(t, 0, false) + }) + + t.Run("BatchWithOneItem", func(t *testing.T) { + doTest(t, 0, true) + }) +} + +func TestBatchIterAllocsPerRun(t *testing.T) { + if !BatchAPISupported() { + t.Skip("Batch API not supported") + } + + m, err := NewGenericMap[uint32, uint32](&ebpf.MapSpec{ + Type: ebpf.Hash, + MaxEntries: 10000, + }) + require.NoError(t, err) + + numsToPut := uint32(9000) + for i := uint32(0); i < numsToPut; i++ { + require.NoError(t, m.Put(&i, &i)) + } + + var k uint32 + var v uint32 + batchSize := 10 + + it := m.IterateWithBatchSize(batchSize) + numElements := uint32(0) + allocs := testing.AllocsPerRun(1, func() { + for it.Next(&k, &v) { + numElements++ + } + }) + require.Equal(t, numsToPut, numElements) + assert.EqualValues(t, allocs, 0) + + batchSize = 100 + numElements = uint32(0) + it = m.IterateWithBatchSize(batchSize) + allocs = testing.AllocsPerRun(1, func() { + for it.Next(&k, &v) { + numElements++ + } + }) + require.Equal(t, numsToPut, numElements) + assert.EqualValues(t, allocs, 0) +} + +func BenchmarkIterate(b *testing.B) { + setupAndBenchmark := func(b *testing.B, forceSingleItem bool, maxEntries int, numEntries int, batchSize int) { + if !forceSingleItem && !BatchAPISupported() { + b.Skip("Batch API not supported") + } + + m, err := NewGenericMap[uint32, uint32](&ebpf.MapSpec{ + Type: ebpf.Hash, + MaxEntries: uint32(maxEntries), + }) + require.NoError(b, err) + + for i := uint32(0); i < uint32(numEntries); i++ { + require.NoError(b, m.Put(&i, &i)) + } + + var benchName string + if forceSingleItem { + benchName = fmt.Sprintf("BenchmarkIterateSingleItem-%dentries-%dbatch", numEntries, batchSize) + batchSize = 1 + } else { + benchName = fmt.Sprintf("BenchmarkIterateBatch-%dentries-%dbatch", numEntries, batchSize) + } + + b.Run(benchName, func(b *testing.B) { + for i := 0; i < b.N; i++ { + var k uint32 + var v uint32 + + it := m.IterateWithBatchSize(batchSize) + for it.Next(&k, &v) { + } + } + }) + } + + batchSizes := []int{5, 10, 20, 50, 100} + for _, batchSize := range batchSizes { + for _, numEntries := range []int{100, 1000, 10000} { + if BatchAPISupported() { + setupAndBenchmark(b, false, numEntries, numEntries, batchSize) + } + + setupAndBenchmark(b, true, numEntries, numEntries, batchSize) + } + } +} + +func TestBatchDelete(t *testing.T) { + if !BatchAPISupported() { + t.Skip("Batch API not supported") + } + + m, err := NewGenericMap[uint32, uint32](&ebpf.MapSpec{ + Type: ebpf.Hash, + MaxEntries: 100, + }) + require.NoError(t, err) + + numsToPut := uint32(90) + for i := uint32(0); i < numsToPut; i++ { + require.NoError(t, m.Put(&i, &i)) + } + + numsToDelete := uint32(10) + toDelete := make([]uint32, numsToDelete) + for i := uint32(0); i < numsToDelete; i++ { + toDelete[i] = i + } + deleted, err := m.BatchDelete(toDelete) + require.NoError(t, err) + require.Equal(t, numsToDelete, uint32(deleted)) + + var k uint32 + var v uint32 + numElements := uint32(0) + foundElements := make(map[uint32]bool) + + it := m.IterateWithBatchSize(1) + require.NotNil(t, it) + require.IsType(t, &genericMapItemIterator[uint32, uint32]{}, it) + for it.Next(&k, &v) { + numElements++ + foundElements[k] = true + } + + require.Equal(t, numsToPut-numsToDelete, numElements) + for i := uint32(0); i < numsToPut; i++ { + require.Equal(t, foundElements[i], i >= numsToDelete) + } +} + +func TestBatchUpdate(t *testing.T) { + if !BatchAPISupported() { + t.Skip("Batch API not supported") + } + + m, err := NewGenericMap[uint32, uint32](&ebpf.MapSpec{ + Type: ebpf.Hash, + MaxEntries: 100, + }) + require.NoError(t, err) + + numsToCreate := uint32(90) + toCreateKeys := make([]uint32, numsToCreate) + toCreateValues := make([]uint32, numsToCreate) + for i := uint32(0); i < numsToCreate; i++ { + toCreateKeys[i] = i + toCreateValues[i] = i + } + updated, err := m.BatchUpdate(toCreateKeys, toCreateValues, nil) + require.NoError(t, err) + require.Equal(t, numsToCreate, uint32(updated)) + + var k uint32 + var v uint32 + numElements := uint32(0) + foundElements := make(map[uint32]bool) + + it := m.IterateWithBatchSize(1) + require.NotNil(t, it) + require.IsType(t, &genericMapItemIterator[uint32, uint32]{}, it) + for it.Next(&k, &v) { + numElements++ + foundElements[k] = true + } + + require.Equal(t, numsToCreate, numElements) + for i := uint32(0); i < numsToCreate; i++ { + require.True(t, foundElements[i]) + } +} diff --git a/pkg/ebpf/map_cleaner.go b/pkg/ebpf/map_cleaner.go index d1c9687f90c4b..724ec57ca5132 100644 --- a/pkg/ebpf/map_cleaner.go +++ b/pkg/ebpf/map_cleaner.go @@ -118,10 +118,10 @@ func (mc *MapCleaner[K, V]) cleanWithBatches(nowTS int64, shouldClean func(nowTS var keysToDelete []K totalCount, deletedCount := 0, 0 - var next K + var cursor cebpf.BatchCursor var n int for { - n, _ = mc.emap.BatchLookup(next, &next, mc.keyBatch, mc.valuesBatch, nil) + n, _ = mc.emap.BatchLookup(&cursor, mc.keyBatch, mc.valuesBatch, nil) if n == 0 { break }