Skip to content

Commit

Permalink
fix(ebpf): merge equal samples (#2788)
Browse files Browse the repository at this point in the history
  • Loading branch information
korniltsev authored Dec 1, 2023
1 parent a27512a commit 131c5e3
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 29 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ bin
/phlare
/pyroscope
/profilecli
/playground
# Test binary, built with `go test -c`
*.test
# Output of the go coverage tool, specifically when used with LiteIDE
Expand Down
9 changes: 6 additions & 3 deletions ebpf/cmd/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ var (
)

func main() {

config = getConfig()
metrics = ebpfmetrics.New(prometheus.DefaultRegisterer)

Expand Down Expand Up @@ -78,10 +77,14 @@ func main() {

func collectProfiles(profiles chan *pushv1.PushRequest) {
builders := pprof.NewProfileBuilders(int64(config.SampleRate))
err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32) {
err := session.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32, aggregation ebpfspy.SampleAggregation) {
labelsHash, labels := target.Labels()
builder := builders.BuilderForTarget(labelsHash, labels)
builder.AddSample(stack, value)
if aggregation == ebpfspy.SampleAggregated {
builder.CreateSample(stack, value)
} else {
builder.CreateSampleOrAddValue(stack, value)
}
})

if err != nil {
Expand Down
76 changes: 68 additions & 8 deletions ebpf/pprof/pprof.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package pprof
import (
"fmt"
"io"
"reflect"
"sync"
"time"
"unsafe"

"github.com/cespare/xxhash/v2"
"github.com/google/pprof/profile"
"github.com/klauspost/compress/gzip"
"github.com/prometheus/prometheus/model/labels"
Expand Down Expand Up @@ -39,9 +42,10 @@ func (b ProfileBuilders) BuilderForTarget(hash uint64, labels labels.Labels) *Pr
}

builder := &ProfileBuilder{
locations: make(map[string]*profile.Location),
functions: make(map[string]*profile.Function),
Labels: labels,
locations: make(map[string]*profile.Location),
functions: make(map[string]*profile.Function),
sampleHashToSample: make(map[uint64]*profile.Sample),
Labels: labels,
Profile: &profile.Profile{
Mapping: []*profile.Mapping{
{
Expand All @@ -53,20 +57,28 @@ func (b ProfileBuilders) BuilderForTarget(hash uint64, labels labels.Labels) *Pr
PeriodType: &profile.ValueType{Type: "cpu", Unit: "nanoseconds"},
TimeNanos: time.Now().UnixNano(),
},
hash: xxhash.New(),
tmpLocationIDs: make([]uint64, 0, 128),
}
res = builder
b.Builders[hash] = res
return res
}

type ProfileBuilder struct {
locations map[string]*profile.Location
functions map[string]*profile.Function
Profile *profile.Profile
Labels labels.Labels
locations map[string]*profile.Location
functions map[string]*profile.Function
sampleHashToSample map[uint64]*profile.Sample
Profile *profile.Profile
Labels labels.Labels

hash *xxhash.Digest
b [8]byte
tmpLocations []*profile.Location
tmpLocationIDs []uint64
}

func (p *ProfileBuilder) AddSample(stacktrace []string, value uint64) {
func (p *ProfileBuilder) CreateSample(stacktrace []string, value uint64) {
sample := &profile.Sample{
Value: []int64{int64(value) * p.Profile.Period},
}
Expand All @@ -77,6 +89,42 @@ func (p *ProfileBuilder) AddSample(stacktrace []string, value uint64) {
p.Profile.Sample = append(p.Profile.Sample, sample)
}

func (p *ProfileBuilder) CreateSampleOrAddValue(stacktrace []string, value uint64) {
scaledValue := int64(value) * p.Profile.Period
if cap(p.tmpLocations) < len(stacktrace) {
p.tmpLocations = make([]*profile.Location, 0, len(stacktrace))
} else {
p.tmpLocations = p.tmpLocations[:0]
}
if cap(p.tmpLocationIDs) < len(stacktrace) {
p.tmpLocationIDs = make([]uint64, 0, len(stacktrace))
} else {
p.tmpLocationIDs = p.tmpLocationIDs[:0]
}
for _, s := range stacktrace {
loc := p.addLocation(s)
p.tmpLocations = append(p.tmpLocations, loc)
p.tmpLocationIDs = append(p.tmpLocationIDs, loc.ID)
}
p.hash.Reset()
if _, err := p.hash.Write(uint64Bytes(p.tmpLocationIDs)); err != nil {
panic(err)
}
h := p.hash.Sum64()
sample := p.sampleHashToSample[h]
if sample != nil {
sample.Value[0] += scaledValue
return
}
sample = &profile.Sample{
Location: p.tmpLocations,
Value: []int64{scaledValue},
}
p.sampleHashToSample[h] = sample
p.tmpLocations = nil
p.Profile.Sample = append(p.Profile.Sample, sample)
}

func (p *ProfileBuilder) addLocation(function string) *profile.Location {
loc, ok := p.locations[function]
if ok {
Expand Down Expand Up @@ -131,3 +179,15 @@ func (p *ProfileBuilder) Write(dst io.Writer) (int64, error) {
}
return 0, nil
}

func uint64Bytes(s []uint64) []byte {
if len(s) == 0 {
return nil
}
var bs []byte
hdr := (*reflect.SliceHeader)(unsafe.Pointer(&bs))
hdr.Len = len(s) * 8
hdr.Cap = hdr.Len
hdr.Data = uintptr(unsafe.Pointer(&s[0]))
return bs
}
73 changes: 62 additions & 11 deletions ebpf/pprof/pprof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@ package pprof

import (
"bytes"
"fmt"
"strings"
"testing"
"time"

"github.com/google/pprof/profile"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand All @@ -17,22 +20,72 @@ func TestBackAndForth(t *testing.T) {
builders := NewProfileBuilders(97)

builder := builders.BuilderForTarget(1, labels.Labels{{Name: "foo", Value: "bar"}})
builder.AddSample([]string{"a", "b", "c"}, 239)
builder.AddSample([]string{"a", "b", "d"}, 4242)
builder.CreateSample([]string{"a", "b", "c"}, 239)
builder.CreateSample([]string{"a", "b", "d"}, 4242)

buf := bytes.NewBuffer(nil)
_, err := builder.Write(buf)
require.NoError(t, err)
assert.NoError(t, err)

rawProfile := buf.Bytes()

parsed, err := profile.Parse(bytes.NewBuffer(rawProfile))
require.NoError(t, err)
assert.NoError(t, err)
require.NotNil(t, parsed)
require.Equal(t, 2, len(parsed.Sample))
require.Equal(t, 4, len(parsed.Function))
require.Equal(t, 4, len(parsed.Location))
assert.Equal(t, 2, len(parsed.Sample))
assert.Equal(t, 4, len(parsed.Function))
assert.Equal(t, 4, len(parsed.Location))

stacks := stackCollapse(parsed)

assert.Equal(t, 239*period, stacks["a;b;c"])
assert.Equal(t, 4242*period, stacks["a;b;d"])
}

func TestMergeSamples(t *testing.T) {
const sampleRate = 97
period := time.Second.Nanoseconds() / int64(sampleRate)

builders := NewProfileBuilders(97)

builder := builders.BuilderForTarget(1, nil)
builder.CreateSampleOrAddValue([]string{"a", "b", "d"}, 4242)

for i := 0; i < 14; i++ {
builder.CreateSampleOrAddValue([]string{"a", "b", "c"}, 239)
}

var longStack []string
for i := 0; i < 512; i++ {
longStack = append(longStack, fmt.Sprintf("l_%d", i))
}
builder.CreateSampleOrAddValue(longStack, 3)
builder.CreateSampleOrAddValue([]string{"a", "b"}, 42)

assert.Equal(t, 4, len(builder.Profile.Sample))

buf := bytes.NewBuffer(nil)
_, err := builder.Write(buf)
assert.NoError(t, err)
rawProfile := buf.Bytes()

parsed, err := profile.Parse(bytes.NewBuffer(rawProfile))
assert.NoError(t, err)
require.NotNil(t, parsed)
assert.Equal(t, 4, len(parsed.Sample))
assert.Equal(t, 4+512, len(parsed.Function))
assert.Equal(t, 4+512, len(parsed.Location))

stacks := stackCollapse(parsed)

assert.Equal(t, 14*239*period, stacks["a;b;c"])
assert.Equal(t, 4242*period, stacks["a;b;d"])
assert.Equal(t, 42*period, stacks["a;b"])
assert.Equal(t, 3*period, stacks[strings.Join(longStack, ";")])
assert.Equal(t, 4, len(parsed.Sample))
}

func stackCollapse(parsed *profile.Profile) map[string]int64 {
stacks := map[string]int64{}
for _, sample := range parsed.Sample {
stack := ""
Expand All @@ -42,9 +95,7 @@ func TestBackAndForth(t *testing.T) {
}
stack += location.Line[0].Function.Name
}
stacks[stack] = sample.Value[0]
stacks[stack] += sample.Value[0]
}

require.Equal(t, 239*period, stacks["a;b;c"])
require.Equal(t, 4242*period, stacks["a;b;d"])
return stacks
}
2 changes: 1 addition & 1 deletion ebpf/python_ebpf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func compareProfiles(t *testing.T, l log.Logger, expected []byte, actual map[str
func collectProfiles(t *testing.T, l log.Logger, profiler Session) map[string]struct{} {
l = log.With(l, "component", "profiles")
profiles := map[string]struct{}{}
err := profiler.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32) {
err := profiler.CollectProfiles(func(target *sd.Target, stack []string, value uint64, pid uint32, _ SampleAggregation) {
lo.Reverse(stack)
sample := strings.Join(stack, ";")
profiles[sample] = struct{}{}
Expand Down
20 changes: 16 additions & 4 deletions ebpf/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,12 +43,24 @@ type SessionOptions struct {
SampleRate int
}

type SampleAggregation bool

var (
// SampleAggregated mean samples are accumulated in ebpf, no need to dedup these
SampleAggregated = SampleAggregation(true)
// SampleNotAggregated mean values are not accumulated in ebpf, but streamed to userspace with value=1
// TODO make consider aggregating python in ebpf as well
SampleNotAggregated = SampleAggregation(false)
)

type CollectProfilesCallback func(target *sd.Target, stack []string, value uint64, pid uint32, aggregation SampleAggregation)

type Session interface {
Start() error
Stop()
Update(SessionOptions) error
UpdateTargets(args sd.TargetsOptions)
CollectProfiles(f func(target *sd.Target, stack []string, value uint64, pid uint32)) error
CollectProfiles(f CollectProfilesCallback) error
DebugInfo() interface{}
}

Expand Down Expand Up @@ -225,7 +237,7 @@ func (s *session) UpdateTargets(args sd.TargetsOptions) {
}
}

func (s *session) CollectProfiles(cb func(t *sd.Target, stack []string, value uint64, pid uint32)) error {
func (s *session) CollectProfiles(cb CollectProfilesCallback) error {
s.mutex.Lock()
defer s.mutex.Unlock()

Expand Down Expand Up @@ -257,7 +269,7 @@ func (s *session) DebugInfo() interface{} {
}
}

func (s *session) collectRegularProfile(cb func(t *sd.Target, stack []string, value uint64, pid uint32)) error {
func (s *session) collectRegularProfile(cb CollectProfilesCallback) error {
sb := &stackBuilder{}

keys, values, batch, err := s.getCountsMapValues()
Expand Down Expand Up @@ -315,7 +327,7 @@ func (s *session) collectRegularProfile(cb func(t *sd.Target, stack []string, va
continue // only comm
}
lo.Reverse(sb.stack)
cb(labels, sb.stack, uint64(value), ck.Pid)
cb(labels, sb.stack, uint64(value), ck.Pid, SampleAggregated)
s.collectMetrics(labels, &stats, sb)
}

Expand Down
4 changes: 2 additions & 2 deletions ebpf/session_python.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"github.com/samber/lo"
)

func (s *session) collectPythonProfile(cb func(t *sd.Target, stack []string, value uint64, pid uint32)) error {
func (s *session) collectPythonProfile(cb CollectProfilesCallback) error {
if s.pyperf == nil {
return nil
}
Expand Down Expand Up @@ -99,7 +99,7 @@ func (s *session) collectPythonProfile(cb func(t *sd.Target, stack []string, val
continue // only comm .. todo skip with an option
}
lo.Reverse(sb.stack)
cb(labels, sb.stack, uint64(1), event.Pid)
cb(labels, sb.stack, uint64(1), event.Pid, SampleNotAggregated)
s.collectMetrics(labels, &stats, sb)
}
if stacktraceErrors > 0 {
Expand Down

0 comments on commit 131c5e3

Please sign in to comment.