From d326bfb295a7c76347e5866bb349b9fb8e6fe7b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Felix=20Geisend=C3=B6rfer?= Date: Wed, 18 Aug 2021 00:28:52 +0200 Subject: [PATCH] profiler: Implement Delta Profiles (#842) This patch implements delta profiles which will allow us to enable allocation, mutex and block profiles in the aggregation and comparison views of our web ui. The internal google doc "RFC: Go Profiling: Delta Profiles" describes this in great detail. To simplify the code, a refactoring was done that attempts to increase code sharing between similar profile types, in particular heap, mutex, block and goroutine profiles (see type profileType). Additionally the new profiler.enabledProfileTypes() method ensures that profiles are collected in a deterministic order. Testing is accomplished by the new pprofutils package which allows converting profiles between protobuf and a simplified text format. Since that package also contains a suitable delta profiling implementation, it's used for the delta profiling itself as well. In this iteration, delta profiles are uploaded in addition to the original profiles using a "delta-" prefix, e.g. "delta-mutex.pprof". This is done to avoid breaking things until the backend has made corresponding changes as well. The plan for the next iteration is to stop uploading the original profiles since they are redundant and a waste of bandwidth and storage. One particular complexity worth noting is that the "delta-heap.pprof" contains 2 profiles alloc_ and inuse_. Only the alloc_ sample types are subject to delta computation, the inuse_ ones are kept as-is since they describe the current state of the heap's live set. 
--- go.mod | 2 +- go.sum | 3 + profiler/internal/pprofutils/README.md | 7 + profiler/internal/pprofutils/delta.go | 66 ++++ profiler/internal/pprofutils/delta_test.go | 85 ++++ profiler/internal/pprofutils/pprofutils.go | 13 + profiler/internal/pprofutils/protobuf.go | 67 ++++ profiler/internal/pprofutils/protobuf_test.go | 59 +++ .../test-fixtures/pprof.lines.pb.gz | Bin 0 -> 940 bytes .../test-fixtures/pprof.samples.cpu.001.pb.gz | Bin 0 -> 1298 bytes profiler/internal/pprofutils/text.go | 118 ++++++ profiler/internal/pprofutils/text_test.go | 57 +++ profiler/profile.go | 368 ++++++++++-------- profiler/profile_test.go | 252 +++++++++--- profiler/profiler.go | 56 ++- profiler/profiler_test.go | 19 +- 16 files changed, 938 insertions(+), 234 deletions(-) create mode 100644 profiler/internal/pprofutils/README.md create mode 100644 profiler/internal/pprofutils/delta.go create mode 100644 profiler/internal/pprofutils/delta_test.go create mode 100644 profiler/internal/pprofutils/pprofutils.go create mode 100644 profiler/internal/pprofutils/protobuf.go create mode 100644 profiler/internal/pprofutils/protobuf_test.go create mode 100644 profiler/internal/pprofutils/test-fixtures/pprof.lines.pb.gz create mode 100644 profiler/internal/pprofutils/test-fixtures/pprof.samples.cpu.001.pb.gz create mode 100644 profiler/internal/pprofutils/text.go create mode 100644 profiler/internal/pprofutils/text_test.go diff --git a/go.mod b/go.mod index ed0aa6f643..08aac6dbb8 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,6 @@ require ( github.com/DataDog/datadog-go v4.4.0+incompatible github.com/DataDog/gostackparse v0.5.0 github.com/DataDog/sketches-go v1.0.0 - github.com/google/pprof v0.0.0-20210125172800-10e9aeb4a998 + github.com/google/pprof v0.0.0-20210423192551-a2663126120b github.com/tinylib/msgp v1.1.2 ) diff --git a/go.sum b/go.sum index a073f41034..b02926739f 100644 --- a/go.sum +++ b/go.sum @@ -259,6 +259,8 @@ github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod 
h1:ZgVRPoUq/hf github.com/google/pprof v0.0.0-20200430221834-fc25d7d30c6d/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200708004538-1a94d8640e99/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20210125172800-10e9aeb4a998/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20210423192551-a2663126120b h1:l2YRhr+YLzmSp7KJMswRVk/lO5SwoFIcCLzJsVj+YPc= +github.com/google/pprof v0.0.0-20210423192551-a2663126120b/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.1.2 h1:EVhdT+1Kseyi1/pUmXKaFxYsDNy9RQYkMWRH68J/W7Y= @@ -539,6 +541,7 @@ github.com/stretchr/testify v1.6.0 h1:jlIyCplCJFULU/01vCkhKuTyc3OorI3bJFuw6obfgh github.com/stretchr/testify v1.6.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/syndtr/goleveldb v1.0.0 h1:fBdIW9lB4Iz0n9khmH8w27SJ3QEJ7+IgjPEwGSZiFdE= github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ= diff --git a/profiler/internal/pprofutils/README.md b/profiler/internal/pprofutils/README.md new file mode 100644 index 0000000000..cf3eb11826 --- /dev/null +++ b/profiler/internal/pprofutils/README.md @@ -0,0 +1,7 @@ +# pprofutils + +Internal fork of https://github.com/felixge/pprofutils stripped to only include +essential code and tests. It's used for delta profiles as well as testing. 
+ +It'd be nice to keep this in sync with upstream, but no worries if not. We just +need the delta profile stuff to work. diff --git a/profiler/internal/pprofutils/delta.go b/profiler/internal/pprofutils/delta.go new file mode 100644 index 0000000000..1326db36b7 --- /dev/null +++ b/profiler/internal/pprofutils/delta.go @@ -0,0 +1,66 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2021 Datadog, Inc. + +package pprofutils + +import ( + "errors" + + "github.com/google/pprof/profile" +) + +// Delta describes how to compute the delta between two profiles and implements +// the conversion. +type Delta struct { + // SampleTypes limits the delta calculation to the given sample types. Other + // sample types will retain the values of profile b. The defined sample types + // must exist in the profile, otherwise derivation will fail with an error. + // If the slice is empty, all sample types are subject to delta profile + // derivation. + // + // The use case for this is to deal with the heap profile which + // contains alloc and inuse sample types, but delta profiling makes no sense + // for the latter. + SampleTypes []ValueType +} + +// Convert computes the delta between all values b-a and returns them as a new +// profile. Samples that end up with a delta of 0 are dropped. WARNING: Profile +// a will be mutated by this function. You should pass a copy if that's +// undesirable. +func (d Delta) Convert(a, b *profile.Profile) (*profile.Profile, error) { + ratios := make([]float64, len(a.SampleType)) + + found := 0 + for i, st := range a.SampleType { + // Empty c.SampleTypes means we calculate the delta for every st + if len(d.SampleTypes) == 0 { + ratios[i] = -1 + continue + } + + // Otherwise we only calculate the delta for any st that is listed in + // c.SampleTypes.
st's not listed in there will default to ratio 0, which + // means we delete them from pa, so only the pb values remain in the final + // profile. + for _, deltaSt := range d.SampleTypes { + if deltaSt.Type == st.Type && deltaSt.Unit == st.Unit { + ratios[i] = -1 + found++ + } + } + } + if found != len(d.SampleTypes) { + return nil, errors.New("one or more sample type(s) was not found in the profile") + } + + a.ScaleN(ratios) + + delta, err := profile.Merge([]*profile.Profile{a, b}) + if err != nil { + return nil, err + } + return delta, delta.CheckValid() +} diff --git a/profiler/internal/pprofutils/delta_test.go b/profiler/internal/pprofutils/delta_test.go new file mode 100644 index 0000000000..c922aa2847 --- /dev/null +++ b/profiler/internal/pprofutils/delta_test.go @@ -0,0 +1,85 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2021 Datadog, Inc. 
+ +package pprofutils + +import ( + "bytes" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestDelta(t *testing.T) { + t.Run("simple", func(t *testing.T) { + var deltaText bytes.Buffer + + profA, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(` +main;foo 5 +main;foo;bar 3 +main;foobar 4 +`))) + require.NoError(t, err) + + profB, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(` +main;foo 8 +main;foo;bar 3 +main;foobar 5 +`))) + require.NoError(t, err) + + delta, err := Delta{}.Convert(profA, profB) + require.NoError(t, err) + + require.NoError(t, Protobuf{}.Convert(delta, &deltaText)) + require.Equal(t, deltaText.String(), strings.TrimSpace(` +main;foo 3 +main;foobar 1 +`)+"\n") + }) + + t.Run("sampleTypes", func(t *testing.T) { + profA, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(` +x/count y/count +main;foo 5 10 +main;foo;bar 3 6 +main;foo;baz 9 0 +main;foobar 4 8 +`))) + require.NoError(t, err) + + profB, err := Text{}.Convert(strings.NewReader(strings.TrimSpace(` +x/count y/count +main;foo 8 16 +main;foo;bar 3 6 +main;foo;baz 9 0 +main;foobar 5 10 +`))) + require.NoError(t, err) + + t.Run("happyPath", func(t *testing.T) { + var deltaText bytes.Buffer + + deltaConfig := Delta{SampleTypes: []ValueType{{Type: "x", Unit: "count"}}} + delta, err := deltaConfig.Convert(profA, profB) + require.NoError(t, err) + + require.NoError(t, Protobuf{SampleTypes: true}.Convert(delta, &deltaText)) + require.Equal(t, deltaText.String(), strings.TrimSpace(` +x/count y/count +main;foo 3 16 +main;foobar 1 10 +main;foo;bar 0 6 +`)+"\n") + }) + + t.Run("unknownSampleType", func(t *testing.T) { + deltaConfig := Delta{SampleTypes: []ValueType{{Type: "foo", Unit: "count"}}} + _, err := deltaConfig.Convert(profA, profB) + require.Equal(t, "one or more sample type(s) was not found in the profile", err.Error()) + }) + }) +} diff --git a/profiler/internal/pprofutils/pprofutils.go b/profiler/internal/pprofutils/pprofutils.go new 
file mode 100644 index 0000000000..114c014eef --- /dev/null +++ b/profiler/internal/pprofutils/pprofutils.go @@ -0,0 +1,13 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2021 Datadog, Inc. + +// Package pprofutils is a fork of github.com/felixge/pprofutils, see README. +package pprofutils + +// ValueType describes the type and unit of a value. +type ValueType struct { + Type string + Unit string +} diff --git a/profiler/internal/pprofutils/protobuf.go b/profiler/internal/pprofutils/protobuf.go new file mode 100644 index 0000000000..9aed70915b --- /dev/null +++ b/profiler/internal/pprofutils/protobuf.go @@ -0,0 +1,67 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2021 Datadog, Inc. + +package pprofutils + +import ( + "bufio" + "fmt" + "io" + "sort" + "strings" + + "github.com/google/pprof/profile" +) + +// Protobuf converts from pprof's protobuf to folded text format. +type Protobuf struct { + // SampleTypes causes the text output to begin with a header line listing + // the sample types found in the profile. This is a custom extension to the + // folded text format. + SampleTypes bool +} + +// Convert marshals the given protobuf profile into folded text format. 
+func (p Protobuf) Convert(protobuf *profile.Profile, text io.Writer) error { + w := bufio.NewWriter(text) + if p.SampleTypes { + var sampleTypes []string + for _, sampleType := range protobuf.SampleType { + sampleTypes = append(sampleTypes, sampleType.Type+"/"+sampleType.Unit) + } + w.WriteString(strings.Join(sampleTypes, " ") + "\n") + } + if err := protobuf.Aggregate(true, true, false, false, false); err != nil { + return err + } + protobuf = protobuf.Compact() + sort.Slice(protobuf.Sample, func(i, j int) bool { + return protobuf.Sample[i].Value[0] > protobuf.Sample[j].Value[0] + }) + for _, sample := range protobuf.Sample { + var frames []string + for i := range sample.Location { + loc := sample.Location[len(sample.Location)-i-1] + for j := range loc.Line { + line := loc.Line[len(loc.Line)-j-1] + frames = append(frames, line.Function.Name) + } + } + var values []string + for _, val := range sample.Value { + values = append(values, fmt.Sprintf("%d", val)) + if !p.SampleTypes { + break + } + } + fmt.Fprintf( + w, + "%s %s\n", + strings.Join(frames, ";"), + strings.Join(values, " "), + ) + } + return w.Flush() +} diff --git a/profiler/internal/pprofutils/protobuf_test.go b/profiler/internal/pprofutils/protobuf_test.go new file mode 100644 index 0000000000..5cac0f240c --- /dev/null +++ b/profiler/internal/pprofutils/protobuf_test.go @@ -0,0 +1,59 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2021 Datadog, Inc. 
+ +package pprofutils + +import ( + "bytes" + "io/ioutil" + "path/filepath" + "strings" + "testing" + + "github.com/google/pprof/profile" + "github.com/stretchr/testify/require" +) + +func TestProtobufConvert(t *testing.T) { + t.Run("basic", func(t *testing.T) { + data, err := ioutil.ReadFile(filepath.Join("test-fixtures", "pprof.samples.cpu.001.pb.gz")) + require.NoError(t, err) + + proto, err := profile.Parse(bytes.NewReader(data)) + require.NoError(t, err) + + out := bytes.Buffer{} + require.NoError(t, Protobuf{}.Convert(proto, &out)) + want := strings.TrimSpace(` +golang.org/x/sync/errgroup.(*Group).Go.func1;main.run.func2;main.computeSum 19 +runtime.mcall;runtime.park_m;runtime.resetForSleep;runtime.resettimer;runtime.modtimer;runtime.wakeNetPoller;runtime.netpollBreak;runtime.write;runtime.write1 7 +golang.org/x/sync/errgroup.(*Group).Go.func1;main.run.func2;main.computeSum;runtime.asyncPreempt 5 +runtime.mstart;runtime.mstart1;runtime.sysmon;runtime.usleep 3 +runtime.mcall;runtime.park_m;runtime.schedule;runtime.findrunnable;runtime.stopm;runtime.notesleep;runtime.semasleep;runtime.pthread_cond_wait 2 +runtime.mcall;runtime.gopreempt_m;runtime.goschedImpl;runtime.schedule;runtime.findrunnable;runtime.stopm;runtime.notesleep;runtime.semasleep;runtime.pthread_cond_wait 1 +runtime.mcall;runtime.park_m;runtime.schedule;runtime.findrunnable;runtime.checkTimers;runtime.nanotime;runtime.nanotime1 1 +`) + "\n" + require.Equal(t, out.String(), want) + }) + + t.Run("differentLinesPerFunction", func(t *testing.T) { + data, err := ioutil.ReadFile(filepath.Join("test-fixtures", "pprof.lines.pb.gz")) + require.NoError(t, err) + + proto, err := profile.Parse(bytes.NewReader(data)) + require.NoError(t, err) + + out := bytes.Buffer{} + require.NoError(t, Protobuf{}.Convert(proto, &out)) + want := strings.TrimSpace(` +main.run.func1;main.threadKind.Run;main.goGo1;main.goHog 85 +main.run.func1;main.threadKind.Run;main.goGo2;main.goHog 78 
+main.run.func1;main.threadKind.Run;main.goGo3;main.goHog 72 +main.run.func1;main.threadKind.Run;main.goGo0;main.goHog 72 +main.run.func1;main.threadKind.Run;main.goGo0;main.goHog;runtime.asyncPreempt 1 +`) + "\n" + require.Equal(t, out.String(), want) + }) +} diff --git a/profiler/internal/pprofutils/test-fixtures/pprof.lines.pb.gz b/profiler/internal/pprofutils/test-fixtures/pprof.lines.pb.gz new file mode 100644 index 0000000000000000000000000000000000000000..1b4edefa7b92c0e8ff46fd8c7f597db833dab197 GIT binary patch literal 940 zcmV;d15^ATiwFP!00004|9p{4h~3m5$N#xE_ueyUXXgB;(|RVGUM`|nadMKA$0QrK z9T21l-B`-Zc_oe9B$qtOba%im1Q&Hv7lJT573{_->axiASPP;k%Ebrbq6<;%ViuKT zS{xTPo9~a`_w#$4U$zfF|K#Hj{`%&NbA}Ey*fVs{hx)}ke}4Lg{&E-Zs@p&669_c) z^Y;h6$pC0jX;6C>01Xb%U_6|xf*v~j_MkV>K@YwHTdN00Xw+GKbnmN!*6PFEuBs0K z0-dF!`#*G+4xjHV9WJ4z@o;SgUV`fPcW(6!UF-Gr$toD28{c*40A?LJfCe%;^hKyX zy#Ghf(6vE_4$gcmCTXfi=BXr|Q3fJ8%EB zR@=fe_PS{T9jd#pw<)9+H(MR6g*H8FpEEbW04jB>%^|gTyVbE8PoJ=nxekUPGmhM@JOe>j-LYA>CaZ=!_RA6(LBLPfvb|^ zvdQZ-lXyNaix}6kss*hnW+|;|S=BhBX>KJ2F*2IA^DYz)Ig_=s>#L=t;)OI9)=N!J zR=Q-_ns*e2^`leg0?T3yfi#Zt;_UXy2VQQu zXeI*Bs>l@A*&HXuCTr4}@cqDdNaT8$*|vk_p2Y0oJIIs>B0@=G&*qLRs6#^G5h;a; zV~08-id{kl_XF}2?#8DK?}fom;BC&*yxH4K@&?aR#y5%OI#yMXsU1ZuBE)B$*}`@r z5sAPKLyt*6bR}~g#{AgxBzYP?u4BUaPGwbv<&ddInQ&Z}^U#*I!Gz!n&=l|HpRS(E55wgD6(5P{ z#*m;3anTmlcX~QqRky5%47)KBS*SdM3W>zT5Q7U;FrWbqaREEyLK783j2Jh@6>cP& zfVsD4x+NNuaZ~l(FXw#cJLlZRO~rsQ=% zT=@9h8Yw(FHa>w-TsU+lGZq7~gJ@YGNa)?~ZX`8nX`_~f2on0sO+`d-wU&h_5_Wg&)yetP;8XfcRkLd!xN3B9wVv0g(MLPAfz^g3v9h~t}4Eei=G^z7+ZWX=%o z)DQp(eeshQWJm%p5)C1Vg#LKrh-3f{W;KKq5_;w3qmoJD#e#;AMnXS)TQMp8y{I7! zBcazXC?<^`lr@AAB=pNmiW$bwC6hryFJE~>G9!2;q9J6F&|mMqCYcPLifCm(B=q2W zN2S3m7PL4ZUMpz`IVAL_-;VS=+{$SPc_j44Ii;V&(^9{Hgnn~Jspau-L@Pm2_HshC zTfkLX42t-ahEPI6KRT&WOL$#2RF?8*)TGMzeN?MJ1qr=+STPkm7SrND@tT|uMM9rH zzgYLAtUHQ?9_`f~#Y3{MF*)W-$0Rd`7uIPA<4EY4^UD7?zM9k!CXmo;?<-~kZ$S-! 
zV4TLQLp6X1CTK#TC<29QvIY>t6irprHGnt{)8Xn!4PXc}G*ivi01}84tGOBgV4miy zg&IH-i?pav3QM%4P#Vj$tk5u4XhoqBq?A@iYXBJ>qhm@ri{o^>I#B~aTuE1|xE!vc zs}#!PYPwp*6|hRHDz1oY=$fil11RBIx>m)NaUES(U0(yJ;0C%uA&ML6MukRk6Wyd# z#&9#;tTM*2Mr#U9;1;^2T3{4HQkV<9m5emH-*#kS=Kf>PTdr? z8wPw&XtSsFCw=bu`Yg9P^Yu3G_}pm;?#*&fZwuY`OugL+=E8k4sTbAedRt7oo|x@e z+?#Ynz2OLx9p4aw7=+-yY zcFF(d`YxfOrYi_r#+oGlaMe~8@8udoKG07UNF~u6wY%ANw8k3Eg=^fMN%*<9} ztI;nzsNWW>=!H(uv3cE)GP^y_Z8u;$_0acpOPGeGKg=!5P}QgEQ`_p>b*r-DjgqvTiWvaii6gt7z^wIsvL4KN!m^Hf6 z`JfAPj+@;lIyU!wmg(Ou!|OKf!Fj*e3Ha2YOLZQM(cytBEbAeU8(sF`a-A(^H@nY< zJXGdN{W?aMKhA^Q!m_wGn1U@@^6xGG1}rX)<(H&9?(?80R=S6KM0k7DmF*89^zS|U z2Yg#NEZcYG2Zk3+4FY<^qwkz}>x;`5j-M^>zH|KcSLwa#-Sdomy0|O;2mk>8|Fz@1 IkP8U_01TgehyVZp literal 0 HcmV?d00001 diff --git a/profiler/internal/pprofutils/text.go b/profiler/internal/pprofutils/text.go new file mode 100644 index 0000000000..acdc8b84d2 --- /dev/null +++ b/profiler/internal/pprofutils/text.go @@ -0,0 +1,118 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2021 Datadog, Inc. + +package pprofutils + +import ( + "fmt" + "io" + "io/ioutil" + "strconv" + "strings" + "time" + + "github.com/google/pprof/profile" +) + +// Text converts from folded text to protobuf format. +type Text struct{} + +// Convert parses the given text and returns it as protobuf profile. +func (c Text) Convert(text io.Reader) (*profile.Profile, error) { + var ( + functionID = uint64(1) + locationID = uint64(1) + p = &profile.Profile{ + TimeNanos: time.Now().UnixNano(), + SampleType: []*profile.ValueType{{ + Type: "samples", + Unit: "count", + }}, + // Without his, Delta.Convert() fails in profile.Merge(). Perhaps an + // issue that's worth reporting upstream. 
+ PeriodType: &profile.ValueType{}, + } + ) + + m := &profile.Mapping{ID: 1, HasFunctions: true} + p.Mapping = []*profile.Mapping{m} + + lines, err := ioutil.ReadAll(text) + if err != nil { + return nil, err + } + for n, line := range strings.Split(string(lines), "\n") { + if strings.TrimSpace(line) == "" { + continue + } + + // custom extension: first line can contain header that looks like this: + // "samples/count duration/nanoseconds" to describe the sample types + if n == 0 && looksLikeHeader(line) { + p.SampleType = nil + for _, sampleType := range strings.Split(line, " ") { + parts := strings.Split(sampleType, "/") + if len(parts) != 2 { + return nil, fmt.Errorf("bad header: %d: %q", n, line) + } + p.SampleType = append(p.SampleType, &profile.ValueType{ + Type: parts[0], + Unit: parts[1], + }) + } + continue + } + + parts := strings.Split(line, " ") + if len(parts) != len(p.SampleType)+1 { + return nil, fmt.Errorf("bad line: %d: %q", n, line) + } + + stack := strings.Split(parts[0], ";") + sample := &profile.Sample{} + for _, valS := range parts[1:] { + val, err := strconv.ParseInt(valS, 10, 64) + if err != nil { + return nil, fmt.Errorf("bad line: %d: %q: %s", n, line, err) + } + sample.Value = append(sample.Value, val) + } + + for i := range stack { + frame := stack[len(stack)-i-1] + function := &profile.Function{ + ID: functionID, + Name: frame, + } + p.Function = append(p.Function, function) + functionID++ + + location := &profile.Location{ + ID: locationID, + Mapping: m, + Line: []profile.Line{{Function: function}}, + } + p.Location = append(p.Location, location) + locationID++ + + sample.Location = append(sample.Location, location) + } + + p.Sample = append(p.Sample, sample) + } + return p, p.CheckValid() +} + +// looksLikeHeader returns true if the line looks like this: +// "samples/count duration/nanoseconds". The heuristic used for detecting this +// is to check if every space separated value contains a "/" character. 
+func looksLikeHeader(line string) bool { + for _, sampleType := range strings.Split(line, " ") { + if !strings.Contains(sampleType, "/") { + return false + } + } + return true +} diff --git a/profiler/internal/pprofutils/text_test.go b/profiler/internal/pprofutils/text_test.go new file mode 100644 index 0000000000..fc5a8453d2 --- /dev/null +++ b/profiler/internal/pprofutils/text_test.go @@ -0,0 +1,57 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2021 Datadog, Inc. + +package pprofutils + +import ( + "bytes" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTextConvert(t *testing.T) { + t.Run("simple", func(t *testing.T) { + textIn := strings.TrimSpace(` +main;foo 5 +main;foobar 4 +main;foo;bar 3 +`) + proto, err := Text{}.Convert(strings.NewReader(textIn)) + require.NoError(t, err) + textOut := bytes.Buffer{} + require.NoError(t, Protobuf{}.Convert(proto, &textOut)) + require.Equal(t, textIn+"\n", textOut.String()) + }) + + t.Run("headerWithOneSampleType", func(t *testing.T) { + textIn := strings.TrimSpace(` +samples/count +main;foo 5 +main;foobar 4 +main;foo;bar 3 + `) + proto, err := Text{}.Convert(strings.NewReader(textIn)) + require.NoError(t, err) + textOut := bytes.Buffer{} + require.NoError(t, Protobuf{SampleTypes: true}.Convert(proto, &textOut)) + require.Equal(t, textIn+"\n", textOut.String()) + }) + + t.Run("headerWithMultipleSampleTypes", func(t *testing.T) { + textIn := strings.TrimSpace(` +samples/count duration/nanoseconds +main;foo 5 50000000 +main;foobar 4 40000000 +main;foo;bar 3 30000000 + `) + proto, err := Text{}.Convert(strings.NewReader(textIn)) + require.NoError(t, err) + textOut := bytes.Buffer{} + require.NoError(t, Protobuf{SampleTypes: true}.Convert(proto, &textOut)) + require.Equal(t, textIn+"\n", textOut.String()) + }) +} diff 
--git a/profiler/profile.go b/profiler/profile.go index bb32323c71..cbde48b0bd 100644 --- a/profiler/profile.go +++ b/profiler/profile.go @@ -16,6 +16,7 @@ import ( "github.com/DataDog/gostackparse" pprofile "github.com/google/pprof/profile" + "gopkg.in/DataDog/dd-trace-go.v1/profiler/internal/pprofutils" ) // ProfileType represents a type of profile that the profiler is able to run. @@ -45,48 +46,139 @@ const ( MetricsProfile ) -func (t ProfileType) String() string { - switch t { - case HeapProfile: - return "heap" - case CPUProfile: - return "cpu" - case MutexProfile: - return "mutex" - case BlockProfile: - return "block" - case GoroutineProfile: - return "goroutine" - case expGoroutineWaitProfile: - return "goroutinewait" - case MetricsProfile: - return "metrics" - default: - return "unknown" +// profileType holds the implementation details of a ProfileType. +type profileType struct { + // Type gets populated automatically by ProfileType.lookup(). + Type ProfileType + // Name specifies the profile name as used with pprof.Lookup(name) (in + // collectGenericProfile) and returned by ProfileType.String(). For profile + // types that don't use this approach (e.g. CPU) the name isn't used for + // anything. + Name string + // Filename is the filename used for uploading the profile to the datadog + // backend which is aware of them. Delta profiles are prefixed with "delta-" + // automatically. In theory this could be derived from the Name field, but + // this isn't done due to idiosyncratic filename used by the + // GoroutineProfile. + Filename string + // Delta controls if this profile should be generated as a delta profile. + // This is useful for profiles that represent samples collected over the + // lifetime of the process (i.e. heap, block, mutex). If nil, no delta + // profile is generated. + Delta *pprofutils.Delta + // Collect collects the given profile and returns the data for it. Most + // profiles will be in pprof format, i.e.
gzip compressed proto buf data. + Collect func(profileType, *profiler) ([]byte, error) +} + +// profileTypes maps every ProfileType to its implementation. +var profileTypes = map[ProfileType]profileType{ + CPUProfile: { + Name: "cpu", + Filename: "cpu.pprof", + Collect: func(_ profileType, p *profiler) ([]byte, error) { + var buf bytes.Buffer + if err := startCPUProfile(&buf); err != nil { + return nil, err + } + p.interruptibleSleep(p.cfg.cpuDuration) + stopCPUProfile() + return buf.Bytes(), nil + }, + }, + // HeapProfile is complex due to how the Go runtime exposes it. It contains 4 + // sample types alloc_objects/count, alloc_space/bytes, inuse_objects/count, + // inuse_space/bytes. The first two represent allocations over the lifetime + // of the process, so we do delta profiling for them. The last two are + // snapshots of the current heap state, so we leave them as-is. + HeapProfile: { + Name: "heap", + Filename: "heap.pprof", + Delta: &pprofutils.Delta{SampleTypes: []pprofutils.ValueType{ + {Type: "alloc_objects", Unit: "count"}, + {Type: "alloc_space", Unit: "bytes"}, + }}, + Collect: collectGenericProfile, + }, + MutexProfile: { + Name: "mutex", + Filename: "mutex.pprof", + Delta: &pprofutils.Delta{}, + Collect: collectGenericProfile, + }, + BlockProfile: { + Name: "block", + Filename: "block.pprof", + Delta: &pprofutils.Delta{}, + Collect: collectGenericProfile, + }, + GoroutineProfile: { + Name: "goroutine", + Filename: "goroutines.pprof", + Collect: collectGenericProfile, + }, + expGoroutineWaitProfile: { + Name: "goroutinewait", + Filename: "goroutineswait.pprof", + Collect: func(t profileType, p *profiler) ([]byte, error) { + if n := runtime.NumGoroutine(); n > p.cfg.maxGoroutinesWait { + return nil, fmt.Errorf("skipping goroutines wait profile: %d goroutines exceeds DD_PROFILING_WAIT_PROFILE_MAX_GOROUTINES limit of %d", n, p.cfg.maxGoroutinesWait) + } + + var ( + now = now() + text = &bytes.Buffer{} + pprof = &bytes.Buffer{} + ) + if err := 
lookupProfile(t.Name, text, 2); err != nil { + return nil, err + } + err := goroutineDebug2ToPprof(text, pprof, now) + return pprof.Bytes(), err + }, + }, + MetricsProfile: { + Name: "metrics", + Filename: "metrics.json", + Collect: func(_ profileType, p *profiler) ([]byte, error) { + var buf bytes.Buffer + err := p.met.report(now(), &buf) + return buf.Bytes(), err + }, + }, +} + +func collectGenericProfile(t profileType, _ *profiler) ([]byte, error) { + var buf bytes.Buffer + err := lookupProfile(t.Name, &buf, 0) + return buf.Bytes(), err +} + +// lookup returns t's profileType implementation. +func (t ProfileType) lookup() profileType { + c, ok := profileTypes[t] + if ok { + c.Type = t + return c } + return profileType{ + Type: t, + Name: "unknown", + Filename: "unknown", + Collect: func(_ profileType, _ *profiler) ([]byte, error) { + return nil, errors.New("profile type not implemented") + }, + } +} + +// String returns the name of the profile. +func (t ProfileType) String() string { + return t.lookup().Name } // Filename is the identifier used on upload. 
func (t ProfileType) Filename() string { - // There are subtle differences between the root and String() (see GoroutineProfile) - switch t { - case HeapProfile: - return "heap.pprof" - case CPUProfile: - return "cpu.pprof" - case MutexProfile: - return "mutex.pprof" - case BlockProfile: - return "block.pprof" - case GoroutineProfile: - return "goroutines.pprof" - case expGoroutineWaitProfile: - return "goroutineswait.pprof" - case MetricsProfile: - return "metrics.json" - default: - return "unknown" - } + return t.lookup().Filename } // Tag used on profile metadata @@ -113,42 +205,80 @@ func (b *batch) addProfile(p *profile) { b.profiles = append(b.profiles, p) } -func (p *profiler) runProfile(t ProfileType) (*profile, error) { - switch t { - case HeapProfile: - return heapProfile(p.cfg) - case CPUProfile: - return p.cpuProfile() - case MutexProfile: - return mutexProfile(p.cfg) - case BlockProfile: - return blockProfile(p.cfg) - case GoroutineProfile: - return goroutineProfile(p.cfg) - case expGoroutineWaitProfile: - return goroutineWaitProfile(p.cfg) - case MetricsProfile: - return p.collectMetrics() - default: - return nil, errors.New("profile type not implemented") - } -} - -// writeHeapProfile writes the heap profile; replaced in tests -var writeHeapProfile = pprof.WriteHeapProfile - -func heapProfile(cfg *config) (*profile, error) { - var buf bytes.Buffer +func (p *profiler) runProfile(pt ProfileType) ([]*profile, error) { start := now() - if err := writeHeapProfile(&buf); err != nil { + t := pt.lookup() + // Collect the original profile as-is. + data, err := t.Collect(t, p) + if err != nil { return nil, err } + profs := []*profile{{ + name: t.Filename, + data: data, + }} + // Compute the deltaProf (will be nil if not enabled for this profile type). + deltaStart := time.Now() + deltaProf, err := p.deltaProfile(t, data) + if err != nil { + return nil, fmt.Errorf("delta profile error: %s", err) + } + // Report metrics and append deltaProf if not nil. 
end := now() - tags := append(cfg.tags, HeapProfile.Tag()) - cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) + tags := append(p.cfg.tags, pt.Tag()) + // TODO(fg) stop uploading non-delta profiles in the next version of + // dd-trace-go after delta profiles are released. + if deltaProf != nil { + profs = append(profs, deltaProf) + p.cfg.statsd.Timing("datadog.profiler.go.delta_time", end.Sub(deltaStart), tags, 1) + } + p.cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) + return profs, nil +} + +// deltaProfile derives the delta profile between curData and the previous +// profile. For profile types that don't have delta profiling enabled, it +// simply returns nil, nil. +func (p *profiler) deltaProfile(t profileType, curData []byte) (*profile, error) { + // Not all profile types use delta profiling, return nil if this one doesn't. + if t.Delta == nil { + return nil, nil + } + curProf, err := pprofile.ParseData(curData) + if err != nil { + return nil, fmt.Errorf("delta prof parse: %v", err) + } + var deltaData []byte + if prevProf := p.prev[t.Type]; prevProf == nil { + // First time deltaProfile gets called for a type, there is no prevProf. In + // this case we emit the current profile as a delta profile. + deltaData = curData + } else { + // Delta profiling is also implemented in the Go core, see commit below. + // Unfortunately the core implementation isn't reusable via an API, so we do + // our own delta calculation below. + // https://github.com/golang/go/commit/2ff1e3ebf5de77325c0e96a6c2a229656fc7be50#diff-94594f8f13448da956b02997e50ca5a156b65085993e23bbfdda222da6508258R303-R304 + deltaProf, err := t.Delta.Convert(prevProf, curProf) + if err != nil { + return nil, fmt.Errorf("delta prof merge: %v", err) + } + // TimeNanos is supposed to be the time the profile was collected, see + // https://github.com/google/pprof/blob/master/proto/profile.proto.
+ deltaProf.TimeNanos = curProf.TimeNanos + // DurationNanos is the time period covered by the profile. + deltaProf.DurationNanos = curProf.TimeNanos - prevProf.TimeNanos + deltaBuf := &bytes.Buffer{} + if err := deltaProf.Write(deltaBuf); err != nil { + return nil, fmt.Errorf("delta prof write: %v", err) + } + deltaData = deltaBuf.Bytes() + } + // Keep the most recent profiles in memory for future diffing. This needs to + // be taken into account when enforcing memory limits going forward. + p.prev[t.Type] = curProf return &profile{ - name: HeapProfile.Filename(), - data: buf.Bytes(), + name: "delta-" + t.Filename, + data: deltaData, }, nil } @@ -159,23 +289,6 @@ var ( stopCPUProfile = pprof.StopCPUProfile ) -func (p *profiler) cpuProfile() (*profile, error) { - var buf bytes.Buffer - start := now() - if err := startCPUProfile(&buf); err != nil { - return nil, err - } - p.interruptibleSleep(p.cfg.cpuDuration) - stopCPUProfile() - end := now() - tags := append(p.cfg.tags, CPUProfile.Tag()) - p.cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) - return &profile{ - name: CPUProfile.Filename(), - data: buf.Bytes(), - }, nil -} - // lookpupProfile looks up the profile with the given name and writes it to w. It returns // any errors encountered in the process. It is replaced in tests. 
var lookupProfile = func(name string, w io.Writer, debug int) error { @@ -186,76 +299,6 @@ var lookupProfile = func(name string, w io.Writer, debug int) error { return prof.WriteTo(w, debug) } -func blockProfile(cfg *config) (*profile, error) { - var buf bytes.Buffer - start := now() - if err := lookupProfile(BlockProfile.String(), &buf, 0); err != nil { - return nil, err - } - end := now() - tags := append(cfg.tags, BlockProfile.Tag()) - cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) - return &profile{ - name: BlockProfile.Filename(), - data: buf.Bytes(), - }, nil -} - -func mutexProfile(cfg *config) (*profile, error) { - var buf bytes.Buffer - start := now() - if err := lookupProfile(MutexProfile.String(), &buf, 0); err != nil { - return nil, err - } - end := now() - tags := append(cfg.tags, MutexProfile.Tag()) - cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) - return &profile{ - name: MutexProfile.Filename(), - data: buf.Bytes(), - }, nil -} - -func goroutineProfile(cfg *config) (*profile, error) { - var buf bytes.Buffer - start := now() - if err := lookupProfile(GoroutineProfile.String(), &buf, 0); err != nil { - return nil, err - } - end := now() - tags := append(cfg.tags, GoroutineProfile.Tag()) - cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) - return &profile{ - name: GoroutineProfile.Filename(), - data: buf.Bytes(), - }, nil -} - -func goroutineWaitProfile(cfg *config) (*profile, error) { - if n := runtime.NumGoroutine(); n > cfg.maxGoroutinesWait { - return nil, fmt.Errorf("skipping goroutines wait profile: %d goroutines exceeds DD_PROFILING_WAIT_PROFILE_MAX_GOROUTINES limit of %d", n, cfg.maxGoroutinesWait) - } - - var ( - text = &bytes.Buffer{} - pprof = &bytes.Buffer{} - start = now() - ) - if err := lookupProfile(GoroutineProfile.String(), text, 2); err != nil { - return nil, err - } else if err := goroutineDebug2ToPprof(text, pprof, start); err != nil { - 
return nil, err - } - end := now() - tags := append(cfg.tags, expGoroutineWaitProfile.Tag()) - cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) - - return &profile{ - name: expGoroutineWaitProfile.Filename(), - data: pprof.Bytes(), - }, nil -} - func goroutineDebug2ToPprof(r io.Reader, w io.Writer, t time.Time) (err error) { // gostackparse.Parse() has been extensively tested and should not crash // under any circumstances, but we really want to avoid crashing a customers @@ -353,21 +396,6 @@ func goroutineDebug2ToPprof(r io.Reader, w io.Writer, t time.Time) (err error) { return nil } -func (p *profiler) collectMetrics() (*profile, error) { - var buf bytes.Buffer - start := now() - if err := p.met.report(start, &buf); err != nil { - return nil, err - } - end := now() - tags := append(p.cfg.tags, MetricsProfile.Tag()) - p.cfg.statsd.Timing("datadog.profiler.go.collect_time", end.Sub(start), tags, 1) - return &profile{ - name: MetricsProfile.Filename(), - data: buf.Bytes(), - }, nil -} - // now returns current time in UTC. 
func now() time.Time { return time.Now().UTC() diff --git a/profiler/profile_test.go b/profiler/profile_test.go index 74174f59f0..5e45a4242c 100644 --- a/profiler/profile_test.go +++ b/profiler/profile_test.go @@ -12,26 +12,147 @@ import ( "io/ioutil" "os" "strconv" + "strings" "testing" "time" pprofile "github.com/google/pprof/profile" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "gopkg.in/DataDog/dd-trace-go.v1/profiler/internal/pprofutils" ) func TestRunProfile(t *testing.T) { - t.Run("heap", func(t *testing.T) { - defer func(old func(_ io.Writer) error) { writeHeapProfile = old }(writeHeapProfile) - writeHeapProfile = func(w io.Writer) error { - _, err := w.Write([]byte("my-heap-profile")) - return err + t.Run("delta", func(t *testing.T) { + var ( + deltaPeriod = DefaultPeriod + timeA = time.Now().Truncate(time.Minute) + timeB = timeA.Add(deltaPeriod) + ) + + tests := []struct { + Types []ProfileType + Prof1 textProfile + Prof2 textProfile + WantDelta textProfile + WantDuration time.Duration + }{ + // For the mutex and block profile, we derive the delta for all sample + // types, so we can test with a generic sample profile. + { + Types: []ProfileType{MutexProfile, BlockProfile}, + Prof1: textProfile{ + Time: timeA, + Text: ` +stuff/count +main 3 +main;bar 2 +main;foo 5 +`, + }, + Prof2: textProfile{ + Time: timeB, + Text: ` +stuff/count +main 4 +main;bar 2 +main;foo 8 +main;foobar 7 +`, + }, + WantDelta: textProfile{ + Time: timeA, + Text: ` +stuff/count +main;foobar 7 +main;foo 3 +main 1 +`, + }, + WantDuration: deltaPeriod, + }, + + // For the heap profile, we must only derive deltas for the + // alloc_objects/count and alloc_space/bytes sample type, so we use a + // more realistic example and make sure it is handled accurately. 
+ { + Types: []ProfileType{HeapProfile}, + Prof1: textProfile{ + Time: timeA, + Text: ` +alloc_objects/count alloc_space/bytes inuse_objects/count inuse_space/bytes +main 3 6 12 24 +main;bar 2 4 8 16 +main;foo 5 10 20 40 +`, + }, + Prof2: textProfile{ + Time: timeB, + Text: ` +alloc_objects/count alloc_space/bytes inuse_objects/count inuse_space/bytes +main 4 8 16 32 +main;bar 2 4 8 16 +main;foo 8 16 32 64 +main;foobar 7 14 28 56 +`, + }, + WantDelta: textProfile{ + Time: timeA, + Text: ` +alloc_objects/count alloc_space/bytes inuse_objects/count inuse_space/bytes +main;foobar 7 14 28 56 +main;foo 3 6 32 64 +main 1 2 16 32 +main;bar 0 0 8 16 +`, + }, + WantDuration: deltaPeriod, + }, + } + + for _, test := range tests { + for _, profType := range test.Types { + t.Run(profType.String(), func(t *testing.T) { + prof1 := test.Prof1.Protobuf() + prof2 := test.Prof2.Protobuf() + + returnProfs := [][]byte{prof1, prof2} + defer func(old func(_ string, _ io.Writer, _ int) error) { lookupProfile = old }(lookupProfile) + lookupProfile = func(name string, w io.Writer, _ int) error { + _, err := w.Write(returnProfs[0]) + returnProfs = returnProfs[1:] + return err + } + p, err := unstartedProfiler() + + // first run, should produce the current profile twice (a bit + // awkward, but makes sense since we try to add delta profiles as an + // additional profile type to ease the transition) + profs, err := p.runProfile(profType) + require.NoError(t, err) + require.Equal(t, 2, len(profs)) + require.Equal(t, profType.Filename(), profs[0].name) + require.Equal(t, prof1, profs[0].data) + require.Equal(t, "delta-"+profType.Filename(), profs[1].name) + require.Equal(t, prof1, profs[1].data) + + // second run, should produce p1 profile and delta profile + profs, err = p.runProfile(profType) + require.NoError(t, err) + require.Equal(t, 2, len(profs)) + require.Equal(t, profType.Filename(), profs[0].name) + require.Equal(t, prof2, profs[0].data) + require.Equal(t, 
"delta-"+profType.Filename(), profs[1].name) + require.Equal(t, test.WantDelta.String(), protobufToText(profs[1].data)) + + // check delta prof details like timestamps and duration + deltaProf, err := pprofile.ParseData(profs[1].data) + require.NoError(t, err) + require.Equal(t, test.Prof2.Time.UnixNano(), deltaProf.TimeNanos) + require.Equal(t, deltaPeriod.Nanoseconds(), deltaProf.DurationNanos) + }) + } } - p, err := unstartedProfiler() - prof, err := p.runProfile(HeapProfile) - require.NoError(t, err) - assert.Equal(t, "heap.pprof", prof.name) - assert.Equal(t, []byte("my-heap-profile"), prof.data) }) t.Run("cpu", func(t *testing.T) { @@ -45,40 +166,12 @@ func TestRunProfile(t *testing.T) { p, err := unstartedProfiler(CPUDuration(10 * time.Millisecond)) start := time.Now() - prof, err := p.runProfile(CPUProfile) + profs, err := p.runProfile(CPUProfile) end := time.Now() require.NoError(t, err) assert.True(t, end.Sub(start) > 10*time.Millisecond) - assert.Equal(t, "cpu.pprof", prof.name) - assert.Equal(t, []byte("my-cpu-profile"), prof.data) - }) - - t.Run("mutex", func(t *testing.T) { - defer func(old func(_ string, _ io.Writer, _ int) error) { lookupProfile = old }(lookupProfile) - lookupProfile = func(name string, w io.Writer, _ int) error { - _, err := w.Write([]byte(name)) - return err - } - - p, err := unstartedProfiler() - prof, err := p.runProfile(MutexProfile) - require.NoError(t, err) - assert.Equal(t, "mutex.pprof", prof.name) - assert.Equal(t, []byte("mutex"), prof.data) - }) - - t.Run("block", func(t *testing.T) { - defer func(old func(_ string, _ io.Writer, _ int) error) { lookupProfile = old }(lookupProfile) - lookupProfile = func(name string, w io.Writer, _ int) error { - _, err := w.Write([]byte(name)) - return err - } - - p, err := unstartedProfiler() - prof, err := p.runProfile(BlockProfile) - require.NoError(t, err) - assert.Equal(t, "block.pprof", prof.name) - assert.Equal(t, []byte("block"), prof.data) + assert.Equal(t, "cpu.pprof", 
profs[0].name) + assert.Equal(t, []byte("my-cpu-profile"), profs[0].data) }) t.Run("goroutine", func(t *testing.T) { @@ -89,10 +182,10 @@ func TestRunProfile(t *testing.T) { } p, err := unstartedProfiler() - prof, err := p.runProfile(GoroutineProfile) + profs, err := p.runProfile(GoroutineProfile) require.NoError(t, err) - assert.Equal(t, "goroutines.pprof", prof.name) - assert.Equal(t, []byte("goroutine"), prof.data) + assert.Equal(t, "goroutines.pprof", profs[0].name) + assert.Equal(t, []byte("goroutine"), profs[0].data) }) t.Run("goroutinewait", func(t *testing.T) { @@ -125,9 +218,9 @@ main.main() } p, err := unstartedProfiler() - prof, err := p.runProfile(expGoroutineWaitProfile) + profs, err := p.runProfile(expGoroutineWaitProfile) require.NoError(t, err) - require.Equal(t, "goroutineswait.pprof", prof.name) + require.Equal(t, "goroutineswait.pprof", profs[0].name) // pro tip: enable line below to inspect the pprof output using cli tools // ioutil.WriteFile(prof.name, prof.data, 0644) @@ -141,7 +234,7 @@ main.main() require.Equal(t, want, got) } - pp, err := pprofile.Parse(bytes.NewReader(prof.data)) + pp, err := pprofile.Parse(bytes.NewReader(profs[0].data)) require.NoError(t, err) // timestamp require.NotEqual(t, int64(0), pp.TimeNanos) @@ -231,3 +324,68 @@ type panicReader struct{} func (c panicReader) Read(_ []byte) (int, error) { panic("42") } + +// textProfile is a test helper for converting folded text to pprof protobuf +// profiles. +// See https://github.com/brendangregg/FlameGraph#2-fold-stacks +type textProfile struct { + Text string + Time time.Time +} + +// Protobuf converts the profile to pprof's protobuf format or panics if there +// is an error. 
+func (t textProfile) Protobuf() []byte { + out := &bytes.Buffer{} + prof, err := pprofutils.Text{}.Convert(strings.NewReader(t.String())) + if err != nil { + panic(err) + } + if !t.Time.IsZero() { + prof.TimeNanos = t.Time.UnixNano() + } + if err := prof.Write(out); err != nil { + panic(err) + } + return out.Bytes() +} + +// String returns text without leading or trailing whitespace other than a +// trailing newline. +func (t textProfile) String() string { + return strings.TrimSpace(t.Text) + "\n" +} + +// protobufToText is a test helper that converts a protobuf pprof profile to +// text format or panics if there is an error. +func protobufToText(pprofData []byte) string { + prof, err := pprofile.ParseData(pprofData) + if err != nil { + panic(err) + } + out := &bytes.Buffer{} + if err := (pprofutils.Protobuf{SampleTypes: true}).Convert(prof, out); err != nil { + panic(err) + } + return out.String() +} + +// TestProfileTypeSoundness fails if somebody tries to add a new profile type +// without adding it to enabledProfileTypes as well. 
+func TestProfileTypeSoundness(t *testing.T) { + t.Run("enabledProfileTypes", func(t *testing.T) { + var allProfileTypes []ProfileType + for pt := range profileTypes { + allProfileTypes = append(allProfileTypes, pt) + } + p, err := unstartedProfiler(WithProfileTypes(allProfileTypes...)) + require.NoError(t, err) + types := p.enabledProfileTypes() + require.Equal(t, len(allProfileTypes), len(types)) + }) + + t.Run("profileTypes", func(t *testing.T) { + _, err := unstartedProfiler(WithProfileTypes(ProfileType(-1))) + require.EqualError(t, err, "unknown profile type: -1") + }) +} diff --git a/profiler/profiler.go b/profiler/profiler.go index 7543eb9f0b..52fda22a9b 100644 --- a/profiler/profiler.go +++ b/profiler/profiler.go @@ -15,6 +15,7 @@ import ( "sync" "time" + pprofile "github.com/google/pprof/profile" "gopkg.in/DataDog/dd-trace-go.v1/internal" "gopkg.in/DataDog/dd-trace-go.v1/internal/log" ) @@ -59,13 +60,14 @@ func Stop() { // profiler collects and sends preset profiles to the Datadog API at a given frequency // using a given configuration. type profiler struct { - cfg *config // profile configuration - out chan batch // upload queue - uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests - exit chan struct{} // exit signals the profiler to stop; it is closed after stopping - stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once. - wg sync.WaitGroup // wg waits for all goroutines to exit when stopping. - met *metrics // metric collector state + cfg *config // profile configuration + out chan batch // upload queue + uploadFunc func(batch) error // defaults to (*profiler).upload; replaced in tests + exit chan struct{} // exit signals the profiler to stop; it is closed after stopping + stopOnce sync.Once // stopOnce ensures the profiler is stopped exactly once. + wg sync.WaitGroup // wg waits for all goroutines to exit when stopping. 
+ met *metrics // metric collector state + prev map[ProfileType]*pprofile.Profile // previous collection results for delta profiling } // newProfiler creates a new, unstarted profiler. @@ -123,11 +125,18 @@ func newProfiler(opts ...Option) (*profiler, error) { if cfg.uploadTimeout <= 0 { return nil, fmt.Errorf("invalid upload timeout, must be > 0: %s", cfg.uploadTimeout) } + for pt := range cfg.types { + if _, ok := profileTypes[pt]; !ok { + return nil, fmt.Errorf("unknown profile type: %d", pt) + } + } + p := profiler{ cfg: cfg, out: make(chan batch, outChannelSize), exit: make(chan struct{}), met: newMetrics(), + prev: make(map[ProfileType]*pprofile.Profile), } p.uploadFunc = p.upload return &p, nil @@ -173,14 +182,17 @@ func (p *profiler) collect(ticker <-chan time.Time) { // configured CPU profile duration: (start-end). end: now.Add(p.cfg.cpuDuration), } - for t := range p.cfg.types { - prof, err := p.runProfile(t) + + for _, t := range p.enabledProfileTypes() { + profs, err := p.runProfile(t) if err != nil { log.Error("Error getting %s profile: %v; skipping.", t, err) p.cfg.statsd.Count("datadog.profiler.go.collect_error", 1, append(p.cfg.tags, t.Tag()), 1) continue } - bat.addProfile(prof) + for _, prof := range profs { + bat.addProfile(prof) + } } p.enqueueUpload(bat) case <-p.exit: @@ -189,6 +201,30 @@ func (p *profiler) collect(ticker <-chan time.Time) { } } +// enabledProfileTypes returns the enabled profile types in a deterministic +// order. The CPU profile always comes first because people might spot +// interesting events in there and then try to look for the counter-part event +// in the mutex/heap/block profile. Deterministic ordering is also important +// for delta profiles, otherwise they'd cover varying profiling periods. 
+func (p *profiler) enabledProfileTypes() []ProfileType { + order := []ProfileType{ + CPUProfile, + HeapProfile, + BlockProfile, + MutexProfile, + GoroutineProfile, + expGoroutineWaitProfile, + MetricsProfile, + } + enabled := []ProfileType{} + for _, t := range order { + if _, ok := p.cfg.types[t]; ok { + enabled = append(enabled, t) + } + } + return enabled +} + // enqueueUpload pushes a batch of profiles onto the queue to be uploaded. If there is no room, it will // evict the oldest profile to make some. Typically a batch would be one of each enabled profile. func (p *profiler) enqueueUpload(bat batch) { diff --git a/profiler/profiler_test.go b/profiler/profiler_test.go index cec69d3fe7..80232a82ab 100644 --- a/profiler/profiler_test.go +++ b/profiler/profiler_test.go @@ -210,11 +210,15 @@ func TestProfilerInternal(t *testing.T) { } defer func(old func()) { stopCPUProfile = old }(stopCPUProfile) stopCPUProfile = func() { atomic.AddUint64(&stopCPU, 1) } - defer func(old func(_ io.Writer) error) { writeHeapProfile = old }(writeHeapProfile) - writeHeapProfile = func(_ io.Writer) error { - atomic.AddUint64(&writeHeap, 1) - return nil + defer func(old func(_ string, w io.Writer, _ int) error) { lookupProfile = old }(lookupProfile) + lookupProfile = func(name string, w io.Writer, _ int) error { + if name == "heap" { + atomic.AddUint64(&writeHeap, 1) + } + _, err := w.Write(textProfile{Text: "main 5\n"}.Protobuf()) + return err } + tick := make(chan time.Time) wait := make(chan struct{}) @@ -237,7 +241,8 @@ func TestProfilerInternal(t *testing.T) { assert.EqualValues(1, startCPU) assert.EqualValues(1, stopCPU) - assert.Equal(3, len(bat.profiles)) + // should contain cpu.pprof, metrics.json, heap.pprof, delta-heap.pprof + assert.Equal(4, len(bat.profiles)) p.exit <- struct{}{} <-wait @@ -290,9 +295,11 @@ func TestProfilerPassthrough(t *testing.T) { } assert := assert.New(t) - assert.Equal(2, len(bat.profiles)) + // should contain cpu.pprof, heap.pprof, delta-heap.pprof 
+ assert.Equal(3, len(bat.profiles)) assert.NotEmpty(bat.profiles[0].data) assert.NotEmpty(bat.profiles[1].data) + assert.NotEmpty(bat.profiles[2].data) } func unstartedProfiler(opts ...Option) (*profiler, error) {